Commit cd15f768 by Sarath Subramanian

ATLAS-3077: Handle java patches in patch framework

parent c7e42427
......@@ -50,12 +50,12 @@ public class AtlasPatch implements Serializable {
private long updatedTime;
private PatchStatus status;
public enum PatchStatus { APPLIED, SKIPPED, FAILED, UNKNOWN }
public enum PatchStatus { UNKNOWN, APPLIED, SKIPPED, FAILED }
public AtlasPatch() { }
public AtlasPatch(String id, String patchName, String type, String action, PatchStatus status, String updatedBy,
String createdBy, long createdTime, long updatedTime) {
public AtlasPatch(String id, String patchName, String type, String action, PatchStatus status,
String updatedBy, String createdBy, long createdTime, long updatedTime) {
this.id = id;
this.description = patchName;
this.type = type;
......
......@@ -108,7 +108,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
private boolean recomputeIndexedKeys = true;
private Set<String> vertexIndexKeys = new HashSet<>();
private enum UniqueKind { NONE, GLOBAL_UNIQUE, PER_TYPE_UNIQUE }
public enum UniqueKind { NONE, GLOBAL_UNIQUE, PER_TYPE_UNIQUE }
@Inject
public GraphBackedSearchIndexer(AtlasTypeRegistry typeRegistry) throws AtlasException {
......@@ -431,7 +431,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
return type instanceof AtlasRelationshipType;
}
private Class getPrimitiveClass(String attribTypeName) {
public Class getPrimitiveClass(String attribTypeName) {
String attributeTypeName = attribTypeName.toLowerCase();
switch (attributeTypeName) {
......@@ -461,7 +461,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
throw new IllegalArgumentException(String.format("Unknown primitive typename %s", attribTypeName));
}
private AtlasCardinality toAtlasCardinality(AtlasAttributeDef.Cardinality cardinality) {
public AtlasCardinality toAtlasCardinality(AtlasAttributeDef.Cardinality cardinality) {
switch (cardinality) {
case SINGLE:
return SINGLE;
......@@ -500,7 +500,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
return propertyKey;
}
private void createVertexIndex(AtlasGraphManagement management, String propertyName, UniqueKind uniqueKind, Class propertyClass,
public void createVertexIndex(AtlasGraphManagement management, String propertyName, UniqueKind uniqueKind, Class propertyClass,
AtlasCardinality cardinality, boolean createCompositeIndex, boolean createCompositeIndexWithTypeAndSuperTypes) {
if (propertyName != null) {
AtlasPropertyKey propertyKey = management.getPropertyKey(propertyName);
......@@ -704,7 +704,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
return !(INDEX_EXCLUSION_CLASSES.contains(propertyClass) || cardinality.isMany());
}
private void commit(AtlasGraphManagement management) throws IndexException {
public void commit(AtlasGraphManagement management) throws IndexException {
try {
management.commit();
......@@ -715,7 +715,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
}
}
private void rollback(AtlasGraphManagement management) throws IndexException {
public void rollback(AtlasGraphManagement management) throws IndexException {
try {
management.rollback();
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.patches;
import org.apache.atlas.RequestContext;
import org.apache.atlas.model.patches.AtlasPatch.PatchStatus;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.MapUtils;
import java.util.Map;
import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN;
import static org.apache.atlas.repository.Constants.CREATED_BY_KEY;
import static org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.MODIFIED_BY_KEY;
import static org.apache.atlas.repository.Constants.PATCH_DESCRIPTION_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.PATCH_ID_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.PATCH_STATE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.PATCH_TYPE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.TIMESTAMP_PROPERTY_KEY;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.findByPatchId;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;
import static org.apache.atlas.repository.store.graph.v2.AtlasTypeDefGraphStoreV2.getCurrentUser;
public abstract class AtlasJavaPatchHandler {
public final AtlasGraph graph;
public final AtlasTypeRegistry typeRegistry;
public final Map<String, PatchStatus> patchesRegistry;
public final EntityGraphRetriever entityRetriever;
public final GraphBackedSearchIndexer indexer;
public final PatchContext context;
public final String patchId;
public final String patchDescription;
private PatchStatus patchStatus;
public static final String JAVA_PATCH_TYPE = "JAVA_PATCH";
public AtlasJavaPatchHandler(PatchContext context, String patchId, String patchDescription) {
this.context = context;
this.graph = context.getGraph();
this.typeRegistry = context.getTypeRegistry();
this.indexer = context.getIndexer();
this.patchesRegistry = context.getPatchesRegistry();
this.patchId = patchId;
this.patchDescription = patchDescription;
this.patchStatus = getPatchStatus(patchesRegistry);
this.entityRetriever = new EntityGraphRetriever(typeRegistry);
init();
}
private void init() {
PatchStatus patchStatus = getPatchStatus();
if (patchStatus == UNKNOWN) {
AtlasVertex patchVertex = graph.addVertex();
setEncodedProperty(patchVertex, PATCH_ID_PROPERTY_KEY, patchId);
setEncodedProperty(patchVertex, PATCH_DESCRIPTION_PROPERTY_KEY, patchDescription);
setEncodedProperty(patchVertex, PATCH_TYPE_PROPERTY_KEY, JAVA_PATCH_TYPE);
setEncodedProperty(patchVertex, PATCH_STATE_PROPERTY_KEY, getPatchStatus().toString());
setEncodedProperty(patchVertex, TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
setEncodedProperty(patchVertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
setEncodedProperty(patchVertex, CREATED_BY_KEY, getCurrentUser());
setEncodedProperty(patchVertex, MODIFIED_BY_KEY, getCurrentUser());
graph.commit();
addToPatchesRegistry(patchId, getPatchStatus());
}
}
private PatchStatus getPatchStatus(Map<String, PatchStatus> patchesRegistry) {
PatchStatus ret = UNKNOWN;
if (MapUtils.isNotEmpty(patchesRegistry) && patchesRegistry.containsKey(patchId)) {
ret = patchesRegistry.get(patchId);
}
return ret;
}
public void updatePatchVertex(PatchStatus patchStatus) {
AtlasVertex patchVertex = findByPatchId(patchId);
if (patchVertex != null) {
setEncodedProperty(patchVertex, PATCH_STATE_PROPERTY_KEY, patchStatus.toString());
setEncodedProperty(patchVertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
setEncodedProperty(patchVertex, MODIFIED_BY_KEY, getCurrentUser());
graph.commit();
addToPatchesRegistry(getPatchId(), getPatchStatus());
}
}
public PatchStatus getPatchStatus() {
return patchStatus;
}
public void addToPatchesRegistry(String patchId, PatchStatus status) {
getPatchesRegistry().put(patchId, status);
}
public void setPatchStatus(PatchStatus patchStatus) {
this.patchStatus = patchStatus;
}
public String getPatchId() {
return patchId;
}
public Map<String, PatchStatus> getPatchesRegistry() {
return patchesRegistry;
}
public abstract void applyPatch();
}
\ No newline at end of file
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.patches;
import org.apache.atlas.model.patches.AtlasPatch.PatchStatus;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.type.AtlasTypeRegistry;
import java.util.Map;
/**
* Patch context for typedef and java patches.
*/
public class PatchContext {
private final AtlasGraph graph;
private final AtlasTypeRegistry typeRegistry;
private final GraphBackedSearchIndexer indexer;
private final Map<String, PatchStatus> patchesRegistry;
public PatchContext(AtlasGraph graph, AtlasTypeRegistry typeRegistry, GraphBackedSearchIndexer indexer,
Map<String, PatchStatus> patchesRegistry) {
this.graph = graph;
this.typeRegistry = typeRegistry;
this.indexer = indexer;
this.patchesRegistry = patchesRegistry;
}
public AtlasGraph getGraph() {
return graph;
}
public AtlasTypeRegistry getTypeRegistry() {
return typeRegistry;
}
public GraphBackedSearchIndexer getIndexer() {
return indexer;
}
public Map<String, PatchStatus> getPatchesRegistry() {
return patchesRegistry;
}
}
\ No newline at end of file
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.patches;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.repository.IndexException;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer.UniqueKind;
import org.apache.atlas.repository.graphdb.AtlasCardinality;
import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.commons.collections.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.FAILED;
import static org.apache.atlas.repository.graph.GraphHelper.getGuid;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.findActiveEntityVerticesByType;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;
public class UniqueAttributePatchHandler extends AtlasJavaPatchHandler {
private static final String PATCH_ID = "JAVA_PATCH_0000_001";
private static final String PATCH_DESCRIPTION = "Add new vertex property for each unique attribute of active entities";
private static final Logger LOG = LoggerFactory.getLogger(UniqueAttributePatchHandler.class);
public UniqueAttributePatchHandler(PatchContext context) {
super(context, PATCH_ID, PATCH_DESCRIPTION);
}
@Override
public void applyPatch() {
Collection<AtlasEntityType> allEntityTypes = typeRegistry.getAllEntityTypes();
boolean patchFailed = false;
for (AtlasEntityType entityType : allEntityTypes) {
String typeName = entityType.getTypeName();
Map<String, AtlasAttribute> uniqAttributes = entityType.getUniqAttributes();
int entitiesProcessed = 0;
LOG.info("Applying java patch: {} for type: {}", getPatchId(), typeName);
if (MapUtils.isNotEmpty(uniqAttributes)) {
Collection<AtlasAttribute> attributes = uniqAttributes.values();
try {
// register unique attribute property keys in graph
registerUniqueAttrPropertyKeys(attributes);
Iterator<AtlasVertex> iterator = findActiveEntityVerticesByType(typeName);
while (iterator.hasNext()) {
AtlasVertex entityVertex = iterator.next();
boolean patchApplied = false;
for (AtlasAttribute attribute : attributes) {
String uniquePropertyKey = attribute.getVertexUniquePropertyName();
Collection<? extends String> propertyKeys = entityVertex.getPropertyKeys();
if (!propertyKeys.contains(uniquePropertyKey)) {
String propertyKey = attribute.getVertexPropertyName();
AtlasAttributeDef attributeDef = attribute.getAttributeDef();
Object uniqAttrValue = entityRetriever.mapVertexToPrimitive(entityVertex, propertyKey, attributeDef);
// add the unique attribute property to vertex
setEncodedProperty(entityVertex, uniquePropertyKey, uniqAttrValue);
try {
graph.commit();
patchApplied = true;
if (LOG.isDebugEnabled()) {
LOG.debug("{}: Added unique attribute property: {} to entity: {} ({})",
PATCH_ID, uniquePropertyKey, getGuid(entityVertex), typeName);
}
} catch (Throwable t) {
LOG.warn("Java patch ({}): failed to update entity guid: {}; typeName: {}; attrName: {}; attrValue: {}",
getPatchId(), getGuid(entityVertex), typeName, attribute.getName(), uniqAttrValue);
continue;
}
}
}
if (patchApplied) {
entitiesProcessed++;
}
if (entitiesProcessed % 1000 == 0) {
LOG.info("Java patch: {} : processed {} {} entities.", getPatchId(), entitiesProcessed, typeName);
}
}
} catch (IndexException e) {
LOG.error("Java patch: {} failed! error: {}", getPatchId(), e);
patchFailed = true;
break;
}
}
LOG.info("Applied java patch ({}) for type: {}; Total processed: {}", getPatchId(), typeName, entitiesProcessed);
}
if (patchFailed) {
setPatchStatus(FAILED);
} else {
setPatchStatus(APPLIED);
}
LOG.info("Applied java patch: {}; status: {}", getPatchId(), getPatchStatus());
updatePatchVertex(getPatchStatus());
}
private void registerUniqueAttrPropertyKeys(Collection<AtlasAttribute> attributes) throws IndexException {
AtlasGraphManagement management = graph.getManagementSystem();
boolean idxCreated = false;
for (AtlasAttribute attribute : attributes) {
String uniquePropertyName = attribute.getVertexUniquePropertyName();
boolean uniquePropertyNameExists = management.getPropertyKey(uniquePropertyName) != null;
if (!uniquePropertyNameExists) {
AtlasAttributeDef attributeDef = attribute.getAttributeDef();
boolean isIndexable = attributeDef.getIsIndexable();
String attribTypeName = attributeDef.getTypeName();
Class propertyClass = indexer.getPrimitiveClass(attribTypeName);
AtlasCardinality cardinality = indexer.toAtlasCardinality(attributeDef.getCardinality());
indexer.createVertexIndex(management, uniquePropertyName, UniqueKind.NONE, propertyClass, cardinality, isIndexable, true);
idxCreated = true;
}
}
//Commit indexes
if (idxCreated) {
indexer.commit(management);
}
}
}
\ No newline at end of file
......@@ -42,8 +42,12 @@ import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.patches.AtlasJavaPatchHandler;
import org.apache.atlas.repository.patches.PatchContext;
import org.apache.atlas.repository.patches.UniqueAttributePatchHandler;
import org.apache.atlas.repository.store.graph.v2.AtlasTypeDefGraphStoreV2;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasEntityType;
......@@ -90,7 +94,7 @@ import static org.apache.atlas.repository.Constants.PATCH_STATE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.PATCH_TYPE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.TIMESTAMP_PROPERTY_KEY;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.findByPatchId;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.initPatchesRegistry;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getPatchesRegistry;
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;
/**
......@@ -105,18 +109,20 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
public static final String RELATIONSHIP_SWAP_ENDS = "swapEnds";
public static final String TYPEDEF_PATCH_TYPE = "TYPEDEF_PATCH";
private final AtlasTypeDefStore atlasTypeDefStore;
private final AtlasTypeRegistry atlasTypeRegistry;
private final AtlasGraph atlasGraph;
private final AtlasTypeDefStore typeDefStore;
private final AtlasTypeRegistry typeRegistry;
private final AtlasGraph graph;
private final Configuration conf;
private final GraphBackedSearchIndexer indexer;
@Inject
public AtlasTypeDefStoreInitializer(AtlasTypeDefStore atlasTypeDefStore, AtlasTypeRegistry atlasTypeRegistry,
AtlasGraph atlasGraph, Configuration conf) {
this.atlasTypeDefStore = atlasTypeDefStore;
this.atlasTypeRegistry = atlasTypeRegistry;
this.atlasGraph = atlasGraph;
public AtlasTypeDefStoreInitializer(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry,
AtlasGraph graph, Configuration conf, GraphBackedSearchIndexer indexer) {
this.typeDefStore = typeDefStore;
this.typeRegistry = typeRegistry;
this.graph = graph;
this.conf = conf;
this.indexer = indexer;
}
@PostConstruct
......@@ -124,7 +130,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
LOG.info("==> AtlasTypeDefStoreInitializer.init()");
if (!HAConfiguration.isHAEnabled(conf)) {
atlasTypeDefStore.init();
typeDefStore.init();
loadBootstrapTypeDefs();
try {
......@@ -151,6 +157,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
String atlasHomeDir = System.getProperty("atlas.home");
String modelsDirName = (StringUtils.isEmpty(atlasHomeDir) ? "." : atlasHomeDir) + File.separator + "models";
PatchContext patchContext = initPatchContext();
if (modelsDirName == null || modelsDirName.length() == 0) {
LOG.info("Types directory {} does not exist or not readable or has no typedef files", modelsDirName);
......@@ -158,7 +165,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
// look for folders we need to load models from
File topModeltypesDir = new File(modelsDirName);
File[] modelsDirContents = topModeltypesDir.exists() ? topModeltypesDir.listFiles() : null;
Map<String, PatchStatus> patchesRegistry = initPatchesRegistry();
if (modelsDirContents != null && modelsDirContents.length > 0) {
Arrays.sort(modelsDirContents);
......@@ -169,23 +176,49 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
continue;
} else if (!folder.getName().equals(PATCHES_FOLDER_NAME)){
// load the models alphabetically in the subfolders apart from patches
loadModelsInFolder(folder, patchesRegistry);
loadModelsInFolder(folder, patchContext);
}
}
}
// load any files in the top models folder and any associated patches.
loadModelsInFolder(topModeltypesDir, patchesRegistry);
loadModelsInFolder(topModeltypesDir, patchContext);
}
// apply java patches
applyJavaPatches(patchContext);
LOG.info("<== AtlasTypeDefStoreInitializer.loadBootstrapTypeDefs()");
}
private void applyJavaPatches(PatchContext context) {
// register java patches
AtlasJavaPatchHandler[] patches = new AtlasJavaPatchHandler[] { new UniqueAttributePatchHandler(context) };
// apply java patches
for (AtlasJavaPatchHandler patch : patches) {
PatchStatus patchStatus = patch.getPatchStatus();
if (patchStatus == APPLIED || patchStatus == SKIPPED) {
LOG.info("Ignoring java patch: {}; status: {}", patch.getPatchId(), patchStatus);
} else {
LOG.info("Applying java patch: {}; status: {}", patch.getPatchId(), patchStatus);
patch.applyPatch();
}
}
}
public PatchContext initPatchContext() {
return new PatchContext(graph, typeRegistry, indexer, getPatchesRegistry());
}
/**
* Load all the model files in the supplied folder followed by the contents of the patches folder.
* @param typesDir
* @param patchesRegistry
* @param context
*/
private void loadModelsInFolder(File typesDir, Map<String, PatchStatus> patchesRegistry) {
private void loadModelsInFolder(File typesDir, PatchContext context) {
LOG.info("==> AtlasTypeDefStoreInitializer({})", typesDir);
String typesDirName = typesDir.getName();
......@@ -210,11 +243,11 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
continue;
}
AtlasTypesDef typesToCreate = getTypesToCreate(typesDef, atlasTypeRegistry);
AtlasTypesDef typesToUpdate = getTypesToUpdate(typesDef, atlasTypeRegistry, true);
AtlasTypesDef typesToCreate = getTypesToCreate(typesDef, typeRegistry);
AtlasTypesDef typesToUpdate = getTypesToUpdate(typesDef, typeRegistry, true);
if (!typesToCreate.isEmpty() || !typesToUpdate.isEmpty()) {
atlasTypeDefStore.createUpdateTypesDef(typesToCreate, typesToUpdate);
typeDefStore.createUpdateTypesDef(typesToCreate, typesToUpdate);
LOG.info("Created/Updated types defined in file {}", typeDefFile.getAbsolutePath());
} else {
......@@ -227,7 +260,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
}
}
applyTypePatches(typesDir.getPath(), patchesRegistry);
applyTypePatches(typesDir.getPath(), context);
}
LOG.info("<== AtlasTypeDefStoreInitializer({})", typesDir);
}
......@@ -367,7 +400,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
LOG.info("==> AtlasTypeDefStoreInitializer.instanceIsActive()");
try {
atlasTypeDefStore.init();
typeDefStore.init();
loadBootstrapTypeDefs();
......@@ -425,10 +458,11 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
return ret;
}
private void applyTypePatches(String typesDirName, Map<String, PatchStatus> patchesRegistry) {
private void applyTypePatches(String typesDirName, PatchContext context) {
String typePatchesDirName = typesDirName + File.separator + PATCHES_FOLDER_NAME;
File typePatchesDir = new File(typePatchesDirName);
File[] typePatchFiles = typePatchesDir.exists() ? typePatchesDir.listFiles() : null;
Map<String, PatchStatus> patchesRegistry = context.getPatchesRegistry();
if (typePatchFiles == null || typePatchFiles.length == 0) {
LOG.info("Type patches directory {} does not exist or not readable or has no patches", typePatchesDirName);
......@@ -439,11 +473,11 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
Arrays.sort(typePatchFiles);
PatchHandler[] patchHandlers = new PatchHandler[] {
new AddAttributePatchHandler(atlasTypeDefStore, atlasTypeRegistry),
new UpdateAttributePatchHandler(atlasTypeDefStore, atlasTypeRegistry),
new RemoveLegacyRefAttributesPatchHandler(atlasTypeDefStore, atlasTypeRegistry),
new UpdateTypeDefOptionsPatchHandler(atlasTypeDefStore, atlasTypeRegistry),
new SetServiceTypePatchHandler(atlasTypeDefStore, atlasTypeRegistry)
new AddAttributePatchHandler(typeDefStore, typeRegistry),
new UpdateAttributePatchHandler(typeDefStore, typeRegistry),
new RemoveLegacyRefAttributesPatchHandler(typeDefStore, typeRegistry),
new UpdateTypeDefOptionsPatchHandler(typeDefStore, typeRegistry),
new SetServiceTypePatchHandler(typeDefStore, typeRegistry)
};
Map<String, PatchHandler> patchHandlerRegistry = new HashMap<>();
......@@ -534,7 +568,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
private void createOrUpdatePatchVertex(TypeDefPatch patch, PatchStatus patchStatus, Map<String, PatchStatus> patchesRegistry) {
String patchId = patch.getId();
boolean isPatchRegistered = MapUtils.isNotEmpty(patchesRegistry) && patchesRegistry.containsKey(patchId);
AtlasVertex patchVertex = isPatchRegistered ? findByPatchId(patchId) : atlasGraph.addVertex();
AtlasVertex patchVertex = isPatchRegistered ? findByPatchId(patchId) : graph.addVertex();
setEncodedProperty(patchVertex, PATCH_ID_PROPERTY_KEY, patchId);
setEncodedProperty(patchVertex, PATCH_DESCRIPTION_PROPERTY_KEY, patch.getDescription());
......
......@@ -23,11 +23,11 @@ import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.RequestContext;
import org.apache.atlas.SortOrder;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.discovery.SearchProcessor;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.Status;
import org.apache.atlas.model.patches.AtlasPatch;
import org.apache.atlas.model.patches.AtlasPatch.AtlasPatches;
import org.apache.atlas.model.patches.AtlasPatch.PatchStatus;
......@@ -65,6 +65,7 @@ import java.util.Set;
import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN;
import static org.apache.atlas.repository.Constants.CREATED_BY_KEY;
import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_DEFAULT;
import static org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_PROPERTY;
import static org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY;
......@@ -74,8 +75,11 @@ import static org.apache.atlas.repository.Constants.PATCH_ID_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.PATCH_DESCRIPTION_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.PATCH_STATE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.PATCH_TYPE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.TIMESTAMP_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.TYPE_NAME_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.VERTEX_INDEX;
import static org.apache.atlas.repository.graph.AtlasGraphProvider.getGraphInstance;
import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.SortOrder.*;
/**
......@@ -133,7 +137,7 @@ public class AtlasGraphUtilsV2 {
}
public static String getTypeName(AtlasElement element) {
return element.getProperty(Constants.ENTITY_TYPE_PROPERTY_KEY, String.class);
return element.getProperty(ENTITY_TYPE_PROPERTY_KEY, String.class);
}
public static String getEdgeLabel(String fromNode, String toNode) {
......@@ -341,7 +345,7 @@ public class AtlasGraphUtilsV2 {
public static AtlasVertex findByPatchId(String patchId) {
AtlasVertex ret = null;
String indexQuery = getIndexSearchPrefix() + "\"" + PATCH_ID_PROPERTY_KEY + "\" : ("+ patchId +")";
Iterator<Result<Object, Object>> results = AtlasGraphProvider.getGraphInstance().indexQuery(VERTEX_INDEX, indexQuery).vertices();
Iterator<Result<Object, Object>> results = getGraphInstance().indexQuery(VERTEX_INDEX, indexQuery).vertices();
while (results != null && results.hasNext()) {
ret = results.next().getVertex();
......@@ -358,7 +362,7 @@ public class AtlasGraphUtilsV2 {
AtlasVertex ret = GraphTransactionInterceptor.getVertexFromCache(guid);
if (ret == null) {
AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query()
AtlasGraphQuery query = getGraphInstance().query()
.has(Constants.GUID_PROPERTY_KEY, guid);
Iterator<AtlasVertex> results = query.vertices().iterator();
......@@ -386,9 +390,9 @@ public class AtlasGraphUtilsV2 {
}
public static boolean typeHasInstanceVertex(String typeName) throws AtlasBaseException {
AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance()
AtlasGraphQuery query = getGraphInstance()
.query()
.has(Constants.TYPE_NAME_PROPERTY_KEY, AtlasGraphQuery.ComparisionOperator.EQUAL, typeName);
.has(TYPE_NAME_PROPERTY_KEY, AtlasGraphQuery.ComparisionOperator.EQUAL, typeName);
Iterator<AtlasVertex> results = query.vertices().iterator();
......@@ -404,8 +408,8 @@ public class AtlasGraphUtilsV2 {
public static AtlasVertex findByTypeAndUniquePropertyName(String typeName, String propertyName, Object attrVal) {
MetricRecorder metric = RequestContext.get().startMetricRecord("findByTypeAndUniquePropertyName");
AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query()
.has(Constants.ENTITY_TYPE_PROPERTY_KEY, typeName)
AtlasGraphQuery query = getGraphInstance().query()
.has(ENTITY_TYPE_PROPERTY_KEY, typeName)
.has(propertyName, attrVal);
Iterator<AtlasVertex> results = query.vertices().iterator();
......@@ -420,7 +424,7 @@ public class AtlasGraphUtilsV2 {
public static AtlasVertex findBySuperTypeAndUniquePropertyName(String typeName, String propertyName, Object attrVal) {
MetricRecorder metric = RequestContext.get().startMetricRecord("findBySuperTypeAndUniquePropertyName");
AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query()
AtlasGraphQuery query = getGraphInstance().query()
.has(Constants.SUPER_TYPES_PROPERTY_KEY, typeName)
.has(propertyName, attrVal);
......@@ -436,10 +440,10 @@ public class AtlasGraphUtilsV2 {
public static AtlasVertex findByTypeAndPropertyName(String typeName, String propertyName, Object attrVal) {
MetricRecorder metric = RequestContext.get().startMetricRecord("findByTypeAndPropertyName");
AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query()
.has(Constants.ENTITY_TYPE_PROPERTY_KEY, typeName)
AtlasGraphQuery query = getGraphInstance().query()
.has(ENTITY_TYPE_PROPERTY_KEY, typeName)
.has(propertyName, attrVal)
.has(Constants.STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name());
.has(STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name());
Iterator<AtlasVertex> results = query.vertices().iterator();
......@@ -453,10 +457,10 @@ public class AtlasGraphUtilsV2 {
public static AtlasVertex findBySuperTypeAndPropertyName(String typeName, String propertyName, Object attrVal) {
MetricRecorder metric = RequestContext.get().startMetricRecord("findBySuperTypeAndPropertyName");
AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query()
AtlasGraphQuery query = getGraphInstance().query()
.has(Constants.SUPER_TYPES_PROPERTY_KEY, typeName)
.has(propertyName, attrVal)
.has(Constants.STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name());
.has(STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name());
Iterator<AtlasVertex> results = query.vertices().iterator();
......@@ -467,9 +471,9 @@ public class AtlasGraphUtilsV2 {
return vertex;
}
public static Map<String, PatchStatus> initPatchesRegistry() {
public static Map<String, PatchStatus> getPatchesRegistry() {
Map<String, PatchStatus> ret = new HashMap<>();
AtlasPatches patches = getPatches();
AtlasPatches patches = getAllPatches();
for (AtlasPatch patch : patches.getPatches()) {
String patchId = patch.getId();
......@@ -483,7 +487,7 @@ public class AtlasGraphUtilsV2 {
return ret;
}
public static AtlasPatches getPatches() {
public static AtlasPatches getAllPatches() {
List<AtlasPatch> ret = new ArrayList<>();
String idxQueryString = getIndexSearchPrefix() + "\"" + PATCH_ID_PROPERTY_KEY + "\" : (*)";
AtlasIndexQuery idxQuery = AtlasGraphProvider.getGraphInstance().indexQuery(VERTEX_INDEX, idxQueryString);
......@@ -534,8 +538,8 @@ public class AtlasGraphUtilsV2 {
}
public static List<String> findEntityGUIDsByType(String typename, SortOrder sortOrder) {
AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query()
.has(Constants.ENTITY_TYPE_PROPERTY_KEY, typename);
AtlasGraphQuery query = getGraphInstance().query()
.has(ENTITY_TYPE_PROPERTY_KEY, typename);
if (sortOrder != null) {
AtlasGraphQuery.SortOrder qrySortOrder = sortOrder == SortOrder.ASCENDING ? ASC : DESC;
query.orderBy(Constants.QUALIFIED_NAME, qrySortOrder);
......@@ -555,14 +559,22 @@ public class AtlasGraphUtilsV2 {
return ret;
}
public static Iterator<AtlasVertex> findActiveEntityVerticesByType(String typename) {
AtlasGraphQuery query = getGraphInstance().query()
.has(ENTITY_TYPE_PROPERTY_KEY, typename)
.has(STATE_PROPERTY_KEY, Status.ACTIVE.name());
return query.vertices().iterator();
}
public static List<String> findEntityGUIDsByType(String typename) {
return findEntityGUIDsByType(typename, null);
}
public static boolean relationshipTypeHasInstanceEdges(String typeName) throws AtlasBaseException {
AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance()
AtlasGraphQuery query = getGraphInstance()
.query()
.has(Constants.TYPE_NAME_PROPERTY_KEY, AtlasGraphQuery.ComparisionOperator.EQUAL, typeName);
.has(TYPE_NAME_PROPERTY_KEY, AtlasGraphQuery.ComparisionOperator.EQUAL, typeName);
Iterator<AtlasEdge> results = query.edges().iterator();
......@@ -626,7 +638,7 @@ public class AtlasGraphUtilsV2 {
}
public static String getStateAsString(AtlasElement element) {
return element.getProperty(Constants.STATE_PROPERTY_KEY, String.class);
return element.getProperty(STATE_PROPERTY_KEY, String.class);
}
private static boolean canUseIndexQuery(AtlasEntityType entityType, String attributeName) {
......@@ -638,7 +650,7 @@ public class AtlasGraphUtilsV2 {
ret = typeAndSubTypesQryStr.length() <= SearchProcessor.MAX_QUERY_STR_LENGTH_TYPES;
if (ret) {
Set<String> indexSet = AtlasGraphProvider.getGraphInstance().getVertexIndexKeys();
Set<String> indexSet = getGraphInstance().getVertexIndexKeys();
try {
ret = indexSet.contains(entityType.getQualifiedAttributeName(attributeName));
}
......@@ -693,13 +705,13 @@ public class AtlasGraphUtilsV2 {
private static AtlasIndexQuery getIndexQuery(AtlasEntityType entityType, String propertyName, String value) {
StringBuilder sb = new StringBuilder();
sb.append(INDEX_SEARCH_PREFIX + "\"").append(Constants.TYPE_NAME_PROPERTY_KEY).append("\":").append(entityType.getTypeAndAllSubTypesQryStr())
sb.append(INDEX_SEARCH_PREFIX + "\"").append(TYPE_NAME_PROPERTY_KEY).append("\":").append(entityType.getTypeAndAllSubTypesQryStr())
.append(" AND ")
.append(INDEX_SEARCH_PREFIX + "\"").append(propertyName).append("\":").append(AtlasAttribute.escapeIndexQueryValue(value))
.append(" AND ")
.append(INDEX_SEARCH_PREFIX + "\"").append(Constants.STATE_PROPERTY_KEY).append("\":ACTIVE");
.append(INDEX_SEARCH_PREFIX + "\"").append(STATE_PROPERTY_KEY).append("\":ACTIVE");
return AtlasGraphProvider.getGraphInstance().indexQuery(Constants.VERTEX_INDEX, sb.toString());
return getGraphInstance().indexQuery(Constants.VERTEX_INDEX, sb.toString());
}
public static String getIndexSearchPrefix() {
......
......@@ -564,7 +564,7 @@ public class AdminResource {
LOG.debug("==> AdminResource.getAtlasPatches()");
}
AtlasPatches ret = AtlasGraphUtilsV2.getPatches();
AtlasPatches ret = AtlasGraphUtilsV2.getAllPatches();
if (LOG.isDebugEnabled()) {
LOG.debug("<== AdminResource.getAtlasPatches()");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment