Commit dc6be8e2 by Ashutosh Mestry

ATLAS-2864: Improved incremental export queries.

parent c6466004
......@@ -80,18 +80,6 @@ public class AtlasExportRequest implements Serializable {
this.options = options;
}
public String getMatchTypeOptionValue() {
String matchType = null;
if (MapUtils.isNotEmpty(getOptions())) {
if (getOptions().get(OPTION_ATTR_MATCH_TYPE) != null) {
matchType = getOptions().get(OPTION_ATTR_MATCH_TYPE).toString();
}
}
return matchType;
}
public String getFetchTypeOptionValue() {
if(getOptions() == null || !getOptions().containsKey(OPTION_FETCH_TYPE)) {
return FETCH_TYPE_FULL;
......@@ -122,6 +110,27 @@ public class AtlasExportRequest implements Serializable {
return false;
}
public String getMatchTypeOptionValue() {
String matchType = null;
if (MapUtils.isNotEmpty(getOptions())) {
if (getOptions().get(OPTION_ATTR_MATCH_TYPE) != null) {
matchType = getOptions().get(OPTION_ATTR_MATCH_TYPE).toString();
}
}
return matchType;
}
public long getChangeTokenFromOptions() {
if(getFetchTypeOptionValue().equalsIgnoreCase(FETCH_TYPE_INCREMENTAL) &&
getOptions().containsKey(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER)) {
return Long.parseLong(getOptions().get(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER).toString());
}
return 0L;
}
public StringBuilder toString(StringBuilder sb) {
if (sb == null) {
sb = new StringBuilder();
......
......@@ -22,8 +22,8 @@ import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.store.graph.v1.AtlasEntityStoreV1;
import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.slf4j.Logger;
......@@ -50,10 +50,10 @@ public class HdfsPathEntityCreator {
private final String PATH_SEPARATOR = "/";
private AtlasTypeRegistry typeRegistry;
private AtlasEntityStoreV1 entityStore;
private AtlasEntityStoreV2 entityStore;
@Inject
public HdfsPathEntityCreator(AtlasTypeRegistry typeRegistry, AtlasEntityStoreV1 entityStore) {
public HdfsPathEntityCreator(AtlasTypeRegistry typeRegistry, AtlasEntityStoreV2 entityStore) {
this.typeRegistry = typeRegistry;
this.entityStore = entityStore;
}
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.impexp;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.util.UniqueList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
public class IncrementalExportEntityProvider {
private static final Logger LOG = LoggerFactory.getLogger(IncrementalExportEntityProvider.class);
private static final String QUERY_PARAMETER_START_GUID = "startGuid";
private static final String QUERY_PARAMETER_MODIFICATION_TIMESTAMP = "modificationTimestamp";
private AtlasGraph atlasGraph;
private static final String QUERY_DB = "g.V().has('__guid', startGuid)";
private static final String QUERY_TABLE = QUERY_DB + ".in('__hive_table.db')";
private static final String QUERY_SD = QUERY_TABLE + ".out('__hive_table.sd')";
private static final String QUERY_COLUMN = QUERY_TABLE + ".out('__hive_table.columns')";
private static final String TRANSFORM_CLAUSE = ".project('__guid').by('__guid').dedup().toList()";
private static final String TIMESTAMP_CLAUSE = ".has('__modificationTimestamp', gt(modificationTimestamp))";
private ScriptEngine scriptEngine;
@Inject
public IncrementalExportEntityProvider(AtlasGraph atlasGraph, ScriptEngine scriptEngine) {
this.atlasGraph = atlasGraph;
this.scriptEngine = scriptEngine;
}
public void populate(String dbEntityGuid, long timeStamp, UniqueList<String> guidsToProcess) {
if(timeStamp == 0L) {
full(dbEntityGuid, guidsToProcess);
} else {
partial(dbEntityGuid, timeStamp, guidsToProcess);
}
}
private void partial(String dbEntityGuid, long timeStamp, UniqueList<String> guidsToProcess) {
guidsToProcess.addAll(fetchGuids(dbEntityGuid, QUERY_TABLE, timeStamp));
guidsToProcess.addAll(fetchGuids(dbEntityGuid, QUERY_SD, timeStamp));
guidsToProcess.addAll(fetchGuids(dbEntityGuid, QUERY_COLUMN, timeStamp));
}
private void full(String dbEntityGuid, UniqueList<String> guidsToProcess) {
guidsToProcess.addAll(fetchGuids(dbEntityGuid, QUERY_TABLE, 0L));
}
private List<String> fetchGuids(final String dbEntityGuid, String query, long timeStamp) {
Map<String, Object> bindings = new HashMap<String, Object>() {{
put(QUERY_PARAMETER_START_GUID, dbEntityGuid);
}};
String queryWithClause = query;
if(timeStamp > 0L) {
bindings.put(QUERY_PARAMETER_MODIFICATION_TIMESTAMP, timeStamp);
queryWithClause = queryWithClause.concat(TIMESTAMP_CLAUSE);
}
return executeGremlinQuery(queryWithClause, bindings);
}
private List<String> executeGremlinQuery(String query, Map<String, Object> bindings) {
try {
List<String> guids = new ArrayList<>();
String queryWithTransform = query + TRANSFORM_CLAUSE;
List<Map<String, Object>> result = (List<Map<String, Object>>)
atlasGraph.executeGremlinScript(scriptEngine, bindings, queryWithTransform, false);
if (result == null) {
return guids;
}
for (Map<String, Object> item : result) {
guids.add((String) item.get(ExportService.PROPERTY_GUID));
}
return guids;
} catch (ScriptException e) {
LOG.error("error executing query: {}: bindings: {}", query, bindings, e);
return null;
}
}
}
......@@ -44,6 +44,12 @@ public class UniqueList<T> {
}
}
public void addAll(List<T> list) {
for (T item : list) {
add(item);
}
}
public T remove(int index) {
T e = list.remove(index);
set.remove(e);
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.impexp;
import org.apache.atlas.TestModules;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.util.UniqueList;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import javax.inject.Inject;
import javax.script.ScriptEngine;
import java.io.IOException;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
@Guice(modules = TestModules.TestOnlyModule.class)
public class IncrementalExportEntityProviderTest extends ExportImportTestBase {
@Inject
AtlasTypeRegistry typeRegistry;
@Inject
private AtlasTypeDefStore typeDefStore;
@Inject
private AtlasEntityStoreV2 entityStore;
@Inject
private AtlasGraph atlasGraph;
private IncrementalExportEntityProvider incrementalExportEntityProvider;
private ScriptEngine gremlinScriptEngine;
@BeforeClass
public void setup() throws IOException, AtlasBaseException {
basicSetup(typeDefStore, typeRegistry);
createEntities(entityStore, ENTITIES_SUB_DIR, new String[] { "db", "table-columns"});
final String[] entityGuids = {DB_GUID, TABLE_GUID};
verifyCreatedEntities(entityStore, entityGuids, 2);
gremlinScriptEngine = atlasGraph.getGremlinScriptEngine();
EntityGraphRetriever entityGraphRetriever = new EntityGraphRetriever(this.typeRegistry);
incrementalExportEntityProvider = new IncrementalExportEntityProvider(atlasGraph, gremlinScriptEngine);
}
@AfterClass
public void tearDown() {
if(gremlinScriptEngine != null) {
atlasGraph.releaseGremlinScriptEngine(gremlinScriptEngine);
}
}
@Test
public void verify() {
executeQueries(0L, 1);
executeQueries(1L, 9);
}
private void executeQueries(long timeStamp, int expectedEntityCount) {
UniqueList<String> uniqueList = new UniqueList<>();
incrementalExportEntityProvider.populate(DB_GUID, timeStamp, uniqueList);
for (String g : uniqueList.getList()) {
assertTrue(g instanceof String);
}
assertEquals(uniqueList.size(), expectedEntityCount);
}
}
......@@ -6,6 +6,7 @@
],
"options": {
"fetchType": "incremental",
"skipLineage": "true",
"changeMarker": 0
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment