Skip to content

Commit 48ee659

Browse files
committed
#AI commit# 开发阶段:新增功能 - 添加批量获取队列资源功能支持
1 parent 66b8840 commit 48ee659

File tree

6 files changed

+362
-2
lines changed

6 files changed

+362
-2
lines changed

linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/service/ExternalResourceService.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@ List<ExternalAppInfo> getAppInfo(
5050
ExternalResourceIdentifier identifier)
5151
throws RMErrorException;
5252

53+
/**
 * Retrieves resource information for several external resource identifiers in one call.
 *
 * @param resourceType type of the external resource being queried
 * @param labelContainer label container; NOTE(review): presumably used to choose the provider,
 *     as with {@code chooseProvider} — confirm against the implementation
 * @param identifiers identifiers of the external resources to look up
 * @return map from a per-identifier key to its {@link NodeResource}; NOTE(review): the key is
 *     presumably the queue name — confirm against the implementation
 * @throws RMErrorException declared failure type; the exact conditions are implementation-defined
 */
Map<String, NodeResource> getBatchResource(
54+
ResourceType resourceType,
55+
RMLabelContainer labelContainer,
56+
List<ExternalResourceIdentifier> identifiers)
57+
throws RMErrorException;
58+
5359
/**
 * Chooses the external resource provider for the given resource type and label container.
 *
 * <p>NOTE(review): the selection rules live in the implementation, which is not visible here.
 *
 * @param resourceType type of the external resource
 * @param labelContainer label container supplied by the caller
 * @return the chosen provider
 * @throws RMErrorException declared failure type; presumably raised when no provider can be
 *     chosen — confirm against the implementation
 */
ExternalResourceProvider chooseProvider(
5460
ResourceType resourceType, RMLabelContainer labelContainer) throws RMErrorException;
5561
}

linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/service/impl/ExternalResourceServiceImpl.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.linkis.manager.rm.external.service.impl;
1919

2020
import org.apache.linkis.manager.common.conf.RMConfiguration;
21+
import org.apache.linkis.manager.common.entity.resource.CommonNodeResource;
2122
import org.apache.linkis.manager.common.entity.resource.NodeResource;
2223
import org.apache.linkis.manager.common.entity.resource.ResourceType;
2324
import org.apache.linkis.manager.common.exception.RMErrorException;
@@ -33,6 +34,8 @@
3334
import org.apache.linkis.manager.rm.external.parser.YarnResourceIdentifierParser;
3435
import org.apache.linkis.manager.rm.external.request.ExternalResourceRequester;
3536
import org.apache.linkis.manager.rm.external.service.ExternalResourceService;
37+
import org.apache.linkis.manager.rm.external.yarn.YarnQueueInfo;
38+
import org.apache.linkis.manager.rm.external.yarn.YarnResourceIdentifier;
3639
import org.apache.linkis.manager.rm.external.yarn.YarnResourceRequester;
3740
import org.apache.linkis.manager.rm.utils.RMUtils;
3841

@@ -44,11 +47,13 @@
4447

4548
import java.net.ConnectException;
4649
import java.text.MessageFormat;
50+
import java.util.HashMap;
4751
import java.util.List;
4852
import java.util.Map;
4953
import java.util.concurrent.ExecutionException;
5054
import java.util.concurrent.TimeUnit;
5155
import java.util.function.Function;
56+
import java.util.stream.Collectors;
5257

5358
import com.fasterxml.jackson.core.JsonParseException;
5459
import com.google.common.cache.CacheBuilder;
@@ -141,6 +146,60 @@ public List<ExternalAppInfo> getAppInfo(
141146
return appInfos;
142147
}
143148

149+
@Override
150+
public Map<String, NodeResource> getBatchResource(
151+
ResourceType resourceType,
152+
RMLabelContainer labelContainer,
153+
List<ExternalResourceIdentifier> identifiers)
154+
throws RMErrorException {
155+
ExternalResourceProvider provider = chooseProvider(resourceType, labelContainer);
156+
ExternalResourceRequester externalResourceRequester = getRequester(resourceType);
157+
158+
if (externalResourceRequester instanceof YarnResourceRequester) {
159+
YarnResourceRequester yarnRequester = (YarnResourceRequester) externalResourceRequester;
160+
List<String> queueNames =
161+
identifiers.stream()
162+
.map(id -> ((YarnResourceIdentifier) id).getQueueName())
163+
.collect(Collectors.toList());
164+
165+
Map<String, YarnQueueInfo> batchResources =
166+
(Map<String, YarnQueueInfo>)
167+
retry(
168+
RMConfiguration.EXTERNAL_RETRY_NUM.getValue(),
169+
(i) ->
170+
yarnRequester.getBatchResources(
171+
yarnRequester.getAndUpdateActiveRmWebAddress(provider),
172+
queueNames,
173+
provider),
174+
(i) -> yarnRequester.reloadExternalResourceAddress(provider));
175+
176+
Map<String, NodeResource> result = new HashMap<>();
177+
batchResources.forEach(
178+
(queueName, queueInfo) -> {
179+
CommonNodeResource nodeResource = new CommonNodeResource();
180+
nodeResource.setMaxResource(queueInfo.getMaxResource());
181+
nodeResource.setUsedResource(queueInfo.getUsedResource());
182+
nodeResource.setMaxApps(queueInfo.getMaxApps());
183+
nodeResource.setNumPendingApps(queueInfo.getNumPendingApps());
184+
nodeResource.setNumActiveApps(queueInfo.getNumActiveApps());
185+
result.put(queueName, nodeResource);
186+
});
187+
return result;
188+
} else {
189+
// For other resource types, fall back to individual requests
190+
Map<String, NodeResource> result = new HashMap<>();
191+
for (ExternalResourceIdentifier identifier : identifiers) {
192+
try {
193+
NodeResource resource = getResource(resourceType, labelContainer, identifier);
194+
result.put(((YarnResourceIdentifier) identifier).getQueueName(), resource);
195+
} catch (Exception e) {
196+
logger.error("Failed to get resource for identifier " + identifier, e);
197+
}
198+
}
199+
return result;
200+
}
201+
}
202+
144203
private Object retry(int retryNum, Function function, Function reloadExternalAddress)
145204
throws RMErrorException {
146205
int times = 0;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.linkis.manager.rm.external.yarn;
19+
20+
/** Yarn application states, each carrying its state name as a string. */
public enum YarnAppState {

  /** Submitted, but the ResourceManager has not accepted the application yet. */
  NEW("NEW"),

  /** The application is in the middle of being written to persistent storage. */
  NEW_SAVING("NEW_SAVING"),

  /** Submitted and waiting to be scheduled. */
  SUBMITTED("SUBMITTED"),

  /** Accepted by the ResourceManager; resources have not been allocated yet. */
  ACCEPTED("ACCEPTED"),

  /** Running, with at least one container executing. */
  RUNNING("RUNNING"),

  /** Completed successfully. */
  FINISHED("FINISHED"),

  /** Execution failed. */
  FAILED("FAILED"),

  /** Terminated manually. */
  KILLED("KILLED");

  /** Textual form of this state. */
  private final String state;

  YarnAppState(String state) {
    this.state = state;
  }

  /**
   * Returns the textual form of this state.
   *
   * @return the state string given to the constant
   */
  public String getState() {
    return state;
  }

  /**
   * Tells whether this state counts as active. Only {@link #RUNNING} and {@link #ACCEPTED} do:
   * applications in these states are consuming, or about to consume, cluster resources.
   *
   * @return {@code true} for {@code RUNNING} and {@code ACCEPTED}, {@code false} otherwise
   */
  public boolean isActive() {
    switch (this) {
      case RUNNING:
      case ACCEPTED:
        return true;
      default:
        return false;
    }
  }

  /**
   * Resolves a state string to its enum constant. The comparison is exact (case-sensitive).
   *
   * @param state the state string to resolve
   * @return the constant whose state string equals {@code state}
   * @throws IllegalArgumentException if no constant matches, including for {@code null}
   */
  public static YarnAppState fromString(String state) {
    YarnAppState[] candidates = values();
    for (int i = 0; i < candidates.length; i++) {
      YarnAppState candidate = candidates[i];
      if (!candidate.state.equals(state)) {
        continue;
      }
      return candidate;
    }
    throw new IllegalArgumentException("Unknown YarnAppState: " + state);
  }
}

linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.linkis.manager.rm.external.yarn;
1919

2020
import org.apache.linkis.engineplugin.server.conf.EngineConnPluginConfiguration;
21+
import org.apache.linkis.manager.common.conf.RMConfiguration;
2122
import org.apache.linkis.manager.common.entity.resource.CommonNodeResource;
2223
import org.apache.linkis.manager.common.entity.resource.NodeResource;
2324
import org.apache.linkis.manager.common.entity.resource.ResourceType;
@@ -217,6 +218,31 @@ public YarnQueueInfo getResources(
217218
String queueName,
218219
ExternalResourceProvider provider) {
219220
JsonNode resp = getResponseByUrl("scheduler", rmWebAddress, provider);
221+
return getResourcesFromResponse(resp, realQueueName, queueName, provider);
222+
}
223+
224+
public Map<String, YarnQueueInfo> getBatchResources(
225+
String rmWebAddress, List<String> queueNames, ExternalResourceProvider provider) {
226+
JsonNode resp = getResponseByUrl("scheduler", rmWebAddress, provider);
227+
Map<String, YarnQueueInfo> queueInfoMap = new HashMap<>();
228+
for (String queueName : queueNames) {
229+
try {
230+
String realQueueName = queuePrefix + queueName;
231+
if (queueName.startsWith(queuePrefix)) {
232+
realQueueName = queueName;
233+
}
234+
YarnQueueInfo queueInfo =
235+
getResourcesFromResponse(resp, realQueueName, queueName, provider);
236+
queueInfoMap.put(queueName, queueInfo);
237+
} catch (Exception e) {
238+
logger.error("Failed to get resource for queue " + queueName, e);
239+
}
240+
}
241+
return queueInfoMap;
242+
}
243+
244+
private YarnQueueInfo getResourcesFromResponse(
245+
JsonNode resp, String realQueueName, String queueName, ExternalResourceProvider provider) {
220246
JsonNode schedulerInfo = resp.path("scheduler").path("schedulerInfo");
221247
String schedulerType = schedulerInfo.path("type").asText();
222248
if ("capacityScheduler".equals(schedulerType)) {
@@ -232,7 +258,8 @@ public YarnQueueInfo getResources(
232258
}
233259
JsonNode queueInfo = queue.get();
234260
return new YarnQueueInfo(
235-
maxEffectiveHandle(queue, rmWebAddress, queueName, provider).get(),
261+
maxEffectiveHandle(queue, getAndUpdateActiveRmWebAddress(provider), queueName, provider)
262+
.get(),
236263
getYarnResource(queue.map(node -> node.path("resourcesUsed")), queueName).get(),
237264
queueInfo.path("maxApps").asInt(),
238265
queueInfo.path("numPendingApps").asInt(),
@@ -313,8 +340,23 @@ public List<ExternalAppInfo> requestAppInfo(
313340

314341
String queueName = ((YarnResourceIdentifier) identifier).getQueueName();
315342
String realQueueName = queuePrefix + queueName;
343+
JsonNode resp;
344+
if (RMConfiguration.YARN_APPS_FILTER_ENABLED.getValue()) {
345+
// Build query parameters to filter apps at Yarn API level using active states only
346+
String queryParams =
347+
"?queue="
348+
+ realQueueName
349+
+ "&states="
350+
+ YarnAppState.RUNNING.getState()
351+
+ ","
352+
+ YarnAppState.ACCEPTED.getState();
353+
resp =
354+
getResponseByUrl("apps" + queryParams, rmWebAddress, provider).path("apps").path("app");
355+
} else {
356+
// Fetch all apps without filtering (for backward compatibility)
357+
resp = getResponseByUrl("apps", rmWebAddress, provider).path("apps").path("app");
358+
}
316359

317-
JsonNode resp = getResponseByUrl("apps", rmWebAddress, provider).path("apps").path("app");
318360
if (resp.isMissingNode()) {
319361
return new ArrayList<>();
320362
}

0 commit comments

Comments
 (0)