Skip to content

Commit fca17d7

Browse files
TEZ-4689: Introduce Node abstraction for DAGAppMaster instead of separate NodeManager-related fields
1 parent 17546aa commit fca17d7

File tree

10 files changed

+222
-68
lines changed

10 files changed

+222
-68
lines changed

tez-dag/src/main/java/org/apache/tez/client/LocalClient.java

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@
6767
import org.apache.tez.dag.app.DAGAppMaster;
6868
import org.apache.tez.dag.app.DAGAppMasterState;
6969
import org.apache.tez.dag.app.LocalDAGAppMaster;
70+
import org.apache.tez.dag.app.LocalNodeContext;
71+
import org.apache.tez.dag.app.NodeContext;
7072
import org.apache.tez.dag.app.dag.DAG;
7173

7274
import com.google.common.annotations.VisibleForTesting;
@@ -369,10 +371,11 @@ public void run() {
369371
long appSubmitTime = System.currentTimeMillis();
370372

371373
dagAppMaster =
372-
createDAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
374+
createDAGAppMaster(applicationAttemptId, cId,
373375
SystemClock.getInstance(), appSubmitTime, isSession, userDir.toUri().getPath(),
374376
new String[] {localDir.toUri().getPath()}, new String[] {logDir.toUri().getPath()},
375-
amCredentials, UserGroupInformation.getCurrentUser().getShortUserName());
377+
amCredentials, UserGroupInformation.getCurrentUser().getShortUserName(),
378+
new LocalNodeContext(currentHost, nmPort, nmHttpPort));
376379
DAGAppMaster.initAndStartAppMaster(dagAppMaster, conf);
377380
clientHandler = new DAGClientHandler(dagAppMaster);
378381
((AsyncDispatcher)dagAppMaster.getDispatcher()).setDrainEventsOnStop();
@@ -395,27 +398,32 @@ public void run() {
395398

396399
// this can be overridden by test code to create a mock app
397400
@VisibleForTesting
398-
protected DAGAppMaster createDAGAppMaster(ApplicationAttemptId applicationAttemptId,
399-
ContainerId cId, String currentHost, int nmPort,
400-
int nmHttpPort,
401-
Clock clock, long appSubmitTime, boolean isSession,
402-
String userDir,
403-
String[] localDirs, String[] logDirs,
404-
Credentials credentials, String jobUserName) throws
405-
IOException {
401+
protected DAGAppMaster createDAGAppMaster(
402+
ApplicationAttemptId applicationAttemptId,
403+
ContainerId cId,
404+
Clock clock,
405+
long appSubmitTime,
406+
boolean isSession,
407+
String userDir,
408+
String[] localDirs,
409+
String[] logDirs,
410+
Credentials credentials,
411+
String jobUserName,
412+
NodeContext nodeContext)
413+
throws IOException {
406414

407415
// Read in additional information about external services
408416
AMPluginDescriptorProto amPluginDescriptorProto =
409417
TezUtilsInternal.readUserSpecifiedTezConfiguration(userDir)
410418
.getAmPluginDescriptor();
411419

412420
return isLocalWithoutNetwork
413-
? new LocalDAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
421+
? new LocalDAGAppMaster(applicationAttemptId, cId,
414422
SystemClock.getInstance(), appSubmitTime, isSession, userDir, localDirs, logDirs,
415-
versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto)
416-
: new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
423+
versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto, nodeContext)
424+
: new DAGAppMaster(applicationAttemptId, cId,
417425
SystemClock.getInstance(), appSubmitTime, isSession, userDir, localDirs, logDirs,
418-
versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto);
426+
versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto, nodeContext);
419427
}
420428

421429
@Override

tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java

Lines changed: 45 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -209,16 +209,16 @@
209209
* The state machine is encapsulated in the implementation of Job interface.
210210
* All state changes happens via Job interface. Each event
211211
* results in a Finite State Transition in Job.
212-
*
212+
* <p>
213213
* Tez DAG AppMaster is the composition of loosely coupled services. The services
214214
* interact with each other via events. The components resembles the
215215
* Actors model. The component acts on received event and send out the
216216
* events to other components.
217217
* This keeps it highly concurrent with no or minimal synchronization needs.
218-
*
218+
* <p>
219219
* The events are dispatched by a central Dispatch mechanism. All components
220220
* register to the Dispatcher.
221-
*
221+
* <p>
222222
* The information is shared across different components using AppContext.
223223
*/
224224

@@ -245,9 +245,7 @@ public class DAGAppMaster extends AbstractService {
245245
private String appName;
246246
private final ApplicationAttemptId appAttemptID;
247247
private final ContainerId containerID;
248-
private final String nmHost;
249-
private final int nmPort;
250-
private final int nmHttpPort;
248+
private String nmHost;
251249
private final String workingDirectory;
252250
private final String[] localDirs;
253251
private final String[] logDirs;
@@ -309,6 +307,7 @@ public class DAGAppMaster extends AbstractService {
309307

310308
private ListeningExecutorService execService;
311309
private final PluginManager pluginManager;
310+
private final NodeContext nodeContext;
312311

313312

314313
/**
@@ -344,20 +343,18 @@ public class DAGAppMaster extends AbstractService {
344343
private TezDAGHook[] hooks = {};
345344

346345
public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
347-
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
346+
ContainerId containerId,
348347
Clock clock, long appSubmitTime, boolean isSession, String workingDirectory,
349348
String [] localDirs, String[] logDirs, String clientVersion,
350-
Credentials credentials, String jobUserName, AMPluginDescriptorProto pluginDescriptorProto) {
349+
Credentials credentials, String jobUserName, AMPluginDescriptorProto pluginDescriptorProto, NodeContext nodeContext) {
351350
super(DAGAppMaster.class.getName());
352351
this.mdcContext = LoggingUtils.setupLog4j();
353352
this.clock = clock;
354353
this.startTime = clock.getTime();
355354
this.appSubmitTime = appSubmitTime;
356355
this.appAttemptID = applicationAttemptId;
357356
this.containerID = containerId;
358-
this.nmHost = nmHost;
359-
this.nmPort = nmPort;
360-
this.nmHttpPort = nmHttpPort;
357+
this.nodeContext = nodeContext;
361358
this.state = DAGAppMasterState.NEW;
362359
this.isSession = isSession;
363360
this.workingDirectory = workingDirectory;
@@ -371,9 +368,6 @@ public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
371368
.createRemoteUser(jobUserName);
372369
this.appMasterUgi.addCredentials(amCredentials);
373370

374-
this.containerLogs = getRunningLogURL(this.nmHost + ":" + this.nmHttpPort,
375-
this.containerID.toString(), this.appMasterUgi.getShortUserName());
376-
377371
LOG.info("Created DAGAppMaster for application " + applicationAttemptId
378372
+ ", versionInfo=" + dagVersionInfo);
379373
TezCommonUtils.logCredentials(LOG, this.appMasterUgi.getCredentials(), "am");
@@ -443,6 +437,16 @@ protected void serviceInit(final Configuration conf) throws Exception {
443437
this.isLocal = conf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE,
444438
TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
445439

440+
if (!isLocal) {
441+
this.nmHost = nodeContext.getNodeHostString();
442+
int nmHttpPort = Integer.parseInt(nodeContext.getNodeHttpPortString());
443+
this.containerLogs =
444+
getRunningLogURL(
445+
this.nmHost + ":" + nmHttpPort,
446+
this.containerID.toString(),
447+
this.appMasterUgi.getShortUserName());
448+
}
449+
446450
UserPayload defaultPayload = TezUtils.createUserPayloadFromConf(amConf);
447451

448452
PluginManager.PluginDescriptors pluginDescriptors = pluginManager.parseAllPlugins(isLocal, defaultPayload);
@@ -1211,11 +1215,11 @@ public String getAppNMHost() {
12111215
}
12121216

12131217
public int getAppNMPort() {
1214-
return nmPort;
1218+
return Integer.parseInt(nodeContext.getNodePortString());
12151219
}
12161220

12171221
public int getAppNMHttpPort() {
1218-
return nmHttpPort;
1222+
return Integer.parseInt(nodeContext.getNodeHttpPortString());
12191223
}
12201224

12211225
public int getRpcPort() {
@@ -2415,13 +2419,15 @@ public static void main(String[] args) {
24152419
// Install the tez class loader, which can be used add new resources
24162420
TezClassLoader.setupTezClassLoader();
24172421
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
2418-
final String pid = System.getenv().get("JVM_PID");
24192422

2420-
String nodeHostString = System.getenv(ApplicationConstants.Environment.NM_HOST.name());
2421-
String nodePortString = System.getenv(ApplicationConstants.Environment.NM_PORT.name());
2422-
String nodeHttpPortString = System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.name());
2423+
final long pid = ProcessHandle.current().pid();
24232424
String appSubmitTimeStr = System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
24242425
String clientVersion = System.getenv(TezConstants.TEZ_CLIENT_VERSION_ENV);
2426+
String jobUserName = System.getenv(ApplicationConstants.Environment.USER.name());
2427+
String pwd = System.getenv(ApplicationConstants.Environment.PWD.name());
2428+
String localDirs = System.getenv(ApplicationConstants.Environment.LOCAL_DIRS.name());
2429+
String logDirs = System.getenv(ApplicationConstants.Environment.LOG_DIRS.name());
2430+
24252431
if (clientVersion == null) {
24262432
clientVersion = VersionInfo.UNKNOWN;
24272433
}
@@ -2435,14 +2441,14 @@ public static void main(String[] args) {
24352441
DAGProtos.ConfigurationProto confProto = amExtensions.loadConfigurationProto();
24362442
TezUtilsInternal.addUserSpecifiedTezConfiguration(conf, confProto.getConfKeyValuesList());
24372443

2444+
NodeContext nodeContext = new YarnNodeManagerContext();
24382445
ContainerId containerId = amExtensions.allocateContainerId(conf);
24392446

24402447
ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
24412448
org.apache.hadoop.ipc.CallerContext.setCurrent(new org.apache.hadoop.ipc.CallerContext
24422449
.Builder("tez_appmaster_" + containerId.getApplicationAttemptId()
24432450
).build());
24442451
long appSubmitTime = Long.parseLong(appSubmitTimeStr);
2445-
String jobUserName = System.getenv(ApplicationConstants.Environment.USER.name());
24462452

24472453
// Command line options
24482454
Option option = Option.builder()
@@ -2462,9 +2468,9 @@ public static void main(String[] args) {
24622468
+ ", jvmPid=" + pid
24632469
+ ", userFromEnv=" + jobUserName
24642470
+ ", cliSessionOption=" + sessionModeCliOption
2465-
+ ", pwd=" + System.getenv(ApplicationConstants.Environment.PWD.name())
2466-
+ ", localDirs=" + System.getenv(ApplicationConstants.Environment.LOCAL_DIRS.name())
2467-
+ ", logDirs=" + System.getenv(ApplicationConstants.Environment.LOG_DIRS.name()));
2471+
+ ", pwd=" + pwd
2472+
+ ", localDirs=" + localDirs
2473+
+ ", logDirs=" + logDirs);
24682474

24692475
AMPluginDescriptorProto amPluginDescriptorProto = null;
24702476
if (confProto.hasAmPluginDescriptor()) {
@@ -2477,20 +2483,26 @@ public static void main(String[] args) {
24772483
TezUtilsInternal.setSecurityUtilConfigration(LOG, conf);
24782484

24792485
DAGAppMaster appMaster =
2480-
new DAGAppMaster(applicationAttemptId, containerId, nodeHostString, Integer.parseInt(nodePortString),
2481-
Integer.parseInt(nodeHttpPortString), new SystemClock(), appSubmitTime, sessionModeCliOption,
2482-
System.getenv(ApplicationConstants.Environment.PWD.name()),
2483-
TezCommonUtils.getTrimmedStrings(System.getenv(ApplicationConstants.Environment.LOCAL_DIRS.name())),
2484-
TezCommonUtils.getTrimmedStrings(System.getenv(ApplicationConstants.Environment.LOG_DIRS.name())),
2485-
clientVersion, credentials, jobUserName, amPluginDescriptorProto);
2486+
new DAGAppMaster(
2487+
applicationAttemptId,
2488+
containerId,
2489+
new SystemClock(),
2490+
appSubmitTime,
2491+
sessionModeCliOption,
2492+
pwd,
2493+
TezCommonUtils.getTrimmedStrings(localDirs),
2494+
TezCommonUtils.getTrimmedStrings(logDirs),
2495+
clientVersion,
2496+
credentials,
2497+
jobUserName,
2498+
amPluginDescriptorProto,
2499+
nodeContext);
24862500
ShutdownHookManager.get().addShutdownHook(new DAGAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
24872501

24882502
// log the system properties
24892503
if (LOG.isInfoEnabled()) {
24902504
String systemPropsToLog = TezCommonUtils.getSystemPropertiesToLog(conf);
2491-
if (systemPropsToLog != null) {
2492-
LOG.info(systemPropsToLog);
2493-
}
2505+
LOG.info(systemPropsToLog);
24942506
}
24952507

24962508
initAndStartAppMaster(appMaster, conf);

tez-dag/src/main/java/org/apache/tez/dag/app/LocalDAGAppMaster.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,12 @@
3232
public class LocalDAGAppMaster extends DAGAppMaster {
3333

3434
public LocalDAGAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId containerId,
35-
String nmHost, int nmPort, int nmHttpPort, Clock clock, long appSubmitTime, boolean isSession,
35+
Clock clock, long appSubmitTime, boolean isSession,
3636
String workingDirectory, String[] localDirs, String[] logDirs, String clientVersion,
37-
Credentials credentials, String jobUserName, AMPluginDescriptorProto pluginDescriptorProto) {
38-
super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, clock, appSubmitTime,
37+
Credentials credentials, String jobUserName, AMPluginDescriptorProto pluginDescriptorProto, NodeContext nodeContext) {
38+
super(applicationAttemptId, containerId, clock, appSubmitTime,
3939
isSession, workingDirectory, localDirs, logDirs, clientVersion, credentials, jobUserName,
40-
pluginDescriptorProto);
40+
pluginDescriptorProto, nodeContext);
4141
}
4242

4343
@Override
tez-dag/src/main/java/org/apache/tez/dag/app/LocalNodeContext.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
3+
* agreements. See the NOTICE file distributed with this work for additional information regarding
4+
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
5+
* "License"); you may not use this file except in compliance with the License. You may obtain a
6+
* copy of the License at
7+
*
8+
* <p>http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* <p>Unless required by applicable law or agreed to in writing, software distributed under the
11+
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
package org.apache.tez.dag.app;
16+
17+
/** Local implementation of NodeContext. */
18+
public final class LocalNodeContext implements NodeContext {
19+
20+
private final String nodeHostString;
21+
private final String nodePortString;
22+
private final String nodeHttpPortString;
23+
24+
public LocalNodeContext(String nodeHostString, int nodePortString, int nmHttpPort) {
25+
this.nodeHostString = nodeHostString;
26+
this.nodePortString = String.valueOf(nodePortString);
27+
this.nodeHttpPortString = String.valueOf(nmHttpPort);
28+
}
29+
30+
@Override
31+
public String getNodeHostString() {
32+
return nodeHostString;
33+
}
34+
35+
@Override
36+
public String getNodePortString() {
37+
return nodePortString;
38+
}
39+
40+
@Override
41+
public String getNodeHttpPortString() {
42+
return nodeHttpPortString;
43+
}
44+
}
tez-dag/src/main/java/org/apache/tez/dag/app/NodeContext.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
3+
* agreements. See the NOTICE file distributed with this work for additional information regarding
4+
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
5+
* "License"); you may not use this file except in compliance with the License. You may obtain a
6+
* copy of the License at
7+
*
8+
* <p>http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* <p>Unless required by applicable law or agreed to in writing, software distributed under the
11+
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
package org.apache.tez.dag.app;
16+
17+
/** Provides context information about the node on which the DAGAppMaster is running. */
18+
public sealed interface NodeContext permits YarnNodeManagerContext, LocalNodeContext {
19+
20+
/**
21+
* @return The node host string
22+
*/
23+
String getNodeHostString();
24+
25+
/**
26+
* @return The node port string
27+
*/
28+
String getNodePortString();
29+
30+
/**
31+
* @return The node HTTP port string
32+
*/
33+
String getNodeHttpPortString();
34+
}

0 commit comments

Comments
 (0)