Products

中文

中文

  • Pipy 文档
  • Introduction
    • 概述
    • 概念
  • Getting Started
    • 构建和安装
    • 快速入门
    • 获取帮助
  • Release
    • 发布历史
      • 0.10.0-1
      • 0.22.0-31
      • 0.30.0
      • 0.50.0
  • Tutorial
    • 01 你好
    • 02 回显
    • 03 代理
    • 04 路由
    • 05 负载均衡
    • 06 配置
    • 07 插件
  • Operating
    • 开发控制台
    • CLI
    • Pipy Repo
      • 介绍
      • 快速开始
      • 演示
      • REST API
  • Reference
    • API
      • Configuration
        • acceptHTTPTunnel()
        • acceptProxyProtocol()
        • acceptSOCKS()
        • acceptTLS()
        • admin()
        • branch()
        • branchMessage()
        • branchMessageStart()
        • chain()
        • compress()
        • compressHTTP()
        • connect()
        • connectHTTPTunnel()
        • connectProxyProtocol()
        • connectSOCKS()
        • connectTLS()
        • decodeBGP()
        • decodeDubbo()
        • decodeHTTPRequest()
        • decodeHTTPResponse()
        • decodeMQTT()
        • decodeMultipart()
        • decodeRESP()
        • decodeThrift()
        • decodeWebSocket()
        • decompress()
        • decompressHTTP()
        • deframe()
        • demux()
        • demuxHTTP()
        • detectProtocol()
        • dummy()
        • dump()
        • encodeBGP()
        • encodeDubbo()
        • encodeHTTPRequest()
        • encodeHTTPResponse()
        • encodeMQTT()
        • encodeRESP()
        • encodeThrift()
        • encodeWebSocket()
        • exec()
        • exit()
        • export()
        • fork()
        • handleData()
        • handleMessage()
        • handleMessageBody()
        • handleMessageEnd()
        • handleMessageStart()
        • handleStreamEnd()
        • handleStreamStart()
        • handleTLSClientHello()
        • import()
        • insert()
        • link()
        • listen()
        • loop()
        • mux()
        • muxHTTP()
        • onEnd()
        • onStart()
        • pack()
        • pipeline()
        • print()
        • read()
        • repeat()
        • replaceData()
        • replaceMessage()
        • replaceMessageBody()
        • replaceMessageEnd()
        • replaceMessageStart()
        • replaceStreamEnd()
        • replaceStreamStart()
        • replay()
        • serveHTTP()
        • split()
        • task()
        • tee()
        • throttleConcurrency()
        • throttleDataRate()
        • throttleMessageRate()
        • to()
        • use()
        • wait()
        • watch()
      • Data
        • from()
        • Data()
        • push()
        • shift()
        • shiftTo()
        • shiftWhile()
        • size
        • toArray()
      • Hessian
        • decode()
        • encode()
      • JSON
        • decode()
        • encode()
        • parse()
        • stringify()
      • Message
        • body
        • head
        • Message()
        • tail
      • MessageEnd
        • MessageEnd()
        • payload
        • tail
      • MessageStart
        • head
        • MessageStart()
      • Netmask
        • base
        • bitmask
        • broadcast
        • contains()
        • decompose()
        • first
        • hostmask
        • ip
        • last
        • mask
        • Netmask()
        • next()
        • size
        • version
      • StreamEnd
        • error
        • StreamEnd()
      • URL
        • auth
        • hash
        • host
        • hostname
        • href
        • URL()
        • origin
        • password
        • path
        • pathname
        • port
        • protocol
        • query
        • search
        • searchParams
        • username
      • URLSearchParams
        • get()
        • getAll()
        • URLSearchParams()
        • set()
        • toObject()
      • XML
        • Node
          • attributes
          • children
          • name
          • Node()
        • decode()
        • encode()
        • parse()
        • stringify()
      • algo
        • Cache
          • clear()
          • get()
          • Cache()
          • remove()
          • set()
        • HashingLoadBalancer
          • add()
          • HashingLoadBalancer()
          • next()
        • LeastWorkLoadBalancer
          • LeastWorkLoadBalancer()
          • next()
          • set()
        • LoadBalancer
          • allocate()
          • LoadBalancer()
          • provision()
          • schedule()
        • LoadBalancerResource
          • free()
          • target
        • Quota
          • consume()
          • current
          • initial
          • Quota()
          • produce()
          • reset()
        • RoundRobinLoadBalancer
          • RoundRobinLoadBalancer()
          • next()
          • set()
        • URLRouter
          • add()
          • find()
          • URLRouter()
        • hash()
        • uuid()
      • console
        • debug()
        • error()
        • info()
        • log()
        • warn()
      • crypto
        • Certificate
          • issuer
          • Certificate()
          • subject
          • subjectAltNames
        • CertificateChain
          • CertificateChain()
        • Cipher
          • final()
          • Cipher()
          • update()
        • Decipher
          • final()
          • Decipher()
          • update()
        • Hash
          • digest()
          • Hash()
          • update()
        • Hmac
          • digest()
          • Hmac()
          • update()
        • JWK
          • isValid
          • JWK()
        • JWT
          • header
          • isValid
          • JWT()
          • payload
          • verify()
        • PrivateKey
          • PrivateKey()
        • PublicKey
          • PublicKey()
        • Sign
          • Sign()
          • sign()
          • update()
        • Verify
          • Verify()
          • update()
          • verify()
      • http
        • Agent
          • new()
          • request()
        • Directory
          • new()
          • serve()
      • logging
        • BinaryLogger
          • log()
          • BinaryLogger()
          • toFile()
          • toHTTP()
          • toStderr()
          • toStdout()
          • toSyslog()
        • JSONLogger
          • log()
          • JSONLogger()
          • toFile()
          • toHTTP()
          • toStderr()
          • toStdout()
          • toSyslog()
        • TextLogger
          • log()
          • TextLogger()
          • toFile()
          • toHTTP()
          • toStderr()
          • toStdout()
          • toSyslog()
      • os
        • env
        • readDir()
        • readFile()
        • stat()
        • unlink()
        • writeFile()
      • pipy()
        • exit()
        • load()
        • restart()
        • solve()
      • stats
        • Counter
          • decrease()
          • increase()
          • Counter()
          • withLabels()
          • zero()
        • Gauge
          • decrease()
          • increase()
          • Gauge()
          • set()
          • withLabels()
          • zero()
        • Histogram
          • Histogram()
          • observe()
          • withLabels()
          • zero()

    07 插件

    现在,我们的小代理程序已经有了路由和负载均衡的能力。然而,就在实现它的过程中,我们渐渐意识到自己正在往单个源文件里随意加入越来越多的管道布局,这个文件在快速增长。我们最好对代码做一次重构,这样未来每个新加入的功能就可以放在它自己的文件里,有选择地挂接到系统,也就是所谓的 “插件系统”

    设计

    目前我们已经实现了两个功能:路由和负载均衡。每个都能重构到一个单独的插件里。组合这些插件的方式是把它们排列在一条 上。对于接收到的每个请求,依次访问链上的每个插件,让它们决定是要对请求做出响应,还是略过。既然插件被按照既定次序排列,每个插件也能决定其后的插件是否应该继续处理一个请求。

    image/svg+xmlPlugincontinue?Plugincontinue?Pluginyesyesnono

    下面,来看看我们要创建的插件。

    路由插件

    路由插件的工作是为请求找到路由,找到的路由应该保存在一个上下文变量里,以便其他插件访问。该插件总是把请求传给后面的插件,而不管路由能否找到。它仅在找到路由时用找到的路由 "标记" 一下该请求。

    负载均衡插件

    负载均衡插件负责把请求转发到处理它的服务器。它把路由插件找到的路由映射到一个服务器列表,然后在这些服务器之间分发流量。当路由找不到时,或者没有配置服务器列表时,它就忽略请求,仅仅把它传递给下一个插件。否则,它就以服务器返回的响应中止插件链,因为请求已经由该插件 “处理” 了。

    默认插件

    当请求被以上插件都透传时,也就是说,要么没有找到路由,要么没有服务器来处理,那么最后的一个退路插件会接手。这就是默认插件。它总是回应 404 消息,表示请求没有被代理处理。

    路由插件

    第一个要做的插件是路由插件,我们为它创建一个独立的文件。

    1. 点击 并且输入 /plugins/router.js 作为新文件的名字。点击 Create

    往文件夹里添加文件时,无需先创建文件夹。只需输入路径全名,包括文件夹的名字,文件夹将被自动创建。

    1. /proxy.js 复制完整的内容到 /plugins/router.js

    删除 /plugins/router.js 里几乎全部代码,只保留对 URLRouter.find() 的调用,因为这才是路由插件的核心。然后我们把 handleMessageStart()——这唯一仅存的过滤器——放在 .pipeline() 调用之下。在不带参数的情况下,pipeline() 会开启一个模块的 入口 管道布局。

    ((
    config = JSON.decode(pipy.load('config.json')),
    router = new algo.URLRouter(config.routes),
    services = Object.fromEntries(
    Object.entries(config.services).map(
    ([k, v]) => [
    k, new algo.RoundRobinLoadBalancer(v)
    ]
    )
    ),
    ) => pipy({
    _target: undefined,
    })
    .listen(config.listen)
    .demuxHTTP().to(
    $=>$
    .pipeline()
    .handleMessageStart(
    msg => (
    ((
    s = router.find(
    msg.head.headers.host,
    msg.head.path,
    )
    ) => (
    _target = services[s]?.next?.()
    ))()
    )
    )
    .branch(
    () => Boolean(_target), (
    $=>$.muxHTTP(() => _target).to(
    $=>$.connect(() => _target.id)
    )
    ), (
    $=>$.replaceMessage(
    new Message({ status: 404 }, 'No route')
    )
    )
    )
    )
    )()

    我们已经调用了 pipeline() 来定义这个插件的入口点,也就是前面的插件(如果有的话)连接到这个插件的地方。现在,在路由计算之后,我们也应该连接下一个插件。我们可以通过一个叫做 chain() 的接合过滤器来实现。

    .pipeline()
    .handleMessageStart(
    msg => (
    ((
    s = router.find(
    msg.head.headers.host,
    msg.head.path,
    )
    ) => (
    _target = services[s]?.next?.()
    ))()
    )
    )
    .chain()

    正如你所见,找到的路由现在存放在一个未定义变量 s 里。这肯定不对,但我们过一会儿再回来看这个问题。

    负载均衡插件

    第二个插件也可以按第一个的方式来制作:

    • proxy.js 克隆出 plugins/balancer.js
    • 删除所有除负载均衡以外的代码
    • 在剩下的过滤器之前放一个 .pipeline() 作为模块入口
    ((
    config = JSON.decode(pipy.load('config.json')),
    router = new algo.URLRouter(config.routes),
    services = Object.fromEntries(
    Object.entries(config.services).map(
    ([k, v]) => [
    k, new algo.RoundRobinLoadBalancer(v)
    ]
    )
    ),
    ) => pipy({
    _target: undefined,
    })
    .listen(config.listen)
    .demuxHTTP().to(
    $=>$
    .handleMessageStart(
    msg => (
    ((
    s = router.find(
    msg.head.headers.host,
    msg.head.path,
    )
    ) => (
    _target = services[s]?.next?.()
    ))()
    )
    )
    .pipeline()
    .branch(
    () => Boolean(_target), (
    $=>$.muxHTTP(() => _target).to(
    $=>$.connect(() => _target.id)
    )
    ), (
    $=>$.replaceMessage(
    new Message({ status: 404 }, 'No route')
    )
    )
    )
    )
    )()

    之前讨论的 一样,当找不到目标时,这个插件应该把请求传递给下一个插件(在我们的例子里,就是默认插件)来继续处理,而不是直接回应 404 消息,所以我们把它改成调用 chain()

    .pipeline()
    .branch(
    () => Boolean(_target), (
    $=>$.muxHTTP(() => _target).to(
    $=>$.connect(() => _target.id)
    )
    ), (
    $=>$.replaceMessage(
    new Message({ status: 404 }, 'No route')
    )
    $=>$.chain()
    )
    )

    默认插件

    我们例子里最后一个插件相当简单:仅仅回应 404 消息。我们把它放在名为 plugins/default.js 的文件里。

    pipy()
    .pipeline()
    .replaceMessage(new Message({ status: 404 }, 'No handler'))

    插件链

    现在,我们把这些插件放到一条链上。首先,准备数据。在我们的设计里,插件通过配置文件里的一个数组激活,数组中每个元素是一个插件的文件名。

    {
    "listen": 8000,
    "plugin": [
    "plugins/router.js",
    "plugins/balancer.js",
    "plugins/default.js"
    ],
    "routes": {
    "/hi/*": "service-hi",
    "/echo": "service-echo",
    "/ip/*": "service-tell-ip"
    },
    "services": {
    "service-hi" : ["127.0.0.1:8080", "127.0.0.1:8082"],
    "service-echo" : ["127.0.0.1:8081"],
    "service-tell-ip" : ["127.0.0.1:8082"]
    }
    }

    注意,顺序很关键。位置靠前的插件比后面有更高的优先级,不仅仅先被访问到,而且能决定是否继续访问后面的插件。

    现在,我们回到入口文件 proxy.js,并且删除全部功能性代码,只保留插件所需的 “脚手架”

    ((
    config = JSON.decode(pipy.load('config.json')),
    router = new algo.URLRouter(config.routes),
    services = Object.fromEntries(
    Object.entries(config.services).map(
    ([k, v]) => [
    k, new algo.RoundRobinLoadBalancer(v)
    ]
    )
    ),
    ) => pipy({
    _target: undefined,
    })
    .listen(config.listen)
    .demuxHTTP().to(
    $=>$
    .handleMessageStart(
    msg => (
    ((
    s = router.find(
    msg.head.headers.host,
    msg.head.path,
    )
    ) => (
    _target = services[s]?.next?.()
    ))()
    )
    )
    .branch(
    () => Boolean(_target), (
    $=>$.muxHTTP(() => _target).to(
    $=>$.connect(() => _target.id)
    )
    ), (
    $=>$.replaceMessage(
    new Message({ status: 404 }, 'No route')
    )
    )
    )
    )
    .chain(config.plugins)
    )()

    正如你所见,我们用了同一个过滤器 chain(),就像在插件里"链" 到后继者时那样。但这里的用法有点不一样。通过给它插件列表作为参数,我们从这点开始启动了整条 插件链

    导出和导入

    最后要做的是修复 router.js 里的未定义变量。这个变量应该能从 balancer.js 访问到,以根据路由找到目标。既然它的值依赖于不同的请求,那它必须是个 上下文变量。但我们不能像过去那样只把它放在 pipy() 调用的参数里,因为这样它仅在 router.js 里可见。我们应该在主框架 proxy.js 里用 export() 来定义它,让 router.jsbalancer.js 能够 "导入" 它。

    ((
    config = pipy.solve('config.js'),
    ) => pipy()
    .export('main', {
    __route: undefined,
    })
    .listen(config.listen)
    .demuxHTTP().to(
    $=>$.chain(config.plugins)
    )
    )()

    export() 的第一个参数是 命名空间,其他文件导入上下文变量时需要引用它。它可以是任意的名字。这里我们使用 "main",因为它跟插件所运行于的主框架有关。

    导出的上下文变量可以用任何有效的 JavaScript 标识符。但是作为一项最佳实践,我们推荐所有 导出 的上下文变量名用双下划线做前缀,以使他们有别于仅仅本模块可访问的上下文变量。

    接下来,我们在 router.jsbalancer.js 里导入 __route。记住命名空间应该和 proxy.js 里导出 __route 时相匹配,叫做 "main"。

    对于 router.js,我们导入 __route 之后,把路由计算结果保存在里面。

    .import({
    __route: 'main',
    })
    .pipeline()
    .handleMessageStart(
    msg => (
    s = router.find(
    __route = router.find(
    msg.head.headers.host,
    msg.head.path,
    )
    )
    )
    .chain()

    对于 balancer.js,我们把 router.js 找到的路由读取回来,并且为它分配一个目标。

    .import({
    __route: 'main',
    })
    .pipeline()
    .branch(
    () => Boolean(_target), (
    () => Boolean(_target = services[__route]?.next?.()), (
    $=>$
    .muxHTTP(() => _target).to(
    $=>$.connect(() => _target.id)
    )
    ),
    $=>$.chain()
    )

    这就是把代码库重构成一个简单的基于插件的程序所需做的全部。现在,你可以用与之前同样的方式来做些测试了。

    总结

    在本教程的这一部分,你学会了如何把不同的功能分离到不同的插件里。

    要点

    1. 用带文件名列表的 chain() 来开启一个模块链。用不带参数的 chain() 来链接到当前模块的下一个模块。

    2. 通过 pipy() 定义的上下文变量仅在当前模块里可见。要跨文件共享上下文变量,请使用 export()import()


    © 2024, Flomesh Team.       ICP备案/许可证号:辽ICP备2023014827号