Distributed group store using eventually consistent map abstraction
Change-Id: I618a0f6fa80e0e25285d7a2026032f09ba90aa70
diff --git a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
index c914784..a571b4e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
@@ -16,25 +16,48 @@
package org.onosproject.store.group.impl;
import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
+import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
+import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
import org.onlab.util.NewConcurrentHashMap;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.core.DefaultApplicationId;
import org.onosproject.core.DefaultGroupId;
import org.onosproject.core.GroupId;
+import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.MastershipRole;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.instructions.Instructions;
+import org.onosproject.net.flow.instructions.L0ModificationInstruction;
+import org.onosproject.net.flow.instructions.L2ModificationInstruction;
+import org.onosproject.net.flow.instructions.L3ModificationInstruction;
import org.onosproject.net.group.DefaultGroup;
+import org.onosproject.net.group.DefaultGroupBucket;
import org.onosproject.net.group.DefaultGroupDescription;
+import org.onosproject.net.group.DefaultGroupKey;
import org.onosproject.net.group.Group;
import org.onosproject.net.group.Group.GroupState;
import org.onosproject.net.group.GroupBucket;
@@ -48,10 +71,21 @@
import org.onosproject.net.group.GroupStoreDelegate;
import org.onosproject.net.group.StoredGroupEntry;
import org.onosproject.store.AbstractStore;
+import org.onosproject.store.Timestamp;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.ecmap.EventuallyConsistentMap;
+import org.onosproject.store.ecmap.EventuallyConsistentMapEvent;
+import org.onosproject.store.ecmap.EventuallyConsistentMapImpl;
+import org.onosproject.store.ecmap.EventuallyConsistentMapListener;
+import org.onosproject.store.impl.ClockService;
+import org.onosproject.store.impl.MultiValuedTimestamp;
+import org.onosproject.store.serializers.KryoNamespaces;
import org.slf4j.Logger;
-import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
/**
* Manages inventory of group entries using trivial in-memory implementation.
@@ -67,85 +101,165 @@
private final int dummyId = 0xffffffff;
private final GroupId dummyGroupId = new DefaultGroupId(dummyId);
- // inner Map is per device group table
- private final ConcurrentMap<DeviceId, ConcurrentMap<GroupKey, StoredGroupEntry>>
- groupEntriesByKey = new ConcurrentHashMap<>();
- private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
- groupEntriesById = new ConcurrentHashMap<>();
- private final ConcurrentMap<DeviceId, ConcurrentMap<GroupKey, StoredGroupEntry>>
- pendingGroupEntriesByKey = new ConcurrentHashMap<>();
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterCommunicationService clusterCommunicator;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MastershipService mastershipService;
+
+ // Per device group table with (device id + app cookie) as key
+ private EventuallyConsistentMap<GroupStoreKeyMapKey,
+ StoredGroupEntry> groupStoreEntriesByKey = null;
+ // Per device group table with (device id + group id) as key
+ private EventuallyConsistentMap<GroupStoreIdMapKey,
+ StoredGroupEntry> groupStoreEntriesById = null;
+ private EventuallyConsistentMap<GroupStoreKeyMapKey,
+ StoredGroupEntry> auditPendingReqQueue = null;
private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
extraneousGroupEntriesById = new ConcurrentHashMap<>();
+ private ExecutorService messageHandlingExecutor;
+ private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;
private final HashMap<DeviceId, Boolean> deviceAuditStatus =
new HashMap<DeviceId, Boolean>();
private final AtomicInteger groupIdGen = new AtomicInteger();
+ private KryoNamespace.Builder kryoBuilder = null;
+
+ /**
+ * Activates the store: builds the Kryo namespace, subscribes to remote
+ * group operation requests and creates the three eventually consistent
+ * maps used by this store.
+ */
@Activate
public void activate() {
+ // Serialization namespace handed to each map below; the same builder
+ // is also used to (de)serialize remote group operation messages.
+ kryoBuilder = new KryoNamespace.Builder()
+ .register(DefaultGroup.class,
+ DefaultGroupBucket.class,
+ DefaultGroupDescription.class,
+ DefaultGroupKey.class,
+ GroupDescription.Type.class,
+ Group.GroupState.class,
+ GroupBuckets.class,
+ DefaultGroupId.class,
+ GroupStoreMessage.class,
+ GroupStoreMessage.Type.class,
+ UpdateType.class,
+ GroupStoreMessageSubjects.class,
+ MultiValuedTimestamp.class,
+ GroupStoreKeyMapKey.class,
+ GroupStoreIdMapKey.class,
+ GroupStoreMapKey.class
+ )
+ .register(URI.class)
+ .register(DeviceId.class)
+ .register(PortNumber.class)
+ .register(DefaultApplicationId.class)
+ .register(DefaultTrafficTreatment.class,
+ Instructions.DropInstruction.class,
+ Instructions.OutputInstruction.class,
+ Instructions.GroupInstruction.class,
+ Instructions.TableTypeTransition.class,
+ FlowRule.Type.class,
+ L0ModificationInstruction.class,
+ L0ModificationInstruction.L0SubType.class,
+ L0ModificationInstruction.ModLambdaInstruction.class,
+ L2ModificationInstruction.class,
+ L2ModificationInstruction.L2SubType.class,
+ L2ModificationInstruction.ModEtherInstruction.class,
+ L2ModificationInstruction.PushHeaderInstructions.class,
+ L2ModificationInstruction.ModVlanIdInstruction.class,
+ L2ModificationInstruction.ModVlanPcpInstruction.class,
+ L2ModificationInstruction.ModMplsLabelInstruction.class,
+ L2ModificationInstruction.ModMplsTtlInstruction.class,
+ L3ModificationInstruction.class,
+ L3ModificationInstruction.L3SubType.class,
+ L3ModificationInstruction.ModIPInstruction.class,
+ L3ModificationInstruction.ModIPv6FlowLabelInstruction.class,
+ L3ModificationInstruction.ModTtlInstruction.class,
+ org.onlab.packet.MplsLabel.class
+ )
+ .register(org.onosproject.cluster.NodeId.class)
+ .register(KryoNamespaces.BASIC)
+ .register(KryoNamespaces.MISC);
+
+ // Fixed-size pool (MESSAGE_HANDLER_THREAD_POOL_SIZE threads) on which
+ // ClusterGroupMsgHandler is invoked for REMOTE_GROUP_OP_REQUEST messages.
+ messageHandlingExecutor = Executors.
+ newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
+ groupedThreads("onos/store/group",
+ "message-handlers"));
+ clusterCommunicator.
+ addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
+ new ClusterGroupMsgHandler(),
+ messageHandlingExecutor);
+
+ // Group entries keyed by (device id, application cookie).
+ log.debug("Creating EC map groupstorekeymap");
+ groupStoreEntriesByKey =
+ new EventuallyConsistentMapImpl<>("groupstorekeymap",
+ clusterService,
+ clusterCommunicator,
+ kryoBuilder,
+ new GroupStoreLogicalClockManager<>());
+ log.trace("Current size {}", groupStoreEntriesByKey.size());
+
+ // Group entries keyed by (device id, group id); this is the only map
+ // that gets a listener attached.
+ log.debug("Creating EC map groupstoreidmap");
+ groupStoreEntriesById =
+ new EventuallyConsistentMapImpl<>("groupstoreidmap",
+ clusterService,
+ clusterCommunicator,
+ kryoBuilder,
+ new GroupStoreLogicalClockManager<>());
+ groupStoreEntriesById.addListener(new GroupStoreIdMapListener());
+ log.trace("Current size {}", groupStoreEntriesById.size());
+
+ // Group add requests queued until the device's initial audit completes.
+ log.debug("Creating EC map pendinggroupkeymap");
+ auditPendingReqQueue =
+ new EventuallyConsistentMapImpl<>("pendinggroupkeymap",
+ clusterService,
+ clusterCommunicator,
+ kryoBuilder,
+ new GroupStoreLogicalClockManager<>());
+ log.trace("Current size {}", auditPendingReqQueue.size());
+
log.info("Started");
}
@Deactivate
public void deactivate() {
- groupEntriesByKey.clear();
- groupEntriesById.clear();
+ // Release everything activate() acquired, in reverse order of use:
+ // stop accepting remote group operation requests first, then stop the
+ // threads that served them, then tear down the replicated maps.
+ clusterCommunicator.removeSubscriber(
+ GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST);
+ messageHandlingExecutor.shutdown();
+ groupStoreEntriesByKey.destroy();
+ groupStoreEntriesById.destroy();
+ auditPendingReqQueue.destroy();
log.info("Stopped");
}
- private static NewConcurrentHashMap<GroupKey, StoredGroupEntry>
- lazyEmptyGroupKeyTable() {
- return NewConcurrentHashMap.<GroupKey, StoredGroupEntry>ifNeeded();
- }
-
- private static NewConcurrentHashMap<GroupId, StoredGroupEntry>
- lazyEmptyGroupIdTable() {
- return NewConcurrentHashMap.<GroupId, StoredGroupEntry>ifNeeded();
- }
-
- private static NewConcurrentHashMap<GroupKey, StoredGroupEntry>
- lazyEmptyPendingGroupKeyTable() {
- return NewConcurrentHashMap.<GroupKey, StoredGroupEntry>ifNeeded();
- }
-
private static NewConcurrentHashMap<GroupId, Group>
- lazyEmptyExtraneousGroupIdTable() {
+ lazyEmptyExtraneousGroupIdTable() {
return NewConcurrentHashMap.<GroupId, Group>ifNeeded();
}
/**
- * Returns the group key table for specified device.
+ * Returns the eventually consistent map that holds the group entries of
+ * all devices, keyed by (device id, application cookie).
*
- * @param deviceId identifier of the device
- * @return Map representing group key table of given device.
+ * @return map representing the group key table; null until activate()
+ * has created it
*/
- private ConcurrentMap<GroupKey, StoredGroupEntry> getGroupKeyTable(DeviceId deviceId) {
- return createIfAbsentUnchecked(groupEntriesByKey,
- deviceId, lazyEmptyGroupKeyTable());
+ private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
+ getGroupStoreKeyMap() {
+ return groupStoreEntriesByKey;
}
/**
- * Returns the group id table for specified device.
+ * Returns the eventually consistent map that holds the group entries of
+ * all devices, keyed by (device id, group id).
*
- * @param deviceId identifier of the device
- * @return Map representing group key table of given device.
+ * @return map representing the group id table; null until activate()
+ * has created it
*/
- private ConcurrentMap<GroupId, StoredGroupEntry> getGroupIdTable(DeviceId deviceId) {
- return createIfAbsentUnchecked(groupEntriesById,
- deviceId, lazyEmptyGroupIdTable());
+ private EventuallyConsistentMap<GroupStoreIdMapKey, StoredGroupEntry>
+ getGroupStoreIdMap() {
+ return groupStoreEntriesById;
}
/**
- * Returns the pending group key table for specified device.
+ * Returns the eventually consistent map that queues group add requests
+ * until the initial audit of their device has completed, keyed by
+ * (device id, application cookie).
*
- * @param deviceId identifier of the device
- * @return Map representing group key table of given device.
+ * @return map representing the pending group request table; null until
+ * activate() has created it
*/
- private ConcurrentMap<GroupKey, StoredGroupEntry>
- getPendingGroupKeyTable(DeviceId deviceId) {
- return createIfAbsentUnchecked(pendingGroupEntriesByKey,
- deviceId, lazyEmptyPendingGroupKeyTable());
+ private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
+ getPendingGroupKeyTable() {
+ return auditPendingReqQueue;
}
/**
@@ -168,8 +282,8 @@
*/
@Override
public int getGroupCount(DeviceId deviceId) {
- return (groupEntriesByKey.get(deviceId) != null) ?
- groupEntriesByKey.get(deviceId).size() : 0;
+ // getGroups() always returns an iterable (empty for a device with no
+ // groups), so no null check is needed; calling it once also avoids
+ // filtering the whole key map twice.
+ return Iterables.size(getGroups(deviceId));
}
/**
@@ -182,16 +296,11 @@
@Override
public Iterable<Group> getGroups(DeviceId deviceId) {
// flatten and make iterator unmodifiable
- return FluentIterable.from(getGroupKeyTable(deviceId).values())
- .transform(
- new Function<StoredGroupEntry, Group>() {
-
- @Override
- public Group apply(
- StoredGroupEntry input) {
- return input;
- }
- });
+ // NOTE: the count traced here is the size of the whole key map, i.e.
+ // the groups of every device, not only those of deviceId.
+ log.trace("getGroups: for device {} total number of groups {}",
+ deviceId, getGroupStoreKeyMap().values().size());
+ // The key map is shared by all devices, so keep only the entries whose
+ // device id matches; the identity transform narrows the element type
+ // from StoredGroupEntry to Group.
+ return FluentIterable.from(getGroupStoreKeyMap().values())
+ .filter(input -> input.deviceId().equals(deviceId))
+ .transform(input -> input);
}
/**
@@ -204,19 +313,31 @@
*/
@Override
public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
- return (groupEntriesByKey.get(deviceId) != null) ?
- groupEntriesByKey.get(deviceId).get(appCookie) :
- null;
+ // Delegates to the key-indexed lookup; the stored entry is handed out
+ // through its Group interface.
+ return getStoredGroupEntry(deviceId, appCookie);
+ }
+
+ // Fetches the stored entry for (device id, application cookie) from the
+ // key-indexed map.
+ private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
+ GroupKey appCookie) {
+ GroupStoreKeyMapKey lookupKey =
+ new GroupStoreKeyMapKey(deviceId, appCookie);
+ return getGroupStoreKeyMap().get(lookupKey);
+ }
+
+ /**
+ * Returns the stored group entry of a device for the given group id.
+ *
+ * @param deviceId the device ID
+ * @param groupId the group identifier
+ * @return the result of the id-indexed map lookup for
+ * (deviceId, groupId)
+ */
+ @Override
+ public Group getGroup(DeviceId deviceId, GroupId groupId) {
+ return getStoredGroupEntry(deviceId, groupId);
+ }
+
+ // Fetches the stored entry for (device id, group id) from the
+ // id-indexed map.
+ private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
+ GroupId groupId) {
+ GroupStoreIdMapKey lookupKey =
+ new GroupStoreIdMapKey(deviceId, groupId);
+ return getGroupStoreIdMap().get(lookupKey);
}
private int getFreeGroupIdValue(DeviceId deviceId) {
int freeId = groupIdGen.incrementAndGet();
while (true) {
- Group existing = (
- groupEntriesById.get(deviceId) != null) ?
- groupEntriesById.get(deviceId).get(new DefaultGroupId(freeId)) :
- null;
+ Group existing = getGroup(deviceId, new DefaultGroupId(freeId));
if (existing == null) {
existing = (
extraneousGroupEntriesById.get(deviceId) != null) ?
@@ -240,23 +361,45 @@
*/
@Override
public void storeGroupDescription(GroupDescription groupDesc) {
+ log.trace("In storeGroupDescription");
// Check if a group is existing with the same key
if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
+ log.warn("Group already exists with the same key {}",
+ groupDesc.appCookie());
return;
}
- if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
- // Device group audit has not completed yet
- // Add this group description to pending group key table
- // Create a group entry object with Dummy Group ID
- StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
- group.setState(GroupState.WAITING_AUDIT_COMPLETE);
- ConcurrentMap<GroupKey, StoredGroupEntry> pendingKeyTable =
- getPendingGroupKeyTable(groupDesc.deviceId());
- pendingKeyTable.put(groupDesc.appCookie(), group);
+ // Check if group to be created by a remote instance
+ if (mastershipService.getLocalRole(
+ groupDesc.deviceId()) != MastershipRole.MASTER) {
+ log.debug("Device {} local role is not MASTER",
+ groupDesc.deviceId());
+ // Resolve the master once so that the request and every log line
+ // refer to the same node even if mastership changes meanwhile.
+ org.onosproject.cluster.NodeId master =
+ mastershipService.getMasterFor(groupDesc.deviceId());
+ if (master == null) {
+ // NOTE(review): assumes getMasterFor() yields null when no
+ // master is currently known for the device - confirm.
+ log.warn("No master known for device {}; group add request "
+ + "not forwarded",
+ groupDesc.deviceId());
+ //TODO: Send Group operation failure event
+ return;
+ }
+ GroupStoreMessage groupOp = GroupStoreMessage.
+ createGroupAddRequestMsg(groupDesc.deviceId(),
+ groupDesc);
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ GroupStoreMessageSubjects.
+ REMOTE_GROUP_OP_REQUEST,
+ kryoBuilder.build().serialize(groupOp));
+ if (!clusterCommunicator.unicast(message, master)) {
+ log.warn("Failed to send request to master: {} to {}",
+ message,
+ master);
+ //TODO: Send Group operation failure event
+ // Do not fall through to the "Sent ..." log below.
+ return;
+ }
+ log.debug("Sent Group operation request for device {} "
+ + "to remote MASTER {}",
+ groupDesc.deviceId(),
+ master);
return;
}
+ log.debug("Store group for device {} is getting handled locally",
+ groupDesc.deviceId());
storeGroupDescriptionInternal(groupDesc);
}
@@ -266,17 +409,34 @@
return;
}
+ if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
+ // Device group audit has not completed yet
+ // Add this group description to pending group key table
+ // Create a group entry object with Dummy Group ID
+ log.debug("storeGroupDescriptionInternal: Device {} AUDIT "
+ + "pending...Queuing Group ADD request",
+ groupDesc.deviceId());
+ StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
+ group.setState(GroupState.WAITING_AUDIT_COMPLETE);
+ EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
+ getPendingGroupKeyTable();
+ pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
+ groupDesc.appCookie()),
+ group);
+ return;
+ }
+
// Get a new group identifier
GroupId id = new DefaultGroupId(getFreeGroupIdValue(groupDesc.deviceId()));
// Create a group entry object
StoredGroupEntry group = new DefaultGroup(id, groupDesc);
- // Insert the newly created group entry into concurrent key and id maps
- ConcurrentMap<GroupKey, StoredGroupEntry> keyTable =
- getGroupKeyTable(groupDesc.deviceId());
- keyTable.put(groupDesc.appCookie(), group);
- ConcurrentMap<GroupId, StoredGroupEntry> idTable =
- getGroupIdTable(groupDesc.deviceId());
- idTable.put(id, group);
+ // Insert the newly created group entry into key and id maps
+ getGroupStoreKeyMap().
+ put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
+ groupDesc.appCookie()), group);
+ getGroupStoreIdMap().
+ put(new GroupStoreIdMapKey(groupDesc.deviceId(),
+ id), group);
notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
group));
}
@@ -297,6 +457,42 @@
UpdateType type,
GroupBuckets newBuckets,
GroupKey newAppCookie) {
+ // Check if group update to be done by a remote instance
+ if (mastershipService.
+ getLocalRole(deviceId) != MastershipRole.MASTER) {
+ GroupStoreMessage groupOp = GroupStoreMessage.
+ createGroupUpdateRequestMsg(deviceId,
+ oldAppCookie,
+ type,
+ newBuckets,
+ newAppCookie);
+ ClusterMessage message =
+ new ClusterMessage(clusterService.getLocalNode().id(),
+ GroupStoreMessageSubjects.
+ REMOTE_GROUP_OP_REQUEST,
+ kryoBuilder.build().serialize(groupOp));
+ if (!clusterCommunicator.unicast(message,
+ mastershipService.
+ getMasterFor(deviceId))) {
+ log.warn("Failed to send request to master: {} to {}",
+ message,
+ mastershipService.getMasterFor(deviceId));
+ //TODO: Send Group operation failure event
+ }
+ return;
+ }
+ updateGroupDescriptionInternal(deviceId,
+ oldAppCookie,
+ type,
+ newBuckets,
+ newAppCookie);
+ }
+
+ private void updateGroupDescriptionInternal(DeviceId deviceId,
+ GroupKey oldAppCookie,
+ UpdateType type,
+ GroupBuckets newBuckets,
+ GroupKey newAppCookie) {
// Check if a group is existing with the provided key
Group oldGroup = getGroup(deviceId, oldAppCookie);
if (oldGroup == null) {
@@ -323,14 +519,17 @@
newGroup.setPackets(oldGroup.packets());
newGroup.setBytes(oldGroup.bytes());
// Remove the old entry from maps and add new entry using new key
- ConcurrentMap<GroupKey, StoredGroupEntry> keyTable =
- getGroupKeyTable(oldGroup.deviceId());
- ConcurrentMap<GroupId, StoredGroupEntry> idTable =
- getGroupIdTable(oldGroup.deviceId());
- keyTable.remove(oldGroup.appCookie());
- idTable.remove(oldGroup.id());
- keyTable.put(newGroup.appCookie(), newGroup);
- idTable.put(newGroup.id(), newGroup);
+ getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(oldGroup.deviceId(),
+ oldGroup.appCookie()));
+ getGroupStoreIdMap().remove(new GroupStoreIdMapKey(oldGroup.deviceId(),
+ oldGroup.id()));
+ getGroupStoreKeyMap().
+ put(new GroupStoreKeyMapKey(newGroup.deviceId(),
+ newGroup.appCookie()), newGroup);
+ getGroupStoreIdMap().
+ put(new GroupStoreIdMapKey(newGroup.deviceId(),
+ newGroup.id()), newGroup);
+
notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
}
}
@@ -379,10 +578,34 @@
@Override
public void deleteGroupDescription(DeviceId deviceId,
GroupKey appCookie) {
+ // Check if group to be deleted by a remote instance
+ if (mastershipService.
+ getLocalRole(deviceId) != MastershipRole.MASTER) {
+ // Resolve the master once so that the request and the failure log
+ // refer to the same node even if mastership changes meanwhile.
+ org.onosproject.cluster.NodeId master =
+ mastershipService.getMasterFor(deviceId);
+ if (master == null) {
+ // NOTE(review): assumes getMasterFor() yields null when no
+ // master is currently known for the device - confirm.
+ log.warn("No master known for device {}; group delete request "
+ + "not forwarded",
+ deviceId);
+ //TODO: Send Group operation failure event
+ return;
+ }
+ GroupStoreMessage groupOp = GroupStoreMessage.
+ createGroupDeleteRequestMsg(deviceId,
+ appCookie);
+ ClusterMessage message =
+ new ClusterMessage(clusterService.getLocalNode().id(),
+ GroupStoreMessageSubjects.
+ REMOTE_GROUP_OP_REQUEST,
+ kryoBuilder.build().serialize(groupOp));
+ if (!clusterCommunicator.unicast(message, master)) {
+ log.warn("Failed to send request to master: {} to {}",
+ message,
+ master);
+ //TODO: Send Group operation failure event
+ }
+ return;
+ }
+ deleteGroupDescriptionInternal(deviceId, appCookie);
+ }
+
+ private void deleteGroupDescriptionInternal(DeviceId deviceId,
+ GroupKey appCookie) {
// Check if a group is existing with the provided key
- StoredGroupEntry existing = (groupEntriesByKey.get(deviceId) != null) ?
- groupEntriesByKey.get(deviceId).get(appCookie) :
- null;
+ StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie);
if (existing == null) {
return;
}
@@ -401,26 +624,35 @@
@Override
public void addOrUpdateGroupEntry(Group group) {
// check if this new entry is an update to an existing entry
- StoredGroupEntry existing = (groupEntriesById.get(
- group.deviceId()) != null) ?
- groupEntriesById.get(group.deviceId()).get(group.id()) :
- null;
+ StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
+ group.id());
GroupEvent event = null;
if (existing != null) {
+ log.trace("addOrUpdateGroupEntry: updating group "
+ + "entry {} in device {}",
+ group.id(),
+ group.deviceId());
synchronized (existing) {
existing.setLife(group.life());
existing.setPackets(group.packets());
existing.setBytes(group.bytes());
if (existing.state() == GroupState.PENDING_ADD) {
existing.setState(GroupState.ADDED);
+ existing.setIsGroupStateAddedFirstTime(true);
event = new GroupEvent(Type.GROUP_ADDED, existing);
} else {
- if (existing.state() == GroupState.PENDING_UPDATE) {
- existing.setState(GroupState.PENDING_UPDATE);
- }
+ existing.setState(GroupState.ADDED);
+ existing.setIsGroupStateAddedFirstTime(false);
event = new GroupEvent(Type.GROUP_UPDATED, existing);
}
+ //Re-PUT map entries to trigger map update events
+ getGroupStoreKeyMap().
+ put(new GroupStoreKeyMapKey(existing.deviceId(),
+ existing.appCookie()), existing);
+ getGroupStoreIdMap().
+ put(new GroupStoreIdMapKey(existing.deviceId(),
+ existing.id()), existing);
}
}
@@ -436,18 +668,18 @@
*/
@Override
public void removeGroupEntry(Group group) {
- StoredGroupEntry existing = (groupEntriesById.get(
- group.deviceId()) != null) ?
- groupEntriesById.get(group.deviceId()).get(group.id()) :
- null;
+ // Look the entry up by (device id, group id); an unknown group is
+ // silently ignored.
+ StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
+ group.id());
if (existing != null) {
- ConcurrentMap<GroupKey, StoredGroupEntry> keyTable =
- getGroupKeyTable(existing.deviceId());
- ConcurrentMap<GroupId, StoredGroupEntry> idTable =
- getGroupIdTable(existing.deviceId());
- idTable.remove(existing.id());
- keyTable.remove(existing.appCookie());
+ log.trace("removeGroupEntry: removing group "
+ + "entry {} in device {}",
+ group.id(),
+ group.deviceId());
+ // Drop the entry from both the key-indexed and the id-indexed map
+ // before telling the delegate.
+ getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
+ existing.appCookie()));
+ getGroupStoreIdMap().remove(new GroupStoreIdMapKey(existing.deviceId(),
+ existing.id()));
notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
}
}
@@ -461,9 +693,17 @@
+ "completed for device {}", deviceId);
deviceAuditStatus.put(deviceId, true);
// Execute all pending group requests
- ConcurrentMap<GroupKey, StoredGroupEntry> pendingGroupRequests =
- getPendingGroupKeyTable(deviceId);
- for (Group group:pendingGroupRequests.values()) {
+ List<StoredGroupEntry> pendingGroupRequests =
+ getPendingGroupKeyTable().values()
+ .stream()
+ .filter(g-> g.deviceId().equals(deviceId))
+ .collect(Collectors.toList());
+ log.trace("deviceInitialAuditCompleted: processing "
+ + "pending group add requests for device {} and "
+ + "number of pending requests {}",
+ deviceId,
+ pendingGroupRequests.size());
+ for (Group group:pendingGroupRequests) {
GroupDescription tmp = new DefaultGroupDescription(
group.deviceId(),
group.type(),
@@ -471,8 +711,9 @@
group.appCookie(),
group.appId());
storeGroupDescriptionInternal(tmp);
+ getPendingGroupKeyTable().
+ remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
}
- getPendingGroupKeyTable(deviceId).clear();
} else {
if (deviceAuditStatus.get(deviceId)) {
log.debug("deviceInitialAuditCompleted: Clearing AUDIT "
@@ -494,10 +735,8 @@
@Override
public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
- StoredGroupEntry existing = (groupEntriesById.get(
- deviceId) != null) ?
- groupEntriesById.get(deviceId).get(operation.groupId()) :
- null;
+ StoredGroupEntry existing = getStoredGroupEntry(deviceId,
+ operation.groupId());
if (existing == null) {
log.warn("No group entry with ID {} found ", operation.groupId());
@@ -518,27 +757,37 @@
log.warn("Unknown group operation type {}", operation.opType());
}
- ConcurrentMap<GroupKey, StoredGroupEntry> keyTable =
- getGroupKeyTable(existing.deviceId());
- ConcurrentMap<GroupId, StoredGroupEntry> idTable =
- getGroupIdTable(existing.deviceId());
- idTable.remove(existing.id());
- keyTable.remove(existing.appCookie());
+ getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
+ existing.appCookie()));
+ getGroupStoreIdMap().remove(new GroupStoreIdMapKey(existing.deviceId(),
+ existing.id()));
}
@Override
public void addOrUpdateExtraneousGroupEntry(Group group) {
+ log.trace("addOrUpdateExtraneousGroupEntry: add/update extraneous "
+ + "group entry {} in device {}",
+ group.id(),
+ group.deviceId());
ConcurrentMap<GroupId, Group> extraneousIdTable =
getExtraneousGroupIdTable(group.deviceId());
extraneousIdTable.put(group.id(), group);
// Check the reference counter
if (group.referenceCount() == 0) {
+ // The message needs one placeholder per argument, otherwise the
+ // group id and device id are dropped from the log line.
+ log.trace("addOrUpdateExtraneousGroupEntry: Flow reference "
+ + "counter is zero and triggering remove of "
+ + "group entry {} in device {}",
+ group.id(),
+ group.deviceId());
notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, group));
}
}
@Override
public void removeExtraneousGroupEntry(Group group) {
+ log.trace("removeExtraneousGroupEntry: remove extraneous "
+ + "group entry {} of device {} from store",
+ group.id(),
+ group.deviceId());
ConcurrentMap<GroupId, Group> extraneousIdTable =
getExtraneousGroupIdTable(group.deviceId());
extraneousIdTable.remove(group.id());
@@ -551,5 +800,192 @@
getExtraneousGroupIdTable(deviceId).values());
}
+    /**
+     * ClockService that generates wallclock based timestamps.
+     * <p>
+     * Each timestamp pairs the current wall-clock time in milliseconds with
+     * a sequence number that is incremented on every call, so two timestamps
+     * produced by the same instance within one millisecond still differ.
+     * The class uses no state of the enclosing store, hence it is a static
+     * nested class.
+     *
+     * @param <T> type of the first argument passed to getTimestamp (unused)
+     * @param <U> type of the second argument passed to getTimestamp (unused)
+     */
+    private static class GroupStoreLogicalClockManager<T, U>
+            implements ClockService<T, U> {
+        // Per-instance counter; every map created in activate() gets its own.
+        private final AtomicLong sequenceNumber = new AtomicLong(0);
+
+        @Override
+        public Timestamp getTimestamp(T t1, U u1) {
+            return new MultiValuedTimestamp<>(System.currentTimeMillis(),
+                                              sequenceNumber.getAndIncrement());
+        }
+    }
+
+    /**
+     * Listener on the id-indexed group map that translates map updates into
+     * group events and hands them to the store delegate.
+     */
+    private class GroupStoreIdMapListener implements
+            EventuallyConsistentMapListener<GroupStoreIdMapKey, StoredGroupEntry> {
+
+        @Override
+        public void event(EventuallyConsistentMapEvent<GroupStoreIdMapKey,
+                                  StoredGroupEntry> mapEvent) {
+            log.trace("GroupStoreIdMapListener: received groupid map event {}",
+                      mapEvent.type());
+            GroupEvent translated = toGroupEvent(mapEvent);
+            if (translated == null) {
+                // Nothing to report for this map update
+                return;
+            }
+            notifyDelegate(translated);
+        }
+
+        /**
+         * Maps a PUT of an entry in ADDED state to GROUP_ADDED or
+         * GROUP_UPDATED, a REMOVE to GROUP_REMOVED, and anything else to null.
+         */
+        private GroupEvent toGroupEvent(
+                EventuallyConsistentMapEvent<GroupStoreIdMapKey,
+                        StoredGroupEntry> mapEvent) {
+            if (mapEvent.type() == EventuallyConsistentMapEvent.Type.REMOVE) {
+                log.trace("GroupIdMapListener: Received REMOVE event");
+                return new GroupEvent(Type.GROUP_REMOVED, mapEvent.value());
+            }
+            if (mapEvent.type() != EventuallyConsistentMapEvent.Type.PUT) {
+                return null;
+            }
+
+            log.trace("GroupIdMapListener: Received PUT event");
+            StoredGroupEntry entry = mapEvent.value();
+            if (entry.state() != Group.GroupState.ADDED) {
+                return null;
+            }
+
+            GroupEvent result;
+            if (entry.isGroupStateAddedFirstTime()) {
+                result = new GroupEvent(Type.GROUP_ADDED, entry);
+                log.trace("GroupIdMapListener: Received first time "
+                        + "GROUP_ADDED state update");
+            } else {
+                result = new GroupEvent(Type.GROUP_UPDATED, entry);
+                log.trace("GroupIdMapListener: Received following "
+                        + "GROUP_ADDED state update");
+            }
+            return result;
+        }
+    }
+    /**
+     * Message handler to receive messages from group subsystems of
+     * other cluster members.
+     * <p>
+     * Only REMOTE_GROUP_OP_REQUEST messages are processed, and only while
+     * this node is MASTER for the device named in the request. An accepted
+     * request is applied through the same internal add/update/delete methods
+     * that serve local operations.
+     */
+    private final class ClusterGroupMsgHandler
+            implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+            log.trace("ClusterGroupMsgHandler: received remote group message");
+            // Compare subjects by value: the subject carried by a received
+            // message need not be the very same instance as the constant.
+            if (!GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST
+                    .equals(message.subject())) {
+                return;
+            }
+
+            GroupStoreMessage groupOp = kryoBuilder.
+                    build().deserialize(message.payload());
+            log.trace("received remote group operation request");
+
+            // Requests are forwarded to the node the sender believes to be
+            // MASTER; refuse them when this node does not hold that role.
+            if (mastershipService.getLocalRole(groupOp.deviceId())
+                    != MastershipRole.MASTER) {
+                log.warn("ClusterGroupMsgHandler: This node is not "
+                        + "MASTER for device {}", groupOp.deviceId());
+                return;
+            }
+
+            if (groupOp.type() == GroupStoreMessage.Type.ADD) {
+                log.trace("processing remote group "
+                        + "add operation request");
+                storeGroupDescriptionInternal(groupOp.groupDesc());
+            } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
+                log.trace("processing remote group "
+                        + "update operation request");
+                updateGroupDescriptionInternal(groupOp.deviceId(),
+                                               groupOp.appCookie(),
+                                               groupOp.updateType(),
+                                               groupOp.updateBuckets(),
+                                               groupOp.newAppCookie());
+            } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
+                log.trace("processing remote group "
+                        + "delete operation request");
+                deleteGroupDescriptionInternal(groupOp.deviceId(),
+                                               groupOp.appCookie());
+            }
+        }
+    }
+
+
+    /**
+     * Flattened map key to be used to store group entries.
+     * <p>
+     * Base of the two concrete key types; it carries the device id that both
+     * share. The key classes use no state of the enclosing store, so they are
+     * static nested classes and do not carry a hidden reference to it.
+     */
+    private static class GroupStoreMapKey {
+        private final DeviceId deviceId;
+
+        public GroupStoreMapKey(DeviceId deviceId) {
+            this.deviceId = deviceId;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof GroupStoreMapKey)) {
+                return false;
+            }
+            GroupStoreMapKey that = (GroupStoreMapKey) o;
+            // Objects.equals keeps the comparison safe for a null device id
+            return Objects.equals(this.deviceId, that.deviceId);
+        }
+
+        @Override
+        public int hashCode() {
+            int result = 17;
+
+            result = 31 * result + Objects.hash(this.deviceId);
+
+            return result;
+        }
+    }
+
+    /**
+     * Key of the key-indexed group map: device id plus application cookie.
+     */
+    private static class GroupStoreKeyMapKey extends GroupStoreMapKey {
+        private final GroupKey appCookie;
+
+        public GroupStoreKeyMapKey(DeviceId deviceId,
+                                   GroupKey appCookie) {
+            super(deviceId);
+            this.appCookie = appCookie;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof GroupStoreKeyMapKey)) {
+                return false;
+            }
+            GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o;
+            // super.equals compares the device id part of the key
+            return (super.equals(that) &&
+                    Objects.equals(this.appCookie, that.appCookie));
+        }
+
+        @Override
+        public int hashCode() {
+            int result = 17;
+
+            result = 31 * result + super.hashCode() + Objects.hash(this.appCookie);
+
+            return result;
+        }
+    }
+
+    /**
+     * Key of the id-indexed group map: device id plus group id.
+     */
+    private static class GroupStoreIdMapKey extends GroupStoreMapKey {
+        private final GroupId groupId;
+
+        public GroupStoreIdMapKey(DeviceId deviceId,
+                                  GroupId groupId) {
+            super(deviceId);
+            this.groupId = groupId;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof GroupStoreIdMapKey)) {
+                return false;
+            }
+            GroupStoreIdMapKey that = (GroupStoreIdMapKey) o;
+            // super.equals compares the device id part of the key
+            return (super.equals(that) &&
+                    Objects.equals(this.groupId, that.groupId));
+        }
+
+        @Override
+        public int hashCode() {
+            int result = 17;
+
+            result = 31 * result + super.hashCode() + Objects.hash(this.groupId);
+
+            return result;
+        }
+    }
}