Commit 5ff1cb16 by Shwetha GS

Merge branch 'master' into dal

Parents: 1ed4dee6, 6e9d6948
HiveMetaStoreBridge.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.metadata.hive.bridge;

+import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -158,11 +159,24 @@ public class HiveMetaStoreBridge {
         LOG.debug("Getting reference for database {}", databaseName);
         String typeName = HiveDataTypes.HIVE_DB.getName();
-        String dslQuery = String.format("%s where name = '%s' and clusterName = '%s'", HiveDataTypes.HIVE_DB.getName(),
+        String dslQuery = String.format("%s where name = '%s' and clusterName = '%s'", typeName,
                 databaseName.toLowerCase(), clusterName);
         return getEntityReferenceFromDSL(typeName, dslQuery);
     }

+    public Referenceable getProcessReference(String queryStr) throws Exception {
+        LOG.debug("Getting reference for process with query {}", queryStr);
+        String typeName = HiveDataTypes.HIVE_PROCESS.getName();
+        //todo enable DSL
+        // String dslQuery = String.format("%s where queryText = \"%s\"", typeName, queryStr);
+        // return getEntityReferenceFromDSL(typeName, dslQuery);
+        String gremlinQuery = String.format("g.V.has('__typeName', '%s').has('%s.queryText', \"%s\").toList()",
+                typeName, typeName, StringEscapeUtils.escapeJava(queryStr));
+        return getEntityReferenceFromGremlin(typeName, gremlinQuery);
+    }
+
     private Referenceable getEntityReferenceFromDSL(String typeName, String dslQuery) throws Exception {
         MetadataServiceClient dgiClient = getMetadataServiceClient();
         JSONArray results = dgiClient.searchByDSL(dslQuery);
...
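Note: the diff truncates getEntityReferenceFromGremlin, the helper the new getProcessReference relies on. Going by the pattern the integration test further down uses (searchByGremlin returning its matches under MetadataServiceClient.RESULTS), a minimal sketch might read as follows; the "__guid" property name and the Referenceable(guid, typeName, values) constructor are assumptions, not confirmed by this commit:

    private Referenceable getEntityReferenceFromGremlin(String typeName, String gremlinQuery) throws Exception {
        // Sketch only: run the Gremlin query through the REST client and wrap
        // the first matching vertex, if any, in a Referenceable.
        MetadataServiceClient dgiClient = getMetadataServiceClient();
        JSONObject response = dgiClient.searchByGremlin(gremlinQuery);
        JSONArray results = response.getJSONArray(MetadataServiceClient.RESULTS);
        if (results.length() == 0) {
            return null; // callers like registerProcess() then create a fresh entity
        }
        // Assumed: the matched vertex exposes its entity id as "__guid".
        String guid = results.getJSONObject(0).getString("__guid");
        return new Referenceable(guid, typeName, null);
    }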
HiveHook.java
@@ -15,28 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */

 package org.apache.hadoop.metadata.hive.hook;

 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.ExplainTask;
@@ -271,6 +256,13 @@ public class HiveHook implements ExecuteWithHookContext {
         }
     }

+    private String normalize(String str) {
+        if (StringUtils.isEmpty(str)) {
+            return null;
+        }
+        return str.toLowerCase().trim();
+    }
+
     private void registerProcess(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception {
         Set<ReadEntity> inputs = event.inputs;
         Set<WriteEntity> outputs = event.outputs;
@@ -285,11 +277,13 @@
         }

         String queryId = event.queryPlan.getQueryId();
-        String queryStr = event.queryPlan.getQueryStr();
+        String queryStr = normalize(event.queryPlan.getQueryStr());
         long queryStartTime = event.queryPlan.getQueryStartTime();

         LOG.debug("Registering CTAS query: {}", queryStr);
-        Referenceable processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
+        Referenceable processReferenceable = dgiBridge.getProcessReference(queryStr);
+        if (processReferenceable == null) {
+            processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
             processReferenceable.set("name", event.operation.getOperationName());
             processReferenceable.set("startTime", queryStartTime);
             processReferenceable.set("userName", event.user);
@@ -327,6 +321,9 @@
             //TODO set
             processReferenceable.set("queryGraph", "queryGraph");
             dgiBridge.createInstance(processReferenceable);
+        } else {
+            LOG.debug("Query {} is already registered", queryStr);
+        }
     }
...
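The two additions above work as a pair: normalize() canonicalizes the query text before it is stored on the process entity, and dgiBridge.getProcessReference(queryStr) looks that canonical form up, so re-running an equivalent query reuses the existing entity instead of registering a duplicate. A standalone illustration of the canonicalization (not code from the commit):

    public class NormalizeDemo {
        // Mirrors HiveHook.normalize(...) above, minus the empty-string guard.
        static String normalize(String str) {
            return str.toLowerCase().trim();
        }

        public static void main(String[] args) {
            // Both spellings collapse to the same queryText, so the second run
            // resolves to the process entity registered by the first.
            System.out.println(normalize("select * from t"));      // select * from t
            System.out.println(normalize("  SELECT * FROM T  "));  // select * from t
        }
    }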
HiveHookIT.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.metadata.hive.hook;

 import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.ql.Driver;
@@ -222,7 +224,7 @@ public class HiveHookIT {
         String tableName = createTable(false);

         String filename = "pfile://" + mkdir("export");
-        String query = "export table " + tableName + " to '" + filename + "'";
+        String query = "export table " + tableName + " to \"" + filename + "\"";
         runCommand(query);
         assertProcessIsRegistered(query);
@@ -239,6 +241,11 @@
         String query = "select * from " + tableName;
         runCommand(query);
         assertProcessIsRegistered(query);
+
+        //single entity per query
+        query = "SELECT * from " + tableName.toUpperCase();
+        runCommand(query);
+        assertProcessIsRegistered(query);
     }

     @Test
@@ -268,8 +275,23 @@
     }

     private void assertProcessIsRegistered(String queryStr) throws Exception {
-        String dslQuery = String.format("%s where queryText = \"%s\"", HiveDataTypes.HIVE_PROCESS.getName(), queryStr);
-        assertEntityIsRegistered(dslQuery, true);
+        // String dslQuery = String.format("%s where queryText = \"%s\"", HiveDataTypes.HIVE_PROCESS.getName(),
+        //         normalize(queryStr));
+        // assertEntityIsRegistered(dslQuery, true);
+        //todo replace with DSL
+        String typeName = HiveDataTypes.HIVE_PROCESS.getName();
+        String gremlinQuery = String.format("g.V.has('__typeName', '%s').has('%s.queryText', \"%s\").toList()",
+                typeName, typeName, normalize(queryStr));
+        JSONObject response = dgiCLient.searchByGremlin(gremlinQuery);
+        JSONArray results = response.getJSONArray(MetadataServiceClient.RESULTS);
+        Assert.assertEquals(results.length(), 1);
+    }
+
+    private String normalize(String str) {
+        if (StringUtils.isEmpty(str)) {
+            return null;
+        }
+        return StringEscapeUtils.escapeJava(str.toLowerCase());
     }

     private String assertTableIsRegistered(String dbName, String tableName) throws Exception {
...
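Note that the test's normalize() differs from the hook's on purpose: besides lowercasing, it applies StringEscapeUtils.escapeJava so the query can sit inside the double-quoted literal of the generated Gremlin string, mirroring the escaping getProcessReference performs on the bridge side. A standalone example of what escapeJava contributes (commons-lang API; the query string is made up):

    import org.apache.commons.lang.StringEscapeUtils;

    public class EscapeDemo {
        public static void main(String[] args) {
            // Embedded double quotes would otherwise terminate the "..." literal
            // inside the generated Gremlin query string.
            String query = "export table t to \"pfile:///tmp/export\"";
            // Prints: export table t to \"pfile:///tmp/export\"
            System.out.println(StringEscapeUtils.escapeJava(query));
        }
    }

The switch from single to double quotes in the export-table test above plausibly exists to exercise exactly this escaping path.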
GraphTransactionInterceptor.java
@@ -38,8 +38,6 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
         }

         try {
-            LOG.debug("graph rollback to cleanup previous state");
-            titanGraph.rollback(); //cleanup previous state
             Object response = invocation.proceed();
             titanGraph.commit();
             LOG.debug("graph commit");
...
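With the pre-emptive rollback removed, the interceptor reduces to commit-on-success, rollback-on-failure around the intercepted call. A minimal sketch of the full method under the standard AOP Alliance signature; the diff confirms only the proceed/commit lines, so the catch branch is an assumption based on the class's purpose:

    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        try {
            Object response = invocation.proceed();
            titanGraph.commit();
            LOG.debug("graph commit");
            return response;
        } catch (Throwable t) {
            // Assumed: undo any partial writes when the intercepted method throws.
            titanGraph.rollback();
            LOG.error("graph rollback", t);
            throw t;
        }
    }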
GraphBackedSearchIndexer.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.metadata.repository.graph;

+import com.google.inject.Singleton;
 import com.thinkaurelius.titan.core.Cardinality;
 import com.thinkaurelius.titan.core.EdgeLabel;
 import com.thinkaurelius.titan.core.Order;
@@ -51,6 +52,7 @@ import java.util.Map;
 /**
  * Adds index for properties of a given type when its added before any instances are added.
  */
+@Singleton
 public class GraphBackedSearchIndexer implements SearchIndexer {

     private static final Logger LOG = LoggerFactory.getLogger(GraphBackedSearchIndexer.class);
...
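The new @Singleton annotations (here and on GraphBackedTypeStore below) make Guice hand every injection point the same instance. A quick check of the scope semantics, assuming the injector is built from the project's RepositoryMetadataModule (imported by the test file at the bottom of this commit; the bindings themselves are not shown here):

    import com.google.inject.Guice;
    import com.google.inject.Injector;

    public class SingletonScopeDemo {
        public static void main(String[] args) {
            Injector injector = Guice.createInjector(new RepositoryMetadataModule());
            // @Singleton guarantees one instance per injector:
            Object a = injector.getInstance(GraphBackedSearchIndexer.class);
            Object b = injector.getInstance(GraphBackedSearchIndexer.class);
            System.out.println(a == b); // true
        }
    }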
GraphBackedTypeStore.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.metadata.repository.typestore;
 import com.google.common.collect.ImmutableList;
 import com.google.inject.Inject;
+import com.google.inject.Singleton;
 import com.thinkaurelius.titan.core.TitanGraph;
 import com.tinkerpop.blueprints.Direction;
 import com.tinkerpop.blueprints.Edge;
@@ -53,6 +54,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;

+@Singleton
 public class GraphBackedTypeStore implements ITypeStore {
     public static final String VERTEX_TYPE = "typeSystem";
     private static final String PROPERTY_PREFIX = Constants.INTERNAL_PROPERTY_KEY_PREFIX + "type.";
@@ -73,7 +75,6 @@
     }

     @Override
-    @GraphTransaction
     public void store(TypeSystem typeSystem, ImmutableList<String> typeNames) throws MetadataException {
         ImmutableList<String> coreTypes = typeSystem.getCoreTypes();
         for (String typeName : typeNames) {
@@ -280,6 +281,7 @@
     private AttributeDefinition[] getAttributes(Vertex vertex, String typeName) throws MetadataException {
         List<AttributeDefinition> attributes = new ArrayList<>();
         List<String> attrNames = vertex.getProperty(getPropertyKey(typeName));
+        if (attrNames != null) {
             for (String attrName : attrNames) {
                 try {
                     String propertyKey = getPropertyKey(typeName, attrName);
@@ -288,6 +290,7 @@
                     throw new MetadataException(e);
                 }
             }
+        }
         return attributes.toArray(new AttributeDefinition[attributes.size()]);
     }
...
DefaultMetadataService.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.metadata.services;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.metadata.GraphTransaction;
 import org.apache.hadoop.metadata.MetadataException;
 import org.apache.hadoop.metadata.MetadataServiceClient;
 import org.apache.hadoop.metadata.ParamChecker;
@@ -145,6 +146,7 @@ public class DefaultMetadataService implements MetadataService {
      * @return a unique id for this type
      */
     @Override
+    @GraphTransaction
     public JSONObject createType(String typeDefinition) throws MetadataException {
         ParamChecker.notEmpty(typeDefinition, "type definition cannot be empty");
...
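@GraphTransaction only takes effect where a Guice module binds GraphTransactionInterceptor to it; this commit moves the annotation up from GraphBackedTypeStore.store() so the transaction presumably spans the whole createType() call rather than just the store step. The binding itself is not part of the diff; a hypothetical wiring would look like:

    import com.google.inject.AbstractModule;
    import com.google.inject.matcher.Matchers;

    public class TransactionModule extends AbstractModule {
        @Override
        protected void configure() {
            // Route every @GraphTransaction-annotated method through the
            // commit/rollback interceptor shown earlier in this commit.
            bindInterceptor(Matchers.any(),
                    Matchers.annotatedWith(GraphTransaction.class),
                    new GraphTransactionInterceptor());
        }
    }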
GraphBackedTypeStoreTest.java
@@ -23,6 +23,7 @@ import com.tinkerpop.blueprints.Direction;
 import com.tinkerpop.blueprints.Edge;
 import com.tinkerpop.blueprints.Vertex;
 import junit.framework.Assert;
+import org.apache.hadoop.metadata.GraphTransaction;
 import org.apache.hadoop.metadata.MetadataException;
 import org.apache.hadoop.metadata.RepositoryMetadataModule;
 import org.apache.hadoop.metadata.TestUtils;
@@ -63,6 +64,7 @@ public class GraphBackedTypeStoreTest {
     }

     @Test
+    @GraphTransaction
     public void testStore() throws MetadataException {
         typeStore.store(ts);
         dumpGraph();
@@ -79,6 +81,7 @@
     }

     @Test (dependsOnMethods = "testStore")
+    @GraphTransaction
     public void testRestore() throws Exception {
         TypesDef types = typeStore.restore();
...