Fast, asynchronous, functional reactive stream programming framework with multi‑threading capability based on ES6+.

Module exports
Classes
  • MultiStream

    An object consisting of multiple streams that can be refined or muxed.

    Kind: global class

    new MultiStream(streams, options)

    Creates an instance of MultiStream with the specified stream list.

    Param Type Description
    streams Array.<stream.Readable> the list of readable streams (other objects will be filtered out!)
    options Object Optional options passed to the parent (super) object.
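
    The note that non-readable entries are filtered out can be illustrated with Node's built-in stream module alone. This is only a sketch: keepReadable is a hypothetical helper, and the instanceof check is an assumption about how such filtering might work, not scramjet's actual code.

    ```javascript
    const { Readable, PassThrough } = require("stream");

    // Hypothetical helper: keeps only entries that are readable streams.
    function keepReadable(candidates) {
        return candidates.filter((item) => item instanceof Readable);
    }

    const candidates = [
        Readable.from(["a", "b"]), // a readable stream
        new PassThrough(),         // a duplex stream, which is also readable
        "not a stream",            // dropped
        { pipe() {} },             // stream-like object, dropped by this check
    ];

    const kept = keepReadable(candidates);
    console.log(kept.length); // prints 2
    ```

    Under this assumption a duck-typed object that merely has a pipe method would not count as a stream; a real implementation may be more lenient.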

    multiStream.streams : Array

    Array of all streams

    Kind: instance property of MultiStream

    multiStream.length : number

    Returns the current number of streams

    Kind: instance property of MultiStream

    multiStream.map(aFunc) : MultiStream ↺

    Returns new MultiStream with the streams returned by the transform.

    Runs the function for every stream and creates a new MultiStream consisting of the streams returned by the function.

    Kind: instance method of MultiStream
    Chainable
    Returns: MultiStream - the mapped instance
    Test: test/methods/multi-stream-map.js

    Param Type Description
    aFunc MapCallback Mapper run in Promise::then (so you can return a promise or an object)
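
    Because the mapper runs inside Promise::then, it may return either a plain value or a promise. The sketch below models that contract on a plain array, with strings standing in for streams; mapAll is a hypothetical helper and not part of scramjet.

    ```javascript
    // Hypothetical model: apply `mapper` to every entry inside
    // Promise.prototype.then, so it may return a value or a promise.
    function mapAll(entries, mapper) {
        return Promise.all(
            entries.map((entry) => Promise.resolve(entry).then(mapper))
        );
    }

    // One mapper returns synchronously, the other asynchronously.
    const syncMapper = (name) => name.toUpperCase();
    const asyncMapper = (name) =>
        new Promise((resolve) => setImmediate(() => resolve(name.toUpperCase())));

    const mapped = Promise.all([
        mapAll(["left", "right"], syncMapper),
        mapAll(["left", "right"], asyncMapper),
    ]);

    // Both variants resolve to the same mapped list.
    mapped.then(([fromSync, fromAsync]) => console.log(fromSync, fromAsync));
    ```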

    multiStream.find(...args) : DataStream

    Calls Array.prototype.find on the streams

    Kind: instance method of MultiStream
    Returns: DataStream - found DataStream

    Param Type Description
    ...args Arguments arguments for Array.prototype.find

    multiStream.filter(func) : MultiStream ↺

    Filters the stream list and returns a new MultiStream with only the streams for which the Function returned true

    Kind: instance method of MultiStream
    Chainable
    Returns: MultiStream - the filtered instance
    Test: test/methods/multi-stream-filter.js

    Param Type Description
    func TransformFunction Filter run in Promise::then (so you can return a promise or a boolean)
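
    The same Promise::then contract applies to the filter: the callback may answer with a boolean or with a promise of one. A minimal sketch on plain objects standing in for streams follows; filterAll is a hypothetical helper, and treating any truthy result as "keep" is an assumption.

    ```javascript
    // Hypothetical model: keep the entries whose predicate result is truthy.
    // The predicate runs inside Promise.prototype.then, so it may return
    // a boolean or a promise.
    async function filterAll(entries, predicate) {
        const verdicts = await Promise.all(
            entries.map((entry) => Promise.resolve(entry).then(predicate))
        );
        return entries.filter((_, index) => verdicts[index]);
    }

    const entries = [
        { name: "logs", open: true },
        { name: "metrics", open: false },
    ];

    // An async predicate returns a promise of a boolean.
    const filtered = filterAll(entries, async (entry) => entry.open);
    filtered.then((kept) => console.log(kept.map((entry) => entry.name)));
    ```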

    multiStream.mux(comparator) : DataStream

    Muxes the streams into a single one

    Kind: instance method of MultiStream
    Returns: DataStream - The resulting DataStream
    Test: test/methods/multi-stream-mux.js
    Todo

    • For now, using a comparator does not affect the merge sort.
    • Sorting requires all the streams to be constantly flowing; if any single stream drains, the muxed stream drains too, even if data may still be available on the other streams.
    Param Type Description
    comparator ComparatorFunction Should return -1, 0, or 1 depending on the desired order. If passed, the chunks will be added in sorted order.
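
    The ordering a comparator is meant to produce can be sketched as a merge of several already-sorted sources. The model below works on arrays rather than streams; mergeSorted and byValue are hypothetical names, and this is not scramjet's implementation. Unlike the limitation listed in the Todo, this array model keeps going until every source is exhausted.

    ```javascript
    // Hypothetical model: merge several already-sorted arrays into one.
    // `comparator(a, b)` returns -1, 0 or 1, as described for mux.
    function mergeSorted(sources, comparator) {
        const cursors = sources.map(() => 0); // next unread index per source
        const merged = [];

        for (;;) {
            let best = -1; // source whose head chunk sorts first
            for (let i = 0; i < sources.length; i++) {
                if (cursors[i] >= sources[i].length) continue; // exhausted
                if (best === -1 ||
                    comparator(sources[i][cursors[i]], sources[best][cursors[best]]) < 0) {
                    best = i;
                }
            }
            if (best === -1) return merged; // every source is exhausted
            merged.push(sources[best][cursors[best]++]);
        }
    }

    const byValue = (a, b) => (a < b ? -1 : a > b ? 1 : 0);
    const muxed = mergeSorted([[1, 4, 9], [2, 3, 10], [5]], byValue);
    console.log(muxed); // [ 1, 2, 3, 4, 5, 9, 10 ]
    ```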

    multiStream.add(stream)

    Adds a stream to the MultiStream

    If the MultiStream was muxed, filtered or mapped, the added stream will undergo the same transforms and conditions as if it had been added in the constructor.

    Kind: instance method of MultiStream
    Test: test/methods/multi-stream-add.js

    Param Type Description
    stream stream.Readable the stream to add

    multiStream.remove(stream)

    Removes a stream from the MultiStream

    If the MultiStream was muxed, filtered or mapped, the stream will be removed from those derived streams as well.

    Kind: instance method of MultiStream
    Test: test/methods/multi-stream-remove.js

    Param Type Description
    stream stream.Readable the stream to remove

    multiStream.route([policy], [count]) : MultiStream

    Re-routes streams to a new MultiStream of specified size

    Kind: instance method of MultiStream
    Returns: MultiStream - [description]
    Todo

    • NYT: not yet tested
    • NYD: not yet documented
    Param Type Default Description
    [policy] function Affinity.RoundRobin [description]
    [count] number os.cpus().length [description]
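
    Since route is not yet documented, the following only illustrates what a round-robin policy over count targets generally means. routeRoundRobin is a hypothetical helper, and whether scramjet assigns whole streams or individual chunks this way is not stated here; the default count of os.cpus().length is taken from the parameter table.

    ```javascript
    const os = require("os");

    // Hypothetical model of a round-robin policy:
    // item number i goes to group number i % count.
    function routeRoundRobin(items, count = os.cpus().length) {
        const groups = Array.from({ length: count }, () => []);
        items.forEach((item, index) => groups[index % count].push(item));
        return groups;
    }

    const groups = routeRoundRobin(["a", "b", "c", "d", "e"], 2);
    console.log(groups); // [ [ 'a', 'c', 'e' ], [ 'b', 'd' ] ]
    ```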

    multiStream.smap(transform) ↺

    Maps the streams synchronously

    Kind: instance method of MultiStream
    Chainable

    Param Type Description
    transform function mapping function run on every stream (SYNCHRONOUS!)

    multiStream.cluster(clusterFunc, options) ↺

    Distributes processing to multiple forked subprocesses.

    Kind: instance method of MultiStream
    Chainable

    Param Type Description
    clusterFunc function | String a cluster callback with all operations working similarly to DataStream::use
    options DistributeOptions

    MultiStream:DistributeOptions

    Distribute options

    Kind: static typedef of MultiStream
    Properties

    Name Type Description
    plugins Array a list of scramjet plugins to load (if omitted, will use just the ones in scramjet itself)
    StreamClass String the class to deserialize the stream to.
    threads Number maximum number of threads to use. Defaults to the number of processor threads reported by the OS, but it may be sensible to exceed this value if you intend to run synchronous code.
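
    An options object with these properties might be assembled as in the sketch below. withDefaults is a hypothetical helper; the property names come from the table above, the threads default follows the description, and the defaults shown for plugins and StreamClass are assumptions made purely for illustration.

    ```javascript
    const os = require("os");

    // Hypothetical helper: fills in defaults for DistributeOptions.
    // Only the `threads` default is taken from the description above;
    // the `plugins` and `StreamClass` defaults are illustrative assumptions.
    function withDefaults(options = {}) {
        return {
            plugins: [],
            StreamClass: "DataStream",
            threads: os.cpus().length,
            ...options, // caller-supplied values win over the defaults
        };
    }

    const resolved = withDefaults({ threads: 2 });
    console.log(resolved.threads); // prints 2
    ```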