Offload packet processing to another thread

Also update unit tests

Change-Id: Ib94c796083e2d75912f77667d3cfe4ed794694e9
diff --git a/apps/dhcprelay/app/src/main/java/org/onosproject/dhcprelay/DhcpRelayManager.java b/apps/dhcprelay/app/src/main/java/org/onosproject/dhcprelay/DhcpRelayManager.java
index 90c58b7..08ba908 100644
--- a/apps/dhcprelay/app/src/main/java/org/onosproject/dhcprelay/DhcpRelayManager.java
+++ b/apps/dhcprelay/app/src/main/java/org/onosproject/dhcprelay/DhcpRelayManager.java
@@ -109,8 +109,9 @@
 @Service
 public class DhcpRelayManager implements DhcpRelayService {
     public static final String DHCP_RELAY_APP = "org.onosproject.dhcprelay";
-    public static final String ROUTE_STORE_IMPL =
-            "org.onosproject.routeservice.store.RouteStoreImpl";
+    public static final String ROUTE_STORE_IMPL = "org.onosproject.routeservice.store.RouteStoreImpl";
+
+    private static final int DEFAULT_POOL_SIZE = 32;
 
     private static final TrafficSelector ARP_SELECTOR = DefaultTrafficSelector.builder()
             .matchEthType(Ethernet.TYPE_ARP)
@@ -207,6 +208,7 @@
 
     private ScheduledExecutorService timerExecutor;
     protected ExecutorService devEventExecutor;
+    private ExecutorService packetExecutor;
 
     protected DeviceListener deviceListener = new InternalDeviceListener();
     private DhcpRelayPacketProcessor dhcpRelayPacketProcessor = new DhcpRelayPacketProcessor();
@@ -236,12 +238,10 @@
         packetService.addProcessor(dhcpRelayPacketProcessor, PacketProcessor.director(0));
 
         timerExecutor = Executors.newScheduledThreadPool(1,
-                groupedThreads("dhcpRelay",
-                        "config-reloader-%d", log));
-        timerExecutor.scheduleAtFixedRate(new Dhcp6Timer(),
-                0,
-                dhcpPollInterval,
-                TimeUnit.SECONDS);
+                groupedThreads("onos/dhcprelay", "config-reloader-%d", log));
+        timerExecutor.scheduleAtFixedRate(new Dhcp6Timer(), 0, dhcpPollInterval, TimeUnit.SECONDS);
+        packetExecutor = Executors.newFixedThreadPool(DEFAULT_POOL_SIZE,
+                groupedThreads("onos/dhcprelay", "packet-%d", log));
 
         devEventExecutor = newSingleThreadScheduledExecutor(
                              groupedThreads("onos/dhcprelay-dev-events", "events-%d", log));
@@ -255,8 +255,6 @@
 
         deviceService.addListener(deviceListener);
 
-
-
         log.info("DHCP-RELAY Started");
     }
 
@@ -271,6 +269,9 @@
         timerExecutor.shutdown();
         devEventExecutor.shutdownNow();
         devEventExecutor = null;
+        packetExecutor.shutdown();
+        timerExecutor = null;
+        packetExecutor = null;
 
         log.info("DHCP-RELAY Stopped");
     }
@@ -497,9 +498,12 @@
 
 
     private class DhcpRelayPacketProcessor implements PacketProcessor {
-
         @Override
         public void process(PacketContext context) {
+            packetExecutor.execute(() -> processInternal(context));
+        }
+
+        private void processInternal(PacketContext context) {
             // process the packet and get the payload
             Ethernet packet = context.inPacket().parsed();
             if (packet == null) {
diff --git a/apps/dhcprelay/app/src/test/java/org/onosproject/dhcprelay/DhcpRelayManagerTest.java b/apps/dhcprelay/app/src/test/java/org/onosproject/dhcprelay/DhcpRelayManagerTest.java
index ba943f9..afda3bd 100644
--- a/apps/dhcprelay/app/src/test/java/org/onosproject/dhcprelay/DhcpRelayManagerTest.java
+++ b/apps/dhcprelay/app/src/test/java/org/onosproject/dhcprelay/DhcpRelayManagerTest.java
@@ -104,9 +104,6 @@
 import org.onosproject.net.packet.PacketServiceAdapter;
 import org.onosproject.store.StoreDelegate;
 
-
-
-
 import org.osgi.service.component.ComponentContext;
 import org.onlab.packet.DHCP6;
 import org.onlab.packet.IPv6;
@@ -129,6 +126,7 @@
 
 public class DhcpRelayManagerTest {
     private static final int EVENT_PROCESSING_MS = 1000;
+    private static final int PKT_PROCESSING_MS = 500;
     private static final short VLAN_LEN = 2;
     private static final short SEPARATOR_LEN = 1;
     private static final String CONFIG_FILE_PATH = "dhcp-relay.json";
@@ -399,7 +397,7 @@
         verify(mockHostProviderService);
         reset(mockHostProviderService);
 
-        assertEquals(0, mockRouteStore.routes.size());
+        assertAfter(PKT_PROCESSING_MS, () -> assertEquals(0, mockRouteStore.routes.size()));
 
         HostId expectHostId = HostId.hostId(CLIENT_MAC, CLIENT_VLAN);
         Capture<HostDescription> capturedHostDesc = newCapture();
@@ -409,15 +407,15 @@
         packetService.processPacket(new TestDhcpAckPacketContext(CLIENT_CP, CLIENT_MAC,
                                                                  CLIENT_VLAN, INTERFACE_IP.ipAddress().getIp4Address(),
                                                                  false));
-        verify(mockHostProviderService);
-        assertEquals(0, mockRouteStore.routes.size());
+        assertAfter(PKT_PROCESSING_MS, () -> verify(mockHostProviderService));
+        assertAfter(PKT_PROCESSING_MS, () -> assertEquals(0, mockRouteStore.routes.size()));
 
         HostDescription host = capturedHostDesc.getValue();
-        assertEquals(false, host.configured());
-        assertEquals(CLIENT_CP.deviceId(), host.location().elementId());
-        assertEquals(CLIENT_CP.port(), host.location().port());
-        assertEquals(1, host.ipAddress().size());
-        assertEquals(IP_FOR_CLIENT, host.ipAddress().iterator().next());
+        assertAfter(PKT_PROCESSING_MS, () -> assertFalse(host.configured()));
+        assertAfter(PKT_PROCESSING_MS, () -> assertEquals(CLIENT_CP.deviceId(), host.location().elementId()));
+        assertAfter(PKT_PROCESSING_MS, () -> assertEquals(CLIENT_CP.port(), host.location().port()));
+        assertAfter(PKT_PROCESSING_MS, () -> assertEquals(1, host.ipAddress().size()));
+        assertAfter(PKT_PROCESSING_MS, () -> assertEquals(IP_FOR_CLIENT, host.ipAddress().iterator().next()));
     }
 
     /**
@@ -429,12 +427,16 @@
         // Assume outer dhcp relay agent exists in store already
         // send request
         packetService.processPacket(new TestDhcpRequestPacketContext(CLIENT2_MAC,
-                                                                     CLIENT2_VLAN,
-                                                                     CLIENT2_CP,
-                                                                     INTERFACE_IP.ipAddress().getIp4Address(),
-                                                                     true));
+                CLIENT2_VLAN,
+                CLIENT2_CP,
+                INTERFACE_IP.ipAddress().getIp4Address(),
+                true));
         // No routes
         assertEquals(0, mockRouteStore.routes.size());
+
+        // Make sure the REQUEST packet has been processed before start sending ACK
+        assertAfter(PKT_PROCESSING_MS, () -> assertNotNull(packetService.emittedPacket));
+
         // send ack
         packetService.processPacket(new TestDhcpAckPacketContext(CLIENT2_CP,
                                                                  CLIENT2_MAC,
@@ -445,13 +447,12 @@
         // won't trigger the host provider service
         verify(mockHostProviderService);
         reset(mockHostProviderService);
-
-        assertEquals(1, mockRouteStore.routes.size());
+        assertAfter(PKT_PROCESSING_MS, () -> assertEquals(1, mockRouteStore.routes.size()));
 
         Route route = mockRouteStore.routes.get(0);
-        assertEquals(OUTER_RELAY_IP, route.nextHop());
-        assertEquals(IP_FOR_CLIENT.toIpPrefix(), route.prefix());
-        assertEquals(Route.Source.DHCP, route.source());
+        assertAfter(PKT_PROCESSING_MS, () -> assertEquals(OUTER_RELAY_IP, route.nextHop()));
+        assertAfter(PKT_PROCESSING_MS, () -> assertEquals(IP_FOR_CLIENT.toIpPrefix(), route.prefix()));
+        assertAfter(PKT_PROCESSING_MS, () -> assertEquals(Route.Source.DHCP, route.source()));
     }
 
     @Test
@@ -465,25 +466,28 @@
                                                                      CLIENT2_CP,
                                                                      INTERFACE_IP.ipAddress().getIp4Address(),
                                                                      true));
+        assertAfter(PKT_PROCESSING_MS, () -> assertNotNull(packetService.emittedPacket));
         OutboundPacket outPacket = packetService.emittedPacket;
         byte[] outData = outPacket.data().array();
         Ethernet eth = Ethernet.deserializer().deserialize(outData, 0, outData.length);
         IPv4 ip = (IPv4) eth.getPayload();
         UDP udp = (UDP) ip.getPayload();
         DHCP dhcp = (DHCP) udp.getPayload();
-        assertEquals(RELAY_AGENT_IP.toInt(), dhcp.getGatewayIPAddress());
+        assertAfter(PKT_PROCESSING_MS, () -> assertEquals(RELAY_AGENT_IP.toInt(), dhcp.getGatewayIPAddress()));
     }
 
     @Test
     public void testArpRequest() throws Exception {
         packetService.processPacket(new TestArpRequestPacketContext(CLIENT_INTERFACE));
+        assertAfter(PKT_PROCESSING_MS, () -> assertNotNull(packetService.emittedPacket));
         OutboundPacket outboundPacket = packetService.emittedPacket;
         byte[] outPacketData = outboundPacket.data().array();
         Ethernet eth = Ethernet.deserializer().deserialize(outPacketData, 0, outPacketData.length);
 
-        assertEquals(eth.getEtherType(), Ethernet.TYPE_ARP);
+        assertAfter(PKT_PROCESSING_MS, () -> assertEquals(eth.getEtherType(), Ethernet.TYPE_ARP));
         ARP arp = (ARP) eth.getPayload();
-        assertArrayEquals(arp.getSenderHardwareAddress(), CLIENT_INTERFACE.mac().toBytes());
+        assertAfter(PKT_PROCESSING_MS, () ->
+                assertArrayEquals(arp.getSenderHardwareAddress(), CLIENT_INTERFACE.mac().toBytes()));
     }
 
     /**
@@ -665,7 +669,7 @@
 
         verify(mockHostProviderService);
         reset(mockHostProviderService);
-        assertEquals(0, mockRouteStore.routes.size());
+        assertAfter(PKT_PROCESSING_MS, () -> assertEquals(0, mockRouteStore.routes.size()));
 
         Capture<HostDescription> capturedHostDesc = newCapture();
         mockHostProviderService.hostDetected(eq(HostId.hostId(CLIENT_MAC, CLIENT_VLAN)),
@@ -677,15 +681,15 @@
                                                                     CLIENT_VLAN,
                                                                     INTERFACE_IP_V6.ipAddress().getIp6Address(),
                                                                     0, false, CLIENT_VLAN));
-        verify(mockHostProviderService);
-        assertEquals(0, mockRouteStore.routes.size());
+        assertAfter(PKT_PROCESSING_MS, () -> verify(mockHostProviderService));
+        assertAfter(PKT_PROCESSING_MS, () -> assertEquals(0, mockRouteStore.routes.size()));
 
         HostDescription host = capturedHostDesc.getValue();
-        assertEquals(CLIENT_VLAN, host.vlan());
-        assertEquals(CLIENT_CP.deviceId(), host.location().elementId());
-        assertEquals(CLIENT_CP.port(), host.location().port());
-        assertEquals(1, host.ipAddress().size());
-        assertEquals(IP_FOR_CLIENT_V6, host.ipAddress().iterator().next());
+        assertAfter(PKT_PROCESSING_MS, () -> assertEquals(CLIENT_VLAN, host.vlan()));
+        assertAfter(PKT_PROCESSING_MS, () -> assertEquals(CLIENT_CP.deviceId(), host.location().elementId()));
+        assertAfter(PKT_PROCESSING_MS, () -> assertEquals(CLIENT_CP.port(), host.location().port()));
+        assertAfter(PKT_PROCESSING_MS, () -> assertEquals(1, host.ipAddress().size()));
+        assertAfter(PKT_PROCESSING_MS, () -> assertEquals(IP_FOR_CLIENT_V6, host.ipAddress().iterator().next()));
 
         // send release
         packetService.processPacket(new TestDhcp6RequestPacketContext(DHCP6.MsgType.RELEASE.value(),
@@ -695,7 +699,8 @@
                 INTERFACE_IP_V6.ipAddress().getIp6Address(),
                 0));
 
-        assertEquals(null, manager.hostService.getHost(HostId.hostId(CLIENT_MAC, CLIENT_VLAN)));
+        assertAfter(PKT_PROCESSING_MS,
+                () -> assertNull(manager.hostService.getHost(HostId.hostId(CLIENT_MAC, CLIENT_VLAN))));
     }
 
     /**
@@ -715,6 +720,9 @@
 
         assertEquals(0, mockRouteStore.routes.size());
 
+        // Make sure the REQUEST packet has been processed before start sending ACK
+        assertAfter(PKT_PROCESSING_MS, () -> assertNotNull(packetService.emittedPacket));
+
         // send reply
         packetService.processPacket(new TestDhcp6ReplyPacketContext(DHCP6.MsgType.REPLY.value(), CLIENT2_CP,
                 CLIENT2_MAC,
@@ -725,19 +733,13 @@
         // won't trigger the host provider service
         verify(mockHostProviderService);
         reset(mockHostProviderService);
-        assertEquals(2, mockRouteStore.routes.size()); // ipAddress and prefix
+        assertAfter(PKT_PROCESSING_MS, () -> assertEquals(2, mockRouteStore.routes.size())); // ipAddress and prefix
 
-        Route aRoute = mockRouteStore.routes.stream()
-                             .filter(rt -> rt.prefix().contains(IP_FOR_CLIENT_V6))
-                             .findFirst()
-                             .orElse(null);
-        assertNotEquals(null, aRoute);
+        assertAfter(PKT_PROCESSING_MS, () ->
+                assertTrue(mockRouteStore.routes.stream().anyMatch(rt -> rt.prefix().contains(IP_FOR_CLIENT_V6))));
 
-        aRoute = mockRouteStore.routes.stream()
-                .filter(rt -> rt.prefix().contains(PREFIX_FOR_CLIENT_V6))
-                .findFirst()
-                .orElse(null);
-        assertNotEquals(null, aRoute);
+        assertAfter(PKT_PROCESSING_MS, () ->
+                assertTrue(mockRouteStore.routes.stream().anyMatch(rt -> rt.prefix().contains(PREFIX_FOR_CLIENT_V6))));
 
         // send release msg
         packetService.processPacket(new TestDhcp6RequestPacketContext(DHCP6.MsgType.RELEASE.value(),
@@ -746,20 +748,13 @@
                 CLIENT2_CP,
                 OUTER_RELAY_IP_V6,
                 1));
+        assertAfter(PKT_PROCESSING_MS, () ->
+                assertFalse(mockRouteStore.routes.stream().anyMatch(rt -> rt.prefix().contains(IP_FOR_CLIENT_V6))));
 
-        aRoute = mockRouteStore.routes.stream()
-                .filter(rt -> rt.prefix().contains(IP_FOR_CLIENT_V6))
-                .findFirst()
-                .orElse(null);
-        assertEquals(null, aRoute);
+        assertAfter(PKT_PROCESSING_MS, () ->
+                assertFalse(mockRouteStore.routes.stream().anyMatch(rt -> rt.prefix().contains(PREFIX_FOR_CLIENT_V6))));
 
-        aRoute = mockRouteStore.routes.stream()
-                .filter(rt -> rt.prefix().contains(PREFIX_FOR_CLIENT_V6))
-                .findFirst()
-                .orElse(null);
-        assertEquals(null, aRoute);
-
-        assertEquals(0, mockRouteStore.routes.size());
+        assertAfter(PKT_PROCESSING_MS, () -> assertEquals(0, mockRouteStore.routes.size()));
 
     }
 
@@ -779,7 +774,7 @@
                 INTERFACE_IP_V6.ipAddress().getIp6Address(),
                 1));
 
-        assertEquals(0, mockRouteStore.routes.size());
+        assertAfter(PKT_PROCESSING_MS, () -> assertEquals(0, mockRouteStore.routes.size()));
 
         // send reply
         packetService.processPacket(new TestDhcp6ReplyPacketContext(DHCP6.MsgType.REPLY.value(),
@@ -794,7 +789,7 @@
         // won't trigger the host provider service
         verify(mockHostProviderService);
         reset(mockHostProviderService);
-        assertEquals(0, mockRouteStore.routes.size()); // ipAddress and prefix
+        assertAfter(PKT_PROCESSING_MS, () -> assertEquals(0, mockRouteStore.routes.size())); // ipAddress and prefix
 
     }
 
@@ -811,13 +806,15 @@
         mockHostProviderService.hostDetected(eq(CLIENT_HOST_ID), capture(capturedHostDesc), eq(false));
         replay(mockHostProviderService, manager.hostService);
         packetService.processPacket(packetContext);
-        verify(mockHostProviderService);
+        assertAfter(PKT_PROCESSING_MS, () -> verify(mockHostProviderService));
 
+        assertAfter(PKT_PROCESSING_MS, () -> assertTrue(capturedHostDesc.hasCaptured()));
         HostDescription hostDesc = capturedHostDesc.getValue();
         Set<HostLocation> hostLocations = hostDesc.locations();
-        assertEquals(2, hostLocations.size());
-        assertTrue(hostLocations.contains(CLIENT_LOCATION));
-        assertTrue(hostLocations.contains(CLIENT_DH_LOCATION));
+
+        assertAfter(PKT_PROCESSING_MS, () -> assertEquals(2, hostLocations.size()));
+        assertAfter(PKT_PROCESSING_MS, () -> assertTrue(hostLocations.contains(CLIENT_LOCATION)));
+        assertAfter(PKT_PROCESSING_MS, () -> assertTrue(hostLocations.contains(CLIENT_DH_LOCATION)));
     }
 
     @Test
@@ -848,13 +845,14 @@
         expectLastCall().anyTimes();
         replay(mockHostProviderService, manager.hostService);
         packetService.processPacket(packetContext);
-        verify(mockHostProviderService);
+        assertAfter(PKT_PROCESSING_MS, () -> verify(mockHostProviderService));
 
+        assertAfter(PKT_PROCESSING_MS, () -> assertTrue(capturedHostDesc.hasCaptured()));
         HostDescription hostDesc = capturedHostDesc.getValue();
         Set<HostLocation> hostLocations = hostDesc.locations();
-        assertEquals(2, hostLocations.size());
-        assertTrue(hostLocations.contains(CLIENT_LOCATION));
-        assertTrue(hostLocations.contains(CLIENT_DH_LOCATION));
+        assertAfter(PKT_PROCESSING_MS, () -> assertEquals(2, hostLocations.size()));
+        assertAfter(PKT_PROCESSING_MS, () -> assertTrue(hostLocations.contains(CLIENT_LOCATION)));
+        assertAfter(PKT_PROCESSING_MS, () -> assertTrue(hostLocations.contains(CLIENT_DH_LOCATION)));
     }
 
     private static class MockDefaultDhcpRelayConfig extends DefaultDhcpRelayConfig {
@@ -939,6 +937,7 @@
             routes.remove(route);
         }
 
+        @Override
         public void replaceRoute(Route route) {
             routes.remove(route);
             routes.add(route);
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
index f2ac5ed..68e49d8 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
@@ -288,6 +288,7 @@
     private ScheduledExecutorService routeEventExecutor;
     private ScheduledExecutorService mcastEventExecutor;
     private ExecutorService packetExecutor;
+    ExecutorService neighborExecutor;
 
     Map<DeviceId, DefaultGroupHandler> groupHandlerMap = new ConcurrentHashMap<>();
     /**
@@ -367,6 +368,8 @@
      */
     public static final VlanId PSEUDOWIRE_VLAN = VlanId.vlanId((short) 4093);
 
+    private static final int DEFAULT_POOL_SIZE = 32;
+
     Instant lastEdgePortEvent = Instant.EPOCH;
 
     protected void bindXconnectService(XconnectService xconnectService) {
@@ -391,11 +394,17 @@
     protected void activate(ComponentContext context) {
         appId = coreService.registerApplication(APP_NAME);
 
-        mainEventExecutor = Executors.newSingleThreadScheduledExecutor(groupedThreads("sr-event-main", "%d", log));
-        hostEventExecutor = Executors.newSingleThreadScheduledExecutor(groupedThreads("sr-event-host", "%d", log));
-        routeEventExecutor = Executors.newSingleThreadScheduledExecutor(groupedThreads("sr-event-route", "%d", log));
-        mcastEventExecutor = Executors.newSingleThreadScheduledExecutor(groupedThreads("sr-event-mcast", "%d", log));
-        packetExecutor = Executors.newSingleThreadExecutor(groupedThreads("sr-packet", "%d", log));
+        mainEventExecutor = Executors.newSingleThreadScheduledExecutor(
+                groupedThreads("onos/sr", "event-main-%d", log));
+        hostEventExecutor = Executors.newSingleThreadScheduledExecutor(
+                groupedThreads("onos/sr", "event-host-%d", log));
+        routeEventExecutor = Executors.newSingleThreadScheduledExecutor(
+                groupedThreads("onos/sr", "event-route-%d", log));
+        mcastEventExecutor = Executors.newSingleThreadScheduledExecutor(
+                groupedThreads("onos/sr", "event-mcast-%d", log));
+        packetExecutor = Executors.newSingleThreadExecutor(groupedThreads("onos/sr", "packet-%d", log));
+        neighborExecutor = Executors.newFixedThreadPool(DEFAULT_POOL_SIZE,
+                groupedThreads("onos/sr", "neighbor-%d", log));
 
         log.debug("Creating EC map nsnextobjectivestore");
         EventuallyConsistentMapBuilder<DestinationSetNextObjectiveStoreKey, NextNeighbors>
@@ -544,12 +553,14 @@
         routeEventExecutor.shutdown();
         mcastEventExecutor.shutdown();
         packetExecutor.shutdown();
+        neighborExecutor.shutdown();
 
         mainEventExecutor = null;
         hostEventExecutor = null;
         routeEventExecutor = null;
         mcastEventExecutor = null;
         packetExecutor = null;
+        neighborExecutor = null;
 
         cfgService.removeListener(cfgListener);
         cfgService.unregisterConfigFactory(deviceConfigFactory);
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingNeighbourDispatcher.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingNeighbourDispatcher.java
index f170fae..3cf0e32 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingNeighbourDispatcher.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingNeighbourDispatcher.java
@@ -42,6 +42,10 @@
 
     @Override
     public void handleMessage(NeighbourMessageContext context, HostService hostService) {
+        manager.neighborExecutor.execute(() -> handleMessageInternal(context, hostService));
+    }
+
+    private void handleMessageInternal(NeighbourMessageContext context, HostService hostService) {
         log.trace("Received {} packet on {}: {}", context.protocol(),
                   context.inPort(), context.packet());
         switch (context.protocol()) {