Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
atlas
Project
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
dataplatform
atlas
Commits
8e5d4827
Commit
8e5d4827
authored
Mar 03, 2015
by
Shwetha GS
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
type system store
parent
0b11134f
Show whitespace changes
Inline
Side-by-side
Showing
20 changed files
with
576 additions
and
64 deletions
+576
-64
pom.xml
falcontypes/pom.xml
+16
-0
FalconImporter.java
.../main/java/org/apache/metadata/falcon/FalconImporter.java
+10
-17
FalconTypeSystem.java
...ain/java/org/apache/metadata/falcon/FalconTypeSystem.java
+14
-22
FalconImporterTest.java
...t/java/org/apache/metadata/falcon/FalconImporterTest.java
+62
-0
FalconTypeSystemTest.java
...java/org/apache/metadata/falcon/FalconTypeSystemTest.java
+30
-0
pom.xml
pom.xml
+13
-13
pom.xml
typesystem/pom.xml
+6
-3
HdfsStore.java
...ava/org/apache/hadoop/metadata/types/store/HdfsStore.java
+156
-0
StorageException.java
.../apache/hadoop/metadata/types/store/StorageException.java
+35
-0
StorageFactory.java
...rg/apache/hadoop/metadata/types/store/StorageFactory.java
+25
-0
TypeSystemStore.java
...g/apache/hadoop/metadata/types/store/TypeSystemStore.java
+85
-0
BaseTest.java
...em/src/test/java/org/apache/hadoop/metadata/BaseTest.java
+1
-1
ClassTest.java
...m/src/test/java/org/apache/hadoop/metadata/ClassTest.java
+1
-1
EnumTest.java
...em/src/test/java/org/apache/hadoop/metadata/EnumTest.java
+1
-1
StorageTest.java
...src/test/java/org/apache/hadoop/metadata/StorageTest.java
+1
-1
StructTest.java
.../src/test/java/org/apache/hadoop/metadata/StructTest.java
+1
-1
TraitTest.java
...m/src/test/java/org/apache/hadoop/metadata/TraitTest.java
+1
-1
VenkateshTest.java
...c/test/java/org/apache/hadoop/metadata/VenkateshTest.java
+0
-2
SerializationJavaTest.java
...rg/apache/hadoop/metadata/json/SerializationJavaTest.java
+1
-1
HdfsStoreTest.java
...org/apache/hadoop/metadata/types/store/HdfsStoreTest.java
+117
-0
No files found.
falcontypes/pom.xml
View file @
8e5d4827
...
...
@@ -35,12 +35,23 @@
<artifactId>
falcon-client
</artifactId>
</dependency>
<!-- falcon-client depends on jersey-client in provided scope. Hence explicit dependency -->
<dependency>
<groupId>
com.sun.jersey
</groupId>
<artifactId>
jersey-client
</artifactId>
</dependency>
<dependency>
<groupId>
org.apache.hadoop.metadata
</groupId>
<artifactId>
metadata-typesystem
</artifactId>
</dependency>
<dependency>
<groupId>
org.apache.hadoop.metadata
</groupId>
<artifactId>
metadata-repository
</artifactId>
</dependency>
<dependency>
<groupId>
org.apache.hadoop
</groupId>
<artifactId>
hadoop-client
</artifactId>
</dependency>
...
...
@@ -54,5 +65,10 @@
<groupId>
org.testng
</groupId>
<artifactId>
testng
</artifactId>
</dependency>
<dependency>
<groupId>
org.mockito
</groupId>
<artifactId>
mockito-all
</artifactId>
</dependency>
</dependencies>
</project>
falcontypes/src/main/java/org/apache/metadata/falcon/FalconImporter.java
View file @
8e5d4827
...
...
@@ -29,15 +29,13 @@ import org.apache.falcon.entity.v0.cluster.Properties;
import
org.apache.falcon.entity.v0.cluster.Property
;
import
org.apache.falcon.resource.EntityList
;
import
org.apache.hadoop.metadata.ITypedInstance
;
import
org.apache.hadoop.metadata.ITypedReferenceableInstance
;
import
org.apache.hadoop.metadata.MetadataException
;
import
org.apache.hadoop.metadata.Referenceable
;
import
org.apache.hadoop.metadata.Struct
;
import
org.apache.hadoop.metadata.
storage.I
Repository
;
import
org.apache.hadoop.metadata.
repository.Metadata
Repository
;
import
org.apache.hadoop.metadata.types.Multiplicity
;
import
org.apache.hadoop.metadata.types.StructType
;
import
org.parboiled.common.StringUtils
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.List
;
...
...
@@ -47,22 +45,22 @@ public class FalconImporter {
private
final
FalconTypeSystem
typeSystem
;
private
final
FalconClient
client
;
private
final
I
Repository
repository
;
private
final
Metadata
Repository
repository
;
@Inject
public
FalconImporter
(
FalconTypeSystem
typeSystem
,
FalconClient
client
,
I
Repository
repo
)
{
public
FalconImporter
(
FalconTypeSystem
typeSystem
,
FalconClient
client
,
Metadata
Repository
repo
)
{
this
.
typeSystem
=
typeSystem
;
this
.
client
=
client
;
this
.
repository
=
repo
;
}
p
rivate
ITypedReferenceableInstance
importClusters
()
throws
FalconCLIException
,
MetadataException
{
p
ublic
void
importClusters
()
throws
FalconCLIException
,
MetadataException
{
EntityList
clusters
=
client
.
getEntityList
(
EntityType
.
CLUSTER
.
name
(),
null
,
null
,
null
,
null
,
null
,
null
,
null
,
null
);
for
(
EntityList
.
EntityElement
element
:
clusters
.
getElements
())
{
Cluster
cluster
=
(
Cluster
)
client
.
getDefinition
(
EntityType
.
CLUSTER
.
name
(),
element
.
name
);
Referenceable
entityRef
=
new
Referenceable
(
FalconTypeSystem
.
DefinedTypes
.
ENTITY
.
name
());
entity
Ref
.
set
(
"name"
,
cluster
.
getName
());
Referenceable
clusterRef
=
new
Referenceable
(
FalconTypeSystem
.
DefinedTypes
.
CLUSTER
.
name
());
cluster
Ref
.
set
(
"name"
,
cluster
.
getName
());
if
(
cluster
.
getACL
()
!=
null
)
{
Struct
acl
=
new
Struct
(
FalconTypeSystem
.
DefinedTypes
.
ACL
.
name
());
...
...
@@ -70,21 +68,17 @@ public class FalconImporter {
acl
.
set
(
"group"
,
cluster
.
getACL
().
getGroup
());
acl
.
set
(
"permission"
,
cluster
.
getACL
().
getPermission
());
StructType
aclType
=
(
StructType
)
typeSystem
.
getDataType
(
FalconTypeSystem
.
DefinedTypes
.
ACL
.
name
());
entity
Ref
.
set
(
"acl"
,
aclType
.
convert
(
acl
,
Multiplicity
.
REQUIRED
));
cluster
Ref
.
set
(
"acl"
,
aclType
.
convert
(
acl
,
Multiplicity
.
REQUIRED
));
}
if
(
StringUtils
.
isNotEmpty
(
cluster
.
getTags
()))
{
entity
Ref
.
set
(
"tags"
,
getMap
(
cluster
.
getTags
()));
cluster
Ref
.
set
(
"tags"
,
getMap
(
cluster
.
getTags
()));
}
if
(
cluster
.
getProperties
()
!=
null
)
{
entity
Ref
.
set
(
"properties"
,
getMap
(
cluster
.
getProperties
()));
cluster
Ref
.
set
(
"properties"
,
getMap
(
cluster
.
getProperties
()));
}
repository
.
create
(
entityRef
);
Referenceable
clusterRef
=
new
Referenceable
(
FalconTypeSystem
.
DefinedTypes
.
CLUSTER
.
name
());
if
(
cluster
.
getLocations
()
!=
null
)
{
List
<
ITypedInstance
>
locations
=
new
ArrayList
<>();
for
(
Location
loc
:
cluster
.
getLocations
().
getLocations
())
{
...
...
@@ -109,9 +103,8 @@ public class FalconImporter {
}
clusterRef
.
set
(
"interfaces"
,
interfaces
);
}
repository
.
createEntity
(
clusterRef
,
clusterRef
.
getTypeName
());
}
return
null
;
}
private
Map
<
String
,
String
>
getMap
(
Properties
properties
)
{
...
...
falcontypes/src/main/java/org/apache/metadata/falcon/FalconTypeSystem.java
View file @
8e5d4827
...
...
@@ -29,6 +29,7 @@ import org.apache.hadoop.metadata.types.HierarchicalTypeDefinition;
import
org.apache.hadoop.metadata.types.IDataType
;
import
org.apache.hadoop.metadata.types.Multiplicity
;
import
org.apache.hadoop.metadata.types.StructTypeDefinition
;
import
org.apache.hadoop.metadata.types.TraitType
;
import
org.apache.hadoop.metadata.types.TypeSystem
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
...
...
@@ -46,9 +47,9 @@ public class FalconTypeSystem {
private
Map
<
String
,
EnumTypeDefinition
>
enumTypeDefinitionMap
=
new
HashMap
<>();
private
Map
<
String
,
StructTypeDefinition
>
structTypeDefinitionMap
=
new
HashMap
<>();
public
FalconTypeSystem
getInstance
()
throws
MetadataException
{
public
static
FalconTypeSystem
getInstance
()
throws
MetadataException
{
if
(
INSTANCE
==
null
)
{
synchronized
(
this
)
{
synchronized
(
LOG
)
{
if
(
INSTANCE
==
null
)
{
INSTANCE
=
new
FalconTypeSystem
();
}
...
...
@@ -58,27 +59,34 @@ public class FalconTypeSystem {
}
private
FalconTypeSystem
()
throws
MetadataException
{
defineEntity
();
HierarchicalTypeDefinition
<
ClassType
>
cluster
=
defineCluster
();
//TODO define feed and process
for
(
Map
.
Entry
<
String
,
EnumTypeDefinition
>
entry
:
enumTypeDefinitionMap
.
entrySet
())
{
typeMap
.
put
(
entry
.
getKey
(),
TYPE_SYSTEM
.
defineEnumType
(
entry
.
getValue
()));
}
typeMap
.
putAll
(
TYPE_SYSTEM
.
defineTypes
(
ImmutableList
.
copyOf
(
structTypeDefinitionMap
.
values
()),
null
,
TYPE_SYSTEM
.
defineTypes
(
ImmutableList
.
copyOf
(
structTypeDefinitionMap
.
values
()),
ImmutableList
.<
HierarchicalTypeDefinition
<
TraitType
>>
of
()
,
ImmutableList
.
of
(
cluster
)));
}
private
HierarchicalTypeDefinition
<
ClassType
>
defineCluster
()
throws
MetadataException
{
defineACL
();
defineClusterInterface
();
defineClusterLocation
();
AttributeDefinition
[]
attributeDefinitions
=
new
AttributeDefinition
[]{
new
AttributeDefinition
(
"name"
,
DataTypes
.
STRING_TYPE
.
getName
(),
Multiplicity
.
REQUIRED
,
false
,
null
),
new
AttributeDefinition
(
"acl"
,
DefinedTypes
.
ACL
.
name
(),
Multiplicity
.
OPTIONAL
,
false
,
null
),
new
AttributeDefinition
(
"tags"
,
TYPE_SYSTEM
.
defineMapType
(
DataTypes
.
STRING_TYPE
,
DataTypes
.
STRING_TYPE
).
getName
(),
Multiplicity
.
OPTIONAL
,
false
,
null
),
new
AttributeDefinition
(
"locations"
,
TYPE_SYSTEM
.
defineMapType
(
DataTypes
.
STRING_TYPE
,
DataTypes
.
STRING_TYPE
).
getName
(),
Multiplicity
.
COLLECTION
,
false
,
null
),
new
AttributeDefinition
(
"interfaces"
,
DefinedTypes
.
CLUSTER_INTERFACE
.
name
(),
Multiplicity
.
COLLECTION
,
false
,
null
),
new
AttributeDefinition
(
"properties"
,
TYPE_SYSTEM
.
defineMapType
(
DataTypes
.
STRING_TYPE
,
DataTypes
.
STRING_TYPE
).
getName
(),
Multiplicity
.
OPTIONAL
,
false
,
null
),
};
HierarchicalTypeDefinition
<
ClassType
>
cluster
=
new
HierarchicalTypeDefinition
<>(
ClassType
.
class
,
DefinedTypes
.
CLUSTER
.
name
(),
ImmutableList
.
of
(
DefinedTypes
.
ENTITY
.
name
()
),
attributeDefinitions
);
new
HierarchicalTypeDefinition
<>(
ClassType
.
class
,
DefinedTypes
.
CLUSTER
.
name
(),
ImmutableList
.
<
String
>
of
(
),
attributeDefinitions
);
LOG
.
debug
(
"Created definition for "
+
DefinedTypes
.
CLUSTER
.
name
());
return
cluster
;
}
...
...
@@ -129,24 +137,8 @@ public class FalconTypeSystem {
return
interfaceEntity
;
}
private
StructTypeDefinition
defineEntity
()
throws
MetadataException
{
defineACL
();
AttributeDefinition
[]
attributeDefinitions
=
new
AttributeDefinition
[]{
new
AttributeDefinition
(
"name"
,
DataTypes
.
STRING_TYPE
.
getName
(),
Multiplicity
.
REQUIRED
,
false
,
null
),
new
AttributeDefinition
(
"acl"
,
DefinedTypes
.
ACL
.
name
(),
Multiplicity
.
OPTIONAL
,
false
,
null
),
new
AttributeDefinition
(
"tags"
,
TYPE_SYSTEM
.
defineMapType
(
DataTypes
.
STRING_TYPE
,
DataTypes
.
STRING_TYPE
).
getName
(),
Multiplicity
.
OPTIONAL
,
false
,
null
),
new
AttributeDefinition
(
"properties"
,
TYPE_SYSTEM
.
defineMapType
(
DataTypes
.
STRING_TYPE
,
DataTypes
.
STRING_TYPE
).
getName
(),
Multiplicity
.
OPTIONAL
,
false
,
null
),
};
LOG
.
debug
(
"Created definition for "
+
DefinedTypes
.
ENTITY
.
name
());
StructTypeDefinition
entity
=
new
StructTypeDefinition
(
DefinedTypes
.
ENTITY
.
name
(),
attributeDefinitions
);
structTypeDefinitionMap
.
put
(
entity
.
typeName
,
entity
);
return
entity
;
}
public
static
enum
DefinedTypes
{
ACL
,
ENTITY
,
CLUSTER
,
CLUSTER_INTERFACE
,
...
...
falcontypes/src/test/java/org/apache/metadata/falcon/FalconImporterTest.java
0 → 100644
View file @
8e5d4827
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org
.
apache
.
metadata
.
falcon
;
import
org.apache.falcon.client.FalconCLIException
;
import
org.apache.falcon.client.FalconClient
;
import
org.apache.falcon.entity.v0.EntityType
;
import
org.apache.falcon.entity.v0.cluster.Cluster
;
import
org.apache.falcon.resource.EntityList
;
import
org.apache.hadoop.metadata.IReferenceableInstance
;
import
org.apache.hadoop.metadata.MetadataException
;
import
org.apache.hadoop.metadata.repository.MetadataRepository
;
import
org.testng.annotations.Test
;
import
java.util.UUID
;
import
static
org
.
mockito
.
Matchers
.
any
;
import
static
org
.
mockito
.
Matchers
.
anyString
;
import
static
org
.
mockito
.
Mockito
.
mock
;
import
static
org
.
mockito
.
Mockito
.
when
;
public
class
FalconImporterTest
{
@Test
public
void
testImport
()
throws
MetadataException
,
FalconCLIException
{
MetadataRepository
repo
=
mock
(
MetadataRepository
.
class
);
FalconClient
client
=
mock
(
FalconClient
.
class
);
FalconImporter
importer
=
new
FalconImporter
(
FalconTypeSystem
.
getInstance
(),
client
,
repo
);
when
(
client
.
getEntityList
(
EntityType
.
CLUSTER
.
name
(),
null
,
null
,
null
,
null
,
null
,
null
,
null
,
null
)).
thenReturn
(
getEntityList
());
Cluster
cluster
=
new
Cluster
();
//TODO Set other fields in cluster
when
(
client
.
getDefinition
(
anyString
(),
anyString
())).
thenReturn
(
cluster
);
when
(
repo
.
createEntity
(
any
(
IReferenceableInstance
.
class
),
anyString
())).
thenReturn
(
UUID
.
randomUUID
().
toString
());
importer
.
importClusters
();
}
public
EntityList
getEntityList
()
{
EntityList
.
EntityElement
[]
entities
=
new
EntityList
.
EntityElement
[
2
];
entities
[
0
]
=
new
EntityList
.
EntityElement
();
entities
[
0
].
name
=
"c1"
;
entities
[
1
]
=
new
EntityList
.
EntityElement
();
entities
[
1
].
name
=
"c2"
;
return
new
EntityList
(
entities
);
}
}
falcontypes/src/test/java/org/apache/metadata/falcon/FalconTypeSystemTest.java
0 → 100644
View file @
8e5d4827
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org
.
apache
.
metadata
.
falcon
;
import
org.apache.hadoop.metadata.MetadataException
;
import
org.testng.annotations.Test
;
public
class
FalconTypeSystemTest
{
@Test
public
void
testTypeSystem
()
throws
MetadataException
{
FalconTypeSystem
instance
=
FalconTypeSystem
.
getInstance
();
instance
.
getDataType
(
FalconTypeSystem
.
DefinedTypes
.
CLUSTER
.
name
());
}
}
pom.xml
View file @
8e5d4827
...
...
@@ -246,21 +246,8 @@
<groupId>
org.apache.hadoop
</groupId>
<artifactId>
hadoop-client
</artifactId>
<version>
${hadoop.version}
</version>
<scope>
provided
</scope>
<exclusions>
<exclusion>
<groupId>
com.sun.jersey
</groupId>
<artifactId>
jersey-server
</artifactId>
</exclusion>
<exclusion>
<groupId>
com.sun.jersey
</groupId>
<artifactId>
jersey-core
</artifactId>
</exclusion>
<exclusion>
<groupId>
com.sun.jersey
</groupId>
<artifactId>
jersey-json
</artifactId>
</exclusion>
<exclusion>
<groupId>
org.glassfish
</groupId>
<artifactId>
javax.servlet
</artifactId>
</exclusion>
...
...
@@ -322,6 +309,12 @@
<version>
${falcon.version}
</version>
</dependency>
<dependency>
<groupId>
com.sun.jersey
</groupId>
<artifactId>
jersey-client
</artifactId>
<version>
1.9
</version>
</dependency>
<!-- Logging -->
<dependency>
<groupId>
org.slf4j
</groupId>
...
...
@@ -569,6 +562,13 @@
</dependency>
<dependency>
<groupId>
junit
</groupId>
<artifactId>
junit
</artifactId>
<version>
4.10
</version>
<scope>
test
</scope>
</dependency>
<dependency>
<groupId>
org.easymock
</groupId>
<artifactId>
easymock
</artifactId>
<version>
2.4
</version>
...
...
typesystem/pom.xml
View file @
8e5d4827
...
...
@@ -63,6 +63,11 @@
<dependencies>
<dependency>
<groupId>
org.apache.hadoop
</groupId>
<artifactId>
hadoop-client
</artifactId>
</dependency>
<dependency>
<groupId>
org.scala-lang
</groupId>
<artifactId>
scala-compiler
</artifactId>
<version>
${scala.version}
</version>
...
...
@@ -161,11 +166,10 @@
<artifactId>
guava
</artifactId>
<version>
${guava.version}
</version>
</dependency>
<dependency>
<groupId>
junit
</groupId>
<artifactId>
junit
</artifactId>
<version>
4.10
</version>
<scope>
test
</scope>
</dependency>
<dependency>
...
...
@@ -202,7 +206,6 @@
<groupId>
com.thinkaurelius.titan
</groupId>
<artifactId>
titan-es
</artifactId>
</dependency>
</dependencies>
<build>
...
...
typesystem/src/main/java/org/apache/hadoop/metadata/types/store/HdfsStore.java
0 → 100644
View file @
8e5d4827
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org
.
apache
.
hadoop
.
metadata
.
types
.
store
;
import
com.google.common.collect.ImmutableList
;
import
com.google.inject.Singleton
;
import
org.apache.commons.io.IOUtils
;
import
org.apache.commons.lang.StringUtils
;
import
org.apache.hadoop.conf.Configuration
;
import
org.apache.hadoop.fs.FSDataInputStream
;
import
org.apache.hadoop.fs.FSDataOutputStream
;
import
org.apache.hadoop.fs.FileStatus
;
import
org.apache.hadoop.fs.FileSystem
;
import
org.apache.hadoop.fs.Path
;
import
org.apache.hadoop.fs.PathFilter
;
import
java.io.IOException
;
import
java.io.StringReader
;
import
java.util.ArrayList
;
import
java.util.List
;
@Singleton
public
class
HdfsStore
extends
TypeSystemStore
{
public
static
final
String
LOCATION_PROPERTY
=
"metadata.typesystem.store.hdfs.location"
;
private
static
final
Path
LOCATION
=
new
Path
(
System
.
getProperty
(
LOCATION_PROPERTY
));
private
static
final
Path
ARCHIVE_LOCATION
=
new
Path
(
LOCATION
,
"ARCHIVE"
);
private
static
final
PathFilter
PATH_FILTER
=
new
PathFilter
()
{
@Override
public
boolean
accept
(
Path
path
)
{
String
name
=
path
.
getName
();
return
!
name
.
startsWith
(
"."
)
&&
!
name
.
startsWith
(
"_"
)
&&
!
name
.
equals
(
ARCHIVE_LOCATION
.
getName
());
}
};
private
static
final
String
UTF8_ENCODING
=
"UTF8"
;
private
static
final
HdfsStore
INSTANCE
=
new
HdfsStore
();
public
static
HdfsStore
getInstance
()
{
return
INSTANCE
;
}
@Override
protected
void
publish
(
String
namespace
,
String
json
)
throws
StorageException
{
FSDataOutputStream
stream
=
null
;
FileSystem
fs
=
null
;
try
{
fs
=
LOCATION
.
getFileSystem
(
new
Configuration
());
Path
jsonFile
=
new
Path
(
LOCATION
,
namespace
+
".json"
);
if
(
fs
.
exists
(
jsonFile
))
{
//TODO check if the new json is same and skip update?
archive
(
namespace
);
}
mkdir
(
fs
,
jsonFile
.
getParent
());
stream
=
fs
.
create
(
jsonFile
);
IOUtils
.
copy
(
new
StringReader
(
json
),
stream
,
UTF8_ENCODING
);
}
catch
(
IOException
e
)
{
throw
new
StorageException
(
namespace
,
e
);
}
finally
{
IOUtils
.
closeQuietly
(
stream
);
closeQuietly
(
fs
);
}
}
@Override
public
synchronized
void
delete
(
String
namespace
)
throws
StorageException
{
archive
(
namespace
);
}
private
void
archive
(
String
namespace
)
throws
StorageException
{
FileSystem
fs
=
null
;
try
{
fs
=
LOCATION
.
getFileSystem
(
new
Configuration
());
Path
jsonFile
=
new
Path
(
LOCATION
,
namespace
+
".json"
);
Path
archivePath
=
new
Path
(
ARCHIVE_LOCATION
,
jsonFile
.
getName
()
+
System
.
currentTimeMillis
());
mkdir
(
fs
,
archivePath
.
getParent
());
if
(!
fs
.
rename
(
jsonFile
,
archivePath
))
{
throw
new
StorageException
(
namespace
);
}
}
catch
(
IOException
e
)
{
throw
new
StorageException
(
namespace
,
e
);
}
finally
{
closeQuietly
(
fs
);
}
}
private
void
mkdir
(
FileSystem
fs
,
Path
path
)
throws
StorageException
{
try
{
if
(!
fs
.
exists
(
path
)
&&
!
fs
.
mkdirs
(
path
))
{
throw
new
StorageException
(
"Failed to create "
+
path
);
}
}
catch
(
IOException
e
)
{
throw
new
StorageException
(
e
);
}
}
@Override
protected
String
fetch
(
String
namespace
)
throws
StorageException
{
FileSystem
fs
=
null
;
FSDataInputStream
stream
=
null
;
try
{
fs
=
LOCATION
.
getFileSystem
(
new
Configuration
());
Path
file
=
new
Path
(
LOCATION
,
namespace
+
".json"
);
stream
=
fs
.
open
(
file
);
return
IOUtils
.
toString
(
stream
,
UTF8_ENCODING
);
}
catch
(
IOException
e
)
{
throw
new
StorageException
(
namespace
,
e
);
}
finally
{
IOUtils
.
closeQuietly
(
stream
);
closeQuietly
(
fs
);
}
}
private
void
closeQuietly
(
FileSystem
fs
)
throws
StorageException
{
if
(
fs
!=
null
)
{
try
{
fs
.
close
();
}
catch
(
IOException
e
)
{
throw
new
StorageException
(
e
);
}
}
}
@Override
protected
ImmutableList
<
String
>
listNamespaces
()
throws
StorageException
{
FileSystem
fs
=
null
;
try
{
fs
=
LOCATION
.
getFileSystem
(
new
Configuration
());
FileStatus
[]
files
=
fs
.
listStatus
(
LOCATION
,
PATH_FILTER
);
List
<
String
>
nameSpaces
=
new
ArrayList
<>();
for
(
FileStatus
file
:
files
)
{
nameSpaces
.
add
(
StringUtils
.
removeEnd
(
file
.
getPath
().
getName
(),
".json"
));
}
return
ImmutableList
.
copyOf
(
nameSpaces
);
}
catch
(
IOException
e
)
{
throw
new
StorageException
(
"list"
,
e
);
}
finally
{
closeQuietly
(
fs
);
}
}
}
typesystem/src/main/java/org/apache/hadoop/metadata/types/store/StorageException.java
0 → 100644
View file @
8e5d4827
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org
.
apache
.
hadoop
.
metadata
.
types
.
store
;
import
org.apache.hadoop.metadata.MetadataException
;
public
class
StorageException
extends
MetadataException
{
public
StorageException
(
String
nameSpace
)
{
super
(
"Failure in typesystem storage - "
+
nameSpace
);
}
public
StorageException
(
String
nameSpace
,
Throwable
cause
)
{
super
(
"Failure in typesystem storage - "
+
nameSpace
,
cause
);
}
public
StorageException
(
Throwable
cause
)
{
super
(
"Failure in type system storage"
,
cause
);
}
}
typesystem/src/main/java/org/apache/hadoop/metadata/types/store/StorageFactory.java
0 → 100644
View file @
8e5d4827
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org
.
apache
.
hadoop
.
metadata
.
types
.
store
;
public
class
StorageFactory
{
public
static
TypeSystemStore
getTypeSystemStore
()
{
return
HdfsStore
.
getInstance
();
}
}
typesystem/src/main/java/org/apache/hadoop/metadata/types/store/TypeSystemStore.java
0 → 100644
View file @
8e5d4827
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org
.
apache
.
hadoop
.
metadata
.
types
.
store
;
import
com.google.common.collect.ImmutableList
;
import
com.google.common.collect.ImmutableMap
;
import
org.apache.hadoop.metadata.MetadataException
;
import
org.apache.hadoop.metadata.TypesDef
;
import
org.apache.hadoop.metadata.json.TypesSerialization
$
;
import
org.apache.hadoop.metadata.types.TypeSystem
;
import
java.util.HashMap
;
import
java.util.Map
;
public
abstract
class
TypeSystemStore
{
/**
* Persist the type system under namespace - insert or update
* @param typeSystem type system to persist
* @param namespace
* @throws StorageException
*/
public
synchronized
void
store
(
TypeSystem
typeSystem
,
String
namespace
)
throws
StorageException
{
String
json
=
TypesSerialization
$
.
MODULE
$
.
toJson
(
typeSystem
,
typeSystem
.
getTypeNames
());
publish
(
namespace
,
json
);
}
/**
* Restore all type definitions
* @return List of persisted type definitions
* @throws MetadataException
*/
public
synchronized
ImmutableMap
<
String
,
TypesDef
>
restore
()
throws
MetadataException
{
ImmutableList
<
String
>
nameSpaces
=
listNamespaces
();
Map
<
String
,
TypesDef
>
typesDefs
=
new
HashMap
<>();
for
(
String
namespace
:
nameSpaces
)
{
String
json
=
fetch
(
namespace
);
typesDefs
.
put
(
namespace
,
TypesSerialization
$
.
MODULE
$
.
fromJson
(
json
));
}
return
ImmutableMap
.
copyOf
(
typesDefs
);
}
/**
* Restore specified namespace as type definition
* @param namespace
* @return type definition
* @throws MetadataException
*/
public
synchronized
TypesDef
restore
(
String
namespace
)
throws
MetadataException
{
String
json
=
fetch
(
namespace
);
return
TypesSerialization
$
.
MODULE
$
.
fromJson
(
json
);
}
/**
* Delete the specified namespace
* @param namespace
* @throws StorageException
*/
public
abstract
void
delete
(
String
namespace
)
throws
StorageException
;
//Interfaces for concrete implementations
protected
abstract
void
publish
(
String
namespace
,
String
json
)
throws
StorageException
;
protected
abstract
String
fetch
(
String
namespace
)
throws
StorageException
;
protected
abstract
ImmutableList
<
String
>
listNamespaces
()
throws
MetadataException
;
}
typesystem/src/test/java/org/apache/hadoop/metadata/BaseTest.java
View file @
8e5d4827
...
...
@@ -51,7 +51,7 @@ public abstract class BaseTest {
}
@Before
public
void
setup
()
throws
Metadata
Exception
{
public
void
setup
()
throws
Exception
{
TypeSystem
ts
=
TypeSystem
.
getInstance
();
ts
.
reset
();
...
...
typesystem/src/test/java/org/apache/hadoop/metadata/ClassTest.java
View file @
8e5d4827
...
...
@@ -27,7 +27,7 @@ import org.junit.Test;
public
class
ClassTest
extends
BaseTest
{
@Before
public
void
setup
()
throws
Metadata
Exception
{
public
void
setup
()
throws
Exception
{
super
.
setup
();
}
...
...
typesystem/src/test/java/org/apache/hadoop/metadata/EnumTest.java
View file @
8e5d4827
...
...
@@ -35,7 +35,7 @@ import java.util.Map;
public
class
EnumTest
extends
BaseTest
{
@Before
public
void
setup
()
throws
Metadata
Exception
{
public
void
setup
()
throws
Exception
{
super
.
setup
();
}
...
...
typesystem/src/test/java/org/apache/hadoop/metadata/StorageTest.java
View file @
8e5d4827
...
...
@@ -30,7 +30,7 @@ import scala.tools.cmd.Meta;
public
class
StorageTest
extends
BaseTest
{
@Before
public
void
setup
()
throws
Metadata
Exception
{
public
void
setup
()
throws
Exception
{
super
.
setup
();
}
...
...
typesystem/src/test/java/org/apache/hadoop/metadata/StructTest.java
View file @
8e5d4827
...
...
@@ -30,7 +30,7 @@ public class StructTest extends BaseTest {
StructType
recursiveStructType
;
@Before
public
void
setup
()
throws
Metadata
Exception
{
public
void
setup
()
throws
Exception
{
super
.
setup
();
structType
=
(
StructType
)
getTypeSystem
().
getDataType
(
StructType
.
class
,
STRUCT_TYPE_1
);
recursiveStructType
=
(
StructType
)
getTypeSystem
().
getDataType
(
StructType
.
class
,
STRUCT_TYPE_2
);
...
...
typesystem/src/test/java/org/apache/hadoop/metadata/TraitTest.java
View file @
8e5d4827
...
...
@@ -28,7 +28,7 @@ public class TraitTest extends BaseTest {
@Before
public
void
setup
()
throws
Metadata
Exception
{
public
void
setup
()
throws
Exception
{
super
.
setup
();
}
...
...
typesystem/src/test/java/org/apache/hadoop/metadata/VenkateshTest.java
View file @
8e5d4827
...
...
@@ -20,10 +20,8 @@ package org.apache.hadoop.metadata;
import
com.google.common.collect.ImmutableList
;
import
org.apache.hadoop.metadata.json.Serialization
$
;
import
org.apache.hadoop.metadata.json.TypesSerialization
;
import
org.apache.hadoop.metadata.json.TypesSerialization
$
;
import
org.apache.hadoop.metadata.types.*
;
import
org.junit.Assert
;
import
org.junit.Test
;
import
java.util.ArrayList
;
...
...
typesystem/src/test/java/org/apache/hadoop/metadata/json/SerializationJavaTest.java
View file @
8e5d4827
...
...
@@ -32,7 +32,7 @@ public class SerializationJavaTest extends BaseTest {
@Before
public
void
setup
()
throws
Metadata
Exception
{
public
void
setup
()
throws
Exception
{
super
.
setup
();
}
...
...
typesystem/src/test/java/org/apache/hadoop/metadata/types/store/HdfsStoreTest.java
0 → 100644
View file @
8e5d4827
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org
.
apache
.
hadoop
.
metadata
.
types
.
store
;
import
com.google.common.collect.ImmutableList
;
import
com.google.common.collect.ImmutableMap
;
import
junit.framework.Assert
;
import
org.apache.hadoop.conf.Configuration
;
import
org.apache.hadoop.fs.FileStatus
;
import
org.apache.hadoop.fs.FileSystem
;
import
org.apache.hadoop.fs.Path
;
import
org.apache.hadoop.metadata.BaseTest
;
import
org.apache.hadoop.metadata.TypesDef
;
import
org.apache.hadoop.metadata.types.ClassType
;
import
org.apache.hadoop.metadata.types.DataTypes
;
import
org.apache.hadoop.metadata.types.HierarchicalTypeDefinition
;
import
org.apache.hadoop.metadata.types.StructTypeDefinition
;
import
org.apache.hadoop.metadata.types.TraitType
;
import
org.apache.hadoop.metadata.types.TypeSystem
;
import
org.junit.Before
;
import
org.junit.Test
;
public
class
HdfsStoreTest
extends
BaseTest
{
private
static
final
String
LOCATION
=
"target/type-store"
;
@Before
public
void
setup
()
throws
Exception
{
super
.
setup
();
System
.
setProperty
(
HdfsStore
.
LOCATION_PROPERTY
,
LOCATION
);
FileSystem
fs
=
FileSystem
.
get
(
new
Configuration
());
fs
.
delete
(
new
Path
(
LOCATION
),
true
);
//define type system
HierarchicalTypeDefinition
<
ClassType
>
databaseTypeDefinition
=
createClassTypeDef
(
"database"
,
ImmutableList
.<
String
>
of
(),
createRequiredAttrDef
(
"name"
,
DataTypes
.
STRING_TYPE
),
createRequiredAttrDef
(
"description"
,
DataTypes
.
STRING_TYPE
));
TypeSystem
.
getInstance
().
defineTypes
(
ImmutableList
.<
StructTypeDefinition
>
of
(),
ImmutableList
.<
HierarchicalTypeDefinition
<
TraitType
>>
of
(),
ImmutableList
.
of
(
databaseTypeDefinition
));
}
@Test
public
void
testStore
()
throws
Exception
{
TypeSystemStore
store
=
new
HdfsStore
();
TypeSystem
typeSystem
=
TypeSystem
.
getInstance
();
store
.
store
(
typeSystem
,
"hive"
);
FileSystem
fs
=
FileSystem
.
get
(
new
Configuration
());
Assert
.
assertTrue
(
fs
.
exists
(
new
Path
(
LOCATION
,
"hive.json"
)));
TypesDef
typeDef
=
store
.
restore
(
"hive"
);
Assert
.
assertNotNull
(
typeDef
);
Assert
.
assertEquals
(
1
,
typeDef
.
classTypesAsJavaList
().
size
());
Assert
.
assertEquals
(
"database"
,
typeDef
.
classTypesAsJavaList
().
get
(
0
).
typeName
);
}
@Test
public
void
testRestore
()
throws
Exception
{
TypeSystemStore
store
=
new
HdfsStore
();
TypeSystem
typeSystem
=
TypeSystem
.
getInstance
();
store
.
store
(
typeSystem
,
"hive"
);
store
.
store
(
typeSystem
,
"falcon"
);
ImmutableMap
<
String
,
TypesDef
>
typeDef
=
store
.
restore
();
Assert
.
assertEquals
(
2
,
typeDef
.
size
());
Assert
.
assertEquals
(
1
,
typeDef
.
get
(
"falcon"
).
classTypesAsJavaList
().
size
());
Assert
.
assertEquals
(
"database"
,
typeDef
.
get
(
"falcon"
).
classTypesAsJavaList
().
get
(
0
).
typeName
);
}
@Test
public
void
testArchive
()
throws
Exception
{
TypeSystemStore
store
=
new
HdfsStore
();
TypeSystem
typeSystem
=
TypeSystem
.
getInstance
();
store
.
store
(
typeSystem
,
"hive"
);
//insert
store
.
store
(
typeSystem
,
"hive"
);
//update
FileSystem
fs
=
FileSystem
.
get
(
new
Configuration
());
Assert
.
assertTrue
(
fs
.
exists
(
new
Path
(
LOCATION
,
"hive.json"
)));
FileStatus
[]
files
=
fs
.
listStatus
(
new
Path
(
LOCATION
,
"ARCHIVE"
));
Assert
.
assertEquals
(
1
,
files
.
length
);
Assert
.
assertTrue
(
files
[
0
].
getPath
().
getName
().
startsWith
(
"hive.json"
));
}
@Test
public
void
testDelete
()
throws
Exception
{
TypeSystemStore
store
=
new
HdfsStore
();
TypeSystem
typeSystem
=
TypeSystem
.
getInstance
();
store
.
store
(
typeSystem
,
"hive"
);
store
.
delete
(
"hive"
);
FileSystem
fs
=
FileSystem
.
get
(
new
Configuration
());
Assert
.
assertFalse
(
fs
.
exists
(
new
Path
(
LOCATION
,
"hive.json"
)));
FileStatus
[]
files
=
fs
.
listStatus
(
new
Path
(
LOCATION
,
"ARCHIVE"
));
Assert
.
assertEquals
(
1
,
files
.
length
);
Assert
.
assertTrue
(
files
[
0
].
getPath
().
getName
().
startsWith
(
"hive.json"
));
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment