Commit 66f57da8 by Sarath Subramanian

ATLAS-3113: Use index query to search for active entities and better logging in…

ATLAS-3113: Use index query to search for active entities and better logging in java patch framework
parent 2b29e4a4
...@@ -22,6 +22,7 @@ import org.apache.atlas.repository.IndexException; ...@@ -22,6 +22,7 @@ import org.apache.atlas.repository.IndexException;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer.UniqueKind; import org.apache.atlas.repository.graph.GraphBackedSearchIndexer.UniqueKind;
import org.apache.atlas.repository.graphdb.AtlasCardinality; import org.apache.atlas.repository.graphdb.AtlasCardinality;
import org.apache.atlas.repository.graphdb.AtlasGraphManagement; import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
import org.apache.atlas.repository.graphdb.AtlasIndexQuery.Result;
import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute; import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
...@@ -56,7 +57,7 @@ public class UniqueAttributePatchHandler extends AtlasJavaPatchHandler { ...@@ -56,7 +57,7 @@ public class UniqueAttributePatchHandler extends AtlasJavaPatchHandler {
for (AtlasEntityType entityType : allEntityTypes) { for (AtlasEntityType entityType : allEntityTypes) {
String typeName = entityType.getTypeName(); String typeName = entityType.getTypeName();
Map<String, AtlasAttribute> uniqAttributes = entityType.getUniqAttributes(); Map<String, AtlasAttribute> uniqAttributes = entityType.getUniqAttributes();
int entitiesProcessed = 0; int patchAppliedCount = 0;
LOG.info("Applying java patch: {} for type: {}", getPatchId(), typeName); LOG.info("Applying java patch: {} for type: {}", getPatchId(), typeName);
...@@ -67,12 +68,16 @@ public class UniqueAttributePatchHandler extends AtlasJavaPatchHandler { ...@@ -67,12 +68,16 @@ public class UniqueAttributePatchHandler extends AtlasJavaPatchHandler {
// register unique attribute property keys in graph // register unique attribute property keys in graph
registerUniqueAttrPropertyKeys(attributes); registerUniqueAttrPropertyKeys(attributes);
Iterator<AtlasVertex> iterator = findActiveEntityVerticesByType(typeName); Iterator<Result<Object, Object>> iterator = findActiveEntityVerticesByType(typeName);
while (iterator.hasNext()) { int entityCount = 0;
AtlasVertex entityVertex = iterator.next();
while (iterator != null && iterator.hasNext()) {
AtlasVertex entityVertex = iterator.next().getVertex();
boolean patchApplied = false; boolean patchApplied = false;
entityCount++;
for (AtlasAttribute attribute : attributes) { for (AtlasAttribute attribute : attributes) {
String uniquePropertyKey = attribute.getVertexUniquePropertyName(); String uniquePropertyKey = attribute.getVertexUniquePropertyName();
Collection<? extends String> propertyKeys = entityVertex.getPropertyKeys(); Collection<? extends String> propertyKeys = entityVertex.getPropertyKeys();
...@@ -104,11 +109,11 @@ public class UniqueAttributePatchHandler extends AtlasJavaPatchHandler { ...@@ -104,11 +109,11 @@ public class UniqueAttributePatchHandler extends AtlasJavaPatchHandler {
} }
if (patchApplied) { if (patchApplied) {
entitiesProcessed++; patchAppliedCount++;
} }
if (entitiesProcessed % 1000 == 0) { if (entityCount % 1000 == 0) {
LOG.info("Java patch: {} : processed {} {} entities.", getPatchId(), entitiesProcessed, typeName); LOG.info("Java patch: {} : applied {}; processed {} {} entities.", getPatchId(), patchAppliedCount, entityCount, typeName);
} }
} }
} catch (IndexException e) { } catch (IndexException e) {
...@@ -120,7 +125,7 @@ public class UniqueAttributePatchHandler extends AtlasJavaPatchHandler { ...@@ -120,7 +125,7 @@ public class UniqueAttributePatchHandler extends AtlasJavaPatchHandler {
} }
} }
LOG.info("Applied java patch ({}) for type: {}; Total processed: {}", getPatchId(), typeName, entitiesProcessed); LOG.info("Applied java patch ({}) for type: {}; Total processed: {}", getPatchId(), typeName, patchAppliedCount);
} }
if (patchFailed) { if (patchFailed) {
......
...@@ -566,12 +566,20 @@ public class AtlasGraphUtilsV2 { ...@@ -566,12 +566,20 @@ public class AtlasGraphUtilsV2 {
return ret; return ret;
} }
public static Iterator<AtlasVertex> findActiveEntityVerticesByType(String typename) { public static Iterator<Result<Object, Object>> findActiveEntityVerticesByType(String typename) {
AtlasGraphQuery query = getGraphInstance().query() AtlasIndexQuery indexQuery = getActiveEntityIndexQuery(typename);
.has(ENTITY_TYPE_PROPERTY_KEY, typename)
.has(STATE_PROPERTY_KEY, Status.ACTIVE.name()); return indexQuery.vertices();
}
private static AtlasIndexQuery getActiveEntityIndexQuery(String typename) {
StringBuilder sb = new StringBuilder();
sb.append(INDEX_SEARCH_PREFIX + "\"").append(TYPE_NAME_PROPERTY_KEY).append("\":").append(typename)
.append(" AND ")
.append(INDEX_SEARCH_PREFIX + "\"").append(STATE_PROPERTY_KEY).append("\":").append(Status.ACTIVE.name());
return query.vertices().iterator(); return getGraphInstance().indexQuery(VERTEX_INDEX, sb.toString());
} }
public static List<String> findEntityGUIDsByType(String typename) { public static List<String> findEntityGUIDsByType(String typename) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment