Commit e30ab3d8 by Shwetha GS

ATLAS-835 Falcon Integration with Atlas (sowmyaramesh via shwethags)

parent 436a5245
......@@ -16,14 +16,12 @@
* limitations under the License.
*/
package org.apache.falcon.atlas.Util;
package org.apache.atlas.falcon.Util;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.security.CurrentUser;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
......@@ -33,7 +31,6 @@ import java.util.Map;
* Falcon event util
*/
public final class EventUtil {
private static final Logger LOG = LoggerFactory.getLogger(EventUtil.class);
private EventUtil() {}
......@@ -55,7 +52,6 @@ public final class EventUtil {
return keyValueMap;
}
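// Illustrative only (not part of this change): assuming Falcon's comma-separated key=value tag
// format, a call such as convertKeyValueStringToMap("owner=landing,consumer=analytics") would be
// expected to yield a map like {owner=landing, consumer=analytics}, which the bridge stores under
// the "tags" attribute of cluster, feed and process entities.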
public static UserGroupInformation getUgi() throws FalconException {
UserGroupInformation ugi;
try {
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.falcon.bridge;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.falcon.model.FalconDataModelGenerator;
import org.apache.atlas.falcon.model.FalconDataTypes;
import org.apache.atlas.fs.model.FSDataTypes;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.commons.lang3.StringUtils;
import org.apache.atlas.falcon.Util.EventUtil;
import org.apache.falcon.entity.CatalogStorage;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.FileSystemStorage;
import org.apache.falcon.entity.ProcessHelper;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.feed.CatalogTable;
import org.apache.falcon.entity.v0.feed.ClusterType;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.Location;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.process.Cluster;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Workflow;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* A bridge utility to register Falcon entity metadata with Atlas.
*/
public class FalconBridge {
private static final Logger LOG = LoggerFactory.getLogger(FalconBridge.class);
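// Typical call pattern from the Falcon hook (illustrative sketch only; see FalconHook.createEntities
// later in this change):
//   Referenceable clusterRef = FalconBridge.createClusterEntity(cluster, user, timestamp);
//   List<Referenceable> feedEntities = FalconBridge.createFeedCreationEntity(feed, configStore, user, timestamp);
//   List<Referenceable> processEntities = FalconBridge.createProcessEntity(process, configStore, user, timestamp);
// where configStore is the Falcon ConfigurationStore and the returned referenceables are sent to
// Atlas as notification messages.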
/**
* Creates cluster entity
*
* @param cluster ClusterEntity
* @param user falcon user
* @param timestamp timestamp of entity
* @return cluster instance reference
*/
public static Referenceable createClusterEntity(final org.apache.falcon.entity.v0.cluster.Cluster cluster,
final String user,
final Date timestamp) throws Exception {
LOG.info("Creating cluster Entity : {}", cluster.getName());
Referenceable clusterRef = new Referenceable(FalconDataTypes.FALCON_CLUSTER.getName());
clusterRef.set(FalconDataModelGenerator.NAME, cluster.getName());
clusterRef.set("description", cluster.getDescription());
clusterRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, cluster.getName());
clusterRef.set(FalconDataModelGenerator.TIMESTAMP, timestamp);
clusterRef.set(FalconDataModelGenerator.COLO, cluster.getColo());
clusterRef.set(FalconDataModelGenerator.USER, user);
if (StringUtils.isNotEmpty(cluster.getTags())) {
clusterRef.set(FalconDataModelGenerator.TAGS,
EventUtil.convertKeyValueStringToMap(cluster.getTags()));
}
return clusterRef;
}
private static Referenceable createFeedEntity(Feed feed, Referenceable clusterReferenceable,
String user, Date timestamp) throws Exception {
LOG.info("Creating feed dataset: {}", feed.getName());
Referenceable datasetReferenceable = new Referenceable(FalconDataTypes.FALCON_FEED.getName());
datasetReferenceable.set(FalconDataModelGenerator.NAME, feed.getName());
String feedQualifiedName =
getFeedQualifiedName(feed.getName(), (String) clusterReferenceable.get(FalconDataModelGenerator.NAME));
datasetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, feedQualifiedName);
datasetReferenceable.set(FalconDataModelGenerator.TIMESTAMP, timestamp);
datasetReferenceable.set(FalconDataModelGenerator.STOREDIN, clusterReferenceable);
datasetReferenceable.set(FalconDataModelGenerator.USER, user);
if (StringUtils.isNotEmpty(feed.getTags())) {
datasetReferenceable.set(FalconDataModelGenerator.TAGS,
EventUtil.convertKeyValueStringToMap(feed.getTags()));
}
if (feed.getGroups() != null) {
datasetReferenceable.set(FalconDataModelGenerator.GROUPS, feed.getGroups());
}
return datasetReferenceable;
}
public static List<Referenceable> createFeedCreationEntity(Feed feed, ConfigurationStore falconStore, String user,
Date timestamp) throws Exception {
LOG.info("Creating feed : {}", feed.getName());
List<Referenceable> entities = new ArrayList<>();
if (feed.getClusters() != null) {
List<Referenceable> replicationInputs = new ArrayList<>();
List<Referenceable> replicationOutputs = new ArrayList<>();
for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : feed.getClusters().getClusters()) {
org.apache.falcon.entity.v0.cluster.Cluster cluster = falconStore.get(EntityType.CLUSTER,
feedCluster.getName());
// set cluster
Referenceable clusterReferenceable = getClusterEntityReference(cluster.getName(), cluster.getColo());
entities.add(clusterReferenceable);
// input as hive_table or hdfs_path, output as falcon_feed dataset
List<Referenceable> inputs = new ArrayList<>();
List<Referenceable> inputReferenceables = getInputEntities(cluster, feed);
if (inputReferenceables != null) {
entities.addAll(inputReferenceables);
inputs.add(inputReferenceables.get(inputReferenceables.size() - 1));
}
List<Referenceable> outputs = new ArrayList<>();
Referenceable feedEntity = createFeedEntity(feed, clusterReferenceable, user, timestamp);
if (feedEntity != null) {
entities.add(feedEntity);
outputs.add(feedEntity);
}
if (!inputs.isEmpty() || !outputs.isEmpty()) {
Referenceable feedCreateEntity = new Referenceable(FalconDataTypes.FALCON_FEED_CREATION.getName());
String feedQualifiedName = getFeedQualifiedName(feed.getName(), cluster.getName());
feedCreateEntity.set(FalconDataModelGenerator.NAME, feed.getName());
feedCreateEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, feedQualifiedName);
feedCreateEntity.set(FalconDataModelGenerator.TIMESTAMP, timestamp);
if (!inputs.isEmpty()) {
feedCreateEntity.set(FalconDataModelGenerator.INPUTS, inputs);
}
if (!outputs.isEmpty()) {
feedCreateEntity.set(FalconDataModelGenerator.OUTPUTS, outputs);
}
feedCreateEntity.set(FalconDataModelGenerator.STOREDIN, clusterReferenceable);
feedCreateEntity.set(FalconDataModelGenerator.USER, user);
entities.add(feedCreateEntity);
}
if (ClusterType.SOURCE == feedCluster.getType()) {
replicationInputs.add(feedEntity);
} else if (ClusterType.TARGET == feedCluster.getType()) {
replicationOutputs.add(feedEntity);
}
}
if (!replicationInputs.isEmpty() && !replicationOutputs.isEmpty()) {
Referenceable feedReplicationEntity = new Referenceable(FalconDataTypes
.FALCON_FEED_REPLICATION.getName());
feedReplicationEntity.set(FalconDataModelGenerator.NAME, feed.getName());
feedReplicationEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, feed.getName());
feedReplicationEntity.set(FalconDataModelGenerator.TIMESTAMP, timestamp);
feedReplicationEntity.set(FalconDataModelGenerator.INPUTS, replicationInputs);
feedReplicationEntity.set(FalconDataModelGenerator.OUTPUTS, replicationOutputs);
feedReplicationEntity.set(FalconDataModelGenerator.USER, user);
entities.add(feedReplicationEntity);
}
}
return entities;
}
/**
* Creates process entity
*
* @param process process entity
* @param falconStore config store
* @param user falcon user
* @param timestamp timestamp of entity
* @return process instance reference
*/
public static List<Referenceable> createProcessEntity(org.apache.falcon.entity.v0.process.Process process,
ConfigurationStore falconStore, String user,
Date timestamp) throws Exception {
LOG.info("Creating process Entity : {}", process.getName());
// For each cluster, create a process entity named
// processname@clustername
List<Referenceable> entities = new ArrayList<>();
if (process.getClusters() != null) {
for (Cluster processCluster : process.getClusters().getClusters()) {
org.apache.falcon.entity.v0.cluster.Cluster cluster =
falconStore.get(EntityType.CLUSTER, processCluster.getName());
Referenceable clusterReferenceable = getClusterEntityReference(cluster.getName(), cluster.getColo());
entities.add(clusterReferenceable);
List<Referenceable> inputs = new ArrayList<>();
if (process.getInputs() != null) {
for (Input input : process.getInputs().getInputs()) {
Referenceable inputReferenceable = getFeedDataSetReference(getFeedQualifiedName(input.getFeed(),
cluster.getName()), clusterReferenceable);
entities.add(inputReferenceable);
inputs.add(inputReferenceable);
}
}
List<Referenceable> outputs = new ArrayList<>();
if (process.getOutputs() != null) {
for (Output output : process.getOutputs().getOutputs()) {
Referenceable outputReferenceable = getFeedDataSetReference(getFeedQualifiedName(output.getFeed(),
cluster.getName()), clusterReferenceable);
entities.add(outputReferenceable);
outputs.add(outputReferenceable);
}
}
if (!inputs.isEmpty() || !outputs.isEmpty()) {
Referenceable processEntity = new Referenceable(FalconDataTypes.FALCON_PROCESS.getName());
processEntity.set(FalconDataModelGenerator.NAME, process.getName());
processEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
getProcessQualifiedName(process.getName(), cluster.getName()));
processEntity.set(FalconDataModelGenerator.TIMESTAMP, timestamp);
if (!inputs.isEmpty()) {
processEntity.set(FalconDataModelGenerator.INPUTS, inputs);
}
if (!outputs.isEmpty()) {
processEntity.set(FalconDataModelGenerator.OUTPUTS, outputs);
}
// set cluster
processEntity.set(FalconDataModelGenerator.RUNSON, clusterReferenceable);
// Set user
processEntity.set(FalconDataModelGenerator.USER, user);
if (StringUtils.isNotEmpty(process.getTags())) {
processEntity.set(FalconDataModelGenerator.TAGS,
EventUtil.convertKeyValueStringToMap(process.getTags()));
}
if (process.getPipelines() != null) {
processEntity.set(FalconDataModelGenerator.PIPELINES, process.getPipelines());
}
processEntity.set(FalconDataModelGenerator.WFPROPERTIES,
getProcessEntityWFProperties(process.getWorkflow(),
process.getName()));
entities.add(processEntity);
}
}
}
return entities;
}
private static List<Referenceable> getInputEntities(org.apache.falcon.entity.v0.cluster.Cluster cluster,
Feed feed) throws Exception {
org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
final CatalogTable table = getTable(feedCluster, feed);
if (table != null) {
CatalogStorage storage = new CatalogStorage(cluster, table);
return createHiveTableInstance(cluster.getName(), storage.getDatabase().toLowerCase(),
storage.getTable().toLowerCase());
} else {
List<Location> locations = FeedHelper.getLocations(feedCluster, feed);
Location dataLocation = FileSystemStorage.getLocation(locations, LocationType.DATA);
final String pathUri = normalize(dataLocation.getPath());
LOG.info("Registering DFS Path {} ", pathUri);
return fillHDFSDataSet(pathUri, cluster.getName());
}
}
private static CatalogTable getTable(org.apache.falcon.entity.v0.feed.Cluster cluster, Feed feed) {
// check if table is overridden in cluster
if (cluster.getTable() != null) {
return cluster.getTable();
}
return feed.getTable();
}
private static List<Referenceable> fillHDFSDataSet(final String pathUri, final String clusterName) {
List<Referenceable> entities = new ArrayList<>();
Referenceable ref = new Referenceable(FSDataTypes.HDFS_PATH().toString());
ref.set("path", pathUri);
// Path path = new Path(pathUri);
// ref.set("name", path.getName());
//TODO - Fix after ATLAS-542 to use a shorter name
ref.set(FalconDataModelGenerator.NAME, pathUri);
ref.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, pathUri);
ref.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
entities.add(ref);
return entities;
}
private static Referenceable createHiveDatabaseInstance(String clusterName, String dbName)
throws Exception {
Referenceable dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName());
dbRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
dbRef.set(HiveDataModelGenerator.NAME, dbName);
dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName));
return dbRef;
}
private static List<Referenceable> createHiveTableInstance(String clusterName, String dbName,
String tableName) throws Exception {
List<Referenceable> entities = new ArrayList<>();
Referenceable dbRef = createHiveDatabaseInstance(clusterName, dbName);
entities.add(dbRef);
Referenceable tableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
tableRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName));
tableRef.set(HiveDataModelGenerator.NAME, tableName.toLowerCase());
tableRef.set(HiveDataModelGenerator.DB, dbRef);
entities.add(tableRef);
return entities;
}
private static Referenceable getClusterEntityReference(final String clusterName,
final String colo) {
LOG.info("Getting reference for entity {}", clusterName);
Referenceable clusterRef = new Referenceable(FalconDataTypes.FALCON_CLUSTER.getName());
clusterRef.set(FalconDataModelGenerator.NAME, String.format("%s", clusterName));
clusterRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, clusterName);
clusterRef.set(FalconDataModelGenerator.COLO, colo);
return clusterRef;
}
private static Referenceable getFeedDataSetReference(final String feedDatasetName,
Referenceable clusterReference) {
LOG.info("Getting reference for entity {}", feedDatasetName);
Referenceable feedDatasetRef = new Referenceable(FalconDataTypes.FALCON_FEED.getName());
feedDatasetRef.set(FalconDataModelGenerator.NAME, feedDatasetName);
feedDatasetRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, feedDatasetName);
feedDatasetRef.set(FalconDataModelGenerator.STOREDIN, clusterReference);
return feedDatasetRef;
}
private static Map<String, String> getProcessEntityWFProperties(final Workflow workflow,
final String processName) {
Map<String, String> wfProperties = new HashMap<>();
wfProperties.put(WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(),
ProcessHelper.getProcessWorkflowName(workflow.getName(), processName));
wfProperties.put(WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(),
workflow.getVersion());
wfProperties.put(WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(),
workflow.getEngine().value());
return wfProperties;
}
public static String getFeedQualifiedName(final String feedName, final String clusterName) {
return String.format("%s@%s", feedName, clusterName);
}
public static String getProcessQualifiedName(final String processName, final String clusterName) {
return String.format("%s@%s", processName, clusterName);
}
public static String normalize(final String str) {
if (StringUtils.isBlank(str)) {
return null;
}
return str.toLowerCase().trim();
}
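// Examples (illustrative): getFeedQualifiedName("clicks-feed", "primaryCluster") returns
// "clicks-feed@primaryCluster", getProcessQualifiedName("click-etl", "primaryCluster") returns
// "click-etl@primaryCluster", and normalize("  /Data/Clicks/ ") returns "/data/clicks/" while
// normalize("  ") returns null.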
}
......@@ -16,11 +16,13 @@
* limitations under the License.
*/
package org.apache.falcon.atlas.event;
package org.apache.atlas.falcon.event;
import org.apache.falcon.entity.v0.Entity;
import org.apache.hadoop.security.UserGroupInformation;
import java.util.Date;
/**
* Falcon event to interface with Atlas Service.
*/
......@@ -40,7 +42,12 @@ public class FalconEvent {
}
public enum OPERATION {
ADD_PROCESS, UPDATE_PROCESS
ADD_CLUSTER,
UPDATE_CLUSTER,
ADD_FEED,
UPDATE_FEED,
ADD_PROCESS,
UPDATE_PROCESS,
}
public String getUser() {
......@@ -55,8 +62,8 @@ public class FalconEvent {
return operation;
}
public long getTimestamp() {
return timestamp;
public Date getTimestamp() {
return new Date(timestamp);
}
public Entity getEntity() {
......
......@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
......@@ -21,32 +21,17 @@ package org.apache.atlas.falcon.hook;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.falcon.model.FalconDataModelGenerator;
import org.apache.atlas.falcon.model.FalconDataTypes;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.falcon.bridge.FalconBridge;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.NotificationModule;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.atlas.Util.EventUtil;
import org.apache.falcon.atlas.event.FalconEvent;
import org.apache.falcon.atlas.publisher.FalconEventPublisher;
import org.apache.falcon.entity.CatalogStorage;
import org.apache.falcon.entity.FeedHelper;
import org.apache.atlas.falcon.event.FalconEvent;
import org.apache.atlas.falcon.publisher.FalconEventPublisher;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.feed.CatalogTable;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Cluster;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.security.CurrentUser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -86,6 +71,11 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher {
private static ConfigurationStore STORE;
private enum Operation {
ADD,
UPDATE
}
static {
try {
// initialize the async facility to process hook calls. We don't
......@@ -115,12 +105,14 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher {
});
STORE = ConfigurationStore.get();
Injector injector = Guice.createInjector(new NotificationModule());
notifInterface = injector.getInstance(NotificationInterface.class);
} catch (Exception e) {
LOG.info("Caught exception initializing the falcon hook.", e);
LOG.error("Caught exception initializing the falcon hook.", e);
}
Injector injector = Guice.createInjector(new NotificationModule());
notifInterface = injector.getInstance(NotificationInterface.class);
LOG.info("Created Atlas Hook for Falcon");
}
......@@ -128,166 +120,92 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher {
@Override
public void publish(final Data data) throws Exception {
final FalconEvent event = data.getEvent();
if (sync) {
fireAndForget(event);
} else {
executor.submit(new Runnable() {
@Override
public void run() {
try {
fireAndForget(event);
} catch (Throwable e) {
LOG.info("Atlas hook failed", e);
try {
if (sync) {
fireAndForget(event);
} else {
executor.submit(new Runnable() {
@Override
public void run() {
try {
fireAndForget(event);
} catch (Throwable e) {
LOG.info("Atlas hook failed", e);
}
}
}
});
});
}
} catch (Throwable t) {
LOG.warn("Error in processing data {}", data);
}
}
@Override
protected String getNumberOfRetriesPropertyKey() {
return HOOK_NUM_RETRIES;
}
private void fireAndForget(FalconEvent event) throws Exception {
LOG.info("Entered Atlas hook for Falcon hook operation {}", event.getOperation());
List<HookNotification.HookNotificationMessage> messages = new ArrayList<>();
notifyEntities(getAuthenticatedUser(), createEntities(event));
}
Operation op = getOperation(event.getOperation());
String user = getUser(event.getUser());
LOG.info("fireAndForget user:{}, ugi: {}", user, event.getUgi());
switch (op) {
case ADD:
messages.add(new HookNotification.EntityCreateRequest(user, createEntities(event, user)));
break;
private String getAuthenticatedUser() {
String user = null;
try {
user = CurrentUser.getAuthenticatedUser();
} catch (IllegalArgumentException e) {
LOG.warn("Failed to get user from CurrentUser.getAuthenticatedUser");
}
return getUser(user, null);
notifyEntities(messages);
}
private List<Referenceable> createEntities(FalconEvent event) throws Exception {
switch (event.getOperation()) {
case ADD_PROCESS:
return createProcessInstance((Process) event.getEntity(), event.getUser(), event.getTimestamp());
}
return null;
}
/**
* Creates process entity
*
* @param event process entity event
* @return process instance reference
*/
public List<Referenceable> createProcessInstance(Process process, String user, long timestamp) throws Exception {
LOG.info("Creating process Instance : {}", process.getName());
// The requirement is for each cluster, create a process entity with name
// clustername.processname
private List<Referenceable> createEntities(FalconEvent event, String user) throws Exception {
List<Referenceable> entities = new ArrayList<>();
if (process.getClusters() != null) {
for (Cluster processCluster : process.getClusters().getClusters()) {
org.apache.falcon.entity.v0.cluster.Cluster cluster = STORE.get(EntityType.CLUSTER, processCluster.getName());
List<Referenceable> inputs = new ArrayList<>();
if (process.getInputs() != null) {
for (Input input : process.getInputs().getInputs()) {
List<Referenceable> clusterInputs = getInputOutputEntity(cluster, input.getFeed());
if (clusterInputs != null) {
entities.addAll(clusterInputs);
inputs.add(clusterInputs.get(clusterInputs.size() - 1));
}
}
}
List<Referenceable> outputs = new ArrayList<>();
if (process.getOutputs() != null) {
for (Output output : process.getOutputs().getOutputs()) {
List<Referenceable> clusterOutputs = getInputOutputEntity(cluster, output.getFeed());
if (clusterOutputs != null) {
entities.addAll(clusterOutputs);
outputs.add(clusterOutputs.get(clusterOutputs.size() - 1));
}
}
}
if (!inputs.isEmpty() || !outputs.isEmpty()) {
Referenceable processEntity = new Referenceable(FalconDataTypes.FALCON_PROCESS_ENTITY.getName());
processEntity.set(FalconDataModelGenerator.NAME, String.format("%s", process.getName(),
cluster.getName()));
processEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, String.format("%s@%s", process.getName(),
cluster.getName()));
processEntity.set(FalconDataModelGenerator.TIMESTAMP, timestamp);
if (!inputs.isEmpty()) {
processEntity.set(FalconDataModelGenerator.INPUTS, inputs);
}
if (!outputs.isEmpty()) {
processEntity.set(FalconDataModelGenerator.OUTPUTS, outputs);
}
processEntity.set(FalconDataModelGenerator.USER, user);
if (StringUtils.isNotEmpty(process.getTags())) {
processEntity.set(FalconDataModelGenerator.TAGS,
EventUtil.convertKeyValueStringToMap(process.getTags()));
}
entities.add(processEntity);
}
}
switch (event.getOperation()) {
case ADD_CLUSTER:
entities.add(FalconBridge
.createClusterEntity((org.apache.falcon.entity.v0.cluster.Cluster) event.getEntity(), user,
event.getTimestamp()));
break;
case ADD_PROCESS:
entities.addAll(FalconBridge.createProcessEntity((Process) event.getEntity(), STORE,
user, event.getTimestamp()));
break;
case ADD_FEED:
entities.addAll(FalconBridge.createFeedCreationEntity((Feed) event.getEntity(), STORE,
user, event.getTimestamp()));
break;
case UPDATE_CLUSTER:
case UPDATE_FEED:
case UPDATE_PROCESS:
default:
LOG.info("Falcon operation {} is not valid or supported", event.getOperation());
}
return entities;
}
private List<Referenceable> getInputOutputEntity(org.apache.falcon.entity.v0.cluster.Cluster cluster, String feedName) throws Exception {
Feed feed = STORE.get(EntityType.FEED, feedName);
org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
private static Operation getOperation(final FalconEvent.OPERATION op) throws Exception {
switch (op) {
case ADD_CLUSTER:
case ADD_FEED:
case ADD_PROCESS:
return Operation.ADD;
final CatalogTable table = getTable(feedCluster, feed);
if (table != null) {
CatalogStorage storage = new CatalogStorage(cluster, table);
return createHiveTableInstance(cluster.getName(), storage.getDatabase().toLowerCase(),
storage.getTable().toLowerCase());
}
return null;
}
case UPDATE_CLUSTER:
case UPDATE_FEED:
case UPDATE_PROCESS:
return Operation.UPDATE;
private CatalogTable getTable(org.apache.falcon.entity.v0.feed.Cluster cluster, Feed feed) {
// check if table is overridden in cluster
if (cluster.getTable() != null) {
return cluster.getTable();
default:
throw new Exception("Falcon operation " + op + " is not valid or supported");
}
return feed.getTable();
}
private Referenceable createHiveDatabaseInstance(String clusterName, String dbName)
throws Exception {
Referenceable dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName());
dbRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
dbRef.set(HiveDataModelGenerator.NAME, dbName);
dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName));
return dbRef;
}
private List<Referenceable> createHiveTableInstance(String clusterName, String dbName, String tableName) throws Exception {
List<Referenceable> entities = new ArrayList<>();
Referenceable dbRef = createHiveDatabaseInstance(clusterName, dbName);
entities.add(dbRef);
Referenceable tableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
tableRef.set(HiveDataModelGenerator.NAME,
tableName);
tableRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName));
tableRef.set(HiveDataModelGenerator.DB, dbRef);
entities.add(tableRef);
return entities;
}
@Override
protected String getNumberOfRetriesPropertyKey() {
return HOOK_NUM_RETRIES;
}
}
......@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
......@@ -20,7 +20,6 @@ package org.apache.atlas.falcon.model;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.addons.ModelDefinitionDump;
......@@ -53,48 +52,46 @@ public class FalconDataModelGenerator {
private static final Logger LOG = LoggerFactory.getLogger(FalconDataModelGenerator.class);
private final Map<String, HierarchicalTypeDefinition<ClassType>> classTypeDefinitions;
private final Map<String, EnumTypeDefinition> enumTypeDefinitionMap;
private final Map<String, StructTypeDefinition> structTypeDefinitionMap;
public static final String NAME = "name";
public static final String TIMESTAMP = "timestamp";
public static final String USER = "owned-by";
public static final String TAGS = "tag-classification";
public static final String COLO = "colo";
public static final String USER = "owner";
public static final String TAGS = "tags";
public static final String GROUPS = "groups";
public static final String PIPELINES = "pipelines";
public static final String WFPROPERTIES = "workflow-properties";
public static final String RUNSON = "runs-on";
public static final String STOREDIN = "stored-in";
// multiple inputs and outputs for process
public static final String INPUTS = "inputs";
public static final String OUTPUTS = "outputs";
public FalconDataModelGenerator() {
classTypeDefinitions = new HashMap<>();
enumTypeDefinitionMap = new HashMap<>();
structTypeDefinitionMap = new HashMap<>();
}
public void createDataModel() throws AtlasException {
LOG.info("Generating the Falcon Data Model");
createProcessEntityClass();
// classes
createClusterEntityClass();
createProcessEntityClass();
createFeedEntityClass();
createFeedDatasetClass();
createReplicationFeedEntityClass();
}
private TypesDef getTypesDef() {
return TypesUtil.getTypesDef(getEnumTypeDefinitions(), getStructTypeDefinitions(), getTraitTypeDefinitions(),
getClassTypeDefinitions());
return TypesUtil.getTypesDef(ImmutableList.<EnumTypeDefinition>of(), ImmutableList.<StructTypeDefinition>of(),
getTraitTypeDefinitions(), getClassTypeDefinitions());
}
public String getDataModelAsJSON() {
return TypesSerialization.toJson(getTypesDef());
}
private ImmutableList<EnumTypeDefinition> getEnumTypeDefinitions() {
return ImmutableList.copyOf(enumTypeDefinitionMap.values());
}
private ImmutableList<StructTypeDefinition> getStructTypeDefinitions() {
return ImmutableList.copyOf(structTypeDefinitionMap.values());
}
private ImmutableList<HierarchicalTypeDefinition<ClassType>> getClassTypeDefinitions() {
return ImmutableList.copyOf(classTypeDefinitions.values());
}
......@@ -103,24 +100,103 @@ public class FalconDataModelGenerator {
return ImmutableList.of();
}
private void createClusterEntityClass() throws AtlasException {
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition(TIMESTAMP, DataTypes.DATE_TYPE.getName(), Multiplicity.OPTIONAL, false,
null),
new AttributeDefinition(COLO, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false,
null),
new AttributeDefinition(USER, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false,
null),
// map of tags
new AttributeDefinition(TAGS,
DataTypes.mapTypeName(DataTypes.STRING_TYPE.getName(), DataTypes.STRING_TYPE.getName()),
Multiplicity.OPTIONAL, false, null),};
private void createProcessEntityClass() throws AtlasException {
HierarchicalTypeDefinition<ClassType> definition =
new HierarchicalTypeDefinition<>(ClassType.class, FalconDataTypes.FALCON_CLUSTER.getName(), null,
ImmutableSet.of(AtlasClient.INFRASTRUCTURE_SUPER_TYPE), attributeDefinitions);
classTypeDefinitions.put(FalconDataTypes.FALCON_CLUSTER.getName(), definition);
LOG.debug("Created definition for {}", FalconDataTypes.FALCON_CLUSTER.getName());
}
private void createFeedEntityClass() throws AtlasException {
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition(TIMESTAMP, DataTypes.LONG_TYPE.getName(), Multiplicity.REQUIRED, false,
new AttributeDefinition(TIMESTAMP, DataTypes.DATE_TYPE.getName(), Multiplicity.REQUIRED, false,
null),
new AttributeDefinition(STOREDIN, FalconDataTypes.FALCON_CLUSTER.getName(), Multiplicity.REQUIRED,
false, null),
new AttributeDefinition(USER, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false,
null)};
HierarchicalTypeDefinition<ClassType> definition =
new HierarchicalTypeDefinition<>(ClassType.class, FalconDataTypes.FALCON_FEED_CREATION.getName(), null,
ImmutableSet.of(AtlasClient.PROCESS_SUPER_TYPE), attributeDefinitions);
classTypeDefinitions.put(FalconDataTypes.FALCON_FEED_CREATION.getName(), definition);
LOG.debug("Created definition for {}", FalconDataTypes.FALCON_FEED_CREATION.getName());
}
private void createFeedDatasetClass() throws AtlasException {
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition(TIMESTAMP, DataTypes.DATE_TYPE.getName(), Multiplicity.OPTIONAL, false,
null),
new AttributeDefinition(STOREDIN, FalconDataTypes.FALCON_CLUSTER.getName(), Multiplicity.REQUIRED,
false, null),
new AttributeDefinition(USER, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false,
null),
new AttributeDefinition(GROUPS, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null),
// map of tags
new AttributeDefinition(TAGS, DataTypes.mapTypeName(DataTypes.STRING_TYPE.getName(), DataTypes.STRING_TYPE.getName()),
new AttributeDefinition(TAGS,
DataTypes.mapTypeName(DataTypes.STRING_TYPE.getName(), DataTypes.STRING_TYPE.getName()),
Multiplicity.OPTIONAL, false, null),};
HierarchicalTypeDefinition<ClassType> definition =
new HierarchicalTypeDefinition<>(ClassType.class, FalconDataTypes.FALCON_PROCESS_ENTITY.getName(), null,
ImmutableSet.of(AtlasClient.PROCESS_SUPER_TYPE), attributeDefinitions);
classTypeDefinitions.put(FalconDataTypes.FALCON_PROCESS_ENTITY.getName(), definition);
LOG.debug("Created definition for {}", FalconDataTypes.FALCON_PROCESS_ENTITY.getName());
new HierarchicalTypeDefinition<>(ClassType.class, FalconDataTypes.FALCON_FEED.getName(), null,
ImmutableSet.of(AtlasClient.DATA_SET_SUPER_TYPE), attributeDefinitions);
classTypeDefinitions.put(FalconDataTypes.FALCON_FEED.getName(), definition);
LOG.debug("Created definition for {}", FalconDataTypes.FALCON_FEED.getName());
}
private void createReplicationFeedEntityClass() throws AtlasException {
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition(TIMESTAMP, DataTypes.DATE_TYPE.getName(), Multiplicity.REQUIRED, false,
null),
new AttributeDefinition(USER, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false,
null)};
HierarchicalTypeDefinition<ClassType> definition =
new HierarchicalTypeDefinition<>(ClassType.class,
FalconDataTypes.FALCON_FEED_REPLICATION.getName(), null,
ImmutableSet.of(AtlasClient.PROCESS_SUPER_TYPE), attributeDefinitions);
classTypeDefinitions.put(FalconDataTypes.FALCON_FEED_REPLICATION.getName(), definition);
LOG.debug("Created definition for {}", FalconDataTypes.FALCON_FEED_REPLICATION.getName());
}
private void createProcessEntityClass() throws AtlasException {
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition(TIMESTAMP, DataTypes.DATE_TYPE.getName(), Multiplicity.REQUIRED, false,
null),
new AttributeDefinition(RUNSON, FalconDataTypes.FALCON_CLUSTER.getName(), Multiplicity.REQUIRED,
false, null),
new AttributeDefinition(USER, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false,
null),
// map of tags
new AttributeDefinition(TAGS,
DataTypes.mapTypeName(DataTypes.STRING_TYPE.getName(), DataTypes.STRING_TYPE.getName()),
Multiplicity.OPTIONAL, false, null),
new AttributeDefinition(PIPELINES, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null),
// wf properties
new AttributeDefinition(WFPROPERTIES,
DataTypes.mapTypeName(DataTypes.STRING_TYPE.getName(), DataTypes.STRING_TYPE.getName()),
Multiplicity.OPTIONAL, false, null),};
HierarchicalTypeDefinition<ClassType> definition =
new HierarchicalTypeDefinition<>(ClassType.class, FalconDataTypes.FALCON_PROCESS.getName(), null,
ImmutableSet.of(AtlasClient.PROCESS_SUPER_TYPE), attributeDefinitions);
classTypeDefinitions.put(FalconDataTypes.FALCON_PROCESS.getName(), definition);
LOG.debug("Created definition for {}", FalconDataTypes.FALCON_PROCESS.getName());
}
public String getModelAsJson() throws AtlasException {
......@@ -145,11 +221,13 @@ public class FalconDataModelGenerator {
Arrays.toString(enumType.enumValues)));
}
for (StructTypeDefinition structType : typesDef.structTypesAsJavaList()) {
System.out.println(String.format("%s(%s) - attributes %s", structType.typeName, StructType.class.getSimpleName(),
Arrays.toString(structType.attributeDefinitions)));
System.out.println(
String.format("%s(%s) - attributes %s", structType.typeName, StructType.class.getSimpleName(),
Arrays.toString(structType.attributeDefinitions)));
}
for (HierarchicalTypeDefinition<ClassType> classType : typesDef.classTypesAsJavaList()) {
System.out.println(String.format("%s(%s) - super types [%s] - attributes %s", classType.typeName, ClassType.class.getSimpleName(),
System.out.println(String.format("%s(%s) - super types [%s] - attributes %s", classType.typeName,
ClassType.class.getSimpleName(),
StringUtils.join(classType.superTypes, ","), Arrays.toString(classType.attributeDefinitions)));
}
for (HierarchicalTypeDefinition<TraitType> traitType : typesDef.traitTypesAsJavaList()) {
......
......@@ -22,19 +22,15 @@ package org.apache.atlas.falcon.model;
* Falcon Data Types for model and bridge.
*/
public enum FalconDataTypes {
FALCON_PROCESS_ENTITY("falcon_process"),
;
private final String name;
FalconDataTypes(java.lang.String name) {
this.name = name;
}
// Classes
FALCON_CLUSTER,
FALCON_FEED_CREATION,
FALCON_FEED,
FALCON_FEED_REPLICATION,
FALCON_PROCESS;
public String getName() {
return name;
return name().toLowerCase();
}
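// For example, FALCON_CLUSTER.getName() returns "falcon_cluster" and
// FALCON_FEED_CREATION.getName() returns "falcon_feed_creation".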
}
......@@ -16,10 +16,10 @@
* limitations under the License.
*/
package org.apache.falcon.atlas.publisher;
package org.apache.atlas.falcon.publisher;
import org.apache.falcon.atlas.event.FalconEvent;
import org.apache.atlas.falcon.event.FalconEvent;
/**
* Falcon publisher for Atlas
......
......@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
......@@ -16,17 +16,16 @@
* limitations under the License.
*/
package org.apache.falcon.atlas.service;
package org.apache.atlas.falcon.service;
import org.apache.atlas.falcon.Util.EventUtil;
import org.apache.atlas.falcon.event.FalconEvent;
import org.apache.atlas.falcon.hook.FalconHook;
import org.apache.atlas.falcon.publisher.FalconEventPublisher;
import org.apache.falcon.FalconException;
import org.apache.falcon.atlas.Util.EventUtil;
import org.apache.falcon.atlas.event.FalconEvent;
import org.apache.falcon.atlas.publisher.FalconEventPublisher;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.service.ConfigurationChangeListener;
import org.apache.falcon.service.FalconService;
import org.apache.hadoop.security.UserGroupInformation;
......@@ -57,7 +56,6 @@ public class AtlasService implements FalconService, ConfigurationChangeListener
publisher = new FalconHook();
}
@Override
public void destroy() throws FalconException {
ConfigurationStore.get().unregisterListener(this);
......@@ -65,14 +63,26 @@ public class AtlasService implements FalconService, ConfigurationChangeListener
@Override
public void onAdd(Entity entity) throws FalconException {
EntityType entityType = entity.getEntityType();
switch (entityType) {
try {
EntityType entityType = entity.getEntityType();
switch (entityType) {
case CLUSTER:
addEntity(entity, FalconEvent.OPERATION.ADD_CLUSTER);
break;
case PROCESS:
addProcessEntity((Process) entity, FalconEvent.OPERATION.ADD_PROCESS);
addEntity(entity, FalconEvent.OPERATION.ADD_PROCESS);
break;
case FEED:
addEntity(entity, FalconEvent.OPERATION.ADD_FEED);
break;
default:
LOG.debug("Entity type not processed " + entityType);
LOG.debug("Entity type not processed {}", entityType);
}
} catch(Throwable t) {
LOG.warn("Error handling entity {}", entity, t);
}
}
......@@ -82,15 +92,29 @@ public class AtlasService implements FalconService, ConfigurationChangeListener
@Override
public void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
/**
* Skipping update for now - update currently does a full entity update, which might result in all attributes being wiped for hive entities
EntityType entityType = newEntity.getEntityType();
switch (entityType) {
case PROCESS:
addProcessEntity((Process) newEntity, FalconEvent.OPERATION.UPDATE_PROCESS);
break;
default:
LOG.debug("Entity type not processed " + entityType);
case CLUSTER:
addEntity(newEntity, FalconEvent.OPERATION.UPDATE_CLUSTER);
break;
case PROCESS:
addEntity(newEntity, FalconEvent.OPERATION.UPDATE_PROCESS);
break;
case FEED:
FalconEvent.OPERATION operation = isReplicationFeed((Feed) newEntity) ?
FalconEvent.OPERATION.UPDATE_REPLICATION_FEED :
FalconEvent.OPERATION.UPDATE_FEED;
addEntity(newEntity, operation);
break;
default:
LOG.debug("Entity type not processed {}", entityType);
}
**/
}
@Override
......@@ -99,17 +123,19 @@ public class AtlasService implements FalconService, ConfigurationChangeListener
onAdd(entity);
}
private void addProcessEntity(Process entity, FalconEvent.OPERATION operation) throws FalconException {
LOG.info("Adding process entity to Atlas: {}", entity.getName());
private void addEntity(Entity entity, FalconEvent.OPERATION operation) throws FalconException {
LOG.info("Adding {} entity to Atlas: {}", entity.getEntityType().name(), entity.getName());
try {
String user = entity.getACL() != null ? entity.getACL().getOwner() :
UserGroupInformation.getLoginUser().getShortUserName();
FalconEvent event = new FalconEvent(user, EventUtil.getUgi(), operation, System.currentTimeMillis(), entity);
FalconEvent event =
new FalconEvent(user, EventUtil.getUgi(), operation, System.currentTimeMillis(), entity);
FalconEventPublisher.Data data = new FalconEventPublisher.Data(event);
publisher.publish(data);
} catch (Exception ex) {
throw new FalconException("Unable to publish data to publisher " + ex.getMessage(), ex);
}
}
}
......@@ -22,24 +22,31 @@ import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.falcon.bridge.FalconBridge;
import org.apache.atlas.falcon.model.FalconDataModelGenerator;
import org.apache.atlas.falcon.model.FalconDataTypes;
import org.apache.atlas.fs.model.FSDataTypes;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.persistence.Id;
import org.apache.atlas.typesystem.types.TypeUtils;
import org.apache.atlas.utils.ParamChecker;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.falcon.atlas.service.AtlasService;
import org.apache.atlas.falcon.service.AtlasService;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.FileSystemStorage;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.Location;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.security.CurrentUser;
import org.apache.hadoop.hive.conf.HiveConf;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
......@@ -49,7 +56,7 @@ import java.util.List;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.fail;
public class FalconHookIT {
public static final Logger LOG = org.slf4j.LoggerFactory.getLogger(FalconHookIT.class);
......@@ -57,6 +64,7 @@ public class FalconHookIT {
public static final String CLUSTER_RESOURCE = "/cluster.xml";
public static final String FEED_RESOURCE = "/feed.xml";
public static final String FEED_HDFS_RESOURCE = "/feed-hdfs.xml";
public static final String FEED_REPLICATION_RESOURCE = "/feed-replication.xml";
public static final String PROCESS_RESOURCE = "/process.xml";
private AtlasClient atlasClient;
......@@ -91,7 +99,7 @@ public class FalconHookIT {
private boolean isDataModelAlreadyRegistered() throws Exception {
try {
atlasClient.getType(FalconDataTypes.FALCON_PROCESS_ENTITY.getName());
atlasClient.getType(FalconDataTypes.FALCON_PROCESS.getName());
LOG.info("Hive data model is already registered!");
return true;
} catch(AtlasServiceException ase) {
......@@ -128,18 +136,19 @@ public class FalconHookIT {
return String.format("catalog:%s:%s#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}", dbName, tableName);
}
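// e.g. getTableUri("indb", "intable") returns "catalog:indb:intable#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}",
// matching the table URI format used in the test feed XML resources.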
@Test (enabled = true)
@Test
public void testCreateProcess() throws Exception {
Cluster cluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random());
STORE.publish(EntityType.CLUSTER, cluster);
assertClusterIsRegistered(cluster);
Feed infeed = getTableFeed(FEED_RESOURCE, cluster.getName());
String inTableName = getTableName(infeed);
String inDbName = getDBName(infeed);
Feed infeed = getTableFeed(FEED_RESOURCE, cluster.getName(), null);
String infeedId = atlasClient.getEntity(FalconDataTypes.FALCON_FEED.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
FalconBridge.getFeedQualifiedName(infeed.getName(), cluster.getName())).getId()._getId();
Feed outfeed = getTableFeed(FEED_RESOURCE, cluster.getName());
String outTableName = getTableName(outfeed);
String outDbName = getDBName(outfeed);
String outFeedId = atlasClient.getEntity(FalconDataTypes.FALCON_FEED.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
FalconBridge.getFeedQualifiedName(outfeed.getName(), cluster.getName())).getId()._getId();
Process process = loadEntity(EntityType.PROCESS, PROCESS_RESOURCE, "process" + random());
process.getClusters().getClusters().get(0).setName(cluster.getName());
......@@ -147,57 +156,140 @@ public class FalconHookIT {
process.getOutputs().getOutputs().get(0).setFeed(outfeed.getName());
STORE.publish(EntityType.PROCESS, process);
String pid = assertProcessIsRegistered(cluster.getName(), process.getName());
String pid = assertProcessIsRegistered(process, cluster.getName());
Referenceable processEntity = atlasClient.getEntity(pid);
assertNotNull(processEntity);
assertEquals(processEntity.get(AtlasClient.NAME), process.getName());
assertEquals(((List<Id>)processEntity.get("inputs")).get(0)._getId(), infeedId);
assertEquals(((List<Id>)processEntity.get("outputs")).get(0)._getId(), outFeedId);
}
Id inId = (Id) ((List)processEntity.get("inputs")).get(0);
Referenceable inEntity = atlasClient.getEntity(inId._getId());
assertEquals(inEntity.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME),
HiveMetaStoreBridge.getTableQualifiedName(cluster.getName(), inDbName, inTableName));
private String assertProcessIsRegistered(Process process, String clusterName) throws Exception {
return assertEntityIsRegistered(FalconDataTypes.FALCON_PROCESS.getName(),
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
FalconBridge.getProcessQualifiedName(process.getName(), clusterName));
}
Id outId = (Id) ((List)processEntity.get("outputs")).get(0);
Referenceable outEntity = atlasClient.getEntity(outId._getId());
assertEquals(outEntity.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME),
HiveMetaStoreBridge.getTableQualifiedName(cluster.getName(), outDbName, outTableName));
private String assertClusterIsRegistered(Cluster cluster) throws Exception {
return assertEntityIsRegistered(FalconDataTypes.FALCON_CLUSTER.getName(),
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, cluster.getName());
}
private TypeUtils.Pair<String, Feed> getHDFSFeed(String feedResource, String clusterName) throws Exception {
Feed feed = loadEntity(EntityType.FEED, feedResource, "feed" + random());
org.apache.falcon.entity.v0.feed.Cluster feedCluster = feed.getClusters().getClusters().get(0);
feedCluster.setName(clusterName);
STORE.publish(EntityType.FEED, feed);
String feedId = assertFeedIsRegistered(feed, clusterName);
String processId = assertEntityIsRegistered(FalconDataTypes.FALCON_FEED_CREATION.getName(),
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
FalconBridge.getFeedQualifiedName(feed.getName(), clusterName));
Referenceable processEntity = atlasClient.getEntity(processId);
assertEquals(((List<Id>)processEntity.get("outputs")).get(0).getId()._getId(), feedId);
String inputId = ((List<Id>) processEntity.get("inputs")).get(0).getId()._getId();
Referenceable pathEntity = atlasClient.getEntity(inputId);
assertEquals(pathEntity.getTypeName(), FSDataTypes.HDFS_PATH().toString());
List<Location> locations = FeedHelper.getLocations(feedCluster, feed);
Location dataLocation = FileSystemStorage.getLocation(locations, LocationType.DATA);
assertEquals(pathEntity.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME),
FalconBridge.normalize(dataLocation.getPath()));
return TypeUtils.Pair.of(feedId, feed);
}
private Feed getTableFeed(String feedResource, String clusterName) throws Exception {
return getTableFeed(feedResource, clusterName, null);
}
private Feed getTableFeed(String feedResource, String clusterName, String secondClusterName) throws Exception {
Feed feed = loadEntity(EntityType.FEED, feedResource, "feed" + random());
org.apache.falcon.entity.v0.feed.Cluster feedCluster = feed.getClusters().getClusters().get(0);
feedCluster.setName(clusterName);
feedCluster.getTable().setUri(getTableUri("db" + random(), "table" + random()));
String dbName = "db" + random();
String tableName = "table" + random();
feedCluster.getTable().setUri(getTableUri(dbName, tableName));
String dbName2 = "db" + random();
String tableName2 = "table" + random();
if (secondClusterName != null) {
org.apache.falcon.entity.v0.feed.Cluster feedCluster2 = feed.getClusters().getClusters().get(1);
feedCluster2.setName(secondClusterName);
feedCluster2.getTable().setUri(getTableUri(dbName2, tableName2));
}
STORE.publish(EntityType.FEED, feed);
String feedId = assertFeedIsRegistered(feed, clusterName);
verifyFeedLineage(feed.getName(), clusterName, feedId, dbName, tableName);
if (secondClusterName != null) {
String feedId2 = assertFeedIsRegistered(feed, secondClusterName);
verifyFeedLineage(feed.getName(), secondClusterName, feedId2, dbName2, tableName2);
}
return feed;
}
private String getDBName(Feed feed) {
String uri = feed.getClusters().getClusters().get(0).getTable().getUri();
String[] parts = uri.split(":");
return parts[1];
private void verifyFeedLineage(String feedName, String clusterName, String feedId, String dbName, String tableName)
throws Exception{
//verify that lineage from hive table to falcon feed is created
String processId = assertEntityIsRegistered(FalconDataTypes.FALCON_FEED_CREATION.getName(),
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
FalconBridge.getFeedQualifiedName(feedName, clusterName));
Referenceable processEntity = atlasClient.getEntity(processId);
assertEquals(((List<Id>)processEntity.get("outputs")).get(0).getId()._getId(), feedId);
String inputId = ((List<Id>) processEntity.get("inputs")).get(0).getId()._getId();
Referenceable tableEntity = atlasClient.getEntity(inputId);
assertEquals(tableEntity.getTypeName(), HiveDataTypes.HIVE_TABLE.getName());
assertEquals(tableEntity.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME),
HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName));
}
private String getTableName(Feed feed) {
String uri = feed.getClusters().getClusters().get(0).getTable().getUri();
String[] parts = uri.split(":");
parts = parts[2].split("#");
return parts[0];
private String assertFeedIsRegistered(Feed feed, String clusterName) throws Exception {
return assertEntityIsRegistered(FalconDataTypes.FALCON_FEED.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
FalconBridge.getFeedQualifiedName(feed.getName(), clusterName));
}
@Test (enabled = true)
@Test
public void testReplicationFeed() throws Exception {
Cluster srcCluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random());
STORE.publish(EntityType.CLUSTER, srcCluster);
assertClusterIsRegistered(srcCluster);
Cluster targetCluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random());
STORE.publish(EntityType.CLUSTER, targetCluster);
assertClusterIsRegistered(targetCluster);
Feed feed = getTableFeed(FEED_REPLICATION_RESOURCE, srcCluster.getName(), targetCluster.getName());
String inId = atlasClient.getEntity(FalconDataTypes.FALCON_FEED.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
FalconBridge.getFeedQualifiedName(feed.getName(), srcCluster.getName())).getId()._getId();
String outId = atlasClient.getEntity(FalconDataTypes.FALCON_FEED.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
FalconBridge.getFeedQualifiedName(feed.getName(), targetCluster.getName())).getId()._getId();
String processId = assertEntityIsRegistered(FalconDataTypes.FALCON_FEED_REPLICATION.getName(),
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, feed.getName());
Referenceable process = atlasClient.getEntity(processId);
assertEquals(((List<Id>)process.get("inputs")).get(0)._getId(), inId);
assertEquals(((List<Id>)process.get("outputs")).get(0)._getId(), outId);
}
@Test
public void testCreateProcessWithHDFSFeed() throws Exception {
Cluster cluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random());
STORE.publish(EntityType.CLUSTER, cluster);
Feed infeed = loadEntity(EntityType.FEED, FEED_HDFS_RESOURCE, "feed" + random());
org.apache.falcon.entity.v0.feed.Cluster feedCluster = infeed.getClusters().getClusters().get(0);
feedCluster.setName(cluster.getName());
STORE.publish(EntityType.FEED, infeed);
TypeUtils.Pair<String, Feed> result = getHDFSFeed(FEED_HDFS_RESOURCE, cluster.getName());
Feed infeed = result.right;
String infeedId = result.left;
Feed outfeed = getTableFeed(FEED_RESOURCE, cluster.getName());
String outTableName = getTableName(outfeed);
String outDbName = getDBName(outfeed);
String outfeedId = atlasClient.getEntity(FalconDataTypes.FALCON_FEED.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
FalconBridge.getFeedQualifiedName(outfeed.getName(), cluster.getName())).getId()._getId();
Process process = loadEntity(EntityType.PROCESS, PROCESS_RESOURCE, "process" + random());
process.getClusters().getClusters().get(0).setName(cluster.getName());
......@@ -205,65 +297,35 @@ public class FalconHookIT {
process.getOutputs().getOutputs().get(0).setFeed(outfeed.getName());
STORE.publish(EntityType.PROCESS, process);
String pid = assertProcessIsRegistered(cluster.getName(), process.getName());
String pid = assertProcessIsRegistered(process, cluster.getName());
Referenceable processEntity = atlasClient.getEntity(pid);
assertEquals(processEntity.get(AtlasClient.NAME), process.getName());
assertNull(processEntity.get("inputs"));
Id outId = (Id) ((List)processEntity.get("outputs")).get(0);
Referenceable outEntity = atlasClient.getEntity(outId._getId());
assertEquals(outEntity.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME),
HiveMetaStoreBridge.getTableQualifiedName(cluster.getName(), outDbName, outTableName));
assertEquals(processEntity.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME),
FalconBridge.getProcessQualifiedName(process.getName(), cluster.getName()));
assertEquals(((List<Id>)processEntity.get("inputs")).get(0)._getId(), infeedId);
assertEquals(((List<Id>)processEntity.get("outputs")).get(0)._getId(), outfeedId);
}
// @Test (enabled = true, dependsOnMethods = "testCreateProcess")
// public void testUpdateProcess() throws Exception {
// FalconEvent event = createProcessEntity(PROCESS_NAME_2, INPUT, OUTPUT);
// FalconEventPublisher.Data data = new FalconEventPublisher.Data(event);
// hook.publish(data);
// String id = assertProcessIsRegistered(CLUSTER_NAME, PROCESS_NAME_2);
// event = createProcessEntity(PROCESS_NAME_2, INPUT_2, OUTPUT_2);
// hook.publish(data);
// String id2 = assertProcessIsRegistered(CLUSTER_NAME, PROCESS_NAME_2);
// if (!id.equals(id2)) {
// throw new Exception("Id mismatch");
// }
// }
private String assertProcessIsRegistered(String clusterName, String processName) throws Exception {
String name = processName + "@" + clusterName;
LOG.debug("Searching for process {}", name);
String query = String.format("%s as t where %s = '%s' select t",
FalconDataTypes.FALCON_PROCESS_ENTITY.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
return assertEntityIsRegistered(query);
}
private String assertEntityIsRegistered(final String query) throws Exception {
waitFor(2000000, new Predicate() {
private String assertEntityIsRegistered(final String typeName, final String property, final String value) throws Exception {
waitFor(80000, new Predicate() {
@Override
public boolean evaluate() throws Exception {
JSONArray results = atlasClient.search(query);
System.out.println(results);
return results.length() == 1;
public void evaluate() throws Exception {
Referenceable entity = atlasClient.getEntity(typeName, property, value);
assertNotNull(entity);
}
});
JSONArray results = atlasClient.search(query);
JSONObject row = results.getJSONObject(0).getJSONObject("t");
return row.getString("id");
Referenceable entity = atlasClient.getEntity(typeName, property, value);
return entity.getId()._getId();
}
public interface Predicate {
/**
* Perform a predicate evaluation.
*
* @return the boolean result of the evaluation.
* @throws Exception thrown if the predicate could not be evaluated.
*/
boolean evaluate() throws Exception;
void evaluate() throws Exception;
}
/**
......@@ -273,16 +335,20 @@ public class FalconHookIT {
* @param predicate predicate waiting on.
*/
protected void waitFor(int timeout, Predicate predicate) throws Exception {
ParamChecker.notNull(predicate, "predicate");
long mustEnd = System.currentTimeMillis() + timeout;
boolean eval;
while (!(eval = predicate.evaluate()) && System.currentTimeMillis() < mustEnd) {
LOG.info("Waiting up to {} msec", mustEnd - System.currentTimeMillis());
Thread.sleep(1000);
}
if (!eval) {
throw new Exception("Waiting timed out after " + timeout + " msec");
while (true) {
try {
predicate.evaluate();
return;
} catch(Error | Exception e) {
if (System.currentTimeMillis() >= mustEnd) {
fail("Assertions failed. Failing after waiting for timeout " + timeout + " msecs", e);
}
LOG.debug("Waiting up to " + (mustEnd - System.currentTimeMillis()) + " msec as assertion failed", e);
Thread.sleep(400);
}
}
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<feed description="test input" name="testinput" xmlns="uri:falcon:feed:0.1">
<groups>online,bi</groups>
<frequency>hours(1)</frequency>
<timezone>UTC</timezone>
<late-arrival cut-off="hours(3)"/>
<clusters>
<cluster name="testcluster" type="source">
<validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z"/>
<retention limit="hours(24)" action="delete"/>
<table uri="catalog:indb:intable#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
</cluster>
<cluster name="testcluster" type="target">
<validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z"/>
<retention limit="hours(24)" action="delete"/>
<table uri="catalog:outdb:outtable#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
</cluster>
</clusters>
<table uri="catalog:indb:unused#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
<ACL owner="testuser" group="group" permission="0x755"/>
<schema location="hcat" provider="hcat"/>
</feed>
......@@ -51,11 +51,9 @@ import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.LogManager;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.tools.cmd.gen.AnyVals;
import java.net.MalformedURLException;
import java.util.ArrayList;
......@@ -99,8 +97,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
private static final long keepAliveTimeDefault = 10;
private static final int queueSizeDefault = 10000;
private List<HookNotification.HookNotificationMessage> messages = new ArrayList<>();
private static final HiveConf hiveConf;
static {
......@@ -266,7 +262,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
default:
}
notifyEntities(messages);
notifyEntities(event.getMessages());
}
private void deleteTable(HiveMetaStoreBridge dgiBridge, HiveEventContext event) {
......@@ -280,7 +276,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
private void deleteTable(HiveMetaStoreBridge dgiBridge, HiveEventContext event, WriteEntity output) {
final String tblQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(dgiBridge.getClusterName(), output.getTable());
LOG.info("Deleting table {} ", tblQualifiedName);
messages.add(
event.addMessage(
new HookNotification.EntityDeleteRequest(event.getUser(),
HiveDataTypes.HIVE_TABLE.getName(),
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
......@@ -297,7 +293,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
deleteTable(dgiBridge, event, output);
} else if (Type.DATABASE.equals(output.getType())) {
final String dbQualifiedName = HiveMetaStoreBridge.getDBQualifiedName(dgiBridge.getClusterName(), output.getDatabase().getName());
messages.add(
event.addMessage(
new HookNotification.EntityDeleteRequest(event.getUser(),
HiveDataTypes.HIVE_DB.getName(),
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
......@@ -348,7 +344,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
for(WriteEntity writeEntity : event.getOutputs()){
if (writeEntity.getType() == Type.TABLE){
Table newTable = writeEntity.getTable();
createOrUpdateEntities(dgiBridge, event.getUser(), writeEntity, true, oldTable);
createOrUpdateEntities(dgiBridge, event, writeEntity, true, oldTable);
final String newQualifiedTableName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(),
newTable);
String oldColumnQFName = HiveMetaStoreBridge.getColumnQualifiedName(newQualifiedTableName, oldColName);
......@@ -356,7 +352,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
Referenceable newColEntity = new Referenceable(HiveDataTypes.HIVE_COLUMN.getName());
newColEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, newColumnQFName);
messages.add(new HookNotification.EntityPartialUpdateRequest(event.getUser(),
event.addMessage(new HookNotification.EntityPartialUpdateRequest(event.getUser(),
HiveDataTypes.HIVE_COLUMN.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
oldColumnQFName, newColEntity));
}
......@@ -385,7 +381,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
//Create/update the old table entity - create an entity with oldQFName and the old tableName if it doesn't exist; if it exists, it will be updated
//We always use the new entity while creating the table since some flags/attributes of the table are not set in inputEntity, and Hive.getTable(oldTableName) also fails since the table no longer exists in Hive
final LinkedHashMap<Type, Referenceable> tables = createOrUpdateEntities(dgiBridge, event.getUser(), writeEntity, true);
final LinkedHashMap<Type, Referenceable> tables = createOrUpdateEntities(dgiBridge, event, writeEntity, true);
Referenceable tableEntity = tables.get(Type.TABLE);
//Reset regular column QF Name to old Name and create a new partial notification request to replace old column QFName to newName to retain any existing traits
......@@ -398,13 +394,13 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
replaceSDQFName(event, tableEntity, oldQualifiedName, newQualifiedName);
//Reset Table QF Name to old Name and create a new partial notification request to replace old Table QFName to newName
replaceTableQFName(dgiBridge, event, oldTable, newTable, tableEntity, oldQualifiedName, newQualifiedName);
replaceTableQFName(event, oldTable, newTable, tableEntity, oldQualifiedName, newQualifiedName);
}
}
}
}
private Referenceable replaceTableQFName(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Table oldTable, Table newTable, final Referenceable tableEntity, final String oldTableQFName, final String newTableQFName) throws HiveException {
private Referenceable replaceTableQFName(HiveEventContext event, Table oldTable, Table newTable, final Referenceable tableEntity, final String oldTableQFName, final String newTableQFName) throws HiveException {
tableEntity.set(HiveDataModelGenerator.NAME, oldTable.getTableName().toLowerCase());
tableEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, oldTableQFName);
......@@ -416,7 +412,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
ArrayList<String> alias_list = new ArrayList<>();
alias_list.add(oldTable.getTableName().toLowerCase());
newEntity.set(HiveDataModelGenerator.TABLE_ALIAS_LIST, alias_list);
messages.add(new HookNotification.EntityPartialUpdateRequest(event.getUser(),
event.addMessage(new HookNotification.EntityPartialUpdateRequest(event.getUser(),
HiveDataTypes.HIVE_TABLE.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
oldTableQFName, newEntity));
......@@ -434,7 +430,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
Referenceable newColEntity = new Referenceable(HiveDataTypes.HIVE_COLUMN.getName());
// Only the QF name changes
newColEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, newColumnQFName);
messages.add(new HookNotification.EntityPartialUpdateRequest(event.getUser(),
event.addMessage(new HookNotification.EntityPartialUpdateRequest(event.getUser(),
HiveDataTypes.HIVE_COLUMN.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
oldColumnQFName, newColEntity));
newColEntities.add(newColEntity);
......@@ -453,14 +449,14 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
final Referenceable newSDEntity = new Referenceable(HiveDataTypes.HIVE_STORAGEDESC.getName());
newSDEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, newSDQFName);
messages.add(new HookNotification.EntityPartialUpdateRequest(event.getUser(),
event.addMessage(new HookNotification.EntityPartialUpdateRequest(event.getUser(),
HiveDataTypes.HIVE_STORAGEDESC.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
oldSDQFName, newSDEntity));
return newSDEntity;
}
private LinkedHashMap<Type, Referenceable> createOrUpdateEntities(HiveMetaStoreBridge dgiBridge, String user, Entity entity, boolean skipTempTables, Table existTable) throws Exception {
private LinkedHashMap<Type, Referenceable> createOrUpdateEntities(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Entity entity, boolean skipTempTables, Table existTable) throws Exception {
Database db = null;
Table table = null;
Partition partition = null;
......@@ -513,18 +509,18 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
}
messages.add(new HookNotification.EntityUpdateRequest(user, entities));
event.addMessage(new HookNotification.EntityUpdateRequest(event.getUser(), entities));
return result;
}
private LinkedHashMap<Type, Referenceable> createOrUpdateEntities(HiveMetaStoreBridge dgiBridge, String user, Entity entity, boolean skipTempTables) throws Exception{
return createOrUpdateEntities(dgiBridge, user, entity, skipTempTables, null);
private LinkedHashMap<Type, Referenceable> createOrUpdateEntities(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Entity entity, boolean skipTempTables) throws Exception{
return createOrUpdateEntities(dgiBridge, event, entity, skipTempTables, null);
}
private LinkedHashMap<Type, Referenceable> handleEventOutputs(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Type entityType) throws Exception {
for (Entity entity : event.getOutputs()) {
if (entity.getType() == entityType) {
return createOrUpdateEntities(dgiBridge, event.getUser(), entity, true);
return createOrUpdateEntities(dgiBridge, event, entity, true);
}
}
return null;
......@@ -602,7 +598,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
}});
entities.add(processReferenceable);
messages.add(new HookNotification.EntityUpdateRequest(event.getUser(), new ArrayList<Referenceable>(entities)));
event.addMessage(new HookNotification.EntityUpdateRequest(event.getUser(), new ArrayList<>(entities)));
} else {
LOG.info("Skipped query {} since it has no getInputs() or resulting getOutputs()", event.getQueryStr());
}
......@@ -615,7 +611,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
if (entity.getType() == Type.TABLE || entity.getType() == Type.PARTITION) {
final String tblQFName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(), entity.getTable());
if (!dataSets.containsKey(tblQFName)) {
LinkedHashMap<Type, Referenceable> result = createOrUpdateEntities(dgiBridge, event.getUser(), entity, false);
LinkedHashMap<Type, Referenceable> result = createOrUpdateEntities(dgiBridge, event, entity, false);
dataSets.put(tblQFName, result.get(Type.TABLE));
entities.addAll(result.values());
}
......@@ -684,7 +680,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
}
entities.addAll(tables.values());
entities.add(processReferenceable);
messages.add(new HookNotification.EntityUpdateRequest(event.getUser(), entities));
event.addMessage(new HookNotification.EntityUpdateRequest(event.getUser(), entities));
}
}
......@@ -771,6 +767,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
private String queryType;
List<HookNotification.HookNotificationMessage> messages = new ArrayList<>();
public void setInputs(Set<ReadEntity> inputs) {
this.inputs = inputs;
}
......@@ -859,5 +857,12 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
return queryType;
}
public void addMessage(HookNotification.HookNotificationMessage message) {
messages.add(message);
}
public List<HookNotification.HookNotificationMessage> getMessages() {
return messages;
}
}
}
......@@ -1538,9 +1538,6 @@ public class HiveHookIT {
return assertTableIsRegistered(dbName, tableName, null, false);
}
private String assertTableIsRegistered(String dbName, String tableName, boolean isTemporary) throws Exception {
return assertTableIsRegistered(dbName, tableName, null, isTemporary);
}
private String assertTableIsRegistered(String dbName, String tableName, AssertPredicate assertPredicate, boolean isTemporary) throws Exception {
LOG.debug("Searching for table {}.{}", dbName, tableName);
......
......@@ -23,23 +23,21 @@
<appender name="console" class="org.apache.log4j.ConsoleAppender">
<param name="Target" value="System.out"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%c{1}:%L)%n"/>
<param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%C{1}:%L)%n"/>
</layout>
</appender>
<appender name="FILE" class="org.apache.log4j.DailyRollingFileAppender">
<param name="File" value="${atlas.log.dir}/${atlas.log.file}"/>
<param name="Append" value="true"/>
<param name="Threshold" value="info"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%c{1}:%L)%n"/>
<param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%C{1}:%L)%n"/>
</layout>
</appender>
<appender name="AUDIT" class="org.apache.log4j.DailyRollingFileAppender">
<param name="File" value="${atlas.log.dir}/audit.log"/>
<param name="Append" value="true"/>
<param name="Threshold" value="info"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d %x %m%n"/>
</layout>
......@@ -55,6 +53,12 @@
<appender-ref ref="FILE"/>
</logger>
<!-- to avoid logs - The configuration log.flush.interval.messages = 1 was supplied but isn't a known config -->
<logger name="org.apache.kafka.common.config.AbstractConfig" additivity="false">
<level value="error"/>
<appender-ref ref="FILE"/>
</logger>
<logger name="AUDIT" additivity="false">
<level value="info"/>
<appender-ref ref="AUDIT"/>
......
......@@ -3,18 +3,26 @@
---++ Falcon Model
The default falcon modelling is available in org.apache.atlas.falcon.model.FalconDataModelGenerator. It defines the following types:
<verbatim>
falcon_process(ClassType) - super types [Process] - attributes [timestamp, owned-by, tags]
falcon_cluster(ClassType) - super types [Infrastructure] - attributes [timestamp, colo, owner, tags]
falcon_feed(ClassType) - super types [DataSet] - attributes [timestamp, stored-in, owner, groups, tags]
falcon_feed_creation(ClassType) - super types [Process] - attributes [timestamp, stored-in, owner]
falcon_feed_replication(ClassType) - super types [Process] - attributes [timestamp, owner]
falcon_process(ClassType) - super types [Process] - attributes [timestamp, runs-on, owner, tags, pipelines, workflow-properties]
</verbatim>
One falcon_process entity is created for every cluster that the falcon process is defined for.
The entities are created and de-duped using unique qualified name. They provide namespace and can be used for querying/lineage as well. The unique attributes are:
* falcon_process - attribute name - <process name>@<cluster name>
The entities are created and de-duped using the unique qualifiedName attribute. These qualified names provide a namespace and can be used for querying/lineage as well; a sketch of how they are composed follows this list. The unique attributes are:
* falcon_process - <process name>@<cluster name>
* falcon_cluster - <cluster name>
* falcon_feed - <feed name>@<cluster name>
* falcon_feed_creation - <feed name>
* falcon_feed_replication - <feed name>
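For illustration only, the sketch below composes these qualifiedName values using a hypothetical helper class (the actual helpers, such as FalconBridge.getFeedQualifiedName and FalconBridge.getProcessQualifiedName, live in the bridge code of this commit); the <process name>@<cluster name> form matches the lookup used by the integration tests.
<verbatim>
// Hypothetical helper, shown only to illustrate the qualifiedName formats listed above.
public final class FalconQualifiedNames {
    private FalconQualifiedNames() {}

    public static String processQualifiedName(String processName, String clusterName) {
        return processName + "@" + clusterName;   // falcon_process
    }

    public static String feedQualifiedName(String feedName, String clusterName) {
        return feedName + "@" + clusterName;      // falcon_feed
    }

    public static String clusterQualifiedName(String clusterName) {
        return clusterName;                       // falcon_cluster
    }

    public static void main(String[] args) {
        // e.g. "agg-logs-process@primary-cluster"
        System.out.println(processQualifiedName("agg-logs-process", "primary-cluster"));
    }
}
</verbatim>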
---++ Falcon Hook
Falcon supports listeners on Falcon entity submission. This is used to add entities to Atlas using the model defined in org.apache.atlas.falcon.model.FalconDataModelGenerator.
The hook submits the request to a thread pool executor to avoid blocking the command execution. The worker thread then submits the entities as messages to the notification server, and the Atlas server reads these messages and registers the entities.
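A minimal sketch of that flow, with hypothetical class and method names (the actual service and hook classes registered by this integration may be structured differently):
<verbatim>
// Minimal sketch only; class and method names are hypothetical.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class FalconHookSketch {
    // bounded pool so entity submission never blocks the Falcon command thread
    private final ExecutorService executor = Executors.newFixedThreadPool(5);

    void onEntitySubmission(final Object falconEvent) {
        executor.submit(new Runnable() {
            @Override
            public void run() {
                // convert the Falcon entity into Atlas referenceables and send them
                // as messages to the notification server; the Atlas server consumes
                // the messages and registers the entities
                publishToNotificationTopic(falconEvent);
            }
        });
    }

    private void publishToNotificationTopic(Object event) {
        // placeholder for the actual notification send
    }
}
</verbatim>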
* Add 'org.apache.falcon.atlas.service.AtlasService' to application.services in <falcon-conf>/startup.properties
* Add 'org.apache.atlas.falcon.service.AtlasService' to application.services in <falcon-conf>/startup.properties
* Link falcon hook jars in falcon classpath - 'ln -s <atlas-home>/hook/falcon/* <falcon-home>/server/webapp/falcon/WEB-INF/lib/'
* In <falcon_conf>/falcon-env.sh, set an environment variable as follows:
<verbatim>
......@@ -33,5 +41,4 @@ Refer [[Configuration][Configuration]] for notification related configurations
---++ Limitations
* Only the process entity creation is currently handled. This model will be expanded to include all Falcon metadata
* In the Falcon cluster entity, the cluster name used should be uniform across components like Hive, Falcon, Sqoop etc. If used with Ambari, the Ambari cluster name should be used for the cluster entity
......@@ -115,8 +115,11 @@ public abstract class AtlasHook {
static void notifyEntitiesInternal(List<HookNotification.HookNotificationMessage> messages, int maxRetries,
NotificationInterface notificationInterface,
boolean shouldLogFailedMessages, FailedMessagesLogger logger) {
final String message = messages.toString();
if (messages == null || messages.isEmpty()) {
return;
}
final String message = messages.toString();
int numRetries = 0;
while (true) {
try {
......
......@@ -61,7 +61,11 @@ public class AtlasHookTest {
@Test
public void testNotifyEntitiesRetriesOnException() throws NotificationException {
List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>();
List<HookNotification.HookNotificationMessage> hookNotificationMessages =
new ArrayList<HookNotification.HookNotificationMessage>() {{
add(new HookNotification.EntityCreateRequest("user"));
}
};
doThrow(new NotificationException(new Exception())).when(notificationInterface)
.send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages);
AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, notificationInterface, false,
......@@ -73,7 +77,11 @@ public class AtlasHookTest {
@Test
public void testFailedMessageIsLoggedIfRequired() throws NotificationException {
List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>();
List<HookNotification.HookNotificationMessage> hookNotificationMessages =
new ArrayList<HookNotification.HookNotificationMessage>() {{
add(new HookNotification.EntityCreateRequest("user"));
}
};
doThrow(new NotificationException(new Exception(), Arrays.asList("test message")))
.when(notificationInterface)
.send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages);
......@@ -97,7 +105,11 @@ public class AtlasHookTest {
@Test
public void testAllFailedMessagesAreLogged() throws NotificationException {
List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>();
List<HookNotification.HookNotificationMessage> hookNotificationMessages =
new ArrayList<HookNotification.HookNotificationMessage>() {{
add(new HookNotification.EntityCreateRequest("user"));
}
};
doThrow(new NotificationException(new Exception(), Arrays.asList("test message1", "test message2")))
.when(notificationInterface)
.send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages);
......
......@@ -3,6 +3,7 @@ Apache Atlas Release Notes
--trunk - unreleased
INCOMPATIBLE CHANGES:
ATLAS-835 Falcon Integration with Atlas (sowmyaramesh via shwethags)
ATLAS-912 Update to use Kafka 0.10.0.0 (from 0.9.0.0) (madhan.neethiraj via yhemanth)
ATLAS-542 Make qualifiedName and name consistent across all Datasets and Process (sumasai via yhemanth)
ATLAS-716 Entity update/delete notifications (shwethags)
......
......@@ -101,3 +101,7 @@ atlas.jaas.KafkaClient.option.principal = atlas/_HOST@EXAMPLE.COM
atlas.server.ha.enabled=false
#atlas.server.ids=id1
#atlas.server.address.id1=localhost:21000
#########POLICY FILE PATH #########
atlas.auth.policy.file=${sys:user.dir}/distro/src/conf/policy-store.txt
......@@ -23,7 +23,7 @@
<appender name="console" class="org.apache.log4j.ConsoleAppender">
<param name="Target" value="System.out"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%c{1}:%L)%n"/>
<param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%C{1}:%L)%n"/>
</layout>
</appender>
......@@ -43,7 +43,7 @@
<logger name="com.thinkaurelius.titan" additivity="false">
<level value="info"/>
<appender-ref ref="FILE"/>
<appender-ref ref="console"/>
</logger>
<logger name="AUDIT">
......
......@@ -73,7 +73,7 @@ public class AuditFilter implements Filter {
// put the request id into the response so users can trace logs for this request
((HttpServletResponse) response).setHeader(AtlasClient.REQUEST_ID, requestId);
currentThread.setName(oldName);
RequestContext.clear();;
RequestContext.clear();
}
}
......@@ -88,7 +88,7 @@ public class AuditFilter implements Filter {
final String whatURL = Servlets.getRequestURL(httpRequest);
final String whatAddrs = httpRequest.getLocalAddr();
LOG.debug("Audit: {}/{} performed request {} {} ({}) at time {}", who, fromAddress, whatRequest, whatURL,
LOG.info("Audit: {}/{} performed request {} {} ({}) at time {}", who, fromAddress, whatRequest, whatURL,
whatAddrs, whenISO9601);
audit(who, fromAddress, whatRequest, fromHost, whatURL, whatAddrs, whenISO9601);
}
......