add check for duplicate MastershipEvents to DeviceManager
Change-Id: I2753366b29ef32fa77ebcefff4b2202f1afe0006
diff --git a/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
index e284052..2441e88 100644
--- a/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
@@ -29,6 +29,7 @@
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.cluster.RoleInfo;
import org.onlab.onos.event.AbstractListenerRegistry;
import org.onlab.onos.event.EventDeliveryService;
import org.onlab.onos.mastership.MastershipEvent;
@@ -58,6 +59,8 @@
import org.onlab.onos.net.provider.AbstractProviderService;
import org.slf4j.Logger;
+import com.google.common.collect.HashMultimap;
+
/**
* Provides implementation of the device SB & NB APIs.
*/
@@ -387,6 +390,12 @@
// Intercepts mastership events
private class InternalMastershipListener implements MastershipListener {
+ // random cache size
+ private final int cacheSize = 5;
+ // temporarily stores term number + events to check for duplicates. A hack.
+ private HashMultimap<Integer, RoleInfo> eventCache =
+ HashMultimap.create();
+
@Override
public void event(MastershipEvent event) {
final DeviceId did = event.subject();
@@ -395,6 +404,13 @@
if (myNodeId.equals(event.roleInfo().master())) {
MastershipTerm term = termService.getMastershipTerm(did);
+ // TODO duplicate suppression should probably occur in the MastershipManager
+ // itself, so listeners that can't deal with duplicates don't have to
+ // so this check themselves.
+ if (checkDuplicate(event.roleInfo(), term.termNumber())) {
+ return;
+ }
+
if (!myNodeId.equals(term.master())) {
// something went wrong in consistency, let go
log.warn("Mastership has changed after this event."
@@ -436,6 +452,24 @@
applyRole(did, MastershipRole.STANDBY);
}
}
+
+ // checks for duplicate event, returning true if one is found.
+ private boolean checkDuplicate(RoleInfo roleInfo, int term) {
+ synchronized (eventCache) {
+ if (eventCache.get(term).contains(roleInfo)) {
+ log.info("duplicate event detected; ignoring");
+ return true;
+ } else {
+ eventCache.put(term, roleInfo);
+ // purge by-term oldest entries to keep the cache size under limit
+ if (eventCache.size() > cacheSize) {
+ eventCache.removeAll(term - cacheSize);
+ }
+ return false;
+ }
+ }
+ }
+
}
// Store delegate to re-post events emitted from the store.