    Configuration.demuxHTTP()

    Description

    Appends a demuxHTTP filter to the current pipeline layout.

    A demuxHTTP filter implements the HTTP/1 and HTTP/2 protocols on the server side.

    • INPUT - Data stream received from the client with HTTP/1 or HTTP/2 requests.
    • OUTPUT - Data stream to send to the client with HTTP/1 or HTTP/2 responses.
    • SUB-INPUT - HTTP request Message received from the client.
    • SUB-OUTPUT - HTTP response Message to send to the client.
    [Diagram: a Data stream enters the demuxHTTP filter and a Data stream leaves it; each sub-pipeline receives a Message and returns a Message.]

    A demuxHTTP filter does the following:

    • At the input to a demuxHTTP filter, HTTP requests are de-multiplexed and decoded from the input Data stream
    • For each request, a dedicated sub-pipeline is created, whose input is only one decoded request Message
    • At the output from the demuxHTTP filter, response Messages coming out from all sub-pipelines are encoded and multiplexed into one single output Data stream

    One demuxHTTP filter can have more than one sub-pipeline, each handling a single HTTP request. Compare this to muxHTTP().

    HTTP versions

    The decoder detects the version of the HTTP protocol used by the input stream and automatically switches to HTTP/2 when it is requested.

    Multiplexing in HTTP/1.1

    In HTTP/1.1, multiplexing is done by HTTP pipelining. demuxHTTP keeps an internal message queue so that responses always leave the filter in the same order as the requests arrived, even when the sub-pipelines produce their responses out of order. A response that arrives early stays in the queue until every response ahead of it has been sent.
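
    The in-order queueing described above can be modelled in a few lines of plain JavaScript. This is an illustrative sketch only, not Pipy's implementation: the OrderedResponseQueue class and its open() and flush() methods are hypothetical names invented for this example.

```javascript
// Hypothetical model of an in-order response queue; not part of the Pipy API.
class OrderedResponseQueue {
  constructor(emit) {
    this.emit = emit;          // called once per response, in request order
    this.nextId = 0;           // id given to the next incoming request
    this.nextToEmit = 0;       // id of the response allowed to go out next
    this.pending = new Map();  // early responses waiting for their turn
  }

  // Register an incoming request and return the callback used to answer it.
  open() {
    const id = this.nextId++;
    return (response) => {
      this.pending.set(id, response);
      this.flush();
    };
  }

  // Emit every response that is now at the head of the line.
  flush() {
    while (this.pending.has(this.nextToEmit)) {
      this.emit(this.pending.get(this.nextToEmit));
      this.pending.delete(this.nextToEmit);
      this.nextToEmit++;
    }
  }
}

const sent = [];
const queue = new OrderedResponseQueue((res) => sent.push(res));
const replyA = queue.open();
const replyB = queue.open();
const replyC = queue.open();

replyC('C'); // arrives early, stays queued
replyA('A'); // head of the line, goes out immediately
replyB('B'); // goes out, then releases the queued 'C'

console.log(sent.join(',')); // prints "A,B,C"
```

    The third response is ready first but is held back until the first two have been emitted, which is the waiting behaviour the paragraph above describes.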

    Multiplexing in HTTP/2

    In HTTP/2, multiplexing is supported by the protocol itself. Every virtual stream in the HTTP/2 transport stream runs in a dedicated sub-pipeline. There is no shared queue to wait in as there is in HTTP/1.1, so a delayed response from one sub-pipeline does not hold up the other sub-pipelines.

    Chunked transfer

    When encoding an HTTP/1.x response, the Content-Length header has to come before the body. demuxHTTP therefore buffers the entire body until it sees a MessageEnd event. Only then can the filter output a value for the Content-Length header, followed by the previously buffered body.

    The amount of buffering is capped. When the buffered data exceeds the cap, the encoder switches to chunked encoding, where a Content-Length header is no longer needed. The cap is set by the bufferSize option in the options parameter; see Parameters for its default value.
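
    As a rough illustration of the buffering rule above, the following plain JavaScript sketch (runnable in Node.js) holds body chunks until either the message ends or the buffered size exceeds a limit. The encodeBody function and its parameters are hypothetical and model only the choice between the two headers; it is not Pipy's encoder, and the status line and other headers are left out.

```javascript
// Hypothetical sketch of the Content-Length vs. chunked decision; not Pipy code.
function encodeBody(chunks, bufferSize) {
  const out = [];        // pieces written to the wire, in order
  let buffered = [];     // body chunks held back while the total size is unknown
  let bufferedBytes = 0;
  let chunked = false;

  // Write one piece in chunked transfer coding: hex size, CRLF, data, CRLF.
  const writeChunk = (buf) => {
    out.push(buf.length.toString(16) + '\r\n', buf, '\r\n');
  };

  for (const chunk of chunks) {
    const buf = Buffer.from(chunk);
    if (buf.length === 0) continue; // a zero-size chunk would end the body early
    if (chunked) {
      writeChunk(buf);
      continue;
    }
    buffered.push(buf);
    bufferedBytes += buf.length;
    if (bufferedBytes > bufferSize) {
      // Too much to hold: give up on Content-Length and stream what we have.
      chunked = true;
      out.push('Transfer-Encoding: chunked\r\n\r\n');
      buffered.forEach(writeChunk);
      buffered = [];
    }
  }

  if (chunked) {
    out.push('0\r\n\r\n'); // terminating zero-size chunk
  } else {
    // The whole body fit in the buffer, so its length is known up front.
    out.push(`Content-Length: ${bufferedBytes}\r\n\r\n`, ...buffered);
  }
  return Buffer.concat(out.map((piece) => Buffer.from(piece))).toString();
}

console.log(JSON.stringify(encodeBody(['hello'], 10)));
console.log(JSON.stringify(encodeBody(['hello', 'world!'], 8)));
```

    With a limit of 10 bytes the 5-byte body is sent with a Content-Length header; with a limit of 8 bytes the 11-byte body crosses the limit on its second chunk and is sent chunked instead.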

    Syntax

    pipy()
      .pipeline('example')
      .demuxHTTP().to(
        subPipelineLayout
      )

    pipy()
      .pipeline('example')
      .demuxHTTP({
        bufferSize,
      }).to(
        subPipelineLayout
      )

    Parameters

    demuxHTTP(options?)

    options?

    Options including:

    • bufferSize - (optional) Maximum body size above which a message should be transferred in chunks. Can be a number in bytes or a string with a unit suffix such as 'k', 'm', 'g' and 't'. Default is 16KB.
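
    A minimal sketch of how a size given either as a number or as a suffixed string might be normalized to bytes. The parseByteSize function is hypothetical and not part of the Pipy API. Two assumptions are made that this page does not state: each suffix is a power of 1024, and suffixes are matched case-insensitively.

```javascript
// Hypothetical helper for illustration; not part of the Pipy API.
// Assumption: suffixes are binary multiples (k = 1024 bytes, m = 1024 * k, ...).
const UNITS = { '': 1, k: 1024, m: 1024 ** 2, g: 1024 ** 3, t: 1024 ** 4 };

function parseByteSize(value) {
  if (typeof value === 'number') return value; // already a byte count
  const match = /^(\d+)([kmgt]?)$/i.exec(String(value).trim());
  if (!match) throw new Error(`Invalid size: ${value}`);
  return Number(match[1]) * UNITS[match[2].toLowerCase()];
}

console.log(parseByteSize(4096));  // prints 4096
console.log(parseByteSize('16k')); // prints 16384
console.log(parseByteSize('1m'));  // prints 1048576
```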

    Return Value

    The same Configuration object.

    Example

    pipy()
      .listen(8000)                       // Accept TCP connection on port 8000
      .demuxHTTP().to(                    // De-multiplex and decode HTTP requests
        $=>$.muxHTTP().to(                // Encode and multiplex the request into a shared TCP stream
          $=>$.connect('localhost:8080')  // Send to and receive from localhost:8080
        )
      )


    © 2024, Flomesh Team.       ICP filing/license number: 辽ICP备2023014827号