Commit 91cfa6a6 by Shwetha GS

Merge branch 'master' into dal

parents dd9a76b0 01ee72a3
...@@ -90,15 +90,9 @@ ...@@ -90,15 +90,9 @@
<dependency> <dependency>
<groupId>org.apache.hadoop</groupId> <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId> <artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version> <version>${hadoop.version}</version>
<scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
......
...@@ -59,9 +59,11 @@ for i in "${BASEDIR}/bridge/hive/"*.jar; do ...@@ -59,9 +59,11 @@ for i in "${BASEDIR}/bridge/hive/"*.jar; do
METADATACPPATH="${METADATACPPATH}:$i" METADATACPPATH="${METADATACPPATH}:$i"
done done
echo $METADATACPPATH # log dir for applications
METADATA_LOG_DIR="${METADATA_LOG_DIR:-$BASEDIR/logs}"
export METADATA_LOG_DIR
JAVA_PROPERTIES="$METADATA_OPTS" JAVA_PROPERTIES="$METADATA_OPTS -Dmetadata.log.dir=$METADATA_LOG_DIR -Dmetadata.log.file=import-hive.log"
shift shift
while [[ ${1} =~ ^\-D ]]; do while [[ ${1} =~ ^\-D ]]; do
...@@ -70,6 +72,7 @@ while [[ ${1} =~ ^\-D ]]; do ...@@ -70,6 +72,7 @@ while [[ ${1} =~ ^\-D ]]; do
done done
TIME=`date +%Y%m%d%H%M%s` TIME=`date +%Y%m%d%H%M%s`
#Add hive conf in classpath
if [ ! -z "$HIVE_CONF_DIR" ]; then if [ ! -z "$HIVE_CONF_DIR" ]; then
HIVE_CP=$HIVE_CONF_DIR HIVE_CP=$HIVE_CONF_DIR
elif [ ! -z "$HIVE_HOME" ]; then elif [ ! -z "$HIVE_HOME" ]; then
...@@ -86,5 +89,5 @@ echo Using Hive configuration directory [$HIVE_CP] ...@@ -86,5 +89,5 @@ echo Using Hive configuration directory [$HIVE_CP]
${JAVA_BIN} ${JAVA_PROPERTIES} -cp ${HIVE_CP}:${METADATACPPATH} org.apache.hadoop.metadata.hive.bridge.HiveMetaStoreBridge ${JAVA_BIN} ${JAVA_PROPERTIES} -cp ${HIVE_CP}:${METADATACPPATH} org.apache.hadoop.metadata.hive.bridge.HiveMetaStoreBridge
RETVAL=$? RETVAL=$?
[ $RETVAL -eq 0 ] && echo Hive Data Model Imported!!! [ $RETVAL -eq 0 ] && echo Hive Data Model imported successfully!!!
[ $RETVAL -ne 0 ] && echo Failure in Hive Data Model import!!! [ $RETVAL -ne 0 ] && echo Failed to import Hive Data Model!!!
...@@ -18,28 +18,35 @@ ...@@ -18,28 +18,35 @@
package org.apache.hadoop.metadata.hive.bridge; package org.apache.hadoop.metadata.hive.bridge;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Index; import org.apache.hadoop.hive.metastore.api.Index;
import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.metadata.MetadataServiceClient; import org.apache.hadoop.metadata.MetadataServiceClient;
import org.apache.hadoop.metadata.hive.model.HiveDataModelGenerator; import org.apache.hadoop.metadata.hive.model.HiveDataModelGenerator;
import org.apache.hadoop.metadata.hive.model.HiveDataTypes; import org.apache.hadoop.metadata.hive.model.HiveDataTypes;
import org.apache.hadoop.metadata.typesystem.ITypedReferenceableInstance;
import org.apache.hadoop.metadata.typesystem.Referenceable; import org.apache.hadoop.metadata.typesystem.Referenceable;
import org.apache.hadoop.metadata.typesystem.Struct; import org.apache.hadoop.metadata.typesystem.Struct;
import org.apache.hadoop.metadata.typesystem.json.InstanceSerialization; import org.apache.hadoop.metadata.typesystem.json.InstanceSerialization;
import org.apache.hadoop.metadata.typesystem.json.Serialization;
import org.apache.hadoop.metadata.typesystem.persistence.Id;
import org.apache.hadoop.metadata.typesystem.types.TypeSystem;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject; import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Set;
/** /**
* A Bridge Utility that imports metadata from the Hive Meta Store * A Bridge Utility that imports metadata from the Hive Meta Store
...@@ -66,7 +73,7 @@ public class HiveMetaStoreBridge { ...@@ -66,7 +73,7 @@ public class HiveMetaStoreBridge {
private static final Logger LOG = LoggerFactory.getLogger(HiveMetaStoreBridge.class); private static final Logger LOG = LoggerFactory.getLogger(HiveMetaStoreBridge.class);
private final HiveMetaStoreClient hiveMetaStoreClient; private final Hive hiveClient;
private final MetadataServiceClient metadataServiceClient; private final MetadataServiceClient metadataServiceClient;
/** /**
...@@ -74,7 +81,7 @@ public class HiveMetaStoreBridge { ...@@ -74,7 +81,7 @@ public class HiveMetaStoreBridge {
* @param hiveConf * @param hiveConf
*/ */
public HiveMetaStoreBridge(HiveConf hiveConf) throws Exception { public HiveMetaStoreBridge(HiveConf hiveConf) throws Exception {
hiveMetaStoreClient = new HiveMetaStoreClient(hiveConf); hiveClient = Hive.get(hiveConf);
metadataServiceClient = new MetadataServiceClient(hiveConf.get(DGI_URL_PROPERTY, DEFAULT_DGI_URL)); metadataServiceClient = new MetadataServiceClient(hiveConf.get(DGI_URL_PROPERTY, DEFAULT_DGI_URL));
} }
...@@ -88,7 +95,7 @@ public class HiveMetaStoreBridge { ...@@ -88,7 +95,7 @@ public class HiveMetaStoreBridge {
} }
private void importDatabases() throws Exception { private void importDatabases() throws Exception {
List<String> databases = hiveMetaStoreClient.getAllDatabases(); List<String> databases = hiveClient.getAllDatabases();
for (String databaseName : databases) { for (String databaseName : databases) {
Referenceable dbReference = registerDatabase(databaseName); Referenceable dbReference = registerDatabase(databaseName);
...@@ -96,22 +103,48 @@ public class HiveMetaStoreBridge { ...@@ -96,22 +103,48 @@ public class HiveMetaStoreBridge {
} }
} }
public Referenceable registerDatabase(String databaseName) throws Exception { /**
LOG.info("Importing objects from databaseName : " + databaseName); * Gets reference for the database
*
Database hiveDB = hiveMetaStoreClient.getDatabase(databaseName); * @param dbName database name
* @return Reference for database if exists, else null
Referenceable dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName()); * @throws Exception
dbRef.set("name", hiveDB.getName()); */
dbRef.set("description", hiveDB.getDescription()); private Referenceable getDatabaseReference(String dbName) throws Exception {
dbRef.set("locationUri", hiveDB.getLocationUri()); LOG.debug("Getting reference for database {}", dbName);
dbRef.set("parameters", hiveDB.getParameters()); String typeName = HiveDataTypes.HIVE_DB.getName();
dbRef.set("ownerName", hiveDB.getOwnerName()); MetadataServiceClient dgiClient = getMetadataServiceClient();
if (hiveDB.getOwnerType() != null) {
dbRef.set("ownerType", hiveDB.getOwnerType().getValue()); JSONArray results = dgiClient.rawSearch(typeName, "name", dbName);
if (results.length() == 0) {
return null;
} else {
ITypedReferenceableInstance reference = Serialization.fromJson(results.get(0).toString());
return new Referenceable(reference.getId().id, typeName, null);
} }
}
public Referenceable registerDatabase(String databaseName) throws Exception {
Referenceable dbRef = getDatabaseReference(databaseName);
if (dbRef == null) {
LOG.info("Importing objects from databaseName : " + databaseName);
Database hiveDB = hiveClient.getDatabase(databaseName);
dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName());
dbRef.set("name", hiveDB.getName());
dbRef.set("description", hiveDB.getDescription());
dbRef.set("locationUri", hiveDB.getLocationUri());
dbRef.set("parameters", hiveDB.getParameters());
dbRef.set("ownerName", hiveDB.getOwnerName());
if (hiveDB.getOwnerType() != null) {
dbRef.set("ownerType", hiveDB.getOwnerType().getValue());
}
return createInstance(dbRef); dbRef = createInstance(dbRef);
} else {
LOG.info("Database {} is already registered with id {}", databaseName, dbRef.getId().id);
}
return dbRef;
} }
public Referenceable createInstance(Referenceable referenceable) throws Exception { public Referenceable createInstance(Referenceable referenceable) throws Exception {
...@@ -128,81 +161,134 @@ public class HiveMetaStoreBridge { ...@@ -128,81 +161,134 @@ public class HiveMetaStoreBridge {
} }
private void importTables(String databaseName, Referenceable databaseReferenceable) throws Exception { private void importTables(String databaseName, Referenceable databaseReferenceable) throws Exception {
List<String> hiveTables = hiveMetaStoreClient.getAllTables(databaseName); List<String> hiveTables = hiveClient.getAllTables(databaseName);
for (String tableName : hiveTables) { for (String tableName : hiveTables) {
Pair<Referenceable, Referenceable> tableReferenceable = registerTable(databaseReferenceable, databaseName, tableName); Referenceable tableReferenceable = registerTable(databaseReferenceable, databaseName, tableName);
// Import Partitions // Import Partitions
importPartitions(databaseName, tableName, databaseReferenceable, tableReferenceable.first, tableReferenceable.second); Referenceable sdReferenceable = getSDForTable(databaseReferenceable, tableName);
importPartitions(databaseName, tableName, databaseReferenceable, tableReferenceable, sdReferenceable);
// Import Indexes // Import Indexes
importIndexes(databaseName, tableName, databaseReferenceable, tableReferenceable.first); importIndexes(databaseName, tableName, databaseReferenceable, tableReferenceable);
} }
} }
public Pair<Referenceable, Referenceable> registerTable(Referenceable dbReference, String dbName, String tableName) throws Exception { /**
LOG.info("Importing objects from " + dbName + "." + tableName); * Gets reference for the table
*
Table hiveTable = hiveMetaStoreClient.getTable(dbName, tableName); * @param dbRef
* @param tableName table name
Referenceable tableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.getName()); * @return table reference if exists, else null
tableRef.set("tableName", hiveTable.getTableName()); * @throws Exception
tableRef.set("owner", hiveTable.getOwner()); */
tableRef.set("createTime", hiveTable.getCreateTime()); private Referenceable getTableReference(Referenceable dbRef, String tableName) throws Exception {
tableRef.set("lastAccessTime", hiveTable.getLastAccessTime()); LOG.debug("Getting reference for table {}.{}", dbRef, tableName);
tableRef.set("retention", hiveTable.getRetention());
// add reference to the database
tableRef.set("dbName", dbReference);
// add reference to the StorageDescriptor
StorageDescriptor storageDesc = hiveTable.getSd();
Referenceable sdReferenceable = fillStorageDescStruct(storageDesc);
tableRef.set("sd", sdReferenceable);
// add reference to the Partition Keys
List<Referenceable> partKeys = new ArrayList<>();
Referenceable colRef;
if (hiveTable.getPartitionKeysSize() > 0) {
for (FieldSchema fs : hiveTable.getPartitionKeys()) {
colRef = new Referenceable(HiveDataTypes.HIVE_COLUMN.getName());
colRef.set("name", fs.getName());
colRef.set("type", fs.getType());
colRef.set("comment", fs.getComment());
Referenceable colRefTyped = createInstance(colRef);
partKeys.add(colRefTyped);
}
tableRef.set("partitionKeys", partKeys);
}
tableRef.set("parameters", hiveTable.getParameters()); String typeName = HiveDataTypes.HIVE_TABLE.getName();
MetadataServiceClient dgiClient = getMetadataServiceClient();
if (hiveTable.isSetViewOriginalText()) { //todo DSL support for reference doesn't work. is the usage right?
tableRef.set("viewOriginalText", hiveTable.getViewOriginalText()); // String query = String.format("%s where dbName = \"%s\" and tableName = \"%s\"", typeName, dbRef.getId().id,
// tableName);
String query = String.format("%s where tableName = \"%s\"", typeName, tableName);
JSONArray results = dgiClient.searchByDSL(query);
if (results.length() == 0) {
return null;
} else {
//There should be just one instance with the given name
ITypedReferenceableInstance reference = Serialization.fromJson(results.get(0).toString());
String guid = reference.getId().id;
LOG.debug("Got reference for table {}.{} = {}", dbRef, tableName, guid);
return new Referenceable(guid, typeName, null);
} }
}
if (hiveTable.isSetViewExpandedText()) { private Referenceable getSDForTable(Referenceable dbRef, String tableName) throws Exception {
tableRef.set("viewExpandedText", hiveTable.getViewExpandedText()); Referenceable tableRef = getTableReference(dbRef, tableName);
if (tableRef == null) {
throw new IllegalArgumentException("Table " + dbRef + "." + tableName + " doesn't exist");
} }
tableRef.set("tableType", hiveTable.getTableType()); MetadataServiceClient dgiClient = getMetadataServiceClient();
tableRef.set("temporary", hiveTable.isTemporary()); ITypedReferenceableInstance tableInstance = dgiClient.getEntity(tableRef.getId().id);
Id sdId = (Id) tableInstance.get("sd");
return new Referenceable(sdId.id, sdId.getTypeName(), null);
}
// List<Referenceable> fieldsList = getColumns(storageDesc); public Referenceable registerTable(String dbName, String tableName) throws Exception {
// tableRef.set("columns", fieldsList); Referenceable dbReferenceable = registerDatabase(dbName);
return registerTable(dbReferenceable, dbName, tableName);
}
Referenceable tableReferenceable = createInstance(tableRef); public Referenceable registerTable(Referenceable dbReference, String dbName, String tableName) throws Exception {
return Pair.of(tableReferenceable, sdReferenceable); Referenceable tableRef = getTableReference(dbReference, tableName);
if (tableRef == null) {
LOG.info("Importing objects from " + dbName + "." + tableName);
Table hiveTable = hiveClient.getTable(dbName, tableName);
tableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
tableRef.set("tableName", hiveTable.getTableName());
tableRef.set("owner", hiveTable.getOwner());
//todo fix
tableRef.set("createTime", hiveTable.getLastAccessTime());
tableRef.set("lastAccessTime", hiveTable.getLastAccessTime());
tableRef.set("retention", hiveTable.getRetention());
// add reference to the database
tableRef.set("dbName", dbReference);
// add reference to the StorageDescriptor
StorageDescriptor storageDesc = hiveTable.getSd();
Referenceable sdReferenceable = fillStorageDescStruct(storageDesc);
tableRef.set("sd", sdReferenceable);
// add reference to the Partition Keys
List<Referenceable> partKeys = new ArrayList<>();
Referenceable colRef;
if (hiveTable.getPartitionKeys().size() > 0) {
for (FieldSchema fs : hiveTable.getPartitionKeys()) {
colRef = new Referenceable(HiveDataTypes.HIVE_COLUMN.getName());
colRef.set("name", fs.getName());
colRef.set("type", fs.getType());
colRef.set("comment", fs.getComment());
Referenceable colRefTyped = createInstance(colRef);
partKeys.add(colRefTyped);
}
tableRef.set("partitionKeys", partKeys);
}
tableRef.set("parameters", hiveTable.getParameters());
if (hiveTable.getViewOriginalText() != null) {
tableRef.set("viewOriginalText", hiveTable.getViewOriginalText());
}
if (hiveTable.getViewExpandedText() != null) {
tableRef.set("viewExpandedText", hiveTable.getViewExpandedText());
}
tableRef.set("tableType", hiveTable.getTableType());
tableRef.set("temporary", hiveTable.isTemporary());
// List<Referenceable> fieldsList = getColumns(storageDesc);
// tableRef.set("columns", fieldsList);
tableRef = createInstance(tableRef);
} else {
LOG.info("Table {}.{} is already registered with id {}", dbName, tableName, tableRef.getId().id);
}
return tableRef;
} }
private void importPartitions(String db, String table, private void importPartitions(String db, String tableName,
Referenceable dbReferenceable, Referenceable dbReferenceable,
Referenceable tableReferenceable, Referenceable tableReferenceable,
Referenceable sdReferenceable) throws Exception { Referenceable sdReferenceable) throws Exception {
List<Partition> tableParts = hiveMetaStoreClient.listPartitions( Set<Partition> tableParts = hiveClient.getAllPartitionsOf(new Table(Table.getEmptyTable(db, tableName)));
db, table, Short.MAX_VALUE);
if (tableParts.size() > 0) { if (tableParts.size() > 0) {
for (Partition hivePart : tableParts) { for (Partition hivePart : tableParts) {
...@@ -211,17 +297,21 @@ public class HiveMetaStoreBridge { ...@@ -211,17 +297,21 @@ public class HiveMetaStoreBridge {
} }
} }
//todo should be idempotent
private Referenceable importPartition(Partition hivePart, private Referenceable importPartition(Partition hivePart,
Referenceable dbReferenceable, Referenceable dbReferenceable,
Referenceable tableReferenceable, Referenceable tableReferenceable,
Referenceable sdReferenceable) throws Exception { Referenceable sdReferenceable) throws Exception {
LOG.info("Importing partition for {}.{} with values {}", dbReferenceable, tableReferenceable,
StringUtils.join(hivePart.getValues(), ","));
Referenceable partRef = new Referenceable(HiveDataTypes.HIVE_PARTITION.getName()); Referenceable partRef = new Referenceable(HiveDataTypes.HIVE_PARTITION.getName());
partRef.set("values", hivePart.getValues()); partRef.set("values", hivePart.getValues());
partRef.set("dbName", dbReferenceable); partRef.set("dbName", dbReferenceable);
partRef.set("tableName", tableReferenceable); partRef.set("tableName", tableReferenceable);
partRef.set("createTime", hivePart.getCreateTime()); //todo fix
partRef.set("createTime", hivePart.getLastAccessTime());
partRef.set("lastAccessTime", hivePart.getLastAccessTime()); partRef.set("lastAccessTime", hivePart.getLastAccessTime());
// sdStruct = fillStorageDescStruct(hivePart.getSd()); // sdStruct = fillStorageDescStruct(hivePart.getSd());
...@@ -237,7 +327,7 @@ public class HiveMetaStoreBridge { ...@@ -237,7 +327,7 @@ public class HiveMetaStoreBridge {
private void importIndexes(String db, String table, private void importIndexes(String db, String table,
Referenceable dbReferenceable, Referenceable dbReferenceable,
Referenceable tableReferenceable) throws Exception { Referenceable tableReferenceable) throws Exception {
List<Index> indexes = hiveMetaStoreClient.listIndexes(db, table, Short.MAX_VALUE); List<Index> indexes = hiveClient.getIndexes(db, table, Short.MAX_VALUE);
if (indexes.size() > 0) { if (indexes.size() > 0) {
for (Index index : indexes) { for (Index index : indexes) {
importIndex(index, dbReferenceable, tableReferenceable); importIndex(index, dbReferenceable, tableReferenceable);
...@@ -245,9 +335,11 @@ public class HiveMetaStoreBridge { ...@@ -245,9 +335,11 @@ public class HiveMetaStoreBridge {
} }
} }
//todo should be idempotent
private void importIndex(Index index, private void importIndex(Index index,
Referenceable dbReferenceable, Referenceable dbReferenceable,
Referenceable tableReferenceable) throws Exception { Referenceable tableReferenceable) throws Exception {
LOG.info("Importing index {} for {}.{}", index.getIndexName(), dbReferenceable, tableReferenceable);
Referenceable indexRef = new Referenceable(HiveDataTypes.HIVE_INDEX.getName()); Referenceable indexRef = new Referenceable(HiveDataTypes.HIVE_INDEX.getName());
indexRef.set("indexName", index.getIndexName()); indexRef.set("indexName", index.getIndexName());
...@@ -357,10 +449,15 @@ public class HiveMetaStoreBridge { ...@@ -357,10 +449,15 @@ public class HiveMetaStoreBridge {
//Register hive data model if its not already registered //Register hive data model if its not already registered
if (dgiClient.getType(HiveDataTypes.HIVE_PROCESS.getName()) == null ) { if (dgiClient.getType(HiveDataTypes.HIVE_PROCESS.getName()) == null ) {
LOG.info("Registering Hive data model");
dgiClient.createType(dataModelGenerator.getModelAsJson()); dgiClient.createType(dataModelGenerator.getModelAsJson());
} else { } else {
LOG.debug("Hive data model is already registered!"); LOG.info("Hive data model is already registered!");
} }
//todo remove when fromJson(entityJson) is supported on client
dataModelGenerator.createDataModel();
TypeSystem.getInstance().defineTypes(dataModelGenerator.getTypesDef());
} }
public static void main(String[] argv) throws Exception { public static void main(String[] argv) throws Exception {
......
...@@ -155,7 +155,7 @@ public class HiveHook implements ExecuteWithHookContext, HiveSemanticAnalyzerHoo ...@@ -155,7 +155,7 @@ public class HiveHook implements ExecuteWithHookContext, HiveSemanticAnalyzerHoo
// clone to avoid concurrent access // clone to avoid concurrent access
final HiveConf conf = new HiveConf(hookContext.getConf()); final HiveConf conf = new HiveConf(hookContext.getConf());
boolean debug = conf.get("debug", "false").equals("true"); boolean debug = conf.get("hive.hook.dgi.synchronous", "false").equals("true");
if (debug) { if (debug) {
fireAndForget(hookContext, conf); fireAndForget(hookContext, conf);
...@@ -178,8 +178,9 @@ public class HiveHook implements ExecuteWithHookContext, HiveSemanticAnalyzerHoo ...@@ -178,8 +178,9 @@ public class HiveHook implements ExecuteWithHookContext, HiveSemanticAnalyzerHoo
private void fireAndForget(HookContext hookContext, HiveConf conf) throws Exception { private void fireAndForget(HookContext hookContext, HiveConf conf) throws Exception {
assert hookContext.getHookType() == HookContext.HookType.POST_EXEC_HOOK : "Non-POST_EXEC_HOOK not supported!"; assert hookContext.getHookType() == HookContext.HookType.POST_EXEC_HOOK : "Non-POST_EXEC_HOOK not supported!";
LOG.info("Entered DGI hook for hook type {} operation {}", hookContext.getHookType(),
hookContext.getOperationName());
HiveOperation operation = HiveOperation.valueOf(hookContext.getOperationName()); HiveOperation operation = HiveOperation.valueOf(hookContext.getOperationName());
LOG.info("Entered DGI hook for hook type {} operation {}", hookContext.getHookType(), operation);
HiveMetaStoreBridge dgiBridge = new HiveMetaStoreBridge(conf); HiveMetaStoreBridge dgiBridge = new HiveMetaStoreBridge(conf);
...@@ -206,7 +207,7 @@ public class HiveHook implements ExecuteWithHookContext, HiveSemanticAnalyzerHoo ...@@ -206,7 +207,7 @@ public class HiveHook implements ExecuteWithHookContext, HiveSemanticAnalyzerHoo
Table table = entity.getTable(); Table table = entity.getTable();
//TODO table.getDbName().toLowerCase() is required as hive stores in lowercase, //TODO table.getDbName().toLowerCase() is required as hive stores in lowercase,
// but table.getDbName() is not lowercase // but table.getDbName() is not lowercase
Referenceable dbReferenceable = getDatabaseReference(dgiBridge, table.getDbName().toLowerCase()); Referenceable dbReferenceable = dgiBridge.registerDatabase(table.getDbName().toLowerCase());
dgiBridge.registerTable(dbReferenceable, table.getDbName(), table.getTableName()); dgiBridge.registerTable(dbReferenceable, table.getDbName(), table.getTableName());
} }
} }
...@@ -229,7 +230,8 @@ public class HiveHook implements ExecuteWithHookContext, HiveSemanticAnalyzerHoo ...@@ -229,7 +230,8 @@ public class HiveHook implements ExecuteWithHookContext, HiveSemanticAnalyzerHoo
LOG.info("Explain statement. Skipping..."); LOG.info("Explain statement. Skipping...");
} }
String user = hookContext.getUserName(); //todo hookContext.getUserName() is null in hdp sandbox 2.2.4
String user = hookContext.getUserName() == null ? System.getProperty("user.name") : hookContext.getUserName();
HiveOperation operation = HiveOperation.valueOf(hookContext.getOperationName()); HiveOperation operation = HiveOperation.valueOf(hookContext.getOperationName());
String queryId = null; String queryId = null;
String queryStr = null; String queryStr = null;
...@@ -252,19 +254,19 @@ public class HiveHook implements ExecuteWithHookContext, HiveSemanticAnalyzerHoo ...@@ -252,19 +254,19 @@ public class HiveHook implements ExecuteWithHookContext, HiveSemanticAnalyzerHoo
if (readEntity.getTyp() == Entity.Type.TABLE) { if (readEntity.getTyp() == Entity.Type.TABLE) {
Table table = readEntity.getTable(); Table table = readEntity.getTable();
String dbName = table.getDbName().toLowerCase(); String dbName = table.getDbName().toLowerCase();
source.add(getTableReference(dgiBridge, dbName, table.getTableName())); source.add(dgiBridge.registerTable(dbName, table.getTableName()));
} }
} }
processReferenceable.set("sourceTableNames", source); processReferenceable.set("inputTables", source);
List<Referenceable> target = new ArrayList<>(); List<Referenceable> target = new ArrayList<>();
for (WriteEntity writeEntity : outputs) { for (WriteEntity writeEntity : outputs) {
if (writeEntity.getTyp() == Entity.Type.TABLE) { if (writeEntity.getTyp() == Entity.Type.TABLE) {
Table table = writeEntity.getTable(); Table table = writeEntity.getTable();
String dbName = table.getDbName().toLowerCase(); String dbName = table.getDbName().toLowerCase();
target.add(getTableReference(dgiBridge, dbName, table.getTableName())); target.add(dgiBridge.registerTable(dbName, table.getTableName()));
} }
} }
processReferenceable.set("targetTableNames", target); processReferenceable.set("outputTables", target);
processReferenceable.set("queryText", queryStr); processReferenceable.set("queryText", queryStr);
processReferenceable.set("queryId", queryId); processReferenceable.set("queryId", queryId);
processReferenceable.set("queryPlan", getQueryPlan(hookContext, conf)); processReferenceable.set("queryPlan", getQueryPlan(hookContext, conf));
...@@ -275,58 +277,6 @@ public class HiveHook implements ExecuteWithHookContext, HiveSemanticAnalyzerHoo ...@@ -275,58 +277,6 @@ public class HiveHook implements ExecuteWithHookContext, HiveSemanticAnalyzerHoo
dgiBridge.createInstance(processReferenceable); dgiBridge.createInstance(processReferenceable);
} }
/**
* Gets reference for the database. Creates new instance if it doesn't exist
*
* @param dgiBridge
* @param dbName database name
* @return Reference for database
* @throws Exception
*/
private Referenceable getDatabaseReference(HiveMetaStoreBridge dgiBridge, String dbName) throws Exception {
String typeName = HiveDataTypes.HIVE_DB.getName();
MetadataServiceClient dgiClient = dgiBridge.getMetadataServiceClient();
JSONObject result = dgiClient.rawSearch(typeName, "name", dbName);
JSONArray results = (JSONArray) result.get("results");
if (results.length() == 0) {
//Create new instance
return dgiBridge.registerDatabase(dbName);
} else {
String guid = (String) ((JSONObject) results.get(0)).get("guid");
return new Referenceable(guid, typeName, null);
}
}
/**
* Gets reference for the table. Creates new instance if it doesn't exist
*
* @param dgiBridge
* @param dbName
* @param tableName table name
* @return table reference
* @throws Exception
*/
private Referenceable getTableReference(HiveMetaStoreBridge dgiBridge, String dbName, String tableName) throws Exception {
String typeName = HiveDataTypes.HIVE_TABLE.getName();
MetadataServiceClient dgiClient = dgiBridge.getMetadataServiceClient();
JSONObject result = dgiClient.rawSearch(typeName, "tableName", tableName);
JSONArray results = (JSONArray) result.get("results");
if (results.length() == 0) {
Referenceable dbRererence = getDatabaseReference(dgiBridge, dbName);
return dgiBridge.registerTable(dbRererence, dbName, tableName).first;
} else {
//There should be just one instance with the given name
String guid = (String) ((JSONObject) results.get(0)).get("guid");
return new Referenceable(guid, typeName, null);
}
}
private String getQueryPlan(HookContext hookContext, HiveConf conf) throws Exception { private String getQueryPlan(HookContext hookContext, HiveConf conf) throws Exception {
//We need to somehow get the sem associated with the plan and use it here. //We need to somehow get the sem associated with the plan and use it here.
......
...@@ -340,8 +340,8 @@ public class HiveDataModelGenerator { ...@@ -340,8 +340,8 @@ public class HiveDataModelGenerator {
private void createPartitionClass() throws MetadataException { private void createPartitionClass() throws MetadataException {
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition("values", DataTypes.STRING_TYPE.getName(), new AttributeDefinition("values", DataTypes.arrayTypeName(DataTypes.STRING_TYPE.getName()),
Multiplicity.COLLECTION, false, null), Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("dbName", HiveDataTypes.HIVE_DB.getName(), new AttributeDefinition("dbName", HiveDataTypes.HIVE_DB.getName(),
Multiplicity.REQUIRED, false, null), Multiplicity.REQUIRED, false, null),
new AttributeDefinition("tableName", HiveDataTypes.HIVE_TABLE.getName(), new AttributeDefinition("tableName", HiveDataTypes.HIVE_TABLE.getName(),
...@@ -354,10 +354,9 @@ public class HiveDataModelGenerator { ...@@ -354,10 +354,9 @@ public class HiveDataModelGenerator {
Multiplicity.REQUIRED, false, null), Multiplicity.REQUIRED, false, null),
new AttributeDefinition("columns", new AttributeDefinition("columns",
DataTypes.arrayTypeName(HiveDataTypes.HIVE_COLUMN.getName()), DataTypes.arrayTypeName(HiveDataTypes.HIVE_COLUMN.getName()),
Multiplicity.COLLECTION, true, null), Multiplicity.OPTIONAL, true, null),
new AttributeDefinition("parameters", STRING_MAP_TYPE.getName(), new AttributeDefinition("parameters", STRING_MAP_TYPE.getName(),
Multiplicity.OPTIONAL, false, null), Multiplicity.OPTIONAL, false, null),
}; };
HierarchicalTypeDefinition<ClassType> definition = HierarchicalTypeDefinition<ClassType> definition =
new HierarchicalTypeDefinition<>(ClassType.class, new HierarchicalTypeDefinition<>(ClassType.class,
......
...@@ -21,8 +21,8 @@ Hive metadata can be modelled in DGI using its Type System. The default modellin ...@@ -21,8 +21,8 @@ Hive metadata can be modelled in DGI using its Type System. The default modellin
---++ Importing Hive Metadata ---++ Importing Hive Metadata
org.apache.hadoop.metadata.hive.bridge.HiveMetaStoreBridge imports the hive metadata into DGI using the typesystem defined in org.apache.hadoop.metadata.hive.model.HiveDataModelGenerator. import-hive.sh command can be used to facilitate this. org.apache.hadoop.metadata.hive.bridge.HiveMetaStoreBridge imports the hive metadata into DGI using the typesystem defined in org.apache.hadoop.metadata.hive.model.HiveDataModelGenerator. import-hive.sh command can be used to facilitate this.
Set-up the following configs in <dgi package>/conf/hive-site.xml: Set-up the following configs in hive-site.xml of your hive set-up and set environment variable HIVE_CONFIG to the
* Hive metastore configuration - Refer [[https://cwiki.apache.org/confluence/display/Hive/AdminManual+MetastoreAdmin][Hive Metastore Configuration documentation]] hive conf directory:
* DGI endpoint - Add the following property with the DGI endpoint for your set-up * DGI endpoint - Add the following property with the DGI endpoint for your set-up
<verbatim> <verbatim>
<property> <property>
...@@ -57,4 +57,5 @@ The following properties in hive-site.xml control the thread pool details: ...@@ -57,4 +57,5 @@ The following properties in hive-site.xml control the thread pool details:
* hive.hook.dgi.minThreads - core number of threads. default 5 * hive.hook.dgi.minThreads - core number of threads. default 5
* hive.hook.dgi.maxThreads - maximum number of threads. default 5 * hive.hook.dgi.maxThreads - maximum number of threads. default 5
* hive.hook.dgi.keepAliveTime - keep alive time in msecs. default 10 * hive.hook.dgi.keepAliveTime - keep alive time in msecs. default 10
* hive.hook.dgi.synchronous - boolean, true to run the hook synchronously. default false
...@@ -58,7 +58,7 @@ public class HiveHookIT { ...@@ -58,7 +58,7 @@ public class HiveHookIT {
hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, System.getProperty("user.dir") + "/target/metastore"); hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, System.getProperty("user.dir") + "/target/metastore");
hiveConf.set(HiveMetaStoreBridge.DGI_URL_PROPERTY, DGI_URL); hiveConf.set(HiveMetaStoreBridge.DGI_URL_PROPERTY, DGI_URL);
hiveConf.set("javax.jdo.option.ConnectionURL", "jdbc:derby:./target/metastore_db;create=true"); hiveConf.set("javax.jdo.option.ConnectionURL", "jdbc:derby:./target/metastore_db;create=true");
hiveConf.set("debug", "true"); hiveConf.set("hive.hook.dgi.synchronous", "true");
return hiveConf; return hiveConf;
} }
...@@ -114,10 +114,7 @@ public class HiveHookIT { ...@@ -114,10 +114,7 @@ public class HiveHookIT {
} }
private void assertInstanceIsRegistered(String typeName, String colName, String colValue) throws Exception{ private void assertInstanceIsRegistered(String typeName, String colName, String colValue) throws Exception{
JSONObject result = dgiCLient.rawSearch(typeName, colName, colValue); JSONArray results = dgiCLient.rawSearch(typeName, colName, colValue);
JSONArray results = (JSONArray) result.get("results");
Assert.assertEquals(results.length(), 1); Assert.assertEquals(results.length(), 1);
JSONObject resultRow = (JSONObject) results.get(0);
Assert.assertEquals(resultRow.get(typeName + "." + colName), colValue);
} }
} }
...@@ -22,6 +22,10 @@ import com.sun.jersey.api.client.Client; ...@@ -22,6 +22,10 @@ import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource; import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.DefaultClientConfig; import com.sun.jersey.api.client.config.DefaultClientConfig;
import org.apache.hadoop.metadata.typesystem.ITypedReferenceableInstance;
import org.apache.hadoop.metadata.typesystem.Referenceable;
import org.apache.hadoop.metadata.typesystem.json.InstanceSerialization;
import org.apache.hadoop.metadata.typesystem.json.Serialization;
import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject; import org.codehaus.jettison.json.JSONObject;
...@@ -149,8 +153,14 @@ public class MetadataServiceClient { ...@@ -149,8 +153,14 @@ public class MetadataServiceClient {
* @return result json object * @return result json object
* @throws MetadataServiceException * @throws MetadataServiceException
*/ */
public JSONObject getEntity(String guid) throws MetadataServiceException { public ITypedReferenceableInstance getEntity(String guid) throws MetadataServiceException {
return callAPI(API.GET_ENTITY, null, guid); JSONObject jsonResponse = callAPI(API.GET_ENTITY, null, guid);
try {
String entityInstanceDefinition = jsonResponse.getString(MetadataServiceClient.RESULTS);
return Serialization.fromJson(entityInstanceDefinition);
} catch (JSONException e) {
throw new MetadataServiceException(e);
}
} }
public JSONObject searchEntity(String searchQuery) throws MetadataServiceException { public JSONObject searchEntity(String searchQuery) throws MetadataServiceException {
...@@ -167,14 +177,14 @@ public class MetadataServiceClient { ...@@ -167,14 +177,14 @@ public class MetadataServiceClient {
* @return result json object * @return result json object
* @throws MetadataServiceException * @throws MetadataServiceException
*/ */
public JSONObject rawSearch(String typeName, String attributeName, public JSONArray rawSearch(String typeName, String attributeName, Object attributeValue) throws
Object attributeValue) throws MetadataServiceException { MetadataServiceException {
String gremlinQuery = String.format( // String gremlinQuery = String.format(
"g.V.has(\"typeName\",\"%s\").and(_().has(\"%s.%s\", T.eq, \"%s\")).toList()", // "g.V.has(\"typeName\",\"%s\").and(_().has(\"%s.%s\", T.eq, \"%s\")).toList()",
typeName, typeName, attributeName, attributeValue); // typeName, typeName, attributeName, attributeValue);
return searchByGremlin(gremlinQuery); // return searchByGremlin(gremlinQuery);
// String dslQuery = String.format("%s where %s = \"%s\"", typeName, attributeName, attributeValue); String dslQuery = String.format("%s where %s = \"%s\"", typeName, attributeName, attributeValue);
// return searchByDSL(dslQuery); return searchByDSL(dslQuery);
} }
/** /**
...@@ -183,10 +193,15 @@ public class MetadataServiceClient { ...@@ -183,10 +193,15 @@ public class MetadataServiceClient {
* @return result json object * @return result json object
* @throws MetadataServiceException * @throws MetadataServiceException
*/ */
public JSONObject searchByDSL(String query) throws MetadataServiceException { public JSONArray searchByDSL(String query) throws MetadataServiceException {
WebResource resource = getResource(API.SEARCH_DSL); WebResource resource = getResource(API.SEARCH_DSL);
resource = resource.queryParam("query", query); resource = resource.queryParam("query", query);
return callAPIWithResource(API.SEARCH_DSL, resource); JSONObject result = callAPIWithResource(API.SEARCH_DSL, resource);
try {
return result.getJSONObject("results").getJSONArray("rows");
} catch (JSONException e) {
throw new MetadataServiceException(e);
}
} }
/** /**
......
...@@ -104,7 +104,6 @@ class GremlinEvaluator(qry: GremlinQuery, persistenceStrategy: GraphPersistenceS ...@@ -104,7 +104,6 @@ class GremlinEvaluator(qry: GremlinQuery, persistenceStrategy: GraphPersistenceS
val v = rV.getColumn(src).get(idx) val v = rV.getColumn(src).get(idx)
sInstance.set(cName, persistenceStrategy.constructInstance(aE.dataType, v)) sInstance.set(cName, persistenceStrategy.constructInstance(aE.dataType, v))
} }
sInstance
addPathStruct(r, sInstance) addPathStruct(r, sInstance)
} }
GremlinQueryResult(qry.expr.toString, rType, rows.toList) GremlinQueryResult(qry.expr.toString, rType, rows.toList)
...@@ -137,4 +136,4 @@ object JsonHelper { ...@@ -137,4 +136,4 @@ object JsonHelper {
def toJson(r: GremlinQueryResult): String = { def toJson(r: GremlinQueryResult): String = {
writePretty(r) writePretty(r)
} }
} }
\ No newline at end of file
...@@ -99,7 +99,7 @@ mkdir -p $METADATA_LOG_DIR ...@@ -99,7 +99,7 @@ mkdir -p $METADATA_LOG_DIR
pushd ${BASEDIR} > /dev/null pushd ${BASEDIR} > /dev/null
JAVA_PROPERTIES="$METADATA_OPTS $METADATA_PROPERTIES -Dmetadata.log.dir=$METADATA_LOG_DIR -Dmetadata.home=${METADATA_HOME_DIR} -Dmetadata.conf=${METADATA_CONF}" JAVA_PROPERTIES="$METADATA_OPTS $METADATA_PROPERTIES -Dmetadata.log.dir=$METADATA_LOG_DIR -Dmetadata.home=${METADATA_HOME_DIR} -Dmetadata.conf=${METADATA_CONF} -Dmetadata.log.file=application.log"
shift shift
while [[ ${1} =~ ^\-D ]]; do while [[ ${1} =~ ^\-D ]]; do
......
...@@ -28,7 +28,7 @@ ...@@ -28,7 +28,7 @@
</appender> </appender>
<appender name="FILE" class="org.apache.log4j.DailyRollingFileAppender"> <appender name="FILE" class="org.apache.log4j.DailyRollingFileAppender">
<param name="File" value="${metadata.log.dir}/application.log"/> <param name="File" value="${metadata.log.dir}/${metadata.log.file}"/>
<param name="Append" value="true"/> <param name="Append" value="true"/>
<param name="Threshold" value="debug"/> <param name="Threshold" value="debug"/>
<layout class="org.apache.log4j.PatternLayout"> <layout class="org.apache.log4j.PatternLayout">
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment