Commit 5618ad4c by rdsolani, committed by Madhan Neethiraj

ATLAS-2439: updated Sqoop hook to use V2 notifications

Signed-off-by: Madhan Neethiraj <madhan@apache.org>
(cherry picked from commit e8908dbfe1d4dfe641cc7a802625f396aa0a399d)
parent 9cd28bbe
@@ -26,10 +26,12 @@ import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
 import org.apache.atlas.hive.model.HiveDataTypes;
 import org.apache.atlas.hook.AtlasHook;
 import org.apache.atlas.hook.AtlasHookException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.model.notification.HookNotification;
-import org.apache.atlas.v1.model.instance.Referenceable;
-import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
+import org.apache.atlas.model.notification.HookNotification.EntityUpdateRequestV2;
 import org.apache.atlas.sqoop.model.SqoopDataTypes;
+import org.apache.atlas.type.AtlasTypeUtil;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.sqoop.SqoopJobDataPublisher;
@@ -47,11 +49,10 @@ import java.util.Properties;
  * AtlasHook sends lineage information to the AtlasSever.
  */
 public class SqoopHook extends SqoopJobDataPublisher {
     private static final Logger LOG = LoggerFactory.getLogger(SqoopHook.class);

     public static final String CONF_PREFIX = "atlas.hook.sqoop.";
     public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
     public static final String ATLAS_CLUSTER_NAME = "atlas.cluster.name";
     public static final String DEFAULT_CLUSTER_NAME = "primary";
@@ -65,7 +66,6 @@ public class SqoopHook extends SqoopJobDataPublisher {
     public static final String START_TIME = "startTime";
     public static final String END_TIME = "endTime";
     public static final String CMD_LINE_OPTS = "commandlineOpts";
-    // multiple inputs and outputs for process
     public static final String INPUTS = "inputs";
     public static final String OUTPUTS = "outputs";
@@ -73,31 +73,56 @@ public class SqoopHook extends SqoopJobDataPublisher {
         org.apache.hadoop.conf.Configuration.addDefaultResource("sqoop-site.xml");
     }

-    public Referenceable createHiveDatabaseInstance(String clusterName, String dbName) {
-        Referenceable dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName());
-        dbRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
-        dbRef.set(AtlasClient.NAME, dbName);
-        dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
-                HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName));
-        return dbRef;
-    }
-
-    public Referenceable createHiveTableInstance(String clusterName, Referenceable dbRef,
-                                              String tableName, String dbName) {
-        Referenceable tableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
-        tableRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
-                HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName));
-        tableRef.set(AtlasClient.NAME, tableName.toLowerCase());
-        tableRef.set(HiveMetaStoreBridge.DB, dbRef);
-        return tableRef;
-    }
-
-    private Referenceable createDBStoreInstance(SqoopJobDataPublisher.Data data)
-            throws ImportException {
-
-        Referenceable storeRef = new Referenceable(SqoopDataTypes.SQOOP_DBDATASTORE.getName());
+    @Override
+    public void publish(SqoopJobDataPublisher.Data data) throws AtlasHookException {
+        try {
+            Configuration atlasProperties = ApplicationProperties.get();
+            String        clusterName     = atlasProperties.getString(ATLAS_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
+
+            AtlasEntity entDbStore   = createDBStoreInstance(data);
+            AtlasEntity entHiveDb    = createHiveDatabaseInstance(clusterName, data.getHiveDB());
+            AtlasEntity entHiveTable = createHiveTableInstance(entHiveDb, data.getHiveTable());
+            AtlasEntity entProcess   = createSqoopProcessInstance(entDbStore, entHiveTable, data, clusterName);
+
+            AtlasEntitiesWithExtInfo entities = new AtlasEntitiesWithExtInfo(entProcess);
+
+            entities.addReferredEntity(entDbStore);
+            entities.addReferredEntity(entHiveDb);
+            entities.addReferredEntity(entHiveTable);
+
+            HookNotification message = new EntityUpdateRequestV2(AtlasHook.getUser(), entities);
+
+            AtlasHook.notifyEntities(Arrays.asList(message), atlasProperties.getInt(HOOK_NUM_RETRIES, 3));
+        } catch(Exception e) {
+            throw new AtlasHookException("SqoopHook.publish() failed.", e);
+        }
+    }
+
+    private AtlasEntity createHiveDatabaseInstance(String clusterName, String dbName) {
+        AtlasEntity entHiveDb     = new AtlasEntity(HiveDataTypes.HIVE_DB.getName());
+        String      qualifiedName = HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName);
+
+        entHiveDb.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
+        entHiveDb.setAttribute(AtlasClient.NAME, dbName);
+        entHiveDb.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, qualifiedName);
+
+        return entHiveDb;
+    }
+
+    private AtlasEntity createHiveTableInstance(AtlasEntity entHiveDb, String tableName) {
+        AtlasEntity entHiveTable  = new AtlasEntity(HiveDataTypes.HIVE_TABLE.getName());
+        String      qualifiedName = HiveMetaStoreBridge.getTableQualifiedName((String)entHiveDb.getAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE), (String)entHiveDb.getAttribute(AtlasClient.NAME), tableName);
+
+        entHiveTable.setAttribute(AtlasClient.NAME, tableName.toLowerCase());
+        entHiveTable.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, qualifiedName);
+        entHiveTable.setAttribute(HiveMetaStoreBridge.DB, AtlasTypeUtil.getAtlasObjectId(entHiveDb));
+
+        return entHiveTable;
+    }
+
+    private AtlasEntity createDBStoreInstance(SqoopJobDataPublisher.Data data) throws ImportException {
         String table = data.getStoreTable();
         String query = data.getStoreQuery();
+
         if (StringUtils.isBlank(table) && StringUtils.isBlank(query)) {
             throw new ImportException("Both table and query cannot be empty for DBStoreInstance");
         }
@@ -105,93 +130,82 @@ public class SqoopHook extends SqoopJobDataPublisher {
         String usage  = table != null ? "TABLE" : "QUERY";
         String source = table != null ? table : query;
         String name   = getSqoopDBStoreName(data);
-        storeRef.set(AtlasClient.NAME, name);
-        storeRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
-        storeRef.set(SqoopHook.DB_STORE_TYPE, data.getStoreType());
-        storeRef.set(SqoopHook.DB_STORE_USAGE, usage);
-        storeRef.set(SqoopHook.STORE_URI, data.getUrl());
-        storeRef.set(SqoopHook.SOURCE, source);
-        storeRef.set(SqoopHook.DESCRIPTION, "");
-        storeRef.set(AtlasClient.OWNER, data.getUser());
-        return storeRef;
-    }
-
-    private Referenceable createSqoopProcessInstance(Referenceable dbStoreRef, Referenceable hiveTableRef,
-                                                     SqoopJobDataPublisher.Data data, String clusterName) {
-        Referenceable procRef = new Referenceable(SqoopDataTypes.SQOOP_PROCESS.getName());
-        final String sqoopProcessName = getSqoopProcessName(data, clusterName);
-        procRef.set(AtlasClient.NAME, sqoopProcessName);
-        procRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, sqoopProcessName);
-        procRef.set(SqoopHook.OPERATION, data.getOperation());
-        if (isImportOperation(data)) {
-            procRef.set(SqoopHook.INPUTS, dbStoreRef);
-            procRef.set(SqoopHook.OUTPUTS, hiveTableRef);
-        } else {
-            procRef.set(SqoopHook.INPUTS, hiveTableRef);
-            procRef.set(SqoopHook.OUTPUTS, dbStoreRef);
-        }
-        procRef.set(SqoopHook.USER, data.getUser());
-        procRef.set(SqoopHook.START_TIME, new Date(data.getStartTime()));
-        procRef.set(SqoopHook.END_TIME, new Date(data.getEndTime()));
-
-        Map<String, String> sqoopOptionsMap = new HashMap<>();
-        Properties options = data.getOptions();
-        for (Object k : options.keySet()) {
-            sqoopOptionsMap.put((String)k, (String) options.get(k));
-        }
-        procRef.set(SqoopHook.CMD_LINE_OPTS, sqoopOptionsMap);
-        return procRef;
-    }
+
+        AtlasEntity entDbStore = new AtlasEntity(SqoopDataTypes.SQOOP_DBDATASTORE.getName());
+
+        entDbStore.setAttribute(AtlasClient.NAME, name);
+        entDbStore.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
+        entDbStore.setAttribute(SqoopHook.DB_STORE_TYPE, data.getStoreType());
+        entDbStore.setAttribute(SqoopHook.DB_STORE_USAGE, usage);
+        entDbStore.setAttribute(SqoopHook.STORE_URI, data.getUrl());
+        entDbStore.setAttribute(SqoopHook.SOURCE, source);
+        entDbStore.setAttribute(SqoopHook.DESCRIPTION, "");
+        entDbStore.setAttribute(AtlasClient.OWNER, data.getUser());
+
+        return entDbStore;
+    }
+
+    private AtlasEntity createSqoopProcessInstance(AtlasEntity entDbStore, AtlasEntity entHiveTable, SqoopJobDataPublisher.Data data, String clusterName) {
+        AtlasEntity         entProcess       = new AtlasEntity(SqoopDataTypes.SQOOP_PROCESS.getName());
+        String              sqoopProcessName = getSqoopProcessName(data, clusterName);
+        Map<String, String> sqoopOptionsMap  = new HashMap<>();
+        Properties          options          = data.getOptions();
+
+        for (Object k : options.keySet()) {
+            sqoopOptionsMap.put((String)k, (String) options.get(k));
+        }
+
+        entProcess.setAttribute(AtlasClient.NAME, sqoopProcessName);
+        entProcess.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, sqoopProcessName);
+        entProcess.setAttribute(SqoopHook.OPERATION, data.getOperation());
+
+        if (isImportOperation(data)) {
+            entProcess.setAttribute(SqoopHook.INPUTS, AtlasTypeUtil.getAtlasObjectId(entDbStore));
+            entProcess.setAttribute(SqoopHook.OUTPUTS, AtlasTypeUtil.getAtlasObjectId(entHiveTable));
+        } else {
+            entProcess.setAttribute(SqoopHook.INPUTS, AtlasTypeUtil.getAtlasObjectId(entHiveTable));
+            entProcess.setAttribute(SqoopHook.OUTPUTS, AtlasTypeUtil.getAtlasObjectId(entDbStore));
+        }
+
+        entProcess.setAttribute(SqoopHook.USER, data.getUser());
+        entProcess.setAttribute(SqoopHook.START_TIME, new Date(data.getStartTime()));
+        entProcess.setAttribute(SqoopHook.END_TIME, new Date(data.getEndTime()));
+        entProcess.setAttribute(SqoopHook.CMD_LINE_OPTS, sqoopOptionsMap);
+
+        return entProcess;
+    }
+
+    private boolean isImportOperation(SqoopJobDataPublisher.Data data) {
+        return data.getOperation().toLowerCase().equals("import");
+    }

     static String getSqoopProcessName(Data data, String clusterName) {
-        StringBuilder name = new StringBuilder(String.format("sqoop %s --connect %s", data.getOperation(),
-                data.getUrl()));
+        StringBuilder name = new StringBuilder(String.format("sqoop %s --connect %s", data.getOperation(), data.getUrl()));
+
         if (StringUtils.isNotEmpty(data.getStoreTable())) {
             name.append(" --table ").append(data.getStoreTable());
         }
+
         if (StringUtils.isNotEmpty(data.getStoreQuery())) {
             name.append(" --query ").append(data.getStoreQuery());
         }
-        name.append(String.format(" --hive-%s --hive-database %s --hive-table %s --hive-cluster %s",
-                data.getOperation(), data.getHiveDB().toLowerCase(), data.getHiveTable().toLowerCase(), clusterName));
+
+        name.append(String.format(" --hive-%s --hive-database %s --hive-table %s --hive-cluster %s", data.getOperation(), data.getHiveDB().toLowerCase(), data.getHiveTable().toLowerCase(), clusterName));
+
         return name.toString();
     }

     static String getSqoopDBStoreName(SqoopJobDataPublisher.Data data) {
         StringBuilder name = new StringBuilder(String.format("%s --url %s", data.getStoreType(), data.getUrl()));
+
         if (StringUtils.isNotEmpty(data.getStoreTable())) {
             name.append(" --table ").append(data.getStoreTable());
         }
+
         if (StringUtils.isNotEmpty(data.getStoreQuery())) {
             name.append(" --query ").append(data.getStoreQuery());
         }
+
         return name.toString();
     }
-
-    static boolean isImportOperation(SqoopJobDataPublisher.Data data) {
-        return data.getOperation().toLowerCase().equals("import");
-    }
-
-    @Override
-    public void publish(SqoopJobDataPublisher.Data data) throws AtlasHookException {
-        try {
-            Configuration atlasProperties = ApplicationProperties.get();
-            String clusterName = atlasProperties.getString(ATLAS_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
-            Referenceable dbStoreRef = createDBStoreInstance(data);
-            Referenceable dbRef = createHiveDatabaseInstance(clusterName, data.getHiveDB());
-            Referenceable hiveTableRef = createHiveTableInstance(clusterName, dbRef,
-                    data.getHiveTable(), data.getHiveDB());
-            Referenceable procRef = createSqoopProcessInstance(dbStoreRef, hiveTableRef, data, clusterName);
-            int maxRetries = atlasProperties.getInt(HOOK_NUM_RETRIES, 3);
-            HookNotification message =
-                    new EntityCreateRequest(AtlasHook.getUser(), dbStoreRef, dbRef, hiveTableRef, procRef);
-            AtlasHook.notifyEntities(Arrays.asList(message), maxRetries);
-        }
-        catch(Exception e) {
-            throw new AtlasHookException("SqoopHook.publish() failed.", e);
-        }
-    }
 }
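
For orientation, the V2 flow that the new publish() follows can be reduced to the short sketch below. It uses only Atlas classes and calls that already appear in the diff above (AtlasEntity, AtlasTypeUtil.getAtlasObjectId, AtlasEntitiesWithExtInfo, EntityUpdateRequestV2, AtlasHook.getUser, AtlasHook.notifyEntities); the wrapper class, type names and attribute values are illustrative assumptions, not part of this commit.

// Minimal sketch of a V2 hook notification, with illustrative names and values.
import java.util.Arrays;

import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.model.notification.HookNotification.EntityUpdateRequestV2;
import org.apache.atlas.type.AtlasTypeUtil;

public class SqoopHookV2Sketch {                                        // hypothetical helper, not part of the commit
    public static void send() throws Exception {
        AtlasEntity hiveTable = new AtlasEntity("hive_table");          // SqoopHook uses HiveDataTypes.HIVE_TABLE.getName()
        hiveTable.setAttribute("qualifiedName", "default.t1@primary");  // illustrative value

        AtlasEntity process = new AtlasEntity("sqoop_process");         // SqoopHook uses SqoopDataTypes.SQOOP_PROCESS.getName()
        process.setAttribute("qualifiedName", "sqoop import --connect jdbc:example");  // illustrative value
        process.setAttribute("outputs", AtlasTypeUtil.getAtlasObjectId(hiveTable));    // link by object id, not nested instance

        // The process is the primary entity; related entities ride along in the same
        // message as referred entities instead of being sent as separate Referenceables.
        AtlasEntitiesWithExtInfo entities = new AtlasEntitiesWithExtInfo(process);
        entities.addReferredEntity(hiveTable);

        HookNotification message = new EntityUpdateRequestV2(AtlasHook.getUser(), entities);

        // SqoopHook reads the retry count from atlas.hook.sqoop.numRetries (default 3).
        AtlasHook.notifyEntities(Arrays.asList(message), 3);
    }
}

Compared to the V1 path, relationships are expressed through object ids rather than embedded Referenceable instances, which is why the updated hook routes the hive_db, hive_table and DB store entities through addReferredEntity() and AtlasTypeUtil.getAtlasObjectId().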