Products

中文

中文

  • Pipy 文档
  • Introduction
    • 概述
    • 概念
  • Getting Started
    • 构建和安装
    • 快速入门
    • 获取帮助
  • Release
    • 发布历史
      • 0.10.0-1
      • 0.22.0-31
      • 0.30.0
      • 0.50.0
  • Tutorial
    • 01 你好
    • 02 回显
    • 03 代理
    • 04 路由
    • 05 负载均衡
    • 06 配置
    • 07 插件
  • Operating
    • 开发控制台
    • CLI
    • Pipy Repo
      • 介绍
      • 快速开始
      • 演示
      • REST API
  • Reference
    • API
      • Configuration
        • acceptHTTPTunnel()
        • acceptProxyProtocol()
        • acceptSOCKS()
        • acceptTLS()
        • admin()
        • branch()
        • branchMessage()
        • branchMessageStart()
        • chain()
        • compress()
        • compressHTTP()
        • connect()
        • connectHTTPTunnel()
        • connectProxyProtocol()
        • connectSOCKS()
        • connectTLS()
        • decodeBGP()
        • decodeDubbo()
        • decodeHTTPRequest()
        • decodeHTTPResponse()
        • decodeMQTT()
        • decodeMultipart()
        • decodeRESP()
        • decodeThrift()
        • decodeWebSocket()
        • decompress()
        • decompressHTTP()
        • deframe()
        • demux()
        • demuxHTTP()
        • detectProtocol()
        • dummy()
        • dump()
        • encodeBGP()
        • encodeDubbo()
        • encodeHTTPRequest()
        • encodeHTTPResponse()
        • encodeMQTT()
        • encodeRESP()
        • encodeThrift()
        • encodeWebSocket()
        • exec()
        • exit()
        • export()
        • fork()
        • handleData()
        • handleMessage()
        • handleMessageBody()
        • handleMessageEnd()
        • handleMessageStart()
        • handleStreamEnd()
        • handleStreamStart()
        • handleTLSClientHello()
        • import()
        • insert()
        • link()
        • listen()
        • loop()
        • mux()
        • muxHTTP()
        • onEnd()
        • onStart()
        • pack()
        • pipeline()
        • print()
        • read()
        • repeat()
        • replaceData()
        • replaceMessage()
        • replaceMessageBody()
        • replaceMessageEnd()
        • replaceMessageStart()
        • replaceStreamEnd()
        • replaceStreamStart()
        • replay()
        • serveHTTP()
        • split()
        • task()
        • tee()
        • throttleConcurrency()
        • throttleDataRate()
        • throttleMessageRate()
        • to()
        • use()
        • wait()
        • watch()
      • Data
        • from()
        • Data()
        • push()
        • shift()
        • shiftTo()
        • shiftWhile()
        • size
        • toArray()
      • Hessian
        • decode()
        • encode()
      • JSON
        • decode()
        • encode()
        • parse()
        • stringify()
      • Message
        • body
        • head
        • Message()
        • tail
      • MessageEnd
        • MessageEnd()
        • payload
        • tail
      • MessageStart
        • head
        • MessageStart()
      • Netmask
        • base
        • bitmask
        • broadcast
        • contains()
        • decompose()
        • first
        • hostmask
        • ip
        • last
        • mask
        • Netmask()
        • next()
        • size
        • version
      • StreamEnd
        • error
        • StreamEnd()
      • URL
        • auth
        • hash
        • host
        • hostname
        • href
        • URL()
        • origin
        • password
        • path
        • pathname
        • port
        • protocol
        • query
        • search
        • searchParams
        • username
      • URLSearchParams
        • get()
        • getAll()
        • URLSearchParams()
        • set()
        • toObject()
      • XML
        • Node
          • attributes
          • children
          • name
          • Node()
        • decode()
        • encode()
        • parse()
        • stringify()
      • algo
        • Cache
          • clear()
          • get()
          • Cache()
          • remove()
          • set()
        • HashingLoadBalancer
          • add()
          • HashingLoadBalancer()
          • next()
        • LeastWorkLoadBalancer
          • LeastWorkLoadBalancer()
          • next()
          • set()
        • LoadBalancer
          • allocate()
          • LoadBalancer()
          • provision()
          • schedule()
        • LoadBalancerResource
          • free()
          • target
        • Quota
          • consume()
          • current
          • initial
          • Quota()
          • produce()
          • reset()
        • RoundRobinLoadBalancer
          • RoundRobinLoadBalancer()
          • next()
          • set()
        • URLRouter
          • add()
          • find()
          • URLRouter()
        • hash()
        • uuid()
      • console
        • debug()
        • error()
        • info()
        • log()
        • warn()
      • crypto
        • Certificate
          • issuer
          • Certificate()
          • subject
          • subjectAltNames
        • CertificateChain
          • CertificateChain()
        • Cipher
          • final()
          • Cipher()
          • update()
        • Decipher
          • final()
          • Decipher()
          • update()
        • Hash
          • digest()
          • Hash()
          • update()
        • Hmac
          • digest()
          • Hmac()
          • update()
        • JWK
          • isValid
          • JWK()
        • JWT
          • header
          • isValid
          • JWT()
          • payload
          • verify()
        • PrivateKey
          • PrivateKey()
        • PublicKey
          • PublicKey()
        • Sign
          • Sign()
          • sign()
          • update()
        • Verify
          • Verify()
          • update()
          • verify()
      • http
        • Agent
          • new()
          • request()
        • Directory
          • new()
          • serve()
      • logging
        • BinaryLogger
          • log()
          • BinaryLogger()
          • toFile()
          • toHTTP()
          • toStderr()
          • toStdout()
          • toSyslog()
        • JSONLogger
          • log()
          • JSONLogger()
          • toFile()
          • toHTTP()
          • toStderr()
          • toStdout()
          • toSyslog()
        • TextLogger
          • log()
          • TextLogger()
          • toFile()
          • toHTTP()
          • toStderr()
          • toStdout()
          • toSyslog()
      • os
        • env
        • readDir()
        • readFile()
        • stat()
        • unlink()
        • writeFile()
      • pipy()
        • exit()
        • load()
        • restart()
        • solve()
      • stats
        • Counter
          • decrease()
          • increase()
          • Counter()
          • withLabels()
          • zero()
        • Gauge
          • decrease()
          • increase()
          • Gauge()
          • set()
          • withLabels()
          • zero()
        • Histogram
          • Histogram()
          • observe()
          • withLabels()
          • zero()

    Configuration.mux()

    Description

    Appends a mux filter to the current pipeline layout.

    Multiple mux filters merge input Messages into a shared sub-pipeline.

    • INPUT - A Message to queue into the shared sub-pipeline.
    • OUTPUT - The same Message as input.
    • SUB-INPUT - Messages from multiple mux filters.
    • SUB-OUTPUT - Discarded.
    muxMessageMessageSub-pipelineMessagemuxMessageMessageMessage

    A mux filter does the following:

    • At the input to a mux filter, the Message is merged to a sub-pipeline that is shared by multiple mux filters
    • At the input to the shared sub-pipeline, Messages from multiple mux filters are queued up into one single Message stream
    • Whatever is outputted from the shared sub-pipeline is discared
    • At the output from the mux filter, out come the same Mesasge as its input

    Multiple mux filters can have one shared sub-pipeline, in which many input Messages are queued up.

    Stream sharing

    You can control where a mux filter merges its input Message to by specifying a "merging target". It can be a value of any type or a function that returns it. Filters with the same merging target share and merge to the same sub-pipeline.

    Merging only happens among filters coming from the same place in the same pipeline layout. Two filters from different pipeline layouts or different places in the same pipeline layout will never merge to the same sub-pipeline, even when they have the same merging target.

    Sub-pipeline lifecycle

    Since the sub-pipeline is shared by multiple mux filter instances, it won't be closed until the last mux ends. After all mux filter instances sharing it are done, the sub-pipeline will wait for 60 seconds to ensure no more new mux filters come and merge to it. You can change this waiting time by the maxIdle option in the options parameter. It can be either a number in seconds or a string with a unit suffix such as 's', 'm', 'h', 'd'.

    When the merging target is an object, it will be a weak ref, just like a key in a WeakMap. When the object is dead, so is the sub-pipeline being weakly referenced by the object, regardless of the idleTime option.

    Syntax

    pipy()
    .pipeline()
    .mux().to(
    subPipelineLayout
    )
    pipy()
    .pipeline()
    .mux(
    () => whereToMerge()
    ).to(
    subPipelineLayout
    )
    pipy()
    .pipeline()
    .mux({
    maxIdle,
    maxQueue,
    }).to(
    subPipelineLayout
    )
    pipy()
    .pipeline()
    .mux(
    () => whereToMerge(),
    {
    maxIdle,
    maxQueue,
    }
    ).to(
    subPipelineLayout
    )

    Parameters

    mux(sessionSelector, options?)

    sessionSelector

    A function that returns a key identifiying the shared sub-pipeline to merge messages to.

    options?

    Options or a function that returns the options including:

    • maxIdle - Maximum time an idle sub-pipeline should stay around. Can be a number in seconds or a string with one of the time unit suffixes such as s, m or h. Defaults is 60 seconds.
    • maxQueue - Maximum number of messages allowed to run concurrently in one sub-pipeline.
    • maxMessages - Maximum number of messages allowed to run accumulatively in one sub-pipeline.
    Return Value

    The same Configuration object.

    mux(options?)

    Appends a mux filter that merges to the same sub-pipline as other mux filters coming from the same inbound connection.

    options?

    Options or a function that returns the options including:

    • maxIdle - Maximum time an idle sub-pipeline should stay around. Can be a number in seconds or a string with one of the time unit suffixes such as s, m or h. Defaults is 60 seconds.
    • maxQueue - Maximum number of messages allowed to run concurrently in one sub-pipeline.
    • maxMessages - Maximum number of messages allowed to run accumulatively in one sub-pipeline.
    Return Value

    The same Configuration object.

    See Also


    © 2024, Flomesh Team.       ICP备案/许可证号:辽ICP备2023014827号