Commit 3ba4a3fe by Ashutosh Mestry

ATLAS-3089: PC Framework Moved to Intg Module

parent dcf0404e
...@@ -19,8 +19,8 @@ ...@@ -19,8 +19,8 @@
package org.apache.atlas.repository.graphdb.janus.migration; package org.apache.atlas.repository.graphdb.janus.migration;
import org.apache.tinkerpop.shaded.jackson.databind.JsonNode; import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
import org.apache.atlas.repository.graphdb.janus.migration.pc.WorkItemBuilder; import org.apache.atlas.pc.WorkItemBuilder;
import org.apache.atlas.repository.graphdb.janus.migration.pc.WorkItemConsumer; import org.apache.atlas.pc.WorkItemConsumer;
import org.apache.atlas.repository.graphdb.janus.migration.JsonNodeParsers.ParseElement; import org.apache.atlas.repository.graphdb.janus.migration.JsonNodeParsers.ParseElement;
import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.Graph;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -206,7 +206,7 @@ public class JsonNodeProcessManager { ...@@ -206,7 +206,7 @@ public class JsonNodeProcessManager {
} }
} }
static class WorkItemManager extends org.apache.atlas.repository.graphdb.janus.migration.pc.WorkItemManager { static class WorkItemManager extends org.apache.atlas.pc.WorkItemManager {
public WorkItemManager(WorkItemBuilder builder, int batchSize, int numWorkers) { public WorkItemManager(WorkItemBuilder builder, int batchSize, int numWorkers) {
super(builder, batchSize, numWorkers); super(builder, batchSize, numWorkers);
} }
......
...@@ -18,9 +18,9 @@ ...@@ -18,9 +18,9 @@
package org.apache.atlas.repository.graphdb.janus.migration; package org.apache.atlas.repository.graphdb.janus.migration;
import org.apache.atlas.repository.graphdb.janus.migration.pc.WorkItemBuilder; import org.apache.atlas.pc.WorkItemBuilder;
import org.apache.atlas.repository.graphdb.janus.migration.pc.WorkItemConsumer; import org.apache.atlas.pc.WorkItemConsumer;
import org.apache.atlas.repository.graphdb.janus.migration.pc.WorkItemManager; import org.apache.atlas.pc.WorkItemManager;
import org.apache.atlas.repository.graphdb.janus.migration.postProcess.PostProcessListProperty; import org.apache.atlas.repository.graphdb.janus.migration.postProcess.PostProcessListProperty;
import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex; import org.apache.tinkerpop.gremlin.structure.Vertex;
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.repository.graphdb.janus.migration.pc; package org.apache.atlas.pc;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
......
...@@ -16,9 +16,8 @@ ...@@ -16,9 +16,8 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.repository.graphdb.janus.migration.pc; package org.apache.atlas.pc;
import com.google.common.base.Stopwatch;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -32,7 +31,7 @@ public abstract class WorkItemConsumer<T> implements Runnable { ...@@ -32,7 +31,7 @@ public abstract class WorkItemConsumer<T> implements Runnable {
private final BlockingQueue<T> queue; private final BlockingQueue<T> queue;
private boolean isDirty = false; private boolean isDirty = false;
private long maxCommitTimeSeconds = 0; private long maxCommitTimeInMs = 0;
public WorkItemConsumer(BlockingQueue<T> queue) { public WorkItemConsumer(BlockingQueue<T> queue) {
this.queue = queue; this.queue = queue;
...@@ -58,7 +57,7 @@ public abstract class WorkItemConsumer<T> implements Runnable { ...@@ -58,7 +57,7 @@ public abstract class WorkItemConsumer<T> implements Runnable {
} }
public long getMaxCommitTimeSeconds() { public long getMaxCommitTimeSeconds() {
return (this.maxCommitTimeSeconds > 0 ? this.maxCommitTimeSeconds : 15); return (this.maxCommitTimeInMs > 0 ? this.maxCommitTimeInMs / 1000 : 15);
} }
protected void commitDirty() { protected void commitDirty() {
...@@ -71,13 +70,13 @@ public abstract class WorkItemConsumer<T> implements Runnable { ...@@ -71,13 +70,13 @@ public abstract class WorkItemConsumer<T> implements Runnable {
} }
protected void commit() { protected void commit() {
Stopwatch sw = Stopwatch.createStarted(); long start = System.currentTimeMillis();
doCommit(); doCommit();
sw.stop(); long end = System.currentTimeMillis();
updateCommitTime(sw.elapsed(TimeUnit.SECONDS)); updateCommitTime((end - start));
isDirty = false; isDirty = false;
} }
...@@ -87,8 +86,8 @@ public abstract class WorkItemConsumer<T> implements Runnable { ...@@ -87,8 +86,8 @@ public abstract class WorkItemConsumer<T> implements Runnable {
protected abstract void processItem(T item); protected abstract void processItem(T item);
protected void updateCommitTime(long commitTime) { protected void updateCommitTime(long commitTime) {
if (this.maxCommitTimeSeconds < commitTime) { if (this.maxCommitTimeInMs < commitTime) {
this.maxCommitTimeSeconds = commitTime; this.maxCommitTimeInMs = commitTime;
} }
} }
} }
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.repository.graphdb.janus.migration.pc; package org.apache.atlas.pc;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
......
...@@ -16,9 +16,8 @@ ...@@ -16,9 +16,8 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.repository.graphdb.janus.migration; package org.apache.atlas.pc;
import org.apache.atlas.repository.graphdb.janus.migration.pc.WorkItemConsumer;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
......
...@@ -16,11 +16,8 @@ ...@@ -16,11 +16,8 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.atlas.repository.graphdb.janus.migration; package org.apache.atlas.pc;
import org.apache.atlas.repository.graphdb.janus.migration.pc.WorkItemBuilder;
import org.apache.atlas.repository.graphdb.janus.migration.pc.WorkItemConsumer;
import org.apache.atlas.repository.graphdb.janus.migration.pc.WorkItemManager;
import org.apache.commons.lang3.RandomUtils; import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment