Products

中文

中文

  • Pipy 文档
  • Introduction
    • 概述
    • 概念
  • Getting Started
    • 构建和安装
    • 快速入门
    • 获取帮助
  • Release
    • 发布历史
      • 0.10.0-1
      • 0.22.0-31
      • 0.30.0
      • 0.50.0
  • Tutorial
    • 01 你好
    • 02 回显
    • 03 代理
    • 04 路由
    • 05 负载均衡
    • 06 配置
    • 07 插件
  • Operating
    • 开发控制台
    • CLI
    • Pipy Repo
      • 介绍
      • 快速开始
      • 演示
      • REST API
  • Reference
    • API
      • Configuration
        • acceptHTTPTunnel()
        • acceptProxyProtocol()
        • acceptSOCKS()
        • acceptTLS()
        • admin()
        • branch()
        • branchMessage()
        • branchMessageStart()
        • chain()
        • compress()
        • compressHTTP()
        • connect()
        • connectHTTPTunnel()
        • connectProxyProtocol()
        • connectSOCKS()
        • connectTLS()
        • decodeBGP()
        • decodeDubbo()
        • decodeHTTPRequest()
        • decodeHTTPResponse()
        • decodeMQTT()
        • decodeMultipart()
        • decodeRESP()
        • decodeThrift()
        • decodeWebSocket()
        • decompress()
        • decompressHTTP()
        • deframe()
        • demux()
        • demuxHTTP()
        • detectProtocol()
        • dummy()
        • dump()
        • encodeBGP()
        • encodeDubbo()
        • encodeHTTPRequest()
        • encodeHTTPResponse()
        • encodeMQTT()
        • encodeRESP()
        • encodeThrift()
        • encodeWebSocket()
        • exec()
        • exit()
        • export()
        • fork()
        • handleData()
        • handleMessage()
        • handleMessageBody()
        • handleMessageEnd()
        • handleMessageStart()
        • handleStreamEnd()
        • handleStreamStart()
        • handleTLSClientHello()
        • import()
        • insert()
        • link()
        • listen()
        • loop()
        • mux()
        • muxHTTP()
        • onEnd()
        • onStart()
        • pack()
        • pipeline()
        • print()
        • read()
        • repeat()
        • replaceData()
        • replaceMessage()
        • replaceMessageBody()
        • replaceMessageEnd()
        • replaceMessageStart()
        • replaceStreamEnd()
        • replaceStreamStart()
        • replay()
        • serveHTTP()
        • split()
        • task()
        • tee()
        • throttleConcurrency()
        • throttleDataRate()
        • throttleMessageRate()
        • to()
        • use()
        • wait()
        • watch()
      • Data
        • from()
        • Data()
        • push()
        • shift()
        • shiftTo()
        • shiftWhile()
        • size
        • toArray()
      • Hessian
        • decode()
        • encode()
      • JSON
        • decode()
        • encode()
        • parse()
        • stringify()
      • Message
        • body
        • head
        • Message()
        • tail
      • MessageEnd
        • MessageEnd()
        • payload
        • tail
      • MessageStart
        • head
        • MessageStart()
      • Netmask
        • base
        • bitmask
        • broadcast
        • contains()
        • decompose()
        • first
        • hostmask
        • ip
        • last
        • mask
        • Netmask()
        • next()
        • size
        • version
      • StreamEnd
        • error
        • StreamEnd()
      • URL
        • auth
        • hash
        • host
        • hostname
        • href
        • URL()
        • origin
        • password
        • path
        • pathname
        • port
        • protocol
        • query
        • search
        • searchParams
        • username
      • URLSearchParams
        • get()
        • getAll()
        • URLSearchParams()
        • set()
        • toObject()
      • XML
        • Node
          • attributes
          • children
          • name
          • Node()
        • decode()
        • encode()
        • parse()
        • stringify()
      • algo
        • Cache
          • clear()
          • get()
          • Cache()
          • remove()
          • set()
        • HashingLoadBalancer
          • add()
          • HashingLoadBalancer()
          • next()
        • LeastWorkLoadBalancer
          • LeastWorkLoadBalancer()
          • next()
          • set()
        • LoadBalancer
          • allocate()
          • LoadBalancer()
          • provision()
          • schedule()
        • LoadBalancerResource
          • free()
          • target
        • Quota
          • consume()
          • current
          • initial
          • Quota()
          • produce()
          • reset()
        • RoundRobinLoadBalancer
          • RoundRobinLoadBalancer()
          • next()
          • set()
        • URLRouter
          • add()
          • find()
          • URLRouter()
        • hash()
        • uuid()
      • console
        • debug()
        • error()
        • info()
        • log()
        • warn()
      • crypto
        • Certificate
          • issuer
          • Certificate()
          • subject
          • subjectAltNames
        • CertificateChain
          • CertificateChain()
        • Cipher
          • final()
          • Cipher()
          • update()
        • Decipher
          • final()
          • Decipher()
          • update()
        • Hash
          • digest()
          • Hash()
          • update()
        • Hmac
          • digest()
          • Hmac()
          • update()
        • JWK
          • isValid
          • JWK()
        • JWT
          • header
          • isValid
          • JWT()
          • payload
          • verify()
        • PrivateKey
          • PrivateKey()
        • PublicKey
          • PublicKey()
        • Sign
          • Sign()
          • sign()
          • update()
        • Verify
          • Verify()
          • update()
          • verify()
      • http
        • Agent
          • new()
          • request()
        • Directory
          • new()
          • serve()
      • logging
        • BinaryLogger
          • log()
          • BinaryLogger()
          • toFile()
          • toHTTP()
          • toStderr()
          • toStdout()
          • toSyslog()
        • JSONLogger
          • log()
          • JSONLogger()
          • toFile()
          • toHTTP()
          • toStderr()
          • toStdout()
          • toSyslog()
        • TextLogger
          • log()
          • TextLogger()
          • toFile()
          • toHTTP()
          • toStderr()
          • toStdout()
          • toSyslog()
      • os
        • env
        • readDir()
        • readFile()
        • stat()
        • unlink()
        • writeFile()
      • pipy()
        • exit()
        • load()
        • restart()
        • solve()
      • stats
        • Counter
          • decrease()
          • increase()
          • Counter()
          • withLabels()
          • zero()
        • Gauge
          • decrease()
          • increase()
          • Gauge()
          • set()
          • withLabels()
          • zero()
        • Histogram
          • Histogram()
          • observe()
          • withLabels()
          • zero()

    概念

    Pipy 是一个代理,它的工作方式像一个 “流处理器”:吞入、处理、吐出

    入站和出站

    来自客户端(或者下游)的流称为入站流;发往服务器(或者上游)的流称为出站流

    image/svg+xmlInboundOutboundinputoutputinputoutputoutputinputoutputinput

    无论入站侧还是出站侧,都既有输入的流也有输出的流:

    • 在入站一侧,输入流从客户端流向 Pipy,而输出流则从 Pipy 返回客户端。
    • 在出站一侧,输出流从 Pipy 流向服务器,而输入流从服务器返回 Pipy。

    事件

    一个 Pipy 流是一系列的事件。事件类型有四种:

    • Data
    • MessageStart
    • MessageEnd
    • StreamEnd

    来自 Pipy 外部的输入流由一系列 Data 事件组成,并由 StreamEnd 事件结束。每个 Data 事件持有一小块来自 I/O 的字节数据。

    例如,一个典型的 HTTP 请求类似这样:

    image/svg+xmlData"POST / HTTP/1.1\r\n""Host: xyz.com\r\n"DataData"Hello!""Content-Length: 6\r\n""\r\n"Input streamStreamEnd

    Pipy 要做的是,处理输入流中的事件:有些会被转换,有些会被丢弃,也有些新的事件会加入。这些新生成的事件也包括除 DataStreamEnd 之外的其他类型: 即 MessageStartMessageEnd。这些非数据事件被用作 “标记”,为承载业务逻辑的原始字节流提供更高级的语义。

    例如,上面的输入流会被解码成一个 HTTP 请求消息,并以一对 MessageStartMessageEnd 事件包围。这个消息接着会被变换成一个 HTTP 响应消息,然后被编码成一系列 Data 事件,最后去往输出。

    image/svg+xmlMessageStart{ method, path,headers }DataMessageEnd"Hello!"Internal stream AMessageStart{ status, headers }DataMessageEnd"OK"Internal stream BtransformStreamEndStreamEndInput streamdecodeOutput streamencode

    最终,经过所有的处理,事件流被发送到输出。此时,MessageStartMessageEnd 事件被丢弃,只有 DataStreamEnd 事件被吐出来。

    image/svg+xmlData"HTTP/1.1 200 OK\r\n"DataData"OK""Content-Length: 2\r\n""\r\n"Output streamStreamEnd

    过滤器和管道

    理解管道(Pipeline)的直观办法是将其想象成 Unix 管道。唯一与 Unix 管道不同的是 Pipy 处理的是事件流而不是字节流。

    Pipy 内部通过一串过滤器(Filter)来处理进来的流。每个过滤器有点像一个小型 Unix 进程,从标准输入(stdin)读、往标准输出(stdout)写,上一个过滤器的输出就是下一个过滤器的输入。

    image/svg+xmleventsFiltereventsFiltereventsFilterevents

    过滤器连成一串就叫作 管道。按其输入源的类型,管道分成四类:

    端口管道

    端口管道(port pipeline) 会在某个监听端口上有新的 TCP 连接(或者 UDP 虚拟会话)接入时被创建。它从网络端口读取并处理 Data 事件,然后将结果写回客户端。这就类似于广泛采用的 HTTP 请求响应 通信模型,管道的输入是请求,输出是响应。每个进来的连接都有一个专属的端口管道,用于处理该连接上的双向通信。

    定时器管道

    定时器管道(timer pipeline) 可以被周期性地创建。该管道在创建时能够产生任意所需的输入,经过其内部所有过滤器的处理后,一切输出仅被简单丢弃。该类型的管道可以被用来执行类似 cron job 的任务。

    信号管道

    当 Pipy 进程接收到信号时,一个 信号管道(signal pipeline) 会被创建。在创建时,信号管道能够产生任意所需的输入,经过其内部所有过滤器的处理后,一切输出仅被简单丢弃。该类型的管道可用于要在某些信号发生时执行的任务。

    子管道

    子管道是可以通过接合过滤器(joint filter)从其他管道启动的管道。最基本的接合过滤器是 link()。该过滤器从它所在管道中的上一个过滤器接收事件,将它们输送到子管道进行处理,并回读该子管道的全部输出,然后输送给下一个过滤器。

    image/svg+xmleventsFiltereventsJointFiltereventsFiltereventseventsFiltereventsFilterFilterSub-pipelineMain pipelineeventsevents

    子管道接合过滤器就像在过程式编程里调用一个子例程时的被调用方调用方。接合过滤器的输入是子例程的参数,接合过滤器的输出是它的返回值。

    与子管道不同,其他类型的管道——端口管道文件管道定时器管道信号管道——不能被接合过滤器从内部 “调用”,它们只能从外部源来启动。我们称这些管道为根管道(root pipelines)

    模块

    模块 是一个 PipyJS 源文件,里面的脚本定义了一系列管道布局

    管道布局规定了管道包含哪些过滤器及其顺序。在模块中配置管道布局并不会立即创建管道。此时它只是定义了管道的样子,只有在运行中处理输入的时候才会创建管道。但有些情况下,当含义很明确时,为了简洁,我们使用术语 “管道” 来指代 “管道布局”

    上下文

    上下文(Context) 是归属于管道的一套变量,脚本利用它来维护一个管道的当前状态。

    一个 Pipy 模块里的所有管道都使用同样一套变量。换句话说,一个模块里的所有管道,它们的上下文具有相同的 “形态”,而不同模块的上下文则可以具有不同形态。在实现一个 Pipy 模块时,要做的第一件事就是通过调用内置函数 pipy() 来定义模块中使用的上下文的形态,也就是它有哪些变量,以及变量的初始值。

    虽然一个模块里的所有管道具有完全相同的一套上下文变量,但每个管道可以拥有与其他管道隔离的变量 “值”,或者说,每个管道都有其自己的 “状态”

    image/svg+xmlFilterPipeline layout AContext shapevar1var2var3varNFilterFilterFilterPipeline layout BFilterFilterFilterPipelineFilterFilterContext (state)var1var2var3varNModuleeventseventsclone

    对于模块中的脚本来说,上下文变量就像全局变量一样,从同一个模块文件里的任何位置都可以通过脚本访问这些变量。然而,它们 不是 我们通常所理解的那种 全局变量

    “全局变量” 通常意味着 “全局唯一”,在某个时刻,这些变量应该只有一个状态,而在 Pipy 中,根据当前不同上下文的数量,这些变量可以有多个不同状态。如果熟悉多线程编程的概念,可以把上下文理解成 TLS(线程局部存储),不同的线程可以有处于不同状态的线程局部变量,或者就 Pipy 来说,上下文变量在不同管道中的状态可以不同。


    © 2024, Flomesh Team.       ICP备案/许可证号:辽ICP备2023014827号