[CORD-2740] Listening for topology events

Change-Id: I25fc2880e7e78439652c5c6a41ac940396e28045
diff --git a/app/src/main/java/org/onosproject/segmentrouting/LinkHandler.java b/app/src/main/java/org/onosproject/segmentrouting/LinkHandler.java
index c643991..be99764 100644
--- a/app/src/main/java/org/onosproject/segmentrouting/LinkHandler.java
+++ b/app/src/main/java/org/onosproject/segmentrouting/LinkHandler.java
@@ -180,8 +180,6 @@
                 }
             }
         }
-
-        srManager.mcastHandler.init();
     }
 
     /**
@@ -244,8 +242,6 @@
             log.warn("group handler not found for dev:{} when removing link: {}",
                      link.src().deviceId(), link);
         }
-
-        srManager.mcastHandler.processLinkDown(link);
     }
 
     /**
@@ -255,7 +251,7 @@
      * @param link the link to be processed
      * @return true if valid link
      */
-    private boolean isLinkValid(Link link) {
+     boolean isLinkValid(Link link) {
         if (link.type() != Link.Type.DIRECT) {
             // NOTE: A DIRECT link might be transiently marked as INDIRECT
             // if BDDP is received before LLDP. We can safely ignore that
diff --git a/app/src/main/java/org/onosproject/segmentrouting/McastHandler.java b/app/src/main/java/org/onosproject/segmentrouting/McastHandler.java
index fff2c92..a4c9d19 100644
--- a/app/src/main/java/org/onosproject/segmentrouting/McastHandler.java
+++ b/app/src/main/java/org/onosproject/segmentrouting/McastHandler.java
@@ -52,6 +52,7 @@
 import org.onosproject.net.mcast.McastEvent;
 import org.onosproject.net.mcast.McastRoute;
 import org.onosproject.net.mcast.McastRouteInfo;
+import org.onosproject.net.topology.Topology;
 import org.onosproject.net.topology.TopologyService;
 import org.onosproject.segmentrouting.config.SegmentRoutingAppConfig;
 import org.onosproject.segmentrouting.storekey.McastStoreKey;
@@ -949,8 +950,11 @@
      * @return an optional path from src to dst
      */
     private Optional<Path> getPath(DeviceId src, DeviceId dst, IpAddress mcastIp) {
+        // Takes a snapshot of the topology
+        final Topology currentTopology = topologyService.currentTopology();
         List<Path> allPaths = Lists.newArrayList(
-                topologyService.getPaths(topologyService.currentTopology(), src, dst));
+                topologyService.getPaths(currentTopology, src, dst)
+        );
         log.debug("{} path(s) found from {} to {}", allPaths.size(), src, dst);
         if (allPaths.isEmpty()) {
             return Optional.empty();
@@ -1337,7 +1341,7 @@
                                 // Send to the flowobjective service
                                 srManager.flowObjectiveService.next(deviceId, currentNext);
                             } else {
-                                log.warn("Unable to run buckets corrector." +
+                                log.warn("Unable to run buckets corrector. " +
                                                  "Missing next for {} and group {}",
                                          deviceId, mcastIp);
                             }
diff --git a/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
index f72d11b..a47101d 100644
--- a/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
+++ b/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
@@ -85,6 +85,8 @@
 import org.onosproject.net.packet.PacketContext;
 import org.onosproject.net.packet.PacketProcessor;
 import org.onosproject.net.packet.PacketService;
+import org.onosproject.net.topology.TopologyEvent;
+import org.onosproject.net.topology.TopologyListener;
 import org.onosproject.net.topology.TopologyService;
 import org.onosproject.routeservice.ResolvedRoute;
 import org.onosproject.routeservice.RouteEvent;
@@ -237,11 +239,13 @@
     LinkHandler linkHandler = null;
     private SegmentRoutingNeighbourDispatcher neighbourHandler = null;
     private DefaultL2TunnelHandler l2TunnelHandler = null;
+    private TopologyHandler topologyHandler = null;
     private InternalEventHandler eventHandler = new InternalEventHandler();
     private final InternalHostListener hostListener = new InternalHostListener();
     private final InternalConfigListener cfgListener = new InternalConfigListener(this);
     private final InternalMcastListener mcastListener = new InternalMcastListener();
     private final InternalRouteEventListener routeListener = new InternalRouteEventListener();
+    private final InternalTopologyListener topologyListener = new InternalTopologyListener();
 
     private ScheduledExecutorService executorService = Executors
             .newScheduledThreadPool(1, groupedThreads("SegmentRoutingManager", "event-%d", log));
@@ -416,6 +420,7 @@
         routeHandler = new RouteHandler(this);
         neighbourHandler = new SegmentRoutingNeighbourDispatcher(this);
         l2TunnelHandler = new DefaultL2TunnelHandler(this);
+        topologyHandler = new TopologyHandler(this);
 
         cfgService.addListener(cfgListener);
         cfgService.registerConfigFactory(deviceConfigFactory);
@@ -431,6 +436,7 @@
         deviceService.addListener(deviceListener);
         multicastRouteService.addListener(mcastListener);
         routeService.addListener(routeListener);
+        topologyService.addListener(topologyListener);
 
         l2TunnelHandler.init();
 
@@ -473,6 +479,7 @@
         deviceService.removeListener(deviceListener);
         multicastRouteService.removeListener(mcastListener);
         routeService.removeListener(routeListener);
+        topologyService.removeListener(topologyListener);
 
         neighbourResolutionService.unregisterNeighbourHandlers(appId);
 
@@ -1067,6 +1074,22 @@
         }
     }
 
+    /**
+     * Internal listener for topology events.
+     */
+    private class InternalTopologyListener implements TopologyListener {
+        @Override
+        public void event(TopologyEvent event) {
+            switch (event.type()) {
+            case TOPOLOGY_CHANGED:
+                log.debug("Event {} received from TopologyService", event.type());
+                scheduleEventHandlerIfNotScheduled(event);
+                break;
+            default:
+            }
+        }
+    }
+
     @SuppressWarnings("rawtypes")
     private void scheduleEventHandlerIfNotScheduled(Event event) {
         synchronized (THREAD_SCHED_LOCK) {
@@ -1103,6 +1126,7 @@
                             break;
                         }
                     }
+                    // TODO We should also change SR routing and PW to listen to TopologyEvents
                     if (event.type() == LinkEvent.Type.LINK_ADDED ||
                             event.type() == LinkEvent.Type.LINK_UPDATED) {
                         linkHandler.processLinkAdded((Link) event.subject());
@@ -1139,6 +1163,14 @@
                                  event.type());
                         processPortUpdated(((Device) event.subject()),
                                            ((DeviceEvent) event).port());
+                    } else if (event.type() == TopologyEvent.Type.TOPOLOGY_CHANGED) {
+                        // Process topology event, needed for all modules relying on
+                        // topology service for path computation
+                        TopologyEvent topologyEvent = (TopologyEvent) event;
+                        log.info("Processing topology event {}, topology age {}, reasons {}",
+                                 event.type(), topologyEvent.subject().time(),
+                                 topologyEvent.reasons().size());
+                        topologyHandler.processTopologyChange(topologyEvent.reasons());
                     } else {
                         log.warn("Unhandled event type: {}", event.type());
                     }
@@ -1228,7 +1260,6 @@
         defaultRoutingHandler
             .populateRoutingRulesForLinkStatusChange(null, null, device.id());
         defaultRoutingHandler.purgeEcmpGraph(device.id());
-        mcastHandler.processDeviceDown(device.id());
         xConnectHandler.removeDevice(device.id());
     }
 
diff --git a/app/src/main/java/org/onosproject/segmentrouting/TopologyHandler.java b/app/src/main/java/org/onosproject/segmentrouting/TopologyHandler.java
new file mode 100644
index 0000000..72b1a70
--- /dev/null
+++ b/app/src/main/java/org/onosproject/segmentrouting/TopologyHandler.java
@@ -0,0 +1,177 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.segmentrouting;
+
+import com.google.common.collect.Maps;
+import org.onosproject.event.Event;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Link;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.link.LinkEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Handler for topology events.
+ */
+class TopologyHandler {
+
+    // Logging instance
+    private static final Logger log = LoggerFactory.getLogger(TopologyHandler.class);
+    // Internal reference for SR manager and its services
+    private final SegmentRoutingManager srManager;
+
+    /**
+     * Constructs the TopologyHandler.
+     *
+     * @param srManager Segment Routing manager
+     */
+    TopologyHandler(SegmentRoutingManager srManager) {
+        this.srManager = srManager;
+    }
+
+    // Check if the link event is valid
+    private boolean isValid(LinkEvent linkEvent) {
+        Link link = linkEvent.subject();
+        // Verify if the link is valid with the link handler
+        if (!srManager.linkHandler.isLinkValid(link)) {
+            log.debug("Link {} ignored by the LinkHandler", link);
+            return false;
+        }
+        // Processing for LINK_REMOVED
+        if (linkEvent.type() == LinkEvent.Type.LINK_REMOVED) {
+            // device availability check helps to ensure that multiple link-removed
+            // events are actually treated as a single switch removed event.
+            // purgeSeenLink is necessary so we do rerouting (instead of rehashing)
+            // when switch comes back.
+            if (link.src().elementId() instanceof DeviceId
+                    && !srManager.deviceService.isAvailable(link.src().deviceId())) {
+                log.debug("Link {} ignored device {} is down", link, link.src().deviceId());
+                return false;
+            }
+            if (link.dst().elementId() instanceof DeviceId
+                    && !srManager.deviceService.isAvailable(link.dst().deviceId())) {
+                log.debug("Link {} ignored device {} is down", link, link.dst().deviceId());
+                return false;
+            }
+            // LINK_REMOVED is ok
+            return true;
+        }
+        // Processing for LINK_ADDED and LINK_UPDATED
+        // Verify if source device is configured
+        if (srManager.deviceConfiguration == null ||
+                !srManager.deviceConfiguration.isConfigured(link.src().deviceId())) {
+            // Source device is not configured, not valid for us
+            log.warn("Source device of this link is not configured.. "
+                             + "not processing further");
+            return false;
+        }
+        // LINK_ADDED/LINK_UPDATED is ok
+        return true;
+    }
+
+    // Check if the device event is valid
+    private boolean isValid(DeviceEvent deviceEvent) {
+        Device device = deviceEvent.subject();
+        // We don't process the event if the device is available
+        return !srManager.deviceService.isAvailable(device.id());
+    }
+
+    /**
+     * Process the TOPOLOGY_CHANGE event. An initial optimization
+     * is performed to avoid the processing of not relevant events.
+     *
+     * @param reasons list of events that triggered topology change
+     */
+    void processTopologyChange(List<Event> reasons) {
+        // Store temporary in the map all the link events,
+        // events having the same subject will be automatically
+        // overridden.
+        Map<Link, LinkEvent> linkEvents = Maps.newHashMap();
+        // Store temporary in the map all the device events,
+        // events having the same subject will be automatically
+        // overridden.
+        Map<DeviceId, DeviceEvent> deviceEvents = Maps.newHashMap();
+        // Pre-process all the events putting them in the right map
+        reasons.forEach(reason -> {
+            // Relevant events for devices
+            if (reason.type() == DeviceEvent.Type.DEVICE_ADDED ||
+                    reason.type() == DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED ||
+                    reason.type() == DeviceEvent.Type.DEVICE_UPDATED) {
+                // Take the event and save in the map
+                DeviceEvent deviceEvent = (DeviceEvent) reason;
+                deviceEvents.put(deviceEvent.subject().id(), deviceEvent);
+                /// Relevant events for links
+            } else if (reason.type() == LinkEvent.Type.LINK_ADDED ||
+                    reason.type() == LinkEvent.Type.LINK_UPDATED ||
+                    reason.type() == LinkEvent.Type.LINK_REMOVED) {
+                // Take the event and store the link in the map
+                LinkEvent linkEvent = (LinkEvent) reason;
+                linkEvents.put(linkEvent.subject(), linkEvent);
+                // Other events are not relevant
+            } else {
+                log.debug("Unhandled event type: {}", reason.type());
+            }
+        });
+        // Verify if the link events are valid
+        // before sent for mcast handling
+        List<LinkEvent> toProcessLinkEvent = linkEvents.values()
+                .stream()
+                .filter(this::isValid)
+                .collect(Collectors.toList());
+        // Verify if the device events are valid
+        // before sent for mcast handling
+        List<DeviceEvent> toProcessDeviceEvent = deviceEvents.values()
+                .stream()
+                .filter(this::isValid)
+                .collect(Collectors.toList());
+
+        // Processing part of the received events
+        // We don't need to process all LINK_ADDED
+        boolean isLinkAdded = false;
+        // Iterate on link events
+        for (LinkEvent linkEvent : toProcessLinkEvent) {
+            // We process just the first one
+            if (linkEvent.type() == LinkEvent.Type.LINK_ADDED ||
+                    linkEvent.type() == LinkEvent.Type.LINK_UPDATED) {
+                // Other ones are skipped
+                if (isLinkAdded) {
+                    continue;
+                }
+                log.info("Processing - Event: {}", linkEvent);
+                // First time, let's process it
+                isLinkAdded = true;
+                // McastHandler, reroute all the mcast tree
+                srManager.mcastHandler.init();
+            } else {
+                log.info("Processing - Event: {}", linkEvent);
+                // We compute each time a LINK_DOWN event
+                srManager.mcastHandler.processLinkDown(linkEvent.subject());
+            }
+        }
+        // Process all the device events
+        toProcessDeviceEvent.forEach(deviceEvent -> {
+            log.info("Processing - Event: {}", deviceEvent);
+            srManager.mcastHandler.processDeviceDown(deviceEvent.subject().id());
+        });
+    }
+}