Revert "Avoids delete of dataplane state during mastership change"
This reverts commit 0a4ead65f47bdf8576bb6d4f1605858f2e8f9df4.
Change-Id: I595ac6181cc0b4494a0e2198629d38522f74d86b
diff --git a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
index d601d8d..bdc48ac 100644
--- a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
@@ -23,8 +23,6 @@
import com.google.common.collect.Sets;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.core.IdGenerator;
@@ -72,7 +70,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -151,8 +148,6 @@
private final Map<Long, FlowOperationsProcessor> pendingFlowOperations = new ConcurrentHashMap<>();
- private NodeId local;
-
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected FlowRuleStore store;
@@ -171,9 +166,6 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected DriverService driverService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected ClusterService clusterService;
-
@Activate
public void activate(ComponentContext context) {
store.setDelegate(delegate);
@@ -182,7 +174,6 @@
cfgService.registerProperties(getClass());
modified(context);
idGenerator = coreService.getIdGenerator(FLOW_OP_TOPIC);
- local = clusterService.getLocalNode().id();
log.info("Started");
}
@@ -501,7 +492,7 @@
log.debug("Flow {} is on switch but not in store.", flowRule);
}
- private boolean flowAdded(FlowEntry flowEntry) {
+ private void flowAdded(FlowEntry flowEntry) {
checkNotNull(flowEntry, FLOW_RULE_NULL);
checkValidity();
@@ -509,7 +500,6 @@
FlowRuleEvent event = store.addOrUpdateFlowRule(flowEntry);
if (event == null) {
log.debug("No flow store event generated.");
- return false;
} else {
log.trace("Flow {} {}", flowEntry, event.type());
post(event);
@@ -518,7 +508,6 @@
log.debug("Removing flow rules....");
removeFlowRules(flowEntry);
}
- return true;
}
private boolean checkRuleLiveness(FlowEntry swRule, FlowEntry storedRule) {
@@ -576,32 +565,15 @@
boolean useMissingFlow) {
Map<FlowEntry, FlowEntry> storedRules = Maps.newHashMap();
store.getFlowEntries(deviceId).forEach(f -> storedRules.put(f, f));
- NodeId master;
- boolean done;
- // Processing flow rules
for (FlowEntry rule : flowEntries) {
try {
FlowEntry storedRule = storedRules.remove(rule);
if (storedRule != null) {
if (storedRule.exactMatch(rule)) {
// we both have the rule, let's update some info then.
- done = flowAdded(rule);
- if (!done) {
- // Mastership change can occur during this iteration
- master = mastershipService.getMasterFor(deviceId);
- if (!Objects.equals(local, master)) {
- log.warn("Tried to update the flow stats while the node was not the master");
- return;
- }
- }
+ flowAdded(rule);
} else {
- // Mastership change can occur during this iteration
- master = mastershipService.getMasterFor(deviceId);
- if (!Objects.equals(local, master)) {
- log.warn("Tried to update the flows while the node was not the master");
- return;
- }
// the two rules are not an exact match - remove the
// switch's rule and install our rule
extraneousFlow(rule);
@@ -610,23 +582,9 @@
} else {
// the device has a rule the store does not have
if (!allowExtraneousRules) {
- // Mastership change can occur during this iteration
- master = mastershipService.getMasterFor(deviceId);
- if (!Objects.equals(local, master)) {
- log.warn("Tried to remove flows while the node was not the master");
- return;
- }
extraneousFlow(rule);
} else if (importExtraneousRules) { // Stores the rule, if so is indicated
- FlowRuleEvent flowRuleEvent = store.addOrUpdateFlowRule(rule);
- if (flowRuleEvent == null) {
- // Mastership change can occur during this iteration
- master = mastershipService.getMasterFor(deviceId);
- if (!Objects.equals(local, master)) {
- log.warn("Tried to import flows while the node was not the master");
- return;
- }
- }
+ store.addOrUpdateFlowRule(rule);
}
}
} catch (Exception e) {
@@ -638,12 +596,6 @@
// DO NOT reinstall
if (useMissingFlow) {
for (FlowEntry rule : storedRules.keySet()) {
- // Mastership change can occur during this iteration
- master = mastershipService.getMasterFor(deviceId);
- if (!Objects.equals(local, master)) {
- log.warn("Tried to install missing rules while the node was not the master");
- return;
- }
try {
// there are rules in the store that aren't on the switch
log.debug("Adding the rule that is present in store but not on switch : {}", rule);
diff --git a/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java b/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java
index 3801388..e0b535f 100644
--- a/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java
@@ -25,12 +25,7 @@
import org.junit.Before;
import org.junit.Test;
import org.onlab.junit.TestTools;
-import org.onlab.packet.Ip4Address;
-import org.onlab.packet.IpAddress;
import org.onosproject.cfg.ComponentConfigAdapter;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.NodeId;
import org.onosproject.common.event.impl.TestEventDispatcher;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreServiceAdapter;
@@ -91,7 +86,6 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
-import static org.easymock.EasyMock.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -134,7 +128,6 @@
private ApplicationId appId;
private TestDriverManager driverService;
- private static final String NODE_ID = "1";
@Before
public void setUp() {
@@ -147,15 +140,6 @@
mgr.operationsService = MoreExecutors.newDirectExecutorService();
mgr.deviceInstallers = MoreExecutors.newDirectExecutorService();
mgr.cfgService = new ComponentConfigAdapter();
-
- ClusterService mockClusterService = createMock(ClusterService.class);
- NodeId nodeId = new NodeId(NODE_ID);
- MockControllerNode mockControllerNode = new MockControllerNode(nodeId);
- expect(mockClusterService.getLocalNode())
- .andReturn(mockControllerNode).anyTimes();
- replay(mockClusterService);
- mgr.clusterService = mockClusterService;
-
service = mgr;
registry = mgr;
@@ -717,11 +701,6 @@
public MastershipRole getLocalRole(DeviceId deviceId) {
return MastershipRole.MASTER;
}
-
- @Override
- public NodeId getMasterFor(DeviceId deviceId) {
- return new NodeId(NODE_ID);
- }
}
private class TestDriverManager extends DriverManager {
@@ -757,37 +736,4 @@
return rules;
}
}
-
- private static class MockControllerNode implements ControllerNode {
- final NodeId id;
-
- public MockControllerNode(NodeId id) {
- this.id = id;
- }
-
- @Override
- public NodeId id() {
- return this.id;
- }
-
- @Override
- public Ip4Address ip() {
- return Ip4Address.valueOf("127.0.0.1");
- }
-
- @Override
- public IpAddress ip(boolean resolve) {
- return null;
- }
-
- @Override
- public String host() {
- return null;
- }
-
- @Override
- public int tcpPort() {
- return 0;
- }
- }
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
index ff89a6f..f3206c3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
@@ -40,12 +40,10 @@
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.net.DeviceId;
-import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowId;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.StoredFlowEntry;
-import org.onosproject.net.flow.FlowRuleStoreException;
import org.onosproject.store.LogicalTimestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
@@ -87,7 +85,6 @@
private final DeviceId deviceId;
private final ClusterCommunicationService clusterCommunicator;
- private final DeviceService deviceService;
private final LifecycleManager lifecycleManager;
private final ScheduledExecutorService scheduler;
private final Executor executor;
@@ -120,7 +117,6 @@
ClusterService clusterService,
ClusterCommunicationService clusterCommunicator,
LifecycleManager lifecycleManager,
- DeviceService deviceService,
ScheduledExecutorService scheduler,
Executor executor,
long backupPeriod,
@@ -128,7 +124,6 @@
this.deviceId = deviceId;
this.clusterCommunicator = clusterCommunicator;
this.lifecycleManager = lifecycleManager;
- this.deviceService = deviceService;
this.scheduler = scheduler;
this.executor = executor;
this.localNodeId = clusterService.getLocalNode().id();
@@ -252,8 +247,6 @@
SERIALIZER::decode,
replicaInfo.master(),
Duration.ofSeconds(GET_FLOW_ENTRIES_TIMEOUT));
- } else if (deviceService.isAvailable(deviceId)) {
- throw new FlowRuleStoreException("There is no master for available device " + deviceId);
} else {
return CompletableFuture.completedFuture(Collections.emptySet());
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index 7949dc5..b4e80a8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -697,7 +697,6 @@
clusterService,
clusterCommunicator,
new InternalLifecycleManager(id),
- deviceService,
backupScheduler,
new OrderedExecutor(backupExecutor),
backupPeriod,
@@ -735,7 +734,6 @@
clusterService,
clusterCommunicator,
new InternalLifecycleManager(deviceId),
- deviceService,
backupScheduler,
new OrderedExecutor(backupExecutor),
backupPeriod,
diff --git a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
index 87355d6..23c1c9f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
@@ -145,8 +145,6 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected DriverService driverService;
- private NodeId local;
-
private ScheduledExecutorService executor;
private Consumer<Status> statusChangeListener;
// Per device group table with (device id + app cookie) as key
@@ -252,8 +250,6 @@
groupTopic = getOrCreateGroupTopic(serializer);
groupTopic.subscribe(this::processGroupMessage);
- local = clusterService.getLocalNode().id();
-
log.info("Started");
}
@@ -1394,7 +1390,6 @@
Sets.newHashSet(getStoredGroups(deviceId));
Set<Group> extraneousStoredEntries =
Sets.newHashSet(getExtraneousGroups(deviceId));
- NodeId master;
if (log.isTraceEnabled()) {
log.trace("pushGroupMetrics: Displaying all ({}) southboundGroupEntries for device {}",
@@ -1414,14 +1409,7 @@
garbageCollect(deviceId, southboundGroupEntries, storedGroupEntries);
- // update stats
for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
- // Mastership change can occur during this iteration
- master = mastershipService.getMasterFor(deviceId);
- if (!Objects.equals(local, master)) {
- log.warn("Tried to update the group stats while the node was not the master");
- return;
- }
Group group = it2.next();
if (storedGroupEntries.remove(group)) {
// we both have the group, let's update some info then.
@@ -1432,8 +1420,6 @@
it2.remove();
}
}
-
- // extraneous groups in the dataplane
for (Group group : southboundGroupEntries) {
if (getGroup(group.deviceId(), group.id()) != null) {
// There is a group existing with the same id
@@ -1446,12 +1432,6 @@
+ "not present in key based table");
}
} else {
- // Mastership change can occur during this iteration
- master = mastershipService.getMasterFor(deviceId);
- if (!Objects.equals(local, master)) {
- log.warn("Tried to process extraneous groups while the node was not the master");
- return;
- }
// there are groups in the switch that aren't in the store
log.debug("Group AUDIT: extraneous group {} exists in data plane for device {}",
group.id(), deviceId);
@@ -1463,29 +1443,13 @@
}
}
}
-
- // missing groups in the dataplane
for (StoredGroupEntry group : storedGroupEntries) {
- // Mastership change can occur during this iteration
- master = mastershipService.getMasterFor(deviceId);
- if (!Objects.equals(local, master)) {
- log.warn("Tried to process missing groups while the node was not the master");
- return;
- }
// there are groups in the store that aren't in the switch
log.debug("Group AUDIT: group {} missing in data plane for device {}",
group.id(), deviceId);
groupMissing(group);
}
-
- // extraneous groups in the store
for (Group group : extraneousStoredEntries) {
- // Mastership change can occur during this iteration
- master = mastershipService.getMasterFor(deviceId);
- if (!Objects.equals(local, master)) {
- log.warn("Tried to process node extraneous groups while the node was not the master");
- return;
- }
// there are groups in the extraneous store that
// aren't in the switch
log.debug("Group AUDIT: clearing extraneous group {} from store for device {}",
@@ -1517,15 +1481,8 @@
return;
}
- NodeId master;
Iterator<StoredGroupEntry> it = storedGroupEntries.iterator();
while (it.hasNext()) {
- // Mastership change can occur during this iteration
- master = mastershipService.getMasterFor(deviceId);
- if (!Objects.equals(local, master)) {
- log.warn("Tried to run garbage collector while the node was not the master");
- return;
- }
StoredGroupEntry group = it.next();
if (group.state() != GroupState.PENDING_DELETE && checkGroupRefCount(group)) {
log.debug("Garbage collecting group {} on {}", group, deviceId);
diff --git a/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
index efa080d..087bd80 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
@@ -22,11 +22,7 @@
import org.junit.Before;
import org.junit.Test;
import org.onlab.junit.TestUtils;
-import org.onlab.packet.Ip4Address;
-import org.onlab.packet.IpAddress;
import org.onosproject.cfg.ComponentConfigAdapter;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.GroupId;
import org.onosproject.mastership.MastershipServiceAdapter;
@@ -58,7 +54,6 @@
import java.util.LinkedList;
import java.util.List;
-import static org.easymock.EasyMock.*;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
@@ -130,7 +125,7 @@
@Override
public NodeId getMasterFor(DeviceId deviceId) {
- return new NodeId(NODE_ID);
+ return new NodeId("foo");
}
}
@@ -146,42 +141,6 @@
}
}
- private static class MockControllerNode implements ControllerNode {
- final NodeId id;
-
- public MockControllerNode(NodeId id) {
- this.id = id;
- }
-
- @Override
- public NodeId id() {
- return this.id;
- }
-
- @Override
- public Ip4Address ip() {
- return Ip4Address.valueOf("127.0.0.1");
- }
-
- @Override
- public IpAddress ip(boolean resolve) {
- return null;
- }
-
- @Override
- public String host() {
- return null;
- }
-
- @Override
- public int tcpPort() {
- return 0;
- }
- }
-
- private static final String NODE_ID = "foo";
-
-
@Before
public void setUp() throws Exception {
groupStoreImpl = new DistributedGroupStore();
@@ -189,15 +148,6 @@
groupStoreImpl.clusterCommunicator = new ClusterCommunicationServiceAdapter();
groupStoreImpl.mastershipService = new MasterOfAll();
groupStoreImpl.cfgService = new ComponentConfigAdapter();
-
- ClusterService mockClusterService = createMock(ClusterService.class);
- NodeId nodeId = new NodeId(NODE_ID);
- MockControllerNode mockControllerNode = new MockControllerNode(nodeId);
- expect(mockClusterService.getLocalNode())
- .andReturn(mockControllerNode).anyTimes();
- replay(mockClusterService);
-
- groupStoreImpl.clusterService = mockClusterService;
groupStoreImpl.activate(null);
groupStore = groupStoreImpl;
auditPendingReqQueue =
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
index 9dd52cc..4696778 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
@@ -862,22 +862,11 @@
}
log.info("Transitioned switch {} to MASTER", dpid);
activeMasterSwitches.put(dpid, sw);
- // purge pending stats
- log.info("Purged pending stats {}", dpid);
- purgeStatsSwitch(dpid);
} finally {
switchLock.unlock();
}
}
- private void purgeStatsSwitch(Dpid dpid) {
- fullFlowStats.removeAll(dpid);
- fullFlowLightweightStats.removeAll(dpid);
- fullTableStats.removeAll(dpid);
- fullGroupStats.removeAll(dpid);
- fullGroupDescStats.removeAll(dpid);
- fullQueueStats.removeAll(dpid);
- }
@Override
public void transitionToEqualSwitch(Dpid dpid) {
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
index 6b47889..2d47656 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -651,9 +651,9 @@
NewAdaptiveFlowStatsCollector afsc = afsCollectors.get(dpid);
if (adaptiveFlowSampling && afsc != null) {
- Set<FlowEntry> flowEntries = replies.getEntries().stream()
+ List<FlowEntry> flowEntries = replies.getEntries().stream()
.map(entry -> new FlowEntryBuilder(did, entry, handler).withSetAfsc(afsc).build())
- .collect(Collectors.toSet());
+ .collect(Collectors.toList());
// Check that OFFlowStatsReply Xid is same with the one of OFFlowStatsRequest?
if (afsc.getFlowMissingXid() != NewAdaptiveFlowStatsCollector.NO_FLOW_MISSING_XID) {
@@ -672,9 +672,9 @@
providerService.pushFlowMetricsWithoutFlowMissing(did, flowEntries);
}
} else {
- Set<FlowEntry> flowEntries = replies.getEntries().stream()
+ List<FlowEntry> flowEntries = replies.getEntries().stream()
.map(entry -> new FlowEntryBuilder(did, entry, handler).build())
- .collect(Collectors.toSet());
+ .collect(Collectors.toList());
// call existing entire flow stats update with flowMissing synchronization
providerService.pushFlowMetrics(did, flowEntries);
@@ -687,7 +687,6 @@
List<TableStatisticsEntry> tableStatsEntries = replies.getEntries().stream()
.map(entry -> buildTableStatistics(did, entry))
.filter(Objects::nonNull)
- .distinct()
.collect(Collectors.toList());
providerService.pushTableStatistics(did, tableStatsEntries);
}
@@ -697,9 +696,9 @@
DeviceId did = DeviceId.deviceId(Dpid.uri(dpid));
NewAdaptiveFlowStatsCollector afsc = afsCollectors.get(dpid);
if (adaptiveFlowSampling && afsc != null) {
- Set<FlowEntry> flowEntries = replies.getEntries().stream()
+ List<FlowEntry> flowEntries = replies.getEntries().stream()
.map(entry -> new FlowEntryBuilder(did, entry, driverService).withSetAfsc(afsc).build())
- .collect(Collectors.toSet());
+ .collect(Collectors.toList());
// Check that OFFlowStatsReply Xid is same with the one of OFFlowStatsRequest?
if (afsc.getFlowMissingXid() != NewAdaptiveFlowStatsCollector.NO_FLOW_MISSING_XID) {
@@ -718,9 +717,9 @@
providerService.pushFlowMetricsWithoutFlowMissing(did, flowEntries);
}
} else {
- Set<FlowEntry> flowEntries = replies.getEntries().stream()
+ List<FlowEntry> flowEntries = replies.getEntries().stream()
.map(entry -> new FlowEntryBuilder(did, entry, driverService).build())
- .collect(Collectors.toSet());
+ .collect(Collectors.toList());
// call existing entire flow stats update with flowMissing synchronization
providerService.pushFlowMetrics(did, flowEntries);
}