Commit 0e81ceb4 by Shwetha GS

ATLAS-432 QuickStart lineage is broken (yhemanth via shwethags)

parent 646f29c3
......@@ -6,6 +6,7 @@ INCOMPATIBLE CHANGES:
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES:
ATLAS-432 QuickStart lineage is broken (yhemanth via shwethags)
ATLAS-421 typo in Architecture.twiki (dbist13 via shwethags)
ATLAS-387 Running quick_start without a valid atlas endpoint in configuration or argument prints a spurious success message (yhemanth via shwethags)
ATLAS-182 Add data model for Storm topology elements (svenkat,yhemanth via shwethags)
......
......@@ -37,6 +37,7 @@ import org.apache.atlas.typesystem.types.IDataType;
import org.apache.atlas.typesystem.types.Multiplicity;
import org.apache.atlas.typesystem.types.StructTypeDefinition;
import org.apache.atlas.typesystem.types.TraitType;
import org.apache.atlas.typesystem.types.TypeUtils;
import org.apache.atlas.typesystem.types.utils.TypesUtil;
import org.apache.commons.configuration.Configuration;
import org.codehaus.jettison.json.JSONArray;
......@@ -50,6 +51,23 @@ import java.util.List;
*/
public class QuickStart {
public static final String ATLAS_REST_ADDRESS = "atlas.rest.address";
public static final String SALES_DB = "Sales";
public static final String SALES_DB_DESCRIPTION = "Sales Database";
public static final String SALES_FACT_TABLE = "sales_fact";
public static final String FACT_TRAIT = "Fact";
public static final String COLUMNS_ATTRIBUTE = "columns";
public static final String TIME_ID_COLUMN = "time_id";
public static final String DB_ATTRIBUTE = "db";
public static final String SALES_FACT_TABLE_DESCRIPTION = "sales fact table";
public static final String LOAD_SALES_DAILY_PROCESS = "loadSalesDaily";
public static final String LOAD_SALES_DAILY_PROCESS_DESCRIPTION = "hive query for daily summary";
public static final String INPUTS_ATTRIBUTE = "inputs";
public static final String OUTPUTS_ATTRIBUTE = "outputs";
public static final String TIME_DIM_TABLE = "time_dim";
public static final String SALES_FACT_DAILY_MV_TABLE = "sales_fact_daily_mv";
public static final String PRODUCT_DIM_VIEW = "product_dim_view";
public static final String PRODUCT_DIM_TABLE = "product_dim";
public static final String INPUT_TABLES_ATTRIBUTE = "inputTables";
public static void main(String[] args) throws Exception {
String baseUrl = getServerUrl(args);
......@@ -80,12 +98,12 @@ public class QuickStart {
return baseUrl;
}
private static final String DATABASE_TYPE = "DB";
private static final String COLUMN_TYPE = "Column";
private static final String TABLE_TYPE = "Table";
private static final String VIEW_TYPE = "View";
private static final String LOAD_PROCESS_TYPE = "LoadProcess";
private static final String STORAGE_DESC_TYPE = "StorageDesc";
static final String DATABASE_TYPE = "DB";
static final String COLUMN_TYPE = "Column";
static final String TABLE_TYPE = "Table";
static final String VIEW_TYPE = "View";
static final String LOAD_PROCESS_TYPE = "LoadProcess";
static final String STORAGE_DESC_TYPE = "StorageDesc";
private static final String[] TYPES =
{DATABASE_TYPE, TABLE_TYPE, STORAGE_DESC_TYPE, COLUMN_TYPE, LOAD_PROCESS_TYPE, VIEW_TYPE, "JdbcAccess",
......@@ -110,7 +128,8 @@ public class QuickStart {
TypesDef createTypeDefinitions() throws Exception {
HierarchicalTypeDefinition<ClassType> dbClsDef = TypesUtil
.createClassTypeDef(DATABASE_TYPE, null, attrDef("name", DataTypes.STRING_TYPE),
.createClassTypeDef(DATABASE_TYPE, null,
TypesUtil.createUniqueRequiredAttrDef("name", DataTypes.STRING_TYPE),
attrDef("description", DataTypes.STRING_TYPE), attrDef("locationUri", DataTypes.STRING_TYPE),
attrDef("owner", DataTypes.STRING_TYPE), attrDef("createTime", DataTypes.LONG_TYPE));
......@@ -125,14 +144,14 @@ public class QuickStart {
HierarchicalTypeDefinition<ClassType> tblClsDef = TypesUtil
.createClassTypeDef(TABLE_TYPE, ImmutableList.of("DataSet"),
new AttributeDefinition("db", DATABASE_TYPE, Multiplicity.REQUIRED, false, null),
new AttributeDefinition(DB_ATTRIBUTE, DATABASE_TYPE, Multiplicity.REQUIRED, false, null),
new AttributeDefinition("sd", STORAGE_DESC_TYPE, Multiplicity.REQUIRED, true, null),
attrDef("owner", DataTypes.STRING_TYPE), attrDef("createTime", DataTypes.LONG_TYPE),
attrDef("lastAccessTime", DataTypes.LONG_TYPE), attrDef("retention", DataTypes.LONG_TYPE),
attrDef("viewOriginalText", DataTypes.STRING_TYPE),
attrDef("viewExpandedText", DataTypes.STRING_TYPE), attrDef("tableType", DataTypes.STRING_TYPE),
attrDef("temporary", DataTypes.BOOLEAN_TYPE),
new AttributeDefinition("columns", DataTypes.arrayTypeName(COLUMN_TYPE),
new AttributeDefinition(COLUMNS_ATTRIBUTE, DataTypes.arrayTypeName(COLUMN_TYPE),
Multiplicity.COLLECTION, true, null));
HierarchicalTypeDefinition<ClassType> loadProcessClsDef = TypesUtil
......@@ -145,7 +164,8 @@ public class QuickStart {
attrDef("queryGraph", DataTypes.STRING_TYPE, Multiplicity.REQUIRED));
HierarchicalTypeDefinition<ClassType> viewClsDef = TypesUtil
.createClassTypeDef(VIEW_TYPE, null, attrDef("name", DataTypes.STRING_TYPE),
.createClassTypeDef(VIEW_TYPE, null,
TypesUtil.createUniqueRequiredAttrDef("name", DataTypes.STRING_TYPE),
new AttributeDefinition("db", DATABASE_TYPE, Multiplicity.REQUIRED, false, null),
new AttributeDefinition("inputTables", DataTypes.arrayTypeName(TABLE_TYPE),
Multiplicity.COLLECTION, false, null));
......@@ -185,7 +205,7 @@ public class QuickStart {
}
void createEntities() throws Exception {
Id salesDB = database("Sales", "Sales Database", "John ETL", "hdfs://host:8000/apps/warehouse/sales");
Id salesDB = database(SALES_DB, SALES_DB_DESCRIPTION, "John ETL", "hdfs://host:8000/apps/warehouse/sales");
Referenceable sd =
......@@ -193,7 +213,7 @@ public class QuickStart {
true);
List<Referenceable> salesFactColumns = ImmutableList
.of(rawColumn("time_id", "int", "time id"), rawColumn("product_id", "int", "product id"),
.of(rawColumn(TIME_ID_COLUMN, "int", "time id"), rawColumn("product_id", "int", "product id"),
rawColumn("customer_id", "int", "customer id", "PII"),
rawColumn("sales", "double", "product id", "Metric"));
......@@ -201,21 +221,22 @@ public class QuickStart {
.of(rawColumn("time_id", "int", "time id"), rawColumn("app_id", "int", "app id"),
rawColumn("machine_id", "int", "machine id"), rawColumn("log", "string", "log data", "Log Data"));
Id salesFact = table("sales_fact", "sales fact table", salesDB, sd, "Joe", "Managed", salesFactColumns, "Fact");
Id salesFact = table(SALES_FACT_TABLE, SALES_FACT_TABLE_DESCRIPTION, salesDB, sd, "Joe", "Managed",
salesFactColumns, FACT_TRAIT);
List<Referenceable> productDimColumns = ImmutableList
.of(rawColumn("product_id", "int", "product id"), rawColumn("product_name", "string", "product name"),
rawColumn("brand_name", "int", "brand name"));
Id productDim =
table("product_dim", "product dimension table", salesDB, sd, "John Doe", "Managed", productDimColumns,
"Dimension");
table(PRODUCT_DIM_TABLE, "product dimension table", salesDB, sd, "John Doe", "Managed",
productDimColumns, "Dimension");
List<Referenceable> timeDimColumns = ImmutableList
.of(rawColumn("time_id", "int", "time id"), rawColumn("dayOfYear", "int", "day Of Year"),
rawColumn("weekDay", "int", "week Day"));
Id timeDim = table("time_dim", "time dimension table", salesDB, sd, "John Doe", "External", timeDimColumns,
Id timeDim = table(TIME_DIM_TABLE, "time dimension table", salesDB, sd, "John Doe", "External", timeDimColumns,
"Dimension");
......@@ -234,17 +255,18 @@ public class QuickStart {
Id logDB = database("Logging", "logging database", "Tim ETL", "hdfs://host:8000/apps/warehouse/logging");
Id salesFactDaily =
table("sales_fact_daily_mv", "sales fact daily materialized view", reportingDB, sd, "Joe BI", "Managed",
salesFactColumns, "Metric");
table(SALES_FACT_DAILY_MV_TABLE, "sales fact daily materialized view", reportingDB, sd, "Joe BI",
"Managed", salesFactColumns, "Metric");
Id loggingFactDaily =
table("log_fact_daily_mv", "log fact daily materialized view", logDB, sd, "Tim ETL", "Managed",
logFactColumns, "Log Data");
loadProcess("loadSalesDaily", "hive query for daily summary", "John ETL", ImmutableList.of(salesFact, timeDim),
loadProcess(LOAD_SALES_DAILY_PROCESS, LOAD_SALES_DAILY_PROCESS_DESCRIPTION, "John ETL",
ImmutableList.of(salesFact, timeDim),
ImmutableList.of(salesFactDaily), "create table as select ", "plan", "id", "graph", "ETL");
view("product_dim_view", reportingDB, ImmutableList.of(productDim), "Dimension", "JdbcAccess");
view(PRODUCT_DIM_VIEW, reportingDB, ImmutableList.of(productDim), "Dimension", "JdbcAccess");
view("customer_dim_view", reportingDB, ImmutableList.of(customerDim), "Dimension", "JdbcAccess");
......@@ -272,7 +294,8 @@ public class QuickStart {
System.out.println("created instance for type " + typeName + ", guid: " + guids);
// return the Id for created instance with guid
return new Id(guids.getString(0), referenceable.getId().getVersion(), referenceable.getTypeName());
return new Id(guids.getString(guids.length()-1), referenceable.getId().getVersion(),
referenceable.getTypeName());
}
Id database(String name, String description, String owner, String locationUri, String... traitNames)
......@@ -331,8 +354,8 @@ public class QuickStart {
// super type attributes
referenceable.set("name", name);
referenceable.set("description", description);
referenceable.set("inputs", inputTables);
referenceable.set("outputs", outputTables);
referenceable.set(INPUTS_ATTRIBUTE, inputTables);
referenceable.set(OUTPUTS_ATTRIBUTE, outputTables);
referenceable.set("user", user);
referenceable.set("startTime", System.currentTimeMillis());
......@@ -351,7 +374,7 @@ public class QuickStart {
referenceable.set("name", name);
referenceable.set("db", dbId);
referenceable.set("inputTables", inputTables);
referenceable.set(INPUT_TABLES_ATTRIBUTE, inputTables);
return createInstance(referenceable);
}
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.examples;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.persistence.Id;
import org.apache.atlas.web.resources.BaseResourceIT;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.util.List;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.AssertJUnit.assertTrue;
public class QuickStartIT extends BaseResourceIT {
@BeforeClass
public void runQuickStart() throws Exception {
super.setUp();
QuickStart.main(new String[]{});
}
@Test
public void testDBIsAdded() throws Exception {
Referenceable db = getDB(QuickStart.SALES_DB);
assertEquals(QuickStart.SALES_DB, db.get("name"));
assertEquals(QuickStart.SALES_DB_DESCRIPTION, db.get("description"));
}
private Referenceable getDB(String dbName) throws AtlasServiceException, JSONException {
return serviceClient.getEntity(QuickStart.DATABASE_TYPE, "name", dbName);
}
@Test
public void testTablesAreAdded() throws AtlasServiceException, JSONException {
Referenceable table = getTable(QuickStart.SALES_FACT_TABLE);
verifySimpleTableAttributes(table);
verifyDBIsLinkedToTable(table);
verifyColumnsAreAddedToTable(table);
verifyTrait(table);
}
private Referenceable getTable(String tableName) throws AtlasServiceException {
return serviceClient.getEntity(QuickStart.TABLE_TYPE, "name", tableName);
}
private void verifyTrait(Referenceable table) throws JSONException {
assertNotNull(table.getTrait(QuickStart.FACT_TRAIT));
}
private void verifyColumnsAreAddedToTable(Referenceable table) throws JSONException {
List<Referenceable> columns = (List<Referenceable>) table.get(QuickStart.COLUMNS_ATTRIBUTE);
assertEquals(4, columns.size());
Referenceable column = columns.get(0);
assertEquals(QuickStart.TIME_ID_COLUMN, column.get("name"));
assertEquals("int", column.get("dataType"));
}
private void verifyDBIsLinkedToTable(Referenceable table) throws AtlasServiceException, JSONException {
Referenceable db = getDB(QuickStart.SALES_DB);
assertEquals(db.getId(), table.get(QuickStart.DB_ATTRIBUTE));
}
private void verifySimpleTableAttributes(Referenceable table) throws JSONException {
assertEquals(QuickStart.SALES_FACT_TABLE, table.get("name"));
assertEquals(QuickStart.SALES_FACT_TABLE_DESCRIPTION, table.get("description"));
}
@Test
public void testProcessIsAdded() throws AtlasServiceException, JSONException {
Referenceable loadProcess = serviceClient.getEntity(QuickStart.LOAD_PROCESS_TYPE, "name",
QuickStart.LOAD_SALES_DAILY_PROCESS);
assertEquals(QuickStart.LOAD_SALES_DAILY_PROCESS, loadProcess.get("name"));
assertEquals(QuickStart.LOAD_SALES_DAILY_PROCESS_DESCRIPTION, loadProcess.get("description"));
List<Id> inputs = (List<Id>)loadProcess.get(QuickStart.INPUTS_ATTRIBUTE);
List<Id> outputs = (List<Id>)loadProcess.get(QuickStart.OUTPUTS_ATTRIBUTE);
assertEquals(2, inputs.size());
String salesFactTableId = getTableId(QuickStart.SALES_FACT_TABLE);
String timeDimTableId = getTableId(QuickStart.TIME_DIM_TABLE);
String salesFactDailyMVId = getTableId(QuickStart.SALES_FACT_DAILY_MV_TABLE);
assertEquals(salesFactTableId, inputs.get(0)._getId());
assertEquals(timeDimTableId, inputs.get(1)._getId());
assertEquals(salesFactDailyMVId, outputs.get(0)._getId());
}
private String getTableId(String tableName) throws AtlasServiceException {
return getTable(tableName).getId()._getId();
}
@Test
public void testLineageIsMaintained() throws AtlasServiceException, JSONException {
String salesFactTableId = getTableId(QuickStart.SALES_FACT_TABLE);
String timeDimTableId = getTableId(QuickStart.TIME_DIM_TABLE);
String salesFactDailyMVId = getTableId(QuickStart.SALES_FACT_DAILY_MV_TABLE);
JSONObject inputGraph = serviceClient.getInputGraph(QuickStart.SALES_FACT_DAILY_MV_TABLE);
JSONObject vertices = (JSONObject) ((JSONObject) inputGraph.get("values")).get("vertices");
JSONObject edges = (JSONObject) ((JSONObject) inputGraph.get("values")).get("edges");
assertTrue(vertices.has(salesFactTableId));
assertTrue(vertices.has(timeDimTableId));
assertTrue(vertices.has(salesFactDailyMVId));
assertTrue(edges.has(salesFactDailyMVId));
JSONArray inputs = (JSONArray)edges.get((String) ((JSONArray) edges.get(salesFactDailyMVId)).get(0));
String i1 = inputs.getString(0);
String i2 = inputs.getString(1);
assertTrue(salesFactTableId.equals(i1) || salesFactTableId.equals(i2));
assertTrue(timeDimTableId.equals(i1) || timeDimTableId.equals(i2));
}
@Test
public void testViewIsAdded() throws AtlasServiceException, JSONException {
Referenceable view = serviceClient.getEntity(QuickStart.VIEW_TYPE, "name", QuickStart.PRODUCT_DIM_VIEW);
assertEquals(QuickStart.PRODUCT_DIM_VIEW, view.get("name"));
Id productDimId = getTable(QuickStart.PRODUCT_DIM_TABLE).getId();
Id inputTableId = ((List<Id>)view.get(QuickStart.INPUT_TABLES_ATTRIBUTE)).get(0);
assertEquals(productDimId, inputTableId);
}
}
......@@ -194,18 +194,9 @@ public abstract class BaseResourceIT {
HierarchicalTypeDefinition<TraitType> financeTrait =
TypesUtil.createTraitTypeDef("finance", ImmutableList.<String>of());
HierarchicalTypeDefinition<TraitType> dimTraitDef = TypesUtil.createTraitTypeDef("Dimension", null);
HierarchicalTypeDefinition<TraitType> factTraitDef = TypesUtil.createTraitTypeDef("Fact", null);
HierarchicalTypeDefinition<TraitType> metricTraitDef = TypesUtil.createTraitTypeDef("Metric", null);
HierarchicalTypeDefinition<TraitType> etlTraitDef = TypesUtil.createTraitTypeDef("ETL", null);
TypesDef typesDef = TypesUtil.getTypesDef(ImmutableList.of(enumTypeDefinition),
ImmutableList.of(structTypeDefinition),
ImmutableList.of(classificationTrait, piiTrait, phiTrait, pciTrait, soxTrait, secTrait, financeTrait,
dimTraitDef, factTraitDef, metricTraitDef, etlTraitDef),
ImmutableList.of(classificationTrait, piiTrait, phiTrait, pciTrait, soxTrait, secTrait, financeTrait),
ImmutableList.of(dbClsDef, columnClsDef, tblClsDef, loadProcessClsDef));
createType(typesDef);
......
......@@ -184,7 +184,7 @@ public class HiveLineageJerseyResourceIT extends BaseResourceIT {
table("sales_fact_daily_mv" + randomString(), "sales fact daily materialized view", reportingDB,
"Joe BI", "MANAGED", salesFactColumns, "Metric");
loadProcess("loadSalesDaily", "John ETL", ImmutableList.of(salesFact, timeDim),
loadProcess("loadSalesDaily" + randomString(), "John ETL", ImmutableList.of(salesFact, timeDim),
ImmutableList.of(salesFactDaily), "create table as select ", "plan", "id", "graph", "ETL");
salesMonthlyTable = "sales_fact_monthly_mv" + randomString();
......@@ -192,7 +192,7 @@ public class HiveLineageJerseyResourceIT extends BaseResourceIT {
table(salesMonthlyTable, "sales fact monthly materialized view", reportingDB, "Jane BI",
"MANAGED", salesFactColumns, "Metric");
loadProcess("loadSalesMonthly", "John ETL", ImmutableList.of(salesFactDaily),
loadProcess("loadSalesMonthly" + randomString(), "John ETL", ImmutableList.of(salesFactDaily),
ImmutableList.of(salesFactMonthly), "create table as select ", "plan", "id", "graph", "ETL");
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment