Refactor DistributedPacketStore to store packet requests in a ConsistentMultimap

Change-Id: Ia4a93c47fee726009673e99609b2f8800807e675
diff --git a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index 5fdea05..1c02da9 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -15,9 +15,7 @@
  */
 package org.onosproject.store.packet.impl;
 
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -43,20 +41,20 @@
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.ConsistentMultimap;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
 import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 
+import java.util.Collection;
 import java.util.Dictionary;
 import java.util.List;
 import java.util.Objects;
 import java.util.Properties;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Strings.isNullOrEmpty;
@@ -205,10 +203,10 @@
 
     private final class PacketRequestTracker {
 
-        private ConsistentMap<RequestKey, Set<PacketRequest>> requests;
+        private ConsistentMultimap<RequestKey, PacketRequest> requests;
 
         private PacketRequestTracker() {
-            requests = storageService.<RequestKey, Set<PacketRequest>>consistentMapBuilder()
+            requests = storageService.<RequestKey, PacketRequest>consistentMultimapBuilder()
                     .withName("onos-packet-requests")
                     .withSerializer(Serializer.using(KryoNamespace.newBuilder()
                             .register(KryoNamespaces.API)
@@ -218,68 +216,36 @@
         }
 
         private void add(PacketRequest request) {
-            AtomicBoolean firstRequest = addInternal(request);
-            if (firstRequest.get() && delegate != null) {
+            boolean firstRequest = addInternal(request);
+            if (firstRequest && delegate != null) {
                 // The instance that makes the first request will push to all devices
                 delegate.requestPackets(request);
             }
         }
 
-        private AtomicBoolean addInternal(PacketRequest request) {
-            AtomicBoolean firstRequest = new AtomicBoolean(false);
-            requests.compute(key(request), (s, existingRequests) -> {
-                // Reset to false just in case this is a retry due to
-                // ConcurrentModificationException
-                firstRequest.set(false);
-                if (existingRequests == null) {
-                    firstRequest.set(true);
-                    return ImmutableSet.of(request);
-                } else if (!existingRequests.contains(request)) {
-                    firstRequest.set(true);
-                    return ImmutableSet.<PacketRequest>builder()
-                                       .addAll(existingRequests)
-                                       .add(request)
-                                       .build();
-                } else {
-                    return existingRequests;
-                }
-            });
-            return firstRequest;
+        private boolean addInternal(PacketRequest request) {
+            Collection<? extends PacketRequest> values =
+                Versioned.valueOrNull(requests.putAndGet(key(request), request));
+            return values.size() == 1;
         }
 
         private void remove(PacketRequest request) {
-            AtomicBoolean removedLast = removeInternal(request);
-            if (removedLast.get() && delegate != null) {
+            boolean removedLast = removeInternal(request);
+            if (removedLast && delegate != null) {
                 // The instance that removes the last request will remove from all devices
                 delegate.cancelPackets(request);
             }
         }
 
-        private AtomicBoolean removeInternal(PacketRequest request) {
-            AtomicBoolean removedLast = new AtomicBoolean(false);
-            requests.computeIfPresent(key(request), (s, existingRequests) -> {
-                // Reset to false just in case this is a retry due to
-                // ConcurrentModificationException
-                removedLast.set(false);
-                if (existingRequests.contains(request)) {
-                    Set<PacketRequest> newRequests = Sets.newHashSet(existingRequests);
-                    newRequests.remove(request);
-                    if (newRequests.size() > 0) {
-                        return ImmutableSet.copyOf(newRequests);
-                    } else {
-                        removedLast.set(true);
-                        return null;
-                    }
-                } else {
-                    return existingRequests;
-                }
-            });
-            return removedLast;
+        private boolean removeInternal(PacketRequest request) {
+            Collection<? extends PacketRequest> values =
+                Versioned.valueOrNull(requests.removeAndGet(key(request), request));
+            return values == null || values.isEmpty();
         }
 
         private List<PacketRequest> requests() {
             List<PacketRequest> list = Lists.newArrayList();
-            requests.values().forEach(v -> list.addAll(v.value()));
+            requests.values().forEach(v -> list.add(v));
             list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());
             return list;
         }