Refactor DistributedPacketStore to store packet requests in a ConsistentMultimap

Change-Id: Ia4a93c47fee726009673e99609b2f8800807e675
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
index bb249e1..779bb7a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
@@ -78,11 +78,21 @@
     }
 
     @Override
+    public CompletableFuture<Versioned<Collection<? extends V>>> putAndGet(K key, V value) {
+        return delegateMap.putAndGet(key, value);
+    }
+
+    @Override
     public CompletableFuture<Boolean> remove(K key, V value) {
         return delegateMap.remove(key, value);
     }
 
     @Override
+    public CompletableFuture<Versioned<Collection<? extends V>>> removeAndGet(K key, V value) {
+        return delegateMap.removeAndGet(key, value);
+    }
+
+    @Override
     public CompletableFuture<Boolean> removeAll(
             K key, Collection<? extends V> values) {
         return delegateMap.removeAll(key, values);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimap.java
index badbf05..a4e0bf9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimap.java
@@ -111,6 +111,11 @@
     }
 
     @Override
+    public CompletableFuture<Versioned<Collection<? extends V>>> putAndGet(K key, V value) {
+        return getMultimap(key).putAndGet(key, value);
+    }
+
+    @Override
     public CompletableFuture<Boolean> removeAll(K key, Collection<? extends V> values) {
         return getMultimap(key).removeAll(key, values);
     }
@@ -174,6 +179,11 @@
     }
 
     @Override
+    public CompletableFuture<Versioned<Collection<? extends V>>> removeAndGet(K key, V value) {
+        return getMultimap(key).removeAndGet(key, value);
+    }
+
+    @Override
     public CompletableFuture<AsyncIterator<Entry<K, V>>> iterator() {
         return Tools.allOf(getMultimaps().stream().map(m -> m.iterator()).collect(Collectors.toList()))
             .thenApply(PartitionedMultimapIterator::new);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
index b00d06a..6fab8c8 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
@@ -141,6 +141,16 @@
     }
 
     @Override
+    public CompletableFuture<Versioned<Collection<? extends V1>>> putAndGet(K1 key, V1 value) {
+        try {
+            return backingMap.putAndGet(keyEncoder.apply(key), valueEncoder.apply(value))
+                .thenApply(versionedValueCollectionDecode);
+        } catch (Exception e) {
+            return Tools.exceptionalFuture(e);
+        }
+    }
+
+    @Override
     public CompletableFuture<Boolean> remove(K1 key, V1 value) {
         try {
             return backingMap.remove(keyEncoder.apply(key), valueEncoder
@@ -151,6 +161,16 @@
     }
 
     @Override
+    public CompletableFuture<Versioned<Collection<? extends V1>>> removeAndGet(K1 key, V1 value) {
+        try {
+            return backingMap.removeAndGet(keyEncoder.apply(key), valueEncoder.apply(value))
+                .thenApply(versionedValueCollectionDecode);
+        } catch (Exception e) {
+            return Tools.exceptionalFuture(e);
+        }
+    }
+
+    @Override
     public CompletableFuture<Boolean> removeAll(
             K1 key, Collection<? extends V1> values) {
         try {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
index 44cc614..39d49fe 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
@@ -61,9 +61,11 @@
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEY_SET;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiRemove;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT_AND_GET;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Put;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_ALL;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_AND_GET;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_LISTENER;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REPLACE;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.RemoveAll;
@@ -148,6 +150,15 @@
     }
 
     @Override
+    public CompletableFuture<Versioned<Collection<? extends byte[]>>> putAndGet(String key, byte[] value) {
+        return proxy.invoke(
+            PUT_AND_GET,
+            SERIALIZER::encode,
+            new Put(key, Lists.newArrayList(value), null),
+            SERIALIZER::decode);
+    }
+
+    @Override
     public CompletableFuture<Boolean> remove(String key, byte[] value) {
         return proxy.invoke(REMOVE, SERIALIZER::encode, new MultiRemove(key,
             Lists.newArrayList(value),
@@ -155,6 +166,13 @@
     }
 
     @Override
+    public CompletableFuture<Versioned<Collection<? extends byte[]>>> removeAndGet(String key, byte[] value) {
+        return proxy.invoke(REMOVE_AND_GET, SERIALIZER::encode, new MultiRemove(key,
+            Lists.newArrayList(value),
+            null), SERIALIZER::decode);
+    }
+
+    @Override
     public CompletableFuture<Boolean> removeAll(String key, Collection<? extends byte[]> values) {
         return proxy.invoke(
             REMOVE,
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java
index 82372a2..6ad93b6 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java
@@ -45,7 +45,9 @@
     VALUES(OperationType.QUERY),
     ENTRIES(OperationType.QUERY),
     PUT(OperationType.COMMAND),
+    PUT_AND_GET(OperationType.COMMAND),
     REMOVE(OperationType.COMMAND),
+    REMOVE_AND_GET(OperationType.COMMAND),
     REMOVE_ALL(OperationType.COMMAND),
     REPLACE(OperationType.COMMAND),
     CLEAR(OperationType.COMMAND),
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
index 2312d8f..5a93bef 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
@@ -77,9 +77,11 @@
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiRemove;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultimapOperation;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT_AND_GET;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Put;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_ALL;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_AND_GET;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_LISTENER;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REPLACE;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.RemoveAll;
@@ -153,7 +155,9 @@
         executor.register(GET, serializer::decode, this::get, serializer::encode);
         executor.register(REMOVE_ALL, serializer::decode, this::removeAll, serializer::encode);
         executor.register(REMOVE, serializer::decode, this::multiRemove, serializer::encode);
+        executor.register(REMOVE_AND_GET, serializer::decode, this::removeAndGet, serializer::encode);
         executor.register(PUT, serializer::decode, this::put, serializer::encode);
+        executor.register(PUT_AND_GET, serializer::decode, this::putAndGet, serializer::encode);
         executor.register(REPLACE, serializer::decode, this::replace, serializer::encode);
         executor.register(ADD_LISTENER, this::listen);
         executor.register(REMOVE_LISTENER, this::unlisten);
@@ -352,8 +356,8 @@
         }
 
         Versioned<Collection<? extends byte[]>> removedValues = backingMap
-                .get(key)
-                .addCommit(commit);
+            .get(key)
+            .addCommit(commit);
 
         if (removedValues != null) {
             if (removedValues.value().isEmpty()) {
@@ -361,9 +365,9 @@
             }
 
             publish(removedValues.value().stream()
-                    .map(value -> new MultimapEvent<String, byte[]>(
-                            "", key, null, value))
-                    .collect(Collectors.toList()));
+                .map(value -> new MultimapEvent<String, byte[]>(
+                    "", key, null, value))
+                .collect(Collectors.toList()));
             return true;
         }
 
@@ -371,6 +375,37 @@
     }
 
     /**
+     * Handles a removeAndGet commit.
+     *
+     * @param commit multiRemove commit
+     * @return the updated values or null if the values are empty
+     */
+    protected Versioned<Collection<? extends byte[]>> removeAndGet(Commit<? extends MultiRemove> commit) {
+        String key = commit.value().key();
+
+        if (!backingMap.containsKey(key)) {
+            return null;
+        }
+
+        Versioned<Collection<? extends byte[]>> removedValues = backingMap
+            .get(key)
+            .addCommit(commit);
+
+        if (removedValues != null) {
+            if (removedValues.value().isEmpty()) {
+                backingMap.remove(key);
+            }
+
+            publish(removedValues.value().stream()
+                .map(value -> new MultimapEvent<String, byte[]>(
+                    "", key, null, value))
+                .collect(Collectors.toList()));
+        }
+
+        return toVersioned(backingMap.get(key));
+    }
+
+    /**
      * Handles a put commit, returns true if any change results from this
      * commit.
      * @param commit a put commit
@@ -386,20 +421,49 @@
         }
 
         Versioned<Collection<? extends byte[]>> addedValues = backingMap
-                .get(key)
-                .addCommit(commit);
+            .get(key)
+            .addCommit(commit);
 
         if (addedValues != null) {
             publish(addedValues.value().stream()
-                    .map(value -> new MultimapEvent<String, byte[]>(
-                            "", key, value, null))
-                    .collect(Collectors.toList()));
+                .map(value -> new MultimapEvent<String, byte[]>(
+                    "", key, value, null))
+                .collect(Collectors.toList()));
             return true;
         }
 
         return false;
     }
 
+    /**
+     * Handles a putAndGet commit.
+     *
+     * @param commit a put commit
+     * @return the updated values
+     */
+    protected Versioned<Collection<? extends byte[]>> putAndGet(Commit<? extends Put> commit) {
+        String key = commit.value().key();
+        if (commit.value().values().isEmpty()) {
+            return null;
+        }
+        if (!backingMap.containsKey(key)) {
+            backingMap.put(key, new NonTransactionalCommit());
+        }
+
+        Versioned<Collection<? extends byte[]>> addedValues = backingMap
+            .get(key)
+            .addCommit(commit);
+
+        if (addedValues != null) {
+            publish(addedValues.value().stream()
+                .map(value -> new MultimapEvent<String, byte[]>(
+                    "", key, value, null))
+                .collect(Collectors.toList()));
+        }
+
+        return toVersioned(backingMap.get(key));
+    }
+
     protected Versioned<Collection<? extends byte[]>> replace(
             Commit<? extends Replace> commit) {
         if (!backingMap.containsKey(commit.value().key())) {