Commit 9d1040b7 by Suma Shivaprasad

ATLAS-642 import-hive should create the lineage for external tables (svimal2106 via sumasai)

parent 4f681657
...@@ -27,6 +27,7 @@ import org.apache.atlas.fs.model.FSDataModel;
import org.apache.atlas.fs.model.FSDataTypes;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.json.InstanceSerialization;
...@@ -37,6 +38,7 @@ import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Order;
...@@ -239,6 +241,18 @@ public class HiveMetaStoreBridge {
        return String.format("%s@%s", dbName.toLowerCase(), clusterName);
    }
    private String getCreateTableString(Table table, String location){
        String colString = "";
        List<FieldSchema> colList = table.getAllCols();
        for(FieldSchema col:colList){
            colString += col.getName() + " " + col.getType() + ",";
        }
        colString = colString.substring(0, colString.length() - 1);
        String query = "create external table " + table.getTableName() + "(" + colString + ")" +
                " location '" + location + "'";
        return query;
    }
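For illustration only (the table name, columns, and HDFS location below are made up), this helper joins each column as "name type" with commas, trims the trailing comma, and emits a one-line external-table DDL:

    // a minimal sketch, assuming a table 'sales' with columns (id int, region string):
    // getCreateTableString(salesTable, "hdfs://namenode:8020/data/sales")
    // -> create external table sales(id int,region string) location 'hdfs://namenode:8020/data/sales'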
    /**
     * Imports all tables for the given db
     * @param databaseName
...@@ -247,10 +261,45 @@ public class HiveMetaStoreBridge {
     */
    private void importTables(Referenceable databaseReferenceable, String databaseName) throws Exception {
        List<String> hiveTables = hiveClient.getAllTables(databaseName);
        LOG.info("Importing tables {} for db {}", hiveTables.toString(), databaseName);
        for (String tableName : hiveTables) {
            Table table = hiveClient.getTable(databaseName, tableName);
            Referenceable tableReferenceable = registerTable(databaseReferenceable, table);
            if (table.getTableType() == TableType.EXTERNAL_TABLE){
                String tableQualifiedName = getTableQualifiedName(clusterName, table);
                Referenceable process = getProcessReference(tableQualifiedName);
                if (process == null){
                    LOG.info("Attempting to register create table process for {}", tableQualifiedName);
                    Referenceable lineageProcess = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
                    ArrayList<Referenceable> sourceList = new ArrayList<>();
                    ArrayList<Referenceable> targetList = new ArrayList<>();
                    String tableLocation = table.getDataLocation().toString();
                    Referenceable path = fillHDFSDataSet(tableLocation);
                    String query = getCreateTableString(table, tableLocation);
                    sourceList.add(path);
                    targetList.add(tableReferenceable);
                    lineageProcess.set("inputs", sourceList);
                    lineageProcess.set("outputs", targetList);
                    lineageProcess.set("userName", table.getOwner());
                    lineageProcess.set("startTime", new Date(System.currentTimeMillis()));
                    lineageProcess.set("endTime", new Date(System.currentTimeMillis()));
                    lineageProcess.set("operationType", "CREATETABLE");
                    lineageProcess.set("queryText", query);
                    lineageProcess.set("queryId", query);
                    lineageProcess.set("queryPlan", "{}");
                    lineageProcess.set("clusterName", clusterName);
                    List<String> recentQueries = new ArrayList<>(1);
                    recentQueries.add(query);
                    lineageProcess.set("recentQueries", recentQueries);
                    lineageProcess.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName);
                    lineageProcess.set(AtlasClient.NAME, query);
                    registerInstance(lineageProcess);
                } else {
                    LOG.info("Process {} is already registered", process.toString());
                }
            }
        }
    }
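Net effect of the block above, sketched with made-up qualified names: each imported external table gains a hive_process entity whose input is the table's HDFS path and whose output is the table itself, which is what gives the table its lineage edge:

    // hdfs_path   "hdfs://namenode:8020/data/sales"          -- inputs  -->
    // hive_process qualifiedName "default.sales@primary"     -- outputs -->
    // hive_table   qualifiedName "default.sales@primary"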
...@@ -269,9 +318,21 @@ public class HiveMetaStoreBridge {
        return getEntityReferenceFromDSL(typeName, dslQuery);
    }
    private Referenceable getProcessReference(String qualifiedName) throws Exception{
        LOG.debug("Getting reference for process {}", qualifiedName);
        String typeName = HiveDataTypes.HIVE_PROCESS.getName();
        String dslQuery = getProcessDSLQuery(typeName, qualifiedName);
        return getEntityReferenceFromDSL(typeName, dslQuery);
    }

    static String getProcessDSLQuery(String typeName, String qualifiedName) throws Exception{
        String dslQuery = String.format("%s as t where qualifiedName = '%s'", typeName, qualifiedName);
        return dslQuery;
    }
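For example (values are illustrative; the type name is whatever HiveDataTypes.HIVE_PROCESS.getName() returns), the generated DSL reads:

    // getProcessDSLQuery("hive_process", "default.sales@primary")
    // -> hive_process as t where qualifiedName = 'default.sales@primary'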
    static String getTableDSLQuery(String clusterName, String dbName, String tableName, String typeName, boolean isTemporary) {
        String entityName = getTableQualifiedName(clusterName, dbName, tableName, isTemporary);
        return String.format("%s as t where qualifiedName = '%s'", typeName, entityName);
    }
    /**
...@@ -398,6 +459,7 @@ public class HiveMetaStoreBridge {
        String tableName = table.getTableName();
        LOG.info("Attempting to register table [" + tableName + "]");
        Referenceable tableReference = getTableReference(table);
        LOG.info("Found result " + tableReference);
        if (tableReference == null) {
            tableReference = createTableInstance(dbReference, table);
            tableReference = registerInstance(tableReference);
...
...@@ -678,6 +678,10 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
        }};
        Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event, inputs, outputs);
        String tableQualifiedName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(), hiveTable);
        if(isCreateOp(event)){
            processReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName);
        }
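A hedged reading of this hook change, again with made-up names: for create operations the process entity now carries the table's own qualified name, which is the same key importTables() probes via getProcessReference(), so the hook and import-hive converge on a single lineage process instead of registering duplicates:

    // hook (create op):    processReferenceable qualifiedName = "default.sales@primary"
    // import-hive (rerun): getProcessReference("default.sales@primary") finds it and skips re-registration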
        entities.addAll(tables.values());
        entities.add(processReferenceable);
        messages.add(new HookNotification.EntityUpdateRequest(event.getUser(), entities));
...
...@@ -90,7 +90,7 @@ public class HiveMetaStoreBridgeTest {
    public void testImportThatUpdatesRegisteredTable() throws Exception {
        setupDB(hiveClient, TEST_DB_NAME);
        Table hiveTable = setupTable(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME);
        returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
...@@ -99,6 +99,9 @@ public class HiveMetaStoreBridgeTest {
                HiveDataTypes.HIVE_TABLE.getName(), false))).thenReturn(
                getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
        when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());
        String processQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, hiveTable);
        when(atlasClient.searchByDSL(HiveMetaStoreBridge.getProcessDSLQuery(HiveDataTypes.HIVE_PROCESS.getName(),
                processQualifiedName))).thenReturn(getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
        HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
        bridge.importHiveMetadata();
...@@ -140,6 +143,9 @@ public class HiveMetaStoreBridgeTest {
                TEST_TABLE_NAME,
                HiveDataTypes.HIVE_TABLE.getName(), false))).thenReturn(
                getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
        String processQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, hiveTable);
        when(atlasClient.searchByDSL(HiveMetaStoreBridge.getProcessDSLQuery(HiveDataTypes.HIVE_PROCESS.getName(),
                processQualifiedName))).thenReturn(getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
        when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());
        Partition partition = mock(Partition.class);
...
...@@ -240,8 +240,8 @@ public class HiveHookIT {
        final String query = String.format("create TEMPORARY EXTERNAL table %s.%s( %s, %s) location '%s'", DEFAULT_DB, tableName, colName + " int", "name string", pFile);
        runCommand(query);
        assertTableIsRegistered(DEFAULT_DB, tableName, null, true);
        String processId = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
                HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName, true), null);
        Referenceable processReference = atlasClient.getEntity(processId);
        assertEquals(processReference.get("userName"), UserGroupInformation.getCurrentUser().getShortUserName());
...
...@@ -24,6 +24,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES:
ATLAS-642 import-hive should create the lineage for external tables (svimal2106 via sumasai)
ATLAS-901 Log messages that cannot be sent to Kafka to a specific log configuration (yhemanth)
ATLAS-911 Get entity by unique attribute doesn't enforce type (shwethags)
ATLAS-899 Fix Hive Hook documentation (sumasai via yhemanth)
...