Fix: do not query the distributed store inside the main thread

Change-Id: I883030cb661b619f82bd554c21a48bb39f3fbbe6
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingFloatingIpHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingFloatingIpHandler.java
index f04e14a..dc7c877 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingFloatingIpHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingFloatingIpHandler.java
@@ -18,11 +18,6 @@
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.onlab.packet.Ethernet;
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.MacAddress;
@@ -62,6 +57,11 @@
 import org.openstack4j.model.network.NetworkType;
 import org.openstack4j.model.network.Port;
 import org.openstack4j.openstack.networking.domain.NeutronFloatingIP;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -545,19 +545,23 @@
 
         @Override
         public boolean isRelevant(OpenstackRouterEvent event) {
-            // do not allow to proceed without leadership
-            NodeId leader = leadershipService.getLeader(appId.name());
-            if (!Objects.equals(localNodeId, leader)) {
-                return false;
-            }
             return event.floatingIp() != null;
         }
 
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+        }
+
         @Override
         public void event(OpenstackRouterEvent event) {
             switch (event.type()) {
                 case OPENSTACK_FLOATING_IP_ASSOCIATED:
                     eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
                         NetFloatingIP osFip = event.floatingIp();
                         if (instancePortService.instancePort(osFip.getPortId()) != null) {
                             associateFloatingIp(osFip);
@@ -569,6 +573,11 @@
                     break;
                 case OPENSTACK_FLOATING_IP_DISASSOCIATED:
                     eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
                         NetFloatingIP osFip = event.floatingIp();
                         if (instancePortService.instancePort(event.portId()) != null) {
                             disassociateFloatingIp(osFip, event.portId());
@@ -580,6 +589,11 @@
                     break;
                 case OPENSTACK_FLOATING_IP_CREATED:
                     eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
                         NetFloatingIP osFip = event.floatingIp();
                         String portId = osFip.getPortId();
                         if (!Strings.isNullOrEmpty(portId) &&
@@ -591,6 +605,11 @@
                     break;
                 case OPENSTACK_FLOATING_IP_REMOVED:
                     eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
                         NetFloatingIP osFip = event.floatingIp();
                         String portId = osFip.getPortId();
                         if (!Strings.isNullOrEmpty(osFip.getPortId())) {
@@ -630,20 +649,24 @@
 
         @Override
         public boolean isRelevant(OpenstackNodeEvent event) {
-            // do not allow to proceed without leadership
-            NodeId leader = leadershipService.getLeader(appId.name());
-            if (!Objects.equals(localNodeId, leader)) {
-                return false;
-            }
             return event.subject().type() == GATEWAY;
         }
 
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+        }
+
         @Override
         public void event(OpenstackNodeEvent event) {
 
             switch (event.type()) {
                 case OPENSTACK_NODE_COMPLETE:
                     eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
                         for (NetFloatingIP fip : osRouterAdminService.floatingIps()) {
 
                             if (Strings.isNullOrEmpty(fip.getPortId())) {
@@ -664,6 +687,11 @@
                     break;
                 case OPENSTACK_NODE_INCOMPLETE:
                     eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
                         for (NetFloatingIP fip : osRouterAdminService.floatingIps()) {
                             if (Strings.isNullOrEmpty(fip.getPortId())) {
                                 continue;
@@ -714,8 +742,7 @@
 
     private class InternalInstancePortListener implements InstancePortListener {
 
-        @Override
-        public boolean isRelevant(InstancePortEvent event) {
+        private boolean isRelevantHelper(InstancePortEvent event) {
 
             if (event.type() == OPENSTACK_INSTANCE_MIGRATION_ENDED ||
                     event.type() == OPENSTACK_INSTANCE_MIGRATION_STARTED) {
@@ -730,27 +757,22 @@
                 }
             }
 
-            // do not allow to proceed without leadership
-            NodeId leader = leadershipService.getLeader(appId.name());
-
-            return Objects.equals(localNodeId, leader);
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
         }
 
         @Override
         public void event(InstancePortEvent event) {
             InstancePort instPort = event.subject();
-            Set<OpenstackNode> gateways = osNodeService.completeNodes(GATEWAY);
-
-            Set<NetFloatingIP> ips = osRouterAdminService.floatingIps();
-            NetFloatingIP fip;
-            Port osPort;
-            Network osNet;
-            ExternalPeerRouter externalPeerRouter;
 
             switch (event.type()) {
                 case OPENSTACK_INSTANCE_PORT_DETECTED:
 
                     eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper(event)) {
+                            return;
+                        }
+
                         if (instPort != null && instPort.portId() != null) {
                             osRouterAdminService.floatingIps().stream()
                                     .filter(f -> f.getPortId() != null)
@@ -762,24 +784,31 @@
                     break;
 
                 case OPENSTACK_INSTANCE_MIGRATION_STARTED:
-                    fip = associatedFloatingIp(event.subject(), ips);
-
-                    if (fip == null) {
-                        return;
-                    }
-
-                    osPort = osNetworkService.port(fip.getPortId());
-                    osNet = osNetworkService.network(osPort.getNetworkId());
-                    externalPeerRouter = externalPeerRouterForNetwork(osNet,
-                                         osNetworkService, osRouterAdminService);
-
-                    if (externalPeerRouter == null) {
-                        final String errorFormat = ERR_FLOW + "no external peer router found";
-                        throw new IllegalStateException(errorFormat);
-                    }
 
                     eventExecutor.execute(() -> {
 
+                        if (!isRelevantHelper(event)) {
+                            return;
+                        }
+
+                        Set<OpenstackNode> gateways = osNodeService.completeNodes(GATEWAY);
+                        Set<NetFloatingIP> ips = osRouterAdminService.floatingIps();
+                        NetFloatingIP fip = associatedFloatingIp(event.subject(), ips);
+
+                        if (fip == null) {
+                            return;
+                        }
+
+                        Port osPort = osNetworkService.port(fip.getPortId());
+                        Network osNet = osNetworkService.network(osPort.getNetworkId());
+                        ExternalPeerRouter externalPeerRouter = externalPeerRouterForNetwork(osNet,
+                                osNetworkService, osRouterAdminService);
+
+                        if (externalPeerRouter == null) {
+                            final String errorFormat = ERR_FLOW + "no external peer router found";
+                            throw new IllegalStateException(errorFormat);
+                        }
+
                         // since DownstreamExternal rules should only be placed in
                         // corresponding gateway node, we need to install new rule to
                         // the corresponding gateway node
@@ -794,52 +823,57 @@
                     break;
                 case OPENSTACK_INSTANCE_MIGRATION_ENDED:
 
-                    InstancePort oldInstPort = swapStaleLocation(event.subject());
-
-                    fip = associatedFloatingIp(oldInstPort, ips);
-
-                    if (fip == null) {
-                        return;
-                    }
-
-                    osPort = osNetworkService.port(fip.getPortId());
-                    osNet = osNetworkService.network(osPort.getNetworkId());
-                    externalPeerRouter = externalPeerRouterForNetwork(osNet,
-                                         osNetworkService, osRouterAdminService);
-
-                    if (externalPeerRouter == null) {
-                        final String errorFormat = ERR_FLOW + "no external peer router found";
-                        throw new IllegalStateException(errorFormat);
-                    }
-
                     eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper(event)) {
+                            return;
+                        }
+
+                        InstancePort oldInstPort = swapStaleLocation(event.subject());
+
+                        Set<NetFloatingIP> ips = osRouterAdminService.floatingIps();
+                        NetFloatingIP fip = associatedFloatingIp(oldInstPort, ips);
+
+                        if (fip == null) {
+                            return;
+                        }
+
+                        Set<OpenstackNode> gateways = osNodeService.completeNodes(GATEWAY);
+                        Port osPort = osNetworkService.port(fip.getPortId());
+                        Network osNet = osNetworkService.network(osPort.getNetworkId());
+                        ExternalPeerRouter externalPeerRouter = externalPeerRouterForNetwork(osNet,
+                                osNetworkService, osRouterAdminService);
+
+                        if (externalPeerRouter == null) {
+                            final String errorFormat = ERR_FLOW + "no external peer router found";
+                            throw new IllegalStateException(errorFormat);
+                        }
+
                         // We need to remove the old ComputeNodeToGateway rules from
                         // original compute node
                         setComputeNodeToGatewayHelper(oldInstPort, osNet, gateways, false);
-                    });
 
                         // If we only have one gateway, we simply do not remove any
-                    // flow rules from either gateway or compute node
-                    if (gateways.size() == 1) {
-                        return;
-                    }
+                        // flow rules from either gateway or compute node
+                        if (gateways.size() == 1) {
+                            return;
+                        }
 
-                    // Checks whether the destination compute node's device id
-                    // has identical gateway hash or not
-                    // if it is true, we simply do not remove the rules, as
-                    // it has been overwritten at port detention event
-                    // if it is false, we will remove the rules
-                    DeviceId newDeviceId = event.subject().deviceId();
-                    DeviceId oldDeviceId = oldInstPort.deviceId();
+                        // Checks whether the destination compute node's device id
+                        // has identical gateway hash or not
+                        // if it is true, we simply do not remove the rules, as
+                        // it has been overwritten at port detection event
+                        // if it is false, we will remove the rules
+                        DeviceId newDeviceId = event.subject().deviceId();
+                        DeviceId oldDeviceId = oldInstPort.deviceId();
 
-                    OpenstackNode oldGateway = getGwByComputeDevId(gateways, oldDeviceId);
-                    OpenstackNode newGateway = getGwByComputeDevId(gateways, newDeviceId);
+                        OpenstackNode oldGateway = getGwByComputeDevId(gateways, oldDeviceId);
+                        OpenstackNode newGateway = getGwByComputeDevId(gateways, newDeviceId);
 
-                    if (oldGateway != null && oldGateway.equals(newGateway)) {
-                        return;
-                    }
+                        if (oldGateway != null && oldGateway.equals(newGateway)) {
+                            return;
+                        }
 
-                    eventExecutor.execute(() -> {
                         // Since DownstreamExternal rules should only be placed in
                         // corresponding gateway node, we need to remove old rule from
                         // the corresponding gateway node
@@ -855,18 +889,22 @@
 
     private class InternalOpenstackNetworkListener implements OpenstackNetworkListener {
 
-        @Override
-        public boolean isRelevant(OpenstackNetworkEvent event) {
-            // do not allow to proceed without leadership
-            NodeId leader = leadershipService.getLeader(appId.name());
-            return Objects.equals(localNodeId, leader);
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
         }
 
         @Override
         public void event(OpenstackNetworkEvent event) {
             switch (event.type()) {
                 case OPENSTACK_PORT_PRE_REMOVE:
-                    eventExecutor.execute(() -> processPortPreRemove(event));
+                    eventExecutor.execute(() -> {
+
+                        if (!isRelevantHelper()) {
+                            return;
+                        }
+
+                        processPortPreRemove(event);
+                    });
                     break;
                 default:
                     break;