Products

中文

中文

  • Pipy 文档
  • Introduction
    • 概述
    • 概念
  • Getting Started
    • 构建和安装
    • 快速入门
    • 获取帮助
  • Release
    • 发布历史
      • 0.10.0-1
      • 0.22.0-31
      • 0.30.0
      • 0.50.0
  • Tutorial
    • 01 你好
    • 02 回显
    • 03 代理
    • 04 路由
    • 05 负载均衡
    • 06 配置
    • 07 插件
  • Operating
    • 开发控制台
    • CLI
    • Pipy Repo
      • 介绍
      • 快速开始
      • 演示
      • REST API
  • Reference
    • API
      • Configuration
        • acceptHTTPTunnel()
        • acceptProxyProtocol()
        • acceptSOCKS()
        • acceptTLS()
        • admin()
        • branch()
        • branchMessage()
        • branchMessageStart()
        • chain()
        • compress()
        • compressHTTP()
        • connect()
        • connectHTTPTunnel()
        • connectProxyProtocol()
        • connectSOCKS()
        • connectTLS()
        • decodeBGP()
        • decodeDubbo()
        • decodeHTTPRequest()
        • decodeHTTPResponse()
        • decodeMQTT()
        • decodeMultipart()
        • decodeRESP()
        • decodeThrift()
        • decodeWebSocket()
        • decompress()
        • decompressHTTP()
        • deframe()
        • demux()
        • demuxHTTP()
        • detectProtocol()
        • dummy()
        • dump()
        • encodeBGP()
        • encodeDubbo()
        • encodeHTTPRequest()
        • encodeHTTPResponse()
        • encodeMQTT()
        • encodeRESP()
        • encodeThrift()
        • encodeWebSocket()
        • exec()
        • exit()
        • export()
        • fork()
        • handleData()
        • handleMessage()
        • handleMessageBody()
        • handleMessageEnd()
        • handleMessageStart()
        • handleStreamEnd()
        • handleStreamStart()
        • handleTLSClientHello()
        • import()
        • insert()
        • link()
        • listen()
        • loop()
        • mux()
        • muxHTTP()
        • onEnd()
        • onStart()
        • pack()
        • pipeline()
        • print()
        • read()
        • repeat()
        • replaceData()
        • replaceMessage()
        • replaceMessageBody()
        • replaceMessageEnd()
        • replaceMessageStart()
        • replaceStreamEnd()
        • replaceStreamStart()
        • replay()
        • serveHTTP()
        • split()
        • task()
        • tee()
        • throttleConcurrency()
        • throttleDataRate()
        • throttleMessageRate()
        • to()
        • use()
        • wait()
        • watch()
      • Data
        • from()
        • Data()
        • push()
        • shift()
        • shiftTo()
        • shiftWhile()
        • size
        • toArray()
      • Hessian
        • decode()
        • encode()
      • JSON
        • decode()
        • encode()
        • parse()
        • stringify()
      • Message
        • body
        • head
        • Message()
        • tail
      • MessageEnd
        • MessageEnd()
        • payload
        • tail
      • MessageStart
        • head
        • MessageStart()
      • Netmask
        • base
        • bitmask
        • broadcast
        • contains()
        • decompose()
        • first
        • hostmask
        • ip
        • last
        • mask
        • Netmask()
        • next()
        • size
        • version
      • StreamEnd
        • error
        • StreamEnd()
      • URL
        • auth
        • hash
        • host
        • hostname
        • href
        • URL()
        • origin
        • password
        • path
        • pathname
        • port
        • protocol
        • query
        • search
        • searchParams
        • username
      • URLSearchParams
        • get()
        • getAll()
        • URLSearchParams()
        • set()
        • toObject()
      • XML
        • Node
          • attributes
          • children
          • name
          • Node()
        • decode()
        • encode()
        • parse()
        • stringify()
      • algo
        • Cache
          • clear()
          • get()
          • Cache()
          • remove()
          • set()
        • HashingLoadBalancer
          • add()
          • HashingLoadBalancer()
          • next()
        • LeastWorkLoadBalancer
          • LeastWorkLoadBalancer()
          • next()
          • set()
        • LoadBalancer
          • allocate()
          • LoadBalancer()
          • provision()
          • schedule()
        • LoadBalancerResource
          • free()
          • target
        • Quota
          • consume()
          • current
          • initial
          • Quota()
          • produce()
          • reset()
        • RoundRobinLoadBalancer
          • RoundRobinLoadBalancer()
          • next()
          • set()
        • URLRouter
          • add()
          • find()
          • URLRouter()
        • hash()
        • uuid()
      • console
        • debug()
        • error()
        • info()
        • log()
        • warn()
      • crypto
        • Certificate
          • issuer
          • Certificate()
          • subject
          • subjectAltNames
        • CertificateChain
          • CertificateChain()
        • Cipher
          • final()
          • Cipher()
          • update()
        • Decipher
          • final()
          • Decipher()
          • update()
        • Hash
          • digest()
          • Hash()
          • update()
        • Hmac
          • digest()
          • Hmac()
          • update()
        • JWK
          • isValid
          • JWK()
        • JWT
          • header
          • isValid
          • JWT()
          • payload
          • verify()
        • PrivateKey
          • PrivateKey()
        • PublicKey
          • PublicKey()
        • Sign
          • Sign()
          • sign()
          • update()
        • Verify
          • Verify()
          • update()
          • verify()
      • http
        • Agent
          • new()
          • request()
        • Directory
          • new()
          • serve()
      • logging
        • BinaryLogger
          • log()
          • BinaryLogger()
          • toFile()
          • toHTTP()
          • toStderr()
          • toStdout()
          • toSyslog()
        • JSONLogger
          • log()
          • JSONLogger()
          • toFile()
          • toHTTP()
          • toStderr()
          • toStdout()
          • toSyslog()
        • TextLogger
          • log()
          • TextLogger()
          • toFile()
          • toHTTP()
          • toStderr()
          • toStdout()
          • toSyslog()
      • os
        • env
        • readDir()
        • readFile()
        • stat()
        • unlink()
        • writeFile()
      • pipy()
        • exit()
        • load()
        • restart()
        • solve()
      • stats
        • Counter
          • decrease()
          • increase()
          • Counter()
          • withLabels()
          • zero()
        • Gauge
          • decrease()
          • increase()
          • Gauge()
          • set()
          • withLabels()
          • zero()
        • Histogram
          • Histogram()
          • observe()
          • withLabels()
          • zero()

    Configuration.muxHTTP()

    Description

    Appends a muxHTTP filter to the current pipeline layout.

    A muxHTTP filter implements HTTP/1 and HTTP/2 protocol on the client side.

    • INPUT - HTTP request Message to send to the server.
    • OUTPUT - HTTP response Message received from the server.
    • SUB-INPUT - Data stream to send to the server with HTTP/1 or HTTP/2 requests.
    • SUB-OUTPUT - Data stream received from the server with HTTP/1 or HTTP/2 responses.
    muxHTTPMessageMessageSub-pipelineDataDatamuxHTTPMessageMessageDataData

    A muxHTTP filter does the following:

    • At the input to a muxHTTP filter, an HTTP request is merged to a sub-pipeline that is shared by multiple muxHTTP filters
    • At the input to the shared sub-pipeline, HTTP requests from multiple muxHTTP filters are encoded and multiplexed into one single Data stream
    • At the output from the shared sub-pipeline, Data stream is de-multiplexed and decoded before individual responses are distributed back to their corresponding muxHTTP filters
    • At the output from the muxHTTP filter, out come an HTTP response as a Message

    Multiple muxHTTP filters can have one shared sub-pipeline, in which many HTTP requests and responses are multiplexed. Compare this to demuxHTTP().

    Stream sharing

    You can control where a mux filter merges its input Message to by specifying a "merging target". It can be a value of any type or a function that returns it. Filters with the same merging target share and merge to the same sub-pipeline.

    Merging only happens among filters coming from the same place in the same pipeline layout. Two filters from different pipeline layouts or different places in the same pipeline layout will never merge to the same sub-pipeline, even when they have the same merging target.

    Sub-pipeline lifecycle

    Since the sub-pipeline is shared by multiple mux filter instances, it won't be closed until the last mux ends. After all mux filter instances sharing it are done, the sub-pipeline will wait for 60 seconds to ensure no more new mux filters come and merge to it. You can change this waiting time by the maxIdle option in the options parameter. It can be either a number in seconds or a string with a unit suffix such as 's', 'm', 'h', 'd'.

    When the merging target is an object, it will be a weak ref, just like a key in a WeakMap. When the object is dead, so is the sub-pipeline being weakly referenced by the object, regardless of the idleTime option.

    HTTP versions

    You can select between HTTP/1.1 and HTTP/2 by using the option version in the options parameter. It can be 1 for HTTP/1.1, or 2 for HTTP/2. You can also specify a callback function that gets called at session start and returns the desired protocol version. Default value is 1.

    Multiplexing in HTTP/1.1

    In HTTP/1.1, multiplexing is done by HTTP pipelining. A message queue is used internally by each group of muxHTTP filters sharing the same sub-pipeline, so that each filter can pick the right response for its request from the sub-pipeline's output.

    Multiplexing in HTTP/2

    In HTTP/2, multiplexing is inherently supported by the protocol itself. With HTTP/2, every muxHTTP filter runs a dedicated virtual stream in an HTTP/2 transport stream. There is no shared queue to wait as in HTTP/1.1, so responses to the requests can arrive out of order without having to wait for each other.

    Chunked transfer

    When encoding an HTTP/1.x request, the Content-Length header needs to come before the body, so muxHTTP has to buffer the entire body until it sees a MessageEnd event, only by then can the filter output a value for Content-Length header, followed by the previously buffered body.

    The buffering is limited to 4KB by default. When the buffered data is over 4KB, the encoder will opt for chunked encoding, where a Content-Length header is no longer needed. You can change this limit by the option bufferSize in the options parameter.

    Syntax

    pipy()
    .pipeline()
    .muxHTTP().to(
    subPipelineLayout
    )
    pipy()
    .pipeline()
    .muxHTTP(
    () => whereToMerge()
    ).to(
    subPipelineLayout
    )
    pipy()
    .pipeline()
    .muxHTTP({
    maxIdle,
    maxQueue,
    bufferSize,
    version,
    }).to(
    subPipelineLayout
    )
    pipy()
    .pipeline()
    .muxHTTP(
    () => whereToMerge(),
    {
    maxIdle,
    maxQueue,
    bufferSize,
    version,
    }
    ).to(
    subPipelineLayout
    )

    Parameters

    muxHTTP(sessionSelector, options?)

    sessionSelector

    A function that returns a key identifiying the shared sub-pipeline to merge messages to.

    options?

    Options or a function that returns the options including:

    • maxIdle - Maximum time an idle sub-pipeline should stay around. Can be a number in seconds or a string with one of the time unit suffixes such as s, m or h. Defaults is 60 seconds.
    • maxQueue - Maximum number of messages allowed to run concurrently in one sub-pipeline.
    • maxMessages - Maximum number of messages allowed to run accumulatively in one sub-pipeline.
    • bufferSize - Maximum body size above which a message should be transferred in chunks. Can be a number in bytes or a string with a unit suffix such as 'k', 'm', 'g' and 't'. Default is 16KB.
    • version - Number 1 for HTTP/1 or number 2 for HTTP/2. Can also be a function that returns 1 or 2.
    Return Value

    The same Configuration object.

    muxHTTP(options?)

    Appends a muxHTTP filter that merges to the same sub-pipline as other muxHTTP filters coming from the same inbound connection.

    options?

    Options or a function that returns the options including:

    • maxIdle - Maximum time an idle sub-pipeline should stay around. Can be a number in seconds or a string with one of the time unit suffixes such as s, m or h. Defaults is 60 seconds.
    • maxQueue - Maximum number of messages allowed to run concurrently in one sub-pipeline.
    • maxMessages - Maximum number of messages allowed to run accumulatively in one sub-pipeline.
    • bufferSize - Maximum body size above which a message should be transferred in chunks. Can be a number in bytes or a string with a unit suffix such as 'k', 'm', 'g' and 't'. Default is 16KB.
    • version - Number 1 for HTTP/1 or number 2 for HTTP/2. Can also be a function that returns 1 or 2.
    Return Value

    The same Configuration object.

    Example

    pipy()
    .listen(8000) // Accept TCP connection on port 8000
    .demuxHTTP().to( // De-multiplex and decode HTTP requests
    $=>$.muxHTTP().to( // Encode and multiplex the request into a shared TCP stream
    $=>$.connect('localhost:8080') // Send to and receive from localhost:8080
    )
    )

    See Also


    © 2024, Flomesh Team.       ICP备案/许可证号:辽ICP备2023014827号