Use mastershipService instead of replicaInfoService to determine device mastership
Change-Id: I9d07351bbd024e02b2b116dc011a8eac2f79cda1
diff --git a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
index cfbcec8..6b8810b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
@@ -26,6 +26,8 @@
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.mastership.MastershipService;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DeviceId;
import org.onosproject.net.PortNumber;
@@ -37,8 +39,6 @@
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
-import org.onosproject.store.flow.ReplicaInfo;
-import org.onosproject.store.flow.ReplicaInfoService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.slf4j.Logger;
@@ -73,7 +73,7 @@
private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ReplicaInfoService replicaInfoManager;
+ protected MastershipService mastershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
@@ -200,12 +200,12 @@
@Override
public Set<FlowEntry> getCurrentStatistic(ConnectPoint connectPoint) {
final DeviceId deviceId = connectPoint.deviceId();
- ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
- if (!replicaInfo.master().isPresent()) {
+ NodeId master = mastershipService.getMasterFor(deviceId);
+ if (master == null) {
log.warn("No master for {}", deviceId);
return Collections.emptySet();
}
- if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
+ if (master.equals(clusterService.getLocalNode().id())) {
return getCurrentStatisticInternal(connectPoint);
} else {
return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
@@ -213,7 +213,7 @@
GET_CURRENT,
SERIALIZER::encode,
SERIALIZER::decode,
- replicaInfo.master().get()),
+ master),
STATISTIC_STORE_TIMEOUT_MILLIS,
TimeUnit.MILLISECONDS,
Collections.emptySet());
@@ -228,12 +228,12 @@
@Override
public Set<FlowEntry> getPreviousStatistic(ConnectPoint connectPoint) {
final DeviceId deviceId = connectPoint.deviceId();
- ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
- if (!replicaInfo.master().isPresent()) {
+ NodeId master = mastershipService.getMasterFor(deviceId);
+ if (master == null) {
log.warn("No master for {}", deviceId);
return Collections.emptySet();
}
- if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
+ if (master.equals(clusterService.getLocalNode().id())) {
return getPreviousStatisticInternal(connectPoint);
} else {
return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
@@ -241,7 +241,7 @@
GET_PREVIOUS,
SERIALIZER::encode,
SERIALIZER::decode,
- replicaInfo.master().get()),
+ master),
STATISTIC_STORE_TIMEOUT_MILLIS,
TimeUnit.MILLISECONDS,
Collections.emptySet());
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java
index bc1edf1..a014504 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java
@@ -24,6 +24,7 @@
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
import org.onosproject.incubator.net.resource.label.DefaultLabelResource;
import org.onosproject.incubator.net.resource.label.LabelResource;
import org.onosproject.incubator.net.resource.label.LabelResourceDelegate;
@@ -33,6 +34,7 @@
import org.onosproject.incubator.net.resource.label.LabelResourcePool;
import org.onosproject.incubator.net.resource.label.LabelResourceRequest;
import org.onosproject.incubator.net.resource.label.LabelResourceStore;
+import org.onosproject.mastership.MastershipService;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceService;
@@ -40,8 +42,6 @@
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
-import org.onosproject.store.flow.ReplicaInfo;
-import org.onosproject.store.flow.ReplicaInfoService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.Serializer;
@@ -72,7 +72,7 @@
protected StorageService storageService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ReplicaInfoService replicaInfoManager;
+ protected MastershipService mastershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
@@ -210,27 +210,25 @@
return false;
}
- ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(pool
- .deviceId());
+ NodeId master = mastershipService.getMasterFor(pool.deviceId());
- if (!replicaInfo.master().isPresent()) {
- log.warn("Failed to getFlowEntries: No master for {}", pool);
+ if (master == null) {
+ log.warn("Failed to create label resource pool: No master for {}", pool);
return false;
}
- if (replicaInfo.master().get()
- .equals(clusterService.getLocalNode().id())) {
+ if (master.equals(clusterService.getLocalNode().id())) {
return internalCreate(pool);
}
log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
- replicaInfo.master().orNull(), pool.deviceId());
+ master, pool.deviceId());
return complete(clusterCommunicator
.sendAndReceive(pool,
LabelResourceMessageSubjects.LABEL_POOL_CREATED,
SERIALIZER::encode, SERIALIZER::decode,
- replicaInfo.master().get()));
+ master));
}
private boolean internalCreate(LabelResourcePool pool) {
@@ -253,27 +251,26 @@
if (device == null) {
return false;
}
- ReplicaInfo replicaInfo = replicaInfoManager
- .getReplicaInfoFor(deviceId);
- if (!replicaInfo.master().isPresent()) {
- log.warn("Failed to getFlowEntries: No master for {}", deviceId);
+ NodeId master = mastershipService.getMasterFor(deviceId);
+
+ if (master == null) {
+ log.warn("Failed to destroyDevicePool. No master for {}", deviceId);
return false;
}
- if (replicaInfo.master().get()
- .equals(clusterService.getLocalNode().id())) {
+ if (master.equals(clusterService.getLocalNode().id())) {
return internalDestroy(deviceId);
}
- log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
- replicaInfo.master().orNull(), deviceId);
+ log.trace("Forwarding request to {}, which is the primary (master) for device {}",
+ master, deviceId);
return complete(clusterCommunicator
.sendAndReceive(deviceId,
LabelResourceMessageSubjects.LABEL_POOL_DESTROYED,
SERIALIZER::encode, SERIALIZER::decode,
- replicaInfo.master().get()));
+ master));
}
private boolean internalDestroy(DeviceId deviceId) {
@@ -301,27 +298,25 @@
deviceId,
LabelResourceRequest.Type.APPLY,
applyNum, null);
- ReplicaInfo replicaInfo = replicaInfoManager
- .getReplicaInfoFor(deviceId);
+ NodeId master = mastershipService.getMasterFor(deviceId);
- if (!replicaInfo.master().isPresent()) {
- log.warn("Failed to getFlowEntries: No master for {}", deviceId);
+ if (master == null) {
+ log.warn("Failed to applyFromDevicePool: No master for {}", deviceId);
return Collections.emptyList();
}
- if (replicaInfo.master().get()
- .equals(clusterService.getLocalNode().id())) {
+ if (master.equals(clusterService.getLocalNode().id())) {
return internalApply(request);
}
- log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
- replicaInfo.master().orNull(), deviceId);
+ log.trace("Forwarding request to {}, which is the primary (master) for device {}",
+ master, deviceId);
return complete(clusterCommunicator
.sendAndReceive(request,
LabelResourceMessageSubjects.LABEL_POOL_APPLY,
SERIALIZER::encode, SERIALIZER::decode,
- replicaInfo.master().get()));
+ master));
}
private Collection<LabelResource> internalApply(LabelResourceRequest request) {
@@ -388,27 +383,25 @@
deviceId,
LabelResourceRequest.Type.RELEASE,
0, collection);
- ReplicaInfo replicaInfo = replicaInfoManager
- .getReplicaInfoFor(deviceId);
+ NodeId master = mastershipService.getMasterFor(deviceId);
- if (!replicaInfo.master().isPresent()) {
- log.warn("Failed to getFlowEntries: No master for {}", deviceId);
+ if (master == null) {
+ log.warn("Failed to releaseToDevicePool: No master for {}", deviceId);
return false;
}
- if (replicaInfo.master().get()
- .equals(clusterService.getLocalNode().id())) {
+ if (master.equals(clusterService.getLocalNode().id())) {
return internalRelease(request);
}
- log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
- replicaInfo.master().orNull(), deviceId);
+ log.trace("Forwarding request to {}, which is the primary (master) for device {}",
+ master, deviceId);
return complete(clusterCommunicator
.sendAndReceive(request,
LabelResourceMessageSubjects.LABEL_POOL_RELEASE,
SERIALIZER::encode, SERIALIZER::decode,
- replicaInfo.master().get()));
+ master));
}
return false;
}