11/**
2- * Licensed to the Apache Software Foundation (ASF) under one
3- * or more contributor license agreements. See the NOTICE file
4- * distributed with this work for additional information
5- * regarding copyright ownership. The ASF licenses this file
6- * to you under the Apache License, Version 2.0 (the
7- * "License"); you may not use this file except in compliance
8- * with the License. You may obtain a copy of the License at
9- *
10- * http://www.apache.org/licenses/LICENSE-2.0
11- *
12- * Unless required by applicable law or agreed to in writing, software
13- * distributed under the License is distributed on an "AS IS" BASIS,
14- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15- * See the License for the specific language governing permissions and
16- * limitations under the License.
17- */
2+ * Licensed to the Apache Software Foundation (ASF) under one
3+ * or more contributor license agreements. See the NOTICE file
4+ * distributed with this work for additional information
5+ * regarding copyright ownership. The ASF licenses this file
6+ * to you under the Apache License, Version 2.0 (the
7+ * "License"); you may not use this file except in compliance
8+ * with the License. You may obtain a copy of the License at
9+ * <p>
10+ * http://www.apache.org/licenses/LICENSE-2.0
11+ * <p>
12+ * Unless required by applicable law or agreed to in writing, software
13+ * distributed under the License is distributed on an "AS IS" BASIS,
14+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+ * See the License for the specific language governing permissions and
16+ * limitations under the License.
17+ */
1818
1919package org .apache .tez .client .registry .zookeeper ;
2020
3131import org .apache .curator .framework .recipes .cache .PathChildrenCache ;
3232import org .apache .curator .framework .recipes .cache .PathChildrenCacheEvent ;
3333import org .apache .curator .framework .recipes .cache .PathChildrenCacheListener ;
34+ import org .apache .curator .shaded .com .google .common .base .Charsets ;
3435import org .apache .hadoop .classification .InterfaceAudience ;
3536import org .apache .hadoop .conf .Configuration ;
3637import org .apache .hadoop .registry .client .binding .RegistryUtils ;
3738import org .apache .hadoop .registry .client .types .ServiceRecord ;
3839import org .apache .tez .client .registry .AMRecord ;
3940import org .apache .tez .client .registry .AMRegistryClient ;
40- import org .apache .tez .client .registry .AMRegistryClientListener ;
4141import org .apache .tez .dag .api .TezConfiguration ;
4242
4343import com .google .common .base .Preconditions ;
4646import org .slf4j .LoggerFactory ;
4747
4848/**
49- * Curator/Zookeeper impl of AMRegistryClient
50- */
49+ * Curator/Zookeeper implementation of {@link AMRegistryClient}.
50+ */
5151@ InterfaceAudience .Public
5252public class ZkAMRegistryClient extends AMRegistryClient {
5353 private static final Logger LOG = LoggerFactory .getLogger (ZkAMRegistryClient .class );
54-
54+ private static final Map < String , ZkAMRegistryClient > INSTANCES = new HashMap <>();
5555 private final Configuration conf ;
56-
5756 //Cache of known AMs
58- private ConcurrentHashMap <String , AMRecord > amRecordCache = new ConcurrentHashMap <>();
57+ private final ConcurrentHashMap <String , AMRecord > amRecordCache = new ConcurrentHashMap <>();
5958 private CuratorFramework client ;
60- private PathChildrenCache cache ;
6159
62- private static Map <String , ZkAMRegistryClient > INSTANCES = new HashMap <>();
60+ private ZkAMRegistryClient (final Configuration conf ) {
61+ this .conf = conf ;
62+ }
6363
6464 public static synchronized ZkAMRegistryClient getClient (final Configuration conf ) {
6565 String namespace = conf .get (TezConfiguration .TEZ_AM_REGISTRY_NAMESPACE );
@@ -72,14 +72,31 @@ public static synchronized ZkAMRegistryClient getClient(final Configuration conf
7272 return registry ;
7373 }
7474
75- private ZkAMRegistryClient (final Configuration conf ) {
76- this .conf = conf ;
75+ /**
76+ * Deserializes a {@link ServiceRecord} from ZooKeeper data and converts it into an {@link AMRecord}
77+ * for caching.
78+ *
79+ * @param childData the ZooKeeper node data containing a serialized {@link ServiceRecord}
80+ * @return an {@link AMRecord} constructed from the deserialized {@link ServiceRecord}, or {@code null}
81+ * if no data is present
82+ * @throws IOException if the data cannot be deserialized into a {@link ServiceRecord}
83+ */
84+ public static AMRecord getAMRecord (final ChildData childData ) throws IOException {
85+ byte [] data = childData .getData ();
86+ // only the path appeared, there is no data yet
87+ if (data .length == 0 ) {
88+ return null ;
89+ }
90+ String value = new String (data , Charsets .UTF_8 );
91+ RegistryUtils .ServiceRecordMarshal marshal = new RegistryUtils .ServiceRecordMarshal ();
92+ ServiceRecord serviceRecord = marshal .fromJson (value );
93+ return new AMRecord (serviceRecord );
7794 }
7895
7996 public void start () throws Exception {
8097 ZkConfig zkConf = new ZkConfig (this .conf );
8198 client = zkConf .createCuratorFramework ();
82- cache = new PathChildrenCache (client , zkConf .getZkNamespace (), true );
99+ PathChildrenCache cache = new PathChildrenCache (client , zkConf .getZkNamespace (), true );
83100 client .start ();
84101 cache .start (PathChildrenCache .StartMode .BUILD_INITIAL_CACHE );
85102 for (ChildData childData : cache .getCurrentData ()) {
@@ -91,98 +108,85 @@ public void start() throws Exception {
91108 cache .getListenable ().addListener (new ZkRegistryListener ());
92109 }
93110
94- //Deserialize ServiceRecord from Zookeeper to populate AMRecord in cache
95- public static AMRecord getAMRecord (final ChildData childData ) throws IOException {
96- byte [] data = childData .getData ();
97- // only the path appeared, there is no data yet
98- if (data .length == 0 ) {
99- return null ;
100- }
101- String value = new String (data );
102- RegistryUtils .ServiceRecordMarshal marshal = new RegistryUtils .ServiceRecordMarshal ();
103- ServiceRecord serviceRecord = marshal .fromJson (value );
104- return new AMRecord (serviceRecord );
105- }
106-
107- @ Override public AMRecord getRecord (String appId ) {
111+ @ Override
112+ public AMRecord getRecord (String appId ) {
108113 if (amRecordCache .get (appId ) == null ) {
109114 return null ;
110115 }
111116 //Return a copy
112117 return new AMRecord (amRecordCache .get (appId ));
113118 }
114119
115- @ Override public List < AMRecord > getAllRecords () {
116- return amRecordCache . values (). stream ()
117- . map ( record -> new AMRecord ( record ) ).collect (Collectors .toList ());
118- }
120+ @ Override
121+ public List < AMRecord > getAllRecords () {
122+ return amRecordCache . values (). stream (). map ( AMRecord :: new ).collect (Collectors .toList ());
123+ }
119124
120- @ Override public synchronized void addListener (AMRegistryClientListener listener ) {
121- listeners .add (listener );
125+ @ Override
126+ public void close () {
127+ client .close ();
122128 }
123129
124- //Callback for Zookeeper to update local cache
130+ /**
131+ * Callback listener for ZooKeeper events that updates the local cache
132+ * when child nodes under the monitored path change.
133+ */
125134 private class ZkRegistryListener implements PathChildrenCacheListener {
126135
127- @ Override public void childEvent ( final CuratorFramework client , final PathChildrenCacheEvent event )
128- throws Exception {
129- Preconditions .checkArgument (client != null && client .getState () == CuratorFrameworkState .STARTED ,
136+ @ Override
137+ public void childEvent ( final CuratorFramework clientParam , final PathChildrenCacheEvent event ) throws Exception {
138+ Preconditions .checkArgument (clientParam != null && clientParam .getState () == CuratorFrameworkState .STARTED ,
130139 "Curator client is not started" );
131140
132- ChildData childData = event .getData ();
133- switch (event .getType ()) {
134- case CHILD_ADDED :
135- if (isEmpty (childData )) {
136- LOG .info ("AppId allocated: {}" , childData .getPath ());
137- } else {
138- AMRecord amRecord = getAMRecord (childData );
139- if (amRecord != null ) {
140- LOG .info ("AM registered with data: {}. Notifying {} listeners." , amRecord , listeners .size ());
141- amRecordCache .put (amRecord .getApplicationId ().toString (), amRecord );
142- notifyOnAdded (amRecord );
141+ ChildData childData = event .getData ();
142+ switch (event .getType ()) {
143+ case CHILD_ADDED :
144+ if (isEmpty (childData )) {
145+ LOG .info ("AppId allocated: {}" , childData .getPath ());
146+ } else {
147+ AMRecord amRecord = getAMRecord (childData );
148+ if (amRecord != null ) {
149+ LOG .info ("AM registered with data: {}. Notifying {} listeners." , amRecord , listeners .size ());
150+ amRecordCache .put (amRecord .getApplicationId ().toString (), amRecord );
151+ notifyOnAdded (amRecord );
152+ }
153+ }
154+ break ;
155+ case CHILD_UPDATED :
156+ if (isEmpty (childData )) {
157+ throw new RuntimeException ("AM updated with empty data" );
158+ } else {
159+ AMRecord amRecord = getAMRecord (childData );
160+ if (amRecord != null ) {
161+ LOG .info ("AM updated data: {}. Notifying {} listeners." , amRecord , listeners .size ());
162+ amRecordCache .put (amRecord .getApplicationId ().toString (), amRecord );
163+ notifyOnAdded (amRecord );
164+ }
143165 }
144- }
145- break ;
146- case CHILD_UPDATED :
147- if ( isEmpty ( childData )) {
148- throw new RuntimeException ( "AM updated with empty data" );
149- } else {
150- AMRecord amRecord = getAMRecord ( childData );
151- if ( amRecord != null ) {
152- LOG . info ( "AM updated data: {}. Notifying {} listeners." , amRecord , listeners . size () );
153- amRecordCache . put ( amRecord . getApplicationId (). toString (), amRecord );
154- notifyOnAdded ( amRecord );
166+ break ;
167+ case CHILD_REMOVED :
168+ if ( isEmpty ( childData )) {
169+ LOG . info ( "Unused AppId unregistered: {}" , childData . getPath ());
170+ } else {
171+ AMRecord amRecord = getAMRecord ( childData );
172+ if ( amRecord != null ) {
173+ LOG . info ( "AM removed: {}. Notifying {} listeners." , amRecord , listeners . size ());
174+ amRecordCache . remove ( amRecord . getApplicationId (). toString (), amRecord );
175+ notifyOnRemoved ( amRecord );
176+ }
155177 }
156- }
157- break ;
158- case CHILD_REMOVED :
159- if (isEmpty (childData )) {
160- LOG .info ("Unused AppId unregistered: {}" , childData .getPath ());
161- } else {
162- AMRecord amRecord = getAMRecord (childData );
163- if (amRecord != null ) {
164- LOG .info ("AM removed: {}. Notifying {} listeners." , amRecord , listeners .size ());
165- amRecordCache .remove (amRecord .getApplicationId ().toString (), amRecord );
166- notifyOnRemoved (amRecord );
178+ break ;
179+ default :
180+ if (childData == null ) {
181+ LOG .info ("Ignored event {}" , event .getType ());
182+ } else {
183+ LOG .info ("Ignored event {} for {}" , event .getType (), childData .getPath ());
167184 }
168- }
169- break ;
170- default :
171- if (childData == null ) {
172- LOG .info ("Ignored event {}" , event .getType ());
173- } else {
174- LOG .info ("Ignored event {} for {}" , event .getType (), childData .getPath ());
175- }
176185 }
177186 }
178187
179188 private boolean isEmpty (ChildData childData ) {
180189 return childData == null || childData .getData () == null || childData .getData ().length == 0 ;
181190 }
182191 }
183-
184- @ Override
185- public void close () {
186- client .close ();
187- }
188192}
0 commit comments