Fast, asynchronous, functional reactive stream programming framework with multi‑threading capability based on ES6+.

Module exports
Classes
  • DataStream : stream.PassThrough

    DataStream is the primary stream type for Scramjet. Once you have parsed your stream, just pipe it in and you can then perform calculations on the data objects streamed through your flow.

    Use as:

    const { DataStream } = require('scramjet');
    
    await (DataStream.from(aStream) // create a DataStream
        .map(findInFiles)           // read some data asynchronously
        .map(sendToAPI)             // send the data somewhere
        .run());                    // wait until end

    Kind: global class
    Extends: stream.PassThrough

    new DataStream(opts)

    Create the DataStream.

    Param Type Description
    opts StreamOptions Stream options passed to superclass

    dataStream.map(func, ClassType) ↺

    Transforms stream objects into new ones, just like Array.prototype.map does.

    Map takes a single argument: a Function operating on every element of the stream. If the function returns a Promise or is an AsyncFunction, the stream will await the outcome of the operation before pushing the data forward.

    A simple example that turns a stream of URLs into a stream of responses

    stream.map(async url => fetch(url));

    Multiple subsequent map operations (as well as filter, do, each and other simple ops) will be merged together into a single operation to improve performance. Such behaviour can be suppressed by chaining .tap() after .map().

    Kind: instance method of DataStream
    Chainable
    Test: test/methods/data-stream-map.js

    Param Type Description
    func MapCallback The function that creates the new object
    ClassType Class (optional) The class to be mapped to.

    dataStream.filter(func) ↺

    Filters object based on the function outcome, just like Array.prototype.filter.

    Filter takes a single argument: a Function or an AsyncFunction that will be called on each stream item. If the outcome of the operation is falsy (0, '', false, null or undefined) the item will be filtered out of subsequent operations and will not be pushed to the output of the stream. Otherwise the item is passed on unaffected.

    A simple example that filters out non-2xx responses from a stream

    stream.filter(({statusCode}) => statusCode >= 200 && statusCode < 300);

    Kind: instance method of DataStream
    Chainable
    Test: test/methods/data-stream-filter.js

    Param Type Description
    func FilterCallback The function that filters the object

    dataStream.reduce(func, into) ⇄

    Reduces the stream into a given accumulator

    Works similarly to Array.prototype.reduce: whatever you return from one operation becomes the first operand of the next. The result is a promise that's resolved with the return value of the last transform executed.

    A simple example that sums values from a stream

    stream.reduce((accumulator, {value}) => accumulator + value);

    This method is serial - meaning that any processing on an entry will occur only after the previous entry is fully processed. This does mean it's much slower than parallel functions.

    Kind: instance method of DataStream
    Test: test/methods/data-stream-reduce.js

    Param Type Description
    func ReduceCallback The into object will be passed as the first argument, the data object from the stream as the second.
    into Object Any object passed initially to the transform function

    dataStream.do(func) ↺

    Perform an asynchronous operation without changing or resuming the stream.

    In essence the stream will use the call to keep the backpressure, but the resolving value has no impact on the streamed data (except for possible mutation of the chunk itself)

    Kind: instance method of DataStream
    Chainable

    Param Type Description
    func DoCallback the async function
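
    As a sketch, the saveToDb helper below is hypothetical - do awaits the side effect for backpressure but passes the chunk through unchanged:

    ```javascript
    const { DataStream } = require("scramjet");

    // Hypothetical async side effect, e.g. a database write per chunk.
    const saveToDb = async item => { /* await db.insert(item) */ };

    DataStream.fromArray([{ id: 1 }, { id: 2 }])
        .do(saveToDb)       // awaited for backpressure; its result is ignored
        .each(console.log); // chunks pass through unchanged
    ```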

    dataStream.all(functions) ↺

    Processes a number of functions in parallel, returns a stream of arrays of results.

    This method allows running multiple asynchronous operations and receiving all the results at once, just as Promise.all behaves.

    Keep in mind that if one of your methods rejects, this behaves just like Promise.all: you won't be able to receive partial results.

    Kind: instance method of DataStream
    Chainable
    Test: test/methods/data-stream-all.js

    Param Type Description
    functions Array.<function()> list of async functions to run
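
    A sketch with two illustrative async lookups (the functions here just stand in for real API calls) - every chunk yields an array of both results:

    ```javascript
    const { DataStream } = require("scramjet");

    // Both functions run in parallel for every chunk; the output chunk is an
    // array of their results, in the order the functions were given.
    DataStream.fromArray(["alice", "bob"])
        .all([
            async id => `profile:${id}`, // stands in for a profile lookup
            async id => `orders:${id}`   // stands in for an orders lookup
        ])
        .each(console.log);
    ```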

    dataStream.race(functions) ↺

    Processes a number of functions in parallel, returns the first resolved.

    This method allows running multiple asynchronous operations, awaiting just the result of the quickest one to execute, just as Promise.race behaves.

    Keep in mind that, just like Promise.race, this will only raise an error if the first method to settle rejects.

    Kind: instance method of DataStream
    Chainable
    Test: test/methods/data-stream-race.js

    Param Type Description
    functions Array.<function()> list of async functions to run

    dataStream.unorder(func)

    Allows processing items without keeping order

    This method is useful if you are not concerned about the order in which the chunks are pushed out of the operation. The maxParallel option is still used to limit the number of simultaneous parallel operations.

    Kind: instance method of DataStream

    Param Type Description
    func MapCallback the async function that will be unordered
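
    A sketch using artificial delays so that later chunks finish first - with unorder they are pushed out in completion order rather than input order:

    ```javascript
    const { DataStream } = require("scramjet");

    // The delay simulates async work of varying duration; output order
    // follows completion, not input (likely 10, 20, 30 here).
    DataStream.fromArray([30, 10, 20])
        .unorder(async ms => {
            await new Promise(resolve => setTimeout(resolve, ms));
            return ms;
        })
        .each(console.log);
    ```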

    dataStream.into(func, into) ↺

    Allows own implementation of stream chaining.

    The async Function is called on every chunk and should implement writes in its own way. The resolution will be awaited for flow control. The into argument is passed as the first argument to every call.

    It returns the DataStream passed as the second argument.

    Kind: instance method of DataStream
    Chainable
    Test: test/methods/data-stream-into.js

    Param Type Description
    func IntoCallback the method that processes incoming chunks
    into DataStream the DataStream derived class
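
    A minimal sketch: writes are implemented by hand with whenWrote, and since into returns the target stream the chain continues on it:

    ```javascript
    const { DataStream } = require("scramjet");

    // Push a derived chunk into the target stream manually; the resolution
    // of whenWrote is awaited for flow control.
    const target = new DataStream();
    DataStream.fromArray([1, 2, 3])
        .into(async (out, chunk) => out.whenWrote(chunk * 2), target)
        .each(console.log);
    ```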

    dataStream.use(func) ↺

    Calls the passed method in place with the stream as the first argument and returns the result.

    The main intention of this method is to run scramjet modules - pieces of code that implement complex stream transforms. These modules can also be run with scramjet-cli directly from the command line.

    Kind: instance method of DataStream
    Chainable
    Test: test/methods/data-stream-use.js

    Param Type Description
    func AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | function | String | Readable if passed, the function will be called on self to add an option to inspect the stream in place, while not breaking the transform chain. Alternatively this can be a relative path to a scramjet-module. Lastly it can be a Transform stream.
    [...parameters] * any additional parameters to be passed to the module
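
    A sketch of the simplest case - an inline "module", i.e. a function that takes the stream and returns a transformed one:

    ```javascript
    const { DataStream } = require("scramjet");

    // A minimal inline module: receives the stream, returns a new one.
    const onlyEven = stream => stream.filter(x => x % 2 === 0);

    DataStream.fromArray([1, 2, 3, 4])
        .use(onlyEven)
        .each(console.log); // 2, 4
    ```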

    dataStream.run() ⇄

    Consumes all stream items doing nothing. Resolves when the stream is ended.

    Kind: instance method of DataStream

    dataStream.tap()

    Stops merging transform Functions at the current place in the command chain.

    Kind: instance method of DataStream
    Test: test/methods/data-stream-tap.js

    dataStream.whenRead() ⇄

    Reads a chunk from the stream and resolves the promise when read.

    Kind: instance method of DataStream

    dataStream.whenWrote(chunk, [...more]) ⇄

    Writes a chunk to the stream and returns a Promise resolved when more chunks can be written.

    Kind: instance method of DataStream

    Param Type Description
    chunk * a chunk to write
    [...more] * more chunks to write
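
    A sketch of feeding a stream "by hand", awaiting backpressure between writes:

    ```javascript
    const { DataStream } = require("scramjet");

    const stream = new DataStream();
    (async () => {
        // whenWrote resolves when the stream is ready for more data.
        for (const item of [1, 2, 3]) await stream.whenWrote(item);
        stream.end();
    })();
    stream.each(console.log); // 1, 2, 3
    ```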

    dataStream.whenEnd() ⇄

    Resolves when stream ends - rejects on uncaught error

    Kind: instance method of DataStream

    dataStream.whenDrained() ⇄

    Returns a promise that resolves when the stream is drained

    Kind: instance method of DataStream

    dataStream.whenError() ⇄

    Returns a promise that resolves (!) when the stream errors

    Kind: instance method of DataStream

    dataStream.setOptions(options) ↺

    Allows resetting stream options.

    It's much easier to use this in a chain than constructing a new stream:

        stream.map(myMapper).filter(myFilter).setOptions({maxParallel: 2})

    Kind: instance method of DataStream
    Chainable
    Meta.conditions: keep-order,chain

    Param Type
    options StreamOptions

    dataStream.tee(func) ↺

    Duplicate the stream

    Creates a duplicate stream instance and passes it to the Function.

    Kind: instance method of DataStream
    Chainable
    Test: test/methods/data-stream-tee.js

    Param Type Description
    func TeeCallback | Writable The duplicate stream will be passed as first argument.
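
    A sketch: the duplicate receives the original chunks while the main chain continues independently:

    ```javascript
    const { DataStream } = require("scramjet");

    DataStream.fromArray([1, 2, 3])
        .tee(copy => copy.toArray().then(all => console.log("copy:", all)))
        .map(x => x * 10)
        .each(console.log); // 10, 20, 30 on the main branch
    ```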

    dataStream.each(func) ↺

    Performs an operation on every chunk, without changing the stream

    This is a shorthand for stream.on("data", func) but with flow control. Warning: this resumes the stream!

    Kind: instance method of DataStream
    Chainable

    Param Type Description
    func MapCallback a Function called for each chunk.

    dataStream.while(func) ↺

    Reads the stream while the function outcome is truthy.

    Stops reading and emits end as soon as it finds the first chunk that evaluates to false. If you're processing a file until a certain point or you just need to confirm existence of some data, you can use it to end the stream before reaching end.

    Keep in mind that whatever you piped to the stream will still need to be handled.

    Kind: instance method of DataStream
    Chainable
    Test: test/methods/data-stream-while.js

    Param Type Description
    func FilterCallback The condition check
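
    A minimal sketch - the output ends at the first chunk failing the condition:

    ```javascript
    const { DataStream } = require("scramjet");

    // Ends the output as soon as a chunk fails the check; 4 is never seen.
    DataStream.fromArray([1, 2, 3, 42, 4])
        .while(x => x < 10)
        .each(console.log); // 1, 2, 3
    ```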

    dataStream.until(func) ↺

    Reads the stream until the function outcome is truthy.

    Works opposite of while.

    Kind: instance method of DataStream
    Chainable
    Test: test/methods/data-stream-until.js

    Param Type Description
    func FilterCallback The condition check

    dataStream.catch(callback) ↺

    Provides a way to catch errors in chained streams.

    The handler will be called asynchronously:

    • if it resolves then the error will be muted.
    • if it rejects then the error will be passed to the next handler

    If no handler resolves the error, an error event will be emitted

    Kind: instance method of DataStream
    Chainable
    Test: test/methods/data-stream-catch.js

    Param Type Description
    callback function Error handler (async function)
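
    A sketch of muting an error thrown in a transform - resolving in the handler keeps the stream alive:

    ```javascript
    const { DataStream } = require("scramjet");

    DataStream.fromArray([1, 2, 3])
        .map(x => {
            if (x === 2) throw new Error("bad chunk");
            return x;
        })
        .catch(async err => { /* resolving here mutes the error */ })
        .each(console.log); // the failing chunk is dropped
    ```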

    dataStream.raise(err) ⇄

    Executes all error handlers and if none resolves, then emits an error.

    The returned promise will always be resolved even if there are no successful handlers.

    Kind: instance method of DataStream
    Test: test/methods/data-stream-raise.js

    Param Type Description
    err Error The thrown error

    dataStream.pipe(to, options) : Writable ↺

    Override of node.js Readable pipe.

    Besides calling the overridden method, it proxies errors to the piped stream.

    Kind: instance method of DataStream
    Chainable
    Returns: Writable - the to stream

    Param Type Description
    to Writable Writable stream to write to
    options WritableOptions

    dataStream.bufferify(serializer) : BufferStream ↺

    Creates a BufferStream.

    The passed serializer must return a buffer.

    Kind: instance method of DataStream
    Chainable
    Returns: BufferStream - the resulting stream
    Meta.noreadme:
    Test: test/methods/data-stream-tobufferstream.js

    Param Type Description
    serializer MapCallback A method that converts chunks to buffers

    dataStream.stringify(serializer) : StringStream ↺

    Creates a StringStream.

    The passed serializer must return a string.

    Kind: instance method of DataStream
    Chainable
    Returns: StringStream - the resulting stream
    Test: test/methods/data-stream-tostringstream.js

    Param Type Description
    serializer MapCallback A method that converts chunks to strings

    dataStream.toArray(initial) : Array ⇄

    Aggregates the stream into a single Array

    In fact it's just a shorthand for reducing the stream into an Array.

    Kind: instance method of DataStream

    Param Type Description
    initial Array Optional array to begin with.

    dataStream.toGenerator() : Iterable.<Promise.<*>>

    Returns an async generator

    Ready for https://github.com/tc39/proposal-async-iteration

    Kind: instance method of DataStream
    Returns: Iterable.<Promise.<*>> - Returns an iterator that returns a promise for each item.

    dataStream.pull(pullable) : Promise ⇄

    Pulls in any readable stream, resolves when the pulled stream ends.

    You can also pass anything that can be passed to DataStream.from.

    Does not preserve order, does not end this stream.

    Kind: instance method of DataStream
    Returns: Promise - resolved when incoming stream ends, rejects on incoming error
    Test: test/methods/data-stream-pull.js

    Param Type
    pullable Array | Iterable | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | function | String | Readable

    dataStream.shift(count, func) ↺

    Shifts the first n items from the stream and pushes out the remaining ones.

    Kind: instance method of DataStream
    Chainable
    Test: test/methods/data-stream-shift.js

    Param Type Description
    count Number The number of items to shift.
    func ShiftCallback Function that receives an array of shifted items
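
    A minimal sketch - the first two chunks go to the callback, the rest flow on:

    ```javascript
    const { DataStream } = require("scramjet");

    DataStream.fromArray([1, 2, 3, 4])
        .shift(2, ([a, b]) => console.log("shifted:", a, b))
        .each(console.log); // 3, 4
    ```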

    dataStream.peek(count, func) ↺

    Allows previewing some of the stream's data without removing it from the stream.

    Important: Peek does not resume the flow.

    Kind: instance method of DataStream
    Chainable

    Param Type Description
    count Number The number of items to view before
    func ShiftCallback Function called before other streams

    dataStream.slice([start], [length]) ↺

    Slices out a part of the stream to the passed Function.

    Returns a stream in which the first start items are omitted and the following length items are included. Works similarly to Array.prototype.slice.

    Takes count from the moment it's called. Any previous items will not be taken into account.

    Kind: instance method of DataStream
    Chainable
    Test: test/methods/data-stream-slice.js

    Param Type Default Description
    [start] Number 0 omit this number of entries.
    [length] Number Infinity get this number of entries to the resulting stream
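
    A minimal sketch - skip two chunks, then pass the next three:

    ```javascript
    const { DataStream } = require("scramjet");

    DataStream.fromArray([0, 1, 2, 3, 4, 5])
        .slice(2, 3)
        .each(console.log); // 2, 3, 4
    ```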

    dataStream.assign(func) ↺

    Transforms stream objects by assigning the properties from the returned data along with data from original ones.

    The original objects are unaltered.

    Kind: instance method of DataStream
    Chainable
    Test: test/methods/data-stream-assign.js

    Param Type Description
    func MapCallback | Object The function that returns new object properties or just the new properties
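
    A minimal sketch - the returned properties are merged into a copy of each chunk:

    ```javascript
    const { DataStream } = require("scramjet");

    DataStream.fromArray([{ id: 1 }, { id: 2 }])
        .assign(({ id }) => ({ squared: id * id }))
        .each(console.log); // { id: 1, squared: 1 }, { id: 2, squared: 4 }
    ```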

    dataStream.empty(callback) ↺

    Calls the given callback only if the stream ends without passing any items

    Kind: instance method of DataStream
    Chainable
    Test: test/methods/data-stream-empty.js

    Param Type Description
    callback function Function called when stream ends

    dataStream.unshift(item) ↺

    Pushes any data at call time (essentially at the beginning of the stream)

    This is a synchronous only function.

    Kind: instance method of DataStream
    Chainable

    Param Type Description
    item * list of items to unshift (you can pass more items)

    dataStream.endWith(item) ↺

    Pushes any data at end of stream

    Kind: instance method of DataStream
    Chainable
    Meta.noreadme:
    Test: test/methods/data-stream-endwith.js

    Param Type Description
    item * list of items to push at end

    dataStream.accumulate(func, into) : Promise ⇄

    Accumulates data into the object.

    Works very similarly to reduce, but the results of previous operations have no influence on the accumulator in the next one.

    Method is parallel

    Kind: instance method of DataStream
    Returns: Promise - resolved with the "into" object on stream end.
    Meta.noreadme:
    Test: test/methods/data-stream-accumulate.js

    Param Type Description
    func AccumulateCallback The accumulation function
    into * Accumulator object

    dataStream.consume(func) ⇄

    Deprecated

    Consumes the stream by running each Function

    Kind: instance method of DataStream
    Meta.noreadme:

    Param Type Description
    func function the consumer

    dataStream.reduceNow(func, into) : * ↺

    Reduces the stream into the given object, returning it immediately.

    The main difference from reduce is that the into object is returned immediately, before the stream ends (although the method will still be called with every entry). If the object is an instance of EventEmitter then it will propagate the error from the previous stream.

    This method is serial - meaning that any processing on an entry will occur only after the previous entry is fully processed. This does mean it's much slower than parallel functions.

    Kind: instance method of DataStream
    Chainable
    Returns: * - whatever was passed as into
    Meta.noreadme:
    Test: test/methods/data-stream-reduceNow.js

    Param Type Description
    func ReduceCallback The into object will be passed as the first argument, the data object from the stream as the second.
    into * | EventEmitter Any object passed initially to the transform function

    dataStream.remap(func, TypeClass) : DataStream ↺

    Remaps the stream into a new stream.

    This means that every item may emit as many other items as we like.

    Kind: instance method of DataStream
    Chainable
    Returns: DataStream - a new DataStream of the given class with new chunks
    Meta.noreadme:
    Test: test/methods/data-stream-remap.js

    Param Type Description
    func RemapCallback A Function that is called on every chunk
    TypeClass class Optional DataStream subclass to be constructed

    dataStream.flatMap(func, TypeClass) : DataStream ↺

    Takes any method that returns any iterable and flattens the result.

    The passed Function must return an iterable (otherwise an error will be emitted). The resulting stream will consist of all the items of the returned iterables, one iterable after another.

    Kind: instance method of DataStream
    Chainable
    Returns: DataStream - a new DataStream of the given class with new chunks
    Test: test/methods/data-stream-flatmap.js

    Param Type Description
    func FlatMapCallback A Function that is called on every chunk
    TypeClass class Optional DataStream subclass to be constructed

    dataStream.flatten() : DataStream ↺

    A shorthand for streams of Arrays to flatten them.

    More efficient equivalent of: .flatMap(i => i);

    Kind: instance method of DataStream
    Chainable
    Test: test/methods/data-stream-flatten.js

    dataStream.concat(streams) ↺

    Returns a new stream that will append the passed streams to the callee

    Kind: instance method of DataStream
    Chainable
    Test: test/methods/data-stream-concat.js

    Param Type Description
    streams * Streams to be passed

    dataStream.join(item) ↺

    Puts the passed object between stream items. It can also be a function called to produce the joining item.

    Kind: instance method of DataStream
    Chainable
    Test: test/methods/data-stream-join.js

    Param Type Description
    item * | JoinCallback An object that should be interweaved between stream items

    dataStream.keep(count) ↺

    Keeps a buffer of n chunks for use with {@see DataStream..rewind}

    Kind: instance method of DataStream
    Chainable
    Test: test/methods/data-stream-keep.js

    Param Type Description
    count number Number of objects or -1 for all the stream

    dataStream.rewind(count) ↺

    Rewinds the buffered chunks the specified length backwards. Requires a prior call to {@see DataStream..keep}

    Kind: instance method of DataStream
    Chainable

    Param Type Description
    count number Number of objects or -1 for all the buffer

    dataStream.distribute([affinity], clusterFunc, options) ↺

    Distributes processing into multiple sub-processes or threads if you like.

    Kind: instance method of DataStream
    Chainable
    Test: test/methods/data-stream-distribute.js
    Todo

    • Currently order is not kept.
    • Example test breaks travis-ci build
    Param Type Description
    [affinity] AffinityCallback | Number A Function that assigns each chunk to a specific output stream; it must return a string key. A number may be passed instead to set how many round-robin threads to start up. Defaults to round-robin across twice the number of CPU threads.
    clusterFunc ClusterCallback stream transforms similar to {@see DataStream#use method}
    options Object Options

    dataStream.separateInto(streams, affinity) ↺

    Separates stream into a hash of streams. Does not create new streams!

    Kind: instance method of DataStream
    Chainable
    Meta.noreadme:

    Param Type Description
    streams Object.<DataStream> the object hash of streams. Keys must be the outputs of the affinity function
    affinity AffinityCallback the Function that assigns each chunk to a specific stream; the returned key must exist in the streams object.

    dataStream.separate(affinity, createOptions) : MultiStream ↺

    Separates execution to multiple streams using the hashes returned by the passed Function.

    Calls the given Function for a hash, then makes sure all items with the same hash are processed within a single stream. Thanks to that streams can be distributed to multiple threads.

    Kind: instance method of DataStream
    Chainable
    Returns: MultiStream - separated stream
    Meta.noreadme:
    Test: test/methods/data-stream-separate.js

    Param Type Description
    affinity AffinityCallback the affinity function
    createOptions Object options to use to create the separated streams

    dataStream.delegate(delegateFunc, worker, [plugins]) ↺

    Delegates work to a specified worker.

    Kind: instance method of DataStream
    Chainable
    Meta.noreadme:

    Param Type Default Description
    delegateFunc DelegateCallback A function to be run in the sub-thread.
    worker WorkerStream
    [plugins] Array []

    dataStream.rate(cps, [options]) ↺

    Limit the rate of the stream to a given number of chunks per second or given timeframe.

    Kind: instance method of DataStream
    Chainable
    Meta.noreadme:

    Param Type Default Description
    cps Number Chunks per timeframe, the default timeframe is 1000 ms.
    [options] RateOptions {} Options for the limiter controlling the timeframe and time source. Both must work on same units.

    dataStream.batch(count) ↺

    Aggregates chunks into arrays of a given number of items.

    This can be used for micro-batch processing.

    Kind: instance method of DataStream
    Chainable
    Test: test/methods/data-stream-batch.js

    Param Type Description
    count Number How many items to aggregate
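
    A minimal sketch - the last batch may be shorter than count:

    ```javascript
    const { DataStream } = require("scramjet");

    DataStream.fromArray([1, 2, 3, 4, 5])
        .batch(2)
        .each(console.log); // [1, 2], [3, 4], [5]
    ```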

    dataStream.timeBatch(ms, count) ↺

    Aggregates chunks to arrays not delaying output by more than the given number of ms.

    Kind: instance method of DataStream
    Chainable
    Meta.noreadme:
    Test: test/methods/data-stream-timebatch.js

    Param Type Description
    ms Number Maximum amount of milliseconds
    count Number Maximum number of items in batch (otherwise no limit)

    dataStream.nagle([size], [ms]) ↺

    Performs the Nagle's algorithm on the data. In essence it waits until we receive some more data and releases them in bulk.

    Kind: instance method of DataStream
    Chainable
    Meta.noreadme:
    Todo

    • needs more work, for now it's simply waiting some time, not checking the queues.
    Param Type Default Description
    [size] number 32 maximum number of items to wait for
    [ms] number 10 milliseconds to wait for more data

    dataStream.window(length) : WindowStream ↺

    Returns a WindowStream of the specified length

    Kind: instance method of DataStream
    Chainable
    Returns: WindowStream - a stream of arrays
    Meta.noreadme:

    Param Type
    length Number

    dataStream.toJSONArray([enclosure]) : StringStream ↺

    Transforms the stream to a streamed JSON array.

    Kind: instance method of DataStream
    Chainable
    Meta.noreadme:
    Test: test/methods/data-stream-tojsonarray.js

    Param Type Default Description
    [enclosure] Iterable '[]' Any iterable object of two items (beginning and end)

    dataStream.toJSONObject([entryCallback], [enclosure]) : StringStream ↺

    Transforms the stream to a streamed JSON object.

    Kind: instance method of DataStream
    Chainable
    Meta.noreadme:
    Test: test/methods/data-stream-tojsonobject.js

    Param Type Default Description
    [entryCallback] MapCallback async function returning an entry (array of [key, value])
    [enclosure] Iterable '{}' Any iterable object of two items (beginning and end)

    dataStream.JSONStringify([endline]) : StringStream ↺

    Returns a StringStream containing JSON per item with optional end line

    Kind: instance method of DataStream
    Chainable
    Returns: StringStream - output stream
    Meta.noreadme:

    Param Type Default Description
    [endline] Boolean | String os.EOL whether to add endlines (boolean or string as delimiter)

    dataStream.CSVStringify(options) : StringStream ↺

    Stringifies the stream into CSV using the 'papaparse' module.

    Kind: instance method of DataStream
    Chainable
    Returns: StringStream - stream of CSV-stringified items
    Test: test/methods/data-stream-csv.js

    Param Description
    options options for the papaparse.unparse module.

    dataStream.exec(command, options, args)

    Executes a given sub-process with arguments and pipes the current stream into it while returning the output as another DataStream.

    Pipes the current stream into the sub-processes stdin. The data is serialized and deserialized as JSON lines by default. You can provide your own alternative methods in the ExecOptions object.

    Note: if you're piping both stderr and stdout (options.stream=3) keep in mind that chunks may get mixed up!

    Kind: instance method of DataStream
    Test: test/methods/data-stream-exec.js

    Param Type Description
    command String command to execute
    options ExecDataOptions options to be passed to spawn and defining serialization.
    args String additional arguments (will overwrite SpawnOptions.args even if not given)

    dataStream.debug(func) : DataStream ↺

    Injects a debugger statement when called.

    Kind: instance method of DataStream
    Chainable
    Returns: DataStream - self
    Meta.noreadme:
    Test: test/methods/data-stream-debug.js

    Param Type Description
    func function if passed, the function will be called on self to add an option to inspect the stream in place, while not breaking the transform chain

    dataStream.toBufferStream(serializer) : BufferStream ↺

    Creates a BufferStream.

    The passed serializer must return a buffer.

    Kind: instance method of DataStream
    Chainable
    Returns: BufferStream - the resulting stream
    Meta.noreadme:
    Test: test/methods/data-stream-tobufferstream.js

    Param Type Description
    serializer MapCallback A method that converts chunks to buffers

    dataStream.toStringStream(serializer) : StringStream ↺

    Creates a StringStream.

    The passed serializer must return a string.

    Kind: instance method of DataStream
    Chainable
    Returns: StringStream - the resulting stream
    Test: test/methods/data-stream-tostringstream.js

    Param Type Description
    serializer MapCallback A method that converts chunks to strings

    DataStream:from(input, options) : DataStream

    Returns a DataStream from pretty much anything sensibly possible.

    Depending on type:

    • self will return self immediately
    • Readable stream will get piped to the current stream with errors forwarded
    • Array will get iterated and all items will be pushed to the returned stream. The stream will also be ended in such case.
    • GeneratorFunction will get executed to return the iterator which will be used as source for items
    • AsyncGeneratorFunction will also work as above (including generators) in node v10.
    • Iterables iterator will be used as a source for streams

    You can also pass a Function or AsyncFunction that will be executed and its outcome will be passed again to from and piped to the initially returned stream. Any additional arguments will be passed as arguments to the function.

    If a String is passed, scramjet will attempt to resolve it as a module and use the outcome as an argument to from as in the Function case described above. For more information see modules.md

    A simple example from a generator:

    DataStream
      .from(function* () {
         let x = 0;
         while (x < 100) yield x++;
      })
      .each(console.log)
      // 0
      // 1...
      // 99

    Kind: static method of DataStream

    Param Type Description
    input Array | Iterable | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | function | String | Readable argument to be turned into new stream
    options StreamOptions | Writable

    DataStream:pipeline(readable, ...transforms) : DataStream

    Creates a pipeline of streams and returns a scramjet stream.

    This is similar to node.js stream pipeline method, but also takes scramjet modules as possibilities in an array of transforms. It may be used to run a series of non-scramjet transform streams.

    The first argument is anything streamable and will be sanitized by DataStream..from.

    Each following argument will be understood as a transform and can be any of:

    • AsyncFunction or Function - will be executed by DataStream..use
    • A transform stream that will be piped to the preceding stream

    Kind: static method of DataStream
    Returns: DataStream - a new DataStream instance of the resulting pipeline

    Param Type Description
    readable Array | Iterable | AsyncGeneratorFunction | GeneratorFunction | AsyncFunction | function | String | Readable the initial readable argument that is streamable by scramjet.from
    ...transforms AsyncFunction | function | Transform Transform functions (as in DataStream..use) or Transform streams (any number of these as consecutive arguments)
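
    A sketch mixing both transform kinds - a scramjet-style function and a plain node.js Transform stream (the names double and stringify are illustrative):

    ```javascript
    const { DataStream } = require("scramjet");
    const { Transform } = require("stream");

    // A function transform (run via DataStream..use) ...
    const double = stream => stream.map(x => x * 2);

    // ... followed by a plain object-mode Transform stream.
    const stringify = new Transform({
        objectMode: true,
        transform(chunk, encoding, callback) { callback(null, `n=${chunk}`); }
    });

    DataStream.pipeline([1, 2, 3], double, stringify)
        .each(console.log);
    ```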

    DataStream:fromArray(array, options) : DataStream

    Create a DataStream from an Array

    Kind: static method of DataStream
    Test: test/methods/data-stream-fromarray.js

    Param Type Description
    array Array list of chunks
    options ScramjetOptions the read stream options

    DataStream:fromIterator(iterator, options) : DataStream

    Create a DataStream from an Iterator

    Doesn't end the stream until it reaches the end of the iterator.

    Kind: static method of DataStream
    Test: test/methods/data-stream-fromiterator.js

    Param Type Description
    iterator Iterator the iterator object
    options ScramjetOptions the read stream options

    DataStream:MapCallback : Promise | *

    Kind: static typedef of DataStream
    Returns: Promise | * - the mapped object

    Param Type Description
    chunk * the chunk to be mapped

    DataStream:FilterCallback : Promise | Boolean

    Kind: static typedef of DataStream
    Returns: Promise | Boolean - information if the object should remain in the filtered stream.

    Param Type Description
    chunk * the chunk to be filtered or not

    DataStream:ReduceCallback : Promise | *

    Kind: static typedef of DataStream
    Returns: Promise | * - accumulator for the next pass

    Param Type Description
    accumulator * the accumulator - the object initially passed or returned by the previous reduce operation
    chunk Object the stream chunk.

    DataStream:DoCallback : function ⇄

    Kind: static typedef of DataStream

    Param Type Description
    chunk Object source stream chunk

    DataStream:IntoCallback : * ⇄

    Kind: static typedef of DataStream
    Returns: * - resolution for the old stream (for flow control only)

    Param Type Description
    into * stream passed to the into method
    chunk Object source stream chunk

    DataStream:TeeCallback : function

    Kind: static typedef of DataStream

    Param Type Description
    teed DataStream The teed stream

    DataStream:ShiftCallback : function

    Shift Function

    Kind: static typedef of DataStream

    Param Type Description
    shifted Array.<Object> an array of shifted chunks

    DataStream:AccumulateCallback : Promise | *

    Kind: static typedef of DataStream
    Returns: Promise | * - resolved when all operations are completed

    Param Type Description
    accumulator * Accumulator passed to accumulate function
    chunk * the stream chunk

    DataStream:ConsumeCallback : Promise | *

    Kind: static typedef of DataStream
    Returns: Promise | * - resolved when all operations are completed

    Param Type Description
    chunk * the stream chunk

    DataStream:RemapCallback : Promise | *

    Kind: static typedef of DataStream
    Returns: Promise | * - promise to be resolved when chunk has been processed

    Param Type Description
    emit function a method to emit objects in the remapped stream
    chunk * the chunk from the original stream

    DataStream:FlatMapCallback : Promise.<Iterable> | Iterable

    Kind: static typedef of DataStream
    Returns: Promise.<Iterable> | Iterable - promise to be resolved when chunk has been processed

    Param Type Description
    chunk * the chunk from the original stream

    DataStream:JoinCallback : Promise.<*> | *

    Kind: static typedef of DataStream
    Returns: Promise.<*> | * - promise that is resolved with the joining item

    Param Type Description
    previous * the chunk before
    next * the chunk after

    DataStream:RateOptions

    Kind: static typedef of DataStream

    Param Type Default Description
    [timeFrame] Number 1000 The size of the timeframe in milliseconds.
    [getTime] function Date.now Time source - anything that returns time.
    [setTimeout] function setTimeout Timing function that works identically to setTimeout.