Skip to content

Commit 516da79

Browse files
committed
Fix
1 parent 3187b53 commit 516da79

File tree

10 files changed

+71
-10
lines changed

10 files changed

+71
-10
lines changed

docs/en/changes/changes.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,13 @@
2323

2424
| Queue | Old threads | Old channels | Old buffer slots | New threads | New partitions | New buffer slots | New policy |
2525
|-------|-------------|--------------|------------------|-------------|----------------|------------------|------------|
26-
| L1 Aggregation (OAL) | 24 | ~1,240 | ~12.4M | 8 (unified) | ~460 adaptive | ~9.2M | `cpuCores(1.0)` |
26+
| L1 Aggregation (OAL) | 24 | ~1,240 | ~12.4M | 8 (unified) | ~330 adaptive | ~6.6M | `cpuCores(1.0)` |
2727
| L1 Aggregation (MAL) | 2 | ~100 | ~100K | (unified above) | | | |
28-
| L2 Persistence (OAL) | 2 | ~620 | ~1.24M | 3 (unified) | ~460 adaptive | ~920K | `cpuCoresWithBase(1, 0.25)` |
28+
| L2 Persistence (OAL) | 2 | ~620 | ~1.24M | 3 (unified) | ~330 adaptive | ~660K | `cpuCoresWithBase(1, 0.25)` |
2929
| L2 Persistence (MAL) | 1 | ~100 | ~100K | (unified above) | | | |
3030
| TopN Persistence | 4 | 4 | 4K | 1 | 4 adaptive | 4K | `fixed(1)` |
3131
| Exporters (gRPC/Kafka) | 3 | 6 | 120K | 3 (1 per exporter) || 60K | `fixed(1)` each |
32-
| **Total** | **36** | **~2,070** | **~13.9M** | **15** | **~924** | **~10.2M** | |
32+
| **Total** | **36** | **~2,070** | **~13.9M** | **15** | **~664** | **~7.3M** | |
3333

3434
* Remove `library-datacarrier-queue` module. All usages have been replaced by `library-batch-queue`.
3535
* Enable throughput-weighted drain rebalancing for L1 aggregation and L2 persistence queues (10s interval).
@@ -57,8 +57,8 @@
5757

5858
| Catalog | Thread Name | Count | Policy | Partitions |
5959
|---------|-------------|-------|--------|------------|
60-
| Data Pipeline | `BatchQueue-METRICS_L1_AGGREGATION-N` | 8 | `cpuCores(1.0)` | ~460 adaptive |
61-
| Data Pipeline | `BatchQueue-METRICS_L2_PERSISTENCE-N` | 3 | `cpuCoresWithBase(1, 0.25)` | ~460 adaptive |
60+
| Data Pipeline | `BatchQueue-METRICS_L1_AGGREGATION-N` | 8 | `cpuCores(1.0)` | ~330 adaptive |
61+
| Data Pipeline | `BatchQueue-METRICS_L2_PERSISTENCE-N` | 3 | `cpuCoresWithBase(1, 0.25)` | ~330 adaptive |
6262
| Data Pipeline | `BatchQueue-TOPN_PERSISTENCE-N` | 1 | `fixed(1)` | ~4 adaptive |
6363
| Data Pipeline | `BatchQueue-GRPC_REMOTE_{host}_{port}-N` | 1 per peer | `fixed(1)` | `fixed(1)` |
6464
| Data Pipeline | `BatchQueue-EXPORTER_GRPC_METRICS-N` | 1 | `fixed(1)` | `fixed(1)` |

oap-server/server-library/library-batch-queue/CLAUDE.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ Silently ignored for single-thread queues (nothing to rebalance).
146146
### L1 Metrics Aggregation (`MetricsAggregateWorker`)
147147
```
148148
threads: cpuCores(1.0) -- 8 threads on 8-core
149-
partitions: adaptive() -- grows with metric types (~460 for typical OAL+MAL)
149+
partitions: adaptive() -- grows with metric types (~330 for typical OAL+MAL on 8 threads)
150150
bufferSize: 20,000 per partition
151151
strategy: IF_POSSIBLE
152152
idleMs: 1..50

oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueue.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,11 @@ public class BatchQueue<T> {
213213
private final int[] consecutiveIdleCycles;
214214

215215
/** Set to false on {@link #shutdown()} to stop drain loops and reject new data. */
216+
/**
217+
* Whether the queue is currently accepting produces and running drain loops.
218+
*
219+
* @return true if the queue is running
220+
*/
216221
@Getter
217222
private volatile boolean running;
218223

@@ -424,6 +429,9 @@ private void scheduleDrain(final int taskIndex) {
424429
* count and grows the partition array if needed. For non-adaptive policies the
425430
* resolved count never changes, so this is a no-op beyond the registration.
426431
* Drain loop threads pick up new partitions on their next cycle via volatile reads.
432+
*
433+
* @param type the class of items to route to this handler
434+
* @param handler the consumer that processes batches of the given type
427435
*/
428436
@SuppressWarnings("unchecked")
429437
public void addHandler(final Class<? extends T> type, final HandlerConsumer<T> handler) {
@@ -522,6 +530,7 @@ private void enableRebalancing(final long intervalMs) {
522530
* <li>IF_POSSIBLE — returns false immediately if the partition is full (data dropped)</li>
523531
* </ul>
524532
*
533+
* @param data the item to enqueue
525534
* @return true if data was accepted, false if dropped or queue is stopped
526535
*/
527536
public boolean produce(final T data) {
@@ -844,6 +853,8 @@ int getTaskCount() {
844853

845854
/**
846855
* Take a point-in-time snapshot of queue usage across all partitions.
856+
*
857+
* @return a stats snapshot containing per-partition usage and capacity
847858
*/
848859
public BatchQueueStats stats() {
849860
final ArrayBlockingQueue<T>[] currentPartitions = this.partitions;

oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueConfig.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,13 @@ void validate() {
116116
* Builder customizations: convenience methods for setting paired fields together.
117117
*/
118118
public static class BatchQueueConfigBuilder<T> {
119+
/**
120+
* Configure the queue to use a shared scheduler instead of a dedicated one.
121+
*
122+
* @param name the shared scheduler name (queues with the same name share a pool)
123+
* @param threads the thread policy for the shared scheduler
124+
* @return this builder
125+
*/
119126
public BatchQueueConfigBuilder<T> sharedScheduler(final String name, final ThreadPolicy threads) {
120127
this.sharedSchedulerName = name;
121128
this.sharedSchedulerThreads = threads;
@@ -127,6 +134,7 @@ public BatchQueueConfigBuilder<T> sharedScheduler(final String name, final Threa
127134
*
128135
* @param balancer rebalancing strategy (e.g. {@link DrainBalancer#throughputWeighted()})
129136
* @param intervalMs rebalance interval in milliseconds
137+
* @return this builder
130138
*/
131139
public BatchQueueConfigBuilder<T> balancer(final DrainBalancer balancer, final long intervalMs) {
132140
this.balancer = balancer;

oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueueStats.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,17 @@ public class BatchQueueStats {
4848

4949
/**
5050
* Total capacity across all partitions: {@code partitionCount * bufferSize}.
51+
*
52+
* @return total capacity in item slots
5153
*/
5254
public long totalCapacity() {
5355
return (long) partitionCount * bufferSize;
5456
}
5557

5658
/**
5759
* Total number of items currently queued across all partitions.
60+
*
61+
* @return sum of items across all partitions
5862
*/
5963
public int totalUsed() {
6064
int sum = 0;
@@ -66,6 +70,8 @@ public int totalUsed() {
6670

6771
/**
6872
* Overall queue usage as a percentage (0.0–100.0).
73+
*
74+
* @return usage percentage across all partitions
6975
*/
7076
public double totalUsedPercentage() {
7177
final long capacity = totalCapacity();
@@ -77,13 +83,19 @@ public double totalUsedPercentage() {
7783

7884
/**
7985
* Number of items currently queued in the given partition.
86+
*
87+
* @param index the partition index
88+
* @return number of items in the partition
8089
*/
8190
public int partitionUsed(final int index) {
8291
return partitionUsed[index];
8392
}
8493

8594
/**
8695
* Usage of the given partition as a percentage (0.0–100.0).
96+
*
97+
* @param index the partition index
98+
* @return usage percentage for the partition
8799
*/
88100
public double partitionUsedPercentage(final int index) {
89101
if (bufferSize == 0) {
@@ -94,7 +106,10 @@ public double partitionUsedPercentage(final int index) {
94106

95107
/**
96108
* Return the top {@code n} most-loaded partitions, sorted by usage descending.
97-
* If {@code n >= partitionCount}, all partitions are returned.
109+
* If {@code n &gt;= partitionCount}, all partitions are returned.
110+
*
111+
* @param n the maximum number of partitions to return
112+
* @return list of partition usage snapshots sorted by usage descending
98113
*/
99114
public List<PartitionUsage> topN(final int n) {
100115
final Integer[] indices = new Integer[partitionCount];

oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/DrainBalancer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ public interface DrainBalancer {
4949
* Sorts partitions by throughput descending, assigns each to the least-loaded thread.
5050
* Zero-throughput partitions keep their current owner to avoid unnecessary moves.
5151
* Skips rebalancing when load is already balanced (max/min ratio &lt; 1.15).
52+
*
53+
* @return a throughput-weighted drain balancer
5254
*/
5355
static DrainBalancer throughputWeighted() {
5456
return new ThroughputWeightedBalancer();

oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/HandlerConsumer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
public interface HandlerConsumer<T> {
2828
/**
2929
* Process a batch of data belonging to this handler's type.
30+
*
31+
* @param data the batch of items to process
3032
*/
3133
void consume(List<T> data);
3234

oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicy.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ private PartitionPolicy(final int fixedCount, final int multiplier,
5353
/**
5454
* Fixed number of partitions.
5555
*
56+
* @param count the exact number of partitions
57+
* @return a PartitionPolicy with a fixed partition count
5658
* @throws IllegalArgumentException if count &lt; 1
5759
*/
5860
public static PartitionPolicy fixed(final int count) {
@@ -65,6 +67,8 @@ public static PartitionPolicy fixed(final int count) {
6567
/**
6668
* Partitions = multiplier * resolved thread count.
6769
*
70+
* @param multiplier factor applied to thread count
71+
* @return a PartitionPolicy that scales with thread count
6872
* @throws IllegalArgumentException if multiplier &lt; 1
6973
*/
7074
public static PartitionPolicy threadMultiply(final int multiplier) {
@@ -95,6 +99,8 @@ public static PartitionPolicy threadMultiply(final int multiplier) {
9599
* 1000 handlers → 600 partitions (200 + 800/2)
96100
* 2000 handlers → 1100 partitions (200 + 1800/2)
97101
* </pre>
102+
*
103+
* @return an adaptive PartitionPolicy with default threshold multiplier
98104
*/
99105
public static PartitionPolicy adaptive() {
100106
return new PartitionPolicy(0, DEFAULT_ADAPTIVE_MULTIPLIER, true);
@@ -108,6 +114,7 @@ public static PartitionPolicy adaptive() {
108114
* at 1:2 ratio: {@code threshold + (handlerCount - threshold) / 2}.
109115
*
110116
* @param multiplier threshold per thread (default 25)
117+
* @return a PartitionPolicy that grows with handler registrations
111118
* @throws IllegalArgumentException if multiplier &lt; 1
112119
*/
113120
public static PartitionPolicy adaptive(final int multiplier) {
@@ -128,6 +135,10 @@ public static PartitionPolicy adaptive(final int multiplier) {
128135
* &lt;= threshold, returns handlerCount (1:1). If above, returns
129136
* threshold + (handlerCount - threshold) / 2.</li>
130137
* </ul>
138+
*
139+
* @param resolvedThreadCount the resolved number of drain threads
140+
* @param handlerCount the current number of registered type handlers
141+
* @return the resolved partition count, always &gt;= 1
131142
*/
132143
public int resolve(final int resolvedThreadCount, final int handlerCount) {
133144
if (fixedCount > 0) {

oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionSelector.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ public interface PartitionSelector<T> {
4747
* Default selector: routes by {@code data.getClass().hashCode()}.
4848
* Same type always hits the same partition, so each consumer thread
4949
* drains pre-grouped batches — dispatch grouping is effectively a no-op.
50+
*
51+
* @param <T> the queue element type
52+
* @return a selector that partitions by item class hash
5053
*/
5154
static <T> PartitionSelector<T> typeHash() {
5255
return (data, count) -> (data.getClass().hashCode() & 0x7FFFFFFF) % count;

oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/ThreadPolicy.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,9 @@ private ThreadPolicy(final int fixedCount, final int base, final double cpuMulti
4444
/**
4545
* Fixed number of threads. Count must be &gt;= 1.
4646
*
47-
* @throws IllegalArgumentException if count < 1
47+
* @param count the exact number of threads
48+
* @return a ThreadPolicy with a fixed thread count
49+
* @throws IllegalArgumentException if count &lt; 1
4850
*/
4951
public static ThreadPolicy fixed(final int count) {
5052
if (count < 1) {
@@ -57,7 +59,9 @@ public static ThreadPolicy fixed(final int count) {
5759
* Threads = multiplier * available CPU cores, rounded, min 1.
5860
* Multiplier must be &gt; 0.
5961
*
60-
* @throws IllegalArgumentException if multiplier <= 0
62+
* @param multiplier factor applied to available CPU core count
63+
* @return a ThreadPolicy proportional to CPU cores
64+
* @throws IllegalArgumentException if multiplier &lt;= 0
6165
*/
6266
public static ThreadPolicy cpuCores(final double multiplier) {
6367
if (multiplier <= 0) {
@@ -72,7 +76,10 @@ public static ThreadPolicy cpuCores(final double multiplier) {
7276
*
7377
* Example: cpuCoresWithBase(2, 0.25) on 8-core = 2 + 2 = 4, on 16-core = 2 + 4 = 6, on 24-core = 2 + 6 = 8.
7478
*
75-
* @throws IllegalArgumentException if base < 0 or multiplier <= 0
79+
* @param base fixed base thread count added to the CPU-proportional portion
80+
* @param multiplier factor applied to available CPU core count
81+
* @return a ThreadPolicy that combines a fixed base with a CPU-proportional count
82+
* @throws IllegalArgumentException if base &lt; 0 or multiplier &lt;= 0
7683
*/
7784
public static ThreadPolicy cpuCoresWithBase(final int base, final double multiplier) {
7885
if (base < 0) {
@@ -86,6 +93,8 @@ public static ThreadPolicy cpuCoresWithBase(final int base, final double multipl
8693

8794
/**
8895
* Resolve the actual thread count. Always returns &gt;= 1.
96+
*
97+
* @return the resolved thread count, at least 1
8998
*/
9099
public int resolve() {
91100
if (fixedCount > 0) {

0 commit comments

Comments
 (0)