Skip to content

Commit 85c1b88

Browse files
authored
[Dataflow Streaming] Create a separate option to control commit threads with direct path. (#37848)
The number of commit threads will be per backend windmill worker.
1 parent 914e425 commit 85c1b88

File tree

2 files changed

+9
-1
lines changed

2 files changed

+9
-1
lines changed

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,13 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions {
100100

101101
void setWindmillServiceCommitThreads(Integer value);
102102

103+
@Description(
104+
"Number of commit threads per backend windmill worker in streaming engine direct path mode.")
105+
@Default.Integer(1)
106+
Integer getWindmillServiceDirectPathCommitThreads();
107+
108+
void setWindmillServiceDirectPathCommitThreads(Integer value);
109+
103110
@Description(
104111
"Frequency at which active work should be reported back to Windmill, in millis. "
105112
+ "The first refresh will occur after at least this much time has passed since "

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,8 @@ private StreamingWorkerHarnessFactoryOutput createFanOutStreamingEngineWorkerHar
419419
.setCommitByteSemaphore(maxCommitByteSemaphore)
420420
.setBackendWorkerToken(commitWorkStream.backendWorkerToken())
421421
.setOnCommitComplete(this::onCompleteCommit)
422-
.setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1))
422+
.setNumCommitSenders(
423+
Math.max(options.getWindmillServiceDirectPathCommitThreads(), 1))
423424
.setCommitWorkStreamFactory(
424425
() -> CloseableStream.create(commitWorkStream, () -> {}))
425426
.build(),

0 commit comments

Comments
 (0)