Commit ed06c791 by Venkatesh Seetharam

Refactor Quick start and add DSL for columns

parent f592043c
......@@ -21,11 +21,11 @@ package org.apache.hadoop.metadata.examples;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.hadoop.metadata.MetadataServiceClient;
import org.apache.hadoop.metadata.typesystem.IStruct;
import org.apache.hadoop.metadata.typesystem.Referenceable;
import org.apache.hadoop.metadata.typesystem.TypesDef;
import org.apache.hadoop.metadata.typesystem.json.InstanceSerialization;
import org.apache.hadoop.metadata.typesystem.json.TypesSerialization;
import org.apache.hadoop.metadata.typesystem.persistence.Id;
import org.apache.hadoop.metadata.typesystem.types.AttributeDefinition;
import org.apache.hadoop.metadata.typesystem.types.ClassType;
import org.apache.hadoop.metadata.typesystem.types.DataTypes;
......@@ -41,14 +41,12 @@ import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* A driver that sets up sample types and data for testing purposes.
* Please take a look at QueryDSL in docs for the Meta Model.
* todo - move this to examples module. Fix failing collections.
* todo - move this to examples module.
*/
public class QuickStart {
......@@ -79,12 +77,12 @@ public class QuickStart {
private static final String COLUMN_TYPE = "Column";
private static final String TABLE_TYPE = "Table";
private static final String VIEW_TYPE = "View";
private static final String LOAD_PROCESS_TYPE = "LoadProcess";
private static final String LOAD_PROCESS_TYPE = "hive_process";
private static final String STORAGE_DESC_TYPE = "StorageDesc";
private static final String[] TYPES = {
DATABASE_TYPE, TABLE_TYPE, STORAGE_DESC_TYPE, COLUMN_TYPE, LOAD_PROCESS_TYPE, VIEW_TYPE,
"JdbcAccess", "ETL", "Metric", "PII", "Fact", "Dimension"
DATABASE_TYPE, TABLE_TYPE, STORAGE_DESC_TYPE, COLUMN_TYPE, LOAD_PROCESS_TYPE, VIEW_TYPE,
"JdbcAccess", "ETL", "Metric", "PII", "Fact", "Dimension"
};
private final MetadataServiceClient metadataServiceClient;
......@@ -97,6 +95,7 @@ public class QuickStart {
TypesDef typesDef = createTypeDefinitions();
String typesAsJSON = TypesSerialization.toJson(typesDef);
System.out.println("typesAsJSON = " + typesAsJSON);
metadataServiceClient.createType(typesAsJSON);
// verify types created
......@@ -119,10 +118,7 @@ public class QuickStart {
attrDef("inputFormat", DataTypes.STRING_TYPE),
attrDef("outputFormat", DataTypes.STRING_TYPE),
attrDef("compressed", DataTypes.STRING_TYPE,
Multiplicity.REQUIRED, false, null),
new AttributeDefinition("columns",
DataTypes.arrayTypeName(COLUMN_TYPE),
Multiplicity.COLLECTION, true, null)
Multiplicity.REQUIRED, false, null)
);
HierarchicalTypeDefinition<ClassType> columnClsDef =
......@@ -147,7 +143,10 @@ public class QuickStart {
attrDef("viewOriginalText", DataTypes.STRING_TYPE),
attrDef("viewExpandedText", DataTypes.STRING_TYPE),
attrDef("tableType", DataTypes.STRING_TYPE),
attrDef("temporary", DataTypes.BOOLEAN_TYPE)
attrDef("temporary", DataTypes.BOOLEAN_TYPE),
new AttributeDefinition("columns",
DataTypes.arrayTypeName(COLUMN_TYPE),
Multiplicity.COLLECTION, true, null)
);
HierarchicalTypeDefinition<ClassType> loadProcessClsDef =
......@@ -159,8 +158,9 @@ public class QuickStart {
new AttributeDefinition("inputTables",
DataTypes.arrayTypeName(TABLE_TYPE),
Multiplicity.COLLECTION, false, null),
new AttributeDefinition("outputTable", TABLE_TYPE,
Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("outputTables",
DataTypes.arrayTypeName(TABLE_TYPE),
Multiplicity.COLLECTION, false, null),
attrDef("queryText", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
attrDef("queryPlan", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
attrDef("queryId", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
......@@ -221,7 +221,7 @@ public class QuickStart {
}
void createEntities() throws Exception {
Referenceable salesDB = database(
Id salesDB = database(
"Sales", "Sales Database", "John ETL", "hdfs://host:8000/apps/warehouse/sales");
......@@ -234,7 +234,7 @@ public class QuickStart {
salesFactColumns.add(rawColumn("customer_id", "int", "customer id", "PII"));
salesFactColumns.add(rawColumn("sales", "double", "product id", "Metric"));
Referenceable salesFact = tableDefinition("sales_fact", "sales fact table",
Id salesFact = table("sales_fact", "sales fact table",
salesDB, sd, "Joe", "Managed", salesFactColumns, "Fact");
ArrayList<Referenceable> productDimColumns = new ArrayList<>();
......@@ -242,7 +242,7 @@ public class QuickStart {
productDimColumns.add(rawColumn("product_name", "string", "product name"));
productDimColumns.add(rawColumn("brand_name", "int", "brand name"));
Referenceable productDim = tableDefinition("product_dim", "product dimension table",
Id productDim = table("product_dim", "product dimension table",
salesDB, sd, "John Doe", "Managed", productDimColumns, "Dimension");
ArrayList<Referenceable> timeDimColumns = new ArrayList<>();
......@@ -250,7 +250,7 @@ public class QuickStart {
timeDimColumns.add(rawColumn("dayOfYear", "int", "day Of Year"));
timeDimColumns.add(rawColumn("weekDay", "int", "week Day"));
Referenceable timeDim = tableDefinition("time_dim", "time dimension table",
Id timeDim = table("time_dim", "time dimension table",
salesDB, sd, "John Doe", "External", timeDimColumns, "Dimension");
......@@ -259,43 +259,43 @@ public class QuickStart {
customerDimColumns.add(rawColumn("name", "string", "customer name", "PII"));
customerDimColumns.add(rawColumn("address", "string", "customer address", "PII"));
Referenceable customerDim = tableDefinition("customer_dim", "customer dimension table",
Id customerDim = table("customer_dim", "customer dimension table",
salesDB, sd, "fetl", "External", customerDimColumns, "Dimension");
Referenceable reportingDB = database("Reporting", "reporting database", "Jane BI",
Id reportingDB = database("Reporting", "reporting database", "Jane BI",
"hdfs://host:8000/apps/warehouse/reporting");
Referenceable salesFactDaily = tableDefinition("sales_fact_daily_mv",
Id salesFactDaily = table("sales_fact_daily_mv",
"sales fact daily materialized view", reportingDB, sd,
"Joe BI", "Managed", salesFactColumns, "Metric");
Referenceable loadSalesFactDaily = loadProcess("loadSalesDaily", "John ETL",
ImmutableList.of(salesFact, timeDim), salesFactDaily,
Id loadSalesFactDaily = loadProcess("loadSalesDaily", "John ETL",
ImmutableList.of(salesFact, timeDim), ImmutableList.of(salesFactDaily),
"create table as select ", "plan", "id", "graph",
"ETL");
System.out.println("added loadSalesFactDaily = " + loadSalesFactDaily);
Referenceable productDimView = view("product_dim_view", reportingDB,
Id productDimView = view("product_dim_view", reportingDB,
ImmutableList.of(productDim), "Dimension", "JdbcAccess");
System.out.println("added productDimView = " + productDimView);
Referenceable customerDimView = view("customer_dim_view", reportingDB,
Id customerDimView = view("customer_dim_view", reportingDB,
ImmutableList.of(customerDim), "Dimension", "JdbcAccess");
System.out.println("added customerDimView = " + customerDimView);
Referenceable salesFactMonthly = tableDefinition("sales_fact_monthly_mv",
Id salesFactMonthly = table("sales_fact_monthly_mv",
"sales fact monthly materialized view",
reportingDB, sd, "Jane BI", "Managed", salesFactColumns, "Metric");
Referenceable loadSalesFactMonthly = loadProcess("loadSalesMonthly", "John ETL",
ImmutableList.of(salesFactDaily), salesFactMonthly,
Id loadSalesFactMonthly = loadProcess("loadSalesMonthly", "John ETL",
ImmutableList.of(salesFactDaily), ImmutableList.of(salesFactMonthly),
"create table as select ", "plan", "id", "graph",
"ETL");
System.out.println("added loadSalesFactMonthly = " + loadSalesFactMonthly);
}
private Referenceable createInstance(Referenceable referenceable) throws Exception {
private Id createInstance(Referenceable referenceable) throws Exception {
String typeName = referenceable.getTypeName();
String entityJSON = InstanceSerialization.toJson(referenceable, true);
......@@ -304,23 +304,11 @@ public class QuickStart {
String guid = jsonObject.getString(MetadataServiceClient.RESULTS);
System.out.println("created instance for type " + typeName + ", guid: " + guid);
// return the reference to created instance with guid
final ImmutableList<String> traitNames = referenceable.getTraits();
if (traitNames.isEmpty()) {
return new Referenceable(guid, referenceable.getTypeName(),
referenceable.getValuesMap());
} else {
Map<String, IStruct> traits = new HashMap<>();
for (String traitName : traitNames) {
traits.put(traitName, referenceable.getTrait(traitName));
}
return new Referenceable(guid, referenceable.getTypeName(),
referenceable.getValuesMap(), traitNames, traits);
}
// return the Id for created instance with guid
return new Id(guid, referenceable.getId().getVersion(), referenceable.getTypeName());
}
Referenceable database(String name, String description,
Id database(String name, String description,
String owner, String locationUri,
String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(DATABASE_TYPE, traitNames);
......@@ -355,28 +343,11 @@ public class QuickStart {
return referenceable;
}
Referenceable tableDefinition(String name, String description,
Referenceable db, Referenceable sdReferenceable,
String owner, String tableType,
List<Referenceable> columnsList,
String... traitNames) throws Exception {
List<Referenceable> columns = new ArrayList<>();
for (Referenceable columnReferenceable : columnsList) {
columns.add(createInstance(columnReferenceable));
}
sdReferenceable.set("columns", columns);
Referenceable sd = createInstance(sdReferenceable);
return table(name, description, db, sd, owner, tableType, traitNames);
}
Referenceable table(String name, String description,
Referenceable db, Referenceable sd,
String owner, String tableType,
// List<Referenceable> columns,
String... traitNames) throws Exception {
Id table(String name, String description,
Id dbId, Referenceable sd,
String owner, String tableType,
List<Referenceable> columns,
String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(TABLE_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set("description", description);
......@@ -385,18 +356,19 @@ public class QuickStart {
referenceable.set("createTime", System.currentTimeMillis());
referenceable.set("lastAccessTime", System.currentTimeMillis());
referenceable.set("retention", System.currentTimeMillis());
referenceable.set("db", db);
referenceable.set("db", dbId);
referenceable.set("sd", sd);
referenceable.set("columns", columns);
return createInstance(referenceable);
}
Referenceable loadProcess(String name, String user,
List<Referenceable> inputTables,
Referenceable outputTable,
String queryText, String queryPlan,
String queryId, String queryGraph,
String... traitNames) throws Exception {
Id loadProcess(String name, String user,
List<Id> inputTables,
List<Id> outputTables,
String queryText, String queryPlan,
String queryId, String queryGraph,
String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(LOAD_PROCESS_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set("user", user);
......@@ -404,7 +376,7 @@ public class QuickStart {
referenceable.set("endTime", System.currentTimeMillis() + 10000);
referenceable.set("inputTables", inputTables);
referenceable.set("outputTable", outputTable);
referenceable.set("outputTables", outputTables);
referenceable.set("queryText", queryText);
referenceable.set("queryPlan", queryPlan);
......@@ -414,12 +386,12 @@ public class QuickStart {
return createInstance(referenceable);
}
Referenceable view(String name, Referenceable db,
List<Referenceable> inputTables,
String... traitNames) throws Exception {
Id view(String name, Id dbId,
List<Id> inputTables,
String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(VIEW_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set("db", db);
referenceable.set("db", dbId);
referenceable.set("inputTables", inputTables);
......@@ -448,7 +420,7 @@ public class QuickStart {
"DB, Table",
"DB is JdbcAccess",
/*
"DB, LoadProcess has name",
"DB, hive_process has name",
"DB as db1, Table where db1.name = \"Reporting\"",
"DB where DB.name=\"Reporting\" and DB.createTime < " + System.currentTimeMillis()},
*/
......@@ -487,12 +459,12 @@ public class QuickStart {
"PII",
/*
// Lineage - todo - fix this, its not working
"Table LoadProcess outputTable",
"Table loop (LoadProcess outputTable)",
"Table as _loop0 loop (LoadProcess outputTable) withPath",
"Table as src loop (LoadProcess outputTable) as dest select src.name as srcTable, dest.name as destTable withPath",
"Table hive_process outputTables",
"Table loop (hive_process outputTables)",
"Table as _loop0 loop (hive_process outputTables) withPath",
"Table as src loop (hive_process outputTables) as dest select src.name as srcTable, dest.name as destTable withPath",
*/
"Table as t, sd, columns where t.name=\"sales_fact\"",
"Table as t, columns where t.name=\"sales_fact\"",
};
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment