Commit 555960a5 by Ballistar13

Merge remote-tracking branch 'origin/master'

Conflicts: metadata-bridge-parent/metadata-bridge-core/src/main/java/org/apache/hadoop/metadata/bridge/hivelineage/HiveLineageBridge.java
parents 04997e4c bc7bf44a
@@ -55,6 +55,10 @@
<artifactId>metadata-typesystem</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop.metadata</groupId>
+ <artifactId>metadata-repository</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
......
@@ -18,12 +18,10 @@
package org.apache.hadoop.metadata.hivetypes;

- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.*;
import org.apache.hadoop.metadata.*;
+ import org.apache.hadoop.metadata.repository.MetadataRepository;
import org.apache.hadoop.metadata.storage.IRepository;
import org.apache.hadoop.metadata.storage.Id;
import org.apache.hadoop.metadata.storage.RepositoryException;
@@ -32,18 +30,23 @@ import org.apache.hadoop.metadata.types.Multiplicity;
import org.apache.hadoop.metadata.types.StructType;
import org.apache.hadoop.metadata.types.TypeSystem;
import org.apache.thrift.TException;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
public class HiveImporter {
private final HiveMetaStoreClient hiveMetastoreClient;
- public static final Log LOG = LogFactory.getLog(HiveImporter.class);
+ private static final Logger LOG = LoggerFactory.getLogger(HiveImporter.class);
private TypeSystem typeSystem;
private IRepository repository;
+ private MetadataRepository graphRepository;
private HiveTypeSystem hiveTypeSystem;
private List<Id> dbInstances;
@@ -51,27 +54,41 @@ public class HiveImporter {
private List<Id> partitionInstances;
private List<Id> columnInstances;
+ public HiveImporter(MetadataRepository repo, HiveTypeSystem hts, HiveMetaStoreClient hmc) throws RepositoryException {
+ this(hts, hmc);
+ if (repo == null) {
+ LOG.error("repository is null");
+ throw new RuntimeException("repository is null");
+ }
+ this.graphRepository = repo;
+ }
public HiveImporter(IRepository repo, HiveTypeSystem hts, HiveMetaStoreClient hmc) throws RepositoryException {
- this.repository = repo;
- this.hiveMetastoreClient = hmc;
- this.hiveTypeSystem = hts;
- typeSystem = TypeSystem.getInstance();
- dbInstances = new ArrayList<>();
- tableInstances = new ArrayList<>();
- partitionInstances = new ArrayList<>();
- columnInstances = new ArrayList<>();
+ this(hts, hmc);
- if (repository == null) {
+ if (repo == null) {
LOG.error("repository is null");
throw new RuntimeException("repository is null");
}
+ repository = repo;
repository.defineTypes(hts.getHierarchicalTypeDefinitions());
}
+ private HiveImporter(HiveTypeSystem hts, HiveMetaStoreClient hmc) {
+ this.hiveMetastoreClient = hmc;
+ this.hiveTypeSystem = hts;
+ typeSystem = TypeSystem.getInstance();
+ dbInstances = new ArrayList<>();
+ tableInstances = new ArrayList<>();
+ partitionInstances = new ArrayList<>();
+ columnInstances = new ArrayList<>();
+ }
public List<Id> getDBInstances() {
return dbInstances;
@@ -102,6 +119,21 @@ public class HiveImporter {
}
}
+ private ITypedReferenceableInstance createInstance(Referenceable ref) throws MetadataException {
+ if (repository != null) {
+ return repository.create(ref);
+ } else {
+ String typeName = ref.getTypeName();
+ IDataType dataType = hiveTypeSystem.getDataType(typeName);
+ LOG.debug("creating instance of type " + typeName + " dataType " + dataType);
+ ITypedReferenceableInstance instance = (ITypedReferenceableInstance) dataType.convert(ref, Multiplicity.OPTIONAL);
+ graphRepository.createEntity(instance, typeName);
+ return instance;
+ }
+ }
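For illustration only (not part of this commit), a caller-side sketch of createInstance: build a Referenceable for one of the Hive types, set its attributes, and let the method route it to whichever backend was supplied at construction; the memory IRepository creates the typed instance itself, while the graph path converts via the type system and persists through MetadataRepository.createEntity. The Referenceable(String typeName) constructor is an assumption based on its use as the type-name carrier in this class.

// Hypothetical caller, mirroring importDatabase() below:
Referenceable dbRef = new Referenceable(HiveTypeSystem.DefinedTypes.HIVE_DB.name());
dbRef.set("name", hiveDB.getName()); // attributes as in importDatabase()
ITypedReferenceableInstance dbRefTyped = createInstance(dbRef); // memory- or graph-backed, decided at construction
dbInstances.add(dbRefTyped.getId());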
private void importDatabase(String db) throws MetadataException {
try {
LOG.info("Importing objects from database : " + db);
@@ -113,8 +145,8 @@
dbRef.set("locationUri", hiveDB.getLocationUri());
dbRef.set("parameters", hiveDB.getParameters());
dbRef.set("ownerName", hiveDB.getOwnerName());
dbRef.set("ownerType", hiveDB.getOwnerType().toString());
ITypedReferenceableInstance dbRefTyped = repository.create(dbRef);
dbRef.set("ownerType", hiveDB.getOwnerType().getValue());
ITypedReferenceableInstance dbRefTyped = createInstance(dbRef);
dbInstances.add(dbRefTyped.getId());
importTables(db, dbRefTyped);
} catch (NoSuchObjectException nsoe) {
@@ -153,7 +185,7 @@
colRef.set("name", fs.getName());
colRef.set("type", fs.getType());
colRef.set("comment", fs.getComment());
- ITypedReferenceableInstance colRefTyped = repository.create(colRef);
+ ITypedReferenceableInstance colRefTyped = createInstance(colRef);
partKeys.add(colRefTyped);
}
tableRef.set("partitionKeys", partKeys);
@@ -168,7 +200,7 @@
tableRef.set("tableType", hiveTable.getTableType());
tableRef.set("temporary", hiveTable.isTemporary());
- ITypedReferenceableInstance tableRefTyped = repository.create(tableRef);
+ ITypedReferenceableInstance tableRefTyped = createInstance(tableRef);
tableInstances.add(tableRefTyped.getId());
@@ -186,7 +218,7 @@
partRef.set("sd", sdStruct);
partRef.set("columns", sdStruct.get("cols"));
partRef.set("parameters", hivePart.getParameters());
- ITypedReferenceableInstance partRefTyped = repository.create(partRef);
+ ITypedReferenceableInstance partRefTyped = createInstance(partRef);
partitionInstances.add(partRefTyped.getId());
}
}
@@ -251,7 +283,7 @@
colRef.set("name", fs.getName());
colRef.set("type", fs.getType());
colRef.set("comment", fs.getComment());
- ITypedReferenceableInstance colRefTyped = repository.create(colRef);
+ ITypedReferenceableInstance colRefTyped = createInstance(colRef);
fieldsList.add(colRefTyped);
columnInstances.add(colRefTyped.getId());
}
......
@@ -23,6 +23,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metadata.MetadataException;
import org.apache.hadoop.metadata.types.*;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
@@ -31,7 +33,9 @@ import java.util.Map;
public class HiveTypeSystem {
- public static final Log LOG = LogFactory.getLog(HiveTypeSystem.class);
+ private static final Logger LOG = LoggerFactory.getLogger(HiveTypeSystem.class);
public static final class Holder {
public static final HiveTypeSystem instance = new HiveTypeSystem();
}
......
@@ -16,13 +16,13 @@
# limitations under the License.
#
- org.apache.hadoop.metadata=INFO, console
- org.apache.hadoop=INFO, console
+ org.apache.hadoop.metadata=DEBUG, console
+ org.apache.hadoop=DEBUG, console
org.apache.hive=INFO, console
org.apache.hcatalog=INFO, console
- metadata.root.logger=INFO,console,DRFA
+ metadata.root.logger=DEBUG,console,DRFA
hive.root.logger=INFO,console,DRFA
hcatalog.root.logger=INFO,console,DRFA
metadata.log.dir=${user.dir}/metadata/logs
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metadata.hivetypes;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.metadata.ITypedReferenceableInstance;
import org.apache.hadoop.metadata.MetadataException;
import org.apache.hadoop.metadata.repository.graph.GraphBackedMetadataRepository;
import org.apache.hadoop.metadata.repository.graph.GraphService;
import org.apache.hadoop.metadata.repository.graph.TitanGraphProvider;
import org.apache.hadoop.metadata.repository.graph.TitanGraphService;
import org.apache.hadoop.metadata.types.TypeSystem;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.List;
@Ignore
public class HiveGraphRepositoryTest {
protected HiveTypeSystem hts;
GraphBackedMetadataRepository repository;
private static final Logger LOG =
LoggerFactory.getLogger(HiveGraphRepositoryTest.class);
@Before
public void setup() throws ConfigurationException, MetadataException {
TypeSystem ts = TypeSystem.getInstance();
GraphService gs = new TitanGraphService(new TitanGraphProvider());
repository = new GraphBackedMetadataRepository(gs);
hts = HiveTypeSystem.getInstance();
}
@Test
public void testHiveImport() throws MetaException, MetadataException, IOException {
HiveImporter hImporter = new HiveImporter(repository, hts, new HiveMetaStoreClient(new HiveConf()));
hImporter.importHiveMetadata();
LOG.info("Defined DB instances");
FileWriter fw = new FileWriter("hiveobjs.txt");
BufferedWriter bw = new BufferedWriter(fw);
List<String> idList =
repository.getEntityList(HiveTypeSystem.DefinedTypes.HIVE_DB.name());
for (String id : idList) {
ITypedReferenceableInstance instance = repository.getEntityDefinition(id);
bw.write(instance.toString());
}
LOG.info("Defined Table instances");
idList =
repository.getEntityList(HiveTypeSystem.DefinedTypes.HIVE_TABLE.name());
for (String id : idList) {
ITypedReferenceableInstance instance = repository.getEntityDefinition(id);
bw.write(instance.toString());
}
LOG.info("Defined Partition instances");
idList =
repository.getEntityList(HiveTypeSystem.DefinedTypes.HIVE_PARTITION.name());
for (String id : idList) {
ITypedReferenceableInstance instance = repository.getEntityDefinition(id);
bw.write(instance.toString());
}
LOG.info("Defined Column instances");
idList =
repository.getEntityList(HiveTypeSystem.DefinedTypes.HIVE_COLUMN.name());
for (String id : idList) {
ITypedReferenceableInstance instance = repository.getEntityDefinition(id);
bw.write(instance.toString());
}
bw.flush();
bw.close();
}
}
@@ -29,14 +29,24 @@ import org.apache.hadoop.metadata.storage.Id;
import org.apache.hadoop.metadata.storage.memory.MemRepository;
import org.apache.hadoop.metadata.types.TypeSystem;
import org.junit.Before;
+ import org.junit.Ignore;
import org.junit.Test;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ import java.io.BufferedWriter;
+ import java.io.FileOutputStream;
+ import java.io.FileWriter;
+ import java.io.IOException;
+ @Ignore
public class HiveTypeSystemTest {
protected MemRepository mr;
protected HiveTypeSystem hts;
- public static final Log LOG = LogFactory.getLog(HiveTypeSystemTest.class);
+ private static final Logger LOG = LoggerFactory.getLogger(HiveTypeSystemTest.class);
@Before
public void setup() throws MetadataException {
@@ -48,30 +58,34 @@ public class HiveTypeSystemTest {
}
@Test
- public void testHiveImport() throws MetaException, MetadataException {
+ public void testHiveImport() throws MetaException, MetadataException, IOException {
- HiveImporter himport = new HiveImporter(mr, hts, new HiveMetaStoreClient(new HiveConf()));
- himport.importHiveMetadata();
+ HiveImporter hImporter = new HiveImporter(mr, hts, new HiveMetaStoreClient(new HiveConf()));
+ hImporter.importHiveMetadata();
LOG.info("Defined DB instances");
- for (Id id : himport.getDBInstances()) {
+ FileWriter fw = new FileWriter("hiveobjs.txt");
+ BufferedWriter bw = new BufferedWriter(fw);
+ for (Id id : hImporter.getDBInstances()) {
ITypedReferenceableInstance instance = mr.get(id);
- LOG.info(instance.toString());
+ bw.write(instance.toString());
}
LOG.info("Defined Table instances");
- for (Id id : himport.getTableInstances()) {
+ for (Id id : hImporter.getTableInstances()) {
ITypedReferenceableInstance instance = mr.get(id);
- LOG.info(instance.toString());
+ bw.write(instance.toString());
}
LOG.info("Defined Partition instances");
- for (Id id : himport.getPartitionInstances()) {
+ for (Id id : hImporter.getPartitionInstances()) {
ITypedReferenceableInstance instance = mr.get(id);
- LOG.info(instance.toString());
+ bw.write(instance.toString());
}
LOG.info("Defined Column instances");
- for (Id id : himport.getColumnInstances()) {
+ for (Id id : hImporter.getColumnInstances()) {
ITypedReferenceableInstance instance = mr.get(id);
- LOG.info(instance.toString());
+ bw.write(instance.toString());
}
+ bw.flush();
+ bw.close();
}
}
\ No newline at end of file
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# GraphService implementation
metadata.graph.impl.class=org.apache.hadoop.metadata.repository.graph.TitanGraphService
# Graph implementation
#metadata.graph.blueprints.graph=com.thinkaurelius.titan.core.TitanFactory
# Graph Storage
metadata.graph.storage.backend=inmemory
# Graph Search Index
#metadata.graph.index.search.backend=elasticsearch
#metadata.graph.index.search.directory=target/data/es
#metadata.graph.index.search.elasticsearch.client-only=false
#metadata.graph.index.search.elasticsearch.local-mode=true
metadata.enableTLS=false
storage.backend=inmemory
# Graph Search Index
index.search.backend=elasticsearch
index.search.directory=target/data/es
index.search.elasticsearch.client-only=false
index.search.elasticsearch.local-mode=true
\ No newline at end of file
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.hadoop.metadata</groupId>
- <artifactId>metadata-bridge-parent</artifactId>
- <version>0.1-incubating-SNAPSHOT</version>
- </parent>
- <artifactId>metadata-bridge-core</artifactId>
- <dependencies>
- <dependency>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-metastore</artifactId>
- <version>0.14.0</version>
- </dependency>
- <dependency>
- <groupId>com.google.code.gson</groupId>
- <artifactId>gson</artifactId>
- <version>2.2.2</version>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.10</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop.metadata</groupId>
- <artifactId>metadata-typesystem</artifactId>
- <version>0.1-incubating-SNAPSHOT</version>
- </dependency>
- </dependencies>
+ <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.hadoop.metadata</groupId>
+ <artifactId>metadata-bridge-parent</artifactId>
+ <version>0.1-incubating-SNAPSHOT</version>
+ </parent>
+ <artifactId>metadata-bridge-core</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-metastore</artifactId>
+ <version>0.14.0</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>2.2.2</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.10</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop.metadata</groupId>
+ <artifactId>metadata-repository</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop.metadata</groupId>
+ <artifactId>metadata-bridge-hive</artifactId>
+ <version>0.1-incubating-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <version>6.1.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.thinkaurelius.titan</groupId>
+ <artifactId>titan-core</artifactId>
+ <version>0.5.2</version>
+ </dependency>
+ </dependencies>
</project>
\ No newline at end of file
@@ -23,18 +23,18 @@ import com.google.common.collect.ImmutableList;
public abstract class ABridge implements IBridge {
- protected ArrayList<Class<AEnitityBean>> typeBeanClasses;
- protected MetadataRepository repo;
+ protected ArrayList<Class<? extends AEnitityBean>> typeBeanClasses = new ArrayList<Class<? extends AEnitityBean>>();
+ MetadataRepository repo;
protected static final Logger LOG = LoggerFactory.getLogger("BridgeLogger");
protected HierarchicalTypeDefinition<ClassType> createClassTypeDef(String name, ImmutableList<String> superTypes, AttributeDefinition... attrDefs) {return new HierarchicalTypeDefinition(ClassType.class, name, superTypes, attrDefs);}
- public ArrayList<Class<AEnitityBean>> getTypeBeanClasses() {
+ public ArrayList<Class<? extends AEnitityBean>> getTypeBeanClasses() {
return typeBeanClasses;
}
@Inject
- ABridge(MetadataRepository repo) {
+ protected ABridge(MetadataRepository repo) {
this.repo = repo;
}
......
package org.apache.hadoop.metadata.bridge;
import org.apache.hadoop.metadata.types.TypeSystem;
public interface IBridge {
boolean defineBridgeTypes(TypeSystem ts);
}
package org.apache.hadoop.metadata.bridge.hivelineage;
import org.apache.hadoop.metadata.MetadataException;
import javax.inject.Inject;
import org.apache.hadoop.metadata.bridge.ABridge;
import org.apache.hadoop.metadata.types.AttributeDefinition;
import org.apache.hadoop.metadata.types.ClassType;
import org.apache.hadoop.metadata.types.HierarchicalTypeDefinition;
import org.apache.hadoop.metadata.types.Multiplicity;
import org.apache.hadoop.metadata.types.TypeSystem;
import org.apache.hadoop.metadata.bridge.hivelineage.hook.HiveLineageBean;
import org.apache.hadoop.metadata.repository.MetadataRepository;
public class HiveLineageBridge extends ABridge {
static final String LINEAGE_CLASS_TYPE = "HiveLineage";
@Override
public boolean defineBridgeTypes(TypeSystem ts) {
try {
HierarchicalTypeDefinition<ClassType> lineageClassTypeDef =
new HierarchicalTypeDefinition<ClassType>(
"ClassType",
LINEAGE_CLASS_TYPE,
null,
new AttributeDefinition[] {
new AttributeDefinition("QUERY_ID", "STRING_TYPE", Multiplicity.REQUIRED, false, null),
new AttributeDefinition("HIVE_ID", "STRING_TYPE", Multiplicity.REQUIRED, false, null),
new AttributeDefinition("USER", "STRING_TYPE", Multiplicity.REQUIRED, false, null),
new AttributeDefinition("QUERY_START_TIME", "STRING_TYPE", Multiplicity.REQUIRED, false, null),
new AttributeDefinition("QUERY_END_TIME", "STRING_TYPE", Multiplicity.REQUIRED, false, null),
new AttributeDefinition("QUERY", "STRING_TYPE", Multiplicity.REQUIRED, false, null),
new AttributeDefinition("TABLE_NAME", "STRING_TYPE", Multiplicity.REQUIRED, false, null),
new AttributeDefinition("TABLE_LOCATION", "STRING_TYPE", Multiplicity.REQUIRED, false, null),
new AttributeDefinition("SUCCESS", "BOOLEAN_TYPE", Multiplicity.REQUIRED, false, null),
new AttributeDefinition("FAILED", "BOOLEAN_TYPE", Multiplicity.REQUIRED, false, null),
new AttributeDefinition("EXECUTION_ENGINE", "STRING_TYPE", Multiplicity.REQUIRED, false, null)
});
// TODO - assess these
/*
* Not sure what to do with these attributes - wouldn't tables and columns be linked to
* Hive Structure instances?
*
ArrayList<SourceTables> sourceTables;
ArrayList<QueryColumns> queryColumns;
ArrayList<WhereClause> whereClause;
ArrayList<CreateColumns> createColumns;
ArrayList<GroupBy> groupBy;
ArrayList<GroupBy> orderBy;*/
ts.defineClassType(lineageClassTypeDef);
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (MetadataException e) {
e.printStackTrace();
}
return false;
}
@Inject
public HiveLineageBridge(MetadataRepository mr) {
super(mr);
this.typeBeanClasses.add(HiveLineageBean.class);
}
}
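A hedged wiring sketch (assumed setup, not part of this commit): the bridge is built around a MetadataRepository, registers its class type once, and then persists lineage beans through the create() it inherits from ABridge, exactly as TestHiveLineageBridge below does. Note that defineBridgeTypes returns false even when defineClassType succeeds, so callers should not read its return value as a success flag.

// Sketch: wiring the bridge by hand (Guice normally injects the repository).
HiveLineageBridge bridge = new HiveLineageBridge(metadataRepository); // metadataRepository: assumed available
bridge.defineBridgeTypes(TypeSystem.getInstance()); // registers the HiveLineage class type
String guid = bridge.create(hiveLineageBean); // create()/get()/list() are inherited from ABridge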
package org.apache.hadoop.metadata.web.resources;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.Writer;
import java.util.ArrayList;
import java.util.List;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import org.apache.hadoop.metadata.bridge.hivelineage.HiveLineageBridge;
import org.apache.hadoop.metadata.bridge.hivelineage.hook.HiveLineageBean;
import org.apache.hadoop.metadata.storage.RepositoryException;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
@Path("bridge/hive")
@Singleton
public class HiveLineageResource {
private final HiveLineageBridge bridge;
@Inject
public HiveLineageResource(HiveLineageBridge bridge) {
this.bridge = bridge;
}
/*
* @PathParam("entityType") String entityType,
*
* @DefaultValue("0") @QueryParam("offset") Integer offset,
*
* @QueryParam("numResults") Integer resultsPerPage
*/
@GET
@Path("{id}")
@Produces(MediaType.APPLICATION_JSON)
public JsonElement getById(@PathParam("id") String id) throws RepositoryException {
// get the lineage bean
HiveLineageBean hlb = bridge.get(id);
// turn it into a JsonTree & return
return new Gson().toJsonTree(hlb);
}
@GET
@Produces(MediaType.APPLICATION_JSON)
public JsonElement list() throws RepositoryException {
// make a new JsonArray to be returned
JsonArray ja = new JsonArray();
// iterate over each item returned by the hive bridge's list() method
for (String s: bridge.list()) {
// they are GUIDs so make them into JsonPrimitives
ja.add(new JsonPrimitive(s));
}
return ja;
}
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public JsonElement addLineage(@Context HttpServletRequest request) throws IOException, RepositoryException {
// create a reader
Reader reader = new InputStreamReader(request.getInputStream());
try {
// deserialize
HiveLineageBean bean = new Gson().fromJson(reader, HiveLineageBean.class);
String id = bridge.create(bean);
JsonObject jo = new JsonObject();
jo.addProperty("id", id);
return jo;
} finally {
// be a good citizen
reader.close();
}
}
}
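For reference, the endpoints this resource exposes (paths relative to the web application root; response shapes follow the code above):

GET bridge/hive -> JSON array of known lineage GUIDs
GET bridge/hive/{id} -> the stored HiveLineageBean serialized as JSON
POST bridge/hive -> accepts a HiveLineageBean as JSON, returns {"id": "<guid>"}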
package org.apache.hadoop.metadata.bridge.hivelineage;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import javax.inject.Inject;
import org.apache.commons.collections.IteratorUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.metadata.RepositoryMetadataModule;
import org.apache.hadoop.metadata.bridge.hivelineage.hook.HiveLineageBean;
import org.apache.hadoop.metadata.repository.MetadataRepository;
import org.apache.hadoop.metadata.storage.RepositoryException;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import com.google.gson.Gson;
@Test(enabled = false)
@Guice(modules = RepositoryMetadataModule.class)
public class TestHiveLineageBridge {
@Inject
MetadataRepository repo;
HiveLineageBridge bridge;
HiveLineageBean hlb;
// the id of one.json in the repo (test #1)
String oneId;
@BeforeClass
public void bootstrap() throws IOException {
// this is used in lieu of DI for now
bridge = new HiveLineageBridge(repo);
// create a hive lineage bean
FileInputStream fis = new FileInputStream("one.json");
List<String> lines = IOUtils.readLines(fis);
String json = StringUtils.join(lines, "");
hlb = new Gson().fromJson(json, HiveLineageBean.class);
}
@Test(priority = 1, enabled = false)
public void testCreate() throws RepositoryException {
// add the lineage bean to the repo
oneId = bridge.create(hlb);
// make sure this actually worked
Assert.assertNotNull(oneId);
}
@Test(priority = 2, enabled = false)
public void testGet() throws RepositoryException, IOException {
HiveLineageBean bean = bridge.get(oneId);
Assert.assertEquals(hlb, bean);
}
@Test(priority = 3, enabled = false)
public void testList() throws RepositoryException {
List<String> list = IteratorUtils.toList(bridge.list().iterator());
Assert.assertEquals(list.size(), 1);
Assert.assertEquals(list.get(0), oneId);
}
}
storage.backend=inmemory
# Graph Search Index
index.search.backend=elasticsearch
index.search.directory=target/data/es
index.search.elasticsearch.client-only=false
index.search.elasticsearch.local-mode=true
\ No newline at end of file
{"queryId":"a760104_20150106120303_036186d5-a991-4dfc-9ff2-05b072c7e711","hiveId":"90797386-3933-4ab0-ae68-a7baa7e155d4","user":"","queryStartTime":"1420563838114","queryEndTime":"1420563853806","query":"create table nyse_gss_count_dump as select count(nyse.stock_symbol) stock_symbol_count, stock_symbol from nyse_stocks nyse where (nyse.stock_symbol \u003d \u0027AET\u0027 or nyse.stock_symbol \u003d \u0027UNH\u0027 ) and nyse.stock_symbol \u003d \u0027T\u0027 GROUP by stock_symbol","tableName":"nyse_gss_count_dump","success":true,"failed":false,"executionEngine":"tez","sourceTables":[{"tableName":"nyse_stocks","tableAlias":"nyse"}],"queryColumns":[{"tbAliasOrName":"nyse","columnName":"stock_symbol","columnAlias":"stock_symbol_count","columnFunction":"count"},{"columnName":"stock_symbol"}],"whereClause":[{"tbAliasOrName":"nyse","columnName":"stock_symbol","columnOperator":"\u003d","columnValue":"\u0027AET\u0027"},{"tbAliasOrName":"nyse","columnName":"stock_symbol","columnOperator":"\u003d","columnValue":"\u0027UNH\u0027"},{"tbAliasOrName":"nyse","columnName":"stock_symbol","columnOperator":"\u003d","columnValue":"\u0027T\u0027"}],"groupBy":[{"columnName":"stock_symbol"}]}
\ No newline at end of file
{"queryId":"a760104_20150108124747_53cb7716-8756-4dfe-b746-4055f53e2895","hiveId":"1aebd95c-c7d5-4893-8c8c-c9ae098bdd5c","user":"","queryStartTime":"1420739257453","queryEndTime":"1420739277589","query":"create table nyse_gss_count_dump as select count(nyse.stock_symbol) stock_symbol_count, stock_symbol from nyse_stocks nyse where (nyse.stock_symbol \u003d \u0027AET\u0027 or nyse.stock_symbol \u003d \u0027UNH\u0027 ) and nyse.stock_symbol \u003d \u0027T\u0027 GROUP by stock_symbol","tableName":"nyse_gss_count_dump","success":true,"failed":false,"executionEngine":"tez","sourceTables":[{"tableName":"nyse_stocks","tableAlias":"nyse"}],"queryColumns":[{"tbAliasOrName":"nyse","columnName":"stock_symbol","columnAlias":"stock_symbol_count","columnFunction":"count"},{"columnName":"stock_symbol"}],"whereClause":[{"tbAliasOrName":"nyse","columnName":"stock_symbol","columnOperator":"\u003d","columnValue":"\u0027AET\u0027"},{"tbAliasOrName":"nyse","columnName":"stock_symbol","columnOperator":"\u003d","columnValue":"\u0027UNH\u0027"},{"tbAliasOrName":"nyse","columnName":"stock_symbol","columnOperator":"\u003d","columnValue":"\u0027T\u0027"}],"groupBy":[{"columnName":"stock_symbol"}]}
\ No newline at end of file
@@ -16,16 +16,19 @@
<groupId>org.apache.hive</groupId>
<artifactId>hive-common</artifactId>
<version>0.13.1</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.4.0</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>0.13.1</version>
+ <scope>provided</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
@@ -4,7 +4,9 @@ import java.io.Serializable;
import java.util.List;
import java.util.ArrayList;
- public class HiveLineageBean implements Serializable {
+ import org.apache.hadoop.metadata.bridge.AEnitityBean;
+
+ public class HiveLineageBean extends AEnitityBean implements Serializable {
/**
*
......
@@ -83,7 +83,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
private final TypeSystem typeSystem;
@Inject
- GraphBackedMetadataRepository(GraphService graphService) throws MetadataException {
+ public GraphBackedMetadataRepository(GraphService graphService) throws MetadataException {
this.graphService = graphService;
this.typeSystem = TypeSystem.getInstance();
}
......
package org.apache.hadoop.metadata.repository.graph;
import com.thinkaurelius.titan.core.TitanGraph;
public class GraphServiceConfigurator extends PropertyBasedConfigurator<GraphService> {
private static final String PROPERTY_NAME = "metadata.graph.impl.class";
- private static final String DEFAULT_IMPL_CLASS = TitanGraph.class.getName();
+ private static final String DEFAULT_IMPL_CLASS = TitanGraphService.class.getName();
private static final String CONFIG_PATH = "application.properties";
public GraphServiceConfigurator() {
......
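The point of the GraphServiceConfigurator change above: TitanGraph is Titan's graph interface and does not implement GraphService, so the old default could never satisfy PropertyBasedConfigurator<GraphService>; the default now names TitanGraphService, the GraphService implementation shown next.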
@@ -66,7 +66,7 @@ public class TitanGraphService implements GraphService {
* @throws ConfigurationException
*/
@Inject
- TitanGraphService(GraphProvider<TitanGraph> graph) throws ConfigurationException {
+ public TitanGraphService(GraphProvider<TitanGraph> graph) throws ConfigurationException {
// TODO reimplement to save the Provider and initialize the graph inside the start() method
this.titanGraph = graph.get();
//start();
......
@@ -240,13 +240,13 @@
</javacArgs>
<!-- The following plugin is required to use quasiquotes in Scala 2.10 and is used
by Spark SQL for code generation. -->
- <compilerPlugins>
+ <!--<compilerPlugins>
<compilerPlugin>
<groupId>org.scalamacros</groupId>
<artifactId>paradise_${scala.version}</artifactId>
<version>${scala.macros.version}</version>
</compilerPlugin>
- </compilerPlugins>
+ </compilerPlugins>-->
</configuration>
</plugin>
</plugins>
......
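The StructInstance hunks below all repair the same latent bug: each format string has three %s placeholders, but the old calls supplied only two arguments, so every type-mismatch path would die with a java.util.MissingFormatArgumentException instead of the intended MetadataException; the third argument (the expected type's name) is now passed. Separately, the two INT_TYPE checks are relaxed so the int accessors also accept enum-typed attributes, which this type system evidently carries as ints.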
@@ -227,7 +227,7 @@ public class StructInstance implements ITypedStruct {
if ( i.dataType() != DataTypes.BOOLEAN_TYPE ) {
throw new MetadataException(String.format("Field %s for Struct %s is not a %s, call generic get method",
- attrName, getTypeName()));
+ attrName, getTypeName(), DataTypes.BOOLEAN_TYPE.getName()));
}
int pos = fieldMapping.fieldPos.get(attrName);
@@ -248,7 +248,7 @@ public class StructInstance implements ITypedStruct {
if ( i.dataType() != DataTypes.BYTE_TYPE ) {
throw new MetadataException(String.format("Field %s for Struct %s is not a %s, call generic get method",
- attrName, getTypeName()));
+ attrName, getTypeName(), DataTypes.BYTE_TYPE.getName()));
}
int pos = fieldMapping.fieldPos.get(attrName);
@@ -269,7 +269,7 @@ public class StructInstance implements ITypedStruct {
if ( i.dataType() != DataTypes.SHORT_TYPE ) {
throw new MetadataException(String.format("Field %s for Struct %s is not a %s, call generic get method",
- attrName, getTypeName()));
+ attrName, getTypeName(), DataTypes.SHORT_TYPE.getName()));
}
int pos = fieldMapping.fieldPos.get(attrName);
@@ -288,9 +288,10 @@
throw new MetadataException(String.format("Unknown field %s for Struct %s", attrName, getTypeName()));
}
- if ( i.dataType() != DataTypes.INT_TYPE ) {
+ if ( i.dataType() != DataTypes.INT_TYPE && !(i.dataType() instanceof EnumType)) {
throw new MetadataException(String.format("Field %s for Struct %s is not a %s, call generic get method",
- attrName, getTypeName()));
+ attrName, getTypeName(), DataTypes.INT_TYPE.getName()));
}
int pos = fieldMapping.fieldPos.get(attrName);
@@ -311,7 +312,7 @@
if ( i.dataType() != DataTypes.LONG_TYPE ) {
throw new MetadataException(String.format("Field %s for Struct %s is not a %s, call generic get method",
- attrName, getTypeName()));
+ attrName, getTypeName(), DataTypes.LONG_TYPE.getName()));
}
int pos = fieldMapping.fieldPos.get(attrName);
@@ -332,7 +333,7 @@
if ( i.dataType() != DataTypes.FLOAT_TYPE ) {
throw new MetadataException(String.format("Field %s for Struct %s is not a %s, call generic get method",
- attrName, getTypeName()));
+ attrName, getTypeName(), DataTypes.FLOAT_TYPE.getName()));
}
int pos = fieldMapping.fieldPos.get(attrName);
@@ -353,7 +354,7 @@
if ( i.dataType() != DataTypes.DOUBLE_TYPE ) {
throw new MetadataException(String.format("Field %s for Struct %s is not a %s, call generic get method",
- attrName, getTypeName()));
+ attrName, getTypeName(), DataTypes.DOUBLE_TYPE.getName()));
}
int pos = fieldMapping.fieldPos.get(attrName);
@@ -374,7 +375,7 @@
if ( i.dataType() != DataTypes.BIGINTEGER_TYPE ) {
throw new MetadataException(String.format("Field %s for Struct %s is not a %s, call generic get method",
- attrName, getTypeName()));
+ attrName, getTypeName(), DataTypes.BIGINTEGER_TYPE.getName()));
}
int pos = fieldMapping.fieldPos.get(attrName);
@@ -395,7 +396,7 @@
if ( i.dataType() != DataTypes.BIGDECIMAL_TYPE ) {
throw new MetadataException(String.format("Field %s for Struct %s is not a %s, call generic get method",
- attrName, getTypeName()));
+ attrName, getTypeName(), DataTypes.BIGDECIMAL_TYPE.getName()));
}
int pos = fieldMapping.fieldPos.get(attrName);
@@ -416,7 +417,7 @@
if ( i.dataType() != DataTypes.DATE_TYPE ) {
throw new MetadataException(String.format("Field %s for Struct %s is not a %s, call generic get method",
- attrName, getTypeName()));
+ attrName, getTypeName(), DataTypes.DATE_TYPE.getName()));
}
int pos = fieldMapping.fieldPos.get(attrName);
@@ -437,7 +438,7 @@
if ( i.dataType() != DataTypes.STRING_TYPE ) {
throw new MetadataException(String.format("Field %s for Struct %s is not a %s, call generic get method",
- attrName, getTypeName()));
+ attrName, getTypeName(), DataTypes.STRING_TYPE.getName()));
}
int pos = fieldMapping.fieldPos.get(attrName);
@@ -458,7 +459,7 @@
if ( i.dataType() != DataTypes.BOOLEAN_TYPE ) {
throw new MetadataException(String.format("Field %s for Struct %s is not a %s, call generic set method",
- attrName, getTypeName()));
+ attrName, getTypeName(), DataTypes.BOOLEAN_TYPE.getName()));
}
int pos = fieldMapping.fieldPos.get(attrName);
@@ -476,7 +477,7 @@
if ( i.dataType() != DataTypes.BYTE_TYPE ) {
throw new MetadataException(String.format("Field %s for Struct %s is not a %s, call generic set method",
- attrName, getTypeName()));
+ attrName, getTypeName(), DataTypes.BYTE_TYPE.getName()));
}
int pos = fieldMapping.fieldPos.get(attrName);
@@ -494,7 +495,7 @@
if ( i.dataType() != DataTypes.SHORT_TYPE ) {
throw new MetadataException(String.format("Field %s for Struct %s is not a %s, call generic set method",
- attrName, getTypeName()));
+ attrName, getTypeName(), DataTypes.SHORT_TYPE.getName()));
}
int pos = fieldMapping.fieldPos.get(attrName);
@@ -510,9 +511,9 @@
throw new MetadataException(String.format("Unknown field %s for Struct %s", attrName, getTypeName()));
}
- if ( i.dataType() != DataTypes.INT_TYPE ) {
+ if ( i.dataType() != DataTypes.INT_TYPE && !(i.dataType() instanceof EnumType)) {
throw new MetadataException(String.format("Field %s for Struct %s is not a %s, call generic set method",
- attrName, getTypeName()));
+ attrName, getTypeName(), DataTypes.INT_TYPE.getName()));
}
int pos = fieldMapping.fieldPos.get(attrName);
@@ -530,7 +531,7 @@
if ( i.dataType() != DataTypes.LONG_TYPE ) {
throw new MetadataException(String.format("Field %s for Struct %s is not a %s, call generic set method",
- attrName, getTypeName()));
+ attrName, getTypeName(), DataTypes.LONG_TYPE.getName()));
}
int pos = fieldMapping.fieldPos.get(attrName);
@@ -548,7 +549,7 @@
if ( i.dataType() != DataTypes.FLOAT_TYPE ) {
throw new MetadataException(String.format("Field %s for Struct %s is not a %s, call generic set method",
- attrName, getTypeName()));
+ attrName, getTypeName(), DataTypes.FLOAT_TYPE.getName()));
}
int pos = fieldMapping.fieldPos.get(attrName);
@@ -566,7 +567,7 @@
if (i.dataType() != DataTypes.DOUBLE_TYPE) {
throw new MetadataException(String.format("Field %s for Struct %s is not a %s, call generic set method",
- attrName, getTypeName()));
+ attrName, getTypeName(), DataTypes.DOUBLE_TYPE.getName()));
}
int pos = fieldMapping.fieldPos.get(attrName);
@@ -584,7 +585,7 @@
if ( i.dataType() != DataTypes.BIGINTEGER_TYPE ) {
throw new MetadataException(String.format("Field %s for Struct %s is not a %s, call generic set method",
- attrName, getTypeName()));
+ attrName, getTypeName(), DataTypes.BIGINTEGER_TYPE.getName()));
}
int pos = fieldMapping.fieldPos.get(attrName);
@@ -602,7 +603,7 @@
if ( i.dataType() != DataTypes.BIGDECIMAL_TYPE ) {
throw new MetadataException(String.format("Field %s for Struct %s is not a %s, call generic set method",
- attrName, getTypeName()));
+ attrName, getTypeName(), DataTypes.BIGDECIMAL_TYPE.getName()));
}
int pos = fieldMapping.fieldPos.get(attrName);
@@ -620,7 +621,7 @@
if ( i.dataType() != DataTypes.DATE_TYPE ) {
throw new MetadataException(String.format("Field %s for Struct %s is not a %s, call generic set method",
- attrName, getTypeName()));
+ attrName, getTypeName(), DataTypes.DATE_TYPE.getName()));
}
int pos = fieldMapping.fieldPos.get(attrName);
@@ -638,7 +639,7 @@
if ( i.dataType() != DataTypes.STRING_TYPE ) {
throw new MetadataException(String.format("Field %s for Struct %s is not a %s, call generic set method",
- attrName, getTypeName()));
+ attrName, getTypeName(), DataTypes.STRING_TYPE.getName()));
}
int pos = fieldMapping.fieldPos.get(attrName);
......