
Scramjet v4.20.1

Michał Czapracki
CEO at Scramjet, Data Streaming Expert.

11 January 2019

A small fix introduced today makes your life easier if you happen to use one of Scramjet's multithreaded methods, namely `DataStream..distribute` or `MultiStream..cluster`.

Underneath, these methods start workers ready to execute any stream transform you like. Each worker waits for a control message that is used to recreate the transform in the worker. After the control message is received, the worker waits until the transformed stream ends, and if there's no other stream being processed on the same worker, it shuts down cleanly.
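To make that lifecycle concrete, here's a minimal sketch of what such a worker could look like, assuming a `worker_threads`-style setup; `transformPath` and `runTransform` are illustrative names, not scramjet's actual internals:

const { parentPort } = require('worker_threads');

let activeStreams = 0;

parentPort.on('message', async ({ transformPath }) => {
    activeStreams++;
    // recreate the transform in the worker from the module path
    // carried by the control message
    const transform = require(transformPath);
    await runTransform(transform); // illustrative: pipe the assigned stream through it
    // if there's no other stream processed on this worker, shut down cleanly
    if (--activeStreams === 0) process.exit(0);
});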

On the scramjet side, `MultiStream..cluster` pushes each member stream out to a separate worker, while in a single `DataStream..distribute` an affinity function is used to choose which workers should receive which chunks. These two ways of distributing work as follows:

MultiStream way


const streams = await getArrayOfStreams();

new MultiStream(streams)
    .smap(stream => stream.use('./checks-and-filters'))
    // the line below executes in workers
    .cluster(stream => stream.use('./cpu-heavy-ops'))
    .mux()
    // here we're back in the main thread

DataStream way


DataStream.from(streamGenerator)
    .use('./checks-and-filters')
    .distribute(
        // this method makes sure that chunks make their way to the right workers
        chunk => chunk.id % 8,
        // the line below executes in workers
        stream => stream.use('./cpu-heavy-ops')
    )
    .mux()
    // here we're back in the main thread with one stream

So what went wrong? Well... in short, it turns out that all our tested cases covered only two situations:

  • there were lots of chunk groups, so every worker was utilized at least once,
  • we tested live scenarios, which were long-lived either way.

We found, however, that workers that are never utilized wait forever and keep the main thread from exiting, even though all the other refcounts are zeroed. Take the example above: run it on a small VPS with 4 CPU threads available and it exits correctly, but run it on a bigger machine and it will do all its work, yet never exit. Here's why:


1. `distribute()` -> spawn 2 x available threads of workers: 0, 1, 2, ... 7, ... 31.
2. `resume()` -> we start flowing the elements, the affinity function returns ids:
    * {id: 0} -> worker 0
    * {id: 1} -> worker 1
    * {id: 2} -> worker 2
    * {id: 3} -> worker 3
    * {id: 4} -> worker 4
    * {id: 5} -> worker 5
    * {id: 6} -> worker 6
    * {id: 7} -> worker 7
    * {id: 8} -> worker 0
    * {id: 9} -> worker 1
    * ...
3. Actual distribution happens...
    * Workers 0 through 7 have received code to execute and were unreffed.
    * Workers 8 through 31 are still awaiting code to run.
4. `end()` -> the stream ends.
    * Workers 0 through 7 exit silently.
    * Workers 8 through 31 are still awaiting code to run.
5. The program never ends.
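The underlying Node.js behaviour is easy to reproduce outside scramjet: a worker that is still listening for messages keeps the parent's event loop alive. A minimal sketch using plain `worker_threads` (not scramjet's actual code):

const { Worker } = require('worker_threads');

// a worker that only ever waits for a control message
const idle = new Worker(
    `require('worker_threads').parentPort.on('message', () => {});`,
    { eval: true }
);

// without idle.unref() or idle.terminate(), the open message port
// keeps the main thread alive and the process never exits
console.log('main work done, but we hang here');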

In the patch we assumed that a worker that isn't utilized for a second should also be allowed to exit. Now the mentioned workers 8 through 31 will be spawned, idle for a second, and then stop (see the sketch of this idle-timeout idea after the example below). If you'd rather avoid even that one second of unused workers, remember that both `cluster` and `distribute` can be called with options as the last parameter, where you can provide the number of threads you wish to start with, for example:


DataStream.from(streamGenerator)
    .use('./checks-and-filters')
    .distribute(
        chunk => chunk.id % 8,
        stream => stream.use('./cpu-heavy-ops'),
        // only 8 workers will start
        {threads: 8}
    )
    .mux()
