Getting local flow entries and groups for disconnected devices
Change-Id: I52747b347ce1c89b41ae9d746a2b5038c30a7c7b
(cherry picked from commit 72ab6e5592e71a0466d7bac06b1eeeb7e0513cd4)
(cherry picked from commit 5daa7c463b783514430cab48904b71106441faf2)
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
index ff89a6f..c438885 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
@@ -87,6 +87,7 @@
private final DeviceId deviceId;
private final ClusterCommunicationService clusterCommunicator;
+ private final ClusterService clusterService;
private final DeviceService deviceService;
private final LifecycleManager lifecycleManager;
private final ScheduledExecutorService scheduler;
@@ -127,6 +128,7 @@
long antiEntropyPeriod) {
this.deviceId = deviceId;
this.clusterCommunicator = clusterCommunicator;
+ this.clusterService = clusterService;
this.lifecycleManager = lifecycleManager;
this.deviceService = deviceService;
this.scheduler = scheduler;
@@ -236,7 +238,6 @@
*/
private CompletableFuture<Set<FlowEntry>> getFlowEntries(FlowBucket bucket) {
DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
-
// If the local node is the master, fetch the entries locally. Otherwise, request the entries
// from the current master. Note that there's a change of a brief cycle during a mastership change.
if (replicaInfo.isMaster(localNodeId)) {
@@ -254,6 +255,18 @@
Duration.ofSeconds(GET_FLOW_ENTRIES_TIMEOUT));
} else if (deviceService.isAvailable(deviceId)) {
throw new FlowRuleStoreException("There is no master for available device " + deviceId);
+ } else if (clusterService.getNodes().size() <= 1 + ECFlowRuleStore.backupCount) {
+ //TODO remove this check when [ONOS-8080] is fixed
+ //When device is not available and has no master and
+ // the number of nodes surpasses the guaranteed backup count,
+ // we are certain that this node has a replica.
+ // -- DISCLAIMER --
+ // You manually need to set the backup count for clusters > 3 nodes,
+ // the default is 2, which handles the single instance and 3 node scenarios
+ return CompletableFuture.completedFuture(
+ bucket.getFlowBucket().values().stream()
+ .flatMap(entries -> entries.values().stream())
+ .collect(Collectors.toSet()));
} else {
return CompletableFuture.completedFuture(Collections.emptySet());
}
@@ -823,8 +836,10 @@
this.replicaInfo = replicaInfo;
}
- // If the local node is neither the master or a backup for the device, clear the flow table.
- if (!replicaInfo.isMaster(localNodeId) && !replicaInfo.isBackup(localNodeId)) {
+ // If the local node is neither the master or a backup for the device,
+ // and the number of nodes surpasses the guaranteed backup count, clear the flow table.
+ if (!replicaInfo.isMaster(localNodeId) && !replicaInfo.isBackup(localNodeId) &&
+ (clusterService.getNodes().size() > 1 + ECFlowRuleStore.backupCount)) {
flowBuckets.values().forEach(bucket -> bucket.clear());
}
activeTerm = replicaInfo.term();
@@ -839,10 +854,11 @@
this.replicaInfo = replicaInfo;
// If the local node is neither the master or a backup for the device *and the term is active*,
- // clear the flow table.
+ // and the number of nodes surpasses the guaranteed backup count, clear the flow table.
if (activeTerm == replicaInfo.term()
&& !replicaInfo.isMaster(localNodeId)
- && !replicaInfo.isBackup(localNodeId)) {
+ && !replicaInfo.isBackup(localNodeId)
+ && (clusterService.getNodes().size() > 1 + ECFlowRuleStore.backupCount)) {
flowBuckets.values().forEach(bucket -> bucket.clear());
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index 7949dc5..354b494 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -151,7 +151,7 @@
private boolean persistenceEnabled = EC_FLOW_RULE_STORE_PERSISTENCE_ENABLED_DEFAULT;
/** Max number of backup copies for each device. */
- private volatile int backupCount = MAX_BACKUP_COUNT_DEFAULT;
+ protected static volatile int backupCount = MAX_BACKUP_COUNT_DEFAULT;
private InternalFlowTable flowTable = new InternalFlowTable();
@@ -370,12 +370,12 @@
@Override
public int getFlowRuleCount(DeviceId deviceId, FlowEntryState state) {
NodeId master = mastershipService.getMasterFor(deviceId);
- if (master == null) {
+ if (master == null && deviceService.isAvailable(deviceId)) {
log.debug("Failed to getFlowRuleCount: No master for {}", deviceId);
return 0;
}
- if (Objects.equals(local, master)) {
+ if (Objects.equals(local, master) || master == null) {
return flowTable.getFlowRuleCount(deviceId, state);
}
@@ -395,12 +395,12 @@
public FlowEntry getFlowEntry(FlowRule rule) {
NodeId master = mastershipService.getMasterFor(rule.deviceId());
- if (master == null) {
+ if (master == null && deviceService.isAvailable(rule.deviceId())) {
log.debug("Failed to getFlowEntry: No master for {}", rule.deviceId());
return null;
}
- if (Objects.equals(local, master)) {
+ if (Objects.equals(local, master) || master == null) {
return flowTable.getFlowEntry(rule);
}
@@ -892,7 +892,7 @@
public Iterable<TableStatisticsEntry> getTableStatistics(DeviceId deviceId) {
NodeId master = mastershipService.getMasterFor(deviceId);
- if (master == null) {
+ if (master == null && deviceService.isAvailable(deviceId)) {
log.debug("Failed to getTableStats: No master for {}", deviceId);
return Collections.emptyList();
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
index 87355d6..8db3385 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
@@ -27,6 +27,7 @@
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
import org.onosproject.net.MastershipRole;
+import org.onosproject.net.device.DeviceService;
import org.onosproject.net.driver.DriverService;
import org.onosproject.net.group.DefaultGroup;
import org.onosproject.net.group.DefaultGroupBucket;
@@ -140,6 +141,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected ComponentConfigService cfgService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected DeviceService deviceService;
+
// Guarantees enabling DriverService before enabling GroupStore
// (DriverService is used in serializing/de-serializing DefaultGroup)
@Reference(cardinality = ReferenceCardinality.MANDATORY)
@@ -379,7 +383,7 @@
private Iterable<StoredGroupEntry> getStoredGroups(DeviceId deviceId) {
NodeId master = mastershipService.getMasterFor(deviceId);
- if (master == null) {
+ if (master == null && deviceService.isAvailable(deviceId)) {
log.debug("Failed to getGroups: No master for {}", deviceId);
return Collections.emptySet();
}
diff --git a/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
index efa080d..9e0dd9d 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
@@ -33,6 +33,7 @@
import org.onosproject.net.DeviceId;
import org.onosproject.net.MastershipRole;
import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DeviceServiceAdapter;
import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.group.DefaultGroup;
@@ -189,6 +190,7 @@
groupStoreImpl.clusterCommunicator = new ClusterCommunicationServiceAdapter();
groupStoreImpl.mastershipService = new MasterOfAll();
groupStoreImpl.cfgService = new ComponentConfigAdapter();
+ groupStoreImpl.deviceService = new InternalDeviceServiceImpl();
ClusterService mockClusterService = createMock(ClusterService.class);
NodeId nodeId = new NodeId(NODE_ID);
@@ -661,4 +663,10 @@
}
+ private class InternalDeviceServiceImpl extends DeviceServiceAdapter {
+ @Override
+ public boolean isAvailable(DeviceId deviceId) {
+ return true;
+ }
+ }
}