ONOS-2043: Move device event handling in PacketManager off of event loop thread

Change-Id: Ia8b12e6ec3e732f0311adc7b3e7e63d07ad117e0
diff --git a/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java b/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
index 26c4102..28f1df0 100644
--- a/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
+++ b/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
@@ -58,9 +58,12 @@
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.slf4j.LoggerFactory.getLogger;
+import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.security.AppGuard.checkPermission;
 
 
@@ -92,6 +95,8 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private FlowObjectiveService objectiveService;
 
+    private ExecutorService eventHandlingExecutor;
+
     private final DeviceListener deviceListener = new InternalDeviceListener();
 
     private final Map<Integer, PacketProcessor> processors = new ConcurrentHashMap<>();
@@ -100,6 +105,8 @@
 
     @Activate
     public void activate() {
+        eventHandlingExecutor = Executors.newSingleThreadExecutor(
+                        groupedThreads("onos/net/packet", "event-handler"));
         appId = coreService.getAppId(CoreService.CORE_APP_NAME);
         store.setDelegate(delegate);
         deviceService.addListener(deviceListener);
@@ -111,6 +118,7 @@
     public void deactivate() {
         store.unsetDelegate(delegate);
         deviceService.removeListener(deviceListener);
+        eventHandlingExecutor.shutdown();
         log.info("Stopped");
     }
 
@@ -277,19 +285,25 @@
     private class InternalDeviceListener implements DeviceListener {
         @Override
         public void event(DeviceEvent event) {
-            Device device = event.subject();
-            switch (event.type()) {
-                case DEVICE_ADDED:
-                case DEVICE_AVAILABILITY_CHANGED:
-                    if (deviceService.isAvailable(event.subject().id())) {
-                        for (PacketRequest request : store.existingRequests()) {
-                            pushRule(device, request);
+            eventHandlingExecutor.execute(() -> {
+                try {
+                    Device device = event.subject();
+                    switch (event.type()) {
+                    case DEVICE_ADDED:
+                    case DEVICE_AVAILABILITY_CHANGED:
+                        if (deviceService.isAvailable(event.subject().id())) {
+                            for (PacketRequest request : store.existingRequests()) {
+                                pushRule(device, request);
+                            }
                         }
+                        break;
+                    default:
+                        break;
                     }
-                    break;
-                default:
-                    break;
-            }
+                } catch (Exception e) {
+                    log.warn("Failed to process {}", event, e);
+                }
+            });
         }
     }