Commit 21109f1e by Shwetha GS

de-duping on query string in hive hook

parent 2270d05f
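In short: HiveHook previously created a new hive_process entity on every query execution. This commit normalizes the query string (lowercase + trim) and looks up an existing process entity by queryText before creating one, so re-running an equivalent query no longer produces duplicate lineage entities. A minimal sketch of the normalization, assuming only commons-lang on the classpath (class name and placement hypothetical):

```java
import org.apache.commons.lang.StringUtils;

public class NormalizeSketch {
    // Mirrors the normalize() added to HiveHook below: null for empty input,
    // otherwise lowercase and trim, so equivalent queries share one key.
    static String normalize(String str) {
        if (StringUtils.isEmpty(str)) {
            return null;
        }
        return str.toLowerCase().trim();
    }

    public static void main(String[] args) {
        // Both runs collapse to "select * from tab1", so the hook
        // registers a single process entity for the pair.
        System.out.println(normalize("select * from tab1"));
        System.out.println(normalize(" SELECT * from TAB1 "));
    }
}
```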
@@ -18,6 +18,7 @@
 package org.apache.hadoop.metadata.hive.bridge;
 
+import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -158,11 +159,24 @@ public class HiveMetaStoreBridge {
         LOG.debug("Getting reference for database {}", databaseName);
         String typeName = HiveDataTypes.HIVE_DB.getName();
-        String dslQuery = String.format("%s where name = '%s' and clusterName = '%s'", HiveDataTypes.HIVE_DB.getName(),
+        String dslQuery = String.format("%s where name = '%s' and clusterName = '%s'", typeName,
                 databaseName.toLowerCase(), clusterName);
         return getEntityReferenceFromDSL(typeName, dslQuery);
     }
 
+    public Referenceable getProcessReference(String queryStr) throws Exception {
+        LOG.debug("Getting reference for process with query {}", queryStr);
+        String typeName = HiveDataTypes.HIVE_PROCESS.getName();
+        //todo enable DSL
+        // String dslQuery = String.format("%s where queryText = \"%s\"", typeName, queryStr);
+        // return getEntityReferenceFromDSL(typeName, dslQuery);
+        String gremlinQuery = String.format("g.V.has('__typeName', '%s').has('%s.queryText', \"%s\").toList()",
+                typeName, typeName, StringEscapeUtils.escapeJava(queryStr));
+        return getEntityReferenceFromGremlin(typeName, gremlinQuery);
+    }
+
     private Referenceable getEntityReferenceFromDSL(String typeName, String dslQuery) throws Exception {
         MetadataServiceClient dgiClient = getMetadataServiceClient();
         JSONArray results = dgiClient.searchByDSL(dslQuery);
......
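For reference, the Gremlin lookup that getProcessReference builds in HiveMetaStoreBridge.java looks like the following; the sample query text is made up, and the type name is assumed to resolve to hive_process:

```java
import org.apache.commons.lang.StringEscapeUtils;

public class GremlinQuerySketch {
    public static void main(String[] args) {
        String typeName = "hive_process";   // assumed value of HiveDataTypes.HIVE_PROCESS.getName()
        String queryStr = "select * from tab1";
        // escapeJava escapes quotes and backslashes so the query text can sit
        // safely inside the double-quoted Gremlin string literal.
        String gremlinQuery = String.format(
                "g.V.has('__typeName', '%s').has('%s.queryText', \"%s\").toList()",
                typeName, typeName, StringEscapeUtils.escapeJava(queryStr));
        System.out.println(gremlinQuery);
        // g.V.has('__typeName', 'hive_process').has('hive_process.queryText', "select * from tab1").toList()
    }
}
```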
@@ -37,6 +37,8 @@ package org.apache.hadoop.metadata.hive.hook;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.ExplainTask;
@@ -271,6 +273,13 @@ public class HiveHook implements ExecuteWithHookContext {
         }
     }
 
+    private String normalize(String str) {
+        if (StringUtils.isEmpty(str)) {
+            return null;
+        }
+        return str.toLowerCase().trim();
+    }
+
     private void registerProcess(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception {
         Set<ReadEntity> inputs = event.inputs;
         Set<WriteEntity> outputs = event.outputs;
@@ -285,48 +294,53 @@ public class HiveHook implements ExecuteWithHookContext {
         }
 
         String queryId = event.queryPlan.getQueryId();
-        String queryStr = event.queryPlan.getQueryStr();
+        String queryStr = normalize(event.queryPlan.getQueryStr());
         long queryStartTime = event.queryPlan.getQueryStartTime();
 
         LOG.debug("Registering CTAS query: {}", queryStr);
-        Referenceable processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
-        processReferenceable.set("name", event.operation.getOperationName());
-        processReferenceable.set("startTime", queryStartTime);
-        processReferenceable.set("userName", event.user);
-        List<Referenceable> source = new ArrayList<>();
-        for (ReadEntity readEntity : inputs) {
-            if (readEntity.getType() == Entity.Type.TABLE) {
-                Table table = readEntity.getTable();
-                String dbName = table.getDbName();
-                source.add(dgiBridge.registerTable(dbName, table.getTableName()));
-            }
-            if (readEntity.getType() == Entity.Type.PARTITION) {
-                dgiBridge.registerPartition(readEntity.getPartition());
-            }
-        }
-        processReferenceable.set("inputs", source);
-        List<Referenceable> target = new ArrayList<>();
-        for (WriteEntity writeEntity : outputs) {
-            if (writeEntity.getType() == Entity.Type.TABLE || writeEntity.getType() == Entity.Type.PARTITION) {
-                Table table = writeEntity.getTable();
-                String dbName = table.getDbName();
-                target.add(dgiBridge.registerTable(dbName, table.getTableName()));
-            }
-            if (writeEntity.getType() == Entity.Type.PARTITION) {
-                dgiBridge.registerPartition(writeEntity.getPartition());
-            }
-        }
-        processReferenceable.set("outputs", target);
-        processReferenceable.set("queryText", queryStr);
-        processReferenceable.set("queryId", queryId);
-        processReferenceable.set("queryPlan", event.jsonPlan.toString());
-        processReferenceable.set("endTime", System.currentTimeMillis());
-        //TODO set
-        processReferenceable.set("queryGraph", "queryGraph");
-        dgiBridge.createInstance(processReferenceable);
+        Referenceable processReferenceable = dgiBridge.getProcessReference(queryStr);
+        if (processReferenceable == null) {
+            processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
+            processReferenceable.set("name", event.operation.getOperationName());
+            processReferenceable.set("startTime", queryStartTime);
+            processReferenceable.set("userName", event.user);
+            List<Referenceable> source = new ArrayList<>();
+            for (ReadEntity readEntity : inputs) {
+                if (readEntity.getType() == Entity.Type.TABLE) {
+                    Table table = readEntity.getTable();
+                    String dbName = table.getDbName();
+                    source.add(dgiBridge.registerTable(dbName, table.getTableName()));
+                }
+                if (readEntity.getType() == Entity.Type.PARTITION) {
+                    dgiBridge.registerPartition(readEntity.getPartition());
+                }
+            }
+            processReferenceable.set("inputs", source);
+            List<Referenceable> target = new ArrayList<>();
+            for (WriteEntity writeEntity : outputs) {
+                if (writeEntity.getType() == Entity.Type.TABLE || writeEntity.getType() == Entity.Type.PARTITION) {
+                    Table table = writeEntity.getTable();
+                    String dbName = table.getDbName();
+                    target.add(dgiBridge.registerTable(dbName, table.getTableName()));
+                }
+                if (writeEntity.getType() == Entity.Type.PARTITION) {
+                    dgiBridge.registerPartition(writeEntity.getPartition());
+                }
+            }
+            processReferenceable.set("outputs", target);
+            processReferenceable.set("queryText", queryStr);
+            processReferenceable.set("queryId", queryId);
+            processReferenceable.set("queryPlan", event.jsonPlan.toString());
+            processReferenceable.set("endTime", System.currentTimeMillis());
+            //TODO set
+            processReferenceable.set("queryGraph", "queryGraph");
+            dgiBridge.createInstance(processReferenceable);
+        } else {
+            LOG.debug("Query {} is already registered", queryStr);
+        }
     }
......
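The net effect of the registerProcess change above, modeled as a runnable toy in which a Map stands in for the metadata store (all names hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

public class DedupSketch {
    private final Map<String, String> processByQueryText = new HashMap<>();

    // Returns true when a new process entity would be created,
    // false when the lookup-by-queryText de-dupes it.
    boolean registerProcess(String queryText) {
        String key = queryText.toLowerCase().trim();   // same normalization as the hook
        if (processByQueryText.containsKey(key)) {
            return false;   // mirrors the "already registered" else branch
        }
        processByQueryText.put(key, "hive_process:" + processByQueryText.size());
        return true;
    }

    public static void main(String[] args) {
        DedupSketch sketch = new DedupSketch();
        System.out.println(sketch.registerProcess("select * from tab1"));  // true
        System.out.println(sketch.registerProcess("SELECT * from TAB1"));  // false, de-duped
    }
}
```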
@@ -19,6 +19,8 @@
 package org.apache.hadoop.metadata.hive.hook;
 
 import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.ql.Driver;
@@ -222,7 +224,7 @@ public class HiveHookIT {
         String tableName = createTable(false);
         String filename = "pfile://" + mkdir("export");
-        String query = "export table " + tableName + " to '" + filename + "'";
+        String query = "export table " + tableName + " to \"" + filename + "\"";
         runCommand(query);
         assertProcessIsRegistered(query);
@@ -239,6 +241,11 @@ public class HiveHookIT {
         String query = "select * from " + tableName;
         runCommand(query);
         assertProcessIsRegistered(query);
+
+        //single entity per query
+        query = "SELECT * from " + tableName.toUpperCase();
+        runCommand(query);
+        assertProcessIsRegistered(query);
     }
 
     @Test
@@ -268,8 +275,23 @@ public class HiveHookIT {
     }
 
     private void assertProcessIsRegistered(String queryStr) throws Exception {
-        String dslQuery = String.format("%s where queryText = \"%s\"", HiveDataTypes.HIVE_PROCESS.getName(), queryStr);
-        assertEntityIsRegistered(dslQuery, true);
+        // String dslQuery = String.format("%s where queryText = \"%s\"", HiveDataTypes.HIVE_PROCESS.getName(),
+        //         normalize(queryStr));
+        // assertEntityIsRegistered(dslQuery, true);
+        //todo replace with DSL
+        String typeName = HiveDataTypes.HIVE_PROCESS.getName();
+        String gremlinQuery = String.format("g.V.has('__typeName', '%s').has('%s.queryText', \"%s\").toList()",
+                typeName, typeName, normalize(queryStr));
+        JSONObject response = dgiCLient.searchByGremlin(gremlinQuery);
+        JSONArray results = response.getJSONArray(MetadataServiceClient.RESULTS);
+        Assert.assertEquals(results.length(), 1);
     }
 
+    private String normalize(String str) {
+        if (StringUtils.isEmpty(str)) {
+            return null;
+        }
+        return StringEscapeUtils.escapeJava(str.toLowerCase());
+    }
+
     private String assertTableIsRegistered(String dbName, String tableName) throws Exception {
......
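Note that the test's normalize() differs from the hook's: it applies StringEscapeUtils.escapeJava instead of trim, because the normalized text is spliced into the double-quoted Gremlin string literal. A small illustration with a made-up export path (relevant since the export test above now quotes its path with double quotes):

```java
import org.apache.commons.lang.StringEscapeUtils;

public class EscapeSketch {
    public static void main(String[] args) {
        // The query text itself contains '"' characters, which must be
        // escaped before being embedded in the Gremlin string literal.
        String query = "export table tab1 to \"pfile:///tmp/export\"";
        System.out.println(StringEscapeUtils.escapeJava(query.toLowerCase()));
        // prints: export table tab1 to \"pfile:///tmp/export\"
    }
}
```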
@@ -281,12 +281,14 @@ public class GraphBackedTypeStore implements ITypeStore {
     private AttributeDefinition[] getAttributes(Vertex vertex, String typeName) throws MetadataException {
         List<AttributeDefinition> attributes = new ArrayList<>();
         List<String> attrNames = vertex.getProperty(getPropertyKey(typeName));
-        for (String attrName : attrNames) {
-            try {
-                String propertyKey = getPropertyKey(typeName, attrName);
-                attributes.add(AttributeInfo.fromJson((String) vertex.getProperty(propertyKey)));
-            } catch (JSONException e) {
-                throw new MetadataException(e);
+        if (attrNames != null) {
+            for (String attrName : attrNames) {
+                try {
+                    String propertyKey = getPropertyKey(typeName, attrName);
+                    attributes.add(AttributeInfo.fromJson((String) vertex.getProperty(propertyKey)));
+                } catch (JSONException e) {
+                    throw new MetadataException(e);
+                }
             }
         }
         return attributes.toArray(new AttributeDefinition[attributes.size()]);
......
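The GraphBackedTypeStore fix guards against vertex.getProperty returning null for a type that has no attributes stored on its vertex. A condensed, runnable model of the guard (names hypothetical):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class NullGuardSketch {
    // Stand-in for getAttributes: a null attribute list now yields an
    // empty array instead of a NullPointerException during iteration.
    static String[] getAttributes(List<String> attrNames) {
        List<String> attributes = new ArrayList<>();
        if (attrNames != null) {
            attributes.addAll(attrNames);
        }
        return attributes.toArray(new String[attributes.size()]);
    }

    public static void main(String[] args) {
        System.out.println(getAttributes(null).length);                          // 0, no NPE
        System.out.println(getAttributes(Arrays.asList("name", "type")).length); // 2
    }
}
```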