
# Modularization

Consider this example:

```js
_(csvSource)
  .csv({header: true})
  .map(normalizeNames)
  .filter(italianNames)
  .uniqBy(uniqByName)
  .batch(1000)
  .map(uploadToPostgres)
  .resolve(2)
  .start()
```

This flow is doing a couple of different things in a relatively complex chain, and many real use cases are even more complex and harder to read. In these cases, the stream can be modularized thanks to the `_.pipeline()` function.

The `_.pipeline()` function lets you create a transform or a writable stream that can be used in an Exstream chain thanks to the `.through()` and `.pipe()` methods. The above example can be refactored like this:

```js
const getNormalizedItalianNames = () => _.pipeline()
  .map(normalizeNames)
  .filter(italianNames)
  .uniqBy(uniqByName)

const bulkUploadToPostgres = (batchSize, parallelism) => _.pipeline()
  .batch(batchSize)
  .map(uploadToPostgres)
  .resolve(parallelism)
  .start()

const mainFlow = _(csvSource)
  .csv({header: true})
  .through(getNormalizedItalianNames())
  .pipe(bulkUploadToPostgres(1000, 2))
```

As you can see, the code is now way more readable.
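
Because each of these helpers is a factory that returns a fresh pipeline on every call, the same module can be reused across independent flows. A minimal sketch, assuming a hypothetical second source `jsonSource` that already emits plain objects:

```js
// The CSV flow from above
const csvFlow = _(csvSource)
  .csv({header: true})
  .through(getNormalizedItalianNames())
  .pipe(bulkUploadToPostgres(1000, 2))

// A second flow reusing the same modules with different tuning.
// jsonSource is a hypothetical object stream.
const jsonFlow = _(jsonSource)
  .through(getNormalizedItalianNames()) // fresh pipeline instance per flow
  .pipe(bulkUploadToPostgres(500, 4))
```

This is also why the examples define factories rather than sharing a single pipeline instance: like any stream, a pipeline is consumed once, so each flow needs its own.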

A pipeline can also be used in a vanilla Node.js stream chain:

```js
const fs = require('fs')
const csv = require('csv') // provides parse() and stringify()

const getNormalizedItalianNamesTransformStream = () => _.pipeline()
  .map(normalizeNames)
  .filter(italianNames)
  .uniqBy(uniqByName)
  .toNodeStream()

fs.createReadStream('input.csv')
  .pipe(csv.parse())
  .pipe(getNormalizedItalianNamesTransformStream())
  .pipe(csv.stringify())
  .pipe(fs.createWriteStream('output.csv'))
```
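
One caveat with long `.pipe()` chains in vanilla Node.js is error handling: an error in one stream is not forwarded to the others. Node's built-in `stream.pipeline` helper wires up error propagation and cleanup for you. A minimal sketch of the same chain, reusing the hypothetical file names above:

```js
const { pipeline } = require('stream')

pipeline(
  fs.createReadStream('input.csv'),
  csv.parse(),
  getNormalizedItalianNamesTransformStream(),
  csv.stringify(),
  fs.createWriteStream('output.csv'),
  err => {
    // Called once, with the first error raised anywhere in the chain
    if (err) console.error('CSV flow failed:', err)
    else console.log('CSV flow completed')
  }
)
```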

As we'll see in the next chapter, a stream can be forked and multiple source streams can be merged together. Modularization is particularly useful in those cases to keep the code clean and readable.
