Commit 1c034cba by Pinal Shah Committed by nixonrodrigues

ATLAS-3788 : BasicSearch: Classification with System attribute(indexed) filters…

ATLAS-3788 : BasicSearch: Classification with System attribute (indexed) filters has pagination issue

Signed-off-by: nixonrodrigues <nixon@apache.org>
parent 103e867c
......@@ -21,12 +21,7 @@ import org.apache.atlas.SortOrder;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.graphdb.*;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.util.AtlasGremlinQueryProvider;
......@@ -44,10 +39,6 @@ import javax.script.ScriptEngine;
import javax.script.ScriptException;
import java.util.*;
import static org.apache.atlas.discovery.SearchContext.MATCH_ALL_CLASSIFIED;
import static org.apache.atlas.discovery.SearchContext.MATCH_ALL_NOT_CLASSIFIED;
import static org.apache.atlas.discovery.SearchContext.MATCH_ALL_WILDCARD_CLASSIFICATION;
/**
* This class is needed when this is a registered classification type or wildcard search,
* registered classification includes special type as well. (tag filters will be ignored, and front-end should not enable
......@@ -293,6 +284,8 @@ public class ClassificationSearchProcessor extends SearchProcessor {
getVerticesFromIndexQueryResult(queryResult, classificationVertices);
isLastResultPage = classificationVertices.size() < limit;
// Do in-memory filtering before the graph query
CollectionUtils.filter(classificationVertices, inMemoryPredicate);
}
......
......@@ -18,8 +18,6 @@
package org.apache.atlas;
import com.google.common.collect.ImmutableList;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.TestUtilsV2;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
......@@ -58,6 +56,7 @@ public abstract class BasicTestSetup {
public static final String ETL_CLASSIFICATION = "ETL";
public static final String JDBC_CLASSIFICATION = "JdbcAccess";
public static final String LOGDATA_CLASSIFICATION = "Log Data";
public static final String DIMENSIONAL_CLASSIFICATION = "Dimensional";
@Inject
protected AtlasTypeRegistry typeRegistry;
......@@ -143,11 +142,11 @@ public abstract class BasicTestSetup {
List<AtlasEntity> salesFactColumns = ImmutableList
.of(column("time_id", "int", "time id"),
column("product_id", "int", "product id"),
column("customer_id", "int", "customer id", "PII"),
column("sales", "double", "product id", "Metric"));
column("customer_id", "int", "customer id", PII_CLASSIFICATION),
column("sales", "double", "product id", METRIC_CLASSIFICATION));
entities.addAll(salesFactColumns);
AtlasEntity salesFact = table("sales_fact", "sales fact table", salesDB, sd, "Joe", "Managed", salesFactColumns, "Fact");
AtlasEntity salesFact = table("sales_fact", "sales fact table", salesDB, sd, "Joe", "Managed", salesFactColumns, FACT_CLASSIFICATION);
salesFact.setAttribute("createTime", new Date(2018, 01, 01));
entities.add(salesFact);
......@@ -155,7 +154,7 @@ public abstract class BasicTestSetup {
.of(column("time_id", "int", "time id"),
column("app_id", "int", "app id"),
column("machine_id", "int", "machine id"),
column("log", "string", "log data", "Log Data"));
column("log", "string", "log data", LOGDATA_CLASSIFICATION));
entities.addAll(logFactColumns);
List<AtlasEntity> timeDimColumns = ImmutableList
......@@ -168,7 +167,7 @@ public abstract class BasicTestSetup {
entities.add(sd);
AtlasEntity timeDim = table("time_dim", "time dimension table", salesDB, sd, "John Doe", "External", timeDimColumns,
"Dimension");
DIMENSION_CLASSIFICATION);
entities.add(timeDim);
AtlasEntity reportingDB =
......@@ -180,32 +179,32 @@ public abstract class BasicTestSetup {
AtlasEntity salesFactDaily =
table("sales_fact_daily_mv", "sales fact daily materialized view", reportingDB, sd, "Joe BI", "Managed",
salesFactColumns, "Metric");
salesFactColumns, METRIC_CLASSIFICATION);
salesFactDaily.setAttribute("createTime", Date.from(LocalDate.of(2016, 8, 19).atStartOfDay(ZoneId.systemDefault()).toInstant()));
entities.add(salesFactDaily);
sd = storageDescriptor("hdfs://host:8000/apps/warehouse/sales", "TextInputFormat", "TextOutputFormat", true, ImmutableList.of(column("time_id", "int", "time id")));
entities.add(sd);
AtlasEntity circularLineageTable1 = table("table1", "", reportingDB, sd, "Vimal", "Managed", salesFactColumns, "Metric");
AtlasEntity circularLineageTable1 = table("table1", "", reportingDB, sd, "Vimal", "Managed", salesFactColumns, METRIC_CLASSIFICATION);
entities.add(circularLineageTable1);
sd = storageDescriptor("hdfs://host:8000/apps/warehouse/sales", "TextInputFormat", "TextOutputFormat", true, ImmutableList.of(column("time_id", "int", "time id")));
entities.add(sd);
AtlasEntity circularLineageTable2 = table("table2", "", reportingDB, sd, "Vimal 2", "Managed", salesFactColumns, "Metric");
AtlasEntity circularLineageTable2 = table("table2", "", reportingDB, sd, "Vimal 2", "Managed", salesFactColumns, METRIC_CLASSIFICATION);
entities.add(circularLineageTable2);
AtlasEntity circularLineage1Process = loadProcess("circularLineage1", "hive query for daily summary", "John ETL", ImmutableList.of(circularLineageTable1),
ImmutableList.of(circularLineageTable2), "create table as select ", "plan", "id", "graph", "ETL");
ImmutableList.of(circularLineageTable2), "create table as select ", "plan", "id", "graph", ETL_CLASSIFICATION);
entities.add(circularLineage1Process);
AtlasEntity circularLineage2Process = loadProcess("circularLineage2", "hive query for daily summary", "John ETL", ImmutableList.of(circularLineageTable2),
ImmutableList.of(circularLineageTable1), "create table as select ", "plan", "id", "graph", "ETL");
ImmutableList.of(circularLineageTable1), "create table as select ", "plan", "id", "graph", ETL_CLASSIFICATION);
entities.add(circularLineage2Process);
AtlasEntity loadSalesDaily = loadProcess("loadSalesDaily", "hive query for daily summary", "John ETL", ImmutableList.of(salesFact, timeDim),
ImmutableList.of(salesFactDaily), "create table as select ", "plan", "id", "graph", "ETL");
ImmutableList.of(salesFactDaily), "create table as select ", "plan", "id", "graph", ETL_CLASSIFICATION);
entities.add(loadSalesDaily);
AtlasEntity logDB = database("Logging", "logging database", "Tim ETL", "hdfs://host:8000/apps/warehouse/logging");
......@@ -216,7 +215,7 @@ public abstract class BasicTestSetup {
AtlasEntity loggingFactDaily =
table("log_fact_daily_mv", "log fact daily materialized view", logDB, sd, "Tim ETL", "Managed",
logFactColumns, "Log Data");
logFactColumns, LOGDATA_CLASSIFICATION);
entities.add(loggingFactDaily);
List<AtlasEntity> productDimColumns = ImmutableList
......@@ -230,16 +229,16 @@ public abstract class BasicTestSetup {
AtlasEntity productDim =
table("product_dim", "product dimension table", salesDB, sd, "John Doe 2", "Managed", productDimColumns,
"Dimension");
DIMENSION_CLASSIFICATION);
entities.add(productDim);
AtlasEntity productDimView = view("product_dim_view", reportingDB, ImmutableList.of(productDim), "Dimension", "JdbcAccess");
AtlasEntity productDimView = view("product_dim_view", reportingDB, ImmutableList.of(productDim), DIMENSION_CLASSIFICATION, JDBC_CLASSIFICATION);
entities.add(productDimView);
List<AtlasEntity> customerDimColumns = ImmutableList.of(
column("customer_id", "int", "customer id", "PII"),
column("name", "string", "customer name", "PII"),
column("address", "string", "customer address", "PII"));
column("customer_id", "int", "customer id", PII_CLASSIFICATION),
column("name", "string", "customer name", PII_CLASSIFICATION),
column("address", "string", "customer address", PII_CLASSIFICATION));
entities.addAll(customerDimColumns);
sd = storageDescriptor("hdfs://host:8000/apps/warehouse/sales", "TextInputFormat", "TextOutputFormat", true, ImmutableList.of(column("time_id", "int", "time id")));
......@@ -247,10 +246,10 @@ public abstract class BasicTestSetup {
AtlasEntity customerDim =
table("customer_dim", "customer dimension table", salesDB, sd, "fetl", "External", customerDimColumns,
"Dimension");
DIMENSION_CLASSIFICATION);
entities.add(customerDim);
AtlasEntity customerDimView = view("customer_dim_view", reportingDB, ImmutableList.of(customerDim), "Dimension", "JdbcAccess");
AtlasEntity customerDimView = view("customer_dim_view", reportingDB, ImmutableList.of(customerDim), DIMENSION_CLASSIFICATION, JDBC_CLASSIFICATION);
entities.add(customerDimView);
sd = storageDescriptor("hdfs://host:8000/apps/warehouse/sales", "TextInputFormat", "TextOutputFormat", true, ImmutableList.of(column("time_id", "int", "time id")));
......@@ -258,11 +257,11 @@ public abstract class BasicTestSetup {
AtlasEntity salesFactMonthly =
table("sales_fact_monthly_mv", "sales fact monthly materialized view", reportingDB, sd, "Jane BI",
"Managed", salesFactColumns, "Metric");
"Managed", salesFactColumns, METRIC_CLASSIFICATION);
entities.add(salesFactMonthly);
AtlasEntity loadSalesMonthly = loadProcess("loadSalesMonthly", "hive query for monthly summary", "John ETL", ImmutableList.of(salesFactDaily),
ImmutableList.of(salesFactMonthly), "create table as select ", "plan", "id", "graph", "ETL");
ImmutableList.of(salesFactMonthly), "create table as select ", "plan", "id", "graph", ETL_CLASSIFICATION);
entities.add(loadSalesMonthly);
sd = storageDescriptor("hdfs://host:8000/apps/warehouse/sales", "TextInputFormat", "TextOutputFormat", true, ImmutableList.of(column("time_id", "int", "time id")));
......@@ -270,11 +269,11 @@ public abstract class BasicTestSetup {
AtlasEntity loggingFactMonthly =
table("logging_fact_monthly_mv", "logging fact monthly materialized view", logDB, sd, "Tim ETL 2",
"Managed", logFactColumns, "Log Data");
"Managed", logFactColumns, LOGDATA_CLASSIFICATION);
entities.add(loggingFactMonthly);
AtlasEntity loadLogsMonthly = loadProcess("loadLogsMonthly", "hive query for monthly summary", "Tim ETL", ImmutableList.of(loggingFactDaily),
ImmutableList.of(loggingFactMonthly), "create table as select ", "plan", "id", "graph", "ETL");
ImmutableList.of(loggingFactMonthly), "create table as select ", "plan", "id", "graph", ETL_CLASSIFICATION);
entities.add(loadLogsMonthly);
AtlasEntity datasetSubType = datasetSubType("dataSetSubTypeInst1", "testOwner");
......@@ -290,7 +289,9 @@ public abstract class BasicTestSetup {
new AtlasClassificationDef(METRIC_CLASSIFICATION, "Metric Classification", "1.0"),
new AtlasClassificationDef(ETL_CLASSIFICATION, "ETL Classification", "1.0"),
new AtlasClassificationDef(JDBC_CLASSIFICATION, "JdbcAccess Classification", "1.0"),
new AtlasClassificationDef(LOGDATA_CLASSIFICATION, "LogData Classification", "1.0"));
new AtlasClassificationDef(LOGDATA_CLASSIFICATION, "LogData Classification", "1.0"),
new AtlasClassificationDef(DIMENSIONAL_CLASSIFICATION,"Dimensional Classification", "1.0" ,
Arrays.asList(new AtlasStructDef.AtlasAttributeDef("attr1","string"))));
AtlasTypesDef tds = new AtlasTypesDef(Collections.<AtlasEnumDef>emptyList(),
Collections.<AtlasStructDef>emptyList(),
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.discovery;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.BasicTestSetup;
import org.apache.atlas.TestModules;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.SearchParameters;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream;
import org.apache.commons.collections.CollectionUtils;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import static org.apache.atlas.model.discovery.SearchParameters.*;
import static org.testng.Assert.assertEquals;
@Guice(modules = TestModules.TestOnlyModule.class)
public class BasicSearchClassificationTest extends BasicTestSetup {
@Inject
private AtlasDiscoveryService discoveryService;
private int totalEntities = 0;
private int totalClassifiedEntities = 0;
private int getTotalClassifiedEntitiesHistorical = 0;
private int dimensionTagEntities = 10;
private String dimensionTagDeleteGuid;
private String dimensionalTagGuid;
@BeforeClass
public void setup() throws AtlasBaseException {
setupTestData();
createDimensionTaggedEntityAndDelete();
createDimensionalTaggedEntityWithAttr();
}
@Test(priority = -1)
public void searchByALLTag() throws AtlasBaseException {
SearchParameters params = new SearchParameters();
params.setClassification(ALL_CLASSIFICATION_TYPES);
List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders));
totalEntities = getEntityCount();
totalClassifiedEntities = entityHeaders.size();
getTotalClassifiedEntitiesHistorical = getEntityWithTagCountHistorical();
}
@Test
public void searchByALLTagAndIndexSysFilters() throws AtlasBaseException {
SearchParameters params = new SearchParameters();
params.setClassification(ALL_CLASSIFICATION_TYPES);
FilterCriteria filterCriteria = getSingleFilterCondition("__timestamp", Operator.LT, String.valueOf(System.currentTimeMillis()));
params.setTagFilters(filterCriteria);
List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
assertEquals(entityHeaders.size(), totalClassifiedEntities);
}
@Test
public void searchByALLTagAndIndexSysFiltersToTestLimit() throws AtlasBaseException {
SearchParameters params = new SearchParameters();
params.setClassification(ALL_CLASSIFICATION_TYPES);
FilterCriteria filterCriteria = getSingleFilterCondition("__timestamp", Operator.LT, String.valueOf(System.currentTimeMillis()));
params.setTagFilters(filterCriteria);
params.setLimit(totalClassifiedEntities - 2);
List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
assertEquals(entityHeaders.size(), totalClassifiedEntities - 2);
}
@Test
public void searchByNOTCLASSIFIED() throws AtlasBaseException {
SearchParameters params = new SearchParameters();
params.setClassification(NO_CLASSIFICATIONS);
List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
assertEquals(entityHeaders.size(), totalEntities - totalClassifiedEntities);
}
@Test
public void searchByTag() throws AtlasBaseException {
SearchParameters params = new SearchParameters();
params.setClassification(DIMENSION_CLASSIFICATION);
List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
assertEquals(entityHeaders.size(), dimensionTagEntities);
}
@Test
public void searchByTagAndTagFilters() throws AtlasBaseException {
SearchParameters params = new SearchParameters();
params.setClassification(DIMENSIONAL_CLASSIFICATION);
FilterCriteria filterCriteria = getSingleFilterCondition("attr1", Operator.EQ, "Test");
params.setTagFilters(filterCriteria);
List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
assertEquals(entityHeaders.size(), 1);
assertEquals(entityHeaders.get(0).getGuid(), dimensionalTagGuid);
}
@Test
public void searchByTagAndIndexSysFilters() throws AtlasBaseException {
SearchParameters params = new SearchParameters();
params.setClassification(DIMENSION_CLASSIFICATION);
FilterCriteria filterCriteria = getSingleFilterCondition("__timestamp", Operator.LT, String.valueOf(System.currentTimeMillis()));
params.setTagFilters(filterCriteria);
List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
assertEquals(entityHeaders.size(), dimensionTagEntities);
}
@Test
public void searchByWildcardTag() throws AtlasBaseException {
SearchParameters params = new SearchParameters();
params.setClassification("Dimension*");
List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
assertEquals(entityHeaders.size(), dimensionTagEntities + 1);
}
//@Test
public void searchByTagAndGraphSysFilters() throws AtlasBaseException {
SearchParameters params = new SearchParameters();
params.setClassification(DIMENSION_CLASSIFICATION);
FilterCriteria filterCriteria = getSingleFilterCondition("__entityStatus", Operator.EQ, "DELETED");
params.setTagFilters(filterCriteria);
params.setExcludeDeletedEntities(false);
List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
assertEquals(entityHeaders.size(), 1);
assertEquals(entityHeaders.get(0).getGuid(), dimensionTagDeleteGuid);
}
private void createDimensionTaggedEntityAndDelete() throws AtlasBaseException {
AtlasEntity entityToDelete = new AtlasEntity(HIVE_TABLE_TYPE);
entityToDelete.setAttribute("name", "entity to be deleted");
entityToDelete.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, "entity.tobedeleted");
List<AtlasClassification> cls = new ArrayList<>();
cls.add(new AtlasClassification(DIMENSION_CLASSIFICATION));
entityToDelete.setClassifications(cls);
//create entity
EntityMutationResponse response = entityStore.createOrUpdate(new AtlasEntityStream(new AtlasEntity.AtlasEntitiesWithExtInfo(entityToDelete)), false);
AtlasEntityHeader entityHeader = response.getCreatedEntities().get(0);
dimensionTagDeleteGuid = entityHeader.getGuid();
//delete entity
entityStore.deleteById(dimensionTagDeleteGuid);
}
private void createDimensionalTaggedEntityWithAttr() throws AtlasBaseException {
AtlasEntity entityToDelete = new AtlasEntity(HIVE_TABLE_TYPE);
entityToDelete.setAttribute("name", "Entity1");
entityToDelete.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, "entity.one");
List<AtlasClassification> cls = new ArrayList<>();
cls.add(new AtlasClassification(DIMENSIONAL_CLASSIFICATION, new HashMap<String, Object>() {{
put("attr1", "Test");
}}));
entityToDelete.setClassifications(cls);
//create entity
final EntityMutationResponse response = entityStore.createOrUpdate(new AtlasEntityStream(new AtlasEntity.AtlasEntitiesWithExtInfo(entityToDelete)), false);
AtlasEntityHeader entityHeader = response.getCreatedEntities().get(0);
dimensionalTagGuid = entityHeader.getGuid();
}
private int getEntityCount() throws AtlasBaseException {
SearchParameters params = new SearchParameters();
params.setTypeName(ALL_ENTITY_TYPES);
List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
return entityHeaders.size();
}
private int getEntityWithTagCountHistorical() throws AtlasBaseException {
SearchParameters params = new SearchParameters();
params.setClassification(ALL_CLASSIFICATION_TYPES);
params.setExcludeDeletedEntities(false);
List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities();
return entityHeaders.size();
}
private FilterCriteria getSingleFilterCondition(String attName, Operator op, String attrValue) {
FilterCriteria filterCriteria = new FilterCriteria();
filterCriteria.setCondition(FilterCriteria.Condition.AND);
List<FilterCriteria> criteria = new ArrayList<>();
FilterCriteria f1 = new FilterCriteria();
f1.setAttributeName(attName);
f1.setOperator(op);
String time = String.valueOf(System.currentTimeMillis());
f1.setAttributeValue(attrValue);
criteria.add(f1);
filterCriteria.setCriterion(criteria);
return filterCriteria;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment