Refactored IntentPartitionService as WorkPartitionService

Change-Id: Ic5cf1978b7fce55b34f84eae9b03c8f9ddcfb9c1
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
index e914579..2eb1d2e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
@@ -32,7 +32,7 @@
 import org.onosproject.net.intent.Intent;
 import org.onosproject.net.intent.IntentData;
 import org.onosproject.net.intent.IntentEvent;
-import org.onosproject.net.intent.IntentPartitionService;
+import org.onosproject.net.intent.WorkPartitionService;
 import org.onosproject.net.intent.IntentState;
 import org.onosproject.net.intent.IntentStore;
 import org.onosproject.net.intent.IntentStoreDelegate;
@@ -84,7 +84,7 @@
     protected StorageService storageService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected IntentPartitionService partitionService;
+    protected WorkPartitionService partitionService;
 
     private final AtomicLong sequenceNumber = new AtomicLong(0);
 
@@ -211,7 +211,7 @@
     }
 
     private Collection<NodeId> getPeerNodes(Key key, IntentData data) {
-        NodeId master = partitionService.getLeader(key);
+        NodeId master = partitionService.getLeader(key, Key::hash);
         NodeId origin = (data != null) ? data.origin() : null;
         if (data != null && (master == null || origin == null)) {
             log.debug("Intent {} missing master and/or origin; master = {}, origin = {}",
@@ -283,7 +283,7 @@
 
     @Override
     public boolean isMaster(Key intentKey) {
-        return partitionService.isMine(intentKey);
+        return partitionService.isMine(intentKey, Key::hash);
     }
 
     @Override
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentPartitionManager.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/WorkPartitionManager.java
similarity index 77%
rename from core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentPartitionManager.java
rename to core/store/dist/src/main/java/org/onosproject/store/intent/impl/WorkPartitionManager.java
index ab266b6..337bc55 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentPartitionManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/WorkPartitionManager.java
@@ -29,10 +29,9 @@
 import org.onosproject.cluster.NodeId;
 import org.onosproject.event.EventDeliveryService;
 import org.onosproject.event.ListenerRegistry;
-import org.onosproject.net.intent.IntentPartitionEvent;
-import org.onosproject.net.intent.IntentPartitionEventListener;
-import org.onosproject.net.intent.IntentPartitionService;
-import org.onosproject.net.intent.Key;
+import org.onosproject.net.intent.WorkPartitionEvent;
+import org.onosproject.net.intent.WorkPartitionEventListener;
+import org.onosproject.net.intent.WorkPartitionService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,17 +43,18 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 /**
- * Manages the assignment of intent keyspace partitions to instances.
+ * Manages the assignment of work partitions to instances.
  */
 @Component(immediate = true)
 @Service
-public class IntentPartitionManager implements IntentPartitionService {
+public class WorkPartitionManager implements WorkPartitionService {
 
-    private static final Logger log = LoggerFactory.getLogger(IntentPartitionManager.class);
+    private static final Logger log = LoggerFactory.getLogger(WorkPartitionManager.class);
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected LeadershipService leadershipService;
@@ -72,14 +72,14 @@
     private static final int CHECK_PARTITION_BALANCE_PERIOD_SEC = 10;
     private static final int RETRY_AFTER_DELAY_SEC = 5;
 
-    private static final String ELECTION_PREFIX = "intent-partition-";
+    private static final String ELECTION_PREFIX = "work-partition-";
 
     protected NodeId localNodeId;
-    private ListenerRegistry<IntentPartitionEvent, IntentPartitionEventListener> listenerRegistry;
+    private ListenerRegistry<WorkPartitionEvent, WorkPartitionEventListener> listenerRegistry;
     private LeadershipEventListener leaderListener = new InternalLeadershipListener();
 
     private ScheduledExecutorService executor = Executors
-            .newScheduledThreadPool(1, groupedThreads("IntentPartition", "balancer-%d", log));
+            .newScheduledThreadPool(1, groupedThreads("work-parition", "balancer-%d", log));
 
     @Activate
     public void activate() {
@@ -87,7 +87,7 @@
         leadershipService.addListener(leaderListener);
 
         listenerRegistry = new ListenerRegistry<>();
-        eventDispatcher.addSink(IntentPartitionEvent.class, listenerRegistry);
+        eventDispatcher.addSink(WorkPartitionEvent.class, listenerRegistry);
 
         for (int i = 0; i < NUM_PARTITIONS; i++) {
             leadershipService.runForLeadership(getPartitionPath(i));
@@ -103,7 +103,7 @@
     public void deactivate() {
         executor.shutdownNow();
 
-        eventDispatcher.removeSink(IntentPartitionEvent.class);
+        eventDispatcher.removeSink(WorkPartitionEvent.class);
         leadershipService.removeListener(leaderListener);
         log.info("Stopped");
     }
@@ -112,9 +112,9 @@
      * Sets the specified executor to be used for scheduling background tasks.
      *
      * @param executor scheduled executor service for background tasks
-     * @return this PartitionManager
+     * @return this WorkPartitionManager
      */
-    IntentPartitionManager withScheduledExecutor(ScheduledExecutorService executor) {
+    WorkPartitionManager withScheduledExecutor(ScheduledExecutorService executor) {
         this.executor = executor;
         return this;
     }
@@ -123,38 +123,25 @@
         return ELECTION_PREFIX + i;
     }
 
-    private String getPartitionPath(PartitionId id) {
-        return getPartitionPath(id.value());
-    }
-
-    private PartitionId getPartitionForKey(Key intentKey) {
-        int partition = Math.abs((int) intentKey.hash()) % NUM_PARTITIONS;
-        //TODO investigate Guava consistent hash method
-        // ... does it add significant computational complexity? is it worth it?
-        //int partition = consistentHash(intentKey.hash(), NUM_PARTITIONS);
-        PartitionId id = new PartitionId(partition);
-        return id;
+    @Override
+    public <K> boolean isMine(K id, Function<K, Long> hasher) {
+        return Objects.equals(localNodeId, getLeader(id, hasher));
     }
 
     @Override
-    public boolean isMine(Key intentKey) {
-        return Objects.equals(leadershipService.getLeadership(getPartitionPath(getPartitionForKey(intentKey)))
-                                               .leaderNodeId(),
-                              localNodeId);
+    public <K> NodeId getLeader(K id, Function<K, Long> hasher) {
+        int partition = Math.abs(hasher.apply(id).intValue()) % NUM_PARTITIONS;
+        PartitionId partitionId = new PartitionId(partition);
+        return leadershipService.getLeadership(getPartitionPath(partitionId.value())).leaderNodeId();
     }
 
     @Override
-    public NodeId getLeader(Key intentKey) {
-        return leadershipService.getLeader(getPartitionPath(getPartitionForKey(intentKey)));
-    }
-
-    @Override
-    public void addListener(IntentPartitionEventListener listener) {
+    public void addListener(WorkPartitionEventListener listener) {
         listenerRegistry.addListener(listener);
     }
 
     @Override
-    public void removeListener(IntentPartitionEventListener listener) {
+    public void removeListener(WorkPartitionEventListener listener) {
         listenerRegistry.removeListener(listener);
     }
 
@@ -235,7 +222,7 @@
             if (Objects.equals(leadership.leaderNodeId(), localNodeId) &&
                     leadership.topic().startsWith(ELECTION_PREFIX)) {
 
-                eventDispatcher.post(new IntentPartitionEvent(IntentPartitionEvent.Type.LEADER_CHANGED,
+                eventDispatcher.post(new WorkPartitionEvent(WorkPartitionEvent.Type.LEADER_CHANGED,
                                                         leadership.topic()));
             }