ONOS-2890 Deactivating apps removes other apps' packet processors

- Reworking store commands to use compute for updates
- Trigger add/remove to devices on map events

Change-Id: I33391435945c1775a9ef0cdc83920fc1cadfd658
diff --git a/core/common/src/test/java/org/onosproject/store/trivial/SimplePacketStore.java b/core/common/src/test/java/org/onosproject/store/trivial/SimplePacketStore.java
index f835926..7dda12c 100644
--- a/core/common/src/test/java/org/onosproject/store/trivial/SimplePacketStore.java
+++ b/core/common/src/test/java/org/onosproject/store/trivial/SimplePacketStore.java
@@ -15,10 +15,13 @@
  */
 package org.onosproject.store.trivial;
 
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Service;
+import org.onosproject.net.flow.TrafficSelector;
 import org.onosproject.net.packet.OutboundPacket;
 import org.onosproject.net.packet.PacketEvent;
 import org.onosproject.net.packet.PacketEvent.Type;
@@ -27,7 +30,9 @@
 import org.onosproject.net.packet.PacketStoreDelegate;
 import org.onosproject.store.AbstractStore;
 
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -39,7 +44,7 @@
         extends AbstractStore<PacketEvent, PacketStoreDelegate>
         implements PacketStore {
 
-    private Set<PacketRequest> requests = Sets.newConcurrentHashSet();
+    private Map<TrafficSelector, Set<PacketRequest>> requests = Maps.newConcurrentMap();
 
     @Override
     public void emit(OutboundPacket packet) {
@@ -47,18 +52,50 @@
     }
 
     @Override
-    public boolean requestPackets(PacketRequest request) {
-        return requests.add(request);
+    public void requestPackets(PacketRequest request) {
+        requests.compute(request.selector(), (s, existingRequests) -> {
+            if (existingRequests == null) {
+                return ImmutableSet.of(request);
+            } else if (!existingRequests.contains(request)) {
+                if (delegate != null) {
+                    delegate.requestPackets(request);
+                }
+                return ImmutableSet.<PacketRequest>builder()
+                        .addAll(existingRequests)
+                        .add(request)
+                        .build();
+            } else {
+                return existingRequests;
+            }
+        });
     }
 
     @Override
-    public boolean cancelPackets(PacketRequest request) {
-        return requests.remove(request);
+    public void cancelPackets(PacketRequest request) {
+        requests.computeIfPresent(request.selector(), (s, existingRequests) -> {
+            if (existingRequests.contains(request)) {
+                HashSet<PacketRequest> newRequests = Sets.newHashSet(existingRequests);
+                newRequests.remove(request);
+                if (newRequests.size() > 0) {
+                    return ImmutableSet.copyOf(newRequests);
+                } else {
+                    if (delegate != null) {
+                        delegate.cancelPackets(request);
+                    }
+                    return null;
+                }
+            } else {
+                return existingRequests;
+            }
+        });
     }
 
     @Override
     public List<PacketRequest> existingRequests() {
-        return ImmutableList.copyOf(requests);
+        List<PacketRequest> list = Lists.newArrayList();
+        requests.values().forEach(list::addAll);
+        list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());
+        return list;
     }
 
 }