Commit 0d74787e by Venkatesh Seetharam

BUG-37859 Fix broken hive lineage

parent 58ea4650
...@@ -20,9 +20,9 @@ package org.apache.hadoop.metadata.services; ...@@ -20,9 +20,9 @@ package org.apache.hadoop.metadata.services;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.inject.Injector;
import org.apache.hadoop.metadata.MetadataException; import org.apache.hadoop.metadata.MetadataException;
import org.apache.hadoop.metadata.MetadataServiceClient; import org.apache.hadoop.metadata.MetadataServiceClient;
import org.apache.hadoop.metadata.classification.InterfaceAudience;
import org.apache.hadoop.metadata.discovery.SearchIndexer; import org.apache.hadoop.metadata.discovery.SearchIndexer;
import org.apache.hadoop.metadata.listener.EntityChangeListener; import org.apache.hadoop.metadata.listener.EntityChangeListener;
import org.apache.hadoop.metadata.listener.TypesChangeListener; import org.apache.hadoop.metadata.listener.TypesChangeListener;
...@@ -34,14 +34,20 @@ import org.apache.hadoop.metadata.typesystem.Referenceable; ...@@ -34,14 +34,20 @@ import org.apache.hadoop.metadata.typesystem.Referenceable;
import org.apache.hadoop.metadata.typesystem.Struct; import org.apache.hadoop.metadata.typesystem.Struct;
import org.apache.hadoop.metadata.typesystem.TypesDef; import org.apache.hadoop.metadata.typesystem.TypesDef;
import org.apache.hadoop.metadata.typesystem.json.InstanceSerialization; import org.apache.hadoop.metadata.typesystem.json.InstanceSerialization;
import org.apache.hadoop.metadata.typesystem.json.Serialization$;
import org.apache.hadoop.metadata.typesystem.json.TypesSerialization; import org.apache.hadoop.metadata.typesystem.json.TypesSerialization;
import org.apache.hadoop.metadata.typesystem.types.*; import org.apache.hadoop.metadata.typesystem.types.AttributeDefinition;
import org.apache.hadoop.metadata.typesystem.types.ClassType;
import org.apache.hadoop.metadata.typesystem.types.DataTypes;
import org.apache.hadoop.metadata.typesystem.types.HierarchicalTypeDefinition;
import org.apache.hadoop.metadata.typesystem.types.IDataType;
import org.apache.hadoop.metadata.typesystem.types.Multiplicity;
import org.apache.hadoop.metadata.typesystem.types.TraitType;
import org.apache.hadoop.metadata.typesystem.types.TypeSystem;
import org.apache.hadoop.metadata.typesystem.types.utils.TypesUtil;
import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject; import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import scala.tools.cmd.Meta;
import javax.inject.Inject; import javax.inject.Inject;
import javax.inject.Singleton; import javax.inject.Singleton;
...@@ -84,12 +90,38 @@ public class DefaultMetadataService implements MetadataService { ...@@ -84,12 +90,38 @@ public class DefaultMetadataService implements MetadataService {
try { try {
TypesDef typesDef = typeStore.restore(); TypesDef typesDef = typeStore.restore();
typeSystem.defineTypes(typesDef); typeSystem.defineTypes(typesDef);
createSuperTypes();
} catch (MetadataException e) { } catch (MetadataException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
LOG.info("Restored type system from the store"); LOG.info("Restored type system from the store");
} }
private static final AttributeDefinition NAME_ATTRIBUTE =
TypesUtil.createRequiredAttrDef("name", DataTypes.STRING_TYPE);
private static final AttributeDefinition DESCRIPTION_ATTRIBUTE =
TypesUtil.createRequiredAttrDef("description", DataTypes.STRING_TYPE);
private static final String[] SUPER_TYPES = {
"DataSet",
"Process",
"Infrastructure",
};
@InterfaceAudience.Private
public void createSuperTypes() throws MetadataException {
if (typeSystem.isRegistered(SUPER_TYPES[0])) {
return; // this is already registered
}
for (String superTypeName : SUPER_TYPES) {
HierarchicalTypeDefinition<ClassType> superTypeDefinition =
TypesUtil.createClassTypeDef(superTypeName,
ImmutableList.<String>of(),
NAME_ATTRIBUTE, DESCRIPTION_ATTRIBUTE);
typeSystem.defineClassType(superTypeDefinition);
}
}
/** /**
* Creates a new type based on the type system to enable adding * Creates a new type based on the type system to enable adding
...@@ -114,10 +146,9 @@ public class DefaultMetadataService implements MetadataService { ...@@ -114,10 +146,9 @@ public class DefaultMetadataService implements MetadataService {
typeStore.store(typeSystem, ImmutableList.copyOf(typesAdded.keySet())); typeStore.store(typeSystem, ImmutableList.copyOf(typesAdded.keySet()));
onTypesAddedToRepo(typesAdded); onTypesAddedToRepo(typesAdded);
JSONObject response = new JSONObject() {{ return new JSONObject() {{
put(MetadataServiceClient.TYPES, typesAdded.keySet()); put(MetadataServiceClient.TYPES, typesAdded.keySet());
}}; }};
return response;
} catch (JSONException e) { } catch (JSONException e) {
LOG.error("Unable to create response for types={}", typeDefinition, e); LOG.error("Unable to create response for types={}", typeDefinition, e);
throw new MetadataException("Unable to create response"); throw new MetadataException("Unable to create response");
......
...@@ -29,7 +29,6 @@ import org.apache.hadoop.metadata.discovery.graph.GraphBackedDiscoveryService; ...@@ -29,7 +29,6 @@ import org.apache.hadoop.metadata.discovery.graph.GraphBackedDiscoveryService;
import org.apache.hadoop.metadata.query.HiveTitanSample; import org.apache.hadoop.metadata.query.HiveTitanSample;
import org.apache.hadoop.metadata.query.QueryTestsUtils; import org.apache.hadoop.metadata.query.QueryTestsUtils;
import org.apache.hadoop.metadata.repository.graph.GraphBackedMetadataRepository; import org.apache.hadoop.metadata.repository.graph.GraphBackedMetadataRepository;
import org.apache.hadoop.metadata.repository.graph.GraphBackedSearchIndexer;
import org.apache.hadoop.metadata.repository.graph.GraphHelper; import org.apache.hadoop.metadata.repository.graph.GraphHelper;
import org.apache.hadoop.metadata.repository.graph.GraphProvider; import org.apache.hadoop.metadata.repository.graph.GraphProvider;
import org.apache.hadoop.metadata.typesystem.ITypedReferenceableInstance; import org.apache.hadoop.metadata.typesystem.ITypedReferenceableInstance;
......
...@@ -57,6 +57,11 @@ import java.util.List; ...@@ -57,6 +57,11 @@ import java.util.List;
@Guice(modules = RepositoryMetadataModule.class) @Guice(modules = RepositoryMetadataModule.class)
public class HiveLineageServiceTest { public class HiveLineageServiceTest {
static {
// this would override super types creation if not first thing
TypeSystem.getInstance().reset();
}
@Inject @Inject
private DefaultMetadataService metadataService; private DefaultMetadataService metadataService;
...@@ -71,8 +76,6 @@ public class HiveLineageServiceTest { ...@@ -71,8 +76,6 @@ public class HiveLineageServiceTest {
@BeforeClass @BeforeClass
public void setUp() throws Exception { public void setUp() throws Exception {
TypeSystem.getInstance().reset();
setUpTypes(); setUpTypes();
setupInstances(); setupInstances();
...@@ -280,9 +283,7 @@ public class HiveLineageServiceTest { ...@@ -280,9 +283,7 @@ public class HiveLineageServiceTest {
); );
HierarchicalTypeDefinition<ClassType> tblClsDef = HierarchicalTypeDefinition<ClassType> tblClsDef =
TypesUtil.createClassTypeDef(HIVE_TABLE_TYPE, null, TypesUtil.createClassTypeDef(HIVE_TABLE_TYPE, ImmutableList.of("DataSet"),
attrDef("name", DataTypes.STRING_TYPE),
attrDef("description", DataTypes.STRING_TYPE),
attrDef("owner", DataTypes.STRING_TYPE), attrDef("owner", DataTypes.STRING_TYPE),
attrDef("createTime", DataTypes.INT_TYPE), attrDef("createTime", DataTypes.INT_TYPE),
attrDef("lastAccessTime", DataTypes.INT_TYPE), attrDef("lastAccessTime", DataTypes.INT_TYPE),
...@@ -298,8 +299,7 @@ public class HiveLineageServiceTest { ...@@ -298,8 +299,7 @@ public class HiveLineageServiceTest {
); );
HierarchicalTypeDefinition<ClassType> loadProcessClsDef = HierarchicalTypeDefinition<ClassType> loadProcessClsDef =
TypesUtil.createClassTypeDef(HIVE_PROCESS_TYPE, null, TypesUtil.createClassTypeDef(HIVE_PROCESS_TYPE, ImmutableList.of("Process"),
attrDef("name", DataTypes.STRING_TYPE),
attrDef("userName", DataTypes.STRING_TYPE), attrDef("userName", DataTypes.STRING_TYPE),
attrDef("startTime", DataTypes.INT_TYPE), attrDef("startTime", DataTypes.INT_TYPE),
attrDef("endTime", DataTypes.INT_TYPE), attrDef("endTime", DataTypes.INT_TYPE),
...@@ -401,7 +401,7 @@ public class HiveLineageServiceTest { ...@@ -401,7 +401,7 @@ public class HiveLineageServiceTest {
"sales fact daily materialized view", "sales fact daily materialized view",
reportingDB, sd, "Joe BI", "Managed", salesFactColumns, "Metric"); reportingDB, sd, "Joe BI", "Managed", salesFactColumns, "Metric");
loadProcess("loadSalesDaily", "John ETL", loadProcess("loadSalesDaily", "hive query for daily summary", "John ETL",
ImmutableList.of(salesFact, timeDim), ImmutableList.of(salesFactDaily), ImmutableList.of(salesFact, timeDim), ImmutableList.of(salesFactDaily),
"create table as select ", "plan", "id", "graph", "create table as select ", "plan", "id", "graph",
"ETL"); "ETL");
...@@ -434,7 +434,7 @@ public class HiveLineageServiceTest { ...@@ -434,7 +434,7 @@ public class HiveLineageServiceTest {
"sales fact monthly materialized view", "sales fact monthly materialized view",
reportingDB, sd, "Jane BI", "Managed", salesFactColumns, "Metric"); reportingDB, sd, "Jane BI", "Managed", salesFactColumns, "Metric");
loadProcess("loadSalesMonthly", "John ETL", loadProcess("loadSalesMonthly", "hive query for monthly summary", "John ETL",
ImmutableList.of(salesFactDaily), ImmutableList.of(salesFactMonthly), ImmutableList.of(salesFactDaily), ImmutableList.of(salesFactMonthly),
"create table as select ", "plan", "id", "graph", "create table as select ", "plan", "id", "graph",
"ETL"); "ETL");
...@@ -496,7 +496,7 @@ public class HiveLineageServiceTest { ...@@ -496,7 +496,7 @@ public class HiveLineageServiceTest {
return createInstance(referenceable); return createInstance(referenceable);
} }
Id loadProcess(String name, String user, Id loadProcess(String name, String description, String user,
List<Id> inputTables, List<Id> inputTables,
List<Id> outputTables, List<Id> outputTables,
String queryText, String queryPlan, String queryText, String queryPlan,
...@@ -504,6 +504,7 @@ public class HiveLineageServiceTest { ...@@ -504,6 +504,7 @@ public class HiveLineageServiceTest {
String... traitNames) throws Exception { String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(HIVE_PROCESS_TYPE, traitNames); Referenceable referenceable = new Referenceable(HIVE_PROCESS_TYPE, traitNames);
referenceable.set("name", name); referenceable.set("name", name);
referenceable.set("description", description);
referenceable.set("user", user); referenceable.set("user", user);
referenceable.set("startTime", System.currentTimeMillis()); referenceable.set("startTime", System.currentTimeMillis());
referenceable.set("endTime", System.currentTimeMillis() + 10000); referenceable.set("endTime", System.currentTimeMillis() + 10000);
......
...@@ -29,14 +29,12 @@ metadata.graph.index.search.elasticsearch.local-mode=true ...@@ -29,14 +29,12 @@ metadata.graph.index.search.elasticsearch.local-mode=true
metadata.graph.index.search.elasticsearch.create.sleep=2000 metadata.graph.index.search.elasticsearch.create.sleep=2000
######### Hive Lineage Configs ######### ######### Hive Lineage Configs #########
# This models follows the quick-start guide # This models reflects the base super types for Data and Process
metadata.lineage.hive.table.type.name=Table metadata.lineage.hive.table.type.name=DataSet
metadata.lineage.hive.table.column.name=columns metadata.lineage.hive.table.column.name=columns
metadata.lineage.hive.process.type.name=LoadProcess metadata.lineage.hive.process.type.name=Process
metadata.lineage.hive.process.inputs.name=inputTables metadata.lineage.hive.process.inputs.name=inputTables
metadata.lineage.hive.process.outputs.name=outputTables metadata.lineage.hive.process.outputs.name=outputTables
#Currently unused
#metadata.lineage.hive.column.type.name=Column
######### Security Properties ######### ######### Security Properties #########
......
...@@ -128,9 +128,7 @@ public class QuickStart { ...@@ -128,9 +128,7 @@ public class QuickStart {
); );
HierarchicalTypeDefinition<ClassType> tblClsDef = HierarchicalTypeDefinition<ClassType> tblClsDef =
TypesUtil.createClassTypeDef(TABLE_TYPE, null, TypesUtil.createClassTypeDef(TABLE_TYPE, ImmutableList.of("DataSet"),
attrDef("name", DataTypes.STRING_TYPE),
attrDef("description", DataTypes.STRING_TYPE),
new AttributeDefinition("db", DATABASE_TYPE, new AttributeDefinition("db", DATABASE_TYPE,
Multiplicity.REQUIRED, false, null), Multiplicity.REQUIRED, false, null),
new AttributeDefinition("sd", STORAGE_DESC_TYPE, new AttributeDefinition("sd", STORAGE_DESC_TYPE,
...@@ -149,8 +147,7 @@ public class QuickStart { ...@@ -149,8 +147,7 @@ public class QuickStart {
); );
HierarchicalTypeDefinition<ClassType> loadProcessClsDef = HierarchicalTypeDefinition<ClassType> loadProcessClsDef =
TypesUtil.createClassTypeDef(LOAD_PROCESS_TYPE, null, TypesUtil.createClassTypeDef(LOAD_PROCESS_TYPE, ImmutableList.of("Process"),
attrDef("name", DataTypes.STRING_TYPE),
attrDef("userName", DataTypes.STRING_TYPE), attrDef("userName", DataTypes.STRING_TYPE),
attrDef("startTime", DataTypes.INT_TYPE), attrDef("startTime", DataTypes.INT_TYPE),
attrDef("endTime", DataTypes.INT_TYPE), attrDef("endTime", DataTypes.INT_TYPE),
...@@ -273,7 +270,7 @@ public class QuickStart { ...@@ -273,7 +270,7 @@ public class QuickStart {
"sales fact daily materialized view", reportingDB, sd, "sales fact daily materialized view", reportingDB, sd,
"Joe BI", "Managed", salesFactColumns, "Metric"); "Joe BI", "Managed", salesFactColumns, "Metric");
loadProcess("loadSalesDaily", "John ETL", loadProcess("loadSalesDaily", "hive query for daily summary", "John ETL",
ImmutableList.of(salesFact, timeDim), ImmutableList.of(salesFactDaily), ImmutableList.of(salesFact, timeDim), ImmutableList.of(salesFactDaily),
"create table as select ", "plan", "id", "graph", "create table as select ", "plan", "id", "graph",
"ETL"); "ETL");
...@@ -288,7 +285,7 @@ public class QuickStart { ...@@ -288,7 +285,7 @@ public class QuickStart {
"sales fact monthly materialized view", "sales fact monthly materialized view",
reportingDB, sd, "Jane BI", "Managed", salesFactColumns, "Metric"); reportingDB, sd, "Jane BI", "Managed", salesFactColumns, "Metric");
loadProcess("loadSalesMonthly", "John ETL", loadProcess("loadSalesMonthly", "hive query for monthly summary", "John ETL",
ImmutableList.of(salesFactDaily), ImmutableList.of(salesFactMonthly), ImmutableList.of(salesFactDaily), ImmutableList.of(salesFactMonthly),
"create table as select ", "plan", "id", "graph", "create table as select ", "plan", "id", "graph",
"ETL"); "ETL");
...@@ -362,7 +359,7 @@ public class QuickStart { ...@@ -362,7 +359,7 @@ public class QuickStart {
return createInstance(referenceable); return createInstance(referenceable);
} }
Id loadProcess(String name, String user, Id loadProcess(String name, String description, String user,
List<Id> inputTables, List<Id> inputTables,
List<Id> outputTables, List<Id> outputTables,
String queryText, String queryPlan, String queryText, String queryPlan,
...@@ -370,6 +367,7 @@ public class QuickStart { ...@@ -370,6 +367,7 @@ public class QuickStart {
String... traitNames) throws Exception { String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(LOAD_PROCESS_TYPE, traitNames); Referenceable referenceable = new Referenceable(LOAD_PROCESS_TYPE, traitNames);
referenceable.set("name", name); referenceable.set("name", name);
referenceable.set("description", description);
referenceable.set("user", user); referenceable.set("user", user);
referenceable.set("startTime", System.currentTimeMillis()); referenceable.set("startTime", System.currentTimeMillis());
referenceable.set("endTime", System.currentTimeMillis() + 10000); referenceable.set("endTime", System.currentTimeMillis() + 10000);
...@@ -465,6 +463,8 @@ public class QuickStart { ...@@ -465,6 +463,8 @@ public class QuickStart {
*/ */
"Table where name=\"sales_fact\", columns", "Table where name=\"sales_fact\", columns",
"Table where name=\"sales_fact\", columns as column select column.name, column.dataType, column.comment", "Table where name=\"sales_fact\", columns as column select column.name, column.dataType, column.comment",
"from DataSet",
"from Process",
}; };
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment