[ONOS-7551] Support primitive revisions for upgrades

Change-Id: Ib56e10f06ab9abedd176cdd84add6cbf4e3d4c50
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
index 0e53f77..b6f27f0 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
@@ -19,7 +19,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
 import com.google.common.base.Charsets;
@@ -44,10 +43,20 @@
 import org.onosproject.store.service.AsyncDistributedSet;
 import org.onosproject.store.service.AsyncDocumentTree;
 import org.onosproject.store.service.AsyncLeaderElector;
+import org.onosproject.store.service.AtomicCounterMapOptions;
+import org.onosproject.store.service.AtomicCounterOptions;
+import org.onosproject.store.service.AtomicIdGeneratorOptions;
+import org.onosproject.store.service.AtomicValueOptions;
+import org.onosproject.store.service.ConsistentMapOptions;
+import org.onosproject.store.service.ConsistentMultimapOptions;
+import org.onosproject.store.service.ConsistentTreeMapOptions;
+import org.onosproject.store.service.DistributedLockOptions;
+import org.onosproject.store.service.DistributedSetOptions;
 import org.onosproject.store.service.DocumentPath;
-import org.onosproject.store.service.Ordering;
-import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.DocumentTreeOptions;
+import org.onosproject.store.service.LeaderElectorOptions;
 import org.onosproject.store.service.WorkQueue;
+import org.onosproject.store.service.WorkQueueOptions;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -72,13 +81,11 @@
     }
 
     @Override
-    public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer) {
-        checkNotNull(name);
-        checkNotNull(serializer);
+    public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(ConsistentMapOptions options) {
         Map<PartitionId, AsyncConsistentMap<byte[], byte[]>> maps =
                 Maps.transformValues(members,
                                      partition -> DistributedPrimitives.newTranscodingMap(
-                                             partition.<String, byte[]>newAsyncConsistentMap(name, null),
+                                             partition.<String, byte[]>newAsyncConsistentMap(options.name(), null),
                                              HexString::toHexString,
                                              HexString::fromHexString,
                                              Function.identity(),
@@ -87,78 +94,76 @@
             int bucket = Math.abs(Hashing.murmur3_32().hashBytes(key).asInt()) % buckets;
             return sortedMemberPartitionIds.get(Hashing.consistentHash(bucket, sortedMemberPartitionIds.size()));
         };
-        AsyncConsistentMap<byte[], byte[]> partitionedMap = new PartitionedAsyncConsistentMap<>(name, maps, hasher);
+        AsyncConsistentMap<byte[], byte[]> partitionedMap =
+            new PartitionedAsyncConsistentMap<>(options.name(), maps, hasher);
         return DistributedPrimitives.newTranscodingMap(partitionedMap,
-                key -> serializer.encode(key),
-                bytes -> serializer.decode(bytes),
-                value -> value == null ? null : serializer.encode(value),
-                bytes -> serializer.decode(bytes));
+                key -> options.serializer().encode(key),
+                bytes -> options.serializer().decode(bytes),
+                value -> value == null ? null : options.serializer().encode(value),
+                bytes -> options.serializer().decode(bytes));
     }
 
     @Override
-    public <V> AsyncConsistentTreeMap<V> newAsyncConsistentTreeMap(String name, Serializer serializer) {
-        return getCreator(name).newAsyncConsistentTreeMap(name, serializer);
+    public <V> AsyncConsistentTreeMap<V> newAsyncConsistentTreeMap(ConsistentTreeMapOptions options) {
+        return getCreator(options.name()).newAsyncConsistentTreeMap(options);
     }
 
     @Override
-    public <K, V> AsyncConsistentMultimap<K, V> newAsyncConsistentSetMultimap(String name, Serializer serializer) {
-        return getCreator(name).newAsyncConsistentSetMultimap(name, serializer);
+    public <K, V> AsyncConsistentMultimap<K, V> newAsyncConsistentSetMultimap(ConsistentMultimapOptions options) {
+        return getCreator(options.name()).newAsyncConsistentSetMultimap(options);
     }
 
     @Override
-    public <E> AsyncDistributedSet<E> newAsyncDistributedSet(String name, Serializer serializer) {
-        return DistributedPrimitives.newSetFromMap(newAsyncConsistentMap(name, serializer));
+    public <E> AsyncDistributedSet<E> newAsyncDistributedSet(DistributedSetOptions options) {
+        return DistributedPrimitives.newSetFromMap(newAsyncConsistentMap(options.name(), options.serializer()));
     }
 
     @Override
-    public <K> AsyncAtomicCounterMap<K> newAsyncAtomicCounterMap(String name, Serializer serializer) {
-        return getCreator(name).newAsyncAtomicCounterMap(name, serializer);
+    public <K> AsyncAtomicCounterMap<K> newAsyncAtomicCounterMap(AtomicCounterMapOptions options) {
+        return getCreator(options.name()).newAsyncAtomicCounterMap(options);
     }
 
     @Override
-    public AsyncAtomicCounter newAsyncCounter(String name) {
-        return getCreator(name).newAsyncCounter(name);
+    public AsyncAtomicCounter newAsyncCounter(AtomicCounterOptions options) {
+        return getCreator(options.name()).newAsyncCounter(options);
     }
 
     @Override
-    public AsyncAtomicIdGenerator newAsyncIdGenerator(String name) {
-        return getCreator(name).newAsyncIdGenerator(name);
+    public AsyncAtomicIdGenerator newAsyncIdGenerator(AtomicIdGeneratorOptions options) {
+        return getCreator(options.name()).newAsyncIdGenerator(options);
     }
 
     @Override
-    public <V> AsyncAtomicValue<V> newAsyncAtomicValue(String name, Serializer serializer) {
-        return getCreator(name).newAsyncAtomicValue(name, serializer);
+    public <V> AsyncAtomicValue<V> newAsyncAtomicValue(AtomicValueOptions options) {
+        return getCreator(options.name()).newAsyncAtomicValue(options);
     }
 
     @Override
-    public AsyncDistributedLock newAsyncDistributedLock(String name) {
-        return getCreator(name).newAsyncDistributedLock(name);
+    public AsyncDistributedLock newAsyncDistributedLock(DistributedLockOptions options) {
+        return getCreator(options.name()).newAsyncDistributedLock(options);
     }
 
     @Override
-    public AsyncLeaderElector newAsyncLeaderElector(String name, long leaderTimeout, TimeUnit timeUnit) {
-        checkNotNull(name);
+    public AsyncLeaderElector newAsyncLeaderElector(LeaderElectorOptions options) {
         Map<PartitionId, AsyncLeaderElector> leaderElectors =
                 Maps.transformValues(members,
-                                     partition -> partition.newAsyncLeaderElector(name, leaderTimeout, timeUnit));
+                                     partition -> partition.newAsyncLeaderElector(options));
         Hasher<String> hasher = topic -> {
             int hashCode = Hashing.sha256().hashString(topic, Charsets.UTF_8).asInt();
             return sortedMemberPartitionIds.get(Math.abs(hashCode) % members.size());
         };
-        return new PartitionedAsyncLeaderElector(name, leaderElectors, hasher);
+        return new PartitionedAsyncLeaderElector(options.name(), leaderElectors, hasher);
     }
 
     @Override
-    public <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer) {
-        return getCreator(name).newWorkQueue(name, serializer);
+    public <E> WorkQueue<E> newWorkQueue(WorkQueueOptions options) {
+        return getCreator(options.name()).newWorkQueue(options);
     }
 
     @Override
-    public <V> AsyncDocumentTree<V> newAsyncDocumentTree(String name, Serializer serializer, Ordering ordering) {
-        checkNotNull(name);
-        checkNotNull(serializer);
+    public <V> AsyncDocumentTree<V> newAsyncDocumentTree(DocumentTreeOptions options) {
         Map<PartitionId, AsyncDocumentTree<V>> trees =
-                Maps.transformValues(members, part -> part.<V>newAsyncDocumentTree(name, serializer, ordering));
+                Maps.transformValues(members, part -> part.<V>newAsyncDocumentTree(options));
         Hasher<DocumentPath> hasher = key -> {
             int bucket = (key == null) ? 0 :
                     Math.abs(Hashing.murmur3_32()
@@ -166,7 +171,7 @@
                                   .asInt()) % buckets;
             return sortedMemberPartitionIds.get(Hashing.consistentHash(bucket, sortedMemberPartitionIds.size()));
         };
-        return new PartitionedAsyncDocumentTree<>(name, trees, hasher);
+        return new PartitionedAsyncDocumentTree<>(options.name(), trees, hasher);
     }
 
     @Override