Skip to content

Commit 4632058

Browse files
authored
TEZ-4007: Introduce AmExtensions and Zookeeper-based FrameworkServices (#427) (Laszlo Bodor reviewed by Ayush Saxena, co-authored by Eric Wohlstadter)
1 parent 533b013 commit 4632058

File tree

65 files changed

+3516
-327
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+3516
-327
lines changed

pom.xml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
<commons-io.version>2.16.0</commons-io.version>
7070
<commons-lang.version>2.6</commons-lang.version>
7171
<clover.license>${user.home}/clover.license</clover.license>
72+
<curator.version>5.9.0</curator.version>
7273
<dependency-check-maven.version>3.2.0</dependency-check-maven.version>
7374
<dependency-maven-plugin.version>3.8.1</dependency-maven-plugin.version>
7475
<spotbugs.version>4.9.3</spotbugs.version>
@@ -736,13 +737,30 @@
736737
</exclusion>
737738
</exclusions>
738739
</dependency>
740+
<dependency>
741+
<groupId>org.apache.hadoop</groupId>
742+
<artifactId>hadoop-registry</artifactId>
743+
<version>${hadoop.version}</version>
744+
</dependency>
739745
<dependency>
740746
<groupId>org.apache.hadoop</groupId>
741747
<artifactId>hadoop-hdfs</artifactId>
742748
<version>${hadoop.version}</version>
743749
<type>test-jar</type>
744750
<scope>test</scope>
745751
</dependency>
752+
<dependency>
753+
<groupId>org.apache.curator</groupId>
754+
<artifactId>curator-test</artifactId>
755+
<version>${curator.version}</version>
756+
<scope>test</scope>
757+
<exclusions>
758+
<exclusion>
759+
<groupId>org.junit.jupiter</groupId>
760+
<artifactId>junit-jupiter-api</artifactId>
761+
</exclusion>
762+
</exclusions>
763+
</dependency>
746764
<dependency>
747765
<groupId>org.mockito</groupId>
748766
<artifactId>mockito-core</artifactId>

tez-api/findbugs-exclude.xml

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,4 +151,37 @@
151151
<Method name="getApplicationId" />
152152
<Bug pattern="EI_EXPOSE_REP" />
153153
</Match>
154+
155+
<!-- TEZ-4007 -->
156+
<Match>
157+
<Class name="org.apache.tez.client.registry.zookeeper.ZkFrameworkClient" />
158+
<Field name="amPort" />
159+
<Bug pattern="AT_STALE_THREAD_WRITE_OF_PRIMITIVE" />
160+
</Match>
161+
162+
<Match>
163+
<Class name="org.apache.tez.client.TezYarnClient" />
164+
<Method name="&lt;init&gt;" params="org.apache.hadoop.yarn.client.api.YarnClient" returns="void" />
165+
<Bug pattern="EI_EXPOSE_REP2" />
166+
</Match>
167+
168+
<Match>
169+
<Class name="org.apache.tez.client.registry.AMRecord" />
170+
<Method name="&lt;init&gt;"
171+
params="org.apache.hadoop.yarn.api.records.ApplicationId, java.lang.String, java.lang.String, int, java.lang.String, java.lang.String"
172+
returns="void"/>
173+
<Bug pattern="EI_EXPOSE_REP2" />
174+
</Match>
175+
176+
<Match>
177+
<Class name="org.apache.tez.client.registry.AMRecord" />
178+
<Method name="toServiceRecord" />
179+
<Bug pattern="EI_EXPOSE_REP" />
180+
</Match>
181+
182+
<Match>
183+
<Class name="org.apache.tez.frameworkplugins.FrameworkUtils" />
184+
<Method name="get" params="java.lang.Class, org.apache.hadoop.conf.Configuration, java.lang.Class" returns="org.apache.tez.frameworkplugins.FrameworkService" />
185+
<Bug pattern="REFLC_REFLECTION_MAY_INCREASE_ACCESSIBILITY_OF_CLASS" />
186+
</Match>
154187
</FindBugsFilter>

tez-api/pom.xml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@
7676
<dependency>
7777
<groupId>org.apache.hadoop</groupId>
7878
<artifactId>hadoop-registry</artifactId>
79-
<version>${hadoop.version}</version>
8079
</dependency>
8180
<dependency>
8281
<groupId>org.apache.commons</groupId>
@@ -124,6 +123,11 @@
124123
<groupId>org.xerial.snappy</groupId>
125124
<artifactId>snappy-java</artifactId>
126125
</dependency>
126+
<dependency>
127+
<groupId>org.apache.curator</groupId>
128+
<artifactId>curator-test</artifactId>
129+
<scope>test</scope>
130+
</dependency>
127131
</dependencies>
128132

129133
<build>

tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.hadoop.yarn.api.records.ApplicationId;
2727
import org.apache.hadoop.yarn.api.records.ApplicationReport;
2828
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
29-
import org.apache.hadoop.yarn.client.api.YarnClient;
3029
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
3130
import org.apache.hadoop.yarn.exceptions.YarnException;
3231
import org.apache.tez.common.RPCUtil;
@@ -46,6 +45,9 @@
4645
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.ShutdownSessionRequestProto;
4746
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto;
4847
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGResponseProto;
48+
import org.apache.tez.frameworkplugins.ClientFrameworkService;
49+
import org.apache.tez.frameworkplugins.FrameworkUtils;
50+
import org.apache.tez.frameworkplugins.yarn.YarnClientFrameworkService;
4951

5052
import com.google.protobuf.ServiceException;
5153

@@ -57,16 +59,18 @@ public abstract class FrameworkClient {
5759
protected static final Logger LOG = LoggerFactory.getLogger(FrameworkClient.class);
5860

5961
public static FrameworkClient createFrameworkClient(TezConfiguration tezConf) {
60-
6162
boolean isLocal = tezConf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE, TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
6263
if (isLocal) {
6364
try {
6465
return ReflectionUtils.createClazzInstance("org.apache.tez.client.LocalClient");
6566
} catch (TezReflectionException e) {
6667
throw new TezUncheckedException("Fail to create LocalClient", e);
6768
}
69+
} else {
70+
ClientFrameworkService clientFrameworkService = FrameworkUtils.get(ClientFrameworkService.class, tezConf,
71+
YarnClientFrameworkService.class);
72+
return clientFrameworkService.newFrameworkClient();
6873
}
69-
return new TezYarnClient(YarnClient.createYarnClient());
7074
}
7175

7276
/**

tez-api/src/main/java/org/apache/tez/client/TezClient.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -484,7 +484,9 @@ public synchronized TezClient getClient(ApplicationId appId) throws TezException
484484
}
485485

486486
private void startFrameworkClient() {
487-
frameworkClient = createFrameworkClient();
487+
if (frameworkClient == null) {
488+
frameworkClient = createFrameworkClient();
489+
}
488490
frameworkClient.init(amConfig.getTezConfiguration());
489491
frameworkClient.start();
490492
}

tez-api/src/main/java/org/apache/tez/client/TezYarnClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public class TezYarnClient extends FrameworkClient {
4343
private String amHost;
4444
private int amPort;
4545

46-
protected TezYarnClient(YarnClient yarnClient) {
46+
public TezYarnClient(YarnClient yarnClient) {
4747
this.yarnClient = yarnClient;
4848
}
4949

tez-api/src/main/java/org/apache/tez/client/registry/AMRecord.java

Lines changed: 70 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@
1919
package org.apache.tez.client.registry;
2020

2121
import java.util.Objects;
22+
import java.util.Optional;
2223

2324
import org.apache.hadoop.classification.InterfaceAudience;
2425
import org.apache.hadoop.registry.client.types.ServiceRecord;
2526
import org.apache.hadoop.yarn.api.records.ApplicationId;
27+
import org.apache.tez.client.registry.zookeeper.ZkConfig;
2628

2729

2830
/**
@@ -37,14 +39,20 @@
3739
@InterfaceAudience.Public
3840
public class AMRecord {
3941
private static final String APP_ID_RECORD_KEY = "appId";
40-
private static final String HOST_RECORD_KEY = "host";
42+
private static final String HOST_NAME_RECORD_KEY = "hostName";
43+
private static final String HOST_IP_RECORD_KEY = "hostIp";
4144
private static final String PORT_RECORD_KEY = "port";
42-
private static final String OPAQUE_ID_KEY = "id";
45+
private static final String EXTERNAL_ID_KEY = "externalId";
46+
private static final String COMPUTE_GROUP_NAME_KEY = "computeName";
4347

4448
private final ApplicationId appId;
45-
private final String host;
49+
private final String hostName;
50+
private final String hostIp;
4651
private final int port;
47-
private final String id;
52+
private final String externalId;
53+
private final String computeName;
54+
55+
private ServiceRecord serviceRecord;
4856

4957
/**
5058
* Creates a new {@code AMRecord} with the given application ID, host name, host IP, port, external ID, and compute name.
@@ -54,17 +62,23 @@ public class AMRecord {
5462
* Although this constructor may not be used directly within Tez internals,
5563
* it is part of the public API for Tez clients that handle unmanaged sessions.
5664
*
57-
* @param appId the {@link ApplicationId} of the Tez application
58-
* @param host the hostname where the Application Master is running
59-
* @param port the port number on which the Application Master is listening
60-
* @param id an opaque identifier for the record; if {@code null}, defaults to an empty string
65+
* @param appId the {@link ApplicationId} of the Tez application
66+
* @param hostName the hostname where the Application Master is running
67+
* @param hostIp the IP address of the Application Master host
68+
* @param port the RPC port number on which the Application Master is listening
69+
* @param externalId an optional external identifier for the record; if {@code null}, defaults to an empty string
70+
* @param computeName the compute group or cluster name; if {@code null},
71+
* defaults to {@link ZkConfig#DEFAULT_COMPUTE_GROUP_NAME}
6172
*/
62-
public AMRecord(ApplicationId appId, String host, int port, String id) {
73+
public AMRecord(ApplicationId appId, String hostName, String hostIp, int port, String externalId,
74+
String computeName) {
6375
this.appId = appId;
64-
this.host = host;
76+
this.hostName = hostName;
77+
this.hostIp = hostIp;
6578
this.port = port;
66-
//If id is not provided, convert to empty string
67-
this.id = (id == null) ? "" : id;
79+
// externalId is optional; if it is not provided, default to an empty string
80+
this.externalId = Optional.ofNullable(externalId).orElse("");
81+
this.computeName = Optional.ofNullable(computeName).orElse(ZkConfig.DEFAULT_COMPUTE_GROUP_NAME);
6882
}
6983

7084
/**
@@ -78,10 +92,15 @@ public AMRecord(ApplicationId appId, String host, int port, String id) {
7892
* @param other the {@code AMRecord} instance to copy
7993
*/
8094
public AMRecord(AMRecord other) {
81-
this.appId = other.getApplicationId();
82-
this.host = other.getHost();
83-
this.port = other.getPort();
84-
this.id = other.getId();
95+
this.appId = other.appId;
96+
this.hostName = other.hostName;
97+
this.hostIp = other.hostIp;
98+
this.port = other.port;
99+
this.externalId = other.externalId;
100+
this.computeName = other.computeName;
101+
// all other fields are final and immutable, so we can reuse the other record's serviceRecord,
102+
// if it's initialized there already, as it won't change
103+
this.serviceRecord = other.serviceRecord;
85104
}
86105

87106
/**
@@ -97,25 +116,35 @@ public AMRecord(AMRecord other) {
97116
*/
98117
public AMRecord(ServiceRecord serviceRecord) {
99118
this.appId = ApplicationId.fromString(serviceRecord.get(APP_ID_RECORD_KEY));
100-
this.host = serviceRecord.get(HOST_RECORD_KEY);
119+
this.hostName = serviceRecord.get(HOST_NAME_RECORD_KEY);
120+
this.hostIp = serviceRecord.get(HOST_IP_RECORD_KEY);
101121
this.port = Integer.parseInt(serviceRecord.get(PORT_RECORD_KEY));
102-
this.id = serviceRecord.get(OPAQUE_ID_KEY);
122+
this.externalId = serviceRecord.get(EXTERNAL_ID_KEY);
123+
this.computeName = serviceRecord.get(COMPUTE_GROUP_NAME_KEY);
103124
}
104125

105126
public ApplicationId getApplicationId() {
106127
return appId;
107128
}
108129

109-
public String getHost() {
110-
return host;
130+
public String getHostName() {
131+
return hostName;
132+
}
133+
134+
public String getHostIp() {
135+
return hostIp;
111136
}
112137

113138
public int getPort() {
114139
return port;
115140
}
116141

117-
public String getId() {
118-
return id;
142+
public String getExternalId() {
143+
return externalId;
144+
}
145+
146+
public String getComputeName() {
147+
return computeName;
119148
}
120149

121150
@Override
@@ -125,9 +154,11 @@ public boolean equals(Object other) {
125154
}
126155
if (other instanceof AMRecord otherRecord) {
127156
return appId.equals(otherRecord.appId)
128-
&& host.equals(otherRecord.host)
157+
&& hostName.equals(otherRecord.hostName)
158+
&& hostIp.equals(otherRecord.hostIp)
129159
&& port == otherRecord.port
130-
&& id.equals(otherRecord.id);
160+
&& externalId.equals(otherRecord.externalId)
161+
&& computeName.equals(otherRecord.computeName);
131162
} else {
132163
return false;
133164
}
@@ -148,16 +179,27 @@ public boolean equals(Object other) {
148179
* @return a {@link ServiceRecord} populated with the values of this {@code AMRecord}
149180
*/
150181
public ServiceRecord toServiceRecord() {
151-
ServiceRecord serviceRecord = new ServiceRecord();
182+
if (serviceRecord != null) {
183+
return serviceRecord;
184+
}
185+
serviceRecord = new ServiceRecord();
152186
serviceRecord.set(APP_ID_RECORD_KEY, appId);
153-
serviceRecord.set(HOST_RECORD_KEY, host);
187+
serviceRecord.set(HOST_NAME_RECORD_KEY, hostName);
188+
serviceRecord.set(HOST_IP_RECORD_KEY, hostIp);
154189
serviceRecord.set(PORT_RECORD_KEY, port);
155-
serviceRecord.set(OPAQUE_ID_KEY, id);
190+
serviceRecord.set(EXTERNAL_ID_KEY, externalId);
191+
serviceRecord.set(COMPUTE_GROUP_NAME_KEY, computeName);
192+
156193
return serviceRecord;
157194
}
158195

196+
@Override
197+
public String toString() {
198+
return toServiceRecord().attributes().toString();
199+
}
200+
159201
@Override
160202
public int hashCode() {
161-
return Objects.hash(appId, host, port, id);
203+
return Objects.hash(appId, hostName, hostIp, externalId, computeName, port);
162204
}
163205
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.tez.client.registry;
20+
21+
22+
import org.apache.hadoop.yarn.api.records.ApplicationId;
23+
24+
25+
/**
26+
* Base interface for {@code AMRegistry} implementations.
27+
*
28+
* <p>The specific implementation is configured via the
29+
* {@code tez.am.registry.class} property.</p>
30+
*
31+
* <p>Implementations are expected to provide appropriate service lifecycle
32+
* behavior, including:
33+
* <ul>
34+
* <li>{@code init}</li>
35+
* <li>{@code serviceStart}</li>
36+
* <li>{@code serviceStop}</li>
37+
* </ul>
38+
* </p>
39+
*/
40+
public interface AMRegistry extends AutoCloseable {
41+
42+
void add(AMRecord record) throws Exception;
43+
44+
void remove(AMRecord record) throws Exception;
45+
46+
ApplicationId generateNewId() throws Exception;
47+
48+
AMRecord createAmRecord(ApplicationId appId, String hostName, String hostIp, int port,
49+
String computeName);
50+
}

0 commit comments

Comments
 (0)