Refactor: install rules in a separate thread to avoid thread blocking

Change-Id: I10ff88fb56f9358ec948f01176d6fe20d91e37c0
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingArpHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingArpHandler.java
index 95661a3..3f72375 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingArpHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingArpHandler.java
@@ -71,8 +71,11 @@
 import java.util.Dictionary;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.openstacknetworking.api.Constants.ARP_BROADCAST_MODE;
 import static org.onosproject.openstacknetworking.api.Constants.ARP_PROXY_MODE;
 import static org.onosproject.openstacknetworking.api.Constants.ARP_TABLE;
@@ -148,6 +151,9 @@
     private final InstancePortListener instancePortListener = new InternalInstancePortListener();
     private final OpenstackNodeListener osNodeListener = new InternalNodeEventListener();
 
+    private final ExecutorService eventExecutor = newSingleThreadExecutor(
+            groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
+
 
     private ApplicationId appId;
     private NodeId localNodeId;
@@ -175,6 +181,7 @@
         instancePortService.removeListener(instancePortListener);
         leadershipService.withdraw(appId.name());
         configService.unregisterProperties(getClass(), false);
+        eventExecutor.shutdown();
 
         log.info("Stopped");
     }
@@ -543,7 +550,8 @@
             if (ethPacket == null || ethPacket.getEtherType() != Ethernet.TYPE_ARP) {
                 return;
             }
-            processPacketIn(context, ethPacket);
+
+            eventExecutor.execute(() -> processPacketIn(context, ethPacket));
         }
     }
 
@@ -586,10 +594,14 @@
             switch (event.type()) {
                 case OPENSTACK_SUBNET_CREATED:
                 case OPENSTACK_SUBNET_UPDATED:
-                    setFakeGatewayArpRule(event.subnet(), true, null);
+                    eventExecutor.execute(() -> {
+                        setFakeGatewayArpRule(event.subnet(), true, null);
+                    });
                     break;
                 case OPENSTACK_SUBNET_REMOVED:
-                    setFakeGatewayArpRule(event.subnet(), false, null);
+                    eventExecutor.execute(() -> {
+                        setFakeGatewayArpRule(event.subnet(), false, null);
+                    });
                     break;
                 case OPENSTACK_NETWORK_CREATED:
                 case OPENSTACK_NETWORK_UPDATED:
@@ -624,12 +636,16 @@
             OpenstackNode osNode = event.subject();
             switch (event.type()) {
                 case OPENSTACK_NODE_COMPLETE:
-                    setDefaultArpRule(osNode, true);
-                    setAllArpRules(osNode, true);
+                    eventExecutor.execute(() -> {
+                        setDefaultArpRule(osNode, true);
+                        setAllArpRules(osNode, true);
+                    });
                     break;
                 case OPENSTACK_NODE_INCOMPLETE:
-                    setDefaultArpRule(osNode, false);
-                    setAllArpRules(osNode, false);
+                    eventExecutor.execute(() -> {
+                        setDefaultArpRule(osNode, false);
+                        setAllArpRules(osNode, false);
+                    });
                     break;
                 default:
                     break;
@@ -745,20 +761,28 @@
             switch (event.type()) {
                 case OPENSTACK_INSTANCE_PORT_DETECTED:
                 case OPENSTACK_INSTANCE_PORT_UPDATED:
-                    setArpRequestRule(event.subject(), true);
-                    setArpReplyRule(event.subject(), true);
+                    eventExecutor.execute(() -> {
+                        setArpRequestRule(event.subject(), true);
+                        setArpReplyRule(event.subject(), true);
+                    });
                     break;
                 case OPENSTACK_INSTANCE_PORT_VANISHED:
-                    setArpRequestRule(event.subject(), false);
-                    setArpReplyRule(event.subject(), false);
+                    eventExecutor.execute(() -> {
+                        setArpRequestRule(event.subject(), false);
+                        setArpReplyRule(event.subject(), false);
+                    });
                     break;
                 case OPENSTACK_INSTANCE_MIGRATION_STARTED:
-                    setArpRequestRule(event.subject(), true);
-                    setArpReplyRule(event.subject(), true);
+                    eventExecutor.execute(() -> {
+                        setArpRequestRule(event.subject(), true);
+                        setArpReplyRule(event.subject(), true);
+                    });
                     break;
                 case OPENSTACK_INSTANCE_MIGRATION_ENDED:
-                    InstancePort revisedInstPort = swapStaleLocation(event.subject());
-                    setArpRequestRule(revisedInstPort, false);
+                    eventExecutor.execute(() -> {
+                        InstancePort revisedInstPort = swapStaleLocation(event.subject());
+                        setArpRequestRule(revisedInstPort, false);
+                    });
                     break;
                 default:
                     break;