Commit 3ded3ee2 by Ashutosh Mestry Committed by Madhan Neethiraj

ATLAS-2460: implementation of migration-import to migrate Atlas data in Titan to JanusGraph

parent 7f5a665e
...@@ -34,4 +34,7 @@ public final class AtlasConstants { ...@@ -34,4 +34,7 @@ public final class AtlasConstants {
public static final String DEFAULT_ATLAS_REST_ADDRESS = "http://localhost:21000"; public static final String DEFAULT_ATLAS_REST_ADDRESS = "http://localhost:21000";
public static final int ATLAS_SHUTDOWN_HOOK_PRIORITY = 30; public static final int ATLAS_SHUTDOWN_HOOK_PRIORITY = 30;
public static final String DEFAULT_TYPE_VERSION = "1.0"; public static final String DEFAULT_TYPE_VERSION = "1.0";
public static final String ATLAS_MIGRATION_MODE_FILENAME = "atlas.migration.mode.filename";
public static final String ATLAS_SERVICES_ENABLED = "atlas.services.enabled";
} }
...@@ -124,6 +124,9 @@ public final class Constants { ...@@ -124,6 +124,9 @@ public final class Constants {
public static final String CLASSIFICATION_EDGE_STATE_PROPERTY_KEY = STATE_PROPERTY_KEY; public static final String CLASSIFICATION_EDGE_STATE_PROPERTY_KEY = STATE_PROPERTY_KEY;
public static final String CLASSIFICATION_LABEL = "classifiedAs"; public static final String CLASSIFICATION_LABEL = "classifiedAs";
public static final String VERTEX_ID_IN_IMPORT_KEY = "__vIdInImport";
public static final String EDGE_ID_IN_IMPORT_KEY = "__eIdInImport";
private Constants() { private Constants() {
} }
......
...@@ -6,9 +6,9 @@ ...@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the * to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at * with the License. You may obtain a copy of the License at
* * <p>
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* * <p>
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
...@@ -19,6 +19,7 @@ package org.apache.atlas.service; ...@@ -19,6 +19,7 @@ package org.apache.atlas.service;
import org.apache.atlas.annotation.AtlasService; import org.apache.atlas.annotation.AtlasService;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Profile; import org.springframework.context.annotation.Profile;
...@@ -28,6 +29,9 @@ import javax.annotation.PreDestroy; ...@@ -28,6 +29,9 @@ import javax.annotation.PreDestroy;
import javax.inject.Inject; import javax.inject.Inject;
import java.util.List; import java.util.List;
import static org.apache.atlas.AtlasConstants.ATLAS_MIGRATION_MODE_FILENAME;
import static org.apache.atlas.AtlasConstants.ATLAS_SERVICES_ENABLED;
/** /**
* Utility for starting and stopping all services. * Utility for starting and stopping all services.
*/ */
...@@ -35,45 +39,65 @@ import java.util.List; ...@@ -35,45 +39,65 @@ import java.util.List;
@Profile("!test") @Profile("!test")
public class Services { public class Services {
public static final Logger LOG = LoggerFactory.getLogger(Services.class); public static final Logger LOG = LoggerFactory.getLogger(Services.class);
private static final String DATA_MIGRATION_CLASS_NAME_DEFAULT = "DataMigrationService";
private final List<Service> services; private final List<Service> services;
private final Configuration configuration; private final String dataMigrationClassName;
private final boolean servicesEnabled;
private final boolean migrationEnabled;
@Inject @Inject
public Services(List<Service> services, Configuration configuration) { public Services(List<Service> services, Configuration configuration) {
this.services = services; this.services = services;
this.configuration = configuration; this.dataMigrationClassName = configuration.getString("atlas.migration.class.name", DATA_MIGRATION_CLASS_NAME_DEFAULT);
this.servicesEnabled = configuration.getBoolean(ATLAS_SERVICES_ENABLED, true);
this.migrationEnabled = StringUtils.isNotEmpty(configuration.getString(ATLAS_MIGRATION_MODE_FILENAME));
} }
@PostConstruct @PostConstruct
public void start() { public void start() {
if (configuration.getBoolean("atlas.services.enabled", true)) { try {
try { for (Service svc : services) {
for (Service service : services) { if (!isServiceUsed(svc)) {
LOG.info("Starting service {}", service.getClass().getName()); continue;
service.start();
} }
} catch (Exception e) {
throw new RuntimeException(e); LOG.info("Starting service {}", svc.getClass().getName());
svc.start();
} }
} catch (Exception e) {
throw new RuntimeException(e);
} }
} }
@PreDestroy @PreDestroy
public void stop() { public void stop() {
if (configuration.getBoolean("atlas.services.enabled", true)) { for (int idx = services.size() - 1; idx >= 0; idx--) {
for (int idx = services.size() - 1; idx >= 0; idx--) { Service svc = services.get(idx);
Service service = services.get(idx); try {
if (!isServiceUsed(svc)) {
continue;
}
LOG.info("Stopping service {}", service.getClass().getName()); LOG.info("Stopping service {}", svc.getClass().getName());
try { svc.stop();
service.stop(); } catch (Throwable e) {
} catch (Throwable e) { LOG.warn("Error stopping service {}", svc.getClass().getName(), e);
LOG.warn("Error stopping service {}", service.getClass().getName(), e);
}
} }
} }
} }
private boolean isServiceUsed(Service service) {
if (isDataMigrationService(service)) {
return migrationEnabled;
} else {
return !migrationEnabled && servicesEnabled;
}
}
private boolean isDataMigrationService(Service svc) {
return svc.getClass().getSimpleName().equals(dataMigrationClassName);
}
} }
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package org.apache.atlas.repository.graphdb; package org.apache.atlas.repository.graphdb;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
...@@ -317,4 +318,6 @@ public interface AtlasGraph<V, E> { ...@@ -317,4 +318,6 @@ public interface AtlasGraph<V, E> {
* @return * @return
*/ */
boolean isMultiProperty(String name); boolean isMultiProperty(String name);
void loadLegacyGraphSON(Map<String, String> relationshipCache, InputStream fs) throws AtlasBaseException;
} }
...@@ -64,6 +64,7 @@ import javax.script.Bindings; ...@@ -64,6 +64,7 @@ import javax.script.Bindings;
import javax.script.ScriptEngine; import javax.script.ScriptEngine;
import javax.script.ScriptException; import javax.script.ScriptException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
...@@ -464,4 +465,9 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE ...@@ -464,4 +465,9 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE
} }
} }
} }
@Override
public void loadLegacyGraphSON(Map<String, String> relationshipCache, InputStream fs) throws AtlasBaseException {
AtlasJanusGraphDatabase.loadLegacyGraphSON(relationshipCache, fs);
}
} }
...@@ -20,13 +20,16 @@ package org.apache.atlas.repository.graphdb.janus; ...@@ -20,13 +20,16 @@ package org.apache.atlas.repository.graphdb.janus;
import org.apache.atlas.ApplicationProperties; import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.GraphDatabase; import org.apache.atlas.repository.graphdb.GraphDatabase;
import org.apache.atlas.repository.graphdb.janus.migration.AtlasGraphSONReader;
import org.apache.atlas.repository.graphdb.janus.serializer.BigDecimalSerializer; import org.apache.atlas.repository.graphdb.janus.serializer.BigDecimalSerializer;
import org.apache.atlas.repository.graphdb.janus.serializer.BigIntegerSerializer; import org.apache.atlas.repository.graphdb.janus.serializer.BigIntegerSerializer;
import org.apache.atlas.repository.graphdb.janus.serializer.TypeCategorySerializer; import org.apache.atlas.repository.graphdb.janus.serializer.TypeCategorySerializer;
import org.apache.atlas.runner.LocalSolrRunner; import org.apache.atlas.runner.LocalSolrRunner;
import org.apache.atlas.typesystem.types.DataTypes.TypeCategory; import org.apache.atlas.typesystem.types.DataTypes.TypeCategory;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONMapper; import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONMapper;
import org.janusgraph.graphdb.database.serialize.attribute.SerializableSerializer; import org.janusgraph.graphdb.database.serialize.attribute.SerializableSerializer;
...@@ -35,22 +38,22 @@ import org.slf4j.LoggerFactory; ...@@ -35,22 +38,22 @@ import org.slf4j.LoggerFactory;
import org.janusgraph.core.JanusGraphFactory; import org.janusgraph.core.JanusGraphFactory;
import org.janusgraph.core.JanusGraph; import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.JanusGraphFactory;
import org.janusgraph.core.schema.JanusGraphManagement; import org.janusgraph.core.schema.JanusGraphManagement;
import org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry; import org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.InputStream;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.math.BigInteger; import java.math.BigInteger;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Map;
/** /**
* Default implementation for Graph Provider that doles out Titan Graph. * Default implementation for Graph Provider that doles out Titan Graph.
*/ */
public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex, AtlasJanusEdge> { public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex, AtlasJanusEdge> {
private static final Logger LOG = LoggerFactory.getLogger(AtlasJanusGraphDatabase.class);
private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("AtlasJanusGraphDatabase");
private static final Logger LOG = LoggerFactory.getLogger(AtlasJanusGraphDatabase.class);
/** /**
* Constant for the configuration property that indicates the prefix. * Constant for the configuration property that indicates the prefix.
...@@ -116,6 +119,20 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex, ...@@ -116,6 +119,20 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex,
return graphInstance; return graphInstance;
} }
public static JanusGraph getBulkLoadingGraphInstance() {
try {
Configuration cfg = getConfiguration();
cfg.setProperty("storage.batch-loading", true);
return JanusGraphFactory.open(cfg);
} catch (IllegalArgumentException ex) {
LOG.error("getBulkLoadingGraphInstance: Failed!", ex);
} catch (AtlasException ex) {
LOG.error("getBulkLoadingGraphInstance: Failed!", ex);
}
return null;
}
public static void unload() { public static void unload() {
synchronized (AtlasJanusGraphDatabase.class) { synchronized (AtlasJanusGraphDatabase.class) {
...@@ -215,4 +232,31 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex, ...@@ -215,4 +232,31 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex,
return ret; return ret;
} }
public static void loadLegacyGraphSON(Map<String, String> relationshipCache, InputStream fs) throws AtlasBaseException {
AtlasPerfTracer perf = null;
try {
LOG.info("Starting loadLegacyGraphSON...");
if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "loadLegacyGraphSON");
}
AtlasGraphSONReader legacyGraphSONReader = AtlasGraphSONReader.build().
relationshipCache(relationshipCache).
schemaDB(getGraphInstance()).
bulkLoadingDB(getBulkLoadingGraphInstance()).create();
legacyGraphSONReader.readGraph(fs);
} catch (Exception ex) {
LOG.error("Error loading loadLegacyGraphSON2", ex);
throw new AtlasBaseException(ex);
} finally {
AtlasPerfTracer.log(perf);
LOG.info("Done! loadLegacyGraphSON.");
}
}
} }
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.janus.migration;
public final class GraphSONTokensTP2 {
public static final String _ID = "_id";
public static final String _LABEL = "_label";
public static final String _TYPE = "_type";
public static final String _OUT_V = "_outV";
public static final String _IN_V = "_inV";
public static final String VALUE = "value";
public static final String TYPE = "type";
public static final String TYPE_LIST = "list";
public static final String TYPE_STRING = "string";
public static final String TYPE_DOUBLE = "double";
public static final String TYPE_INTEGER = "integer";
public static final String TYPE_FLOAT = "float";
public static final String TYPE_MAP = "map";
public static final String TYPE_BOOLEAN = "boolean";
public static final String TYPE_LONG = "long";
public static final String TYPE_SHORT = "short";
public static final String TYPE_BYTE = "byte";
public static final String TYPE_UNKNOWN = "unknown";
public static final String VERTICES = "vertices";
public static final String EDGES = "edges";
public static final String MODE = "mode";
private GraphSONTokensTP2() {
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.janus.migration;
import org.apache.atlas.repository.Constants;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import static org.apache.atlas.repository.Constants.EDGE_ID_IN_IMPORT_KEY;
import static org.apache.atlas.repository.Constants.VERTEX_ID_IN_IMPORT_KEY;
public class JsonNodeParsers {
private static final Logger LOG = LoggerFactory.getLogger(JsonNodeParsers.class);
static abstract class ParseElement {
protected GraphSONUtility utility;
abstract String getMessage();
abstract Object getId(JsonNode node);
abstract boolean isTypeNode(JsonNode node);
abstract String getType(JsonNode node);
public void setContext(GraphSONUtility utility) {
this.utility = utility;
}
abstract Map<String, Object> parse(Graph gr, MappedElementCache cache, JsonNode Node);
public void commit(Graph graph) {
graph.tx().commit();
}
abstract Element get(Graph gr, Object id);
abstract Element getByOriginalId(Graph gr, Object id);
public Element getByOriginalId(Graph gr, JsonNode node) {
return getByOriginalId(gr, getId(node));
}
public Element update(Graph gr, Object id, Map<String,Object> schema) {
Element el = get(gr, id);
for (Map.Entry<String, Object> entry : schema.entrySet()) {
el.property(entry.getKey(), entry.getValue());
}
return el;
}
static Object getTypedValueFromJsonNode(final JsonNode node) {
Object theValue = null;
if (node != null && !node.isNull()) {
if (node.isBoolean()) {
theValue = node.booleanValue();
} else if (node.isDouble()) {
theValue = node.doubleValue();
} else if (node.isFloatingPointNumber()) {
theValue = node.floatValue();
} else if (node.isInt()) {
theValue = node.intValue();
} else if (node.isLong()) {
theValue = node.longValue();
} else if (node.isTextual()) {
theValue = node.textValue();
} else if (node.isArray()) {
// this is an array so just send it back so that it can be
// reprocessed to its primitive components
theValue = node;
} else if (node.isObject()) {
// this is an object so just send it back so that it can be
// reprocessed to its primitive components
theValue = node;
} else {
theValue = node.textValue();
}
}
return theValue;
}
}
static class ParseEdge extends ParseElement {
private static final String MESSAGE_EDGE = "edge";
private static final String TYPE_NAME_NODE_NAME = Constants.VERTEX_TYPE_PROPERTY_KEY;
@Override
public String getMessage() {
return MESSAGE_EDGE;
}
@Override
Object getId(JsonNode node) {
return getTypedValueFromJsonNode(node.get(GraphSONTokensTP2._ID));
}
@Override
public Map<String, Object> parse(Graph gr, MappedElementCache cache, JsonNode node) {
return utility.edgeFromJson(gr, cache, node);
}
@Override
Element get(Graph gr, Object id) {
return gr.edges(id).next();
}
@Override
Element getByOriginalId(Graph gr, Object id) {
try {
return gr.traversal().E().has(EDGE_ID_IN_IMPORT_KEY, id).next();
} catch (Exception ex) {
LOG.error("fetchEdge: fetchFromDB failed: {}", id);
return null;
}
}
@Override
public boolean isTypeNode(JsonNode node) {
return node.get(GraphSONTokensTP2._LABEL).textValue().startsWith(Constants.TYPENAME_PROPERTY_KEY);
}
@Override
public String getType(JsonNode node) {
return node.get(GraphSONTokensTP2._LABEL).textValue();
}
}
static class ParseVertex extends ParseElement {
private static final String NODE_VALUE_KEY = "value";
private static final String MESSAGE_VERTEX = "vertex";
@Override
public String getMessage() {
return MESSAGE_VERTEX;
}
@Override
Object getId(JsonNode node) {
return getTypedValueFromJsonNode(node.get(GraphSONTokensTP2._ID));
}
@Override
public Map<String, Object> parse(Graph graph, MappedElementCache cache, JsonNode node) {
return utility.vertexFromJson(graph, node);
}
@Override
Element get(Graph gr, Object id) {
return gr.vertices(id).next();
}
@Override
Element getByOriginalId(Graph gr, Object id) {
try {
return gr.traversal().V().has(VERTEX_ID_IN_IMPORT_KEY, id).next();
} catch (Exception ex) {
LOG.error("getByOriginalId failed: {}", id);
return null;
}
}
@Override
public boolean isTypeNode(JsonNode node) {
return node.has(Constants.TYPENAME_PROPERTY_KEY);
}
@Override
public String getType(JsonNode node) {
return node.has(Constants.ENTITY_TYPE_PROPERTY_KEY) ? node.get(Constants.ENTITY_TYPE_PROPERTY_KEY).get(NODE_VALUE_KEY).toString() : "";
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.janus.migration;
import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
import org.apache.atlas.repository.graphdb.janus.migration.pc.WorkItemBuilder;
import org.apache.atlas.repository.graphdb.janus.migration.pc.WorkItemConsumer;
import org.apache.atlas.repository.graphdb.janus.migration.JsonNodeParsers.ParseElement;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
public class JsonNodeProcessManager {
private static class Consumer extends WorkItemConsumer<JsonNode> {
private static final Logger LOG = LoggerFactory.getLogger(Consumer.class);
private static final int WAIT_DURATION_AFTER_COMMIT_EXCEPTION = 1000;
private final Graph graph;
protected final Graph bulkLoadGraph;
protected final ParseElement parseElement;
private final long batchSize;
private long counter;
private final MappedElementCache cache;
private final List<JsonNode> nodes = new ArrayList<>();
public Consumer(BlockingQueue<JsonNode> workQueue, Graph graph, Graph bulkLoadGraph, ParseElement parseElement, long batchSize) {
super(workQueue);
this.graph = graph;
this.bulkLoadGraph = bulkLoadGraph;
this.parseElement = parseElement;
this.batchSize = batchSize;
this.counter = 0;
this.cache = new MappedElementCache();
}
@Override
public void processItem(JsonNode node) {
try {
Map<String, Object> result = parseElement.parse(bulkLoadGraph, cache, node);
if (result == null) {
nodes.add(node);
commitConditionally(counter++);
} else {
commitBulk();
cache.clearAll();
updateSchema(result, node);
}
} catch (Exception ex) {
bulkLoadGraph.tx().rollback();
error("Failed! Retrying...", ex);
retryBatchCommit();
}
}
@Override
protected void commitDirty() {
super.commitDirty();
cache.clearAll();
}
@Override
protected void doCommit() {
commitBulk();
}
private void commitConditionally(long index) {
if (index % batchSize == 0 && nodes.size() > 0) {
commitBulk();
}
}
private void commitBulk() {
commit(bulkLoadGraph, nodes.size());
nodes.clear();
}
private void commitRegular() {
commit(graph, nodes.size());
}
private void commit(Graph g, int size) {
parseElement.commit(g);
display("commit-size: {}: Done!", size);
}
private void updateSchema(Map<String, Object> schema, org.apache.tinkerpop.shaded.jackson.databind.JsonNode node) {
synchronized (graph) {
String typeName = parseElement.getType(node);
try {
display("updateSchema: type: {}: ...", typeName);
if (schema.containsKey("oid")) {
parseElement.parse(graph, cache, node);
} else {
Object id = schema.get("id");
schema.remove("id");
parseElement.update(graph, id, schema);
}
commitRegular();
display("updateSchema: type: {}: Done!", typeName);
} catch (NoSuchElementException ex) {
parseElement.parse(graph, cache, node);
commitRegular();
display("updateSchema: NoSuchElementException processed!: type: {}: Done!", typeName);
} catch (Exception ex) {
graph.tx().rollback();
error("updateSchema: failed!: type: " + typeName, ex);
}
}
}
private void retryBatchCommit() {
display("Waiting with [{} nodes] for 1 secs.", nodes.size());
try {
Thread.sleep(WAIT_DURATION_AFTER_COMMIT_EXCEPTION);
for (org.apache.tinkerpop.shaded.jackson.databind.JsonNode n : nodes) {
parseElement.parse(bulkLoadGraph, cache, n);
}
commitBulk();
display("Done!: After re-adding {}.", nodes.size());
} catch (Exception ex) {
error("retryBatchCommit: Failed! Potential data loss.", ex);
}
}
private void display(String message, Object s1, Object s2) {
LOG.info("{}: [{}]: " + message, parseElement.getMessage(), counter, s1, s2);
}
private void display(String message, Object s1) {
display(message, s1, "");
}
private void error(String message, Exception ex) {
LOG.error("{}: [{}]: " + message, parseElement.getMessage(), counter, ex);
}
}
private static class ResumingConsumer extends Consumer {
public ResumingConsumer(BlockingQueue<JsonNode> workQueue, Graph graph, Graph bulkLoadGraph, ParseElement parseElement, long batchSize) {
super(workQueue, graph, bulkLoadGraph, parseElement, batchSize);
}
@Override
public void processItem(JsonNode node) {
if (!contains(node)) {
super.processItem(node);
}
}
private boolean contains(JsonNode node) {
return (parseElement.getByOriginalId(bulkLoadGraph, node) != null);
}
}
private static class ConsumerBuilder implements WorkItemBuilder<Consumer, JsonNode> {
private final Graph graph;
private final Graph bulkLoadGraph;
private final ParseElement parseElement;
private final int batchSize;
private final long startIndex;
public ConsumerBuilder(Graph graph, Graph bulkLoadGraph, ParseElement parseElement, int batchSize, long startIndex) {
this.graph = graph;
this.bulkLoadGraph = bulkLoadGraph;
this.batchSize = batchSize;
this.parseElement = parseElement;
this.startIndex = startIndex;
}
@Override
public Consumer build(BlockingQueue<JsonNode> queue) {
if(startIndex == 0) {
return new Consumer(queue, graph, bulkLoadGraph, parseElement, batchSize);
}
return new ResumingConsumer(queue, graph, bulkLoadGraph, parseElement, batchSize);
}
}
static class WorkItemManager extends org.apache.atlas.repository.graphdb.janus.migration.pc.WorkItemManager {
public WorkItemManager(WorkItemBuilder builder, int batchSize, int numWorkers) {
super(builder, batchSize, numWorkers);
}
}
public static WorkItemManager create(Graph rGraph, Graph bGraph,
ParseElement parseElement, int numWorkers, int batchSize, long startIndex) {
ConsumerBuilder cb = new ConsumerBuilder(rGraph, bGraph, parseElement, batchSize, startIndex);
return new WorkItemManager(cb, batchSize, numWorkers);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.janus.migration;
import org.apache.atlas.utils.LruCache;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import static org.apache.atlas.repository.Constants.EDGE_ID_IN_IMPORT_KEY;
import static org.apache.atlas.repository.Constants.VERTEX_ID_IN_IMPORT_KEY;
public class MappedElementCache {
private static final Logger LOG = LoggerFactory.getLogger(MappedElementCache.class);
private final Map<Object, Vertex> lruVertexCache = new LruCache<>(500, 100000);
private final Map<String, String> lruEdgeCache = new LruCache<>(500, 100000);
public Vertex getMappedVertex(Graph gr, Object key) {
try {
Vertex ret = lruVertexCache.get(key);
if (ret == null) {
synchronized (lruVertexCache) {
ret = lruVertexCache.get(key);
if(ret == null) {
ret = fetchVertex(gr, key);
lruVertexCache.put(key, ret);
}
}
}
return ret;
} catch (Exception ex) {
LOG.error("getMappedVertex: {}", key, ex);
return null;
}
}
public String getMappedEdge(Graph gr, String key) {
try {
String ret = lruEdgeCache.get(key);
if (ret == null) {
synchronized (lruEdgeCache) {
ret = lruEdgeCache.get(key);
if (ret == null) {
Edge e = fetchEdge(gr, key);
ret = e.id().toString();
lruEdgeCache.put(key, ret);
}
}
}
return ret;
} catch (Exception ex) {
LOG.error("getMappedEdge: {}", key, ex);
return null;
}
}
private Vertex fetchVertex(Graph gr, Object key) {
try {
return gr.traversal().V().has(VERTEX_ID_IN_IMPORT_KEY, key).next();
} catch (Exception ex) {
LOG.error("fetchVertex: fetchFromDB failed: {}", key);
return null;
}
}
private Edge fetchEdge(Graph gr, String key) {
try {
return gr.traversal().E().has(EDGE_ID_IN_IMPORT_KEY, key).next();
} catch (Exception ex) {
LOG.error("fetchEdge: fetchFromDB failed: {}", key);
return null;
}
}
public void clearVertexCache() {
lruVertexCache.clear();
}
public void clearEdgeCache() {
lruEdgeCache.clear();
}
public void clearAll() {
clearVertexCache();
clearEdgeCache();
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.janus.migration;
import org.apache.atlas.repository.graphdb.janus.migration.pc.WorkItemBuilder;
import org.apache.atlas.repository.graphdb.janus.migration.pc.WorkItemConsumer;
import org.apache.atlas.repository.graphdb.janus.migration.pc.WorkItemManager;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
public class PostProcessManager {
private static class Consumer extends WorkItemConsumer<Object> {
private static final Logger LOG = LoggerFactory.getLogger(Consumer.class);
private final Graph bulkLoadGraph;
private final GraphSONUtility utility;
private final String[] properties;
private final MappedElementCache cache;
private final int batchSize;
private long counter;
private long batchCounter;
public Consumer(BlockingQueue<Object> queue, Graph bulkLoadGraph, GraphSONUtility utility,
String[] properties, MappedElementCache cache, int batchSize) {
super(queue);
this.bulkLoadGraph = bulkLoadGraph;
this.utility = utility;
this.properties = properties;
this.cache = cache;
this.batchSize = batchSize;
this.counter = 0;
this.batchCounter = 0;
}
@Override
public void processItem(Object vertexId) {
batchCounter++;
counter++;
try {
Vertex v = bulkLoadGraph.traversal().V(vertexId).next();
for (String p : properties) {
utility.replaceReferencedEdgeIdForList(bulkLoadGraph, cache, v, p);
}
if (batchCounter >= batchSize) {
LOG.info("[{}]: batch: {}: commit", counter, batchCounter);
commit();
batchCounter = 0;
}
}
catch (Exception ex) {
LOG.error("processItem: v[{}] error!", vertexId, ex);
}
}
@Override
protected void doCommit() {
bulkLoadGraph.tx().commit();
}
}
private static class ConsumerBuilder implements WorkItemBuilder<Consumer, Object> {
private final Graph bulkLoadGraph;
private final GraphSONUtility utility;
private final int batchSize;
private final MappedElementCache cache;
private final String[] vertexPropertiesToPostProcess;
public ConsumerBuilder(Graph bulkLoadGraph, GraphSONUtility utility, String[] propertiesToPostProcess, int batchSize) {
this.bulkLoadGraph = bulkLoadGraph;
this.utility = utility;
this.batchSize = batchSize;
this.cache = new MappedElementCache();
this.vertexPropertiesToPostProcess = propertiesToPostProcess;
}
@Override
public Consumer build(BlockingQueue<Object> queue) {
return new Consumer(queue, bulkLoadGraph, utility, vertexPropertiesToPostProcess, cache, batchSize);
}
}
static class WorkItemsManager extends WorkItemManager<Object, Consumer> {
public WorkItemsManager(WorkItemBuilder builder, int batchSize, int numWorkers) {
super(builder, batchSize, numWorkers);
}
}
public static WorkItemsManager create(Graph bGraph, GraphSONUtility utility, String[] propertiesToPostProcess, int batchSize, int numWorkers) {
ConsumerBuilder cb = new ConsumerBuilder(bGraph, utility, propertiesToPostProcess, batchSize);
return new WorkItemsManager(cb, batchSize, numWorkers);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.janus.migration;
import org.apache.atlas.repository.Constants;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
public class ReaderStatusManager {
private static final Logger LOG = LoggerFactory.getLogger(ReaderStatusManager.class);
private static final String MIGRATION_STATUS_TYPE_NAME = "__MigrationStatus";
private static final String CURRENT_INDEX_PROPERTY = "currentIndex";
private static final String OPERATION_STATUS_PROPERTY = "operationStatus";
private static final String START_TIME_PROPERTY = "startTime";
private static final String END_TIME_PROPERTY = "endTime";
private static final String TOTAL_COUNT_PROPERTY = "totalCount";
public static final String STATUS_NOT_STARTED = "NOT STARTED";
public static final String STATUS_IN_PROGRESS = "IN PROGRESS";
public static final String STATUS_SUCCESS = "SUCCESS";
public static final String STATUS_FAILED = "FAILED";
private Object migrationStatusId = null;
private Vertex migrationStatus = null;
public ReaderStatusManager(Graph graph, Graph bulkLoadGraph) {
init(graph, bulkLoadGraph);
}
public void init(Graph graph, Graph bulkLoadGraph) {
migrationStatus = fetchUsingTypeName(bulkLoadGraph.traversal());
if(migrationStatus == null) {
createAndCommit(graph);
migrationStatus = fetchUsingId(bulkLoadGraph.traversal());
}
if(migrationStatus == null) {
migrationStatus = fetchUsingId(bulkLoadGraph.traversal());
}
}
public void end(Graph bGraph, Long counter, String status) {
migrationStatus.property(END_TIME_PROPERTY, new Date());
migrationStatus.property(TOTAL_COUNT_PROPERTY, counter);
update(bGraph, counter, status);
}
public void update(Graph graph, Long counter) {
migrationStatus.property(CURRENT_INDEX_PROPERTY, counter);
graph.tx().commit();
}
public void update(Graph graph, Long counter, String status) {
migrationStatus.property(OPERATION_STATUS_PROPERTY, status);
update(graph, counter);
}
public void clear() {
migrationStatus = null;
}
public long getStartIndex() {
return (long) migrationStatus.property(CURRENT_INDEX_PROPERTY).value();
}
private Vertex fetchUsingId(GraphTraversalSource g) {
return g.V(migrationStatusId).next();
}
private Vertex fetchUsingTypeName(GraphTraversalSource g) {
GraphTraversal src = g.V().has(Constants.ENTITY_TYPE_PROPERTY_KEY, MIGRATION_STATUS_TYPE_NAME);
return src.hasNext() ? (Vertex) src.next() : null;
}
private void createAndCommit(Graph rGraph) {
Vertex v = rGraph.addVertex();
long longValue = 0L;
v.property(Constants.ENTITY_TYPE_PROPERTY_KEY, MIGRATION_STATUS_TYPE_NAME);
v.property(CURRENT_INDEX_PROPERTY, longValue);
v.property(TOTAL_COUNT_PROPERTY, longValue);
v.property(OPERATION_STATUS_PROPERTY, STATUS_NOT_STARTED);
v.property(START_TIME_PROPERTY, new Date());
v.property(END_TIME_PROPERTY, new Date());
migrationStatusId = v.id();
rGraph.tx().commit();
LOG.info("migrationStatus vertex created! v[{}]", migrationStatusId);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.janus.migration;
import org.apache.commons.lang.StringUtils;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
public class RelationshipTypeCache {
private static final Logger LOG = LoggerFactory.getLogger(RelationshipTypeCache.class);
private final Map<String, String> relationshipLookup;
public RelationshipTypeCache(Map<String, String> lookup) {
relationshipLookup = lookup;
}
public String get(String label) {
return relationshipLookup.get(label);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.janus.migration.pc;
import java.util.concurrent.BlockingQueue;
public interface WorkItemBuilder<T extends Runnable, U> {
T build(BlockingQueue<U> queue);
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.janus.migration.pc;
import com.google.common.base.Stopwatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public abstract class WorkItemConsumer<T> implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(WorkItemConsumer.class);
private static final int POLLING_DURATION_SECONDS = 5;
private final BlockingQueue<T> queue;
private boolean isDirty = false;
private long maxCommitTimeSeconds = 0;
public WorkItemConsumer(BlockingQueue<T> queue) {
this.queue = queue;
}
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
T item = queue.poll(POLLING_DURATION_SECONDS, TimeUnit.SECONDS);
if (item == null) {
commitDirty();
return;
}
isDirty = true;
processItem(item);
} catch (InterruptedException e) {
LOG.error("WorkItemConsumer: Interrupted: ", e);
}
}
}
public long getMaxCommitTimeSeconds() {
return (this.maxCommitTimeSeconds > 0 ? this.maxCommitTimeSeconds : 15);
}
protected void commitDirty() {
if (!isDirty) {
return;
}
LOG.info("isDirty");
commit();
}
protected void commit() {
Stopwatch sw = Stopwatch.createStarted();
doCommit();
sw.stop();
updateCommitTime(sw.elapsed(TimeUnit.SECONDS));
isDirty = false;
}
protected abstract void doCommit();
protected abstract void processItem(T item);
private void updateCommitTime(long commitTime) {
if (this.maxCommitTimeSeconds < commitTime) {
this.maxCommitTimeSeconds = commitTime;
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.janus.migration.pc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class WorkItemManager<T, U extends WorkItemConsumer> {
private static final Logger LOG = LoggerFactory.getLogger(WorkItemManager.class);
private final BlockingQueue<T> workQueue;
private final ExecutorService service;
private final List<U> consumers = new ArrayList<>();
public WorkItemManager(WorkItemBuilder builder, int batchSize, int numWorkers) {
workQueue = new LinkedBlockingQueue<>(batchSize * numWorkers);
service = Executors.newFixedThreadPool(numWorkers);
for (int i = 0; i < numWorkers; i++) {
U c = (U) builder.build(workQueue);
service.submit(c);
consumers.add(c);
}
}
public void produce(T item) {
try {
workQueue.put(item);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
public void shutdown() throws InterruptedException {
int avgCommitTimeSeconds = getAvgCommitTimeSeconds() * 2;
LOG.info("WorkItemManager: Shutdown started. Will wait for: {} seconds...", avgCommitTimeSeconds);
service.shutdown();
service.awaitTermination(avgCommitTimeSeconds, TimeUnit.MINUTES);
LOG.info("WorkItemManager: Shutdown done!");
}
private int getAvgCommitTimeSeconds() {
int commitTimeSeconds = 0;
for (U c : consumers) {
commitTimeSeconds += c.getMaxCommitTimeSeconds();
}
return commitTimeSeconds / consumers.size();
}
}
...@@ -55,6 +55,7 @@ import javax.script.ScriptEngine; ...@@ -55,6 +55,7 @@ import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager; import javax.script.ScriptEngineManager;
import javax.script.ScriptException; import javax.script.ScriptException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
...@@ -414,7 +415,12 @@ public class Titan0Graph implements AtlasGraph<Titan0Vertex, Titan0Edge> { ...@@ -414,7 +415,12 @@ public class Titan0Graph implements AtlasGraph<Titan0Vertex, Titan0Edge> {
return multiProperties.contains(propertyName); return multiProperties.contains(propertyName);
} }
@Override
public void loadLegacyGraphSON(Map<String, String> relationshipCache, InputStream fs) throws AtlasBaseException {
}
public void addMultiProperties(Set<String> names) { public void addMultiProperties(Set<String> names) {
multiProperties.addAll(names); multiProperties.addAll(names);
} }
} }
...@@ -50,7 +50,7 @@ public class AtlasExportResult implements Serializable { ...@@ -50,7 +50,7 @@ public class AtlasExportResult implements Serializable {
public final static String ENTITY_COUNT = "entityCount"; public final static String ENTITY_COUNT = "entityCount";
public enum OperationStatus { public enum OperationStatus {
SUCCESS, PARTIAL_SUCCESS, FAIL SUCCESS, PARTIAL_SUCCESS, INPROGRESS, FAIL
} }
private AtlasExportRequest request; private AtlasExportRequest request;
...@@ -143,6 +143,9 @@ public class AtlasExportResult implements Serializable { ...@@ -143,6 +143,9 @@ public class AtlasExportResult implements Serializable {
this.operationStatus = operationStatus; this.operationStatus = operationStatus;
} }
public void setMetric(String key, int value) {
metrics.put(key, value);
}
public void incrementMeticsCounter(String key) { public void incrementMeticsCounter(String key) {
incrementMeticsCounter(key, 1); incrementMeticsCounter(key, 1);
...@@ -222,7 +225,6 @@ public class AtlasExportResult implements Serializable { ...@@ -222,7 +225,6 @@ public class AtlasExportResult implements Serializable {
public void setEntityCreationOrder(List<String> entityCreationOrder) { this.entityCreationOrder = entityCreationOrder; } public void setEntityCreationOrder(List<String> entityCreationOrder) { this.entityCreationOrder = entityCreationOrder; }
public StringBuilder toString(StringBuilder sb) { public StringBuilder toString(StringBuilder sb) {
if (sb == null) { if (sb == null) {
sb = new StringBuilder(); sb = new StringBuilder();
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.model.impexp;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import java.io.Serializable;
import java.util.Date;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class MigrationStatus implements Serializable {
private static final long serialVersionUID = 1L;
private String operationStatus;
private Date startTime;
private Date endTime;
private long currentIndex;
private long totalCount;
public void setOperationStatus(String operationStatus) {
this.operationStatus = operationStatus;
}
public String getOperationStatus() {
return operationStatus;
}
public void setStartTime(Date startTime) {
this.startTime = startTime;
}
public Date getStartTime() {
return startTime;
}
public void setEndTime(Date endTime) {
this.endTime = endTime;
}
public Date getEndTime() {
return endTime;
}
public void setCurrentIndex(long currentIndex) {
this.currentIndex = currentIndex;
}
public long getCurrentIndex() {
return currentIndex;
}
public void setTotalCount(long totalCount) {
this.totalCount = totalCount;
}
public long getTotalCount() {
return this.totalCount;
}
public StringBuilder toString(StringBuilder sb) {
sb.append(", operationStatus=").append(operationStatus);
sb.append(", startTime=").append(startTime);
sb.append(", endTime=").append(endTime);
sb.append(", currentIndex=").append(currentIndex);
sb.append(", totalCount=").append(totalCount);
return sb;
}
}
...@@ -28,7 +28,9 @@ import org.apache.atlas.model.typedef.AtlasRelationshipDef; ...@@ -28,7 +28,9 @@ import org.apache.atlas.model.typedef.AtlasRelationshipDef;
import org.apache.atlas.model.typedef.AtlasStructDef; import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.model.typedef.AtlasTypesDef;
import java.io.InputStream;
import java.util.List; import java.util.List;
import java.util.Map;
/** /**
* Interface to persistence store of TypeDef * Interface to persistence store of TypeDef
...@@ -105,4 +107,6 @@ public interface AtlasTypeDefStore { ...@@ -105,4 +107,6 @@ public interface AtlasTypeDefStore {
AtlasBaseTypeDef getByName(String name) throws AtlasBaseException; AtlasBaseTypeDef getByName(String name) throws AtlasBaseException;
AtlasBaseTypeDef getByGuid(String guid) throws AtlasBaseException; AtlasBaseTypeDef getByGuid(String guid) throws AtlasBaseException;
void loadLegacyData(Map<String, String> relationshipCache, InputStream fs) throws AtlasBaseException;
} }
...@@ -1757,7 +1757,10 @@ ...@@ -1757,7 +1757,10 @@
<version>2.7</version> <version>2.7</version>
<configuration> <configuration>
<encoding>UTF-8</encoding> <encoding>UTF-8</encoding>
<nonFilteredFileExtensions>zip</nonFilteredFileExtensions> <nonFilteredFileExtensions>
<nonFilteredFileExtension>zip</nonFilteredFileExtension>
<nonFilteredFileExtension>json</nonFilteredFileExtension>
</nonFilteredFileExtensions>
</configuration> </configuration>
</plugin> </plugin>
......
...@@ -18,7 +18,8 @@ ...@@ -18,7 +18,8 @@
~ limitations under the License. ~ limitations under the License.
--> -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<parent> <parent>
...@@ -68,7 +69,7 @@ ...@@ -68,7 +69,7 @@
<artifactId>commons-lang3</artifactId> <artifactId>commons-lang3</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>joda-time</groupId> <groupId>joda-time</groupId>
<artifactId>joda-time</artifactId> <artifactId>joda-time</artifactId>
</dependency> </dependency>
...@@ -112,7 +113,7 @@ ...@@ -112,7 +113,7 @@
<type>pom</type> <type>pom</type>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.atlas</groupId> <groupId>org.apache.atlas</groupId>
<artifactId>atlas-authorization</artifactId> <artifactId>atlas-authorization</artifactId>
...@@ -256,7 +257,7 @@ ...@@ -256,7 +257,7 @@
<artifactId>maven-resources-plugin</artifactId> <artifactId>maven-resources-plugin</artifactId>
<executions> <executions>
<execution> <execution>
<id>copy-resources</id> <id>copy-resources-solr</id>
<phase>validate</phase> <phase>validate</phase>
<goals> <goals>
<goal>copy-resources</goal> <goal>copy-resources</goal>
......
...@@ -67,27 +67,7 @@ import java.util.List; ...@@ -67,27 +67,7 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.*; import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.*;
import static org.apache.atlas.repository.Constants.BACKING_INDEX; import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_NAME_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.CREATED_BY_KEY;
import static org.apache.atlas.repository.Constants.EDGE_INDEX;
import static org.apache.atlas.repository.Constants.ENTITY_TEXT_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.FULLTEXT_INDEX;
import static org.apache.atlas.repository.Constants.GUID_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.MODIFIED_BY_KEY;
import static org.apache.atlas.repository.Constants.PROPAGATED_TRAIT_NAMES_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.SUPER_TYPES_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.TIMESTAMP_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.TRAIT_NAMES_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.TYPENAME_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.VERTEX_INDEX;
import static org.apache.atlas.repository.Constants.VERTEX_TYPE_PROPERTY_KEY;
import static org.apache.atlas.repository.graphdb.AtlasCardinality.LIST; import static org.apache.atlas.repository.graphdb.AtlasCardinality.LIST;
import static org.apache.atlas.repository.graphdb.AtlasCardinality.SET; import static org.apache.atlas.repository.graphdb.AtlasCardinality.SET;
import static org.apache.atlas.repository.graphdb.AtlasCardinality.SINGLE; import static org.apache.atlas.repository.graphdb.AtlasCardinality.SINGLE;
...@@ -101,6 +81,8 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang ...@@ -101,6 +81,8 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
private static final Logger LOG = LoggerFactory.getLogger(GraphBackedSearchIndexer.class); private static final Logger LOG = LoggerFactory.getLogger(GraphBackedSearchIndexer.class);
private static final String VERTEX_ID_IN_IMPORT_KEY = "__vIdInImport";
private static final String EDGE_ID_IN_IMPORT_KEY = "__eIdInImport";
private static final List<Class> INDEX_EXCLUSION_CLASSES = new ArrayList() { private static final List<Class> INDEX_EXCLUSION_CLASSES = new ArrayList() {
{ {
add(Boolean.class); add(Boolean.class);
...@@ -284,6 +266,9 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang ...@@ -284,6 +266,9 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
createVertexIndex(management, PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, String.class, false, LIST, true, true); createVertexIndex(management, PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, String.class, false, LIST, true, true);
createVertexIndex(management, TYPENAME_PROPERTY_KEY, String.class, true, SINGLE, true, true); createVertexIndex(management, TYPENAME_PROPERTY_KEY, String.class, true, SINGLE, true, true);
createVertexIndex(management, VERTEX_TYPE_PROPERTY_KEY, String.class, false, SINGLE, true, true); createVertexIndex(management, VERTEX_TYPE_PROPERTY_KEY, String.class, false, SINGLE, true, true);
createVertexIndex(management, CLASSIFICATION_ENTITY_GUID, String.class, false, SINGLE, true, true);
createVertexIndex(management, VERTEX_ID_IN_IMPORT_KEY, Long.class, false, SINGLE, true, false);
// create vertex-centric index // create vertex-centric index
createVertexCentricIndex(management, CLASSIFICATION_LABEL, AtlasEdgeDirection.BOTH, CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, String.class, SINGLE); createVertexCentricIndex(management, CLASSIFICATION_LABEL, AtlasEdgeDirection.BOTH, CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, String.class, SINGLE);
...@@ -292,6 +277,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang ...@@ -292,6 +277,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
// create edge indexes // create edge indexes
createEdgeIndex(management, RELATIONSHIP_GUID_PROPERTY_KEY, String.class, SINGLE, true); createEdgeIndex(management, RELATIONSHIP_GUID_PROPERTY_KEY, String.class, SINGLE, true);
createEdgeIndex(management, EDGE_ID_IN_IMPORT_KEY, String.class, SINGLE, true);
// create fulltext indexes // create fulltext indexes
createFullTextIndex(management, ENTITY_TEXT_PROPERTY_KEY, String.class, SINGLE); createFullTextIndex(management, ENTITY_TEXT_PROPERTY_KEY, String.class, SINGLE);
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.impexp;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.apache.atlas.annotation.AtlasService;
import org.apache.atlas.model.impexp.MigrationStatus;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.Date;
import java.util.Iterator;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@AtlasService
@Singleton
public class MigrationProgressService {
private static final Logger LOG = LoggerFactory.getLogger(MigrationProgressService.class);
private static final String MIGRATION_STATUS_TYPE_NAME = "__MigrationStatus";
private static final String CURRENT_INDEX_PROPERTY = "currentIndex";
private static final String OPERATION_STATUS_PROPERTY = "operationStatus";
private static final String START_TIME_PROPERTY = "startTime";
private static final String END_TIME_PROPERTY = "endTime";
private static final String TOTAL_COUNT_PROPERTY = "totalCount";
private static final String MIGRATION_STATUS_KEY = "1";
private final AtlasGraph graph;
private final MigrationStatus defaultStatus = new MigrationStatus();
private LoadingCache<String, MigrationStatus> cache;
@Inject
public MigrationProgressService(AtlasGraph graph) {
this.graph = graph;
}
public MigrationStatus getStatus() {
try {
if (cache == null) {
initCache();
cache.get(MIGRATION_STATUS_KEY);
}
if(cache.size() > 0) {
return cache.get(MIGRATION_STATUS_KEY);
}
return defaultStatus;
} catch (ExecutionException e) {
return defaultStatus;
}
}
private void initCache() {
this.cache = CacheBuilder.newBuilder().refreshAfterWrite(30, TimeUnit.SECONDS).
build(new CacheLoader<String, MigrationStatus>() {
@Override
public MigrationStatus load(String key) {
try {
return from(fetchStatusVertex());
} catch (Exception e) {
LOG.error("Error retrieving status.", e);
return defaultStatus;
}
}
private MigrationStatus from(AtlasVertex vertex) {
if (vertex == null) {
return null;
}
MigrationStatus ms = new MigrationStatus();
ms.setStartTime(GraphHelper.getSingleValuedProperty(vertex, START_TIME_PROPERTY, Date.class));
ms.setEndTime(GraphHelper.getSingleValuedProperty(vertex, END_TIME_PROPERTY, Date.class));
ms.setCurrentIndex(GraphHelper.getSingleValuedProperty(vertex, CURRENT_INDEX_PROPERTY, Long.class));
ms.setOperationStatus(GraphHelper.getSingleValuedProperty(vertex, OPERATION_STATUS_PROPERTY, String.class));
ms.setTotalCount(GraphHelper.getSingleValuedProperty(vertex, TOTAL_COUNT_PROPERTY, Long.class));
return ms;
}
private AtlasVertex fetchStatusVertex() {
Iterator<AtlasVertex> itr = graph.query().has(Constants.ENTITY_TYPE_PROPERTY_KEY, MIGRATION_STATUS_TYPE_NAME).vertices().iterator();
return itr.hasNext() ? itr.next() : null;
}
});
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.migration;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.impexp.ImportTypeDefProcessor;
import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.configuration.Configuration;
import org.apache.atlas.AtlasException;
import org.apache.atlas.service.Service;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Paths;
import static org.apache.atlas.AtlasConstants.ATLAS_MIGRATION_MODE_FILENAME;
@Component
public class DataMigrationService implements Service {
private static final Logger LOG = LoggerFactory.getLogger(DataMigrationService.class);
private static String ATLAS_MIGRATION_DATA_NAME = "atlas-migration-data.json";
private static String ATLAS_MIGRATION_TYPESDEF_NAME = "atlas-migration-typesdef.json";
private final Configuration configuration;
private final Thread thread;
@Inject
public DataMigrationService(AtlasTypeDefStore typeDefStore, Configuration configuration,
GraphBackedSearchIndexer indexer, AtlasTypeDefStoreInitializer storeInitializer,
AtlasTypeRegistry typeRegistry) {
this.configuration = configuration;
this.thread = new Thread(new FileImporter(typeDefStore, typeRegistry, storeInitializer, getFileName(), indexer));
}
@Override
public void start() {
Runtime.getRuntime().addShutdownHook(thread);
thread.start();
}
@Override
public void stop() {
try {
thread.join();
} catch (InterruptedException e) {
LOG.error("Data Migration: Interrupted", e);
}
}
public String getFileName() {
return configuration.getString(ATLAS_MIGRATION_MODE_FILENAME, "");
}
public static class FileImporter implements Runnable {
private final AtlasTypeDefStore typeDefStore;
private final String importDirectory;
private final GraphBackedSearchIndexer indexer;
private final AtlasTypeRegistry typeRegistry;
private final AtlasTypeDefStoreInitializer storeInitializer;
public FileImporter(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry,
AtlasTypeDefStoreInitializer storeInitializer,
String directoryName, GraphBackedSearchIndexer indexer) {
this.typeDefStore = typeDefStore;
this.typeRegistry = typeRegistry;
this.storeInitializer = storeInitializer;
this.importDirectory = directoryName;
this.indexer = indexer;
}
public void performImport() throws AtlasBaseException {
try {
performInit();
FileInputStream fs = new FileInputStream(getFileFromImportDirectory(importDirectory, ATLAS_MIGRATION_DATA_NAME));
typeDefStore.loadLegacyData(RelationshipCacheGenerator.get(typeRegistry), fs);
} catch (Exception ex) {
LOG.error("Import failed!", ex);
throw new AtlasBaseException(ex);
}
}
private void performInit() throws AtlasBaseException, AtlasException {
storeInitializer.init();
processIncomingTypesDef(getFileFromImportDirectory(importDirectory, ATLAS_MIGRATION_TYPESDEF_NAME));
indexer.instanceIsActive();
}
@VisibleForTesting
void processIncomingTypesDef(File typesDefFile) throws AtlasBaseException {
try {
String jsonStr = FileUtils.readFileToString(typesDefFile);
AtlasTypesDef typesDef = AtlasType.fromJson(jsonStr, AtlasTypesDef.class);
ImportTypeDefProcessor processor = new ImportTypeDefProcessor(typeDefStore, typeRegistry);
processor.processTypes(typesDef, new AtlasImportResult());
} catch (IOException e) {
LOG.error("processIncomingTypesDef: Could not process file: {}! Imported data may not be usable.", typesDefFile.getName());
}
}
private File getFileFromImportDirectory(String importDirectory, String fileName) {
return Paths.get(importDirectory, fileName).toFile();
}
@Override
public void run() {
try {
performImport();
} catch (AtlasBaseException e) {
LOG.error("Data Migration:", e);
}
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.migration;
import org.apache.atlas.model.typedef.AtlasRelationshipDef;
import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasRelationshipType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.lang.StringUtils;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
public class RelationshipCacheGenerator {
public static Map<String, String> get(AtlasTypeRegistry typeRegistry) {
Map<String, String> ret = new HashMap<>();
Collection<AtlasRelationshipType> relationshipTypes = typeRegistry.getAllRelationshipTypes();
for (AtlasRelationshipType rt : relationshipTypes) {
AtlasRelationshipDef rd = rt.getRelationshipDef();
String relTypeName = rt.getTypeName();
add(ret, getKey(rd.getEndDef1(), rt.getEnd1Type()), relTypeName);
add(ret, getKey(rd.getEndDef2(), rt.getEnd2Type()), relTypeName);
}
return ret;
}
private static String getKey(AtlasRelationshipEndDef ed, AtlasEntityType rt) {
return getKey(ed.getIsLegacyAttribute(), rt.getTypeName(), ed.getName());
}
private static String getKey(String lhs, String rhs) {
return String.format("%s%s.%s", Constants.INTERNAL_PROPERTY_KEY_PREFIX, lhs, rhs);
}
private static String getKey(boolean isLegacy, String typeName, String name) {
if(!isLegacy) {
return "";
}
return getKey(typeName, name);
}
private static void add(Map<String, String> map, String key, String value) {
if(StringUtils.isEmpty(key) || map.containsKey(key)) {
return;
}
map.put(key, value);
}
}
...@@ -63,6 +63,7 @@ import java.util.Map; ...@@ -63,6 +63,7 @@ import java.util.Map;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE; import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY; import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
import static org.apache.atlas.AtlasConstants.ATLAS_MIGRATION_MODE_FILENAME;
/** /**
...@@ -88,10 +89,10 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { ...@@ -88,10 +89,10 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
@PostConstruct @PostConstruct
public void init() throws AtlasBaseException { public void init() throws AtlasBaseException {
LOG.info("==> AtlasTypeDefStoreInitializer.init()"); LOG.info("==> AtlasTypeDefStoreInitializer.init()");
boolean isMigrationEnabled = !StringUtils.isEmpty(conf.getString(ATLAS_MIGRATION_MODE_FILENAME));
if (!HAConfiguration.isHAEnabled(conf)) { if (!HAConfiguration.isHAEnabled(conf) || isMigrationEnabled) {
atlasTypeDefStore.init(); atlasTypeDefStore.init();
loadBootstrapTypeDefs(); loadBootstrapTypeDefs();
} else { } else {
LOG.info("AtlasTypeDefStoreInitializer.init(): deferring type loading until instance activation"); LOG.info("AtlasTypeDefStoreInitializer.init(): deferring type loading until instance activation");
...@@ -151,44 +152,50 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { ...@@ -151,44 +152,50 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
File[] typeDefFiles = typesDir.exists() ? typesDir.listFiles() : null; File[] typeDefFiles = typesDir.exists() ? typesDir.listFiles() : null;
if (typeDefFiles == null || typeDefFiles.length == 0) { if (typeDefFiles == null || typeDefFiles.length == 0) {
LOG.info("Types directory {} does not exist or not readable or has no typedef files", typesDirName ); LOG.info("Types directory {} does not exist or not readable or has no typedef files", typesDirName);
} else { } else {
// sort the files by filename // sort the files by filename
Arrays.sort(typeDefFiles); Arrays.sort(typeDefFiles);
for (File typeDefFile : typeDefFiles) { for (File typeDefFile : typeDefFiles) {
if (typeDefFile.isFile()) { try {
try { readTypesFromFile(typeDefFile);
String jsonStr = new String(Files.readAllBytes(typeDefFile.toPath()), StandardCharsets.UTF_8); } catch (Throwable t) {
AtlasTypesDef typesDef = AtlasType.fromJson(jsonStr, AtlasTypesDef.class); LOG.error("error while registering types in file {}", typeDefFile.getAbsolutePath(), t);
}
}
}
if (typesDef == null || typesDef.isEmpty()) { LOG.info("<== AtlasTypeDefStoreInitializer({})", typesDir);
LOG.info("No type in file {}", typeDefFile.getAbsolutePath()); }
continue; public void readTypesFromFile(File typeDefFile) {
} if (!typeDefFile.isFile()) {
return;
}
AtlasTypesDef typesToCreate = getTypesToCreate(typesDef, atlasTypeRegistry); try {
AtlasTypesDef typesToUpdate = getTypesToUpdate(typesDef, atlasTypeRegistry, true); String jsonStr = new String(Files.readAllBytes(typeDefFile.toPath()), StandardCharsets.UTF_8);
AtlasTypesDef typesDef = AtlasType.fromJson(jsonStr, AtlasTypesDef.class);
if (!typesToCreate.isEmpty() || !typesToUpdate.isEmpty()) { if (typesDef == null || typesDef.isEmpty()) {
atlasTypeDefStore.createUpdateTypesDef(typesToCreate, typesToUpdate); LOG.info("No type in file {}", typeDefFile.getAbsolutePath());
return;
}
LOG.info("Created/Updated types defined in file {}", typeDefFile.getAbsolutePath()); AtlasTypesDef typesToCreate = getTypesToCreate(typesDef, atlasTypeRegistry);
} else { AtlasTypesDef typesToUpdate = getTypesToUpdate(typesDef, atlasTypeRegistry, true);
LOG.info("No new type in file {}", typeDefFile.getAbsolutePath());
}
} catch (Throwable t) { if (!typesToCreate.isEmpty() || !typesToUpdate.isEmpty()) {
LOG.error("error while registering types in file {}", typeDefFile.getAbsolutePath(), t); atlasTypeDefStore.createUpdateTypesDef(typesToCreate, typesToUpdate);
}
} LOG.info("Created/Updated types defined in file {}", typeDefFile.getAbsolutePath());
} else {
LOG.info("No new type in file {}", typeDefFile.getAbsolutePath());
} }
applyTypePatches(typesDir.getPath()); } catch (Throwable t) {
LOG.error("error while registering types in file {}", typeDefFile.getAbsolutePath(), t);
} }
LOG.info("<== AtlasTypeDefStoreInitializer({})", typesDir);
} }
public static AtlasTypesDef getTypesToCreate(AtlasTypesDef typesDef, AtlasTypeRegistry typeRegistry) { public static AtlasTypesDef getTypesToCreate(AtlasTypesDef typesDef, AtlasTypeRegistry typeRegistry) {
......
...@@ -43,6 +43,7 @@ import org.apache.atlas.utils.AtlasEntityUtil; ...@@ -43,6 +43,7 @@ import org.apache.atlas.utils.AtlasEntityUtil;
import org.apache.atlas.utils.AtlasPerfTracer; import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils; import org.apache.commons.collections.MapUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -51,6 +52,7 @@ import org.springframework.stereotype.Component; ...@@ -51,6 +52,7 @@ import org.springframework.stereotype.Component;
import javax.inject.Inject; import javax.inject.Inject;
import java.util.*; import java.util.*;
import static org.apache.atlas.AtlasConstants.ATLAS_MIGRATION_MODE_FILENAME;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE; import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE; import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
......
...@@ -24,6 +24,7 @@ import static org.apache.atlas.repository.Constants.TYPE_CATEGORY_PROPERTY_KEY; ...@@ -24,6 +24,7 @@ import static org.apache.atlas.repository.Constants.TYPE_CATEGORY_PROPERTY_KEY;
import static org.apache.atlas.repository.Constants.VERTEX_TYPE_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.VERTEX_TYPE_PROPERTY_KEY;
import static org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1.VERTEX_TYPE; import static org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1.VERTEX_TYPE;
import java.io.InputStream;
import java.util.Date; import java.util.Date;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
...@@ -37,10 +38,7 @@ import org.apache.atlas.exception.AtlasBaseException; ...@@ -37,10 +38,7 @@ import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.listener.TypeDefChangeListener; import org.apache.atlas.listener.TypeDefChangeListener;
import org.apache.atlas.model.typedef.*; import org.apache.atlas.model.typedef.*;
import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.AtlasEdge; import org.apache.atlas.repository.graphdb.*;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.*; import org.apache.atlas.repository.store.graph.*;
import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeRegistry;
...@@ -112,6 +110,11 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { ...@@ -112,6 +110,11 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
LOG.info("<== AtlasTypeDefGraphStoreV1.init()"); LOG.info("<== AtlasTypeDefGraphStoreV1.init()");
} }
@Override
public void loadLegacyData(Map<String, String> relationshipCache, InputStream fs) throws AtlasBaseException {
getAtlasGraph().loadLegacyGraphSON(relationshipCache, fs);
}
AtlasGraph getAtlasGraph() { return atlasGraph; } AtlasGraph getAtlasGraph() { return atlasGraph; }
@VisibleForTesting @VisibleForTesting
...@@ -275,9 +278,20 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { ...@@ -275,9 +278,20 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
String updatedBy = vertex.getProperty(Constants.MODIFIED_BY_KEY, String.class); String updatedBy = vertex.getProperty(Constants.MODIFIED_BY_KEY, String.class);
Long createTime = vertex.getProperty(Constants.TIMESTAMP_PROPERTY_KEY, Long.class); Long createTime = vertex.getProperty(Constants.TIMESTAMP_PROPERTY_KEY, Long.class);
Long updateTime = vertex.getProperty(Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class); Long updateTime = vertex.getProperty(Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class);
Long version = vertex.getProperty(Constants.VERSION_PROPERTY_KEY, Long.class); Object versionObj = vertex.getProperty(Constants.VERSION_PROPERTY_KEY, Object.class);
String options = vertex.getProperty(Constants.TYPEOPTIONS_PROPERTY_KEY, String.class); String options = vertex.getProperty(Constants.TYPEOPTIONS_PROPERTY_KEY, String.class);
Long version = null;
if(versionObj instanceof Number) {
version = ((Number)versionObj).longValue();
} else if (versionObj != null) {
version = Long.valueOf(versionObj.toString());
} else {
version = Long.valueOf(0);
}
typeDef.setName(name); typeDef.setName(name);
typeDef.setDescription(description); typeDef.setDescription(description);
typeDef.setTypeVersion(typeVersion); typeDef.setTypeVersion(typeVersion);
...@@ -313,7 +327,15 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore { ...@@ -313,7 +327,15 @@ public class AtlasTypeDefGraphStoreV1 extends AtlasTypeDefGraphStore {
boolean ret = false; boolean ret = false;
if (isTypeVertex(vertex)) { if (isTypeVertex(vertex)) {
TypeCategory vertexCategory = vertex.getProperty(Constants.TYPE_CATEGORY_PROPERTY_KEY, TypeCategory.class); Object objTypeCategory = vertex.getProperty(Constants.TYPE_CATEGORY_PROPERTY_KEY, Object.class);
TypeCategory vertexCategory = null;
if(objTypeCategory instanceof TypeCategory) {
vertexCategory = (TypeCategory) objTypeCategory;
} else if (objTypeCategory != null) {
vertexCategory = TypeCategory.valueOf(objTypeCategory.toString());
}
ret = category.equals(vertexCategory); ret = category.equals(vertexCategory);
} }
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.migration;
import com.google.inject.Inject;
import org.apache.atlas.TestModules;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.bootstrap.AtlasTypeDefStoreInitializer;
import org.apache.atlas.runner.LocalSolrRunner;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.TestResourceFileUtils;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.io.IOException;
import java.util.List;
import static org.apache.atlas.graph.GraphSandboxUtil.useLocalSolr;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadModelFromJson;
import static org.testng.Assert.assertEquals;
@Guice(modules = TestModules.TestOnlyModule.class)
public class HiveParititionIT extends MigrationBaseAsserts {
@Inject
private AtlasTypeDefStore typeDefStore;
@Inject
private AtlasTypeRegistry typeRegistry;
@Inject
private AtlasTypeDefStoreInitializer storeInitializer;
@Inject
private GraphBackedSearchIndexer indexer;
@Inject
public HiveParititionIT(AtlasGraph graph) {
super(graph);
}
@AfterClass
public void clear() throws Exception {
AtlasGraphProvider.cleanup();
if (useLocalSolr()) {
LocalSolrRunner.stop();
}
}
@Test
public void fileImporterTest() throws IOException, AtlasBaseException {
loadModelFromJson("0000-Area0/0010-base_model.json", typeDefStore, typeRegistry);
loadModelFromJson("1000-Hadoop/1030-hive_model.json", typeDefStore, typeRegistry);
String directoryName = TestResourceFileUtils.getDirectory("parts_db");
DataMigrationService.FileImporter fi = new DataMigrationService.FileImporter(typeDefStore, typeRegistry,
storeInitializer, directoryName, indexer);
fi.run();
assertPartitionKeyProperty(getVertex("hive_table", "t1"), 1);
assertPartitionKeyProperty(getVertex("hive_table", "tv1"), 1);
assertHiveVertices(1, 2, 7);
assertTypeCountNameGuid("hive_db", 1, "parts_db", "ae30d78b-51b4-42ab-9436-8d60c8f68b95");
assertTypeCountNameGuid("hive_process", 1, "", "");
assertEdges("hive_db", "parts_db", AtlasEdgeDirection.IN,1, 1, "");
assertEdges("hive_table", "t1", AtlasEdgeDirection.OUT, 1, 1, "hive_db_tables");
assertEdges("hive_table", "tv1", AtlasEdgeDirection.OUT, 1, 1, "hive_db_tables");
assertMigrationStatus(136);
}
private void assertPartitionKeyProperty(AtlasVertex vertex, int expectedCount) {
List<String> keys = GraphHelper.getListProperty(vertex, "hive_table.partitionKeys");
assertEquals(keys.size(), expectedCount);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.migration;
import com.google.inject.Inject;
import org.apache.atlas.RequestContextV1;
import org.apache.atlas.TestModules;
import org.apache.atlas.TestUtilsV2;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.runner.LocalSolrRunner;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.TestResourceFileUtils;
import org.testng.ITestContext;
import org.testng.annotations.*;
import java.io.FileInputStream;
import java.io.IOException;
import static org.apache.atlas.graph.GraphSandboxUtil.useLocalSolr;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadModelFromJson;
@Guice(modules = TestModules.TestOnlyModule.class)
public class HiveStocksIT extends MigrationBaseAsserts {
@Inject
private AtlasTypeDefStore typeDefStore;
@Inject
private AtlasTypeRegistry typeRegistry;
@Inject
public HiveStocksIT(AtlasGraph graph) {
super(graph);
}
@BeforeTest
public void setupTest() {
RequestContextV1.clear();
RequestContextV1.get().setUser(TestUtilsV2.TEST_USER, null);
}
@AfterClass
public void clear() throws Exception {
AtlasGraphProvider.cleanup();
if (useLocalSolr()) {
LocalSolrRunner.stop();
}
}
@DataProvider(name = "stocks-2-branch08-tag")
public static Object[][] getStocksTag(ITestContext context) throws IOException {
return new Object[][]{{ TestResourceFileUtils.getFileInputStream("stocks-2-0.8-extended-tag.json") }};
}
@Test(dataProvider = "stocks-2-branch08-tag")
public void migrateFromEarlierVersionWithTag(FileInputStream fs) throws AtlasBaseException, IOException {
loadModelFromJson("0000-Area0/0010-base_model.json", typeDefStore, typeRegistry);
loadModelFromJson("1000-Hadoop/1030-hive_model.json", typeDefStore, typeRegistry);
typeDefStore.loadLegacyData(RelationshipCacheGenerator.get(typeRegistry), fs);
assertHiveVertices(1, 1, 7);
assertTypeCountNameGuid("hive_db", 1, "stocks", "4e13b36b-9c54-4616-9001-1058221165d0");
assertTypeCountNameGuid("hive_table", 1, "stocks_daily", "5cfc2540-9947-40e0-8905-367e07481774");
assertTypeAttribute("hive_table", 7, "stocks_daily", "5cfc2540-9947-40e0-8905-367e07481774", "hive_table.columns");
assertTypeCountNameGuid("hive_column", 1, "high", "d72ce4fb-6f17-4e68-aa85-967366c9e891");
assertTypeCountNameGuid("hive_column", 1, "open", "788ba8fe-b7d8-41ba-84ef-c929732924ec");
assertTypeCountNameGuid("hive_column", 1, "dt", "643a0a71-0d97-477d-a43b-7ca433f85160");
assertTypeCountNameGuid("hive_column", 1, "low", "38caeaf7-49e6-4d6d-8727-231406a46821");
assertTypeCountNameGuid("hive_column", 1, "close", "3bae9b76-f812-4745-b4d2-2a72d2773d07");
assertTypeCountNameGuid("hive_column", 1, "volume", "bee376a4-3d8d-4943-b7e8-9bce042c2657");
assertTypeCountNameGuid("hive_column", 1, "adj_close", "fcba2002-cb38-4c2e-b853-68d421d66703");
assertTypeCountNameGuid("hive_process", 0, "", "");
assertTypeCountNameGuid("hive_storagedesc", 1, "", "294290d8-4498-4677-973c-c266d594b039");
assertTypeCountNameGuid("Tag1", 1, "", "");
assertEdges(getVertex("hive_db", "stocks").getEdges(AtlasEdgeDirection.IN).iterator(),1, 1, "");
assertEdges(getVertex("hive_table", "stocks_daily").getEdges(AtlasEdgeDirection.OUT).iterator(), 1, 1, "hive_db_tables");
assertEdges(getVertex("hive_column", "high").getEdges(AtlasEdgeDirection.OUT).iterator(), 1,1, "hive_table_columns");
assertMigrationStatus(164);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.migration;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.*;
import org.apache.commons.lang.StringUtils;
import java.util.Iterator;
import java.util.List;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.AssertJUnit.assertTrue;
public class MigrationBaseAsserts {
protected static final String ASSERT_NAME_PROPERTY = "Asset.name";
private final String TYPE_NAME_PROPERTY = "__typeName";
private final String R_GUID_PROPERTY_NAME = "_r__guid";
protected AtlasGraph graph;
protected MigrationBaseAsserts(AtlasGraph graph) {
this.graph = graph;
}
protected void assertHiveVertices(int dbCount, int tableCount, int columnCount) {
int i = 0;
Iterator<AtlasVertex> results = getVertices("hive_db", null);
for (Iterator<AtlasVertex> it = results; it.hasNext(); i++) {
assertNotNull(it.next());
}
assertEquals(i, dbCount);
i = 0;
results = getVertices("hive_table", null);
for (Iterator<AtlasVertex> it = results; it.hasNext(); i++) {
assertNotNull(it.next());
}
assertEquals(i, tableCount);
i = 0;
results = getVertices("hive_column", null);
for (Iterator<AtlasVertex> it = results; it.hasNext(); i++) {
assertNotNull(it.next());
}
assertTrue(i > 0);
assertEquals(i, columnCount);
}
protected Iterator<AtlasVertex> getVertices(String typeName, String name) {
AtlasGraphQuery query = graph.query().has(TYPE_NAME_PROPERTY, typeName);
if(!StringUtils.isEmpty(name)) {
query = query.has(ASSERT_NAME_PROPERTY, name);
}
return query.vertices().iterator();
}
protected AtlasVertex getVertex(String typeName, String name) {
Iterator<AtlasVertex> iterator = getVertices(typeName, name);
return iterator.hasNext() ? iterator.next() : null;
}
protected void assertEdges(String typeName, String assetName, AtlasEdgeDirection edgeDirection, int startIdx, int expectedItems, String edgeTypeName) {
assertEdges(getVertex(typeName, assetName).getEdges(edgeDirection).iterator(),startIdx, expectedItems, edgeTypeName);
}
protected void assertEdges(Iterator<AtlasEdge> results, int startIdx, int expectedItems, String edgeTypeName) {
int count = 0;
AtlasEdge e = null;
for (Iterator<AtlasEdge> it = results; it.hasNext() && count < startIdx; count++) {
e = it.next();
}
assertNotNull(GraphHelper.getProperty(e, R_GUID_PROPERTY_NAME));
assertNotNull(GraphHelper.getProperty(e, "tagPropagation"));
if(StringUtils.isNotEmpty(edgeTypeName)) {
assertEquals(GraphHelper.getProperty(e, TYPE_NAME_PROPERTY), edgeTypeName, edgeTypeName);
}
assertEquals(count, expectedItems, String.format("%s", edgeTypeName));
}
protected void assertTypeAttribute(String typeName, int expectedSize, String name, String guid, String propertyName) {
AtlasVertex v = getVertex(typeName, name);
String guidActual = GraphHelper.getGuid(v);
List list = (List) GraphHelper.getProperty(v, propertyName);
assertEquals(guidActual, guid);
assertNotNull(list);
assertEquals(list.size(), expectedSize);
}
protected void assertTypeCountNameGuid(String typeName, int expectedItems, String name, String guid) {
Iterator<AtlasVertex> results = getVertices(typeName, name);
int count = 0;
for (Iterator<AtlasVertex> it = results; it.hasNext(); ) {
AtlasVertex v = it.next();
assertEquals(GraphHelper.getTypeName(v), typeName);
if(StringUtils.isNotEmpty(guid)) {
assertEquals(GraphHelper.getGuid(v), guid, name);
}
if(StringUtils.isNotEmpty(name)) {
assertEquals(GraphHelper.getProperty(v, ASSERT_NAME_PROPERTY), name, name);
}
count++;
}
assertEquals(count, expectedItems, String.format("%s:%s", typeName, name));
}
protected void assertMigrationStatus(int expectedTotalCount) {
AtlasVertex v = getVertex("__MigrationStatus", "");
assertEquals((long) GraphHelper.getProperty(v, "currentIndex"), expectedTotalCount);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.migration;
import com.google.inject.Inject;
import org.apache.atlas.TestModules;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.typedef.AtlasRelationshipDef;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.impexp.ZipFileResourceTestUtils;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasRelationshipType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.lang.StringUtils;
import org.jcodings.util.Hash;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadModelFromJson;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@Guice(modules = TestModules.TestOnlyModule.class)
public class RelationshipMappingTest {
@Inject
private AtlasTypeDefStore typeDefStore;
@Inject
private AtlasTypeRegistry typeRegistry;
@BeforeClass
public void setup() throws IOException, AtlasBaseException {
loadModelFromJson("0000-Area0/0010-base_model.json", typeDefStore, typeRegistry);
loadModelFromJson("1000-Hadoop/1030-hive_model.json", typeDefStore, typeRegistry);
}
@Test
public void createLookup() {
Map<String, String> cache = RelationshipCacheGenerator.get(typeRegistry);
assertEquals(cache.size(), getLegacyAttributeCount() - 1);
for (Map.Entry<String, String> entry : cache.entrySet()) {
assertTrue(StringUtils.isNotEmpty(entry.getKey()));
assertTrue(entry.getKey().startsWith(Constants.INTERNAL_PROPERTY_KEY_PREFIX), entry.getKey());
}
}
private int getLegacyAttributeCount() {
int count = 0;
for (AtlasRelationshipType rt : typeRegistry.getAllRelationshipTypes()) {
AtlasRelationshipDef rd = rt.getRelationshipDef();
if(rd.getEndDef1().getIsLegacyAttribute()) {
count++;
}
if(rd.getEndDef2().getIsLegacyAttribute()) {
count++;
}
}
return count;
}
}
...@@ -54,6 +54,7 @@ import org.apache.atlas.type.AtlasTypeRegistry; ...@@ -54,6 +54,7 @@ import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeUtil; import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils; import org.apache.commons.collections.MapUtils;
import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.testng.Assert; import org.testng.Assert;
...@@ -107,6 +108,8 @@ public class AtlasEntityStoreV1Test { ...@@ -107,6 +108,8 @@ public class AtlasEntityStoreV1Test {
@Inject @Inject
private EntityGraphMapper graphMapper; private EntityGraphMapper graphMapper;
@Inject
private Configuration configuration;
@BeforeClass @BeforeClass
public void setUp() throws Exception { public void setUp() throws Exception {
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
*/ */
package org.apache.atlas.repository.store.graph.v1; package org.apache.atlas.repository.store.graph.v1;
import org.apache.commons.configuration.Configuration;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import org.apache.atlas.RequestContextV1; import org.apache.atlas.RequestContextV1;
import org.apache.atlas.TestModules; import org.apache.atlas.TestModules;
......
...@@ -55,6 +55,11 @@ public class TestResourceFileUtils { ...@@ -55,6 +55,11 @@ public class TestResourceFileUtils {
return fs; return fs;
} }
public static String getDirectory(String subDir) {
final String userDir = System.getProperty("user.dir");
return getTestFilePath(userDir, subDir, "");
}
public static <T> T readObjectFromJson(String subDir, String filename, Class<T> objectClass) throws IOException { public static <T> T readObjectFromJson(String subDir, String filename, Class<T> objectClass) throws IOException {
final String userDir = System.getProperty("user.dir"); final String userDir = System.getProperty("user.dir");
String filePath = getTestJsonPath(userDir, subDir, filename); String filePath = getTestJsonPath(userDir, subDir, filename);
......
This source diff could not be displayed because it is too large. You can view the blob instead.
...@@ -87,6 +87,18 @@ public class ActiveServerFilter implements Filter { ...@@ -87,6 +87,18 @@ public class ActiveServerFilter implements Filter {
HttpServletResponse httpServletResponse = (HttpServletResponse) servletResponse; HttpServletResponse httpServletResponse = (HttpServletResponse) servletResponse;
LOG.error("Instance in transition. Service may not be ready to return a result"); LOG.error("Instance in transition. Service may not be ready to return a result");
httpServletResponse.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE); httpServletResponse.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
} if(serviceState.isInstanceInMigration()) {
HttpServletResponse httpServletResponse = (HttpServletResponse) servletResponse;
LOG.error("Instance in migration. Service may not be ready to return a result");
httpServletResponse.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
} if (serviceState.isInstanceInMigration()) {
HttpServletResponse httpServletResponse = (HttpServletResponse) servletResponse;
LOG.error("Instance in migration. Service may not be ready to return a result");
httpServletResponse.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
} else if (serviceState.isInstanceInMigration()) {
HttpServletResponse httpServletResponse = (HttpServletResponse) servletResponse;
LOG.error("Instance in migration. Service may not be ready to return a result");
httpServletResponse.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
} else { } else {
HttpServletResponse httpServletResponse = (HttpServletResponse) servletResponse; HttpServletResponse httpServletResponse = (HttpServletResponse) servletResponse;
String activeServerAddress = activeInstanceState.getActiveServerAddress(); String activeServerAddress = activeInstanceState.getActiveServerAddress();
......
...@@ -28,15 +28,9 @@ import org.apache.atlas.authorize.AtlasPrivilege; ...@@ -28,15 +28,9 @@ import org.apache.atlas.authorize.AtlasPrivilege;
import org.apache.atlas.authorize.AtlasAuthorizationUtils; import org.apache.atlas.authorize.AtlasAuthorizationUtils;
import org.apache.atlas.discovery.SearchContext; import org.apache.atlas.discovery.SearchContext;
import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasExportRequest; import org.apache.atlas.model.impexp.*;
import org.apache.atlas.model.impexp.AtlasExportResult;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.metrics.AtlasMetrics; import org.apache.atlas.model.metrics.AtlasMetrics;
import org.apache.atlas.repository.impexp.ExportService; import org.apache.atlas.repository.impexp.*;
import org.apache.atlas.repository.impexp.ImportService;
import org.apache.atlas.repository.impexp.ZipSink;
import org.apache.atlas.repository.impexp.ZipSource;
import org.apache.atlas.services.MetricsService; import org.apache.atlas.services.MetricsService;
import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeRegistry;
...@@ -75,17 +69,12 @@ import javax.ws.rs.core.MediaType; ...@@ -75,17 +69,12 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.Collection; import java.util.*;
import java.util.Collections;
import java.util.HashSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
/** /**
* Jersey Resource for admin operations. * Jersey Resource for admin operations
*/ */
@Path("admin") @Path("admin")
@Singleton @Singleton
...@@ -93,32 +82,32 @@ import java.util.concurrent.locks.ReentrantLock; ...@@ -93,32 +82,32 @@ import java.util.concurrent.locks.ReentrantLock;
public class AdminResource { public class AdminResource {
private static final Logger LOG = LoggerFactory.getLogger(AdminResource.class); private static final Logger LOG = LoggerFactory.getLogger(AdminResource.class);
private static final String isCSRF_ENABLED = "atlas.rest-csrf.enabled";
private static final String BROWSER_USER_AGENT_PARAM = "atlas.rest-csrf.browser-useragents-regex";
private static final String CUSTOM_METHODS_TO_IGNORE_PARAM = "atlas.rest-csrf.methods-to-ignore";
private static final String CUSTOM_HEADER_PARAM = "atlas.rest-csrf.custom-header";
private static final String isEntityUpdateAllowed = "atlas.entity.update.allowed";
private static final String isEntityCreateAllowed = "atlas.entity.create.allowed";
private static final String editableEntityTypes = "atlas.ui.editable.entity.types";
private static final String DEFAULT_EDITABLE_ENTITY_TYPES = "hdfs_path,hbase_table,hbase_column,hbase_column_family,kafka_topic,hbase_namespace";
@Context @Context
private HttpServletRequest httpServletRequest; private HttpServletRequest httpServletRequest;
@Context @Context
private HttpServletResponse httpServletResponse; private HttpServletResponse httpServletResponse;
private final AtlasTypeRegistry typeRegistry;
private final ReentrantLock importExportOperationLock;
private static final String isCSRF_ENABLED = "atlas.rest-csrf.enabled";
private static final String BROWSER_USER_AGENT_PARAM = "atlas.rest-csrf.browser-useragents-regex";
private static final String CUSTOM_METHODS_TO_IGNORE_PARAM = "atlas.rest-csrf.methods-to-ignore";
private static final String CUSTOM_HEADER_PARAM = "atlas.rest-csrf.custom-header";
private static final String isEntityUpdateAllowed = "atlas.entity.update.allowed";
private static final String isEntityCreateAllowed = "atlas.entity.create.allowed";
private static final String editableEntityTypes = "atlas.ui.editable.entity.types";
private static final String DEFAULT_EDITABLE_ENTITY_TYPES = "hdfs_path,hbase_table,hbase_column,hbase_column_family,kafka_topic,hbase_namespace";
private Response version; private Response version;
private final ServiceState serviceState; private final ServiceState serviceState;
private final MetricsService metricsService; private final MetricsService metricsService;
private static Configuration atlasProperties; private static Configuration atlasProperties;
private final ExportService exportService; private final ExportService exportService;
private final ImportService importService; private final ImportService importService;
private final SearchTracker activeSearches; private final SearchTracker activeSearches;
private final AtlasTypeRegistry typeRegistry;
private final MigrationProgressService migrationProgressService;
private final ReentrantLock importExportOperationLock;
static { static {
try { try {
...@@ -129,15 +118,16 @@ public class AdminResource { ...@@ -129,15 +118,16 @@ public class AdminResource {
} }
@Inject @Inject
public AdminResource(ServiceState serviceState, MetricsService metricsService, public AdminResource(ServiceState serviceState, MetricsService metricsService, AtlasTypeRegistry typeRegistry,
ExportService exportService, ImportService importService, ExportService exportService, ImportService importService, SearchTracker activeSearches,
SearchTracker activeSearches, AtlasTypeRegistry typeRegistry) { MigrationProgressService migrationProgressService) {
this.serviceState = serviceState; this.serviceState = serviceState;
this.metricsService = metricsService; this.metricsService = metricsService;
this.exportService = exportService; this.exportService = exportService;
this.importService = importService; this.importService = importService;
this.activeSearches = activeSearches; this.activeSearches = activeSearches;
this.typeRegistry = typeRegistry; this.typeRegistry = typeRegistry;
this.migrationProgressService = migrationProgressService;
importExportOperationLock = new ReentrantLock(); importExportOperationLock = new ReentrantLock();
} }
...@@ -223,8 +213,18 @@ public class AdminResource { ...@@ -223,8 +213,18 @@ public class AdminResource {
LOG.debug("==> AdminResource.getStatus()"); LOG.debug("==> AdminResource.getStatus()");
} }
Map<String, Object> responseData = Collections.singletonMap(AtlasClient.STATUS, serviceState.getState().toString()); Map<String, Object> responseData = new HashMap() {{
Response response = Response.ok(AtlasJson.toV1Json(responseData)).build(); put(AtlasClient.STATUS, serviceState.getState().toString());
}};
if(serviceState.isInstanceInMigration()) {
MigrationStatus status = migrationProgressService.getStatus();
if (status != null) {
responseData.put("MigrationStatus", status);
}
}
Response response = Response.ok(AtlasJson.toV1Json(responseData)).build();
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("<== AdminResource.getStatus()"); LOG.debug("<== AdminResource.getStatus()");
......
...@@ -24,6 +24,7 @@ import org.apache.atlas.web.filters.AtlasCSRFPreventionFilter; ...@@ -24,6 +24,7 @@ import org.apache.atlas.web.filters.AtlasCSRFPreventionFilter;
import org.apache.atlas.web.filters.AtlasKnoxSSOAuthenticationFilter; import org.apache.atlas.web.filters.AtlasKnoxSSOAuthenticationFilter;
import org.apache.atlas.web.filters.StaleTransactionCleanupFilter; import org.apache.atlas.web.filters.StaleTransactionCleanupFilter;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.security.config.annotation.authentication.builders.AuthenticationManagerBuilder; import org.springframework.security.config.annotation.authentication.builders.AuthenticationManagerBuilder;
...@@ -44,6 +45,8 @@ import org.springframework.security.web.util.matcher.RequestMatcher; ...@@ -44,6 +45,8 @@ import org.springframework.security.web.util.matcher.RequestMatcher;
import javax.inject.Inject; import javax.inject.Inject;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import static org.apache.atlas.AtlasConstants.ATLAS_MIGRATION_MODE_FILENAME;
@EnableWebSecurity @EnableWebSecurity
@EnableGlobalMethodSecurity(prePostEnabled = true) @EnableGlobalMethodSecurity(prePostEnabled = true)
public class AtlasSecurityConfig extends WebSecurityConfigurerAdapter { public class AtlasSecurityConfig extends WebSecurityConfigurerAdapter {
...@@ -151,8 +154,14 @@ public class AtlasSecurityConfig extends WebSecurityConfigurerAdapter { ...@@ -151,8 +154,14 @@ public class AtlasSecurityConfig extends WebSecurityConfigurerAdapter {
//@formatter:on //@formatter:on
if (configuration.getBoolean("atlas.server.ha.enabled", false)) { boolean configMigrationEnabled = !StringUtils.isEmpty(configuration.getString(ATLAS_MIGRATION_MODE_FILENAME));
LOG.info("Atlas is in HA Mode, enabling ActiveServerFilter"); if (configuration.getBoolean("atlas.server.ha.enabled", false) ||
configMigrationEnabled) {
if(configMigrationEnabled) {
LOG.info("Atlas is in Migration Mode, enabling ActiveServerFilter");
} else {
LOG.info("Atlas is in HA Mode, enabling ActiveServerFilter");
}
httpSecurity.addFilterAfter(activeServerFilter, BasicAuthenticationFilter.class); httpSecurity.addFilterAfter(activeServerFilter, BasicAuthenticationFilter.class);
} }
httpSecurity httpSecurity
......
...@@ -23,12 +23,15 @@ import org.apache.atlas.ApplicationProperties; ...@@ -23,12 +23,15 @@ import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasException;
import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.ha.HAConfiguration;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.inject.Singleton; import javax.inject.Singleton;
import static org.apache.atlas.AtlasConstants.ATLAS_MIGRATION_MODE_FILENAME;
/** /**
* A class that maintains the state of this instance. * A class that maintains the state of this instance.
* *
...@@ -38,14 +41,14 @@ import javax.inject.Singleton; ...@@ -38,14 +41,14 @@ import javax.inject.Singleton;
@Singleton @Singleton
@Component @Component
public class ServiceState { public class ServiceState {
private static final Logger LOG = LoggerFactory.getLogger(ServiceState.class); private static final Logger LOG = LoggerFactory.getLogger(ServiceState.class);
public enum ServiceStateValue { public enum ServiceStateValue {
ACTIVE, ACTIVE,
PASSIVE, PASSIVE,
BECOMING_ACTIVE, BECOMING_ACTIVE,
BECOMING_PASSIVE BECOMING_PASSIVE,
MIGRATING
} }
private Configuration configuration; private Configuration configuration;
...@@ -57,8 +60,16 @@ public class ServiceState { ...@@ -57,8 +60,16 @@ public class ServiceState {
public ServiceState(Configuration configuration) { public ServiceState(Configuration configuration) {
this.configuration = configuration; this.configuration = configuration;
state = !HAConfiguration.isHAEnabled(configuration) ?
ServiceStateValue.ACTIVE : ServiceStateValue.PASSIVE; if(StringUtils.isNotEmpty(configuration.getString(ATLAS_MIGRATION_MODE_FILENAME))) {
state = ServiceStateValue.MIGRATING;
} else {
state = !HAConfiguration.isHAEnabled(configuration) ? ServiceStateValue.ACTIVE : ServiceStateValue.PASSIVE;
}
if(!StringUtils.isEmpty(configuration.getString(ATLAS_MIGRATION_MODE_FILENAME, ""))) {
state = ServiceStateValue.MIGRATING;
}
} }
public ServiceStateValue getState() { public ServiceStateValue getState() {
...@@ -96,4 +107,13 @@ public class ServiceState { ...@@ -96,4 +107,13 @@ public class ServiceState {
return state == ServiceStateValue.BECOMING_ACTIVE return state == ServiceStateValue.BECOMING_ACTIVE
|| state == ServiceStateValue.BECOMING_PASSIVE; || state == ServiceStateValue.BECOMING_PASSIVE;
} }
public void setMigration() {
LOG.warn("Instance in {}", state);
setState(ServiceStateValue.MIGRATING);
}
public boolean isInstanceInMigration() {
return getState() == ServiceStateValue.MIGRATING;
}
} }
...@@ -51,7 +51,7 @@ public class AdminResourceTest { ...@@ -51,7 +51,7 @@ public class AdminResourceTest {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE); when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null); AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null);
Response response = adminResource.getStatus(); Response response = adminResource.getStatus();
assertEquals(response.getStatus(), HttpServletResponse.SC_OK); assertEquals(response.getStatus(), HttpServletResponse.SC_OK);
JsonNode entity = AtlasJson.parseToV1JsonNode((String) response.getEntity()); JsonNode entity = AtlasJson.parseToV1JsonNode((String) response.getEntity());
...@@ -62,7 +62,7 @@ public class AdminResourceTest { ...@@ -62,7 +62,7 @@ public class AdminResourceTest {
public void testResourceGetsValueFromServiceState() throws IOException { public void testResourceGetsValueFromServiceState() throws IOException {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE); when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE);
AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null); AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null);
Response response = adminResource.getStatus(); Response response = adminResource.getStatus();
verify(serviceState).getState(); verify(serviceState).getState();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment