Configuration.replay()
Description
Appends a replay filter to the current pipeline layout.
A replay filter repeats its input event sequence to a new sub-pipeline when the previous sub-pipeline outputs a StreamEnd event with error code "Replay".
- INPUT - Events of any type to stream into the sub-pipelines.
- OUTPUT - Events streaming out from the sub-pipelines.
- SUB-INPUT - Events streaming into the replay filter.
- SUB-OUTPUT - Events of any type.
The replay filter works much like link(), except that it also records everything from its input so that exactly the same event sequence can be replayed later when needed. Replay happens only when the sub-pipeline outputs a StreamEnd event with an error code of "Replay". On replay, a new sub-pipeline is created to receive and process the event sequence that has been recorded up to that point.
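The record-and-replay behavior described above can be sketched in plain JavaScript, outside of Pipy. This is a simplified conceptual model, not Pipy's implementation: the StreamEnd class here is a local stand-in for Pipy's class of the same name, runWithReplay and createSubPipeline are hypothetical names, events are processed as one batch rather than as a stream, and the maxAttempts guard exists only to keep the sketch from looping forever.

```javascript
// Local stand-in for Pipy's StreamEnd event (assumption: only the error code matters here).
class StreamEnd {
  constructor(error = '') {
    this.error = error;
  }
}

// Hypothetical model of a replay filter: record the input, hand it to a fresh
// sub-pipeline, and start over with a new sub-pipeline whenever the previous
// one ends with StreamEnd('Replay').
function runWithReplay(inputEvents, createSubPipeline, maxAttempts = 10) {
  const recorded = [...inputEvents]; // the "recording" of the input
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const sub = createSubPipeline(attempt); // new sub-pipeline per attempt
    const output = sub(recorded);           // replay the recorded events
    const last = output[output.length - 1];
    const wantsReplay = last instanceof StreamEnd && last.error === 'Replay';
    if (!wantsReplay) return { attempt, output };
  }
  throw new Error('Too many replays');
}

// Usage: three targets, only one of which is healthy.
const targets = ['localhost:8081', 'localhost:8082', 'localhost:8083'];
const healthy = new Set(['localhost:8083']);
const tried = [];

const result = runWithReplay(['GET /'], (attempt) => {
  const target = targets[(attempt - 1) % targets.length];
  return (events) => {
    tried.push(target);
    return healthy.has(target)
      ? events.map((e) => `${target} handled ${e}`)
      : [new StreamEnd('Replay')]; // ask the filter to replay
  };
});

console.log(tried);
console.log(result.output);
```

Each failed attempt discards its sub-pipeline and the next one receives the same recorded input, which is why the sub-pipeline itself can stay stateless about earlier failures.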
Syntax
pipy().pipeline().replay().to(subPipelineLayout)
Parameters
replay(options?)
options?
Options including:
- delay - Time interval to wait before each replay. Can be a number in seconds, a string with a time unit suffix such as 's', 'm' or 'h', or a function that returns either of these.
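To make the three accepted forms of delay concrete, the plain JavaScript sketch below normalizes each form to seconds. The helper delayToSeconds is hypothetical and is not part of Pipy; it assumes the function form is called without arguments and that only the 's', 'm' and 'h' suffixes named above are used. In a Pipy script, an options object like the ones listed here would be passed as replay(options), for example replay({ delay: '30s' }).

```javascript
// Hypothetical helper (not a Pipy API): convert a delay option to seconds.
function delayToSeconds(delay) {
  // Function form: assumed to be called with no arguments.
  const value = typeof delay === 'function' ? delay() : delay;
  // Number form: already in seconds.
  if (typeof value === 'number') return value;
  // String form: a number followed by one of the suffixes 's', 'm' or 'h'.
  const match = /^(\d+(?:\.\d+)?)([smh])$/.exec(String(value));
  if (!match) throw new Error(`Unsupported delay: ${value}`);
  const secondsPerUnit = { s: 1, m: 60, h: 3600 };
  return Number(match[1]) * secondsPerUnit[match[2]];
}

// The option shapes described above.
const examples = [
  { delay: 5 },          // number of seconds
  { delay: '30s' },      // string with seconds suffix
  { delay: '2m' },       // string with minutes suffix
  { delay: () => '1h' }, // function returning one of the above
];

for (const options of examples) {
  console.log(delayToSeconds(options.delay));
}
```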
Return Value
The same Configuration object.
Example
((
  lb = new algo.RoundRobinLoadBalancer([
    'localhost:8081',
    'localhost:8082',
    'localhost:8083',
    'localhost:8084',
    'localhost:8085',
  ]),
) => pipy({
  _unhealthy: null,
  _target: null,
})
  .listen(8080)
  .handleStreamStart(
    () => _unhealthy = new algo.Cache
  )
  .demuxHTTP().to(
    $=>$.replay().to(
      $=>$.muxHTTP(
        () => (_target = lb.next(undefined, undefined, _unhealthy))
      ).to(
        $=>$.connect(() => _target.id)
      )
      .replaceMessage(
        msg => (
          (
            status = msg.head.status,
          ) => (
            200 <= status && status < 300 ? (
              msg // successful response
            ) : (
              _unhealthy.set(_target.id, true),
              console.log(`Target ${_target.id} failed`),
              new StreamEnd('Replay') // trigger replay
            )
          )
        )()
      )
    )
  )

  // Only listen on 8083, leaving other ports as unhealthy
  .listen(8083)
  .serveHTTP(
    new Message('Hi')
  )
)()