Implement rescale change #142

Open
ashish47108 wants to merge 4 commits into pinterest:main from ashish47108:ashish/rescale

@ashish47108 ashish47108 commented May 11, 2026

Summary

Fix the Flink Table API source-rescale path so that:

  • Parallelism resolution lives in a single pure utility (getEffectiveSourceParallelism()) that returns the parallelism a PSC source should run at when rescaling is enabled.
  • The source operator now explicitly pins its own parallelism to that effective parallelism when rescale is enabled.

Motivation

The prior implementation only consulted scan.parallelism and table.exec.resource.default-parallelism. If a user set neither of those and the job parallelism was not a useful guide, there was no way to derive a sensible source parallelism, even when the topic's partition count was known. The Kafka partition count is now the final fallback in the chain.
Precedence: scan.parallelism > table.exec.resource.default-parallelism > kafka partition count

Changes

PscTableCommonUtils.java

  • Removed shouldApplyRescale(...) (the boolean decision-maker that combined the rescale flag + parallelism comparison).
  • Added getEffectiveSourceParallelism(globalConfig, topicUris, pscProperties, scanParallelism) returning an int. Walks the following chain and returns the first positive value:
    1. scan.parallelism (table-level, table option)
    2. table.exec.resource.default-parallelism (table environment default)
    3. Kafka partition count (via PSC metadata client, mockable through the existing PartitionCountProvider)
    4. -1 if none of the above is usable (e.g. metadata fetch fails)
  • Each tier logs which source it picked; a warning is logged when it falls through to -1.
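
The fallback chain above can be sketched in isolation as follows. This is a minimal illustration, not the PR's code: the class name EffectiveParallelismSketch, the resolve(...) signature, and the use of an IntSupplier in place of the PSC metadata client are all assumptions made for the example, and "unset" is assumed to be represented by a non-positive value.

```java
import java.util.function.IntSupplier;

/** Hypothetical stand-in for the resolution chain; names are illustrative only. */
public final class EffectiveParallelismSketch {

    private EffectiveParallelismSketch() {}

    /**
     * Returns the first positive value in the chain, or -1 if none is usable.
     *
     * @param scanParallelism         table option value; non-positive means unset (assumption)
     * @param tableDefaultParallelism table environment default; non-positive means unset (assumption)
     * @param partitionCount          lazy partition lookup; may throw if the metadata fetch fails
     */
    public static int resolve(int scanParallelism,
                              int tableDefaultParallelism,
                              IntSupplier partitionCount) {
        if (scanParallelism > 0) {
            return scanParallelism;            // tier 1: scan.parallelism
        }
        if (tableDefaultParallelism > 0) {
            return tableDefaultParallelism;    // tier 2: table environment default
        }
        try {
            int partitions = partitionCount.getAsInt();
            if (partitions > 0) {
                return partitions;             // tier 3: partition count
            }
        } catch (RuntimeException e) {
            // Metadata fetch failed; fall through to the -1 sentinel.
        }
        return -1;                             // tier 4: nothing usable
    }
}
```

Passing the partition lookup as a supplier keeps the metadata call off the path when an earlier tier already produced a value, which is one plausible reason to make the lookup mockable.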

PscDynamicTableFactory.java / UpsertPscDynamicTableFactory.java (same pattern in both)

  • Static import switched from shouldApplyRescale → getEffectiveSourceParallelism.
  • scan.enable-rescale is now read directly at the factory: final boolean shouldRescale = tableOptions.get(SCAN_ENABLE_RESCALE);
  • effectiveParallelism is computed only when shouldRescale is true; otherwise it is set to -1.
  • The downstream source (PscDynamicSource / Upsert source) now receives effectiveParallelism in the slot where scanParallelism used to be passed.
  • Logs the chosen effective parallelism for the source operator when known.
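
A small sketch of the gating described above, assuming the resolution step can be represented as a lazy IntSupplier. FactoryGatingSketch and effectiveParallelismFor are hypothetical names for illustration and do not appear in the PR.

```java
import java.util.function.IntSupplier;

/** Hypothetical illustration of the factory-side gating; not the PR's code. */
public final class FactoryGatingSketch {

    private FactoryGatingSketch() {}

    /**
     * Invokes the resolver only when rescale is enabled; otherwise returns the
     * -1 sentinel without touching the resolver (and so without any metadata call).
     */
    public static int effectiveParallelismFor(boolean shouldRescale, IntSupplier resolver) {
        return shouldRescale ? resolver.getAsInt() : -1;
    }
}
```

With shouldRescale set to false the supplier is never evaluated, which mirrors the factory skipping the resolution step entirely when rescale is disabled.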

PscDynamicSource.java (in produceDataStream)

  • The constructor parameter previously named scanParallelism now carries the effective parallelism (factory-computed). Internally still referenced as scanParallelism.
  • When enableRescale == true and scanParallelism > 0, pin source parallelism to Math.min(scanParallelism, execEnv.getParallelism()).
  • This cap avoids creating idle subtasks beyond the job parallelism, while still letting Flink's per-partition assignment work for the source itself.
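
The pinning rule can be expressed as a pure function, sketched here under the assumption that "no explicit pin" is modeled as an empty OptionalInt. SourcePinSketch and pinnedParallelism are hypothetical names; the real logic lives inside produceDataStream and operates on the Flink execution environment.

```java
import java.util.OptionalInt;

/** Hypothetical illustration of the pinning decision; not the PR's code. */
public final class SourcePinSketch {

    private SourcePinSketch() {}

    /**
     * Returns the parallelism to pin the source to, capped at the job
     * parallelism, or empty when the source should not be pinned explicitly.
     */
    public static OptionalInt pinnedParallelism(boolean enableRescale,
                                                int effectiveParallelism,
                                                int envParallelism) {
        if (enableRescale && effectiveParallelism > 0) {
            return OptionalInt.of(Math.min(effectiveParallelism, envParallelism));
        }
        // Rescale disabled, or no usable effective parallelism (-1): no pin.
        return OptionalInt.empty();
    }
}
```

For instance, pinnedParallelism(true, 64, 16) yields 16, and pinnedParallelism(true, -1, 16) yields an empty result, in which case the source would run at the environment's parallelism.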

PscTableCommonUtilsTest.java

  • Removed all shouldApplyRescale-based tests (the method is gone).
  • Added 7 tests targeting getEffectiveSourceParallelism() (table below).

Usage examples
Example 1 — User sets scan.parallelism

CREATE TABLE my_topic (...) WITH (
  'connector' = 'psc',
  'topic' = 'plaintext:kafka:env:cluster:/my_topic',
  'scan.enable-rescale' = 'true',
  'scan.parallelism' = '64',
  ...
);

Resolution: scan.parallelism = 64 is used. Source is pinned to min(64, env.getParallelism()); data is then rescale()d downstream.

Example 2 — User leaves scan.parallelism unset; table env has default parallelism

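StreamTableEnvironment tEnv = ...;
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 32);

CREATE TABLE my_topic (...) WITH (
  'connector' = 'psc',
  'scan.enable-rescale' = 'true',
  ...                       -- no scan.parallelism
);

Resolution: scan.parallelism is unset, so table.exec.resource.default-parallelism = 32 is used. Source is pinned to min(32, env.getParallelism()).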

Example 3 — No parallelism configured anywhere, but topic has 24 partitions

CREATE TABLE my_topic (...) WITH (
  'connector' = 'psc',
  'scan.enable-rescale' = 'true',
  ...                       -- no scan.parallelism, no table.exec default
);

Resolution: PSC metadata client returns 24; effective parallelism = 24. Source pinned to min(24, env.getParallelism()).

Example 4 — Metadata fetch fails / topic unreachable
Resolution: the utility returns -1. The factory still propagates shouldRescale = true and effectiveParallelism = -1. Inside PscDynamicSource, the warning branch fires and the source runs at env.getParallelism() (no explicit pin), so the job still starts.

Example 5 — Rescale disabled
'scan.enable-rescale' = 'false'
The factory skips getEffectiveSourceParallelism() entirely and passes -1 down. PscDynamicSource logs the "rescale disabled" branch and uses job default parallelism. (No behavioral regression from prior version.)

Ran all unit tests related to this change using the following commands:

mvn -pl psc-flink test -am \
  -Dtest=PscDynamicTableFactoryTest \
  -DfailIfNoTests=false \
  -Dmaven.repo.local=./lrepo \
  -Dgpg.skip=true \
  -Djacoco.skip=true

mvn -pl psc-flink test -am \
  -Dtest=PscTableCommonUtilsTest \
  -DfailIfNoTests=false \
  -Dmaven.repo.local=./lrepo \
  -Dgpg.skip=true \
  -Djacoco.skip=true

mvn -pl psc-flink test -am \
  -Dtest=UpsertPscDynamicTableFactoryTest \
  -DfailIfNoTests=false \
  -Dmaven.repo.local=./lrepo \
  -Dgpg.skip=true \
  -Djacoco.skip=true

@ashish47108 ashish47108 requested a review from a team as a code owner May 11, 2026 19:23
@ashish47108 ashish47108 changed the title Implement rescale change - WIP PR Implement rescale change May 12, 2026