
Document/Improve: Continuous batching/chunking of a stream #118

@jmealo


I'm creating this issue to discuss solutions (documentation or otherwise) to help folks avoid some of the non-memory-related pitfalls from #116 when continuously batching COPY commands.

See:
#116 (comment)

@jeromew: I found out what happens if you always have a COPY in progress. It works great until something finally manages to get a lock on the table, and then you have a bunch of COPY operations pending. From a locking perspective, it's really not ideal to start a COPY, stream data into it, close that stream, then start a new COPY stream and immediately pipe into it: you're effectively never yielding the table for an exclusive lock.

🔥 If you do not give PostgreSQL some breathing room between COPY operations, you're going to run into 🔒 locking issues 🔒.

I'm interested in any ideas for the best way to handle this. I'm going to dump some tips here for anyone working on this in the future.
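If you want to confirm this is what's biting you, the stalled COPY commands show up as waiting backends in pg_stat_activity. A minimal diagnostic sketch (not from the thread; it assumes a node-postgres pool named `pool`, and the wait_event columns exist on PostgreSQL 9.6+):

const showWaitingCopies = async () => {
  // Any COPY that can't acquire its lock shows up here with a non-null wait_event.
  const { rows } = await pool.query(`
    SELECT pid, state, wait_event_type, wait_event, query
    FROM pg_stat_activity
    WHERE query ILIKE 'COPY %'`)
  console.table(rows)
}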

Pro-tip: Safely create async batches

Under high load, a new event may arrive before the previous event's call to getBatch() resolves. This can lead to orphaned batches (memory leak) and lost messages.

// `nextBatch` is module-level state; `producer` and `maxSizeInBytes` come from the surrounding code.
let nextBatch

const getBatch = async () => {
  // If another batch is already pending, return the promise that will resolve to that batch.
  // You can .then()/await the same promise multiple times; you cannot resolve()/reject() it with multiple values.
  if (nextBatch) {
    return nextBatch
  }

  try {
    nextBatch = producer.createBatch({ maxSizeInBytes })
    const batch = await nextBatch
    return batch
  } finally {
    nextBatch = undefined
  }
}
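For illustration (this snippet isn't from the issue), two overlapping calls resolve to the same in-flight batch instead of orphaning one:

// Inside any async context:
const [a, b] = await Promise.all([getBatch(), getBatch()])
console.log(a === b) // true: the second call reused the pending promise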

Anti-pattern: Immediately unpiping/piping another stream when rotating COPY streams

// Oops! If you immediately rotate streams you will always hold a lock on the table (until you don't, and then COPY commands pile up in a waiting state, especially if you have control/metadata queries interleaved between them).
// Assumes module-level `csvStream` and `pgStream`, a pg `pool`, a COPY_QUERY string, `copyFrom` (pg-copy-streams' `from`) and a CSV `format` helper (e.g. fast-csv).
const rotateCopyStream = async (logStream) => {
  // disconnecting the csv stream from our log stream causes the log stream to pause
  if (csvStream) logStream.unpipe(csvStream)

  if (pgStream) {
    // disconnecting the pg stream from the csv stream causes the csv stream to end
    csvStream.unpipe(pgStream)
    pgStream.end() // calling this ends the COPY command
  }

  // Get connection and execute our copy command
  const client = await pool.connect()
  pgStream = client.query(copyFrom(COPY_QUERY))

  // Debugging helps
  pgStream.on('error', (error) => {
    if (client) client.release()
    console.log('pgStream -> stream errored')
    console.log(error)
  })

  pgStream.on('finish', () => {
    if (client) client.release()
    console.log('pgStream -> stream complete')
  })

  csvStream = format({headers: false, includeEndRowDelimiter: true, delimiter: '\t'})
  logStream.pipe(csvStream, {end: false}).pipe(pgStream)
}
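For contrast, here's a sketch (not a drop-in fix from the thread) of rotating with some breathing room: flush and finish the in-flight COPY, wait for its `finish` event, and pause briefly so a waiting lock can be granted before the next COPY starts. It assumes the same module-level `csvStream`/`pgStream`/`pool`/`COPY_QUERY` as above; `once` is Node's `events.once`, and the 100 ms yield and the function name are arbitrary.

const { once } = require('events')

const rotateCopyStreamGently = async (logStream) => {
  if (csvStream) logStream.unpipe(csvStream) // the source pauses while we rotate

  if (pgStream) {
    csvStream.end()                              // flush remaining rows; the pipe ends the COPY
    await once(pgStream, 'finish')               // COPY committed, table lock released
    await new Promise((r) => setTimeout(r, 100)) // brief yield so waiting lockers can run
  }

  const client = await pool.connect()
  pgStream = client.query(copyFrom(COPY_QUERY))
  pgStream.on('error', (error) => { client.release(); console.error(error) })
  pgStream.on('finish', () => client.release())

  csvStream = format({ headers: false, includeEndRowDelimiter: true, delimiter: '\t' })
  logStream.pipe(csvStream, { end: false }).pipe(pgStream)
}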

@jbtobar: I'm using 100% streams in my workflow, but I've found a Node.js footgun when batching things coming from an event emitter/event handler/subscription. If batch creation is async, you need to make sure that, if a second event comes in while the batches are being rotated, you don't end up creating two or more batches. The getBatch() pro-tip above shows one way to accomplish this.

I considered putting an advisory lock at the beginning of the COPY command. This would give us a chance to keep our input stream paused and resume flow once the last COPY is complete, with room for application-level logic in between. I haven't tried it yet, but that's my next plan of attack.
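For the record (untried, as noted above), a sketch of what that advisory-lock approach could look like. The key 42 and the name startGuardedCopy are arbitrary; the point is that any other process taking the same advisory lock (a migration or maintenance job, say) forces new COPYs to wait their turn, and each COPY releases the key as soon as it finishes.

const startGuardedCopy = async () => {
  const client = await pool.connect()
  await client.query('SELECT pg_advisory_lock(42)') // block until it's our turn
  const stream = client.query(copyFrom(COPY_QUERY))
  stream.on('finish', () => {
    client.query('SELECT pg_advisory_unlock(42)')   // let the next writer (or DDL job) in
      .finally(() => client.release())
  })
  return stream
}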
