Commit 5b627b5a by Shwetha GS

ATLAS-837 Enhance Sqoop addon to handle export operation (venkatnrangan via shwethags)

parent 67a1133a
...@@ -109,8 +109,13 @@ public class SqoopHook extends SqoopJobDataPublisher { ...@@ -109,8 +109,13 @@ public class SqoopHook extends SqoopJobDataPublisher {
procRef.set(SqoopDataModelGenerator.NAME, sqoopProcessName); procRef.set(SqoopDataModelGenerator.NAME, sqoopProcessName);
procRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, sqoopProcessName); procRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, sqoopProcessName);
procRef.set(SqoopDataModelGenerator.OPERATION, data.getOperation()); procRef.set(SqoopDataModelGenerator.OPERATION, data.getOperation());
if (isImportOperation(data)) {
procRef.set(SqoopDataModelGenerator.INPUTS, dbStoreRef); procRef.set(SqoopDataModelGenerator.INPUTS, dbStoreRef);
procRef.set(SqoopDataModelGenerator.OUTPUTS, hiveTableRef); procRef.set(SqoopDataModelGenerator.OUTPUTS, hiveTableRef);
} else {
procRef.set(SqoopDataModelGenerator.INPUTS, hiveTableRef);
procRef.set(SqoopDataModelGenerator.OUTPUTS, dbStoreRef);
}
procRef.set(SqoopDataModelGenerator.USER, data.getUser()); procRef.set(SqoopDataModelGenerator.USER, data.getUser());
procRef.set(SqoopDataModelGenerator.START_TIME, new Date(data.getStartTime())); procRef.set(SqoopDataModelGenerator.START_TIME, new Date(data.getStartTime()));
procRef.set(SqoopDataModelGenerator.END_TIME, new Date(data.getEndTime())); procRef.set(SqoopDataModelGenerator.END_TIME, new Date(data.getEndTime()));
...@@ -126,15 +131,16 @@ public class SqoopHook extends SqoopJobDataPublisher { ...@@ -126,15 +131,16 @@ public class SqoopHook extends SqoopJobDataPublisher {
} }
static String getSqoopProcessName(Data data, String clusterName) { static String getSqoopProcessName(Data data, String clusterName) {
StringBuilder name = new StringBuilder(String.format("sqoop import --connect %s", data.getUrl())); StringBuilder name = new StringBuilder(String.format("sqoop %s --connect %s", data.getOperation(),
data.getUrl()));
if (StringUtils.isNotEmpty(data.getStoreTable())) { if (StringUtils.isNotEmpty(data.getStoreTable())) {
name.append(" --table ").append(data.getStoreTable()); name.append(" --table ").append(data.getStoreTable());
} }
if (StringUtils.isNotEmpty(data.getStoreQuery())) { if (StringUtils.isNotEmpty(data.getStoreQuery())) {
name.append(" --query ").append(data.getStoreQuery()); name.append(" --query ").append(data.getStoreQuery());
} }
name.append(String.format(" --hive-import --hive-database %s --hive-table %s --hive-cluster %s", name.append(String.format(" --hive-%s --hive-database %s --hive-table %s --hive-cluster %s",
data.getHiveDB().toLowerCase(), data.getHiveTable().toLowerCase(), clusterName)); data.getOperation(), data.getHiveDB().toLowerCase(), data.getHiveTable().toLowerCase(), clusterName));
return name.toString(); return name.toString();
} }
...@@ -149,6 +155,10 @@ public class SqoopHook extends SqoopJobDataPublisher { ...@@ -149,6 +155,10 @@ public class SqoopHook extends SqoopJobDataPublisher {
return name.toString(); return name.toString();
} }
/**
 * Returns whether the sqoop job described by {@code data} is an import
 * (data flowing from the DB store into Hive); {@code false} means export.
 *
 * @param data sqoop job metadata published by the hook
 * @return {@code true} if the job's operation is "import", compared case-insensitively
 */
static boolean isImportOperation(SqoopJobDataPublisher.Data data) {
    // equalsIgnoreCase avoids the default-locale toLowerCase() pitfall
    // (e.g. Turkish dotless-i turns "IMPORT" into "\u0131mport" != "import").
    return "import".equalsIgnoreCase(data.getOperation());
}
@Override @Override
public void publish(SqoopJobDataPublisher.Data data) throws Exception { public void publish(SqoopJobDataPublisher.Data data) throws Exception {
Configuration atlasProperties = ApplicationProperties.get(); Configuration atlasProperties = ApplicationProperties.get();
......
...@@ -88,6 +88,21 @@ public class SqoopHookIT { ...@@ -88,6 +88,21 @@ public class SqoopHookIT {
assertHiveTableIsRegistered(DEFAULT_DB, "hiveTable"); assertHiveTableIsRegistered(DEFAULT_DB, "hiveTable");
} }
@Test
public void testSqoopExport() throws Exception {
    // Simulate a sqoop export run (hive table -> mysql table) and publish it.
    SqoopJobDataPublisher.Data jobData = new SqoopJobDataPublisher.Data("export", "jdbc:mysql:///localhost/db",
            "mysqluser", "mysql", "myTable", null, "default", "hiveTable", new Properties(),
            System.currentTimeMillis() - 100, System.currentTimeMillis());
    new SqoopHook().publish(jobData);

    // Give the hook a moment to post the entities to Atlas before asserting.
    Thread.sleep(1000);

    assertDBStoreIsRegistered(SqoopHook.getSqoopDBStoreName(jobData));
    assertSqoopProcessIsRegistered(SqoopHook.getSqoopProcessName(jobData, CLUSTER_NAME));
    assertHiveTableIsRegistered(DEFAULT_DB, "hiveTable");
}
private String assertDBStoreIsRegistered(String storeName) throws Exception { private String assertDBStoreIsRegistered(String storeName) throws Exception {
LOG.debug("Searching for db store {}", storeName); LOG.debug("Searching for db store {}", storeName);
String query = String.format( String query = String.format(
......
...@@ -22,6 +22,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES:
ATLAS-837 Enhance Sqoop addon to handle export operation (venkatnrangan via shwethags)
ATLAS-869 Make LDAP/AD properties to be configurable. (nixonrodrigues via yhemanth)
ATLAS-730 Change titan hbase table name (svimal2106 via sumasai)
ATLAS-871 Make audit repository implementation configurable (jnhagelb via shwethags)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment