Products

中文

中文

  • Pipy 文档
  • Introduction
    • 概述
    • 概念
  • Getting Started
    • 构建和安装
    • 快速入门
    • 获取帮助
  • Release
    • 发布历史
      • 0.10.0-1
      • 0.22.0-31
      • 0.30.0
      • 0.50.0
  • Tutorial
    • 01 你好
    • 02 回显
    • 03 代理
    • 04 路由
    • 05 负载均衡
    • 06 配置
    • 07 插件
  • Operating
    • 开发控制台
    • CLI
    • Pipy Repo
      • 介绍
      • 快速开始
      • 演示
      • REST API
  • Reference
    • API
      • Configuration
        • acceptHTTPTunnel()
        • acceptProxyProtocol()
        • acceptSOCKS()
        • acceptTLS()
        • admin()
        • branch()
        • branchMessage()
        • branchMessageStart()
        • chain()
        • compress()
        • compressHTTP()
        • connect()
        • connectHTTPTunnel()
        • connectProxyProtocol()
        • connectSOCKS()
        • connectTLS()
        • decodeBGP()
        • decodeDubbo()
        • decodeHTTPRequest()
        • decodeHTTPResponse()
        • decodeMQTT()
        • decodeMultipart()
        • decodeRESP()
        • decodeThrift()
        • decodeWebSocket()
        • decompress()
        • decompressHTTP()
        • deframe()
        • demux()
        • demuxHTTP()
        • detectProtocol()
        • dummy()
        • dump()
        • encodeBGP()
        • encodeDubbo()
        • encodeHTTPRequest()
        • encodeHTTPResponse()
        • encodeMQTT()
        • encodeRESP()
        • encodeThrift()
        • encodeWebSocket()
        • exec()
        • exit()
        • export()
        • fork()
        • handleData()
        • handleMessage()
        • handleMessageBody()
        • handleMessageEnd()
        • handleMessageStart()
        • handleStreamEnd()
        • handleStreamStart()
        • handleTLSClientHello()
        • import()
        • insert()
        • link()
        • listen()
        • loop()
        • mux()
        • muxHTTP()
        • onEnd()
        • onStart()
        • pack()
        • pipeline()
        • print()
        • read()
        • repeat()
        • replaceData()
        • replaceMessage()
        • replaceMessageBody()
        • replaceMessageEnd()
        • replaceMessageStart()
        • replaceStreamEnd()
        • replaceStreamStart()
        • replay()
        • serveHTTP()
        • split()
        • task()
        • tee()
        • throttleConcurrency()
        • throttleDataRate()
        • throttleMessageRate()
        • to()
        • use()
        • wait()
        • watch()
      • Data
        • from()
        • Data()
        • push()
        • shift()
        • shiftTo()
        • shiftWhile()
        • size
        • toArray()
      • Hessian
        • decode()
        • encode()
      • JSON
        • decode()
        • encode()
        • parse()
        • stringify()
      • Message
        • body
        • head
        • Message()
        • tail
      • MessageEnd
        • MessageEnd()
        • payload
        • tail
      • MessageStart
        • head
        • MessageStart()
      • Netmask
        • base
        • bitmask
        • broadcast
        • contains()
        • decompose()
        • first
        • hostmask
        • ip
        • last
        • mask
        • Netmask()
        • next()
        • size
        • version
      • StreamEnd
        • error
        • StreamEnd()
      • URL
        • auth
        • hash
        • host
        • hostname
        • href
        • URL()
        • origin
        • password
        • path
        • pathname
        • port
        • protocol
        • query
        • search
        • searchParams
        • username
      • URLSearchParams
        • get()
        • getAll()
        • URLSearchParams()
        • set()
        • toObject()
      • XML
        • Node
          • attributes
          • children
          • name
          • Node()
        • decode()
        • encode()
        • parse()
        • stringify()
      • algo
        • Cache
          • clear()
          • get()
          • Cache()
          • remove()
          • set()
        • HashingLoadBalancer
          • add()
          • HashingLoadBalancer()
          • next()
        • LeastWorkLoadBalancer
          • LeastWorkLoadBalancer()
          • next()
          • set()
        • LoadBalancer
          • allocate()
          • LoadBalancer()
          • provision()
          • schedule()
        • LoadBalancerResource
          • free()
          • target
        • Quota
          • consume()
          • current
          • initial
          • Quota()
          • produce()
          • reset()
        • RoundRobinLoadBalancer
          • RoundRobinLoadBalancer()
          • next()
          • set()
        • URLRouter
          • add()
          • find()
          • URLRouter()
        • hash()
        • uuid()
      • console
        • debug()
        • error()
        • info()
        • log()
        • warn()
      • crypto
        • Certificate
          • issuer
          • Certificate()
          • subject
          • subjectAltNames
        • CertificateChain
          • CertificateChain()
        • Cipher
          • final()
          • Cipher()
          • update()
        • Decipher
          • final()
          • Decipher()
          • update()
        • Hash
          • digest()
          • Hash()
          • update()
        • Hmac
          • digest()
          • Hmac()
          • update()
        • JWK
          • isValid
          • JWK()
        • JWT
          • header
          • isValid
          • JWT()
          • payload
          • verify()
        • PrivateKey
          • PrivateKey()
        • PublicKey
          • PublicKey()
        • Sign
          • Sign()
          • sign()
          • update()
        • Verify
          • Verify()
          • update()
          • verify()
      • http
        • Agent
          • new()
          • request()
        • Directory
          • new()
          • serve()
      • logging
        • BinaryLogger
          • log()
          • BinaryLogger()
          • toFile()
          • toHTTP()
          • toStderr()
          • toStdout()
          • toSyslog()
        • JSONLogger
          • log()
          • JSONLogger()
          • toFile()
          • toHTTP()
          • toStderr()
          • toStdout()
          • toSyslog()
        • TextLogger
          • log()
          • TextLogger()
          • toFile()
          • toHTTP()
          • toStderr()
          • toStdout()
          • toSyslog()
      • os
        • env
        • readDir()
        • readFile()
        • stat()
        • unlink()
        • writeFile()
      • pipy()
        • exit()
        • load()
        • restart()
        • solve()
      • stats
        • Counter
          • decrease()
          • increase()
          • Counter()
          • withLabels()
          • zero()
        • Gauge
          • decrease()
          • increase()
          • Gauge()
          • set()
          • withLabels()
          • zero()
        • Histogram
          • Histogram()
          • observe()
          • withLabels()
          • zero()

    Configuration

    描述

    Configuration 帮助用户配置模块中的变量、管道和过滤器,以及模块间如何通过变量和子管道进行交互。一个有效的 PipyJS 模块在求值之后应该返回一个 Configuration

    获取新 Configuration 的唯一途径是调用全局函数 pipy(),大多数情况下,这也是 PipyJS 脚本的第一个函数。

    pipy() // 返回一个新的 Configuration 对象

    有了 pipy() 返回的 Configuration,就可以调用它的各种方法。通过每个方法返回的同一个 Configuration 可以实现链式调用。

    pipy().listen(8080) // 返回与之前相同配置对象
    .print() // 再次,相同的配置对象
    .dummy() // 再次,相同的配置对象

    方法

    Configuration 类的方法按功能分有三种:导出/导入变量、定义管道和向管道添加过滤器。

    导入/导出变量

    使用 export() 定义对其他模块可见的全局变量。使用 import() 导入其他模块定义和导出的全局变量。

    定义管道布局

    listen() 用于创建一个监听指定 TCP 端口的起始管道布局;read() 用于创建一个从文件读取和处理数据的起始管道布局;task() 用于创建一个处理进程信号或者定时任务的起始管道布局;pipeline() 以给定名称创建子管道布局。

    所有的方法都会更改 Configuration当前管道,之后就可以使用接下来要讨论的方法为新建的管道添加过滤器。

    添加过滤器

    剩下的方法是为当前管道(也就是前面部分最后创建的管道)添加过滤器。每个方法都会为当前管道添加不同类型的过滤器。

    过滤器

    过滤器根据功能不同,可以划分为 7 类。

    传输

    这些过滤器是运行特定协议的客户端和服务端传输层。其中最常用的是 connect(),它与远程主机建立出站 TCP 连接并来回传输 Data 事件。

    编解码器

    这些过滤器负责将流在原始字节格式消息格式间进行转换。解码器接收 Data 事件,并输出 MessageStart/MessageEnd 事件对(中间有不变的作为消息主体的 Data 事件)。编码器接收 MessageStart/MessageEnd 事件对(同样,其间也有作为消息主体的 Data 事件),并在 Data 事件中输出原始字节。

    多路复用和分路

    这些过滤器将一个流拆分成多个流,或者将多个流合并成一个流。其中一些集成了编解码器,比如 demuxHTTP()muxHTTP()。这种情况下,分路输入原始字节输出消息,而多路复用输入消息输出原始字节。

    连接器

    这些过滤器将流从一个管道传递到另一个管道。与多路复用和分路不同,它们只修改流传输的方向,而不修改流本身的内容。

    缓冲

    这些过滤器会延迟对流的处理,直到满足某些条件。换句话说,它们控制着流中事件的通行。

    观察者

    这些过滤器不修改流的内容,只按原样向下传递事件。其中一些过滤器在发现特定类型的事件时会触发用户定义的脚本。

    转换器

    这些过滤器将输入事件转换为新的输出事件。其中许多过滤器会调用用户定义的脚本,并将输入事件作为用户函数的参数;函数的所有输出会作为过滤器的输出。其中,exec() 会特殊一些,它将 Data 事件传递到外部进程的标准输入并读取进程的标准输出作为 Data 事件。

    工作原理

    Pipy 启动时首先从主模块的脚本开始求值,该脚本应解析为包含了主模块中定义的管道和过滤器描述的 Configuration 对象。从那里 Pipy 会查询并加载其他被主模块使用的模块,这些模块被解析为更多的 Configuration、管道以及过滤器。这个过程直到所有主模块直接或间接链接的模块被找到并加载,届时 Pipy 将构建、连接那些 Configuration 中的所有管道和过滤器,然后启动运行。

    示例

    下面是我们如何在一个 PipyJS 模块内配置简单的 HTTP hello 服务以及 TCP 代理

    pipy() // 返回新的 Configuration
    .listen(8080) // 定义监听 8080 端口的管道
    .serveHTTP( // 追加 serveHTTP 过滤器到这个管道
    () => new Message('Hello!\n')
    )
    .listen(8000) // 定义另一个监听 8000 端口的管道
    .connect('localhost:8080') // 追加 connect 过滤器到这个管道

    方法参考

    acceptHTTPTunnel(handler)

    Appends an acceptHTTPTunnel filter to the current pipeline layout.

    An acceptHTTPTunnel filter implements HTTP tunnel on the server side.

    • INPUT - Data stream received from the client with a leading HTTP CONNECT request Message.
    • OUTPUT - Data stream to send to the client with a leading HTTP CONNECT response Message.
    • SUB-INPUT - Data stream received from the client via HTTP tunnel.
    • SUB-OUTPUT - Data stream to send to the client via HTTP tunnel.

    acceptProxyProtocol(handler)

    Appends an acceptProxyProtocol filter to the current pipeline layout.

    An acceptProxyProtocol filter implements the Proxy Protocol on the server side.

    • INPUT - Data stream received from the client with a Proxy Protocol header.
    • OUTPUT - Data stream to send back to the client.
    • SUB-INPUT - Data stream received from the client with the Proxy Protocol header removed.
    • SUB-OUTPUT - Data stream to send back to the client.

    acceptSOCKS(handler)

    Appends an acceptSOCKS filter to the current pipeline layout.

    An acceptSOCKS filter implements SOCKS protocol on the server side.

    • INPUT - Data stream received from the client with a leading SOCKS connection request.
    • OUTPUT - Data stream to send to the client with a leading SOCKS connection response.
    • SUB-INPUT - Data stream received from the client via SOCKS.
    • SUB-OUTPUT - Data stream to send to the client via SOCKS.

    acceptTLS(options?)

    Appends an acceptTLS filter to the current pipeline layout.

    An acceptTLS filter implements TLS protocol on the server side.

    • INPUT - TLS-encrypted Data stream received from the client.
    • OUTPUT - TLS-encrypted Data stream to send to the client.
    • SUB-INPUT - Data stream received from the client after TLS decryption.
    • SUB-OUTPUT - Data stream to send to the client before TLS encryption.

    admin(path)

    Creates a custom administration handler.

    branch(condition, pipelineLayout, ...restBranches)

    Appends a branch filter to the current pipeline layout.

    A branch filter selects a pipeline layout from a number of candidates based on condition callbacks, and then creates a sub-pipeline from the selected pipeline layout before streaming events through it.

    • INPUT - Any types of Events to stream into the selected sub-pipeline.
    • OUTPUT - Events streaming out from the selected sub-pipeline.
    • SUB-INPUT - Events streaming into the branch filter.
    • SUB-OUTPUT - Any types of Events.

    branchMessage(condition, pipelineLayout, ...restBranches)

    Appends a branchMessage filter to the current pipeline layout.

    branchMessageStart(condition, pipelineLayout, ...restBranches)

    Appends a branchMessageStart filter to the current pipeline layout.

    chain(modules?)

    Appends a chain filter to the current pipeline layout.

    When given a list of module filenames, a chain filter starts a module chain and links to the entry pipeline for the first module.

    When no arguments are present, a chain filter links to the entry pipeline for the next module on the current module chain.

    • INPUT - Any types of Events.
    • OUTPUT - Events streaming out from the selected sub-pipeline.
    • SUB-INPUT - Events streaming into the chain filter.
    • SUB-OUTPUT - Any types of Events.

    compress(algorithm)

    Appends a compress filter to the current pipeline layout.

    A compress filter compresses messages.

    • INPUT - Messages to compress.
    • OUTPUT - Compressed Messages.

    compressHTTP(options?)

    Appends a compressHTTP filter to the current pipeline layout.

    A compressHTTP filter compresses HTTP messages.

    • INPUT - HTTP Messages to compress.
    • OUTPUT - Compressed HTTP Messages.

    connect(target, options?)

    Appends a connect filter to the current pipeline layout.

    A connect filter establishes a TCP connection to a remote host.

    • INPUT - Data stream to send to the host.
    • OUTPUT - Data stream received from the host.

    connectHTTPTunnel(handshake)

    Appends a connectHTTPTunnel filter to the current pipeline layout.

    A connectHTTPTunnel filter implements HTTP tunnel on the client side.

    • INPUT - Data stream to send to the server via HTTP tunnel.
    • OUTPUT - Data stream received from the server via HTTP tunnel.
    • SUB-INPUT - Data stream to send to the server with a leading HTTP CONNECT request Message.
    • SUB-OUTPUT - Data stream received from the server with a leading HTTP CONNECT response Message.

    connectProxyProtocol(header)

    Appends a connectProxyProtocol filter to the current pipeline layout.

    A connectProxyProtocol filter implements the Proxy Protocol on the client side.

    • INPUT - Data stream to send to the server via the Proxy Protocol.
    • OUTPUT - Data stream received from the server.
    • SUB-INPUT - Data stream to send to the server with the Proxy Protocol header added.
    • SUB-OUTPUT - Data stream received from the server.

    connectSOCKS(address)

    Appends a connectSOCKS filter to the current pipeline layout.

    A connectSOCKS filter implements SOCKS protocol on the client side.

    • INPUT - Data stream to send to the server via SOCKS.
    • OUTPUT - Data stream received from the server via SOCKS.
    • SUB-INPUT - Data stream to send to the server with a leading SOCKS connection request.
    • SUB-OUTPUT - Data stream received from the server with a leading SOCKS connection response.

    connectTLS(options?)

    Appends a connectTLS filter to the current pipeline layout.

    A connectTLS filter implements TLS protocol on the client side.

    • INPUT - Data stream to send to the server via TLS.
    • OUTPUT - Data stream received from the server via TLS.
    • SUB-INPUT - TLS-encrypted Data stream to send to the server.
    • SUB-OUTPUT - TLS-encrypted Data stream received from the server.

    decodeBGP(options?)

    Appends a decodeBGP filter to the current pipeline layout.

    decodeDubbo()

    Appends a decodeDubbo filter to the current pipeline layout.

    A decodeDubbo filter decodes Dubbo messages from a raw byte stream.

    • INPUT - Data stream to decode Dubbo messages from.
    • OUTPUT - Dubbo Messages decoded from the input Data stream.

    decodeHTTPRequest(handler?)

    Appends a decodeHTTPRequest filter to the current pipeline layout.

    A decodeHTTPRequest filter decodes HTTP/1 request messages from a raw byte stream.

    • INPUT - Data stream to decode HTTP/1 request messages from.
    • OUTPUT - HTTP/1 request Messages decoded from the input Data stream.

    decodeHTTPResponse(handler?)

    Appends a decodeHTTPResponse filter to the current pipeline layout.

    A decodeHTTPResponse filter decodes HTTP/1 response messages from a raw byte stream.

    • INPUT - Data stream to decode HTTP/1 response messages from.
    • OUTPUT - HTTP/1 response Messages decoded from the input Data stream.

    decodeMQTT()

    Appends a decodeMQTT filter to the current pipeline layout.

    A decodeMQTT filter decodes MQTT packets from a raw byte stream.

    • INPUT - Data stream to decode MQTT packets from.
    • OUTPUT - MQTT packets (Messages) decoded from the input Data stream.

    decodeMultipart()

    Appends a decodeMultipart filter to the current pipeline layout.

    A decodeMultipart filter decodes parts from MIME multipart messages.

    • INPUT - Messages to decode as MIME multipart format.
    • OUTPUT - Parts (Messages) decoded from the input MIME multipart messages.

    decodeRESP()

    Appends a decodeRESP filter to the current pipeline layout.

    decodeThrift()

    Appends a decodeThrift filter to the current pipeline layout.

    A decodeThrift filter decodes Thrift messages from a raw byte stream.

    • INPUT - Data stream to decode Thrift messages from.
    • OUTPUT - Thrift Messages decoded from the input Data stream.

    decodeWebSocket()

    Appends a decodeWebSocket filter to the current pipeline layout.

    A decodeWebSocket filter decodes WebSocket messages from a raw byte stream.

    • INPUT - Data stream to decode WebSocket messages from.
    • OUTPUT - WebSocket Messages decoded from the input Data stream.

    decompress(algorithm)

    Appends a decompress filter to the current pipeline layout.

    A decompress filter decompresses input Data stream.

    • INPUT - Data to decompress.
    • OUTPUT - Decompressed Data.

    decompressHTTP()

    Appends a decompressHTTP filter to the current pipeline layout.

    A decompressHTTP filter decompresses HTTP messages.

    • INPUT - HTTP Messages to decompress.
    • OUTPUT - Decompressed HTTP Messages.

    deframe(states)

    Appends a deframe filter to the current pipeline layout.

    demux(options?)

    Appends a demux filter to the current pipeline layout.

    A demux filter distributes each input Message to a separate sub-pipeline.

    • INPUT - Messages to distribute to different sub-pipelines.
    • OUTPUT - No output.
    • SUB-INPUT - A Message streaming into the demux filter.
    • SUB-OUTPUT - Disgarded.

    demuxHTTP(options?)

    Appends a demuxHTTP filter to the current pipeline layout.

    A demuxHTTP filter implements HTTP/1 and HTTP/2 protocol on the server side.

    • INPUT - Data stream received from the client with HTTP/1 or HTTP/2 requests.
    • OUTPUT - Data stream to send to the client with HTTP/1 or HTTP/2 responses.
    • SUB-INPUT - HTTP request Message received from the client.
    • SUB-OUTPUT - HTTP response Message to send to the client.

    detectProtocol(handler)

    Appends a detectProtocol filter to the current pipeline layout.

    A detectProtocol filter calls a user function to notify what protocol the input Data stream is.

    • INPUT - Data stream to detect protocol for.
    • OUTPUT - The same Data stream as input.

    dummy()

    Appends a dummy filter to the current pipeline layout.

    A dummy filter discards all its input Events and outputs nothing.

    • INPUT - Any types of Events.
    • OUTPUT - Nothing.

    dump(tag?)

    Appends a dump filter to the current pipeline layout.

    A dump filter prints out all its input Events to the standard error.

    • INPUT - Any types of Events.
    • OUTPUT - The same Events from the input.

    encodeBGP(options?)

    Appends a encodeBGP filter to the current pipeline layout.

    encodeDubbo()

    Appends an encodeDubbo filter to the current pipeline layout.

    An encodeDubbo filter encodes Dubbo messages into a raw byte stream.

    • INPUT - Dubbo Messages to encode.
    • OUTPUT - Encoded Data stream from the input Dubbo messages.

    encodeHTTPRequest()

    Appends an encodeHTTPRequest filter to the current pipeline layout.

    An encodeHTTPRequest filter encodes HTTP/1 request messages into a raw byte stream.

    • INPUT - HTTP/1 request Messages to encode.
    • OUTPUT - Encoded Data stream from the input HTTP/1 request messages.

    encodeHTTPResponse(options?)

    Appends an encodeHTTPResponse filter to the current pipeline layout.

    An encodeHTTPResponse filter encodes HTTP/1 response messages into a raw byte stream.

    • INPUT - HTTP/1 response Messages to encode.
    • OUTPUT - Encoded Data stream from the input HTTP/1 response messages.

    encodeMQTT()

    Appends an encodeMQTT filter to the current pipeline layout.

    An encodeMQTT filter encodes MQTT packets into a raw byte stream.

    • INPUT - MQTT packets (Messages) to encode.
    • OUTPUT - Encoded Data stream from the input MQTT packets.

    encodeRESP()

    Appends an encodeRESP filter to the current pipeline layout.

    encodeThrift()

    Appends an encodeThrift filter to the current pipeline layout.

    An encodeThrift filter encodes Thrift messages into a raw byte stream.

    • INPUT - Thrift Messages to encode.
    • OUTPUT - Encoded Data stream from the input Thrift messages.

    encodeWebSocket()

    Appends an encodeWebSocket filter to the current pipeline layout.

    An encodeWebSocket filter encodes WebSocket messages into a raw byte stream.

    • INPUT - WebSocket Messages to encode.
    • OUTPUT - Encoded Data stream from the input WebSocket messages.

    exec(command)

    Appends an exec filter to the current pipeline layout.

    An exec filter starts a child process and links to its standard input and output.

    • INPUT - The child process's standard input Data stream.
    • OUTPUT - The child process's standard output Data stream.

    exit()

    Creates a process exit handler.

    export(namespace, variables)

    Defines context variables that are accessible to other modules.

    fork(startupValues?)

    Appends a fork filter to the current pipeline layout.

    A fork filter clones Events to one or more sub-pipelines.

    • INPUT - Any types of Events to clone.
    • OUTPUT - Same Events as input.
    • SUB-INPUT - Cloned Events from the fork filter's input.
    • SUB-OUTPUT - Discarded.

    handleData(handler)

    Appends a handleData filter to the current pipeline layout.

    A handleData filter calls back user scripts every time a Data event is found in the input stream.

    • INPUT - Any types of Events.
    • OUTPUT - Same Events as input.

    handleMessage(handler)

    Appends a handleMessage filter to the current pipeline layout.

    A handleMessage filter calls back user scripts every time a complete message (Message) is found in the input stream.

    • INPUT - Any types of Events.
    • OUTPUT - Same Events as input.

    handleMessageBody(handler)

    Appends a handleMessageData filter to the current pipeline layout.

    A handleMessageData filter calls back user scripts every time a complete message body (Data) is found in the input stream.

    • INPUT - Any types of Events.
    • OUTPUT - Same Events as input.

    handleMessageEnd(handler)

    Appends a handleMessageEnd filter to the current pipeline layout.

    A handleMessageEnd filter calls back user scripts every time a MessageEnd event is found in the input stream.

    • INPUT - Any types of Events.
    • OUTPUT - Same Events as input.

    handleMessageStart(handler)

    Appends a handleMessageStart filter to the current pipeline layout.

    A handleMessageStart filter calls back user scripts every time a MessageStart event is found in the input stream.

    • INPUT - Any types of Events.
    • OUTPUT - Same Events as input.

    handleStreamEnd(handler)

    Appends a handleStreamEnd filter to the current pipeline layout.

    A handleStreamEnd filter calls back user scripts every time a StreamEnd event is found in the input stream.

    • INPUT - Any types of Events.
    • OUTPUT - Same Events as input.

    handleStreamStart(handler)

    Appends a handleStreamStart filter to the current pipeline layout.

    A handleStreamStart filter calls back user scripts for the first Event in the input stream.

    • INPUT - Any types of Events.
    • OUTPUT - Same Events as input.

    handleTLSClientHello(handler)

    Appends a handleTLSClientHello filter to the current pipeline layout.

    A handleTLSClientHello filter calls back user scripts when a TLS client hello message is found in the input stream.

    • INPUT - Any types of Events.
    • OUTPUT - Same Events as input.

    import(variables)

    Imports context variables defined and exported from other modules.

    insert(handler?)

    Appends an insert filter to the current pipeline layout.

    link(pipelineLayoutName)

    Appends a link filter to the current pipeline layout.

    A link filter starts a sub-pipeline and streams events through it.

    • INPUT - Any types of Events to stream into the sub-pipeline.
    • OUTPUT - Events streaming out from the sub-pipeline.
    • SUB-INPUT - Events streaming into the link filter.
    • SUB-OUTPUT - Any types of Events.

    listen(port, options?)

    Creates a port pipeline layout for incoming TCP/UDP connections on a port.

    A port pipeline has the following input/output:

    • INPUT - Data stream received from the client.
    • OUTPUT - Data stream to send to the client.

    loop(pipelineLayout)

    Appends a loop filter to the current pipeline layout.

    mux(sessionSelector, options?)

    Appends a mux filter to the current pipeline layout.

    Multiple mux filters merge input Messages into a shared sub-pipeline.

    • INPUT - A Message to queue into the shared sub-pipeline.
    • OUTPUT - The same Message as input.
    • SUB-INPUT - Messages from multiple mux filters.
    • SUB-OUTPUT - Discarded.

    mux(options?)

    Appends a mux filter that merges to the same sub-pipline as other mux filters coming from the same inbound connection.

    muxHTTP(sessionSelector, options?)

    Appends a muxHTTP filter to the current pipeline layout.

    A muxHTTP filter implements HTTP/1 and HTTP/2 protocol on the client side.

    • INPUT - HTTP request Message to send to the server.
    • OUTPUT - HTTP response Message received from the server.
    • SUB-INPUT - Data stream to send to the server with HTTP/1 or HTTP/2 requests.
    • SUB-OUTPUT - Data stream received from the server with HTTP/1 or HTTP/2 responses.

    muxHTTP(options?)

    Appends a muxHTTP filter that merges to the same sub-pipline as other muxHTTP filters coming from the same inbound connection.

    onEnd(handler)

    Registers a function to be called when a pipeline is destroyed.

    onStart(handler)

    Registers a function to be called when a pipeline is created.

    pack(batchSize?, options?)

    Appends a pack filter to the current pipeline layout.

    A pack filter combines multiple input messages into one.

    • INPUT - Stream of Messages to combine.
    • OUTPUT - Stream of combined Messages.

    pipeline(name?)

    Creates a sub-pipeline layout.

    A sub-pipeline has the following input/output:

    • INPUT - Any types of Events.
    • OUTPUT - Any types of Events.

    print()

    Appends a print filter to the current pipeline layout.

    A print filter prints Data to the standard error.

    • INPUT - Any types of Events.
    • OUTPUT - Same Events as the input.

    read(filename)

    Appends a read filter to the current pipeline layout.

    A read filter reads from a file and outputs its content as a Data stream.

    • INPUT - Any types of Events.
    • OUTPUT - File content as a Data stream.

    repeat(count, cb)

    Performs batch configuration.

    replaceData(handler?)

    Appends a replaceData filter to the current pipeline layout.

    A replaceData filter calls back user scripts to get a replacement for each Data event found in the input stream.

    • INPUT - Any types of Events.
    • OUTPUT - Any types of Events.

    replaceMessage(handler?)

    Appends a replaceMessage filter to the current pipeline layout.

    A replaceMessage filter calls back user scripts to get a replacement for each complete message (Message) found in the input stream.

    • INPUT - Any types of Events.
    • OUTPUT - Any types of Events.

    replaceMessageBody(handler?)

    Appends a replaceMessageBody filter to the current pipeline layout.

    A replaceMessageBody filter calls back user scripts to get a replacement for each complete message body (Data) found in the input stream.

    • INPUT - Any types of Events.
    • OUTPUT - Any types of Events.

    replaceMessageEnd(handler?)

    Appends a replaceMessageEnd filter to the current pipeline layout.

    A replaceMessageEnd filter calls back user scripts to get a replacement for each MessageEnd event found in the input stream.

    • INPUT - Any types of Events.
    • OUTPUT - Any types of Events.

    replaceMessageStart(handler?)

    Appends a replaceMessageStart filter to the current pipeline layout.

    A replaceMessageStart filter calls back user scripts to get a replacement for each MessageStart event found in the input stream.

    • INPUT - Any types of Events.
    • OUTPUT - Any types of Events.

    replaceStreamEnd(handler?)

    Appends a replaceStreamEnd filter to the current pipeline layout.

    A replaceStreamEnd filter calls back user scripts to get a replacement for each StreamEnd event found in the input stream.

    • INPUT - Any types of Events.
    • OUTPUT - Any types of Events.

    replaceStreamStart(handler?)

    Appends a replaceStreamStart filter to the current pipeline layout.

    A replaceStreamStart filter calls back user scripts to get a replacement for the first event in the input stream.

    • INPUT - Any types of Events.
    • OUTPUT - Any types of Events.

    replay(options?)

    Appends a replay filter to the current pipeline layout.

    A replay filter repeats its input event sequence to a new sub-pipeline when the previous sub-pipeline outputs a StreamEnd event with error code "Replay".

    • INPUT - Any types of Events to stream into the sub-pipelines.
    • OUTPUT - Events streaming out from the sub-pipelines.
    • SUB-INPUT - Events streaming into the replay filter.
    • SUB-OUTPUT - Any types of Events.

    serveHTTP(handler)

    Appends a serveHTTP filter to the current pipeline layout.

    A serveHTTP filter calls back user scripts to get an output HTTP response for each HTTP request found in the input stream.

    • INPUT - Data stream containing HTTP requests received from the client.
    • OUTPUT - Data stream containing HTTP responses to send to the client.

    split(separator)

    Appends a split filter to the current pipeline layout.

    A split filter splits an input Message into multiple Messages by a given separator.

    • INPUT - Messages to split.
    • OUTPUT - Messages splitted from the input.

    task(intervalOrSignal?)

    Creates a timer pipeline layout or a signal pipeline layout for a periodic job or a signal.

    A timer pipeline or a signal pipeline has the following input/output:

    • INPUT - Nothing.
    • OUTPUT - Discarded.

    tee(filename)

    Appends a tee filter to the current pipeline layout.

    A tee filter writes input Data to a file.

    • INPUT - Any types of Events.
    • OUTPUT - Same Events as the input.

    throttleConcurrency(quota)

    Appends a throttleConcurrency filter to the current pipeline layout.

    A throttleConcurrency filter limits the number of concurrent streams.

    • INPUT - Any types of Events.
    • OUTPUT - Same Events as the input.

    throttleDataRate(quota)

    Appends a throttleDataRate filter to the current pipeline layout.

    A throttleDataRate filter limits the amout of Data passing through per unit of time.

    • INPUT - Any types of Events.
    • OUTPUT - Same Events as the input.

    throttleMessageRate(quota)

    Appends a throttleMessageRate filter to the current pipeline layout.

    A throttleMessageRate filter limits the number of Messages passing through per unit of time.

    • INPUT - Any types of Events.
    • OUTPUT - Same Events as the input.

    to(pipelineLayout)

    Attaches a sub-pipeline layout to the last joint filter.

    use(filename, pipelineLayoutName?)

    Appends a use filter to the current pipeline layout.

    A use filter creates a sub-pipeline from a pipeline layout in a differen module and streams Events through it.

    • INPUT - Any types of Events to stream into the sub-pipeline.
    • OUTPUT - Events streaming out from the sub-pipeline.
    • SUB-INPUT - Events streaming into the use filter.
    • SUB-OUTPUT - Any types of Events.

    wait(condition)

    Appends a wait filter to the current pipeline layout.

    A wait filter blocks all input Events up until a condition is met.

    • INPUT - Any types of Events.
    • OUTPUT - Same Events as the input

    watch(filename)

    Creates a codebase file watcher.


    © 2024, Flomesh Team.       ICP备案/许可证号:辽ICP备2023014827号