    Part 7: Plugins

    Now our little proxy program has routing and load balancing abilities. However, as we were implementing it, we found ourselves throwing more and more pipeline layouts into a single source file, and that file has been growing rapidly. We'd better refactor the code structure so that every new function added in the future can sit in its own file and be hooked into the system only when needed, a.k.a. a "plugin system".

    Design

    We have implemented two functions in our proxy so far: routing and load balancing. Each of them can be refactored into a separate plugin. We combine these plugins by arranging them in a chain. For each request we receive, we visit every plugin on the chain and let it decide whether to do something in response or just pass. Since the plugins are arranged in a fixed order, each plugin also gets to decide whether the plugins after it should continue to handle the request.

    [Figure: plugins arranged in a chain; after each plugin, a "continue?" decision (yes/no) determines whether the request goes on to the next plugin.]

    With that, let's take a look at the plugins we'll create.

    Routing plugin

    The routing plugin's job is to find a route for a request. The route it finds should be saved in a context variable so that other plugins can access it. The plugin always passes the request on to the next plugin, whether or not a route is found. It only "marks" a request with its route if one can be found.

    Load balancing plugin

    The load balancing plugin is responsible for forwarding a request to a server that can handle it. It maps the route found by the routing plugin to a server list and then distributes traffic across those servers. When no route was found, or no server list is configured for the route, it should ignore the request and just pass it on to the next plugin. Otherwise, it should abort the plugin chain with the response returned from the server, since the request has already been "handled" in this plugin.

    The default plugin

    When a request is passed on by both of the above plugins, meaning that either there is no route for it or no server can handle the route, a final fallback plugin takes it on. This is the default plugin. It should always respond with a 404 message indicating that the request was not handled by the proxy.
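    The design above can be modelled in a few lines of plain JavaScript. This is only a conceptual sketch, not Pipy code: the `routes` and `servers` tables, the `runChain()` helper and the `next` callback are all hypothetical names made up for illustration. It shows the three behaviours described so far: the router always continues, the balancer stops the chain when it can handle the request, and the default plugin answers whatever is left.

```javascript
// Plain JavaScript model of the plugin chain (an illustration, NOT Pipy API).
// Each plugin gets the request, a per-request context, and a `next` function
// that hands the request over to the following plugin.

const routes = { '/hi': 'service-hi', '/echo': 'service-echo' }; // hypothetical
const servers = { 'service-hi': ['127.0.0.1:8080'] };            // hypothetical

// Routing plugin: marks the request with a route (if any), then always continues.
function routerPlugin(req, ctx, next) {
  ctx.route = routes[req.path];
  return next();
}

// Load balancing plugin: handles the request if the route has servers,
// otherwise passes it on to the next plugin.
function balancerPlugin(req, ctx, next) {
  const list = servers[ctx.route];
  if (!list) return next();
  return { status: 200, body: `forwarded to ${list[0]}` };
}

// Default plugin: last in the chain, always answers 404.
function defaultPlugin() {
  return { status: 404, body: 'No handler' };
}

// Visit plugins in order; a plugin that does not call next() ends the chain.
function runChain(plugins, req) {
  const ctx = {};
  const visit = (i) => plugins[i](req, ctx, () => visit(i + 1));
  return visit(0);
}

const chain = [routerPlugin, balancerPlugin, defaultPlugin];
const handled = runChain(chain, { path: '/hi' });     // route and server found
const noServers = runChain(chain, { path: '/echo' }); // route found, no servers
const unrouted = runChain(chain, { path: '/nope' });  // no route at all
```

    In this model only the first request is answered by the balancer; the other two fall through to the default plugin.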

    Routing plugin

    The first plugin we build is the routing plugin. Let's create a standalone file for it.

    1. Click and input /plugins/router.js for the new file's name. Click Create.

    No need to create a folder before adding a new file in it. Just put in the full pathname including folder names and the folders will be created automatically.

    2. Copy the entire content from /proxy.js to /plugins/router.js.

    We delete nearly all code in /plugins/router.js, keeping only the call to URLRouter.find() since that is the meat of our routing plugin. Then we put handleMessageStart(), the only filter that's left, under a .pipeline() call. Without a parameter to it, pipeline() starts the entrance pipeline layout for a module.

    ((
      config = JSON.decode(pipy.load('config.json')),
      router = new algo.URLRouter(config.routes),
    ) => pipy()
      .pipeline()
      .handleMessageStart(
        msg => (
          s = router.find(
            msg.head.headers.host,
            msg.head.path,
          )
        )
      )
    )()

    We have called pipeline() to define the entrance point to this plugin. That is where the previous plugin, if any, connects to this one. Now we should also connect to the next plugin after the routing calculation. We can do so with a joint filter called chain().

      .pipeline()
      .handleMessageStart(
        msg => (
          s = router.find(
            msg.head.headers.host,
            msg.head.path,
          )
        )
      )
      .chain()

    As you can see, the found route is now saved in an undefined variable s. This is certainly not right, but we'll get back to that in a moment.

    Load balancing plugin

    The second plugin can be made in the same way as the first one:

    • Clone plugins/balancer.js from proxy.js
    • Delete all code apart from the stuff for load balancing
    • Put a .pipeline() before what's left of the filters as the entrance point to this module.

    ((
      config = JSON.decode(pipy.load('config.json')),
      services = Object.fromEntries(
        Object.entries(config.services).map(
          ([k, v]) => [
            k, new algo.RoundRobinLoadBalancer(v)
          ]
        )
      ),
    ) => pipy({
      _target: undefined,
    })
      .pipeline()
      .branch(
        () => Boolean(_target), (
          $=>$.muxHTTP(() => _target).to(
            $=>$.connect(() => _target.id)
          )
        ), (
          $=>$.replaceMessage(
            new Message({ status: 404 }, 'No route')
          )
        )
      )
    )()

    As discussed earlier, when a target is not found, this plugin should just pass on the request to the next plugin (the default plugin in our case) to continue, instead of responding with a 404 message directly. So we change that to a call to chain().

      .pipeline()
      .branch(
        () => Boolean(_target), (
          $=>$.muxHTTP(() => _target).to(
            $=>$.connect(() => _target.id)
          )
        ), (
          $=>$.chain()
        )
      )

    The default plugin

    The last plugin in our example is pretty simple: just respond with a 404 message. We put it in a new file called plugins/default.js.

    pipy()
    .pipeline()
    .replaceMessage(new Message({ status: 404 }, 'No handler'))

    Chain of plugins

    Now let's put those plugins together into a chain. First, let's prepare the data. In our design, plugins will be activated by using an array in the configuration file. Each element in the array is a plugin's filename.

    {
      "listen": 8000,
      "plugins": [
        "plugins/router.js",
        "plugins/balancer.js",
        "plugins/default.js"
      ],
      "routes": {
        "/hi/*": "service-hi",
        "/echo": "service-echo",
        "/ip/*": "service-tell-ip"
      },
      "services": {
        "service-hi"      : ["127.0.0.1:8080", "127.0.0.1:8082"],
        "service-echo"    : ["127.0.0.1:8081"],
        "service-tell-ip" : ["127.0.0.1:8082"]
      }
    }

    Note that the order matters. Plugins that come first have higher priority than those after them: not only are they visited earlier, they also decide whether the plugins after them continue to be visited.
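    To see why the order of the array matters, here is a small plain JavaScript simulation (again not Pipy code). The `registry` object, the plugin filename `plugins/hello.js` and the `runChain()` helper are hypothetical stand-ins for loading modules by filename; the same two plugins give different results depending on which one is listed first.

```javascript
// Hypothetical stand-ins for plugin modules, keyed by filename.
// A plugin either returns a response (ending the chain) or calls next().
const registry = {
  'plugins/hello.js': (req, next) => (req.path === '/hello' ? 'hello' : next()),
  'plugins/default.js': () => '404',
};

// Build the chain from the configured filenames and visit it in order.
// (For brevity, this sketch assumes the last plugin never calls next().)
function runChain(pluginNames, req) {
  const plugins = pluginNames.map((name) => registry[name]);
  const visit = (i) => plugins[i](req, () => visit(i + 1));
  return visit(0);
}

const req = { path: '/hello' };

// hello.js is visited first and handles the request.
const helloFirst = runChain(['plugins/hello.js', 'plugins/default.js'], req);

// default.js is visited first and ends the chain before hello.js is reached.
const defaultFirst = runChain(['plugins/default.js', 'plugins/hello.js'], req);
```

    This is why a catch-all plugin such as the default plugin belongs at the end of the list.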

    Now let's go back to the entrance file proxy.js and delete all code for its functionality, leaving only the "scaffolding" for the plugins.

    ((
      config = JSON.decode(pipy.load('config.json')),
    ) => pipy()
      .listen(config.listen)
      .demuxHTTP().to(
        $=>$.chain(config.plugins)
      )
    )()

    As you can see, we use the same chain() filter that the plugins use to "chain" into their successors, but here we use it a bit differently. By giving it the plugin list as its parameter, we start the entire plugin chain from this point.

    Export and import

    The last thing to do is fix the undefined variable we have in router.js. This variable is supposed to be accessible from balancer.js, which uses it to find a target for the route. Since its value differs from request to request, it should certainly be a context variable. However, we can't just put it inside the parameter to pipy() as we did before, because that way it would be visible only to router.js itself. Instead, we define it with export() in the main framework proxy.js so that it can be "imported" from both router.js and balancer.js.

    ((
      config = JSON.decode(pipy.load('config.json')),
    ) => pipy()
      .export('main', {
        __route: undefined,
      })
      .listen(config.listen)
      .demuxHTTP().to(
        $=>$.chain(config.plugins)
      )
    )()

    The first parameter to export() is the namespace that other files need to refer to when they import a context variable. It can be any arbitrary name. Here we use "main" because it's related to the main framework our plugins are running under.

    Exported context variables can have any names that are valid JavaScript identifiers. But as a best practice, we recommend that all exported context variable names should be prefixed with 2 underscores, just to make them distinct from context variables that are only locally accessible to a module itself.
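    One way to picture exported context variables is as a per-request object grouped by namespace. The plain JavaScript sketch below is only a mental model under that assumption, not a description of how Pipy implements contexts; `createContext()` and the `exported` table are hypothetical names. It shows two properties the plugins rely on: a value written by one module is visible to another module through the same namespace, and each request gets its own copy.

```javascript
// Mental model only (NOT Pipy internals): exported variables are grouped by
// namespace, and every request receives a fresh copy of their initial values.
const exported = { main: { __route: undefined } };

function createContext(exportsByNamespace) {
  const ctx = {};
  for (const [ns, vars] of Object.entries(exportsByNamespace)) {
    ctx[ns] = { ...vars }; // fresh copy per request
  }
  return ctx;
}

const requestA = createContext(exported);
const requestB = createContext(exported);

// The "router" module writes the route for request A...
requestA.main.__route = 'service-hi';

// ...and the "balancer" module reads it back through the same namespace.
const seenByBalancer = requestA.main.__route;

// Request B is unaffected because it has its own context.
const seenInOtherRequest = requestB.main.__route;
```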

    Next, we import __route into router.js and balancer.js. Mind that the namespace should match what's in proxy.js where __route is exported, which should be "main".

    For router.js, we save the routing calculation result in __route after importing it.

      .import({
        __route: 'main',
      })
      .pipeline()
      .handleMessageStart(
        msg => (
          __route = router.find(
            msg.head.headers.host,
            msg.head.path,
          )
        )
      )
      .chain()

    For balancer.js, we read back the route that's found in router.js and allocate a target for it.

      .import({
        __route: 'main',
      })
      .pipeline()
      .branch(
        () => Boolean(_target = services[__route]?.next?.()), (
          $=>$
          .muxHTTP(() => _target).to(
            $=>$.connect(() => _target.id)
          )
        ),
        $=>$.chain()
      )

    That's all we have to do for refactoring our codebase into a simple plugin-based program. You can now do some tests the same way as before.

    Summary

    In this part of the tutorial, you've learned how different functions can be isolated in different plugins.

    Takeaways

    1. Use chain() with a list of filenames to start a module chain. Use chain() with no parameters to chain into the next module after the current one.

    2. Context variables defined with pipy() are visible only within the current module. To share context variables across different files, use export() and import().


    © 2024, Flomesh Team.