Add precommit capability to openstack port service to resolve NPE
Change-Id: I91bdc24a3ba2b586b5503d35a8da4c6824c99765
diff --git a/apps/openstacknetworking/api/src/main/java/org/onosproject/openstacknetworking/api/InstancePort.java b/apps/openstacknetworking/api/src/main/java/org/onosproject/openstacknetworking/api/InstancePort.java
index 83c0741..5c44c96 100644
--- a/apps/openstacknetworking/api/src/main/java/org/onosproject/openstacknetworking/api/InstancePort.java
+++ b/apps/openstacknetworking/api/src/main/java/org/onosproject/openstacknetworking/api/InstancePort.java
@@ -42,11 +42,6 @@
INACTIVE,
/**
- * Signifies that the given instance port is in pending removal state.
- */
- PENDING_REMOVAL,
-
- /**
* Signifies that the given instance port is in migrating state.
*/
MIGRATING,
diff --git a/apps/openstacknetworking/api/src/main/java/org/onosproject/openstacknetworking/api/OpenstackNetworkEvent.java b/apps/openstacknetworking/api/src/main/java/org/onosproject/openstacknetworking/api/OpenstackNetworkEvent.java
index 4f4892f..88b61be 100644
--- a/apps/openstacknetworking/api/src/main/java/org/onosproject/openstacknetworking/api/OpenstackNetworkEvent.java
+++ b/apps/openstacknetworking/api/src/main/java/org/onosproject/openstacknetworking/api/OpenstackNetworkEvent.java
@@ -69,11 +69,21 @@
OPENSTACK_PORT_CREATED,
/**
+ * Signifies that the OpenStack port will be updated.
+ */
+ OPENSTACK_PORT_PRE_UPDATE,
+
+ /**
* Signifies that the OpenStack port is updated.
*/
OPENSTACK_PORT_UPDATED,
/**
+ * Signifies that the OpenStack port will be removed.
+ */
+ OPENSTACK_PORT_PRE_REMOVE,
+
+ /**
* Signifies that the OpenStack port is removed.
*/
OPENSTACK_PORT_REMOVED,
diff --git a/apps/openstacknetworking/api/src/main/java/org/onosproject/openstacknetworking/api/PreCommitPortService.java b/apps/openstacknetworking/api/src/main/java/org/onosproject/openstacknetworking/api/PreCommitPortService.java
new file mode 100644
index 0000000..bc06e17
--- /dev/null
+++ b/apps/openstacknetworking/api/src/main/java/org/onosproject/openstacknetworking/api/PreCommitPortService.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacknetworking.api;
+
+import org.onosproject.openstacknetworking.api.OpenstackNetworkEvent.Type;
+
+/**
+ * Handles port precommit request.
+ */
+public interface PreCommitPortService extends PreCommitService<String, Type> {
+}
diff --git a/apps/openstacknetworking/api/src/main/java/org/onosproject/openstacknetworking/api/PreCommitService.java b/apps/openstacknetworking/api/src/main/java/org/onosproject/openstacknetworking/api/PreCommitService.java
new file mode 100644
index 0000000..4b94997
--- /dev/null
+++ b/apps/openstacknetworking/api/src/main/java/org/onosproject/openstacknetworking/api/PreCommitService.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacknetworking.api;
+
+/**
+ * Handles precommit request.
+ */
+public interface PreCommitService<T, E> {
+
+ /**
+ * Subscribes pre-update event for the given subject inside the given class.
+ *
+ * @param subject subject to subscribe
+ * @param eventType event type (update or remove)
+ * @param className target class name
+ */
+ void subscribePreCommit(T subject, E eventType, String className);
+
+ /**
+ * Unsubscribes pre-update event for the given subject inside the given class.
+ *
+ * @param subject subject to unsubscribe
+ * @param eventType event type (update or remove)
+ * @param className target class name
+ */
+ void unsubscribePreCommit(T subject, E eventType, String className);
+
+ /**
+ * Obtains the count value of subscribers for the given subject and event type.
+ *
+ * @param subject subject to subscribe
+ * @param eventType event type (update or remove)
+ * @return subscriber count
+ */
+ int subscriberCountByEventType(T subject, E eventType);
+
+ /**
+ * Obtains the count value of subscribers for the given subject.
+ *
+ * @param subject subject to subscribe
+ * @return subscriber count
+ */
+ int subscriberCount(T subject);
+}
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/DistributedOpenstackNetworkStore.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/DistributedOpenstackNetworkStore.java
index 248061c..3581580 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/DistributedOpenstackNetworkStore.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/DistributedOpenstackNetworkStore.java
@@ -29,6 +29,7 @@
import org.onosproject.openstacknetworking.api.OpenstackNetworkEvent;
import org.onosproject.openstacknetworking.api.OpenstackNetworkStore;
import org.onosproject.openstacknetworking.api.OpenstackNetworkStoreDelegate;
+import org.onosproject.openstacknetworking.api.PreCommitPortService;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
@@ -68,6 +69,7 @@
import static org.onosproject.openstacknetworking.api.OpenstackNetworkEvent.Type.OPENSTACK_NETWORK_REMOVED;
import static org.onosproject.openstacknetworking.api.OpenstackNetworkEvent.Type.OPENSTACK_NETWORK_UPDATED;
import static org.onosproject.openstacknetworking.api.OpenstackNetworkEvent.Type.OPENSTACK_PORT_CREATED;
+import static org.onosproject.openstacknetworking.api.OpenstackNetworkEvent.Type.OPENSTACK_PORT_PRE_REMOVE;
import static org.onosproject.openstacknetworking.api.OpenstackNetworkEvent.Type.OPENSTACK_PORT_REMOVED;
import static org.onosproject.openstacknetworking.api.OpenstackNetworkEvent.Type.OPENSTACK_PORT_SECURITY_GROUP_ADDED;
import static org.onosproject.openstacknetworking.api.OpenstackNetworkEvent.Type.OPENSTACK_PORT_SECURITY_GROUP_REMOVED;
@@ -118,6 +120,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected PreCommitPortService preCommitPortService;
+
private final ExecutorService eventExecutor = newSingleThreadExecutor(
groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
@@ -255,6 +260,28 @@
@Override
public Port removePort(String portId) {
+
+ Port port = osPortStore.asJavaMap().get(portId);
+
+ if (port == null) {
+ return null;
+ }
+
+ eventExecutor.execute(() ->
+ notifyDelegate(new OpenstackNetworkEvent(
+ OPENSTACK_PORT_PRE_REMOVE,
+ network(port.getNetworkId()), port))
+ );
+
+ log.debug("Prepare OpenStack port remove");
+
+ while (true) {
+ if (preCommitPortService.subscriberCountByEventType(
+ portId, OPENSTACK_PORT_PRE_REMOVE) == 0) {
+ break;
+ }
+ }
+
Versioned<Port> osPort = osPortStore.remove(portId);
return osPort == null ? null : osPort.value();
}
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingArpHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingArpHandler.java
index 39975dc..9e47a0c 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingArpHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingArpHandler.java
@@ -15,7 +15,6 @@
*/
package org.onosproject.openstacknetworking.impl;
-import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
@@ -347,14 +346,14 @@
finalGws.remove(gateway);
osRouterService.floatingIps().forEach(fip -> {
if (fip.getPortId() != null) {
- setFloatingIpArpRule(fip, finalGws, false);
+ setFloatingIpArpRule(fip, fip.getPortId(), finalGws, false);
finalGws.add(gateway);
}
});
}
osRouterService.floatingIps().forEach(fip -> {
if (fip.getPortId() != null) {
- setFloatingIpArpRule(fip, finalGws, true);
+ setFloatingIpArpRule(fip, fip.getPortId(), finalGws, true);
}
});
} else {
@@ -365,14 +364,14 @@
finalGws.add(gateway);
osRouterService.floatingIps().forEach(fip -> {
if (fip.getPortId() != null) {
- setFloatingIpArpRule(fip, finalGws, false);
+ setFloatingIpArpRule(fip, fip.getPortId(), finalGws, false);
}
});
finalGws.remove(gateway);
if (completedGws.size() >= 1) {
osRouterService.floatingIps().forEach(fip -> {
if (fip.getPortId() != null) {
- setFloatingIpArpRule(fip, finalGws, true);
+ setFloatingIpArpRule(fip, fip.getPortId(), finalGws, true);
}
});
}
@@ -417,10 +416,11 @@
* without the helps from controller.
*
* @param fip floating IP address
+ * @param portId port identifier
* @param gateways a set of gateway nodes
* @param install flow rule installation flag
*/
- private synchronized void setFloatingIpArpRule(NetFloatingIP fip,
+ private synchronized void setFloatingIpArpRule(NetFloatingIP fip, String portId,
Set<OpenstackNode> gateways,
boolean install) {
if (ARP_BROADCAST_MODE.equals(getArpMode())) {
@@ -430,13 +430,7 @@
return;
}
- if (fip.getPortId() == null) {
- log.trace("Unknown target ARP request for {}, ignore it",
- fip.getFloatingIpAddress());
- return;
- }
-
- InstancePort instPort = instancePortService.instancePort(fip.getPortId());
+ InstancePort instPort = instancePortService.instancePort(portId);
MacAddress targetMac = instPort.macAddress();
OpenstackNode gw = getGwByInstancePort(gateways, instPort);
@@ -530,22 +524,16 @@
eventExecutor.execute(() -> {
NetFloatingIP osFip = event.floatingIp();
- if (!Strings.isNullOrEmpty(osFip.getPortId()) &&
- instancePortService.instancePort(osFip.getPortId()) != null) {
- // associate a floating IP with an existing VM
- setFloatingIpArpRule(event.floatingIp(), completedGws, true);
- }
+ // associate a floating IP with an existing VM
+ setFloatingIpArpRule(event.floatingIp(), event.portId(), completedGws, true);
});
break;
case OPENSTACK_FLOATING_IP_DISASSOCIATED:
eventExecutor.execute(() -> {
NetFloatingIP osFip = event.floatingIp();
- if (!Strings.isNullOrEmpty(osFip.getPortId()) &&
- instancePortService.instancePort(osFip.getPortId()) != null) {
- // associate a floating IP with an existing VM
- setFloatingIpArpRule(event.floatingIp(), completedGws, false);
- }
+ // associate a floating IP with an existing VM
+ setFloatingIpArpRule(event.floatingIp(), event.portId(), completedGws, false);
});
break;
case OPENSTACK_FLOATING_IP_CREATED:
@@ -555,10 +543,7 @@
// during floating IP creation, if the floating IP is
// associated with any port of VM, then we will set
// floating IP related ARP rules to gateway node
- if (!Strings.isNullOrEmpty(osFip.getPortId()) &&
- instancePortService.instancePort(osFip.getPortId()) != null) {
- setFloatingIpArpRule(osFip, completedGws, true);
- }
+ setFloatingIpArpRule(osFip, event.portId(), completedGws, true);
});
break;
case OPENSTACK_FLOATING_IP_REMOVED:
@@ -568,10 +553,7 @@
// during floating IP deletion, if the floating IP is
// still associated with any port of VM, then we will
// remove floating IP related ARP rules from gateway node
- if (!Strings.isNullOrEmpty(osFip.getPortId()) &&
- instancePortService.instancePort(osFip.getPortId()) != null) {
- setFloatingIpArpRule(event.floatingIp(), completedGws, false);
- }
+ setFloatingIpArpRule(event.floatingIp(), event.portId(), completedGws, false);
});
break;
default:
@@ -662,7 +644,7 @@
osRouterService.floatingIps().stream()
.filter(f -> f.getPortId() != null)
.filter(f -> f.getPortId().equals(instPort.portId()))
- .forEach(f -> setFloatingIpArpRule(f, gateways, true));
+ .forEach(f -> setFloatingIpArpRule(f, instPort.portId(), gateways, true));
break;
case OPENSTACK_INSTANCE_MIGRATION_STARTED:
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingFloatingIpHandler.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingFloatingIpHandler.java
index 172dd78..73e9f19 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingFloatingIpHandler.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/OpenstackRoutingFloatingIpHandler.java
@@ -17,7 +17,6 @@
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -53,6 +52,7 @@
import org.onosproject.openstacknetworking.api.OpenstackRouterAdminService;
import org.onosproject.openstacknetworking.api.OpenstackRouterEvent;
import org.onosproject.openstacknetworking.api.OpenstackRouterListener;
+import org.onosproject.openstacknetworking.api.PreCommitPortService;
import org.onosproject.openstacknode.api.OpenstackNode;
import org.onosproject.openstacknode.api.OpenstackNodeEvent;
import org.onosproject.openstacknode.api.OpenstackNodeListener;
@@ -69,7 +69,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
@@ -82,10 +81,9 @@
import static org.onosproject.openstacknetworking.api.Constants.PRIORITY_FLOATING_EXTERNAL;
import static org.onosproject.openstacknetworking.api.Constants.PRIORITY_FLOATING_INTERNAL;
import static org.onosproject.openstacknetworking.api.Constants.ROUTING_TABLE;
-import static org.onosproject.openstacknetworking.api.InstancePort.State.PENDING_REMOVAL;
-import static org.onosproject.openstacknetworking.api.InstancePort.State.REMOVED;
import static org.onosproject.openstacknetworking.api.InstancePortEvent.Type.OPENSTACK_INSTANCE_MIGRATION_ENDED;
import static org.onosproject.openstacknetworking.api.InstancePortEvent.Type.OPENSTACK_INSTANCE_MIGRATION_STARTED;
+import static org.onosproject.openstacknetworking.api.OpenstackNetworkEvent.Type.OPENSTACK_PORT_PRE_REMOVE;
import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.associatedFloatingIp;
import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.getGwByComputeDevId;
import static org.onosproject.openstacknetworking.util.OpenstackNetworkingUtil.isAssociatedWithVM;
@@ -132,6 +130,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected OpenstackFlowRuleService osFlowRuleService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected PreCommitPortService preCommitPortService;
+
private final ExecutorService eventExecutor = newSingleThreadExecutor(
groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
private final OpenstackRouterListener floatingIpListener = new InternalFloatingIpListener();
@@ -140,8 +141,6 @@
private final OpenstackNetworkListener osNetworkListener = new InternalOpenstackNetworkListener();
private final InstancePortListener instPortListener = new InternalInstancePortListener();
- private Map<String, Port> terminatedOsPorts = Maps.newConcurrentMap();
-
private ApplicationId appId;
private NodeId localNodeId;
@@ -176,6 +175,7 @@
private void setFloatingIpRules(NetFloatingIP floatingIp, Port osPort,
OpenstackNode gateway, boolean install) {
Network osNet = osNetworkService.network(osPort.getNetworkId());
+
if (osNet == null) {
final String errorFormat = ERR_FLOW + "no network(%s) exists";
final String error = String.format(errorFormat,
@@ -195,16 +195,22 @@
throw new IllegalStateException(error);
}
- if (instPort.state() == PENDING_REMOVAL) {
- instancePortService.updateInstancePort(instPort.updateState(REMOVED));
- }
-
ExternalPeerRouter externalPeerRouter = externalPeerRouter(osNet);
if (externalPeerRouter == null) {
final String errorFormat = ERR_FLOW + "no external peer router found";
throw new IllegalStateException(errorFormat);
}
+ if (install) {
+ preCommitPortService.subscribePreCommit(osPort.getId(),
+ OPENSTACK_PORT_PRE_REMOVE, this.getClass().getName());
+ log.info("Subscribed the port pre-remove event");
+ } else {
+ preCommitPortService.unsubscribePreCommit(osPort.getId(),
+ OPENSTACK_PORT_PRE_REMOVE, this.getClass().getName());
+ log.info("Unsubscribed the port pre-remove event");
+ }
+
updateComputeNodeRules(instPort, osNet, gateway, install);
updateGatewayNodeRules(floatingIp, instPort, osNet, externalPeerRouter, gateway, install);
@@ -214,6 +220,7 @@
// TODO: need to refactor setUpstreamRules if possible
setUpstreamRules(floatingIp, osNet, instPort, externalPeerRouter, install);
+
log.trace("Succeeded to set flow rules for floating ip {}:{} and install: {}",
floatingIp.getFloatingIpAddress(),
floatingIp.getFixedIpAddress(),
@@ -615,10 +622,6 @@
private void disassociateFloatingIp(NetFloatingIP osFip, String portId) {
Port osPort = osNetworkService.port(portId);
- if (osPort == null) {
- osPort = terminatedOsPorts.get(portId);
- terminatedOsPorts.remove(portId);
- }
if (osPort == null) {
final String errorFormat = ERR_FLOW + "port(%s) not found";
@@ -832,8 +835,6 @@
if (instPort != null && instPort.portId() != null) {
String portId = instPort.portId();
- terminatedOsPorts.remove(portId);
-
Port port = osNetworkService.port(portId);
osRouterAdminService.floatingIps().stream()
@@ -950,26 +951,21 @@
@Override
public void event(OpenstackNetworkEvent event) {
+ String portId;
+
switch (event.type()) {
- case OPENSTACK_PORT_REMOVED:
- String portId = event.port().getId();
- terminatedOsPorts.put(portId, event.port());
+ case OPENSTACK_PORT_PRE_REMOVE:
+ portId = event.port().getId();
InstancePort instPort = instancePortService.instancePort(portId);
- InstancePort updated = instPort.updateState(PENDING_REMOVAL);
- instancePortService.updateInstancePort(updated);
-
updateFipStore(instPort);
- // we will hold the instance port in its store, until its
- // state is changed to REMOVED
- while (true) {
- if (instancePortService.instancePort(portId).state() ==
- REMOVED) {
- instancePortService.removeInstancePort(portId);
- break;
- }
- }
+ break;
+ case OPENSTACK_PORT_REMOVED:
+ portId = event.port().getId();
+
+ instancePortService.removeInstancePort(portId);
+
break;
default:
break;
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/PreCommitPortManager.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/PreCommitPortManager.java
new file mode 100644
index 0000000..a3aa0f4
--- /dev/null
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/PreCommitPortManager.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacknetworking.impl;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.openstacknetworking.api.OpenstackNetworkEvent.Type;
+import org.onosproject.openstacknetworking.api.PreCommitPortService;
+import org.slf4j.Logger;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Implementation of pre-commit service.
+ */
+@Service
+@Component(immediate = true)
+public class PreCommitPortManager implements PreCommitPortService {
+
+ protected final Logger log = getLogger(getClass());
+
+ private Map<String, Map<Type, Set<String>>> store = Maps.newConcurrentMap();
+
+ @Activate
+ protected void activate() {
+ log.info("Started");
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ log.info("Stopped");
+ }
+
+ @Override
+ public void subscribePreCommit(String portId, Type eventType, String className) {
+ store.computeIfAbsent(portId, s -> Maps.newConcurrentMap());
+
+ store.compute(portId, (k, v) -> {
+
+ if (className == null || className.isEmpty()) {
+ return null;
+ }
+
+ Objects.requireNonNull(v).computeIfAbsent(eventType,
+ s -> Sets.newConcurrentHashSet());
+
+
+ Objects.requireNonNull(v).compute(eventType, (i, j) -> {
+ Objects.requireNonNull(j).add(className);
+ return j;
+ });
+
+ return v;
+ });
+ }
+
+ @Override
+ public void unsubscribePreCommit(String portId, Type eventType, String className) {
+
+ store.computeIfPresent(portId, (k, v) -> {
+
+ if (className == null || className.isEmpty()) {
+ return null;
+ }
+
+ Objects.requireNonNull(v).computeIfPresent(eventType, (i, j) -> {
+ Objects.requireNonNull(j).remove(className);
+ return j;
+ });
+
+ return v;
+ });
+ }
+
+ @Override
+ public int subscriberCountByEventType(String portId, Type eventType) {
+
+ Map<Type, Set<String>> typeMap = store.get(portId);
+
+ if (typeMap == null || typeMap.isEmpty()) {
+ return 0;
+ }
+
+ if (typeMap.get(eventType) == null || typeMap.get(eventType).isEmpty()) {
+ return 0;
+ }
+
+ return typeMap.get(eventType).size();
+ }
+
+ @Override
+ public int subscriberCount(String portId) {
+
+ Map<Type, Set<String>> typeMap = store.get(portId);
+
+ if (typeMap == null || typeMap.isEmpty()) {
+ return 0;
+ }
+
+ return typeMap.values().stream()
+ .filter(Objects::nonNull)
+ .map(Set::size)
+ .reduce(0, Integer::sum);
+ }
+}
diff --git a/apps/openstacknetworking/app/src/test/java/org/onosproject/openstacknetworking/impl/OpenstackNetworkManagerTest.java b/apps/openstacknetworking/app/src/test/java/org/onosproject/openstacknetworking/impl/OpenstackNetworkManagerTest.java
index 8104c65..a0b6af6 100644
--- a/apps/openstacknetworking/app/src/test/java/org/onosproject/openstacknetworking/impl/OpenstackNetworkManagerTest.java
+++ b/apps/openstacknetworking/app/src/test/java/org/onosproject/openstacknetworking/impl/OpenstackNetworkManagerTest.java
@@ -32,7 +32,9 @@
import org.onosproject.net.device.DeviceServiceAdapter;
import org.onosproject.net.packet.PacketServiceAdapter;
import org.onosproject.openstacknetworking.api.OpenstackNetworkEvent;
+import org.onosproject.openstacknetworking.api.OpenstackNetworkEvent.Type;
import org.onosproject.openstacknetworking.api.OpenstackNetworkListener;
+import org.onosproject.openstacknetworking.api.PreCommitPortService;
import org.onosproject.openstacknode.api.OpenstackNode;
import org.onosproject.openstacknode.api.OpenstackNodeAdminService;
import org.onosproject.openstacknode.api.OpenstackNodeListener;
@@ -57,6 +59,7 @@
import static org.onosproject.openstacknetworking.api.OpenstackNetworkEvent.Type.OPENSTACK_NETWORK_REMOVED;
import static org.onosproject.openstacknetworking.api.OpenstackNetworkEvent.Type.OPENSTACK_NETWORK_UPDATED;
import static org.onosproject.openstacknetworking.api.OpenstackNetworkEvent.Type.OPENSTACK_PORT_CREATED;
+import static org.onosproject.openstacknetworking.api.OpenstackNetworkEvent.Type.OPENSTACK_PORT_PRE_REMOVE;
import static org.onosproject.openstacknetworking.api.OpenstackNetworkEvent.Type.OPENSTACK_PORT_REMOVED;
import static org.onosproject.openstacknetworking.api.OpenstackNetworkEvent.Type.OPENSTACK_PORT_UPDATED;
import static org.onosproject.openstacknetworking.api.OpenstackNetworkEvent.Type.OPENSTACK_SUBNET_CREATED;
@@ -120,6 +123,7 @@
osNetworkStore = new DistributedOpenstackNetworkStore();
TestUtils.setField(osNetworkStore, "coreService", new TestCoreService());
TestUtils.setField(osNetworkStore, "storageService", new TestStorageService());
+ TestUtils.setField(osNetworkStore, "preCommitPortService", new TestPreCommitPortService());
TestUtils.setField(osNetworkStore, "eventExecutor", MoreExecutors.newDirectExecutorService());
osNetworkStore.activate();
@@ -476,7 +480,7 @@
assertEquals("Number of port did not match", 0, target.ports().size());
assertTrue("Port was not created", target.port(PORT_ID) == null);
- validateEvents(OPENSTACK_PORT_CREATED, OPENSTACK_PORT_REMOVED);
+ validateEvents(OPENSTACK_PORT_CREATED, OPENSTACK_PORT_PRE_REMOVE, OPENSTACK_PORT_REMOVED);
}
/**
@@ -665,6 +669,29 @@
}
}
+ private static class TestPreCommitPortService implements PreCommitPortService {
+
+ @Override
+ public void subscribePreCommit(String subject, Type eventType, String className) {
+
+ }
+
+ @Override
+ public void unsubscribePreCommit(String subject, Type eventType, String className) {
+
+ }
+
+ @Override
+ public int subscriberCountByEventType(String subject, Type eventType) {
+ return 0;
+ }
+
+ @Override
+ public int subscriberCount(String subject) {
+ return 0;
+ }
+ }
+
private static class TestOpenstackNetworkListener implements OpenstackNetworkListener {
private List<OpenstackNetworkEvent> events = Lists.newArrayList();
diff --git a/apps/openstacknetworking/app/src/test/java/org/onosproject/openstacknetworking/impl/PreCommitPortManagerTest.java b/apps/openstacknetworking/app/src/test/java/org/onosproject/openstacknetworking/impl/PreCommitPortManagerTest.java
new file mode 100644
index 0000000..b6ba3f9
--- /dev/null
+++ b/apps/openstacknetworking/app/src/test/java/org/onosproject/openstacknetworking/impl/PreCommitPortManagerTest.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacknetworking.impl;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.onosproject.openstacknetworking.api.OpenstackNetworkEvent.Type.OPENSTACK_PORT_PRE_REMOVE;
+import static org.onosproject.openstacknetworking.api.OpenstackNetworkEvent.Type.OPENSTACK_PORT_PRE_UPDATE;
+
+/**
+ * Unit tests for pre-commit port manager.
+ */
+public class PreCommitPortManagerTest {
+
+ private static final String PORT_ID_1 = "port-1";
+ private static final String PORT_ID_2 = "port-2";
+
+ private static final String CLASS_NAME_1 = "class-1";
+ private static final String CLASS_NAME_2 = "class-2";
+
+ private PreCommitPortManager target;
+
+ /**
+ * Initializes this unit test.
+ */
+ @Before
+ public void setUp() {
+ target = new PreCommitPortManager();
+ target.activate();
+ }
+
+ /**
+ * Tears down this unit test.
+ */
+ @After
+ public void tearDown() {
+ target.deactivate();
+ }
+
+ /**
+ * Tests subscribe pre-commit method.
+ */
+ @Test
+ public void testSubscribePreCommit() {
+
+ sampleSubscribe();
+
+ assertEquals(1, target.subscriberCountByEventType(PORT_ID_1, OPENSTACK_PORT_PRE_REMOVE));
+ assertEquals(2, target.subscriberCountByEventType(PORT_ID_2, OPENSTACK_PORT_PRE_REMOVE));
+
+ assertEquals(0, target.subscriberCountByEventType(PORT_ID_1, OPENSTACK_PORT_PRE_UPDATE));
+ assertEquals(1, target.subscriberCountByEventType(PORT_ID_2, OPENSTACK_PORT_PRE_UPDATE));
+
+ assertEquals(1, target.subscriberCount(PORT_ID_1));
+ assertEquals(3, target.subscriberCount(PORT_ID_2));
+ }
+
+ /**
+ * Tests unsubscribe pre-commit method.
+ */
+ @Test
+ public void testUnsubscribePreCommit() {
+
+ sampleSubscribe();
+
+ target.unsubscribePreCommit(PORT_ID_1, OPENSTACK_PORT_PRE_REMOVE, CLASS_NAME_1);
+ target.unsubscribePreCommit(PORT_ID_2, OPENSTACK_PORT_PRE_REMOVE, CLASS_NAME_2);
+
+ assertEquals(0, target.subscriberCountByEventType(PORT_ID_1, OPENSTACK_PORT_PRE_REMOVE));
+ assertEquals(1, target.subscriberCountByEventType(PORT_ID_2, OPENSTACK_PORT_PRE_REMOVE));
+
+ assertEquals(0, target.subscriberCount(PORT_ID_1));
+ assertEquals(2, target.subscriberCount(PORT_ID_2));
+ }
+
+ private void sampleSubscribe() {
+
+ target.subscribePreCommit(PORT_ID_1, OPENSTACK_PORT_PRE_REMOVE, CLASS_NAME_1);
+
+ target.subscribePreCommit(PORT_ID_2, OPENSTACK_PORT_PRE_REMOVE, CLASS_NAME_1);
+ target.subscribePreCommit(PORT_ID_2, OPENSTACK_PORT_PRE_REMOVE, CLASS_NAME_2);
+
+ target.subscribePreCommit(PORT_ID_2, OPENSTACK_PORT_PRE_UPDATE, CLASS_NAME_1);
+ }
+}