Skip to content

Commit bd1161c

Browse files
committed
PR comments 2
1 parent 3893e8f commit bd1161c

File tree

8 files changed

+35
-22
lines changed

8 files changed

+35
-22
lines changed

tez-api/src/main/java/org/apache/tez/client/registry/AMRegistry.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,12 @@
3939
*/
4040
public interface AMRegistry extends AutoCloseable {
4141

42-
void add(AMRecord server) throws Exception;
42+
void add(AMRecord record) throws Exception;
4343

44-
void remove(AMRecord server) throws Exception;
44+
void remove(AMRecord record) throws Exception;
4545

4646
ApplicationId generateNewId() throws Exception;
4747

4848
AMRecord createAmRecord(ApplicationId appId, String hostName, String hostIp, int port,
4949
String computeName);
50-
51-
void close();
5250
}

tez-api/src/main/java/org/apache/tez/client/registry/AMRegistryClient.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ protected synchronized void notifyOnAdded(AMRecord record) {
8181
try {
8282
listener.onAdd(record);
8383
} catch (Exception e) {
84-
LOG.warn("Exception while calling AM add listener, AM record {}", record, e);
84+
LOG.warn("Exception while notifying AM add listener for record {}", record, e);
8585
}
8686
}
8787
}
@@ -96,7 +96,7 @@ protected synchronized void notifyOnUpdated(AMRecord record) {
9696
try {
9797
listener.onUpdate(record);
9898
} catch (Exception e) {
99-
LOG.warn("Exception while calling AM update listener, AM record {}", record, e);
99+
LOG.warn("Exception while notifying AM update listener for record {}", record, e);
100100
}
101101
}
102102
}
@@ -111,7 +111,7 @@ protected synchronized void notifyOnRemoved(AMRecord record) {
111111
try {
112112
listener.onRemove(record);
113113
} catch (Exception e) {
114-
LOG.warn("Exception while calling AM remove listener, AM record {}", record, e);
114+
LOG.warn("Exception while notifying AM remove listener for record {}", record, e);
115115
}
116116
}
117117
}

tez-api/src/main/java/org/apache/tez/client/registry/zookeeper/ZkAMRegistryClient.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ public void start() throws Exception {
113113
client = zkConf.createCuratorFramework();
114114
cache = new TreeCache(client, zkConf.getZkNamespace());
115115
client.start();
116+
client.blockUntilConnected();
116117
cache.start();
117118
listener = new ZkRegistryListener();
118119
cache.getListenable().addListener(listener);
@@ -175,7 +176,7 @@ public void childEvent(final CuratorFramework clientParam, final TreeCacheEvent
175176
if (amRecord != null) {
176177
LOG.info("AM updated data: {}. Notifying listeners.", amRecord);
177178
amRecordCache.put(amRecord.getApplicationId(), amRecord);
178-
notifyOnAdded(amRecord);
179+
notifyOnUpdated(amRecord);
179180
}
180181
}
181182
break;

tez-api/src/main/java/org/apache/tez/client/registry/zookeeper/ZkFrameworkClient.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,9 @@
3737

3838
public class ZkFrameworkClient extends FrameworkClient {
3939

40-
private AMRecord amRecord;
40+
private volatile AMRecord amRecord;
4141
private ZkAMRegistryClient amRegistryClient = null;
4242
private volatile boolean isRunning = false;
43-
private String amHost;
44-
private int amPort;
4543

4644
@Override
4745
public synchronized void init(TezConfiguration tezConf) {
@@ -56,6 +54,9 @@ public synchronized void init(TezConfiguration tezConf) {
5654

5755
@Override
5856
public void start() {
57+
if (isRunning) {
58+
return;
59+
}
5960
try {
6061
amRegistryClient.start();
6162
isRunning = true;
@@ -140,8 +141,6 @@ public ApplicationReport getApplicationReport(ApplicationId appId) throws YarnEx
140141
report.setDiagnostics("AM record not found (likely died) in zookeeper for application id: " + appId);
141142
} else {
142143
report.setHost(amRecord.getHostName());
143-
amHost = amRecord.getHostName();
144-
amPort = amRecord.getPort();
145144
report.setRpcPort(amRecord.getPort());
146145
report.setYarnApplicationState(YarnApplicationState.RUNNING);
147146
}
@@ -155,12 +154,12 @@ public boolean isRunning() {
155154

156155
@Override
157156
public String getAmHost() {
158-
return amHost;
157+
return amRecord == null ? null : amRecord.getHostName();
159158
}
160159

161160
@Override
162161
public int getAmPort() {
163-
return amPort;
162+
return amRecord == null ? 0 : amRecord.getPort();
164163
}
165164

166165
@VisibleForTesting

tez-api/src/main/java/org/apache/tez/frameworkplugins/FrameworkUtils.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@
1818
package org.apache.tez.frameworkplugins;
1919

2020

21+
import java.lang.reflect.InvocationTargetException;
22+
2123
import javax.annotation.Nullable;
2224

25+
import org.apache.commons.lang3.StringUtils;
2326
import org.apache.hadoop.conf.Configuration;
2427
import org.apache.tez.common.ReflectionUtils;
2528
import org.apache.tez.dag.api.TezConfiguration;
@@ -54,17 +57,18 @@ public static <T extends FrameworkService> T get(Class<T> interfaze, @Nullable C
5457
String modeInConf = conf != null ? conf.get(TezConfiguration.TEZ_FRAMEWORK_MODE) : null;
5558
String modeInEnv = System.getenv(TezConstants.TEZ_FRAMEWORK_MODE);
5659
try {
57-
if (modeInConf != null) {
60+
if (StringUtils.isNotEmpty(modeInConf)) {
5861
return getByMode(interfaze, modeInConf);
59-
} else if (modeInEnv != null) {
62+
} else if (StringUtils.isNotEmpty(modeInEnv)) {
6063
return getByMode(interfaze, modeInEnv);
6164
} else if (defaultClazz != null) {
62-
return (T) defaultClazz.newInstance();
65+
return (T) defaultClazz.getDeclaredConstructor().newInstance();
6366
} else {
6467
throw new RuntimeException(
6568
"Framework service not found in any mode: configuration, environment, or default class");
6669
}
67-
} catch (TezReflectionException | InstantiationException | IllegalAccessException e) {
70+
} catch (TezReflectionException | InstantiationException | IllegalAccessException | NoSuchMethodException |
71+
InvocationTargetException e) {
6872
throw new RuntimeException("Failed to load framework service for interface: " + interfaze.getName(), e);
6973
}
7074
}

tez-api/src/test/java/org/apache/tez/client/TestTezClient.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -546,6 +546,8 @@ public void testTezClientReconnect(boolean isSession) throws Exception {
546546
assertEquals(dagClient.getSessionIdentifierString(), appId.toString());
547547

548548
dagClient.close();
549+
client.stop();
550+
client2.stop();
549551
}
550552

551553
@Test (timeout=5000)

tez-dag/src/main/java/org/apache/tez/dag/api/client/registry/zookeeper/ZkAMRegistry.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ public ApplicationId generateNewId() throws Exception {
156156
}
157157

158158
@Override
159-
public AMRecord createAmRecord(ApplicationId appId, String hostName, String hostIp, int port, String computeName) {
159+
public AMRecord createAmRecord(ApplicationId appId, String hostName, String hostIp, int port, String computeName) {
160160
return new AMRecord(appId, hostName, hostIp, port, externalId, computeName);
161161
}
162162

tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import java.util.Map;
4747
import java.util.Map.Entry;
4848
import java.util.Objects;
49-
import java.util.Optional;
5049
import java.util.Random;
5150
import java.util.Set;
5251
import java.util.Timer;
@@ -1979,7 +1978,17 @@ void stopServices() {
19791978
}
19801979
}
19811980

1982-
Optional.ofNullable(frameworkService.getAMRegistry(this.amConf)).ifPresent(AMRegistry::close);
1981+
AMRegistry registry = frameworkService.getAMRegistry(this.amConf);
1982+
if (registry != null) {
1983+
try {
1984+
registry.close();
1985+
} catch (Exception e) {
1986+
LOG.warn("Failed to close registry", e);
1987+
if (firstException != null) {
1988+
firstException = e;
1989+
}
1990+
}
1991+
}
19831992

19841993
//after stopping all services, rethrow the first exception raised
19851994
if (firstException != null) {

0 commit comments

Comments
 (0)