• Pipy Documentation
  • Introduction
    • Overview
    • Concepts
  • Getting Started
    • Build and Install
    • Quick Start
    • Getting help
  • Release
    • Release History
      • 0.10.0-1
      • 0.22.0-31
      • 0.30.0
      • 0.50.0
  • Tutorial
    • Part 1: Hello
    • Part 2: Echo
    • Part 3: Proxy
    • Part 4: Routing
    • Part 5: Loading Balancing
    • Part 6: Configuration
    • Part 7: Plugins
  • Operating
    • Admin GUI
    • CLI
    • Pipy Repo
      • Introduction
      • Quick Start
      • Demo
      • REST API
    • Statistics
  • Reference
    • API
      • Configuration
        • acceptHTTPTunnel()
        • acceptSOCKS()
        • acceptTLS()
        • branch()
        • chain()
        • compressHTTP()
        • compressMessage()
        • connect()
        • connectHTTPTunnel()
        • connectSOCKS()
        • connectTLS()
        • decodeDubbo()
        • decodeHTTPRequest()
        • decodeHTTPResponse()
        • decodeMQTT()
        • decodeMultipart()
        • decodeWebSocket()
        • decompressHTTP()
        • decompressMessage()
        • deframe()
        • demux()
        • demuxHTTP()
        • demuxQueue()
        • depositMessage()
        • detectProtocol()
        • dummy()
        • dump()
        • encodeDubbo()
        • encodeHTTPRequest()
        • encodeHTTPResponse()
        • encodeMQTT()
        • encodeWebSocket()
        • exec()
        • export()
        • fork()
        • handleData()
        • handleMessage()
        • handleMessageBody()
        • handleMessageEnd()
        • handleMessageStart()
        • handleStreamEnd()
        • handleStreamStart()
        • handleTLSClientHello()
        • import()
        • input()
        • link()
        • listen()
        • mux()
        • muxHTTP()
        • muxQueue()
        • onEnd()
        • onStart()
        • output()
        • pack()
        • pipeline()
        • print()
        • read()
        • replaceData()
        • replaceMessage()
        • replaceMessageBody()
        • replaceMessageEnd()
        • replaceMessageStart()
        • replaceStreamEnd()
        • replaceStreamStart()
        • serveHTTP()
        • split()
        • task()
        • tee()
        • throttleConcurrency()
        • throttleDataRate()
        • throttleMessageRate()
        • to()
        • use()
        • wait()
      • Data
        • from()
        • Data()
        • push()
        • shift()
        • shiftTo()
        • shiftWhile()
        • size
      • Hessian
        • decode()
        • encode()
      • JSON
        • decode()
        • encode()
        • parse()
        • stringify()
      • Message
        • body
        • head
        • Message()
        • tail
      • MessageEnd
        • MessageEnd()
        • tail
      • MessageStart
        • head
        • MessageStart()
      • Netmask
        • base
        • bitmask
        • broadcast
        • contains()
        • first
        • hostmask
        • last
        • mask
        • Netmask()
        • next()
        • size
      • StreamEnd
        • error
        • StreamEnd()
      • URL
        • auth
        • hash
        • host
        • hostname
        • href
        • URL()
        • origin
        • password
        • path
        • pathname
        • port
        • protocol
        • query
        • search
        • searchParams
        • username
      • URLSearchParams
        • get()
        • getAll()
        • URLSearchParams()
        • set()
        • toObject()
      • XML
        • Node
          • attributes
          • children
          • name
          • Node()
        • decode()
        • encode()
        • parse()
        • stringify()
      • algo
        • Cache
          • clear()
          • get()
          • Cache()
          • remove()
          • set()
        • HashingLoadBalancer
          • add()
          • HashingLoadBalancer()
          • next()
        • LeastWorkLoadBalancer
          • LeastWorkLoadBalancer()
          • next()
          • set()
        • Quota
          • consume()
          • current
          • initial
          • Quota()
          • produce()
          • reset()
        • RoundRobinLoadBalancer
          • RoundRobinLoadBalancer()
          • next()
          • set()
        • URLRouter
          • add()
          • find()
          • URLRouter()
        • hash()
        • uuid()
      • console
        • log()
      • crypto
        • Certificate
          • Certificate()
        • CertificateChain
          • CertificateChain()
        • Cipher
          • final()
          • Cipher()
          • update()
        • Decipher
          • final()
          • Decipher()
          • update()
        • Hash
          • digest()
          • Hash()
          • update()
        • Hmac
          • digest()
          • Hmac()
          • update()
        • JWK
          • isValid
          • JWK()
        • JWT
          • header
          • isValid
          • JWT()
          • payload
          • verify()
        • PrivateKey
          • PrivateKey()
        • PublicKey
          • PublicKey()
        • Sign
          • Sign()
          • sign()
          • update()
        • Verify
          • Verify()
          • update()
          • verify()
      • http
        • File
          • from()
          • toMessage()
      • logging
        • BinaryLogger
          • log()
          • BinaryLogger()
          • toFile()
          • toHTTP()
          • toStderr()
          • toStdout()
        • JSONLogger
          • log()
          • JSONLogger()
          • toFile()
          • toHTTP()
          • toStderr()
          • toStdout()
        • TextLogger
          • log()
          • TextLogger()
          • toFile()
          • toHTTP()
          • toStderr()
          • toStdout()
      • os
        • env
        • readFile()
        • writeFile()
      • pipy()
        • exit()
        • load()
        • restart()
        • solve()
      • stats
        • Counter
          • decrease()
          • increase()
          • Counter()
          • withLabels()
          • zero()
        • Gauge
          • decrease()
          • increase()
          • Gauge()
          • set()
          • withLabels()
          • zero()
        • Histogram
          • Histogram()
          • observe()
          • withLabels()
          • zero()
    • PipyJS
      • Language
      • Builtin Objects

    Configuration

    Description

    Configuration helps user configure variables, pipeline layouts in a module, as well as how a module interacts with other modules via variables and sub-pipelines. All PipyJS modules are expected to return a Configuration object after being evaluated.

    The only way to get a new Configuration is by calling the global function pipy(), which is, in most cases, the first function you would call at the beginning of PipyJS script.

    pipy() // this returns a new Configuration object

    With that Configuration returned from pipy(), you call an assortment of methods on it. Each method returns the same Configuration, by which more method calls can be made in a continuous chain.

    pipy().listen(8080) // this returns the same Configuration object as before
    .print() // again, the same Configuration
    .dummy() // again, the same Configuration

    Methods

    There are 3 types of methods to the Configuration class: methods to export/import variables, methods to create pipeline layouts, and methods to add filters to a pipeline layout.

    Methods for importing/exporting variables

    Use export() to define global variables that are visible to other modules. Use import() to import global variables defined and exported from other modules.

    Methods for creating pipeline layout

    Use listen() to create a pipeline layout that generates a root pipeline for every incoming connection on a specified TCP port. Use read() to create a pipeline layout that reads and processes data from a file. Use task() to create a pipeline layout that generates root pipelines for received process signals or periodic tasks. Use pipeline() to create a pipeline layout for sub-pipelines.

    All three methods will change the current pipeline layout of the Configuration, after which one or more filters can be appended to the newly created pipeline layout by using the methods we will discuss later.

    Methods for adding filters

    The rest of the methods are for appending filters to the current pipeline layout (i.e. the last pipeline layout created with one of the methods said in the previous section). Each method adds a different type of filter to the current pipeline layout.

    Filters

    Filters can be categorized into 7 types according to their functionalities.

    Transportation

    These filters are transport layer clients and servers speaking a specific protocol. Among these filters, the most often used one is connect(), which establishes an outbound TCP connection to a remote host and transfers Data events back and forth.

    Codecs

    These filters translate streams between the raw byte form and the message form. A decoder takes in Data events and outputs MessageStart/MessageEnd event pairs (with unchanged message bodies as Data events in between). An encoder takes in MessageStart/MessageEnd event pairs (again, with message bodies as Data events in between), and outputs raw bytes in Data events.

    Muxers and demuxers

    These filters split messages from one stream into multiple streams, or the opposite, combine messages from multiple streams into one stream. Some have integrated codecs, such as demuxHTTP() and muxHTTP(). In this case, the demuxers have raw byte input and message output, while the muxers have message input and raw byte output.

    Connectors

    These filters pass events in a stream from one pipeline to another. Unlike muxers and demuxers, they only change where streams are going without changing their content.

    Buffering

    These filters delay a stream until some conditions are met. In other words, they control the passage of events in a stream.

    Observers

    These filters don't change anything in the stream. They simply pass down all events as is. Some of them trigger user-defined script when they see a specific type of events in a stream.

    Transformers

    These filters transform input events into new output events. Many of them call user-defined script with input events as the input to the user function and output whatever events the user function outputs. Among these filters, exec() is special in that, it feeds Data events to an external process's standard input and reads back Data events from its standard output.

    How it works

    When Pipy launches, it starts off by evaluating script from the main module first, which should resolve to a Configuration object holding descriptions about pipelines and filters in the main module. From there, it goes on finding and loading other modules that are "used" by the main module, which resolve to more Configurations, their pipelines and filters. This process goes on and on until all modules that are directly or indirectly linked from the main module are found and loaded, at which point Pipy will then build up all pipelines and filters described in those Configurations, hook them up, and get up and running.

    Example

    Here's an example of how we configure a simple HTTP hello service and a TCP transparent proxy inside one single PipyJS module.

    pipy() // Returns a new Configuration
    .listen(8080) // Creates a pipeline layout listening on port 8080
    .serveHTTP( // Appends a serveHTTP filter to that pipeline layout
    () => new Message('Hello!\n')
    )
    .listen(8000) // Creates a second pipeline layout listening on port 8000
    .connect('localhost:8080') // Appends a connect filter to that pipeline layout

    Method Reference

    acceptHTTPTunnel(handler)

    Appends an acceptHTTPTunnel filter to the current pipeline layout.

    An acceptHTTPTunnel filter implements HTTP tunnel on the server side.

    • INPUT - Data stream received from the client with a leading HTTP CONNECT request Message.
    • OUTPUT - Data stream to send to the client with a leading HTTP CONNECT response Message.
    • SUB-INPUT - Data stream received from the client via HTTP tunnel.
    • SUB-OUTPUT - Data stream to send to the client via HTTP tunnel.

    acceptSOCKS(handler)

    Appends an acceptSOCKS filter to the current pipeline layout.

    An acceptSOCKS filter implements SOCKS protocol on the server side.

    • INPUT - Data stream received from the client with a leading SOCKS connection request.
    • OUTPUT - Data stream to send to the client with a leading SOCKS connection response.
    • SUB-INPUT - Data stream received from the client via SOCKS.
    • SUB-OUTPUT - Data stream to send to the client via SOCKS.

    acceptTLS(options?)

    Appends an acceptTLS filter to the current pipeline layout.

    An acceptTLS filter implements TLS protocol on the server side.

    • INPUT - TLS-encrypted Data stream received from the client.
    • OUTPUT - TLS-encrypted Data stream to send to the client.
    • SUB-INPUT - Data stream received from the client after TLS decryption.
    • SUB-OUTPUT - Data stream to send to the client before TLS encryption.

    branch(condition, pipelineLayout, ...restBranches)

    Appends a branch filter to the current pipeline layout.

    A branch filter selects a pipeline layout from a number of candidates based on condition callbacks, and then creates a sub-pipeline from the selected pipeline layout before streaming events through it.

    • INPUT - Any types of Events to stream into the selected sub-pipeline.
    • OUTPUT - Events streaming out from the selected sub-pipeline.
    • SUB-INPUT - Events streaming into the branch filter.
    • SUB-OUTPUT - Any types of Events.

    chain(modules?)

    Appends a chain filter to the current pipeline layout.

    When given a list of module filenames, a chain filter starts a module chain and links to the entry pipeline for the first module.

    When no arguments are present, a chain filter links to the entry pipeline for the next module on the current module chain.

    • INPUT - Any types of Events.
    • OUTPUT - Events streaming out from the selected sub-pipeline.
    • SUB-INPUT - Events streaming into the chain filter.
    • SUB-OUTPUT - Any types of Events.

    compressHTTP(options?)

    Appends a compressHTTP filter to the current pipeline layout.

    A compressHTTP filter compresses HTTP messages.

    • INPUT - HTTP Messages to compress.
    • OUTPUT - Compressed HTTP Messages.

    compressMessage(options?)

    Appends a compressMessage filter to the current pipeline layout.

    A compressMessage filter compresses messages.

    • INPUT - Messages to compress.
    • OUTPUT - Compressed Messages.

    connect(target, options?)

    Appends a connect filter to the current pipeline layout.

    A connect filter establishes a TCP connection to a remote host.

    • INPUT - Data stream to send to the host.
    • OUTPUT - Data stream received from the host.

    connectHTTPTunnel(target)

    Appends a connectHTTPTunnel filter to the current pipeline layout.

    A connectHTTPTunnel filter implements HTTP tunnel on the client side.

    • INPUT - Data stream to send to the server via HTTP tunnel.
    • OUTPUT - Data stream received from the server via HTTP tunnel.
    • SUB-INPUT - Data stream to send to the server with a leading HTTP CONNECT request Message.
    • SUB-OUTPUT - Data stream received from the server with a leading HTTP CONNECT response Message.

    connectSOCKS(target)

    Appends a connectSOCKS filter to the current pipeline layout.

    A connectSOCKS filter implements SOCKS protocol on the client side.

    • INPUT - Data stream to send to the server via SOCKS.
    • OUTPUT - Data stream received from the server via SOCKS.
    • SUB-INPUT - Data stream to send to the server with a leading SOCKS connection request.
    • SUB-OUTPUT - Data stream received from the server with a leading SOCKS connection response.

    connectTLS(options?)

    Appends a connectTLS filter to the current pipeline layout.

    A connectTLS filter implements TLS protocol on the client side.

    • INPUT - Data stream to send to the server via TLS.
    • OUTPUT - Data stream received from the server via TLS.
    • SUB-INPUT - TLS-encrypted Data stream to send to the server.
    • SUB-OUTPUT - TLS-encrypted Data stream received from the server.

    decodeDubbo()

    Appends a decodeDubbo filter to the current pipeline layout.

    A decodeDubbo filter decodes Dubbo messages from a raw byte stream.

    • INPUT - Data stream to decode Dubbo messages from.
    • OUTPUT - Dubbo Messages decoded from the input Data stream.

    decodeHTTPRequest()

    Appends a decodeHTTPRequest filter to the current pipeline layout.

    A decodeHTTPRequest filter decodes HTTP/1 request messages from a raw byte stream.

    • INPUT - Data stream to decode HTTP/1 request messages from.
    • OUTPUT - HTTP/1 request Messages decoded from the input Data stream.

    decodeHTTPResponse(options?)

    Appends a decodeHTTPResponse filter to the current pipeline layout.

    A decodeHTTPResponse filter decodes HTTP/1 response messages from a raw byte stream.

    • INPUT - Data stream to decode HTTP/1 response messages from.
    • OUTPUT - HTTP/1 response Messages decoded from the input Data stream.

    decodeMQTT(options?)

    Appends a decodeMQTT filter to the current pipeline layout.

    A decodeMQTT filter decodes MQTT packets from a raw byte stream.

    • INPUT - Data stream to decode MQTT packets from.
    • OUTPUT - MQTT packets (Messages) decoded from the input Data stream.

    decodeMultipart()

    Appends a decodeMultipart filter to the current pipeline layout.

    A decodeMultipart filter decodes parts from MIME multipart messages.

    • INPUT - Messages to decode as MIME multipart format.
    • OUTPUT - Parts (Messages) decoded from the input MIME multipart messages.

    decodeWebSocket()

    Appends a decodeWebSocket filter to the current pipeline layout.

    A decodeWebSocket filter decodes WebSocket messages from a raw byte stream.

    • INPUT - Data stream to decode WebSocket messages from.
    • OUTPUT - WebSocket Messages decoded from the input Data stream.

    decompressHTTP(enable?)

    Appends a decompressHTTP filter to the current pipeline layout.

    A decompressHTTP filter decompresses HTTP messages.

    • INPUT - HTTP Messages to decompress.
    • OUTPUT - Decompressed HTTP Messages.

    decompressMessage(algorithm)

    Appends a decompressMessage filter to the current pipeline layout.

    A decompressMessage filter decompresses messages.

    • INPUT - Messages to decompress.
    • OUTPUT - Decompressed Messages.

    deframe(states)

    Appends a deframe filter to the current pipeline layout.

    demux()

    Appends a demux filter to the current pipeline layout.

    A demux filter distributes each input Message to a separate sub-pipeline.

    • INPUT - Messages to distribute to different sub-pipelines.
    • OUTPUT - No output.
    • SUB-INPUT - A Message streaming into the demux filter.
    • SUB-OUTPUT - Disgarded.

    demuxHTTP(options?)

    Appends a demuxHTTP filter to the current pipeline layout.

    A demuxHTTP filter implements HTTP/1 and HTTP/2 protocol on the server side.

    • INPUT - Data stream received from the client with HTTP/1 or HTTP/2 requests.
    • OUTPUT - Data stream to send to the client with HTTP/1 or HTTP/2 responses.
    • SUB-INPUT - HTTP request Message received from the client.
    • SUB-OUTPUT - HTTP response Message to send to the client.

    demuxQueue()

    Appends a demuxQueue filter to the current pipeline layout.

    A demuxQueue filter distributes each input Message to a separate sub-pipeline and outputs Messages streaming out from those sub-pipelines in the same order as they are in the input.

    • INPUT - Messages to distribute to different sub-pipelines.
    • OUTPUT - Messages streaming out from the sub-pipelines.
    • SUB-INPUT - A Message streaming into the demuxQueue filter.
    • SUB-OUTPUT - A Message to stream out the demuxQueue filter.

    depositMessage(filename, options?)

    Appends a depositMessage filter to the current pipeline layout.

    A depositMessage filter buffers a whole message body in a temporary file.

    • INPUT - Message to store in a file.
    • OUTPUT - The same Message as input.

    detectProtocol(handler)

    Appends a detectProtocol filter to the current pipeline layout.

    A detectProtocol filter calls a user function to notify what protocol the input Data stream is.

    • INPUT - Data stream to detect protocol for.
    • OUTPUT - The same Data stream as input.

    dummy()

    Appends a dummy filter to the current pipeline layout.

    A dummy filter discards all its input Events and outputs nothing.

    • INPUT - Any types of Events.
    • OUTPUT - Nothing.

    dump(tag?)

    Appends a dump filter to the current pipeline layout.

    A dump filter prints out all its input Events to the standard error.

    • INPUT - Any types of Events.
    • OUTPUT - The same Events from the input.

    encodeDubbo()

    Appends an encodeDubbo filter to the current pipeline layout.

    An encodeDubbo filter encodes Dubbo messages into a raw byte stream.

    • INPUT - Dubbo Messages to encode.
    • OUTPUT - Encoded Data stream from the input Dubbo messages.

    encodeHTTPRequest()

    Appends an encodeHTTPRequest filter to the current pipeline layout.

    An encodeHTTPRequest filter encodes HTTP/1 request messages into a raw byte stream.

    • INPUT - HTTP/1 request Messages to encode.
    • OUTPUT - Encoded Data stream from the input HTTP/1 request messages.

    encodeHTTPResponse(options?)

    Appends an encodeHTTPResponse filter to the current pipeline layout.

    An encodeHTTPResponse filter encodes HTTP/1 response messages into a raw byte stream.

    • INPUT - HTTP/1 response Messages to encode.
    • OUTPUT - Encoded Data stream from the input HTTP/1 response messages.

    encodeMQTT()

    Appends an encodeMQTT filter to the current pipeline layout.

    An encodeMQTT filter encodes MQTT packets into a raw byte stream.

    • INPUT - MQTT packets (Messages) to encode.
    • OUTPUT - Encoded Data stream from the input MQTT packets.

    encodeWebSocket()

    Appends an encodeWebSocket filter to the current pipeline layout.

    An encodeWebSocket filter encodes WebSocket messages into a raw byte stream.

    • INPUT - WebSocket Messages to encode.
    • OUTPUT - Encoded Data stream from the input WebSocket messages.

    exec(command)

    Appends an exec filter to the current pipeline layout.

    An exec filter starts a child process and links to its standard input and output.

    • INPUT - The child process's standard input Data stream.
    • OUTPUT - The child process's standard output Data stream.

    export(namespace, variables)

    Defines context variables that are accessible to other modules.

    fork(startupValues?)

    Appends a fork filter to the current pipeline layout.

    A fork filter clones Events to one or more sub-pipelines.

    • INPUT - Any types of Events to clone.
    • OUTPUT - Same Events as input.
    • SUB-INPUT - Cloned Events from the fork filter's input.
    • SUB-OUTPUT - Discarded.

    handleData(handler)

    Appends a handleData filter to the current pipeline layout.

    A handleData filter calls back user scripts every time a Data event is found in the input stream.

    • INPUT - Any types of Events.
    • OUTPUT - Same Events as input.

    handleMessage(handler)

    Appends a handleMessage filter to the current pipeline layout.

    A handleMessage filter calls back user scripts every time a complete message (Message) is found in the input stream.

    • INPUT - Any types of Events.
    • OUTPUT - Same Events as input.

    handleMessageBody(handler)

    Appends a handleMessageData filter to the current pipeline layout.

    A handleMessageData filter calls back user scripts every time a complete message body (Data) is found in the input stream.

    • INPUT - Any types of Events.
    • OUTPUT - Same Events as input.

    handleMessageEnd(handler)

    Appends a handleMessageEnd filter to the current pipeline layout.

    A handleMessageEnd filter calls back user scripts every time a MessageEnd event is found in the input stream.

    • INPUT - Any types of Events.
    • OUTPUT - Same Events as input.

    handleMessageStart(handler)

    Appends a handleMessageStart filter to the current pipeline layout.

    A handleMessageStart filter calls back user scripts every time a MessageStart event is found in the input stream.

    • INPUT - Any types of Events.
    • OUTPUT - Same Events as input.

    handleStreamEnd(handler)

    Appends a handleStreamEnd filter to the current pipeline layout.

    A handleStreamEnd filter calls back user scripts every time a StreamEnd event is found in the input stream.

    • INPUT - Any types of Events.
    • OUTPUT - Same Events as input.

    handleStreamStart(handler)

    Appends a handleStreamStart filter to the current pipeline layout.

    A handleStreamStart filter calls back user scripts for the first Event in the input stream.

    • INPUT - Any types of Events.
    • OUTPUT - Same Events as input.

    handleTLSClientHello(handler)

    Appends a handleTLSClientHello filter to the current pipeline layout.

    A handleTLSClientHello filter calls back user scripts when a TLS client hello message is found in the input stream.

    • INPUT - Any types of Events.
    • OUTPUT - Same Events as input.

    import(variables)

    Imports context variables defined and exported from other modules.

    input(callback?)

    Appends an input filter to the current pipeline layout.

    An input filter starts a sub-pipeline where Events are outputted via output filters.

    • INPUT - Any types of Events.
    • OUTPUT - Events outputted from output filters in the sub-pipeline.

    link(pipelineLayoutName)

    Appends a link filter to the current pipeline layout.

    A link filter starts a sub-pipeline and streams events through it.

    • INPUT - Any types of Events to stream into the sub-pipeline.
    • OUTPUT - Events streaming out from the sub-pipeline.
    • SUB-INPUT - Events streaming into the link filter.
    • SUB-OUTPUT - Any types of Events.

    listen(port, options?)

    Creates a port pipeline layout for incoming TCP/UDP connections on a port.

    A port pipeline has the following input/output:

    • INPUT - Data stream received from the client.
    • OUTPUT - Data stream to send to the client.

    mux(target, options?)

    Appends a mux filter to the current pipeline layout.

    Multiple mux filters merge input Messages into a shared sub-pipeline.

    • INPUT - A Message to queue into the shared sub-pipeline.
    • OUTPUT - The same Message as input.
    • SUB-INPUT - Messages from multiple mux filters.
    • SUB-OUTPUT - Discarded.

    mux(options?)

    Appends a mux filter that merges to the same target sub-pipline as other mux filters coming from the same inbound connection.

    muxHTTP(target, options?)

    Appends a muxHTTP filter to the current pipeline layout.

    A muxHTTP filter implements HTTP/1 and HTTP/2 protocol on the client side.

    • INPUT - HTTP request Message to send to the server.
    • OUTPUT - HTTP response Message received from the server.
    • SUB-INPUT - Data stream to send to the server with HTTP/1 or HTTP/2 requests.
    • SUB-OUTPUT - Data stream received from the server with HTTP/1 or HTTP/2 responses.

    muxHTTP(options?)

    Appends a muxHTTP filter that merges to the same target sub-pipline as other muxHTTP filters coming from the same inbound connection.

    muxQueue(target, options?)

    Appends a muxQueue filter to the current pipeline layout.

    Multiple muxQueue filters queue input Messages into a shared sub-pipeline as well as dequeue output Messages from the sub-pipeline.

    • INPUT - A Message to queue into the shared sub-pipeline.
    • OUTPUT - A Message dequeued from the shared sub-pipeline.
    • SUB-INPUT - Messages from multiple muxQueue filters.
    • SUB-OUTPUT - Messages to be dequeued by multiple muxQueue filters.

    muxQueue(options?)

    Appends a muxQueue filter that merges to the same target sub-pipline as other muxQueue filters coming from the same inbound connection.

    onEnd(handler)

    Registers a function to be called when a pipeline is destroyed.

    onStart(handler)

    Registers a function to be called when a pipeline is created.

    output(out?)

    Appends an output filter to the current pipeline layout.

    An output filter forwards its input Events to the output of an input filter.

    • INPUT - Any types of Events.
    • OUTPUT - Nothing.

    pack(batchSize?, options?)

    Appends a pack filter to the current pipeline layout.

    A pack filter combines multiple input messages into one.

    • INPUT - Stream of Messages to combine.
    • OUTPUT - Stream of combined Messages.

    pipeline(name?)

    Creates a sub-pipeline layout.

    A sub-pipeline has the following input/output:

    • INPUT - Any types of Events.
    • OUTPUT - Any types of Events.

    print()

    Appends a print filter to the current pipeline layout.

    A print filter prints Data to the standard error.

    • INPUT - Any types of Events.
    • OUTPUT - Same Events as the input.

    read(filename)

    Creates a file pipeline layout for reading from a file.

    A file pipeline has the following input/output:

    • INPUT - Data stream from the file.
    • OUTPUT - Discarded.

    replaceData(handler?)

    Appends a replaceData filter to the current pipeline layout.

    A replaceData filter calls back user scripts to get a replacement for each Data event found in the input stream.

    • INPUT - Any types of Events.
    • OUTPUT - Any types of Events.

    replaceMessage(handler?)

    Appends a replaceMessage filter to the current pipeline layout.

    A replaceMessage filter calls back user scripts to get a replacement for each complete message (Message) found in the input stream.

    • INPUT - Any types of Events.
    • OUTPUT - Any types of Events.

    replaceMessageBody(handler?)

    Appends a replaceMessageBody filter to the current pipeline layout.

    A replaceMessageBody filter calls back user scripts to get a replacement for each complete message body (Data) found in the input stream.

    • INPUT - Any types of Events.
    • OUTPUT - Any types of Events.

    replaceMessageEnd(handler?)

    Appends a replaceMessageEnd filter to the current pipeline layout.

    A replaceMessageEnd filter calls back user scripts to get a replacement for each MessageEnd event found in the input stream.

    • INPUT - Any types of Events.
    • OUTPUT - Any types of Events.

    replaceMessageStart(handler?)

    Appends a replaceMessageStart filter to the current pipeline layout.

    A replaceMessageStart filter calls back user scripts to get a replacement for each MessageStart event found in the input stream.

    • INPUT - Any types of Events.
    • OUTPUT - Any types of Events.

    replaceStreamEnd(handler?)

    Appends a replaceStreamEnd filter to the current pipeline layout.

    A replaceStreamEnd filter calls back user scripts to get a replacement for each StreamEnd event found in the input stream.

    • INPUT - Any types of Events.
    • OUTPUT - Any types of Events.

    replaceStreamStart(handler?)

    Appends a replaceStreamStart filter to the current pipeline layout.

    A replaceStreamStart filter calls back user scripts to get a replacement for the first event in the input stream.

    • INPUT - Any types of Events.
    • OUTPUT - Any types of Events.

    serveHTTP(handler)

    Appends a serveHTTP filter to the current pipeline layout.

    A serveHTTP filter calls back user scripts to get an output HTTP response for each HTTP request found in the input stream.

    • INPUT - Data stream containing HTTP requests received from the client.
    • OUTPUT - Data stream containing HTTP responses to send to the client.

    split(separator)

    Appends a split filter to the current pipeline layout.

    A split filter splits an input Message into multiple Messages by a given separator.

    • INPUT - Messages to split.
    • OUTPUT - Messages splitted from the input.

    task(intervalOrSignal?)

    Creates a timer pipeline layout or a signal pipeline layout for a periodic job or a signal.

    A timer pipeline or a signal pipeline has the following input/output:

    • INPUT - Nothing.
    • OUTPUT - Discarded.

    tee(filename)

    Appends a tee filter to the current pipeline layout.

    A tee filter writes input Data to a file.

    • INPUT - Any types of Events.
    • OUTPUT - Same Events as the input.

    throttleConcurrency(quota)

    Appends a throttleConcurrency filter to the current pipeline layout.

    A throttleConcurrency filter limits the number of concurrent streams.

    • INPUT - Any types of Events.
    • OUTPUT - Same Events as the input.

    throttleDataRate(quota)

    Appends a throttleDataRate filter to the current pipeline layout.

    A throttleDataRate filter limits the amout of Data passing through per unit of time.

    • INPUT - Any types of Events.
    • OUTPUT - Same Events as the input.

    throttleMessageRate(quota)

    Appends a throttleMessageRate filter to the current pipeline layout.

    A throttleMessageRate filter limits the number of Messages passing through per unit of time.

    • INPUT - Any types of Events.
    • OUTPUT - Same Events as the input.

    to(pipelineLayout)

    Attaches a sub-pipeline layout to the last joint filter.

    use(filename, pipelineLayoutName?)

    Appends a use filter to the current pipeline layout.

    A use filter creates a sub-pipeline from a pipeline layout in a differen module and streams Events through it.

    • INPUT - Any types of Events to stream into the sub-pipeline.
    • OUTPUT - Events streaming out from the sub-pipeline.
    • SUB-INPUT - Events streaming into the use filter.
    • SUB-OUTPUT - Any types of Events.

    wait(condition)

    Appends a wait filter to the current pipeline layout.

    A wait filter blocks all input Events up until a condition is met.

    • INPUT - Any types of Events.
    • OUTPUT - Same Events as the input

    See Also


    © 2022, Flomesh Team.