Move device event handling to another thread in XconnectManager

Change-Id: I72f4f4e5d285fa928bb682714369019fc0be38fa
(cherry picked from commit 168111e81cdba245cde4ac94844aa0c561cb0097)
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/impl/XconnectManager.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/impl/XconnectManager.java
index 3577536..696b811 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/impl/XconnectManager.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/impl/XconnectManager.java
@@ -71,10 +71,14 @@
 import java.io.Serializable;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
+import static org.onlab.util.Tools.groupedThreads;
+
 @Service
 @Component(immediate = true)
 public class XconnectManager implements XconnectService {
@@ -114,6 +118,8 @@
     private final MapEventListener<XconnectKey, Set<PortNumber>> xconnectListener = new XconnectMapListener();
     private final DeviceListener deviceListener = new InternalDeviceListener();
 
+    private ExecutorService deviceEventExecutor;
+
     @Activate
     void activate() {
         appId = coreService.registerApplication(APP_NAME);
@@ -137,6 +143,9 @@
                 .withSerializer(Serializer.using(serializer.build()))
                 .build();
 
+        deviceEventExecutor = Executors.newSingleThreadScheduledExecutor(
+                groupedThreads("sr-xconnect-device-event", "%d", log));
+
         deviceService.addListener(deviceListener);
 
         log.info("Started");
@@ -148,6 +157,8 @@
         deviceService.removeListener(deviceListener);
         codecService.unregisterCodec(XconnectDesc.class);
 
+        deviceEventExecutor.shutdown();
+
         log.info("Stopped");
     }
 
@@ -207,24 +218,26 @@
     private class InternalDeviceListener implements DeviceListener {
         @Override
         public void event(DeviceEvent event) {
-            DeviceId deviceId = event.subject().id();
-            if (!mastershipService.isLocalMaster(deviceId)) {
-                return;
-            }
+            deviceEventExecutor.execute(() -> {
+                DeviceId deviceId = event.subject().id();
+                if (!mastershipService.isLocalMaster(deviceId)) {
+                    return;
+                }
 
-            switch (event.type()) {
-                case DEVICE_ADDED:
-                case DEVICE_AVAILABILITY_CHANGED:
-                case DEVICE_UPDATED:
-                    if (deviceService.isAvailable(deviceId)) {
-                        init(deviceId);
-                    } else {
-                        cleanup(deviceId);
-                    }
-                    break;
-                default:
-                    break;
-            }
+                switch (event.type()) {
+                    case DEVICE_ADDED:
+                    case DEVICE_AVAILABILITY_CHANGED:
+                    case DEVICE_UPDATED:
+                        if (deviceService.isAvailable(deviceId)) {
+                            init(deviceId);
+                        } else {
+                            cleanup(deviceId);
+                        }
+                        break;
+                    default:
+                        break;
+                }
+            });
         }
     }