Commit 1089e036 by Venkat Ranganathan

Fix Hive Importer to work with Graph repository and needed refactoring. Also fix enum handling in the repository
parent 0e9af017
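Two things happen below. First, HiveImporter gains a graph-backed path: a new constructor accepts the graph MetadataRepository, and a createInstance helper routes every entity write to whichever store was supplied. Second, enum-valued attributes such as a database's ownerType are now stored using the Thrift enum's integer value instead of its Java name, which is the repository-side enum fix the message refers to. The enum half in isolation (a minimal, hypothetical demo, not part of the commit; PrincipalType is the Thrift-generated Hive metastore enum, numbered USER = 1 in hive_metastore.thrift):

    import org.apache.hadoop.hive.metastore.api.PrincipalType;

    public class OwnerTypeDemo {                       // hypothetical demo class
        public static void main(String[] args) {
            PrincipalType owner = PrincipalType.USER;
            System.out.println(owner.toString());      // "USER" - the name the old code stored
            System.out.println(owner.getValue());      // 1     - the Thrift value stored after this fix
        }
    }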
@@ -55,6 +55,10 @@
             <artifactId>metadata-typesystem</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop.metadata</groupId>
+            <artifactId>metadata-repository</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
         </dependency>
......
@@ -18,12 +18,10 @@
 package org.apache.hadoop.metadata.hivetypes;
 ;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.hadoop.metadata.*;
+import org.apache.hadoop.metadata.repository.MetadataRepository;
 import org.apache.hadoop.metadata.storage.IRepository;
 import org.apache.hadoop.metadata.storage.Id;
 import org.apache.hadoop.metadata.storage.RepositoryException;
@@ -32,18 +30,23 @@ import org.apache.hadoop.metadata.types.Multiplicity;
 import org.apache.hadoop.metadata.types.StructType;
 import org.apache.hadoop.metadata.types.TypeSystem;
 import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.List;
 ;
 public class HiveImporter {
     private final HiveMetaStoreClient hiveMetastoreClient;
-    public static final Log LOG = LogFactory.getLog(HiveImporter.class);
+    private static final Logger LOG =
+            LoggerFactory.getLogger(HiveImporter.class);
     private TypeSystem typeSystem;
     private IRepository repository;
+    private MetadataRepository graphRepository;
     private HiveTypeSystem hiveTypeSystem;
     private List<Id> dbInstances;
@@ -51,27 +54,41 @@ public class HiveImporter {
     private List<Id> partitionInstances;
     private List<Id> columnInstances;
+    public HiveImporter(MetadataRepository repo, HiveTypeSystem hts, HiveMetaStoreClient hmc) throws RepositoryException {
+        this(hts, hmc);
+        if (repo == null) {
+            LOG.error("repository is null");
+            throw new RuntimeException("repository is null");
+        }
+        this.graphRepository = repo;
+    }
     public HiveImporter(IRepository repo, HiveTypeSystem hts, HiveMetaStoreClient hmc) throws RepositoryException {
-        this.repository = repo;
-        this.hiveMetastoreClient = hmc;
-        this.hiveTypeSystem = hts;
-        typeSystem = TypeSystem.getInstance();
-        dbInstances = new ArrayList<>();
-        tableInstances = new ArrayList<>();
-        partitionInstances = new ArrayList<>();
-        columnInstances = new ArrayList<>();
+        this(hts, hmc);
-        if (repository == null) {
+        if (repo == null) {
             LOG.error("repository is null");
             throw new RuntimeException("repository is null");
         }
+        repository = repo;
         repository.defineTypes(hts.getHierarchicalTypeDefinitions());
     }
+    private HiveImporter(HiveTypeSystem hts, HiveMetaStoreClient hmc) {
+        this.hiveMetastoreClient = hmc;
+        this.hiveTypeSystem = hts;
+        typeSystem = TypeSystem.getInstance();
+        dbInstances = new ArrayList<>();
+        tableInstances = new ArrayList<>();
+        partitionInstances = new ArrayList<>();
+        columnInstances = new ArrayList<>();
+    }
     public List<Id> getDBInstances() {
         return dbInstances;
@@ -102,6 +119,21 @@ public class HiveImporter {
         }
     }
+    private ITypedReferenceableInstance createInstance(Referenceable ref)
+            throws MetadataException {
+        if (repository != null) {
+            return repository.create(ref);
+        } else {
+            String typeName = ref.getTypeName();
+            IDataType dataType = hiveTypeSystem.getDataType(typeName);
+            LOG.debug("creating instance of type " + typeName + " dataType " + dataType);
+            ITypedReferenceableInstance instance =
+                    (ITypedReferenceableInstance) dataType.convert(ref, Multiplicity.OPTIONAL);
+            graphRepository.createEntity(instance, typeName);
+            return instance;
+        }
+    }
     private void importDatabase(String db) throws MetadataException {
         try {
             LOG.info("Importing objects from database : " + db);
@@ -113,8 +145,8 @@
             dbRef.set("locationUri", hiveDB.getLocationUri());
             dbRef.set("parameters", hiveDB.getParameters());
             dbRef.set("ownerName", hiveDB.getOwnerName());
-            dbRef.set("ownerType", hiveDB.getOwnerType().toString());
-            ITypedReferenceableInstance dbRefTyped = repository.create(dbRef);
+            dbRef.set("ownerType", hiveDB.getOwnerType().getValue());
+            ITypedReferenceableInstance dbRefTyped = createInstance(dbRef);
             dbInstances.add(dbRefTyped.getId());
             importTables(db, dbRefTyped);
         } catch (NoSuchObjectException nsoe) {
@@ -153,7 +185,7 @@
                 colRef.set("name", fs.getName());
                 colRef.set("type", fs.getType());
                 colRef.set("comment", fs.getComment());
-                ITypedReferenceableInstance colRefTyped = repository.create(colRef);
+                ITypedReferenceableInstance colRefTyped = createInstance(colRef);
                 partKeys.add(colRefTyped);
             }
             tableRef.set("partitionKeys", partKeys);
@@ -168,7 +200,7 @@
             tableRef.set("tableType", hiveTable.getTableType());
             tableRef.set("temporary", hiveTable.isTemporary());
-            ITypedReferenceableInstance tableRefTyped = repository.create(tableRef);
+            ITypedReferenceableInstance tableRefTyped = createInstance(tableRef);
             tableInstances.add(tableRefTyped.getId());
@@ -186,7 +218,7 @@
                 partRef.set("sd", sdStruct);
                 partRef.set("columns", sdStruct.get("cols"));
                 partRef.set("parameters", hivePart.getParameters());
-                ITypedReferenceableInstance partRefTyped = repository.create(partRef);
+                ITypedReferenceableInstance partRefTyped = createInstance(partRef);
                 partitionInstances.add(partRefTyped.getId());
             }
         }
@@ -251,7 +283,7 @@
             colRef.set("name", fs.getName());
             colRef.set("type", fs.getType());
             colRef.set("comment", fs.getComment());
-            ITypedReferenceableInstance colRefTyped = repository.create(colRef);
+            ITypedReferenceableInstance colRefTyped = createInstance(colRef);
             fieldsList.add(colRefTyped);
             columnInstances.add(colRefTyped.getId());
         }
......
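With the overloaded constructors above, the storage backend is picked by the static type of the first constructor argument (MetadataRepository for the graph store, IRepository for the in-memory one), and everything downstream funnels through createInstance. A sketch of driving the new graph path end to end, mirroring the wiring in HiveGraphRepositoryTest further down (hypothetical demo class, not part of the commit; assumes a reachable Hive metastore configured via HiveConf):

    package org.apache.hadoop.metadata.hivetypes;   // hypothetical demo file

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
    import org.apache.hadoop.metadata.repository.MetadataRepository;
    import org.apache.hadoop.metadata.repository.graph.GraphBackedMetadataRepository;
    import org.apache.hadoop.metadata.repository.graph.TitanGraphProvider;
    import org.apache.hadoop.metadata.repository.graph.TitanGraphService;

    public class GraphImportDemo {
        public static void main(String[] args) throws Exception {
            // Static type MetadataRepository selects the new graph-backed constructor.
            MetadataRepository repo = new GraphBackedMetadataRepository(
                    new TitanGraphService(new TitanGraphProvider()));
            HiveImporter importer = new HiveImporter(repo,
                    HiveTypeSystem.getInstance(), new HiveMetaStoreClient(new HiveConf()));
            importer.importHiveMetadata();  // DBs, tables, partitions, columns become graph entities
        }
    }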
@@ -23,6 +23,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.metadata.MetadataException;
 import org.apache.hadoop.metadata.types.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -31,7 +33,9 @@ import java.util.Map;
 public class HiveTypeSystem {
-    public static final Log LOG = LogFactory.getLog(HiveTypeSystem.class);
+    private static final Logger LOG =
+            LoggerFactory.getLogger(HiveTypeSystem.class);
     public static final class Holder {
         public static final HiveTypeSystem instance = new HiveTypeSystem();
     }
......
@@ -16,13 +16,13 @@
 # limitations under the License.
 #
-org.apache.hadoop.metadata=INFO, console
-org.apache.hadoop=INFO, console
+org.apache.hadoop.metadata=DEBUG, console
+org.apache.hadoop=DEBUG, console
 org.apache.hive=INFO, console
 org.apache.hcatalog=INFO, console
-metadata.root.logger=INFO,console,DRFA
+metadata.root.logger=DEBUG,console,DRFA
 hive.root.logger=INFO,console,DRFA
 hcatalog.root.logger=INFO,console,DRFA
 metadata.log.dir=${user.dir}/metadata/logs
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metadata.hivetypes;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.metadata.ITypedReferenceableInstance;
import org.apache.hadoop.metadata.MetadataException;
import org.apache.hadoop.metadata.repository.graph.GraphBackedMetadataRepository;
import org.apache.hadoop.metadata.repository.graph.GraphService;
import org.apache.hadoop.metadata.repository.graph.TitanGraphProvider;
import org.apache.hadoop.metadata.repository.graph.TitanGraphService;
import org.apache.hadoop.metadata.types.TypeSystem;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.List;
public class HiveGraphRepositoryTest {

    protected HiveTypeSystem hts;
    GraphBackedMetadataRepository repository;

    private static final Logger LOG =
            LoggerFactory.getLogger(HiveGraphRepositoryTest.class);

    @Before
    public void setup() throws ConfigurationException, MetadataException {
        TypeSystem ts = TypeSystem.getInstance();
        GraphService gs = new TitanGraphService(new TitanGraphProvider());
        repository = new GraphBackedMetadataRepository(gs);
        hts = HiveTypeSystem.getInstance();
    }

    @Test
    public void testHiveImport() throws MetaException, MetadataException, IOException {
        HiveImporter hImporter = new HiveImporter(repository, hts, new HiveMetaStoreClient(new HiveConf()));
        hImporter.importHiveMetadata();
        LOG.info("Defined DB instances");
        FileWriter fw = new FileWriter("hiveobjs.txt");
        BufferedWriter bw = new BufferedWriter(fw);
        List<String> idList =
                repository.getEntityList(HiveTypeSystem.DefinedTypes.HIVE_DB.name());
        for (String id : idList) {
            ITypedReferenceableInstance instance = repository.getEntityDefinition(id);
            bw.write(instance.toString());
        }
        LOG.info("Defined Table instances");
        idList =
                repository.getEntityList(HiveTypeSystem.DefinedTypes.HIVE_TABLE.name());
        for (String id : idList) {
            ITypedReferenceableInstance instance = repository.getEntityDefinition(id);
            bw.write(instance.toString());
        }
        LOG.info("Defined Partition instances");
        idList =
                repository.getEntityList(HiveTypeSystem.DefinedTypes.HIVE_PARTITION.name());
        for (String id : idList) {
            ITypedReferenceableInstance instance = repository.getEntityDefinition(id);
            bw.write(instance.toString());
        }
        LOG.info("Defined Column instances");
        idList =
                repository.getEntityList(HiveTypeSystem.DefinedTypes.HIVE_COLUMN.name());
        for (String id : idList) {
            ITypedReferenceableInstance instance = repository.getEntityDefinition(id);
            bw.write(instance.toString());
        }
        bw.flush();
        bw.close();
    }
}
@@ -30,13 +30,22 @@ import org.apache.hadoop.metadata.storage.memory.MemRepository;
 import org.apache.hadoop.metadata.types.TypeSystem;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
 public class HiveTypeSystemTest {
     protected MemRepository mr;
     protected HiveTypeSystem hts;
-    public static final Log LOG = LogFactory.getLog(HiveTypeSystemTest.class);
+    private static final Logger LOG =
+            LoggerFactory.getLogger(HiveTypeSystemTest.class);
     @Before
     public void setup() throws MetadataException {
@@ -48,30 +57,34 @@ public class HiveTypeSystemTest {
     }
     @Test
-    public void testHiveImport() throws MetaException, MetadataException {
+    public void testHiveImport() throws MetaException, MetadataException, IOException {
-        HiveImporter himport = new HiveImporter(mr, hts, new HiveMetaStoreClient(new HiveConf()));
-        himport.importHiveMetadata();
+        HiveImporter hImporter = new HiveImporter(mr, hts, new HiveMetaStoreClient(new HiveConf()));
+        hImporter.importHiveMetadata();
         LOG.info("Defined DB instances");
-        for (Id id : himport.getDBInstances()) {
+        FileWriter fw = new FileWriter("hiveobjs.txt");
+        BufferedWriter bw = new BufferedWriter(fw);
+        for (Id id : hImporter.getDBInstances()) {
             ITypedReferenceableInstance instance = mr.get(id);
-            LOG.info(instance.toString());
+            bw.write(instance.toString());
         }
         LOG.info("Defined Table instances");
-        for (Id id : himport.getTableInstances()) {
+        for (Id id : hImporter.getTableInstances()) {
             ITypedReferenceableInstance instance = mr.get(id);
-            LOG.info(instance.toString());
+            bw.write(instance.toString());
         }
         LOG.info("Defined Partition instances");
-        for (Id id : himport.getPartitionInstances()) {
+        for (Id id : hImporter.getPartitionInstances()) {
             ITypedReferenceableInstance instance = mr.get(id);
-            LOG.info(instance.toString());
+            bw.write(instance.toString());
         }
         LOG.info("Defined Column instances");
-        for (Id id : himport.getColumnInstances()) {
+        for (Id id : hImporter.getColumnInstances()) {
             ITypedReferenceableInstance instance = mr.get(id);
-            LOG.info(instance.toString());
+            bw.write(instance.toString());
         }
+        bw.flush();
+        bw.close();
     }
 }
\ No newline at end of file
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# GraphService implementation
metadata.graph.impl.class=org.apache.hadoop.metadata.repository.graph.TitanGraphService
# Graph implementation
#metadata.graph.blueprints.graph=com.thinkaurelius.titan.core.TitanFactory
# Graph Storage
metadata.graph.storage.backend=inmemory
# Graph Search Index
#metadata.graph.index.search.backend=elasticsearch
#metadata.graph.index.search.directory=target/data/es
#metadata.graph.index.search.elasticsearch.client-only=false
#metadata.graph.index.search.elasticsearch.local-mode=true
metadata.enableTLS=false
storage.backend=inmemory
# Graph Search Index
index.search.backend=elasticsearch
index.search.directory=target/data/es
index.search.elasticsearch.client-only=false
index.search.elasticsearch.local-mode=true
\ No newline at end of file
@@ -83,7 +83,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
     private final TypeSystem typeSystem;
     @Inject
-    GraphBackedMetadataRepository(GraphService graphService) throws MetadataException {
+    public GraphBackedMetadataRepository(GraphService graphService) throws MetadataException {
         this.graphService = graphService;
         this.typeSystem = TypeSystem.getInstance();
     }
......
@@ -66,7 +66,7 @@ public class TitanGraphService implements GraphService {
      * @throws ConfigurationException
      */
     @Inject
-    TitanGraphService(GraphProvider<TitanGraph> graph) throws ConfigurationException {
+    public TitanGraphService(GraphProvider<TitanGraph> graph) throws ConfigurationException {
         // TODO reimplement to save the Provider and initialize the graph inside the start() method
         this.titanGraph = graph.get();
         //start();
......
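The last two hunks exist to support exactly that kind of direct wiring: both constructors keep @Inject, so Guice-managed injection still works, but widening them from package-private to public lets the new test (and code like the sketch further up) construct the graph stack by hand:

    // Now legal outside the repository packages - this is what HiveGraphRepositoryTest.setup() does:
    GraphService service = new TitanGraphService(new TitanGraphProvider());
    GraphBackedMetadataRepository repository = new GraphBackedMetadataRepository(service);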