ONOS-2890 Deactivating apps removes other apps' packet processors

- Reworking store commands to use compute for updates
- Trigger add/remove to devices on map events

Change-Id: I33391435945c1775a9ef0cdc83920fc1cadfd658
diff --git a/core/api/src/main/java/org/onosproject/net/packet/PacketStore.java b/core/api/src/main/java/org/onosproject/net/packet/PacketStore.java
index 97f7cb5..d83fc9a 100644
--- a/core/api/src/main/java/org/onosproject/net/packet/PacketStore.java
+++ b/core/api/src/main/java/org/onosproject/net/packet/PacketStore.java
@@ -37,17 +37,15 @@
      * Requests intercept of packets that match the given selector.
      *
      * @param request a packet request
-     * @return true if the first time the given selector was requested
      */
-    boolean requestPackets(PacketRequest request);
+    void requestPackets(PacketRequest request);
 
     /**
      * Cancels intercept of packets that match the given selector.
      *
      * @param request a packet request
-     * @return true if there is no other application requesting the given selector
      */
-    boolean cancelPackets(PacketRequest request);
+    void cancelPackets(PacketRequest request);
 
     /**
      * Obtains all existing requests in the system.
diff --git a/core/api/src/main/java/org/onosproject/net/packet/PacketStoreDelegate.java b/core/api/src/main/java/org/onosproject/net/packet/PacketStoreDelegate.java
index bf5c3cc..2e59b19 100644
--- a/core/api/src/main/java/org/onosproject/net/packet/PacketStoreDelegate.java
+++ b/core/api/src/main/java/org/onosproject/net/packet/PacketStoreDelegate.java
@@ -21,4 +21,20 @@
  * Packet store delegate abstraction.
  */
 public interface PacketStoreDelegate extends StoreDelegate<PacketEvent> {
+
+    /**
+     * Requests that packets matching to following request be collected
+     * from all switches.
+     *
+     * @param request packet request
+     */
+    void requestPackets(PacketRequest request);
+
+    /**
+     * Requests that packets matching to following request no longer be
+     * collected from any switches.
+     *
+     * @param request packet request
+     */
+    void cancelPackets(PacketRequest request);
 }
diff --git a/core/common/src/test/java/org/onosproject/store/trivial/SimplePacketStore.java b/core/common/src/test/java/org/onosproject/store/trivial/SimplePacketStore.java
index f835926..7dda12c 100644
--- a/core/common/src/test/java/org/onosproject/store/trivial/SimplePacketStore.java
+++ b/core/common/src/test/java/org/onosproject/store/trivial/SimplePacketStore.java
@@ -15,10 +15,13 @@
  */
 package org.onosproject.store.trivial;
 
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Service;
+import org.onosproject.net.flow.TrafficSelector;
 import org.onosproject.net.packet.OutboundPacket;
 import org.onosproject.net.packet.PacketEvent;
 import org.onosproject.net.packet.PacketEvent.Type;
@@ -27,7 +30,9 @@
 import org.onosproject.net.packet.PacketStoreDelegate;
 import org.onosproject.store.AbstractStore;
 
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -39,7 +44,7 @@
         extends AbstractStore<PacketEvent, PacketStoreDelegate>
         implements PacketStore {
 
-    private Set<PacketRequest> requests = Sets.newConcurrentHashSet();
+    private Map<TrafficSelector, Set<PacketRequest>> requests = Maps.newConcurrentMap();
 
     @Override
     public void emit(OutboundPacket packet) {
@@ -47,18 +52,50 @@
     }
 
     @Override
-    public boolean requestPackets(PacketRequest request) {
-        return requests.add(request);
+    public void requestPackets(PacketRequest request) {
+        requests.compute(request.selector(), (s, existingRequests) -> {
+            if (existingRequests == null) {
+                return ImmutableSet.of(request);
+            } else if (!existingRequests.contains(request)) {
+                if (delegate != null) {
+                    delegate.requestPackets(request);
+                }
+                return ImmutableSet.<PacketRequest>builder()
+                        .addAll(existingRequests)
+                        .add(request)
+                        .build();
+            } else {
+                return existingRequests;
+            }
+        });
     }
 
     @Override
-    public boolean cancelPackets(PacketRequest request) {
-        return requests.remove(request);
+    public void cancelPackets(PacketRequest request) {
+        requests.computeIfPresent(request.selector(), (s, existingRequests) -> {
+            if (existingRequests.contains(request)) {
+                HashSet<PacketRequest> newRequests = Sets.newHashSet(existingRequests);
+                newRequests.remove(request);
+                if (newRequests.size() > 0) {
+                    return ImmutableSet.copyOf(newRequests);
+                } else {
+                    if (delegate != null) {
+                        delegate.cancelPackets(request);
+                    }
+                    return null;
+                }
+            } else {
+                return existingRequests;
+            }
+        });
     }
 
     @Override
     public List<PacketRequest> existingRequests() {
-        return ImmutableList.copyOf(requests);
+        List<PacketRequest> list = Lists.newArrayList();
+        requests.values().forEach(list::addAll);
+        list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());
+        return list;
     }
 
 }
diff --git a/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java b/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
index b39238d..8e87a07 100644
--- a/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
+++ b/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
@@ -168,9 +168,7 @@
         checkNotNull(appId, "Application ID cannot be null");
 
         PacketRequest request = new DefaultPacketRequest(selector, priority, appId);
-        if (store.requestPackets(request)) {
-            pushToAllDevices(request);
-        }
+        store.requestPackets(request);
     }
 
     @Override
@@ -181,9 +179,7 @@
         checkNotNull(appId, "Application ID cannot be null");
 
         PacketRequest request = new DefaultPacketRequest(selector, priority, appId);
-        if (store.cancelPackets(request)) {
-            removeFromAllDevices(request);
-        }
+        store.cancelPackets(request);
     }
 
     @Override
@@ -332,6 +328,16 @@
         public void notify(PacketEvent event) {
             localEmit(event.subject());
         }
+
+        @Override
+        public void requestPackets(PacketRequest request) {
+            pushToAllDevices(request);
+        }
+
+        @Override
+        public void cancelPackets(PacketRequest request) {
+            removeFromAllDevices(request);
+        }
     }
 
     /**
diff --git a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index d4c89c9..f0f3eb5 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -15,7 +15,9 @@
  */
 package org.onosproject.store.packet.impl;
 
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -41,14 +43,13 @@
 import org.onosproject.store.service.ConsistentMap;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
-import org.onosproject.store.service.Versioned;
 import org.slf4j.Logger;
 
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -117,6 +118,7 @@
     public void deactivate() {
         communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
         messageHandlingExecutor.shutdown();
+        tracker = null;
         log.info("Stopped");
     }
 
@@ -143,13 +145,13 @@
     }
 
     @Override
-    public boolean requestPackets(PacketRequest request) {
-        return tracker.add(request);
+    public void requestPackets(PacketRequest request) {
+        tracker.add(request);
     }
 
     @Override
-    public boolean cancelPackets(PacketRequest request) {
-        return tracker.remove(request);
+    public void cancelPackets(PacketRequest request) {
+        tracker.remove(request);
     }
 
     @Override
@@ -169,33 +171,50 @@
                     .build();
         }
 
-        public boolean add(PacketRequest request) {
-            Versioned<Set<PacketRequest>> old = requests.get(request.selector());
-            if (old != null && old.value().contains(request)) {
-                return false;
+        public void add(PacketRequest request) {
+            AtomicBoolean firstRequest = new AtomicBoolean(false);
+            requests.compute(request.selector(), (s, existingRequests) -> {
+                if (existingRequests == null) {
+                    firstRequest.set(true);
+                    return ImmutableSet.of(request);
+                } else if (!existingRequests.contains(request)) {
+                    return ImmutableSet.<PacketRequest>builder()
+                                       .addAll(existingRequests)
+                                       .add(request)
+                                       .build();
+                } else {
+                    return existingRequests;
+                }
+            });
+
+            if (firstRequest.get() && delegate != null) {
+                // The instance that makes the first request will push to all devices
+                delegate.requestPackets(request);
             }
-            // FIXME: add retry logic using a random delay
-            Set<PacketRequest> newSet = new HashSet<>();
-            newSet.add(request);
-            if (old == null) {
-                return requests.putIfAbsent(request.selector(), newSet) == null;
-            }
-            newSet.addAll(old.value());
-            return requests.replace(request.selector(), old.version(), newSet);
         }
 
-        public boolean remove(PacketRequest request) {
-            Versioned<Set<PacketRequest>> old = requests.get(request.selector());
-            if (old == null || !old.value().contains(request)) {
-                return false;
+        public void remove(PacketRequest request) {
+            AtomicBoolean removedLast = new AtomicBoolean(false);
+            requests.computeIfPresent(request.selector(), (s, existingRequests) -> {
+                if (existingRequests.contains(request)) {
+                    Set<PacketRequest> newRequests = Sets.newHashSet(existingRequests);
+                    newRequests.remove(request);
+                    if (newRequests.size() > 0) {
+                        return ImmutableSet.copyOf(newRequests);
+                    } else {
+                        removedLast.set(true);
+                        return null;
+                    }
+                } else {
+                    return existingRequests;
+                }
+            });
+
+            if (removedLast.get() && delegate != null) {
+                // The instance that removes the last request will remove from all devices
+                delegate.cancelPackets(request);
             }
-            // FIXME: add retry logic using a random delay
-            Set<PacketRequest> newSet = new HashSet<>(old.value());
-            newSet.remove(request);
-            if (newSet.isEmpty()) {
-                return requests.remove(request.selector(), old.version());
-            }
-            return requests.replace(request.selector(), old.version(), newSet);
+
         }
 
         public List<PacketRequest> requests() {
@@ -204,6 +223,5 @@
             list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());
             return list;
         }
-
     }
 }