Commit 838a0d45 by Suma Shivaprasad

ATLAS-965 Old lineage still exists after dropping tables and re-creating tables…

ATLAS-965 Old lineage still exists after dropping tables and re-creating tables with same name. (shwethags via sumasai)
parent 8a3c167a
......@@ -18,6 +18,7 @@
package org.apache.atlas.hive.bridge;
import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
......@@ -25,6 +26,7 @@ import org.apache.atlas.AtlasConstants;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.fs.model.FSDataModel;
import org.apache.atlas.fs.model.FSDataTypes;
import org.apache.atlas.hive.hook.HiveHook;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.typesystem.Referenceable;
......@@ -272,10 +274,24 @@ public class HiveMetaStoreBridge {
List<String> hiveTables = hiveClient.getAllTables(databaseName);
LOG.info("Importing tables {} for db {}", hiveTables.toString(), databaseName);
for (String tableName : hiveTables) {
int imported = importTable(databaseReferenceable, databaseName, tableName, failOnError);
tablesImported += imported;
}
if (tablesImported == hiveTables.size()) {
LOG.info("Successfully imported all {} tables from {} ", tablesImported, databaseName);
} else {
LOG.error("Able to import {} tables out of {} tables from {}. Please check logs for import errors", tablesImported, hiveTables.size(), databaseName);
}
return tablesImported;
}
@VisibleForTesting
public int importTable(Referenceable databaseReferenceable, String databaseName, String tableName, final boolean failOnError) throws Exception {
try {
Table table = hiveClient.getTable(databaseName, tableName);
Referenceable tableReferenceable = registerTable(databaseReferenceable, table);
tablesImported++;
if (table.getTableType() == TableType.EXTERNAL_TABLE) {
String tableQualifiedName = getTableQualifiedName(clusterName, table);
Referenceable process = getProcessReference(tableQualifiedName);
......@@ -302,30 +318,24 @@ public class HiveMetaStoreBridge {
List<String> recentQueries = new ArrayList<>(1);
recentQueries.add(query);
lineageProcess.set("recentQueries", recentQueries);
lineageProcess.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName);
String processQualifiedName = getTableProcessQualifiedName(clusterName, table);
lineageProcess.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQualifiedName);
lineageProcess.set(AtlasClient.NAME, query);
registerInstance(lineageProcess);
} else {
LOG.info("Process {} is already registered", process.toString());
}
}
return 1;
} catch (Exception e) {
LOG.error("Import failed for hive_table {} ", tableName, e);
if (failOnError) {
throw e;
}
return 0;
}
}
if ( tablesImported == hiveTables.size()) {
LOG.info("Successfully imported all {} tables from {} ", tablesImported, databaseName);
} else {
LOG.error("Unable to import {} tables out of {} tables from {}", tablesImported, hiveTables.size(), databaseName);
}
return tablesImported;
}
/**
* Gets reference for the table
*
......@@ -389,6 +399,12 @@ public class HiveMetaStoreBridge {
return getTableQualifiedName(clusterName, table.getDbName(), table.getTableName(), table.isTemporary());
}
public static String getTableProcessQualifiedName(String clusterName, Table table) {
String tableQualifiedName = getTableQualifiedName(clusterName, table);
Date createdTime = getTableCreatedTime(table);
return tableQualifiedName + HiveHook.SEP + createdTime.getTime();
}
/**
* Construct the qualified name used to uniquely identify a Table instance in Atlas.
* @param clusterName Name of the cluster to which the Hive component belongs
......@@ -412,6 +428,10 @@ public class HiveMetaStoreBridge {
return createOrUpdateTableInstance(dbReference, null, hiveTable);
}
private static Date getTableCreatedTime(Table table) {
return new Date(table.getTTable().getCreateTime() * MILLIS_CONVERT_FACTOR);
}
private Referenceable createOrUpdateTableInstance(Referenceable dbReference, Referenceable tableReference,
final Table hiveTable) throws Exception {
LOG.info("Importing objects from {}.{}", hiveTable.getDbName(), hiveTable.getTableName());
......@@ -428,7 +448,7 @@ public class HiveMetaStoreBridge {
Date createDate = new Date();
if (hiveTable.getTTable() != null){
try {
createDate = new Date(hiveTable.getTTable().getCreateTime() * MILLIS_CONVERT_FACTOR);
createDate = getTableCreatedTime(hiveTable);
LOG.debug("Setting create time to {} ", createDate);
tableReference.set(HiveDataModelGenerator.CREATE_TIME, createDate);
} catch(Exception ne) {
......
......@@ -21,7 +21,6 @@ package org.apache.atlas.hive.hook;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import kafka.security.auth.Write;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
......@@ -90,7 +89,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
public static final String QUEUE_SIZE = CONF_PREFIX + "queueSize";
public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
static final String SEP = ":".intern();
public static final String SEP = ":".intern();
static final String IO_SEP = "->".intern();
private static final Map<String, HiveOperation> OPERATION_MAP = new HashMap<>();
......@@ -216,7 +215,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
break;
case CREATETABLE_AS_SELECT:
case CREATEVIEW:
case ALTERVIEW_AS:
case LOAD:
......@@ -244,9 +242,11 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
case ALTERTABLE_PARTCOLTYPE:
handleEventOutputs(dgiBridge, event, Type.TABLE);
break;
case ALTERTABLE_RENAMECOL:
renameColumn(dgiBridge, event);
break;
case ALTERTABLE_LOCATION:
LinkedHashMap<Type, Referenceable> tablesUpdated = handleEventOutputs(dgiBridge, event, Type.TABLE);
if (tablesUpdated != null && tablesUpdated.size() > 0) {
......@@ -547,7 +547,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
return null;
}
private Entity getEntityByType(Set<? extends Entity> entities, Type entityType) {
private static Entity getEntityByType(Set<? extends Entity> entities, Type entityType) {
for (Entity entity : entities) {
if (entity.getType() == entityType) {
return entity;
......@@ -708,20 +708,14 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event,
sortedIps, sortedOps, hiveInputsMap, hiveOutputsMap);
String tableQualifiedName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(), hiveTable);
if (isCreateOp(event)){
LOG.info("Overriding process qualified name to {}", tableQualifiedName);
processReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName);
}
entities.addAll(tables.values());
entities.add(processReferenceable);
event.addMessage(new HookNotification.EntityUpdateRequest(event.getUser(), entities));
}
}
private boolean isCreateOp(HiveEventContext hiveEvent) {
private static boolean isCreateOp(HiveEventContext hiveEvent) {
if (HiveOperation.CREATETABLE.equals(hiveEvent.getOperation())
|| HiveOperation.CREATEVIEW.equals(hiveEvent.getOperation())
|| HiveOperation.ALTERVIEW_AS.equals(hiveEvent.getOperation())
......@@ -733,11 +727,13 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
}
private Referenceable getProcessReferenceable(HiveMetaStoreBridge dgiBridge, HiveEventContext hiveEvent,
final SortedSet<ReadEntity> sortedHiveInputs, final SortedSet<WriteEntity> sortedHiveOutputs, SortedMap<ReadEntity, Referenceable> source, SortedMap<WriteEntity, Referenceable> target) {
final SortedSet<ReadEntity> sortedHiveInputs, final SortedSet<WriteEntity> sortedHiveOutputs, SortedMap<ReadEntity, Referenceable> source, SortedMap<WriteEntity, Referenceable> target)
throws HiveException {
Referenceable processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
String queryStr = lower(hiveEvent.getQueryStr());
processReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getProcessQualifiedName(hiveEvent, sortedHiveInputs, sortedHiveOutputs, source, target));
processReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
getProcessQualifiedName(dgiBridge, hiveEvent, sortedHiveInputs, sortedHiveOutputs, source, target));
LOG.debug("Registering query: {}", queryStr);
List<Referenceable> sourceList = new ArrayList<>(source.values());
......@@ -770,8 +766,19 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
}
@VisibleForTesting
static String getProcessQualifiedName(HiveEventContext eventContext, final SortedSet<ReadEntity> sortedHiveInputs, final SortedSet<WriteEntity> sortedHiveOutputs, SortedMap<ReadEntity, Referenceable> hiveInputsMap, SortedMap<WriteEntity, Referenceable> hiveOutputsMap) {
static String getProcessQualifiedName(HiveMetaStoreBridge dgiBridge, HiveEventContext eventContext,
final SortedSet<ReadEntity> sortedHiveInputs,
final SortedSet<WriteEntity> sortedHiveOutputs,
SortedMap<ReadEntity, Referenceable> hiveInputsMap,
SortedMap<WriteEntity, Referenceable> hiveOutputsMap) throws HiveException {
HiveOperation op = eventContext.getOperation();
if (isCreateOp(eventContext)) {
Table outTable = getEntityByType(sortedHiveOutputs, Type.TABLE).getTable();
//refresh table
outTable = dgiBridge.hiveClient.getTable(outTable.getDbName(), outTable.getTableName());
return HiveMetaStoreBridge.getTableProcessQualifiedName(dgiBridge.getClusterName(), outTable);
}
StringBuilder buffer = new StringBuilder(op.getOperationName());
boolean ignoreHDFSPathsinQFName = ignoreHDFSPathsinQFName(op, sortedHiveInputs, sortedHiveOutputs);
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.hive;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.fs.model.FSDataTypes;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.hook.HiveHookIT;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.persistence.Id;
import org.apache.atlas.utils.ParamChecker;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import java.io.File;
import java.util.List;
import static org.apache.atlas.AtlasClient.NAME;
import static org.apache.atlas.hive.hook.HiveHook.lower;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;
public class HiveITBase {
private static final Logger LOG = LoggerFactory.getLogger(HiveITBase.class);
protected static final String DGI_URL = "http://localhost:21000/";
protected static final String CLUSTER_NAME = "primary";
public static final String DEFAULT_DB = "default";
protected static final String PART_FILE = "2015-01-01";
protected Driver driver;
protected AtlasClient atlasClient;
protected HiveMetaStoreBridge hiveMetaStoreBridge;
protected SessionState ss;
protected HiveConf conf;
protected static final String INPUTS = AtlasClient.PROCESS_ATTRIBUTE_INPUTS;
protected static final String OUTPUTS = AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS;
protected Driver driverWithoutContext;
@BeforeClass
public void setUp() throws Exception {
//Set-up hive session
conf = new HiveConf();
conf.setClassLoader(Thread.currentThread().getContextClassLoader());
driver = new Driver(conf);
ss = new SessionState(conf);
ss = SessionState.start(ss);
SessionState.setCurrentSessionState(ss);
Configuration configuration = ApplicationProperties.get();
atlasClient = new AtlasClient(configuration.getString(HiveMetaStoreBridge.ATLAS_ENDPOINT, DGI_URL));
hiveMetaStoreBridge = new HiveMetaStoreBridge(configuration, conf, atlasClient);
hiveMetaStoreBridge.registerHiveDataModel();
HiveConf conf = new HiveConf();
conf.set("hive.exec.post.hooks", "");
SessionState ss = new SessionState(conf);
ss = SessionState.start(ss);
SessionState.setCurrentSessionState(ss);
driverWithoutContext = new Driver(conf);
}
protected void runCommand(String cmd) throws Exception {
runCommandWithDelay(cmd, 0);
}
protected void runCommand(Driver driver, String cmd) throws Exception {
runCommandWithDelay(driver, cmd, 0);
}
protected void runCommandWithDelay(String cmd, int sleepMs) throws Exception {
runCommandWithDelay(driver, cmd, sleepMs);
}
protected void runCommandWithDelay(Driver driver, String cmd, int sleepMs) throws Exception {
LOG.debug("Running command '{}'", cmd);
ss.setCommandType(null);
CommandProcessorResponse response = driver.run(cmd);
assertEquals(response.getResponseCode(), 0);
if (sleepMs != 0) {
Thread.sleep(sleepMs);
}
}
protected String createTestDFSPath(String path) throws Exception {
return "pfile://" + mkdir(path);
}
protected String mkdir(String tag) throws Exception {
String filename = "./target/" + tag + "-data-" + random();
File file = new File(filename);
file.mkdirs();
return file.getAbsolutePath();
}
protected String random() {
return RandomStringUtils.randomAlphanumeric(10);
}
protected String tableName() {
return "table" + random();
}
protected String assertTableIsRegistered(String dbName, String tableName) throws Exception {
return assertTableIsRegistered(dbName, tableName, null, false);
}
protected String assertTableIsRegistered(String dbName, String tableName, HiveHookIT.AssertPredicate assertPredicate, boolean isTemporary) throws Exception {
LOG.debug("Searching for table {}.{}", dbName, tableName);
String tableQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableName, isTemporary);
return assertEntityIsRegistered(HiveDataTypes.HIVE_TABLE.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName,
assertPredicate);
}
protected String assertEntityIsRegistered(final String typeName, final String property, final String value,
final HiveHookIT.AssertPredicate assertPredicate) throws Exception {
waitFor(80000, new HiveHookIT.Predicate() {
@Override
public void evaluate() throws Exception {
Referenceable entity = atlasClient.getEntity(typeName, property, value);
assertNotNull(entity);
if (assertPredicate != null) {
assertPredicate.assertOnEntity(entity);
}
}
});
Referenceable entity = atlasClient.getEntity(typeName, property, value);
return entity.getId()._getId();
}
public interface AssertPredicate {
void assertOnEntity(Referenceable entity) throws Exception;
}
public interface Predicate {
/**
* Perform a predicate evaluation.
*
* @return the boolean result of the evaluation.
* @throws Exception thrown if the predicate evaluation could not evaluate.
*/
void evaluate() throws Exception;
}
/**
* Wait for a condition, expressed via a {@link Predicate} to become true.
*
* @param timeout maximum time in milliseconds to wait for the predicate to become true.
* @param predicate predicate waiting on.
*/
protected void waitFor(int timeout, Predicate predicate) throws Exception {
ParamChecker.notNull(predicate, "predicate");
long mustEnd = System.currentTimeMillis() + timeout;
while (true) {
try {
predicate.evaluate();
return;
} catch(Error | Exception e) {
if (System.currentTimeMillis() >= mustEnd) {
fail("Assertions failed. Failing after waiting for timeout " + timeout + " msecs", e);
}
LOG.debug("Waiting up to " + (mustEnd - System.currentTimeMillis()) + " msec as assertion failed", e);
Thread.sleep(5000);
}
}
}
protected String getTableProcessQualifiedName(String dbName, String tableName) throws Exception {
return HiveMetaStoreBridge.getTableProcessQualifiedName(CLUSTER_NAME,
hiveMetaStoreBridge.hiveClient.getTable(dbName, tableName));
}
protected void validateHDFSPaths(Referenceable processReference, String attributeName, String... testPaths) throws Exception {
List<Id> hdfsPathRefs = (List<Id>) processReference.get(attributeName);
for (int i = 0; i < testPaths.length; i++) {
final String testPathNormed = lower(new Path(testPaths[i]).toString());
String hdfsPathId = assertHDFSPathIsRegistered(testPathNormed);
Assert.assertEquals(hdfsPathRefs.get(0)._getId(), hdfsPathId);
Referenceable hdfsPathRef = atlasClient.getEntity(hdfsPathId);
Assert.assertEquals(hdfsPathRef.get("path"), testPathNormed);
Assert.assertEquals(hdfsPathRef.get(NAME), new Path(testPathNormed).getName());
Assert.assertEquals(hdfsPathRef.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), testPathNormed);
}
}
private String assertHDFSPathIsRegistered(String path) throws Exception {
LOG.debug("Searching for hdfs path {}", path);
return assertEntityIsRegistered(FSDataTypes.HDFS_PATH().toString(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, path, null);
}
protected String assertDatabaseIsRegistered(String dbName) throws Exception {
return assertDatabaseIsRegistered(dbName, null);
}
protected String assertDatabaseIsRegistered(String dbName, AssertPredicate assertPredicate) throws Exception {
LOG.debug("Searching for database {}", dbName);
String dbQualifiedName = HiveMetaStoreBridge.getDBQualifiedName(CLUSTER_NAME, dbName);
return assertEntityIsRegistered(HiveDataTypes.HIVE_DB.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
dbQualifiedName, assertPredicate);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.hive.bridge;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.hive.HiveITBase;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.persistence.Id;
import org.testng.annotations.Test;
import java.util.List;
import static org.testng.Assert.assertEquals;
public class HiveMetastoreBridgeIT extends HiveITBase {
@Test
public void testCreateTableAndImport() throws Exception {
String tableName = tableName();
String pFile = createTestDFSPath("parentPath");
final String query = String.format("create EXTERNAL table %s(id string, cnt int) location '%s'", tableName, pFile);
runCommand(query);
String dbId = assertDatabaseIsRegistered(DEFAULT_DB);
String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
//verify lineage is created
String processId = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(),
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
getTableProcessQualifiedName(DEFAULT_DB, tableName), null);
Referenceable processReference = atlasClient.getEntity(processId);
validateHDFSPaths(processReference, INPUTS, pFile);
List<Id> outputs = (List<Id>) processReference.get(OUTPUTS);
assertEquals(outputs.size(), 1);
assertEquals(outputs.get(0).getId()._getId(), tableId);
//Now import using import tool - should be no-op
hiveMetaStoreBridge.importTable(atlasClient.getEntity(dbId), DEFAULT_DB, tableName, true);
String tableId2 = assertTableIsRegistered(DEFAULT_DB, tableName);
assertEquals(tableId2, tableId);
String processId2 = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(),
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
getTableProcessQualifiedName(DEFAULT_DB, tableName), null);
assertEquals(processId2, processId);
}
@Test
public void testImportCreatedTable() throws Exception {
String tableName = tableName();
String pFile = createTestDFSPath("parentPath");
runCommand(driverWithoutContext, String.format("create EXTERNAL table %s(id string) location '%s'", tableName, pFile));
String dbId = assertDatabaseIsRegistered(DEFAULT_DB);
hiveMetaStoreBridge.importTable(atlasClient.getEntity(dbId), DEFAULT_DB, tableName, true);
String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
String processId = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(),
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
getTableProcessQualifiedName(DEFAULT_DB, tableName), null);
List<Id> outputs = (List<Id>) atlasClient.getEntity(processId).get(OUTPUTS);
assertEquals(outputs.size(), 1);
assertEquals(outputs.get(0).getId()._getId(), tableId);
}
}
......@@ -21,10 +21,10 @@ package org.apache.atlas.hive.hook;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.fs.model.FSDataTypes;
import org.apache.atlas.hive.HiveITBase;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
......@@ -32,29 +32,24 @@ import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.persistence.Id;
import org.apache.atlas.typesystem.types.TypeSystem;
import org.apache.atlas.utils.ParamChecker;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.hooks.Entity;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.security.UserGroupInformation;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.io.File;
......@@ -73,55 +68,20 @@ import java.util.TreeMap;
import java.util.TreeSet;
import static org.apache.atlas.AtlasClient.NAME;
import static org.apache.atlas.hive.hook.HiveHook.IO_SEP;
import static org.apache.atlas.hive.hook.HiveHook.SEP;
import static org.apache.atlas.hive.hook.HiveHook.entityComparator;
import static org.apache.atlas.hive.hook.HiveHook.getProcessQualifiedName;
import static org.apache.atlas.hive.hook.HiveHook.lower;
import static org.apache.atlas.hive.hook.HiveHook.IO_SEP;
import static org.apache.atlas.hive.hook.HiveHook.SEP;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
public class HiveHookIT {
private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(HiveHookIT.class);
private static final String DGI_URL = "http://localhost:21000/";
private static final String CLUSTER_NAME = "primary";
public static final String DEFAULT_DB = "default";
public class HiveHookIT extends HiveITBase {
private static final Logger LOG = LoggerFactory.getLogger(HiveHookIT.class);
private static final String PART_FILE = "2015-01-01";
private Driver driver;
private AtlasClient atlasClient;
private HiveMetaStoreBridge hiveMetaStoreBridge;
private SessionState ss;
private HiveConf conf;
private static final String INPUTS = AtlasClient.PROCESS_ATTRIBUTE_INPUTS;
private static final String OUTPUTS = AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS;
@BeforeClass
public void setUp() throws Exception {
//Set-up hive session
conf = new HiveConf();
conf.setClassLoader(Thread.currentThread().getContextClassLoader());
driver = new Driver(conf);
ss = new SessionState(conf);
ss = SessionState.start(ss);
SessionState.setCurrentSessionState(ss);
Configuration configuration = ApplicationProperties.get();
atlasClient = new AtlasClient(configuration.getString(HiveMetaStoreBridge.ATLAS_ENDPOINT, DGI_URL));
hiveMetaStoreBridge = new HiveMetaStoreBridge(configuration, conf, atlasClient);
hiveMetaStoreBridge.registerHiveDataModel();
}
private void runCommand(String cmd) throws Exception {
runCommandWithDelay(cmd, 0);
}
@Test
public void testCreateDatabase() throws Exception {
......@@ -158,10 +118,6 @@ public class HiveHookIT {
return dbName;
}
private String tableName() {
return "table" + random();
}
private String columnName() {
return "col" + random();
}
......@@ -260,8 +216,9 @@ public class HiveHookIT {
final String query = String.format("create TEMPORARY EXTERNAL table %s.%s( %s, %s) location '%s'", DEFAULT_DB , tableName , colName + " int", "name string", pFile);
runCommand(query);
assertTableIsRegistered(DEFAULT_DB, tableName, null, true);
String processId = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName, true), null);
String processId = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(),
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
getTableProcessQualifiedName(DEFAULT_DB, tableName), null);
Referenceable processReference = atlasClient.getEntity(processId);
assertEquals(processReference.get("userName"), UserGroupInformation.getCurrentUser().getShortUserName());
......@@ -271,7 +228,7 @@ public class HiveHookIT {
validateHDFSPaths(processReference, INPUTS, pFile);
}
private Set<ReadEntity> getInputs(String inputName, Entity.Type entityType) {
private Set<ReadEntity> getInputs(String inputName, Entity.Type entityType) throws HiveException {
final ReadEntity entity = new ReadEntity();
if ( Entity.Type.DFS_DIR.equals(entityType)) {
......@@ -282,10 +239,14 @@ public class HiveHookIT {
entity.setTyp(entityType);
}
if (entityType == Entity.Type.TABLE) {
entity.setT(hiveMetaStoreBridge.hiveClient.getTable(DEFAULT_DB, inputName));
}
return new LinkedHashSet<ReadEntity>() {{ add(entity); }};
}
private Set<WriteEntity> getOutputs(String inputName, Entity.Type entityType) {
private Set<WriteEntity> getOutputs(String inputName, Entity.Type entityType) throws HiveException {
final WriteEntity entity = new WriteEntity();
if ( Entity.Type.DFS_DIR.equals(entityType) || Entity.Type.LOCAL_DIR.equals(entityType)) {
......@@ -296,6 +257,9 @@ public class HiveHookIT {
entity.setTyp(entityType);
}
if (entityType == Entity.Type.TABLE) {
entity.setT(hiveMetaStoreBridge.hiveClient.getTable(DEFAULT_DB, inputName));
}
return new LinkedHashSet<WriteEntity>() {{ add(entity); }};
}
......@@ -373,7 +337,6 @@ public class HiveHookIT {
String command = "create table " + tableName + "(id int, name string) row format delimited lines terminated by '\n' null defined as ''";
runCommand(command);
assertTableIsRegistered(DEFAULT_DB, tableName);
}
@Test
......@@ -392,19 +355,17 @@ public class HiveHookIT {
String processId = assertProcessIsRegistered(hiveEventContext);
final String drpquery = String.format("drop table %s ", ctasTableName);
runCommand(drpquery);
runCommandWithDelay(drpquery, 100);
assertTableIsNotRegistered(DEFAULT_DB, ctasTableName);
//TODO : Fix after ATLAS-876
runCommand(query);
assertTableIsRegistered(DEFAULT_DB, ctasTableName);
outputs = getOutputs(ctasTableName, Entity.Type.TABLE);
String process2Id = assertProcessIsRegistered(hiveEventContext, inputs, outputs);
Assert.assertEquals(process2Id, processId);
assertNotEquals(process2Id, processId);
Referenceable processRef = atlasClient.getEntity(processId);
outputs.add(outputs.iterator().next());
validateOutputTables(processRef, outputs);
}
......@@ -421,7 +382,6 @@ public class HiveHookIT {
@Test
public void testAlterViewAsSelect() throws Exception {
//Create the view from table1
String table1Name = createTable();
String viewName = tableName();
......@@ -466,10 +426,6 @@ public class HiveHookIT {
Assert.assertEquals(vertices.length(), 0);
}
private String createTestDFSPath(String path) throws Exception {
return "pfile://" + mkdir(path);
}
private String createTestDFSFile(String path) throws Exception {
return "pfile://" + file(path);
}
......@@ -763,10 +719,6 @@ public class HiveHookIT {
//TODO -Add update test case
}
private String random() {
return RandomStringUtils.randomAlphanumeric(10);
}
private String file(String tag) throws Exception {
String filename = "./target/" + tag + "-data-" + random();
File file = new File(filename);
......@@ -774,13 +726,6 @@ public class HiveHookIT {
return file.getAbsolutePath();
}
private String mkdir(String tag) throws Exception {
String filename = "./target/" + tag + "-data-" + random();
File file = new File(filename);
file.mkdirs();
return file.getAbsolutePath();
}
@Test
public void testExportImportUnPartitionedTable() throws Exception {
String tableName = createTable(false);
......@@ -1159,16 +1104,6 @@ public class HiveHookIT {
);
}
private void runCommandWithDelay(String cmd, int sleepMs) throws CommandNeedRetryException, InterruptedException {
LOG.debug("Running command '{}'", cmd);
ss.setCommandType(null);
CommandProcessorResponse response = driver.run(cmd);
assertEquals(response.getResponseCode(), 0);
if (sleepMs != 0) {
Thread.sleep(sleepMs);
}
}
@Test
public void testTruncateTable() throws Exception {
String tableName = createTable(false);
......@@ -1217,15 +1152,9 @@ public class HiveHookIT {
@Test
public void testAlterTableWithoutHookConf() throws Exception {
HiveConf conf = new HiveConf();
conf.set("hive.exec.post.hooks", "");
SessionState ss = new SessionState(conf);
ss = SessionState.start(ss);
SessionState.setCurrentSessionState(ss);
Driver driver = new Driver(conf);
String tableName = tableName();
String createCommand = "create table " + tableName + " (id int, name string)";
driver.run(createCommand);
driverWithoutContext.run(createCommand);
assertTableIsNotRegistered(DEFAULT_DB, tableName);
String command = "alter table " + tableName + " change id id_new string";
runCommand(command);
......@@ -1285,34 +1214,15 @@ public class HiveHookIT {
}
});
String processId = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName, false), null);
String processQualifiedName = getTableProcessQualifiedName(DEFAULT_DB, tableName);
String processId = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(),
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQualifiedName, null);
Referenceable processReference = atlasClient.getEntity(processId);
validateHDFSPaths(processReference, INPUTS, testPath);
}
private void validateHDFSPaths(Referenceable processReference, String attributeName, String... testPaths) throws Exception {
List<Id> hdfsPathRefs = (List<Id>) processReference.get(attributeName);
for (int i = 0; i < testPaths.length; i++) {
final String testPathNormed = lower(new Path(testPaths[i]).toString());
String hdfsPathId = assertHDFSPathIsRegistered(testPathNormed);
Assert.assertEquals(hdfsPathRefs.get(0)._getId(), hdfsPathId);
Referenceable hdfsPathRef = atlasClient.getEntity(hdfsPathId);
Assert.assertEquals(hdfsPathRef.get("path"), testPathNormed);
Assert.assertEquals(hdfsPathRef.get(NAME), new Path(testPathNormed).getName());
Assert.assertEquals(hdfsPathRef.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), testPathNormed);
}
}
private String assertHDFSPathIsRegistered(String path) throws Exception {
LOG.debug("Searching for hdfs path {}", path);
return assertEntityIsRegistered(FSDataTypes.HDFS_PATH().toString(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, path, null);
}
@Test
public void testAlterTableFileFormat() throws Exception {
String tableName = createTable();
......@@ -1614,7 +1524,7 @@ public class HiveHookIT {
}};
String query = String.format(fmtQuery, entityName, SET_OP, getSerializedProps(expectedProps));
runCommand(query);
runCommandWithDelay(query, 1000);
verifyEntityProperties(entityType, entityName, expectedProps, false);
expectedProps.put("testPropKey2", "testPropValue2");
......@@ -1710,7 +1620,7 @@ public class HiveHookIT {
sortedHiveOutputs.addAll(event.getOutputs());
}
String processQFName = getProcessQualifiedName(event, sortedHiveInputs, sortedHiveOutputs, getSortedProcessDataSets(event.getInputs()), getSortedProcessDataSets(event.getOutputs()));
String processQFName = getProcessQualifiedName(hiveMetaStoreBridge, event, sortedHiveInputs, sortedHiveOutputs, getSortedProcessDataSets(event.getInputs()), getSortedProcessDataSets(event.getOutputs()));
LOG.debug("Searching for process with query {}", processQFName);
return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQFName, new AssertPredicate() {
@Override
......@@ -1735,7 +1645,7 @@ public class HiveHookIT {
if ( event.getOutputs() != null) {
sortedHiveOutputs.addAll(event.getOutputs());
}
String processQFName = getProcessQualifiedName(event, sortedHiveInputs, sortedHiveOutputs, getSortedProcessDataSets(inputTbls), getSortedProcessDataSets(outputTbls));
String processQFName = getProcessQualifiedName(hiveMetaStoreBridge, event, sortedHiveInputs, sortedHiveOutputs, getSortedProcessDataSets(inputTbls), getSortedProcessDataSets(outputTbls));
LOG.debug("Searching for process with query {}", processQFName);
return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQFName, new AssertPredicate() {
@Override
......@@ -1777,7 +1687,7 @@ public class HiveHookIT {
if ( event.getOutputs() != null) {
sortedHiveOutputs.addAll(event.getOutputs());
}
String processQFName = getProcessQualifiedName(event, sortedHiveInputs, sortedHiveOutputs, getSortedProcessDataSets(event.getInputs()), getSortedProcessDataSets(event.getOutputs()));
String processQFName = getProcessQualifiedName(hiveMetaStoreBridge, event, sortedHiveInputs, sortedHiveOutputs, getSortedProcessDataSets(event.getInputs()), getSortedProcessDataSets(event.getOutputs()));
LOG.debug("Searching for process with query {}", processQFName);
assertEntityIsNotRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQFName);
} catch( Exception e) {
......@@ -1803,49 +1713,10 @@ public class HiveHookIT {
assertEntityIsNotRegistered(HiveDataTypes.HIVE_DB.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, dbQualifiedName);
}
private String assertTableIsRegistered(String dbName, String tableName) throws Exception {
return assertTableIsRegistered(dbName, tableName, null, false);
}
private String assertTableIsRegistered(String dbName, String tableName, AssertPredicate assertPredicate, boolean isTemporary) throws Exception {
LOG.debug("Searching for table {}.{}", dbName, tableName);
String tableQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableName, isTemporary);
return assertEntityIsRegistered(HiveDataTypes.HIVE_TABLE.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName,
assertPredicate);
}
private String assertTableIsRegistered(String dbName, String tableName, AssertPredicate assertPredicate) throws Exception {
return assertTableIsRegistered(dbName, tableName, assertPredicate, false);
}
private String assertDatabaseIsRegistered(String dbName) throws Exception {
return assertDatabaseIsRegistered(dbName, null);
}
private String assertDatabaseIsRegistered(String dbName, AssertPredicate assertPredicate) throws Exception {
LOG.debug("Searching for database {}", dbName);
String dbQualifiedName = HiveMetaStoreBridge.getDBQualifiedName(CLUSTER_NAME, dbName);
return assertEntityIsRegistered(HiveDataTypes.HIVE_DB.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
dbQualifiedName, assertPredicate);
}
private String assertEntityIsRegistered(final String typeName, final String property, final String value,
final AssertPredicate assertPredicate) throws Exception {
waitFor(80000, new Predicate() {
@Override
public void evaluate() throws Exception {
Referenceable entity = atlasClient.getEntity(typeName, property, value);
assertNotNull(entity);
if (assertPredicate != null) {
assertPredicate.assertOnEntity(entity);
}
}
});
Referenceable entity = atlasClient.getEntity(typeName, property, value);
return entity.getId()._getId();
}
private void assertEntityIsNotRegistered(final String typeName, final String property, final String value) throws Exception {
waitFor(80000, new Predicate() {
@Override
......@@ -1894,42 +1765,4 @@ public class HiveHookIT {
runCommand("show compactions");
runCommand("show transactions");
}
public interface AssertPredicate {
void assertOnEntity(Referenceable entity) throws Exception;
}
public interface Predicate {
/**
* Perform a predicate evaluation.
*
* @return the boolean result of the evaluation.
* @throws Exception thrown if the predicate evaluation could not evaluate.
*/
void evaluate() throws Exception;
}
/**
* Wait for a condition, expressed via a {@link Predicate} to become true.
*
* @param timeout maximum time in milliseconds to wait for the predicate to become true.
* @param predicate predicate waiting on.
*/
protected void waitFor(int timeout, Predicate predicate) throws Exception {
ParamChecker.notNull(predicate, "predicate");
long mustEnd = System.currentTimeMillis() + timeout;
while (true) {
try {
predicate.evaluate();
return;
} catch(Error | Exception e) {
if (System.currentTimeMillis() >= mustEnd) {
fail("Assertions failed. Failing after waiting for timeout " + timeout + " msecs", e);
}
LOG.debug("Waiting up to " + (mustEnd - System.currentTimeMillis()) + " msec as assertion failed", e);
Thread.sleep(5000);
}
}
}
}
......@@ -45,6 +45,9 @@
<skip>${skipDocs}</skip>
<dependencyDetailsEnabled>false</dependencyDetailsEnabled>
<dependencyLocationsEnabled>false</dependencyLocationsEnabled>
<webAccessUrl>https://github.com/apache/incubator-atlas.git</webAccessUrl>
<anonymousConnection>scm:git://git.apache.org/incubator-atlas.git</anonymousConnection>
<developerConnection>scm:https://git-wip-us.apache.org/repos/asf/incubator-atlas.git</developerConnection>
</configuration>
<reportSets>
<reportSet>
......
......@@ -80,7 +80,7 @@
</mailingLists>
<scm>
<connection>scm:git:https://github.com/apache/incubator-atlas.git</connection>
<connection>scm:git:git://git.apache.org/incubator-atlas.git</connection>
<developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-atlas.git</developerConnection>
<tag>HEAD</tag>
<url>https://github.com/apache/incubator-atlas.git</url>
......
......@@ -6,6 +6,7 @@ INCOMPATIBLE CHANGES:
ALL CHANGES:
ATLAS-965 Old lineage still exists after dropping tables and re-creating tables with same name. (shwethags via sumasai)
ATLAS-1048 TestMetadata.py test in distro project fails on Windows (jnhagelb via shwethags)
ATLAS-1026 StoreBackedTypeCache issues (dkantor via shwethags)
ATLAS-861 1 table out of 50,000 tables is left unimported throwing exception during deserialization (sumasai via shwethags)
......
......@@ -23,6 +23,7 @@ import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provider;
import com.google.inject.Stage;
import com.google.inject.TypeLiteral;
import com.google.inject.servlet.GuiceServletContextListener;
import com.sun.jersey.api.core.PackagesResourceConfig;
......@@ -39,12 +40,10 @@ import org.apache.atlas.notification.NotificationModule;
import org.apache.atlas.repository.graph.GraphProvider;
import org.apache.atlas.service.Services;
import org.apache.atlas.web.filters.ActiveServerFilter;
import org.apache.atlas.web.filters.AtlasAuthenticationFilter;
import org.apache.atlas.web.filters.AuditFilter;
import org.apache.atlas.web.service.ActiveInstanceElectorModule;
import org.apache.atlas.web.service.ServiceModule;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.bridge.SLF4JBridgeHandler;
......@@ -75,7 +74,7 @@ public class GuiceServletConfig extends GuiceServletContextListener {
LoginProcessor loginProcessor = new LoginProcessor();
loginProcessor.login();
injector = Guice.createInjector(getRepositoryModule(), new ActiveInstanceElectorModule(),
injector = Guice.createInjector(Stage.PRODUCTION, getRepositoryModule(), new ActiveInstanceElectorModule(),
new NotificationModule(), new ServiceModule(), new JerseyServletModule() {
private Configuration appConfiguration = null;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment