Moving group store to consistent map

Change-Id: Id3c23c0cd9d7c713bceffc7bd9125aa3de99c45e
diff --git a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
index 1d7ded1..64b7c87 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
@@ -56,11 +56,12 @@
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.service.MultiValuedTimestamp;
 import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.EventuallyConsistentMap;
-import org.onosproject.store.service.EventuallyConsistentMapBuilder;
-import org.onosproject.store.service.EventuallyConsistentMapEvent;
-import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
@@ -70,6 +71,7 @@
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Optional;
@@ -79,7 +81,6 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
@@ -113,12 +114,12 @@
     protected MastershipService mastershipService;
 
     // Per device group table with (device id + app cookie) as key
-    private EventuallyConsistentMap<GroupStoreKeyMapKey,
+    private ConsistentMap<GroupStoreKeyMapKey,
         StoredGroupEntry> groupStoreEntriesByKey = null;
     // Per device group table with (device id + group id) as key
     private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
         groupEntriesById = new ConcurrentHashMap<>();
-    private EventuallyConsistentMap<GroupStoreKeyMapKey,
+    private ConsistentMap<GroupStoreKeyMapKey,
         StoredGroupEntry> auditPendingReqQueue = null;
     private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
             extraneousGroupEntriesById = new ConcurrentHashMap<>();
@@ -131,8 +132,6 @@
 
     private KryoNamespace.Builder kryoBuilder = null;
 
-    private final AtomicLong sequenceNumber = new AtomicLong(0);
-
     private KryoNamespace clusterMsgSerializer;
 
     @Activate
@@ -169,29 +168,21 @@
                 this::process,
                 messageHandlingExecutor);
 
-        log.debug("Creating EC map groupstorekeymap");
-        EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
-                keyMapBuilder = storageService.eventuallyConsistentMapBuilder();
+        log.debug("Creating Consistent map onos-group-store-keymap");
 
-        groupStoreEntriesByKey = keyMapBuilder
-                .withName("groupstorekeymap")
-                .withSerializer(kryoBuilder)
-                .withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
-                                                                            sequenceNumber.getAndIncrement()))
+        groupStoreEntriesByKey = storageService.<GroupStoreKeyMapKey, StoredGroupEntry>consistentMapBuilder()
+                .withName("onos-group-store-keymap")
+                .withSerializer(Serializer.using(kryoBuilder.build()))
                 .build();
         groupStoreEntriesByKey.addListener(new GroupStoreKeyMapListener());
         log.debug("Current size of groupstorekeymap:{}",
                   groupStoreEntriesByKey.size());
 
-        log.debug("Creating EC map pendinggroupkeymap");
-        EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
-                auditMapBuilder = storageService.eventuallyConsistentMapBuilder();
+        log.debug("Creating Consistent map pendinggroupkeymap");
 
-        auditPendingReqQueue = auditMapBuilder
-                .withName("pendinggroupkeymap")
-                .withSerializer(kryoBuilder)
-                .withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
-                                                                            sequenceNumber.getAndIncrement()))
+        auditPendingReqQueue = storageService.<GroupStoreKeyMapKey, StoredGroupEntry>consistentMapBuilder()
+                .withName("onos-pending-group-keymap")
+                .withSerializer(Serializer.using(kryoBuilder.build()))
                 .build();
         log.debug("Current size of pendinggroupkeymap:{}",
                   auditPendingReqQueue.size());
@@ -222,9 +213,9 @@
      *
      * @return Map representing group key table.
      */
-    private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
+    private Map<GroupStoreKeyMapKey, StoredGroupEntry>
         getGroupStoreKeyMap() {
-        return groupStoreEntriesByKey;
+        return groupStoreEntriesByKey.asJavaMap();
     }
 
     /**
@@ -243,9 +234,9 @@
      *
      * @return Map representing group key table.
      */
-    private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
+    private Map<GroupStoreKeyMapKey, StoredGroupEntry>
         getPendingGroupKeyTable() {
-        return auditPendingReqQueue;
+        return auditPendingReqQueue.asJavaMap();
     }
 
     /**
@@ -445,7 +436,7 @@
                     groupDesc.deviceId());
             StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
             group.setState(GroupState.WAITING_AUDIT_COMPLETE);
-            EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
+            Map<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
                     getPendingGroupKeyTable();
             pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
                                                         groupDesc.appCookie()),
@@ -851,7 +842,7 @@
         Set<Entry<GroupStoreKeyMapKey, StoredGroupEntry>> entryPendingRemove =
                 new HashSet<>();
 
-        groupStoreEntriesByKey.entrySet().stream()
+        getGroupStoreKeyMap().entrySet().stream()
                 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
                 .forEach(entryPendingRemove::add);
 
@@ -987,14 +978,13 @@
      * Map handler to receive any events when the group key map is updated.
      */
     private class GroupStoreKeyMapListener implements
-            EventuallyConsistentMapListener<GroupStoreKeyMapKey, StoredGroupEntry> {
+            MapEventListener<GroupStoreKeyMapKey, StoredGroupEntry> {
 
         @Override
-        public void event(EventuallyConsistentMapEvent<GroupStoreKeyMapKey,
-                                  StoredGroupEntry> mapEvent) {
+        public void event(MapEvent<GroupStoreKeyMapKey, StoredGroupEntry> mapEvent) {
             GroupEvent groupEvent = null;
             GroupStoreKeyMapKey key = mapEvent.key();
-            StoredGroupEntry group = mapEvent.value();
+            StoredGroupEntry group = Versioned.valueOrNull(mapEvent.newValue());
             if ((key == null) && (group == null)) {
                 log.error("GroupStoreKeyMapListener: Received "
                         + "event {} with null entry", mapEvent.type());
@@ -1014,25 +1004,24 @@
                       mapEvent.type(),
                       group.id(),
                       key.deviceId());
-            if (mapEvent.type() == EventuallyConsistentMapEvent.Type.PUT) {
+            if (mapEvent.type() == MapEvent.Type.INSERT || mapEvent.type() == MapEvent.Type.UPDATE) {
                 // Update the group ID table
                 getGroupIdTable(group.deviceId()).put(group.id(), group);
-                if (mapEvent.value().state() == Group.GroupState.ADDED) {
-                    if (mapEvent.value().isGroupStateAddedFirstTime()) {
-                        groupEvent = new GroupEvent(Type.GROUP_ADDED,
-                                                    mapEvent.value());
+                StoredGroupEntry value = Versioned.valueOrNull(mapEvent.newValue());
+                if (value.state() == Group.GroupState.ADDED) {
+                    if (value.isGroupStateAddedFirstTime()) {
+                        groupEvent = new GroupEvent(Type.GROUP_ADDED, value);
                         log.trace("Received first time GROUP_ADDED state update for id {} in device {}",
                                 group.id(),
                                 group.deviceId());
                     } else {
-                        groupEvent = new GroupEvent(Type.GROUP_UPDATED,
-                                                    mapEvent.value());
+                        groupEvent = new GroupEvent(Type.GROUP_UPDATED, value);
                         log.trace("Received following GROUP_ADDED state update for id {} in device {}",
                                 group.id(),
                                 group.deviceId());
                     }
                 }
-            } else if (mapEvent.type() == EventuallyConsistentMapEvent.Type.REMOVE) {
+            } else if (mapEvent.type() == MapEvent.Type.REMOVE) {
                 groupEvent = new GroupEvent(Type.GROUP_REMOVED, group);
                 // Remove the entry from the group ID table
                 getGroupIdTable(group.deviceId()).remove(group.id(), group);
diff --git a/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
index 066fced..4f37504 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
@@ -46,7 +46,7 @@
 import org.onosproject.net.group.GroupStore;
 import org.onosproject.net.group.GroupStoreDelegate;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
-import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.ConsistentMap;
 import org.onosproject.store.service.TestStorageService;
 
 import com.google.common.collect.ImmutableList;
@@ -109,7 +109,7 @@
 
     DistributedGroupStore groupStoreImpl;
     GroupStore groupStore;
-    EventuallyConsistentMap auditPendingReqQueue;
+    ConsistentMap auditPendingReqQueue;
 
     static class MasterOfAll extends MastershipServiceAdapter {
         @Override