Skip to content

Commit 4a28907

Browse files
authored
TEZ-4008: Pluggable AM FrameworkServices and AmExtensions (2/3) (#426) (Laszlo Bodor co-authored by Eric Wohlstadter reviewed by Ayush Saxena)
1 parent 6683866 commit 4a28907

File tree

15 files changed

+474
-8
lines changed

15 files changed

+474
-8
lines changed

tez-api/findbugs-exclude.xml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,4 +131,24 @@
131131
<Bug pattern="IS2_INCONSISTENT_SYNC" />
132132
</Match>
133133

134+
<!-- TEZ-4008 -->
135+
<Match>
136+
<Class name="org.apache.tez.client.registry.AMRecord" />
137+
<Method name="&lt;init&gt;"
138+
params="org.apache.hadoop.yarn.api.records.ApplicationId, java.lang.String, int, java.lang.String"
139+
returns="void"/>
140+
<Bug pattern="EI_EXPOSE_REP2" />
141+
</Match>
142+
143+
<Match>
144+
<Class name="org.apache.tez.client.registry.AMRecord" />
145+
<Method name="&lt;init&gt;" params="org.apache.hadoop.registry.client.types.ServiceRecord"/>
146+
<Bug pattern="CT_CONSTRUCTOR_THROW" />
147+
</Match>
148+
149+
<Match>
150+
<Class name="org.apache.tez.client.registry.AMRecord" />
151+
<Method name="getApplicationId" />
152+
<Bug pattern="EI_EXPOSE_REP" />
153+
</Match>
134154
</FindBugsFilter>

tez-api/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@
7373
<groupId>org.apache.hadoop</groupId>
7474
<artifactId>hadoop-yarn-client</artifactId>
7575
</dependency>
76+
<dependency>
77+
<groupId>org.apache.hadoop</groupId>
78+
<artifactId>hadoop-registry</artifactId>
79+
<version>${hadoop.version}</version>
80+
</dependency>
7681
<dependency>
7782
<groupId>org.apache.commons</groupId>
7883
<artifactId>commons-collections4</artifactId>
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.tez.client.registry;
20+
21+
import java.util.Objects;
22+
23+
import org.apache.hadoop.classification.InterfaceAudience;
24+
import org.apache.hadoop.registry.client.types.ServiceRecord;
25+
import org.apache.hadoop.yarn.api.records.ApplicationId;
26+
27+
28+
/**
29+
* Record representing an Application Master (AM) instance within Tez.
30+
* <p>
31+
* This class can be serialized to and from a {@link ServiceRecord}, enabling
32+
* storage and retrieval of AM metadata in external systems. Some constructors
33+
* and methods are not necessarily used within the Tez codebase itself, but
34+
* are part of the Tez API and intended for Tez clients that manage or interact
35+
* with Tez unmanaged sessions.
36+
*/
37+
@InterfaceAudience.Public
38+
public class AMRecord {
39+
private static final String APP_ID_RECORD_KEY = "appId";
40+
private static final String HOST_RECORD_KEY = "host";
41+
private static final String PORT_RECORD_KEY = "port";
42+
private static final String OPAQUE_ID_KEY = "id";
43+
44+
private final ApplicationId appId;
45+
private final String host;
46+
private final int port;
47+
private final String id;
48+
49+
/**
50+
* Creates a new {@code AMRecord} with the given application ID, host, port, and identifier.
51+
* <p>
52+
* If the provided identifier is {@code null}, it will be converted to an empty string.
53+
* <p>
54+
* Although this constructor may not be used directly within Tez internals,
55+
* it is part of the public API for Tez clients that handle unmanaged sessions.
56+
*
57+
* @param appId the {@link ApplicationId} of the Tez application
58+
* @param host the hostname where the Application Master is running
59+
* @param port the port number on which the Application Master is listening
60+
* @param id an opaque identifier for the record; if {@code null}, defaults to an empty string
61+
*/
62+
public AMRecord(ApplicationId appId, String host, int port, String id) {
63+
this.appId = appId;
64+
this.host = host;
65+
this.port = port;
66+
//If id is not provided, convert to empty string
67+
this.id = (id == null) ? "" : id;
68+
}
69+
70+
/**
71+
* Copy constructor.
72+
* <p>
73+
* Creates a new {@code AMRecord} by copying the fields of another instance.
74+
* <p>
75+
* This constructor is mainly useful for client-side logic and session handling,
76+
* and may not be invoked directly within the Tez codebase.
77+
*
78+
* @param other the {@code AMRecord} instance to copy
79+
*/
80+
public AMRecord(AMRecord other) {
81+
this.appId = other.getApplicationId();
82+
this.host = other.getHost();
83+
this.port = other.getPort();
84+
this.id = other.getId();
85+
}
86+
87+
/**
88+
* Constructs a new {@code AMRecord} from a {@link ServiceRecord}.
89+
* <p>
90+
* This allows conversion from serialized metadata back into an in-memory {@code AMRecord}.
91+
* <p>
92+
* While not always used in Tez internals, it exists in the Tez API so
93+
* clients can reconstruct AM information when working with unmanaged sessions.
94+
*
95+
* @param serviceRecord the {@link ServiceRecord} containing AM metadata
96+
* @throws IllegalArgumentException if required keys are missing or invalid
97+
*/
98+
public AMRecord(ServiceRecord serviceRecord) {
99+
this.appId = ApplicationId.fromString(serviceRecord.get(APP_ID_RECORD_KEY));
100+
this.host = serviceRecord.get(HOST_RECORD_KEY);
101+
this.port = Integer.parseInt(serviceRecord.get(PORT_RECORD_KEY));
102+
this.id = serviceRecord.get(OPAQUE_ID_KEY);
103+
}
104+
105+
public ApplicationId getApplicationId() {
106+
return appId;
107+
}
108+
109+
public String getHost() {
110+
return host;
111+
}
112+
113+
public int getPort() {
114+
return port;
115+
}
116+
117+
public String getId() {
118+
return id;
119+
}
120+
121+
@Override
122+
public boolean equals(Object other) {
123+
if (this == other) {
124+
return true;
125+
}
126+
if (other instanceof AMRecord otherRecord) {
127+
return appId.equals(otherRecord.appId)
128+
&& host.equals(otherRecord.host)
129+
&& port == otherRecord.port
130+
&& id.equals(otherRecord.id);
131+
} else {
132+
return false;
133+
}
134+
}
135+
136+
/**
137+
* Converts this {@code AMRecord} into a {@link ServiceRecord}.
138+
* <p>
139+
* The returned {@link ServiceRecord} contains the Application Master metadata
140+
* (application ID, host, port, and opaque identifier) so that it can be stored
141+
* in an external registry or retrieved later.
142+
* <p>
143+
* While this method may not be directly used within Tez internals,
144+
* it is part of the Tez public API and is intended for Tez clients
145+
* that interact with unmanaged sessions or otherwise need to
146+
* persist/reconstruct Application Master information.
147+
*
148+
* @return a {@link ServiceRecord} populated with the values of this {@code AMRecord}
149+
*/
150+
public ServiceRecord toServiceRecord() {
151+
ServiceRecord serviceRecord = new ServiceRecord();
152+
serviceRecord.set(APP_ID_RECORD_KEY, appId);
153+
serviceRecord.set(HOST_RECORD_KEY, host);
154+
serviceRecord.set(PORT_RECORD_KEY, port);
155+
serviceRecord.set(OPAQUE_ID_KEY, id);
156+
return serviceRecord;
157+
}
158+
159+
@Override
160+
public int hashCode() {
161+
return Objects.hash(appId, host, port, id);
162+
}
163+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
@Public
20+
@Evolving
21+
package org.apache.tez.client.registry;
22+
23+
import org.apache.hadoop.classification.InterfaceAudience.Public;
24+
import org.apache.hadoop.classification.InterfaceStability.Evolving;

tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2349,4 +2349,11 @@ static Set<String> getPropertySet() {
23492349
@ConfigurationScope(Scope.AM)
23502350
@ConfigurationProperty
23512351
public static final String TEZ_AM_STANDALONE_CONFS = TEZ_AM_PREFIX + "standalone.confs";
2352+
2353+
/**
2354+
* String value. The class to be used for the AM registry.
2355+
*/
2356+
@ConfigurationScope(Scope.AM)
2357+
@ConfigurationProperty
2358+
public static final String TEZ_AM_REGISTRY_CLASS = TEZ_AM_PREFIX + "registry.class";
23522359
}

tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,9 @@ public final class TezConstants {
102102
/// Version-related Environment variables
103103
public static final String TEZ_CLIENT_VERSION_ENV = "TEZ_CLIENT_VERSION";
104104

105+
//Arbitrary opaque ID to identify AM instances from AMRegistryClient
106+
public static final String TEZ_AM_UUID = "TEZ_AM_UUID";
107+
105108
private static final String TEZ_AM_SERVICE_PLUGIN_NAME_YARN_CONTAINERS = "TezYarn";
106109
private static final String TEZ_AM_SERVICE_PLUGIN_NAME_IN_AM = "TezUber";
107110

tez-dag/src/main/java/org/apache/tez/client/LocalClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,7 @@ protected DAGAppMaster createDAGAppMaster(ApplicationAttemptId applicationAttemp
415415
versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto)
416416
: new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
417417
SystemClock.getInstance(), appSubmitTime, isSession, userDir, localDirs, logDirs,
418-
versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto);
418+
versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto, null);
419419
}
420420

421421
@Override
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.tez.dag.api.client.registry;
20+
21+
import org.apache.hadoop.service.AbstractService;
22+
import org.apache.tez.client.registry.AMRecord;
23+
24+
/**
25+
* Base class for AMRegistry implementations.
26+
* The specific implementation class is configured by `tez.am.registry.class`.
27+
*
28+
* Implementations should handle the relevant service lifecycle operations:
29+
* `init`, `serviceStart`, `serviceStop`, etc.
30+
* - `init` and `serviceStart` are invoked during `DAGAppMaster.serviceInit`.
31+
* - `serviceStop` is invoked on `DAGAppMaster` shutdown.
32+
*/
33+
public abstract class AMRegistry extends AbstractService {
34+
35+
/* Implementations should provide a public no-arg constructor. */
36+
protected AMRegistry(String name) {
37+
super(name);
38+
}
39+
40+
/* Under typical usage, add() will be called once automatically with an AMRecord
41+
for the DAGClientServer that services an AM. */
42+
public abstract void add(AMRecord server) throws Exception;
43+
44+
/* Under typical usage, implementations should remove any stale AMRecords upon serviceStop. */
45+
public abstract void remove(AMRecord server) throws Exception;
46+
47+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
@Public
20+
@Evolving
21+
package org.apache.tez.dag.api.client.registry;
22+
23+
import org.apache.hadoop.classification.InterfaceAudience.Public;
24+
import org.apache.hadoop.classification.InterfaceStability.Evolving;

0 commit comments

Comments
 (0)