Commit a2801f0e by Madhan Neethiraj Committed by Suma Shivaprasad

ATLAS-1089: fix Storm hook to handle cyclic references in topology object

parent 9eafb165
...@@ -179,7 +179,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { ...@@ -179,7 +179,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
private Referenceable createDataSet(String name, String topologyOwner, private Referenceable createDataSet(String name, String topologyOwner,
Serializable instance, Serializable instance,
Map stormConf, List<Referenceable> dependentEntities) throws IllegalAccessException { Map stormConf, List<Referenceable> dependentEntities) throws IllegalAccessException {
Map<String, String> config = StormTopologyUtil.getFieldValues(instance, true); Map<String, String> config = StormTopologyUtil.getFieldValues(instance, true, null);
String clusterName = null; String clusterName = null;
Referenceable dataSetReferenceable; Referenceable dataSetReferenceable;
...@@ -298,7 +298,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { ...@@ -298,7 +298,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
stormSpout.get_spout_object().get_serialized_java(), Serializable.class); stormSpout.get_spout_object().get_serialized_java(), Serializable.class);
spoutReferenceable.set("driverClass", instance.getClass().getName()); spoutReferenceable.set("driverClass", instance.getClass().getName());
Map<String, String> flatConfigMap = StormTopologyUtil.getFieldValues(instance, true); Map<String, String> flatConfigMap = StormTopologyUtil.getFieldValues(instance, true, null);
spoutReferenceable.set("conf", flatConfigMap); spoutReferenceable.set("conf", flatConfigMap);
return spoutReferenceable; return spoutReferenceable;
...@@ -322,7 +322,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { ...@@ -322,7 +322,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
stormBolt.get_bolt_object().get_serialized_java(), Serializable.class); stormBolt.get_bolt_object().get_serialized_java(), Serializable.class);
boltReferenceable.set("driverClass", instance.getClass().getName()); boltReferenceable.set("driverClass", instance.getClass().getName());
Map<String, String> flatConfigMap = StormTopologyUtil.getFieldValues(instance, true); Map<String, String> flatConfigMap = StormTopologyUtil.getFieldValues(instance, true, null);
boltReferenceable.set("conf", flatConfigMap); boltReferenceable.set("conf", flatConfigMap);
return boltReferenceable; return boltReferenceable;
......
...@@ -127,10 +127,17 @@ public final class StormTopologyUtil { ...@@ -127,10 +127,17 @@ public final class StormTopologyUtil {
} }
public static Map<String, String> getFieldValues(Object instance, public static Map<String, String> getFieldValues(Object instance,
boolean prependClassName) boolean prependClassName,
Set<Object> objectsToSkip)
throws IllegalAccessException { throws IllegalAccessException {
Class clazz = instance.getClass(); if (objectsToSkip == null) {
objectsToSkip = new HashSet<Object>();
}
Map<String, String> output = new HashMap<>(); Map<String, String> output = new HashMap<>();
if (objectsToSkip.add(instance)) {
Class clazz = instance.getClass();
for (Class<?> c = clazz; c != null; c = c.getSuperclass()) { for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
Field[] fields = c.getDeclaredFields(); Field[] fields = c.getDeclaredFields();
for (Field field : fields) { for (Field field : fields) {
...@@ -164,8 +171,8 @@ public final class StormTopologyUtil { ...@@ -164,8 +171,8 @@ public final class StormTopologyUtil {
Object mapKey = ((Map.Entry) entry).getKey(); Object mapKey = ((Map.Entry) entry).getKey();
Object mapVal = ((Map.Entry) entry).getValue(); Object mapVal = ((Map.Entry) entry).getValue();
String keyStr = getString(mapKey, false); String keyStr = getString(mapKey, false, objectsToSkip);
String valStr = getString(mapVal, false); String valStr = getString(mapVal, false, objectsToSkip);
if ((valStr == null) || (valStr.isEmpty())) { if ((valStr == null) || (valStr.isEmpty())) {
continue; continue;
} else { } else {
...@@ -176,17 +183,17 @@ public final class StormTopologyUtil { ...@@ -176,17 +183,17 @@ public final class StormTopologyUtil {
//TODO check if it makes more sense to just stick to //TODO check if it makes more sense to just stick to
// json like structure instead of a flatten output. // json like structure instead of a flatten output.
Collection collection = (Collection) fieldVal; Collection collection = (Collection) fieldVal;
if (collection.size()==0) continue; if (collection.size() == 0) continue;
String outStr = ""; String outStr = "";
for (Object o : collection) { for (Object o : collection) {
outStr += getString(o, false) + ","; outStr += getString(o, false, objectsToSkip) + ",";
} }
if (outStr.length() > 0) { if (outStr.length() > 0) {
outStr = outStr.substring(0, outStr.length() - 1); outStr = outStr.substring(0, outStr.length() - 1);
} }
output.put(key, String.format("%s", outStr)); output.put(key, String.format("%s", outStr));
} else { } else {
Map<String, String> nestedFieldValues = getFieldValues(fieldVal, false); Map<String, String> nestedFieldValues = getFieldValues(fieldVal, false, objectsToSkip);
for (Map.Entry<String, String> entry : nestedFieldValues.entrySet()) { for (Map.Entry<String, String> entry : nestedFieldValues.entrySet()) {
output.put(String.format("%s.%s", key, entry.getKey()), entry.getValue()); output.put(String.format("%s.%s", key, entry.getKey()), entry.getValue());
} }
...@@ -196,17 +203,19 @@ public final class StormTopologyUtil { ...@@ -196,17 +203,19 @@ public final class StormTopologyUtil {
} }
} }
} }
}
return output; return output;
} }
private static String getString(Object instance, private static String getString(Object instance,
boolean wrapWithQuote) throws IllegalAccessException { boolean wrapWithQuote,
Set<Object> objectsToSkip) throws IllegalAccessException {
if (instance == null) { if (instance == null) {
return null; return null;
} else if (instance.getClass().isPrimitive() || isWrapperType(instance.getClass())) { } else if (instance.getClass().isPrimitive() || isWrapperType(instance.getClass())) {
return toString(instance, wrapWithQuote); return toString(instance, wrapWithQuote);
} else { } else {
return getString(getFieldValues(instance, false), wrapWithQuote); return getString(getFieldValues(instance, false, objectsToSkip), wrapWithQuote);
} }
} }
......
...@@ -6,6 +6,7 @@ INCOMPATIBLE CHANGES: ...@@ -6,6 +6,7 @@ INCOMPATIBLE CHANGES:
ALL CHANGES: ALL CHANGES:
ATLAS-1089 Storm hook should handle cyclic references in topology object (mneethiraj via sumasai)
ATLAS-1086 Build failure in hive-bridge after security fixes in ATLAS-762 (sumasai) ATLAS-1086 Build failure in hive-bridge after security fixes in ATLAS-762 (sumasai)
ATLAS-1088 Fix /search api to default to fulltext on dsl failure (sumasai) ATLAS-1088 Fix /search api to default to fulltext on dsl failure (sumasai)
ATLAS-762 Assertion in NegativeSSLAndKerberosTest.testUnsecuredClient needs to be hardened (nixonrodrigues via sumasai) ATLAS-762 Assertion in NegativeSSLAndKerberosTest.testUnsecuredClient needs to be hardened (nixonrodrigues via sumasai)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment