Skip to content

Commit eec88ab

Browse files
committed
TEZ-4007: Zookeeper based FrameworkServices and AmExtensions (3/3) - checkstyle, spotbugs, javadoc improvements, refactor, test fixes
1 parent 21442f1 commit eec88ab

File tree

69 files changed

+1416
-934
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

69 files changed

+1416
-934
lines changed

pom.xml

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@
6969
<commons-io.version>2.16.0</commons-io.version>
7070
<commons-lang.version>2.6</commons-lang.version>
7171
<clover.license>${user.home}/clover.license</clover.license>
72-
<curator.version>2.7.1</curator.version>
72+
<curator.version>5.9.0</curator.version>
7373
<dependency-check-maven.version>3.2.0</dependency-check-maven.version>
7474
<dependency-maven-plugin.version>3.8.1</dependency-maven-plugin.version>
7575
<spotbugs.version>4.9.3</spotbugs.version>
@@ -749,6 +749,18 @@
749749
<type>test-jar</type>
750750
<scope>test</scope>
751751
</dependency>
752+
<dependency>
753+
<groupId>org.apache.curator</groupId>
754+
<artifactId>curator-test</artifactId>
755+
<version>${curator.version}</version>
756+
<scope>test</scope>
757+
<exclusions>
758+
<exclusion>
759+
<groupId>org.junit.jupiter</groupId>
760+
<artifactId>junit-jupiter-api</artifactId>
761+
</exclusion>
762+
</exclusions>
763+
</dependency>
752764
<dependency>
753765
<groupId>org.mockito</groupId>
754766
<artifactId>mockito-core</artifactId>

tez-api/findbugs-exclude.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,4 +151,11 @@
151151
<Method name="getApplicationId" />
152152
<Bug pattern="EI_EXPOSE_REP" />
153153
</Match>
154+
155+
<!-- TEZ-4007 -->
156+
<Match>
157+
<Class name="org.apache.tez.client.registry.zookeeper.ZkFrameworkClient" />
158+
<Field name="amPort" />
159+
<Bug pattern="AT_STALE_THREAD_WRITE_OF_PRIMITIVE" />
160+
</Match>
154161
</FindBugsFilter>

tez-api/pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,6 @@
126126
<dependency>
127127
<groupId>org.apache.curator</groupId>
128128
<artifactId>curator-test</artifactId>
129-
<version>${curator.version}</version>
130129
<scope>test</scope>
131130
</dependency>
132131
</dependencies>

tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,13 @@
1919
package org.apache.tez.client;
2020

2121
import java.io.IOException;
22-
import java.util.Optional;
2322

2423
import org.apache.hadoop.classification.InterfaceAudience.Private;
2524
import org.apache.hadoop.conf.Configuration;
2625
import org.apache.hadoop.security.UserGroupInformation;
2726
import org.apache.hadoop.yarn.api.records.ApplicationId;
2827
import org.apache.hadoop.yarn.api.records.ApplicationReport;
2928
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
30-
import org.apache.hadoop.yarn.client.api.YarnClient;
3129
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
3230
import org.apache.hadoop.yarn.exceptions.YarnException;
3331
import org.apache.tez.common.RPCUtil;
@@ -49,6 +47,7 @@
4947
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGResponseProto;
5048
import org.apache.tez.frameworkplugins.ClientFrameworkService;
5149
import org.apache.tez.frameworkplugins.FrameworkUtils;
50+
import org.apache.tez.frameworkplugins.yarn.YarnClientFrameworkService;
5251

5352
import com.google.protobuf.ServiceException;
5453

@@ -60,21 +59,18 @@ public abstract class FrameworkClient {
6059
protected static final Logger LOG = LoggerFactory.getLogger(FrameworkClient.class);
6160

6261
public static FrameworkClient createFrameworkClient(TezConfiguration tezConf) {
63-
Optional<FrameworkClient> pluginClient =
64-
FrameworkUtils.get(ClientFrameworkService.class, tezConf)
65-
.flatMap(framework -> framework.createOrGetFrameworkClient(tezConf));
66-
6762
boolean isLocal = tezConf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE, TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
6863
if (isLocal) {
6964
try {
7065
return ReflectionUtils.createClazzInstance("org.apache.tez.client.LocalClient");
7166
} catch (TezReflectionException e) {
7267
throw new TezUncheckedException("Fail to create LocalClient", e);
7368
}
74-
} else if (pluginClient.isPresent()) {
75-
return pluginClient.get();
69+
} else {
70+
ClientFrameworkService clientFrameworkService = FrameworkUtils.get(ClientFrameworkService.class, tezConf);
71+
return clientFrameworkService == null ? new YarnClientFrameworkService().newFrameworkClient()
72+
: clientFrameworkService.newFrameworkClient();
7673
}
77-
return new TezYarnClient(YarnClient.createYarnClient());
7874
}
7975

8076
/**

tez-api/src/main/java/org/apache/tez/client/TezYarnClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public class TezYarnClient extends FrameworkClient {
4343
private String amHost;
4444
private int amPort;
4545

46-
protected TezYarnClient(YarnClient yarnClient) {
46+
public TezYarnClient(YarnClient yarnClient) {
4747
this.yarnClient = yarnClient;
4848
}
4949

tez-api/src/main/java/org/apache/tez/client/registry/AMRecord.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,11 @@ public ServiceRecord toServiceRecord() {
156156
return serviceRecord;
157157
}
158158

159+
@Override
160+
public String toString() {
161+
return toServiceRecord().attributes().toString();
162+
}
163+
159164
@Override
160165
public int hashCode() {
161166
return Objects.hash(appId, host, port, externalId);

tez-api/src/main/java/org/apache/tez/client/registry/AMRegistryClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import java.util.List;
2525

2626
/**
27-
* Interface for client-side AM discovery
27+
* Interface for client-side AM discovery.
2828
*/
2929
public abstract class AMRegistryClient implements Closeable {
3030

Lines changed: 101 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,20 @@
11
/**
2-
* Licensed to the Apache Software Foundation (ASF) under one
3-
* or more contributor license agreements. See the NOTICE file
4-
* distributed with this work for additional information
5-
* regarding copyright ownership. The ASF licenses this file
6-
* to you under the Apache License, Version 2.0 (the
7-
* "License"); you may not use this file except in compliance
8-
* with the License. You may obtain a copy of the License at
9-
*
10-
* http://www.apache.org/licenses/LICENSE-2.0
11-
*
12-
* Unless required by applicable law or agreed to in writing, software
13-
* distributed under the License is distributed on an "AS IS" BASIS,
14-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15-
* See the License for the specific language governing permissions and
16-
* limitations under the License.
17-
*/
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
1818

1919
package org.apache.tez.client.registry.zookeeper;
2020

@@ -31,13 +31,13 @@
3131
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
3232
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
3333
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
34+
import org.apache.curator.shaded.com.google.common.base.Charsets;
3435
import org.apache.hadoop.classification.InterfaceAudience;
3536
import org.apache.hadoop.conf.Configuration;
3637
import org.apache.hadoop.registry.client.binding.RegistryUtils;
3738
import org.apache.hadoop.registry.client.types.ServiceRecord;
3839
import org.apache.tez.client.registry.AMRecord;
3940
import org.apache.tez.client.registry.AMRegistryClient;
40-
import org.apache.tez.client.registry.AMRegistryClientListener;
4141
import org.apache.tez.dag.api.TezConfiguration;
4242

4343
import com.google.common.base.Preconditions;
@@ -46,20 +46,20 @@
4646
import org.slf4j.LoggerFactory;
4747

4848
/**
49-
* Curator/Zookeeper impl of AMRegistryClient
50-
*/
49+
* Curator/Zookeeper implementation of {@link AMRegistryClient}.
50+
*/
5151
@InterfaceAudience.Public
5252
public class ZkAMRegistryClient extends AMRegistryClient {
5353
private static final Logger LOG = LoggerFactory.getLogger(ZkAMRegistryClient.class);
54-
54+
private static final Map<String, ZkAMRegistryClient> INSTANCES = new HashMap<>();
5555
private final Configuration conf;
56-
5756
//Cache of known AMs
58-
private ConcurrentHashMap<String, AMRecord> amRecordCache = new ConcurrentHashMap<>();
57+
private final ConcurrentHashMap<String, AMRecord> amRecordCache = new ConcurrentHashMap<>();
5958
private CuratorFramework client;
60-
private PathChildrenCache cache;
6159

62-
private static Map<String, ZkAMRegistryClient> INSTANCES = new HashMap<>();
60+
private ZkAMRegistryClient(final Configuration conf) {
61+
this.conf = conf;
62+
}
6363

6464
public static synchronized ZkAMRegistryClient getClient(final Configuration conf) {
6565
String namespace = conf.get(TezConfiguration.TEZ_AM_REGISTRY_NAMESPACE);
@@ -72,14 +72,31 @@ public static synchronized ZkAMRegistryClient getClient(final Configuration conf
7272
return registry;
7373
}
7474

75-
private ZkAMRegistryClient(final Configuration conf) {
76-
this.conf = conf;
75+
/**
76+
* Deserializes a {@link ServiceRecord} from ZooKeeper data and converts it into an {@link AMRecord}
77+
* for caching.
78+
*
79+
* @param childData the ZooKeeper node data containing a serialized {@link ServiceRecord}
80+
* @return an {@link AMRecord} constructed from the deserialized {@link ServiceRecord}, or {@code null}
81+
* if no data is present
82+
* @throws IOException if the data cannot be deserialized into a {@link ServiceRecord}
83+
*/
84+
public static AMRecord getAMRecord(final ChildData childData) throws IOException {
85+
byte[] data = childData.getData();
86+
// only the path appeared, there is no data yet
87+
if (data.length == 0) {
88+
return null;
89+
}
90+
String value = new String(data, Charsets.UTF_8);
91+
RegistryUtils.ServiceRecordMarshal marshal = new RegistryUtils.ServiceRecordMarshal();
92+
ServiceRecord serviceRecord = marshal.fromJson(value);
93+
return new AMRecord(serviceRecord);
7794
}
7895

7996
public void start() throws Exception {
8097
ZkConfig zkConf = new ZkConfig(this.conf);
8198
client = zkConf.createCuratorFramework();
82-
cache = new PathChildrenCache(client, zkConf.getZkNamespace(), true);
99+
PathChildrenCache cache = new PathChildrenCache(client, zkConf.getZkNamespace(), true);
83100
client.start();
84101
cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
85102
for (ChildData childData : cache.getCurrentData()) {
@@ -91,98 +108,85 @@ public void start() throws Exception {
91108
cache.getListenable().addListener(new ZkRegistryListener());
92109
}
93110

94-
//Deserialize ServiceRecord from Zookeeper to populate AMRecord in cache
95-
public static AMRecord getAMRecord(final ChildData childData) throws IOException {
96-
byte[] data = childData.getData();
97-
// only the path appeared, there is no data yet
98-
if (data.length == 0) {
99-
return null;
100-
}
101-
String value = new String(data);
102-
RegistryUtils.ServiceRecordMarshal marshal = new RegistryUtils.ServiceRecordMarshal();
103-
ServiceRecord serviceRecord = marshal.fromJson(value);
104-
return new AMRecord(serviceRecord);
105-
}
106-
107-
@Override public AMRecord getRecord(String appId) {
111+
@Override
112+
public AMRecord getRecord(String appId) {
108113
if (amRecordCache.get(appId) == null) {
109114
return null;
110115
}
111116
//Return a copy
112117
return new AMRecord(amRecordCache.get(appId));
113118
}
114119

115-
@Override public List<AMRecord> getAllRecords() {
116-
return amRecordCache.values().stream()
117-
.map(record -> new AMRecord(record)).collect(Collectors.toList());
118-
}
120+
@Override
121+
public List<AMRecord> getAllRecords() {
122+
return amRecordCache.values().stream().map(AMRecord::new).collect(Collectors.toList());
123+
}
119124

120-
@Override public synchronized void addListener(AMRegistryClientListener listener) {
121-
listeners.add(listener);
125+
@Override
126+
public void close() {
127+
client.close();
122128
}
123129

124-
//Callback for Zookeeper to update local cache
130+
/**
131+
* Callback listener for ZooKeeper events that updates the local cache
132+
* when child nodes under the monitored path change.
133+
*/
125134
private class ZkRegistryListener implements PathChildrenCacheListener {
126135

127-
@Override public void childEvent(final CuratorFramework client, final PathChildrenCacheEvent event)
128-
throws Exception {
129-
Preconditions.checkArgument(client != null && client.getState() == CuratorFrameworkState.STARTED,
136+
@Override
137+
public void childEvent(final CuratorFramework clientParam, final PathChildrenCacheEvent event) throws Exception {
138+
Preconditions.checkArgument(clientParam != null && clientParam.getState() == CuratorFrameworkState.STARTED,
130139
"Curator client is not started");
131140

132-
ChildData childData = event.getData();
133-
switch (event.getType()) {
134-
case CHILD_ADDED:
135-
if(isEmpty(childData)) {
136-
LOG.info("AppId allocated: {}", childData.getPath());
137-
} else {
138-
AMRecord amRecord = getAMRecord(childData);
139-
if (amRecord != null) {
140-
LOG.info("AM registered with data: {}. Notifying {} listeners.", amRecord, listeners.size());
141-
amRecordCache.put(amRecord.getApplicationId().toString(), amRecord);
142-
notifyOnAdded(amRecord);
141+
ChildData childData = event.getData();
142+
switch (event.getType()) {
143+
case CHILD_ADDED:
144+
if (isEmpty(childData)) {
145+
LOG.info("AppId allocated: {}", childData.getPath());
146+
} else {
147+
AMRecord amRecord = getAMRecord(childData);
148+
if (amRecord != null) {
149+
LOG.info("AM registered with data: {}. Notifying {} listeners.", amRecord, listeners.size());
150+
amRecordCache.put(amRecord.getApplicationId().toString(), amRecord);
151+
notifyOnAdded(amRecord);
152+
}
153+
}
154+
break;
155+
case CHILD_UPDATED:
156+
if (isEmpty(childData)) {
157+
throw new RuntimeException("AM updated with empty data");
158+
} else {
159+
AMRecord amRecord = getAMRecord(childData);
160+
if (amRecord != null) {
161+
LOG.info("AM updated data: {}. Notifying {} listeners.", amRecord, listeners.size());
162+
amRecordCache.put(amRecord.getApplicationId().toString(), amRecord);
163+
notifyOnAdded(amRecord);
164+
}
143165
}
144-
}
145-
break;
146-
case CHILD_UPDATED:
147-
if(isEmpty(childData)) {
148-
throw new RuntimeException("AM updated with empty data");
149-
} else {
150-
AMRecord amRecord = getAMRecord(childData);
151-
if (amRecord != null) {
152-
LOG.info("AM updated data: {}. Notifying {} listeners.", amRecord, listeners.size());
153-
amRecordCache.put(amRecord.getApplicationId().toString(), amRecord);
154-
notifyOnAdded(amRecord);
166+
break;
167+
case CHILD_REMOVED:
168+
if (isEmpty(childData)) {
169+
LOG.info("Unused AppId unregistered: {}", childData.getPath());
170+
} else {
171+
AMRecord amRecord = getAMRecord(childData);
172+
if (amRecord != null) {
173+
LOG.info("AM removed: {}. Notifying {} listeners.", amRecord, listeners.size());
174+
amRecordCache.remove(amRecord.getApplicationId().toString(), amRecord);
175+
notifyOnRemoved(amRecord);
176+
}
155177
}
156-
}
157-
break;
158-
case CHILD_REMOVED:
159-
if(isEmpty(childData)) {
160-
LOG.info("Unused AppId unregistered: {}", childData.getPath());
161-
} else {
162-
AMRecord amRecord = getAMRecord(childData);
163-
if (amRecord != null) {
164-
LOG.info("AM removed: {}. Notifying {} listeners.", amRecord, listeners.size());
165-
amRecordCache.remove(amRecord.getApplicationId().toString(), amRecord);
166-
notifyOnRemoved(amRecord);
178+
break;
179+
default:
180+
if (childData == null) {
181+
LOG.info("Ignored event {}", event.getType());
182+
} else {
183+
LOG.info("Ignored event {} for {}", event.getType(), childData.getPath());
167184
}
168-
}
169-
break;
170-
default:
171-
if(childData == null) {
172-
LOG.info("Ignored event {}", event.getType());
173-
} else {
174-
LOG.info("Ignored event {} for {}", event.getType(), childData.getPath());
175-
}
176185
}
177186
}
178187

179188
private boolean isEmpty(ChildData childData) {
180189
return childData == null || childData.getData() == null || childData.getData().length == 0;
181190
}
182191
}
183-
184-
@Override
185-
public void close() {
186-
client.close();
187-
}
188192
}

0 commit comments

Comments
 (0)