ONOS-2890 Deactivating apps removes other apps' packet processors
- Reworking store commands to use compute for updates
- Trigger add/remove to devices on map events
Change-Id: I33391435945c1775a9ef0cdc83920fc1cadfd658
diff --git a/core/api/src/main/java/org/onosproject/net/packet/PacketStore.java b/core/api/src/main/java/org/onosproject/net/packet/PacketStore.java
index 97f7cb5..d83fc9a 100644
--- a/core/api/src/main/java/org/onosproject/net/packet/PacketStore.java
+++ b/core/api/src/main/java/org/onosproject/net/packet/PacketStore.java
@@ -37,17 +37,15 @@
* Requests intercept of packets that match the given selector.
*
* @param request a packet request
- * @return true if the first time the given selector was requested
*/
- boolean requestPackets(PacketRequest request);
+ void requestPackets(PacketRequest request);
/**
* Cancels intercept of packets that match the given selector.
*
* @param request a packet request
- * @return true if there is no other application requesting the given selector
*/
- boolean cancelPackets(PacketRequest request);
+ void cancelPackets(PacketRequest request);
/**
* Obtains all existing requests in the system.
diff --git a/core/api/src/main/java/org/onosproject/net/packet/PacketStoreDelegate.java b/core/api/src/main/java/org/onosproject/net/packet/PacketStoreDelegate.java
index bf5c3cc..2e59b19 100644
--- a/core/api/src/main/java/org/onosproject/net/packet/PacketStoreDelegate.java
+++ b/core/api/src/main/java/org/onosproject/net/packet/PacketStoreDelegate.java
@@ -21,4 +21,20 @@
* Packet store delegate abstraction.
*/
public interface PacketStoreDelegate extends StoreDelegate<PacketEvent> {
+
+ /**
+ * Requests that packets matching to following request be collected
+ * from all switches.
+ *
+ * @param request packet request
+ */
+ void requestPackets(PacketRequest request);
+
+ /**
+ * Requests that packets matching to following request no longer be
+ * collected from any switches.
+ *
+ * @param request packet request
+ */
+ void cancelPackets(PacketRequest request);
}
diff --git a/core/common/src/test/java/org/onosproject/store/trivial/SimplePacketStore.java b/core/common/src/test/java/org/onosproject/store/trivial/SimplePacketStore.java
index f835926..7dda12c 100644
--- a/core/common/src/test/java/org/onosproject/store/trivial/SimplePacketStore.java
+++ b/core/common/src/test/java/org/onosproject/store/trivial/SimplePacketStore.java
@@ -15,10 +15,13 @@
*/
package org.onosproject.store.trivial;
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Service;
+import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.packet.OutboundPacket;
import org.onosproject.net.packet.PacketEvent;
import org.onosproject.net.packet.PacketEvent.Type;
@@ -27,7 +30,9 @@
import org.onosproject.net.packet.PacketStoreDelegate;
import org.onosproject.store.AbstractStore;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
/**
@@ -39,7 +44,7 @@
extends AbstractStore<PacketEvent, PacketStoreDelegate>
implements PacketStore {
- private Set<PacketRequest> requests = Sets.newConcurrentHashSet();
+ private Map<TrafficSelector, Set<PacketRequest>> requests = Maps.newConcurrentMap();
@Override
public void emit(OutboundPacket packet) {
@@ -47,18 +52,50 @@
}
@Override
- public boolean requestPackets(PacketRequest request) {
- return requests.add(request);
+ public void requestPackets(PacketRequest request) {
+ requests.compute(request.selector(), (s, existingRequests) -> {
+ if (existingRequests == null) {
+ return ImmutableSet.of(request);
+ } else if (!existingRequests.contains(request)) {
+ if (delegate != null) {
+ delegate.requestPackets(request);
+ }
+ return ImmutableSet.<PacketRequest>builder()
+ .addAll(existingRequests)
+ .add(request)
+ .build();
+ } else {
+ return existingRequests;
+ }
+ });
}
@Override
- public boolean cancelPackets(PacketRequest request) {
- return requests.remove(request);
+ public void cancelPackets(PacketRequest request) {
+ requests.computeIfPresent(request.selector(), (s, existingRequests) -> {
+ if (existingRequests.contains(request)) {
+ HashSet<PacketRequest> newRequests = Sets.newHashSet(existingRequests);
+ newRequests.remove(request);
+ if (newRequests.size() > 0) {
+ return ImmutableSet.copyOf(newRequests);
+ } else {
+ if (delegate != null) {
+ delegate.cancelPackets(request);
+ }
+ return null;
+ }
+ } else {
+ return existingRequests;
+ }
+ });
}
@Override
public List<PacketRequest> existingRequests() {
- return ImmutableList.copyOf(requests);
+ List<PacketRequest> list = Lists.newArrayList();
+ requests.values().forEach(list::addAll);
+ list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());
+ return list;
}
}
diff --git a/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java b/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
index b39238d..8e87a07 100644
--- a/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
+++ b/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
@@ -168,9 +168,7 @@
checkNotNull(appId, "Application ID cannot be null");
PacketRequest request = new DefaultPacketRequest(selector, priority, appId);
- if (store.requestPackets(request)) {
- pushToAllDevices(request);
- }
+ store.requestPackets(request);
}
@Override
@@ -181,9 +179,7 @@
checkNotNull(appId, "Application ID cannot be null");
PacketRequest request = new DefaultPacketRequest(selector, priority, appId);
- if (store.cancelPackets(request)) {
- removeFromAllDevices(request);
- }
+ store.cancelPackets(request);
}
@Override
@@ -332,6 +328,16 @@
public void notify(PacketEvent event) {
localEmit(event.subject());
}
+
+ @Override
+ public void requestPackets(PacketRequest request) {
+ pushToAllDevices(request);
+ }
+
+ @Override
+ public void cancelPackets(PacketRequest request) {
+ removeFromAllDevices(request);
+ }
}
/**
diff --git a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index d4c89c9..f0f3eb5 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -15,7 +15,9 @@
*/
package org.onosproject.store.packet.impl;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -41,14 +43,13 @@
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
-import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
-import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
@@ -117,6 +118,7 @@
public void deactivate() {
communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
messageHandlingExecutor.shutdown();
+ tracker = null;
log.info("Stopped");
}
@@ -143,13 +145,13 @@
}
@Override
- public boolean requestPackets(PacketRequest request) {
- return tracker.add(request);
+ public void requestPackets(PacketRequest request) {
+ tracker.add(request);
}
@Override
- public boolean cancelPackets(PacketRequest request) {
- return tracker.remove(request);
+ public void cancelPackets(PacketRequest request) {
+ tracker.remove(request);
}
@Override
@@ -169,33 +171,50 @@
.build();
}
- public boolean add(PacketRequest request) {
- Versioned<Set<PacketRequest>> old = requests.get(request.selector());
- if (old != null && old.value().contains(request)) {
- return false;
+ public void add(PacketRequest request) {
+ AtomicBoolean firstRequest = new AtomicBoolean(false);
+ requests.compute(request.selector(), (s, existingRequests) -> {
+ if (existingRequests == null) {
+ firstRequest.set(true);
+ return ImmutableSet.of(request);
+ } else if (!existingRequests.contains(request)) {
+ return ImmutableSet.<PacketRequest>builder()
+ .addAll(existingRequests)
+ .add(request)
+ .build();
+ } else {
+ return existingRequests;
+ }
+ });
+
+ if (firstRequest.get() && delegate != null) {
+ // The instance that makes the first request will push to all devices
+ delegate.requestPackets(request);
}
- // FIXME: add retry logic using a random delay
- Set<PacketRequest> newSet = new HashSet<>();
- newSet.add(request);
- if (old == null) {
- return requests.putIfAbsent(request.selector(), newSet) == null;
- }
- newSet.addAll(old.value());
- return requests.replace(request.selector(), old.version(), newSet);
}
- public boolean remove(PacketRequest request) {
- Versioned<Set<PacketRequest>> old = requests.get(request.selector());
- if (old == null || !old.value().contains(request)) {
- return false;
+ public void remove(PacketRequest request) {
+ AtomicBoolean removedLast = new AtomicBoolean(false);
+ requests.computeIfPresent(request.selector(), (s, existingRequests) -> {
+ if (existingRequests.contains(request)) {
+ Set<PacketRequest> newRequests = Sets.newHashSet(existingRequests);
+ newRequests.remove(request);
+ if (newRequests.size() > 0) {
+ return ImmutableSet.copyOf(newRequests);
+ } else {
+ removedLast.set(true);
+ return null;
+ }
+ } else {
+ return existingRequests;
+ }
+ });
+
+ if (removedLast.get() && delegate != null) {
+ // The instance that removes the last request will remove from all devices
+ delegate.cancelPackets(request);
}
- // FIXME: add retry logic using a random delay
- Set<PacketRequest> newSet = new HashSet<>(old.value());
- newSet.remove(request);
- if (newSet.isEmpty()) {
- return requests.remove(request.selector(), old.version());
- }
- return requests.replace(request.selector(), old.version(), newSet);
+
}
public List<PacketRequest> requests() {
@@ -204,6 +223,5 @@
list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());
return list;
}
-
}
}