ONOS-2145 Added ability to withdraw packet intercepts via PacketService::cancelPackets.
Change-Id: Ie41271fa02740560bd67b0faf49f633ee749773c
diff --git a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index 5357fa8..027378a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.store.packet.impl;
+import com.google.common.collect.ImmutableSet;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -25,6 +26,7 @@
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.packet.OutboundPacket;
import org.onosproject.net.packet.PacketEvent;
import org.onosproject.net.packet.PacketEvent.Type;
@@ -41,8 +43,10 @@
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
+import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -96,7 +100,7 @@
@Activate
public void activate() {
- messageHandlingExecutor = Executors.newFixedThreadPool(
+ messageHandlingExecutor = Executors.newFixedThreadPool(
MESSAGE_HANDLER_THREAD_POOL_SIZE,
groupedThreads("onos/store/packet", "message-handlers"));
@@ -104,7 +108,7 @@
new InternalClusterMessageHandler(),
messageHandlingExecutor);
- tracker = new PacketRequestTracker();
+ tracker = new PacketRequestTracker();
log.info("Started");
}
@@ -141,6 +145,11 @@
}
@Override
+ public boolean cancelPackets(PacketRequest request) {
+ // Withdrawal is delegated to the request tracker; its result is passed through unchanged.
+ boolean withdrawn = tracker.remove(request);
+ return withdrawn;
+ }
+
+ @Override
public Set<PacketRequest> existingRequests() {
return tracker.requests();
}
@@ -162,47 +171,49 @@
+ /**
+ * Keeps the set of outstanding packet requests in a consistent map keyed by
+ * traffic selector; each entry holds every request that shares that selector.
+ */
private class PacketRequestTracker {
+ // Upper bound on optimistic read-modify-write attempts made by add/remove.
+ private static final int MAX_ATTEMPTS = 5;
- private ConsistentMap<PacketRequest, Boolean> requests;
+ private ConsistentMap<TrafficSelector, Set<PacketRequest>> requests;
+ /**
+ * Builds the backing "onos-packet-requests" map with partitions disabled
+ * and the API Kryo namespace as serializer.
+ */
public PacketRequestTracker() {
- requests = storageService.<PacketRequest, Boolean>consistentMapBuilder()
- .withName("packet-requests")
+ requests = storageService.<TrafficSelector, Set<PacketRequest>>consistentMapBuilder()
+ .withName("onos-packet-requests")
.withPartitionsDisabled()
- .withSerializer(Serializer.using(
- new KryoNamespace.Builder().register(KryoNamespaces.API).build()))
- .withSerializer(new Serializer() {
- KryoNamespace kryo = new KryoNamespace.Builder()
- .register(KryoNamespaces.API)
- .build();
-
- @Override
- public <T> byte[] encode(T object) {
- return kryo.serialize(object);
- }
-
- @Override
- public <T> T decode(byte[] bytes) {
- return kryo.deserialize(bytes);
- }
- }).build();
+ .withSerializer(Serializer.using(KryoNamespaces.API))
+ .build();
}
+ /**
+ * Records the given request under its selector.
+ *
+ * @param request packet request to record
+ * @return true if the request was newly stored; false if it was already
+ * present or the update could not be applied within MAX_ATTEMPTS tries
+ */
public boolean add(PacketRequest request) {
- if (requests.putIfAbsent(request, true) == null) {
- return true;
+ // Each write below is conditional on the state read at the top of the
+ // iteration; a false result means it was not applied, so re-read and retry
+ // instead of reporting a failure the caller cannot tell from "already present".
+ // TODO: consider a random delay between attempts if contention proves high.
+ for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
+ Versioned<Set<PacketRequest>> old = requests.get(request.selector());
+ if (old != null && old.value().contains(request)) {
+ return false;
}
- return false;
+ Set<PacketRequest> newSet = new HashSet<>();
+ newSet.add(request);
+ boolean stored;
+ if (old == null) {
+ stored = requests.putIfAbsent(request.selector(), newSet) == null;
+ } else {
+ newSet.addAll(old.value());
+ stored = requests.replace(request.selector(), old.version(), newSet);
+ }
+ if (stored) {
+ return true;
+ }
+ }
+ log.warn("Failed to add packet request {} after {} attempts", request, MAX_ATTEMPTS);
+ return false;
}
+ /**
+ * Withdraws the given request from the set stored under its selector,
+ * dropping the map entry altogether when the set becomes empty.
+ *
+ * @param request packet request to withdraw
+ * @return true if the request was removed; false if it was not present or
+ * the update could not be applied within MAX_ATTEMPTS tries
+ */
public boolean remove(PacketRequest request) {
- if (requests.remove(request) == null) {
+ // Same optimistic scheme as add(): the write is tied to the version read
+ // in this iteration and is retried from a fresh read when it is not applied.
+ // TODO: consider a random delay between attempts if contention proves high.
+ for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
+ Versioned<Set<PacketRequest>> old = requests.get(request.selector());
+ if (old == null || !old.value().contains(request)) {
return false;
}
- return true;
+ Set<PacketRequest> newSet = new HashSet<>(old.value());
+ newSet.remove(request);
+ boolean stored = newSet.isEmpty()
+ ? requests.remove(request.selector(), old.version())
+ : requests.replace(request.selector(), old.version(), newSet);
+ if (stored) {
+ return true;
+ }
+ }
+ log.warn("Failed to remove packet request {} after {} attempts", request, MAX_ATTEMPTS);
+ return false;
}
+ /**
+ * Returns an immutable snapshot of all stored requests across selectors.
+ *
+ * @return set of packet requests currently held in the map
+ */
public Set<PacketRequest> requests() {
- return requests.keySet();
+ ImmutableSet.Builder<PacketRequest> builder = ImmutableSet.builder();
+ requests.values().forEach(v -> builder.addAll(v.value()));
+ return builder.build();
}
}