ONOS-2145 Added ability to withdraw packet intercepts via PacketService::cancelPackets.

Change-Id: Ie41271fa02740560bd67b0faf49f633ee749773c
diff --git a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index 5357fa8..027378a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.store.packet.impl;
 
+import com.google.common.collect.ImmutableSet;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -25,6 +26,7 @@
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.flow.TrafficSelector;
 import org.onosproject.net.packet.OutboundPacket;
 import org.onosproject.net.packet.PacketEvent;
 import org.onosproject.net.packet.PacketEvent.Type;
@@ -41,8 +43,10 @@
 import org.onosproject.store.service.ConsistentMap;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
 import org.slf4j.Logger;
 
+import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -96,7 +100,7 @@
 
     @Activate
     public void activate() {
-        messageHandlingExecutor =  Executors.newFixedThreadPool(
+        messageHandlingExecutor = Executors.newFixedThreadPool(
                 MESSAGE_HANDLER_THREAD_POOL_SIZE,
                 groupedThreads("onos/store/packet", "message-handlers"));
 
@@ -104,7 +108,7 @@
                                            new InternalClusterMessageHandler(),
                                            messageHandlingExecutor);
 
-        tracker =  new PacketRequestTracker();
+        tracker = new PacketRequestTracker();
 
         log.info("Started");
     }
@@ -141,6 +145,11 @@
     }
 
     @Override
+    public boolean cancelPackets(PacketRequest request) {
+        return tracker.remove(request);
+    }
+
+    @Override
     public Set<PacketRequest> existingRequests() {
         return tracker.requests();
     }
@@ -162,47 +171,49 @@
 
     private class PacketRequestTracker {
 
-        private ConsistentMap<PacketRequest, Boolean> requests;
+        private ConsistentMap<TrafficSelector, Set<PacketRequest>> requests;
 
         public PacketRequestTracker() {
-            requests = storageService.<PacketRequest, Boolean>consistentMapBuilder()
-                    .withName("packet-requests")
+            requests = storageService.<TrafficSelector, Set<PacketRequest>>consistentMapBuilder()
+                    .withName("onos-packet-requests")
                     .withPartitionsDisabled()
-                    .withSerializer(Serializer.using(
-                            new KryoNamespace.Builder().register(KryoNamespaces.API).build()))
-                    .withSerializer(new Serializer() {
-                        KryoNamespace kryo = new KryoNamespace.Builder()
-                                .register(KryoNamespaces.API)
-                                .build();
-
-                        @Override
-                        public <T> byte[] encode(T object) {
-                            return kryo.serialize(object);
-                        }
-
-                        @Override
-                        public <T> T decode(byte[] bytes) {
-                            return kryo.deserialize(bytes);
-                        }
-                    }).build();
+                    .withSerializer(Serializer.using(KryoNamespaces.API))
+                    .build();
         }
 
         public boolean add(PacketRequest request) {
-            if (requests.putIfAbsent(request, true) == null) {
-                return true;
+            Versioned<Set<PacketRequest>> old = requests.get(request.selector());
+            if (old != null && old.value().contains(request)) {
+                return false;
             }
-            return false;
+            // FIXME: retry after a random delay; losing a race below returns false without storing the request
+            Set<PacketRequest> newSet = new HashSet<>();
+            newSet.add(request);
+            if (old == null) {
+                return requests.putIfAbsent(request.selector(), newSet) == null;
+            }
+            newSet.addAll(old.value());
+            return requests.replace(request.selector(), old.version(), newSet);
         }
 
         public boolean remove(PacketRequest request) {
-            if (requests.remove(request) == null) {
+            Versioned<Set<PacketRequest>> old = requests.get(request.selector());
+            if (old == null || !old.value().contains(request)) {
                 return false;
             }
-            return true;
+            // FIXME: retry after a random delay; losing a race below returns false and leaves the request in place
+            Set<PacketRequest> newSet = new HashSet<>(old.value());
+            newSet.remove(request);
+            if (newSet.isEmpty()) {
+                return requests.remove(request.selector(), old.version());
+            }
+            return requests.replace(request.selector(), old.version(), newSet);
         }
 
         public Set<PacketRequest> requests() {
-            return requests.keySet();
+            ImmutableSet.Builder<PacketRequest> builder = ImmutableSet.builder();
+            requests.values().forEach(v -> builder.addAll(v.value()));
+            return builder.build();
         }
 
     }