Fast, asynchronous, functional reactive stream programming framework with multi‑threading capability based on ES6+.

Classes
  • ~MultiStream

    An object consisting of multiple streams that can be refined or muxed.

    The idea behind a MultiStream is being able to mux and demux streams when needed.

    Usage:

    new MultiStream([...streams])
     .mux();
    
    new MultiStream(function*(){ yield* streams; })
     .map(stream => stream.filter(myFilter))
     .mux();

    Kind: inner class
    Test: test/methods/multi-stream-constructor.js
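
    To make the muxing idea concrete, here is a minimal sketch that uses only Node's built-in stream module rather than scramjet itself. The helper names muxStreams and collect are hypothetical, and this is not how MultiStream is implemented; it only shows several object-mode readables being merged into one output that ends once every input has ended.

    ```javascript
    const { PassThrough, Readable } = require("stream");

    // Hypothetical helper (not part of scramjet): merges several readables
    // into a single object-mode stream.
    function muxStreams(streams) {
        const out = new PassThrough({ objectMode: true });
        let open = streams.length;
        if (open === 0) out.end();

        for (const stream of streams) {
            // end: false keeps `out` open until every source has finished
            stream.pipe(out, { end: false });
            stream.once("end", () => {
                if (--open === 0) out.end();
            });
        }
        return out;
    }

    // Collects every chunk of a readable into an array.
    async function collect(readable) {
        const chunks = [];
        for await (const chunk of readable) chunks.push(chunk);
        return chunks;
    }

    const muxed = muxStreams([Readable.from([1, 2, 3]), Readable.from([10, 20])]);
    // Resolves to all five chunks; the order across sources is not guaranteed.
    const muxResult = collect(muxed);
    ```

    Within a single source the chunk order is preserved, but chunks from different sources may interleave in any order.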

    new MultiStream(streams, options)

    Creates an instance of MultiStream with the specified stream list.

    Param Type Description
    streams Array.<stream.Readable> | AsyncGenerator.<Readable> | Generator.<Readable> the list of readable streams (other objects will be filtered out!)
    options Object Options for the super object (optional)

    multiStream.streams : Array

    Array of all streams

    Kind: instance property of MultiStream

    multiStream.source : DataStream

    Source of the MultiStream.

    This is nulled when the stream ends and is used to control the

    Kind: instance property of MultiStream

    multiStream.length : number

    The current number of streams in the MultiStream

    Kind: instance property of MultiStream

    multiStream.map(aFunc, rFunc) : MultiStream ↺

    Returns new MultiStream with the streams returned by the transform.

    Runs the callback for every stream and collects the streams it returns into a new MultiStream.

    Kind: instance method of MultiStream
    Chainable
    Returns: MultiStream - the mapped instance
    Test: test/methods/multi-stream-map.js

    Param Type Description
    aFunc MultiMapCallback Add callback (normally you need only this)
    rFunc MultiMapCallback Remove callback, called when the stream is removed
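
    The mapping step can be pictured with plain Node readables. The sketch below is only an illustration under assumptions: mapStreams, onlyEven and collect are hypothetical names, the remove callback is left out, and a plain array stands in for the MultiStream.

    ```javascript
    const { Readable } = require("stream");

    // Hypothetical stand-in for the map step: calls `aFunc` once per stream
    // and returns the list of streams it produced.
    function mapStreams(streams, aFunc) {
        return streams.map((stream) => aFunc(stream));
    }

    // Example callback: wraps a stream in one that only passes even numbers.
    function onlyEven(stream) {
        return Readable.from((async function* () {
            for await (const n of stream) {
                if (n % 2 === 0) yield n;
            }
        })());
    }

    // Collects every chunk of a readable into an array.
    async function collect(readable) {
        const chunks = [];
        for await (const chunk of readable) chunks.push(chunk);
        return chunks;
    }

    const mappedStreams = mapStreams(
        [Readable.from([1, 2, 3, 4]), Readable.from([5, 6])],
        onlyEven
    );
    // Resolves to [[2, 4], [6]]: one filtered stream per input stream.
    const mappedResult = Promise.all(mappedStreams.map(collect));
    ```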

    multiStream.find(...args) : DataStream

    Calls Array.prototype.find on the streams

    Kind: instance method of MultiStream
    Returns: DataStream - found DataStream

    Param Type Description
    ...args Arguments arguments for

    multiStream.filter(func) : MultiStream ↺

    Filters the stream list and returns a new MultiStream containing only the streams for which the callback returned true.

    Kind: instance method of MultiStream
    Chainable
    Returns: MultiStream - the filtered instance
    Test: test/methods/multi-stream-filter.js

    Param Type Description
    func TransformFunction Filter run inside Promise::then (so it can return a promise or a boolean)
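
    Because the filter runs inside a promise chain, a predicate may answer with a plain boolean or with a promise of one. A minimal sketch of that behavior, assuming a plain array in place of the MultiStream (filterStreams, the label property and the predicate are hypothetical and exist only for this illustration):

    ```javascript
    const { Readable } = require("stream");

    // Hypothetical stand-in for the filter step: `func` is invoked inside a
    // promise chain, so boolean and promise results are handled the same way.
    async function filterStreams(streams, func) {
        const keep = await Promise.all(
            streams.map((stream) => Promise.resolve(stream).then(func))
        );
        return streams.filter((_, index) => keep[index]);
    }

    // The `label` property is added here only to tell the streams apart.
    const tagged = [
        Object.assign(Readable.from(["a"]), { label: "keep-sync" }),
        Object.assign(Readable.from(["b"]), { label: "drop" }),
        Object.assign(Readable.from(["c"]), { label: "keep-async" }),
    ];

    const predicate = (stream) => {
        if (stream.label === "keep-async") return Promise.resolve(true); // promise result
        return stream.label === "keep-sync"; // plain boolean result
    };

    // Resolves to ["keep-sync", "keep-async"].
    const filteredLabels = filterStreams(tagged, predicate)
        .then((kept) => kept.map((stream) => stream.label));
    ```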

    multiStream.mux([comparator], [ClassType]) : DataStream

    Muxes the streams into a single one

    Kind: instance method of MultiStream
    Returns: DataStream - The resulting DataStream
    Test: test/methods/multi-stream-mux.js
    Todo

    • For now, using a comparator will not affect the mergesort.
    • Sorting requires all the streams to be constantly flowing; if any single one drains, the muxed stream drains too, even if data is still available on the other streams.
    Param Type Default Description
    [comparator] ComparatorFunction Should return -1, 0 or 1 depending on the desired order. If passed, the chunks will be added in sorted order.
    [ClassType] function DataStream the class to be outputted
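
    The comparator contract can be illustrated with a generic sorted merge over async iterables. This is a sketch of the general technique under assumptions, not scramjet's current behavior: mergeSorted, collect and byNumber are hypothetical names, the inputs must already be sorted, and unlike the limitation noted in the Todo this version keeps going after one source is exhausted.

    ```javascript
    const { Readable } = require("stream");

    // Hypothetical helper: merges already-sorted async iterables into one
    // sorted sequence, using a comparator that returns -1, 0 or 1.
    async function* mergeSorted(sources, comparator) {
        const iterators = sources.map((source) => source[Symbol.asyncIterator]());
        // One pending head per source; nothing can be emitted until all are known.
        const heads = await Promise.all(iterators.map((it) => it.next()));

        while (true) {
            let best = -1;
            for (let i = 0; i < heads.length; i++) {
                if (heads[i].done) continue;
                if (best === -1 || comparator(heads[i].value, heads[best].value) < 0) {
                    best = i;
                }
            }
            if (best === -1) return; // every source is exhausted

            yield heads[best].value;
            // Refill only the slot that was just consumed.
            heads[best] = await iterators[best].next();
        }
    }

    // Collects every item of an async iterable into an array.
    async function collect(iterable) {
        const items = [];
        for await (const item of iterable) items.push(item);
        return items;
    }

    const byNumber = (a, b) => (a < b ? -1 : a > b ? 1 : 0);
    // Resolves to [1, 2, 3, 4, 9, 10].
    const sortedResult = collect(
        mergeSorted([Readable.from([1, 4, 9]), Readable.from([2, 3, 10])], byNumber)
    );
    ```

    The merge has to wait for a head chunk from every source before it can emit anything, which is why a sorted mux needs all of its inputs to keep flowing.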

    multiStream.add(stream)

    Adds a stream to the MultiStream

    If the MultiStream was muxed, filtered or mapped, this stream will undergo the same transforms and conditions as if it had been added in the constructor.

    Kind: instance method of MultiStream
    Meta.noreadme:
    Test: test/methods/multi-stream-add.js

    Param Type Description
    stream stream.Readable the stream to add

    multiStream.remove(stream)

    Removes a stream from the MultiStream

    If the MultiStream was muxed, filtered or mapped, the stream will be removed from those resulting streams as well.

    Kind: instance method of MultiStream
    Meta.noreadme:
    Test: test/methods/multi-stream-remove.js

    Param Type Description
    stream stream.Readable the stream to remove

    multiStream.route([policy], [count]) : MultiStream

    Re-routes streams to a new MultiStream of specified size

    Kind: instance method of MultiStream
    Returns: MultiStream - [description]
    Meta.noreadme:
    Todo

    • NYT: not yet tested
    • NYD: not yet documented
    Param Type Default Description
    [policy] function Affinity.RoundRobin [description]
    [count] number os.cpus().length [description]
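
    Since this method is not yet documented, the following is only a guess at what a round-robin policy could look like, written as a standalone sketch. The routeRoundRobin helper is hypothetical, it works on plain arrays instead of streams, and it assumes that routing means spreading the inputs over count outputs.

    ```javascript
    const os = require("os");

    // Hypothetical round-robin policy: item i goes to bucket i % count.
    // The default mirrors the documented default of os.cpus().length.
    function routeRoundRobin(items, count = os.cpus().length) {
        const buckets = Array.from({ length: count }, () => []);
        items.forEach((item, index) => {
            buckets[index % count].push(item);
        });
        return buckets;
    }

    const routed = routeRoundRobin(["s0", "s1", "s2", "s3", "s4"], 2);
    // routed is [["s0", "s2", "s4"], ["s1", "s3"]]
    ```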

    multiStream.smap(transform) ↺

    Maps the streams synchronously.

    Kind: instance method of MultiStream
    Chainable

    Param Type Description
    transform function mapping function run on every stream (SYNCHRONOUS!)

    multiStream.cluster(clusterFunc, [options]) ↺

    Distributes processing to multiple forked subprocesses.

    Kind: instance method of MultiStream
    Chainable

    Param Type Default Description
    clusterFunc function | String a cluster callback with all operations working similarly to DataStream::use
    [options] DistributeOptions {}

    MultiStream:from(streams, [StreamClass]) : MultiStream

    Constructs a MultiStream from any number of stream-like objects

    Kind: static method of MultiStream

    Param Type Default Description
    streams Array.<(Array|Iterable|AsyncGeneratorFunction|GeneratorFunction|AsyncFunction|function()|String|Readable)> the array of input stream-like elements
    [StreamClass] function DataStream