Distributed group store using eventual consistent map abstraction

Change-Id: I618a0f6fa80e0e25285d7a2026032f09ba90aa70
diff --git a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
index c914784..a571b4e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
@@ -16,25 +16,48 @@
 package org.onosproject.store.group.impl;
 
 import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
+import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
 import org.onlab.util.NewConcurrentHashMap;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.core.DefaultApplicationId;
 import org.onosproject.core.DefaultGroupId;
 import org.onosproject.core.GroupId;
+import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.MastershipRole;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.instructions.Instructions;
+import org.onosproject.net.flow.instructions.L0ModificationInstruction;
+import org.onosproject.net.flow.instructions.L2ModificationInstruction;
+import org.onosproject.net.flow.instructions.L3ModificationInstruction;
 import org.onosproject.net.group.DefaultGroup;
+import org.onosproject.net.group.DefaultGroupBucket;
 import org.onosproject.net.group.DefaultGroupDescription;
+import org.onosproject.net.group.DefaultGroupKey;
 import org.onosproject.net.group.Group;
 import org.onosproject.net.group.Group.GroupState;
 import org.onosproject.net.group.GroupBucket;
@@ -48,10 +71,21 @@
 import org.onosproject.net.group.GroupStoreDelegate;
 import org.onosproject.net.group.StoredGroupEntry;
 import org.onosproject.store.AbstractStore;
+import org.onosproject.store.Timestamp;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.ecmap.EventuallyConsistentMap;
+import org.onosproject.store.ecmap.EventuallyConsistentMapEvent;
+import org.onosproject.store.ecmap.EventuallyConsistentMapImpl;
+import org.onosproject.store.ecmap.EventuallyConsistentMapListener;
+import org.onosproject.store.impl.ClockService;
+import org.onosproject.store.impl.MultiValuedTimestamp;
+import org.onosproject.store.serializers.KryoNamespaces;
 import org.slf4j.Logger;
 
-import com.google.common.base.Function;
 import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
 
 /**
  * Manages inventory of group entries using trivial in-memory implementation.
@@ -67,85 +101,165 @@
     private final int dummyId = 0xffffffff;
     private final GroupId dummyGroupId = new DefaultGroupId(dummyId);
 
-    // inner Map is per device group table
-    private final ConcurrentMap<DeviceId, ConcurrentMap<GroupKey, StoredGroupEntry>>
-            groupEntriesByKey = new ConcurrentHashMap<>();
-    private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
-            groupEntriesById = new ConcurrentHashMap<>();
-    private final ConcurrentMap<DeviceId, ConcurrentMap<GroupKey, StoredGroupEntry>>
-            pendingGroupEntriesByKey = new ConcurrentHashMap<>();
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterCommunicationService clusterCommunicator;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected MastershipService mastershipService;
+
+    // Per device group table with (device id + app cookie) as key
+    private EventuallyConsistentMap<GroupStoreKeyMapKey,
+        StoredGroupEntry> groupStoreEntriesByKey = null;
+    // Per device group table with (device id + group id) as key
+    private EventuallyConsistentMap<GroupStoreIdMapKey,
+        StoredGroupEntry> groupStoreEntriesById = null;
+    private EventuallyConsistentMap<GroupStoreKeyMapKey,
+        StoredGroupEntry> auditPendingReqQueue = null;
     private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
             extraneousGroupEntriesById = new ConcurrentHashMap<>();
+    private ExecutorService messageHandlingExecutor;
+    private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;
 
     private final HashMap<DeviceId, Boolean> deviceAuditStatus =
             new HashMap<DeviceId, Boolean>();
 
     private final AtomicInteger groupIdGen = new AtomicInteger();
 
+    private KryoNamespace.Builder kryoBuilder = null;
+
     @Activate
     public void activate() {
+        kryoBuilder = new KryoNamespace.Builder()
+            .register(DefaultGroup.class,
+                      DefaultGroupBucket.class,
+                      DefaultGroupDescription.class,
+                      DefaultGroupKey.class,
+                      GroupDescription.Type.class,
+                      Group.GroupState.class,
+                      GroupBuckets.class,
+                      DefaultGroupId.class,
+                      GroupStoreMessage.class,
+                      GroupStoreMessage.Type.class,
+                      UpdateType.class,
+                      GroupStoreMessageSubjects.class,
+                      MultiValuedTimestamp.class,
+                      GroupStoreKeyMapKey.class,
+                      GroupStoreIdMapKey.class,
+                      GroupStoreMapKey.class
+                    )
+            .register(URI.class)
+            .register(DeviceId.class)
+            .register(PortNumber.class)
+            .register(DefaultApplicationId.class)
+            .register(DefaultTrafficTreatment.class,
+                      Instructions.DropInstruction.class,
+                      Instructions.OutputInstruction.class,
+                      Instructions.GroupInstruction.class,
+                      Instructions.TableTypeTransition.class,
+                      FlowRule.Type.class,
+                      L0ModificationInstruction.class,
+                      L0ModificationInstruction.L0SubType.class,
+                      L0ModificationInstruction.ModLambdaInstruction.class,
+                      L2ModificationInstruction.class,
+                      L2ModificationInstruction.L2SubType.class,
+                      L2ModificationInstruction.ModEtherInstruction.class,
+                      L2ModificationInstruction.PushHeaderInstructions.class,
+                      L2ModificationInstruction.ModVlanIdInstruction.class,
+                      L2ModificationInstruction.ModVlanPcpInstruction.class,
+                      L2ModificationInstruction.ModMplsLabelInstruction.class,
+                      L2ModificationInstruction.ModMplsTtlInstruction.class,
+                      L3ModificationInstruction.class,
+                      L3ModificationInstruction.L3SubType.class,
+                      L3ModificationInstruction.ModIPInstruction.class,
+                      L3ModificationInstruction.ModIPv6FlowLabelInstruction.class,
+                      L3ModificationInstruction.ModTtlInstruction.class,
+                      org.onlab.packet.MplsLabel.class
+                    )
+            .register(org.onosproject.cluster.NodeId.class)
+            .register(KryoNamespaces.BASIC)
+            .register(KryoNamespaces.MISC);
+
+        messageHandlingExecutor = Executors.
+                newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
+                                   groupedThreads("onos/store/group",
+                                                  "message-handlers"));
+        clusterCommunicator.
+            addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
+                          new ClusterGroupMsgHandler(),
+                          messageHandlingExecutor);
+
+        log.debug("Creating EC map groupstorekeymap");
+        groupStoreEntriesByKey =
+                new EventuallyConsistentMapImpl<>("groupstorekeymap",
+                        clusterService,
+                        clusterCommunicator,
+                        kryoBuilder,
+                        new GroupStoreLogicalClockManager<>());
+        log.trace("Current size {}", groupStoreEntriesByKey.size());
+
+        log.debug("Creating EC map groupstoreidmap");
+        groupStoreEntriesById =
+                new EventuallyConsistentMapImpl<>("groupstoreidmap",
+                        clusterService,
+                        clusterCommunicator,
+                        kryoBuilder,
+                        new GroupStoreLogicalClockManager<>());
+        groupStoreEntriesById.addListener(new GroupStoreIdMapListener());
+        log.trace("Current size {}", groupStoreEntriesById.size());
+
+        log.debug("Creating EC map pendinggroupkeymap");
+        auditPendingReqQueue =
+                new EventuallyConsistentMapImpl<>("pendinggroupkeymap",
+                clusterService,
+                clusterCommunicator,
+                kryoBuilder,
+                new GroupStoreLogicalClockManager<>());
+        log.trace("Current size {}", auditPendingReqQueue.size());
+
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
-        groupEntriesByKey.clear();
-        groupEntriesById.clear();
         log.info("Stopped");
     }
 
-    private static NewConcurrentHashMap<GroupKey, StoredGroupEntry>
-    lazyEmptyGroupKeyTable() {
-        return NewConcurrentHashMap.<GroupKey, StoredGroupEntry>ifNeeded();
-    }
-
-    private static NewConcurrentHashMap<GroupId, StoredGroupEntry>
-    lazyEmptyGroupIdTable() {
-        return NewConcurrentHashMap.<GroupId, StoredGroupEntry>ifNeeded();
-    }
-
-    private static NewConcurrentHashMap<GroupKey, StoredGroupEntry>
-    lazyEmptyPendingGroupKeyTable() {
-        return NewConcurrentHashMap.<GroupKey, StoredGroupEntry>ifNeeded();
-    }
-
     private static NewConcurrentHashMap<GroupId, Group>
-    lazyEmptyExtraneousGroupIdTable() {
+        lazyEmptyExtraneousGroupIdTable() {
         return NewConcurrentHashMap.<GroupId, Group>ifNeeded();
     }
 
     /**
-     * Returns the group key table for specified device.
+     * Returns the group store eventual consistent key map.
      *
-     * @param deviceId identifier of the device
-     * @return Map representing group key table of given device.
+     * @return Map representing group key table.
      */
-    private ConcurrentMap<GroupKey, StoredGroupEntry> getGroupKeyTable(DeviceId deviceId) {
-        return createIfAbsentUnchecked(groupEntriesByKey,
-                                       deviceId, lazyEmptyGroupKeyTable());
+    private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
+        getGroupStoreKeyMap() {
+        return groupStoreEntriesByKey;
     }
 
     /**
-     * Returns the group id table for specified device.
+     * Returns the group store eventual consistent id map.
      *
-     * @param deviceId identifier of the device
-     * @return Map representing group key table of given device.
+     * @return Map representing group id table.
      */
-    private ConcurrentMap<GroupId, StoredGroupEntry> getGroupIdTable(DeviceId deviceId) {
-        return createIfAbsentUnchecked(groupEntriesById,
-                                       deviceId, lazyEmptyGroupIdTable());
+    private EventuallyConsistentMap<GroupStoreIdMapKey, StoredGroupEntry>
+        getGroupStoreIdMap() {
+        return groupStoreEntriesById;
     }
 
     /**
-     * Returns the pending group key table for specified device.
+     * Returns the pending group request table.
      *
-     * @param deviceId identifier of the device
-     * @return Map representing group key table of given device.
+     * @return Map representing group key table.
      */
-    private ConcurrentMap<GroupKey, StoredGroupEntry>
-    getPendingGroupKeyTable(DeviceId deviceId) {
-        return createIfAbsentUnchecked(pendingGroupEntriesByKey,
-                                       deviceId, lazyEmptyPendingGroupKeyTable());
+    private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
+        getPendingGroupKeyTable() {
+        return auditPendingReqQueue;
     }
 
     /**
@@ -168,8 +282,8 @@
      */
     @Override
     public int getGroupCount(DeviceId deviceId) {
-        return (groupEntriesByKey.get(deviceId) != null) ?
-                groupEntriesByKey.get(deviceId).size() : 0;
+        return (getGroups(deviceId) != null) ?
+                         Iterables.size(getGroups(deviceId)) : 0;
     }
 
     /**
@@ -182,16 +296,11 @@
     @Override
     public Iterable<Group> getGroups(DeviceId deviceId) {
         // flatten and make iterator unmodifiable
-        return FluentIterable.from(getGroupKeyTable(deviceId).values())
-                .transform(
-                        new Function<StoredGroupEntry, Group>() {
-
-                            @Override
-                            public Group apply(
-                                    StoredGroupEntry input) {
-                                return input;
-                            }
-                        });
+        log.trace("getGroups: for device {} total number of groups {}",
+                  deviceId, getGroupStoreKeyMap().values().size());
+        return FluentIterable.from(getGroupStoreKeyMap().values())
+                .filter(input -> input.deviceId().equals(deviceId))
+                .transform(input -> input);
     }
 
     /**
@@ -204,19 +313,31 @@
      */
     @Override
     public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
-        return (groupEntriesByKey.get(deviceId) != null) ?
-                groupEntriesByKey.get(deviceId).get(appCookie) :
-                null;
+        return getStoredGroupEntry(deviceId, appCookie);
+    }
+
+    private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
+                                                 GroupKey appCookie) {
+        return getGroupStoreKeyMap().get(new GroupStoreKeyMapKey(deviceId,
+                                                                 appCookie));
+    }
+
+    @Override
+    public Group getGroup(DeviceId deviceId, GroupId groupId) {
+        return getStoredGroupEntry(deviceId, groupId);
+    }
+
+    private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
+                                                 GroupId groupId) {
+        return getGroupStoreIdMap().get(new GroupStoreIdMapKey(deviceId,
+                                                               groupId));
     }
 
     private int getFreeGroupIdValue(DeviceId deviceId) {
         int freeId = groupIdGen.incrementAndGet();
 
         while (true) {
-            Group existing = (
-                    groupEntriesById.get(deviceId) != null) ?
-                    groupEntriesById.get(deviceId).get(new DefaultGroupId(freeId)) :
-                    null;
+            Group existing = getGroup(deviceId, new DefaultGroupId(freeId));
             if (existing == null) {
                 existing = (
                         extraneousGroupEntriesById.get(deviceId) != null) ?
@@ -240,23 +361,45 @@
      */
     @Override
     public void storeGroupDescription(GroupDescription groupDesc) {
+        log.trace("In storeGroupDescription");
         // Check if a group is existing with the same key
         if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
+            log.warn("Group already exists with the same key {}",
+                     groupDesc.appCookie());
             return;
         }
 
-        if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
-            // Device group audit has not completed yet
-            // Add this group description to pending group key table
-            // Create a group entry object with Dummy Group ID
-            StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
-            group.setState(GroupState.WAITING_AUDIT_COMPLETE);
-            ConcurrentMap<GroupKey, StoredGroupEntry> pendingKeyTable =
-                    getPendingGroupKeyTable(groupDesc.deviceId());
-            pendingKeyTable.put(groupDesc.appCookie(), group);
+        // Check if group to be created by a remote instance
+        if (mastershipService.getLocalRole(
+                     groupDesc.deviceId()) != MastershipRole.MASTER) {
+            log.debug("Device {} local role is not MASTER",
+                      groupDesc.deviceId());
+            GroupStoreMessage groupOp = GroupStoreMessage.
+                    createGroupAddRequestMsg(groupDesc.deviceId(),
+                                             groupDesc);
+            ClusterMessage message = new ClusterMessage(
+                                    clusterService.getLocalNode().id(),
+                                    GroupStoreMessageSubjects.
+                                    REMOTE_GROUP_OP_REQUEST,
+                                    kryoBuilder.build().serialize(groupOp));
+            if (!clusterCommunicator.unicast(message,
+                                             mastershipService.
+                                             getMasterFor(
+                                                groupDesc.deviceId()))) {
+                log.warn("Failed to send request to master: {} to {}",
+                         message,
+                         mastershipService.getMasterFor(groupDesc.deviceId()));
+                //TODO: Send Group operation failure event
+            }
+            log.debug("Sent Group operation request for device {} "
+                    + "to remote MASTER {}",
+                      groupDesc.deviceId(),
+                      mastershipService.getMasterFor(groupDesc.deviceId()));
             return;
         }
 
+        log.debug("Store group for device {} is getting handled locally",
+                  groupDesc.deviceId());
         storeGroupDescriptionInternal(groupDesc);
     }
 
@@ -266,17 +409,34 @@
             return;
         }
 
+        if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
+            // Device group audit has not completed yet
+            // Add this group description to pending group key table
+            // Create a group entry object with Dummy Group ID
+            log.debug("storeGroupDescriptionInternal: Device {} AUDIT "
+                    + "pending...Queuing Group ADD request",
+                    groupDesc.deviceId());
+            StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
+            group.setState(GroupState.WAITING_AUDIT_COMPLETE);
+            EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
+                    getPendingGroupKeyTable();
+            pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
+                                                        groupDesc.appCookie()),
+                                group);
+            return;
+        }
+
         // Get a new group identifier
         GroupId id = new DefaultGroupId(getFreeGroupIdValue(groupDesc.deviceId()));
         // Create a group entry object
         StoredGroupEntry group = new DefaultGroup(id, groupDesc);
-        // Insert the newly created group entry into concurrent key and id maps
-        ConcurrentMap<GroupKey, StoredGroupEntry> keyTable =
-                getGroupKeyTable(groupDesc.deviceId());
-        keyTable.put(groupDesc.appCookie(), group);
-        ConcurrentMap<GroupId, StoredGroupEntry> idTable =
-                getGroupIdTable(groupDesc.deviceId());
-        idTable.put(id, group);
+        // Insert the newly created group entry into key and id maps
+        getGroupStoreKeyMap().
+            put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
+                                        groupDesc.appCookie()), group);
+        getGroupStoreIdMap().
+            put(new GroupStoreIdMapKey(groupDesc.deviceId(),
+                                    id), group);
         notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
                                       group));
     }
@@ -297,6 +457,42 @@
                                        UpdateType type,
                                        GroupBuckets newBuckets,
                                        GroupKey newAppCookie) {
+        // Check if group update to be done by a remote instance
+        if (mastershipService.
+                getLocalRole(deviceId) != MastershipRole.MASTER) {
+            GroupStoreMessage groupOp = GroupStoreMessage.
+                    createGroupUpdateRequestMsg(deviceId,
+                                                oldAppCookie,
+                                                type,
+                                                newBuckets,
+                                                newAppCookie);
+            ClusterMessage message =
+                    new ClusterMessage(clusterService.getLocalNode().id(),
+                                       GroupStoreMessageSubjects.
+                                              REMOTE_GROUP_OP_REQUEST,
+                                       kryoBuilder.build().serialize(groupOp));
+            if (!clusterCommunicator.unicast(message,
+                                             mastershipService.
+                                             getMasterFor(deviceId))) {
+                log.warn("Failed to send request to master: {} to {}",
+                         message,
+                         mastershipService.getMasterFor(deviceId));
+                //TODO: Send Group operation failure event
+            }
+            return;
+        }
+        updateGroupDescriptionInternal(deviceId,
+                                       oldAppCookie,
+                                       type,
+                                       newBuckets,
+                                       newAppCookie);
+    }
+
+    private void updateGroupDescriptionInternal(DeviceId deviceId,
+                                       GroupKey oldAppCookie,
+                                       UpdateType type,
+                                       GroupBuckets newBuckets,
+                                       GroupKey newAppCookie) {
         // Check if a group is existing with the provided key
         Group oldGroup = getGroup(deviceId, oldAppCookie);
         if (oldGroup == null) {
@@ -323,14 +519,17 @@
             newGroup.setPackets(oldGroup.packets());
             newGroup.setBytes(oldGroup.bytes());
             // Remove the old entry from maps and add new entry using new key
-            ConcurrentMap<GroupKey, StoredGroupEntry> keyTable =
-                    getGroupKeyTable(oldGroup.deviceId());
-            ConcurrentMap<GroupId, StoredGroupEntry> idTable =
-                    getGroupIdTable(oldGroup.deviceId());
-            keyTable.remove(oldGroup.appCookie());
-            idTable.remove(oldGroup.id());
-            keyTable.put(newGroup.appCookie(), newGroup);
-            idTable.put(newGroup.id(), newGroup);
+            getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(oldGroup.deviceId(),
+                                        oldGroup.appCookie()));
+            getGroupStoreIdMap().remove(new GroupStoreIdMapKey(oldGroup.deviceId(),
+                                                                 oldGroup.id()));
+            getGroupStoreKeyMap().
+                put(new GroupStoreKeyMapKey(newGroup.deviceId(),
+                                            newGroup.appCookie()), newGroup);
+            getGroupStoreIdMap().
+            put(new GroupStoreIdMapKey(newGroup.deviceId(),
+                                       newGroup.id()), newGroup);
+
             notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
         }
     }
@@ -379,10 +578,34 @@
     @Override
     public void deleteGroupDescription(DeviceId deviceId,
                                        GroupKey appCookie) {
+        // Check if group to be deleted by a remote instance
+        if (mastershipService.
+                getLocalRole(deviceId) != MastershipRole.MASTER) {
+            GroupStoreMessage groupOp = GroupStoreMessage.
+                    createGroupDeleteRequestMsg(deviceId,
+                                                appCookie);
+            ClusterMessage message =
+                    new ClusterMessage(clusterService.getLocalNode().id(),
+                                       GroupStoreMessageSubjects.
+                                              REMOTE_GROUP_OP_REQUEST,
+                                       kryoBuilder.build().serialize(groupOp));
+            if (!clusterCommunicator.unicast(message,
+                                             mastershipService.
+                                             getMasterFor(deviceId))) {
+                log.warn("Failed to send request to master: {} to {}",
+                         message,
+                         mastershipService.getMasterFor(deviceId));
+                //TODO: Send Group operation failure event
+            }
+            return;
+        }
+        deleteGroupDescriptionInternal(deviceId, appCookie);
+    }
+
+    private void deleteGroupDescriptionInternal(DeviceId deviceId,
+                                                GroupKey appCookie) {
         // Check if a group is existing with the provided key
-        StoredGroupEntry existing = (groupEntriesByKey.get(deviceId) != null) ?
-                groupEntriesByKey.get(deviceId).get(appCookie) :
-                null;
+        StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie);
         if (existing == null) {
             return;
         }
@@ -401,26 +624,35 @@
     @Override
     public void addOrUpdateGroupEntry(Group group) {
         // check if this new entry is an update to an existing entry
-        StoredGroupEntry existing = (groupEntriesById.get(
-                group.deviceId()) != null) ?
-                groupEntriesById.get(group.deviceId()).get(group.id()) :
-                null;
+        StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
+                                                        group.id());
         GroupEvent event = null;
 
         if (existing != null) {
+            log.trace("addOrUpdateGroupEntry: updating group "
+                    + "entry {} in device {}",
+                    group.id(),
+                    group.deviceId());
             synchronized (existing) {
                 existing.setLife(group.life());
                 existing.setPackets(group.packets());
                 existing.setBytes(group.bytes());
                 if (existing.state() == GroupState.PENDING_ADD) {
                     existing.setState(GroupState.ADDED);
+                    existing.setIsGroupStateAddedFirstTime(true);
                     event = new GroupEvent(Type.GROUP_ADDED, existing);
                 } else {
-                    if (existing.state() == GroupState.PENDING_UPDATE) {
-                        existing.setState(GroupState.PENDING_UPDATE);
-                    }
+                    existing.setState(GroupState.ADDED);
+                    existing.setIsGroupStateAddedFirstTime(false);
                     event = new GroupEvent(Type.GROUP_UPDATED, existing);
                 }
+                //Re-PUT map entries to trigger map update events
+                getGroupStoreKeyMap().
+                    put(new GroupStoreKeyMapKey(existing.deviceId(),
+                                                existing.appCookie()), existing);
+                getGroupStoreIdMap().
+                    put(new GroupStoreIdMapKey(existing.deviceId(),
+                                               existing.id()), existing);
             }
         }
 
@@ -436,18 +668,18 @@
      */
     @Override
     public void removeGroupEntry(Group group) {
-        StoredGroupEntry existing = (groupEntriesById.get(
-                group.deviceId()) != null) ?
-                groupEntriesById.get(group.deviceId()).get(group.id()) :
-                null;
+        StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
+                                                        group.id());
 
         if (existing != null) {
-            ConcurrentMap<GroupKey, StoredGroupEntry> keyTable =
-                    getGroupKeyTable(existing.deviceId());
-            ConcurrentMap<GroupId, StoredGroupEntry> idTable =
-                    getGroupIdTable(existing.deviceId());
-            idTable.remove(existing.id());
-            keyTable.remove(existing.appCookie());
+            log.trace("removeGroupEntry: removing group "
+                    + "entry {} in device {}",
+                    group.id(),
+                    group.deviceId());
+            getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
+                                                                 existing.appCookie()));
+            getGroupStoreIdMap().remove(new GroupStoreIdMapKey(existing.deviceId(),
+                                                               existing.id()));
             notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
         }
     }
@@ -461,9 +693,17 @@
                                   + "completed for device {}", deviceId);
                 deviceAuditStatus.put(deviceId, true);
                 // Execute all pending group requests
-                ConcurrentMap<GroupKey, StoredGroupEntry> pendingGroupRequests =
-                        getPendingGroupKeyTable(deviceId);
-                for (Group group:pendingGroupRequests.values()) {
+                List<StoredGroupEntry> pendingGroupRequests =
+                        getPendingGroupKeyTable().values()
+                        .stream()
+                        .filter(g-> g.deviceId().equals(deviceId))
+                        .collect(Collectors.toList());
+                log.trace("deviceInitialAuditCompleted: processing "
+                        + "pending group add requests for device {} and "
+                        + "number of pending requests {}",
+                        deviceId,
+                        pendingGroupRequests.size());
+                for (Group group:pendingGroupRequests) {
                     GroupDescription tmp = new DefaultGroupDescription(
                             group.deviceId(),
                             group.type(),
@@ -471,8 +711,9 @@
                             group.appCookie(),
                             group.appId());
                     storeGroupDescriptionInternal(tmp);
+                    getPendingGroupKeyTable().
+                        remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
                 }
-                getPendingGroupKeyTable(deviceId).clear();
             } else {
                 if (deviceAuditStatus.get(deviceId)) {
                     log.debug("deviceInitialAuditCompleted: Clearing AUDIT "
@@ -494,10 +735,8 @@
     @Override
     public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
 
-        StoredGroupEntry existing = (groupEntriesById.get(
-                deviceId) != null) ?
-                groupEntriesById.get(deviceId).get(operation.groupId()) :
-                null;
+        StoredGroupEntry existing = getStoredGroupEntry(deviceId,
+                                                        operation.groupId());
 
         if (existing == null) {
             log.warn("No group entry with ID {} found ", operation.groupId());
@@ -518,27 +757,37 @@
                 log.warn("Unknown group operation type {}", operation.opType());
         }
 
-        ConcurrentMap<GroupKey, StoredGroupEntry> keyTable =
-                getGroupKeyTable(existing.deviceId());
-        ConcurrentMap<GroupId, StoredGroupEntry> idTable =
-                getGroupIdTable(existing.deviceId());
-        idTable.remove(existing.id());
-        keyTable.remove(existing.appCookie());
+        getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
+                                                             existing.appCookie()));
+        getGroupStoreIdMap().remove(new GroupStoreIdMapKey(existing.deviceId(),
+                                                           existing.id()));
     }
 
     @Override
     public void addOrUpdateExtraneousGroupEntry(Group group) {
+        log.trace("addOrUpdateExtraneousGroupEntry: add/update extraneous "
+                + "group entry {} in device {}",
+                group.id(),
+                group.deviceId());
         ConcurrentMap<GroupId, Group> extraneousIdTable =
                 getExtraneousGroupIdTable(group.deviceId());
         extraneousIdTable.put(group.id(), group);
         // Check the reference counter
         if (group.referenceCount() == 0) {
+            log.trace("addOrUpdateExtraneousGroupEntry: Flow reference "
+                    + "counter is zero and triggering remove",
+                    group.id(),
+                    group.deviceId());
             notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, group));
         }
     }
 
     @Override
     public void removeExtraneousGroupEntry(Group group) {
+        log.trace("removeExtraneousGroupEntry: remove extraneous "
+                + "group entry {} of device {} from store",
+                group.id(),
+                group.deviceId());
         ConcurrentMap<GroupId, Group> extraneousIdTable =
                 getExtraneousGroupIdTable(group.deviceId());
         extraneousIdTable.remove(group.id());
@@ -551,5 +800,192 @@
                 getExtraneousGroupIdTable(deviceId).values());
     }
 
+    /**
+     * ClockService that generates wallclock based timestamps.
+     */
+    private class GroupStoreLogicalClockManager<T, U>
+        implements ClockService<T, U> {
 
+        private final AtomicLong sequenceNumber = new AtomicLong(0);
+
+        @Override
+        public Timestamp getTimestamp(T t1, U u1) {
+            return new MultiValuedTimestamp<>(System.currentTimeMillis(),
+                    sequenceNumber.getAndIncrement());
+        }
+    }
+
+    /**
+     * Map handler to receive any events when the group map is updated.
+     */
+    private class GroupStoreIdMapListener implements
+        EventuallyConsistentMapListener<GroupStoreIdMapKey, StoredGroupEntry> {
+
+        @Override
+        public void event(EventuallyConsistentMapEvent<GroupStoreIdMapKey,
+                          StoredGroupEntry> mapEvent) {
+            GroupEvent groupEvent = null;
+            log.trace("GroupStoreIdMapListener: received groupid map event {}",
+                      mapEvent.type());
+            if (mapEvent.type() == EventuallyConsistentMapEvent.Type.PUT) {
+                log.trace("GroupIdMapListener: Received PUT event");
+                if (mapEvent.value().state() == Group.GroupState.ADDED) {
+                    if (mapEvent.value().isGroupStateAddedFirstTime()) {
+                        groupEvent = new GroupEvent(Type.GROUP_ADDED,
+                                                    mapEvent.value());
+                        log.trace("GroupIdMapListener: Received first time "
+                                + "GROUP_ADDED state update");
+                    } else {
+                        groupEvent = new GroupEvent(Type.GROUP_UPDATED,
+                                                    mapEvent.value());
+                        log.trace("GroupIdMapListener: Received following "
+                                + "GROUP_ADDED state update");
+                    }
+                }
+            } else if (mapEvent.type() == EventuallyConsistentMapEvent.Type.REMOVE) {
+                log.trace("GroupIdMapListener: Received REMOVE event");
+                groupEvent = new GroupEvent(Type.GROUP_REMOVED, mapEvent.value());
+            }
+
+            if (groupEvent != null) {
+                notifyDelegate(groupEvent);
+            }
+        }
+    }
+    /**
+     * Message handler to receive messages from group subsystems of
+     * other cluster members.
+     */
+    private final class ClusterGroupMsgHandler
+                    implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+            log.trace("ClusterGroupMsgHandler: received remote group message");
+            if (message.subject() ==
+                    GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST) {
+                GroupStoreMessage groupOp = kryoBuilder.
+                        build().deserialize(message.payload());
+                log.trace("received remote group operation request");
+                if (!(mastershipService.
+                        getLocalRole(groupOp.deviceId()) !=
+                        MastershipRole.MASTER)) {
+                    log.warn("ClusterGroupMsgHandler: This node is not "
+                            + "MASTER for device {}", groupOp.deviceId());
+                    return;
+                }
+                if (groupOp.type() == GroupStoreMessage.Type.ADD) {
+                    log.trace("processing remote group "
+                            + "add operation request");
+                    storeGroupDescriptionInternal(groupOp.groupDesc());
+                } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
+                    log.trace("processing remote group "
+                            + "update operation request");
+                    updateGroupDescriptionInternal(groupOp.deviceId(),
+                                                   groupOp.appCookie(),
+                                                   groupOp.updateType(),
+                                                   groupOp.updateBuckets(),
+                                                   groupOp.newAppCookie());
+                } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
+                    log.trace("processing remote group "
+                            + "delete operation request");
+                    deleteGroupDescriptionInternal(groupOp.deviceId(),
+                                                   groupOp.appCookie());
+                }
+            }
+        }
+    }
+
+    /**
+     * Flattened map key to be used to store group entries.
+     */
+    private class GroupStoreMapKey {
+        private final DeviceId deviceId;
+
+        public GroupStoreMapKey(DeviceId deviceId) {
+            this.deviceId = deviceId;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof GroupStoreMapKey)) {
+                return false;
+            }
+            GroupStoreMapKey that = (GroupStoreMapKey) o;
+            return this.deviceId.equals(that.deviceId);
+        }
+
+        @Override
+        public int hashCode() {
+            int result = 17;
+
+            result = 31 * result + Objects.hash(this.deviceId);
+
+            return result;
+        }
+    }
+
+    private class GroupStoreKeyMapKey extends GroupStoreMapKey {
+        private final GroupKey appCookie;
+        public GroupStoreKeyMapKey(DeviceId deviceId,
+                                   GroupKey appCookie) {
+            super(deviceId);
+            this.appCookie = appCookie;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof GroupStoreKeyMapKey)) {
+                return false;
+            }
+            GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o;
+            return (super.equals(that) &&
+                    this.appCookie.equals(that.appCookie));
+        }
+
+        @Override
+        public int hashCode() {
+            int result = 17;
+
+            result = 31 * result + super.hashCode() + Objects.hash(this.appCookie);
+
+            return result;
+        }
+    }
+
+    private class GroupStoreIdMapKey extends GroupStoreMapKey {
+        private final GroupId groupId;
+        public GroupStoreIdMapKey(DeviceId deviceId,
+                                  GroupId groupId) {
+            super(deviceId);
+            this.groupId = groupId;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof GroupStoreIdMapKey)) {
+                return false;
+            }
+            GroupStoreIdMapKey that = (GroupStoreIdMapKey) o;
+            return (super.equals(that) &&
+                    this.groupId.equals(that.groupId));
+        }
+
+        @Override
+        public int hashCode() {
+            int result = 17;
+
+            result = 31 * result + super.hashCode() + Objects.hash(this.groupId);
+
+            return result;
+        }
+    }
 }