Commit ef402516 by Sarath Subramanian

ATLAS-3321: Introduce atlas metadata namespace

parent 25e2e461
...@@ -114,7 +114,7 @@ else ...@@ -114,7 +114,7 @@ else
exit 1 exit 1
fi fi
CP="${ATLASCPPATH}:${HBASE_CP}:${HADOOP_CP}" CP="${HBASE_CP}:${HADOOP_CP}:${ATLASCPPATH}"
# If running in cygwin, convert pathnames and classpath to Windows format. # If running in cygwin, convert pathnames and classpath to Windows format.
if [ "${CYGWIN}" == "true" ] if [ "${CYGWIN}" == "true" ]
......
...@@ -109,7 +109,7 @@ else ...@@ -109,7 +109,7 @@ else
exit 1 exit 1
fi fi
CP="${ATLASCPPATH}:${HIVE_CP}:${HADOOP_CP}" CP="${HIVE_CP}:${HADOOP_CP}:${ATLASCPPATH}"
# If running in cygwin, convert pathnames and classpath to Windows format. # If running in cygwin, convert pathnames and classpath to Windows format.
if [ "${CYGWIN}" == "true" ] if [ "${CYGWIN}" == "true" ]
......
...@@ -38,12 +38,12 @@ import static org.apache.atlas.hive.hook.events.BaseHiveEvent.toTable; ...@@ -38,12 +38,12 @@ import static org.apache.atlas.hive.hook.events.BaseHiveEvent.toTable;
public class AtlasHiveHookContext { public class AtlasHiveHookContext {
public static final char QNAME_SEP_CLUSTER_NAME = '@'; public static final char QNAME_SEP_METADATA_NAMESPACE = '@';
public static final char QNAME_SEP_ENTITY_NAME = '.'; public static final char QNAME_SEP_ENTITY_NAME = '.';
public static final char QNAME_SEP_PROCESS = ':'; public static final char QNAME_SEP_PROCESS = ':';
public static final String TEMP_TABLE_PREFIX = "_temp-"; public static final String TEMP_TABLE_PREFIX = "_temp-";
public static final String CREATE_OPERATION = "CREATE"; public static final String CREATE_OPERATION = "CREATE";
public static final String ALTER_OPERATION = "ALTER"; public static final String ALTER_OPERATION = "ALTER";
private final HiveHook hook; private final HiveHook hook;
private final HiveOperation hiveOperation; private final HiveOperation hiveOperation;
...@@ -157,8 +157,8 @@ public class AtlasHiveHookContext { ...@@ -157,8 +157,8 @@ public class AtlasHiveHookContext {
public Collection<AtlasEntity> getEntities() { return qNameEntityMap.values(); } public Collection<AtlasEntity> getEntities() { return qNameEntityMap.values(); }
public String getClusterName() { public String getMetadataNamespace() {
return hook.getClusterName(); return hook.getMetadataNamespace();
} }
public String getHostName() { return hook.getHostName(); } public String getHostName() { return hook.getHostName(); }
...@@ -192,7 +192,7 @@ public class AtlasHiveHookContext { ...@@ -192,7 +192,7 @@ public class AtlasHiveHookContext {
} }
public String getQualifiedName(Database db) { public String getQualifiedName(Database db) {
return (db.getName() + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName(); return (db.getName() + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + getMetadataNamespace();
} }
public String getQualifiedName(Table table) { public String getQualifiedName(Table table) {
...@@ -206,7 +206,7 @@ public class AtlasHiveHookContext { ...@@ -206,7 +206,7 @@ public class AtlasHiveHookContext {
} }
} }
return (table.getDbName() + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName(); return (table.getDbName() + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + getMetadataNamespace();
} }
public boolean isKnownDatabase(String dbQualifiedName) { public boolean isKnownDatabase(String dbQualifiedName) {
......
...@@ -55,7 +55,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { ...@@ -55,7 +55,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
public enum PreprocessAction { NONE, IGNORE, PRUNE } public enum PreprocessAction { NONE, IGNORE, PRUNE }
public static final String CONF_PREFIX = "atlas.hook.hive."; public static final String CONF_PREFIX = "atlas.hook.hive.";
public static final String CONF_CLUSTER_NAME = "atlas.cluster.name";
public static final String HDFS_PATH_CONVERT_TO_LOWER_CASE = CONF_PREFIX + "hdfs_path.convert_to_lowercase"; public static final String HDFS_PATH_CONVERT_TO_LOWER_CASE = CONF_PREFIX + "hdfs_path.convert_to_lowercase";
public static final String HOOK_NAME_CACHE_ENABLED = CONF_PREFIX + "name.cache.enabled"; public static final String HOOK_NAME_CACHE_ENABLED = CONF_PREFIX + "name.cache.enabled";
public static final String HOOK_NAME_CACHE_DATABASE_COUNT = CONF_PREFIX + "name.cache.database.count"; public static final String HOOK_NAME_CACHE_DATABASE_COUNT = CONF_PREFIX + "name.cache.database.count";
...@@ -66,13 +65,10 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { ...@@ -66,13 +65,10 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
public static final String HOOK_HIVE_TABLE_IGNORE_PATTERN = CONF_PREFIX + "hive_table.ignore.pattern"; public static final String HOOK_HIVE_TABLE_IGNORE_PATTERN = CONF_PREFIX + "hive_table.ignore.pattern";
public static final String HOOK_HIVE_TABLE_PRUNE_PATTERN = CONF_PREFIX + "hive_table.prune.pattern"; public static final String HOOK_HIVE_TABLE_PRUNE_PATTERN = CONF_PREFIX + "hive_table.prune.pattern";
public static final String HOOK_HIVE_TABLE_CACHE_SIZE = CONF_PREFIX + "hive_table.cache.size"; public static final String HOOK_HIVE_TABLE_CACHE_SIZE = CONF_PREFIX + "hive_table.cache.size";
public static final String DEFAULT_CLUSTER_NAME = "primary";
public static final String DEFAULT_HOST_NAME = "localhost"; public static final String DEFAULT_HOST_NAME = "localhost";
private static final Map<String, HiveOperation> OPERATION_MAP = new HashMap<>(); private static final Map<String, HiveOperation> OPERATION_MAP = new HashMap<>();
private static final String clusterName;
private static final boolean convertHdfsPathToLowerCase; private static final boolean convertHdfsPathToLowerCase;
private static final boolean nameCacheEnabled; private static final boolean nameCacheEnabled;
private static final int nameCacheDatabaseMaxCount; private static final int nameCacheDatabaseMaxCount;
...@@ -96,7 +92,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { ...@@ -96,7 +92,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
OPERATION_MAP.put(hiveOperation.getOperationName(), hiveOperation); OPERATION_MAP.put(hiveOperation.getOperationName(), hiveOperation);
} }
clusterName = atlasProperties.getString(CONF_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
convertHdfsPathToLowerCase = atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false); convertHdfsPathToLowerCase = atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false);
nameCacheEnabled = atlasProperties.getBoolean(HOOK_NAME_CACHE_ENABLED, true); nameCacheEnabled = atlasProperties.getBoolean(HOOK_NAME_CACHE_ENABLED, true);
nameCacheDatabaseMaxCount = atlasProperties.getInt(HOOK_NAME_CACHE_DATABASE_COUNT, 10000); nameCacheDatabaseMaxCount = atlasProperties.getInt(HOOK_NAME_CACHE_DATABASE_COUNT, 10000);
...@@ -253,10 +248,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { ...@@ -253,10 +248,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
} }
} }
public String getClusterName() {
return clusterName;
}
public boolean isConvertHdfsPathToLowerCase() { public boolean isConvertHdfsPathToLowerCase() {
return convertHdfsPathToLowerCase; return convertHdfsPathToLowerCase;
} }
......
...@@ -62,7 +62,7 @@ import java.util.List; ...@@ -62,7 +62,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import static org.apache.atlas.hive.hook.AtlasHiveHookContext.QNAME_SEP_CLUSTER_NAME; import static org.apache.atlas.hive.hook.AtlasHiveHookContext.QNAME_SEP_METADATA_NAMESPACE;
import static org.apache.atlas.hive.hook.AtlasHiveHookContext.QNAME_SEP_ENTITY_NAME; import static org.apache.atlas.hive.hook.AtlasHiveHookContext.QNAME_SEP_ENTITY_NAME;
import static org.apache.atlas.hive.hook.AtlasHiveHookContext.QNAME_SEP_PROCESS; import static org.apache.atlas.hive.hook.AtlasHiveHookContext.QNAME_SEP_PROCESS;
...@@ -350,7 +350,7 @@ public abstract class BaseHiveEvent { ...@@ -350,7 +350,7 @@ public abstract class BaseHiveEvent {
ret.setAttribute(ATTRIBUTE_DESCRIPTION, db.getDescription()); ret.setAttribute(ATTRIBUTE_DESCRIPTION, db.getDescription());
ret.setAttribute(ATTRIBUTE_OWNER, db.getOwnerName()); ret.setAttribute(ATTRIBUTE_OWNER, db.getOwnerName());
ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, getClusterName()); ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, getMetadataNamespace());
ret.setAttribute(ATTRIBUTE_LOCATION, HdfsNameServiceResolver.getPathWithNameServiceID(db.getLocationUri())); ret.setAttribute(ATTRIBUTE_LOCATION, HdfsNameServiceResolver.getPathWithNameServiceID(db.getLocationUri()));
ret.setAttribute(ATTRIBUTE_PARAMETERS, db.getParameters()); ret.setAttribute(ATTRIBUTE_PARAMETERS, db.getParameters());
...@@ -596,7 +596,8 @@ public abstract class BaseHiveEvent { ...@@ -596,7 +596,8 @@ public abstract class BaseHiveEvent {
protected AtlasEntity getPathEntity(Path path, AtlasEntityExtInfo extInfo) { protected AtlasEntity getPathEntity(Path path, AtlasEntityExtInfo extInfo) {
AtlasEntity ret; AtlasEntity ret;
String strPath = path.toString(); String strPath = path.toString();
String metadataNamespace = getMetadataNamespace();
if (strPath.startsWith(HDFS_PATH_PREFIX) && context.isConvertHdfsPathToLowerCase()) { if (strPath.startsWith(HDFS_PATH_PREFIX) && context.isConvertHdfsPathToLowerCase()) {
strPath = strPath.toLowerCase(); strPath = strPath.toLowerCase();
...@@ -604,8 +605,8 @@ public abstract class BaseHiveEvent { ...@@ -604,8 +605,8 @@ public abstract class BaseHiveEvent {
if (isS3Path(strPath)) { if (isS3Path(strPath)) {
String bucketName = path.toUri().getAuthority(); String bucketName = path.toUri().getAuthority();
String bucketQualifiedName = (path.toUri().getScheme() + SCHEME_SEPARATOR + path.toUri().getAuthority() + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName(); String bucketQualifiedName = (path.toUri().getScheme() + SCHEME_SEPARATOR + path.toUri().getAuthority() + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace;
String pathQualifiedName = (strPath + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName(); String pathQualifiedName = (strPath + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace;
AtlasEntity bucketEntity = context.getEntity(bucketQualifiedName); AtlasEntity bucketEntity = context.getEntity(bucketQualifiedName);
ret = context.getEntity(pathQualifiedName); ret = context.getEntity(pathQualifiedName);
...@@ -654,7 +655,7 @@ public abstract class BaseHiveEvent { ...@@ -654,7 +655,7 @@ public abstract class BaseHiveEvent {
ret.setAttribute(ATTRIBUTE_PATH, attrPath); ret.setAttribute(ATTRIBUTE_PATH, attrPath);
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName); ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName);
ret.setAttribute(ATTRIBUTE_NAME, name); ret.setAttribute(ATTRIBUTE_NAME, name);
ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, getClusterName()); ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, metadataNamespace);
context.putEntity(pathQualifiedName, ret); context.putEntity(pathQualifiedName, ret);
} }
...@@ -751,8 +752,8 @@ public abstract class BaseHiveEvent { ...@@ -751,8 +752,8 @@ public abstract class BaseHiveEvent {
return hiveDDL; return hiveDDL;
} }
protected String getClusterName() { protected String getMetadataNamespace() {
return context.getClusterName(); return context.getMetadataNamespace();
} }
protected Database getDatabases(String dbName) throws Exception { protected Database getDatabases(String dbName) throws Exception {
...@@ -870,7 +871,7 @@ public abstract class BaseHiveEvent { ...@@ -870,7 +871,7 @@ public abstract class BaseHiveEvent {
protected String getQualifiedName(Table table, FieldSchema column) { protected String getQualifiedName(Table table, FieldSchema column) {
String tblQualifiedName = getQualifiedName(table); String tblQualifiedName = getQualifiedName(table);
int sepPos = tblQualifiedName.lastIndexOf(QNAME_SEP_CLUSTER_NAME); int sepPos = tblQualifiedName.lastIndexOf(QNAME_SEP_METADATA_NAMESPACE);
if (sepPos == -1) { if (sepPos == -1) {
return tblQualifiedName + QNAME_SEP_ENTITY_NAME + column.getName().toLowerCase(); return tblQualifiedName + QNAME_SEP_ENTITY_NAME + column.getName().toLowerCase();
...@@ -888,19 +889,20 @@ public abstract class BaseHiveEvent { ...@@ -888,19 +889,20 @@ public abstract class BaseHiveEvent {
} }
protected String getQualifiedName(BaseColumnInfo column) { protected String getQualifiedName(BaseColumnInfo column) {
String dbName = column.getTabAlias().getTable().getDbName(); String dbName = column.getTabAlias().getTable().getDbName();
String tableName = column.getTabAlias().getTable().getTableName(); String tableName = column.getTabAlias().getTable().getTableName();
String colName = column.getColumn() != null ? column.getColumn().getName() : null; String colName = column.getColumn() != null ? column.getColumn().getName() : null;
String metadataNamespace = getMetadataNamespace();
if (colName == null) { if (colName == null) {
return (dbName + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName(); return (dbName + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace;
} else { } else {
return (dbName + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_ENTITY_NAME + colName + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName(); return (dbName + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_ENTITY_NAME + colName + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace;
} }
} }
protected String getQualifiedName(String dbName, String tableName, String colName) { protected String getQualifiedName(String dbName, String tableName, String colName) {
return (dbName + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_ENTITY_NAME + colName + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName(); return (dbName + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_ENTITY_NAME + colName + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + getMetadataNamespace();
} }
protected String getQualifiedName(URI location) { protected String getQualifiedName(URI location) {
...@@ -918,14 +920,14 @@ public abstract class BaseHiveEvent { ...@@ -918,14 +920,14 @@ public abstract class BaseHiveEvent {
protected String getQualifiedName(String path) { protected String getQualifiedName(String path) {
if (path.startsWith(HdfsNameServiceResolver.HDFS_SCHEME)) { if (path.startsWith(HdfsNameServiceResolver.HDFS_SCHEME)) {
return path + QNAME_SEP_CLUSTER_NAME + getClusterName(); return path + QNAME_SEP_METADATA_NAMESPACE + getMetadataNamespace();
} }
return path.toLowerCase(); return path.toLowerCase();
} }
protected String getColumnQualifiedName(String tblQualifiedName, String columnName) { protected String getColumnQualifiedName(String tblQualifiedName, String columnName) {
int sepPos = tblQualifiedName.lastIndexOf(QNAME_SEP_CLUSTER_NAME); int sepPos = tblQualifiedName.lastIndexOf(QNAME_SEP_METADATA_NAMESPACE);
if (sepPos == -1) { if (sepPos == -1) {
return tblQualifiedName + QNAME_SEP_ENTITY_NAME + columnName.toLowerCase(); return tblQualifiedName + QNAME_SEP_ENTITY_NAME + columnName.toLowerCase();
...@@ -980,26 +982,27 @@ public abstract class BaseHiveEvent { ...@@ -980,26 +982,27 @@ public abstract class BaseHiveEvent {
} }
protected AtlasEntity toReferencedHBaseTable(Table table, AtlasEntitiesWithExtInfo entities) { protected AtlasEntity toReferencedHBaseTable(Table table, AtlasEntitiesWithExtInfo entities) {
AtlasEntity ret = null; AtlasEntity ret = null;
HBaseTableInfo hBaseTableInfo = new HBaseTableInfo(table); HBaseTableInfo hBaseTableInfo = new HBaseTableInfo(table);
String hbaseNameSpace = hBaseTableInfo.getHbaseNameSpace(); String hbaseNameSpace = hBaseTableInfo.getHbaseNameSpace();
String hbaseTableName = hBaseTableInfo.getHbaseTableName(); String hbaseTableName = hBaseTableInfo.getHbaseTableName();
String metadataNamespace = getMetadataNamespace();
if (hbaseTableName != null) { if (hbaseTableName != null) {
AtlasEntity nsEntity = new AtlasEntity(HBASE_TYPE_NAMESPACE); AtlasEntity nsEntity = new AtlasEntity(HBASE_TYPE_NAMESPACE);
nsEntity.setAttribute(ATTRIBUTE_NAME, hbaseNameSpace); nsEntity.setAttribute(ATTRIBUTE_NAME, hbaseNameSpace);
nsEntity.setAttribute(ATTRIBUTE_CLUSTER_NAME, getClusterName()); nsEntity.setAttribute(ATTRIBUTE_CLUSTER_NAME, metadataNamespace);
nsEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getHBaseNameSpaceQualifiedName(getClusterName(), hbaseNameSpace)); nsEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getHBaseNameSpaceQualifiedName(metadataNamespace, hbaseNameSpace));
ret = new AtlasEntity(HBASE_TYPE_TABLE); ret = new AtlasEntity(HBASE_TYPE_TABLE);
ret.setAttribute(ATTRIBUTE_NAME, hbaseTableName); ret.setAttribute(ATTRIBUTE_NAME, hbaseTableName);
ret.setAttribute(ATTRIBUTE_URI, hbaseTableName); ret.setAttribute(ATTRIBUTE_URI, hbaseTableName);
AtlasRelatedObjectId objIdRelatedObject = new AtlasRelatedObjectId(getObjectId(nsEntity), RELATIONSHIP_HBASE_TABLE_NAMESPACE); AtlasRelatedObjectId objIdRelatedObject = new AtlasRelatedObjectId(getObjectId(nsEntity), RELATIONSHIP_HBASE_TABLE_NAMESPACE);
ret.setRelationshipAttribute(ATTRIBUTE_NAMESPACE, objIdRelatedObject); ret.setRelationshipAttribute(ATTRIBUTE_NAMESPACE, objIdRelatedObject);
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getHBaseTableQualifiedName(getClusterName(), hbaseNameSpace, hbaseTableName)); ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getHBaseTableQualifiedName(metadataNamespace, hbaseNameSpace, hbaseTableName));
entities.addReferredEntity(nsEntity); entities.addReferredEntity(nsEntity);
entities.addEntity(ret); entities.addEntity(ret);
...@@ -1021,12 +1024,12 @@ public abstract class BaseHiveEvent { ...@@ -1021,12 +1024,12 @@ public abstract class BaseHiveEvent {
return ret; return ret;
} }
private static String getHBaseTableQualifiedName(String clusterName, String nameSpace, String tableName) { private static String getHBaseTableQualifiedName(String metadataNamespace, String nameSpace, String tableName) {
return String.format("%s:%s@%s", nameSpace.toLowerCase(), tableName.toLowerCase(), clusterName); return String.format("%s:%s@%s", nameSpace.toLowerCase(), tableName.toLowerCase(), metadataNamespace);
} }
private static String getHBaseNameSpaceQualifiedName(String clusterName, String nameSpace) { private static String getHBaseNameSpaceQualifiedName(String metadataNamespace, String nameSpace) {
return String.format("%s@%s", nameSpace.toLowerCase(), clusterName); return String.format("%s@%s", nameSpace.toLowerCase(), metadataNamespace);
} }
private boolean ignoreHDFSPathsinProcessQualifiedName() { private boolean ignoreHDFSPathsinProcessQualifiedName() {
......
...@@ -519,7 +519,7 @@ public class HiveITBase { ...@@ -519,7 +519,7 @@ public class HiveITBase {
Table outTable = entity.getTable(); Table outTable = entity.getTable();
//refresh table //refresh table
outTable = dgiBridge.getHiveClient().getTable(outTable.getDbName(), outTable.getTableName()); outTable = dgiBridge.getHiveClient().getTable(outTable.getDbName(), outTable.getTableName());
return HiveMetaStoreBridge.getTableProcessQualifiedName(dgiBridge.getClusterName(), outTable); return HiveMetaStoreBridge.getTableProcessQualifiedName(dgiBridge.getMetadataNamespace(), outTable);
} }
} }
......
...@@ -33,9 +33,9 @@ import org.apache.commons.lang.StringUtils; ...@@ -33,9 +33,9 @@ import org.apache.commons.lang.StringUtils;
* Contain the info related to an linear record from Impala * Contain the info related to an linear record from Impala
*/ */
public class AtlasImpalaHookContext { public class AtlasImpalaHookContext {
public static final char QNAME_SEP_CLUSTER_NAME = '@'; public static final char QNAME_SEP_METADATA_NAMESPACE = '@';
public static final char QNAME_SEP_ENTITY_NAME = '.'; public static final char QNAME_SEP_ENTITY_NAME = '.';
public static final char QNAME_SEP_PROCESS = ':'; public static final char QNAME_SEP_PROCESS = ':';
private final ImpalaLineageHook hook; private final ImpalaLineageHook hook;
private final ImpalaOperationType impalaOperation; private final ImpalaOperationType impalaOperation;
...@@ -69,8 +69,8 @@ public class AtlasImpalaHookContext { ...@@ -69,8 +69,8 @@ public class AtlasImpalaHookContext {
public Collection<AtlasEntity> getEntities() { return qNameEntityMap.values(); } public Collection<AtlasEntity> getEntities() { return qNameEntityMap.values(); }
public String getClusterName() { public String getMetadataNamespace() {
return hook.getClusterName(); return hook.getMetadataNamespace();
} }
public String getHostName() { public String getHostName() {
...@@ -82,7 +82,7 @@ public class AtlasImpalaHookContext { ...@@ -82,7 +82,7 @@ public class AtlasImpalaHookContext {
} }
public String getQualifiedNameForDb(String dbName) { public String getQualifiedNameForDb(String dbName) {
return (dbName + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName(); return (dbName + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + getMetadataNamespace();
} }
public String getQualifiedNameForTable(String fullTableName) throws IllegalArgumentException { public String getQualifiedNameForTable(String fullTableName) throws IllegalArgumentException {
...@@ -100,8 +100,7 @@ public class AtlasImpalaHookContext { ...@@ -100,8 +100,7 @@ public class AtlasImpalaHookContext {
} }
public String getQualifiedNameForTable(String dbName, String tableName) { public String getQualifiedNameForTable(String dbName, String tableName) {
return (dbName + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_CLUSTER_NAME).toLowerCase() + return (dbName + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + getMetadataNamespace();
getClusterName();
} }
public String getQualifiedNameForColumn(LineageVertex vertex) { public String getQualifiedNameForColumn(LineageVertex vertex) {
...@@ -179,7 +178,7 @@ public class AtlasImpalaHookContext { ...@@ -179,7 +178,7 @@ public class AtlasImpalaHookContext {
public String getQualifiedNameForColumn(String dbName, String tableName, String columnName) { public String getQualifiedNameForColumn(String dbName, String tableName, String columnName) {
return return
(dbName + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_ENTITY_NAME + (dbName + QNAME_SEP_ENTITY_NAME + tableName + QNAME_SEP_ENTITY_NAME +
columnName + QNAME_SEP_CLUSTER_NAME).toLowerCase() + getClusterName(); columnName + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + getMetadataNamespace();
} }
public String getUserName() { return lineageQuery.getUser(); } public String getUserName() { return lineageQuery.getUser(); }
......
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
package org.apache.atlas.impala.hook; package org.apache.atlas.impala.hook;
import static org.apache.atlas.AtlasConstants.DEFAULT_CLUSTER_NAME;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
...@@ -42,20 +41,17 @@ public class ImpalaLineageHook extends AtlasHook { ...@@ -42,20 +41,17 @@ public class ImpalaLineageHook extends AtlasHook {
public static final String ATLAS_ENDPOINT = "atlas.rest.address"; public static final String ATLAS_ENDPOINT = "atlas.rest.address";
public static final String REALM_SEPARATOR = "@"; public static final String REALM_SEPARATOR = "@";
public static final String CONF_PREFIX = "atlas.hook.impala."; public static final String CONF_PREFIX = "atlas.hook.impala.";
public static final String CONF_CLUSTER_NAME = "atlas.cluster.name";
public static final String CONF_REALM_NAME = "atlas.realm.name"; public static final String CONF_REALM_NAME = "atlas.realm.name";
public static final String HDFS_PATH_CONVERT_TO_LOWER_CASE = CONF_PREFIX + "hdfs_path.convert_to_lowercase"; public static final String HDFS_PATH_CONVERT_TO_LOWER_CASE = CONF_PREFIX + "hdfs_path.convert_to_lowercase";
public static final String DEFAULT_HOST_NAME = "localhost"; public static final String DEFAULT_HOST_NAME = "localhost";
private static final String clusterName; private static final String realm;
private static final String realm;
private static final boolean convertHdfsPathToLowerCase; private static final boolean convertHdfsPathToLowerCase;
private static String hostName; private static String hostName;
static { static {
clusterName = atlasProperties.getString(CONF_CLUSTER_NAME, DEFAULT_CLUSTER_NAME); realm = atlasProperties.getString(CONF_REALM_NAME, DEFAULT_CLUSTER_NAME); // what should default be ??
realm = atlasProperties.getString(CONF_REALM_NAME, DEFAULT_CLUSTER_NAME); // what should default be ?? convertHdfsPathToLowerCase = atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false);
convertHdfsPathToLowerCase = atlasProperties.getBoolean(HDFS_PATH_CONVERT_TO_LOWER_CASE, false);
try { try {
hostName = InetAddress.getLocalHost().getHostName(); hostName = InetAddress.getLocalHost().getHostName();
...@@ -143,10 +139,6 @@ public class ImpalaLineageHook extends AtlasHook { ...@@ -143,10 +139,6 @@ public class ImpalaLineageHook extends AtlasHook {
return UserGroupInformation.getUGIFromSubject(userSubject); return UserGroupInformation.getUGIFromSubject(userSubject);
} }
public String getClusterName() {
return clusterName;
}
public String getRealm() { public String getRealm() {
return realm; return realm;
} }
......
...@@ -340,7 +340,7 @@ public abstract class BaseImpalaEvent { ...@@ -340,7 +340,7 @@ public abstract class BaseImpalaEvent {
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, dbQualifiedName); ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, dbQualifiedName);
ret.setAttribute(ATTRIBUTE_NAME, dbName.toLowerCase()); ret.setAttribute(ATTRIBUTE_NAME, dbName.toLowerCase());
ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, context.getClusterName()); ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, context.getMetadataNamespace());
context.putEntity(dbQualifiedName, ret); context.putEntity(dbQualifiedName, ret);
} }
......
...@@ -293,7 +293,7 @@ public class ImpalaLineageITBase { ...@@ -293,7 +293,7 @@ public class ImpalaLineageITBase {
protected String assertDatabaseIsRegistered(String dbName, AssertPredicate assertPredicate) throws Exception { protected String assertDatabaseIsRegistered(String dbName, AssertPredicate assertPredicate) throws Exception {
LOG.debug("Searching for database: {}", dbName); LOG.debug("Searching for database: {}", dbName);
String dbQualifiedName = dbName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME + String dbQualifiedName = dbName + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE +
CLUSTER_NAME; CLUSTER_NAME;
dbQualifiedName = dbQualifiedName.toLowerCase(); dbQualifiedName = dbQualifiedName.toLowerCase();
...@@ -320,7 +320,7 @@ public class ImpalaLineageITBase { ...@@ -320,7 +320,7 @@ public class ImpalaLineageITBase {
protected String assertTableIsRegistered(String fullTableName, AssertPredicate assertPredicate, boolean isTemporary) throws Exception { protected String assertTableIsRegistered(String fullTableName, AssertPredicate assertPredicate, boolean isTemporary) throws Exception {
LOG.debug("Searching for table {}", fullTableName); LOG.debug("Searching for table {}", fullTableName);
String tableQualifiedName = (fullTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME).toLowerCase() + String tableQualifiedName = (fullTableName + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE).toLowerCase() +
CLUSTER_NAME; CLUSTER_NAME;
return assertEntityIsRegistered(HIVE_TYPE_TABLE, REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName, return assertEntityIsRegistered(HIVE_TYPE_TABLE, REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName,
......
...@@ -77,7 +77,7 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase { ...@@ -77,7 +77,7 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
// the value is from info in IMPALA_3 // the value is from info in IMPALA_3
String createTime = new Long((long)(1554750072)*1000).toString(); String createTime = new Long((long)(1554750072)*1000).toString();
String processQFName = String processQFName =
"db_1.view_1" + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME + "db_1.view_1" + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE +
CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime; CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime;
processQFName = processQFName.toLowerCase(); processQFName = processQFName.toLowerCase();
...@@ -140,7 +140,7 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase { ...@@ -140,7 +140,7 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
Long afterCreateTime = System.currentTimeMillis() / BaseImpalaEvent.MILLIS_CONVERT_FACTOR; Long afterCreateTime = System.currentTimeMillis() / BaseImpalaEvent.MILLIS_CONVERT_FACTOR;
String processQFNameWithoutTime = String processQFNameWithoutTime =
dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME + dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE +
CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS; CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS;
processQFNameWithoutTime = processQFNameWithoutTime.toLowerCase(); processQFNameWithoutTime = processQFNameWithoutTime.toLowerCase();
...@@ -210,7 +210,7 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase { ...@@ -210,7 +210,7 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
// the value is from info in IMPALA_4. // the value is from info in IMPALA_4.
String createTime = new Long(TABLE_CREATE_TIME*1000).toString(); String createTime = new Long(TABLE_CREATE_TIME*1000).toString();
String processQFName = String processQFName =
dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME + dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE +
CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime; CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime;
processQFName = processQFName.toLowerCase(); processQFName = processQFName.toLowerCase();
...@@ -266,7 +266,7 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase { ...@@ -266,7 +266,7 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
// the value is from info in IMPALA_4. // the value is from info in IMPALA_4.
String createTime = new Long(TABLE_CREATE_TIME*1000).toString(); String createTime = new Long(TABLE_CREATE_TIME*1000).toString();
String processQFName = String processQFName =
dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME + dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE +
CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime; CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime;
processQFName = processQFName.toLowerCase(); processQFName = processQFName.toLowerCase();
...@@ -322,9 +322,9 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase { ...@@ -322,9 +322,9 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
// the value is from info in IMPALA_4. // the value is from info in IMPALA_4.
String createTime1 = new Long(TABLE_CREATE_TIME_SOURCE*1000).toString(); String createTime1 = new Long(TABLE_CREATE_TIME_SOURCE*1000).toString();
String createTime2 = new Long(TABLE_CREATE_TIME*1000).toString(); String createTime2 = new Long(TABLE_CREATE_TIME*1000).toString();
String sourceQFName = dbName + "." + sourceTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME + String sourceQFName = dbName + "." + sourceTableName + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE +
CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime1; CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime1;
String targetQFName = dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME + String targetQFName = dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE +
CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime2; CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime2;
String processQFName = "QUERY:" + sourceQFName.toLowerCase() + "->:INSERT:" + targetQFName.toLowerCase(); String processQFName = "QUERY:" + sourceQFName.toLowerCase() + "->:INSERT:" + targetQFName.toLowerCase();
...@@ -385,9 +385,9 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase { ...@@ -385,9 +385,9 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
// the value is from info in IMPALA_4. // the value is from info in IMPALA_4.
String createTime1 = new Long(TABLE_CREATE_TIME_SOURCE*1000).toString(); String createTime1 = new Long(TABLE_CREATE_TIME_SOURCE*1000).toString();
String createTime2 = new Long(TABLE_CREATE_TIME*1000).toString(); String createTime2 = new Long(TABLE_CREATE_TIME*1000).toString();
String sourceQFName = dbName + "." + sourceTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME + String sourceQFName = dbName + "." + sourceTableName + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE +
CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime1; CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime1;
String targetQFName = dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME + String targetQFName = dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE +
CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime2; CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime2;
String processQFName = "QUERY:" + sourceQFName.toLowerCase() + "->:INSERT:" + targetQFName.toLowerCase(); String processQFName = "QUERY:" + sourceQFName.toLowerCase() + "->:INSERT:" + targetQFName.toLowerCase();
...@@ -454,7 +454,7 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase { ...@@ -454,7 +454,7 @@ public class ImpalaLineageToolIT extends ImpalaLineageITBase {
// the value is from info in IMPALA_4. // the value is from info in IMPALA_4.
String createTime = new Long((long)1560885039*1000).toString(); String createTime = new Long((long)1560885039*1000).toString();
String processQFName = String processQFName =
dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME + dbName + "." + targetTableName + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE +
CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime; CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime;
processQFName = processQFName.toLowerCase(); processQFName = processQFName.toLowerCase();
......
...@@ -137,7 +137,7 @@ public class ImpalaLineageHookIT extends ImpalaLineageITBase { ...@@ -137,7 +137,7 @@ public class ImpalaLineageHookIT extends ImpalaLineageITBase {
impalaHook.process(queryObj); impalaHook.process(queryObj);
String createTime = new Long(BaseImpalaEvent.getTableCreateTime(vertex5)).toString(); String createTime = new Long(BaseImpalaEvent.getTableCreateTime(vertex5)).toString();
String processQFName = String processQFName =
vertex5.getVertexId() + AtlasImpalaHookContext.QNAME_SEP_CLUSTER_NAME + vertex5.getVertexId() + AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE +
CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime; CLUSTER_NAME + AtlasImpalaHookContext.QNAME_SEP_PROCESS + createTime;
processQFName = processQFName.toLowerCase(); processQFName = processQFName.toLowerCase();
......
...@@ -117,7 +117,7 @@ else ...@@ -117,7 +117,7 @@ else
exit 1 exit 1
fi fi
CP="${KAFKA_CP}:${ATLASCPPATH}:${HADOOP_CP}" CP="${ATLASCPPATH}:${HADOOP_CP}:${KAFKA_CP}"
# If running in cygwin, convert pathnames and classpath to Windows format. # If running in cygwin, convert pathnames and classpath to Windows format.
if [ "${CYGWIN}" == "true" ] if [ "${CYGWIN}" == "true" ]
......
...@@ -58,19 +58,20 @@ import java.util.regex.Pattern; ...@@ -58,19 +58,20 @@ import java.util.regex.Pattern;
public class KafkaBridge { public class KafkaBridge {
private static final Logger LOG = LoggerFactory.getLogger(KafkaBridge.class); private static final Logger LOG = LoggerFactory.getLogger(KafkaBridge.class);
private static final int EXIT_CODE_SUCCESS = 0; private static final int EXIT_CODE_SUCCESS = 0;
private static final int EXIT_CODE_FAILED = 1; private static final int EXIT_CODE_FAILED = 1;
private static final String ATLAS_ENDPOINT = "atlas.rest.address"; private static final String ATLAS_ENDPOINT = "atlas.rest.address";
private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/"; private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/";
private static final String KAFKA_CLUSTER_NAME = "atlas.cluster.name"; private static final String CLUSTER_NAME_KEY = "atlas.cluster.name";
private static final String DEFAULT_CLUSTER_NAME = "primary"; private static final String KAFKA_METADATA_NAMESPACE = "atlas.metadata.namespace";
private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName"; private static final String DEFAULT_CLUSTER_NAME = "primary";
private static final String DESCRIPTION_ATTR = "description"; private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
private static final String PARTITION_COUNT = "partitionCount"; private static final String DESCRIPTION_ATTR = "description";
private static final String NAME = "name"; private static final String PARTITION_COUNT = "partitionCount";
private static final String URI = "uri"; private static final String NAME = "name";
private static final String CLUSTERNAME = "clusterName"; private static final String URI = "uri";
private static final String TOPIC = "topic"; private static final String CLUSTERNAME = "clusterName";
private static final String TOPIC = "topic";
private static final String FORMAT_KAKFA_TOPIC_QUALIFIED_NAME = "%s@%s"; private static final String FORMAT_KAKFA_TOPIC_QUALIFIED_NAME = "%s@%s";
private static final String ZOOKEEPER_CONNECT = "atlas.kafka.zookeeper.connect"; private static final String ZOOKEEPER_CONNECT = "atlas.kafka.zookeeper.connect";
...@@ -81,7 +82,7 @@ public class KafkaBridge { ...@@ -81,7 +82,7 @@ public class KafkaBridge {
private static final int DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS = 10 * 1000; private static final int DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS = 10 * 1000;
private final List<String> availableTopics; private final List<String> availableTopics;
private final String clusterName; private final String metadataNamespace;
private final AtlasClientV2 atlasClientV2; private final AtlasClientV2 atlasClientV2;
private final ZkUtils zkUtils; private final ZkUtils zkUtils;
...@@ -163,10 +164,18 @@ public class KafkaBridge { ...@@ -163,10 +164,18 @@ public class KafkaBridge {
int connectionTimeOutMs = atlasConf.getInt(ZOOKEEPER_CONNECTION_TIMEOUT_MS, DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS); int connectionTimeOutMs = atlasConf.getInt(ZOOKEEPER_CONNECTION_TIMEOUT_MS, DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS);
ZkClient zkClient = new ZkClient(zookeeperConnect, sessionTimeOutMs, connectionTimeOutMs, ZKStringSerializer$.MODULE$); ZkClient zkClient = new ZkClient(zookeeperConnect, sessionTimeOutMs, connectionTimeOutMs, ZKStringSerializer$.MODULE$);
this.atlasClientV2 = atlasClientV2; this.atlasClientV2 = atlasClientV2;
this.clusterName = atlasConf.getString(KAFKA_CLUSTER_NAME, DEFAULT_CLUSTER_NAME); this.metadataNamespace = getMetadataNamespace(atlasConf);
this.zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), JaasUtils.isZkSecurityEnabled()); this.zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), JaasUtils.isZkSecurityEnabled());
this.availableTopics = scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()); this.availableTopics = scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics());
}
private String getMetadataNamespace(Configuration config) {
return config.getString(KAFKA_METADATA_NAMESPACE, getClusterName(config));
}
private String getClusterName(Configuration config) {
return config.getString(CLUSTER_NAME_KEY, DEFAULT_CLUSTER_NAME);
} }
public void importTopic(String topicToImport) throws Exception { public void importTopic(String topicToImport) throws Exception {
...@@ -191,7 +200,7 @@ public class KafkaBridge { ...@@ -191,7 +200,7 @@ public class KafkaBridge {
@VisibleForTesting @VisibleForTesting
AtlasEntityWithExtInfo createOrUpdateTopic(String topic) throws Exception { AtlasEntityWithExtInfo createOrUpdateTopic(String topic) throws Exception {
String topicQualifiedName = getTopicQualifiedName(clusterName, topic); String topicQualifiedName = getTopicQualifiedName(metadataNamespace, topic);
AtlasEntityWithExtInfo topicEntity = findTopicEntityInAtlas(topicQualifiedName); AtlasEntityWithExtInfo topicEntity = findTopicEntityInAtlas(topicQualifiedName);
if (topicEntity == null) { if (topicEntity == null) {
...@@ -225,10 +234,10 @@ public class KafkaBridge { ...@@ -225,10 +234,10 @@ public class KafkaBridge {
ret = topicEntity; ret = topicEntity;
} }
String qualifiedName = getTopicQualifiedName(clusterName, topic); String qualifiedName = getTopicQualifiedName(metadataNamespace, topic);
ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, qualifiedName); ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, qualifiedName);
ret.setAttribute(CLUSTERNAME, clusterName); ret.setAttribute(CLUSTERNAME, metadataNamespace);
ret.setAttribute(TOPIC, topic); ret.setAttribute(TOPIC, topic);
ret.setAttribute(NAME,topic); ret.setAttribute(NAME,topic);
ret.setAttribute(DESCRIPTION_ATTR, topic); ret.setAttribute(DESCRIPTION_ATTR, topic);
...@@ -239,8 +248,8 @@ public class KafkaBridge { ...@@ -239,8 +248,8 @@ public class KafkaBridge {
} }
@VisibleForTesting @VisibleForTesting
static String getTopicQualifiedName(String clusterName, String topic) { static String getTopicQualifiedName(String metadataNamespace, String topic) {
return String.format(FORMAT_KAKFA_TOPIC_QUALIFIED_NAME, topic.toLowerCase(), clusterName); return String.format(FORMAT_KAKFA_TOPIC_QUALIFIED_NAME, topic.toLowerCase(), metadataNamespace);
} }
private AtlasEntityWithExtInfo findTopicEntityInAtlas(String topicQualifiedName) { private AtlasEntityWithExtInfo findTopicEntityInAtlas(String topicQualifiedName) {
......
...@@ -52,8 +52,9 @@ import java.util.Date; ...@@ -52,8 +52,9 @@ import java.util.Date;
public class SqoopHook extends SqoopJobDataPublisher { public class SqoopHook extends SqoopJobDataPublisher {
private static final Logger LOG = LoggerFactory.getLogger(SqoopHook.class); private static final Logger LOG = LoggerFactory.getLogger(SqoopHook.class);
public static final String ATLAS_CLUSTER_NAME = "atlas.cluster.name"; public static final String CLUSTER_NAME_KEY = "atlas.cluster.name";
public static final String DEFAULT_CLUSTER_NAME = "primary"; public static final String ATLAS_METADATA_NAMESPACE = "atlas.metadata.namespace";
public static final String DEFAULT_CLUSTER_NAME = "primary";
public static final String USER = "userName"; public static final String USER = "userName";
public static final String DB_STORE_TYPE = "dbStoreType"; public static final String DB_STORE_TYPE = "dbStoreType";
...@@ -80,12 +81,14 @@ public class SqoopHook extends SqoopJobDataPublisher { ...@@ -80,12 +81,14 @@ public class SqoopHook extends SqoopJobDataPublisher {
@Override @Override
public void publish(SqoopJobDataPublisher.Data data) throws AtlasHookException { public void publish(SqoopJobDataPublisher.Data data) throws AtlasHookException {
try { try {
Configuration atlasProperties = ApplicationProperties.get(); Configuration atlasProperties = ApplicationProperties.get();
String clusterName = atlasProperties.getString(ATLAS_CLUSTER_NAME, DEFAULT_CLUSTER_NAME); String metadataNamespace = atlasProperties.getString(ATLAS_METADATA_NAMESPACE, getClusterName(atlasProperties));
AtlasEntity entDbStore = toSqoopDBStoreEntity(data);
AtlasEntity entHiveDb = toHiveDatabaseEntity(clusterName, data.getHiveDB()); AtlasEntity entDbStore = toSqoopDBStoreEntity(data);
AtlasEntity entHiveTable = data.getHiveTable() != null ? toHiveTableEntity(entHiveDb, data.getHiveTable()) : null; AtlasEntity entHiveDb = toHiveDatabaseEntity(metadataNamespace, data.getHiveDB());
AtlasEntity entProcess = toSqoopProcessEntity(entDbStore, entHiveDb, entHiveTable, data, clusterName); AtlasEntity entHiveTable = data.getHiveTable() != null ? toHiveTableEntity(entHiveDb, data.getHiveTable()) : null;
AtlasEntity entProcess = toSqoopProcessEntity(entDbStore, entHiveDb, entHiveTable, data, metadataNamespace);
AtlasEntitiesWithExtInfo entities = new AtlasEntitiesWithExtInfo(entProcess); AtlasEntitiesWithExtInfo entities = new AtlasEntitiesWithExtInfo(entProcess);
...@@ -105,11 +108,15 @@ public class SqoopHook extends SqoopJobDataPublisher { ...@@ -105,11 +108,15 @@ public class SqoopHook extends SqoopJobDataPublisher {
} }
} }
private AtlasEntity toHiveDatabaseEntity(String clusterName, String dbName) { private String getClusterName(Configuration config) {
return config.getString(CLUSTER_NAME_KEY, DEFAULT_CLUSTER_NAME);
}
private AtlasEntity toHiveDatabaseEntity(String metadataNamespace, String dbName) {
AtlasEntity entHiveDb = new AtlasEntity(HiveDataTypes.HIVE_DB.getName()); AtlasEntity entHiveDb = new AtlasEntity(HiveDataTypes.HIVE_DB.getName());
String qualifiedName = HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName); String qualifiedName = HiveMetaStoreBridge.getDBQualifiedName(metadataNamespace, dbName);
entHiveDb.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName); entHiveDb.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, metadataNamespace);
entHiveDb.setAttribute(AtlasClient.NAME, dbName); entHiveDb.setAttribute(AtlasClient.NAME, dbName);
entHiveDb.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, qualifiedName); entHiveDb.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, qualifiedName);
...@@ -153,9 +160,10 @@ public class SqoopHook extends SqoopJobDataPublisher { ...@@ -153,9 +160,10 @@ public class SqoopHook extends SqoopJobDataPublisher {
return entDbStore; return entDbStore;
} }
private AtlasEntity toSqoopProcessEntity(AtlasEntity entDbStore, AtlasEntity entHiveDb, AtlasEntity entHiveTable, SqoopJobDataPublisher.Data data, String clusterName) { private AtlasEntity toSqoopProcessEntity(AtlasEntity entDbStore, AtlasEntity entHiveDb, AtlasEntity entHiveTable,
SqoopJobDataPublisher.Data data, String metadataNamespace) {
AtlasEntity entProcess = new AtlasEntity(SqoopDataTypes.SQOOP_PROCESS.getName()); AtlasEntity entProcess = new AtlasEntity(SqoopDataTypes.SQOOP_PROCESS.getName());
String sqoopProcessName = getSqoopProcessName(data, clusterName); String sqoopProcessName = getSqoopProcessName(data, metadataNamespace);
Map<String, String> sqoopOptionsMap = new HashMap<>(); Map<String, String> sqoopOptionsMap = new HashMap<>();
Properties options = data.getOptions(); Properties options = data.getOptions();
...@@ -190,7 +198,7 @@ public class SqoopHook extends SqoopJobDataPublisher { ...@@ -190,7 +198,7 @@ public class SqoopHook extends SqoopJobDataPublisher {
return data.getOperation().toLowerCase().equals("import"); return data.getOperation().toLowerCase().equals("import");
} }
static String getSqoopProcessName(Data data, String clusterName) { static String getSqoopProcessName(Data data, String metadataNamespace) {
StringBuilder name = new StringBuilder(String.format("sqoop %s --connect %s", data.getOperation(), data.getUrl())); StringBuilder name = new StringBuilder(String.format("sqoop %s --connect %s", data.getOperation(), data.getUrl()));
if (StringUtils.isNotEmpty(data.getHiveTable())) { if (StringUtils.isNotEmpty(data.getHiveTable())) {
...@@ -204,9 +212,9 @@ public class SqoopHook extends SqoopJobDataPublisher { ...@@ -204,9 +212,9 @@ public class SqoopHook extends SqoopJobDataPublisher {
} }
if (data.getHiveTable() != null) { if (data.getHiveTable() != null) {
name.append(String.format(" --hive-%s --hive-database %s --hive-table %s --hive-cluster %s", data.getOperation(), data.getHiveDB().toLowerCase(), data.getHiveTable().toLowerCase(), clusterName)); name.append(String.format(" --hive-%s --hive-database %s --hive-table %s --hive-cluster %s", data.getOperation(), data.getHiveDB().toLowerCase(), data.getHiveTable().toLowerCase(), metadataNamespace));
} else { } else {
name.append(String.format("--hive-%s --hive-database %s --hive-cluster %s", data.getOperation(), data.getHiveDB(), clusterName)); name.append(String.format("--hive-%s --hive-database %s --hive-cluster %s", data.getOperation(), data.getHiveDB(), metadataNamespace));
} }
return name.toString(); return name.toString();
......
...@@ -118,7 +118,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { ...@@ -118,7 +118,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
topology.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, topologyInfo.get_name()); topology.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, topologyInfo.get_name());
topology.setAttribute(AtlasClient.OWNER, owner); topology.setAttribute(AtlasClient.OWNER, owner);
topology.setAttribute("startTime", new Date(System.currentTimeMillis())); topology.setAttribute("startTime", new Date(System.currentTimeMillis()));
topology.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf)); topology.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getMetadataNamespace());
return topology; return topology;
} }
...@@ -166,9 +166,9 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { ...@@ -166,9 +166,9 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
} }
private AtlasEntity addDataSet(String dataSetType, String topologyOwner, Serializable instance, Map stormConf, AtlasEntityExtInfo entityExtInfo) { private AtlasEntity addDataSet(String dataSetType, String topologyOwner, Serializable instance, Map stormConf, AtlasEntityExtInfo entityExtInfo) {
Map<String, String> config = StormTopologyUtil.getFieldValues(instance, true, null); Map<String, String> config = StormTopologyUtil.getFieldValues(instance, true, null);
String clusterName = null; AtlasEntity ret = null;
AtlasEntity ret = null; String metadataNamespace = getMetadataNamespace();
// todo: need to redo this with a config driven approach // todo: need to redo this with a config driven approach
switch (dataSetType) { switch (dataSetType) {
...@@ -188,8 +188,6 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { ...@@ -188,8 +188,6 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
topologyOwner = ANONYMOUS_OWNER; topologyOwner = ANONYMOUS_OWNER;
} }
clusterName = getClusterName(stormConf);
if (topicName == null) { if (topicName == null) {
LOG.error("Kafka topic name not found"); LOG.error("Kafka topic name not found");
} else { } else {
...@@ -198,7 +196,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { ...@@ -198,7 +196,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
ret.setAttribute("topic", topicName); ret.setAttribute("topic", topicName);
ret.setAttribute("uri", uri); ret.setAttribute("uri", uri);
ret.setAttribute(AtlasClient.OWNER, topologyOwner); ret.setAttribute(AtlasClient.OWNER, topologyOwner);
ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getKafkaTopicQualifiedName(clusterName, topicName)); ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getKafkaTopicQualifiedName(metadataNamespace, topicName));
ret.setAttribute(AtlasClient.NAME, topicName); ret.setAttribute(AtlasClient.NAME, topicName);
} }
} }
...@@ -212,7 +210,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { ...@@ -212,7 +210,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
uri = hbaseTableName; uri = hbaseTableName;
} }
clusterName = extractComponentClusterName(HBaseConfiguration.create(), stormConf); metadataNamespace = extractComponentMetadataNamespace(HBaseConfiguration.create(), stormConf);
if (hbaseTableName == null) { if (hbaseTableName == null) {
LOG.error("HBase table name not found"); LOG.error("HBase table name not found");
...@@ -223,7 +221,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { ...@@ -223,7 +221,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
ret.setAttribute(AtlasClient.NAME, uri); ret.setAttribute(AtlasClient.NAME, uri);
ret.setAttribute(AtlasClient.OWNER, stormConf.get("storm.kerberos.principal")); ret.setAttribute(AtlasClient.OWNER, stormConf.get("storm.kerberos.principal"));
//TODO - Hbase Namespace is hardcoded to 'default'. need to check how to get this or is it already part of tableName //TODO - Hbase Namespace is hardcoded to 'default'. need to check how to get this or is it already part of tableName
ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHbaseTableQualifiedName(clusterName, HBASE_NAMESPACE_DEFAULT, hbaseTableName)); ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHbaseTableQualifiedName(metadataNamespace, HBASE_NAMESPACE_DEFAULT, hbaseTableName));
} }
} }
break; break;
...@@ -234,11 +232,9 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { ...@@ -234,11 +232,9 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
final Path hdfsPath = new Path(hdfsPathStr); final Path hdfsPath = new Path(hdfsPathStr);
final String nameServiceID = HdfsNameServiceResolver.getNameServiceIDForPath(hdfsPathStr); final String nameServiceID = HdfsNameServiceResolver.getNameServiceIDForPath(hdfsPathStr);
clusterName = getClusterName(stormConf);
ret = new AtlasEntity(HiveMetaStoreBridge.HDFS_PATH); ret = new AtlasEntity(HiveMetaStoreBridge.HDFS_PATH);
ret.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf)); ret.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, metadataNamespace);
ret.setAttribute(AtlasClient.OWNER, stormConf.get("hdfs.kerberos.principal")); ret.setAttribute(AtlasClient.OWNER, stormConf.get("hdfs.kerberos.principal"));
ret.setAttribute(AtlasClient.NAME, Path.getPathWithoutSchemeAndAuthority(hdfsPath).toString().toLowerCase()); ret.setAttribute(AtlasClient.NAME, Path.getPathWithoutSchemeAndAuthority(hdfsPath).toString().toLowerCase());
...@@ -247,16 +243,16 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { ...@@ -247,16 +243,16 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
ret.setAttribute("path", updatedPath); ret.setAttribute("path", updatedPath);
ret.setAttribute("nameServiceId", nameServiceID); ret.setAttribute("nameServiceId", nameServiceID);
ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHdfsPathQualifiedName(clusterName, updatedPath)); ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHdfsPathQualifiedName(metadataNamespace, updatedPath));
} else { } else {
ret.setAttribute("path", hdfsPathStr); ret.setAttribute("path", hdfsPathStr);
ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHdfsPathQualifiedName(clusterName, hdfsPathStr)); ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHdfsPathQualifiedName(metadataNamespace, hdfsPathStr));
} }
} }
break; break;
case "HiveBolt": { case "HiveBolt": {
clusterName = extractComponentClusterName(new HiveConf(), stormConf); metadataNamespace = extractComponentMetadataNamespace(new HiveConf(), stormConf);
final String dbName = config.get("HiveBolt.options.databaseName"); final String dbName = config.get("HiveBolt.options.databaseName");
final String tblName = config.get("HiveBolt.options.tableName"); final String tblName = config.get("HiveBolt.options.tableName");
...@@ -267,8 +263,8 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { ...@@ -267,8 +263,8 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
AtlasEntity dbEntity = new AtlasEntity("hive_db"); AtlasEntity dbEntity = new AtlasEntity("hive_db");
dbEntity.setAttribute(AtlasClient.NAME, dbName); dbEntity.setAttribute(AtlasClient.NAME, dbName);
dbEntity.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getDBQualifiedName(getClusterName(stormConf), dbName)); dbEntity.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getDBQualifiedName(metadataNamespace, dbName));
dbEntity.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf)); dbEntity.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, metadataNamespace);
entityExtInfo.addReferredEntity(dbEntity); entityExtInfo.addReferredEntity(dbEntity);
...@@ -277,7 +273,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { ...@@ -277,7 +273,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
ret.setAttribute(AtlasClient.NAME, tblName); ret.setAttribute(AtlasClient.NAME, tblName);
ret.setAttribute(ATTRIBUTE_DB, AtlasTypeUtil.getAtlasObjectId(dbEntity)); ret.setAttribute(ATTRIBUTE_DB, AtlasTypeUtil.getAtlasObjectId(dbEntity));
ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tblName)); ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getTableQualifiedName(metadataNamespace, dbName, tblName));
} }
} }
break; break;
...@@ -384,30 +380,25 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { ...@@ -384,30 +380,25 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
} }
} }
public static String getKafkaTopicQualifiedName(String clusterName, String topicName) { public static String getKafkaTopicQualifiedName(String metadataNamespace, String topicName) {
return String.format("%s@%s", topicName.toLowerCase(), clusterName); return String.format("%s@%s", topicName.toLowerCase(), metadataNamespace);
} }
public static String getHbaseTableQualifiedName(String clusterName, String nameSpace, String tableName) { public static String getHbaseTableQualifiedName(String metadataNamespace, String nameSpace, String tableName) {
return String.format("%s.%s@%s", nameSpace.toLowerCase(), tableName.toLowerCase(), clusterName); return String.format("%s.%s@%s", nameSpace.toLowerCase(), tableName.toLowerCase(), metadataNamespace);
} }
public static String getHdfsPathQualifiedName(String clusterName, String hdfsPath) { public static String getHdfsPathQualifiedName(String metadataNamespace, String hdfsPath) {
return String.format("%s@%s", hdfsPath.toLowerCase(), clusterName); return String.format("%s@%s", hdfsPath.toLowerCase(), metadataNamespace);
} }
private String getClusterName(Map stormConf) { private String extractComponentMetadataNamespace(Configuration configuration, Map stormConf) {
return atlasProperties.getString(AtlasConstants.CLUSTER_NAME_KEY, AtlasConstants.DEFAULT_CLUSTER_NAME); String clusterName = configuration.get(CLUSTER_NAME_KEY, null);
}
private String extractComponentClusterName(Configuration configuration, Map stormConf) {
String clusterName = configuration.get(AtlasConstants.CLUSTER_NAME_KEY, null);
if (clusterName == null) { if (clusterName == null) {
clusterName = getClusterName(stormConf); clusterName = getMetadataNamespace();
} }
return clusterName; return clusterName;
} }
}
} \ No newline at end of file
...@@ -25,16 +25,16 @@ public final class AtlasConstants { ...@@ -25,16 +25,16 @@ public final class AtlasConstants {
private AtlasConstants() { private AtlasConstants() {
} }
public static final String CLUSTER_NAME_KEY = "atlas.cluster.name"; public static final String CLUSTER_NAME_KEY = "atlas.cluster.name";
public static final String DEFAULT_CLUSTER_NAME = "primary"; public static final String SYSTEM_PROPERTY_APP_PORT = "atlas.app.port";
public static final String CLUSTER_NAME_ATTRIBUTE = "clusterName"; public static final String ATLAS_REST_ADDRESS_KEY = "atlas.rest.address";
public static final String SYSTEM_PROPERTY_APP_PORT = "atlas.app.port";
public static final String DEFAULT_APP_PORT_STR = "21000";
public static final String ATLAS_REST_ADDRESS_KEY = "atlas.rest.address";
public static final String DEFAULT_ATLAS_REST_ADDRESS = "http://localhost:21000";
public static final int ATLAS_SHUTDOWN_HOOK_PRIORITY = 30;
public static final String DEFAULT_TYPE_VERSION = "1.0";
public static final String ATLAS_MIGRATION_MODE_FILENAME = "atlas.migration.data.filename"; public static final String ATLAS_MIGRATION_MODE_FILENAME = "atlas.migration.data.filename";
public static final String ATLAS_SERVICES_ENABLED = "atlas.services.enabled"; public static final String ATLAS_SERVICES_ENABLED = "atlas.services.enabled";
public static final String CLUSTER_NAME_ATTRIBUTE = "clusterName";
public static final String DEFAULT_APP_PORT_STR = "21000";
public static final String DEFAULT_ATLAS_REST_ADDRESS = "http://localhost:21000";
public static final String DEFAULT_TYPE_VERSION = "1.0";
public static final int ATLAS_SHUTDOWN_HOOK_PRIORITY = 30;
} }
...@@ -59,10 +59,14 @@ public abstract class AtlasHook { ...@@ -59,10 +59,14 @@ public abstract class AtlasHook {
public static final String ATLAS_NOTIFICATION_FAILED_MESSAGES_FILENAME_KEY = "atlas.notification.failed.messages.filename"; public static final String ATLAS_NOTIFICATION_FAILED_MESSAGES_FILENAME_KEY = "atlas.notification.failed.messages.filename";
public static final String ATLAS_NOTIFICATION_LOG_FAILED_MESSAGES_ENABLED_KEY = "atlas.notification.log.failed.messages"; public static final String ATLAS_NOTIFICATION_LOG_FAILED_MESSAGES_ENABLED_KEY = "atlas.notification.log.failed.messages";
public static final String ATLAS_HOOK_FAILED_MESSAGES_LOG_DEFAULT_NAME = "atlas_hook_failed_messages.log"; public static final String ATLAS_HOOK_FAILED_MESSAGES_LOG_DEFAULT_NAME = "atlas_hook_failed_messages.log";
public static final String CONF_METADATA_NAMESPACE = "atlas.metadata.namespace";
public static final String CLUSTER_NAME_KEY = "atlas.cluster.name";
public static final String DEFAULT_CLUSTER_NAME = "primary";
protected static Configuration atlasProperties; protected static Configuration atlasProperties;
protected static NotificationInterface notificationInterface; protected static NotificationInterface notificationInterface;
private static final String metadataNamespace;
private static final int SHUTDOWN_HOOK_WAIT_TIME_MS = 3000; private static final int SHUTDOWN_HOOK_WAIT_TIME_MS = 3000;
private static final boolean logFailedMessages; private static final boolean logFailedMessages;
private static final FailedMessagesLogger failedMessagesLogger; private static final FailedMessagesLogger failedMessagesLogger;
...@@ -95,6 +99,7 @@ public abstract class AtlasHook { ...@@ -95,6 +99,7 @@ public abstract class AtlasHook {
} }
} }
metadataNamespace = getMetadataNamespace(atlasProperties);
notificationMaxRetries = atlasProperties.getInt(ATLAS_NOTIFICATION_MAX_RETRIES, 3); notificationMaxRetries = atlasProperties.getInt(ATLAS_NOTIFICATION_MAX_RETRIES, 3);
notificationRetryInterval = atlasProperties.getInt(ATLAS_NOTIFICATION_RETRY_INTERVAL, 1000); notificationRetryInterval = atlasProperties.getInt(ATLAS_NOTIFICATION_RETRY_INTERVAL, 1000);
notificationInterface = NotificationProvider.get(); notificationInterface = NotificationProvider.get();
...@@ -306,4 +311,15 @@ public abstract class AtlasHook { ...@@ -306,4 +311,15 @@ public abstract class AtlasHook {
return ret; return ret;
} }
} private static String getMetadataNamespace(Configuration config) {
return config.getString(CONF_METADATA_NAMESPACE, getClusterName(config));
}
private static String getClusterName(Configuration config) {
return config.getString(CLUSTER_NAME_KEY, DEFAULT_CLUSTER_NAME);
}
public String getMetadataNamespace() {
return metadataNamespace;
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment