Avoids deletion of dataplane state during mastership change
Leftovers in the flow stats create duplicate flow stats entries.
These entries were treated as flows not in the store and were thus removed.
Additionally, adds further guards during the processing of the stats and
updates the unit tests.
Change-Id: Iba07996e1413c54374b7a4ce7efd21109b429eeb
diff --git a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
index bdc48ac..d601d8d 100644
--- a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
@@ -23,6 +23,8 @@
import com.google.common.collect.Sets;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.core.IdGenerator;
@@ -70,6 +72,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -148,6 +151,8 @@
private final Map<Long, FlowOperationsProcessor> pendingFlowOperations = new ConcurrentHashMap<>();
+ private NodeId local;
+
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected FlowRuleStore store;
@@ -166,6 +171,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected DriverService driverService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ClusterService clusterService;
+
@Activate
public void activate(ComponentContext context) {
store.setDelegate(delegate);
@@ -174,6 +182,7 @@
cfgService.registerProperties(getClass());
modified(context);
idGenerator = coreService.getIdGenerator(FLOW_OP_TOPIC);
+ local = clusterService.getLocalNode().id();
log.info("Started");
}
@@ -492,7 +501,7 @@
log.debug("Flow {} is on switch but not in store.", flowRule);
}
- private void flowAdded(FlowEntry flowEntry) {
+ private boolean flowAdded(FlowEntry flowEntry) {
checkNotNull(flowEntry, FLOW_RULE_NULL);
checkValidity();
@@ -500,6 +509,7 @@
FlowRuleEvent event = store.addOrUpdateFlowRule(flowEntry);
if (event == null) {
log.debug("No flow store event generated.");
+ return false;
} else {
log.trace("Flow {} {}", flowEntry, event.type());
post(event);
@@ -508,6 +518,7 @@
log.debug("Removing flow rules....");
removeFlowRules(flowEntry);
}
+ return true;
}
private boolean checkRuleLiveness(FlowEntry swRule, FlowEntry storedRule) {
@@ -565,15 +576,32 @@
boolean useMissingFlow) {
Map<FlowEntry, FlowEntry> storedRules = Maps.newHashMap();
store.getFlowEntries(deviceId).forEach(f -> storedRules.put(f, f));
+ NodeId master;
+ boolean done;
+ // Processing flow rules
for (FlowEntry rule : flowEntries) {
try {
FlowEntry storedRule = storedRules.remove(rule);
if (storedRule != null) {
if (storedRule.exactMatch(rule)) {
// we both have the rule, let's update some info then.
- flowAdded(rule);
+ done = flowAdded(rule);
+ if (!done) {
+ // Mastership change can occur during this iteration
+ master = mastershipService.getMasterFor(deviceId);
+ if (!Objects.equals(local, master)) {
+ log.warn("Tried to update the flow stats while the node was not the master");
+ return;
+ }
+ }
} else {
+ // Mastership change can occur during this iteration
+ master = mastershipService.getMasterFor(deviceId);
+ if (!Objects.equals(local, master)) {
+ log.warn("Tried to update the flows while the node was not the master");
+ return;
+ }
// the two rules are not an exact match - remove the
// switch's rule and install our rule
extraneousFlow(rule);
@@ -582,9 +610,23 @@
} else {
// the device has a rule the store does not have
if (!allowExtraneousRules) {
+ // Mastership change can occur during this iteration
+ master = mastershipService.getMasterFor(deviceId);
+ if (!Objects.equals(local, master)) {
+ log.warn("Tried to remove flows while the node was not the master");
+ return;
+ }
extraneousFlow(rule);
} else if (importExtraneousRules) { // Stores the rule, if so is indicated
- store.addOrUpdateFlowRule(rule);
+ FlowRuleEvent flowRuleEvent = store.addOrUpdateFlowRule(rule);
+ if (flowRuleEvent == null) {
+ // Mastership change can occur during this iteration
+ master = mastershipService.getMasterFor(deviceId);
+ if (!Objects.equals(local, master)) {
+ log.warn("Tried to import flows while the node was not the master");
+ return;
+ }
+ }
}
}
} catch (Exception e) {
@@ -596,6 +638,12 @@
// DO NOT reinstall
if (useMissingFlow) {
for (FlowEntry rule : storedRules.keySet()) {
+ // Mastership change can occur during this iteration
+ master = mastershipService.getMasterFor(deviceId);
+ if (!Objects.equals(local, master)) {
+ log.warn("Tried to install missing rules while the node was not the master");
+ return;
+ }
try {
// there are rules in the store that aren't on the switch
log.debug("Adding the rule that is present in store but not on switch : {}", rule);
diff --git a/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java b/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java
index e0b535f..3801388 100644
--- a/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java
@@ -25,7 +25,12 @@
import org.junit.Before;
import org.junit.Test;
import org.onlab.junit.TestTools;
+import org.onlab.packet.Ip4Address;
+import org.onlab.packet.IpAddress;
import org.onosproject.cfg.ComponentConfigAdapter;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
import org.onosproject.common.event.impl.TestEventDispatcher;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreServiceAdapter;
@@ -86,6 +91,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
+import static org.easymock.EasyMock.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -128,6 +134,7 @@
private ApplicationId appId;
private TestDriverManager driverService;
+ private static final String NODE_ID = "1";
@Before
public void setUp() {
@@ -140,6 +147,15 @@
mgr.operationsService = MoreExecutors.newDirectExecutorService();
mgr.deviceInstallers = MoreExecutors.newDirectExecutorService();
mgr.cfgService = new ComponentConfigAdapter();
+
+ ClusterService mockClusterService = createMock(ClusterService.class);
+ NodeId nodeId = new NodeId(NODE_ID);
+ MockControllerNode mockControllerNode = new MockControllerNode(nodeId);
+ expect(mockClusterService.getLocalNode())
+ .andReturn(mockControllerNode).anyTimes();
+ replay(mockClusterService);
+ mgr.clusterService = mockClusterService;
+
service = mgr;
registry = mgr;
@@ -701,6 +717,11 @@
public MastershipRole getLocalRole(DeviceId deviceId) {
return MastershipRole.MASTER;
}
+
+ @Override
+ public NodeId getMasterFor(DeviceId deviceId) {
+ return new NodeId(NODE_ID);
+ }
}
private class TestDriverManager extends DriverManager {
@@ -736,4 +757,37 @@
return rules;
}
}
+
+ private static class MockControllerNode implements ControllerNode {
+ final NodeId id;
+
+ public MockControllerNode(NodeId id) {
+ this.id = id;
+ }
+
+ @Override
+ public NodeId id() {
+ return this.id;
+ }
+
+ @Override
+ public Ip4Address ip() {
+ return Ip4Address.valueOf("127.0.0.1");
+ }
+
+ @Override
+ public IpAddress ip(boolean resolve) {
+ return null;
+ }
+
+ @Override
+ public String host() {
+ return null;
+ }
+
+ @Override
+ public int tcpPort() {
+ return 0;
+ }
+ }
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
index f3206c3..ff89a6f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
@@ -40,10 +40,12 @@
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowId;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.StoredFlowEntry;
+import org.onosproject.net.flow.FlowRuleStoreException;
import org.onosproject.store.LogicalTimestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
@@ -85,6 +87,7 @@
private final DeviceId deviceId;
private final ClusterCommunicationService clusterCommunicator;
+ private final DeviceService deviceService;
private final LifecycleManager lifecycleManager;
private final ScheduledExecutorService scheduler;
private final Executor executor;
@@ -117,6 +120,7 @@
ClusterService clusterService,
ClusterCommunicationService clusterCommunicator,
LifecycleManager lifecycleManager,
+ DeviceService deviceService,
ScheduledExecutorService scheduler,
Executor executor,
long backupPeriod,
@@ -124,6 +128,7 @@
this.deviceId = deviceId;
this.clusterCommunicator = clusterCommunicator;
this.lifecycleManager = lifecycleManager;
+ this.deviceService = deviceService;
this.scheduler = scheduler;
this.executor = executor;
this.localNodeId = clusterService.getLocalNode().id();
@@ -247,6 +252,8 @@
SERIALIZER::decode,
replicaInfo.master(),
Duration.ofSeconds(GET_FLOW_ENTRIES_TIMEOUT));
+ } else if (deviceService.isAvailable(deviceId)) {
+ throw new FlowRuleStoreException("There is no master for available device " + deviceId);
} else {
return CompletableFuture.completedFuture(Collections.emptySet());
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index b4e80a8..7949dc5 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -697,6 +697,7 @@
clusterService,
clusterCommunicator,
new InternalLifecycleManager(id),
+ deviceService,
backupScheduler,
new OrderedExecutor(backupExecutor),
backupPeriod,
@@ -734,6 +735,7 @@
clusterService,
clusterCommunicator,
new InternalLifecycleManager(deviceId),
+ deviceService,
backupScheduler,
new OrderedExecutor(backupExecutor),
backupPeriod,
diff --git a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
index 23c1c9f..87355d6 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
@@ -145,6 +145,8 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected DriverService driverService;
+ private NodeId local;
+
private ScheduledExecutorService executor;
private Consumer<Status> statusChangeListener;
// Per device group table with (device id + app cookie) as key
@@ -250,6 +252,8 @@
groupTopic = getOrCreateGroupTopic(serializer);
groupTopic.subscribe(this::processGroupMessage);
+ local = clusterService.getLocalNode().id();
+
log.info("Started");
}
@@ -1390,6 +1394,7 @@
Sets.newHashSet(getStoredGroups(deviceId));
Set<Group> extraneousStoredEntries =
Sets.newHashSet(getExtraneousGroups(deviceId));
+ NodeId master;
if (log.isTraceEnabled()) {
log.trace("pushGroupMetrics: Displaying all ({}) southboundGroupEntries for device {}",
@@ -1409,7 +1414,14 @@
garbageCollect(deviceId, southboundGroupEntries, storedGroupEntries);
+ // update stats
for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
+ // Mastership change can occur during this iteration
+ master = mastershipService.getMasterFor(deviceId);
+ if (!Objects.equals(local, master)) {
+ log.warn("Tried to update the group stats while the node was not the master");
+ return;
+ }
Group group = it2.next();
if (storedGroupEntries.remove(group)) {
// we both have the group, let's update some info then.
@@ -1420,6 +1432,8 @@
it2.remove();
}
}
+
+ // extraneous groups in the dataplane
for (Group group : southboundGroupEntries) {
if (getGroup(group.deviceId(), group.id()) != null) {
// There is a group existing with the same id
@@ -1432,6 +1446,12 @@
+ "not present in key based table");
}
} else {
+ // Mastership change can occur during this iteration
+ master = mastershipService.getMasterFor(deviceId);
+ if (!Objects.equals(local, master)) {
+ log.warn("Tried to process extraneous groups while the node was not the master");
+ return;
+ }
// there are groups in the switch that aren't in the store
log.debug("Group AUDIT: extraneous group {} exists in data plane for device {}",
group.id(), deviceId);
@@ -1443,13 +1463,29 @@
}
}
}
+
+ // missing groups in the dataplane
for (StoredGroupEntry group : storedGroupEntries) {
+ // Mastership change can occur during this iteration
+ master = mastershipService.getMasterFor(deviceId);
+ if (!Objects.equals(local, master)) {
+ log.warn("Tried to process missing groups while the node was not the master");
+ return;
+ }
// there are groups in the store that aren't in the switch
log.debug("Group AUDIT: group {} missing in data plane for device {}",
group.id(), deviceId);
groupMissing(group);
}
+
+ // extraneous groups in the store
for (Group group : extraneousStoredEntries) {
+ // Mastership change can occur during this iteration
+ master = mastershipService.getMasterFor(deviceId);
+ if (!Objects.equals(local, master)) {
+ log.warn("Tried to process node extraneous groups while the node was not the master");
+ return;
+ }
// there are groups in the extraneous store that
// aren't in the switch
log.debug("Group AUDIT: clearing extraneous group {} from store for device {}",
@@ -1481,8 +1517,15 @@
return;
}
+ NodeId master;
Iterator<StoredGroupEntry> it = storedGroupEntries.iterator();
while (it.hasNext()) {
+ // Mastership change can occur during this iteration
+ master = mastershipService.getMasterFor(deviceId);
+ if (!Objects.equals(local, master)) {
+ log.warn("Tried to run garbage collector while the node was not the master");
+ return;
+ }
StoredGroupEntry group = it.next();
if (group.state() != GroupState.PENDING_DELETE && checkGroupRefCount(group)) {
log.debug("Garbage collecting group {} on {}", group, deviceId);
diff --git a/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
index 087bd80..efa080d 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
@@ -22,7 +22,11 @@
import org.junit.Before;
import org.junit.Test;
import org.onlab.junit.TestUtils;
+import org.onlab.packet.Ip4Address;
+import org.onlab.packet.IpAddress;
import org.onosproject.cfg.ComponentConfigAdapter;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.GroupId;
import org.onosproject.mastership.MastershipServiceAdapter;
@@ -54,6 +58,7 @@
import java.util.LinkedList;
import java.util.List;
+import static org.easymock.EasyMock.*;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
@@ -125,7 +130,7 @@
@Override
public NodeId getMasterFor(DeviceId deviceId) {
- return new NodeId("foo");
+ return new NodeId(NODE_ID);
}
}
@@ -141,6 +146,42 @@
}
}
+ private static class MockControllerNode implements ControllerNode {
+ final NodeId id;
+
+ public MockControllerNode(NodeId id) {
+ this.id = id;
+ }
+
+ @Override
+ public NodeId id() {
+ return this.id;
+ }
+
+ @Override
+ public Ip4Address ip() {
+ return Ip4Address.valueOf("127.0.0.1");
+ }
+
+ @Override
+ public IpAddress ip(boolean resolve) {
+ return null;
+ }
+
+ @Override
+ public String host() {
+ return null;
+ }
+
+ @Override
+ public int tcpPort() {
+ return 0;
+ }
+ }
+
+ private static final String NODE_ID = "foo";
+
+
@Before
public void setUp() throws Exception {
groupStoreImpl = new DistributedGroupStore();
@@ -148,6 +189,15 @@
groupStoreImpl.clusterCommunicator = new ClusterCommunicationServiceAdapter();
groupStoreImpl.mastershipService = new MasterOfAll();
groupStoreImpl.cfgService = new ComponentConfigAdapter();
+
+ ClusterService mockClusterService = createMock(ClusterService.class);
+ NodeId nodeId = new NodeId(NODE_ID);
+ MockControllerNode mockControllerNode = new MockControllerNode(nodeId);
+ expect(mockClusterService.getLocalNode())
+ .andReturn(mockControllerNode).anyTimes();
+ replay(mockClusterService);
+
+ groupStoreImpl.clusterService = mockClusterService;
groupStoreImpl.activate(null);
groupStore = groupStoreImpl;
auditPendingReqQueue =
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
index 4696778..9dd52cc 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
@@ -862,11 +862,22 @@
}
log.info("Transitioned switch {} to MASTER", dpid);
activeMasterSwitches.put(dpid, sw);
+ // purge pending stats
+ log.info("Purged pending stats {}", dpid);
+ purgeStatsSwitch(dpid);
} finally {
switchLock.unlock();
}
}
+ private void purgeStatsSwitch(Dpid dpid) {
+ fullFlowStats.removeAll(dpid);
+ fullFlowLightweightStats.removeAll(dpid);
+ fullTableStats.removeAll(dpid);
+ fullGroupStats.removeAll(dpid);
+ fullGroupDescStats.removeAll(dpid);
+ fullQueueStats.removeAll(dpid);
+ }
@Override
public void transitionToEqualSwitch(Dpid dpid) {
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
index 2d47656..6b47889 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -651,9 +651,9 @@
NewAdaptiveFlowStatsCollector afsc = afsCollectors.get(dpid);
if (adaptiveFlowSampling && afsc != null) {
- List<FlowEntry> flowEntries = replies.getEntries().stream()
+ Set<FlowEntry> flowEntries = replies.getEntries().stream()
.map(entry -> new FlowEntryBuilder(did, entry, handler).withSetAfsc(afsc).build())
- .collect(Collectors.toList());
+ .collect(Collectors.toSet());
// Check that OFFlowStatsReply Xid is same with the one of OFFlowStatsRequest?
if (afsc.getFlowMissingXid() != NewAdaptiveFlowStatsCollector.NO_FLOW_MISSING_XID) {
@@ -672,9 +672,9 @@
providerService.pushFlowMetricsWithoutFlowMissing(did, flowEntries);
}
} else {
- List<FlowEntry> flowEntries = replies.getEntries().stream()
+ Set<FlowEntry> flowEntries = replies.getEntries().stream()
.map(entry -> new FlowEntryBuilder(did, entry, handler).build())
- .collect(Collectors.toList());
+ .collect(Collectors.toSet());
// call existing entire flow stats update with flowMissing synchronization
providerService.pushFlowMetrics(did, flowEntries);
@@ -687,6 +687,7 @@
List<TableStatisticsEntry> tableStatsEntries = replies.getEntries().stream()
.map(entry -> buildTableStatistics(did, entry))
.filter(Objects::nonNull)
+ .distinct()
.collect(Collectors.toList());
providerService.pushTableStatistics(did, tableStatsEntries);
}
@@ -696,9 +697,9 @@
DeviceId did = DeviceId.deviceId(Dpid.uri(dpid));
NewAdaptiveFlowStatsCollector afsc = afsCollectors.get(dpid);
if (adaptiveFlowSampling && afsc != null) {
- List<FlowEntry> flowEntries = replies.getEntries().stream()
+ Set<FlowEntry> flowEntries = replies.getEntries().stream()
.map(entry -> new FlowEntryBuilder(did, entry, driverService).withSetAfsc(afsc).build())
- .collect(Collectors.toList());
+ .collect(Collectors.toSet());
// Check that OFFlowStatsReply Xid is same with the one of OFFlowStatsRequest?
if (afsc.getFlowMissingXid() != NewAdaptiveFlowStatsCollector.NO_FLOW_MISSING_XID) {
@@ -717,9 +718,9 @@
providerService.pushFlowMetricsWithoutFlowMissing(did, flowEntries);
}
} else {
- List<FlowEntry> flowEntries = replies.getEntries().stream()
+ Set<FlowEntry> flowEntries = replies.getEntries().stream()
.map(entry -> new FlowEntryBuilder(did, entry, driverService).build())
- .collect(Collectors.toList());
+ .collect(Collectors.toSet());
// call existing entire flow stats update with flowMissing synchronization
providerService.pushFlowMetrics(did, flowEntries);
}