Skip to content

Commit ec85cc7

Browse files
authored
Merge branch 'apache:master' into ATLAS-5074
2 parents 540762b + 14073c4 commit ec85cc7

File tree

17 files changed

+238
-204
lines changed

17 files changed

+238
-204
lines changed

addons/sqoop-bridge-shim/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717
*/
1818

1919
package org.apache.atlas.sqoop.hook;
20-
21-
2220
import org.apache.atlas.plugin.classloader.AtlasPluginClassLoader;
2321
import org.apache.sqoop.SqoopJobDataPublisher;
2422
import org.slf4j.Logger;
@@ -33,8 +31,8 @@ public class SqoopHook extends SqoopJobDataPublisher {
3331
private static final String ATLAS_PLUGIN_TYPE = "sqoop";
3432
private static final String ATLAS_SQOOP_HOOK_IMPL_CLASSNAME = "org.apache.atlas.sqoop.hook.SqoopHook";
3533

36-
private AtlasPluginClassLoader atlasPluginClassLoader = null;
37-
private SqoopJobDataPublisher sqoopHookImpl = null;
34+
private AtlasPluginClassLoader atlasPluginClassLoader;
35+
private SqoopJobDataPublisher sqoopHookImpl;
3836

3937
public SqoopHook() {
4038
this.initialize();

addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717
*/
1818

1919
package org.apache.atlas.sqoop.hook;
20-
21-
2220
import org.apache.atlas.ApplicationProperties;
2321
import org.apache.atlas.AtlasClient;
2422
import org.apache.atlas.AtlasConstants;
@@ -28,9 +26,9 @@
2826
import org.apache.atlas.hook.AtlasHookException;
2927
import org.apache.atlas.model.instance.AtlasEntity;
3028
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
29+
import org.apache.atlas.model.instance.AtlasObjectId;
3130
import org.apache.atlas.model.notification.HookNotification;
3231
import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2;
33-
import org.apache.atlas.model.instance.AtlasObjectId;
3432
import org.apache.atlas.sqoop.model.SqoopDataTypes;
3533
import org.apache.atlas.type.AtlasTypeUtil;
3634
import org.apache.atlas.utils.AtlasConfigurationUtil;
@@ -42,11 +40,11 @@
4240
import org.slf4j.LoggerFactory;
4341

4442
import java.util.Collections;
45-
import java.util.Map;
43+
import java.util.Date;
4644
import java.util.HashMap;
47-
import java.util.Properties;
4845
import java.util.List;
49-
import java.util.Date;
46+
import java.util.Map;
47+
import java.util.Properties;
5048

5149
import static org.apache.atlas.repository.Constants.SQOOP_SOURCE;
5250

@@ -98,7 +96,6 @@ public void publish(SqoopJobDataPublisher.Data data) throws AtlasHookException {
9896
AtlasEntity entHiveTable = data.getHiveTable() != null ? toHiveTableEntity(entHiveDb, data.getHiveTable()) : null;
9997
AtlasEntity entProcess = toSqoopProcessEntity(entDbStore, entHiveDb, entHiveTable, data, metadataNamespace);
10098

101-
10299
AtlasEntitiesWithExtInfo entities = new AtlasEntitiesWithExtInfo(entProcess);
103100

104101
entities.addReferredEntity(entDbStore);
@@ -110,9 +107,8 @@ public void publish(SqoopJobDataPublisher.Data data) throws AtlasHookException {
110107
HookNotification message = new EntityCreateRequestV2(AtlasHook.getUser(), entities);
111108

112109
atlasHook.sendNotification(message);
113-
} catch(Exception e) {
110+
} catch (Exception e) {
114111
LOG.error("SqoopHook.publish() failed", e);
115-
116112
throw new AtlasHookException("SqoopHook.publish() failed.", e);
117113
}
118114
}
@@ -134,12 +130,11 @@ private AtlasEntity toHiveDatabaseEntity(String metadataNamespace, String dbName
134130

135131
private AtlasEntity toHiveTableEntity(AtlasEntity entHiveDb, String tableName) {
136132
AtlasEntity entHiveTable = new AtlasEntity(HiveDataTypes.HIVE_TABLE.getName());
137-
String qualifiedName = HiveMetaStoreBridge.getTableQualifiedName((String)entHiveDb.getAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE), (String)entHiveDb.getAttribute(AtlasClient.NAME), tableName);
133+
String qualifiedName = HiveMetaStoreBridge.getTableQualifiedName((String) entHiveDb.getAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE), (String) entHiveDb.getAttribute(AtlasClient.NAME), tableName);
138134

139135
entHiveTable.setAttribute(AtlasClient.NAME, tableName.toLowerCase());
140136
entHiveTable.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, qualifiedName);
141137
entHiveTable.setRelationshipAttribute(ATTRIBUTE_DB, AtlasTypeUtil.getAtlasRelatedObjectId(entHiveDb, RELATIONSHIP_HIVE_TABLE_DB));
142-
143138
return entHiveTable;
144139
}
145140

@@ -177,13 +172,12 @@ private AtlasEntity toSqoopProcessEntity(AtlasEntity entDbStore, AtlasEntity ent
177172
Properties options = data.getOptions();
178173

179174
for (Object k : options.keySet()) {
180-
sqoopOptionsMap.put((String)k, (String) options.get(k));
175+
sqoopOptionsMap.put((String) k, (String) options.get(k));
181176
}
182177

183178
entProcess.setAttribute(AtlasClient.NAME, sqoopProcessName);
184179
entProcess.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, sqoopProcessName);
185180
entProcess.setAttribute(SqoopHook.OPERATION, data.getOperation());
186-
187181
List<AtlasObjectId> sqoopObjects = Collections.singletonList(AtlasTypeUtil.getAtlasObjectId(entDbStore));
188182
List<AtlasObjectId> hiveObjects = Collections.singletonList(AtlasTypeUtil.getAtlasObjectId(entHiveTable != null ? entHiveTable : entHiveDb));
189183

@@ -246,7 +240,6 @@ static String getSqoopDBStoreName(SqoopJobDataPublisher.Data data) {
246240
}
247241

248242
private static class AtlasHookImpl extends AtlasHook {
249-
250243
public String getMessageSource() {
251244
return SQOOP_SOURCE;
252245
}

addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/model/SqoopDataTypes.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,9 @@
2222
* Hive Data Types for model and bridge.
2323
*/
2424
public enum SqoopDataTypes {
25-
2625
// Classes
2726
SQOOP_DBDATASTORE,
28-
SQOOP_PROCESS,
29-
;
27+
SQOOP_PROCESS;
3028

3129
public String getName() {
3230
return name().toLowerCase();

addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public void setUp() throws Exception {
4949
//Set-up sqoop session
5050
Configuration configuration = ApplicationProperties.get();
5151
if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
52-
atlasClient = new AtlasClient(configuration.getStringArray(HiveMetaStoreBridge.ATLAS_ENDPOINT), new String[]{"admin", "admin"});
52+
atlasClient = new AtlasClient(configuration.getStringArray(HiveMetaStoreBridge.ATLAS_ENDPOINT), new String[] {"admin", "admin"});
5353
} else {
5454
atlasClient = new AtlasClient(configuration.getStringArray(HiveMetaStoreBridge.ATLAS_ENDPOINT));
5555
}
@@ -132,7 +132,7 @@ protected void waitFor(int timeout, Predicate predicate) throws Exception {
132132
try {
133133
predicate.evaluate();
134134
return;
135-
} catch(Error | Exception e) {
135+
} catch (Error | Exception e) {
136136
if (System.currentTimeMillis() >= mustEnd) {
137137
fail("Assertions failed. Failing after waiting for timeout " + timeout + " msecs", e);
138138
}

addons/storm-bridge-shim/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
package org.apache.atlas.storm.hook;
2020

21-
2221
import org.apache.atlas.plugin.classloader.AtlasPluginClassLoader;
2322
import org.apache.storm.ISubmitterHook;
2423
import org.apache.storm.generated.StormTopology;
@@ -33,26 +32,22 @@
3332
*/
3433
public class StormAtlasHook implements ISubmitterHook {
3534
private static final Logger LOG = LoggerFactory.getLogger(StormAtlasHook.class);
36-
37-
3835
private static final String ATLAS_PLUGIN_TYPE = "storm";
3936
private static final String ATLAS_STORM_HOOK_IMPL_CLASSNAME = "org.apache.atlas.storm.hook.StormAtlasHook";
4037

41-
private AtlasPluginClassLoader atlasPluginClassLoader = null;
42-
private ISubmitterHook stormHook = null;
43-
38+
private AtlasPluginClassLoader atlasPluginClassLoader;
39+
private ISubmitterHook stormHook;
4440

4541
public StormAtlasHook() {
4642
this.initialize();
4743
}
4844

4945
@Override
5046
public void notify(TopologyInfo topologyInfo, Map stormConf, StormTopology stormTopology)
51-
throws IllegalAccessException {
47+
throws IllegalAccessException {
5248
if (LOG.isDebugEnabled()) {
5349
LOG.debug("==> StormAtlasHook.notify({}, {}, {})", topologyInfo, stormConf, stormTopology);
5450
}
55-
5651
try {
5752
activatePluginClassLoader();
5853
stormHook.notify(topologyInfo, stormConf, stormTopology);

addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,40 +18,40 @@
1818

1919
package org.apache.atlas.storm.hook;
2020

21+
import org.apache.atlas.AtlasClient;
22+
import org.apache.atlas.AtlasConstants;
23+
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
24+
import org.apache.atlas.hook.AtlasHook;
2125
import org.apache.atlas.model.instance.AtlasEntity;
2226
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
2327
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityExtInfo;
2428
import org.apache.atlas.model.notification.HookNotification;
2529
import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2;
30+
import org.apache.atlas.storm.model.StormDataTypes;
2631
import org.apache.atlas.type.AtlasTypeUtil;
2732
import org.apache.atlas.utils.HdfsNameServiceResolver;
2833
import org.apache.commons.collections.CollectionUtils;
34+
import org.apache.commons.lang.StringUtils;
35+
import org.apache.hadoop.conf.Configuration;
36+
import org.apache.hadoop.fs.Path;
37+
import org.apache.hadoop.hbase.HBaseConfiguration;
38+
import org.apache.hadoop.hive.conf.HiveConf;
2939
import org.apache.storm.ISubmitterHook;
3040
import org.apache.storm.generated.Bolt;
3141
import org.apache.storm.generated.SpoutSpec;
3242
import org.apache.storm.generated.StormTopology;
3343
import org.apache.storm.generated.TopologyInfo;
3444
import org.apache.storm.utils.Utils;
35-
import org.apache.atlas.AtlasClient;
36-
import org.apache.atlas.AtlasConstants;
37-
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
38-
import org.apache.atlas.hook.AtlasHook;
39-
import org.apache.atlas.storm.model.StormDataTypes;
40-
import org.apache.commons.lang.StringUtils;
41-
import org.apache.hadoop.conf.Configuration;
42-
import org.apache.hadoop.fs.Path;
43-
import org.apache.hadoop.hbase.HBaseConfiguration;
44-
import org.apache.hadoop.hive.conf.HiveConf;
4545
import org.slf4j.Logger;
4646

4747
import java.io.Serializable;
4848
import java.util.ArrayList;
4949
import java.util.Collections;
50+
import java.util.Date;
5051
import java.util.HashMap;
5152
import java.util.List;
5253
import java.util.Map;
5354
import java.util.Set;
54-
import java.util.Date;
5555

5656
import static org.apache.atlas.repository.Constants.STORM_SOURCE;
5757

@@ -264,7 +264,7 @@ private AtlasEntity addDataSet(String dataSetType, String topologyOwner, Seriali
264264
final String dbName = config.get("HiveBolt.options.databaseName");
265265
final String tblName = config.get("HiveBolt.options.tableName");
266266

267-
if (dbName == null || tblName ==null) {
267+
if (dbName == null || tblName == null) {
268268
LOG.error("Hive database or table name not found");
269269
} else {
270270
AtlasEntity dbEntity = new AtlasEntity("hive_db");
@@ -413,4 +413,4 @@ private String extractComponentMetadataNamespace(Configuration configuration, Ma
413413
public String getMessageSource() {
414414
return STORM_SOURCE;
415415
}
416-
}
416+
}

addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormTopologyUtil.java

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@
1818

1919
package org.apache.atlas.storm.hook;
2020

21+
import com.google.common.base.Joiner;
2122
import org.apache.commons.lang.StringUtils;
2223
import org.apache.storm.generated.Bolt;
2324
import org.apache.storm.generated.GlobalStreamId;
2425
import org.apache.storm.generated.Grouping;
2526
import org.apache.storm.generated.StormTopology;
26-
import com.google.common.base.Joiner;
2727
import org.slf4j.Logger;
2828

2929
import java.lang.reflect.Field;
@@ -94,25 +94,26 @@ public static Map<String, Set<String>> getAdjacencyMap(StormTopology topology,
9494
public static Set<String> removeSystemComponents(Set<String> components) {
9595
Set<String> userComponents = new HashSet<>();
9696
for (String component : components) {
97-
if (!isSystemComponent(component))
97+
if (!isSystemComponent(component)) {
9898
userComponents.add(component);
99+
}
99100
}
100101

101102
return userComponents;
102103
}
103104

104105
private static final Set<Class> WRAPPER_TYPES = new HashSet<Class>() {{
105-
add(Boolean.class);
106-
add(Character.class);
107-
add(Byte.class);
108-
add(Short.class);
109-
add(Integer.class);
110-
add(Long.class);
111-
add(Float.class);
112-
add(Double.class);
113-
add(Void.class);
114-
add(String.class);
115-
}};
106+
add(Boolean.class);
107+
add(Character.class);
108+
add(Byte.class);
109+
add(Short.class);
110+
add(Integer.class);
111+
add(Long.class);
112+
add(Float.class);
113+
add(Double.class);
114+
add(Void.class);
115+
add(String.class);
116+
}};
116117

117118
public static boolean isWrapperType(Class clazz) {
118119
return WRAPPER_TYPES.contains(clazz);
@@ -161,7 +162,9 @@ public static Map<String, String> getFieldValues(Object instance,
161162
continue;
162163
} else if (fieldVal.getClass().isPrimitive() ||
163164
isWrapperType(fieldVal.getClass())) {
164-
if (toString(fieldVal, false).isEmpty()) continue;
165+
if (toString(fieldVal, false).isEmpty()) {
166+
continue;
167+
}
165168
output.put(key, toString(fieldVal, false));
166169
} else if (isMapType(fieldVal.getClass())) {
167170
//TODO: check if it makes more sense to just stick to json
@@ -181,7 +184,9 @@ public static Map<String, String> getFieldValues(Object instance,
181184
//TODO check if it makes more sense to just stick to
182185
// json like structure instead of a flatten output.
183186
Collection collection = (Collection) fieldVal;
184-
if (collection.size() == 0) continue;
187+
if (collection.size() == 0) {
188+
continue;
189+
}
185190
String outStr = "";
186191
for (Object o : collection) {
187192
outStr += getString(o, false, objectsToSkip) + ",";
@@ -203,7 +208,7 @@ public static Map<String, String> getFieldValues(Object instance,
203208
}
204209
}
205210
}
206-
catch (Exception e){
211+
catch (Exception e) {
207212
LOG.warn("Exception while constructing topology", e);
208213
}
209214
return output;
@@ -237,12 +242,16 @@ private static String getString(Map<String, String> flattenFields, boolean wrapW
237242
}
238243

239244
private static String toString(Object instance, boolean wrapWithQuote) {
240-
if (instance instanceof String)
241-
if (wrapWithQuote)
245+
if (instance instanceof String) {
246+
if (wrapWithQuote) {
242247
return "\"" + instance + "\"";
243-
else
248+
}
249+
else {
244250
return instance.toString();
245-
else
251+
}
252+
}
253+
else {
246254
return instance.toString();
255+
}
247256
}
248257
}

addons/storm-bridge/src/main/java/org/apache/atlas/storm/model/StormDataTypes.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,11 @@
1515
* See the License for the specific language governing permissions and
1616
* limitations under the License.
1717
*/
18-
1918
package org.apache.atlas.storm.model;
20-
21-
2219
/**
2320
* Storm Data Types for model and hook.
2421
*/
2522
public enum StormDataTypes {
26-
2723
// Topology Classes
2824
STORM_TOPOLOGY, // represents the topology containing the DAG
2925

@@ -34,9 +30,7 @@ public enum StormDataTypes {
3430
// Data Sets
3531
KAFKA_TOPIC, // kafka data set
3632
JMS_TOPIC, // jms data set
37-
HBASE_TABLE, // hbase table data set
38-
;
39-
33+
HBASE_TABLE; // hbase table data set
4034
public String getName() {
4135
return name().toLowerCase();
4236
}

0 commit comments

Comments
 (0)