Skip to content

Commit 62e2d5e

Browse files
committed
TEZ-4007: Zookeeper based FrameworkServices and AmExtensions (3/3) - initial patch
1 parent cad674e commit 62e2d5e

File tree

55 files changed

+2258
-349
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+2258
-349
lines changed

pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
<commons-io.version>2.16.0</commons-io.version>
7070
<commons-lang.version>2.6</commons-lang.version>
7171
<clover.license>${user.home}/clover.license</clover.license>
72+
<curator.version>2.7.1</curator.version>
7273
<dependency-check-maven.version>3.2.0</dependency-check-maven.version>
7374
<dependency-maven-plugin.version>3.8.1</dependency-maven-plugin.version>
7475
<spotbugs.version>4.9.3</spotbugs.version>
@@ -736,6 +737,11 @@
736737
</exclusion>
737738
</exclusions>
738739
</dependency>
740+
<dependency>
741+
<groupId>org.apache.hadoop</groupId>
742+
<artifactId>hadoop-registry</artifactId>
743+
<version>${hadoop.version}</version>
744+
</dependency>
739745
<dependency>
740746
<groupId>org.apache.hadoop</groupId>
741747
<artifactId>hadoop-hdfs</artifactId>

tez-api/pom.xml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@
7676
<dependency>
7777
<groupId>org.apache.hadoop</groupId>
7878
<artifactId>hadoop-registry</artifactId>
79-
<version>${hadoop.version}</version>
8079
</dependency>
8180
<dependency>
8281
<groupId>org.apache.commons</groupId>
@@ -124,6 +123,12 @@
124123
<groupId>org.xerial.snappy</groupId>
125124
<artifactId>snappy-java</artifactId>
126125
</dependency>
126+
<dependency>
127+
<groupId>org.apache.curator</groupId>
128+
<artifactId>curator-test</artifactId>
129+
<version>${curator.version}</version>
130+
<scope>test</scope>
131+
</dependency>
127132
</dependencies>
128133

129134
<build>

tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.tez.client;
2020

2121
import java.io.IOException;
22+
import java.util.Optional;
2223

2324
import org.apache.hadoop.classification.InterfaceAudience.Private;
2425
import org.apache.hadoop.conf.Configuration;
@@ -46,6 +47,8 @@
4647
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto;
4748
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
4849
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGResponseProto;
50+
import org.apache.tez.frameworkplugins.ClientFrameworkService;
51+
import org.apache.tez.frameworkplugins.FrameworkUtils;
4952

5053
import com.google.protobuf.ServiceException;
5154

@@ -57,6 +60,9 @@ public abstract class FrameworkClient {
5760
protected static final Logger LOG = LoggerFactory.getLogger(FrameworkClient.class);
5861

5962
public static FrameworkClient createFrameworkClient(TezConfiguration tezConf) {
63+
Optional<FrameworkClient> pluginClient =
64+
FrameworkUtils.get(ClientFrameworkService.class, tezConf)
65+
.flatMap(framework -> framework.createOrGetFrameworkClient(tezConf));
6066

6167
boolean isLocal = tezConf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE, TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
6268
if (isLocal) {
@@ -65,6 +71,8 @@ public static FrameworkClient createFrameworkClient(TezConfiguration tezConf) {
6571
} catch (TezReflectionException e) {
6672
throw new TezUncheckedException("Fail to create LocalClient", e);
6773
}
74+
} else if (pluginClient.isPresent()) {
75+
return pluginClient.get();
6876
}
6977
return new TezYarnClient(YarnClient.createYarnClient());
7078
}

tez-api/src/main/java/org/apache/tez/client/TezClient.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -484,7 +484,9 @@ public synchronized TezClient getClient(ApplicationId appId) throws TezException
484484
}
485485

486486
private void startFrameworkClient() {
487-
frameworkClient = createFrameworkClient();
487+
if (frameworkClient == null) {
488+
frameworkClient = createFrameworkClient();
489+
}
488490
frameworkClient.init(amConfig.getTezConfiguration());
489491
frameworkClient.start();
490492
}
@@ -670,6 +672,8 @@ private DAGClient submitDAGSession(DAG dag) throws TezException, IOException {
670672
}
671673

672674
Map<String, LocalResource> tezJarResources = getTezJarResources(sessionCredentials);
675+
676+
673677
DAGPlan dagPlan = TezClientUtils.prepareAndCreateDAGPlan(dag, amConfig, tezJarResources,
674678
usingTezArchiveDeploy, sessionCredentials, servicePluginsDescriptor, javaOptsChecker);
675679

@@ -684,6 +688,7 @@ private DAGClient submitDAGSession(DAG dag) throws TezException, IOException {
684688

685689
// if request size exceeds maxSubmitDAGRequestSizeThroughIPC, we serialize them to HDFS
686690
SubmitDAGRequestProto request = requestBuilder.build();
691+
687692
if (request.getSerializedSize() > maxSubmitDAGRequestSizeThroughIPC) {
688693
Path dagPlanPath = new Path(TezCommonUtils.getTezSystemStagingPath(amConfig.getTezConfiguration(),
689694
sessionAppId.toString()), TezConstants.TEZ_PB_PLAN_BINARY_NAME +

tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,8 @@ private static Path getPath(String configUri) {
149149
}
150150
}
151151

152+
153+
152154
/**
153155
* Setup LocalResource map for Tez jars based on provided Configuration
154156
*
@@ -186,7 +188,7 @@ static boolean setupTezJarsLocalResources(TezConfiguration conf,
186188
+ conf.get(TezConfiguration.TEZ_LIB_URIS_CLASSPATH));
187189

188190
usingTezArchive = addLocalResources(conf, tezJarUris,
189-
tezJarResources, credentials);
191+
tezJarResources, credentials);
190192

191193
if (tezJarResources.isEmpty()) {
192194
throw new TezUncheckedException(
@@ -263,8 +265,8 @@ private static boolean addLocalResources(Configuration conf,
263265

264266
// Add URI fragment or just the filename
265267
Path name = new Path((null == u.getFragment())
266-
? p.getName()
267-
: u.getFragment());
268+
? p.getName()
269+
: u.getFragment());
268270
if (name.isAbsolute()) {
269271
throw new IllegalArgumentException("Resource name must be "
270272
+ "relative, not absolute: " + name
@@ -577,7 +579,7 @@ public static ApplicationSubmissionContext createApplicationSubmissionContext(
577579
// don't overwrite existing conf, needed for TezClient.getClient() so existing containers have stable resource fingerprints
578580
if(!binaryConfPath.getFileSystem(tezConf).exists(binaryConfPath)) {
579581
ConfigurationProto finalConfProto = createFinalConfProtoForApp(tezConf,
580-
servicePluginsDescriptor);
582+
servicePluginsDescriptor);
581583

582584
FSDataOutputStream amConfPBOutBinaryStream = null;
583585
try {
@@ -618,12 +620,12 @@ public static ApplicationSubmissionContext createApplicationSubmissionContext(
618620
}
619621

620622
LocalResource sessionJarsPBLRsrc =
621-
TezClientUtils.createLocalResource(fs,
622-
sessionJarsPath, LocalResourceType.FILE,
623-
LocalResourceVisibility.APPLICATION);
623+
TezClientUtils.createLocalResource(fs,
624+
sessionJarsPath, LocalResourceType.FILE,
625+
LocalResourceVisibility.APPLICATION);
624626
amLocalResources.put(
625-
TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME,
626-
sessionJarsPBLRsrc);
627+
TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME,
628+
sessionJarsPBLRsrc);
627629

628630
String user = UserGroupInformation.getCurrentUser().getShortUserName();
629631
ACLManager aclManager = new ACLManager(user, amConfig.getTezConfiguration());
@@ -655,9 +657,9 @@ public static ApplicationSubmissionContext createApplicationSubmissionContext(
655657
}
656658

657659
amLocalResources.put(TezConstants.TEZ_PB_PLAN_BINARY_NAME,
658-
TezClientUtils.createLocalResource(fs,
659-
binaryPath, LocalResourceType.FILE,
660-
LocalResourceVisibility.APPLICATION));
660+
TezClientUtils.createLocalResource(fs,
661+
binaryPath, LocalResourceType.FILE,
662+
LocalResourceVisibility.APPLICATION));
661663

662664
if (Level.DEBUG.isGreaterOrEqual(Level.toLevel(amLogLevel))) {
663665
Path textPath = localizeDagPlanAsText(dagPB, fs, amConfig, strAppId, tezSysStagingPath);
@@ -688,7 +690,7 @@ public static ApplicationSubmissionContext createApplicationSubmissionContext(
688690

689691
Collection<String> tagsFromConf =
690692
amConfig.getTezConfiguration().getTrimmedStringCollection(
691-
TezConfiguration.TEZ_APPLICATION_TAGS);
693+
TezConfiguration.TEZ_APPLICATION_TAGS);
692694

693695
appContext.setApplicationType(TezConstants.TEZ_APPLICATION_TYPE);
694696
if (tagsFromConf != null && !tagsFromConf.isEmpty()) {
@@ -709,8 +711,8 @@ public static ApplicationSubmissionContext createApplicationSubmissionContext(
709711
appContext.setAMContainerSpec(amContainer);
710712

711713
appContext.setMaxAppAttempts(
712-
amConfig.getTezConfiguration().getInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS,
713-
TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS_DEFAULT));
714+
amConfig.getTezConfiguration().getInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS,
715+
TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS_DEFAULT));
714716

715717
return appContext;
716718

@@ -848,7 +850,7 @@ public static void addLog4jSystemProperties(String logLevel,
848850
}
849851

850852
public static ConfigurationProto createFinalConfProtoForApp(Configuration amConf,
851-
ServicePluginsDescriptor servicePluginsDescriptor) {
853+
ServicePluginsDescriptor servicePluginsDescriptor) {
852854
assert amConf != null;
853855
ConfigurationProto.Builder builder = ConfigurationProto.newBuilder();
854856
for (Entry<String, String> entry : amConf) {
@@ -949,7 +951,7 @@ static DAGClientAMProtocolBlockingPB getAMProxy(FrameworkClient frameworkClient,
949951
+ ", trackingUrl=" + appReport.getTrackingUrl()
950952
+ ", diagnostics="
951953
+ (appReport.getDiagnostics() != null ? appReport.getDiagnostics()
952-
: TezClient.NO_CLUSTER_DIAGNOSTICS_MSG);
954+
: TezClient.NO_CLUSTER_DIAGNOSTICS_MSG);
953955
LOG.info(msg);
954956
throw new SessionNotRunning(msg);
955957
}
@@ -1015,7 +1017,7 @@ static void createSessionToken(String tokenIdentifier,
10151017
public static String maybeAddDefaultMemoryJavaOpts(String javaOpts, Resource resource,
10161018
double maxHeapFactor) {
10171019
if ((javaOpts != null && !javaOpts.isEmpty()
1018-
&& (javaOpts.contains("-Xmx") || javaOpts.contains("-Xms")))
1020+
&& (javaOpts.contains("-Xmx") || javaOpts.contains("-Xms")))
10191021
|| (resource.getMemory() <= 0)) {
10201022
return javaOpts;
10211023
}
@@ -1026,8 +1028,8 @@ public static String maybeAddDefaultMemoryJavaOpts(String javaOpts, Resource res
10261028

10271029
if (Double.parseDouble("-1") == maxHeapFactor) {
10281030
maxHeapFactor = resource.getMemory() < TezConstants.TEZ_CONTAINER_SMALL_SLAB_BOUND_MB
1029-
? TezConstants.TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION_SMALL_SLAB
1030-
: TezConstants.TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION_LARGE_SLAB;
1031+
? TezConstants.TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION_SMALL_SLAB
1032+
: TezConstants.TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION_LARGE_SLAB;
10311033
}
10321034
int maxMemory = (int)(resource.getMemory() * maxHeapFactor);
10331035
maxMemory = maxMemory <= 0 ? 1 : maxMemory;
@@ -1037,7 +1039,7 @@ public static String maybeAddDefaultMemoryJavaOpts(String javaOpts, Resource res
10371039
}
10381040

10391041
private static boolean checkAncestorPermissionsForAllUsers(Configuration conf, Path pathComponent,
1040-
FsAction permission) throws IOException {
1042+
FsAction permission) throws IOException {
10411043
FileSystem fs = pathComponent.getFileSystem(conf);
10421044

10431045
if (Shell.WINDOWS && fs instanceof LocalFileSystem) {

tez-api/src/main/java/org/apache/tez/client/registry/AMRecord.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,12 @@ public class AMRecord {
3939
private static final String APP_ID_RECORD_KEY = "appId";
4040
private static final String HOST_RECORD_KEY = "host";
4141
private static final String PORT_RECORD_KEY = "port";
42-
private static final String OPAQUE_ID_KEY = "id";
42+
private static final String EXTERNAL_ID_KEY = "externalId";
4343

4444
private final ApplicationId appId;
4545
private final String host;
4646
private final int port;
47-
private final String id;
47+
private final String externalId;
4848

4949
/**
5050
* Creates a new {@code AMRecord} with the given application ID, host, port, and identifier.
@@ -57,14 +57,14 @@ public class AMRecord {
5757
* @param appId the {@link ApplicationId} of the Tez application
5858
* @param host the hostname where the Application Master is running
5959
* @param port the port number on which the Application Master is listening
60-
* @param id an opaque identifier for the record; if {@code null}, defaults to an empty string
60+
* @param externalId an opaque identifier for the record; if {@code null}, defaults to an empty string
6161
*/
62-
public AMRecord(ApplicationId appId, String host, int port, String id) {
62+
public AMRecord(ApplicationId appId, String host, int port, String externalId) {
6363
this.appId = appId;
6464
this.host = host;
6565
this.port = port;
66-
//If id is not provided, convert to empty string
67-
this.id = (id == null) ? "" : id;
66+
//externalId is optional, if not provided, convert to empty string
67+
this.externalId = (externalId == null) ? "" : externalId;
6868
}
6969

7070
/**
@@ -81,7 +81,7 @@ public AMRecord(AMRecord other) {
8181
this.appId = other.getApplicationId();
8282
this.host = other.getHost();
8383
this.port = other.getPort();
84-
this.id = other.getId();
84+
this.externalId = other.getExternalId();
8585
}
8686

8787
/**
@@ -99,7 +99,7 @@ public AMRecord(ServiceRecord serviceRecord) {
9999
this.appId = ApplicationId.fromString(serviceRecord.get(APP_ID_RECORD_KEY));
100100
this.host = serviceRecord.get(HOST_RECORD_KEY);
101101
this.port = Integer.parseInt(serviceRecord.get(PORT_RECORD_KEY));
102-
this.id = serviceRecord.get(OPAQUE_ID_KEY);
102+
this.externalId = serviceRecord.get(EXTERNAL_ID_KEY);
103103
}
104104

105105
public ApplicationId getApplicationId() {
@@ -114,8 +114,8 @@ public int getPort() {
114114
return port;
115115
}
116116

117-
public String getId() {
118-
return id;
117+
public String getExternalId() {
118+
return externalId;
119119
}
120120

121121
@Override
@@ -127,7 +127,7 @@ public boolean equals(Object other) {
127127
return appId.equals(otherRecord.appId)
128128
&& host.equals(otherRecord.host)
129129
&& port == otherRecord.port
130-
&& id.equals(otherRecord.id);
130+
&& externalId.equals(otherRecord.externalId);
131131
} else {
132132
return false;
133133
}
@@ -152,12 +152,12 @@ public ServiceRecord toServiceRecord() {
152152
serviceRecord.set(APP_ID_RECORD_KEY, appId);
153153
serviceRecord.set(HOST_RECORD_KEY, host);
154154
serviceRecord.set(PORT_RECORD_KEY, port);
155-
serviceRecord.set(OPAQUE_ID_KEY, id);
155+
serviceRecord.set(EXTERNAL_ID_KEY, externalId);
156156
return serviceRecord;
157157
}
158158

159159
@Override
160160
public int hashCode() {
161-
return Objects.hash(appId, host, port, id);
161+
return Objects.hash(appId, host, port, externalId);
162162
}
163163
}

0 commit comments

Comments
 (0)