Commit 9e1f3663 by Suma Shivaprasad

ATLAS-619 Canonicalize hive queries (sumasai)

parent b6a0eee7
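The core of this change is the new HiveASTRewriter, which parses a Hive query and replaces every literal token with a fixed placeholder (see LiteralRewriter below), so that queries differing only in literal values canonicalize to the same string. A minimal usage sketch, assuming a configured HiveConf (the test below also starts a SessionState first); table and column names are hypothetical:

import org.apache.atlas.hive.rewrite.HiveASTRewriter;
import org.apache.hadoop.hive.conf.HiveConf;

public class CanonicalizeSketch {
    public static void main(String[] args) throws Exception {
        // Parses the query, then rewrites literal tokens to placeholders.
        HiveASTRewriter rewriter = new HiveASTRewriter(new HiveConf());
        String canonical = rewriter.rewrite(
                "select * from sales where dt = '2015-01-01' and id = 10");
        // Expected shape: select * from sales where dt = 'STRING_LITERAL' and id = NUMBER_LITERAL
        System.out.println(canonical);
    }
}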
......@@ -211,10 +211,10 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher {
if (!inputs.isEmpty() || !outputs.isEmpty()) {
Referenceable processEntity = new Referenceable(FalconDataTypes.FALCON_PROCESS_ENTITY.getName());
processEntity.set(FalconDataModelGenerator.NAME, String.format("%s@%s", process.getName(),
processEntity.set(FalconDataModelGenerator.NAME, String.format("%s", process.getName(),
cluster.getName()));
processEntity.set(FalconDataModelGenerator.PROCESS_NAME, process.getName());
processEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, String.format("%s@%s", process.getName(),
cluster.getName()));
processEntity.set(FalconDataModelGenerator.TIMESTAMP, timestamp);
if (!inputs.isEmpty()) {
processEntity.set(FalconDataModelGenerator.INPUTS, inputs);
......
......@@ -57,7 +57,6 @@ public class FalconDataModelGenerator {
private final Map<String, StructTypeDefinition> structTypeDefinitionMap;
public static final String NAME = "name";
public static final String PROCESS_NAME = "processName";
public static final String TIMESTAMP = "timestamp";
public static final String USER = "owned-by";
public static final String TAGS = "tag-classification";
......@@ -107,8 +106,6 @@ public class FalconDataModelGenerator {
private void createProcessEntityClass() throws AtlasException {
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition(PROCESS_NAME, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false,
null),
new AttributeDefinition(TIMESTAMP, DataTypes.LONG_TYPE.getName(), Multiplicity.REQUIRED, false,
null),
new AttributeDefinition(USER, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false,
......
......@@ -150,7 +150,7 @@ public class FalconHookIT {
String pid = assertProcessIsRegistered(cluster.getName(), process.getName());
Referenceable processEntity = atlasClient.getEntity(pid);
assertNotNull(processEntity);
assertEquals(processEntity.get("processName"), process.getName());
assertEquals(processEntity.get(AtlasClient.NAME), process.getName());
Id inId = (Id) ((List)processEntity.get("inputs")).get(0);
Referenceable inEntity = atlasClient.getEntity(inId._getId());
......@@ -207,7 +207,7 @@ public class FalconHookIT {
String pid = assertProcessIsRegistered(cluster.getName(), process.getName());
Referenceable processEntity = atlasClient.getEntity(pid);
assertEquals(processEntity.get("processName"), process.getName());
assertEquals(processEntity.get(AtlasClient.NAME), process.getName());
assertNull(processEntity.get("inputs"));
Id outId = (Id) ((List)processEntity.get("outputs")).get(0);
......@@ -233,8 +233,8 @@ public class FalconHookIT {
private String assertProcessIsRegistered(String clusterName, String processName) throws Exception {
String name = processName + "@" + clusterName;
LOG.debug("Searching for process {}", name);
String query = String.format("%s as t where name = '%s' select t",
FalconDataTypes.FALCON_PROCESS_ENTITY.getName(), name);
String query = String.format("%s as t where %s = '%s' select t",
FalconDataTypes.FALCON_PROCESS_ENTITY.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
return assertEntityIsRegistered(query);
}
......
......@@ -72,6 +72,13 @@
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${hive.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-cli</artifactId>
<version>${hive.version}</version>
<scope>test</scope>
......
......@@ -19,11 +19,14 @@
package org.apache.atlas.hive.hook;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.hive.rewrite.HiveASTRewriter;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable;
......@@ -92,110 +95,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
private static final long keepAliveTimeDefault = 10;
private static final int queueSizeDefault = 10000;
static class HiveEventContext {
private Set<ReadEntity> inputs;
private Set<WriteEntity> outputs;
private String user;
private UserGroupInformation ugi;
private HiveOperation operation;
private HookContext.HookType hookType;
private org.json.JSONObject jsonPlan;
private String queryId;
private String queryStr;
private Long queryStartTime;
private String queryType;
public void setInputs(Set<ReadEntity> inputs) {
this.inputs = inputs;
}
public void setOutputs(Set<WriteEntity> outputs) {
this.outputs = outputs;
}
public void setUser(String user) {
this.user = user;
}
public void setUgi(UserGroupInformation ugi) {
this.ugi = ugi;
}
public void setOperation(HiveOperation operation) {
this.operation = operation;
}
public void setHookType(HookContext.HookType hookType) {
this.hookType = hookType;
}
public void setJsonPlan(JSONObject jsonPlan) {
this.jsonPlan = jsonPlan;
}
public void setQueryId(String queryId) {
this.queryId = queryId;
}
public void setQueryStr(String queryStr) {
this.queryStr = queryStr;
}
public void setQueryStartTime(Long queryStartTime) {
this.queryStartTime = queryStartTime;
}
public void setQueryType(String queryType) {
this.queryType = queryType;
}
public Set<ReadEntity> getInputs() {
return inputs;
}
public Set<WriteEntity> getOutputs() {
return outputs;
}
public String getUser() {
return user;
}
public UserGroupInformation getUgi() {
return ugi;
}
public HiveOperation getOperation() {
return operation;
}
public HookContext.HookType getHookType() {
return hookType;
}
public org.json.JSONObject getJsonPlan() {
return jsonPlan;
}
public String getQueryId() {
return queryId;
}
public String getQueryStr() {
return queryStr;
}
public Long getQueryStartTime() {
return queryStartTime;
}
public String getQueryType() {
return queryType;
}
}
private List<HookNotification.HookNotificationMessage> messages = new ArrayList<>();
private static final HiveConf hiveConf;
......@@ -362,7 +261,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
}
private void deleteTable(HiveMetaStoreBridge dgiBridge, HiveEventContext event) {
for (WriteEntity output : event.outputs) {
for (WriteEntity output : event.getOutputs()) {
if (Type.TABLE.equals(output.getType())) {
deleteTable(dgiBridge, event, output);
}
......@@ -380,11 +279,11 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
}
private void deleteDatabase(HiveMetaStoreBridge dgiBridge, HiveEventContext event) {
if (event.outputs.size() > 1) {
LOG.info("Starting deletion of tables and databases with cascade {} " , event.queryStr);
if (event.getOutputs().size() > 1) {
LOG.info("Starting deletion of tables and databases with cascade {} " , event.getQueryStr());
}
for (WriteEntity output : event.outputs) {
for (WriteEntity output : event.getOutputs()) {
if (Type.TABLE.equals(output.getType())) {
deleteTable(dgiBridge, event, output);
} else if (Type.DATABASE.equals(output.getType())) {
......@@ -552,13 +451,28 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
return entitiesCreatedOrUpdated;
}
public static String normalize(String str) {
public static String lower(String str) {
if (StringUtils.isEmpty(str)) {
return null;
}
return str.toLowerCase().trim();
}
public static String normalize(String queryStr) {
String result = null;
if (queryStr != null) {
try {
HiveASTRewriter rewriter = new HiveASTRewriter(hiveConf);
result = rewriter.rewrite(queryStr);
} catch (Exception e) {
LOG.warn("Could not rewrite query due to error. Proceeding with original query {}", queryStr, e);
}
}
result = lower(result);
return result;
}
private void registerProcess(HiveMetaStoreBridge dgiBridge, HiveEventContext event) throws Exception {
Set<ReadEntity> inputs = event.getInputs();
Set<WriteEntity> outputs = event.getOutputs();
......@@ -589,7 +503,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
}
if (source.size() > 0 || target.size() > 0) {
Referenceable processReferenceable = getProcessReferenceable(event,
Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event,
new ArrayList<Referenceable>() {{
addAll(source.values());
}},
......@@ -613,7 +527,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
dataSets.put(tblQFName, inTable);
}
} else if (entity.getType() == Type.DFS_DIR) {
final String pathUri = normalize(new Path(entity.getLocation()).toString());
final String pathUri = lower(new Path(entity.getLocation()).toString());
LOG.info("Registering DFS Path {} ", pathUri);
Referenceable hdfsPath = dgiBridge.fillHDFSDataSet(pathUri);
dataSets.put(pathUri, hdfsPath);
......@@ -657,7 +571,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
//Refresh to get the correct location
hiveTable = dgiBridge.hiveClient.getTable(hiveTable.getDbName(), hiveTable.getTableName());
final String location = normalize(hiveTable.getDataLocation().toString());
final String location = lower(hiveTable.getDataLocation().toString());
if (hiveTable != null && TableType.EXTERNAL_TABLE.equals(hiveTable.getTableType())) {
LOG.info("Registering external table process {} ", event.getQueryStr());
List<Referenceable> inputs = new ArrayList<Referenceable>() {{
......@@ -668,15 +582,33 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
add(tblRef);
}};
Referenceable processReferenceable = getProcessReferenceable(event, inputs, outputs);
Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event, inputs, outputs);
messages.add(new HookNotification.EntityCreateRequest(event.getUser(), processReferenceable));
}
}
private Referenceable getProcessReferenceable(HiveEventContext hiveEvent, List<Referenceable> sourceList, List<Referenceable> targetList) {
private boolean isCreateOp(HiveEventContext hiveEvent) {
if (HiveOperation.CREATETABLE.equals(hiveEvent.getOperation())
|| HiveOperation.CREATEVIEW.equals(hiveEvent.getOperation())
|| HiveOperation.ALTERVIEW_AS.equals(hiveEvent.getOperation())
|| HiveOperation.CREATETABLE_AS_SELECT.equals(hiveEvent.getOperation())) {
return true;
}
return false;
}
private Referenceable getProcessReferenceable(HiveMetaStoreBridge dgiBridge, HiveEventContext hiveEvent, List<Referenceable> sourceList, List<Referenceable> targetList) {
Referenceable processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
String queryStr = normalize(hiveEvent.getQueryStr());
String queryStr = hiveEvent.getQueryStr();
if (!isCreateOp(hiveEvent)) {
queryStr = normalize(queryStr);
processReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getProcessQualifiedName(queryStr, sourceList, targetList));
} else {
queryStr = lower(queryStr);
processReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, queryStr);
}
LOG.debug("Registering query: {}", queryStr);
//The serialization code expected a list
......@@ -686,15 +618,145 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
if (targetList != null && !targetList.isEmpty()) {
processReferenceable.set("outputs", targetList);
}
processReferenceable.set("name", queryStr);
processReferenceable.set(AtlasClient.NAME, queryStr);
processReferenceable.set("operationType", hiveEvent.getOperation().getOperationName());
processReferenceable.set("startTime", new Date(hiveEvent.getQueryStartTime()));
processReferenceable.set("userName", hiveEvent.getUser());
processReferenceable.set("queryText", queryStr);
processReferenceable.set("queryId", hiveEvent.getQueryId());
processReferenceable.set("queryPlan", hiveEvent.getJsonPlan());
processReferenceable.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, dgiBridge.getClusterName());
List<String> recentQueries = new ArrayList<>(1);
recentQueries.add(hiveEvent.getQueryStr());
processReferenceable.set("recentQueries", recentQueries);
processReferenceable.set("endTime", new Date(System.currentTimeMillis()));
//TODO set queryGraph
return processReferenceable;
}
@VisibleForTesting
static String getProcessQualifiedName(String normalizedQuery, List<Referenceable> inputs, List<Referenceable> outputs) {
StringBuilder buffer = new StringBuilder(normalizedQuery);
addDatasets(buffer, inputs);
addDatasets(buffer, outputs);
return buffer.toString();
}
private static void addDatasets(StringBuilder buffer, List<Referenceable> refs) {
if (refs != null) {
for (Referenceable input : refs) {
//TODO - Change to qualifiedName later
buffer.append(":");
String dataSetQlfdName = (String) input.get(AtlasClient.NAME);
buffer.append(dataSetQlfdName.toLowerCase().replaceAll("/", ""));
}
}
}
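For reference, a test-style sketch of how getProcessQualifiedName composes the unique process name; the dataset names are hypothetical and the call mirrors the new assertProcessIsRegistered in HiveHookIT further down:

    // Sketch only: appends ":" plus the lower-cased NAME (slashes stripped) of each
    // input and output dataset to the already-normalized query string.
    static String exampleProcessQualifiedName() {
        Referenceable in = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
        in.set(AtlasClient.NAME, "default.src@cluster1");
        Referenceable out = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
        out.set(AtlasClient.NAME, "default.dest@cluster1");
        return HiveHook.getProcessQualifiedName(
                "insert into table default.dest select * from default.src where id = NUMBER_LITERAL",
                java.util.Arrays.asList(in), java.util.Arrays.asList(out));
        // -> "...where id = NUMBER_LITERAL:default.src@cluster1:default.dest@cluster1"
    }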
public static class HiveEventContext {
private Set<ReadEntity> inputs;
private Set<WriteEntity> outputs;
private String user;
private UserGroupInformation ugi;
private HiveOperation operation;
private HookContext.HookType hookType;
private JSONObject jsonPlan;
private String queryId;
private String queryStr;
private Long queryStartTime;
private String queryType;
public void setInputs(Set<ReadEntity> inputs) {
this.inputs = inputs;
}
public void setOutputs(Set<WriteEntity> outputs) {
this.outputs = outputs;
}
public void setUser(String user) {
this.user = user;
}
public void setUgi(UserGroupInformation ugi) {
this.ugi = ugi;
}
public void setOperation(HiveOperation operation) {
this.operation = operation;
}
public void setHookType(HookContext.HookType hookType) {
this.hookType = hookType;
}
public void setJsonPlan(JSONObject jsonPlan) {
this.jsonPlan = jsonPlan;
}
public void setQueryId(String queryId) {
this.queryId = queryId;
}
public void setQueryStr(String queryStr) {
this.queryStr = queryStr;
}
public void setQueryStartTime(Long queryStartTime) {
this.queryStartTime = queryStartTime;
}
public void setQueryType(String queryType) {
this.queryType = queryType;
}
public Set<ReadEntity> getInputs() {
return inputs;
}
public Set<WriteEntity> getOutputs() {
return outputs;
}
public String getUser() {
return user;
}
public UserGroupInformation getUgi() {
return ugi;
}
public HiveOperation getOperation() {
return operation;
}
public HookContext.HookType getHookType() {
return hookType;
}
public JSONObject getJsonPlan() {
return jsonPlan;
}
public String getQueryId() {
return queryId;
}
public String getQueryStr() {
return queryStr;
}
public Long getQueryStartTime() {
return queryStartTime;
}
public String getQueryType() {
return queryType;
}
}
}
......@@ -288,6 +288,8 @@ public class HiveDataModelGenerator {
new AttributeDefinition("queryPlan", DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false,
null),
new AttributeDefinition("queryId", DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null),
new AttributeDefinition("recentQueries", String.format("array<%s>", DataTypes.STRING_TYPE.getName()), Multiplicity.OPTIONAL, false, null),
new AttributeDefinition(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("queryGraph", DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false,
null),};
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.hive.rewrite;
import org.apache.hadoop.hive.ql.parse.ASTNode;
public interface ASTRewriter {
void rewrite(RewriteContext ctx, ASTNode node) throws RewriteException;
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.hive.rewrite;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.ParseDriver;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.parse.ParseException;
import org.apache.hadoop.hive.ql.parse.ParseUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class HiveASTRewriter {
private Context queryContext;
private RewriteContext rwCtx;
private List<ASTRewriter> rewriters = new ArrayList<>();
private static final Logger LOG = LoggerFactory.getLogger(HiveASTRewriter.class);
public HiveASTRewriter(HiveConf conf) throws RewriteException {
try {
queryContext = new Context(conf);
setUpRewriters();
} catch (IOException e) {
throw new RewriteException("Exception while rewriting query : " , e);
}
}
private void setUpRewriters() throws RewriteException {
ASTRewriter rewriter = new LiteralRewriter();
rewriters.add(rewriter);
}
public String rewrite(String sourceQry) throws RewriteException {
String result = sourceQry;
ASTNode tree = null;
try {
ParseDriver pd = new ParseDriver();
tree = pd.parse(sourceQry, queryContext, true);
tree = ParseUtils.findRootNonNullToken(tree);
this.rwCtx = new RewriteContext(sourceQry, tree, queryContext.getTokenRewriteStream());
rewrite(tree);
result = toSQL();
} catch (ParseException e) {
LOG.error("Could not parse the query {} ", sourceQry, e);
throw new RewriteException("Could not parse query : " , e);
}
return result;
}
private void rewrite(ASTNode origin) throws RewriteException {
ASTNode node = origin;
if (node != null) {
for(ASTRewriter rewriter : rewriters) {
rewriter.rewrite(rwCtx, node);
}
if (node.getChildren() != null) {
for (int i = 0; i < node.getChildren().size(); i++) {
rewrite((ASTNode) node.getChild(i));
}
}
}
}
public String toSQL() {
return rwCtx.getTokenRewriteStream().toString();
}
public String printAST() {
return rwCtx.getOriginNode().dump();
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.hive.rewrite;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.HiveParser;
import java.util.HashMap;
import java.util.Map;
public class LiteralRewriter implements ASTRewriter {
public static Map<Integer, String> LITERAL_TOKENS = new HashMap<Integer, String>() {{
put(HiveParser.Number, "NUMBER_LITERAL");
put(HiveParser.Digit, "DIGIT_LITERAL");
put(HiveParser.HexDigit, "HEX_LITERAL");
put(HiveParser.Exponent, "EXPONENT_LITERAL");
put(HiveParser.StringLiteral, "'STRING_LITERAL'");
put(HiveParser.BigintLiteral, "BIGINT_LITERAL");
put(HiveParser.SmallintLiteral, "SMALLINT_LITERAL");
put(HiveParser.TinyintLiteral, "TINYINT_LITERAL");
put(HiveParser.DecimalLiteral, "DECIMAL_LITERAL");
put(HiveParser.ByteLengthLiteral, "BYTE_LENGTH_LITERAL");
put(HiveParser.TOK_STRINGLITERALSEQUENCE, "'STRING_LITERAL_SEQ'");
put(HiveParser.TOK_CHARSETLITERAL, "'CHARSET_LITERAL'");
put(HiveParser.KW_TRUE, "BOOLEAN_LITERAL");
put(HiveParser.KW_FALSE, "BOOLEAN_LITERAL");
}};
@Override
public void rewrite(RewriteContext ctx, final ASTNode node) throws RewriteException {
try {
processLiterals(ctx, node);
} catch(Exception e) {
throw new RewriteException("Could not normalize query", e);
}
}
private void processLiterals(final RewriteContext ctx, final ASTNode node) {
// Replace the node's text if it is a literal token
if (isLiteral(node)) {
replaceLiteral(ctx, node);
}
}
private boolean isLiteral(ASTNode node) {
if (LITERAL_TOKENS.containsKey(node.getType())) {
return true;
}
return false;
}
void replaceLiteral(RewriteContext ctx, ASTNode valueNode) {
//Replace the literal's token range in the stream with its placeholder
String literalVal = LITERAL_TOKENS.get(valueNode.getType());
ctx.getTokenRewriteStream().replace(valueNode.getTokenStartIndex(),
valueNode.getTokenStopIndex(), literalVal);
}
}
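As a quick reference, the placeholder mapping above yields rewrites like the following (taken from the expectations in HiveLiteralRewriterTest below):

    // dt = '2014-01-01'     ->  dt = 'STRING_LITERAL'
    // intColumn = 10        ->  intColumn = NUMBER_LITERAL
    // decimalColumn = 1.10  ->  decimalColumn = NUMBER_LITERAL
    // boolCol = true        ->  boolCol = BOOLEAN_LITERAL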
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.hive.rewrite;
import org.antlr.runtime.TokenRewriteStream;
import org.apache.hadoop.hive.ql.parse.ASTNode;
public class RewriteContext {
private String origQuery;
private TokenRewriteStream rewriteStream;
private ASTNode origin;
RewriteContext(String origQuery, ASTNode origin, TokenRewriteStream rewriteStream) {
this.origQuery = origQuery;
this.origin = origin;
this.rewriteStream = rewriteStream;
}
public TokenRewriteStream getTokenRewriteStream() {
return rewriteStream;
}
public ASTNode getOriginNode() {
return origin;
}
public String getOriginalQuery() {
return origQuery;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.hive.rewrite;
import org.apache.hadoop.hive.ql.parse.ParseException;
public class RewriteException extends Exception {
public RewriteException(final String message, final Exception exception) {
super(message, exception);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.hive.bridge;
import org.apache.atlas.hive.hook.HiveHook;
import org.apache.atlas.hive.rewrite.HiveASTRewriter;
import org.apache.atlas.hive.rewrite.RewriteException;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class HiveLiteralRewriterTest {
private HiveConf conf;
@BeforeClass
public void setup() {
conf = new HiveConf();
conf.addResource("/hive-site.xml");
SessionState ss = new SessionState(conf, "testuser");
SessionState.start(ss);
conf.set("hive.lock.manager", "org.apache.hadoop.hive.ql.lockmgr.EmbeddedLockManager");
}
@Test
public void testLiteralRewrite() throws RewriteException {
HiveHook.HiveEventContext ctx = new HiveHook.HiveEventContext();
ctx.setQueryStr("insert into table testTable partition(dt='2014-01-01') select * from test1 where dt = '2014-01-01'" +
" and intColumn = 10" +
" and decimalColumn = 1.10" +
" and charColumn = 'a'" +
" and hexColumn = unhex('\\0xFF')" +
" and expColumn = cast('-1.5e2' as int)" +
" and boolCol = true");
HiveASTRewriter queryRewriter = new HiveASTRewriter(conf);
String result = queryRewriter.rewrite(ctx.getQueryStr());
System.out.println("normlized sql : " + result);
final String normalizedSQL = "insert into table testTable partition(dt='STRING_LITERAL') " +
"select * from test1 where dt = 'STRING_LITERAL' " +
"and intColumn = NUMBER_LITERAL " +
"and decimalColumn = NUMBER_LITERAL and " +
"charColumn = 'STRING_LITERAL' and " +
"hexColumn = unhex('STRING_LITERAL') and " +
"expColumn = cast('STRING_LITERAL' as int) and " +
"boolCol = BOOLEAN_LITERAL";
Assert.assertEquals(result, normalizedSQL);
}
}
......@@ -28,6 +28,8 @@ import org.apache.atlas.fs.model.FSDataTypes;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.hive.rewrite.HiveASTRewriter;
import org.apache.atlas.hive.rewrite.RewriteException;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.Struct;
import org.apache.atlas.typesystem.persistence.Id;
......@@ -56,11 +58,13 @@ import org.testng.annotations.Test;
import java.io.File;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.atlas.hive.hook.HiveHook.lower;
import static org.apache.atlas.hive.hook.HiveHook.normalize;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
......@@ -76,6 +80,8 @@ public class HiveHookIT {
private AtlasClient atlasClient;
private HiveMetaStoreBridge hiveMetaStoreBridge;
private SessionState ss;
private HiveConf conf;
private static final String INPUTS = AtlasClient.PROCESS_ATTRIBUTE_INPUTS;
private static final String OUTPUTS = AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS;
......@@ -83,10 +89,7 @@ public class HiveHookIT {
@BeforeClass
public void setUp() throws Exception {
//Set-up hive session
HiveConf conf = new HiveConf();
//Run in local mode
conf.set("mapreduce.framework.name", "local");
conf.set("fs.default.name", "file:///'");
conf = new HiveConf();
conf.setClassLoader(Thread.currentThread().getContextClassLoader());
driver = new Driver(conf);
ss = new SessionState(conf);
......@@ -98,7 +101,6 @@ public class HiveHookIT {
hiveMetaStoreBridge = new HiveMetaStoreBridge(conf, atlasClient);
hiveMetaStoreBridge.registerHiveDataModel();
}
private void runCommand(String cmd) throws Exception {
......@@ -231,36 +233,36 @@ public class HiveHookIT {
@Test
public void testCreateExternalTable() throws Exception {
String tableName = tableName();
String dbName = createDatabase();
String colName = columnName();
String pFile = createTestDFSPath("parentPath");
final String query = String.format("create TEMPORARY EXTERNAL table %s.%s( %s, %s) location '%s'", dbName , tableName , colName + " int", "name string", pFile);
final String query = String.format("create TEMPORARY EXTERNAL table %s.%s( %s, %s) location '%s'", DEFAULT_DB , tableName , colName + " int", "name string", pFile);
runCommand(query);
String tableId = assertTableIsRegistered(dbName, tableName, null, true);
assertTableIsRegistered(DEFAULT_DB, tableName, null, true);
Referenceable processReference = validateProcess(query, 1, 1);
String processId = assertProcessIsRegistered(query);
Referenceable processReference = atlasClient.getEntity(processId);
assertEquals(processReference.get("userName"), UserGroupInformation.getCurrentUser().getShortUserName());
verifyTimestamps(processReference, "startTime");
verifyTimestamps(processReference, "endTime");
validateHDFSPaths(processReference, pFile, INPUTS);
validateOutputTables(processReference, tableId);
}
private void validateOutputTables(Referenceable processReference, String... expectedTableGuids) throws Exception {
validateTables(processReference, OUTPUTS, expectedTableGuids);
private void validateOutputTables(Referenceable processReference, String... expectedTableNames) throws Exception {
validateTables(processReference, OUTPUTS, expectedTableNames);
}
private void validateInputTables(Referenceable processReference, String... expectedTableGuids) throws Exception {
validateTables(processReference, INPUTS, expectedTableGuids);
private void validateInputTables(Referenceable processReference, String... expectedTableNames) throws Exception {
validateTables(processReference, INPUTS, expectedTableNames);
}
private void validateTables(Referenceable processReference, String attrName, String... expectedTableGuids) throws Exception {
private void validateTables(Referenceable processReference, String attrName, String... expectedTableNames) throws Exception {
List<Id> tableRef = (List<Id>) processReference.get(attrName);
for(int i = 0; i < expectedTableGuids.length; i++) {
Assert.assertEquals(tableRef.get(i)._getId(), expectedTableGuids[i]);
for(int i = 0; i < expectedTableNames.length; i++) {
Referenceable entity = atlasClient.getEntity(tableRef.get(i)._getId());
Assert.assertEquals(entity.get(AtlasClient.NAME), expectedTableNames[i]);
}
}
......@@ -371,7 +373,7 @@ public class HiveHookIT {
String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName;
runCommand(query);
assertProcessIsRegistered(query);
assertProcessIsRegistered(query, null, getQualifiedTblName(tableName));
}
@Test
......@@ -382,7 +384,7 @@ public class HiveHookIT {
String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName + " partition(dt = '2015-01-01')";
runCommand(query);
validateProcess(query, 0, 1);
validateProcess(query, null, getQualifiedTblName(tableName));
}
@Test
......@@ -392,49 +394,42 @@ public class HiveHookIT {
String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
String loadFile = createTestDFSFile("loadDFSFile");
final String testPathNormed = lower(new Path(loadFile).toString());
String query = "load data inpath '" + loadFile + "' into table " + tableName + " partition(dt = '2015-01-01')";
runCommand(query);
Referenceable processReference = validateProcess(query, 1, 1);
final String tblQlfdName = getQualifiedTblName(tableName);
Referenceable processReference = validateProcess(query, testPathNormed, tblQlfdName);
validateHDFSPaths(processReference, loadFile, INPUTS);
validateOutputTables(processReference, tableId);
validateOutputTables(processReference, tblQlfdName);
}
private Referenceable validateProcess(String query, int numInputs, int numOutputs) throws Exception {
String processId = assertProcessIsRegistered(query);
Referenceable process = atlasClient.getEntity(processId);
if (numInputs == 0) {
Assert.assertNull(process.get(INPUTS));
} else {
Assert.assertEquals(((List<Referenceable>) process.get(INPUTS)).size(), numInputs);
}
private String getQualifiedTblName(String inputTable) {
String inputtblQlfdName = inputTable;
if (numOutputs == 0) {
Assert.assertNull(process.get(OUTPUTS));
} else {
Assert.assertEquals(((List<Id>) process.get(OUTPUTS)).size(), numOutputs);
if (inputTable != null && !inputTable.contains(".")) {
inputtblQlfdName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, inputTable);
}
return process;
return inputtblQlfdName;
}
private Referenceable validateProcess(String query, String[] inputs, String[] outputs) throws Exception {
String processId = assertProcessIsRegistered(query);
private Referenceable validateProcess(String query, String inputTable, String outputTable) throws Exception {
String processId = assertProcessIsRegistered(query, inputTable, outputTable);
Referenceable process = atlasClient.getEntity(processId);
if (inputs == null) {
if (inputTable == null) {
Assert.assertNull(process.get(INPUTS));
} else {
Assert.assertEquals(((List<Referenceable>) process.get(INPUTS)).size(), inputs.length);
validateInputTables(process, inputs);
Assert.assertEquals(((List<Referenceable>) process.get(INPUTS)).size(), 1);
validateInputTables(process, inputTable);
}
if (outputs == null) {
if (outputTable == null) {
Assert.assertNull(process.get(OUTPUTS));
} else {
Assert.assertEquals(((List<Id>) process.get(OUTPUTS)).size(), outputs.length);
validateOutputTables(process, outputs);
Assert.assertEquals(((List<Id>) process.get(OUTPUTS)).size(), 1 );
validateOutputTables(process, outputTable);
}
return process;
......@@ -452,7 +447,14 @@ public class HiveHookIT {
String inputTableId = assertTableIsRegistered(DEFAULT_DB, tableName);
String opTableId = assertTableIsRegistered(DEFAULT_DB, insertTableName);
validateProcess(query, new String[]{inputTableId}, new String[]{opTableId});
Referenceable processRef1 = validateProcess(query, getQualifiedTblName(tableName), getQualifiedTblName(insertTableName));
//Rerun same query. Should result in same process
runCommand(query);
Referenceable processRef2 = validateProcess(query, getQualifiedTblName(tableName), getQualifiedTblName(insertTableName));
Assert.assertEquals(processRef1.getId()._getId(), processRef2.getId()._getId());
}
@Test
......@@ -463,7 +465,7 @@ public class HiveHookIT {
"insert overwrite LOCAL DIRECTORY '" + randomLocalPath.getAbsolutePath() + "' select id, name from " + tableName;
runCommand(query);
validateProcess(query, 1, 0);
validateProcess(query, getQualifiedTblName(tableName), null);
assertTableIsRegistered(DEFAULT_DB, tableName);
}
......@@ -471,17 +473,33 @@ public class HiveHookIT {
@Test
public void testInsertIntoDFSDir() throws Exception {
String tableName = createTable();
String pFile = createTestDFSPath("somedfspath");
String pFile1 = createTestDFSPath("somedfspath1");
String testPathNormed = lower(new Path(pFile1).toString());
String query =
"insert overwrite DIRECTORY '" + pFile + "' select id, name from " + tableName;
"insert overwrite DIRECTORY '" + pFile1 + "' select id, name from " + tableName;
runCommand(query);
Referenceable processReference = validateProcess(query, 1, 1);
validateHDFSPaths(processReference, pFile, OUTPUTS);
String tblQlfdname = getQualifiedTblName(tableName);
Referenceable processReference = validateProcess(query, tblQlfdname, testPathNormed);
validateHDFSPaths(processReference, pFile1, OUTPUTS);
String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
validateInputTables(processReference, tableId);
validateInputTables(processReference, tblQlfdname);
//Rerun same query with different HDFS path
String pFile2 = createTestDFSPath("somedfspath2");
testPathNormed = lower(new Path(pFile2).toString());
query =
"insert overwrite DIRECTORY '" + pFile2 + "' select id, name from " + tableName;
runCommand(query);
tblQlfdname = getQualifiedTblName(tableName);
Referenceable process2Reference = validateProcess(query, tblQlfdname, testPathNormed);
validateHDFSPaths(process2Reference, pFile2, OUTPUTS);
Assert.assertNotEquals(process2Reference.getId()._getId(), processReference.getId()._getId());
}
@Test
......@@ -495,11 +513,10 @@ public class HiveHookIT {
"insert into " + insertTableName + " select id, name from " + tableName;
runCommand(query);
validateProcess(query, 1, 1);
validateProcess(query, getQualifiedTblName(tableName), getQualifiedTblName(insertTableName + HiveMetaStoreBridge.TEMP_TABLE_PREFIX + SessionState.get().getSessionId()));
String ipTableId = assertTableIsRegistered(DEFAULT_DB, tableName);
String opTableId = assertTableIsRegistered(DEFAULT_DB, insertTableName, null, true);
validateProcess(query, new String[] {ipTableId}, new String[] {opTableId});
assertTableIsRegistered(DEFAULT_DB, tableName);
assertTableIsRegistered(DEFAULT_DB, insertTableName, null, true);
}
@Test
......@@ -510,11 +527,10 @@ public class HiveHookIT {
"insert into " + insertTableName + " partition(dt = '2015-01-01') select id, name from " + tableName
+ " where dt = '2015-01-01'";
runCommand(query);
validateProcess(query, 1, 1);
validateProcess(query, getQualifiedTblName(tableName) , getQualifiedTblName(insertTableName));
String ipTableId = assertTableIsRegistered(DEFAULT_DB, tableName);
String opTableId = assertTableIsRegistered(DEFAULT_DB, insertTableName);
validateProcess(query, new String[]{ipTableId}, new String[]{opTableId});
assertTableIsRegistered(DEFAULT_DB, tableName);
assertTableIsRegistered(DEFAULT_DB, insertTableName);
}
private String random() {
......@@ -543,10 +559,12 @@ public class HiveHookIT {
String filename = "pfile://" + mkdir("export");
String query = "export table " + tableName + " to \"" + filename + "\"";
final String testPathNormed = lower(new Path(filename).toString());
runCommand(query);
Referenceable processReference = validateProcess(query, 1, 1);
String tblQlfName = getQualifiedTblName(tableName);
Referenceable processReference = validateProcess(query, tblQlfName, testPathNormed);
validateHDFSPaths(processReference, filename, OUTPUTS);
validateInputTables(processReference, tableId);
validateInputTables(processReference, tblQlfName);
//Import
tableName = createTable(false);
......@@ -554,10 +572,11 @@ public class HiveHookIT {
query = "import table " + tableName + " from '" + filename + "'";
runCommand(query);
processReference = validateProcess(query, 1, 1);
tblQlfName = getQualifiedTblName(tableName);
processReference = validateProcess(query, testPathNormed, tblQlfName);
validateHDFSPaths(processReference, filename, INPUTS);
validateOutputTables(processReference, tableId);
validateOutputTables(processReference, tblQlfName);
}
@Test
......@@ -571,12 +590,14 @@ public class HiveHookIT {
runCommand(query);
String filename = "pfile://" + mkdir("export");
final String testPathNormed = lower(new Path(filename).toString());
query = "export table " + tableName + " to \"" + filename + "\"";
runCommand(query);
Referenceable processReference = validateProcess(query, 1, 1);
String tblQlfdName = getQualifiedTblName(tableName);
Referenceable processReference = validateProcess(query, tblQlfdName, testPathNormed);
validateHDFSPaths(processReference, filename, OUTPUTS);
validateInputTables(processReference, tableId);
validateInputTables(processReference, tblQlfdName);
//Import
tableName = createTable(true);
......@@ -584,10 +605,11 @@ public class HiveHookIT {
query = "import table " + tableName + " from '" + filename + "'";
runCommand(query);
processReference = validateProcess(query, 1, 1);
tblQlfdName = getQualifiedTblName(tableName);
processReference = validateProcess(query, testPathNormed, tblQlfdName);
validateHDFSPaths(processReference, filename, INPUTS);
validateOutputTables(processReference, tableId);
validateOutputTables(processReference, tblQlfdName);
}
@Test
......@@ -750,7 +772,7 @@ public class HiveHookIT {
});
assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(
HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName));
HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName));
//Change name and add comment
oldColName = "name2";
......@@ -847,8 +869,9 @@ public class HiveHookIT {
String query = String.format("truncate table %s", tableName);
runCommand(query);
String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
validateProcess(query, 0, 1);
validateProcess(query, null, getQualifiedTblName(tableName));
//Check lineage
String datasetName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName);
......@@ -916,16 +939,17 @@ public class HiveHookIT {
}
});
Referenceable processReference = validateProcess(query, 1, 1);
validateHDFSPaths(processReference, testPath, INPUTS);
final String tblQlfdName = getQualifiedTblName(tableName);
validateOutputTables(processReference, tableId);
final String testPathNormed = lower(new Path(testPath).toString());
Referenceable processReference = validateProcess(query, testPathNormed, tblQlfdName);
validateHDFSPaths(processReference, testPath, INPUTS);
}
private String validateHDFSPaths(Referenceable processReference, String testPath, String attributeName) throws Exception {
List<Id> hdfsPathRefs = (List<Id>) processReference.get(attributeName);
final String testPathNormed = normalize(new Path(testPath).toString());
final String testPathNormed = lower(new Path(testPath).toString());
String hdfsPathId = assertHDFSPathIsRegistered(testPathNormed);
Assert.assertEquals(hdfsPathRefs.get(0)._getId(), hdfsPathId);
......@@ -1083,7 +1107,7 @@ public class HiveHookIT {
//Verify columns are not registered for one of the tables
assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(
HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableNames[0]), "id"));
HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableNames[0]), "id"));
assertColumnIsNotRegistered(HiveMetaStoreBridge
.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableNames[0]),
HiveDataModelGenerator.NAME));
......@@ -1316,14 +1340,55 @@ public class HiveHookIT {
}
}
private String assertProcessIsRegistered(String queryStr) throws Exception {
LOG.debug("Searching for process with query {}", queryStr);
return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.NAME, normalize(queryStr), null);
private String assertProcessIsRegistered(final String queryStr, final String inputTblName, final String outputTblName) throws Exception {
HiveASTRewriter astRewriter = new HiveASTRewriter(conf);
String normalizedQuery = normalize(astRewriter.rewrite(queryStr));
List<Referenceable> inputs = null;
if (inputTblName != null) {
Referenceable inputTableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.name(), new HashMap<String, Object>() {{
put(HiveDataModelGenerator.NAME, inputTblName);
}});
inputs = new ArrayList<Referenceable>();
inputs.add(inputTableRef);
}
List<Referenceable> outputs = null;
if (outputTblName != null) {
Referenceable outputTableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.name(), new HashMap<String, Object>() {{
put(HiveDataModelGenerator.NAME, outputTblName);
}});
outputs = new ArrayList<Referenceable>();
outputs.add(outputTableRef);
}
String processQFName = HiveHook.getProcessQualifiedName(normalizedQuery, inputs, outputs);
LOG.debug("Searching for process with query {}", processQFName);
return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQFName, new AssertPredicate() {
@Override
public void assertOnEntity(final Referenceable entity) throws Exception {
List<String> recentQueries = (List<String>) entity.get("recentQueries");
Assert.assertEquals(recentQueries.get(0), queryStr);
}
});
}
private String assertProcessIsRegistered(final String queryStr) throws Exception {
String lowerQryStr = lower(queryStr);
LOG.debug("Searching for process with query {}", lowerQryStr);
return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, lowerQryStr, new AssertPredicate() {
@Override
public void assertOnEntity(final Referenceable entity) throws Exception {
List<String> recentQueries = (List<String>) entity.get("recentQueries");
Assert.assertEquals(recentQueries.get(0), queryStr);
}
});
}
private void assertProcessIsNotRegistered(String queryStr) throws Exception {
LOG.debug("Searching for process with query {}", queryStr);
assertEntityIsNotRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.NAME, normalize(queryStr));
assertEntityIsNotRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, normalize(queryStr));
}
private void assertTableIsNotRegistered(String dbName, String tableName, boolean isTemporaryTable) throws Exception {
......
......@@ -17,6 +17,16 @@
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>local</value>
</property>
<property>
<name>fs.default.name</name>
<value>file:///</value>
</property>
<property>
<name>hive.exec.post.hooks</name>
<value>org.apache.atlas.hive.hook.HiveHook</value>
</property>
......
......@@ -105,7 +105,9 @@ public class SqoopHook extends SqoopJobDataPublisher {
private Referenceable createSqoopProcessInstance(Referenceable dbStoreRef, Referenceable hiveTableRef,
SqoopJobDataPublisher.Data data, String clusterName) {
Referenceable procRef = new Referenceable(SqoopDataTypes.SQOOP_PROCESS.getName());
procRef.set(SqoopDataModelGenerator.NAME, getSqoopProcessName(data, clusterName));
final String sqoopProcessName = getSqoopProcessName(data, clusterName);
procRef.set(SqoopDataModelGenerator.NAME, sqoopProcessName);
procRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, sqoopProcessName);
procRef.set(SqoopDataModelGenerator.OPERATION, data.getOperation());
procRef.set(SqoopDataModelGenerator.INPUTS, dbStoreRef);
procRef.set(SqoopDataModelGenerator.OUTPUTS, hiveTableRef);
......
......@@ -107,8 +107,8 @@ public class SqoopHookIT {
private String assertSqoopProcessIsRegistered(String processName) throws Exception {
LOG.debug("Searching for sqoop process {}", processName);
String query = String.format(
"%s as t where name = '%s' select t",
SqoopDataTypes.SQOOP_PROCESS.getName(), processName);
"%s as t where %s = '%s' select t",
SqoopDataTypes.SQOOP_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processName);
return assertEntityIsRegistered(query);
}
......
......@@ -110,7 +110,8 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
Referenceable topologyReferenceable = new Referenceable(
StormDataTypes.STORM_TOPOLOGY.getName());
topologyReferenceable.set("id", topologyInfo.get_id());
topologyReferenceable.set("name", topologyInfo.get_name());
topologyReferenceable.set(AtlasClient.NAME, topologyInfo.get_name());
topologyReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, topologyInfo.get_name());
String owner = topologyInfo.get_owner();
if (StringUtils.isEmpty(owner)) {
owner = ANONYMOUS_OWNER;
......
......@@ -18,6 +18,7 @@
package org.apache.atlas.storm.model
import org.apache.atlas.AtlasClient
import org.apache.atlas.typesystem.TypesDef
import org.apache.atlas.typesystem.builders.TypesBuilder
import org.apache.atlas.typesystem.json.TypesSerialization
......@@ -42,7 +43,7 @@ object StormDataModel extends App {
* Also, Topology contains the Graph of Nodes
* Topology => Node(s) -> Spouts/Bolts
*/
_class(StormDataTypes.STORM_TOPOLOGY.getName, List("Process")) {
_class(StormDataTypes.STORM_TOPOLOGY.getName, List(AtlasClient.PROCESS_SUPER_TYPE)) {
"id" ~ (string, required, indexed, unique)
"description" ~ (string, optional, indexed)
"owner" ~ (string, required, indexed)
......
......@@ -3,6 +3,7 @@ Apache Atlas Release Notes
--trunk - unreleased
INCOMPATIBLE CHANGES:
ATLAS-619 Canonicalize hive queries (sumasai)
ATLAS-497 Simple Authorization (saqeeb.s via yhemanth)
ATLAS-661 REST API Authentication (nixonrodrigues via yhemanth)
ATLAS-672 UI: Make dashboard v2 the default UI implementation (bergenholtz via yhemanth)
......
......@@ -182,20 +182,21 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang
DESCRIPTION_ATTRIBUTE);
createType(datasetType);
HierarchicalTypeDefinition<ClassType> processType = TypesUtil
.createClassTypeDef(AtlasClient.PROCESS_SUPER_TYPE, ImmutableSet.<String>of(), NAME_ATTRIBUTE,
DESCRIPTION_ATTRIBUTE,
new AttributeDefinition("inputs", DataTypes.arrayTypeName(AtlasClient.DATA_SET_SUPER_TYPE),
Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("outputs", DataTypes.arrayTypeName(AtlasClient.DATA_SET_SUPER_TYPE),
Multiplicity.OPTIONAL, false, null));
createType(processType);
HierarchicalTypeDefinition<ClassType> referenceableType = TypesUtil
.createClassTypeDef(AtlasClient.REFERENCEABLE_SUPER_TYPE, ImmutableSet.<String>of(),
TypesUtil.createUniqueRequiredAttrDef(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
DataTypes.STRING_TYPE));
createType(referenceableType);
HierarchicalTypeDefinition<ClassType> processType = TypesUtil
.createClassTypeDef(AtlasClient.PROCESS_SUPER_TYPE, ImmutableSet.<String>of(AtlasClient.REFERENCEABLE_SUPER_TYPE),
TypesUtil.createRequiredAttrDef(AtlasClient.NAME, DataTypes.STRING_TYPE),
DESCRIPTION_ATTRIBUTE,
new AttributeDefinition("inputs", DataTypes.arrayTypeName(AtlasClient.DATA_SET_SUPER_TYPE),
Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("outputs", DataTypes.arrayTypeName(AtlasClient.DATA_SET_SUPER_TYPE),
Multiplicity.OPTIONAL, false, null));
createType(processType);
}
private void createType(HierarchicalTypeDefinition<ClassType> type) throws AtlasException {
......
......@@ -331,7 +331,8 @@ public class BaseHiveRepositoryTest {
String queryText, String queryPlan, String queryId, String queryGraph, String... traitNames)
throws Exception {
Referenceable referenceable = new Referenceable(HIVE_PROCESS_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set(AtlasClient.NAME, name);
referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
referenceable.set("description", description);
referenceable.set("user", user);
referenceable.set("startTime", System.currentTimeMillis());
......
......@@ -376,7 +376,8 @@ public class QuickStart {
throws Exception {
Referenceable referenceable = new Referenceable(LOAD_PROCESS_TYPE, traitNames);
// super type attributes
referenceable.set("name", name);
referenceable.set(AtlasClient.NAME, name);
referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
referenceable.set("description", description);
referenceable.set(INPUTS_ATTRIBUTE, inputTables);
referenceable.set(OUTPUTS_ATTRIBUTE, outputTables);
......
......@@ -18,6 +18,8 @@
package org.apache.atlas.examples;
import org.apache.atlas.Atlas;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.persistence.Id;
......@@ -94,10 +96,10 @@ public class QuickStartIT extends BaseResourceIT {
@Test
public void testProcessIsAdded() throws AtlasServiceException, JSONException {
Referenceable loadProcess = serviceClient.getEntity(QuickStart.LOAD_PROCESS_TYPE, "name",
Referenceable loadProcess = serviceClient.getEntity(QuickStart.LOAD_PROCESS_TYPE, AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
QuickStart.LOAD_SALES_DAILY_PROCESS);
assertEquals(QuickStart.LOAD_SALES_DAILY_PROCESS, loadProcess.get("name"));
assertEquals(QuickStart.LOAD_SALES_DAILY_PROCESS, loadProcess.get(AtlasClient.NAME));
assertEquals(QuickStart.LOAD_SALES_DAILY_PROCESS_DESCRIPTION, loadProcess.get("description"));
List<Id> inputs = (List<Id>)loadProcess.get(QuickStart.INPUTS_ATTRIBUTE);
......@@ -141,12 +143,12 @@ public class QuickStartIT extends BaseResourceIT {
@Test
public void testViewIsAdded() throws AtlasServiceException, JSONException {
Referenceable view = serviceClient.getEntity(QuickStart.VIEW_TYPE, "name", QuickStart.PRODUCT_DIM_VIEW);
Referenceable view = serviceClient.getEntity(QuickStart.VIEW_TYPE, AtlasClient.NAME, QuickStart.PRODUCT_DIM_VIEW);
assertEquals(QuickStart.PRODUCT_DIM_VIEW, view.get("name"));
assertEquals(QuickStart.PRODUCT_DIM_VIEW, view.get(AtlasClient.NAME));
Id productDimId = getTable(QuickStart.PRODUCT_DIM_TABLE).getId();
Id inputTableId = ((List<Id>)view.get(QuickStart.INPUT_TABLES_ATTRIBUTE)).get(0);
Id inputTableId = ((List<Id>) view.get(QuickStart.INPUT_TABLES_ATTRIBUTE)).get(0);
assertEquals(productDimId, inputTableId);
}
}
......@@ -184,7 +184,8 @@ public class HiveLineageJerseyResourceIT extends BaseResourceIT {
table("sales_fact_daily_mv" + randomString(), "sales fact daily materialized view", reportingDB,
"Joe BI", "MANAGED", salesFactColumns, "Metric");
loadProcess("loadSalesDaily" + randomString(), "John ETL", ImmutableList.of(salesFact, timeDim),
String procName = "loadSalesDaily" + randomString();
loadProcess(procName, "John ETL", ImmutableList.of(salesFact, timeDim),
ImmutableList.of(salesFactDaily), "create table as select ", "plan", "id", "graph", "ETL");
salesMonthlyTable = "sales_fact_monthly_mv" + randomString();
......@@ -237,7 +238,8 @@ public class HiveLineageJerseyResourceIT extends BaseResourceIT {
Id loadProcess(String name, String user, List<Id> inputTables, List<Id> outputTables, String queryText,
String queryPlan, String queryId, String queryGraph, String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(HIVE_PROCESS_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set(AtlasClient.NAME, name);
referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
referenceable.set("user", user);
referenceable.set("startTime", System.currentTimeMillis());
referenceable.set("endTime", System.currentTimeMillis() + 10000);
......