Commit 16107915 by Suma Shivaprasad

ATLAS-537 Falcon hook failing when tried to submit a process which creates a…

ATLAS-537 Falcon hook failing when tried to submit a process which creates a hive table. (shwethags via sumasai)
parent 0defc6e8
......@@ -235,8 +235,10 @@ public class FalconHook extends FalconEventPublisher {
        if (process.getInputs() != null) {
            for (Input input : process.getInputs().getInputs()) {
                List<Referenceable> clusterInputs = getInputOutputEntity(cluster, input.getFeed());
-               entities.addAll(clusterInputs);
-               inputs.add(clusterInputs.get(clusterInputs.size() -1 ));
+               if (clusterInputs != null) {
+                   entities.addAll(clusterInputs);
+                   inputs.add(clusterInputs.get(clusterInputs.size() - 1));
+               }
            }
        }
......@@ -244,8 +246,10 @@ public class FalconHook extends FalconEventPublisher {
        if (process.getOutputs() != null) {
            for (Output output : process.getOutputs().getOutputs()) {
                List<Referenceable> clusterOutputs = getInputOutputEntity(cluster, output.getFeed());
-               entities.addAll(clusterOutputs);
-               outputs.add(clusterOutputs.get(clusterOutputs.size() -1 ));
+               if (clusterOutputs != null) {
+                   entities.addAll(clusterOutputs);
+                   outputs.add(clusterOutputs.get(clusterOutputs.size() - 1));
+               }
            }
        }
......
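The fix in both hunks above is the same: getInputOutputEntity resolves a feed to its Atlas entities, and, as the new HDFS-feed test later in this diff implies, it can return null when the feed has no backing Hive table. Without the guard, the unconditional addAll and get(size() - 1) calls threw a NullPointerException for such feeds. A minimal self-contained sketch of the guarded pattern (the lookup below is a stand-in, not the real getInputOutputEntity):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class NullGuardSketch {
    // Stand-in for getInputOutputEntity: null models a feed with no Hive table behind it.
    static List<String> getInputOutputEntity(boolean tableBacked) {
        return tableBacked ? Arrays.asList("db", "table") : null;
    }

    public static void main(String[] args) {
        List<String> entities = new ArrayList<>();
        List<String> inputs = new ArrayList<>();
        List<String> clusterInputs = getInputOutputEntity(false); // HDFS-only feed -> null
        if (clusterInputs != null) { // the added guard: skip feeds that yield no table entities
            entities.addAll(clusterInputs);
            inputs.add(clusterInputs.get(clusterInputs.size() - 1));
        }
        System.out.println("inputs = " + inputs); // stays empty instead of throwing NPE
    }
}

Note that the last element of the returned list is taken as the process input/output; presumably the list is ordered so that the table entity itself comes last.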
......@@ -43,12 +43,15 @@ import javax.xml.bind.JAXBException;
import java.util.List;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;

public class FalconHookIT {
    public static final Logger LOG = org.slf4j.LoggerFactory.getLogger(FalconHookIT.class);

    public static final String CLUSTER_RESOURCE = "/cluster.xml";
    public static final String FEED_RESOURCE = "/feed.xml";
+   public static final String FEED_HDFS_RESOURCE = "/feed-hdfs.xml";
    public static final String PROCESS_RESOURCE = "/process.xml";

    private AtlasClient dgiCLient;
......@@ -96,21 +99,13 @@ public class FalconHookIT {
        Cluster cluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random());
        STORE.publish(EntityType.CLUSTER, cluster);

-       Feed infeed = loadEntity(EntityType.FEED, FEED_RESOURCE, "feedin" + random());
-       org.apache.falcon.entity.v0.feed.Cluster feedCluster = infeed.getClusters().getClusters().get(0);
-       feedCluster.setName(cluster.getName());
-       String inTableName = "table" + random();
-       String inDbName = "db" + random();
-       feedCluster.getTable().setUri(getTableUri(inDbName, inTableName));
-       STORE.publish(EntityType.FEED, infeed);
+       Feed infeed = getTableFeed(FEED_RESOURCE, cluster.getName());
+       String inTableName = getTableName(infeed);
+       String inDbName = getDBName(infeed);

-       Feed outfeed = loadEntity(EntityType.FEED, FEED_RESOURCE, "feedout" + random());
-       feedCluster = outfeed.getClusters().getClusters().get(0);
-       feedCluster.setName(cluster.getName());
-       String outTableName = "table" + random();
-       String outDbName = "db" + random();
-       feedCluster.getTable().setUri(getTableUri(outDbName, outTableName));
-       STORE.publish(EntityType.FEED, outfeed);
+       Feed outfeed = getTableFeed(FEED_RESOURCE, cluster.getName());
+       String outTableName = getTableName(outfeed);
+       String outDbName = getDBName(outfeed);

        Process process = loadEntity(EntityType.PROCESS, PROCESS_RESOURCE, "process" + random());
        process.getClusters().getClusters().get(0).setName(cluster.getName());
......@@ -120,6 +115,7 @@ public class FalconHookIT {
        String pid = assertProcessIsRegistered(cluster.getName(), process.getName());
        Referenceable processEntity = dgiCLient.getEntity(pid);
        assertNotNull(processEntity);
+       assertEquals(processEntity.get("processName"), process.getName());
Id inId = (Id) ((List)processEntity.get("inputs")).get(0);
......@@ -133,7 +129,60 @@ public class FalconHookIT {
                HiveMetaStoreBridge.getTableQualifiedName(cluster.getName(), outDbName, outTableName));
    }

+   private Feed getTableFeed(String feedResource, String clusterName) throws Exception {
+       Feed feed = loadEntity(EntityType.FEED, feedResource, "feed" + random());
+       org.apache.falcon.entity.v0.feed.Cluster feedCluster = feed.getClusters().getClusters().get(0);
+       feedCluster.setName(clusterName);
+       feedCluster.getTable().setUri(getTableUri("db" + random(), "table" + random()));
+       STORE.publish(EntityType.FEED, feed);
+       return feed;
+   }
+
+   private String getDBName(Feed feed) {
+       String uri = feed.getClusters().getClusters().get(0).getTable().getUri();
+       String[] parts = uri.split(":");
+       return parts[1];
+   }
+
+   private String getTableName(Feed feed) {
+       String uri = feed.getClusters().getClusters().get(0).getTable().getUri();
+       String[] parts = uri.split(":");
+       parts = parts[2].split("#");
+       return parts[0];
+   }
+
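These two helpers assume the Falcon table URI shape catalog:<database>:<table>#<partitions>, splitting on ':' for the database and on '#' for the table. A quick standalone check of that parsing (the URI value is illustrative, not from the diff):

public class TableUriParse {
    public static void main(String[] args) {
        String uri = "catalog:salesdb:salestable#ds=2016-04-01"; // illustrative URI
        String[] parts = uri.split(":");
        String db = parts[1];                  // "salesdb"    (what getDBName returns)
        String table = parts[2].split("#")[0]; // "salestable" (what getTableName returns)
        System.out.println(db + "." + table);  // prints salesdb.salestable
    }
}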
+   @Test (enabled = true)
+   public void testCreateProcessWithHDFSFeed() throws Exception {
+       Cluster cluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random());
+       STORE.publish(EntityType.CLUSTER, cluster);
+
+       Feed infeed = loadEntity(EntityType.FEED, FEED_HDFS_RESOURCE, "feed" + random());
+       org.apache.falcon.entity.v0.feed.Cluster feedCluster = infeed.getClusters().getClusters().get(0);
+       feedCluster.setName(cluster.getName());
+       STORE.publish(EntityType.FEED, infeed);
+
+       Feed outfeed = getTableFeed(FEED_RESOURCE, cluster.getName());
+       String outTableName = getTableName(outfeed);
+       String outDbName = getDBName(outfeed);
+
+       Process process = loadEntity(EntityType.PROCESS, PROCESS_RESOURCE, "process" + random());
+       process.getClusters().getClusters().get(0).setName(cluster.getName());
+       process.getInputs().getInputs().get(0).setFeed(infeed.getName());
+       process.getOutputs().getOutputs().get(0).setFeed(outfeed.getName());
+       STORE.publish(EntityType.PROCESS, process);
+
+       String pid = assertProcessIsRegistered(cluster.getName(), process.getName());
+       Referenceable processEntity = dgiCLient.getEntity(pid);
+       assertEquals(processEntity.get("processName"), process.getName());
+       assertNull(processEntity.get("inputs"));
+
+       Id outId = (Id) ((List) processEntity.get("outputs")).get(0);
+       Referenceable outEntity = dgiCLient.getEntity(outId._getId());
+       assertEquals(outEntity.get("name"),
+               HiveMetaStoreBridge.getTableQualifiedName(cluster.getName(), outDbName, outTableName));
+   }
// @Test (enabled = true, dependsOnMethods = "testCreateProcess")
// public void testUpdateProcess() throws Exception {
// FalconEvent event = createProcessEntity(PROCESS_NAME_2, INPUT, OUTPUT);
// FalconEventPublisher.Data data = new FalconEventPublisher.Data(event);
......@@ -156,7 +205,7 @@ public class FalconHookIT {
}
private String assertEntityIsRegistered(final String query) throws Exception {
-       waitFor(20000, new Predicate() {
+       waitFor(2000000, new Predicate() {
@Override
public boolean evaluate() throws Exception {
JSONArray results = dgiCLient.search(query);
......
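The last hunk raises the poll timeout in assertEntityIsRegistered from 20 seconds (20000 ms) to roughly 33 minutes (2000000 ms), presumably so slow asynchronous hook delivery doesn't fail the test. waitFor itself is outside this diff, so the sketch below is an assumption about how the polling helper behaves, not the actual FalconHookIT code:

public class WaitForSketch {
    // Assumed shape of the Predicate callback seen in the hunk above.
    interface Predicate {
        boolean evaluate() throws Exception;
    }

    // Poll the predicate until it passes or the timeout elapses.
    static void waitFor(long timeoutMs, Predicate predicate) throws Exception {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (predicate.evaluate()) {
                return;             // condition met within the timeout window
            }
            Thread.sleep(1000);     // poll roughly once a second
        }
        throw new Exception("Timed out after " + timeoutMs + " ms");
    }

    public static void main(String[] args) throws Exception {
        final long start = System.currentTimeMillis();
        waitFor(20000, new Predicate() {
            @Override
            public boolean evaluate() {
                return System.currentTimeMillis() - start > 3000; // succeeds after ~3 s
            }
        });
        System.out.println("condition met");
    }
}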
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<feed description="test input" name="testinput" xmlns="uri:falcon:feed:0.1">
<groups>online,bi</groups>
<frequency>hours(1)</frequency>
<timezone>UTC</timezone>
<late-arrival cut-off="hours(3)"/>
<clusters>
<cluster name="testcluster" type="source">
<validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z"/>
<retention limit="hours(24)" action="delete"/>
</cluster>
</clusters>
<locations>
<location type="data" path="/tmp/input/${YEAR}-${MONTH}-${DAY}-${HOUR}"/>
</locations>
<ACL owner="testuser" group="group" permission="0x755"/>
<schema location="hcat" provider="hcat"/>
</feed>
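The XML above is presumably the new /feed-hdfs.xml resource referenced by FEED_HDFS_RESOURCE: it locates its data purely through a filesystem <locations> path. By contrast, a table-backed feed such as /feed.xml would carry a <table uri="catalog:database:table#partitions"/> element, which is the URI shape the getDBName/getTableName helpers parse and, presumably, what getInputOutputEntity needs to produce Hive table entities. With only an HDFS location the hook registers no inputs, which is exactly what assertNull(processEntity.get("inputs")) verifies.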
......@@ -11,6 +11,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES:
+ATLAS-537 Falcon hook failing when tried to submit a process which creates a hive table (shwethags via sumasai)
ATLAS-476 Update type attribute with Reserved characters updated the original type as unknown (yhemanth via shwethags)
ATLAS-463 Disconnect inverse references ( dkantor via sumasai)
ATLAS-479 Add description for different types during create time (guptaneeru via shwethags)
......