Commit 086b4a3e by Shwetha GS

ATLAS-379 Create sqoop and falcon metadata addons…

ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
parent 70d54988
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.falcon.model;
import com.google.common.collect.ImmutableList;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.json.TypesSerialization;
import org.apache.atlas.typesystem.types.AttributeDefinition;
import org.apache.atlas.typesystem.types.ClassType;
import org.apache.atlas.typesystem.types.DataTypes;
import org.apache.atlas.typesystem.types.EnumType;
import org.apache.atlas.typesystem.types.EnumTypeDefinition;
import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition;
import org.apache.atlas.typesystem.types.Multiplicity;
import org.apache.atlas.typesystem.types.StructType;
import org.apache.atlas.typesystem.types.StructTypeDefinition;
import org.apache.atlas.typesystem.types.TraitType;
import org.apache.atlas.typesystem.types.utils.TypesUtil;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
/**
 * Utility that generates falcon data model.
 *
 * <p>Builds the type definitions (currently only the falcon process class type)
 * and serializes them to the JSON form expected by the Atlas type system.</p>
 */
public class FalconDataModelGenerator {
    private static final Logger LOG = LoggerFactory.getLogger(FalconDataModelGenerator.class);

    // Attribute names used by the falcon process class type.
    public static final String NAME = "name";
    public static final String PROCESS_NAME = "processName";
    public static final String TIMESTAMP = "timestamp";
    public static final String USER = "owned-by";
    public static final String TAGS = "tag-classification";
    // multiple inputs and outputs for process
    public static final String INPUTS = "inputs";
    public static final String OUTPUTS = "outputs";

    // Registries populated by createDataModel(); keyed by type name.
    private final Map<String, HierarchicalTypeDefinition<ClassType>> classTypeDefinitions = new HashMap<>();
    private final Map<String, EnumTypeDefinition> enumTypeDefinitionMap = new HashMap<>();
    private final Map<String, StructTypeDefinition> structTypeDefinitionMap = new HashMap<>();

    public FalconDataModelGenerator() {
        // Registries are filled lazily by createDataModel().
    }

    /**
     * Registers all falcon type definitions with this generator.
     */
    public void createDataModel() throws AtlasException {
        LOG.info("Generating the Falcon Data Model");
        createProcessEntityClass();
    }

    private TypesDef getTypesDef() {
        return TypesUtil.getTypesDef(getEnumTypeDefinitions(), getStructTypeDefinitions(), getTraitTypeDefinitions(),
                getClassTypeDefinitions());
    }

    /** @return the registered type definitions serialized as JSON */
    public String getDataModelAsJSON() {
        return TypesSerialization.toJson(getTypesDef());
    }

    private ImmutableList<EnumTypeDefinition> getEnumTypeDefinitions() {
        return ImmutableList.copyOf(enumTypeDefinitionMap.values());
    }

    private ImmutableList<StructTypeDefinition> getStructTypeDefinitions() {
        return ImmutableList.copyOf(structTypeDefinitionMap.values());
    }

    private ImmutableList<HierarchicalTypeDefinition<ClassType>> getClassTypeDefinitions() {
        return ImmutableList.copyOf(classTypeDefinitions.values());
    }

    private ImmutableList<HierarchicalTypeDefinition<TraitType>> getTraitTypeDefinitions() {
        // No trait types are defined for the falcon model.
        return ImmutableList.of();
    }

    /**
     * Defines the falcon process class type (a subtype of the Atlas Process
     * supertype) and registers it with the class-type registry.
     */
    private void createProcessEntityClass() throws AtlasException {
        AttributeDefinition[] attributes = {
                new AttributeDefinition(PROCESS_NAME, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null),
                new AttributeDefinition(TIMESTAMP, DataTypes.LONG_TYPE.getName(), Multiplicity.REQUIRED, false, null),
                new AttributeDefinition(USER, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null),
                // map of tags
                new AttributeDefinition(TAGS,
                        DataTypes.mapTypeName(DataTypes.STRING_TYPE.getName(), DataTypes.STRING_TYPE.getName()),
                        Multiplicity.OPTIONAL, false, null),
        };

        String typeName = FalconDataTypes.FALCON_PROCESS_ENTITY.getName();
        classTypeDefinitions.put(typeName,
                new HierarchicalTypeDefinition<>(ClassType.class, typeName,
                        ImmutableList.of(AtlasClient.PROCESS_SUPER_TYPE), attributes));
        LOG.debug("Created definition for {}", typeName);
    }

    /** Builds the data model and returns it as JSON in one call. */
    public String getModelAsJson() throws AtlasException {
        createDataModel();
        return getDataModelAsJSON();
    }

    public static void main(String[] args) throws Exception {
        FalconDataModelGenerator generator = new FalconDataModelGenerator();
        System.out.println("falconDataModelAsJSON = " + generator.getModelAsJson());

        TypesDef typesDef = generator.getTypesDef();
        dumpEnumTypes(typesDef);
        dumpStructTypes(typesDef);
        dumpClassTypes(typesDef);
        dumpTraitTypes(typesDef);
    }

    // Prints each enum type and its values.
    private static void dumpEnumTypes(TypesDef typesDef) {
        for (EnumTypeDefinition def : typesDef.enumTypesAsJavaList()) {
            System.out.println(String.format("%s(%s) - values %s", def.name, EnumType.class.getSimpleName(),
                    Arrays.toString(def.enumValues)));
        }
    }

    // Prints each struct type and its attributes.
    private static void dumpStructTypes(TypesDef typesDef) {
        for (StructTypeDefinition def : typesDef.structTypesAsJavaList()) {
            System.out.println(String.format("%s(%s) - attributes %s", def.typeName, StructType.class.getSimpleName(),
                    Arrays.toString(def.attributeDefinitions)));
        }
    }

    // Prints each class type with its supertypes and attributes.
    private static void dumpClassTypes(TypesDef typesDef) {
        for (HierarchicalTypeDefinition<ClassType> def : typesDef.classTypesAsJavaList()) {
            System.out.println(String.format("%s(%s) - super types [%s] - attributes %s", def.typeName,
                    ClassType.class.getSimpleName(),
                    StringUtils.join(def.superTypes, ","), Arrays.toString(def.attributeDefinitions)));
        }
    }

    // Prints each trait type and its attributes.
    private static void dumpTraitTypes(TypesDef typesDef) {
        for (HierarchicalTypeDefinition<TraitType> def : typesDef.traitTypesAsJavaList()) {
            System.out.println(String.format("%s(%s) - %s", def.typeName, TraitType.class.getSimpleName(),
                    Arrays.toString(def.attributeDefinitions)));
        }
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.falcon.model;
/**
 * Falcon Data Types for model and bridge.
 *
 * <p>Each constant carries the Atlas type name it maps to.</p>
 */
public enum FalconDataTypes {
    FALCON_PROCESS_ENTITY("falcon_process");

    // Atlas type name backing this constant.
    private final String name;

    FalconDataTypes(String name) {
        this.name = name;
    }

    /** @return the Atlas type name for this data type */
    public String getName() {
        return name;
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.atlas.Util;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.security.CurrentUser;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
 * Falcon event util.
 */
public final class EventUtil {
    private static final Logger LOG = LoggerFactory.getLogger(EventUtil.class);

    private EventUtil() {}

    /**
     * Converts a comma-separated string of key=value pairs (e.g. falcon tags
     * such as "owner=a@x.com, department=forecasting") into a map.
     *
     * <p>Keys and values are trimmed, since falcon tag strings commonly carry a
     * space after each comma; entries without an '=' are skipped instead of
     * throwing {@link StringIndexOutOfBoundsException} as the previous
     * implementation did (substring(0, -1) when indexOf returned -1).</p>
     *
     * @param keyValueString comma-separated key=value pairs; may be null or blank
     * @return map of keys to values, or null if the input is blank
     */
    public static Map<String, String> convertKeyValueStringToMap(final String keyValueString) {
        if (StringUtils.isBlank(keyValueString)) {
            return null;
        }

        Map<String, String> keyValueMap = new HashMap<>();
        String[] tags = keyValueString.split(",");
        for (String tag : tags) {
            int index = tag.indexOf('=');
            if (index < 0) {
                // Malformed entry — ignore it rather than crash the caller.
                LOG.warn("Ignoring malformed key=value entry: {}", tag);
                continue;
            }
            String tagKey = tag.substring(0, index).trim();
            String tagValue = tag.substring(index + 1).trim();
            keyValueMap.put(tagKey, tagValue);
        }
        return keyValueMap;
    }

    /**
     * Returns the UGI of the currently authenticated user.
     *
     * @throws FalconException wrapping any IOException from the security layer
     */
    public static UserGroupInformation getUgi() throws FalconException {
        try {
            return CurrentUser.getAuthenticatedUGI();
        } catch (IOException ioe) {
            // Preserve the cause for diagnosability.
            throw new FalconException(ioe);
        }
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.atlas.event;
import org.apache.falcon.entity.v0.Entity;
import org.apache.hadoop.security.UserGroupInformation;
/**
 * Falcon event to interface with Atlas Service.
 *
 * <p>Immutable-in-practice value object describing a single falcon operation;
 * consumers read it only through the getters below.</p>
 */
public class FalconEvent {
    /**
     * The kind of falcon operation this event represents.
     */
    public enum OPERATION {
        ADD_PROCESS, UPDATE_PROCESS
    }

    protected String user;
    protected UserGroupInformation ugi;
    protected OPERATION operation;
    protected long timestamp;
    protected Entity entity;

    /**
     * @param doAsUser        the user on whose behalf the operation ran
     * @param ugi             the authenticated UGI for that user
     * @param falconOperation the operation performed
     * @param timestamp       event time in epoch milliseconds
     * @param entity          the falcon entity the operation applied to
     */
    public FalconEvent(String doAsUser, UserGroupInformation ugi, OPERATION falconOperation, long timestamp, Entity entity) {
        this.user = doAsUser;
        this.ugi = ugi;
        this.operation = falconOperation;
        this.timestamp = timestamp;
        this.entity = entity;
    }

    /** @return the user on whose behalf the operation ran */
    public String getUser() {
        return user;
    }

    /** @return the authenticated UGI for the user */
    public UserGroupInformation getUgi() {
        return ugi;
    }

    /** @return the operation performed */
    public OPERATION getOperation() {
        return operation;
    }

    /** @return event time in epoch milliseconds */
    public long getTimestamp() {
        return timestamp;
    }

    /** @return the falcon entity the operation applied to */
    public Entity getEntity() {
        return entity;
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.atlas.publisher;
import org.apache.falcon.atlas.event.FalconEvent;
/**
 * Falcon publisher for Atlas.
 *
 * <p>Subclasses implement {@link #publish(Data)} to forward falcon events to a
 * concrete sink.</p>
 */
public abstract class FalconEventPublisher {
    /**
     * Carrier for a single {@link FalconEvent} handed to a publisher.
     */
    public static class Data {
        // final: a Data instance always wraps exactly one event for its lifetime
        private final FalconEvent event;

        public Data(FalconEvent event) {
            this.event = event;
        }

        /** @return the wrapped falcon event */
        public FalconEvent getEvent() {
            return event;
        }
    }

    /**
     * Publishes the given data to the underlying sink.
     *
     * @param data wrapper around the falcon event to publish
     * @throws Exception if publishing fails
     */
    public abstract void publish(final Data data) throws Exception;
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.atlas.service;
import org.apache.atlas.falcon.hook.FalconHook;
import org.apache.falcon.FalconException;
import org.apache.falcon.atlas.Util.EventUtil;
import org.apache.falcon.atlas.event.FalconEvent;
import org.apache.falcon.atlas.publisher.FalconEventPublisher;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.service.ConfigurationChangeListener;
import org.apache.falcon.service.FalconService;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Atlas service to publish Falcon events.
 *
 * <p>Registered as a falcon {@link ConfigurationChangeListener}; forwards
 * PROCESS entity additions and updates to Atlas via a
 * {@link FalconEventPublisher}.</p>
 */
public class AtlasService implements FalconService, ConfigurationChangeListener {
    private static final Logger LOG = LoggerFactory.getLogger(AtlasService.class);

    /**
     * Constant for the service name.
     */
    public static final String SERVICE_NAME = AtlasService.class.getSimpleName();

    private FalconEventPublisher publisher;

    @Override
    public String getName() {
        return SERVICE_NAME;
    }

    @Override
    public void init() throws FalconException {
        ConfigurationStore.get().registerListener(this);
        publisher = new FalconHook();
    }

    @Override
    public void destroy() throws FalconException {
        ConfigurationStore.get().unregisterListener(this);
    }

    @Override
    public void onAdd(Entity entity) throws FalconException {
        handleEntity(entity, FalconEvent.OPERATION.ADD_PROCESS);
    }

    @Override
    public void onRemove(Entity entity) throws FalconException {
        // Removal is intentionally not propagated to Atlas.
    }

    @Override
    public void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
        handleEntity(newEntity, FalconEvent.OPERATION.UPDATE_PROCESS);
    }

    @Override
    public void onReload(Entity entity) throws FalconException {
        //Since there is no import script that can import existing falcon entities to atlas, adding on falcon service start
        onAdd(entity);
    }

    /**
     * Publishes PROCESS entities with the given operation; other entity types
     * are ignored. Shared by onAdd and onChange to avoid duplicating the switch.
     */
    private void handleEntity(Entity entity, FalconEvent.OPERATION operation) throws FalconException {
        EntityType entityType = entity.getEntityType();
        switch (entityType) {
        case PROCESS:
            addProcessEntity((Process) entity, operation);
            break;
        default:
            // Parameterized logging: no string concatenation when debug is off.
            LOG.debug("Entity type not processed {}", entityType);
        }
    }

    private void addProcessEntity(Process entity, FalconEvent.OPERATION operation) throws FalconException {
        LOG.info("Adding process entity to Atlas: {}", entity.getName());
        try {
            // Prefer the entity's ACL owner; fall back to the login user.
            String user = entity.getACL() != null ? entity.getACL().getOwner() :
                    UserGroupInformation.getLoginUser().getShortUserName();
            FalconEvent event = new FalconEvent(user, EventUtil.getUgi(), operation, System.currentTimeMillis(), entity);
            FalconEventPublisher.Data data = new FalconEventPublisher.Data(event);
            publisher.publish(data);
        } catch (Exception ex) {
            // Preserve the cause so publish failures remain diagnosable.
            throw new FalconException("Unable to publish data to publisher " + ex.getMessage(), ex);
        }
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.falcon.hook;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.falcon.model.FalconDataTypes;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.persistence.Id;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.falcon.atlas.service.AtlasService;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.security.CurrentUser;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import javax.xml.bind.JAXBException;
import java.util.List;
import static org.testng.Assert.assertEquals;
/**
 * Integration test: publishes falcon entities through {@link AtlasService} and
 * verifies the resulting process entity is registered in Atlas.
 */
public class FalconHookIT {
    public static final Logger LOG = org.slf4j.LoggerFactory.getLogger(FalconHookIT.class);

    public static final String CLUSTER_RESOURCE = "/cluster.xml";
    public static final String FEED_RESOURCE = "/feed.xml";
    public static final String PROCESS_RESOURCE = "/process.xml";

    // Renamed from the original typo "dgiCLient" (field is private, so safe).
    private AtlasClient dgiClient;

    private static final ConfigurationStore STORE = ConfigurationStore.get();

    @BeforeClass
    public void setUp() throws Exception {
        dgiClient = new AtlasClient(ApplicationProperties.get().getString("atlas.rest.address"));

        AtlasService service = new AtlasService();
        service.init();
        STORE.registerListener(service);
        // Falcon security requires an authenticated current user before publishing.
        CurrentUser.authenticate(System.getProperty("user.name"));
    }

    /**
     * Loads an entity definition from a classpath resource and overrides its name.
     *
     * @param type     entity type used to pick the unmarshaller
     * @param resource classpath resource containing the entity XML
     * @param name     name to set on the loaded entity
     */
    @SuppressWarnings("unchecked") // caller guarantees T matches the requested EntityType
    private <T extends Entity> T loadEntity(EntityType type, String resource, String name) throws JAXBException {
        Entity entity = (Entity) type.getUnmarshaller().unmarshal(this.getClass().getResourceAsStream(resource));
        switch (entity.getEntityType()) {
        case CLUSTER:
            ((Cluster) entity).setName(name);
            break;
        case FEED:
            ((Feed) entity).setName(name);
            break;
        case PROCESS:
            ((org.apache.falcon.entity.v0.process.Process) entity).setName(name);
            break;
        default:
            // Other entity types keep the name from the resource file.
            break;
        }
        return (T) entity;
    }

    // Random suffix so repeated runs don't collide on entity names.
    private String random() {
        return RandomStringUtils.randomAlphanumeric(10);
    }

    private String getTableUri(String dbName, String tableName) {
        return String.format("catalog:%s:%s#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}", dbName, tableName);
    }

    @Test (enabled = true)
    public void testCreateProcess() throws Exception {
        Cluster cluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random());
        STORE.publish(EntityType.CLUSTER, cluster);

        Feed infeed = loadEntity(EntityType.FEED, FEED_RESOURCE, "feedin" + random());
        org.apache.falcon.entity.v0.feed.Cluster feedCluster = infeed.getClusters().getClusters().get(0);
        feedCluster.setName(cluster.getName());
        String inTableName = "table" + random();
        String inDbName = "db" + random();
        feedCluster.getTable().setUri(getTableUri(inDbName, inTableName));
        STORE.publish(EntityType.FEED, infeed);

        Feed outfeed = loadEntity(EntityType.FEED, FEED_RESOURCE, "feedout" + random());
        feedCluster = outfeed.getClusters().getClusters().get(0);
        feedCluster.setName(cluster.getName());
        String outTableName = "table" + random();
        String outDbName = "db" + random();
        feedCluster.getTable().setUri(getTableUri(outDbName, outTableName));
        STORE.publish(EntityType.FEED, outfeed);

        Process process = loadEntity(EntityType.PROCESS, PROCESS_RESOURCE, "process" + random());
        process.getClusters().getClusters().get(0).setName(cluster.getName());
        process.getInputs().getInputs().get(0).setFeed(infeed.getName());
        process.getOutputs().getOutputs().get(0).setFeed(outfeed.getName());
        STORE.publish(EntityType.PROCESS, process);

        String pid = assertProcessIsRegistered(cluster.getName(), process.getName());
        Referenceable processEntity = dgiClient.getEntity(pid);
        assertEquals(processEntity.get("processName"), process.getName());

        Id inId = (Id) ((List) processEntity.get("inputs")).get(0);
        Referenceable inEntity = dgiClient.getEntity(inId._getId());
        assertEquals(inEntity.get("name"),
                HiveMetaStoreBridge.getTableQualifiedName(cluster.getName(), inDbName, inTableName));

        Id outId = (Id) ((List) processEntity.get("outputs")).get(0);
        Referenceable outEntity = dgiClient.getEntity(outId._getId());
        assertEquals(outEntity.get("name"),
                HiveMetaStoreBridge.getTableQualifiedName(cluster.getName(), outDbName, outTableName));
    }

//    TODO: re-enable once update flow is testable end to end.
//    @Test (enabled = true, dependsOnMethods = "testCreateProcess")
//    public void testUpdateProcess() throws Exception {
//        FalconEvent event = createProcessEntity(PROCESS_NAME_2, INPUT, OUTPUT);
//        FalconEventPublisher.Data data = new FalconEventPublisher.Data(event);
//        hook.publish(data);
//        String id = assertProcessIsRegistered(CLUSTER_NAME, PROCESS_NAME_2);
//        event = createProcessEntity(PROCESS_NAME_2, INPUT_2, OUTPUT_2);
//        hook.publish(data);
//        String id2 = assertProcessIsRegistered(CLUSTER_NAME, PROCESS_NAME_2);
//        if (!id.equals(id2)) {
//            throw new Exception("Id mismatch");
//        }
//    }

    /**
     * Waits for the process named "processName@clusterName" to appear in Atlas
     * and returns its entity id.
     */
    private String assertProcessIsRegistered(String clusterName, String processName) throws Exception {
        String name = processName + "@" + clusterName;
        LOG.debug("Searching for process {}", name);
        String query = String.format("%s as t where name = '%s' select t",
                FalconDataTypes.FALCON_PROCESS_ENTITY.getName(), name);
        return assertEntityIsRegistered(query);
    }

    private String assertEntityIsRegistered(final String query) throws Exception {
        waitFor(20000, new Predicate() {
            @Override
            public boolean evaluate() throws Exception {
                JSONArray results = dgiClient.search(query);
                System.out.println(results);
                return results.length() == 1;
            }
        });

        JSONArray results = dgiClient.search(query);
        JSONObject row = results.getJSONObject(0).getJSONObject("t");
        return row.getString("id");
    }

    public interface Predicate {
        /**
         * Perform a predicate evaluation.
         *
         * @return the boolean result of the evaluation.
         * @throws Exception thrown if the predicate evaluation could not evaluate.
         */
        boolean evaluate() throws Exception;
    }

    /**
     * Wait for a condition, expressed via a {@link Predicate} to become true.
     *
     * @param timeout maximum time in milliseconds to wait for the predicate to become true.
     * @param predicate predicate waiting on.
     */
    protected void waitFor(int timeout, Predicate predicate) throws Exception {
        long mustEnd = System.currentTimeMillis() + timeout;
        boolean eval;
        while (!(eval = predicate.evaluate()) && System.currentTimeMillis() < mustEnd) {
            LOG.info("Waiting up to {} msec", mustEnd - System.currentTimeMillis());
            Thread.sleep(1000);
        }
        if (!eval) {
            throw new Exception("Waiting timed out after " + timeout + " msec");
        }
    }
}
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!--
Primary cluster configuration for demo vm
-->
<cluster colo="west-coast" description="Primary Cluster" name="testcluster"
xmlns="uri:falcon:cluster:0.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<interfaces>
<interface type="readonly" endpoint="hftp://localhost:10070" version="1.1.1" />
<interface type="write" endpoint="hdfs://localhost:10020" version="1.1.1" />
<interface type="execute" endpoint="localhost:10300" version="1.1.1" />
<interface type="workflow" endpoint="http://localhost:11010/oozie/" version="3.3.0" />
<interface type="registry" endpoint="thrift://localhost:19083" version="0.11.0" />
<interface type="messaging" endpoint="tcp://localhost:61616?daemon=true" version="5.4.3" />
</interfaces>
<locations>
<location name="staging" path="/apps/falcon/staging" />
<location name="temp" path="/tmp" />
<location name="working" path="/apps/falcon/working" />
</locations>
</cluster>
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<feed description="test input" name="testinput" xmlns="uri:falcon:feed:0.1">
<groups>online,bi</groups>
<frequency>hours(1)</frequency>
<timezone>UTC</timezone>
<late-arrival cut-off="hours(3)"/>
<clusters>
<cluster name="testcluster" type="source">
<validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z"/>
<retention limit="hours(24)" action="delete"/>
<table uri="catalog:indb:intable#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
</cluster>
</clusters>
<table uri="catalog:indb:unused#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
<ACL owner="testuser" group="group" permission="0x755"/>
<schema location="hcat" provider="hcat"/>
</feed>
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<property>
<name>hive.exec.post.hooks</name>
<value>org.apache.atlas.hive.hook.HiveHook</value>
</property>
<property>
<name>hive.support.concurrency</name>
<value>false</value>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>${user.dir}/target/metastore</value>
</property>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:derby:${user.dir}/target/metastore_db;create=true</value>
</property>
<property>
<name>atlas.hook.hive.synchronous</name>
<value>true</value>
</property>
<property>
<name>atlas.cluster.name</name>
<value>test</value>
</property>
<property>
<name>fs.pfile.impl</name>
<value>org.apache.hadoop.fs.ProxyLocalFileSystem</value>
</property>
</configuration>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<process name="testprocess" xmlns="uri:falcon:process:0.1">
<tags>consumer=consumer@xyz.com, owner=producer@xyz.com, department=forecasting</tags>
<clusters>
<cluster name="testcluster">
<validity end="2012-04-22T00:00Z" start="2012-04-21T00:00Z"/>
</cluster>
</clusters>
<parallel>1</parallel>
<order>FIFO</order>
<frequency>days(1)</frequency>
<timezone>UTC</timezone>
<inputs>
<input end="today(0,0)" start="today(0,0)" feed="testinput" name="input"/>
</inputs>
<outputs>
<output instance="now(0,0)" feed="testoutput" name="output"/>
</outputs>
<properties>
<property name="blah" value="blah"/>
</properties>
<workflow engine="hive" path="/falcon/test/apps/hive/script.hql"/>
<retry policy="periodic" delay="minutes(10)" attempts="3"/>
<late-process policy="exp-backoff" delay="hours(2)">
<late-input input="input" workflow-path="/falcon/test/workflow"/>
</late-process>
</process>
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
*.domain=debug
*.config.store.persist=false
\ No newline at end of file
......@@ -18,6 +18,7 @@
package org.apache.atlas.hive.bridge;
import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
......@@ -487,11 +488,13 @@ public class HiveMetaStoreBridge {
dgiClient.getType(HiveDataTypes.HIVE_PROCESS.getName());
LOG.info("Hive data model is already registered!");
} catch(AtlasServiceException ase) {
if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
//Expected in case types do not exist
LOG.info("Registering Hive data model");
dgiClient.createType(dataModelGenerator.getModelAsJson());
}
}
}
public static void main(String[] argv) throws Exception {
Configuration atlasConf = ApplicationProperties.get(ApplicationProperties.CLIENT_PROPERTIES);
......
......@@ -227,7 +227,7 @@ public class HiveDataModelGenerator {
null),
new AttributeDefinition("description", DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false,
null),
new AttributeDefinition("locationUri", DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false,
new AttributeDefinition("locationUri", DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false,
null),
new AttributeDefinition("parameters", STRING_MAP_TYPE.getName(), Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("ownerName", DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false,
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.sqoop.model;
import com.google.common.collect.ImmutableList;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasException;
import org.apache.atlas.typesystem.TypesDef;
import org.apache.atlas.typesystem.json.TypesSerialization;
import org.apache.atlas.typesystem.types.AttributeDefinition;
import org.apache.atlas.typesystem.types.ClassType;
import org.apache.atlas.typesystem.types.DataTypes;
import org.apache.atlas.typesystem.types.EnumType;
import org.apache.atlas.typesystem.types.EnumTypeDefinition;
import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition;
import org.apache.atlas.typesystem.types.Multiplicity;
import org.apache.atlas.typesystem.types.StructTypeDefinition;
import org.apache.atlas.typesystem.types.TraitType;
import org.apache.atlas.typesystem.types.utils.TypesUtil;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
/**
* Utility that generates Sqoop data model for both metastore entities and DDL/DML queries.
*/
public class SqoopDataModelGenerator {
private static final Logger LOG = LoggerFactory.getLogger(SqoopDataModelGenerator.class);
private final Map<String, HierarchicalTypeDefinition<ClassType>> classTypeDefinitions;
private final Map<String, EnumTypeDefinition> enumTypeDefinitionMap;
private final Map<String, StructTypeDefinition> structTypeDefinitionMap;
private static final DataTypes.MapType STRING_MAP_TYPE =
new DataTypes.MapType(DataTypes.STRING_TYPE, DataTypes.STRING_TYPE);
public static final String NAME = "name";
public static final String OWNER = "ownerName";
public static final String USER = "userName";
public static final String DB_STORE_TYPE = "dbStoreType";
public static final String DB_STORE_USAGE = "storeUse";
public static final String SOURCE = "source";
public static final String DESCRIPTION = "description";
public static final String STORE_URI = "storeUri";
public static final String OPERATION = "operation";
public static final String START_TIME = "startTime";
public static final String END_TIME = "endTime";
public static final String CMD_LINE_OPTS = "commandlineOpts";
// multiple inputs and outputs for process
public static final String INPUTS = "inputs";
public static final String OUTPUTS = "outputs";
public SqoopDataModelGenerator() {
classTypeDefinitions = new HashMap<>();
enumTypeDefinitionMap = new HashMap<>();
structTypeDefinitionMap = new HashMap<>();
}
public void createDataModel() throws AtlasException {
LOG.info("Generating the Sqoop Data Model....");
// enums
// structs
// classes
createSqoopDbStoreClass();
// DDL/DML Process
createSqoopProcessClass();
}
public TypesDef getTypesDef() {
return TypesUtil.getTypesDef(getEnumTypeDefinitions(), getStructTypeDefinitions(), getTraitTypeDefinitions(),
getClassTypeDefinitions());
}
public String getDataModelAsJSON() {
return TypesSerialization.toJson(getTypesDef());
}
public ImmutableList<EnumTypeDefinition> getEnumTypeDefinitions() {
return ImmutableList.copyOf(enumTypeDefinitionMap.values());
}
public ImmutableList<StructTypeDefinition> getStructTypeDefinitions() {
return ImmutableList.copyOf(structTypeDefinitionMap.values());
}
public ImmutableList<HierarchicalTypeDefinition<ClassType>> getClassTypeDefinitions() {
return ImmutableList.copyOf(classTypeDefinitions.values());
}
public ImmutableList<HierarchicalTypeDefinition<TraitType>> getTraitTypeDefinitions() {
return ImmutableList.of();
}
private void createSqoopDbStoreClass() throws AtlasException {
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition(DB_STORE_TYPE,
DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null),
new AttributeDefinition(DB_STORE_USAGE,
DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null),
new AttributeDefinition(STORE_URI,
DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null),
new AttributeDefinition(SOURCE,
DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null),
new AttributeDefinition(OWNER,
DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null),};
HierarchicalTypeDefinition<ClassType> definition =
new HierarchicalTypeDefinition<>(ClassType.class, SqoopDataTypes.SQOOP_DBDATASTORE.getName(),
ImmutableList.of(AtlasClient.DATA_SET_SUPER_TYPE), attributeDefinitions);
classTypeDefinitions.put(SqoopDataTypes.SQOOP_DBDATASTORE.getName(), definition);
LOG.debug("Created definition for " + SqoopDataTypes.SQOOP_DBDATASTORE.getName());
}
private void createSqoopProcessClass() throws AtlasException {
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition(OPERATION,
DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null),
new AttributeDefinition(CMD_LINE_OPTS, STRING_MAP_TYPE.getName(), Multiplicity.REQUIRED, false, null),
new AttributeDefinition(START_TIME, DataTypes.DATE_TYPE.getName(), Multiplicity.REQUIRED, false, null),
new AttributeDefinition(END_TIME, DataTypes.DATE_TYPE.getName(), Multiplicity.REQUIRED, false, null),
new AttributeDefinition(USER,
DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null),
};
HierarchicalTypeDefinition<ClassType> definition =
new HierarchicalTypeDefinition<>(ClassType.class, SqoopDataTypes.SQOOP_PROCESS.getName(),
ImmutableList.of(AtlasClient.PROCESS_SUPER_TYPE), attributeDefinitions);
classTypeDefinitions.put(SqoopDataTypes.SQOOP_PROCESS.getName(), definition);
LOG.debug("Created definition for " + SqoopDataTypes.SQOOP_PROCESS.getName());
}
public String getModelAsJson() throws AtlasException {
createDataModel();
return getDataModelAsJSON();
}
public static void main(String[] args) throws Exception {
SqoopDataModelGenerator dataModelGenerator = new SqoopDataModelGenerator();
System.out.println("sqoopDataModelAsJSON = " + dataModelGenerator.getModelAsJson());
TypesDef typesDef = dataModelGenerator.getTypesDef();
for (EnumTypeDefinition enumType : typesDef.enumTypesAsJavaList()) {
System.out.println(String.format("%s(%s) - values %s", enumType.name, EnumType.class.getSimpleName(),
Arrays.toString(enumType.enumValues)));
}
for (HierarchicalTypeDefinition<ClassType> classType : typesDef.classTypesAsJavaList()) {
System.out.println(
String.format("%s(%s) - super types [%s] - attributes %s", classType.typeName,
ClassType.class.getSimpleName(), StringUtils.join(classType.superTypes, ","),
Arrays.toString(classType.attributeDefinitions)));
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.sqoop.model;
/**
 * Sqoop data types for the model and bridge. {@link #getName()} maps each constant
 * to the lower-cased type name registered with Atlas (e.g. {@code sqoop_process}).
 */
public enum SqoopDataTypes {
    // Classes
    SQOOP_DBDATASTORE,
    SQOOP_PROCESS,
    ;

    /**
     * @return the Atlas type name for this constant: the enum name in lower case.
     *         Locale.ROOT keeps the mapping stable regardless of the JVM's default
     *         locale (e.g. the Turkish dotless-i rule).
     */
    public String getName() {
        return name().toLowerCase(java.util.Locale.ROOT);
    }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.sqoop.hook;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.sqoop.model.SqoopDataTypes;
import org.apache.sqoop.SqoopJobDataPublisher;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.util.Properties;
/**
 * Integration test for the sqoop hook: publishes a simulated sqoop import job and
 * verifies the resulting DB store, sqoop process and hive table entities are
 * registered in Atlas.
 */
public class SqoopHookIT {
    public static final Logger LOG = org.slf4j.LoggerFactory.getLogger(SqoopHookIT.class);
    private static final String CLUSTER_NAME = "primary";
    public static final String DEFAULT_DB = "default";
    // Upper bound (msec) to wait for an entity to appear in Atlas.
    private static final int MAX_WAIT_TIME = 2000;
    private AtlasClient atlasClient;

    @BeforeClass
    public void setUp() throws Exception {
        //Set-up sqoop session
        atlasClient = new AtlasClient(ApplicationProperties.get().getString("atlas.rest.address"));
    }

    @Test
    public void testSqoopImport() throws Exception {
        // Simulate the payload sqoop publishes on completion of a hive import job.
        SqoopJobDataPublisher.Data d = new SqoopJobDataPublisher.Data("import", "jdbc:mysql:///localhost/db",
                "mysqluser", "mysql", "myTable", null, "default", "hiveTable", new Properties(),
                System.currentTimeMillis() - 100, System.currentTimeMillis());
        new SqoopHook().publish(d);
        Thread.sleep(1000);

        // The hook should have registered all three entities.
        assertDBStoreIsRegistered(SqoopHook.getSqoopDBStoreName(d));
        assertSqoopProcessIsRegistered(SqoopHook.getSqoopProcessName(d, CLUSTER_NAME));
        assertHiveTableIsRegistered(DEFAULT_DB, "hiveTable");
    }

    /** Asserts a sqoop_dbdatastore entity with the given name exists; returns its id. */
    private String assertDBStoreIsRegistered(String storeName) throws Exception {
        LOG.debug("Searching for db store {}", storeName);
        String query = String.format(
                "%s as t where name = '%s' select t",
                SqoopDataTypes.SQOOP_DBDATASTORE.getName(), storeName);
        return assertEntityIsRegistered(query);
    }

    /** Asserts a hive_table entity for the given db/table exists; returns its id. */
    private String assertHiveTableIsRegistered(String dbName, String tableName) throws Exception {
        LOG.debug("Searching for table {}.{}", dbName, tableName);
        String query = String.format(
                "%s as t where tableName = '%s', db where name = '%s' and clusterName = '%s' select t",
                HiveDataTypes.HIVE_TABLE.getName(), tableName.toLowerCase(), dbName.toLowerCase(), CLUSTER_NAME);
        return assertEntityIsRegistered(query);
    }

    /** Asserts a sqoop_process entity with the given name exists; returns its id. */
    private String assertSqoopProcessIsRegistered(String processName) throws Exception {
        LOG.debug("Searching for sqoop process {}", processName);
        String query = String.format(
                "%s as t where name = '%s' select t",
                SqoopDataTypes.SQOOP_PROCESS.getName(), processName);
        return assertEntityIsRegistered(query);
    }

    /**
     * Polls Atlas until the given DSL query returns at least one row (or times out),
     * then re-runs the query and returns the matched entity's id.
     */
    private String assertEntityIsRegistered(final String query) throws Exception {
        waitFor(MAX_WAIT_TIME, new Predicate() {
            @Override
            public boolean evaluate() throws Exception {
                return atlasClient.search(query).length() > 0;
            }
        });
        JSONObject row = atlasClient.search(query).getJSONObject(0).getJSONObject("t");
        return row.getString("id");
    }

    /**
     * Repeatedly evaluates the predicate (sleeping 1s between attempts) until it
     * returns true or the timeout elapses, in which case an Exception is thrown.
     */
    protected void waitFor(int timeout, Predicate predicate) throws Exception {
        long mustEnd = System.currentTimeMillis() + timeout;
        while (true) {
            if (predicate.evaluate()) {
                return;
            }
            if (System.currentTimeMillis() >= mustEnd) {
                throw new Exception("Waiting timed out after " + timeout + " msec");
            }
            LOG.info("Waiting up to {} msec", mustEnd - System.currentTimeMillis());
            Thread.sleep(1000);
        }
    }

    public interface Predicate {
        /**
         * Perform a predicate evaluation.
         *
         * @return the boolean result of the evaluation.
         * @throws Exception thrown if the predicate evaluation could not evaluate.
         */
        boolean evaluate() throws Exception;
    }
}
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<property>
<name>hive.exec.post.hooks</name>
<value>org.apache.atlas.hive.hook.HiveHook</value>
</property>
<property>
<name>hive.support.concurrency</name>
<value>false</value>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>${user.dir}/target/metastore</value>
</property>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:derby:${user.dir}/target/metastore_db;create=true</value>
</property>
<property>
<name>atlas.hook.hive.synchronous</name>
<value>true</value>
</property>
<property>
<name>atlas.cluster.name</name>
<value>test</value>
</property>
<property>
<name>fs.pfile.impl</name>
<value>org.apache.hadoop.fs.ProxyLocalFileSystem</value>
</property>
</configuration>
\ No newline at end of file
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<!-- Put Sqoop-specific properties in this file. -->
<configuration>
<!--
Set the value of this property to explicitly enable third-party
ManagerFactory plugins.
If this is not used, you can alternately specify a set of ManagerFactories
in the $SQOOP_CONF_DIR/managers.d/ subdirectory. Each file should contain
one or more lines like:
manager.class.name[=/path/to/containing.jar]
Files will be consulted in lexicographical order only if this property
is unset.
-->
<!--
<property>
<name>sqoop.connection.factories</name>
<value>com.cloudera.sqoop.manager.DefaultManagerFactory</value>
<description>A comma-delimited list of ManagerFactory implementations
which are consulted, in order, to instantiate ConnManager instances
used to drive connections to databases.
</description>
</property>
-->
<!--
Set the value of this property to enable third-party tools.
If this is not used, you can alternately specify a set of ToolPlugins
in the $SQOOP_CONF_DIR/tools.d/ subdirectory. Each file should contain
one or more lines like:
plugin.class.name[=/path/to/containing.jar]
Files will be consulted in lexicographical order only if this property
is unset.
-->
<!--
<property>
<name>sqoop.tool.plugins</name>
<value></value>
<description>A comma-delimited list of ToolPlugin implementations
which are consulted, in order, to register SqoopTool instances which
allow third-party tools to be used.
</description>
</property>
-->
<!--
By default, the Sqoop metastore will auto-connect to a local embedded
database stored in ~/.sqoop/. To disable metastore auto-connect, uncomment
this next property.
-->
<!--
<property>
<name>sqoop.metastore.client.enable.autoconnect</name>
<value>false</value>
<description>If true, Sqoop will connect to a local metastore
for job management when no other metastore arguments are
provided.
</description>
</property>
-->
<!--
The auto-connect metastore is stored in ~/.sqoop/. Uncomment
these next arguments to control the auto-connect process with
greater precision.
-->
<!--
<property>
<name>sqoop.metastore.client.autoconnect.url</name>
<value>jdbc:hsqldb:file:/tmp/sqoop-meta/meta.db;shutdown=true</value>
<description>The connect string to use when connecting to a
job-management metastore. If unspecified, uses ~/.sqoop/.
You can specify a different path here.
</description>
</property>
<property>
<name>sqoop.metastore.client.autoconnect.username</name>
<value>SA</value>
<description>The username to bind to the metastore.
</description>
</property>
<property>
<name>sqoop.metastore.client.autoconnect.password</name>
<value></value>
<description>The password to bind to the metastore.
</description>
</property>
-->
<!--
For security reasons, by default your database password will not be stored in
the Sqoop metastore. When executing a saved job, you will need to
reenter the database password. Uncomment this setting to enable saved
password storage. (INSECURE!)
-->
<!--
<property>
<name>sqoop.metastore.client.record.password</name>
<value>true</value>
<description>If true, allow saved passwords in the metastore.
</description>
</property>
-->
<!--
Enabling this option will instruct Sqoop to put all options that
were used in the invocation into created mapreduce job(s). This
become handy when one needs to investigate what exact options were
used in the Sqoop invocation.
-->
<!--
<property>
<name>sqoop.jobbase.serialize.sqoopoptions</name>
<value>true</value>
<description>If true, then all options will be serialized into job.xml
</description>
</property>
-->
<!--
SERVER CONFIGURATION: If you plan to run a Sqoop metastore on this machine,
you should uncomment and set these parameters appropriately.
You should then configure clients with:
sqoop.metastore.client.autoconnect.url =
jdbc:hsqldb:hsql://&lt;server-name&gt;:&lt;port&gt;/sqoop
-->
<!--
<property>
<name>sqoop.metastore.server.location</name>
<value>/tmp/sqoop-metastore/shared.db</value>
<description>Path to the shared metastore database files.
If this is not set, it will be placed in ~/.sqoop/.
</description>
</property>
<property>
<name>sqoop.metastore.server.port</name>
<value>16000</value>
<description>Port that this metastore should listen on.
</description>
</property>
-->
<!--
ATLAS SERVER ADDRESS
-->
<property>
<name>atlas.rest.address</name>
<value>http://localhost:21000/</value>
</property>
<!--
SQOOP JOB DATA PUBLISHING CLASS. Currently only one publishing class is supported
-->
<property>
<name>sqoop.job.data.publish.class</name>
<value>org.apache.atlas.sqoop.hook.SqoopHook</value>
</property>
<!--
ATLAS CLUSTER NAME
-->
<property>
<name>atlas.cluster.name</name>
<value>primary</value>
</property>
</configuration>
......@@ -24,7 +24,7 @@ import atlas_config as mc
ATLAS_LOG_OPTS="-Datlas.log.dir=%s -Datlas.log.file=application.log"
ATLAS_COMMAND_OPTS="-Datlas.home=%s"
ATLAS_CONFIG_OPTS="-Datlas.conf=%s"
DEFAULT_JVM_OPTS="-Xmx1024m -XX:MaxPermSize=512m -Dlog4j.configuration=atlas-log4j.xml"
DEFAULT_JVM_OPTS="-Xmx1024m -XX:MaxPermSize=512m -Dlog4j.configuration=atlas-log4j.xml -Djava.net.preferIPv4Stack=true"
CONF_FILE="atlas-application.properties"
HBASE_STORAGE_CONF_ENTRY="atlas.graph.storage.backend\s*=\s*hbase"
......
......@@ -98,6 +98,18 @@
<directory>../addons/hive-bridge/target/dependency/hook</directory>
<outputDirectory>hook</outputDirectory>
</fileSet>
<!-- addons/falcon -->
<fileSet>
<directory>../addons/falcon-bridge/target/dependency/hook</directory>
<outputDirectory>hook</outputDirectory>
</fileSet>
<!-- addons/sqoop -->
<fileSet>
<directory>../addons/sqoop-bridge/target/dependency/hook</directory>
<outputDirectory>hook</outputDirectory>
</fileSet>
</fileSets>
<files>
......
......@@ -57,14 +57,14 @@ class TestMetadata(unittest.TestCase):
'org.apache.atlas.Atlas',
['-app', 'atlas_home\\server\\webapp\\atlas'],
'atlas_home\\conf;atlas_home\\server\\webapp\\atlas\\WEB-INF\\classes;atlas_home\\server\\webapp\\atlas\\WEB-INF\\lib\\atlas-titan-${project.version}.jar;atlas_home\\server\\webapp\\atlas\\WEB-INF\\lib\\*;atlas_home\\libext\\*;atlas_home\\hbase\\conf',
['-Datlas.log.dir=atlas_home\\logs', '-Datlas.log.file=application.log', '-Datlas.home=atlas_home', '-Datlas.conf=atlas_home\\conf', '-Xmx1024m', '-XX:MaxPermSize=512m', '-Dlog4j.configuration=atlas-log4j.xml'], 'atlas_home\\logs')
['-Datlas.log.dir=atlas_home\\logs', '-Datlas.log.file=application.log', '-Datlas.home=atlas_home', '-Datlas.conf=atlas_home\\conf', '-Xmx1024m', '-XX:MaxPermSize=512m', '-Dlog4j.configuration=atlas-log4j.xml', '-Djava.net.preferIPv4Stack=true'], 'atlas_home\\logs')
else:
java_mock.assert_called_with(
'org.apache.atlas.Atlas',
['-app', 'atlas_home/server/webapp/atlas'],
'atlas_home/conf:atlas_home/server/webapp/atlas/WEB-INF/classes:atlas_home/server/webapp/atlas/WEB-INF/lib/atlas-titan-${project.version}.jar:atlas_home/server/webapp/atlas/WEB-INF/lib/*:atlas_home/libext/*:atlas_home/hbase/conf',
['-Datlas.log.dir=atlas_home/logs', '-Datlas.log.file=application.log', '-Datlas.home=atlas_home', '-Datlas.conf=atlas_home/conf', '-Xmx1024m', '-XX:MaxPermSize=512m', '-Dlog4j.configuration=atlas-log4j.xml'], 'atlas_home/logs')
['-Datlas.log.dir=atlas_home/logs', '-Datlas.log.file=application.log', '-Datlas.home=atlas_home', '-Datlas.conf=atlas_home/conf', '-Xmx1024m', '-XX:MaxPermSize=512m', '-Dlog4j.configuration=atlas-log4j.xml', '-Djava.net.preferIPv4Stack=true'], 'atlas_home/logs')
pass
......
......@@ -23,7 +23,8 @@ Atlas exposes notification interface and can be used for reliable entity registr
Available bridges are:
* [[Bridge-Hive][Hive Bridge]]
* [[Bridge-Sqoop][Sqoop Bridge]]
* [[Bridge-Falcon][Falcon Bridge]]
---++ Notification
Notification is used for reliable entity registration from hooks and for entity/type change notifications. Atlas, by default, provides Kafka integration, but it's possible to provide other implementations as well. Atlas service starts embedded Kafka server by default.
......
---+ Falcon Atlas Bridge
---++ Falcon Model
The default falcon modelling is available in org.apache.atlas.falcon.model.FalconDataModelGenerator. It defines the following types:
<verbatim>
falcon_process(ClassType) - super types [Process] - attributes [timestamp, owned-by, tags]
</verbatim>
One falcon_process entity is created for every cluster that the falcon process is defined for.
The entities are created and de-duped using unique qualified name. They provide namespace and can be used for querying/lineage as well. The unique attributes are:
* falcon_process - attribute name - <process name>@<cluster name>
---++ Falcon Hook
Falcon supports listeners on falcon entity submission. This is used to add entities in Atlas using the model defined in org.apache.atlas.falcon.model.FalconDataModelGenerator.
The hook submits the request to a thread pool executor to avoid blocking the command execution. The thread submits the entities as message to the notification server and atlas server reads these messages and registers the entities.
* Add 'org.apache.falcon.atlas.service.AtlasService' to application.services in <falcon-conf>/startup.properties
* Link falcon hook jars in falcon classpath - 'ln -s <atlas-home>/hook/falcon/* <falcon-home>/server/webapp/falcon/WEB-INF/lib/'
* Copy <atlas-conf>/client.properties and <atlas-conf>/atlas-application.properties to the falcon conf directory.
The following properties in <atlas-conf>/client.properties control the thread pool and notification details:
* atlas.hook.falcon.synchronous - boolean, true to run the hook synchronously. default false
* atlas.hook.falcon.numRetries - number of retries for notification failure. default 3
* atlas.hook.falcon.minThreads - core number of threads. default 5
* atlas.hook.falcon.maxThreads - maximum number of threads. default 5
* atlas.hook.falcon.keepAliveTime - keep alive time in msecs. default 10
* atlas.hook.falcon.queueSize - queue size for the threadpool. default 10000
Refer [[Configuration][Configuration]] for notification related configurations
---++ Limitations
* Only the process entity creation is currently handled. This model will be expanded to include all Falcon metadata
* In falcon cluster entity, cluster name used should be uniform across components like hive, falcon, sqoop etc. If used with ambari, ambari cluster name should be used for cluster entity
---+ Sqoop Atlas Bridge
---++ Sqoop Model
The default Sqoop modelling is available in org.apache.atlas.sqoop.model.SqoopDataModelGenerator. It defines the following types:
<verbatim>
sqoop_operation_type(EnumType) - values [IMPORT, EXPORT, EVAL]
sqoop_dbstore_usage(EnumType) - values [TABLE, QUERY, PROCEDURE, OTHER]
sqoop_process(ClassType) - super types [Process] - attributes [name, operation, dbStore, hiveTable, commandlineOpts, startTime, endTime, userName]
sqoop_dbdatastore(ClassType) - super types [DataSet] - attributes [name, dbStoreType, storeUse, storeUri, source, description, ownerName]
</verbatim>
The entities are created and de-duped using unique qualified name. They provide namespace and can be used for querying as well:
sqoop_process - attribute name - sqoop-dbStoreType-storeUri-endTime
sqoop_dbdatastore - attribute name - dbStoreType-connectorUrl-source
---++ Sqoop Hook
Sqoop added a SqoopJobDataPublisher that publishes data to Atlas after completion of import Job. Today, only hiveImport is supported in sqoopHook.
This is used to add entities in Atlas using the model defined in org.apache.atlas.sqoop.model.SqoopDataModelGenerator.
Follow these instructions in your sqoop set-up to add sqoop hook for Atlas in <sqoop-conf>/sqoop-site.xml:
* Sqoop Job publisher class. Currently only one publishing class is supported
<property>
<name>sqoop.job.data.publish.class</name>
<value>org.apache.atlas.sqoop.hook.SqoopHook</value>
</property>
* Atlas cluster name
<property>
<name>atlas.cluster.name</name>
<value><clustername></value>
</property>
   * Copy <atlas-conf>/atlas-application.properties and <atlas-conf>/client.properties to the sqoop conf directory <sqoop-conf>/
* Link <atlas-home>/hook/sqoop/*.jar in sqoop lib
Refer [[Configuration][Configuration]] for notification related configurations
---++ Limitations
* Only the following sqoop operations are captured by sqoop hook currently - hiveImport
......@@ -47,6 +47,8 @@ allows integration with the whole enterprise data ecosystem.
* [[Notification-Entity][Entity Notification]]
* Bridges
* [[Bridge-Hive][Hive Bridge]]
* [[Bridge-Sqoop][Sqoop Bridge]]
* [[Bridge-Falcon][Falcon Bridge]]
* [[HighAvailability][Fault Tolerance And High Availability Options]]
......
......@@ -102,8 +102,11 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN
private EntityCreateRequest() { }
public EntityCreateRequest(Referenceable... entities) {
super(HookNotificationType.ENTITY_CREATE);
this.entities = Arrays.asList(entities);
this(HookNotificationType.ENTITY_CREATE, Arrays.asList(entities));
}
public EntityCreateRequest(List<Referenceable> entities) {
this(HookNotificationType.ENTITY_CREATE, entities);
}
protected EntityCreateRequest(HookNotificationType type, List<Referenceable> entities) {
......
......@@ -429,6 +429,8 @@
<module>webapp</module>
<module>docs</module>
<module>addons/hive-bridge</module>
<module>addons/falcon-bridge</module>
<module>addons/sqoop-bridge</module>
<module>distro</module>
</modules>
......@@ -976,6 +978,24 @@
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>hive-bridge</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>falcon-bridge</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>sqoop-bridge</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-dashboard</artifactId>
<version>${project.version}</version>
<type>war</type>
......
......@@ -3,6 +3,7 @@ Apache Atlas Release Notes
--trunk - unreleased
INCOMPATIBLE CHANGES:
ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
ALL CHANGES:
ATLAS-392 Rename application.properties to atlas-application.properties (rishabhbhardwaj via shwethags)
......
......@@ -64,7 +64,7 @@ atlas.notification.embedded=true
atlas.kafka.zookeeper.connect=localhost:19026
atlas.kafka.bootstrap.servers=localhost:19027
atlas.kafka.data=${sys:atlas.data}/kafka
atlas.kafka.zookeeper.session.timeout.ms=400
atlas.kafka.zookeeper.session.timeout.ms=4000
atlas.kafka.zookeeper.sync.time.ms=20
atlas.kafka.consumer.timeout.ms=100
atlas.kafka.auto.commit.interval.ms=100
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment