Commit 582c2294 by Shwetha GS

changed falcon version to 0.6

parent 72fee44b
......@@ -21,6 +21,7 @@ package org.apache.metadata.falcon;
import com.google.inject.Inject;
import org.apache.falcon.client.FalconCLIException;
import org.apache.falcon.client.FalconClient;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.cluster.Interface;
......@@ -33,9 +34,15 @@ import org.apache.hadoop.metadata.MetadataException;
import org.apache.hadoop.metadata.Referenceable;
import org.apache.hadoop.metadata.Struct;
import org.apache.hadoop.metadata.repository.MetadataRepository;
import org.apache.hadoop.metadata.types.EnumType;
import org.apache.hadoop.metadata.types.Multiplicity;
import org.apache.hadoop.metadata.types.StructType;
import org.apache.hadoop.metadata.types.TraitType;
import org.apache.hadoop.metadata.types.TypeSystem;
import org.parboiled.common.StringUtils;
import javax.xml.bind.JAXBException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
......@@ -43,21 +50,27 @@ import java.util.Map;
public class FalconImporter {
private final FalconTypeSystem typeSystem;
private static final TypeSystem typeSystem = TypeSystem.getInstance();
private final FalconClient client;
private final MetadataRepository repository;
@Inject
public FalconImporter(FalconTypeSystem typeSystem, FalconClient client, MetadataRepository repo) {
this.typeSystem = typeSystem;
public FalconImporter(FalconClient client, MetadataRepository repo) {
this.client = client;
this.repository = repo;
}
public void importClusters() throws FalconCLIException, MetadataException {
EntityList clusters = client.getEntityList(EntityType.CLUSTER.name(), null, null, null, null, null, null, null, null);
/**
 * Fetches the XML definition of the named entity from Falcon and unmarshals it
 * into its JAXB model object.
 *
 * @param client Falcon client used to fetch the definition
 * @param type   entity type (e.g. CLUSTER) whose unmarshaller is used
 * @param name   entity name to look up
 * @return the unmarshalled entity
 * @throws FalconCLIException if the definition cannot be fetched
 * @throws JAXBException      if the returned XML cannot be unmarshalled
 */
private Entity getEntity(FalconClient client, EntityType type, String name)
        throws FalconCLIException, JAXBException {
    final String definitionXml = client.getDefinition(type.name(), name);
    StringReader xmlReader = new StringReader(definitionXml);
    return (Entity) type.getUnmarshaller().unmarshal(xmlReader);
}
public void importClusters() throws MetadataException {
try {
EntityList clusters = client.getEntityList(EntityType.CLUSTER.name(), null, null, null, null, null, null, null);
for (EntityList.EntityElement element : clusters.getElements()) {
Cluster cluster = (Cluster) client.getDefinition(EntityType.CLUSTER.name(), element.name);
Cluster cluster = (Cluster) getEntity(client, EntityType.CLUSTER, element.name);
Referenceable clusterRef = new Referenceable(FalconTypeSystem.DefinedTypes.CLUSTER.name());
clusterRef.set("name", cluster.getName());
......@@ -67,12 +80,22 @@ public class FalconImporter {
acl.set("owner", cluster.getACL().getOwner());
acl.set("group", cluster.getACL().getGroup());
acl.set("permission", cluster.getACL().getPermission());
StructType aclType = (StructType) typeSystem.getDataType(FalconTypeSystem.DefinedTypes.ACL.name());
StructType aclType = typeSystem.getDataType(StructType.class, FalconTypeSystem.DefinedTypes.ACL.name());
clusterRef.set("acl", aclType.convert(acl, Multiplicity.REQUIRED));
}
if (StringUtils.isNotEmpty(cluster.getTags())) {
clusterRef.set("tags", getMap(cluster.getTags()));
String[] parts = cluster.getTags().split(",");
List<ITypedInstance> tags = new ArrayList<>();
for (String part : parts) {
TraitType tagType = typeSystem.getDataType(TraitType.class, FalconTypeSystem.DefinedTypes.TAG.name());
String[] kv = part.trim().split("=");
Struct tag = new Struct(FalconTypeSystem.DefinedTypes.TAG.name());
tag.set("name", kv[0]);
tag.set("value", kv[0]);
tags.add(tagType.convert(tag, Multiplicity.REQUIRED));
}
clusterRef.set("tags", tags);
}
if (cluster.getProperties() != null) {
......@@ -83,9 +106,10 @@ public class FalconImporter {
List<ITypedInstance> locations = new ArrayList<>();
for (Location loc : cluster.getLocations().getLocations()) {
Struct location = new Struct(FalconTypeSystem.DefinedTypes.CLUSTER_LOCATION.name());
location.set("type", loc.getName());
EnumType locationType = typeSystem.getDataType(EnumType.class, FalconTypeSystem.DefinedTypes.CLUSTER_LOCATION_TYPE.name());
location.set("type", locationType.fromValue(loc.getName().toUpperCase()));
location.set("path", loc.getPath());
StructType type = (StructType) typeSystem.getDataType(FalconTypeSystem.DefinedTypes.CLUSTER_LOCATION.name());
StructType type = typeSystem.getDataType(StructType.class, FalconTypeSystem.DefinedTypes.CLUSTER_LOCATION.name());
locations.add(type.convert(location, Multiplicity.REQUIRED));
}
clusterRef.set("locations", locations);
......@@ -98,13 +122,16 @@ public class FalconImporter {
interfaceStruct.set("type", interfaceFld.getType().name());
interfaceStruct.set("endpoint", interfaceFld.getEndpoint());
interfaceStruct.set("version", interfaceFld.getVersion());
StructType type = (StructType) typeSystem.getDataType(FalconTypeSystem.DefinedTypes.CLUSTER_INTERFACE.name());
StructType type = typeSystem.getDataType(StructType.class, FalconTypeSystem.DefinedTypes.CLUSTER_INTERFACE.name());
interfaces.add(type.convert(interfaceStruct, Multiplicity.REQUIRED));
}
clusterRef.set("interfaces", interfaces);
}
repository.createEntity(clusterRef, clusterRef.getTypeName());
}
} catch (Exception e) {
throw new MetadataException(e);
}
}
private Map<String, String> getMap(Properties properties) {
......@@ -114,14 +141,4 @@ public class FalconImporter {
}
return map;
}
/**
 * Parses a Falcon tags string of the form "key1=value1,key2=value2" into a map.
 * Keys and values are trimmed of surrounding whitespace. Malformed parts that
 * contain no '=' are skipped instead of throwing ArrayIndexOutOfBoundsException
 * (the original indexed kv[1] unconditionally). A value containing '=' is kept
 * intact by splitting on the first '=' only.
 *
 * @param tags comma-separated key=value pairs, as stored on Falcon entities
 * @return map of tag name to tag value (empty if no well-formed pairs)
 */
private Map<String, String> getMap(String tags) {
    Map<String, String> map = new HashMap<>();  // was raw `new HashMap()`
    for (String part : tags.split(",")) {
        String[] kv = part.trim().split("=", 2);  // limit 2: values may contain '='
        if (kv.length == 2) {
            map.put(kv[0].trim(), kv[1].trim());
        }
    }
    return map;
}
}
......@@ -26,7 +26,6 @@ import org.apache.hadoop.metadata.types.DataTypes;
import org.apache.hadoop.metadata.types.EnumTypeDefinition;
import org.apache.hadoop.metadata.types.EnumValue;
import org.apache.hadoop.metadata.types.HierarchicalTypeDefinition;
import org.apache.hadoop.metadata.types.IDataType;
import org.apache.hadoop.metadata.types.Multiplicity;
import org.apache.hadoop.metadata.types.StructTypeDefinition;
import org.apache.hadoop.metadata.types.TraitType;
......@@ -34,18 +33,17 @@ import org.apache.hadoop.metadata.types.TypeSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.ArrayList;
import java.util.List;
public class FalconTypeSystem {
public static final Logger LOG = LoggerFactory.getLogger(FalconTypeSystem.class);
private static FalconTypeSystem INSTANCE;
public static final TypeSystem TYPE_SYSTEM = TypeSystem.getInstance();
private final Map<String, IDataType> typeMap = new HashMap<>();
private Map<String, EnumTypeDefinition> enumTypeDefinitionMap = new HashMap<>();
private Map<String, StructTypeDefinition> structTypeDefinitionMap = new HashMap<>();
private List<StructTypeDefinition> structTypeDefinitions = new ArrayList<>();
private List<HierarchicalTypeDefinition<TraitType>> traitTypeDefinitions = new ArrayList<>();
public static FalconTypeSystem getInstance() throws MetadataException {
if (INSTANCE == null) {
......@@ -62,13 +60,8 @@ public class FalconTypeSystem {
HierarchicalTypeDefinition<ClassType> cluster = defineCluster();
//TODO define feed and process
for (Map.Entry<String, EnumTypeDefinition> entry : enumTypeDefinitionMap.entrySet()) {
typeMap.put(entry.getKey(), TYPE_SYSTEM.defineEnumType(entry.getValue()));
}
typeMap.putAll(
TYPE_SYSTEM.defineTypes(ImmutableList.copyOf(structTypeDefinitionMap.values()), ImmutableList.<HierarchicalTypeDefinition<TraitType>>of(),
ImmutableList.of(cluster)));
TYPE_SYSTEM.defineTypes(ImmutableList.copyOf(structTypeDefinitions), ImmutableList.copyOf(traitTypeDefinitions),
ImmutableList.of(cluster));
}
......@@ -76,11 +69,12 @@ public class FalconTypeSystem {
defineACL();
defineClusterInterface();
defineClusterLocation();
defineTags();
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition("name", DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null),
new AttributeDefinition("acl", DefinedTypes.ACL.name(), Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("tags", TYPE_SYSTEM.defineMapType(DataTypes.STRING_TYPE, DataTypes.STRING_TYPE).getName(), Multiplicity.OPTIONAL, false, null),
new AttributeDefinition("tags", DefinedTypes.TAG.name(), Multiplicity.COLLECTION, false, null),
new AttributeDefinition("locations", TYPE_SYSTEM.defineMapType(DataTypes.STRING_TYPE, DataTypes.STRING_TYPE).getName(), Multiplicity.COLLECTION, false, null),
new AttributeDefinition("interfaces", DefinedTypes.CLUSTER_INTERFACE.name(), Multiplicity.COLLECTION, false, null),
new AttributeDefinition("properties", TYPE_SYSTEM.defineMapType(DataTypes.STRING_TYPE, DataTypes.STRING_TYPE).getName(), Multiplicity.OPTIONAL, false, null),
......@@ -91,7 +85,19 @@ public class FalconTypeSystem {
return cluster;
}
private StructTypeDefinition defineClusterLocation() {
/**
 * Defines the TAG trait type — a required name/value string pair — and queues
 * its definition for registration with the type system.
 *
 * @return the trait type definition for TAG
 */
private HierarchicalTypeDefinition<TraitType> defineTags() {
    final String typeName = DefinedTypes.TAG.name();
    AttributeDefinition nameAttr =
            new AttributeDefinition("name", DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null);
    AttributeDefinition valueAttr =
            new AttributeDefinition("value", DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null);

    HierarchicalTypeDefinition<TraitType> traitType = new HierarchicalTypeDefinition<>(
            TraitType.class, typeName, ImmutableList.<String>of(),
            new AttributeDefinition[]{nameAttr, valueAttr});
    LOG.debug("Created definition for " + typeName);
    traitTypeDefinitions.add(traitType);
    return traitType;
}
private StructTypeDefinition defineClusterLocation() throws MetadataException {
EnumValue values[] = {
new EnumValue("WORKING", 1),
new EnumValue("STAGING", 2),
......@@ -100,7 +106,7 @@ public class FalconTypeSystem {
LOG.debug("Created definition for " + DefinedTypes.CLUSTER_LOCATION_TYPE.name());
EnumTypeDefinition locationType = new EnumTypeDefinition(DefinedTypes.CLUSTER_LOCATION_TYPE.name(), values);
enumTypeDefinitionMap.put(locationType.name, locationType);
TYPE_SYSTEM.defineEnumType(locationType);
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition("type", DefinedTypes.CLUSTER_LOCATION_TYPE.name(), Multiplicity.REQUIRED, false, null),
......@@ -108,13 +114,13 @@ public class FalconTypeSystem {
};
LOG.debug("Created definition for " + DefinedTypes.CLUSTER_LOCATION.name());
StructTypeDefinition location = new StructTypeDefinition(DefinedTypes.CLUSTER_LOCATION.name(), attributeDefinitions);
structTypeDefinitionMap.put(location.typeName, location);
structTypeDefinitions.add(location);
return location;
}
private StructTypeDefinition defineClusterInterface() {
private StructTypeDefinition defineClusterInterface() throws MetadataException {
EnumValue values[] = {
new EnumValue("READ_ONLY", 1),
new EnumValue("READONLY", 1),
new EnumValue("WRITE", 2),
new EnumValue("EXECUTE", 3),
new EnumValue("WORKFLOW", 4),
......@@ -124,7 +130,7 @@ public class FalconTypeSystem {
LOG.debug("Created definition for " + DefinedTypes.CLUSTER_INTERFACE_TYPE.name());
EnumTypeDefinition interfaceType = new EnumTypeDefinition(DefinedTypes.CLUSTER_INTERFACE_TYPE.name(), values);
enumTypeDefinitionMap.put(interfaceType.name, interfaceType);
TYPE_SYSTEM.defineEnumType(interfaceType);
AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
new AttributeDefinition("type", DefinedTypes.CLUSTER_INTERFACE_TYPE.name(), Multiplicity.REQUIRED, false, null),
......@@ -133,18 +139,19 @@ public class FalconTypeSystem {
};
LOG.debug("Created definition for " + DefinedTypes.CLUSTER_INTERFACE.name());
StructTypeDefinition interfaceEntity = new StructTypeDefinition(DefinedTypes.CLUSTER_INTERFACE.name(), attributeDefinitions);
structTypeDefinitionMap.put(interfaceEntity.typeName, interfaceEntity);
structTypeDefinitions.add(interfaceEntity);
return interfaceEntity;
}
public static enum DefinedTypes {
ACL,
TAG,
CLUSTER,
CLUSTER_INTERFACE,
CLUSTER_INTERFACE_TYPE,
CLUSTER_LOCATION,
CLUSTER_LOCATION_TYPE;
CLUSTER_LOCATION_TYPE
}
private StructTypeDefinition defineACL() {
......@@ -155,11 +162,7 @@ public class FalconTypeSystem {
};
LOG.debug("Created definition for " + DefinedTypes.ACL.name());
StructTypeDefinition acl = new StructTypeDefinition(DefinedTypes.ACL.name(), attributeDefinitions);
structTypeDefinitionMap.put(acl.typeName, acl);
structTypeDefinitions.add(acl);
return acl;
}
public IDataType getDataType(String typeName) {
return typeMap.get(typeName);
}
}
......@@ -18,16 +18,23 @@
package org.apache.metadata.falcon;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.falcon.client.FalconCLIException;
import org.apache.falcon.client.FalconClient;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.cluster.Interface;
import org.apache.falcon.entity.v0.cluster.Interfaces;
import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.entity.v0.cluster.Location;
import org.apache.falcon.entity.v0.cluster.Locations;
import org.apache.falcon.resource.EntityList;
import org.apache.hadoop.metadata.IReferenceableInstance;
import org.apache.hadoop.metadata.MetadataException;
import org.apache.hadoop.metadata.repository.MetadataRepository;
import org.testng.annotations.Test;
import java.io.StringWriter;
import java.util.UUID;
import static org.mockito.Matchers.any;
......@@ -37,15 +44,15 @@ import static org.mockito.Mockito.when;
public class FalconImporterTest {
@Test
public void testImport() throws MetadataException, FalconCLIException {
public void testImport() throws Exception {
MetadataRepository repo = mock(MetadataRepository.class);
FalconClient client = mock(FalconClient.class);
FalconTypeSystem.getInstance();
FalconImporter importer = new FalconImporter(FalconTypeSystem.getInstance(), client, repo);
when(client.getEntityList(EntityType.CLUSTER.name(), null, null, null, null, null, null, null, null)).thenReturn(getEntityList());
Cluster cluster = new Cluster();
FalconImporter importer = new FalconImporter(client, repo);
when(client.getEntityList(EntityType.CLUSTER.name(), null, null, null, null, null, null, null)).thenReturn(getEntityList());
//TODO Set other fields in cluster
when(client.getDefinition(anyString(), anyString())).thenReturn(cluster);
when(client.getDefinition(anyString(), anyString())).thenReturn(getCluster());
when(repo.createEntity(any(IReferenceableInstance.class), anyString())).thenReturn(UUID.randomUUID().toString());
importer.importClusters();
......@@ -59,4 +66,44 @@ public class FalconImporterTest {
entities[1].name = "c2";
return new EntityList(entities);
}
/**
 * Builds a cluster Interface of the given type pointing at the given endpoint,
 * fixed at version "2.2".
 *
 * @param type     interface type (WRITE, READONLY, EXECUTE, ...)
 * @param endpoint service endpoint URL
 * @return the populated Interface
 */
private Interface getInterface(Interfacetype type, String endpoint) {
    final Interface result = new Interface();
    result.setType(type);
    result.setEndpoint(endpoint);
    result.setVersion("2.2");
    return result;
}
/**
 * Builds a sample Cluster entity (random name/colo, fixed tags, three
 * interfaces, one staging location) and returns its marshalled XML definition,
 * matching what FalconClient.getDefinition would return.
 *
 * Fix: removed the dead local `clusterInterface` that was constructed and
 * configured but never added to the interfaces list — the list was populated
 * only via getInterface(...).
 *
 * @return the cluster definition as an XML string
 * @throws Exception on JAXB marshalling failure
 */
public String getCluster() throws Exception {
    Cluster cluster = new Cluster();
    cluster.setName(RandomStringUtils.randomAlphabetic(10));
    cluster.setColo(RandomStringUtils.randomAlphabetic(5));
    cluster.setTags("owner=xyz,team=abc");

    Interfaces interfaces = new Interfaces();
    interfaces.getInterfaces().add(getInterface(Interfacetype.WRITE, "hdfs://localhost:8030"));
    interfaces.getInterfaces().add(getInterface(Interfacetype.READONLY, "hdfs://localhost:8030"));
    interfaces.getInterfaces().add(getInterface(Interfacetype.EXECUTE, "http://localhost:8040"));
    cluster.setInterfaces(interfaces);

    Locations locations = new Locations();
    locations.getLocations().add(getLocation());
    cluster.setLocations(locations);

    StringWriter writer = new StringWriter();
    EntityType.CLUSTER.getMarshaller().marshal(cluster, writer);
    return writer.toString();
}
/**
 * Builds a sample cluster Location named "staging" at path "/staging".
 *
 * @return the populated Location
 */
public Location getLocation() {
    final Location staging = new Location();
    staging.setName("staging");
    staging.setPath("/staging");
    return staging;
}
}
......@@ -19,12 +19,17 @@
package org.apache.metadata.falcon;
import org.apache.hadoop.metadata.MetadataException;
import org.apache.hadoop.metadata.types.ClassType;
import org.apache.hadoop.metadata.types.TraitType;
import org.apache.hadoop.metadata.types.TypeSystem;
import org.junit.Assert;
import org.testng.annotations.Test;
public class FalconTypeSystemTest {
@Test
public void testTypeSystem() throws MetadataException {
FalconTypeSystem instance = FalconTypeSystem.getInstance();
instance.getDataType(FalconTypeSystem.DefinedTypes.CLUSTER.name());
FalconTypeSystem.getInstance();
Assert.assertNotNull(TypeSystem.getInstance().getDataType(ClassType.class, FalconTypeSystem.DefinedTypes.CLUSTER.name()));
Assert.assertNotNull(TypeSystem.getInstance().getDataType(TraitType.class, FalconTypeSystem.DefinedTypes.TAG.name()));
}
}
......@@ -70,7 +70,7 @@
<titan.version>0.5.3</titan.version>
<hadoop.version>2.5.0</hadoop.version>
<hive.version>0.14.0</hive.version>
<falcon.version>0.6-SNAPSHOT</falcon.version>
<falcon.version>0.6.0.2.2.0.0-2041</falcon.version>
</properties>
<profiles>
......
......@@ -51,7 +51,9 @@ public class EnumType extends AbstractDataType<EnumValue> {
public EnumValue convert(Object val, Multiplicity m) throws MetadataException {
if ( val != null ) {
EnumValue e = null;
if ( val instanceof Integer) {
if (val instanceof EnumValue) {
e = (EnumValue) val;
} else if ( val instanceof Integer) {
e = ordinalMap.get((Integer)val);
} else if ( val instanceof String) {
e = valueMap.get((String)val);
......@@ -77,6 +79,6 @@ public class EnumType extends AbstractDataType<EnumValue> {
}
/**
 * Looks up the EnumValue registered under the given name, ignoring surrounding
 * whitespace (Falcon definitions may carry padded values).
 *
 * Fix: the scraped diff left both the old `valueMap.get(val)` and the new
 * trimmed return in place (unreachable second statement / compile error);
 * resolved to the single trimmed lookup the commit intended.
 *
 * @param val enum value name; must be non-null (NPE otherwise, as before)
 * @return the matching EnumValue, or null if none is registered
 */
public EnumValue fromValue(String val) {
    return valueMap.get(val.trim());
}
}
......@@ -47,14 +47,19 @@ public class HdfsStoreTest extends BaseTest{
fs.delete(new Path(LOCATION), true);
//define type system
HierarchicalTypeDefinition<TraitType> tagTypeDefinition =
createTraitTypeDef("tag",
ImmutableList.<String>of(),
createRequiredAttrDef("level", DataTypes.INT_TYPE));
HierarchicalTypeDefinition<ClassType> databaseTypeDefinition =
createClassTypeDef("database",
ImmutableList.<String>of(),
createRequiredAttrDef("name", DataTypes.STRING_TYPE),
createRequiredAttrDef("description", DataTypes.STRING_TYPE));
createRequiredAttrDef("description", DataTypes.STRING_TYPE),
createRequiredAttrDef("tag", "tag"));
TypeSystem.getInstance().defineTypes(
ImmutableList.<StructTypeDefinition>of(),
ImmutableList.<HierarchicalTypeDefinition<TraitType>>of(),
ImmutableList.of(tagTypeDefinition),
ImmutableList.of(databaseTypeDefinition));
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment