Scramjet v4.20.1

A small fix released today makes your life easier if you use one of Scramjet's multithreaded methods, namely `MultiStream..cluster` and `DataStream..distribute`.

Underneath, these methods start workers that are ready to execute any stream transform you like. Each worker waits for a control message, which is used to recreate the transform inside the worker. Once the control message is received, the worker waits until the transformed stream ends and, if no other stream is being processed on the same worker, it shuts down cleanly.

The Scramjet side, on the other hand, either pushes MultiStream members out to separate workers or, for a single stream, uses the affinity function passed to DataStream..distribute to choose which workers receive which chunks. The two ways of distributing work look as follows:

MultiStream way

  const streams = await getArrayOfStreams();

  new MultiStream(streams)
    .smap(stream => stream.use('./checks-and-filters'))
    // the line below executes in workers
    .cluster(stream => stream.use('./cpu-heavy-ops'))
    .mux()
    // here we're back in the main thread

DataStream way

  DataStream.from(streamGenerator)
    .use('./checks-and-filters')
    .distribute(
      // this method makes sure that chunks make their way to the right workers
      chunk => chunk.id % 8,
      // the line below executes in workers
      stream => stream.use('./cpu-heavy-ops')
    )
    .mux()
    // here we're back in the main thread with one stream

So what went wrong? Well... in short, it appears that all our test cases covered only two situations:

  • there were lots of chunk groups, so every worker was utilized at least once,
  • we tested live scenarios, which were long-lived anyway.

We found, however, that workers which are never utilized wait forever and keep the main thread from exiting, even though all the other refcounts are zeroed. Take the example above: run it on a small VPS with 4 CPU threads available (8 workers, all of which receive chunks) and it will exit correctly, but run it on a bigger machine and it will do all its work and never exit. Here's why:

1. `distribute()` -> spawns twice as many workers as there are available threads, here 32: 0, 1, 2, ... 7, ... 31.
2. `resume()` -> we start flowing the elements, the affinity function returns ids:
 * {id: 0} -> worker 0
 * {id: 1} -> worker 1
 * {id: 2} -> worker 2
 * {id: 3} -> worker 3
 * {id: 4} -> worker 4
 * {id: 5} -> worker 5
 * {id: 6} -> worker 6
 * {id: 7} -> worker 7
 * {id: 8} -> worker 0
 * {id: 9} -> worker 1
  ...
3. Actual distribution happens...
 * Workers 0 through 7 have received code to execute and were unreffed.
 * Workers 8 through 31 are still awaiting code to run.
4. `end()` -> the stream ends
 * Workers 0 through 7 exit silently.
 * Workers 8 through 31 are still awaiting code to run.
5. The program never ends.
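The walkthrough above can be reproduced without any workers at all. The snippet below is only an illustrative sketch: `findIdleWorkers` is a hypothetical helper, not part of Scramjet, and it assumes that the value returned by the affinity function is used directly as the worker index, as in the list above.

```javascript
// Hypothetical helper (not a Scramjet API): returns the ids of workers
// that never receive a chunk for a given affinity function.
// Assumption: the affinity result is used directly as the worker index.
function findIdleWorkers(workerCount, chunks, affinity) {
  const used = new Set(chunks.map(affinity));
  const idle = [];
  for (let id = 0; id < workerCount; id++) {
    if (!used.has(id)) idle.push(id);
  }
  return idle;
}

// 100 sample chunks with ids 0..99 and the same affinity as in the example.
const chunks = Array.from({ length: 100 }, (_, id) => ({ id }));
const affinity = chunk => chunk.id % 8;

// 4 CPU threads -> 8 workers: every worker gets something to do.
const idleOnSmallVps = findIdleWorkers(8, chunks, affinity);
// 16 CPU threads -> 32 workers: workers 8 through 31 never get a chunk.
const idleOnBigMachine = findIdleWorkers(32, chunks, affinity);

console.log(idleOnSmallVps.length, idleOnBigMachine.length); // prints "0 24"
```

On the small machine the list of idle workers is empty, which is why the problem never showed up there.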

In the patch we assumed that a worker that's not utilized for a second should also be allowed to exit. Now the mentioned workers 8 through 31 will still be spawned, but they will be stopped after a second of idling. If that second of idle time is too long for you, remember that both cluster and distribute can be called with options as the last parameter, where you can provide the number of threads you wish to start with, for example:

  DataStream.from(streamGenerator)
    .use('./checks-and-filters')
    .distribute(
      chunk => chunk.id % 8,
      stream => stream.use('./cpu-heavy-ops'),
      // only 8 workers will start
      {threads: 8}
    )
    .mux()
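
For the curious, the one-second rule from the patch can be pictured roughly like this. This is a minimal sketch of the idea, not Scramjet's actual implementation: `createIdleGuard`, `markUsed` and `state` are hypothetical names, and the real worker code may be organized quite differently.

```javascript
// Hypothetical sketch (not Scramjet's code): stop a worker that has not
// received its control message within `idleMs` milliseconds.
function createIdleGuard(shutdown, idleMs = 1000) {
  let state = 'waiting';
  const timer = setTimeout(() => {
    state = 'stopped';
    shutdown(); // nobody used this worker in time, let it exit
  }, idleMs);

  return {
    // Call this when the control message arrives. Returns true if the
    // worker was still waiting and the idle timer has been cancelled.
    markUsed() {
      if (state !== 'waiting') return false;
      clearTimeout(timer);
      state = 'used';
      return true;
    },
    // Current state: 'waiting', 'used' or 'stopped'.
    state: () => state,
  };
}

// A worker that gets its control message in time keeps running.
const guard = createIdleGuard(() => console.log('idle worker stopped'), 1000);
guard.markUsed();
console.log(guard.state()); // prints "used"
```

A guard whose `markUsed()` is never called runs `shutdown` after the timeout, which corresponds to the idle workers in the walkthrough being stopped.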