Commit 6fddccd6 by Suma Shivaprasad

ATLAS-1121 NPE while submitting topology in StormHook (ayubkhan via sumasai)

parent ab95c1a7
......@@ -36,6 +36,7 @@ import java.util.Set;
* A storm topology utility class.
*/
public final class StormTopologyUtil {
public static final Logger LOG = org.slf4j.LoggerFactory.getLogger(StormTopologyUtil.class);
private StormTopologyUtil() {
}
......@@ -136,74 +137,79 @@ public final class StormTopologyUtil {
Map<String, String> output = new HashMap<>();
if (objectsToSkip.add(instance)) {
Class clazz = instance.getClass();
for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
Field[] fields = c.getDeclaredFields();
for (Field field : fields) {
if (java.lang.reflect.Modifier.isStatic(field.getModifiers())) {
continue;
}
String key;
if (prependClassName) {
key = String.format("%s.%s", clazz.getSimpleName(), field.getName());
} else {
key = field.getName();
}
try {
if (objectsToSkip.add(instance)) {
Class clazz = instance.getClass();
for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
Field[] fields = c.getDeclaredFields();
for (Field field : fields) {
if (java.lang.reflect.Modifier.isStatic(field.getModifiers())) {
continue;
}
boolean accessible = field.isAccessible();
if (!accessible) {
field.setAccessible(true);
}
Object fieldVal = field.get(instance);
if (fieldVal == null) {
continue;
} else if (fieldVal.getClass().isPrimitive() ||
isWrapperType(fieldVal.getClass())) {
if (toString(fieldVal, false).isEmpty()) continue;
output.put(key, toString(fieldVal, false));
} else if (isMapType(fieldVal.getClass())) {
//TODO: check if it makes more sense to just stick to json
// like structure instead of a flatten output.
Map map = (Map) fieldVal;
for (Object entry : map.entrySet()) {
Object mapKey = ((Map.Entry) entry).getKey();
Object mapVal = ((Map.Entry) entry).getValue();
String keyStr = getString(mapKey, false, objectsToSkip);
String valStr = getString(mapVal, false, objectsToSkip);
if ((valStr == null) || (valStr.isEmpty())) {
continue;
} else {
output.put(String.format("%s.%s", key, keyStr), valStr);
}
String key;
if (prependClassName) {
key = String.format("%s.%s", clazz.getSimpleName(), field.getName());
} else {
key = field.getName();
}
} else if (isCollectionType(fieldVal.getClass())) {
//TODO check if it makes more sense to just stick to
// json like structure instead of a flatten output.
Collection collection = (Collection) fieldVal;
if (collection.size() == 0) continue;
String outStr = "";
for (Object o : collection) {
outStr += getString(o, false, objectsToSkip) + ",";
boolean accessible = field.isAccessible();
if (!accessible) {
field.setAccessible(true);
}
if (outStr.length() > 0) {
outStr = outStr.substring(0, outStr.length() - 1);
Object fieldVal = field.get(instance);
if (fieldVal == null) {
continue;
} else if (fieldVal.getClass().isPrimitive() ||
isWrapperType(fieldVal.getClass())) {
if (toString(fieldVal, false).isEmpty()) continue;
output.put(key, toString(fieldVal, false));
} else if (isMapType(fieldVal.getClass())) {
//TODO: check if it makes more sense to just stick to json
// like structure instead of a flatten output.
Map map = (Map) fieldVal;
for (Object entry : map.entrySet()) {
Object mapKey = ((Map.Entry) entry).getKey();
Object mapVal = ((Map.Entry) entry).getValue();
String keyStr = getString(mapKey, false, objectsToSkip);
String valStr = getString(mapVal, false, objectsToSkip);
if ((valStr == null) || (valStr.isEmpty())) {
continue;
} else {
output.put(String.format("%s.%s", key, keyStr), valStr);
}
}
} else if (isCollectionType(fieldVal.getClass())) {
//TODO check if it makes more sense to just stick to
// json like structure instead of a flatten output.
Collection collection = (Collection) fieldVal;
if (collection.size() == 0) continue;
String outStr = "";
for (Object o : collection) {
outStr += getString(o, false, objectsToSkip) + ",";
}
if (outStr.length() > 0) {
outStr = outStr.substring(0, outStr.length() - 1);
}
output.put(key, String.format("%s", outStr));
} else {
Map<String, String> nestedFieldValues = getFieldValues(fieldVal, false, objectsToSkip);
for (Map.Entry<String, String> entry : nestedFieldValues.entrySet()) {
output.put(String.format("%s.%s", key, entry.getKey()), entry.getValue());
}
}
output.put(key, String.format("%s", outStr));
} else {
Map<String, String> nestedFieldValues = getFieldValues(fieldVal, false, objectsToSkip);
for (Map.Entry<String, String> entry : nestedFieldValues.entrySet()) {
output.put(String.format("%s.%s", key, entry.getKey()), entry.getValue());
if (!accessible) {
field.setAccessible(false);
}
}
if (!accessible) {
field.setAccessible(false);
}
}
}
}
catch (Exception e){
LOG.warn("Exception while constructing topology", e);
}
return output;
}
......
......@@ -6,6 +6,7 @@ INCOMPATIBLE CHANGES:
ATLAS-1060 Add composite indexes for exact match performance improvements for all attributes (sumasai via shwethags)
ALL CHANGES:
ATLAS-1121 NPE while submitting topology in StormHook (ayubkhan via sumasai)
ATLAS-1119 Add retries for edge label creation (sumasai via shwethags)
ATLAS-1111 Data loss is observed when atlas is restarted while hive_table metadata ingestion into kafka topic is in-progress(shwethags via sumasai)
ATLAS-1115 Show Tag / Taxonomy Listing in sorted order (Kalyanikashikar via sumasai)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment