Fix: do not query the distributed store inside main thread

Change-Id: I883030cb661b619f82bd554c21a48bb39f3fbbe6
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenStackSwitchingDirectPortProvider.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenStackSwitchingDirectPortProvider.java
index 7a6253b..f0b781a 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenStackSwitchingDirectPortProvider.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenStackSwitchingDirectPortProvider.java
@@ -15,11 +15,6 @@
  */
 package org.onosproject.openstacknetworking.impl;
 
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.LeadershipService;
 import org.onosproject.cluster.NodeId;
@@ -41,15 +36,23 @@
 import org.onosproject.openstacknode.api.OpenstackNodeService;
 import org.openstack4j.model.network.Port;
 import org.openstack4j.model.network.State;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
 import static java.lang.Thread.sleep;
+import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.openstacknetworking.api.Constants.DIRECT;
 import static org.onosproject.openstacknetworking.api.Constants.OPENSTACK_NETWORKING_APP_ID;
 import static org.onosproject.openstacknetworking.api.Constants.UNSUPPORTED_VENDOR;
@@ -90,6 +93,8 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected MastershipService mastershipService;
 
+    private final ExecutorService executor =
+            Executors.newSingleThreadExecutor(groupedThreads(this.getClass().getSimpleName(), "direct-port-event"));
     private final OpenstackNetworkListener openstackNetworkListener = new InternalOpenstackNetworkListener();
     private final InternalOpenstackNodeListener internalNodeListener = new InternalOpenstackNodeListener();
 
@@ -112,6 +117,7 @@
         leadershipService.withdraw(appId.name());
         osNetworkService.removeListener(openstackNetworkListener);
         osNodeService.removeListener(internalNodeListener);
+        executor.shutdown();
 
         log.info("Stopped");
     }
@@ -206,28 +212,40 @@
     }
 
     private class InternalOpenstackNetworkListener implements OpenstackNetworkListener {
-        @Override
-        public boolean isRelevant(OpenstackNetworkEvent event) {
-            // do not allow to proceed without leadership
-            NodeId leader = leadershipService.getLeader(appId.name());
-            if (!Objects.equals(localNodeId, leader)) {
-                return false;
-            }
-            return true;
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
         }
 
         @Override
         public void event(OpenstackNetworkEvent event) {
             switch (event.type()) {
                 case OPENSTACK_PORT_UPDATED:
-                    if (event.port().getState() == State.DOWN) {
-                        processPortRemoved(event.port());
-                    } else {
-                        processPortAdded(event.port());
-                    }
+
+                    executor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
+                        if (event.port().getState() == State.DOWN) {
+                            processPortRemoved(event.port());
+                        } else {
+                            processPortAdded(event.port());
+                        }
+                    });
+
                     break;
                 case OPENSTACK_PORT_REMOVED:
-                    processPortRemoved(event.port());
+
+                    executor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
+                        processPortRemoved(event.port());
+                    });
+
                     break;
                 default:
                     break;
@@ -240,10 +258,10 @@
 
         @Override
         public boolean isRelevant(OpenstackNodeEvent event) {
+            return event.subject().type() != CONTROLLER;
+        }
 
-            if (event.subject().type() == CONTROLLER) {
-                return false;
-            }
+        private boolean isRelevantHelper(OpenstackNodeEvent event) {
             // do not allow to proceed without mastership
             Device device = deviceService.getDevice(event.subject().intgBridge());
             if (device == null) {
@@ -259,7 +277,15 @@
             switch (event.type()) {
                 case OPENSTACK_NODE_COMPLETE:
                     log.info("COMPLETE node {} is detected", osNode.hostname());
-                    processComputeState(event.subject());
+
+                    executor.execute(() -> {
+
+                        if (!isRelevantHelper(event)) {
+                            return;
+                        }
+
+                        processComputeState(event.subject());
+                    });
 
                     break;
                 case OPENSTACK_NODE_INCOMPLETE:
@@ -280,9 +306,7 @@
                     .filter(port -> port.getHostId().equals(node.hostname()))
                     .collect(Collectors.toList());
 
-            ports.forEach(port -> {
-                addIntfToDevice(node, port);
-            });
+            ports.forEach(port -> addIntfToDevice(node, port));
         }
 
         private void addIntfToDevice(OpenstackNode node, Port port) {
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackMetadataProxyHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackMetadataProxyHandler.java
index dd476de..9446976 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackMetadataProxyHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackMetadataProxyHandler.java
@@ -18,11 +18,6 @@
 
 import com.google.common.collect.ImmutableSet;
 import org.apache.commons.lang.StringUtils;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.apache.http.Header;
 import org.apache.http.HttpEntityEnclosingRequest;
 import org.apache.http.HttpMessage;
@@ -70,13 +65,21 @@
 import org.onosproject.openstacknode.api.OpenstackNodeListener;
 import org.onosproject.openstacknode.api.OpenstackNodeService;
 import org.openstack4j.model.network.Port;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.slf4j.Logger;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.openstacknetworking.api.Constants.DHCP_TABLE;
 import static org.onosproject.openstacknetworking.api.Constants.PRIORITY_DHCP_RULE;
 import static org.onosproject.openstacknetworking.impl.OpenstackMetadataProxyHandler.Http.Type.RESPONSE;
@@ -145,6 +148,8 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected OpenstackFlowRuleService osFlowRuleService;
 
+    private final ExecutorService eventExecutor = newSingleThreadExecutor(
+            groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
     private final PacketProcessor packetProcessor = new InternalPacketProcessor();
     private final OpenstackNodeListener osNodeListener = new InternalNodeEventListener();
 
@@ -169,6 +174,7 @@
         packetService.removeProcessor(packetProcessor);
         osNodeService.removeListener(osNodeListener);
         leadershipService.withdraw(appId.name());
+        eventExecutor.shutdown();
 
         log.info("Stopped");
     }
@@ -178,11 +184,12 @@
         @Override
         public void process(PacketContext context) {
 
-            if (!useMetadataProxy()) {
+            if (context.isHandled()) {
                 return;
             }
 
-            if (context.isHandled()) {
+            // FIXME: need to find a way to spawn a new thread to check metadata proxy mode
+            if (!useMetadataProxy()) {
                 return;
             }
 
@@ -226,41 +233,48 @@
             byte[] byteData = data.getData();
 
             if (byteData.length != 0) {
-                HttpRequest request = parseHttpRequest(byteData);
-                ConnectPoint cp = context.inPacket().receivedFrom();
-                InstancePort instPort = instancePortService.instancePort(cp.deviceId(), cp.port());
+                eventExecutor.execute(() -> {
+                    processHttpRequest(context, ethPacket, ipv4Packet, tcpPacket, byteData);
+                });
+            }
+        }
 
-                if (instPort == null || request == null) {
-                    log.warn("Cannot send metadata request due to lack of information");
-                    return;
-                }
+        private void processHttpRequest(PacketContext context, Ethernet ethPacket,
+                                        IPv4 ipv4Packet, TCP tcpPacket, byte[] byteData) {
+            HttpRequest request = parseHttpRequest(byteData);
+            ConnectPoint cp = context.inPacket().receivedFrom();
+            InstancePort instPort = instancePortService.instancePort(cp.deviceId(), cp.port());
 
-                // attempt to send HTTP request to the meta-data server (nova-api),
-                // obtain the HTTP response, relay the response to VM through packet-out
-                CloseableHttpResponse proxyResponse = proxyHttpRequest(request, instPort);
+            if (instPort == null || request == null) {
+                log.warn("Cannot send metadata request due to lack of information");
+                return;
+            }
 
-                if (proxyResponse == null) {
-                    log.warn("No response was received from metadata server");
-                    return;
-                }
+            // attempt to send HTTP request to the meta-data server (nova-api),
+            // obtain the HTTP response, relay the response to VM through packet-out
+            CloseableHttpResponse proxyResponse = proxyHttpRequest(request, instPort);
 
-                HttpResponse response = new BasicHttpResponse(proxyResponse.getStatusLine());
-                response.setEntity(proxyResponse.getEntity());
-                response.setHeaders(proxyResponse.getAllHeaders());
+            if (proxyResponse == null) {
+                log.warn("No response was received from metadata server");
+                return;
+            }
 
-                Http httpResponse = new Http();
-                httpResponse.setType(RESPONSE);
-                httpResponse.setMessage(response);
+            HttpResponse response = new BasicHttpResponse(proxyResponse.getStatusLine());
+            response.setEntity(proxyResponse.getEntity());
+            response.setHeaders(proxyResponse.getAllHeaders());
 
-                TCP tcpReply = buildTcpDataPacket(tcpPacket, byteData.length, response);
-                Ethernet ethReply = buildEthFrame(ethPacket, ipv4Packet, tcpReply);
-                sendReply(context, ethReply);
+            Http httpResponse = new Http();
+            httpResponse.setType(RESPONSE);
+            httpResponse.setMessage(response);
 
-                try {
-                    proxyResponse.close();
-                } catch (IOException e) {
-                    log.warn("Failed to close the response connection due to {}", e);
-                }
+            TCP tcpReply = buildTcpDataPacket(tcpPacket, byteData.length, response);
+            Ethernet ethReply = buildEthFrame(ethPacket, ipv4Packet, tcpReply);
+            sendReply(context, ethReply);
+
+            try {
+                proxyResponse.close();
+            } catch (IOException e) {
+                log.warn("Failed to close the response connection due to {}", e);
             }
         }
 
@@ -523,10 +537,12 @@
     private class InternalNodeEventListener implements OpenstackNodeListener {
         @Override
         public boolean isRelevant(OpenstackNodeEvent event) {
-            // do not allow to proceed without leadership
-            NodeId leader = leadershipService.getLeader(appId.name());
-            return Objects.equals(localNodeId, leader) &&
-                    event.subject().type() == COMPUTE && useMetadataProxy();
+            return event.subject().type() == COMPUTE;
+        }
+
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()))
+                    && useMetadataProxy();
         }
 
         @Override
@@ -534,10 +550,28 @@
             OpenstackNode osNode = event.subject();
             switch (event.type()) {
                 case OPENSTACK_NODE_COMPLETE:
-                    setMetadataRule(osNode, true);
+
+                    eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
+                        setMetadataRule(osNode, true);
+                    });
+
                     break;
                 case OPENSTACK_NODE_INCOMPLETE:
-                    setMetadataRule(osNode, false);
+
+                    eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
+                        setMetadataRule(osNode, false);
+                    });
+
                     break;
                 case OPENSTACK_NODE_CREATED:
                 case OPENSTACK_NODE_UPDATED:
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingArpHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingArpHandler.java
index 7e65524..be7616b 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingArpHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingArpHandler.java
@@ -62,6 +62,7 @@
 import org.onosproject.openstacknode.api.OpenstackNodeEvent;
 import org.onosproject.openstacknode.api.OpenstackNodeListener;
 import org.onosproject.openstacknode.api.OpenstackNodeService;
+import org.openstack4j.model.network.IP;
 import org.openstack4j.model.network.NetFloatingIP;
 import org.openstack4j.model.network.Network;
 import org.openstack4j.model.network.Port;
@@ -329,21 +330,26 @@
                 return;
             }
 
+            InboundPacket pkt = context.inPacket();
+            Ethernet ethernet = pkt.parsed();
+            if (ethernet != null && ethernet.getEtherType() == Ethernet.TYPE_ARP) {
+                eventExecutor.execute(() -> {
+
+                    if (!isRelevantHelper(context)) {
+                        return;
+                    }
+
+                    processArpPacket(context, ethernet);
+                });
+            }
+        }
+
+        private boolean isRelevantHelper(PacketContext context) {
             Set<DeviceId> gateways = osNodeService.completeNodes(GATEWAY)
                     .stream().map(OpenstackNode::intgBridge)
                     .collect(Collectors.toSet());
 
-            if (!gateways.contains(context.inPacket().receivedFrom().deviceId())) {
-                // return if the packet is not from gateway nodes
-                return;
-            }
-
-            InboundPacket pkt = context.inPacket();
-            Ethernet ethernet = pkt.parsed();
-            if (ethernet != null &&
-                    ethernet.getEtherType() == Ethernet.TYPE_ARP) {
-                eventExecutor.execute(() -> processArpPacket(context, ethernet));
-            }
+            return gateways.contains(context.inPacket().receivedFrom().deviceId());
         }
     }
 
@@ -529,7 +535,14 @@
 
     private void setFakeGatewayArpRuleByRouter(Router router, boolean install) {
         if (ARP_BROADCAST_MODE.equals(getArpMode())) {
-            setFakeGatewayArpRuleByExternalIp(getExternalIp(router, osNetworkService), install);
+            IpAddress externalIp = getExternalIp(router, osNetworkService);
+
+            if (externalIp == null) {
+                log.debug("External IP is not found");
+                return;
+            }
+
+            setFakeGatewayArpRuleByExternalIp(externalIp, install);
         }
     }
 
@@ -580,38 +593,54 @@
                 return false;
             }
 
-            // do not allow to proceed without leadership
-            NodeId leader = leadershipService.getLeader(appId.name());
-            return Objects.equals(localNodeId, leader) &&
-                    DEVICE_OWNER_ROUTER_GW.equals(osPort.getDeviceOwner()) &&
-                        ARP_BROADCAST_MODE.equals(getArpMode());
+            return DEVICE_OWNER_ROUTER_GW.equals(osPort.getDeviceOwner()) &&
+                       ARP_BROADCAST_MODE.equals(getArpMode());
+        }
+
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
         }
 
         @Override
         public void event(OpenstackNetworkEvent event) {
+            IpAddress ipAddress = externalIp(event.port());
             switch (event.type()) {
                 case OPENSTACK_PORT_CREATED:
                 case OPENSTACK_PORT_UPDATED:
-                    eventExecutor.execute(() ->
-                            setFakeGatewayArpRuleByExternalIp(
-                                    IpAddress.valueOf(event.port().getFixedIps()
-                                            .stream().findAny().get().getIpAddress()),
-                                    true)
-                    );
+                    eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper() || ipAddress == null) {
+                            return;
+                        }
+
+                        setFakeGatewayArpRuleByExternalIp(ipAddress, true);
+                    });
                     break;
                 case OPENSTACK_PORT_REMOVED:
-                    eventExecutor.execute(() ->
-                            setFakeGatewayArpRuleByExternalIp(
-                                    IpAddress.valueOf(event.port().getFixedIps()
-                                            .stream().findAny().get().getIpAddress()),
-                                    false)
-                    );
+                    eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper() || ipAddress == null) {
+                            return;
+                        }
+
+                        setFakeGatewayArpRuleByExternalIp(ipAddress, false);
+                    });
                     break;
                 default:
                     // do nothing
                     break;
             }
         }
+
+        private IpAddress externalIp(Port port) {
+            IP ip = port.getFixedIps().stream().findAny().orElse(null);
+
+            if (ip != null && ip.getIpAddress() != null) {
+                return IpAddress.valueOf(ip.getIpAddress());
+            }
+
+            return null;
+        }
     }
 
     /**
@@ -620,11 +649,8 @@
      */
     private class InternalRouterEventListener implements OpenstackRouterListener {
 
-        @Override
-        public boolean isRelevant(OpenstackRouterEvent event) {
-            // do not allow to proceed without leadership
-            NodeId leader = leadershipService.getLeader(appId.name());
-            return Objects.equals(localNodeId, leader);
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
         }
 
         @Override
@@ -637,44 +663,40 @@
                     // add a router with external gateway
                 case OPENSTACK_ROUTER_GATEWAY_ADDED:
                     // add a gateway manually after adding a router
-                    eventExecutor.execute(() ->
-                            // add a router with external gateway
-                            setFakeGatewayArpRuleByRouter(event.subject(), true)
-                    );
+                    eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
+                        // add a router with external gateway
+                        setFakeGatewayArpRuleByRouter(event.subject(), true);
+                    });
                     break;
                 case OPENSTACK_ROUTER_REMOVED:
                     // remove a router with external gateway
                 case OPENSTACK_ROUTER_GATEWAY_REMOVED:
                     // remove a gateway from an existing router
-                    eventExecutor.execute(() ->
-                            setFakeGatewayArpRuleByRouter(event.subject(), false)
-                    );
-                    break;
-                case OPENSTACK_FLOATING_IP_ASSOCIATED:
                     eventExecutor.execute(() -> {
-                        if (getValidPortId(event) == null) {
+
+                        if (!isRelevantHelper()) {
                             return;
                         }
-                        // associate a floating IP with an existing VM
-                        setFloatingIpArpRule(event.floatingIp(),
-                                getValidPortId(event), completedGws, true);
-                    });
-                    break;
-                case OPENSTACK_FLOATING_IP_DISASSOCIATED:
-                    eventExecutor.execute(() -> {
-                        if (getValidPortId(event) == null) {
-                            return;
-                        }
-                        // disassociate a floating IP with an existing VM
-                        setFloatingIpArpRule(event.floatingIp(),
-                                getValidPortId(event), completedGws, false);
+
+                        setFakeGatewayArpRuleByRouter(event.subject(), false);
                     });
                     break;
                 case OPENSTACK_FLOATING_IP_CREATED:
                     // during floating IP creation, if the floating IP is
                     // associated with any port of VM, then we will set
                     // floating IP related ARP rules to gateway node
+                case OPENSTACK_FLOATING_IP_ASSOCIATED:
                     eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
                         if (getValidPortId(event) == null) {
                             return;
                         }
@@ -687,11 +709,17 @@
                     // during floating IP deletion, if the floating IP is
                     // still associated with any port of VM, then we will
                     // remove floating IP related ARP rules from gateway node
+                case OPENSTACK_FLOATING_IP_DISASSOCIATED:
                     eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
                         if (getValidPortId(event) == null) {
                             return;
                         }
-                        // associate a floating IP with an existing VM
+                        // disassociate a floating IP with an existing VM
                         setFloatingIpArpRule(event.floatingIp(),
                                 getValidPortId(event), completedGws, false);
                     });
@@ -720,46 +748,62 @@
 
     private class InternalInstancePortListener implements InstancePortListener {
 
-        @Override
-        public boolean isRelevant(InstancePortEvent event) {
-            // do not allow to proceed without leadership
-            NodeId leader = leadershipService.getLeader(appId.name());
-            return Objects.equals(localNodeId, leader);
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
         }
 
         @Override
         public void event(InstancePortEvent event) {
             InstancePort instPort = event.subject();
 
-            Set<NetFloatingIP> ips = osRouterAdminService.floatingIps();
-            NetFloatingIP fip = associatedFloatingIp(instPort, ips);
-            Set<OpenstackNode> gateways = osNodeService.completeNodes(GATEWAY);
-
             switch (event.type()) {
                 case OPENSTACK_INSTANCE_PORT_DETECTED:
-                    eventExecutor.execute(() ->
-                            osRouterAdminService.floatingIps().stream()
+                    eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
+                        osRouterAdminService.floatingIps().stream()
                                 .filter(f -> f.getPortId() != null)
                                 .filter(f -> f.getPortId().equals(instPort.portId()))
-                                .forEach(f -> setFloatingIpArpRule(f,
-                                        instPort.portId(), gateways, true))
-                    );
+                                .forEach(f -> setFloatingIpArpRule(f, instPort.portId(),
+                                        osNodeService.completeNodes(GATEWAY), true));
+                    });
+
                     break;
                 case OPENSTACK_INSTANCE_MIGRATION_STARTED:
                     eventExecutor.execute(() -> {
-                        if (gateways.size() == 1) {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
+                        NetFloatingIP fip = associatedFloatingIp(instPort,
+                                            osRouterAdminService.floatingIps());
+
+                        if (osNodeService.completeNodes(GATEWAY).size() == 1) {
                             return;
                         }
 
                         if (fip != null && isAssociatedWithVM(osNetworkService, fip)) {
-                            setFloatingIpArpRuleWithPortEvent(fip,
-                                        event.subject(), gateways, true);
+                            setFloatingIpArpRuleWithPortEvent(fip, event.subject(),
+                                    osNodeService.completeNodes(GATEWAY), true);
                         }
                     });
 
                     break;
                 case OPENSTACK_INSTANCE_MIGRATION_ENDED:
                     eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
+                        NetFloatingIP fip = associatedFloatingIp(instPort,
+                                            osRouterAdminService.floatingIps());
+                        Set<OpenstackNode> gateways = osNodeService.completeNodes(GATEWAY);
+
                         InstancePort revisedInstPort = swapStaleLocation(event.subject());
 
                         if (gateways.size() == 1) {
@@ -794,9 +838,11 @@
 
         @Override
         public boolean isRelevant(OpenstackNodeEvent event) {
-            // do not allow to proceed without leadership
-            NodeId leader = leadershipService.getLeader(appId.name());
-            return Objects.equals(localNodeId, leader) && event.subject().type() == GATEWAY;
+            return event.subject().type() == GATEWAY;
+        }
+
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
         }
 
         @Override
@@ -805,6 +851,9 @@
             switch (event.type()) {
                 case OPENSTACK_NODE_COMPLETE:
                     eventExecutor.execute(() -> {
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
                         setDefaultArpRule(osNode, true);
                         setFloatingIpArpRuleForGateway(osNode, true);
                         sendGratuitousArpToSwitch(event.subject(), true);
@@ -812,6 +861,9 @@
                     break;
                 case OPENSTACK_NODE_INCOMPLETE:
                     eventExecutor.execute(() -> {
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
                         setDefaultArpRule(osNode, false);
                         setFloatingIpArpRuleForGateway(osNode, false);
                         sendGratuitousArpToSwitch(event.subject(), false);
@@ -819,6 +871,9 @@
                     break;
                 case OPENSTACK_NODE_REMOVED:
                     eventExecutor.execute(() -> {
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
                         sendGratuitousArpToSwitch(event.subject(), false);
                     });
                     break;
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingFloatingIpHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingFloatingIpHandler.java
index f04e14a..dc7c877 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingFloatingIpHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingFloatingIpHandler.java
@@ -18,11 +18,6 @@
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.onlab.packet.Ethernet;
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.MacAddress;
@@ -62,6 +57,11 @@
 import org.openstack4j.model.network.NetworkType;
 import org.openstack4j.model.network.Port;
 import org.openstack4j.openstack.networking.domain.NeutronFloatingIP;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -545,19 +545,23 @@
 
         @Override
         public boolean isRelevant(OpenstackRouterEvent event) {
-            // do not allow to proceed without leadership
-            NodeId leader = leadershipService.getLeader(appId.name());
-            if (!Objects.equals(localNodeId, leader)) {
-                return false;
-            }
             return event.floatingIp() != null;
         }
 
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+        }
+
         @Override
         public void event(OpenstackRouterEvent event) {
             switch (event.type()) {
                 case OPENSTACK_FLOATING_IP_ASSOCIATED:
                     eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
                         NetFloatingIP osFip = event.floatingIp();
                         if (instancePortService.instancePort(osFip.getPortId()) != null) {
                             associateFloatingIp(osFip);
@@ -569,6 +573,11 @@
                     break;
                 case OPENSTACK_FLOATING_IP_DISASSOCIATED:
                     eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
                         NetFloatingIP osFip = event.floatingIp();
                         if (instancePortService.instancePort(event.portId()) != null) {
                             disassociateFloatingIp(osFip, event.portId());
@@ -580,6 +589,11 @@
                     break;
                 case OPENSTACK_FLOATING_IP_CREATED:
                     eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
                         NetFloatingIP osFip = event.floatingIp();
                         String portId = osFip.getPortId();
                         if (!Strings.isNullOrEmpty(portId) &&
@@ -591,6 +605,11 @@
                     break;
                 case OPENSTACK_FLOATING_IP_REMOVED:
                     eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
                         NetFloatingIP osFip = event.floatingIp();
                         String portId = osFip.getPortId();
                         if (!Strings.isNullOrEmpty(osFip.getPortId())) {
@@ -630,20 +649,24 @@
 
         @Override
         public boolean isRelevant(OpenstackNodeEvent event) {
-            // do not allow to proceed without leadership
-            NodeId leader = leadershipService.getLeader(appId.name());
-            if (!Objects.equals(localNodeId, leader)) {
-                return false;
-            }
             return event.subject().type() == GATEWAY;
         }
 
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+        }
+
         @Override
         public void event(OpenstackNodeEvent event) {
 
             switch (event.type()) {
                 case OPENSTACK_NODE_COMPLETE:
                     eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
                         for (NetFloatingIP fip : osRouterAdminService.floatingIps()) {
 
                             if (Strings.isNullOrEmpty(fip.getPortId())) {
@@ -664,6 +687,11 @@
                     break;
                 case OPENSTACK_NODE_INCOMPLETE:
                     eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
                         for (NetFloatingIP fip : osRouterAdminService.floatingIps()) {
                             if (Strings.isNullOrEmpty(fip.getPortId())) {
                                 continue;
@@ -714,8 +742,7 @@
 
     private class InternalInstancePortListener implements InstancePortListener {
 
-        @Override
-        public boolean isRelevant(InstancePortEvent event) {
+        private boolean isRelevantHelper(InstancePortEvent event) {
 
             if (event.type() == OPENSTACK_INSTANCE_MIGRATION_ENDED ||
                     event.type() == OPENSTACK_INSTANCE_MIGRATION_STARTED) {
@@ -730,27 +757,22 @@
                 }
             }
 
-            // do not allow to proceed without leadership
-            NodeId leader = leadershipService.getLeader(appId.name());
-
-            return Objects.equals(localNodeId, leader);
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
         }
 
         @Override
         public void event(InstancePortEvent event) {
             InstancePort instPort = event.subject();
-            Set<OpenstackNode> gateways = osNodeService.completeNodes(GATEWAY);
-
-            Set<NetFloatingIP> ips = osRouterAdminService.floatingIps();
-            NetFloatingIP fip;
-            Port osPort;
-            Network osNet;
-            ExternalPeerRouter externalPeerRouter;
 
             switch (event.type()) {
                 case OPENSTACK_INSTANCE_PORT_DETECTED:
 
                     eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper(event)) {
+                            return;
+                        }
+
                         if (instPort != null && instPort.portId() != null) {
                             osRouterAdminService.floatingIps().stream()
                                     .filter(f -> f.getPortId() != null)
@@ -762,24 +784,31 @@
                     break;
 
                 case OPENSTACK_INSTANCE_MIGRATION_STARTED:
-                    fip = associatedFloatingIp(event.subject(), ips);
-
-                    if (fip == null) {
-                        return;
-                    }
-
-                    osPort = osNetworkService.port(fip.getPortId());
-                    osNet = osNetworkService.network(osPort.getNetworkId());
-                    externalPeerRouter = externalPeerRouterForNetwork(osNet,
-                                         osNetworkService, osRouterAdminService);
-
-                    if (externalPeerRouter == null) {
-                        final String errorFormat = ERR_FLOW + "no external peer router found";
-                        throw new IllegalStateException(errorFormat);
-                    }
 
                     eventExecutor.execute(() -> {
 
+                        if (!isRelevantHelper(event)) {
+                            return;
+                        }
+
+                        Set<OpenstackNode> gateways = osNodeService.completeNodes(GATEWAY);
+                        Set<NetFloatingIP> ips = osRouterAdminService.floatingIps();
+                        NetFloatingIP fip = associatedFloatingIp(event.subject(), ips);
+
+                        if (fip == null) {
+                            return;
+                        }
+
+                        Port osPort = osNetworkService.port(fip.getPortId());
+                        Network osNet = osNetworkService.network(osPort.getNetworkId());
+                        ExternalPeerRouter externalPeerRouter = externalPeerRouterForNetwork(osNet,
+                                osNetworkService, osRouterAdminService);
+
+                        if (externalPeerRouter == null) {
+                            final String errorFormat = ERR_FLOW + "no external peer router found";
+                            throw new IllegalStateException(errorFormat);
+                        }
+
                         // since DownstreamExternal rules should only be placed in
                         // corresponding gateway node, we need to install new rule to
                         // the corresponding gateway node
@@ -794,52 +823,57 @@
                     break;
                 case OPENSTACK_INSTANCE_MIGRATION_ENDED:
 
-                    InstancePort oldInstPort = swapStaleLocation(event.subject());
-
-                    fip = associatedFloatingIp(oldInstPort, ips);
-
-                    if (fip == null) {
-                        return;
-                    }
-
-                    osPort = osNetworkService.port(fip.getPortId());
-                    osNet = osNetworkService.network(osPort.getNetworkId());
-                    externalPeerRouter = externalPeerRouterForNetwork(osNet,
-                                         osNetworkService, osRouterAdminService);
-
-                    if (externalPeerRouter == null) {
-                        final String errorFormat = ERR_FLOW + "no external peer router found";
-                        throw new IllegalStateException(errorFormat);
-                    }
-
                     eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper(event)) {
+                            return;
+                        }
+
+                        InstancePort oldInstPort = swapStaleLocation(event.subject());
+
+                        Set<NetFloatingIP> ips = osRouterAdminService.floatingIps();
+                        NetFloatingIP fip = associatedFloatingIp(oldInstPort, ips);
+
+                        if (fip == null) {
+                            return;
+                        }
+
+                        Set<OpenstackNode> gateways = osNodeService.completeNodes(GATEWAY);
+                        Port osPort = osNetworkService.port(fip.getPortId());
+                        Network osNet = osNetworkService.network(osPort.getNetworkId());
+                        ExternalPeerRouter externalPeerRouter = externalPeerRouterForNetwork(osNet,
+                                osNetworkService, osRouterAdminService);
+
+                        if (externalPeerRouter == null) {
+                            final String errorFormat = ERR_FLOW + "no external peer router found";
+                            throw new IllegalStateException(errorFormat);
+                        }
+
                         // We need to remove the old ComputeNodeToGateway rules from
                         // original compute node
                         setComputeNodeToGatewayHelper(oldInstPort, osNet, gateways, false);
-                    });
 
                         // If we only have one gateway, we simply do not remove any
-                    // flow rules from either gateway or compute node
-                    if (gateways.size() == 1) {
-                        return;
-                    }
+                        // flow rules from either gateway or compute node
+                        if (gateways.size() == 1) {
+                            return;
+                        }
 
-                    // Checks whether the destination compute node's device id
-                    // has identical gateway hash or not
-                    // if it is true, we simply do not remove the rules, as
-                    // it has been overwritten at port detention event
-                    // if it is false, we will remove the rules
-                    DeviceId newDeviceId = event.subject().deviceId();
-                    DeviceId oldDeviceId = oldInstPort.deviceId();
+                        // Checks whether the destination compute node's device id
+                        // has identical gateway hash or not
+                        // if it is true, we simply do not remove the rules, as
+                        // it has been overwritten at port detention event
+                        // if it is false, we will remove the rules
+                        DeviceId newDeviceId = event.subject().deviceId();
+                        DeviceId oldDeviceId = oldInstPort.deviceId();
 
-                    OpenstackNode oldGateway = getGwByComputeDevId(gateways, oldDeviceId);
-                    OpenstackNode newGateway = getGwByComputeDevId(gateways, newDeviceId);
+                        OpenstackNode oldGateway = getGwByComputeDevId(gateways, oldDeviceId);
+                        OpenstackNode newGateway = getGwByComputeDevId(gateways, newDeviceId);
 
-                    if (oldGateway != null && oldGateway.equals(newGateway)) {
-                        return;
-                    }
+                        if (oldGateway != null && oldGateway.equals(newGateway)) {
+                            return;
+                        }
 
-                    eventExecutor.execute(() -> {
                         // Since DownstreamExternal rules should only be placed in
                         // corresponding gateway node, we need to remove old rule from
                         // the corresponding gateway node
@@ -855,18 +889,22 @@
 
     private class InternalOpenstackNetworkListener implements OpenstackNetworkListener {
 
-        @Override
-        public boolean isRelevant(OpenstackNetworkEvent event) {
-            // do not allow to proceed without leadership
-            NodeId leader = leadershipService.getLeader(appId.name());
-            return Objects.equals(localNodeId, leader);
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
         }
 
         @Override
         public void event(OpenstackNetworkEvent event) {
             switch (event.type()) {
                 case OPENSTACK_PORT_PRE_REMOVE:
-                    eventExecutor.execute(() -> processPortPreRemove(event));
+                    eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
+                        processPortPreRemove(event);
+                    });
                     break;
                 default:
                     break;
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingHandler.java
index af81c2f..a76961b 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingHandler.java
@@ -950,11 +950,8 @@
 
     private class InternalRouterEventListener implements OpenstackRouterListener {
 
-        @Override
-        public boolean isRelevant(OpenstackRouterEvent event) {
-            // do not allow to proceed without leadership
-            NodeId leader = leadershipService.getLeader(appId.name());
-            return Objects.equals(localNodeId, leader);
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
         }
 
         // FIXME only one leader in the cluster should process
@@ -965,27 +962,53 @@
                     log.debug("Router(name:{}, ID:{}) is created",
                             event.subject().getName(),
                             event.subject().getId());
-                    eventExecutor.execute(() -> routerUpdated(event.subject()));
+                    eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
+                        routerUpdated(event.subject());
+                    });
                     break;
                 case OPENSTACK_ROUTER_UPDATED:
                     log.debug("Router(name:{}, ID:{}) is updated",
                             event.subject().getName(),
                             event.subject().getId());
-                    eventExecutor.execute(() -> routerUpdated(event.subject()));
+                    eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
+                        routerUpdated(event.subject());
+                    });
                     break;
                 case OPENSTACK_ROUTER_REMOVED:
                     log.debug("Router(name:{}, ID:{}) is removed",
                             event.subject().getName(),
                             event.subject().getId());
-                    eventExecutor.execute(() -> routerRemove(event.subject()));
+                    eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
+                        routerRemove(event.subject());
+                    });
                     break;
                 case OPENSTACK_ROUTER_INTERFACE_ADDED:
                     log.debug("Router interface {} added to router {}",
                             event.routerIface().getPortId(),
                             event.routerIface().getId());
-                    eventExecutor.execute(() -> routerIfaceAdded(
-                            event.subject(),
-                            event.routerIface()));
+                    eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
+                        routerIfaceAdded(event.subject(), event.routerIface());
+                    });
                     break;
                 case OPENSTACK_ROUTER_INTERFACE_UPDATED:
                     log.debug("Router interface {} on {} updated",
@@ -996,9 +1019,14 @@
                     log.debug("Router interface {} removed from router {}",
                             event.routerIface().getPortId(),
                             event.routerIface().getId());
-                    eventExecutor.execute(() -> routerIfaceRemoved(
-                            event.subject(),
-                            event.routerIface()));
+                    eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
+                        routerIfaceRemoved(event.subject(), event.routerIface());
+                    });
                     break;
                 case OPENSTACK_ROUTER_GATEWAY_ADDED:
                     log.debug("Router external gateway {} added",
@@ -1022,11 +1050,8 @@
 
     private class InternalNodeEventListener implements OpenstackNodeListener {
 
-        @Override
-        public boolean isRelevant(OpenstackNodeEvent event) {
-            // do not allow to proceed without leadership
-            NodeId leader = leadershipService.getLeader(appId.name());
-            return Objects.equals(localNodeId, leader);
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
         }
 
         @Override
@@ -1040,6 +1065,11 @@
                 case OPENSTACK_NODE_REMOVED:
                     eventExecutor.execute(() -> {
                         log.info("Reconfigure routers for {}", osNode.hostname());
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
                         reconfigureRouters();
                     });
                     break;
@@ -1061,10 +1091,8 @@
 
     private class InternalInstancePortListener implements InstancePortListener {
 
-        @Override
-        public boolean isRelevant(InstancePortEvent event) {
-            InstancePort instPort = event.subject();
-            return mastershipService.isLocalMaster(instPort.deviceId());
+        private boolean isRelevantHelper(InstancePortEvent event) {
+            return mastershipService.isLocalMaster(event.subject().deviceId());
         }
 
         @Override
@@ -1077,7 +1105,14 @@
                                                             instPort.macAddress(),
                                                             instPort.ipAddress());
 
-                    eventExecutor.execute(() -> instPortDetected(event.subject()));
+                    eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper(event)) {
+                            return;
+                        }
+
+                        instPortDetected(event.subject());
+                    });
 
                     break;
                 case OPENSTACK_INSTANCE_PORT_VANISHED:
@@ -1085,7 +1120,14 @@
                                                             instPort.macAddress(),
                                                             instPort.ipAddress());
 
-                    eventExecutor.execute(() -> instPortRemoved(event.subject()));
+                    eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper(event)) {
+                            return;
+                        }
+
+                        instPortRemoved(event.subject());
+                    });
 
                     break;
                 case OPENSTACK_INSTANCE_MIGRATION_STARTED:
@@ -1093,7 +1135,14 @@
                                                             instPort.macAddress(),
                                                             instPort.ipAddress());
 
-                    eventExecutor.execute(() -> instPortDetected(instPort));
+                    eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper(event)) {
+                            return;
+                        }
+
+                        instPortDetected(instPort);
+                    });
 
                     break;
                 case OPENSTACK_INSTANCE_MIGRATION_ENDED:
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingIcmpHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingIcmpHandler.java
index f4947f0..75db5a6 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingIcmpHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingIcmpHandler.java
@@ -181,9 +181,11 @@
     private class InternalNodeEventListener implements OpenstackNodeListener {
         @Override
         public boolean isRelevant(OpenstackNodeEvent event) {
-            // do not allow to proceed without leadership
-            NodeId leader = leadershipService.getLeader(appId.name());
-            return Objects.equals(localNodeId, leader) && event.subject().type() == GATEWAY;
+            return event.subject().type() == GATEWAY;
+        }
+
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
         }
 
         @Override
@@ -191,10 +193,24 @@
             OpenstackNode osNode = event.subject();
             switch (event.type()) {
                 case OPENSTACK_NODE_COMPLETE:
-                    eventExecutor.execute(() -> setIcmpReplyRules(osNode.intgBridge(), true));
+                    eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
+                        setIcmpReplyRules(osNode.intgBridge(), true);
+                    });
                     break;
                 case OPENSTACK_NODE_INCOMPLETE:
-                    eventExecutor.execute(() -> setIcmpReplyRules(osNode.intgBridge(), false));
+                    eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
+                        setIcmpReplyRules(osNode.intgBridge(), false);
+                    });
                     break;
                 default:
                     break;
@@ -233,12 +249,8 @@
             }
 
             eventExecutor.execute(() -> {
-                Set<DeviceId> gateways = osNodeService.completeNodes(GATEWAY)
-                        .stream().map(OpenstackNode::intgBridge)
-                        .collect(Collectors.toSet());
 
-                if (!gateways.isEmpty() &&
-                        !gateways.contains(context.inPacket().receivedFrom().deviceId())) {
+                if (!isRelevantHelper(context)) {
                     return;
                 }
 
@@ -256,6 +268,14 @@
             });
         }
 
+        private boolean isRelevantHelper(PacketContext context) {
+            Set<DeviceId> gateways = osNodeService.completeNodes(GATEWAY)
+                    .stream().map(OpenstackNode::intgBridge)
+                    .collect(Collectors.toSet());
+
+            return gateways.contains(context.inPacket().receivedFrom().deviceId());
+        }
+
         private void processIcmpPacket(PacketContext context, Ethernet ethernet) {
             IPv4 ipPacket = (IPv4) ethernet.getPayload();
             ICMP icmp = (ICMP) ipPacket.getPayload();
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingSnatHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingSnatHandler.java
index 23333c4..6203740 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingSnatHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingSnatHandler.java
@@ -483,14 +483,9 @@
 
         @Override
         public void process(PacketContext context) {
-            Set<DeviceId> gateways = osNodeService.completeNodes(OpenstackNode.NodeType.GATEWAY)
-                    .stream().map(OpenstackNode::intgBridge)
-                    .collect(Collectors.toSet());
+
             if (context.isHandled()) {
                 return;
-            } else if (!gateways.contains(context.inPacket().receivedFrom().deviceId())) {
-                // return if the packet is not from gateway nodes
-                return;
             }
 
             InboundPacket pkt = context.inPacket();
@@ -511,9 +506,24 @@
                         break;
                     }
                 default:
-                    eventExecutor.execute(() -> processSnatPacket(context, eth));
+                    eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper(context)) {
+                            return;
+                        }
+
+                        processSnatPacket(context, eth);
+                    });
                     break;
             }
         }
+
+        private boolean isRelevantHelper(PacketContext context) {
+            Set<DeviceId> gateways = osNodeService.completeNodes(GATEWAY)
+                    .stream().map(OpenstackNode::intgBridge)
+                    .collect(Collectors.toSet());
+
+            return gateways.contains(context.inPacket().receivedFrom().deviceId());
+        }
     }
 }
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSecurityGroupHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSecurityGroupHandler.java
index 6558410..52eb1ae 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSecurityGroupHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSecurityGroupHandler.java
@@ -759,8 +759,11 @@
 
         @Override
         public boolean isRelevant(InstancePortEvent event) {
-            return useSecurityGroup &&
-                    mastershipService.isLocalMaster(event.subject().deviceId());
+            return useSecurityGroup;
+        }
+
+        private boolean isRelevantHelper(InstancePortEvent event) {
+            return mastershipService.isLocalMaster(event.subject().deviceId());
         }
 
         @Override
@@ -770,11 +773,22 @@
                 case OPENSTACK_INSTANCE_PORT_UPDATED:
                 case OPENSTACK_INSTANCE_PORT_DETECTED:
                 case OPENSTACK_INSTANCE_MIGRATION_STARTED:
-                    eventExecutor.execute(() ->
-                                    installSecurityGroupRules(event, instPort));
+                    eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper(event)) {
+                            return;
+                        }
+
+                        installSecurityGroupRules(event, instPort);
+                    });
                     break;
                 case OPENSTACK_INSTANCE_PORT_VANISHED:
                     eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper(event)) {
+                            return;
+                        }
+
                         Port osPort = removedOsPortStore.asJavaMap().get(instPort.portId());
                         setSecurityGroupRules(instPort, osPort, false);
                         removedOsPortStore.remove(instPort.portId());
@@ -782,6 +796,11 @@
                     break;
                 case OPENSTACK_INSTANCE_MIGRATION_ENDED:
                     eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper(event)) {
+                            return;
+                        }
+
                         InstancePort revisedInstPort = swapStaleLocation(instPort);
                         Port port = osNetService.port(instPort.portId());
                         setSecurityGroupRules(revisedInstPort, port, false);
@@ -811,13 +830,17 @@
                 return false;
             }
 
+            return useSecurityGroup;
+        }
+
+        private boolean isRelevantHelper(OpenstackNetworkEvent event) {
             InstancePort instPort = instancePortService.instancePort(event.port().getId());
 
             if (instPort == null) {
                 return false;
             }
 
-            return useSecurityGroup && mastershipService.isLocalMaster(instPort.deviceId());
+            return mastershipService.isLocalMaster(instPort.deviceId());
         }
 
         @Override
@@ -827,8 +850,14 @@
 
             switch (event.type()) {
                 case OPENSTACK_PORT_PRE_REMOVE:
-                    eventExecutor.execute(() ->
-                                removedOsPortStore.put(osPort.getId(), osPort));
+                    eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper(event)) {
+                            return;
+                        }
+
+                        removedOsPortStore.put(osPort.getId(), osPort);
+                    });
                     break;
                 default:
                     // do nothing for the other events
@@ -837,8 +866,7 @@
         }
     }
 
-    private class InternalOpenstackNetworkListener
-            implements OpenstackNetworkListener {
+    private class InternalOpenstackNetworkListener implements OpenstackNetworkListener {
 
         @Override
         public boolean isRelevant(OpenstackNetworkEvent event) {
@@ -846,6 +874,10 @@
                 return false;
             }
 
+            return useSecurityGroup;
+        }
+
+        private boolean isRelevantHelper(OpenstackNetworkEvent event) {
             if (event.securityGroupId() == null ||
                     securityGroupService.securityGroup(event.securityGroupId()) == null) {
                 return false;
@@ -857,19 +889,25 @@
                 return false;
             }
 
-            return useSecurityGroup && mastershipService.isLocalMaster(instPort.deviceId());
+            return mastershipService.isLocalMaster(instPort.deviceId());
         }
 
         @Override
         public void event(OpenstackNetworkEvent event) {
             log.debug("security group event received {}", event);
             Port osPort = event.port();
-            InstancePort instPort = instancePortService.instancePort(osPort.getId());
-            SecurityGroup osSg = securityGroupService.securityGroup(event.securityGroupId());
 
             switch (event.type()) {
                 case OPENSTACK_PORT_SECURITY_GROUP_ADDED:
                     eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper(event)) {
+                            return;
+                        }
+
+                        InstancePort instPort = instancePortService.instancePort(osPort.getId());
+                        SecurityGroup osSg = securityGroupService.securityGroup(event.securityGroupId());
+
                         osSg.getRules().forEach(sgRule -> {
                             updateSecurityGroupRule(instPort, osPort, sgRule, true);
                         });
@@ -879,6 +917,14 @@
                     break;
                 case OPENSTACK_PORT_SECURITY_GROUP_REMOVED:
                     eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper(event)) {
+                            return;
+                        }
+
+                        InstancePort instPort = instancePortService.instancePort(osPort.getId());
+                        SecurityGroup osSg = securityGroupService.securityGroup(event.securityGroupId());
+
                         osSg.getRules().forEach(sgRule -> {
                             updateSecurityGroupRule(instPort, osPort, sgRule, false);
                         });
@@ -893,25 +939,28 @@
         }
     }
 
-    private class InternalSecurityGroupListener
-            implements OpenstackSecurityGroupListener {
+    private class InternalSecurityGroupListener implements OpenstackSecurityGroupListener {
 
         @Override
         public boolean isRelevant(OpenstackSecurityGroupEvent event) {
-            // do not allow to proceed without leadership
-            NodeId leader = leadershipService.getLeader(appId.name());
-            if (!Objects.equals(localNodeId, leader)) {
-                return false;
-            }
             return useSecurityGroup;
         }
 
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+        }
+
         @Override
         public void event(OpenstackSecurityGroupEvent event) {
             switch (event.type()) {
                 case OPENSTACK_SECURITY_GROUP_RULE_CREATED:
                     SecurityGroupRule sgRuleToAdd = event.securityGroupRule();
                     eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
                         securityGroupRuleAdded(sgRuleToAdd);
                         log.info("Applied new security group rule {} to ports",
                                 sgRuleToAdd.getId());
@@ -921,6 +970,11 @@
                 case OPENSTACK_SECURITY_GROUP_RULE_REMOVED:
                     SecurityGroupRule sgRuleToRemove = event.securityGroupRule();
                     eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
                         securityGroupRuleRemoved(sgRuleToRemove);
                         log.info("Removed security group rule {} from ports",
                                 sgRuleToRemove.getId());
@@ -939,20 +993,25 @@
 
         @Override
         public boolean isRelevant(OpenstackNodeEvent event) {
-            // do not allow to proceed without leadership
-            NodeId leader = leadershipService.getLeader(appId.name());
-            if (!Objects.equals(localNodeId, leader)) {
-                return false;
-            }
             return event.subject().type() == COMPUTE;
         }
 
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+        }
+
         @Override
         public void event(OpenstackNodeEvent event) {
             switch (event.type()) {
                 case OPENSTACK_NODE_COMPLETE:
-                    eventExecutor.execute(OpenstackSecurityGroupHandler.this::
-                                                        resetSecurityGroupRules);
+                    eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
+                        OpenstackSecurityGroupHandler.this.resetSecurityGroupRules();
+                    });
                     break;
                 case OPENSTACK_NODE_CREATED:
                 case OPENSTACK_NODE_REMOVED:
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingArpHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingArpHandler.java
index a4696d5..a2d0013 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingArpHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingArpHandler.java
@@ -566,11 +566,10 @@
 
         @Override
         public boolean isRelevant(OpenstackNetworkEvent event) {
-            Subnet osSubnet = event.subnet();
-            if (osSubnet == null) {
-                return false;
-            }
+            return event.subnet() != null;
+        }
 
+        private boolean isRelevantHelper(Subnet osSubnet) {
             Network network = osNetworkService.network(osSubnet.getNetworkId());
 
             if (network == null) {
@@ -597,11 +596,21 @@
                 case OPENSTACK_SUBNET_CREATED:
                 case OPENSTACK_SUBNET_UPDATED:
                     eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper(event.subnet())) {
+                            return;
+                        }
+
                         setFakeGatewayArpRule(event.subnet(), true, null);
                     });
                     break;
                 case OPENSTACK_SUBNET_REMOVED:
                     eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper(event.subnet())) {
+                            return;
+                        }
+
                         setFakeGatewayArpRule(event.subnet(), false, null);
                     });
                     break;
@@ -624,13 +633,8 @@
      * default ARP rule to handle ARP request.
      */
     private class InternalNodeEventListener implements OpenstackNodeListener {
-
-        @Override
-        public boolean isRelevant(OpenstackNodeEvent event) {
-
-            // do not allow to proceed without leadership
-            NodeId leader = leadershipService.getLeader(appId.name());
-            return Objects.equals(localNodeId, leader) && event.subject().type() == COMPUTE;
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
         }
 
         @Override
@@ -639,12 +643,22 @@
             switch (event.type()) {
                 case OPENSTACK_NODE_COMPLETE:
                     eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
                         setDefaultArpRule(osNode, true);
                         setAllArpRules(osNode, true);
                     });
                     break;
                 case OPENSTACK_NODE_INCOMPLETE:
                     eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
                         setDefaultArpRule(osNode, false);
                         setAllArpRules(osNode, false);
                     });
@@ -749,13 +763,11 @@
 
         @Override
         public boolean isRelevant(InstancePortEvent event) {
+            return ARP_BROADCAST_MODE.equals(getArpMode());
+        }
 
-            if (ARP_PROXY_MODE.equals(getArpMode())) {
-                return false;
-            }
-
-            InstancePort instPort = event.subject();
-            return mastershipService.isLocalMaster(instPort.deviceId());
+        private boolean isRelevantHelper(InstancePortEvent event) {
+            return mastershipService.isLocalMaster(event.subject().deviceId());
         }
 
         @Override
@@ -763,25 +775,35 @@
             switch (event.type()) {
                 case OPENSTACK_INSTANCE_PORT_DETECTED:
                 case OPENSTACK_INSTANCE_PORT_UPDATED:
+                case OPENSTACK_INSTANCE_MIGRATION_STARTED:
                     eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper(event)) {
+                            return;
+                        }
+
                         setArpRequestRule(event.subject(), true);
                         setArpReplyRule(event.subject(), true);
                     });
                     break;
                 case OPENSTACK_INSTANCE_PORT_VANISHED:
                     eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper(event)) {
+                            return;
+                        }
+
                         setArpRequestRule(event.subject(), false);
                         setArpReplyRule(event.subject(), false);
                     });
                     break;
-                case OPENSTACK_INSTANCE_MIGRATION_STARTED:
-                    eventExecutor.execute(() -> {
-                        setArpRequestRule(event.subject(), true);
-                        setArpReplyRule(event.subject(), true);
-                    });
-                    break;
                 case OPENSTACK_INSTANCE_MIGRATION_ENDED:
                     eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper(event)) {
+                            return;
+                        }
+
                         InstancePort revisedInstPort = swapStaleLocation(event.subject());
                         setArpRequestRule(revisedInstPort, false);
                     });
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingDhcpHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingDhcpHandler.java
index 113feb9..e5bef9a 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingDhcpHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingDhcpHandler.java
@@ -91,7 +91,6 @@
 import static org.onosproject.openstacknetworking.api.Constants.PRIORITY_DHCP_RULE;
 import static org.onosproject.openstacknetworking.impl.OsgiPropertyConstants.DHCP_SERVER_MAC;
 import static org.onosproject.openstacknetworking.impl.OsgiPropertyConstants.DHCP_SERVER_MAC_DEFAULT;
-import static org.onosproject.openstacknode.api.OpenstackNode.NodeType.COMPUTE;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -546,11 +545,8 @@
     }
 
     private class InternalNodeEventListener implements OpenstackNodeListener {
-        @Override
-        public boolean isRelevant(OpenstackNodeEvent event) {
-            // do not allow to proceed without leadership
-            NodeId leader = leadershipService.getLeader(appId.name());
-            return Objects.equals(localNodeId, leader) && event.subject().type() == COMPUTE;
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
         }
 
         @Override
@@ -558,10 +554,24 @@
             OpenstackNode osNode = event.subject();
             switch (event.type()) {
                 case OPENSTACK_NODE_COMPLETE:
-                    eventExecutor.execute(() -> setDhcpRule(osNode, true));
+                    eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
+                        setDhcpRule(osNode, true);
+                    });
                     break;
                 case OPENSTACK_NODE_INCOMPLETE:
-                    eventExecutor.execute(() -> setDhcpRule(osNode, false));
+                    eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
+                        setDhcpRule(osNode, false);
+                    });
                     break;
                 case OPENSTACK_NODE_CREATED:
                 case OPENSTACK_NODE_UPDATED:
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingHandler.java
index 47fad9d..14fd5da 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingHandler.java
@@ -17,11 +17,6 @@
 package org.onosproject.openstacknetworking.impl;
 
 import com.google.common.base.Strings;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.onlab.packet.Ethernet;
 import org.onlab.packet.VlanId;
 import org.onosproject.cfg.ComponentConfigService;
@@ -52,6 +47,11 @@
 import org.openstack4j.model.network.Network;
 import org.openstack4j.model.network.NetworkType;
 import org.openstack4j.model.network.Port;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.slf4j.Logger;
 
 import java.util.Objects;
@@ -76,6 +76,7 @@
 import static org.onosproject.openstacknetworking.api.Constants.PRIORITY_TUNNEL_TAG_RULE;
 import static org.onosproject.openstacknetworking.api.Constants.STAT_FLAT_OUTBOUND_TABLE;
 import static org.onosproject.openstacknetworking.api.Constants.VTAG_TABLE;
+import static org.onosproject.openstacknetworking.api.InstancePortEvent.Type.OPENSTACK_INSTANCE_MIGRATION_STARTED;
 import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.getPropertyValue;
 import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.swapStaleLocation;
 import static org.onosproject.openstacknetworking.util.RulePopulatorUtil.buildExtension;
@@ -677,7 +678,8 @@
     }
 
     private String getArpMode() {
-        Set<ConfigProperty> properties = configService.getProperties(OpenstackSwitchingArpHandler.class.getName());
+        Set<ConfigProperty> properties =
+                configService.getProperties(OpenstackSwitchingArpHandler.class.getName());
         return getPropertyValue(properties, ARP_MODE);
     }
 
@@ -690,38 +692,58 @@
      */
     private class InternalInstancePortListener implements InstancePortListener {
 
-        @Override
-        public boolean isRelevant(InstancePortEvent event) {
-            InstancePort instPort = event.subject();
-            return mastershipService.isLocalMaster(instPort.deviceId());
+        private boolean isRelevantHelper(InstancePortEvent event) {
+            return mastershipService.isLocalMaster(event.subject().deviceId());
         }
 
         @Override
         public void event(InstancePortEvent event) {
             InstancePort instPort = event.subject();
-            Port osPort = osNetworkService.port(instPort.portId());
 
             switch (event.type()) {
                 case OPENSTACK_INSTANCE_PORT_DETECTED:
                 case OPENSTACK_INSTANCE_PORT_UPDATED:
+                case OPENSTACK_INSTANCE_MIGRATION_STARTED:
                 case OPENSTACK_INSTANCE_RESTARTED:
-                    log.info("SwitchingHandler: Instance port detected MAC:{} IP:{}",
+
+                    if (event.type() == OPENSTACK_INSTANCE_MIGRATION_STARTED) {
+                        log.info("SwitchingHandler: Migration started at MAC:{} IP:{}",
                                                         instPort.macAddress(),
                                                         instPort.ipAddress());
-
-                    eventExecutor.execute(() -> instPortDetected(instPort));
-
-                    if (osPort != null) {
-                        eventExecutor.execute(() ->
-                                setPortBlockRules(instPort, !osPort.isAdminStateUp()));
+                    } else {
+                        log.info("SwitchingHandler: Instance port detected MAC:{} IP:{}",
+                                                        instPort.macAddress(),
+                                                        instPort.ipAddress());
                     }
 
+                    eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper(event)) {
+                            return;
+                        }
+
+                        instPortDetected(instPort);
+
+                        Port osPort = osNetworkService.port(instPort.portId());
+
+                        if (osPort != null) {
+                            setPortBlockRules(instPort, !osPort.isAdminStateUp());
+                        }
+                    });
+
                     break;
                 case OPENSTACK_INSTANCE_TERMINATED:
                     log.info("SwitchingHandler: Instance port terminated MAC:{} IP:{}",
                                                         instPort.macAddress(),
                                                         instPort.ipAddress());
-                    eventExecutor.execute(() -> removeVportRules(instPort));
+                    eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper(event)) {
+                            return;
+                        }
+
+                        removeVportRules(instPort);
+                    });
 
                     break;
                 case OPENSTACK_INSTANCE_PORT_VANISHED:
@@ -729,24 +751,20 @@
                                                         instPort.macAddress(),
                                                         instPort.ipAddress());
 
-                    eventExecutor.execute(() -> instPortRemoved(instPort));
+                    eventExecutor.execute(() -> {
 
-                    if (osPort != null) {
-                        setPortBlockRules(instPort, false);
-                    }
+                        if (!isRelevantHelper(event)) {
+                            return;
+                        }
 
-                    break;
-                case OPENSTACK_INSTANCE_MIGRATION_STARTED:
-                    log.info("SwitchingHandler: Migration started for MAC:{} IP:{}",
-                                                        instPort.macAddress(),
-                                                        instPort.ipAddress());
+                        instPortRemoved(instPort);
 
-                    eventExecutor.execute(() -> instPortDetected(instPort));
+                        Port osPort = osNetworkService.port(instPort.portId());
 
-                    if (osPort != null) {
-                        eventExecutor.execute(() ->
-                                setPortBlockRules(instPort, !osPort.isAdminStateUp()));
-                    }
+                        if (osPort != null) {
+                            setPortBlockRules(instPort, false);
+                        }
+                    });
 
                     break;
                 case OPENSTACK_INSTANCE_MIGRATION_ENDED:
@@ -755,7 +773,14 @@
                                                         instPort.ipAddress());
 
                     InstancePort revisedInstPort = swapStaleLocation(instPort);
-                    eventExecutor.execute(() -> removeVportRules(revisedInstPort));
+                    eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper(event)) {
+                            return;
+                        }
+
+                        removeVportRules(revisedInstPort);
+                    });
 
                     break;
                 default:
@@ -778,19 +803,16 @@
 
         @Override
         public boolean isRelevant(OpenstackNetworkEvent event) {
+            return event.subject() != null && event.port() != null;
+        }
 
-            // do not allow to proceed without leadership
-            NodeId leader = leadershipService.getLeader(appId.name());
-            return Objects.equals(localNodeId, leader);
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
         }
 
         @Override
         public void event(OpenstackNetworkEvent event) {
 
-            if (event.subject() == null || event.port() == null) {
-                return;
-            }
-
             boolean isNwAdminStateUp = event.subject().isAdminStateUp();
             boolean isPortAdminStateUp = event.port().isAdminStateUp();
 
@@ -799,17 +821,33 @@
             switch (event.type()) {
                 case OPENSTACK_NETWORK_CREATED:
                 case OPENSTACK_NETWORK_UPDATED:
-                    eventExecutor.execute(() ->
-                            setNetworkBlockRules(event.subject(), !isNwAdminStateUp));
+                    eventExecutor.execute(() -> {
 
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
+                        setNetworkBlockRules(event.subject(), !isNwAdminStateUp);
+                    });
                     break;
                 case OPENSTACK_NETWORK_REMOVED:
-                    eventExecutor.execute(() ->
-                            setNetworkBlockRules(event.subject(), false));
+                    eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
+                        setNetworkBlockRules(event.subject(), false);
+                    });
                     break;
                 case OPENSTACK_PORT_CREATED:
                 case OPENSTACK_PORT_UPDATED:
                     eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
                         InstancePort instPort = instancePortService.instancePort(portId);
                         if (instPort != null) {
                             setPortBlockRules(instPort, !isPortAdminStateUp);
@@ -818,6 +856,11 @@
                     break;
                 case OPENSTACK_PORT_REMOVED:
                     eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
                         InstancePort instPort = instancePortService.instancePort(portId);
                         if (instPort != null) {
                             setPortBlockRules(instPort, false);
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingHostProvider.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingHostProvider.java
index 8b190d0..4dd5ea7 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingHostProvider.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackSwitchingHostProvider.java
@@ -304,15 +304,11 @@
 
         @Override
         public boolean isRelevant(DeviceEvent event) {
-            if (!mastershipService.isLocalMaster(event.subject().id())) {
-                // do not allow to proceed without mastership
-                return false;
-            }
-
             Port port = event.port();
             if (port == null) {
                 return false;
             }
+
             String portName = port.annotations().value(PORT_NAME);
 
             return !Strings.isNullOrEmpty(portName) &&
@@ -321,6 +317,10 @@
                             portName.startsWith(PORT_NAME_VHOST_USER_PREFIX_VM));
         }
 
+        private boolean isRelevantHelper(DeviceEvent event) {
+            return mastershipService.isLocalMaster(event.subject().id());
+        }
+
         private boolean isDirectPort(String portName) {
             return portNamePrefixMap().values().stream().anyMatch(portName::startsWith);
         }
@@ -330,17 +330,38 @@
             log.info("Device event occurred with type {}", event.type());
             switch (event.type()) {
                 case PORT_UPDATED:
-                    if (!event.port().isEnabled()) {
-                        executor.execute(() -> portRemovedHelper(event));
-                    } else if (event.port().isEnabled()) {
-                        executor.execute(() -> portAddedHelper(event));
-                    }
+                    executor.execute(() -> {
+                        if (!isRelevantHelper(event)) {
+                            return;
+                        }
+
+                        if (!event.port().isEnabled()) {
+                            portRemovedHelper(event);
+                        } else if (event.port().isEnabled()) {
+                            portAddedHelper(event);
+                        }
+                    });
+
                     break;
                 case PORT_ADDED:
-                    executor.execute(() -> portAddedHelper(event));
+                    executor.execute(() -> {
+
+                        if (!isRelevantHelper(event)) {
+                            return;
+                        }
+
+                        portAddedHelper(event);
+                    });
                     break;
                 case PORT_REMOVED:
-                    executor.execute(() -> portRemovedHelper(event));
+                    executor.execute(() -> {
+
+                        if (!isRelevantHelper(event)) {
+                            return;
+                        }
+
+                        portRemovedHelper(event);
+                    });
                     break;
                 default:
                     break;
@@ -352,10 +373,10 @@
 
         @Override
         public boolean isRelevant(OpenstackNodeEvent event) {
+            return event.subject().type() != CONTROLLER;
+        }
 
-            if (event.subject().type() == CONTROLLER) {
-                return false;
-            }
+        private boolean isRelevantHelper(OpenstackNodeEvent event) {
             // do not allow to proceed without mastership
             Device device = deviceService.getDevice(event.subject().intgBridge());
             if (device == null) {
@@ -371,7 +392,14 @@
             switch (event.type()) {
                 case OPENSTACK_NODE_COMPLETE:
                     log.info("COMPLETE node {} is detected", osNode.hostname());
-                    executor.execute(() -> processCompleteNode(event.subject()));
+                    executor.execute(() -> {
+
+                        if (!isRelevantHelper(event)) {
+                            return;
+                        }
+
+                        processCompleteNode(event.subject());
+                    });
                     break;
                 case OPENSTACK_NODE_INCOMPLETE:
                     log.warn("{} is changed to INCOMPLETE state", osNode);