Handling mastership changes during route-path programming

Change-Id: I2467d68c29d03cdd96044f23fcf14b6b53c9f9be
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/DefaultRoutingHandler.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/DefaultRoutingHandler.java
index 7503ba8..c64d452 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/DefaultRoutingHandler.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/DefaultRoutingHandler.java
@@ -29,6 +29,7 @@
 import org.onlab.packet.MacAddress;
 import org.onlab.packet.VlanId;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.mastership.MastershipEvent;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
@@ -73,6 +74,7 @@
     private static final long RETRY_INTERVAL_MS = 250L;
     private static final int RETRY_INTERVAL_SCALE = 1;
     private static final long STABLITY_THRESHOLD = 10; //secs
+    private static final long MASTER_CHANGE_DELAY = 1000; // ms
     private static Logger log = LoggerFactory.getLogger(DefaultRoutingHandler.class);
 
     private SegmentRoutingManager srManager;
@@ -84,13 +86,18 @@
     private volatile Status populationStatus;
     private ScheduledExecutorService executorService
         = newScheduledThreadPool(1, groupedThreads("retryftr", "retry-%d", log));
-    private Instant lastRoutingChange;
+    private Instant lastRoutingChange = Instant.EPOCH;
 
-    // Keep track on which ONOS instance should program the device pair.
-    // There should be only one instance that programs the same pair.
+    // Distributed store to keep track of ONOS instance that should program the
+    // device pair. There should be only one instance (the king) that programs the same pair.
     Map<Set<DeviceId>, NodeId> shouldProgram;
     Map<DeviceId, Boolean> shouldProgramCache;
 
+    // Local store to keep track of all devices that this instance was responsible
+    // for programming in the last run. Helps to determine if mastership changed
+    // during a run - only relevant for programming as a result of topo change.
+    Set<DeviceId> lastProgrammed;
+
     /**
      * Represents the default routing population status.
      */
@@ -135,6 +142,7 @@
         this.config = checkNotNull(srManager.deviceConfiguration);
         this.populationStatus = Status.IDLE;
         this.currentEcmpSpgMap = Maps.newHashMap();
+        this.lastProgrammed = Sets.newConcurrentHashSet();
     }
 
     /**
@@ -229,9 +237,12 @@
                 }
 
                 if (!shouldProgram(dstSw)) {
+                    lastProgrammed.remove(dstSw);
                     continue;
+                } else {
+                    lastProgrammed.add(dstSw);
                 }
-                // To do a full reroute, assume all routes have changed
+                // To do a full reroute, assume all route-paths have changed
                 for (DeviceId dev : deviceAndItsPair(dstSw)) {
                     for (DeviceId targetSw : srManager.deviceConfiguration.getRouters()) {
                         if (targetSw.equals(dev)) {
@@ -1181,6 +1192,90 @@
         }
     }
 
+    /**
+     * Attempts a full reroute of route-paths if topology has changed relatively
+     * close to a mastership change event. Does not do a reroute if mastership
+     * change is due to reasons other than a ONOS cluster event - for example a
+     * call to balance-masters, or a switch up/down event.
+     *
+     * @param devId the device identifier for which mastership has changed
+     * @param me the mastership event
+     */
+    void checkFullRerouteForMasterChange(DeviceId devId, MastershipEvent me) {
+        // give small delay to absorb mastership events that are caused by
+        // device that has disconnected from cluster
+        executorService.schedule(new MasterChange(devId, me),
+                                 MASTER_CHANGE_DELAY, TimeUnit.MILLISECONDS);
+    }
+
+    protected final class MasterChange implements Runnable {
+        private DeviceId devId;
+        private MastershipEvent me;
+        private static final long CLUSTER_EVENT_THRESHOLD = 4500; // ms
+        private static final long DEVICE_EVENT_THRESHOLD = 2000; // ms
+
+        MasterChange(DeviceId devId, MastershipEvent me) {
+            this.devId = devId;
+            this.me = me;
+        }
+
+        @Override
+        public void run() {
+            long lce = srManager.clusterListener.timeSinceLastClusterEvent();
+            boolean clusterEvent = lce < CLUSTER_EVENT_THRESHOLD;
+
+            // ignore event for lost switch if cluster event hasn't happened -
+            // device down event will handle it
+            if ((me.roleInfo().master() == null
+                    || !srManager.deviceService.isAvailable(devId))
+                    && !clusterEvent) {
+                log.debug("Full reroute not required for lost device: {}/{} "
+                        + "clusterEvent/timeSince: {}/{}",
+                          devId, me.roleInfo(), clusterEvent, lce);
+                return;
+            }
+
+            long update = srManager.deviceService.getLastUpdatedInstant(devId);
+            long lde = Instant.now().toEpochMilli() - update;
+            boolean deviceEvent = lde < DEVICE_EVENT_THRESHOLD;
+
+            // ignore event for recently connected switch if cluster event hasn't
+            // happened - link up events will handle it
+            if (srManager.deviceService.isAvailable(devId) && deviceEvent
+                    && !clusterEvent) {
+                log.debug("Full reroute not required for recently available"
+                        + " device: {}/{} deviceEvent/timeSince: {}/{} "
+                        + "clusterEvent/timeSince: {}/{}",
+                        devId, me.roleInfo(), deviceEvent, lde, clusterEvent, lce);
+                return;
+            }
+
+            // if it gets here, then mastership change is likely due to onos
+            // instance failure, or network partition in onos cluster
+            // normally a mastership change like this does not require re-programming
+            // but if topology changes happen at the same time then we may miss events
+            if (!isRoutingStable() && clusterEvent) {
+                log.warn("Mastership changed for dev: {}/{} while programming "
+                        + "due to clusterEvent {} ms ago .. attempting full reroute",
+                         devId, me.roleInfo(), lce);
+                if (srManager.mastershipService.isLocalMaster(devId)) {
+                    // old master could have died when populating filters
+                    populatePortAddressingRules(devId);
+                }
+                // old master could have died when creating groups
+                srManager.purgeHashedNextObjectiveStore(devId);
+                // XXX right now we have no fine-grained way to only make changes
+                // for the route paths affected by this device.
+                populateAllRoutingRules();
+            } else {
+                log.debug("Stable route-paths .. full reroute not attempted for "
+                        + "mastership change {}/{} deviceEvent/timeSince: {}/{} "
+                        + "clusterEvent/timeSince: {}/{}", devId, me.roleInfo(),
+                        deviceEvent, lde, clusterEvent, lce);
+            }
+        }
+    }
+
     //////////////////////////////////////
     //  Routing helper methods and classes
     //////////////////////////////////////
@@ -1201,9 +1296,17 @@
             log.debug("Computing the impacted routes for device {} due to link fail",
                       sw.id());
             if (!shouldProgram(sw.id())) {
+                lastProgrammed.remove(sw.id());
                 continue;
             }
             for (DeviceId rootSw : deviceAndItsPair(sw.id())) {
+                // check for mastership change since last run
+                if (!lastProgrammed.contains(sw.id())) {
+                    lastProgrammed.add(sw.id());
+                    log.warn("New reponsibility for this node to program dev:{}"
+                            + " ... nuking current ECMPspg", sw.id());
+                    currentEcmpSpgMap.remove(sw.id());
+                }
                 EcmpShortestPathGraph ecmpSpg = currentEcmpSpgMap.get(rootSw);
                 if (ecmpSpg == null) {
                     log.warn("No existing ECMP graph for switch {}. Aborting optimized"
@@ -1268,6 +1371,7 @@
         for (Device sw : srManager.deviceService.getDevices()) {
             log.debug("Computing the impacted routes for device {}", sw.id());
             if (!shouldProgram(sw.id())) {
+                lastProgrammed.remove(sw.id());
                 continue;
             }
             for (DeviceId rootSw : deviceAndItsPair(sw.id())) {
@@ -1278,6 +1382,13 @@
                                   link.dst().deviceId());
                     }
                 }
+                // check for mastership change since last run
+                if (!lastProgrammed.contains(sw.id())) {
+                    lastProgrammed.add(sw.id());
+                    log.warn("New reponsibility for this node to program dev:{}"
+                            + " ... nuking current ECMPspg", sw.id());
+                    currentEcmpSpgMap.remove(sw.id());
+                }
                 EcmpShortestPathGraph currEcmpSpg = currentEcmpSpgMap.get(rootSw);
                 if (currEcmpSpg == null) {
                     log.debug("No existing ECMP graph for device {}.. adding self as "
@@ -1437,6 +1548,7 @@
     boolean shouldProgram(DeviceId deviceId) {
         Boolean cached = shouldProgramCache.get(deviceId);
         if (cached != null) {
+            log.debug("shouldProgram dev:{} cached:{}", deviceId, cached);
             return cached;
         }
 
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
index eebce8c..864a924 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
@@ -38,6 +38,8 @@
 import org.onlab.util.KryoNamespace;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterEvent;
+import org.onosproject.cluster.ClusterEventListener;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
@@ -129,6 +131,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Dictionary;
@@ -251,6 +254,7 @@
     private final InternalRouteEventListener routeListener = new InternalRouteEventListener();
     private final InternalTopologyListener topologyListener = new InternalTopologyListener();
     private final InternalMastershipListener mastershipListener = new InternalMastershipListener();
+    final InternalClusterListener clusterListener = new InternalClusterListener();
     //Completable future for network configuration process to buffer config events handling during activation
     private CompletableFuture<Boolean> networkConfigCompletion = null;
     private List<Event> queuedEvents = new CopyOnWriteArrayList<>();
@@ -446,6 +450,7 @@
         routeService.addListener(routeListener);
         topologyService.addListener(topologyListener);
         mastershipService.addListener(mastershipListener);
+        clusterService.addListener(clusterListener);
 
         linkHandler.init();
         l2TunnelHandler.init();
@@ -505,6 +510,7 @@
         routeService.removeListener(routeListener);
         topologyService.removeListener(topologyListener);
         mastershipService.removeListener(mastershipListener);
+        clusterService.removeListener(clusterListener);
 
         neighbourResolutionService.unregisterNeighbourHandlers(appId);
 
@@ -1170,6 +1176,15 @@
                     } else {
                         log.error("Unhandled config class: {}", configClass);
                     }
+                } else if (event.type() == MastershipEvent.Type.MASTER_CHANGED) {
+                    MastershipEvent me = (MastershipEvent) event;
+                    DeviceId deviceId = me.subject();
+                    Optional<DeviceId> pairDeviceId = getPairDeviceId(deviceId);
+                    log.info(" ** MASTERSHIP CHANGED Invalidating shouldProgram cache"
+                            + " for {}/pair={} due to change", deviceId, pairDeviceId);
+                    defaultRoutingHandler.invalidateShouldProgramCache(deviceId);
+                    pairDeviceId.ifPresent(defaultRoutingHandler::invalidateShouldProgramCache);
+                    defaultRoutingHandler.checkFullRerouteForMasterChange(deviceId, me);
                 } else {
                     log.warn("Unhandled event type: {}", event.type());
                 }
@@ -1269,6 +1284,12 @@
             .forEach(entry -> entry.getValue().cleanUpForNeighborDown(device.id()));
     }
 
+    void purgeHashedNextObjectiveStore(DeviceId devId) {
+        dsNextObjStore.entrySet().stream()
+                .filter(entry -> entry.getKey().deviceId().equals(devId))
+                .forEach(entry -> dsNextObjStore.remove(entry.getKey()));
+    }
+
     private void processPortUpdated(Device device, Port port) {
         if (deviceConfiguration == null || !deviceConfiguration.isConfigured(device.id())) {
             log.warn("Device configuration uploading. Not handling port event for"
@@ -1590,24 +1611,49 @@
     private class InternalMastershipListener implements MastershipListener {
         @Override
         public void event(MastershipEvent event) {
-            DeviceId deviceId = event.subject();
-            Optional<DeviceId> pairDeviceId = getPairDeviceId(deviceId);
-
             switch (event.type()) {
-                case MASTER_CHANGED:
-                    log.info("Invalidating shouldProgram cache for {}/pair={} due to mastership change",
-                            deviceId, pairDeviceId);
-                    defaultRoutingHandler.invalidateShouldProgramCache(deviceId);
-                    pairDeviceId.ifPresent(defaultRoutingHandler::invalidateShouldProgramCache);
-                    break;
-                case BACKUPS_CHANGED:
-                case SUSPENDED:
-                default:
-                    break;
+            case MASTER_CHANGED:
+                log.debug("Mastership event: {}/{}", event.subject(),
+                          event.roleInfo());
+                mainEventExecutor.execute(new InternalEventHandler(event));
+                break;
+            case BACKUPS_CHANGED:
+            case SUSPENDED:
+            default:
+                log.debug("Mastership event type {} not handled", event.type());
+                break;
             }
         }
     }
 
+    class InternalClusterListener implements ClusterEventListener {
+        private Instant lastClusterEvent = Instant.EPOCH;
+
+        long timeSinceLastClusterEvent() {
+            return Instant.now().toEpochMilli() - lastClusterEvent.toEpochMilli();
+        }
+
+        @Override
+        public void event(ClusterEvent event) {
+            switch (event.type()) {
+            case INSTANCE_ACTIVATED:
+            case INSTANCE_ADDED:
+            case INSTANCE_READY:
+                log.debug("Cluster event {} ignored", event.type());
+                break;
+            case INSTANCE_DEACTIVATED:
+            case INSTANCE_REMOVED:
+                log.info("** Cluster event {}", event.type());
+                lastClusterEvent = Instant.now();
+                break;
+            default:
+                break;
+            }
+
+        }
+
+    }
+
     private void updateInterface(InterfaceConfig conf, InterfaceConfig prevConf) {
         try {
             Set<Interface> intfs = conf.getInterfaces();