Commit 0adfeb18 by Venkat Ranganathan

Fixed Hive model: StorageDescriptor is now a class; removed creation of a duplicate storage descriptor per partition (handling of partitions whose schema diverges still needs fixing). Made the in-memory repository test work again. Validated the changes against TPC-DS. Fixed empty-array handling in the graph repository.
parent b10f93a2
@@ -60,6 +60,26 @@ public class HiveImporter {
private List<Id> partitionInstances;
private List<Id> columnInstances;
+ private class Pair<L, R> {
+ final L left;
+ final R right;
+ public Pair(L left, R right) {
+ this.left = left;
+ this.right = right;
+ }
+ public L left() { return this.left; }
+ public R right() { return this.right; }
+ }
+ private class InstancePair extends Pair<ITypedReferenceableInstance, Referenceable> {
+ public InstancePair(ITypedReferenceableInstance left, Referenceable right) {
+ super(left, right);
+ }
+ }
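Context for the new helpers: depending on the backing store, the importer has to hand back either an ITypedReferenceableInstance (in-memory repository) or a Referenceable carrying a repository-assigned GUID (graph repository), so InstancePair carries both with exactly one side populated. A minimal, self-contained sketch of the same holder idea, using stand-in payload types rather than the Atlas classes:

```java
// Standalone sketch of the Pair/InstancePair holder pattern above.
// String and Integer are stand-ins for the Atlas instance types.
class Holder<L, R> {
    final L left;
    final R right;

    Holder(L left, R right) {
        this.left = left;
        this.right = right;
    }

    L left() { return left; }
    R right() { return right; }
}

public class HolderDemo {
    public static void main(String[] args) {
        // Exactly one side is set, depending on which repository did the create.
        Holder<String, Integer> fromMemRepo = new Holder<>("typed-instance", null);
        Holder<String, Integer> fromGraphRepo = new Holder<>(null, 42);
        System.out.println(fromMemRepo.left());    // typed-instance
        System.out.println(fromGraphRepo.right()); // 42
    }
}
```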
public HiveImporter(MetadataRepository repo, HiveTypeSystem hts, HiveMetaStoreClient hmc) throws RepositoryException {
this(hts, hmc);
@@ -69,6 +89,7 @@ public class HiveImporter {
}
this.graphRepository = repo;
}
public HiveImporter(IRepository repo, HiveTypeSystem hts, HiveMetaStoreClient hmc) throws RepositoryException {
@@ -122,10 +143,15 @@ public class HiveImporter {
}
}
- private Referenceable createInstance(Referenceable ref)
+ private boolean usingMemRepository() {
+ return this.graphRepository == null;
+ }
+ private InstancePair createInstance(Referenceable ref)
throws MetadataException {
- if (repository != null) {
- return (Referenceable) repository.create(ref);
+ if (usingMemRepository()) {
+ return new InstancePair(repository.create(ref), null);
} else {
String typeName = ref.getTypeName();
IDataType dataType = hiveTypeSystem.getDataType(typeName);
@@ -136,7 +162,16 @@ public class HiveImporter {
System.out.println("creating instance of type " + typeName + " dataType " + dataType
+ ", guid: " + guid);
- return new Referenceable(guid, ref.getTypeName(), ref.getValuesMap());
+ return new InstancePair(null, new Referenceable(guid, ref.getTypeName(), ref.getValuesMap()));
}
}
+ private void setReferenceInstanceAttribute(Referenceable ref, String attr,
+ InstancePair instance) {
+ if (usingMemRepository()) {
+ ref.set(attr, instance.left());
+ } else {
+ ref.set(attr, instance.right());
+ }
+ }
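These two methods concentrate the repository split in one place: createInstance returns an InstancePair with one side filled, and setReferenceInstanceAttribute picks the matching side when wiring object references. A hedged sketch of the same dispatch with hypothetical repository interfaces, not the Atlas API:

```java
// Hypothetical condensation of the usingMemRepository() dispatch.
interface MemRepo { String create(String ref); }

interface GraphRepo { String createWithGuid(String ref); }

class DualRepoImporter {
    private final MemRepo memRepo;     // set when running against the in-memory repository
    private final GraphRepo graphRepo; // set when running against the graph repository

    DualRepoImporter(MemRepo memRepo, GraphRepo graphRepo) {
        this.memRepo = memRepo;
        this.graphRepo = graphRepo;
    }

    private boolean usingMemRepository() {
        return graphRepo == null;
    }

    // Mirrors createInstance(): one branch per backing repository.
    String create(String ref) {
        return usingMemRepository() ? memRepo.create(ref) : graphRepo.createWithGuid(ref);
    }
}
```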
@@ -152,15 +187,17 @@ public class HiveImporter {
dbRef.set("parameters", hiveDB.getParameters());
dbRef.set("ownerName", hiveDB.getOwnerName());
dbRef.set("ownerType", hiveDB.getOwnerType().getValue());
- Referenceable dbRefTyped = createInstance(dbRef);
- dbInstances.add(dbRefTyped.getId());
+ InstancePair dbRefTyped = createInstance(dbRef);
+ if (usingMemRepository()) {
+ dbInstances.add(dbRefTyped.left().getId());
+ }
importTables(db, dbRefTyped);
} catch (Exception e) {
throw new MetadataException(e);
}
}
- private void importTables(String db, Referenceable dbRefTyped) throws MetadataException {
+ private void importTables(String db, InstancePair dbRefTyped) throws MetadataException {
try {
List<String> hiveTables = hiveMetastoreClient.getAllTables(db);
@@ -170,7 +207,7 @@ public class HiveImporter {
Table hiveTable = hiveMetastoreClient.getTable(db, table);
Referenceable tableRef = new Referenceable(HiveTypeSystem.DefinedTypes.HIVE_TABLE.name());
- tableRef.set("dbName", dbRefTyped);
+ setReferenceInstanceAttribute(tableRef, "dbName", dbRefTyped);
tableRef.set("tableName", hiveTable.getTableName());
tableRef.set("owner", hiveTable.getOwner());
tableRef.set("createTime", hiveTable.getCreateTime());
@@ -178,10 +215,9 @@ public class HiveImporter {
tableRef.set("retention", hiveTable.getRetention());
StorageDescriptor storageDesc = hiveTable.getSd();
- ITypedStruct sdStruct = fillStorageDescStruct(storageDesc);
- tableRef.set("sd", sdStruct);
- tableRef.set("columns", sdStruct.get("cols"));
- List<Referenceable> partKeys = new ArrayList<>();
+ InstancePair sdRefTyped = fillStorageDescStruct(storageDesc);
+ setReferenceInstanceAttribute(tableRef, "sd", sdRefTyped);
+ List<InstancePair> partKeys = new ArrayList<>();
Referenceable colRef;
if (hiveTable.getPartitionKeysSize() > 0) {
for (FieldSchema fs : hiveTable.getPartitionKeys()) {
@@ -189,10 +225,22 @@ public class HiveImporter {
colRef.set("name", fs.getName());
colRef.set("type", fs.getType());
colRef.set("comment", fs.getComment());
- Referenceable colRefTyped = createInstance(colRef);
+ InstancePair colRefTyped = createInstance(colRef);
partKeys.add(colRefTyped);
}
- tableRef.set("partitionKeys", partKeys);
+ if (usingMemRepository()) {
+ List<ITypedReferenceableInstance> keys = new ArrayList<>();
+ for (InstancePair ip : partKeys) {
+ keys.add(ip.left());
+ }
+ tableRef.set("partitionKeys", keys);
+ } else {
+ List<Referenceable> keys = new ArrayList<>();
+ for (InstancePair ip : partKeys) {
+ keys.add(ip.right());
+ }
+ tableRef.set("partitionKeys", keys);
+ }
}
tableRef.set("parameters", hiveTable.getParameters());
if (hiveTable.isSetViewOriginalText()) {
@@ -204,26 +252,32 @@ public class HiveImporter {
tableRef.set("tableType", hiveTable.getTableType());
tableRef.set("temporary", hiveTable.isTemporary());
- Referenceable tableRefTyped = createInstance(tableRef);
- tableInstances.add(tableRefTyped.getId());
+ InstancePair tableRefTyped = createInstance(tableRef);
+ if (usingMemRepository()) {
+ tableInstances.add(tableRefTyped.left().getId());
+ }
List<Partition> tableParts = hiveMetastoreClient.listPartitions(db, table, Short.MAX_VALUE);
hiveMetastoreClient.listPartitionSpecs(db, table, Short.MAX_VALUE);
if (tableParts.size() > 0) {
for (Partition hivePart : tableParts) {
Referenceable partRef = new Referenceable(HiveTypeSystem.DefinedTypes.HIVE_PARTITION.name());
partRef.set("values", hivePart.getValues());
- partRef.set("dbName", dbRefTyped);
- partRef.set("tableName", tableRefTyped);
+ setReferenceInstanceAttribute(partRef, "dbName", dbRefTyped);
+ setReferenceInstanceAttribute(partRef, "tableName", tableRefTyped);
partRef.set("createTime", hivePart.getCreateTime());
partRef.set("lastAccessTime", hivePart.getLastAccessTime());
- sdStruct = fillStorageDescStruct(hivePart.getSd());
- partRef.set("sd", sdStruct);
- partRef.set("columns", sdStruct.get("cols"));
+ //sdStruct = fillStorageDescStruct(hivePart.getSd());
+ // Instead of creating a copy of the storage descriptor per partition, reuse the
+ // table's existing one; TODO: identify partitions whose schema diverges.
+ setReferenceInstanceAttribute(partRef, "sd", sdRefTyped);
partRef.set("parameters", hivePart.getParameters());
- Referenceable partRefTyped = createInstance(partRef);
- partitionInstances.add(partRefTyped.getId());
+ InstancePair partRefTyped = createInstance(partRef);
+ if (usingMemRepository()) {
+ partitionInstances.add(partRefTyped.left().getId());
+ }
}
}
}
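The partitionKeys block above and the cols block later in fillStorageDescStruct repeat the same unwrap-one-side-of-each-pair loop, once per repository mode. If that duplication grows, a small generic projection helper could collapse it; this is a hypothetical refactor, not code from the commit:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper: project one side out of a list of pairs.
final class Pairs {
    static <P, T> List<T> project(List<P> pairs, Function<P, T> side) {
        List<T> out = new ArrayList<>(pairs.size());
        for (P pair : pairs) {
            out.add(side.apply(pair));
        }
        return out;
    }
}
// Usage against the InstancePair type from this diff would then read:
//   tableRef.set("partitionKeys", usingMemRepository()
//           ? Pairs.project(partKeys, InstancePair::left)
//           : Pairs.project(partKeys, InstancePair::right));
```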
@@ -233,13 +287,12 @@ public class HiveImporter {
}
- private ITypedStruct fillStorageDescStruct(StorageDescriptor storageDesc) throws Exception {
- String storageDescName = HiveTypeSystem.DefinedTypes.HIVE_STORAGEDESC.name();
+ private InstancePair fillStorageDescStruct(StorageDescriptor storageDesc) throws Exception {
+ Referenceable sdRef = new Referenceable(HiveTypeSystem.DefinedTypes.HIVE_STORAGEDESC.name());
SerDeInfo serdeInfo = storageDesc.getSerdeInfo();
// SkewedInfo skewedInfo = storageDesc.getSkewedInfo();
- Struct sdStruct = new Struct(storageDescName);
LOG.debug("Filling storage descriptor information for " + storageDesc);
@@ -254,9 +307,9 @@ public class HiveImporter {
StructType serdeInfotype = (StructType) hiveTypeSystem.getDataType(serdeInfoName);
ITypedStruct serdeInfoStructTyped =
serdeInfotype.convert(serdeInfoStruct, Multiplicity.OPTIONAL);
- sdStruct.set("serdeInfo", serdeInfoStructTyped);
+ sdRef.set("serdeInfo", serdeInfoStructTyped);
// Will need to revisit this after we fix the typesystem.
@@ -276,7 +329,8 @@ public class HiveImporter {
- List<Referenceable> fieldsList = new ArrayList<>();
+ List<InstancePair> fieldsList = new ArrayList<>();
Referenceable colRef;
for (FieldSchema fs : storageDesc.getCols()) {
LOG.debug("Processing field " + fs);
@@ -284,11 +338,25 @@ public class HiveImporter {
colRef.set("name", fs.getName());
colRef.set("type", fs.getType());
colRef.set("comment", fs.getComment());
- Referenceable colRefTyped = createInstance(colRef);
+ InstancePair colRefTyped = createInstance(colRef);
fieldsList.add(colRefTyped);
- columnInstances.add(colRefTyped.getId());
+ if (usingMemRepository()) {
+ columnInstances.add(colRefTyped.left().getId());
+ }
}
- sdStruct.set("cols", fieldsList);
+ if (usingMemRepository()) {
+ List<ITypedReferenceableInstance> flds = new ArrayList<>();
+ for (InstancePair ip : fieldsList) {
+ flds.add(ip.left());
+ }
+ sdRef.set("cols", flds);
+ } else {
+ List<Referenceable> flds = new ArrayList<>();
+ for (InstancePair ip : fieldsList) {
+ flds.add(ip.right());
+ }
+ sdRef.set("cols", flds);
+ }
List<ITypedStruct> sortColsStruct = new ArrayList<>();
@@ -303,19 +371,20 @@ public class HiveImporter {
sortColsStruct.add(sortColTyped);
}
- sdStruct.set("location", storageDesc.getLocation());
- sdStruct.set("inputFormat", storageDesc.getInputFormat());
- sdStruct.set("outputFormat", storageDesc.getOutputFormat());
- sdStruct.set("compressed", storageDesc.isCompressed());
+ sdRef.set("location", storageDesc.getLocation());
+ sdRef.set("inputFormat", storageDesc.getInputFormat());
+ sdRef.set("outputFormat", storageDesc.getOutputFormat());
+ sdRef.set("compressed", storageDesc.isCompressed());
if (storageDesc.getBucketCols().size() > 0) {
- sdStruct.set("bucketCols", storageDesc.getBucketCols());
+ sdRef.set("bucketCols", storageDesc.getBucketCols());
}
if (sortColsStruct.size() > 0) {
- sdStruct.set("sortCols", sortColsStruct);
+ sdRef.set("sortCols", sortColsStruct);
}
- sdStruct.set("parameters", storageDesc.getParameters());
- sdStruct.set("storedAsSubDirectories", storageDesc.isStoredAsSubDirectories());
- StructType storageDesctype = (StructType) hiveTypeSystem.getDataType(storageDescName);
- return storageDesctype.convert(sdStruct, Multiplicity.OPTIONAL);
+ sdRef.set("parameters", storageDesc.getParameters());
+ sdRef.set("storedAsSubDirectories", storageDesc.isStoredAsSubDirectories());
+ InstancePair sdRefTyped = createInstance(sdRef);
+ return sdRefTyped;
}
}
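Net effect of fillStorageDescStruct now ending in createInstance(sdRef): the storage descriptor becomes an entity created once per table, and every partition references that single instance instead of converting a fresh struct (the comment in the partition loop flags diverging partition schemas as a follow-up). A schematic sketch of the sharing, with hypothetical types:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: one storage-descriptor entity shared by a table
// and all of its partitions, rather than one copy per partition.
class SharedSdDemo {
    static class Entity {
        final String guid;
        Entity(String guid) { this.guid = guid; }
    }

    public static void main(String[] args) {
        Entity sd = new Entity("sd-guid-1");               // created once per table
        Entity tableSd = sd;                               // the table references it
        List<Entity> partitionSds = Arrays.asList(sd, sd); // partitions reuse it

        // Every partition points at the same entity, so the repository stores a
        // single HIVE_STORAGEDESC instance instead of one per partition.
        System.out.println(tableSd == partitionSds.get(0)); // true
    }
}
```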
@@ -61,7 +61,6 @@ public class HiveTypeSystem {
// Structs
HIVE_SERDE,
- HIVE_STORAGEDESC,
HIVE_SKEWEDINFO,
HIVE_ORDER,
HIVE_RESOURCEURI,
@@ -69,6 +68,7 @@ public class HiveTypeSystem {
// Classes
HIVE_DB,
+ HIVE_STORAGEDESC,
HIVE_TABLE,
HIVE_COLUMN,
HIVE_PARTITION,
@@ -122,7 +122,7 @@ public class HiveTypeSystem {
//createSkewedInfoStruct();
createOrderStruct();
createResourceUriStruct();
- createStorageDescStruct();
+ createStorageDescClass();
createDBClass();
createTypeClass();
@@ -168,6 +168,7 @@ public class HiveTypeSystem {
if (valid) {
return ImmutableList.of(
(HierarchicalType) typeMap.get(DefinedTypes.HIVE_DB.name()),
+ (HierarchicalType) typeMap.get(DefinedTypes.HIVE_STORAGEDESC.name()),
(HierarchicalType) typeMap.get(DefinedTypes.HIVE_TABLE.name()),
(HierarchicalType) typeMap.get(DefinedTypes.HIVE_COLUMN.name()),
(HierarchicalType) typeMap.get(DefinedTypes.HIVE_PARTITION.name()),
@@ -300,7 +301,7 @@ public class HiveTypeSystem {
- private void createStorageDescStruct() throws MetadataException {
+ private void createStorageDescClass() throws MetadataException {
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition("cols", String.format("array<%s>", DefinedTypes.HIVE_COLUMN.name()), Multiplicity.COLLECTION, false, null),
new AttributeDefinition("location", DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null),
@@ -316,11 +317,10 @@ public class HiveTypeSystem {
new AttributeDefinition("storedAsSubDirectories", DataTypes.BOOLEAN_TYPE.getName(), Multiplicity.OPTIONAL, false, null),
};
- StructTypeDefinition definition =
- new StructTypeDefinition(DefinedTypes.HIVE_STORAGEDESC.name(), attributeDefinitions);
- structTypeDefinitionMap.put(DefinedTypes.HIVE_STORAGEDESC.name(), definition);
+ HierarchicalTypeDefinition<ClassType> definition =
+ new HierarchicalTypeDefinition<>(ClassType.class, DefinedTypes.HIVE_STORAGEDESC.name(),
+ null, attributeDefinitions);
+ classTypeDefinitions.put(DefinedTypes.HIVE_STORAGEDESC.name(), definition);
LOG.debug("Created definition for " + DefinedTypes.HIVE_STORAGEDESC.name());
}
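The before/after shapes from this hunk, pulled out side by side for emphasis; this snippet reuses the identifiers visible in the diff above rather than being a standalone program. A struct type is embedded by value wherever it is used, while a class type yields instances with their own identity that HIVE_TABLE.sd and HIVE_PARTITION.sd can reference:

```java
// Before: struct type -- values are copied into each owning instance.
StructTypeDefinition structDef =
        new StructTypeDefinition(DefinedTypes.HIVE_STORAGEDESC.name(), attributeDefinitions);

// After: class type -- instances carry an id/guid and can be shared by reference.
HierarchicalTypeDefinition<ClassType> classDef =
        new HierarchicalTypeDefinition<>(ClassType.class,
                DefinedTypes.HIVE_STORAGEDESC.name(), null, attributeDefinitions);
```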
@@ -401,8 +401,8 @@ public class HiveTypeSystem {
new AttributeDefinition("createTime", DataTypes.INT_TYPE.getName(), Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("lastAccessTime", DataTypes.INT_TYPE.getName(), Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("sd", DefinedTypes.HIVE_STORAGEDESC.name(), Multiplicity.REQUIRED, false, null),
- new AttributeDefinition("columns", String.format("array<%s>", DefinedTypes.HIVE_COLUMN.name()),
- Multiplicity.COLLECTION, true, null),
+ //new AttributeDefinition("columns", String.format("array<%s>", DefinedTypes.HIVE_COLUMN.name()),
+ // Multiplicity.COLLECTION, true, null),
new AttributeDefinition("parameters", mapStrToStrMap.getName(), Multiplicity.OPTIONAL, false, null),
};
@@ -426,8 +426,8 @@ public class HiveTypeSystem {
new AttributeDefinition("sd", DefinedTypes.HIVE_STORAGEDESC.name(), Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("partitionKeys", String.format("array<%s>", DefinedTypes.HIVE_COLUMN.name()),
Multiplicity.OPTIONAL, false, null),
- new AttributeDefinition("columns", String.format("array<%s>", DefinedTypes.HIVE_COLUMN.name()),
- Multiplicity.COLLECTION, true, null),
+ //new AttributeDefinition("columns", String.format("array<%s>", DefinedTypes.HIVE_COLUMN.name()),
+ // Multiplicity.COLLECTION, true, null),
new AttributeDefinition("parameters", mapStrToStrMap.getName(), Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("viewOriginalText", DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("viewExpandedText", DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null),
......
@@ -126,7 +126,8 @@
<property>
<name>hive.metastore.uris</name>
- <value>thrift://10.10.11.207:9083</value>
+ <!-- <value>thrift://10.10.11.207:9083</value> -->
+ <value>thrift://localhost:9083</value>
</property>
<property>
......
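The hive-site.xml tweak just points the importer's metastore client at localhost. The same override can be made programmatically, which is handy in tests; a minimal sketch using the standard Hive metastore client API (the URI value is an assumption for illustration):

```java
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;

public class MetastoreConnect {
    public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();
        // Equivalent to the hive.metastore.uris property above.
        conf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:9083");
        HiveMetaStoreClient client = new HiveMetaStoreClient(conf);
        System.out.println(client.getAllDatabases());
        client.close();
    }
}
```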
@@ -39,6 +39,7 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.io.BufferedWriter;
+ import java.io.File;
import java.io.FileWriter;
import java.util.List;
@@ -82,7 +83,9 @@ public class HiveGraphRepositoryTest {
HiveImporter hImporter = new HiveImporter(repository, hts, new HiveMetaStoreClient(new HiveConf()));
hImporter.importHiveMetadata();
LOG.info("Defined DB instances");
- FileWriter fw = new FileWriter("hiveobjs.txt");
+ File f = new File("./target/logs/hiveobjs.txt");
+ f.getParentFile().mkdirs();
+ FileWriter fw = new FileWriter(f);
BufferedWriter bw = new BufferedWriter(fw);
List<String> idList =
repository.getEntityList(HiveTypeSystem.DefinedTypes.HIVE_DB.name());
......
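Both tests now write their dump under ./target/logs and create the directory first, so the run no longer drops files at the repo root. One further hardening step worth considering (a suggestion, not part of this commit) is try-with-resources, so the writer is flushed and closed even when an assertion fails mid-loop:

```java
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;

class InstanceDump {
    static void dump(Iterable<String> instances) throws IOException {
        File f = new File("./target/logs/hiveobjs.txt");
        f.getParentFile().mkdirs();
        // try-with-resources closes (and flushes) the writer on every exit path.
        try (BufferedWriter bw = new BufferedWriter(new FileWriter(f))) {
            for (String instance : instances) {
                bw.write(instance);
                bw.newLine();
            }
        }
    }
}
```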
@@ -32,6 +32,7 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.io.BufferedWriter;
+ import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
@@ -59,8 +60,9 @@ public class HiveTypeSystemTest {
HiveImporter hImporter = new HiveImporter(mr, hts, new HiveMetaStoreClient(new HiveConf()));
hImporter.importHiveMetadata();
LOG.info("Defined DB instances");
- FileWriter fw = new FileWriter("hiveobjs.txt");
- BufferedWriter bw = new BufferedWriter(fw);
+ File f = new File("./target/logs/hiveobjs.txt");
+ f.getParentFile().mkdirs();
+ FileWriter fw = new FileWriter(f);
+ BufferedWriter bw = new BufferedWriter(fw);
for (Id id : hImporter.getDBInstances()) {
ITypedReferenceableInstance instance = mr.get(id);
bw.write(instance.toString());
......
@@ -932,7 +932,9 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
LOG.debug("mapping vertex {} to array {}", instanceVertex, attributeInfo.name);
String propertyName = typedInstance.getTypeName() + "." + attributeInfo.name;
String keys = instanceVertex.getProperty(propertyName);
+ if (keys == null || keys.length() == 0) {
+ return;
+ }
DataTypes.ArrayType arrayType = (DataTypes.ArrayType) attributeInfo.dataType();
final IDataType elementType = arrayType.getElemType();
@@ -983,7 +985,9 @@ public class GraphBackedMetadataRepository implements MetadataRepository {
LOG.debug("mapping vertex {} to array {}", instanceVertex, attributeInfo.name);
String propertyName = typedInstance.getTypeName() + "." + attributeInfo.name;
String keys = instanceVertex.getProperty(propertyName);
+ if (keys == null || keys.length() == 0) {
+ return;
+ }
DataTypes.MapType mapType = (DataTypes.MapType) attributeInfo.dataType();
final IDataType elementType = mapType.getValueType();
......
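Both graph-repository hunks add the same guard: the vertex property that holds the element keys of an array or map attribute can be null or empty when the attribute was never populated, and mapping should return early instead of failing on it. This is the "empty array handling" fix from the commit message. A condensed sketch of the pattern with hypothetical names (the comma-separated encoding is an assumption for illustration; the real methods live in GraphBackedMetadataRepository):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class MultiValuedProperty {
    // Null or empty property value means the multi-valued attribute was never set.
    static List<String> elementKeys(String propertyValue) {
        if (propertyValue == null || propertyValue.isEmpty()) {
            return Collections.emptyList(); // nothing to map; caller returns early
        }
        return Arrays.asList(propertyValue.split(","));
    }
}
```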