ONOS-2890 Deactivating apps removes other apps' packet processors

- Rework store commands to use compute for updates
- Trigger add/remove to devices on map events

Change-Id: I33391435945c1775a9ef0cdc83920fc1cadfd658
diff --git a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index d4c89c9..f0f3eb5 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -15,7 +15,9 @@
  */
 package org.onosproject.store.packet.impl;
 
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -41,14 +43,13 @@
 import org.onosproject.store.service.ConsistentMap;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
-import org.onosproject.store.service.Versioned;
 import org.slf4j.Logger;
 
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -117,6 +118,7 @@
     public void deactivate() {
         communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
         messageHandlingExecutor.shutdown();
+        tracker = null;
         log.info("Stopped");
     }
 
@@ -143,13 +145,13 @@
     }
 
     @Override
-    public boolean requestPackets(PacketRequest request) {
-        return tracker.add(request);
+    public void requestPackets(PacketRequest request) {
+        tracker.add(request);
     }
 
     @Override
-    public boolean cancelPackets(PacketRequest request) {
-        return tracker.remove(request);
+    public void cancelPackets(PacketRequest request) {
+        tracker.remove(request);
     }
 
     @Override
@@ -169,33 +171,50 @@
                     .build();
         }
 
-        public boolean add(PacketRequest request) {
-            Versioned<Set<PacketRequest>> old = requests.get(request.selector());
-            if (old != null && old.value().contains(request)) {
-                return false;
+        public void add(PacketRequest request) {
+            AtomicBoolean firstRequest = new AtomicBoolean(false);
+            requests.compute(request.selector(), (s, existingRequests) -> {
+                // Derive the flag on every invocation: if compute re-runs this
+                // function (e.g. a retried update), no stale 'true' survives.
+                firstRequest.set(existingRequests == null);
+                if (existingRequests == null) {
+                    return ImmutableSet.of(request);
+                } else if (!existingRequests.contains(request)) {
+                    return ImmutableSet.<PacketRequest>builder()
+                            .addAll(existingRequests).add(request).build();
+                } else {
+                    return existingRequests;
+                }
+            });
+
+            if (firstRequest.get() && delegate != null) {
+                // The instance that makes the first request will push to all devices
+                delegate.requestPackets(request);
             }
-            // FIXME: add retry logic using a random delay
-            Set<PacketRequest> newSet = new HashSet<>();
-            newSet.add(request);
-            if (old == null) {
-                return requests.putIfAbsent(request.selector(), newSet) == null;
-            }
-            newSet.addAll(old.value());
-            return requests.replace(request.selector(), old.version(), newSet);
         }
 
-        public boolean remove(PacketRequest request) {
-            Versioned<Set<PacketRequest>> old = requests.get(request.selector());
-            if (old == null || !old.value().contains(request)) {
-                return false;
+        public void remove(PacketRequest request) {
+            AtomicBoolean removedLast = new AtomicBoolean(false);
+            requests.computeIfPresent(request.selector(), (s, existingRequests) -> {
+                // Reset on every invocation: if computeIfPresent re-runs this
+                // function (e.g. a retried update), no stale 'true' survives.
+                removedLast.set(false);
+                if (!existingRequests.contains(request)) {
+                    return existingRequests;
+                }
+                Set<PacketRequest> newRequests = Sets.newHashSet(existingRequests);
+                newRequests.remove(request);
+                if (newRequests.isEmpty()) {
+                    removedLast.set(true);
+                    return null;
+                }
+                return ImmutableSet.copyOf(newRequests);
+            });
+
+            if (removedLast.get() && delegate != null) {
+                // The instance that removes the last request will remove from all devices
+                delegate.cancelPackets(request);
             }
-            // FIXME: add retry logic using a random delay
-            Set<PacketRequest> newSet = new HashSet<>(old.value());
-            newSet.remove(request);
-            if (newSet.isEmpty()) {
-                return requests.remove(request.selector(), old.version());
-            }
-            return requests.replace(request.selector(), old.version(), newSet);
         }
 
         public List<PacketRequest> requests() {
@@ -204,6 +223,5 @@
             list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());
             return list;
         }
-
     }
 }