Add precommit capability to openstack port service to resolve NPE

Change-Id: I91bdc24a3ba2b586b5503d35a8da4c6824c99765
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/DistributedOpenstackNetworkStore.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/DistributedOpenstackNetworkStore.java
index 248061c..3581580 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/DistributedOpenstackNetworkStore.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/DistributedOpenstackNetworkStore.java
@@ -29,6 +29,7 @@
 import org.onosproject.openstacknetworking.api.OpenstackNetworkEvent;
 import org.onosproject.openstacknetworking.api.OpenstackNetworkStore;
 import org.onosproject.openstacknetworking.api.OpenstackNetworkStoreDelegate;
+import org.onosproject.openstacknetworking.api.PreCommitPortService;
 import org.onosproject.store.AbstractStore;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.ConsistentMap;
@@ -68,6 +69,7 @@
 import static org.onosproject.openstacknetworking.api.OpenstackNetworkEvent.Type.OPENSTACK_NETWORK_REMOVED;
 import static org.onosproject.openstacknetworking.api.OpenstackNetworkEvent.Type.OPENSTACK_NETWORK_UPDATED;
 import static org.onosproject.openstacknetworking.api.OpenstackNetworkEvent.Type.OPENSTACK_PORT_CREATED;
+import static org.onosproject.openstacknetworking.api.OpenstackNetworkEvent.Type.OPENSTACK_PORT_PRE_REMOVE;
 import static org.onosproject.openstacknetworking.api.OpenstackNetworkEvent.Type.OPENSTACK_PORT_REMOVED;
 import static org.onosproject.openstacknetworking.api.OpenstackNetworkEvent.Type.OPENSTACK_PORT_SECURITY_GROUP_ADDED;
 import static org.onosproject.openstacknetworking.api.OpenstackNetworkEvent.Type.OPENSTACK_PORT_SECURITY_GROUP_REMOVED;
@@ -118,6 +120,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected StorageService storageService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected PreCommitPortService preCommitPortService;
+
     private final ExecutorService eventExecutor = newSingleThreadExecutor(
             groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
 
@@ -255,6 +260,28 @@
 
     @Override
     public Port removePort(String portId) {
+
+        Port port = osPortStore.asJavaMap().get(portId);
+
+        if (port == null) {
+            return null;
+        }
+
+        eventExecutor.execute(() ->
+                notifyDelegate(new OpenstackNetworkEvent(
+                        OPENSTACK_PORT_PRE_REMOVE,
+                        network(port.getNetworkId()), port))
+        );
+
+        log.debug("Prepare OpenStack port remove");
+
+        while (true) {
+            if (preCommitPortService.subscriberCountByEventType(
+                    portId, OPENSTACK_PORT_PRE_REMOVE) == 0) {
+                break;
+            }
+        }
+
         Versioned<Port> osPort = osPortStore.remove(portId);
         return osPort == null ? null : osPort.value();
     }
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingArpHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingArpHandler.java
index 39975dc..9e47a0c 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingArpHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingArpHandler.java
@@ -15,7 +15,6 @@
  */
 package org.onosproject.openstacknetworking.impl;
 
-import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import org.apache.felix.scr.annotations.Activate;
@@ -347,14 +346,14 @@
                         finalGws.remove(gateway);
                         osRouterService.floatingIps().forEach(fip -> {
                             if (fip.getPortId() != null) {
-                                setFloatingIpArpRule(fip, finalGws, false);
+                                setFloatingIpArpRule(fip, fip.getPortId(), finalGws, false);
                                 finalGws.add(gateway);
                             }
                         });
                     }
                     osRouterService.floatingIps().forEach(fip -> {
                         if (fip.getPortId() != null) {
-                            setFloatingIpArpRule(fip, finalGws, true);
+                            setFloatingIpArpRule(fip, fip.getPortId(), finalGws, true);
                         }
                     });
                 } else {
@@ -365,14 +364,14 @@
                     finalGws.add(gateway);
                     osRouterService.floatingIps().forEach(fip -> {
                         if (fip.getPortId() != null) {
-                            setFloatingIpArpRule(fip, finalGws, false);
+                            setFloatingIpArpRule(fip, fip.getPortId(), finalGws, false);
                         }
                     });
                     finalGws.remove(gateway);
                     if (completedGws.size() >= 1) {
                         osRouterService.floatingIps().forEach(fip -> {
                             if (fip.getPortId() != null) {
-                                setFloatingIpArpRule(fip, finalGws, true);
+                                setFloatingIpArpRule(fip, fip.getPortId(), finalGws, true);
                             }
                         });
                     }
@@ -417,10 +416,11 @@
      * without the helps from controller.
      *
      * @param fip       floating IP address
+     * @param portId    port identifier
      * @param gateways  a set of gateway nodes
      * @param install   flow rule installation flag
      */
-    private synchronized void setFloatingIpArpRule(NetFloatingIP fip,
+    private synchronized void setFloatingIpArpRule(NetFloatingIP fip, String portId,
                                                    Set<OpenstackNode> gateways,
                                                    boolean install) {
         if (ARP_BROADCAST_MODE.equals(getArpMode())) {
@@ -430,13 +430,7 @@
                 return;
             }
 
-            if (fip.getPortId() == null) {
-                log.trace("Unknown target ARP request for {}, ignore it",
-                                                    fip.getFloatingIpAddress());
-                return;
-            }
-
-            InstancePort instPort = instancePortService.instancePort(fip.getPortId());
+            InstancePort instPort = instancePortService.instancePort(portId);
             MacAddress targetMac = instPort.macAddress();
 
             OpenstackNode gw = getGwByInstancePort(gateways, instPort);
@@ -530,22 +524,16 @@
                     eventExecutor.execute(() -> {
                         NetFloatingIP osFip = event.floatingIp();
 
-                        if (!Strings.isNullOrEmpty(osFip.getPortId()) &&
-                                instancePortService.instancePort(osFip.getPortId()) != null) {
-                            // associate a floating IP with an existing VM
-                            setFloatingIpArpRule(event.floatingIp(), completedGws, true);
-                        }
+                        // associate a floating IP with an existing VM
+                        setFloatingIpArpRule(event.floatingIp(), event.portId(), completedGws, true);
                     });
                     break;
                 case OPENSTACK_FLOATING_IP_DISASSOCIATED:
                     eventExecutor.execute(() -> {
                         NetFloatingIP osFip = event.floatingIp();
 
-                        if (!Strings.isNullOrEmpty(osFip.getPortId()) &&
-                                instancePortService.instancePort(osFip.getPortId()) != null) {
-                            // associate a floating IP with an existing VM
-                            setFloatingIpArpRule(event.floatingIp(), completedGws, false);
-                        }
+                        // associate a floating IP with an existing VM
+                        setFloatingIpArpRule(event.floatingIp(), event.portId(), completedGws, false);
                     });
                     break;
                 case OPENSTACK_FLOATING_IP_CREATED:
@@ -555,10 +543,7 @@
                         // during floating IP creation, if the floating IP is
                         // associated with any port of VM, then we will set
                         // floating IP related ARP rules to gateway node
-                        if (!Strings.isNullOrEmpty(osFip.getPortId()) &&
-                                instancePortService.instancePort(osFip.getPortId()) != null) {
-                            setFloatingIpArpRule(osFip, completedGws, true);
-                        }
+                        setFloatingIpArpRule(osFip, event.portId(), completedGws, true);
                     });
                     break;
                 case OPENSTACK_FLOATING_IP_REMOVED:
@@ -568,10 +553,7 @@
                         // during floating IP deletion, if the floating IP is
                         // still associated with any port of VM, then we will
                         // remove floating IP related ARP rules from gateway node
-                        if (!Strings.isNullOrEmpty(osFip.getPortId()) &&
-                                instancePortService.instancePort(osFip.getPortId()) != null) {
-                            setFloatingIpArpRule(event.floatingIp(), completedGws, false);
-                        }
+                        setFloatingIpArpRule(event.floatingIp(), event.portId(), completedGws, false);
                     });
                     break;
                 default:
@@ -662,7 +644,7 @@
                     osRouterService.floatingIps().stream()
                             .filter(f -> f.getPortId() != null)
                             .filter(f -> f.getPortId().equals(instPort.portId()))
-                            .forEach(f -> setFloatingIpArpRule(f, gateways, true));
+                            .forEach(f -> setFloatingIpArpRule(f, instPort.portId(), gateways, true));
 
                     break;
                 case OPENSTACK_INSTANCE_MIGRATION_STARTED:
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingFloatingIpHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingFloatingIpHandler.java
index 172dd78..73e9f19 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingFloatingIpHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingFloatingIpHandler.java
@@ -17,7 +17,6 @@
 
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -53,6 +52,7 @@
 import org.onosproject.openstacknetworking.api.OpenstackRouterAdminService;
 import org.onosproject.openstacknetworking.api.OpenstackRouterEvent;
 import org.onosproject.openstacknetworking.api.OpenstackRouterListener;
+import org.onosproject.openstacknetworking.api.PreCommitPortService;
 import org.onosproject.openstacknode.api.OpenstackNode;
 import org.onosproject.openstacknode.api.OpenstackNodeEvent;
 import org.onosproject.openstacknode.api.OpenstackNodeListener;
@@ -69,7 +69,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
@@ -82,10 +81,9 @@
 import static org.onosproject.openstacknetworking.api.Constants.PRIORITY_FLOATING_EXTERNAL;
 import static org.onosproject.openstacknetworking.api.Constants.PRIORITY_FLOATING_INTERNAL;
 import static org.onosproject.openstacknetworking.api.Constants.ROUTING_TABLE;
-import static org.onosproject.openstacknetworking.api.InstancePort.State.PENDING_REMOVAL;
-import static org.onosproject.openstacknetworking.api.InstancePort.State.REMOVED;
 import static org.onosproject.openstacknetworking.api.InstancePortEvent.Type.OPENSTACK_INSTANCE_MIGRATION_ENDED;
 import static org.onosproject.openstacknetworking.api.InstancePortEvent.Type.OPENSTACK_INSTANCE_MIGRATION_STARTED;
+import static org.onosproject.openstacknetworking.api.OpenstackNetworkEvent.Type.OPENSTACK_PORT_PRE_REMOVE;
 import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.associatedFloatingIp;
 import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.getGwByComputeDevId;
 import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.isAssociatedWithVM;
@@ -132,6 +130,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected OpenstackFlowRuleService osFlowRuleService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected PreCommitPortService preCommitPortService;
+
     private final ExecutorService eventExecutor = newSingleThreadExecutor(
             groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
     private final OpenstackRouterListener floatingIpListener = new InternalFloatingIpListener();
@@ -140,8 +141,6 @@
     private final OpenstackNetworkListener osNetworkListener = new InternalOpenstackNetworkListener();
     private final InstancePortListener instPortListener = new InternalInstancePortListener();
 
-    private Map<String, Port> terminatedOsPorts = Maps.newConcurrentMap();
-
     private ApplicationId appId;
     private NodeId localNodeId;
 
@@ -176,6 +175,7 @@
     private void setFloatingIpRules(NetFloatingIP floatingIp, Port osPort,
                                     OpenstackNode gateway, boolean install) {
         Network osNet = osNetworkService.network(osPort.getNetworkId());
+
         if (osNet == null) {
             final String errorFormat = ERR_FLOW + "no network(%s) exists";
             final String error = String.format(errorFormat,
@@ -195,16 +195,22 @@
             throw new IllegalStateException(error);
         }
 
-        if (instPort.state() == PENDING_REMOVAL) {
-            instancePortService.updateInstancePort(instPort.updateState(REMOVED));
-        }
-
         ExternalPeerRouter externalPeerRouter = externalPeerRouter(osNet);
         if (externalPeerRouter == null) {
             final String errorFormat = ERR_FLOW + "no external peer router found";
             throw new IllegalStateException(errorFormat);
         }
 
+        if (install) {
+            preCommitPortService.subscribePreCommit(osPort.getId(),
+                    OPENSTACK_PORT_PRE_REMOVE, this.getClass().getName());
+            log.info("Subscribed the port pre-remove event");
+        } else {
+            preCommitPortService.unsubscribePreCommit(osPort.getId(),
+                    OPENSTACK_PORT_PRE_REMOVE, this.getClass().getName());
+            log.info("Unsubscribed the port pre-remove event");
+        }
+
         updateComputeNodeRules(instPort, osNet, gateway, install);
         updateGatewayNodeRules(floatingIp, instPort, osNet, externalPeerRouter, gateway, install);
 
@@ -214,6 +220,7 @@
 
         // TODO: need to refactor setUpstreamRules if possible
         setUpstreamRules(floatingIp, osNet, instPort, externalPeerRouter, install);
+
         log.trace("Succeeded to set flow rules for floating ip {}:{} and install: {}",
                 floatingIp.getFloatingIpAddress(),
                 floatingIp.getFixedIpAddress(),
@@ -615,10 +622,6 @@
 
     private void disassociateFloatingIp(NetFloatingIP osFip, String portId) {
         Port osPort = osNetworkService.port(portId);
-        if (osPort == null) {
-            osPort = terminatedOsPorts.get(portId);
-            terminatedOsPorts.remove(portId);
-        }
 
         if (osPort == null) {
             final String errorFormat = ERR_FLOW + "port(%s) not found";
@@ -832,8 +835,6 @@
                     if (instPort != null && instPort.portId() != null) {
                         String portId = instPort.portId();
 
-                        terminatedOsPorts.remove(portId);
-
                         Port port = osNetworkService.port(portId);
 
                         osRouterAdminService.floatingIps().stream()
@@ -950,26 +951,21 @@
 
         @Override
         public void event(OpenstackNetworkEvent event) {
+            String portId;
+
             switch (event.type()) {
-                case OPENSTACK_PORT_REMOVED:
-                    String portId = event.port().getId();
-                    terminatedOsPorts.put(portId, event.port());
+                case OPENSTACK_PORT_PRE_REMOVE:
+                    portId = event.port().getId();
 
                     InstancePort instPort = instancePortService.instancePort(portId);
-                    InstancePort updated = instPort.updateState(PENDING_REMOVAL);
-                    instancePortService.updateInstancePort(updated);
-
                     updateFipStore(instPort);
 
-                    // we will hold the instance port in its store, until its
-                    // state is changed to REMOVED
-                    while (true) {
-                        if (instancePortService.instancePort(portId).state() ==
-                                REMOVED) {
-                            instancePortService.removeInstancePort(portId);
-                            break;
-                        }
-                    }
+                    break;
+                case OPENSTACK_PORT_REMOVED:
+                    portId = event.port().getId();
+
+                    instancePortService.removeInstancePort(portId);
+
                     break;
                 default:
                     break;
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/PreCommitPortManager.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/PreCommitPortManager.java
new file mode 100644
index 0000000..a3aa0f4
--- /dev/null
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/PreCommitPortManager.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacknetworking.impl;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.openstacknetworking.api.OpenstackNetworkEvent.Type;
+import org.onosproject.openstacknetworking.api.PreCommitPortService;
+import org.slf4j.Logger;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Implementation of pre-commit service.
+ */
+@Service
+@Component(immediate = true)
+public class PreCommitPortManager implements PreCommitPortService {
+
+    protected final Logger log = getLogger(getClass());
+
+    private Map<String, Map<Type, Set<String>>> store = Maps.newConcurrentMap();
+
+    @Activate
+    protected void activate() {
+        log.info("Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        log.info("Stopped");
+    }
+
+    @Override
+    public void subscribePreCommit(String portId, Type eventType, String className) {
+        store.computeIfAbsent(portId, s -> Maps.newConcurrentMap());
+
+        store.compute(portId, (k, v) -> {
+
+            if (className == null || className.isEmpty()) {
+                return null;
+            }
+
+            Objects.requireNonNull(v).computeIfAbsent(eventType,
+                                     s -> Sets.newConcurrentHashSet());
+
+
+            Objects.requireNonNull(v).compute(eventType, (i, j) -> {
+                Objects.requireNonNull(j).add(className);
+                return j;
+            });
+
+            return v;
+        });
+    }
+
+    @Override
+    public void unsubscribePreCommit(String portId, Type eventType, String className) {
+
+        store.computeIfPresent(portId, (k, v) -> {
+
+            if (className == null || className.isEmpty()) {
+                return null;
+            }
+
+            Objects.requireNonNull(v).computeIfPresent(eventType, (i, j) -> {
+                Objects.requireNonNull(j).remove(className);
+                return j;
+            });
+
+            return v;
+        });
+    }
+
+    @Override
+    public int subscriberCountByEventType(String portId, Type eventType) {
+
+        Map<Type, Set<String>> typeMap = store.get(portId);
+
+        if (typeMap == null || typeMap.isEmpty()) {
+            return 0;
+        }
+
+        if (typeMap.get(eventType) == null || typeMap.get(eventType).isEmpty()) {
+            return 0;
+        }
+
+        return typeMap.get(eventType).size();
+    }
+
+    @Override
+    public int subscriberCount(String portId) {
+
+        Map<Type, Set<String>> typeMap = store.get(portId);
+
+        if (typeMap == null || typeMap.isEmpty()) {
+            return 0;
+        }
+
+        return typeMap.values().stream()
+                .filter(Objects::nonNull)
+                .map(Set::size)
+                .reduce(0, Integer::sum);
+    }
+}