Offload neighbor message processing to a dedicated thread pool

Also update unit tests

Change-Id: Ib94c796083e2d75912f77667d3cfe4ed794694e9
diff --git a/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
index 6f541c1..a5f8a51 100644
--- a/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
+++ b/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
@@ -323,6 +323,7 @@
     private ScheduledExecutorService routeEventExecutor;
     private ScheduledExecutorService mcastEventExecutor;
     private ExecutorService packetExecutor;
+    ExecutorService neighborExecutor;
 
     Map<DeviceId, DefaultGroupHandler> groupHandlerMap = new ConcurrentHashMap<>();
     /**
@@ -407,6 +408,8 @@
     public static final int MIN_DUMMY_VLAN_ID = 2;
     public static final int MAX_DUMMY_VLAN_ID = 4093;
 
+    private static final int DEFAULT_POOL_SIZE = 32;
+
     Instant lastEdgePortEvent = Instant.EPOCH;
 
     protected void bindXconnectService(XconnectService xconnectService) {
@@ -431,11 +434,17 @@
     protected void activate(ComponentContext context) {
         appId = coreService.registerApplication(APP_NAME);
 
-        mainEventExecutor = Executors.newSingleThreadScheduledExecutor(groupedThreads("sr-event-main", "%d", log));
-        hostEventExecutor = Executors.newSingleThreadScheduledExecutor(groupedThreads("sr-event-host", "%d", log));
-        routeEventExecutor = Executors.newSingleThreadScheduledExecutor(groupedThreads("sr-event-route", "%d", log));
-        mcastEventExecutor = Executors.newSingleThreadScheduledExecutor(groupedThreads("sr-event-mcast", "%d", log));
-        packetExecutor = Executors.newSingleThreadExecutor(groupedThreads("sr-packet", "%d", log));
+        mainEventExecutor = Executors.newSingleThreadScheduledExecutor(
+                groupedThreads("onos/sr", "event-main-%d", log));
+        hostEventExecutor = Executors.newSingleThreadScheduledExecutor(
+                groupedThreads("onos/sr", "event-host-%d", log));
+        routeEventExecutor = Executors.newSingleThreadScheduledExecutor(
+                groupedThreads("onos/sr", "event-route-%d", log));
+        mcastEventExecutor = Executors.newSingleThreadScheduledExecutor(
+                groupedThreads("onos/sr", "event-mcast-%d", log));
+        packetExecutor = Executors.newSingleThreadExecutor(groupedThreads("onos/sr", "packet-%d", log));
+        neighborExecutor = Executors.newFixedThreadPool(DEFAULT_POOL_SIZE,
+                groupedThreads("onos/sr", "neighbor-%d", log));
 
         log.debug("Creating EC map nsnextobjectivestore");
         EventuallyConsistentMapBuilder<DestinationSetNextObjectiveStoreKey, NextNeighbors>
@@ -598,12 +607,14 @@
         routeEventExecutor.shutdown();
         mcastEventExecutor.shutdown();
         packetExecutor.shutdown();
+        neighborExecutor.shutdown();
 
         mainEventExecutor = null;
         hostEventExecutor = null;
         routeEventExecutor = null;
         mcastEventExecutor = null;
         packetExecutor = null;
+        neighborExecutor = null;
 
         cfgService.removeListener(cfgListener);
         cfgService.unregisterConfigFactory(deviceConfigFactory);
diff --git a/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingNeighbourDispatcher.java b/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingNeighbourDispatcher.java
index f170fae..3cf0e32 100644
--- a/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingNeighbourDispatcher.java
+++ b/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingNeighbourDispatcher.java
@@ -42,6 +42,10 @@
 
     @Override
     public void handleMessage(NeighbourMessageContext context, HostService hostService) {
+        manager.neighborExecutor.execute(() -> handleMessageInternal(context, hostService));
+    }
+
+    private void handleMessageInternal(NeighbourMessageContext context, HostService hostService) {
         log.trace("Received {} packet on {}: {}", context.protocol(),
                   context.inPort(), context.packet());
         switch (context.protocol()) {