diff --git a/packages/d2ts/src/operators/join.ts b/packages/d2ts/src/operators/join.ts
index 3c71eb5..a7c1179 100644
--- a/packages/d2ts/src/operators/join.ts
+++ b/packages/d2ts/src/operators/join.ts
@@ -116,8 +116,15 @@ export class JoinOperator extends BinaryOperator<
       if (this.outputFrontier.lessThan(inputFrontier)) {
         this.outputFrontier = inputFrontier
         this.output.sendFrontier(this.outputFrontier)
-        this.#indexA.compact(this.outputFrontier)
-        this.#indexB.compact(this.outputFrontier)
+        // Compact in the background without awaiting so the event loop is not
+        // blocked. Failures are reported here rather than being left as an
+        // unhandled promise rejection.
+        void Promise.all([
+          this.#indexA.compactAsync(this.outputFrontier),
+          this.#indexB.compactAsync(this.outputFrontier),
+        ]).catch((error: unknown) => {
+          console.error('JoinOperator: background compaction failed', error)
+        })
       }
     }
   }
diff --git a/packages/d2ts/src/version-index.ts b/packages/d2ts/src/version-index.ts
index 81078b6..5c2d225 100644
--- a/packages/d2ts/src/version-index.ts
+++ b/packages/d2ts/src/version-index.ts
@@ -29,6 +29,7 @@ export class Index implements IndexType {
   #inner: IndexMap
   #compactionFrontier: Antichain | null
   #modifiedKeys: Set
+  #compactionPromise: Promise<void>
 
   constructor() {
     this.#inner = new DefaultMap>(
@@ -42,6 +43,7 @@ export class Index implements IndexType {
     // }
     this.#compactionFrontier = null
     this.#modifiedKeys = new Set()
+    this.#compactionPromise = Promise.resolve()
   }
 
   toString(indent = false): string {
@@ -144,44 +146,26 @@ export class Index implements IndexType {
     )
 
     // We want to iterate over the smaller of the two indexes to reduce the
-    // number of operations we need to do.
-    if (this.#inner.size <= other.#inner.size) {
-      for (const [key, versions] of this.#inner) {
-        if (!other.has(key)) continue
-        const otherVersions = other.get(key)
-        for (const [rawVersion1, data1] of versions) {
-          const version1 =
-            this.#compactionFrontier &&
-            this.#compactionFrontier.lessEqualVersion(rawVersion1)
-              ? rawVersion1.advanceBy(this.#compactionFrontier)
-              : rawVersion1
-          for (const [version2, data2] of otherVersions) {
-            for (const [val1, mul1] of data1) {
-              for (const [val2, mul2] of data2) {
-                const resultVersion = version1.join(version2)
-                collections.update(resultVersion, (existing) => {
-                  existing.push([key, [val1, val2], mul1 * mul2])
-                  return existing
-                })
-              }
-            }
-          }
-        }
-      }
-    } else {
-      for (const [key, otherVersions] of other.entries()) {
-        if (!this.has(key)) continue
-        const versions = this.get(key)
+    // number of operations we need to do.
+    const thisIsTheSmallerIndex = this.#inner.size <= other.#inner.size
+    const [smallestIndex, otherIndex] =
+      thisIsTheSmallerIndex ? [this, other] : [other, this]
+
+    for (const [key, versions] of smallestIndex.#inner) {
+      if (!otherIndex.has(key)) continue
+      const otherVersions = otherIndex.get(key)
+      for (const [version1, data1] of versions) {
         for (const [version2, data2] of otherVersions) {
-          for (const [version1, data1] of versions) {
+          for (const [val1, mul1] of data1) {
             for (const [val2, mul2] of data2) {
-              for (const [val1, mul1] of data1) {
-                const resultVersion = version1.join(version2)
-                collections.update(resultVersion, (existing) => {
-                  existing.push([key, [val1, val2], mul1 * mul2])
-                  return existing
-                })
-              }
+              const resultVersion = version1.join(version2)
+              collections.update(resultVersion, (existing) => {
+                const values: [V, V2] = thisIsTheSmallerIndex
+                  ? [val1 as V, val2 as V2]
+                  : [val2 as V, val1 as V2]
+                existing.push([key, values, mul1 * mul2])
+                return existing
+              })
             }
           }
         }
@@ -197,7 +181,10 @@ export class Index implements IndexType {
     return result as [Version, MultiSet<[K, [V, V2]]>][]
   }
 
-  compact(compactionFrontier: Antichain, keys: K[] = []): void {
+  compact(
+    compactionFrontier: Antichain,
+    keys: K[] = Array.from(this.#modifiedKeys),
+  ): void {
     if (
       this.#compactionFrontier &&
       !this.#compactionFrontier.lessEqual(compactionFrontier)
@@ -226,10 +213,7 @@ export class Index implements IndexType {
       )
     }
 
-    const keysToProcess =
-      keys.length > 0 ? keys : Array.from(this.#modifiedKeys)
-
-    for (const key of keysToProcess) {
+    for (const key of keys) {
       const versions = this.#inner.get(key)
 
       const toCompact = Array.from(versions.keys()).filter(
@@ -264,6 +248,105 @@ export class Index implements IndexType {
     this.#compactionFrontier = compactionFrontier
   }
 
+  /**
+   * Asynchronous version of compact that processes one key at a time and
+   * yields to the event loop after each key, so that compacting a large
+   * index does not block the event loop.
+   *
+   * Calls are chained: a compaction only starts once every previously
+   * requested one has settled. A failed compaction rejects the promise
+   * returned for that call only; it does not stop later calls from running.
+   *
+   * WARNING: Never interleave calls to compactAsync with calls to compact.
+   * Those will not be chained and may interleave, resulting in wrong compaction.
+   *
+   * @param compactionFrontier - Frontier to compact to; the call rejects
+   *   if the current compaction frontier is not lessEqual to it.
+   * @param keys - Keys to compact. Defaults to a snapshot of
+   *   `#modifiedKeys` taken when this method is called.
+   * @returns A promise that settles when this compaction has finished.
+   */
+  async compactAsync(
+    compactionFrontier: Antichain,
+    keys: K[] = Array.from(this.#modifiedKeys),
+  ): Promise<void> {
+    // Chain this compaction operation to the previous one
+    const run = this.#compactionPromise.then(async () => {
+      if (
+        this.#compactionFrontier &&
+        !this.#compactionFrontier.lessEqual(compactionFrontier)
+      ) {
+        throw new Error('Invalid compaction frontier')
+      }
+
+      this.#validate(compactionFrontier)
+
+      const consolidateValues = (values: [V, number][]): [V, number][] => {
+        const consolidated = new Map()
+
+        for (const [value, multiplicity] of values) {
+          const key = hash(value)
+          const existing = consolidated.get(key)
+          if (existing) {
+            consolidated.set(key, [value, existing[1] + multiplicity])
+          } else {
+            consolidated.set(key, [value, multiplicity])
+          }
+        }
+
+        return Array.from(consolidated.values()).filter(
+          ([_, multiplicity]) => multiplicity !== 0,
+        )
+      }
+
+      for (const key of keys) {
+        const versions = this.#inner.get(key)
+
+        const toCompact = Array.from(versions.keys()).filter(
+          (version) => !compactionFrontier.lessEqualVersion(version),
+        )
+
+        const toConsolidate = new Set()
+
+        for (const version of toCompact) {
+          const values = versions.get(version)
+          versions.delete(version)
+
+          const newVersion = version.advanceBy(compactionFrontier)
+          versions.update(newVersion, (existing) => {
+            chunkedArrayPush(existing, values)
+            return existing
+          })
+          toConsolidate.add(newVersion)
+        }
+
+        for (const version of toConsolidate) {
+          const newValues = consolidateValues(versions.get(version))
+          if (newValues.length > 0) {
+            versions.set(version, newValues)
+          } else {
+            // NOTE(review): this removes the whole key, including any
+            // versions that were not compacted - confirm this is intended.
+            this.#inner.delete(key)
+          }
+        }
+        this.#modifiedKeys.delete(key)
+
+        // Yield to the event loop after processing each key
+        await new Promise(resolve => setTimeout(resolve, 0))
+      }
+
+      this.#compactionFrontier = compactionFrontier
+    })
+
+    // Keep the chain usable after a failure: store a promise that always
+    // fulfils. Otherwise a single rejected compaction would make every
+    // later `.then` skip its callback, so no compaction would run again.
+    this.#compactionPromise = run.catch(() => {})
+
+    return run
+  }
+
   keys(): K[] {
     return Array.from(this.#inner.keys())
   }