Make stream internal methods support promises / async await #31387

@lmammino

Hello all,
It would be great if the stream internal methods, such as _write and _transform, also supported a promise-based implementation.

To clarify what I mean, here's an example.

Below is a classic implementation of a writable stream that receives objects containing a path and a content, and writes a file with that content for every received chunk.

Classic implementation

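import { Writable } from 'stream'
import fs from 'fs' // callback-based API, since writeFile is given a callback below
import { dirname } from 'path'
import mkdirp from 'mkdirp' // assumes a mkdirp version that accepts a callback (< 1.0)

export class ToFileStream extends Writable {
  constructor (opts) {
    super({ ...opts, objectMode: true })
  }

  // Not async: completion is signalled only through cb
  _write (chunk, encoding, cb) {
    // Make sure the parent directory exists before writing the file
    mkdirp(dirname(chunk.path), (err) => {
      if (err) {
        return cb(err)
      }

      // cb receives the write error, if any, and tells the stream to continue
      fs.writeFile(chunk.path, chunk.content, cb)
    })
  }
}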

This is nice and standard but, as often happens with callback-based APIs, it is easy to end up in callback hell, with repeated code whenever the asynchronous work is conditional.
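To make the point about conditional asynchronous work concrete, here is a minimal sketch. It assumes a hypothetical `createDir` flag on each chunk and uses the built-in `fs.mkdir` with `recursive: true` in place of mkdirp; it is written with `require` so it can be pasted into a plain Node script. Note how the `fs.writeFile` call has to appear in both branches.

```javascript
const fs = require('fs')
const { dirname } = require('path')
const { Writable } = require('stream')

class ConditionalToFileStream extends Writable {
  constructor (opts) {
    super({ ...opts, objectMode: true })
  }

  _write (chunk, encoding, cb) {
    // `createDir` is a hypothetical flag, used here only for illustration
    if (chunk.createDir) {
      fs.mkdir(dirname(chunk.path), { recursive: true }, (err) => {
        if (err) {
          return cb(err)
        }
        // First copy of the write step
        fs.writeFile(chunk.path, chunk.content, cb)
      })
    } else {
      // Second copy of the same write step
      fs.writeFile(chunk.path, chunk.content, cb)
    }
  }
}
```

With async/await the write step would appear once, after an `if` around the directory creation.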

Desired implementation

This does not work as of today

import { Writable } from 'stream'
import { promises as fs } from 'fs'
import { dirname } from 'path'
import mkdirp from 'mkdirp-promise'

export class ToFileStream extends Writable {
  constructor (opts) {
    super({ ...opts, objectMode: true })
  }

  async _write (chunk, encoding) {
    await mkdirp(dirname(chunk.path))
    return fs.writeFile(chunk.path, chunk.content)
  }
}

This would feel quite clean and standard for those used to the async/await style.

What we can do today

import { Writable } from 'stream'
import { promises as fs } from 'fs'
import { dirname } from 'path'
import mkdirp from 'mkdirp-promise'

export class ToFileStream extends Writable {
  constructor (opts) {
    super({ ...opts, objectMode: true })
  }

  async _write (chunk, encoding, cb) {
    try {
      await mkdirp(dirname(chunk.path))
      await fs.writeFile(chunk.path, chunk.content)
      cb()
    } catch (err) {
      cb(err)
    }
  }
}

In a situation with a lot of callbacks this might help to remove the nesting, but it still feels weird (and hard to read) to mix the async/await style with callbacks.
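One way to keep that mixing in a single place is a small userland base class that bridges a promise-returning method to the callback. The following is only a sketch under stated assumptions: `PromiseWritable` and the `_writeAsync` hook are hypothetical names, not part of the Node.js stream API, and the built-in `fs.promises.mkdir` with `recursive: true` stands in for mkdirp-promise. It uses `require` so it runs as a plain Node script.

```javascript
const { Writable } = require('stream')
const fs = require('fs')
const { dirname } = require('path')

// Hypothetical base class: subclasses implement _writeAsync instead of _write
class PromiseWritable extends Writable {
  _write (chunk, encoding, cb) {
    // Promise.resolve().then(...) also captures synchronous throws from _writeAsync
    Promise.resolve()
      .then(() => this._writeAsync(chunk, encoding))
      // Success calls cb() with no arguments; a rejection is passed to cb as the error.
      // Using the two-argument form means cb is never invoked twice.
      .then(() => cb(), cb)
  }
}

class ToFileStream extends PromiseWritable {
  constructor (opts) {
    super({ ...opts, objectMode: true })
  }

  // No callback in sight: the base class translates the promise outcome
  async _writeAsync (chunk) {
    await fs.promises.mkdir(dirname(chunk.path), { recursive: true })
    await fs.promises.writeFile(chunk.path, chunk.content)
  }
}
```

This keeps subclasses callback-free, but every project has to carry its own bridge, which is why support in the stream internals themselves would be preferable.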


What do you think? Would this be feasible without breaking existing stream implementations?

Labels: feature request, stale, stream