Configuration.fork()
Description
Appends a fork filter to the current pipeline layout.
A fork filter clones Events to one or more sub-pipelines.
- INPUT - Events of any type to clone.
- OUTPUT - Same Events as input.
- SUB-INPUT - Cloned Events from the fork filter's input.
- SUB-OUTPUT - Discarded.
Create a single sub-pipeline
When fork() is called with no arguments, only one sub-pipeline is created for each input stream. The sub-pipeline shares the context of the fork filter's pipeline, i.e. the main pipeline. That is, all state in the main pipeline is visible to the sub-pipeline, and any changes the sub-pipeline makes to that state are reflected in the main pipeline as well.
Create multiple sub-pipelines
When fork() is given an argument, it is expected to be an array, or a function that returns an array. The length of the array is the number of sub-pipelines created for each input stream. Each sub-pipeline receives one of the array elements as its startup parameter, which it can obtain through the parameter of its onStart() callback. Sub-pipelines created this way also get their own new contexts, cloned from the main pipeline (the fork filter's pipeline).
    pipy({
      _index: 0,
    })
    .task()
    .onStart(() => new Message)
    // Create 10 sub-pipelines, each with a
    // number from 0 to 9 as its _index value
    .fork(new Array(10).fill().map((_, i) => i)).to(
      $=>$
      .onStart(i => _index = i)
      .dump(() => _index) // Dump all events with tag _index
    )
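The two context models described above can be illustrated with a small plain-JavaScript sketch. This is only a model for intuition, not Pipy code: forkShared and forkCloned are hypothetical helper names, contexts are represented as ordinary objects, and the clone is shallow. It also assumes that changes made in a cloned context do not propagate back to the main pipeline.

```javascript
// Plain-JavaScript model of fork() context handling. NOT the Pipy API:
// forkShared and forkCloned are hypothetical names used for illustration.

// Models fork() with no argument: one sub-pipeline that works
// directly on the main pipeline's context object.
function forkShared(mainContext, subPipeline) {
  subPipeline(mainContext);
  return [mainContext];
}

// Models fork(startupValues): one sub-pipeline per startup value, each
// with its own copy of the main context. The startup value is passed in
// the way an onStart() callback would receive it.
function forkCloned(mainContext, startupValues, subPipeline) {
  return startupValues.map((value) => {
    const subContext = { ...mainContext }; // shallow clone (assumption)
    subPipeline(subContext, value);
    return subContext;
  });
}

// Shared context: the sub-pipeline's change is visible in the main context.
const main = { _index: 0 };
forkShared(main, (ctx) => { ctx._index = 42; });
console.log(main._index); // 42

// Cloned contexts: each sub-pipeline sets its own _index,
// and the main context is left untouched.
const main2 = { _index: 0 };
const subs = forkCloned(main2, [0, 1, 2], (ctx, i) => { ctx._index = i; });
console.log(subs.map((c) => c._index)); // [ 0, 1, 2 ]
console.log(main2._index); // 0
```

In this model, the choice between the two forms comes down to whether the sub-pipelines need to see and modify the main pipeline's state, or need independent state seeded from it.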
Syntax
    pipy().pipeline().fork().to(subPipelineLayout)
    pipy().pipeline().fork([ ...startupValues ]).to(subPipelineLayout)
    pipy().pipeline().fork(() => getStartupValues()).to(subPipelineLayout)
Parameters
fork(startupValues?)
An array of startup values, or a function that returns one. Each startup value is passed to a newly created sub-pipeline through the parameter of its onStart() callback.
Return Value
The same Configuration object.
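As a rough illustration of the three accepted forms of the argument (omitted, an array, or a function returning an array), the following plain-JavaScript sketch normalizes them into a list of startup values. The helper name resolveStartupValues is hypothetical and is not part of Pipy. How Pipy handles invalid arguments is not stated here, so the TypeError is an assumption, as is modeling the no-argument case as a single undefined startup value.

```javascript
// Hypothetical helper, NOT part of Pipy: models how the optional
// startupValues argument of fork() could be resolved into a list with
// one entry per sub-pipeline.
function resolveStartupValues(startupValues) {
  // No argument: a single sub-pipeline with no startup value
  // (modeling assumption).
  if (startupValues === undefined) return [undefined];

  // A function is called to obtain the array; an array is used as is.
  const values =
    typeof startupValues === 'function' ? startupValues() : startupValues;

  // Rejecting anything else is an assumption made for this sketch.
  if (!Array.isArray(values)) {
    throw new TypeError('expected an array of startup values');
  }
  return values;
}

// The length of the resolved array is the number of sub-pipelines.
console.log(resolveStartupValues([1, 2, 3]).length);        // 3
console.log(resolveStartupValues(() => ['a', 'b']).length); // 2
console.log(resolveStartupValues().length);                 // 1
```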
Example
    pipy()
    .listen(8000)
    // Clone all data to a sub-pipeline
    .fork().to(
      // Send the cloned data to localhost:8008
      $=>$.connect('localhost:8008')
    )
    // Respond with 'Hello!' to all requests
    .serveHTTP(new Message('Hello!'))