Commit 087a7156 by Venkatesh Seetharam

BUG-37859 Fix broken hive lineage

parent 3b9e1b2f
......@@ -20,9 +20,9 @@ package org.apache.hadoop.metadata.services;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.inject.Injector;
import org.apache.hadoop.metadata.MetadataException;
import org.apache.hadoop.metadata.MetadataServiceClient;
import org.apache.hadoop.metadata.classification.InterfaceAudience;
import org.apache.hadoop.metadata.discovery.SearchIndexer;
import org.apache.hadoop.metadata.listener.EntityChangeListener;
import org.apache.hadoop.metadata.listener.TypesChangeListener;
......@@ -34,14 +34,20 @@ import org.apache.hadoop.metadata.typesystem.Referenceable;
import org.apache.hadoop.metadata.typesystem.Struct;
import org.apache.hadoop.metadata.typesystem.TypesDef;
import org.apache.hadoop.metadata.typesystem.json.InstanceSerialization;
import org.apache.hadoop.metadata.typesystem.json.Serialization$;
import org.apache.hadoop.metadata.typesystem.json.TypesSerialization;
import org.apache.hadoop.metadata.typesystem.types.*;
import org.apache.hadoop.metadata.typesystem.types.AttributeDefinition;
import org.apache.hadoop.metadata.typesystem.types.ClassType;
import org.apache.hadoop.metadata.typesystem.types.DataTypes;
import org.apache.hadoop.metadata.typesystem.types.HierarchicalTypeDefinition;
import org.apache.hadoop.metadata.typesystem.types.IDataType;
import org.apache.hadoop.metadata.typesystem.types.Multiplicity;
import org.apache.hadoop.metadata.typesystem.types.TraitType;
import org.apache.hadoop.metadata.typesystem.types.TypeSystem;
import org.apache.hadoop.metadata.typesystem.types.utils.TypesUtil;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.tools.cmd.Meta;
import javax.inject.Inject;
import javax.inject.Singleton;
......@@ -84,12 +90,38 @@ public class DefaultMetadataService implements MetadataService {
try {
TypesDef typesDef = typeStore.restore();
typeSystem.defineTypes(typesDef);
createSuperTypes();
} catch (MetadataException e) {
throw new RuntimeException(e);
}
LOG.info("Restored type system from the store");
}
private static final AttributeDefinition NAME_ATTRIBUTE =
TypesUtil.createRequiredAttrDef("name", DataTypes.STRING_TYPE);
private static final AttributeDefinition DESCRIPTION_ATTRIBUTE =
TypesUtil.createRequiredAttrDef("description", DataTypes.STRING_TYPE);
private static final String[] SUPER_TYPES = {
"DataSet",
"Process",
"Infrastructure",
};
@InterfaceAudience.Private
public void createSuperTypes() throws MetadataException {
if (typeSystem.isRegistered(SUPER_TYPES[0])) {
return; // this is already registered
}
for (String superTypeName : SUPER_TYPES) {
HierarchicalTypeDefinition<ClassType> superTypeDefinition =
TypesUtil.createClassTypeDef(superTypeName,
ImmutableList.<String>of(),
NAME_ATTRIBUTE, DESCRIPTION_ATTRIBUTE);
typeSystem.defineClassType(superTypeDefinition);
}
}
/**
* Creates a new type based on the type system to enable adding
......@@ -114,10 +146,9 @@ public class DefaultMetadataService implements MetadataService {
typeStore.store(typeSystem, ImmutableList.copyOf(typesAdded.keySet()));
onTypesAddedToRepo(typesAdded);
JSONObject response = new JSONObject() {{
return new JSONObject() {{
put(MetadataServiceClient.TYPES, typesAdded.keySet());
}};
return response;
} catch (JSONException e) {
LOG.error("Unable to create response for types={}", typeDefinition, e);
throw new MetadataException("Unable to create response");
......
......@@ -29,7 +29,6 @@ import org.apache.hadoop.metadata.discovery.graph.GraphBackedDiscoveryService;
import org.apache.hadoop.metadata.query.HiveTitanSample;
import org.apache.hadoop.metadata.query.QueryTestsUtils;
import org.apache.hadoop.metadata.repository.graph.GraphBackedMetadataRepository;
import org.apache.hadoop.metadata.repository.graph.GraphBackedSearchIndexer;
import org.apache.hadoop.metadata.repository.graph.GraphHelper;
import org.apache.hadoop.metadata.repository.graph.GraphProvider;
import org.apache.hadoop.metadata.typesystem.ITypedReferenceableInstance;
......
......@@ -57,6 +57,11 @@ import java.util.List;
@Guice(modules = RepositoryMetadataModule.class)
public class HiveLineageServiceTest {
static {
// this would override super types creation if not first thing
TypeSystem.getInstance().reset();
}
@Inject
private DefaultMetadataService metadataService;
......@@ -71,8 +76,6 @@ public class HiveLineageServiceTest {
@BeforeClass
public void setUp() throws Exception {
TypeSystem.getInstance().reset();
setUpTypes();
setupInstances();
......@@ -280,9 +283,7 @@ public class HiveLineageServiceTest {
);
HierarchicalTypeDefinition<ClassType> tblClsDef =
TypesUtil.createClassTypeDef(HIVE_TABLE_TYPE, null,
attrDef("name", DataTypes.STRING_TYPE),
attrDef("description", DataTypes.STRING_TYPE),
TypesUtil.createClassTypeDef(HIVE_TABLE_TYPE, ImmutableList.of("DataSet"),
attrDef("owner", DataTypes.STRING_TYPE),
attrDef("createTime", DataTypes.INT_TYPE),
attrDef("lastAccessTime", DataTypes.INT_TYPE),
......@@ -298,8 +299,7 @@ public class HiveLineageServiceTest {
);
HierarchicalTypeDefinition<ClassType> loadProcessClsDef =
TypesUtil.createClassTypeDef(HIVE_PROCESS_TYPE, null,
attrDef("name", DataTypes.STRING_TYPE),
TypesUtil.createClassTypeDef(HIVE_PROCESS_TYPE, ImmutableList.of("Process"),
attrDef("userName", DataTypes.STRING_TYPE),
attrDef("startTime", DataTypes.INT_TYPE),
attrDef("endTime", DataTypes.INT_TYPE),
......@@ -401,7 +401,7 @@ public class HiveLineageServiceTest {
"sales fact daily materialized view",
reportingDB, sd, "Joe BI", "Managed", salesFactColumns, "Metric");
loadProcess("loadSalesDaily", "John ETL",
loadProcess("loadSalesDaily", "hive query for daily summary", "John ETL",
ImmutableList.of(salesFact, timeDim), ImmutableList.of(salesFactDaily),
"create table as select ", "plan", "id", "graph",
"ETL");
......@@ -434,7 +434,7 @@ public class HiveLineageServiceTest {
"sales fact monthly materialized view",
reportingDB, sd, "Jane BI", "Managed", salesFactColumns, "Metric");
loadProcess("loadSalesMonthly", "John ETL",
loadProcess("loadSalesMonthly", "hive query for monthly summary", "John ETL",
ImmutableList.of(salesFactDaily), ImmutableList.of(salesFactMonthly),
"create table as select ", "plan", "id", "graph",
"ETL");
......@@ -496,7 +496,7 @@ public class HiveLineageServiceTest {
return createInstance(referenceable);
}
Id loadProcess(String name, String user,
Id loadProcess(String name, String description, String user,
List<Id> inputTables,
List<Id> outputTables,
String queryText, String queryPlan,
......@@ -504,6 +504,7 @@ public class HiveLineageServiceTest {
String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(HIVE_PROCESS_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set("description", description);
referenceable.set("user", user);
referenceable.set("startTime", System.currentTimeMillis());
referenceable.set("endTime", System.currentTimeMillis() + 10000);
......
......@@ -29,14 +29,12 @@ metadata.graph.index.search.elasticsearch.local-mode=true
metadata.graph.index.search.elasticsearch.create.sleep=2000
######### Hive Lineage Configs #########
# This models follows the quick-start guide
metadata.lineage.hive.table.type.name=Table
# This models reflects the base super types for Data and Process
metadata.lineage.hive.table.type.name=DataSet
metadata.lineage.hive.table.column.name=columns
metadata.lineage.hive.process.type.name=LoadProcess
metadata.lineage.hive.process.type.name=Process
metadata.lineage.hive.process.inputs.name=inputTables
metadata.lineage.hive.process.outputs.name=outputTables
#Currently unused
#metadata.lineage.hive.column.type.name=Column
######### Security Properties #########
......
......@@ -128,9 +128,7 @@ public class QuickStart {
);
HierarchicalTypeDefinition<ClassType> tblClsDef =
TypesUtil.createClassTypeDef(TABLE_TYPE, null,
attrDef("name", DataTypes.STRING_TYPE),
attrDef("description", DataTypes.STRING_TYPE),
TypesUtil.createClassTypeDef(TABLE_TYPE, ImmutableList.of("DataSet"),
new AttributeDefinition("db", DATABASE_TYPE,
Multiplicity.REQUIRED, false, null),
new AttributeDefinition("sd", STORAGE_DESC_TYPE,
......@@ -149,8 +147,7 @@ public class QuickStart {
);
HierarchicalTypeDefinition<ClassType> loadProcessClsDef =
TypesUtil.createClassTypeDef(LOAD_PROCESS_TYPE, null,
attrDef("name", DataTypes.STRING_TYPE),
TypesUtil.createClassTypeDef(LOAD_PROCESS_TYPE, ImmutableList.of("Process"),
attrDef("userName", DataTypes.STRING_TYPE),
attrDef("startTime", DataTypes.INT_TYPE),
attrDef("endTime", DataTypes.INT_TYPE),
......@@ -273,7 +270,7 @@ public class QuickStart {
"sales fact daily materialized view", reportingDB, sd,
"Joe BI", "Managed", salesFactColumns, "Metric");
loadProcess("loadSalesDaily", "John ETL",
loadProcess("loadSalesDaily", "hive query for daily summary", "John ETL",
ImmutableList.of(salesFact, timeDim), ImmutableList.of(salesFactDaily),
"create table as select ", "plan", "id", "graph",
"ETL");
......@@ -288,7 +285,7 @@ public class QuickStart {
"sales fact monthly materialized view",
reportingDB, sd, "Jane BI", "Managed", salesFactColumns, "Metric");
loadProcess("loadSalesMonthly", "John ETL",
loadProcess("loadSalesMonthly", "hive query for monthly summary", "John ETL",
ImmutableList.of(salesFactDaily), ImmutableList.of(salesFactMonthly),
"create table as select ", "plan", "id", "graph",
"ETL");
......@@ -362,7 +359,7 @@ public class QuickStart {
return createInstance(referenceable);
}
Id loadProcess(String name, String user,
Id loadProcess(String name, String description, String user,
List<Id> inputTables,
List<Id> outputTables,
String queryText, String queryPlan,
......@@ -370,6 +367,7 @@ public class QuickStart {
String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(LOAD_PROCESS_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set("description", description);
referenceable.set("user", user);
referenceable.set("startTime", System.currentTimeMillis());
referenceable.set("endTime", System.currentTimeMillis() + 10000);
......@@ -465,6 +463,8 @@ public class QuickStart {
*/
"Table where name=\"sales_fact\", columns",
"Table where name=\"sales_fact\", columns as column select column.name, column.dataType, column.comment",
"from DataSet",
"from Process",
};
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment