    Concepts

    Stream

    Pipy is a proxy that works like a "stream processor". It gulps in streams, processes streams, and spews out streams.

    Inbound and outbound

    Streams coming from a client (or downstream) are called inbound streams. Streams going to a server (or upstream) are called outbound streams.

    [Figure: the Inbound and Outbound sides of Pipy, each labeled with its input and output streams]

    Both the inbound side and the outbound side have input and output streams:

    • On the inbound side, input streams go from the client to Pipy and output streams go from Pipy back to the client.
    • On the outbound side, output streams go from Pipy to the server and input streams go from the server back to Pipy.

    Events

    A Pipy stream is a series of events. There are four types of events:

    • Data
    • MessageStart
    • MessageEnd
    • StreamEnd

    An input stream coming from outside of Pipy is a series of Data events, ended by a StreamEnd event. Each Data event holds a chunk of bytes received from the I/O.

    For example, a typical HTTP request would look like this:

    [Figure: Input stream made of Data events carrying "POST / HTTP/1.1\r\n", "Host: xyz.com\r\n", "Content-Length: 6\r\n", "\r\n" and "Hello!", followed by StreamEnd]

    What Pipy does is process the events in the input stream. Some are transformed, some are discarded, and new events can be inserted. The inserted events are not limited to Data and StreamEnd: they can also be MessageStart and MessageEnd. These non-Data events are used inside Pipy as "markers" that give the raw bytes a higher-level meaning for the business logic you are running.

    For example, the above input stream would be decoded into an HTTP request message wrapped between a pair of MessageStart and MessageEnd events. That message is transformed into an HTTP response message and encoded into a series of Data events before going to the output.
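
    The decoding step described above can be modeled in plain JavaScript. This is only a toy sketch, not Pipy's decoder: the event objects, the decodeRequest() helper and the chunk boundaries of the input are all assumptions made for illustration.

```javascript
// Toy model for illustration only; in Pipy, HTTP decoding is done by a built-in filter.
// Events are represented here as plain objects (an assumption, not Pipy's classes).
const input = [
  // The chunk boundaries below are arbitrary; real I/O may split bytes anywhere.
  { type: 'Data', bytes: 'POST / HTTP/1.1\r\nHost: xyz.com\r\n' },
  { type: 'Data', bytes: 'Content-Length: 6\r\n\r\n' },
  { type: 'Data', bytes: 'Hello!' },
  { type: 'StreamEnd' },
];

// Hypothetical helper: wrap the raw bytes into MessageStart / Data / MessageEnd.
function decodeRequest(events) {
  // Join the raw chunks carried by the Data events.
  const raw = events.filter(e => e.type === 'Data').map(e => e.bytes).join('');
  // The blank line separates the head from the body.
  const sep = raw.indexOf('\r\n\r\n');
  const [requestLine, ...headerLines] = raw.slice(0, sep).split('\r\n');
  const body = raw.slice(sep + 4);
  const [method, path] = requestLine.split(' ');
  const headers = {};
  for (const line of headerLines) {
    const i = line.indexOf(':');
    headers[line.slice(0, i).trim().toLowerCase()] = line.slice(i + 1).trim();
  }
  return [
    { type: 'MessageStart', head: { method, path, headers } },
    { type: 'Data', bytes: body },
    { type: 'MessageEnd' },
    { type: 'StreamEnd' },
  ];
}

const decoded = decodeRequest(input);
console.log(decoded.map(e => e.type).join(', '));
// prints "MessageStart, Data, MessageEnd, StreamEnd"
```

    The head of the MessageStart event now carries the method, path and headers, so later processing can work with a message rather than with raw bytes.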

    [Figure: Input stream -> decode -> Internal stream A (MessageStart { method, path, headers }, Data "Hello!", MessageEnd, StreamEnd) -> transform -> Internal stream B (MessageStart { status, headers }, Data "OK", MessageEnd, StreamEnd) -> encode -> Output stream]

    Eventually, after all the processing, the stream of events is sent to an output. At this point, MessageStart and MessageEnd events are discarded, and only Data and StreamEnd events are spewed out.

    [Figure: Output stream made of Data events carrying "HTTP/1.1 200 OK\r\n", "Content-Length: 2\r\n", "\r\n" and "OK", followed by StreamEnd]
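
    As a rough illustration of this final step, the sketch below models events as plain JavaScript objects and drops the marker events at the output. The object layout and the toOutput() helper are hypothetical and are not part of Pipy's API.

```javascript
// Illustrative model only: plain objects stand in for Pipy's event types.
const processedStream = [
  { type: 'MessageStart' },
  { type: 'Data', bytes: 'HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\n' },
  { type: 'Data', bytes: 'OK' },
  { type: 'MessageEnd' },
  { type: 'StreamEnd' },
];

// Hypothetical helper: keep only the event types that may leave Pipy.
function toOutput(events) {
  return events.filter(e => e.type === 'Data' || e.type === 'StreamEnd');
}

const outputStream = toOutput(processedStream);
console.log(outputStream.map(e => e.type).join(', '));
// prints "Data, Data, StreamEnd"
```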

    Filters and pipelines

    An intuitive way to understand how Pipy works is to think of it as Unix pipelines. The only thing that's changed from Unix pipelines is that we are dealing with streams of events, rather than bytes.

    An input stream is processed through a chain of filters inside of Pipy. Each filter works somewhat like a tiny Unix process that reads from its input (stdin) and writes to its output (stdout). The output from one filter is connected to the input to the next.

    [Figure: events -> Filter -> events -> Filter -> events -> Filter -> events]

    A chain of filters is called a pipeline. There are five types of pipelines, distinguished by their input source:
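
    The chaining idea can be sketched in plain JavaScript by treating a filter as a function from a list of events to a list of events. This is a simplified model under that assumption, not how Pipy implements filters; the names upperCase, dropEmpty and chain are made up for the example.

```javascript
// Illustrative model: a filter maps an array of events to an array of events.
// Hypothetical filter: transform the bytes of every Data event.
const upperCase = events =>
  events.map(e => (e.type === 'Data' ? { ...e, bytes: e.bytes.toUpperCase() } : e));

// Hypothetical filter: discard Data events that carry no bytes.
const dropEmpty = events =>
  events.filter(e => e.type !== 'Data' || e.bytes.length > 0);

// Hypothetical helper: connect each filter's output to the next filter's input.
const chain = (...filters) => events =>
  filters.reduce((stream, filter) => filter(stream), events);

const run = chain(dropEmpty, upperCase);
const result = run([
  { type: 'Data', bytes: 'hello' },
  { type: 'Data', bytes: '' },
  { type: 'StreamEnd' },
]);
console.log(result.map(e => (e.type === 'Data' ? e.bytes : e.type)).join(', '));
// prints "HELLO, StreamEnd"
```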

    Port pipeline

    A port pipeline is created whenever there's an incoming TCP connection (or a UDP virtual session) on a listening port. It then reads Data events from the connection, processes them, and writes output back to the client. This resembles the widely adopted "request and response" communication model used in HTTP, where the input to the pipeline is the requests and the output from the pipeline is the responses. Every incoming connection to Pipy has its own dedicated port pipeline, which handles the two-way communication on that connection.

    File pipeline

    A file pipeline is created to read from a file. The input to the pipeline is the data from that file, in the form of a series of Data events. Any output from the pipeline is simply discarded.

    Timer pipeline

    A timer pipeline can be created periodically. The pipeline can generate any sort of input it needs at creation time. Whatever it outputs is simply discarded after all the processing in its filters. This type of pipeline can be used to carry out cron job-like tasks.

    Signal pipeline

    A signal pipeline is created when a signal is received by the Pipy process. The pipeline can generate any sort of input it needs at creation time. Whatever it outputs is simply discarded after all the processing in its filters. This type of pipeline is useful when certain tasks need to be carried out at the time of a signal.

    Sub-pipeline

    A sub-pipeline is a pipeline that can be started from other pipelines by using a joint filter. The most basic joint filter is link(); there are a few others as well. It receives events from the filter before it in its own pipeline, pumps them to a sub-pipeline for processing, reads back whatever that sub-pipeline outputs and pumps it all down to its next filter.

    [Figure: Main pipeline: events -> Filter -> Joint Filter -> Filter -> events; the Joint Filter sends events to a Sub-pipeline of Filters and receives events back from it]

    One way to understand joint filters and sub-pipelines is to compare them to callers and callees when calling a sub-routine in procedural programming. The input to the joint filter is like the parameters to the sub-routine, and the output is like the return values from it.
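
    The caller and callee analogy can be made concrete with a small plain-JavaScript model in which a pipeline is a function and a joint filter simply calls another pipeline. The helpers makePipeline, joint and tag are hypothetical names for this sketch and do not correspond to Pipy's API.

```javascript
// Illustrative model: a pipeline is a function from events to events.
const makePipeline = (...filters) => events =>
  filters.reduce((stream, filter) => filter(stream), events);

// Hypothetical joint filter: hands its input to a sub-pipeline (the "call")
// and passes the sub-pipeline's output to the next filter (the "return value").
const joint = subPipeline => events => subPipeline(events);

// Hypothetical filter that appends a label to every Data event, to trace the path taken.
const tag = label => events =>
  events.map(e => (e.type === 'Data' ? { ...e, bytes: e.bytes + label } : e));

const subPipeline = makePipeline(tag('[sub-1]'), tag('[sub-2]'));
const mainPipeline = makePipeline(tag('[main-1]'), joint(subPipeline), tag('[main-2]'));

const output = mainPipeline([{ type: 'Data', bytes: 'x' }, { type: 'StreamEnd' }]);
console.log(output[0].bytes);
// prints "x[main-1][sub-1][sub-2][main-2]"
```

    The trace shows the events leaving the main pipeline at the joint filter, passing through every filter of the sub-pipeline, and then continuing in the main pipeline.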

    Unlike sub-pipelines, the other types of pipelines, namely port pipelines, file pipelines, timer pipelines and signal pipelines, cannot be "called" internally from a joint filter. They can only be started from external sources. We call these pipelines root pipelines.

    Module

    A module is a PipyJS source file containing scripts that define a set of pipeline layouts.

    A pipeline layout tells Pipy what filters a pipeline has and in what order. Note that defining a pipeline layout in a module doesn't create any pipelines at that moment. It only describes what pipelines will look like when they are actually created at runtime to handle some input. For brevity, where the meaning is obvious, we use the term "pipeline" to mean "pipeline layout".

    Context

    A context is a set of variables attached to a pipeline, used by scripts to maintain the current state of the pipeline.

    All pipelines use the same set of context variables inside a specific Pipy module. In other words, all contexts are of the same "shape" for all pipelines in a single module. But different modules can have different context shapes. When you start a Pipy module, the first thing to do is define the shape of the context to be used in that module by calling the built-in function pipy(). It tells Pipy what context variables the module is going to use and their initial values.

    Although all pipelines from the same module have exactly the same set of context variables, each pipeline can have its own variable "values" that are isolated from others. In other words, each pipeline can have its own "state".

    [Figure: a Module containing a Context shape (var1, var2, var3, ... varN) and Pipeline layouts A and B made of Filters; a Pipeline created from a layout gets its own Context (state), cloned from the shape, with the same variables]
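
    The relationship between a context shape and per-pipeline state can be sketched in plain JavaScript as follows. This is a conceptual model only: the variable names and the createPipeline() helper are hypothetical, and Pipy's own mechanism for cloning contexts is not shown here.

```javascript
// Illustrative model: a module declares one context shape with initial values.
// The variable names are hypothetical.
const contextShape = { requestCount: 0, target: null };

// Hypothetical helper: every new pipeline gets its own copy of the shape,
// so changes made in one pipeline are invisible to the others.
function createPipeline() {
  const context = { ...contextShape };
  return {
    context,
    handle() {
      context.requestCount += 1;
    },
  };
}

const a = createPipeline();
const b = createPipeline();
a.handle();
a.handle();
b.handle();

console.log(a.context.requestCount, b.context.requestCount, contextShape.requestCount);
// prints "2 1 0"
```

    Both pipelines have the same variables, but each one counts independently, and the shape itself keeps its initial values.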

    To the scripts in a module, these context variables are used like global variables in other programming languages, which means that they are always accessible to the scripts from anywhere in the same module file. However, they are NOT global variables in the usual sense.

    "Global variable" usually means "globally unique": there is only one state of such a variable at any given time. In Pipy, by contrast, you can have many separate states of the context variables, one for each context. If you are familiar with multithreaded programming, you can compare this to TLS (thread-local storage): just as TLS variables can have different states in different threads, context variables can have different states in different pipelines.


    © 2022, Flomesh Team.