Moving the device event handling to separate thread

Change-Id: Idfffee1ef46fda4248839854fa2d87f3db6008bb
diff --git a/apps/dhcprelay/app/src/main/java/org/onosproject/dhcprelay/DhcpRelayManager.java b/apps/dhcprelay/app/src/main/java/org/onosproject/dhcprelay/DhcpRelayManager.java
index bb5a794..bb14e16 100644
--- a/apps/dhcprelay/app/src/main/java/org/onosproject/dhcprelay/DhcpRelayManager.java
+++ b/apps/dhcprelay/app/src/main/java/org/onosproject/dhcprelay/DhcpRelayManager.java
@@ -90,6 +90,7 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -102,6 +103,7 @@
 import static org.onosproject.dhcprelay.OsgiPropertyConstants.DHCP_FPM_ENABLED_DEFAULT;
 import static org.onosproject.dhcprelay.OsgiPropertyConstants.DHCP_POLL_INTERVAL;
 import static org.onosproject.dhcprelay.OsgiPropertyConstants.DHCP_POLL_INTERVAL_DEFAULT;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
 import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
 
 /**
@@ -212,6 +214,7 @@
     protected boolean dhcpFpmEnabled = DHCP_FPM_ENABLED_DEFAULT;
 
     private ScheduledExecutorService timerExecutor;
+    protected ExecutorService devEventExecutor;
 
     protected DeviceListener deviceListener = new InternalDeviceListener();
     private DhcpRelayPacketProcessor dhcpRelayPacketProcessor = new DhcpRelayPacketProcessor();
@@ -248,6 +251,9 @@
                 dhcpPollInterval,
                 TimeUnit.SECONDS);
 
+        devEventExecutor = newSingleThreadScheduledExecutor(
+                             groupedThreads("onos/dhcprelay-dev-events", "events-%d", log));
+
         modified(context);
 
         // Enable distribute route store
@@ -271,6 +277,8 @@
         compCfgService.unregisterProperties(getClass(), false);
         deviceService.removeListener(deviceListener);
         timerExecutor.shutdown();
+        devEventExecutor.shutdownNow();
+        devEventExecutor = null;
 
         log.info("DHCP-RELAY Stopped");
     }
@@ -624,17 +632,19 @@
 
         @Override
         public void event(DeviceEvent event) {
+          if (devEventExecutor != null) {
             Device device = event.subject();
             switch (event.type()) {
                 case DEVICE_ADDED:
-                    updateIgnoreVlanConfigs();
+                    devEventExecutor.execute(this::updateIgnoreVlanConfigs);
                     break;
                 case DEVICE_AVAILABILITY_CHANGED:
-                    deviceAvailabilityChanged(device);
+                    devEventExecutor.execute(() -> deviceAvailabilityChanged(device));
                     break;
                 default:
                     break;
             }
+          }
         }
 
         private void deviceAvailabilityChanged(Device device) {
diff --git a/apps/dhcprelay/app/src/test/java/org/onosproject/dhcprelay/DhcpRelayManagerTest.java b/apps/dhcprelay/app/src/test/java/org/onosproject/dhcprelay/DhcpRelayManagerTest.java
index 9d86606..ba943f9 100644
--- a/apps/dhcprelay/app/src/test/java/org/onosproject/dhcprelay/DhcpRelayManagerTest.java
+++ b/apps/dhcprelay/app/src/test/java/org/onosproject/dhcprelay/DhcpRelayManagerTest.java
@@ -124,9 +124,11 @@
 
 import static org.easymock.EasyMock.*;
 import static org.junit.Assert.*;
+import static org.onlab.junit.TestTools.assertAfter;
 import static org.onosproject.dhcprelay.DhcpRelayManager.DHCP_RELAY_APP;
 
 public class DhcpRelayManagerTest {
+    private static final int EVENT_PROCESSING_MS = 1000;
     private static final short VLAN_LEN = 2;
     private static final short SEPARATOR_LEN = 1;
     private static final String CONFIG_FILE_PATH = "dhcp-relay.json";
@@ -634,10 +636,16 @@
         flowObjectiveService.apply(eq(DEV_1_ID), capture(capturedFromDev1));
         expectLastCall().times(DHCP_SELECTORS.size());
         replay(manager.cfgService, flowObjectiveService, manager.deviceService, device);
+
         manager.deviceListener.event(event);
+
+        // Wait until all flow objective events are captured before triggering onSuccess
+        int expectFlowObjCount = Dhcp4HandlerImpl.DHCP_SELECTORS.size() + Dhcp6HandlerImpl.DHCP_SELECTORS.size();
+        assertAfter(EVENT_PROCESSING_MS, () -> assertEquals(expectFlowObjCount, capturedFromDev1.getValues().size()));
         capturedFromDev1.getValues().forEach(obj -> obj.context().ifPresent(ctx -> ctx.onSuccess(obj)));
-        assertEquals(1, v4Handler.ignoredVlans.size());
-        assertEquals(1, v6Handler.ignoredVlans.size());
+
+        assertAfter(EVENT_PROCESSING_MS, () -> assertEquals(1, v4Handler.ignoredVlans.size()));
+        assertAfter(EVENT_PROCESSING_MS, () -> assertEquals(1, v6Handler.ignoredVlans.size()));
     }
 
     /**
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
index 45e2e95..2368f25 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
@@ -70,6 +70,7 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Strings.isNullOrEmpty;
 import static java.util.concurrent.Executors.newFixedThreadPool;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.net.AnnotationKeys.DRIVER;
 import static org.onosproject.net.OsgiPropertyConstants.FOM_NUM_THREADS;
@@ -150,12 +151,15 @@
     private Map<Integer, DeviceId> nextToDevice = Maps.newConcurrentMap();
 
     ExecutorService executorService;
+    protected ExecutorService devEventExecutor;
 
     @Activate
     protected void activate() {
         cfgService.registerProperties(FlowObjectiveManager.class);
         executorService = newFixedThreadPool(numThreads,
                                              groupedThreads(GROUP_THREAD_NAME, WORKER_PATTERN, log));
+        devEventExecutor = newSingleThreadScheduledExecutor(
+                                       groupedThreads("onos/flowobj-dev-events", "events-%d", log));
         flowObjectiveStore.setDelegate(delegate);
         deviceService.addListener(deviceListener);
         driverService.addListener(driverListener);
@@ -169,6 +173,8 @@
         deviceService.removeListener(deviceListener);
         driverService.removeListener(driverListener);
         executorService.shutdown();
+        devEventExecutor.shutdownNow();
+        devEventExecutor = null;
         pipeliners.clear();
         driverHandlers.clear();
         nextToDevice.clear();
@@ -435,22 +441,25 @@
     private class InnerDeviceListener implements DeviceListener {
         @Override
         public void event(DeviceEvent event) {
+          if (devEventExecutor != null) {
             switch (event.type()) {
                 case DEVICE_ADDED:
                 case DEVICE_AVAILABILITY_CHANGED:
                     log.debug("Device either added or availability changed {}",
                               event.subject().id());
-                    if (deviceService.isAvailable(event.subject().id())) {
+                    devEventExecutor.execute(() -> {
+                      if (deviceService.isAvailable(event.subject().id())) {
                         log.debug("Device is now available {}", event.subject().id());
                         getAndInitDevicePipeliner(event.subject().id());
-                    } else {
+                      } else {
                         log.debug("Device is no longer available {}", event.subject().id());
-                    }
+                      }
+                    });
                     break;
                 case DEVICE_UPDATED:
                     // Invalidate pipeliner and handler caches if the driver name
                     // device annotation changed.
-                    invalidatePipelinerIfNecessary(event.subject());
+                    devEventExecutor.execute(() -> invalidatePipelinerIfNecessary(event.subject()));
                     break;
                 case DEVICE_REMOVED:
                     // evict Pipeliner and Handler cache, when
@@ -459,8 +468,10 @@
                     // System expect the user to clear all existing flows,
                     // before removing device, especially if they intend to
                     // replace driver/pipeliner assigned to the device.
-                    driverHandlers.remove(event.subject().id());
-                    pipeliners.remove(event.subject().id());
+                    devEventExecutor.execute(() -> {
+                      driverHandlers.remove(event.subject().id());
+                      pipeliners.remove(event.subject().id());
+                    });
                     break;
                 case DEVICE_SUSPENDED:
                     break;
@@ -473,6 +484,7 @@
                 default:
                     break;
             }
+          }
         }
     }