Handling mastership changes during route-path programming
Change-Id: I2467d68c29d03cdd96044f23fcf14b6b53c9f9be
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
index eebce8c..864a924 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
@@ -38,6 +38,8 @@
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterEvent;
+import org.onosproject.cluster.ClusterEventListener;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
@@ -129,6 +131,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Dictionary;
@@ -251,6 +254,7 @@
private final InternalRouteEventListener routeListener = new InternalRouteEventListener();
private final InternalTopologyListener topologyListener = new InternalTopologyListener();
private final InternalMastershipListener mastershipListener = new InternalMastershipListener();
+ final InternalClusterListener clusterListener = new InternalClusterListener();
//Completable future for network configuration process to buffer config events handling during activation
private CompletableFuture<Boolean> networkConfigCompletion = null;
private List<Event> queuedEvents = new CopyOnWriteArrayList<>();
@@ -446,6 +450,7 @@
routeService.addListener(routeListener);
topologyService.addListener(topologyListener);
mastershipService.addListener(mastershipListener);
+ clusterService.addListener(clusterListener);
linkHandler.init();
l2TunnelHandler.init();
@@ -505,6 +510,7 @@
routeService.removeListener(routeListener);
topologyService.removeListener(topologyListener);
mastershipService.removeListener(mastershipListener);
+ clusterService.removeListener(clusterListener);
neighbourResolutionService.unregisterNeighbourHandlers(appId);
@@ -1170,6 +1176,15 @@
} else {
log.error("Unhandled config class: {}", configClass);
}
+ } else if (event.type() == MastershipEvent.Type.MASTER_CHANGED) {
+ MastershipEvent me = (MastershipEvent) event;
+ DeviceId deviceId = me.subject();
+ Optional<DeviceId> pairDeviceId = getPairDeviceId(deviceId);
+ log.info(" ** MASTERSHIP CHANGED Invalidating shouldProgram cache"
+ + " for {}/pair={} due to change", deviceId, pairDeviceId);
+ defaultRoutingHandler.invalidateShouldProgramCache(deviceId);
+ pairDeviceId.ifPresent(defaultRoutingHandler::invalidateShouldProgramCache);
+ defaultRoutingHandler.checkFullRerouteForMasterChange(deviceId, me);
} else {
log.warn("Unhandled event type: {}", event.type());
}
@@ -1269,6 +1284,12 @@
.forEach(entry -> entry.getValue().cleanUpForNeighborDown(device.id()));
}
+ void purgeHashedNextObjectiveStore(DeviceId devId) {
+ dsNextObjStore.entrySet().stream()
+ .filter(entry -> entry.getKey().deviceId().equals(devId))
+ .forEach(entry -> dsNextObjStore.remove(entry.getKey()));
+ }
+
private void processPortUpdated(Device device, Port port) {
if (deviceConfiguration == null || !deviceConfiguration.isConfigured(device.id())) {
log.warn("Device configuration uploading. Not handling port event for"
@@ -1590,24 +1611,49 @@
private class InternalMastershipListener implements MastershipListener {
@Override
public void event(MastershipEvent event) {
- DeviceId deviceId = event.subject();
- Optional<DeviceId> pairDeviceId = getPairDeviceId(deviceId);
-
switch (event.type()) {
- case MASTER_CHANGED:
- log.info("Invalidating shouldProgram cache for {}/pair={} due to mastership change",
- deviceId, pairDeviceId);
- defaultRoutingHandler.invalidateShouldProgramCache(deviceId);
- pairDeviceId.ifPresent(defaultRoutingHandler::invalidateShouldProgramCache);
- break;
- case BACKUPS_CHANGED:
- case SUSPENDED:
- default:
- break;
+ case MASTER_CHANGED:
+ log.debug("Mastership event: {}/{}", event.subject(),
+ event.roleInfo());
+ mainEventExecutor.execute(new InternalEventHandler(event));
+ break;
+ case BACKUPS_CHANGED:
+ case SUSPENDED:
+ default:
+ log.debug("Mastership event type {} not handled", event.type());
+ break;
}
}
}
+ class InternalClusterListener implements ClusterEventListener {
+ private Instant lastClusterEvent = Instant.EPOCH;
+
+ long timeSinceLastClusterEvent() {
+ return Instant.now().toEpochMilli() - lastClusterEvent.toEpochMilli();
+ }
+
+ @Override
+ public void event(ClusterEvent event) {
+ switch (event.type()) {
+ case INSTANCE_ACTIVATED:
+ case INSTANCE_ADDED:
+ case INSTANCE_READY:
+ log.debug("Cluster event {} ignored", event.type());
+ break;
+ case INSTANCE_DEACTIVATED:
+ case INSTANCE_REMOVED:
+ log.info("** Cluster event {}", event.type());
+ lastClusterEvent = Instant.now();
+ break;
+ default:
+ break;
+ }
+
+ }
+
+ }
+
private void updateInterface(InterfaceConfig conf, InterfaceConfig prevConf) {
try {
Set<Interface> intfs = conf.getInterfaces();