Commit 729d9238 authored by rdsolani, committed by Madhan Neethiraj

ATLAS-2462: Sqoop import for all tables throws NPE for no table provided in command

Signed-off-by: Madhan Neethiraj <madhan@apache.org>
(cherry picked from commit 7adbf8ffd27904577ecf694a9f861cdd46833069)
parent ae23e783
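
For context: when a Sqoop command names no table (for example, an import of all tables), SqoopJobDataPublisher.Data.getHiveTable() returns null, which the hook previously dereferenced unconditionally. A minimal sketch of the guard this change introduces, using the same names as the diff below:

    // Build the Hive table entity only when a table name is present;
    // otherwise leave it null and fall back to the Hive database.
    AtlasEntity entHiveTable = data.getHiveTable() != null
            ? toHiveTableEntity(entHiveDb, data.getHiveTable())
            : null;

    // Lineage then points at the table when available, else at the database.
    List<AtlasObjectId> hiveObjects = Collections.singletonList(
            AtlasTypeUtil.getAtlasObjectId(entHiveTable != null ? entHiveTable : entHiveDb));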
@@ -29,7 +29,8 @@ import org.apache.atlas.hook.AtlasHookException;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.model.notification.HookNotification;
-import org.apache.atlas.model.notification.HookNotification.EntityUpdateRequestV2;
+import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2;
+import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.sqoop.model.SqoopDataTypes;
 import org.apache.atlas.type.AtlasTypeUtil;
 import org.apache.commons.configuration.Configuration;
@@ -39,12 +40,12 @@ import org.apache.sqoop.util.ImportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.Map;
+import java.util.HashMap;
 import java.util.Properties;
+import java.util.List;
+import java.util.Date;

 /**
  * AtlasHook sends lineage information to the Atlas server.
  */
@@ -79,26 +80,30 @@ public class SqoopHook extends SqoopJobDataPublisher {
         try {
             Configuration atlasProperties = ApplicationProperties.get();
             String clusterName = atlasProperties.getString(ATLAS_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);

-            AtlasEntity entDbStore = createDBStoreInstance(data);
-            AtlasEntity entHiveDb = createHiveDatabaseInstance(clusterName, data.getHiveDB());
-            AtlasEntity entHiveTable = createHiveTableInstance(entHiveDb, data.getHiveTable());
-            AtlasEntity entProcess = createSqoopProcessInstance(entDbStore, entHiveTable, data, clusterName);
+            AtlasEntity entDbStore = toSqoopDBStoreEntity(data);
+            AtlasEntity entHiveDb = toHiveDatabaseEntity(clusterName, data.getHiveDB());
+            AtlasEntity entHiveTable = data.getHiveTable() != null ? toHiveTableEntity(entHiveDb, data.getHiveTable()) : null;
+            AtlasEntity entProcess = toSqoopProcessEntity(entDbStore, entHiveDb, entHiveTable, data, clusterName);

             AtlasEntitiesWithExtInfo entities = new AtlasEntitiesWithExtInfo(entProcess);

             entities.addReferredEntity(entDbStore);
             entities.addReferredEntity(entHiveDb);
-            entities.addReferredEntity(entHiveTable);
+            if (entHiveTable != null) {
+                entities.addReferredEntity(entHiveTable);
+            }

-            HookNotification message = new EntityUpdateRequestV2(AtlasHook.getUser(), entities);
+            HookNotification message = new EntityCreateRequestV2(AtlasHook.getUser(), entities);

-            AtlasHook.notifyEntities(Arrays.asList(message), atlasProperties.getInt(HOOK_NUM_RETRIES, 3));
+            AtlasHook.notifyEntities(Collections.singletonList(message), atlasProperties.getInt(HOOK_NUM_RETRIES, 3));
         } catch(Exception e) {
             LOG.error("SqoopHook.publish() failed", e);

             throw new AtlasHookException("SqoopHook.publish() failed.", e);
         }
     }

-    private AtlasEntity createHiveDatabaseInstance(String clusterName, String dbName) {
+    private AtlasEntity toHiveDatabaseEntity(String clusterName, String dbName) {
         AtlasEntity entHiveDb = new AtlasEntity(HiveDataTypes.HIVE_DB.getName());
         String qualifiedName = HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName);
@@ -109,7 +114,7 @@ public class SqoopHook extends SqoopJobDataPublisher {
         return entHiveDb;
     }

-    private AtlasEntity createHiveTableInstance(AtlasEntity entHiveDb, String tableName) {
+    private AtlasEntity toHiveTableEntity(AtlasEntity entHiveDb, String tableName) {
         AtlasEntity entHiveTable = new AtlasEntity(HiveDataTypes.HIVE_TABLE.getName());
         String qualifiedName = HiveMetaStoreBridge.getTableQualifiedName((String)entHiveDb.getAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE), (String)entHiveDb.getAttribute(AtlasClient.NAME), tableName);
@@ -120,7 +125,7 @@ public class SqoopHook extends SqoopJobDataPublisher {
         return entHiveTable;
     }

-    private AtlasEntity createDBStoreInstance(SqoopJobDataPublisher.Data data) throws ImportException {
+    private AtlasEntity toSqoopDBStoreEntity(SqoopJobDataPublisher.Data data) throws ImportException {
         String table = data.getStoreTable();
         String query = data.getStoreQuery();
@@ -146,7 +151,7 @@ public class SqoopHook extends SqoopJobDataPublisher {
         return entDbStore;
     }

-    private AtlasEntity createSqoopProcessInstance(AtlasEntity entDbStore, AtlasEntity entHiveTable, SqoopJobDataPublisher.Data data, String clusterName) {
+    private AtlasEntity toSqoopProcessEntity(AtlasEntity entDbStore, AtlasEntity entHiveDb, AtlasEntity entHiveTable, SqoopJobDataPublisher.Data data, String clusterName) {
         AtlasEntity entProcess = new AtlasEntity(SqoopDataTypes.SQOOP_PROCESS.getName());
         String sqoopProcessName = getSqoopProcessName(data, clusterName);
         Map<String, String> sqoopOptionsMap = new HashMap<>();
@@ -160,12 +165,15 @@ public class SqoopHook extends SqoopJobDataPublisher {
         entProcess.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, sqoopProcessName);
         entProcess.setAttribute(SqoopHook.OPERATION, data.getOperation());

+        List<AtlasObjectId> sqoopObjects = Collections.singletonList(AtlasTypeUtil.getAtlasObjectId(entDbStore));
+        List<AtlasObjectId> hiveObjects  = Collections.singletonList(AtlasTypeUtil.getAtlasObjectId(entHiveTable != null ? entHiveTable : entHiveDb));
+
         if (isImportOperation(data)) {
-            entProcess.setAttribute(SqoopHook.INPUTS, Arrays.asList(AtlasTypeUtil.getAtlasObjectId(entDbStore)));
-            entProcess.setAttribute(SqoopHook.OUTPUTS, Arrays.asList(AtlasTypeUtil.getAtlasObjectId(entHiveTable)));
+            entProcess.setAttribute(SqoopHook.INPUTS, sqoopObjects);
+            entProcess.setAttribute(SqoopHook.OUTPUTS, hiveObjects);
         } else {
-            entProcess.setAttribute(SqoopHook.INPUTS, Arrays.asList(AtlasTypeUtil.getAtlasObjectId(entHiveTable)));
-            entProcess.setAttribute(SqoopHook.OUTPUTS, Arrays.asList(AtlasTypeUtil.getAtlasObjectId(entDbStore)));
+            entProcess.setAttribute(SqoopHook.INPUTS, hiveObjects);
+            entProcess.setAttribute(SqoopHook.OUTPUTS, sqoopObjects);
         }

         entProcess.setAttribute(SqoopHook.USER, data.getUser());
@@ -183,15 +191,21 @@ public class SqoopHook extends SqoopJobDataPublisher {
     static String getSqoopProcessName(Data data, String clusterName) {
         StringBuilder name = new StringBuilder(String.format("sqoop %s --connect %s", data.getOperation(), data.getUrl()));

-        if (StringUtils.isNotEmpty(data.getStoreTable())) {
+        if (StringUtils.isNotEmpty(data.getHiveTable())) {
             name.append(" --table ").append(data.getStoreTable());
+        } else {
+            name.append(" --database ").append(data.getHiveDB());
         }

         if (StringUtils.isNotEmpty(data.getStoreQuery())) {
             name.append(" --query ").append(data.getStoreQuery());
         }

-        name.append(String.format(" --hive-%s --hive-database %s --hive-table %s --hive-cluster %s", data.getOperation(), data.getHiveDB().toLowerCase(), data.getHiveTable().toLowerCase(), clusterName));
+        if (data.getHiveTable() != null) {
+            name.append(String.format(" --hive-%s --hive-database %s --hive-table %s --hive-cluster %s", data.getOperation(), data.getHiveDB().toLowerCase(), data.getHiveTable().toLowerCase(), clusterName));
+        } else {
+            name.append(String.format(" --hive-%s --hive-database %s --hive-cluster %s", data.getOperation(), data.getHiveDB(), clusterName));
+        }

         return name.toString();
     }
@@ -199,8 +213,10 @@ public class SqoopHook extends SqoopJobDataPublisher {
     static String getSqoopDBStoreName(SqoopJobDataPublisher.Data data) {
         StringBuilder name = new StringBuilder(String.format("%s --url %s", data.getStoreType(), data.getUrl()));

-        if (StringUtils.isNotEmpty(data.getStoreTable())) {
+        if (StringUtils.isNotEmpty(data.getHiveTable())) {
             name.append(" --table ").append(data.getStoreTable());
+        } else {
+            name.append(" --database ").append(data.getHiveDB());
         }

         if (StringUtils.isNotEmpty(data.getStoreQuery())) {
......
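
For reference, the process names getSqoopProcessName() builds after this change (connection, database, and table names below are hypothetical):

    with a Hive table:
        sqoop import --connect jdbc:mysql://dbhost/sales --table CUSTOMERS --hive-import --hive-database sales --hive-table customers --hive-cluster primary

    without a Hive table:
        sqoop import --connect jdbc:mysql://dbhost/sales --database sales --hive-import --hive-database sales --hive-cluster primary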