Fast, asynchronous, functional reactive stream programming framework with multi‑threading capability based on ES6+.


    What does it do?

    Scramjet is a fast, simple, multi-threaded functional stream programming framework written on top of Node.js object streams. It exposes a standards-inspired JavaScript API and is written fully in native ES6. Thanks to some built-in optimizations, Scramjet is much faster and simpler than similar frameworks when running asynchronous operations.

    It is built upon the logic behind three well-known JavaScript array operations, namely map, filter and reduce. This means that if you've ever performed operations on an Array in JavaScript, you already know Scramjet like the back of your hand.
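If you know those Array methods, the idea carries over to streams. The sketch below is plain Node.js, not Scramjet's API: it applies the same map, filter and reduce steps once to an Array and once to an async iterable whose items arrive one at a time, which is roughly how chunks flow through a stream. The names numbers, arrayPipeline and streamPipeline are hypothetical and used only for illustration.

```javascript
// Plain Node.js illustration, not Scramjet's API.

// Hypothetical source: yields the given numbers asynchronously, one per tick.
async function* numbers(values) {
    for (const value of values) {
        await new Promise((resolve) => setImmediate(resolve));
        yield value;
    }
}

// The familiar Array version: map, then filter, then reduce.
function arrayPipeline(values) {
    return values
        .map((x) => x * 2)
        .filter((x) => x > 2)
        .reduce((sum, x) => sum + x, 0);
}

// The same three steps, applied to each item as it arrives.
async function streamPipeline(source) {
    let sum = 0;                          // reduce accumulator
    for await (const value of source) {
        const doubled = value * 2;        // map
        if (doubled <= 2) continue;       // filter: keep values greater than 2
        sum += doubled;                   // reduce
    }
    return sum;
}

console.log(arrayPipeline([1, 2, 3, 4]));                          // 18
streamPipeline(numbers([1, 2, 3, 4])).then((total) => console.log(total)); // 18
```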

    The main advantage of Scramjet is running asynchronous operations on your data streams. It allows you to perform transformations both synchronously and asynchronously using the same API, so you can "map" your stream from whatever source and call any number of APIs consecutively.

    The benchmarks are published in the scramjet-benchmark repo.

    Example

    How about a full API-to-API migration: reading a long list of items from one API, checking them one after another and pushing them to another API? With simultaneous request control? And outputting a log of the conversion? Easy!

    const request = require("request");
    const rp = require("request-promise-native");
    const { StringStream } = require("scramjet");
    
    StringStream.from(                                 // fetch your API to a scramjet stream
        request("https://api.example.org/v1/shows/list")
    )
        .setOptions({maxParallel: 4})                  // limit parallel transforms to 4
        .lines()                                       // split the stream by line
        .filter(line => line.trim() !== "")            // skip empty lines
        .parse(line => {                               // parse to your requirement
            const theirShow = JSON.parse(line);        // assumes one JSON object per line
            return {
                id: theirShow.id,
                title: theirShow.name,
                url: theirShow.url
            };
        })
        .map(myShow => rp({                            // push each item to the other API
            method: "POST",
            simple: true,
            uri: `http://api.local/set/${myShow.id}`,
            body: JSON.stringify(myShow)
        }))
        .map(resp => `+ Update succeeded "${resp}"`)  // make your logs
        .catch(err => `! Error occurred ${err.uri}`)
        .toStringStream()
        .append("\n")
        .pipe(process.stdout)   // pipe to any output
    ;
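The setOptions({maxParallel: 4}) call above is what provides the simultaneous request control: it limits how many asynchronous transforms run in parallel. The sketch below illustrates that idea in plain Node.js and is not Scramjet's implementation. mapWithLimit and fakeRequest are hypothetical names, and this version stores each result at its input position so the output order matches the input.

```javascript
// Plain Node.js sketch of a concurrency limit; not Scramjet's implementation.

// Hypothetical helper: maps items with an async fn, running at most `limit`
// calls at once and storing each result at its input position.
async function mapWithLimit(items, limit, fn) {
    const results = new Array(items.length);
    let next = 0; // index of the next item nobody has started yet

    // Each runner keeps claiming the next unstarted item until none are left.
    async function runner() {
        while (next < items.length) {
            const index = next++;
            results[index] = await fn(items[index], index);
        }
    }

    const runnerCount = Math.min(Math.max(1, limit), items.length);
    await Promise.all(Array.from({ length: runnerCount }, () => runner()));
    return results;
}

// Usage: a fake "API call" that records how many calls are in flight.
let inFlight = 0;
let peak = 0;
async function fakeRequest(id) {
    inFlight += 1;
    peak = Math.max(peak, inFlight);
    await new Promise((resolve) => setTimeout(resolve, 10)); // simulated latency
    inFlight -= 1;
    return `+ Update succeeded "${id}"`;
}

mapWithLimit([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 4, fakeRequest).then((lines) => {
    console.log(lines[0]);                  // + Update succeeded "1"
    console.log("peak concurrency:", peak); // peak concurrency: 4
});
```

A pool of runners pulling from a shared index keeps exactly as many calls pending as the limit allows, without batching items into fixed groups.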

    Usage

    Scramjet uses functional programming to run transformations on your data streams in a fashion very similar to the well-known event-stream node module. Most transformations are done by passing a transform function. You can write your function in three ways:

    1. Synchronous

    Example: a simple stream transform that outputs a stream of objects with the same id property and the length of the value string.

    const { DataStream } = require("scramjet");

    DataStream
       .from(items)                // items: any array or iterable of {id, value} objects
       .map(
           (item) => ({id: item.id, length: item.value.length})
       )

    2. Asynchronous using ES2017 async/await

    Example: a simple stream that uses the Fetch API to retrieve and parse the JSON content of every URL in the stream

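    const { DataStream } = require("scramjet");

    DataStream
        .from(urls)                                // urls: any array or iterable of URL strings
        .map(
            async (url) => {
                const res = await fetch(url);      // global fetch is built into Node.js 18+
                return res.json();                 // each chunk becomes the parsed JSON body
            }
        )

    3. Asynchronous using Promises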

    Example: a simple stream that fetches a URL mentioned in each incoming object

       const request = require("request");

       datastream.map(
           (item) => new Promise((resolve, reject) => {
               request(item.url, (err, res, data) => {
                   if (err)
                       reject(err); // will emit an "error" event on the stream
                   else
                       resolve(data);
               });
           })
       )

    In effect, the transform function behaves as if it were passed to the then method of a Promise resolved with each chunk from the input stream.
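That description maps onto an ordinary Node.js object-mode Transform stream. The sketch below uses a hypothetical helper, mapTransform, and is not Scramjet's implementation: it chains the callback onto Promise.resolve(chunk), so a callback that returns a plain value and one that returns a Promise are handled the same way, and a rejection surfaces as a stream error.

```javascript
const { Readable, Transform } = require("node:stream");

// Hypothetical helper, not Scramjet's API: wraps a transform function in an
// object-mode Transform stream. The function is chained onto a Promise resolved
// with the chunk, so it may return either a plain value or a Promise.
function mapTransform(fn) {
    return new Transform({
        objectMode: true,
        transform(chunk, _encoding, callback) {
            Promise.resolve(chunk)
                .then(fn)
                .then(
                    (result) => callback(null, result), // push the transformed chunk
                    (err) => callback(err)              // a rejection becomes an "error" event
                );
        }
    });
}

// Collects everything a readable stream emits into an array.
async function collect(stream) {
    const out = [];
    for await (const chunk of stream) out.push(chunk);
    return out;
}

const double = (x) => x * 2;                 // returns a value
const slowIncrement = async (x) => {         // returns a Promise
    await new Promise((resolve) => setImmediate(resolve));
    return x + 1;
};

collect(Readable.from([1, 2, 3]).pipe(mapTransform(double))).then(console.log);        // [ 2, 4, 6 ]
collect(Readable.from([1, 2, 3]).pipe(mapTransform(slowIncrement))).then(console.log); // [ 2, 3, 4 ]
```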

    4. Streams with multi-threading

    Beyond the three styles above, you can also spread your transforms across processor cores with the distribute method:

       datastream.distribute(
           16, // number of threads
           (stream) => {
               // multi-threaded code goes here.
               // it MUST return a valid stream back to the main thread.
               return stream.map((item) => item); // placeholder: replace with your transform
           }
       )
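For readers curious what spreading work across cores involves, the sketch below uses Node.js worker_threads directly. It is not how Scramjet's distribute is implemented and does not use its API. distributeSquares and the squaring workload are hypothetical. The worker code is passed as a string with eval: true, so, like any code running in another thread, it cannot rely on variables from the main file.

```javascript
const { Worker } = require("node:worker_threads");

// Source of each worker, evaluated as a separate script (eval: true), so it
// cannot see variables from this file. Hypothetical workload: squaring numbers.
const workerSource = `
const { parentPort } = require("node:worker_threads");
parentPort.on("message", ({ index, value }) => {
    parentPort.postMessage({ index, result: value * value });
});
`;

// Hypothetical helper, not Scramjet's distribute: sends items round-robin to
// the workers and resolves with the results in input order.
function distributeSquares(values, threads) {
    if (values.length === 0) return Promise.resolve([]);
    return new Promise((resolve, reject) => {
        const results = new Array(values.length);
        let pending = values.length;
        const workerCount = Math.max(1, Math.min(threads, values.length));
        const workers = Array.from(
            { length: workerCount },
            () => new Worker(workerSource, { eval: true })
        );
        const stopAll = () => Promise.all(workers.map((w) => w.terminate()));

        for (const worker of workers) {
            worker.on("message", ({ index, result }) => {
                results[index] = result;           // keep input order
                pending -= 1;
                if (pending === 0) stopAll().then(() => resolve(results));
            });
            worker.on("error", (err) => stopAll().then(() => reject(err)));
        }

        // Round-robin: item i goes to worker i % workerCount.
        values.forEach((value, index) => {
            workers[index % workerCount].postMessage({ index, value });
        });
    });
}

distributeSquares([1, 2, 3, 4, 5], 2).then((squares) => console.log(squares)); // [ 1, 4, 9, 16, 25 ]
```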

    TypeScript support

    Scramjet aims to be fully documented and to expose TypeScript declarations. The first version to include definitions in the .d.ts folder is 4.15.0. More TypeScript support will be added in future versions, so feel free to report issues on GitHub.

    Detailed docs

    Here's the list of the exposed classes and methods; please review the specific documentation for details:

    Note that:

    • Most of the methods take a callback argument that operates on the stream items.
    • Unless stated otherwise, the callback will receive the next chunk as its argument.
    • If you want to perform your operations asynchronously, return a Promise, otherwise just return the right value.

    CLI

    Check out scramjet-cli, a command line interface for simplified Scramjet usage:

    $ sjr -i http://datasource.org/file.csv ./transform-module-1 ./transform-module-1 | gzip > logs.gz

    Scramjet core

    Don't like dependencies? Scramjet packs just a couple of those, but if you are really annoyed by second-level dependencies, please try scramjet-core.

    It includes only the most vital methods, but the library is dependency-free.

    License and contributions

    As of version 2.0 Scramjet is MIT Licensed.

    Help wanted

    The project needs your help! There's a lot of work to do: transforming and muxing, joining and splitting, browserifying, modularizing, documenting and issuing those issues.

    If you want to help and be part of the Scramjet team, please reach out to me (signicode on GitHub) or email me: [email protected]