Move event handling to background thread

Change-Id: I8ccd1631fac14b1f753da4fb4b4ed01e5a045edf
(cherry picked from commit 8f906bfae1bb600644723a8aa75c80207c6d0941)
diff --git a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleDriverProvider.java b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleDriverProvider.java
index d2cdf9d..e617ae9 100644
--- a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleDriverProvider.java
+++ b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleDriverProvider.java
@@ -20,6 +20,7 @@
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
+
 import org.onosproject.core.ApplicationId;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.Device;
@@ -186,16 +187,19 @@
 
         @Override
         public void event(DeviceEvent event) {
-            executor.schedule(() -> pollDeviceFlowEntries(event.subject()), 0, TimeUnit.SECONDS);
+            executor.execute(() -> handleEvent(event));
         }
 
-        @Override
-        public boolean isRelevant(DeviceEvent event) {
+        private void handleEvent(DeviceEvent event) {
             Device device = event.subject();
-            return mastershipService.isLocalMaster(device.id()) && device.is(FlowRuleProgrammable.class) &&
-                    (event.type() == DEVICE_ADDED ||
-                            event.type() == DEVICE_UPDATED ||
-                            (event.type() == DEVICE_AVAILABILITY_CHANGED && deviceService.isAvailable(device.id())));
+            boolean isRelevant = mastershipService.isLocalMaster(device.id())
+                    && device.is(FlowRuleProgrammable.class)
+                    && (event.type() == DEVICE_ADDED ||
+                        event.type() == DEVICE_UPDATED ||
+                        (event.type() == DEVICE_AVAILABILITY_CHANGED && deviceService.isAvailable(device.id())));
+            if (isRelevant) {
+                pollDeviceFlowEntries(event.subject());
+            }
         }
     }
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index 57ee153..a259c26 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -163,6 +163,7 @@
 
     private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
     private ExecutorService messageHandlingExecutor;
+    private ExecutorService eventHandler;
 
     private ScheduledFuture<?> backupTask;
     private final ScheduledExecutorService backupSenderExecutor =
@@ -197,6 +198,8 @@
 
         local = clusterService.getLocalNode().id();
 
+        eventHandler = Executors.newSingleThreadExecutor(
+                groupedThreads("onos/flow", "event-handler", log));
         messageHandlingExecutor = Executors.newFixedThreadPool(
                 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
 
@@ -233,6 +236,7 @@
         unregisterMessageHandlers();
         deviceTableStats.removeListener(tableStatsListener);
         deviceTableStats.destroy();
+        eventHandler.shutdownNow();
         messageHandlingExecutor.shutdownNow();
         backupSenderExecutor.shutdownNow();
         log.info("Stopped");
@@ -663,6 +667,10 @@
 
         @Override
         public void event(ReplicaInfoEvent event) {
+            eventHandler.execute(() -> handleEvent(event));
+        }
+
+        private void handleEvent(ReplicaInfoEvent event) {
             if (!backupEnabled) {
                 return;
             }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 320c4d1..3e83183 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -97,6 +97,7 @@
     private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN =
             Pattern.compile("device:(.*)");
 
+    private ExecutorService eventHandler;
     private ExecutorService messageHandlingExecutor;
     private ScheduledExecutorService transferExecutor;
     private final LeadershipEventListener leadershipEventListener =
@@ -116,6 +117,10 @@
 
     @Activate
     public void activate() {
+
+        eventHandler = Executors.newSingleThreadExecutor(
+                        groupedThreads("onos/store/device/mastership", "event-handler", log));
+
         messageHandlingExecutor =
                 Executors.newSingleThreadExecutor(
                         groupedThreads("onos/store/device/mastership", "message-handler", log));
@@ -136,10 +141,10 @@
     @Deactivate
     public void deactivate() {
         clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT);
+        leadershipService.removeListener(leadershipEventListener);
         messageHandlingExecutor.shutdown();
         transferExecutor.shutdown();
-        leadershipService.removeListener(leadershipEventListener);
-
+        eventHandler.shutdown();
         log.info("Stopped");
     }
 
@@ -308,6 +313,10 @@
 
         @Override
         public void event(LeadershipEvent event) {
+            eventHandler.execute(() -> handleEvent(event));
+        }
+
+        private void handleEvent(LeadershipEvent event) {
             Leadership leadership = event.subject();
             DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
             RoleInfo roleInfo = getNodes(deviceId);