Updating multimap API and commands and providing implementation.

Change-Id: Iff49b429cfc7c0142f3ab2e1dde1a32e85f20e87
(cherry picked from commit 44a1fef950f3d3450f284529ea7f1988025724af)
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMultimap.java b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMultimap.java
index 9d42baf..99d6274 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMultimap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMultimap.java
@@ -16,7 +16,6 @@
 
 package org.onosproject.store.service;
 
-import com.google.common.collect.Multimap;
 import com.google.common.collect.Multiset;
 
 import java.util.Collection;
@@ -92,7 +91,7 @@
      * and others ignoring put requests for existing entries.
      * @param key the key to add
      * @param value the value to add
-     * @return a future whose value will betrue if the map has changed because
+     * @return a future whose value will be true if the map has changed because
      * of this call, false otherwise
      */
     CompletableFuture<Boolean> put(K key, V value);
@@ -119,16 +118,18 @@
      * @return a future whose value will be true if the map changes because of
      * this call, false otherwise.
      */
-    CompletableFuture<Boolean> removeAll(K key, Iterable<? extends V> values);
+    CompletableFuture<Boolean> removeAll(K key,
+                                         Collection<? extends V> values);
 
     /**
      * Removes all values associated with the specified key as well as the key
      * itself.
      * @param key the key whose key-value pairs will be removed
      * @return a future whose value is the set of values that were removed,
-     * which may be empty
+     * which may be empty, if the values did not exist the version will be
+     * less than one.
      */
-    CompletableFuture<Versioned<Collection<byte[]>>> removeAll(K key);
+    CompletableFuture<Versioned<Collection<? extends V>>> removeAll(K key);
 
     /**
      * Adds the set of key-value pairs of the specified key with each of the
@@ -140,17 +141,8 @@
      * @return a future whose value will be true if any change in the map
      * results from this call, false otherwise
      */
-    CompletableFuture<Boolean> putAll(K key, Iterable<? extends V> values);
-
-    /**
-     * Adds all entries from this multimap that are not already present, and
-     * may or may not add duplicate entries depending on the implementation.
-     * @param multiMap the map whose entries should be added
-     * @return a future whose value will be true if any change results from
-     * this call, false otherwise
-     */
-    CompletableFuture<Boolean> putAll(
-            Multimap<? extends K, ? extends V> multiMap);
+    CompletableFuture<Boolean> putAll(K key,
+                                      Collection<? extends V> values);
 
     /**
      * Stores all the values in values associated with the key specified,
@@ -161,7 +153,8 @@
      * @return a future whose value will be the collection of removed values,
      * which may be empty
      */
-    CompletableFuture<Collection<V>> replaceValues(K key, Iterable<V> values);
+    CompletableFuture<Versioned<Collection<? extends V>>> replaceValues(
+            K key, Collection<V> values);
 
     /**
      * Removes all key-value pairs, after which it will be empty.
@@ -177,7 +170,7 @@
      * @return a future whose value will be the collection of the values
      * associated with the specified key, the collection may be empty
      */
-    CompletableFuture<Collection<V>> get(K key);
+    CompletableFuture<Versioned<Collection<? extends V>>> get(K key);
 
     /**
      * Returns a set of the keys contained in this multimap with one or more
@@ -203,7 +196,7 @@
      * @return a future whose value will be a collection of values, this may be
      * empty
      */
-     CompletableFuture<Collection<V>> values();
+    CompletableFuture<Multiset<V>> values();
 
     /**
      * Returns a collection of each key-value pair in this map.
diff --git a/core/store/primitives/BUCK b/core/store/primitives/BUCK
index c4a9b8f..b43ee8b 100644
--- a/core/store/primitives/BUCK
+++ b/core/store/primitives/BUCK
@@ -25,6 +25,7 @@
 TEST_DEPS = [
     '//lib:TEST',
     '//core/api:onos-api-tests',
+    '//lib:onos-atomix',
 ]
 
 osgi_jar_with_tests (
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentMultimapCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentMultimapCommands.java
index 9edfcd1..6838ab3 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentMultimapCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentMultimapCommands.java
@@ -28,9 +28,9 @@
 import io.atomix.copycat.Command;
 import io.atomix.copycat.Query;
 import org.onlab.util.Match;
+import org.onosproject.store.service.Versioned;
 
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -123,7 +123,8 @@
         }
 
         @Override
-        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+        public void writeObject(BufferOutput<?> buffer,
+                                Serializer serializer) {
             super.writeObject(buffer, serializer);
             serializer.writeObject(key, buffer);
         }
@@ -166,7 +167,8 @@
         }
 
         @Override
-        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+        public void writeObject(BufferOutput<?> buffer,
+                                Serializer serializer) {
             super.writeObject(buffer, serializer);
         }
 
@@ -265,49 +267,94 @@
     }
 
     /**
-     * Update and get command. Note that corresponding values must have the
-     * same index in the respective arrays.
+     * Remove command, backs remove and removeAll's that return booleans.
      */
     @SuppressWarnings("serial")
-    public static class UpdateAndGet extends
-            MultimapCommand<MapEntryUpdateResult<String, Collection<byte[]>>> {
+    public static class RemoveAll extends
+            MultimapCommand<Versioned<Collection<? extends byte[]>>> {
         private String key;
-        private List<byte[]> values;
-        private List<Match<byte[]>> valueMatches;
-        private List<Match<Long>> versionMatches;
+        private Match<Long> versionMatch;
 
-        public UpdateAndGet() {
+        public RemoveAll() {
         }
 
-        public UpdateAndGet(String key, List<byte[]> values,
-                            List<Match<byte[]>> valueMatches,
-                            List<Match<Long>> versionMatches) {
-            this.key = key;
-            this.values = values;
-            this.valueMatches = valueMatches;
-            this.versionMatches = versionMatches;
+        public RemoveAll(String key, Match<Long> versionMatch) {
+            this.key = Assert.notNull(key, "key");
+            this.versionMatch = versionMatch;
         }
 
         public String key() {
             return this.key;
         }
 
-        public List<byte[]> values() {
-            return values;
-        }
-
-        public List<Match<byte[]>> valueMatches() {
-            return valueMatches;
-        }
-
-        public List<Match<Long>> versionMatches() {
-            return versionMatches;
+        public Match<Long> versionMatch() {
+            return versionMatch;
         }
 
         @Override
         public CompactionMode compaction() {
-            return values == null ? CompactionMode.FULL :
-                    CompactionMode.QUORUM;
+            return CompactionMode.FULL;
+        }
+
+        @Override
+        public void writeObject(BufferOutput<?> buffer,
+                                Serializer serializer) {
+            super.writeObject(buffer, serializer);
+            serializer.writeObject(key, buffer);
+            serializer.writeObject(versionMatch, buffer);
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+            super.readObject(buffer, serializer);
+            key = serializer.readObject(buffer);
+            versionMatch = serializer.readObject(buffer);
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("key", key)
+                    .add("versionMatch", versionMatch)
+                    .toString();
+        }
+    }
+
+    /**
+     * Remove command, backs remove and removeAll's that return booleans.
+     */
+    @SuppressWarnings("serial")
+    public static class MultiRemove extends
+            MultimapCommand<Boolean> {
+        private String key;
+        private Collection<byte[]> values;
+        private Match<Long> versionMatch;
+
+        public MultiRemove() {
+        }
+
+        public MultiRemove(String key, Collection<byte[]> valueMatches,
+                           Match<Long> versionMatch) {
+            this.key = Assert.notNull(key, "key");
+            this.values = valueMatches;
+            this.versionMatch = versionMatch;
+        }
+
+        public String key() {
+            return this.key;
+        }
+
+        public Collection<byte[]> values() {
+            return values;
+        }
+
+        public Match<Long> versionMatch() {
+            return versionMatch;
+        }
+
+        @Override
+        public CompactionMode compaction() {
+            return CompactionMode.FULL;
         }
 
         @Override
@@ -316,8 +363,7 @@
             super.writeObject(buffer, serializer);
             serializer.writeObject(key, buffer);
             serializer.writeObject(values, buffer);
-            serializer.writeObject(valueMatches, buffer);
-            serializer.writeObject(versionMatches, buffer);
+            serializer.writeObject(versionMatch, buffer);
         }
 
         @Override
@@ -325,13 +371,143 @@
             super.readObject(buffer, serializer);
             key = serializer.readObject(buffer);
             values = serializer.readObject(buffer);
-            valueMatches = serializer.readObject(buffer);
-            versionMatches = serializer.readObject(buffer);
+            versionMatch = serializer.readObject(buffer);
         }
 
         @Override
         public String toString() {
-            return super.toString();
+            return MoreObjects.toStringHelper(getClass())
+                    .add("key", key)
+                    .add("values", values)
+                    .add("versionMatch", versionMatch)
+                    .toString();
+        }
+    }
+
+    /**
+     * Command to back the put and putAll methods.
+     */
+    @SuppressWarnings("serial")
+    public static class  Put extends MultimapCommand<Boolean> {
+        private String key;
+        private Collection<? extends byte[]> values;
+        private Match<Long> versionMatch;
+
+        public Put() {
+        }
+
+        public Put(String key, Collection<? extends byte[]> values,
+                   Match<Long> versionMatch) {
+            this.key = Assert.notNull(key, "key");
+            this.values = values;
+            this.versionMatch = versionMatch;
+        }
+
+        public String key() {
+            return key;
+        }
+
+        public Collection<? extends byte[]> values() {
+            return values;
+        }
+
+        public Match<Long> versionMatch() {
+            return versionMatch;
+        }
+
+        @Override
+        public CompactionMode compaction() {
+            return CompactionMode.QUORUM;
+        }
+
+        @Override
+        public void writeObject(BufferOutput<?> buffer,
+                                Serializer serializer) {
+            super.writeObject(buffer, serializer);
+            serializer.writeObject(key, buffer);
+            serializer.writeObject(values, buffer);
+            serializer.writeObject(versionMatch, buffer);
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+            super.readObject(buffer, serializer);
+            key = serializer.readObject(buffer);
+            values = serializer.readObject(buffer);
+            versionMatch = serializer.readObject(buffer);
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("key", key)
+                    .add("values", values)
+                    .add("versionMatch", versionMatch)
+                    .toString();
+        }
+    }
+
+    /**
+     * Replace command, returns the collection that was replaced.
+     */
+    @SuppressWarnings("serial")
+    public static class Replace extends
+            MultimapCommand<Versioned<Collection<? extends byte[]>>> {
+        private String key;
+        private Collection<byte[]> values;
+        private Match<Long> versionMatch;
+
+        public Replace() {
+        }
+
+        public Replace(String key, Collection<byte[]> values,
+                       Match<Long> versionMatch) {
+            this.key = Assert.notNull(key, "key");
+            this.values = values;
+            this.versionMatch = versionMatch;
+        }
+
+        public String key() {
+            return this.key;
+        }
+
+        public Match<Long> versionMatch() {
+            return versionMatch;
+        }
+
+        public Collection<byte[]> values() {
+            return values;
+        }
+
+        @Override
+        public CompactionMode compaction() {
+            return CompactionMode.FULL;
+        }
+
+        @Override
+        public void writeObject(BufferOutput<?> buffer,
+                                Serializer serializer) {
+            super.writeObject(buffer, serializer);
+            serializer.writeObject(key, buffer);
+            serializer.writeObject(values, buffer);
+            serializer.writeObject(versionMatch, buffer);
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+            super.readObject(buffer, serializer);
+            key = serializer.readObject(buffer);
+            values = serializer.readObject(buffer);
+            versionMatch = serializer.readObject(buffer);
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("key", key)
+                    .add("values", values)
+                    .add("versionMatch", versionMatch)
+                    .toString();
         }
     }
 
@@ -360,7 +536,7 @@
      * Value collection query.
      */
     @SuppressWarnings("serial")
-    public static class Values extends MultimapQuery<Collection<byte[]>> {
+    public static class Values extends MultimapQuery<Multiset<byte[]>> {
     }
 
     /**
@@ -374,7 +550,11 @@
     /**
      * Get value query.
      */
-    public static class Get extends KeyQuery<Collection<byte[]>> {
+    public static class Get extends
+            KeyQuery<Versioned<Collection<? extends byte[]>>> {
+        public Get(String key) {
+            super(key);
+        }
     }
 
     /**
@@ -387,7 +567,7 @@
             registry.register(ContainsKey.class, -1000);
             registry.register(ContainsValue.class, -1001);
             registry.register(ContainsEntry.class, -1002);
-            registry.register(UpdateAndGet.class, -1003);
+            registry.register(Replace.class, -1003);
             registry.register(Clear.class, -1004);
             registry.register(KeySet.class, -1005);
             registry.register(Keys.class, -1006);
@@ -396,6 +576,9 @@
             registry.register(Size.class, -1009);
             registry.register(IsEmpty.class, -1010);
             registry.register(Get.class, -1011);
+            registry.register(Put.class, -1012);
+            registry.register(RemoveAll.class, -1013);
+            registry.register(MultiRemove.class, -1014);
         }
     }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentSetMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentSetMultimap.java
index 9d31d6e..3d735be 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentSetMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentSetMultimap.java
@@ -17,11 +17,10 @@
 package org.onosproject.store.primitives.resources.impl;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
 import com.google.common.collect.Multiset;
 import io.atomix.copycat.client.CopycatClient;
 import io.atomix.resource.AbstractResource;
-import org.onlab.util.Match;
+import io.atomix.resource.ResourceTypeInfo;
 import org.onosproject.store.service.AsyncConsistentMultimap;
 import org.onosproject.store.service.Versioned;
 
@@ -32,13 +31,28 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
-import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.*;
+import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Clear;
+import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.ContainsEntry;
+import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.ContainsKey;
+import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.ContainsValue;
+import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Entries;
+import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Get;
+import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.IsEmpty;
+import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.KeySet;
+import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Keys;
+import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.MultiRemove;
+import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Put;
+import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.RemoveAll;
+import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Replace;
+import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Size;
+import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Values;
 
 /**
  * Set based implementation of the {@link AsyncConsistentMultimap}.
  * <p>
  * Note: this implementation does not allow null entries or duplicate entries.
  */
+@ResourceTypeInfo(id = -153, factory = AsyncConsistentSetMultimapFactory.class)
 public class AsyncConsistentSetMultimap
         extends AbstractResource<AsyncConsistentSetMultimap>
         implements AsyncConsistentMultimap<String, byte[]> {
@@ -81,68 +95,50 @@
 
     @Override
     public CompletableFuture<Boolean> put(String key, byte[] value) {
-        return submit(new UpdateAndGet(key, Lists.newArrayList(value),
-                                       Lists.newArrayList(Match.NULL),
-                                       Lists.newArrayList(Match.NULL)))
-                .whenComplete((result, e) -> throwIfLocked(result.status()))
-                .thenApply(result ->
-                                   result.status() == MapEntryUpdateResult.Status.OK);
+        return submit(new Put(key, Lists.newArrayList(value), null));
     }
 
     @Override
     public CompletableFuture<Boolean> remove(String key, byte[] value) {
-        return submit(new UpdateAndGet(key, Lists.newArrayList(value),
-                                       Lists.newArrayList(Match.ifValue(value)),
-                                       Lists.newArrayList(Match.NULL)))
-                .whenComplete((result, e) -> throwIfLocked(result.status()))
-                .thenApply(result ->
-                                   result.status() == MapEntryUpdateResult.Status.OK);
+        return submit(new MultiRemove(key,
+                                      Lists.newArrayList(value),
+                                      null));
     }
 
     @Override
-    public CompletableFuture<Boolean> removeAll(String key, Iterable<? extends byte[]> values) {
-
-        throw new UnsupportedOperationException("This operation cannot be " +
-                                                        "used without support for " +
-                                                        "transactions.");
+    public CompletableFuture<Boolean> removeAll(
+            String key, Collection<? extends byte[]> values) {
+        return submit(new MultiRemove(key, (Collection<byte[]>) values, null));
     }
 
     @Override
-    public CompletableFuture<Versioned<Collection<byte[]>>> removeAll(String key) {
-        return submit(new UpdateAndGet(key, null, null, null))
-                .whenComplete((result, e) -> throwIfLocked(result.status()))
-                .thenApply(result -> result.oldValue());
+    public CompletableFuture<
+            Versioned<Collection<? extends byte[]>>> removeAll(String key) {
+        return submit(new RemoveAll(key, null));
     }
 
     @Override
-    public CompletableFuture<Boolean> putAll(String key, Iterable<? extends byte[]> values) {
-        throw new UnsupportedOperationException("This operation cannot be " +
-                                                        "used without support for " +
-                                                        "transactions.");
+    public CompletableFuture<Boolean> putAll(
+            String key, Collection<? extends byte[]> values) {
+        return submit(new Put(key, values, null));
     }
 
     @Override
-    public CompletableFuture<Boolean> putAll(Multimap<? extends String, ? extends byte[]> multiMap) {
-        throw new UnsupportedOperationException("This operation cannot be " +
-                                                        "used without support for " +
-                                                        "transactions.");
-    }
-
-    @Override
-    public CompletableFuture<Collection<byte[]>> replaceValues(String key, Iterable<byte[]> values) {
-        throw new UnsupportedOperationException("This operation cannot be " +
-                                                        "used without support for " +
-                                                        "transactions.");
+    public CompletableFuture<
+            Versioned<Collection<? extends byte[]>>> replaceValues(
+            String key, Collection<byte[]> values) {
+        return submit(new Replace(key, values, null));
     }
 
     @Override
     public CompletableFuture<Void> clear() {
-        return submit(new AsyncConsistentMultimapCommands.Clear());
+        return submit(new Clear());
     }
 
     @Override
-    public CompletableFuture<Collection<byte[]>> get(String key) {
-        return submit(new Get());
+    public CompletableFuture<
+            Versioned<Collection<? extends byte[]>>> get(String key) {
+        return submit(new Get(key));
     }
 
     @Override
@@ -156,7 +152,7 @@
     }
 
     @Override
-    public CompletableFuture<Collection<byte[]>> values() {
+    public CompletableFuture<Multiset<byte[]>> values() {
         return submit(new Values());
     }
 
@@ -182,7 +178,9 @@
      */
     private void throwIfLocked(MapEntryUpdateResult.Status status) {
         if (status == MapEntryUpdateResult.Status.WRITE_LOCK) {
-            throw new ConcurrentModificationException("Cannot update map: Another transaction in progress");
+            throw new ConcurrentModificationException("Cannot update map: " +
+                                                      "Another transaction " +
+                                                      "in progress");
         }
     }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentSetMultimapFactory.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentSetMultimapFactory.java
new file mode 100644
index 0000000..3222147
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentSetMultimapFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.primitives.resources.impl;
+
+import io.atomix.catalyst.serializer.SerializableTypeResolver;
+import io.atomix.copycat.client.CopycatClient;
+import io.atomix.resource.ResourceFactory;
+import io.atomix.resource.ResourceStateMachine;
+
+import java.util.Properties;
+
+/**
+ * {@link AsyncConsistentSetMultimap} resource factory.
+ */
+public class AsyncConsistentSetMultimapFactory implements
+        ResourceFactory<AsyncConsistentSetMultimap> {
+    @Override
+    public SerializableTypeResolver createSerializableTypeResolver() {
+        return new AsyncConsistentMultimapCommands.TypeResolver();
+    }
+
+    @Override
+    public ResourceStateMachine createStateMachine(Properties config) {
+        return new AsyncConsistentSetMultimapState(config);
+    }
+
+    @Override
+    public AsyncConsistentSetMultimap createInstance(CopycatClient client,
+                                                     Properties properties) {
+        return new AsyncConsistentSetMultimap(client, properties);
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentSetMultimapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentSetMultimapState.java
new file mode 100644
index 0000000..878aac24
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentSetMultimapState.java
@@ -0,0 +1,755 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.primitives.resources.impl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multiset;
+import com.google.common.collect.Sets;
+import io.atomix.copycat.server.Commit;
+import io.atomix.copycat.server.Snapshottable;
+import io.atomix.copycat.server.StateMachineExecutor;
+import io.atomix.copycat.server.session.ServerSession;
+import io.atomix.copycat.server.session.SessionListener;
+import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
+import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
+import io.atomix.resource.ResourceStateMachine;
+import org.onlab.util.CountDownCompleter;
+import org.onlab.util.Match;
+import org.onosproject.store.service.Versioned;
+import org.slf4j.Logger;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+import java.util.function.BinaryOperator;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+
+import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Clear;
+import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.ContainsEntry;
+import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.ContainsKey;
+import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.ContainsValue;
+import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Entries;
+import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Get;
+import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.IsEmpty;
+import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.KeySet;
+import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Keys;
+import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.MultiRemove;
+import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.MultimapCommand;
+import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Put;
+import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.RemoveAll;
+import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Replace;
+import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Size;
+import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Values;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * State Machine for {@link AsyncConsistentSetMultimap} resource.
+ */
+public class AsyncConsistentSetMultimapState extends ResourceStateMachine
+        implements SessionListener, Snapshottable {
+
+    private final Logger log = getLogger(getClass());
+    private final AtomicLong globalVersion = new AtomicLong(1);
+    //TODO Add listener map here
+    private final Map<String, MapEntryValue> backingMap = Maps.newHashMap();
+
+    public AsyncConsistentSetMultimapState(Properties properties) {
+        super(properties);
+    }
+
+    @Override
+    public void snapshot(SnapshotWriter writer) {
+    }
+
+    @Override
+    public void install(SnapshotReader reader) {
+    }
+
+    @Override
+    protected void configure(StateMachineExecutor executor) {
+        executor.register(Size.class, this::size);
+        executor.register(IsEmpty.class, this::isEmpty);
+        executor.register(ContainsKey.class, this::containsKey);
+        executor.register(ContainsValue.class, this::containsValue);
+        executor.register(ContainsEntry.class, this::containsEntry);
+        executor.register(Clear.class, this::clear);
+        executor.register(KeySet.class, this::keySet);
+        executor.register(Keys.class, this::keys);
+        executor.register(Values.class, this::values);
+        executor.register(Entries.class, this::entries);
+        executor.register(Get.class, this::get);
+        executor.register(RemoveAll.class, this::removeAll);
+        executor.register(MultiRemove.class, this::multiRemove);
+        executor.register(Put.class, this::put);
+        executor.register(Replace.class, this::replace);
+    }
+
+    @Override
+    public void delete() {
+        super.delete();
+    }
+
+    /**
+     * Handles a Size commit.
+     *
+     * @param commit Size commit
+     * @return number of unique key value pairs in the multimap
+     */
+    protected int size(Commit<? extends Size> commit) {
+        try {
+            return backingMap.values()
+                    .stream()
+                    .map(valueCollection -> valueCollection.values().size())
+                    .collect(Collectors.summingInt(size -> size));
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Handles an IsEmpty commit.
+     *
+     * @param commit IsEmpty commit
+     * @return true if the multimap contains no key-value pairs, else false
+     */
+    protected boolean isEmpty(Commit<? extends IsEmpty> commit) {
+        try {
+            return backingMap.isEmpty();
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Handles a contains key commit.
+     *
+     * @param commit ContainsKey commit
+     * @return returns true if the key is in the multimap, else false
+     */
+    protected boolean containsKey(Commit<? extends ContainsKey> commit) {
+        try {
+            return backingMap.containsKey(commit.operation().key());
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Handles a ContainsValue commit.
+     *
+     * @param commit ContainsValue commit
+     * @return true if the value is in the multimap, else false
+     */
+    protected boolean containsValue(Commit<? extends ContainsValue> commit) {
+        try {
+            Match<byte[]> match = Match.ifValue(commit.operation().value());
+            return backingMap
+                    .values()
+                    .stream()
+                    .anyMatch(valueList ->
+                          valueList
+                              .values()
+                              .stream()
+                              .anyMatch(byteValue ->
+                                    match.matches(byteValue)));
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Handles a ContainsEntry commit.
+     *
+     * @param commit ContainsEntry commit
+     * @return true if the key-value pair exists, else false
+     */
+    protected boolean containsEntry(Commit<? extends ContainsEntry> commit) {
+        try {
+            MapEntryValue entryValue =
+                    backingMap.get(commit.operation().key());
+            if (entryValue == null) {
+                return false;
+            } else {
+                Match valueMatch = Match.ifValue(commit.operation().value());
+                return entryValue
+                        .values()
+                        .stream()
+                        .anyMatch(byteValue -> valueMatch.matches(byteValue));
+            }
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Handles a Clear commit.
+     *
+     * @param commit Clear commit
+     */
+    protected void clear(Commit<? extends Clear> commit) {
+        try {
+            backingMap.clear();
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Handles a KeySet commit.
+     *
+     * @param commit KeySet commit
+     * @return a set of all keys in the multimap
+     */
+    protected Set<String> keySet(Commit<? extends KeySet> commit) {
+        try {
+            return backingMap.keySet();
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Handles a Keys commit.
+     *
+     * @param commit Keys commit
+     * @return a multiset of keys with each key included an equal number of
+     * times to the total key-value pairs in which that key participates
+     */
+    protected Multiset<String> keys(Commit<? extends Keys> commit) {
+        try {
+            Multiset keys = HashMultiset.create();
+            backingMap.forEach((key, mapEntryValue) -> {
+                keys.add(key, mapEntryValue.values().size());
+            });
+            return keys;
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Handles a Values commit.
+     *
+     * @param commit Values commit
+     * @return the set of values in the multimap with duplicates included
+     */
+    protected Multiset<byte[]> values(Commit<? extends Values> commit) {
+        try {
+            return backingMap
+                    .values()
+                    .stream()
+                    .collect(new HashMultisetValueCollector());
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Handles an Entries commit.
+     *
+     * @param commit Entries commit
+     * @return a set of all key-value pairs in the multimap
+     */
+    protected Collection<Map.Entry<String, byte[]>> entries(
+            Commit<? extends Entries> commit) {
+        try {
+            return backingMap
+                    .entrySet()
+                    .stream()
+                    .collect(new EntrySetCollector());
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Handles a Get commit.
+     *
+     * @param commit Get commit
+     * @return the collection of values associated with the key or an empty
+     * list if none exist
+     */
+    protected Versioned<Collection<? extends byte[]>> get(
+            Commit<? extends Get> commit) {
+        try {
+            MapEntryValue mapEntryValue = backingMap.get(commit.operation().key());
+            return toVersioned(backingMap.get(commit.operation().key()));
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Handles a removeAll commit, and returns the previous mapping.
+     *
+     * @param commit removeAll commit
+     * @return collection of removed values
+     */
+    protected Versioned<Collection<? extends byte[]>> removeAll(
+            Commit<? extends RemoveAll> commit) {
+        if (!backingMap.containsKey(commit.operation().key())) {
+            commit.close();
+            return new Versioned<>(Sets.newHashSet(), -1);
+        } else {
+            return backingMap.get(commit.operation().key()).addCommit(commit);
+        }
+    }
+
+    /**
+     * Handles a multiRemove commit, returns true if the remove results in any
+     * change.
+     * @param commit multiRemove commit
+     * @return true if any change results, else false
+     */
+    protected boolean multiRemove(Commit<? extends MultiRemove> commit) {
+        if (!backingMap.containsKey(commit.operation().key())) {
+            commit.close();
+            return false;
+        } else {
+            return (backingMap
+                    .get(commit.operation().key())
+                    .addCommit(commit)) != null;
+        }
+    }
+
+    /**
+     * Handles a put commit, returns true if any change results from this
+     * commit.
+     * @param commit a put commit
+     * @return true if this commit results in a change, else false
+     */
+    protected boolean put(Commit<? extends Put> commit) {
+        if (commit.operation().values().isEmpty()) {
+            return false;
+        }
+        if (!backingMap.containsKey(commit.operation().key())) {
+            backingMap.put(commit.operation().key(),
+                           new NonTransactionalCommit(1));
+        }
+        return backingMap
+                .get(commit.operation().key())
+                .addCommit(commit) != null;
+    }
+
+    protected Versioned<Collection<? extends byte[]>> replace(
+            Commit<? extends Replace> commit) {
+        if (!backingMap.containsKey(commit.operation().key())) {
+            backingMap.put(commit.operation().key(),
+                           new NonTransactionalCommit(1));
+        }
+        return backingMap.get(commit.operation().key()).addCommit(commit);
+    }
+
+    @Override
+    public void register(ServerSession session) {
+        super.register(session);
+    }
+
+    @Override
+    public void unregister(ServerSession session) {
+        super.unregister(session);
+    }
+
+    @Override
+    public void expire(ServerSession session) {
+        super.expire(session);
+    }
+
+    @Override
+    public void close(ServerSession session) {
+        super.close(session);
+    }
+
+    private interface MapEntryValue {
+
+        /**
+         * Returns the list of raw {@code byte[]'s}.
+         *
+         * @return list of raw values
+         */
+        Collection<? extends byte[]> values();
+
+        /**
+         * Returns the version of the value.
+         *
+         * @return version
+         */
+        long version();
+
+        /**
+         * Discards the value by invoke appropriate clean up actions.
+         */
+        void discard();
+
+        /**
+         * Add a new commit and modifies the set of values accordingly.
+         * In the case of a replace or removeAll it returns the set of removed
+         * values. In the case of put or multiRemove it returns null for no
+         * change and a set of the added or removed values respectively if a
+         * change resulted.
+         *
+         * @param commit the commit to be added
+         */
+        Versioned<Collection<? extends byte[]>> addCommit(
+                Commit<? extends MultimapCommand> commit);
+    }
+
+    private class NonTransactionalCommit implements MapEntryValue {
+        private long version;
+        private final TreeMap<byte[], CountDownCompleter<Commit>>
+                valueCountdownMap = Maps.newTreeMap(new ByteArrayComparator());
+        /*This is a mapping of commits that added values to the commits
+        * removing those values, they will not be circular because keys will
+        * be exclusively Put and Replace commits and values will be exclusively
+        * Multiremove commits, each time a Put or replace is removed it should
+        * as part of closing go through and countdown each of the remove
+        * commits depending on it.*/
+        private final HashMultimap<Commit, CountDownCompleter<Commit>>
+                additiveToRemovalCommits = HashMultimap.create();
+
+        public NonTransactionalCommit(
+                long version) {
+            //Set the version to current it will only be updated once this is
+            // populated
+            this.version = globalVersion.get();
+        }
+
+        @Override
+        public Collection<? extends byte[]> values() {
+            return valueCountdownMap.keySet();
+        }
+
+        @Override
+        public long version() {
+            return version;
+        }
+
+        @Override
+        public void discard() {
+            valueCountdownMap.values().forEach(completer ->
+                                                   completer.object().close());
+        }
+
+        @Override
+        public Versioned<Collection<? extends byte[]>> addCommit(
+                Commit<? extends MultimapCommand> commit) {
+            Preconditions.checkNotNull(commit);
+            Preconditions.checkNotNull(commit.operation());
+            Versioned<Collection<? extends byte[]>> retVersion;
+
+            if (commit.operation() instanceof Put) {
+                //Using a treeset here sanitizes the input, removing duplicates
+                Set<byte[]> valuesToAdd =
+                        Sets.newTreeSet(new ByteArrayComparator());
+                ((Put) commit.operation()).values().forEach(value -> {
+                    if (!valueCountdownMap.containsKey(value)) {
+                        valuesToAdd.add(value);
+                    }
+                });
+                if (valuesToAdd.isEmpty()) {
+                    //Do not increment or add the commit if no change resulted
+//                    TODO fairly sure the below case is unreachable but
+//                    TODO need to make sure
+//                    if (valueCountdownMap.isEmpty()) {
+//                        backingMap.remove(((Put) commit.operation()).key());
+//                    }
+                    commit.close();
+                    return null;
+                }
+                //When all values from a commit have been removed decrement all
+                //removal commits relying on it and remove itself from the
+                //mapping of additive commits to the commits removing the
+                //values it added. (Only multiremoves will be dependent)
+                CountDownCompleter<Commit> completer =
+                        new CountDownCompleter<>(commit, valuesToAdd.size(),
+                        c -> {
+                            if (additiveToRemovalCommits.containsKey(c)) {
+                                additiveToRemovalCommits.
+                                        get(c).
+                                        forEach(countdown ->
+                                                        countdown.countDown());
+                                additiveToRemovalCommits.removeAll(c);
+                            }
+                            c.close();
+                        });
+                retVersion = new Versioned<>(valuesToAdd, version);
+                valuesToAdd.forEach(value -> valueCountdownMap.put(value,
+                                                                   completer));
+                version++;
+                return retVersion;
+
+            } else if (commit.operation() instanceof Replace) {
+                //Will this work??  Need to check before check-in!
+                Set<byte[]> removedValues = Sets.newHashSet();
+                removedValues.addAll(valueCountdownMap.keySet());
+                retVersion = new Versioned<>(removedValues, version);
+                valueCountdownMap.values().forEach(countdown ->
+                                                   countdown.countDown());
+                valueCountdownMap.clear();
+                Set<byte[]> valuesToAdd =
+                        Sets.newTreeSet(new ByteArrayComparator());
+                ((Replace) commit.operation()).values().forEach(value -> {
+                    valuesToAdd.add(value);
+                });
+                if (valuesToAdd.isEmpty()) {
+                    version = globalVersion.incrementAndGet();
+                    backingMap.remove(((Replace) commit.operation()).key());
+                    //Order is important here, the commit must be closed last
+                    //(or minimally after all uses)
+                    commit.close();
+                    return retVersion;
+                }
+                CountDownCompleter<Commit> completer =
+                        new CountDownCompleter<>(commit, valuesToAdd.size(),
+                                     c -> {
+                                         if (additiveToRemovalCommits
+                                             .containsKey(c)) {
+                                            additiveToRemovalCommits.
+                                                 get(c).
+                                                 forEach(countdown ->
+                                                     countdown.countDown());
+                                             additiveToRemovalCommits.
+                                                     removeAll(c);
+                                         }
+                                         c.close();
+                                     });
+                valuesToAdd.forEach(value ->
+                                    valueCountdownMap.put(value, completer));
+                version = globalVersion.incrementAndGet();
+                return retVersion;
+
+            } else if (commit.operation() instanceof RemoveAll) {
+                Set<byte[]> removed = Sets.newHashSet();
+                //We can assume here that values only appear once and so we
+                //do not need to sanitize the return for duplicates.
+                removed.addAll(valueCountdownMap.keySet());
+                retVersion = new Versioned<>(removed, version);
+                valueCountdownMap.values().forEach(countdown ->
+                                                   countdown.countDown());
+                valueCountdownMap.clear();
+                //In the case of a removeAll all commits will be removed and
+                //unlike the multiRemove case we do not need to consider
+                //dependencies among additive and removal commits.
+
+                //Save the key for use after the commit is closed
+                String key = ((RemoveAll) commit.operation()).key();
+                commit.close();
+                version = globalVersion.incrementAndGet();
+                backingMap.remove(key);
+                return retVersion;
+
+            } else if (commit.operation() instanceof MultiRemove) {
+                //Must first calculate how many commits the removal depends on.
+                //At this time we also sanitize the removal set by adding to a
+                //set with proper handling of byte[] equality.
+                Set<byte[]> removed = Sets.newHashSet();
+                Set<Commit> commitsRemovedFrom = Sets.newHashSet();
+                ((MultiRemove) commit.operation()).values().forEach(value -> {
+                    if (valueCountdownMap.containsKey(value)) {
+                        removed.add(value);
+                        commitsRemovedFrom
+                                .add(valueCountdownMap.get(value).object());
+                    }
+                });
+                //If there is nothing to be removed no action should be taken.
+                if (removed.isEmpty()) {
+                    //Do not increment or add the commit if no change resulted
+                    commit.close();
+                    return null;
+                }
+                //When all additive commits this depends on are closed this can
+                //be closed as well.
+                CountDownCompleter<Commit> completer =
+                        new CountDownCompleter<>(commit,
+                                                 commitsRemovedFrom.size(),
+                                                 c -> c.close());
+                commitsRemovedFrom.forEach(commitRemovedFrom -> {
+                    additiveToRemovalCommits.put(commitRemovedFrom, completer);
+                });
+                //Save key in case countdown results in closing the commit.
+                String removedKey = ((MultiRemove) commit.operation()).key();
+                removed.forEach(removedValue -> {
+                    valueCountdownMap.remove(removedValue).countDown();
+                });
+                //The version is updated locally as well as globally even if
+                //this object will be removed from the map in case any other
+                //party still holds a reference to this object.
+                retVersion = new Versioned<>(removed, version);
+                version = globalVersion.incrementAndGet();
+                if (valueCountdownMap.isEmpty()) {
+                    backingMap
+                            .remove(removedKey);
+                }
+                return retVersion;
+
+            } else {
+                throw new IllegalArgumentException();
+            }
+        }
+    }
+
+    /**
+     * A collector that creates MapEntryValues and creates a multiset of all
+     * values in the map an equal number of times to the number of sets in
+     * which they participate.
+     */
+    private class HashMultisetValueCollector implements
+            Collector<MapEntryValue,
+                    HashMultiset<byte[]>,
+                    HashMultiset<byte[]>> {
+        private HashMultiset<byte[]> multiset = null;
+
+        @Override
+        public Supplier<HashMultiset<byte[]>> supplier() {
+            return new Supplier<HashMultiset<byte[]>>() {
+                @Override
+                public HashMultiset<byte[]> get() {
+                    if (multiset == null) {
+                        multiset = HashMultiset.create();
+                    }
+                    return multiset;
+                }
+            };
+        }
+
+        @Override
+        public BiConsumer<HashMultiset<byte[]>, MapEntryValue> accumulator() {
+            return (multiset, mapEntryValue) ->
+                    multiset.addAll(mapEntryValue.values());
+        }
+
+        @Override
+        public BinaryOperator<HashMultiset<byte[]>> combiner() {
+            return (setOne, setTwo) -> {
+                setOne.addAll(setTwo);
+                return setOne;
+            };
+        }
+
+        @Override
+        public Function<HashMultiset<byte[]>,
+                HashMultiset<byte[]>> finisher() {
+            return (unused) -> multiset;
+        }
+
+        @Override
+        public Set<Characteristics> characteristics() {
+            return EnumSet.of(Characteristics.UNORDERED);
+        }
+    }
+
+    /**
+     * A collector that creates Entries of {@code <String, MapEntryValue>} and
+     * creates a set of entries all key value pairs in the map.
+     */
+    private class EntrySetCollector implements
+            Collector<Map.Entry<String, MapEntryValue>,
+                    Set<Map.Entry<String, byte[]>>,
+                    Set<Map.Entry<String, byte[]>>> {
+        private Set<Map.Entry<String, byte[]>> set = null;
+
+        @Override
+        public Supplier<Set<Map.Entry<String, byte[]>>> supplier() {
+            return new Supplier<Set<Map.Entry<String, byte[]>>>() {
+                @Override
+                public Set<Map.Entry<String, byte[]>> get() {
+                    if (set == null) {
+                        set = Sets.newHashSet();
+                    }
+                    return set;
+                }
+            };
+        }
+
+        @Override
+        public BiConsumer<Set<Map.Entry<String, byte[]>>,
+                Map.Entry<String, MapEntryValue>> accumulator() {
+            return (set, entry) -> {
+                entry
+                    .getValue()
+                    .values()
+                    .forEach(byteValue ->
+                             set.add(Maps.immutableEntry(entry.getKey(),
+                                                         byteValue)));
+            };
+        }
+
+        @Override
+        public BinaryOperator<Set<Map.Entry<String, byte[]>>> combiner() {
+            return (setOne, setTwo) -> {
+                setOne.addAll(setTwo);
+                return setOne;
+            };
+        }
+
+        @Override
+        public Function<Set<Map.Entry<String, byte[]>>,
+                Set<Map.Entry<String, byte[]>>> finisher() {
+            return (unused) -> set;
+        }
+
+        @Override
+        public Set<Characteristics> characteristics() {
+            return EnumSet.of(Characteristics.UNORDERED);
+        }
+    }
+    /**
+     * Utility for turning a {@code MapEntryValue} to {@code Versioned}.
+     * @param value map entry value
+     * @return versioned instance or an empty list versioned -1 if argument is
+     * null
+     */
+    private Versioned<Collection<? extends byte[]>> toVersioned(
+            MapEntryValue value) {
+        return value == null ? new Versioned<>(Lists.newArrayList(), -1) :
+                new Versioned<>(value.values(),
+                                value.version());
+    }
+
+    private class ByteArrayComparator implements Comparator<byte[]> {
+
+        @Override
+        public int compare(byte[] o1, byte[] o2) {
+            if (Arrays.equals(o1, o2)) {
+                return 0;
+            } else {
+                for (int i = 0; i < o1.length && i < o2.length; i++) {
+                    if (o1[i] < o2[i]) {
+                        return -1;
+                    } else if (o1[i] > o2[i]) {
+                        return 1;
+                    }
+                }
+                return o1.length > o2.length ? 1 : -1;
+            }
+        }
+    }
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentSetMultimapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentSetMultimapTest.java
new file mode 100644
index 0000000..313b098
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentSetMultimapTest.java
@@ -0,0 +1,543 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.primitives.resources.impl;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multiset;
+import com.google.common.collect.TreeMultiset;
+import com.google.common.io.Files;
+import io.atomix.catalyst.transport.Address;
+import io.atomix.catalyst.transport.LocalTransport;
+import io.atomix.copycat.server.CopycatServer;
+import io.atomix.copycat.server.storage.Storage;
+import io.atomix.copycat.server.storage.StorageLevel;
+import io.atomix.manager.state.ResourceManagerState;
+import io.atomix.resource.ResourceType;
+import org.apache.commons.collections.keyvalue.DefaultMapEntry;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.onlab.util.Tools;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the {@link AsyncConsistentSetMultimap}.
+ */
+public class AsyncConsistentSetMultimapTest extends AtomixTestBase {
+    private final File testDir = Files.createTempDir();
+    private final String keyOne = "hello";
+    private final String keyTwo = "goodbye";
+    private final String keyThree = "foo";
+    private final String keyFour = "bar";
+    private final byte[] valueOne = Tools.getBytesUtf8(keyOne);
+    private final byte[] valueTwo = Tools.getBytesUtf8(keyTwo);
+    private final byte[] valueThree = Tools.getBytesUtf8(keyThree);
+    private final byte[] valueFour = Tools.getBytesUtf8(keyFour);
+    private final List<String> allKeys = Lists.newArrayList(keyOne, keyTwo,
+                                                            keyThree, keyFour);
+    private final List<byte[]> allValues = Lists.newArrayList(valueOne,
+                                                              valueTwo,
+                                                              valueThree,
+                                                              valueFour);
+
+    @Override
+    protected ResourceType resourceType() {
+        return new ResourceType(AsyncConsistentSetMultimap.class);
+    }
+
+    /**
+     * Test that size behaves correctly (This includes testing of the empty
+     * check).
+     */
+    @Ignore
+    @Test
+    public void testSize() throws Throwable {
+        clearTests();
+        AsyncConsistentSetMultimap map = createResource(3);
+        //Simplest operation case
+        map.isEmpty().thenAccept(result -> assertTrue(result));
+        map.put(keyOne, valueOne).
+                thenAccept(result -> assertTrue(result)).join();
+        map.isEmpty().thenAccept(result -> assertFalse(result));
+        map.size().thenAccept(result -> assertEquals(1, (int) result))
+                .join();
+        //Make sure sizing is dependent on values not keys
+        map.put(keyOne, valueTwo).
+                thenAccept(result -> assertTrue(result)).join();
+        map.size().thenAccept(result -> assertEquals(2, (int) result))
+                .join();
+        //Ensure that double adding has no effect
+        map.put(keyOne, valueOne).
+                thenAccept(result -> assertFalse(result)).join();
+        map.size().thenAccept(result -> assertEquals(2, (int) result))
+                .join();
+        //Check handling for multiple keys
+        map.put(keyTwo, valueOne)
+                .thenAccept(result -> assertTrue(result)).join();
+        map.put(keyTwo, valueTwo)
+                .thenAccept(result -> assertTrue(result)).join();
+        map.size().thenAccept(result -> assertEquals(4, (int) result))
+                .join();
+        //Check size with removal
+        map.remove(keyOne, valueOne).
+                thenAccept(result -> assertTrue(result)).join();
+        map.size().thenAccept(result -> assertEquals(3, (int) result))
+                .join();
+        //Check behavior under remove of non-existant key
+        map.remove(keyOne, valueOne).
+                thenAccept(result -> assertFalse(result)).join();
+        map.size().thenAccept(result -> assertEquals(3, (int) result))
+                .join();
+        //Check clearing the entirety of the map
+        map.clear().join();
+        map.size().thenAccept(result -> assertEquals(0, (int) result))
+                .join();
+        map.isEmpty().thenAccept(result -> assertTrue(result));
+
+        map.destroy().join();
+        clearTests();
+    }
+
+    /**
+     * Contains tests for value, key and entry.
+     */
+    @Ignore
+    @Test
+    public void containsTest() throws Throwable {
+        clearTests();
+        AsyncConsistentSetMultimap map = createResource(3);
+
+        //Populate the maps
+        allKeys.forEach(key -> {
+            map.putAll(key, allValues)
+                    .thenAccept(result -> assertTrue(result)).join();
+        });
+        map.size().thenAccept(result -> assertEquals(16, (int) result)).join();
+
+        //Test key contains positive results
+        allKeys.forEach(key -> {
+            map.containsKey(key)
+                    .thenAccept(result -> assertTrue(result)).join();
+        });
+
+        //Test value contains positive results
+        allValues.forEach(value -> {
+            map.containsValue(value)
+                    .thenAccept(result -> assertTrue(result)).join();
+        });
+
+        //Test contains entry for all possible entries
+        allKeys.forEach(key -> {
+            allValues.forEach(value -> {
+                map.containsEntry(key, value)
+                        .thenAccept(result -> assertTrue(result)).join();
+            });
+        });
+
+        //Test behavior after removals
+        allValues.forEach(value -> {
+            final String[] removedKey = new String[1];
+            allKeys.forEach(key -> {
+                map.remove(key, value)
+                        .thenAccept(result -> assertTrue(result)).join();
+                map.containsEntry(key, value)
+                        .thenAccept(result -> assertFalse(result)).join();
+                removedKey[0] = key;
+            });
+            //Check that contains key works properly for removed keys
+            map.containsKey(removedKey[0])
+                    .thenAccept(result -> assertFalse(result));
+        });
+
+        //Check that contains value works correctly for removed values
+        allValues.forEach(value -> {
+            map.containsValue(value)
+                    .thenAccept(result -> assertFalse(result)).join();
+        });
+
+        map.destroy().join();
+        clearTests();
+    }
+
+    /**
+     * Contains tests for put, putAll, remove, removeAll and replace.
+     * @throws Exception
+     */
+    @Ignore
+    @Test
+    public void addAndRemoveTest() throws Exception {
+        clearTests();
+        AsyncConsistentSetMultimap map = createResource(3);
+
+        //Test single put
+        allKeys.forEach(key -> {
+            //Value should actually be added here
+            allValues.forEach(value -> {
+                map.put(key, value)
+                        .thenAccept(result -> assertTrue(result)).join();
+                //Duplicate values should be ignored here
+                map.put(key, value)
+                        .thenAccept(result -> assertFalse(result)).join();
+            });
+        });
+
+        //Test single remove
+        allKeys.forEach(key -> {
+            //Value should actually be added here
+            allValues.forEach(value -> {
+                map.remove(key, value)
+                        .thenAccept(result -> assertTrue(result)).join();
+                //Duplicate values should be ignored here
+                map.remove(key, value)
+                        .thenAccept(result -> assertFalse(result)).join();
+            });
+        });
+
+        map.isEmpty().thenAccept(result -> assertTrue(result)).join();
+
+        //Test multi put
+        allKeys.forEach(key -> {
+            map.putAll(key, Lists.newArrayList(allValues.subList(0, 2)))
+                    .thenAccept(result -> assertTrue(result)).join();
+            map.putAll(key, Lists.newArrayList(allValues.subList(0, 2)))
+                    .thenAccept(result -> assertFalse(result)).join();
+            map.putAll(key, Lists.newArrayList(allValues.subList(2, 4)))
+                    .thenAccept(result -> assertTrue(result)).join();
+            map.putAll(key, Lists.newArrayList(allValues.subList(2, 4)))
+                    .thenAccept(result -> assertFalse(result)).join();
+
+        });
+
+        //Test multi remove
+        allKeys.forEach(key -> {
+            //Split the lists to test how multiRemove can work piecewise
+            map.removeAll(key, Lists.newArrayList(allValues.subList(0, 2)))
+                    .thenAccept(result -> assertTrue(result)).join();
+            map.removeAll(key, Lists.newArrayList(allValues.subList(0, 2)))
+                    .thenAccept(result -> assertFalse(result)).join();
+            map.removeAll(key, Lists.newArrayList(allValues.subList(2, 4)))
+                    .thenAccept(result -> assertTrue(result)).join();
+            map.removeAll(key, Lists.newArrayList(allValues.subList(2, 4)))
+                    .thenAccept(result -> assertFalse(result)).join();
+        });
+
+        map.isEmpty().thenAccept(result -> assertTrue(result)).join();
+
+        //Repopulate for next test
+        allKeys.forEach(key -> {
+            map.putAll(key, allValues)
+                    .thenAccept(result -> assertTrue(result)).join();
+        });
+
+        map.size().thenAccept(result -> assertEquals(16, (int) result)).join();
+
+        //Test removeAll of entire entry
+        allKeys.forEach(key -> {
+            map.removeAll(key).thenAccept(result -> {
+                assertTrue(
+                        byteArrayCollectionIsEqual(allValues, result.value()));
+            }).join();
+            map.removeAll(key).thenAccept(result -> {
+                assertFalse(
+                        byteArrayCollectionIsEqual(allValues, result.value()));
+            }).join();
+        });
+
+        map.isEmpty().thenAccept(result -> assertTrue(result)).join();
+
+        //Repopulate for next test
+        allKeys.forEach(key -> {
+            map.putAll(key, allValues)
+                    .thenAccept(result -> assertTrue(result)).join();
+        });
+
+        map.size().thenAccept(result -> assertEquals(16, (int) result)).join();
+
+        allKeys.forEach(key -> {
+            map.replaceValues(key, allValues)
+                    .thenAccept(result ->
+                        assertTrue(byteArrayCollectionIsEqual(allValues,
+                                                              result.value())))
+                    .join();
+            map.replaceValues(key, Lists.newArrayList())
+                    .thenAccept(result ->
+                        assertTrue(byteArrayCollectionIsEqual(allValues,
+                                                              result.value())))
+                    .join();
+            map.replaceValues(key, allValues)
+                    .thenAccept(result ->
+                        assertTrue(result.value().isEmpty()))
+                    .join();
+        });
+
+
+        //Test replacements of partial sets
+        map.size().thenAccept(result -> assertEquals(16, (int) result)).join();
+
+        allKeys.forEach(key -> {
+            map.remove(key, valueOne)
+                    .thenAccept(result ->
+                                        assertTrue(result)).join();
+            map.replaceValues(key, Lists.newArrayList())
+                    .thenAccept(result ->
+                        assertTrue(byteArrayCollectionIsEqual(
+                                Lists.newArrayList(valueTwo, valueThree,
+                                                   valueFour),
+                                result.value())))
+                    .join();
+            map.replaceValues(key, allValues)
+                    .thenAccept(result ->
+                                        assertTrue(result.value().isEmpty()))
+                    .join();
+        });
+
+        map.destroy().join();
+        clearTests();
+    }
+
+    /**
+     * Tests the get, keySet, keys, values, and entries implementations as well
+     * as a trivial test of the asMap functionality (throws error)
+     * @throws Exception
+     */
+    @Ignore
+    @Test
+    public void testAccessors() throws Exception {
+        clearTests();
+        AsyncConsistentSetMultimap map = createResource(3);
+
+        //Populate for full map behavior tests
+        allKeys.forEach(key -> {
+            map.putAll(key, allValues)
+                    .thenAccept(result -> assertTrue(result)).join();
+        });
+
+        map.size().thenAccept(result -> assertEquals(16, (int) result)).join();
+
+        allKeys.forEach(key -> {
+                map.get(key).thenAccept(result -> {
+                    assertTrue(byteArrayCollectionIsEqual(allValues,
+                                                          result.value()));
+                }).join();
+            });
+
+        //Test that the key set is correct
+        map.keySet()
+                .thenAccept(result ->
+                            assertTrue(stringArrayCollectionIsEqual(allKeys,
+                                                                    result)))
+                .join();
+        //Test that the correct set and occurrence of values are found in the
+        //values result
+        map.values().thenAccept(result -> {
+            final Multiset<byte[]> set = TreeMultiset.create(
+                    new ByteArrayComparator());
+            for (int i = 0; i < 4; i++) {
+                set.addAll(allValues);
+            }
+            assertEquals(16, result.size());
+            result.forEach(value -> assertTrue(set.remove(value)));
+            assertTrue(set.isEmpty());
+
+        }).join();
+
+        //Test that keys returns the right result including the correct number
+        //of each item
+        map.keys().thenAccept(result -> {
+            final Multiset<String> set = TreeMultiset.create();
+            for (int i = 0; i < 4; i++) {
+                set.addAll(allKeys);
+            }
+            assertEquals(16, result.size());
+            result.forEach(value -> assertTrue(set.remove(value)));
+            assertTrue(set.isEmpty());
+
+        }).join();
+
+        //Test that the right combination of key, value pairs are present
+        map.entries().thenAccept(result -> {
+            final Multiset<Map.Entry<String, byte[]>> set =
+                    TreeMultiset.create(new EntryComparator());
+            allKeys.forEach(key -> {
+                allValues.forEach(value -> {
+                    set.add(new DefaultMapEntry(key, value));
+                });
+            });
+            assertEquals(16, result.size());
+            result.forEach(entry -> assertTrue(set.remove(entry)));
+            assertTrue(set.isEmpty());
+        }).join();
+
+
+        //Testing for empty map behavior
+        map.clear().join();
+
+        allKeys.forEach(key -> {
+            map.get(key).thenAccept(result -> {
+                assertTrue(result.value().isEmpty());
+            }).join();
+        });
+
+        map.keySet().thenAccept(result -> assertTrue(result.isEmpty())).join();
+        map.values().thenAccept(result -> assertTrue(result.isEmpty())).join();
+        map.keys().thenAccept(result -> assertTrue(result.isEmpty())).join();
+        map.entries()
+                .thenAccept(result -> assertTrue(result.isEmpty())).join();
+
+        map.destroy();
+        clearTests();
+    }
+
+
+    private AsyncConsistentSetMultimap createResource(int clusterSize) {
+        try {
+            createCopycatServers(clusterSize);
+            AsyncConsistentSetMultimap map = createAtomixClient().
+                    getResource("testMap", AsyncConsistentSetMultimap.class)
+                    .join();
+            return map;
+        } catch (Throwable e) {
+            throw new RuntimeException(e.toString());
+        }
+    }
+
+    @Override
+    protected CopycatServer createCopycatServer(Address address) {
+        CopycatServer server = CopycatServer.builder(address, members)
+                .withTransport(new LocalTransport(registry))
+                .withStorage(Storage.builder()
+                                     .withStorageLevel(StorageLevel.MEMORY)
+                                     .withDirectory(testDir + "/" + address.port())
+                                     .build())
+                .withStateMachine(ResourceManagerState::new)
+                .withSerializer(serializer.clone())
+                .withHeartbeatInterval(Duration.ofMillis(25))
+                .withElectionTimeout(Duration.ofMillis(50))
+                .withSessionTimeout(Duration.ofMillis(100))
+                .build();
+        copycatServers.add(server);
+        return server;    }
+
+    /**
+     * Returns two arrays contain the same set of elements,
+     * regardless of order.
+     * @param o1 first collection
+     * @param o2 second collection
+     * @return true if they contain the same elements
+     */
+    private boolean byteArrayCollectionIsEqual(
+            Collection<? extends byte[]> o1, Collection<? extends byte[]> o2) {
+        if (o1 == null || o2 == null || o1.size() != o2.size()) {
+            return false;
+        }
+        for (byte[] array1 : o1) {
+            boolean matched = false;
+            for (byte[] array2 : o2) {
+                if (Arrays.equals(array1, array2)) {
+                    matched = true;
+                    break;
+                }
+            }
+            if (!matched) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Compares two collections of strings returns true if they contain the
+     * same strings, false otherwise.
+     * @param s1 string collection one
+     * @param s2 string collection two
+     * @return true if the two sets contain the same strings
+     */
+    private boolean stringArrayCollectionIsEqual(
+            Collection<? extends String> s1, Collection<? extends String> s2) {
+        if (s1 == null || s2 == null || s1.size() != s2.size()) {
+            return false;
+        }
+        for (String string1 : s1) {
+            boolean matched = false;
+            for (String string2 : s2) {
+                if (string1.equals(string2)) {
+                    matched = true;
+                    break;
+                }
+            }
+            if (!matched) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Byte array comparator implementation.
+     */
+    private class ByteArrayComparator implements Comparator<byte[]> {
+
+        @Override
+        public int compare(byte[] o1, byte[] o2) {
+            if (Arrays.equals(o1, o2)) {
+                return 0;
+            } else {
+                for (int i = 0; i < o1.length && i < o2.length; i++) {
+                    if (o1[i] < o2[i]) {
+                        return -1;
+                    } else if (o1[i] > o2[i]) {
+                        return 1;
+                    }
+                }
+                return o1.length > o2.length ? 1 : -1;
+            }
+        }
+    }
+
+    /**
+     * Entry comparator, uses both key and value to determine equality,
+     * for comparison falls back to the default string comparator.
+     */
+    private class EntryComparator
+            implements Comparator<Map.Entry<String, byte[]>> {
+
+        @Override
+        public int compare(Map.Entry<String, byte[]> o1,
+                           Map.Entry<String, byte[]> o2) {
+            if (o1 == null || o1.getKey() == null || o2 == null ||
+                    o2.getKey() == null) {
+                throw new IllegalArgumentException();
+            }
+            if (o1.getKey().equals(o2.getKey()) &&
+                    Arrays.equals(o1.getValue(), o2.getValue())) {
+                return 0;
+            } else {
+                return o1.getKey().compareTo(o2.getKey());
+            }
+        }
+    }
+}