Use mastershipService instead of replicaInfoService to determine device mastership

Change-Id: I9d07351bbd024e02b2b116dc011a8eac2f79cda1
diff --git a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
index cfbcec8..6b8810b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
@@ -26,6 +26,8 @@
 import org.onlab.util.KryoNamespace;
 import org.onlab.util.Tools;
 import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.PortNumber;
@@ -37,8 +39,6 @@
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.cluster.messaging.ClusterMessage;
 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
-import org.onosproject.store.flow.ReplicaInfo;
-import org.onosproject.store.flow.ReplicaInfoService;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.serializers.KryoSerializer;
 import org.slf4j.Logger;
@@ -73,7 +73,7 @@
     private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected ReplicaInfoService replicaInfoManager;
+    protected MastershipService mastershipService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterCommunicationService clusterCommunicator;
@@ -200,12 +200,12 @@
     @Override
     public Set<FlowEntry> getCurrentStatistic(ConnectPoint connectPoint) {
         final DeviceId deviceId = connectPoint.deviceId();
-        ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
-        if (!replicaInfo.master().isPresent()) {
+        NodeId master = mastershipService.getMasterFor(deviceId);
+        if (master == null) {
             log.warn("No master for {}", deviceId);
             return Collections.emptySet();
         }
-        if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
+        if (master.equals(clusterService.getLocalNode().id())) {
             return getCurrentStatisticInternal(connectPoint);
         } else {
             return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
@@ -213,7 +213,7 @@
                                         GET_CURRENT,
                                         SERIALIZER::encode,
                                         SERIALIZER::decode,
-                                        replicaInfo.master().get()),
+                                        master),
                                    STATISTIC_STORE_TIMEOUT_MILLIS,
                                    TimeUnit.MILLISECONDS,
                                    Collections.emptySet());
@@ -228,12 +228,12 @@
     @Override
     public Set<FlowEntry> getPreviousStatistic(ConnectPoint connectPoint) {
         final DeviceId deviceId = connectPoint.deviceId();
-        ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
-        if (!replicaInfo.master().isPresent()) {
+        NodeId master = mastershipService.getMasterFor(deviceId);
+        if (master == null) {
             log.warn("No master for {}", deviceId);
             return Collections.emptySet();
         }
-        if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
+        if (master.equals(clusterService.getLocalNode().id())) {
             return getPreviousStatisticInternal(connectPoint);
         } else {
             return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
@@ -241,7 +241,7 @@
                                         GET_PREVIOUS,
                                         SERIALIZER::encode,
                                         SERIALIZER::decode,
-                                        replicaInfo.master().get()),
+                                        master),
                                    STATISTIC_STORE_TIMEOUT_MILLIS,
                                    TimeUnit.MILLISECONDS,
                                    Collections.emptySet());
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java
index bc1edf1..a014504 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java
@@ -24,6 +24,7 @@
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.util.KryoNamespace;
 import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.incubator.net.resource.label.DefaultLabelResource;
 import org.onosproject.incubator.net.resource.label.LabelResource;
 import org.onosproject.incubator.net.resource.label.LabelResourceDelegate;
@@ -33,6 +34,7 @@
 import org.onosproject.incubator.net.resource.label.LabelResourcePool;
 import org.onosproject.incubator.net.resource.label.LabelResourceRequest;
 import org.onosproject.incubator.net.resource.label.LabelResourceStore;
+import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.device.DeviceService;
@@ -40,8 +42,6 @@
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.cluster.messaging.ClusterMessage;
 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
-import org.onosproject.store.flow.ReplicaInfo;
-import org.onosproject.store.flow.ReplicaInfoService;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.ConsistentMap;
 import org.onosproject.store.service.Serializer;
@@ -72,7 +72,7 @@
     protected StorageService storageService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected ReplicaInfoService replicaInfoManager;
+    protected MastershipService mastershipService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterCommunicationService clusterCommunicator;
@@ -210,27 +210,25 @@
             return false;
         }
 
-        ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(pool
-                .deviceId());
+        NodeId master = mastershipService.getMasterFor(pool.deviceId());
 
-        if (!replicaInfo.master().isPresent()) {
-            log.warn("Failed to getFlowEntries: No master for {}", pool);
+        if (master == null) {
+            log.warn("Failed to create label resource pool: No master for {}", pool);
             return false;
         }
 
-        if (replicaInfo.master().get()
-                .equals(clusterService.getLocalNode().id())) {
+        if (master.equals(clusterService.getLocalNode().id())) {
             return internalCreate(pool);
         }
 
         log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
-                  replicaInfo.master().orNull(), pool.deviceId());
+                  master, pool.deviceId());
 
         return complete(clusterCommunicator
                 .sendAndReceive(pool,
                                 LabelResourceMessageSubjects.LABEL_POOL_CREATED,
                                 SERIALIZER::encode, SERIALIZER::decode,
-                                replicaInfo.master().get()));
+                                master));
     }
 
     private boolean internalCreate(LabelResourcePool pool) {
@@ -253,27 +251,26 @@
         if (device == null) {
             return false;
         }
-        ReplicaInfo replicaInfo = replicaInfoManager
-                .getReplicaInfoFor(deviceId);
 
-        if (!replicaInfo.master().isPresent()) {
-            log.warn("Failed to getFlowEntries: No master for {}", deviceId);
+        NodeId master = mastershipService.getMasterFor(deviceId);
+
+        if (master == null) {
+            log.warn("Failed to destroyDevicePool. No master for {}", deviceId);
             return false;
         }
 
-        if (replicaInfo.master().get()
-                .equals(clusterService.getLocalNode().id())) {
+        if (master.equals(clusterService.getLocalNode().id())) {
             return internalDestroy(deviceId);
         }
 
-        log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
-                  replicaInfo.master().orNull(), deviceId);
+        log.trace("Forwarding request to {}, which is the primary (master) for device {}",
+                  master, deviceId);
 
         return complete(clusterCommunicator
                 .sendAndReceive(deviceId,
                                 LabelResourceMessageSubjects.LABEL_POOL_DESTROYED,
                                 SERIALIZER::encode, SERIALIZER::decode,
-                                replicaInfo.master().get()));
+                                master));
     }
 
     private boolean internalDestroy(DeviceId deviceId) {
@@ -301,27 +298,25 @@
                                                                 deviceId,
                                                                 LabelResourceRequest.Type.APPLY,
                                                                 applyNum, null);
-        ReplicaInfo replicaInfo = replicaInfoManager
-                .getReplicaInfoFor(deviceId);
+        NodeId master = mastershipService.getMasterFor(deviceId);
 
-        if (!replicaInfo.master().isPresent()) {
-            log.warn("Failed to getFlowEntries: No master for {}", deviceId);
+        if (master == null) {
+            log.warn("Failed to applyFromDevicePool: No master for {}", deviceId);
             return Collections.emptyList();
         }
 
-        if (replicaInfo.master().get()
-                .equals(clusterService.getLocalNode().id())) {
+        if (master.equals(clusterService.getLocalNode().id())) {
             return internalApply(request);
         }
 
-        log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
-                  replicaInfo.master().orNull(), deviceId);
+        log.trace("Forwarding request to {}, which is the primary (master) for device {}",
+                  master, deviceId);
 
         return complete(clusterCommunicator
                 .sendAndReceive(request,
                                 LabelResourceMessageSubjects.LABEL_POOL_APPLY,
                                 SERIALIZER::encode, SERIALIZER::decode,
-                                replicaInfo.master().get()));
+                                master));
     }
 
     private Collection<LabelResource> internalApply(LabelResourceRequest request) {
@@ -388,27 +383,25 @@
                                                deviceId,
                                                LabelResourceRequest.Type.RELEASE,
                                                0, collection);
-            ReplicaInfo replicaInfo = replicaInfoManager
-                    .getReplicaInfoFor(deviceId);
+            NodeId master = mastershipService.getMasterFor(deviceId);
 
-            if (!replicaInfo.master().isPresent()) {
-                log.warn("Failed to getFlowEntries: No master for {}", deviceId);
+            if (master == null) {
+                log.warn("Failed to releaseToDevicePool: No master for {}", deviceId);
                 return false;
             }
 
-            if (replicaInfo.master().get()
-                    .equals(clusterService.getLocalNode().id())) {
+            if (master.equals(clusterService.getLocalNode().id())) {
                 return internalRelease(request);
             }
 
-            log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
-                      replicaInfo.master().orNull(), deviceId);
+            log.trace("Forwarding request to {}, which is the primary (master) for device {}",
+                      master, deviceId);
 
             return complete(clusterCommunicator
                     .sendAndReceive(request,
                                     LabelResourceMessageSubjects.LABEL_POOL_RELEASE,
                                     SERIALIZER::encode, SERIALIZER::decode,
-                                    replicaInfo.master().get()));
+                                    master));
         }
         return false;
     }