ONOS-1823 and ONOS-1838: Segment Routing Multi-instance Support-1

Change-Id: I3cc848415a609a9c4001d135e51104c62fb2830d
diff --git a/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
index 9ace6b8..87282b4 100644
--- a/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
+++ b/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
@@ -22,23 +22,22 @@
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.onlab.packet.Ethernet;
 import org.onlab.packet.IPv4;
+import org.onlab.util.KryoNamespace;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.event.Event;
 import org.onosproject.segmentrouting.grouphandler.DefaultGroupHandler;
 import org.onosproject.segmentrouting.grouphandler.NeighborSet;
+import org.onosproject.segmentrouting.grouphandler.NeighborSetNextObjectiveStoreKey;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.Link;
-import org.onosproject.net.MastershipRole;
 import org.onosproject.net.Port;
 import org.onosproject.net.device.DeviceEvent;
 import org.onosproject.net.device.DeviceListener;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.flowobjective.FlowObjectiveService;
-import org.onosproject.net.group.Group;
-import org.onosproject.net.group.GroupEvent;
 import org.onosproject.net.group.GroupKey;
 import org.onosproject.net.host.HostService;
 import org.onosproject.net.intent.IntentService;
@@ -51,9 +50,16 @@
 import org.onosproject.net.packet.PacketService;
 import org.onosproject.net.topology.TopologyService;
 import org.onosproject.segmentrouting.config.NetworkConfigManager;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapBuilder;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
+import org.onosproject.store.service.WallclockClockManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.URI;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -112,6 +118,11 @@
     private static ScheduledFuture<?> eventHandlerFuture = null;
     private ConcurrentLinkedQueue<Event> eventQueue = new ConcurrentLinkedQueue<Event>();
     private Map<DeviceId, DefaultGroupHandler> groupHandlerMap = new ConcurrentHashMap<DeviceId, DefaultGroupHandler>();
+    // Per device next objective ID store with (device id + neighbor set) as key
+    private EventuallyConsistentMap<NeighborSetNextObjectiveStoreKey,
+        Integer> nsNextObjStore = null;
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
 
     private NetworkConfigManager networkConfigService = new NetworkConfigManager();;
 
@@ -119,10 +130,34 @@
     private static int numOfHandlerExecution = 0;
     private static int numOfHandlerScheduled = 0;
 
+    private KryoNamespace.Builder kryoBuilder = null;
+
     @Activate
     protected void activate() {
         appId = coreService
                 .registerApplication("org.onosproject.segmentrouting");
+
+        kryoBuilder = new KryoNamespace.Builder()
+            .register(NeighborSetNextObjectiveStoreKey.class,
+                  NeighborSet.class,
+                  DeviceId.class,
+                  URI.class,
+                  WallClockTimestamp.class,
+                  org.onosproject.cluster.NodeId.class,
+                  HashSet.class
+                );
+
+        log.debug("Creating EC map nsnextobjectivestore");
+        EventuallyConsistentMapBuilder<NeighborSetNextObjectiveStoreKey, Integer>
+                nsNextObjMapBuilder = storageService.eventuallyConsistentMapBuilder();
+
+        nsNextObjStore = nsNextObjMapBuilder
+                .withName("nsnextobjectivestore")
+                .withSerializer(kryoBuilder)
+                .withClockService(new WallclockClockManager<>())
+                .build();
+        log.trace("Current size {}", nsNextObjStore.size());
+
         networkConfigService.init();
         deviceConfiguration = new DeviceConfiguration(networkConfigService);
         arpHandler = new ArpHandler(this);
@@ -136,20 +171,18 @@
         deviceService.addListener(new InternalDeviceListener());
 
         for (Device device : deviceService.getDevices()) {
-            if (mastershipService.getLocalRole(device.id()) == MastershipRole.MASTER) {
-                DefaultGroupHandler groupHandler = DefaultGroupHandler
-                        .createGroupHandler(device.id(), appId,
-                                            deviceConfiguration, linkService,
-                                            flowObjectiveService);
-                groupHandlerMap.put(device.id(), groupHandler);
-                defaultRoutingHandler.populateTtpRules(device.id());
-                log.debug("Initiating default group handling for {}", device.id());
-            } else {
-                log.debug("Activate: Local role {} "
-                                  + "is not MASTER for device {}",
-                          mastershipService.getLocalRole(device.id()),
-                          device.id());
-            }
+            //Irrespective of whether the local instance is MASTER for this device,
+            //create a group handler instance and push the default TTP flow rules,
+            //because in a multi-instance setup any instance can initiate groups
+            //for any device. The default TTP rules also need to be pushed before
+            //any IP table entries are inserted for the device.
+            DefaultGroupHandler groupHandler = DefaultGroupHandler
+                    .createGroupHandler(device.id(), appId,
+                                        deviceConfiguration, linkService,
+                                        flowObjectiveService,
+                                        nsNextObjStore);
+            groupHandlerMap.put(device.id(), groupHandler);
+            defaultRoutingHandler.populateTtpRules(device.id());
         }
 
         defaultRoutingHandler.startPopulationProcess();
@@ -180,8 +213,14 @@
 
     public int getNextObjectiveId(DeviceId deviceId, NeighborSet ns) {
 
-        return (groupHandlerMap.get(deviceId) != null) ? groupHandlerMap
-                .get(deviceId).getNextObjectiveId(ns) : -1;
+        // Look the handler up once so the null check and the call use the same instance.
+        DefaultGroupHandler groupHandler = groupHandlerMap.get(deviceId);
+        if (groupHandler == null) {
+            log.warn("getNextObjectiveId query in device {} not found", deviceId);
+            return -1;
+        }
+        log.trace("getNextObjectiveId query in device {}", deviceId);
+        return groupHandler.getNextObjectiveId(ns);
     }
 
     private class InternalPacketProcessor implements PacketProcessor {
@@ -224,12 +263,12 @@
 
         @Override
         public void event(DeviceEvent event) {
-            if (mastershipService.getLocalRole(event.subject().id()) != MastershipRole.MASTER) {
+            /*if (mastershipService.getLocalRole(event.subject().id()) != MastershipRole.MASTER) {
                 log.debug("Local role {} is not MASTER for device {}",
                           mastershipService.getLocalRole(event.subject().id()),
                           event.subject().id());
                 return;
-            }
+            }*/
 
             switch (event.type()) {
             case DEVICE_ADDED:
@@ -245,12 +284,14 @@
 
     private void scheduleEventHandlerIfNotScheduled(Event event) {
 
-        eventQueue.add(event);
-        numOfEvents++;
-        if (eventHandlerFuture == null || eventHandlerFuture.isDone()) {
-            eventHandlerFuture = executorService
-                    .schedule(eventHandler, 100, TimeUnit.MILLISECONDS);
-            numOfHandlerScheduled++;
+        synchronized (eventQueue) {
+            eventQueue.add(event);
+            numOfEvents++;
+            if (eventHandlerFuture == null || eventHandlerFuture.isDone()) {
+                eventHandlerFuture = executorService
+                        .schedule(eventHandler, 100, TimeUnit.MILLISECONDS);
+                numOfHandlerScheduled++;
+            }
         }
 
         log.trace("numOfEvents {}, numOfEventHanlderScheduled {}", numOfEvents,
@@ -262,44 +303,68 @@
 
         @Override
         public void run() {
-            numOfHandlerExecution++;
-            while (!eventQueue.isEmpty()) {
-                Event event = eventQueue.poll();
-                if (event.type() == LinkEvent.Type.LINK_ADDED) {
-                    processLinkAdded((Link) event.subject());
-                } else if (event.type() == LinkEvent.Type.LINK_REMOVED) {
-                    processLinkRemoved((Link) event.subject());
-                } else if (event.type() == GroupEvent.Type.GROUP_ADDED) {
-                    processGroupAdded((Group) event.subject());
-                } else if (event.type() == DeviceEvent.Type.DEVICE_ADDED ||
-                        event.type() == DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED ||
-                        event.type() == DeviceEvent.Type.DEVICE_UPDATED) {
-                    if (deviceService.isAvailable(((Device) event.subject()).id())) {
-                        processDeviceAdded((Device) event.subject());
+            try {
+                synchronized (eventQueue) {
+                    numOfHandlerExecution++;
+                    while (!eventQueue.isEmpty()) {
+                        Event event = eventQueue.poll();
+                        if (event.type() == LinkEvent.Type.LINK_ADDED) {
+                            processLinkAdded((Link) event.subject());
+                        } else if (event.type() == LinkEvent.Type.LINK_REMOVED) {
+                            processLinkRemoved((Link) event.subject());
+                        //} else if (event.type() == GroupEvent.Type.GROUP_ADDED) {
+                        //    processGroupAdded((Group) event.subject());
+                        } else if (event.type() == DeviceEvent.Type.DEVICE_ADDED ||
+                                event.type() == DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED ||
+                                event.type() == DeviceEvent.Type.DEVICE_UPDATED) {
+                            if (deviceService.isAvailable(((Device) event.subject()).id())) {
+                                processDeviceAdded((Device) event.subject());
+                            }
+                        } else if (event.type() == DeviceEvent.Type.PORT_REMOVED) {
+                            processPortRemoved((Device) event.subject(),
+                                               ((DeviceEvent) event).port());
+                        } else {
+                            log.warn("Unhandled event type: {}", event.type());
+                        }
                     }
-                } else if (event.type() == DeviceEvent.Type.PORT_REMOVED) {
-                    processPortRemoved((Device) event.subject(),
-                                       ((DeviceEvent) event).port());
-                } else {
-                    log.warn("Unhandled event type: {}", event.type());
                 }
+                log.debug("numOfHandlerExecution {} numOfEventHanlderScheduled {} numOfEvents {}",
+                          numOfHandlerExecution, numOfHandlerScheduled, numOfEvents);
+            } catch (Exception e) {
+                // Pass the exception as the trailing throwable so the stack trace is logged.
+                log.error("SegmentRouting event handler thread thrown an exception", e);
             }
-            log.debug("numOfHandlerExecution {} numOfEventHanlderScheduled {} numOfEvents {}",
-                      numOfHandlerExecution, numOfHandlerScheduled, numOfEvents);
         }
     }
 
     private void processLinkAdded(Link link) {
         log.debug("A new link {} was added", link.toString());
 
-        if (mastershipService.getLocalRole(link.src().deviceId()) == MastershipRole.MASTER) {
-            DefaultGroupHandler groupHandler = groupHandlerMap.get(link.src()
-                    .deviceId());
-            if (groupHandler != null) {
+        //Irrespective of whether the local instance is MASTER for the source
+        //device, pass the link-up to that device's group handler, because in
+        //a multi-instance setup any instance can initiate groups for any
+        //device. If no handler exists yet (link seen before the device event),
+        //processDeviceAdded() is run first to create it.
+        DefaultGroupHandler groupHandler = groupHandlerMap.get(link.src()
+                .deviceId());
+        if (groupHandler != null) {
+            groupHandler.linkUp(link);
+        } else {
+            Device device = deviceService.getDevice(link.src().deviceId());
+            if (device != null) {
+                log.warn("processLinkAdded: Link Added "
+                        + "Notification without Device Added "
+                        + "event, still handling it");
+                processDeviceAdded(device);
+                groupHandler = groupHandlerMap.get(link.src()
+                                                   .deviceId());
                 groupHandler.linkUp(link);
             }
         }
-        defaultRoutingHandler.populateRoutingRulesForLinkStatusChange(null);
+
+        //defaultRoutingHandler.populateRoutingRulesForLinkStatusChange(null);
+        log.trace("processLinkAdded: re-starting route population process");
+        defaultRoutingHandler.startPopulationProcess();
     }
 
     private void processLinkRemoved(Link link) {
@@ -308,20 +373,27 @@
         if (groupHandler != null) {
             groupHandler.portDown(link.src().port());
         }
-        defaultRoutingHandler.populateRoutingRulesForLinkStatusChange(link);
-    }
-
-    private void processGroupAdded(Group group) {
-        log.debug("A new group with ID {} was added", group.id());
-        defaultRoutingHandler.resumePopulationProcess();
+        //defaultRoutingHandler.populateRoutingRulesForLinkStatusChange(link);
+        log.trace("processLinkRemoved: re-starting route population process");
+        defaultRoutingHandler.startPopulationProcess();
     }
 
     private void processDeviceAdded(Device device) {
         log.debug("A new device with ID {} was added", device.id());
-        defaultRoutingHandler.populateTtpRules(device.id());
-        DefaultGroupHandler dgh = DefaultGroupHandler.createGroupHandler(device
-                .id(), appId, deviceConfiguration, linkService, flowObjectiveService);
+        //Irrespective of whether the local instance is MASTER for this device,
+        //create a group handler instance and push the default TTP flow rules,
+        //because in a multi-instance setup any instance can initiate groups
+        //for any device. The default TTP rules also need to be pushed before
+        //any IP table entries are inserted for the device.
+        DefaultGroupHandler dgh = DefaultGroupHandler.
+                createGroupHandler(device.id(),
+                                   appId,
+                                   deviceConfiguration,
+                                   linkService,
+                                   flowObjectiveService,
+                                   nsNextObjStore);
         groupHandlerMap.put(device.id(), dgh);
+        defaultRoutingHandler.populateTtpRules(device.id());
     }
 
     private void processPortRemoved(Device device, Port port) {