Refactor: install rules in a separated thread to avoid thread blocking

Change-Id: I10ff88fb56f9358ec948f01176d6fe20d91e37c0
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingArpHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingArpHandler.java
index 6ae7100..298263c 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingArpHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingArpHandler.java
@@ -70,7 +70,6 @@
 import org.onosproject.openstacknode.api.OpenstackNodeEvent;
 import org.onosproject.openstacknode.api.OpenstackNodeListener;
 import org.onosproject.openstacknode.api.OpenstackNodeService;
-import org.openstack4j.model.network.ExternalGateway;
 import org.openstack4j.model.network.IP;
 import org.openstack4j.model.network.NetFloatingIP;
 import org.openstack4j.model.network.Network;
@@ -98,11 +97,13 @@
 import static org.onosproject.openstacknetworking.api.Constants.PRIORITY_ARP_GATEWAY_RULE;
 import static org.onosproject.openstacknetworking.api.OpenstackNetworkEvent.Type.OPENSTACK_PORT_PRE_REMOVE;
 import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.associatedFloatingIp;
+import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.externalPeerRouterForNetwork;
+import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.floatingIpByInstancePort;
 import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.getGwByComputeDevId;
 import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.getGwByInstancePort;
 import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.getPropertyValue;
 import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.isAssociatedWithVM;
-import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.processGratuitousArpPacketForFloatingIp;
+import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.processGarpPacketForFloatingIp;
 import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.swapStaleLocation;
 import static org.onosproject.openstacknode.api.OpenstackNode.NodeType.COMPUTE;
 import static org.onosproject.openstacknode.api.OpenstackNode.NodeType.GATEWAY;
@@ -241,14 +242,16 @@
                     targetMac = instPort.macAddress();
                 }
 
-                OpenstackNode gw = getGwByInstancePort(osNodeService.completeNodes(GATEWAY), instPort);
+                OpenstackNode gw =
+                        getGwByInstancePort(osNodeService.completeNodes(GATEWAY), instPort);
 
                 if (gw == null) {
                     return;
                 }
 
                 // if the ARP packet_in received from non-relevant GWs, we simply ignore it
-                if (!Objects.equals(gw.intgBridge(), context.inPacket().receivedFrom().deviceId())) {
+                if (!Objects.equals(gw.intgBridge(),
+                                context.inPacket().receivedFrom().deviceId())) {
                     return;
                 }
             }
@@ -286,8 +289,10 @@
 
             try {
 
-                Set<String> extRouterIps = osNetworkService.externalPeerRouters().
-                        stream().map(r -> r.ipAddress().toString()).collect(Collectors.toSet());
+                Set<String> extRouterIps = osNetworkService.externalPeerRouters()
+                        .stream()
+                        .map(r -> r.ipAddress().toString())
+                        .collect(Collectors.toSet());
 
                 // if SPA is NOT contained in existing external router IP set, we ignore it
                 if (!extRouterIps.contains(spa.toString())) {
@@ -354,7 +359,8 @@
      * @param gateway gateway node
      * @param install flow rule installation flag
      */
-    private void setFloatingIpArpRuleForGateway(OpenstackNode gateway, boolean install) {
+    private void setFloatingIpArpRuleForGateway(OpenstackNode gateway,
+                                                boolean install) {
         if (ARP_BROADCAST_MODE.equals(getArpMode())) {
 
             Set<OpenstackNode> completedGws = osNodeService.completeNodes(GATEWAY);
@@ -367,14 +373,16 @@
                         finalGws.remove(gateway);
                         osRouterAdminService.floatingIps().forEach(fip -> {
                             if (fip.getPortId() != null) {
-                                setFloatingIpArpRule(fip, fip.getPortId(), finalGws, false);
+                                setFloatingIpArpRule(fip, fip.getPortId(),
+                                                     finalGws, false);
                                 finalGws.add(gateway);
                             }
                         });
                     }
                     osRouterAdminService.floatingIps().forEach(fip -> {
                         if (fip.getPortId() != null) {
-                            setFloatingIpArpRule(fip, fip.getPortId(), finalGws, true);
+                            setFloatingIpArpRule(fip, fip.getPortId(),
+                                                 finalGws, true);
                         }
                     });
                 } else {
@@ -385,14 +393,16 @@
                     finalGws.add(gateway);
                     osRouterAdminService.floatingIps().forEach(fip -> {
                         if (fip.getPortId() != null) {
-                            setFloatingIpArpRule(fip, fip.getPortId(), finalGws, false);
+                            setFloatingIpArpRule(fip, fip.getPortId(),
+                                                 finalGws, false);
                         }
                     });
                     finalGws.remove(gateway);
                     if (completedGws.size() >= 1) {
                         osRouterAdminService.floatingIps().forEach(fip -> {
                             if (fip.getPortId() != null) {
-                                setFloatingIpArpRule(fip, fip.getPortId(), finalGws, true);
+                                setFloatingIpArpRule(fip, fip.getPortId(),
+                                                     finalGws, true);
                             }
                         });
                     }
@@ -516,15 +526,16 @@
     }
 
     private void setFakeGatewayArpRuleByRouter(Router router, boolean install) {
-        setFakeGatewayArpRuleByGateway(router.getId(), router.getExternalGatewayInfo(), install);
+        setFakeGatewayArpRuleByGateway(router.getId(), install);
     }
 
-    private Set<IP> getExternalGatewaySnatIps(String routerId, ExternalGateway extGw) {
+    private Set<IP> getExternalGatewaySnatIps(String routerId) {
         if (routerId == null) {
             return ImmutableSet.of();
         }
 
-        Set<String> portIds = osRouterAdminService.routerInterfaces(routerId).stream()
+        Set<String> portIds = osRouterAdminService.routerInterfaces(routerId)
+                .stream()
                 .map(RouterInterface::getPortId)
                 .collect(Collectors.toSet());
 
@@ -535,14 +546,9 @@
                 .collect(Collectors.toSet());
     }
 
-    private void setFakeGatewayArpRuleByGateway(String routerId, ExternalGateway extGw, boolean install) {
+    private void setFakeGatewayArpRuleByGateway(String routerId, boolean install) {
         if (ARP_BROADCAST_MODE.equals(getArpMode())) {
-
-            if (extGw == null) {
-                return;
-            }
-
-            setFakeGatewayArpRuleByIps(getExternalGatewaySnatIps(routerId, extGw), install);
+            setFakeGatewayArpRuleByIps(getExternalGatewaySnatIps(routerId), install);
         }
     }
 
@@ -607,12 +613,14 @@
                 case OPENSTACK_PORT_CREATED:
                 case OPENSTACK_PORT_UPDATED:
                     eventExecutor.execute(() ->
-                        setFakeGatewayArpRuleByIps((Set<IP>) event.port().getFixedIps(), true)
+                        setFakeGatewayArpRuleByIps(
+                                (Set<IP>) event.port().getFixedIps(), true)
                     );
                     break;
                 case OPENSTACK_PORT_REMOVED:
                     eventExecutor.execute(() ->
-                        setFakeGatewayArpRuleByIps((Set<IP>) event.port().getFixedIps(), false)
+                        setFakeGatewayArpRuleByIps(
+                                (Set<IP>) event.port().getFixedIps(), false)
                     );
                     break;
                 default:
@@ -656,58 +664,60 @@
                 case OPENSTACK_ROUTER_GATEWAY_ADDED:
                     eventExecutor.execute(() ->
                         // add a gateway manually after adding a router
-                        setFakeGatewayArpRuleByGateway(event.subject().getId(),
-                                                        event.externalGateway(), true)
+                        setFakeGatewayArpRuleByGateway(event.subject().getId(), true)
                     );
                     break;
                 case OPENSTACK_ROUTER_GATEWAY_REMOVED:
                     eventExecutor.execute(() ->
                         // remove a gateway from an existing router
-                        setFakeGatewayArpRuleByGateway(event.subject().getId(),
-                                                        event.externalGateway(), false)
+                        setFakeGatewayArpRuleByGateway(event.subject().getId(), false)
                     );
                     break;
                 case OPENSTACK_FLOATING_IP_ASSOCIATED:
-                    if (getValidPortId(event) != null) {
-                        eventExecutor.execute(() -> {
-                            // associate a floating IP with an existing VM
-                            setFloatingIpArpRule(event.floatingIp(), getValidPortId(event),
-                                    completedGws, true);
-                        });
-                    }
+                    eventExecutor.execute(() -> {
+                        if (getValidPortId(event) == null) {
+                            return;
+                        }
+                        // associate a floating IP with an existing VM
+                        setFloatingIpArpRule(event.floatingIp(),
+                                getValidPortId(event), completedGws, true);
+                    });
                     break;
                 case OPENSTACK_FLOATING_IP_DISASSOCIATED:
-                    if (getValidPortId(event) != null) {
-                        eventExecutor.execute(() -> {
-                            // disassociate a floating IP with an existing VM
-                            setFloatingIpArpRule(event.floatingIp(), getValidPortId(event),
-                                    completedGws, false);
-                        });
-                    }
+                    eventExecutor.execute(() -> {
+                        if (getValidPortId(event) == null) {
+                            return;
+                        }
+                        // disassociate a floating IP with an existing VM
+                        setFloatingIpArpRule(event.floatingIp(),
+                                getValidPortId(event), completedGws, false);
+                    });
                     break;
                 case OPENSTACK_FLOATING_IP_CREATED:
                     // during floating IP creation, if the floating IP is
                     // associated with any port of VM, then we will set
                     // floating IP related ARP rules to gateway node
-                    if (getValidPortId(event) != null) {
-                        eventExecutor.execute(() -> {
-                            // associate a floating IP with an existing VM
-                            setFloatingIpArpRule(event.floatingIp(), getValidPortId(event),
-                                    completedGws, true);
-                        });
-                    }
+                    eventExecutor.execute(() -> {
+                        if (getValidPortId(event) == null) {
+                            return;
+                        }
+                        // associate a floating IP with an existing VM
+                        setFloatingIpArpRule(event.floatingIp(),
+                                getValidPortId(event), completedGws, true);
+                    });
                     break;
                 case OPENSTACK_FLOATING_IP_REMOVED:
                     // during floating IP deletion, if the floating IP is
                     // still associated with any port of VM, then we will
                     // remove floating IP related ARP rules from gateway node
-                    if (getValidPortId(event) != null) {
-                        eventExecutor.execute(() -> {
-                            // associate a floating IP with an existing VM
-                            setFloatingIpArpRule(event.floatingIp(), getValidPortId(event),
-                                    completedGws, false);
-                        });
-                    }
+                    eventExecutor.execute(() -> {
+                        if (getValidPortId(event) == null) {
+                            return;
+                        }
+                        // associate a floating IP with an existing VM
+                        setFloatingIpArpRule(event.floatingIp(),
+                                getValidPortId(event), completedGws, false);
+                    });
                     break;
                 default:
                     // do nothing for the other events
@@ -750,50 +760,52 @@
 
             switch (event.type()) {
                 case OPENSTACK_INSTANCE_PORT_DETECTED:
-
-                    osRouterAdminService.floatingIps().stream()
-                            .filter(f -> f.getPortId() != null)
-                            .filter(f -> f.getPortId().equals(instPort.portId()))
-                            .forEach(f -> setFloatingIpArpRule(f, instPort.portId(), gateways, true));
-
+                    eventExecutor.execute(() ->
+                            osRouterAdminService.floatingIps().stream()
+                                .filter(f -> f.getPortId() != null)
+                                .filter(f -> f.getPortId().equals(instPort.portId()))
+                                .forEach(f -> setFloatingIpArpRule(f,
+                                        instPort.portId(), gateways, true))
+                    );
                     break;
                 case OPENSTACK_INSTANCE_MIGRATION_STARTED:
-
-                    if (gateways.size() == 1) {
-                        return;
-                    }
-
-                    if (fip != null && isAssociatedWithVM(osNetworkService, fip)) {
-                        eventExecutor.execute(() ->
-                            setFloatingIpArpRuleWithPortEvent(fip, event.subject(),
-                                    gateways, true)
-                        );
-                    }
-
-                    break;
-                case OPENSTACK_INSTANCE_MIGRATION_ENDED:
-
-                    InstancePort revisedInstPort = swapStaleLocation(event.subject());
-
-                    if (gateways.size() == 1) {
-                        return;
-                    }
-
-                    if (fip != null && isAssociatedWithVM(osNetworkService, fip)) {
-                        DeviceId newDeviceId = event.subject().deviceId();
-                        DeviceId oldDeviceId = revisedInstPort.deviceId();
-
-                        OpenstackNode oldGw = getGwByComputeDevId(gateways, oldDeviceId);
-                        OpenstackNode newGw = getGwByComputeDevId(gateways, newDeviceId);
-
-                        if (oldGw != null && oldGw.equals(newGw)) {
+                    eventExecutor.execute(() -> {
+                        if (gateways.size() == 1) {
                             return;
                         }
 
-                        eventExecutor.execute(() ->
-                                setFloatingIpArpRuleWithPortEvent(fip,
-                                        revisedInstPort, gateways, false));
-                    }
+                        if (fip != null && isAssociatedWithVM(osNetworkService, fip)) {
+                            setFloatingIpArpRuleWithPortEvent(fip,
+                                        event.subject(), gateways, true);
+                        }
+                    });
+
+                    break;
+                case OPENSTACK_INSTANCE_MIGRATION_ENDED:
+                    eventExecutor.execute(() -> {
+                        InstancePort revisedInstPort = swapStaleLocation(event.subject());
+
+                        if (gateways.size() == 1) {
+                            return;
+                        }
+
+                        if (fip != null && isAssociatedWithVM(osNetworkService, fip)) {
+                            DeviceId newDeviceId = event.subject().deviceId();
+                            DeviceId oldDeviceId = revisedInstPort.deviceId();
+
+                            OpenstackNode oldGw =
+                                    getGwByComputeDevId(gateways, oldDeviceId);
+                            OpenstackNode newGw =
+                                    getGwByComputeDevId(gateways, newDeviceId);
+
+                            if (oldGw != null && oldGw.equals(newGw)) {
+                                return;
+                            }
+
+                            setFloatingIpArpRuleWithPortEvent(fip,
+                                        revisedInstPort, gateways, false);
+                        }
+                    });
                     break;
                 default:
                     break;
@@ -815,44 +827,52 @@
             OpenstackNode osNode = event.subject();
             switch (event.type()) {
                 case OPENSTACK_NODE_COMPLETE:
-                    setDefaultArpRule(osNode, true);
-                    setFloatingIpArpRuleForGateway(osNode, true);
-                    sendGratuitousArpToSwitch(event.subject(), true);
+                    eventExecutor.execute(() -> {
+                        setDefaultArpRule(osNode, true);
+                        setFloatingIpArpRuleForGateway(osNode, true);
+                        sendGratuitousArpToSwitch(event.subject(), true);
+                    });
                     break;
                 case OPENSTACK_NODE_INCOMPLETE:
-                    setDefaultArpRule(osNode, false);
-                    setFloatingIpArpRuleForGateway(osNode, false);
-                    sendGratuitousArpToSwitch(event.subject(), false);
+                    eventExecutor.execute(() -> {
+                        setDefaultArpRule(osNode, false);
+                        setFloatingIpArpRuleForGateway(osNode, false);
+                        sendGratuitousArpToSwitch(event.subject(), false);
+                    });
                     break;
                 case OPENSTACK_NODE_REMOVED:
-                    sendGratuitousArpToSwitch(event.subject(), false);
+                    eventExecutor.execute(() -> {
+                        sendGratuitousArpToSwitch(event.subject(), false);
+                    });
                     break;
-
                 default:
                     break;
             }
         }
 
-        private void sendGratuitousArpToSwitch(OpenstackNode gatewayNode, boolean isCompleteCase) {
-            Set<OpenstackNode> completeGws = ImmutableSet.copyOf(osNodeService.completeNodes(GATEWAY));
+        private void sendGratuitousArpToSwitch(OpenstackNode gatewayNode,
+                                               boolean isCompleteCase) {
+            Set<OpenstackNode> completeGws =
+                    ImmutableSet.copyOf(osNodeService.completeNodes(GATEWAY));
 
             if (isCompleteCase) {
-                osNodeService.completeNodes(COMPUTE)
-                        .stream()
-                        .filter(node -> isGwSelectedByComputeNode(completeGws, node, gatewayNode))
-                        .forEach(node -> processGratuitousArpPacketForComputeNode(node, gatewayNode));
+                osNodeService.completeNodes(COMPUTE).stream()
+                        .filter(node -> isGwSelectedByComputeNode(completeGws,
+                                                                  node, gatewayNode))
+                        .forEach(node -> processGarpPacketForComputeNode(node, gatewayNode));
 
             } else {
                 Set<OpenstackNode> oldCompleteGws = Sets.newConcurrentHashSet();
                 oldCompleteGws.addAll(ImmutableSet.copyOf(osNodeService.completeNodes(GATEWAY)));
                 oldCompleteGws.add(gatewayNode);
 
-                osNodeService.completeNodes(COMPUTE)
-                        .stream()
-                        .filter(node -> isGwSelectedByComputeNode(oldCompleteGws, node, gatewayNode))
+                osNodeService.completeNodes(COMPUTE).stream()
+                        .filter(node -> isGwSelectedByComputeNode(oldCompleteGws,
+                                                                  node, gatewayNode))
                         .forEach(node -> {
-                            OpenstackNode newSelectedGatewayNode = getGwByComputeDevId(completeGws, node.intgBridge());
-                            processGratuitousArpPacketForComputeNode(node, newSelectedGatewayNode);
+                            OpenstackNode newSelectedGatewayNode =
+                                    getGwByComputeDevId(completeGws, node.intgBridge());
+                            processGarpPacketForComputeNode(node, newSelectedGatewayNode);
                         });
             }
         }
@@ -865,16 +885,20 @@
                     .intgBridge().equals(gwNode.intgBridge());
         }
 
-        private void processGratuitousArpPacketForComputeNode(OpenstackNode computeNode, OpenstackNode gatewayNode) {
-            instancePortService.instancePort(computeNode.intgBridge()).forEach(instancePort -> {
-                NetFloatingIP floatingIP = OpenstackNetworkingUtil.floatingIpByInstancePort(instancePort,
-                        osRouterAdminService);
+        private void processGarpPacketForComputeNode(OpenstackNode computeNode,
+                                                     OpenstackNode gatewayNode) {
+            instancePortService.instancePort(computeNode.intgBridge())
+                    .forEach(instancePort -> {
+                NetFloatingIP floatingIP =
+                        floatingIpByInstancePort(instancePort, osRouterAdminService);
                 Network network = osNetworkService.network(instancePort.networkId());
-                ExternalPeerRouter externalPeerRouter = OpenstackNetworkingUtil.externalPeerRouterForNetwork(network,
-                        osNetworkService, osRouterAdminService);
+                ExternalPeerRouter externalPeerRouter =
+                        externalPeerRouterForNetwork(network, osNetworkService,
+                                                            osRouterAdminService);
                 if (floatingIP != null && externalPeerRouter != null) {
-                    processGratuitousArpPacketForFloatingIp(
-                            floatingIP, instancePort, externalPeerRouter.vlanId(), gatewayNode, packetService);
+                    processGarpPacketForFloatingIp(
+                            floatingIP, instancePort, externalPeerRouter.vlanId(),
+                                                        gatewayNode, packetService);
                 }
             });
         }
@@ -899,7 +923,8 @@
             }
         }
 
-        private void setDefaultArpRuleForProxyMode(OpenstackNode osNode, boolean install) {
+        private void setDefaultArpRuleForProxyMode(OpenstackNode osNode,
+                                                   boolean install) {
             TrafficSelector selector = DefaultTrafficSelector.builder()
                     .matchEthType(EthType.EtherType.ARP.ethType().toShort())
                     .build();
@@ -919,7 +944,8 @@
             );
         }
 
-        private void setDefaultArpRuleForBroadcastMode(OpenstackNode osNode, boolean install) {
+        private void setDefaultArpRuleForBroadcastMode(OpenstackNode osNode,
+                                                       boolean install) {
             // we only match ARP_REPLY in gateway node, because controller
             // somehow need to process ARP_REPLY which is issued from
             // external router...
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingFloatingIpHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingFloatingIpHandler.java
index 732275c..15d2f96 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingFloatingIpHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingFloatingIpHandler.java
@@ -85,7 +85,7 @@
 import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.getGwByComputeDevId;
 import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.getGwByInstancePort;
 import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.isAssociatedWithVM;
-import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.processGratuitousArpPacketForFloatingIp;
+import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.processGarpPacketForFloatingIp;
 import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.swapStaleLocation;
 import static org.onosproject.openstacknetworking.util.RulePopulatorUtil.buildExtension;
 import static org.onosproject.openstacknode.api.OpenstackNode.NodeType.GATEWAY;
@@ -440,7 +440,8 @@
     }
 
     private void setUpstreamRules(NetFloatingIP floatingIp, Network osNet,
-                                  InstancePort instPort, ExternalPeerRouter externalPeerRouter,
+                                  InstancePort instPort,
+                                  ExternalPeerRouter externalPeerRouter,
                                   boolean install) {
         IpAddress floating = IpAddress.valueOf(floatingIp.getFloatingIpAddress());
         TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
@@ -515,12 +516,13 @@
         ExternalPeerRouter externalPeerRouter =
                 externalPeerRouterForNetwork(osNet, osNetworkService, osRouterAdminService);
         if (externalPeerRouter == null) {
-            log.error("Failed to process GARP packet for floating ip {} because no external peer router found");
+            log.error("Failed to process GARP packet for floating ip {} " +
+                                        "because no external peer router found");
             return;
         }
 
-        processGratuitousArpPacketForFloatingIp(floatingIP, instancePort, externalPeerRouter.vlanId(),
-                selectedGw, packetService);
+        processGarpPacketForFloatingIp(floatingIP, instancePort,
+                        externalPeerRouter.vlanId(), selectedGw, packetService);
 
     }
 
@@ -604,7 +606,8 @@
                             preCommitPortService.unsubscribePreCommit(osFip.getPortId(),
                                     OPENSTACK_PORT_PRE_REMOVE, instancePortService,
                                     this.getClass().getName());
-                            log.info("Unsubscribed the port {} on listening pre-remove event", osFip.getPortId());
+                            log.info("Unsubscribed the port {} on listening pre-remove event",
+                                     osFip.getPortId());
                         }
                         log.info("Removed floating IP {}", osFip.getFloatingIpAddress());
                     });
@@ -746,17 +749,19 @@
 
             switch (event.type()) {
                 case OPENSTACK_INSTANCE_PORT_DETECTED:
-                    if (instPort != null && instPort.portId() != null) {
-                        osRouterAdminService.floatingIps().stream()
-                                .filter(f -> f.getPortId() != null)
-                                .filter(f -> f.getPortId().equals(instPort.portId()))
-                                .forEach(f -> setFloatingIpRules(f, instPort, null, true));
-                    }
 
+                    eventExecutor.execute(() -> {
+                        if (instPort != null && instPort.portId() != null) {
+                            osRouterAdminService.floatingIps().stream()
+                                    .filter(f -> f.getPortId() != null)
+                                    .filter(f -> f.getPortId().equals(instPort.portId()))
+                                    .forEach(f -> setFloatingIpRules(f,
+                                            instPort, null, true));
+                        }
+                    });
                     break;
 
                 case OPENSTACK_INSTANCE_MIGRATION_STARTED:
-
                     fip = associatedFloatingIp(event.subject(), ips);
 
                     if (fip == null) {
@@ -765,7 +770,8 @@
 
                     osPort = osNetworkService.port(fip.getPortId());
                     osNet = osNetworkService.network(osPort.getNetworkId());
-                    externalPeerRouter = externalPeerRouterForNetwork(osNet, osNetworkService, osRouterAdminService);
+                    externalPeerRouter = externalPeerRouterForNetwork(osNet,
+                                         osNetworkService, osRouterAdminService);
 
                     if (externalPeerRouter == null) {
                         final String errorFormat = ERR_FLOW + "no external peer router found";
@@ -798,7 +804,8 @@
 
                     osPort = osNetworkService.port(fip.getPortId());
                     osNet = osNetworkService.network(osPort.getNetworkId());
-                    externalPeerRouter = externalPeerRouterForNetwork(osNet, osNetworkService, osRouterAdminService);
+                    externalPeerRouter = externalPeerRouterForNetwork(osNet,
+                                         osNetworkService, osRouterAdminService);
 
                     if (externalPeerRouter == null) {
                         final String errorFormat = ERR_FLOW + "no external peer router found";
@@ -859,29 +866,31 @@
         public void event(OpenstackNetworkEvent event) {
             switch (event.type()) {
                 case OPENSTACK_PORT_PRE_REMOVE:
-                    InstancePort instPort =
-                            instancePortService.instancePort(event.port().getId());
-
-                    if (instPort == null) {
-                        break;
-                    }
-
-                    NetFloatingIP fip =
-                            associatedFloatingIp(instPort, osRouterAdminService.floatingIps());
-
-                    if (fip != null) {
-                        instancePortService.updateInstancePort(
-                                            instPort.updateState(REMOVE_PENDING));
-                        eventExecutor.execute(() -> updateFipStore(event.port().getId()));
-                    } else {
-                        instancePortService.removeInstancePort(instPort.portId());
-                    }
+                    eventExecutor.execute(() -> processPortPreRemove(event));
                     break;
                 default:
                     break;
             }
         }
 
+        private void processPortPreRemove(OpenstackNetworkEvent event) {
+            InstancePort instPort = instancePortService.instancePort(
+                                                        event.port().getId());
+            if (instPort == null) {
+                return;
+            }
+            NetFloatingIP fip = associatedFloatingIp(instPort,
+                    osRouterAdminService.floatingIps());
+
+            if (fip != null) {
+                instancePortService.updateInstancePort(
+                        instPort.updateState(REMOVE_PENDING));
+                updateFipStore(event.port().getId());
+            } else {
+                instancePortService.removeInstancePort(instPort.portId());
+            }
+        }
+
         private void updateFipStore(String portId) {
 
             if (portId == null) {
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingHandler.java
index 28697ed..edc9e09 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingHandler.java
@@ -100,6 +100,7 @@
 import static org.onosproject.openstacknetworking.util.RulePopulatorUtil.buildExtension;
 import static org.onosproject.openstacknode.api.OpenstackNode.NodeType.COMPUTE;
 import static org.onosproject.openstacknode.api.OpenstackNode.NodeType.GATEWAY;
+import static org.openstack4j.model.network.NetworkType.FLAT;
 
 /**
  * Handles OpenStack router events.
@@ -332,7 +333,7 @@
         Subnet osSubnet = osNetworkAdminService.subnet(routerIface.getSubnetId());
         Network osNet = osNetworkAdminService.network(osSubnet.getNetworkId());
 
-        if (osNet.getNetworkType() == NetworkType.FLAT) {
+        if (osNet.getNetworkType() == FLAT) {
             return;
         }
 
@@ -996,10 +997,12 @@
                             event.routerIface()));
                     break;
                 case OPENSTACK_ROUTER_GATEWAY_ADDED:
-                    log.debug("Router external gateway {} added", event.externalGateway().getNetworkId());
+                    log.debug("Router external gateway {} added",
+                                        event.externalGateway().getNetworkId());
                     break;
                 case OPENSTACK_ROUTER_GATEWAY_REMOVED:
-                    log.debug("Router external gateway {} removed", event.externalGateway().getNetworkId());
+                    log.debug("Router external gateway {} removed",
+                                        event.externalGateway().getNetworkId());
                     break;
                 case OPENSTACK_FLOATING_IP_CREATED:
                 case OPENSTACK_FLOATING_IP_UPDATED:
@@ -1104,7 +1107,7 @@
         }
 
         private void instPortDetected(InstancePort instPort) {
-            if (osNetworkAdminService.network(instPort.networkId()).getNetworkType() == NetworkType.FLAT) {
+            if (osNetworkAdminService.network(instPort.networkId()).getNetworkType() == FLAT) {
                 return;
             }
 
@@ -1120,7 +1123,7 @@
         }
 
         private void instPortRemoved(InstancePort instPort) {
-            if (osNetworkAdminService.network(instPort.networkId()).getNetworkType() == NetworkType.FLAT) {
+            if (osNetworkAdminService.network(instPort.networkId()).getNetworkType() == FLAT) {
                 return;
             }
 
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSecurityGroupHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSecurityGroupHandler.java
index 83f75aa..2f8b7db 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSecurityGroupHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSecurityGroupHandler.java
@@ -767,20 +767,22 @@
                 case OPENSTACK_INSTANCE_PORT_UPDATED:
                 case OPENSTACK_INSTANCE_PORT_DETECTED:
                 case OPENSTACK_INSTANCE_MIGRATION_STARTED:
-                    installSecurityGroupRules(event, instPort);
+                    eventExecutor.execute(() ->
+                                    installSecurityGroupRules(event, instPort));
                     break;
                 case OPENSTACK_INSTANCE_PORT_VANISHED:
-                    Port osPort = removedOsPortStore.asJavaMap().get(instPort.portId());
-                    eventExecutor.execute(() ->
-                            setSecurityGroupRules(instPort, osPort, false)
-                    );
-                    removedOsPortStore.remove(instPort.portId());
+                    eventExecutor.execute(() -> {
+                        Port osPort = removedOsPortStore.asJavaMap().get(instPort.portId());
+                        setSecurityGroupRules(instPort, osPort, false);
+                        removedOsPortStore.remove(instPort.portId());
+                    });
                     break;
                 case OPENSTACK_INSTANCE_MIGRATION_ENDED:
-                    InstancePort revisedInstPort = swapStaleLocation(instPort);
-                    Port port = osNetService.port(instPort.portId());
-                    eventExecutor.execute(() ->
-                            setSecurityGroupRules(revisedInstPort, port, false));
+                    eventExecutor.execute(() -> {
+                        InstancePort revisedInstPort = swapStaleLocation(instPort);
+                        Port port = osNetService.port(instPort.portId());
+                        setSecurityGroupRules(revisedInstPort, port, false);
+                    });
                     break;
                 default:
                     break;
@@ -822,7 +824,8 @@
 
             switch (event.type()) {
                 case OPENSTACK_PORT_PRE_REMOVE:
-                    removedOsPortStore.put(osPort.getId(), osPort);
+                    eventExecutor.execute(() ->
+                                removedOsPortStore.put(osPort.getId(), osPort));
                     break;
                 default:
                     // do nothing for the other events
@@ -945,7 +948,8 @@
         public void event(OpenstackNodeEvent event) {
             switch (event.type()) {
                 case OPENSTACK_NODE_COMPLETE:
-                    resetSecurityGroupRules();
+                    eventExecutor.execute(OpenstackSecurityGroupHandler.this::
+                                                        resetSecurityGroupRules);
                     break;
                 case OPENSTACK_NODE_CREATED:
                 case OPENSTACK_NODE_REMOVED:
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingArpHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingArpHandler.java
index 95661a3..3f72375 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingArpHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingArpHandler.java
@@ -71,8 +71,11 @@
 import java.util.Dictionary;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.openstacknetworking.api.Constants.ARP_BROADCAST_MODE;
 import static org.onosproject.openstacknetworking.api.Constants.ARP_PROXY_MODE;
 import static org.onosproject.openstacknetworking.api.Constants.ARP_TABLE;
@@ -148,6 +151,9 @@
     private final InstancePortListener instancePortListener = new InternalInstancePortListener();
     private final OpenstackNodeListener osNodeListener = new InternalNodeEventListener();
 
+    private final ExecutorService eventExecutor = newSingleThreadExecutor(
+            groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
+
 
     private ApplicationId appId;
     private NodeId localNodeId;
@@ -175,6 +181,7 @@
         instancePortService.removeListener(instancePortListener);
         leadershipService.withdraw(appId.name());
         configService.unregisterProperties(getClass(), false);
+        eventExecutor.shutdown();
 
         log.info("Stopped");
     }
@@ -543,7 +550,8 @@
             if (ethPacket == null || ethPacket.getEtherType() != Ethernet.TYPE_ARP) {
                 return;
             }
-            processPacketIn(context, ethPacket);
+
+            eventExecutor.execute(() -> processPacketIn(context, ethPacket));
         }
     }
 
@@ -586,10 +594,14 @@
             switch (event.type()) {
                 case OPENSTACK_SUBNET_CREATED:
                 case OPENSTACK_SUBNET_UPDATED:
-                    setFakeGatewayArpRule(event.subnet(), true, null);
+                    eventExecutor.execute(() -> {
+                        setFakeGatewayArpRule(event.subnet(), true, null);
+                    });
                     break;
                 case OPENSTACK_SUBNET_REMOVED:
-                    setFakeGatewayArpRule(event.subnet(), false, null);
+                    eventExecutor.execute(() -> {
+                        setFakeGatewayArpRule(event.subnet(), false, null);
+                    });
                     break;
                 case OPENSTACK_NETWORK_CREATED:
                 case OPENSTACK_NETWORK_UPDATED:
@@ -624,12 +636,16 @@
             OpenstackNode osNode = event.subject();
             switch (event.type()) {
                 case OPENSTACK_NODE_COMPLETE:
-                    setDefaultArpRule(osNode, true);
-                    setAllArpRules(osNode, true);
+                    eventExecutor.execute(() -> {
+                        setDefaultArpRule(osNode, true);
+                        setAllArpRules(osNode, true);
+                    });
                     break;
                 case OPENSTACK_NODE_INCOMPLETE:
-                    setDefaultArpRule(osNode, false);
-                    setAllArpRules(osNode, false);
+                    eventExecutor.execute(() -> {
+                        setDefaultArpRule(osNode, false);
+                        setAllArpRules(osNode, false);
+                    });
                     break;
                 default:
                     break;
@@ -745,20 +761,28 @@
             switch (event.type()) {
                 case OPENSTACK_INSTANCE_PORT_DETECTED:
                 case OPENSTACK_INSTANCE_PORT_UPDATED:
-                    setArpRequestRule(event.subject(), true);
-                    setArpReplyRule(event.subject(), true);
+                    eventExecutor.execute(() -> {
+                        setArpRequestRule(event.subject(), true);
+                        setArpReplyRule(event.subject(), true);
+                    });
                     break;
                 case OPENSTACK_INSTANCE_PORT_VANISHED:
-                    setArpRequestRule(event.subject(), false);
-                    setArpReplyRule(event.subject(), false);
+                    eventExecutor.execute(() -> {
+                        setArpRequestRule(event.subject(), false);
+                        setArpReplyRule(event.subject(), false);
+                    });
                     break;
                 case OPENSTACK_INSTANCE_MIGRATION_STARTED:
-                    setArpRequestRule(event.subject(), true);
-                    setArpReplyRule(event.subject(), true);
+                    eventExecutor.execute(() -> {
+                        setArpRequestRule(event.subject(), true);
+                        setArpReplyRule(event.subject(), true);
+                    });
                     break;
                 case OPENSTACK_INSTANCE_MIGRATION_ENDED:
-                    InstancePort revisedInstPort = swapStaleLocation(event.subject());
-                    setArpRequestRule(revisedInstPort, false);
+                    eventExecutor.execute(() -> {
+                        InstancePort revisedInstPort = swapStaleLocation(event.subject());
+                        setArpRequestRule(revisedInstPort, false);
+                    });
                     break;
                 default:
                     break;
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingDhcpHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingDhcpHandler.java
index de4f8f2..68bf34f 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingDhcpHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingDhcpHandler.java
@@ -72,8 +72,10 @@
 import java.util.Dictionary;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.ExecutorService;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static org.onlab.packet.DHCP.DHCPOptionCode.OptionCode_BroadcastAddress;
 import static org.onlab.packet.DHCP.DHCPOptionCode.OptionCode_Classless_Static_Route;
 import static org.onlab.packet.DHCP.DHCPOptionCode.OptionCode_DHCPServerIp;
@@ -85,6 +87,7 @@
 import static org.onlab.packet.DHCP.DHCPOptionCode.OptionCode_SubnetMask;
 import static org.onlab.packet.DHCP.MsgType.DHCPACK;
 import static org.onlab.packet.DHCP.MsgType.DHCPOFFER;
+import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.openstacknetworking.api.Constants.DEFAULT_GATEWAY_MAC_STR;
 import static org.onosproject.openstacknetworking.api.Constants.DHCP_TABLE;
 import static org.onosproject.openstacknetworking.api.Constants.PRIORITY_DHCP_RULE;
@@ -150,6 +153,9 @@
     private final PacketProcessor packetProcessor = new InternalPacketProcessor();
     private final OpenstackNodeListener osNodeListener = new InternalNodeEventListener();
 
+    private final ExecutorService eventExecutor = newSingleThreadExecutor(
+            groupedThreads(this.getClass().getSimpleName(), "event-handler"));
+
     private ApplicationId appId;
     private NodeId localNodeId;
 
@@ -171,6 +177,7 @@
         osNodeService.removeListener(osNodeListener);
         configService.unregisterProperties(getClass(), false);
         leadershipService.withdraw(appId.name());
+        eventExecutor.shutdown();
 
         log.info("Stopped");
     }
@@ -212,7 +219,8 @@
             }
 
             DHCP dhcpPacket = (DHCP) udpPacket.getPayload();
-            processDhcp(context, dhcpPacket);
+
+            eventExecutor.execute(() -> processDhcp(context, dhcpPacket));
         }
 
         private void processDhcp(PacketContext context, DHCP dhcpPacket) {
@@ -547,10 +555,10 @@
             OpenstackNode osNode = event.subject();
             switch (event.type()) {
                 case OPENSTACK_NODE_COMPLETE:
-                    setDhcpRule(osNode, true);
+                    eventExecutor.execute(() -> setDhcpRule(osNode, true));
                     break;
                 case OPENSTACK_NODE_INCOMPLETE:
-                    setDhcpRule(osNode, false);
+                    eventExecutor.execute(() -> setDhcpRule(osNode, false));
                     break;
                 case OPENSTACK_NODE_CREATED:
                 case OPENSTACK_NODE_UPDATED:
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingHandler.java
index e63444f..9039ed2 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingHandler.java
@@ -47,7 +47,6 @@
 import org.onosproject.openstacknetworking.api.OpenstackNetworkEvent;
 import org.onosproject.openstacknetworking.api.OpenstackNetworkListener;
 import org.onosproject.openstacknetworking.api.OpenstackNetworkService;
-import org.onosproject.openstacknetworking.api.OpenstackSecurityGroupService;
 import org.onosproject.openstacknode.api.OpenstackNode;
 import org.onosproject.openstacknode.api.OpenstackNodeService;
 import org.openstack4j.model.network.Network;
@@ -129,9 +128,6 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected OpenstackNodeService osNodeService;
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected OpenstackSecurityGroupService securityGroupService;
-
     private final ExecutorService eventExecutor = newSingleThreadExecutor(
             groupedThreads(this.getClass().getSimpleName(), "event-handler"));
     private final InstancePortListener instancePortListener = new InternalInstancePortListener();
@@ -797,7 +793,7 @@
             boolean isNwAdminStateUp = event.subject().isAdminStateUp();
             boolean isPortAdminStateUp = event.port().isAdminStateUp();
 
-            InstancePort instPort = instancePortService.instancePort(event.port().getId());
+            String portId = event.port().getId();
 
             switch (event.type()) {
                 case OPENSTACK_NETWORK_CREATED:
@@ -812,20 +808,20 @@
                     break;
                 case OPENSTACK_PORT_CREATED:
                 case OPENSTACK_PORT_UPDATED:
-
-                    if (instPort != null) {
-                        eventExecutor.execute(() ->
-                                setPortBlockRules(instPort, !isPortAdminStateUp));
-                    }
-
+                    eventExecutor.execute(() -> {
+                        InstancePort instPort = instancePortService.instancePort(portId);
+                        if (instPort != null) {
+                            setPortBlockRules(instPort, !isPortAdminStateUp);
+                        }
+                    });
                     break;
                 case OPENSTACK_PORT_REMOVED:
-
-                    if (instPort != null) {
-                        eventExecutor.execute(() ->
-                                setPortBlockRules(instPort, false));
-                    }
-
+                    eventExecutor.execute(() -> {
+                        InstancePort instPort = instancePortService.instancePort(portId);
+                        if (instPort != null) {
+                            setPortBlockRules(instPort, false);
+                        }
+                    });
                     break;
                 default:
                     break;
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingHostProvider.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingHostProvider.java
index 40413d7..92eab4f 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingHostProvider.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingHostProvider.java
@@ -306,11 +306,11 @@
 
         @Override
         public boolean isRelevant(DeviceEvent event) {
-            Device device = event.subject();
-            if (!mastershipService.isLocalMaster(device.id())) {
+            if (!mastershipService.isLocalMaster(event.subject().id())) {
                 // do not allow to proceed without mastership
                 return false;
             }
+
             Port port = event.port();
             if (port == null) {
                 return false;
@@ -390,8 +390,10 @@
 
         private void processCompleteNode(OpenstackNode osNode) {
             deviceService.getPorts(osNode.intgBridge()).stream()
-                    .filter(port -> vnicType(port.annotations().value(PORT_NAME)).equals(Constants.VnicType.NORMAL) ||
-                            vnicType(port.annotations().value(PORT_NAME)).equals(Constants.VnicType.DIRECT))
+                    .filter(port -> vnicType(port.annotations().value(PORT_NAME))
+                                    .equals(Constants.VnicType.NORMAL) ||
+                            vnicType(port.annotations().value(PORT_NAME))
+                                    .equals(Constants.VnicType.DIRECT))
                     .filter(Port::isEnabled)
                     .forEach(port -> {
                         log.debug("Instance port {} is detected from {}",
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/util/OpenstackNetworkingUtil.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/util/OpenstackNetworkingUtil.java
index db15e15..25a7e41 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/util/OpenstackNetworkingUtil.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/util/OpenstackNetworkingUtil.java
@@ -884,11 +884,11 @@
      * @param gatewayNode gateway node
      * @param packetService packet service
      */
-    public static void processGratuitousArpPacketForFloatingIp(NetFloatingIP floatingIP,
-                                                               InstancePort instancePort,
-                                                               VlanId vlanId,
-                                                               OpenstackNode gatewayNode,
-                                                               PacketService packetService) {
+    public static void processGarpPacketForFloatingIp(NetFloatingIP floatingIP,
+                                                      InstancePort instancePort,
+                                                      VlanId vlanId,
+                                                      OpenstackNode gatewayNode,
+                                                      PacketService packetService) {
         Ethernet ethernet = buildGratuitousArpPacket(floatingIP, instancePort, vlanId);
 
         TrafficTreatment treatment = DefaultTrafficTreatment.builder()