Fix ConsistentMapException.Interrupted and NullPointerException

- Move time-consuming device event processing to a separate thread
- Re-use the group information when handling groupMissing instead of querying the store again

Change-Id: I01f1b43260f22dcb969a105f16d04d79c722146e
diff --git a/core/net/src/main/java/org/onosproject/net/group/impl/GroupManager.java b/core/net/src/main/java/org/onosproject/net/group/impl/GroupManager.java
index c78108c..4a0e409 100644
--- a/core/net/src/main/java/org/onosproject/net/group/impl/GroupManager.java
+++ b/core/net/src/main/java/org/onosproject/net/group/impl/GroupManager.java
@@ -55,16 +55,17 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Dictionary;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import static com.google.common.base.Strings.isNullOrEmpty;
 import static org.onlab.util.Tools.get;
+import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.security.AppGuard.checkPermission;
 import static org.onosproject.security.AppPermission.Type.GROUP_READ;
 import static org.onosproject.security.AppPermission.Type.GROUP_WRITE;
 import static org.slf4j.LoggerFactory.getLogger;
 
-
-
 /**
  * Provides implementation of the group service APIs.
  */
@@ -80,7 +81,9 @@
     private final GroupStoreDelegate delegate = new InternalGroupStoreDelegate();
     private final DeviceListener deviceListener = new InternalDeviceListener();
 
-    private final  GroupDriverProvider defaultProvider = new GroupDriverProvider();
+    private final GroupDriverProvider defaultProvider = new GroupDriverProvider();
+
+    private ExecutorService eventExecutor;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected GroupStore store;
@@ -110,6 +113,7 @@
 
     @Activate
     public void activate(ComponentContext context) {
+        eventExecutor = Executors.newSingleThreadExecutor(groupedThreads("onos/group", "event"));
         store.setDelegate(delegate);
         eventDispatcher.addSink(GroupEvent.class, listenerRegistry);
         deviceService.addListener(deviceListener);
@@ -120,6 +124,7 @@
 
     @Deactivate
     public void deactivate() {
+        eventExecutor.shutdown();
         defaultProvider.terminate();
         deviceService.removeListener(deviceListener);
         cfgService.unregisterProperties(getClass(), false);
@@ -420,16 +425,19 @@
     }
 
     private class InternalDeviceListener implements DeviceListener {
-
         @Override
         public void event(DeviceEvent event) {
+            eventExecutor.execute(() -> processEventInternal(event));
+        }
+
+        private void processEventInternal(DeviceEvent event) {
             switch (event.type()) {
                 case DEVICE_REMOVED:
                 case DEVICE_AVAILABILITY_CHANGED:
                     DeviceId deviceId = event.subject().id();
                     if (!deviceService.isAvailable(deviceId)) {
-                        log.debug("Device {} became un available; clearing initial audit status",
-                                  event.type(), event.subject().id());
+                        log.debug("Device {} became unavailable; clearing initial audit status",
+                                event.type(), event.subject().id());
                         store.deviceInitialAuditCompleted(event.subject().id(), false);
 
                         if (purgeOnDisconnection) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
index d8c3f7c..58417a8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
@@ -1416,7 +1416,7 @@
                 }
             }
         }
-        for (Group group : storedGroupEntries) {
+        for (StoredGroupEntry group : storedGroupEntries) {
             // there are groups in the store that aren't in the switch
             log.debug("Group AUDIT: group {} missing in data plane for device {}",
                       group.id(), deviceId);
@@ -1470,7 +1470,7 @@
         return (group.referenceCount() == 0 && group.age() >= gcThresh);
     }
 
-    private void groupMissing(Group group) {
+    private void groupMissing(StoredGroupEntry group) {
         switch (group.state()) {
             case PENDING_DELETE:
                 log.debug("Group {} delete confirmation from device {}",
@@ -1481,19 +1481,13 @@
             case PENDING_ADD:
             case PENDING_ADD_RETRY:
             case PENDING_UPDATE:
-                log.debug("Group {} is in store but not on device {}",
-                          group, group.deviceId());
-                StoredGroupEntry existing =
-                        getStoredGroupEntry(group.deviceId(), group.id());
                 log.debug("groupMissing: group entry {} in device {} moving from {} to PENDING_ADD_RETRY",
-                          existing.id(),
-                          existing.deviceId(),
-                          existing.state());
-                existing.setState(Group.GroupState.PENDING_ADD_RETRY);
+                        group.id(),
+                        group.deviceId(),
+                        group.state());
+                group.setState(Group.GroupState.PENDING_ADD_RETRY);
                 //Re-PUT map entries to trigger map update events
-                getGroupStoreKeyMap().
-                        put(new GroupStoreKeyMapKey(existing.deviceId(),
-                                                    existing.appCookie()), existing);
+                getGroupStoreKeyMap().put(new GroupStoreKeyMapKey(group.deviceId(), group.appCookie()), group);
                 notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
                                               group));
                 break;