Skip to content

Commit fbe40c8

Browse files
committed
TEZ-4007: Zookeeper based FrameworkServices and AmExtensions (3/3) - checkstyle, spotbugs, javadoc improvements, refactor
1 parent 5fbcda9 commit fbe40c8

File tree

35 files changed

+605
-342
lines changed

35 files changed

+605
-342
lines changed

tez-api/src/main/java/org/apache/tez/client/registry/zookeeper/ZkAMRegistryClient.java

Lines changed: 82 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,20 @@
11
/**
2-
* Licensed to the Apache Software Foundation (ASF) under one
3-
* or more contributor license agreements. See the NOTICE file
4-
* distributed with this work for additional information
5-
* regarding copyright ownership. The ASF licenses this file
6-
* to you under the Apache License, Version 2.0 (the
7-
* "License"); you may not use this file except in compliance
8-
* with the License. You may obtain a copy of the License at
9-
*
10-
* http://www.apache.org/licenses/LICENSE-2.0
11-
*
12-
* Unless required by applicable law or agreed to in writing, software
13-
* distributed under the License is distributed on an "AS IS" BASIS,
14-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15-
* See the License for the specific language governing permissions and
16-
* limitations under the License.
17-
*/
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
1818

1919
package org.apache.tez.client.registry.zookeeper;
2020

@@ -37,7 +37,6 @@
3737
import org.apache.hadoop.registry.client.types.ServiceRecord;
3838
import org.apache.tez.client.registry.AMRecord;
3939
import org.apache.tez.client.registry.AMRegistryClient;
40-
import org.apache.tez.client.registry.AMRegistryClientListener;
4140
import org.apache.tez.dag.api.TezConfiguration;
4241

4342
import com.google.common.base.Preconditions;
@@ -46,8 +45,8 @@
4645
import org.slf4j.LoggerFactory;
4746

4847
/**
49-
* Curator/Zookeeper impl of AMRegistryClient
50-
*/
48+
* Curator/Zookeeper implementation of {@link AMRegistryClient}.
49+
*/
5150
@InterfaceAudience.Public
5251
public class ZkAMRegistryClient extends AMRegistryClient {
5352
private static final Logger LOG = LoggerFactory.getLogger(ZkAMRegistryClient.class);
@@ -57,7 +56,6 @@ public class ZkAMRegistryClient extends AMRegistryClient {
5756
//Cache of known AMs
5857
private ConcurrentHashMap<String, AMRecord> amRecordCache = new ConcurrentHashMap<>();
5958
private CuratorFramework client;
60-
private PathChildrenCache cache;
6159

6260
private static Map<String, ZkAMRegistryClient> INSTANCES = new HashMap<>();
6361

@@ -79,7 +77,7 @@ private ZkAMRegistryClient(final Configuration conf) {
7977
public void start() throws Exception {
8078
ZkConfig zkConf = new ZkConfig(this.conf);
8179
client = zkConf.createCuratorFramework();
82-
cache = new PathChildrenCache(client, zkConf.getZkNamespace(), true);
80+
PathChildrenCache cache = new PathChildrenCache(client, zkConf.getZkNamespace(), true);
8381
client.start();
8482
cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
8583
for (ChildData childData : cache.getCurrentData()) {
@@ -91,7 +89,15 @@ public void start() throws Exception {
9189
cache.getListenable().addListener(new ZkRegistryListener());
9290
}
9391

94-
//Deserialize ServiceRecord from Zookeeper to populate AMRecord in cache
92+
/**
93+
* Deserializes a {@link ServiceRecord} from ZooKeeper data and converts it into an {@link AMRecord}
94+
* for caching.
95+
*
96+
* @param childData the ZooKeeper node data containing a serialized {@link ServiceRecord}
97+
* @return an {@link AMRecord} constructed from the deserialized {@link ServiceRecord}, or {@code null}
98+
* if no data is present
99+
* @throws IOException if the data cannot be deserialized into a {@link ServiceRecord}
100+
*/
95101
public static AMRecord getAMRecord(final ChildData childData) throws IOException {
96102
byte[] data = childData.getData();
97103
// only the path appeared, there is no data yet
@@ -104,75 +110,76 @@ public static AMRecord getAMRecord(final ChildData childData) throws IOException
104110
return new AMRecord(serviceRecord);
105111
}
106112

107-
@Override public AMRecord getRecord(String appId) {
113+
@Override
114+
public AMRecord getRecord(String appId) {
108115
if (amRecordCache.get(appId) == null) {
109116
return null;
110117
}
111118
//Return a copy
112119
return new AMRecord(amRecordCache.get(appId));
113120
}
114121

115-
@Override public List<AMRecord> getAllRecords() {
116-
return amRecordCache.values().stream()
117-
.map(record -> new AMRecord(record)).collect(Collectors.toList());
118-
}
119-
120-
@Override public synchronized void addListener(AMRegistryClientListener listener) {
121-
listeners.add(listener);
122+
@Override
123+
public List<AMRecord> getAllRecords() {
124+
return amRecordCache.values().stream().map(AMRecord::new).collect(Collectors.toList());
122125
}
123126

124-
//Callback for Zookeeper to update local cache
127+
/**
128+
* Callback listener for ZooKeeper events that updates the local cache
129+
* when child nodes under the monitored path change.
130+
*/
125131
private class ZkRegistryListener implements PathChildrenCacheListener {
126132

127-
@Override public void childEvent(final CuratorFramework client, final PathChildrenCacheEvent event)
128-
throws Exception {
129-
Preconditions.checkArgument(client != null && client.getState() == CuratorFrameworkState.STARTED,
133+
@Override
134+
public void childEvent(final CuratorFramework clientParam, final PathChildrenCacheEvent event)
135+
throws Exception {
136+
Preconditions.checkArgument(clientParam != null && clientParam.getState() == CuratorFrameworkState.STARTED,
130137
"Curator client is not started");
131138

132-
ChildData childData = event.getData();
133-
switch (event.getType()) {
134-
case CHILD_ADDED:
135-
if(isEmpty(childData)) {
136-
LOG.info("AppId allocated: {}", childData.getPath());
137-
} else {
138-
AMRecord amRecord = getAMRecord(childData);
139-
if (amRecord != null) {
140-
LOG.info("AM registered with data: {}. Notifying {} listeners.", amRecord, listeners.size());
141-
amRecordCache.put(amRecord.getApplicationId().toString(), amRecord);
142-
notifyOnAdded(amRecord);
139+
ChildData childData = event.getData();
140+
switch (event.getType()) {
141+
case CHILD_ADDED:
142+
if (isEmpty(childData)) {
143+
LOG.info("AppId allocated: {}", childData.getPath());
144+
} else {
145+
AMRecord amRecord = getAMRecord(childData);
146+
if (amRecord != null) {
147+
LOG.info("AM registered with data: {}. Notifying {} listeners.", amRecord, listeners.size());
148+
amRecordCache.put(amRecord.getApplicationId().toString(), amRecord);
149+
notifyOnAdded(amRecord);
150+
}
151+
}
152+
break;
153+
case CHILD_UPDATED:
154+
if (isEmpty(childData)) {
155+
throw new RuntimeException("AM updated with empty data");
156+
} else {
157+
AMRecord amRecord = getAMRecord(childData);
158+
if (amRecord != null) {
159+
LOG.info("AM updated data: {}. Notifying {} listeners.", amRecord, listeners.size());
160+
amRecordCache.put(amRecord.getApplicationId().toString(), amRecord);
161+
notifyOnAdded(amRecord);
162+
}
143163
}
144-
}
145-
break;
146-
case CHILD_UPDATED:
147-
if(isEmpty(childData)) {
148-
throw new RuntimeException("AM updated with empty data");
149-
} else {
150-
AMRecord amRecord = getAMRecord(childData);
151-
if (amRecord != null) {
152-
LOG.info("AM updated data: {}. Notifying {} listeners.", amRecord, listeners.size());
153-
amRecordCache.put(amRecord.getApplicationId().toString(), amRecord);
154-
notifyOnAdded(amRecord);
164+
break;
165+
case CHILD_REMOVED:
166+
if (isEmpty(childData)) {
167+
LOG.info("Unused AppId unregistered: {}", childData.getPath());
168+
} else {
169+
AMRecord amRecord = getAMRecord(childData);
170+
if (amRecord != null) {
171+
LOG.info("AM removed: {}. Notifying {} listeners.", amRecord, listeners.size());
172+
amRecordCache.remove(amRecord.getApplicationId().toString(), amRecord);
173+
notifyOnRemoved(amRecord);
174+
}
155175
}
156-
}
157-
break;
158-
case CHILD_REMOVED:
159-
if(isEmpty(childData)) {
160-
LOG.info("Unused AppId unregistered: {}", childData.getPath());
161-
} else {
162-
AMRecord amRecord = getAMRecord(childData);
163-
if (amRecord != null) {
164-
LOG.info("AM removed: {}. Notifying {} listeners.", amRecord, listeners.size());
165-
amRecordCache.remove(amRecord.getApplicationId().toString(), amRecord);
166-
notifyOnRemoved(amRecord);
176+
break;
177+
default:
178+
if (childData == null) {
179+
LOG.info("Ignored event {}", event.getType());
180+
} else {
181+
LOG.info("Ignored event {} for {}", event.getType(), childData.getPath());
167182
}
168-
}
169-
break;
170-
default:
171-
if(childData == null) {
172-
LOG.info("Ignored event {}", event.getType());
173-
} else {
174-
LOG.info("Ignored event {} for {}", event.getType(), childData.getPath());
175-
}
176183
}
177184
}
178185

tez-api/src/main/java/org/apache/tez/client/registry/zookeeper/ZkFrameworkClient.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,13 @@
3737
public class ZkFrameworkClient extends FrameworkClient {
3838

3939
private AMRecord amRecord;
40-
private TezConfiguration tezConf;
4140
private ZkAMRegistryClient amRegistryClient = null;
4241
private boolean isRunning = false;
4342
private String amHost;
4443
private int amPort;
4544

4645
@Override
4746
public synchronized void init(TezConfiguration tezConf) {
48-
this.tezConf = tezConf;
4947
if (this.amRegistryClient == null) {
5048
try {
5149
this.amRegistryClient = ZkAMRegistryClient.getClient(tezConf);
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
@Public
20+
@Evolving
21+
package org.apache.tez.client.registry.zookeeper;
22+
23+
import org.apache.hadoop.classification.InterfaceAudience.Public;
24+
import org.apache.hadoop.classification.InterfaceStability.Evolving;

tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -869,8 +869,7 @@ public TezConfiguration(boolean loadDefaults) {
869869
/** Int value. Port used for AM RPC*/
870870
@ConfigurationScope(Scope.AM)
871871
@ConfigurationProperty(type="integer")
872-
public static final String TEZ_AM_RPC_PORT =
873-
TEZ_AM_PREFIX + "rpc.port";
872+
public static final String TEZ_AM_RPC_PORT = TEZ_AM_PREFIX + "rpc.port";
874873
public static final int TEZ_AM_RPC_PORT_DEFAULT = 0;
875874

876875
/** Int value. Number of threads to handle client RPC requests. Expert level setting.*/
@@ -2351,52 +2350,57 @@ static Set<String> getPropertySet() {
23512350
public static final String TEZ_TASK_ATTEMPT_HOOKS = TEZ_TASK_PREFIX + "attempt.hooks";
23522351

23532352
/**
2354-
* String value
2353+
* String value. ZooKeeper quorum connection string used when creating a CuratorFramework for the ZooKeeper registry.
23552354
*/
23562355
@ConfigurationScope(Scope.AM)
23572356
@ConfigurationProperty
23582357
public static final String TEZ_AM_ZOOKEEPER_QUORUM = TEZ_AM_PREFIX + "zookeeper.quorum";
23592358

23602359
/**
2361-
* String value
2360+
* String value. Namespace in ZooKeeper registry for the Application Master.
23622361
*/
23632362
@ConfigurationScope(Scope.AM)
23642363
@ConfigurationProperty
23652364
public static final String TEZ_AM_REGISTRY_NAMESPACE = TEZ_AM_PREFIX + "registry.namespace";
23662365
public static final String TEZ_AM_REGISTRY_NAMESPACE_DEFAULT = "/tez_am/server";
23672366

23682367
/**
2369-
* Integer value
2368+
* Integer value. Initial backoff sleep duration (in milliseconds) for Curator retries.
2369+
* It's used when creating a CuratorFramework for the ZooKeeper registry.
23702370
*/
23712371
@ConfigurationScope(Scope.AM)
23722372
@ConfigurationProperty
23732373
public static final String TEZ_AM_CURATOR_BACKOFF_SLEEP = TEZ_AM_PREFIX + "curator.backoff.sleep";
23742374
public static final int TEZ_AM_CURATOR_BACKOFF_SLEEP_DEFAULT = 1000;
23752375

23762376
/**
2377-
* Integer value
2377+
* Integer value. Maximum number of retries for Curator operations.
2378+
* It's used when creating a CuratorFramework for the ZooKeeper registry.
23782379
*/
23792380
@ConfigurationScope(Scope.AM)
23802381
@ConfigurationProperty
23812382
public static final String TEZ_AM_CURATOR_MAX_RETRIES = TEZ_AM_PREFIX + "curator.max.retries";
23822383
public static final int TEZ_AM_CURATOR_MAX_RETRIES_DEFAULT = 3;
23832384

23842385
/**
2385-
* Integer value (milliseconds)
2386+
* Integer value. Session timeout (in milliseconds) for Curator framework.
2387+
* It's used when creating a CuratorFramework for the ZooKeeper registry.
23862388
*/
23872389
@ConfigurationScope(Scope.AM)
23882390
@ConfigurationProperty
23892391
public static final String TEZ_AM_CURATOR_SESSION_TIMEOUT = TEZ_AM_PREFIX + "curator.session.timeout";
23902392
public static final int TEZ_AM_CURATOR_SESSION_TIMEOUT_DEFAULT = 60000;
23912393

23922394
/**
2393-
* Integer value (milliseconds)
2394-
*/
2395+
* Integer value. Connection timeout (in milliseconds) for Curator framework.
2396+
* It's used when creating a CuratorFramework for the ZooKeeper registry.
2397+
*/
23952398
@ConfigurationScope(Scope.AM)
23962399
@ConfigurationProperty
23972400
public static final String TEZ_AM_CURATOR_CONNECTION_TIMEOUT = TEZ_AM_PREFIX + "curator.connection.timeout";
23982401
public static final int TEZ_AM_CURATOR_CONNECTION_TIMEOUT_DEFAULT = 15000;
23992402

2403+
24002404
@ConfigurationProperty
24012405
public static final String TEZ_FRAMEWORK_MODE = TEZ_PREFIX + ".framework.mode";
24022406

tez-api/src/main/java/org/apache/tez/frameworkplugins/ClientFrameworkService.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import org.apache.hadoop.conf.Configuration;
2323
import org.apache.tez.client.FrameworkClient;
24-
import org.apache.tez.client.registry.AMRegistryClient;
2524

2625
/*
2726
FrameworkService that runs code within the client process that is using TezClient
@@ -30,6 +29,4 @@
3029
public interface ClientFrameworkService extends FrameworkService {
3130
//Provide an impl. for org.apache.tez.client.FrameworkClient
3231
Optional<FrameworkClient> createOrGetFrameworkClient(Configuration conf);
33-
//Provide an impl. for org.apache.tez.registry.AMRegistryClient
34-
Optional<AMRegistryClient> createOrGetRegistryClient(Configuration conf);
3532
}

tez-api/src/main/java/org/apache/tez/frameworkplugins/FrameworkMode.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
* to you under the Apache License, Version 2.0 (the
77
* "License"); you may not use this file except in compliance
88
* with the License. You may obtain a copy of the License at
9-
*
10-
* http://www.apache.org/licenses/LICENSE-2.0
11-
*
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
1212
* Unless required by applicable law or agreed to in writing, software
1313
* distributed under the License is distributed on an "AS IS" BASIS,
1414
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -28,13 +28,25 @@ public enum FrameworkMode {
2828

2929
STANDALONE_ZOOKEEPER(
3030
"org.apache.tez.frameworkplugins.zookeeper.ZookeeperStandaloneClientFrameworkService",
31-
"org.apache.tez.frameworkplugins.zookeeper.ZookeeperStandaloneServerFrameworkService");
31+
"org.apache.tez.frameworkplugins.zookeeper.ZookeeperStandaloneServerFrameworkService"),
32+
33+
YARN(
34+
"org.apache.tez.frameworkplugins.yarn.YarnClientFrameworkService",
35+
"org.apache.tez.frameworkplugins.yarn.YarnServerFrameworkService");
3236

33-
String clientClassName;
34-
String serverClassName;
37+
private final String clientClassName;
38+
private final String serverClassName;
3539

3640
FrameworkMode(String clientClassName, String serverClassName) {
3741
this.clientClassName = clientClassName;
3842
this.serverClassName = serverClassName;
3943
}
44+
45+
public String getClientClassName() {
46+
return clientClassName;
47+
}
48+
49+
public String getServerClassName() {
50+
return serverClassName;
51+
}
4052
}

0 commit comments

Comments
 (0)