blob: d6d92a9fdbb3ff01bb019c759b9812c1a943fb2f [file] [log] [blame]
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.group.impl;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Iterables;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onlab.util.NewConcurrentHashMap;
import org.onosproject.cluster.ClusterService;
import org.onosproject.core.DefaultApplicationId;
import org.onosproject.core.DefaultGroupId;
import org.onosproject.core.GroupId;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
import org.onosproject.net.MastershipRole;
import org.onosproject.net.PortNumber;
import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.instructions.Instructions;
import org.onosproject.net.flow.instructions.L0ModificationInstruction;
import org.onosproject.net.flow.instructions.L2ModificationInstruction;
import org.onosproject.net.flow.instructions.L3ModificationInstruction;
import org.onosproject.net.group.DefaultGroup;
import org.onosproject.net.group.DefaultGroupBucket;
import org.onosproject.net.group.DefaultGroupDescription;
import org.onosproject.net.group.DefaultGroupKey;
import org.onosproject.net.group.Group;
import org.onosproject.net.group.Group.GroupState;
import org.onosproject.net.group.GroupBucket;
import org.onosproject.net.group.GroupBuckets;
import org.onosproject.net.group.GroupDescription;
import org.onosproject.net.group.GroupEvent;
import org.onosproject.net.group.GroupEvent.Type;
import org.onosproject.net.group.GroupKey;
import org.onosproject.net.group.GroupOperation;
import org.onosproject.net.group.GroupStore;
import org.onosproject.net.group.GroupStoreDelegate;
import org.onosproject.net.group.StoredGroupEntry;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.impl.MultiValuedTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ClockService;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Manages inventory of group entries using trivial in-memory implementation.
*/
@Component(immediate = true)
@Service
public class DistributedGroupStore
extends AbstractStore<GroupEvent, GroupStoreDelegate>
implements GroupStore {
private final Logger log = getLogger(getClass());
private final int dummyId = 0xffffffff;
private final GroupId dummyGroupId = new DefaultGroupId(dummyId);
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
// Per device group table with (device id + app cookie) as key
private EventuallyConsistentMap<GroupStoreKeyMapKey,
StoredGroupEntry> groupStoreEntriesByKey = null;
// Per device group table with (device id + group id) as key
private EventuallyConsistentMap<GroupStoreIdMapKey,
StoredGroupEntry> groupStoreEntriesById = null;
private EventuallyConsistentMap<GroupStoreKeyMapKey,
StoredGroupEntry> auditPendingReqQueue = null;
private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
extraneousGroupEntriesById = new ConcurrentHashMap<>();
private ExecutorService messageHandlingExecutor;
private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;
private final HashMap<DeviceId, Boolean> deviceAuditStatus =
new HashMap<DeviceId, Boolean>();
private final AtomicInteger groupIdGen = new AtomicInteger();
private KryoNamespace.Builder kryoBuilder = null;
@Activate
public void activate() {
kryoBuilder = new KryoNamespace.Builder()
.register(DefaultGroup.class,
DefaultGroupBucket.class,
DefaultGroupDescription.class,
DefaultGroupKey.class,
GroupDescription.Type.class,
Group.GroupState.class,
GroupBuckets.class,
DefaultGroupId.class,
GroupStoreMessage.class,
GroupStoreMessage.Type.class,
UpdateType.class,
GroupStoreMessageSubjects.class,
MultiValuedTimestamp.class,
GroupStoreKeyMapKey.class,
GroupStoreIdMapKey.class,
GroupStoreMapKey.class
)
.register(URI.class)
.register(DeviceId.class)
.register(PortNumber.class)
.register(DefaultApplicationId.class)
.register(DefaultTrafficTreatment.class,
Instructions.DropInstruction.class,
Instructions.OutputInstruction.class,
Instructions.GroupInstruction.class,
Instructions.TableTypeTransition.class,
FlowRule.Type.class,
L0ModificationInstruction.class,
L0ModificationInstruction.L0SubType.class,
L0ModificationInstruction.ModLambdaInstruction.class,
L2ModificationInstruction.class,
L2ModificationInstruction.L2SubType.class,
L2ModificationInstruction.ModEtherInstruction.class,
L2ModificationInstruction.PushHeaderInstructions.class,
L2ModificationInstruction.ModVlanIdInstruction.class,
L2ModificationInstruction.ModVlanPcpInstruction.class,
L2ModificationInstruction.ModMplsLabelInstruction.class,
L2ModificationInstruction.ModMplsTtlInstruction.class,
L3ModificationInstruction.class,
L3ModificationInstruction.L3SubType.class,
L3ModificationInstruction.ModIPInstruction.class,
L3ModificationInstruction.ModIPv6FlowLabelInstruction.class,
L3ModificationInstruction.ModTtlInstruction.class,
org.onlab.packet.MplsLabel.class
)
.register(org.onosproject.cluster.NodeId.class)
.register(KryoNamespaces.BASIC)
.register(KryoNamespaces.MISC);
messageHandlingExecutor = Executors.
newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
groupedThreads("onos/store/group",
"message-handlers"));
clusterCommunicator.
addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
new ClusterGroupMsgHandler(),
messageHandlingExecutor);
log.debug("Creating EC map groupstorekeymap");
EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
keyMapBuilder = storageService.eventuallyConsistentMapBuilder();
groupStoreEntriesByKey = keyMapBuilder
.withName("groupstorekeymap")
.withSerializer(kryoBuilder)
.withClockService(new GroupStoreLogicalClockManager<>())
.build();
log.trace("Current size {}", groupStoreEntriesByKey.size());
log.debug("Creating EC map groupstoreidmap");
EventuallyConsistentMapBuilder<GroupStoreIdMapKey, StoredGroupEntry>
idMapBuilder = storageService.eventuallyConsistentMapBuilder();
groupStoreEntriesById = idMapBuilder
.withName("groupstoreidmap")
.withSerializer(kryoBuilder)
.withClockService(new GroupStoreLogicalClockManager<>())
.build();
groupStoreEntriesById.addListener(new GroupStoreIdMapListener());
log.trace("Current size {}", groupStoreEntriesById.size());
log.debug("Creating EC map pendinggroupkeymap");
EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
auditMapBuilder = storageService.eventuallyConsistentMapBuilder();
auditPendingReqQueue = auditMapBuilder
.withName("pendinggroupkeymap")
.withSerializer(kryoBuilder)
.withClockService(new GroupStoreLogicalClockManager<>())
.build();
log.trace("Current size {}", auditPendingReqQueue.size());
log.info("Started");
}
@Deactivate
public void deactivate() {
groupStoreEntriesByKey.destroy();
groupStoreEntriesById.destroy();
auditPendingReqQueue.destroy();
log.info("Stopped");
}
private static NewConcurrentHashMap<GroupId, Group>
lazyEmptyExtraneousGroupIdTable() {
return NewConcurrentHashMap.<GroupId, Group>ifNeeded();
}
/**
* Returns the group store eventual consistent key map.
*
* @return Map representing group key table.
*/
private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
getGroupStoreKeyMap() {
return groupStoreEntriesByKey;
}
/**
* Returns the group store eventual consistent id map.
*
* @return Map representing group id table.
*/
private EventuallyConsistentMap<GroupStoreIdMapKey, StoredGroupEntry>
getGroupStoreIdMap() {
return groupStoreEntriesById;
}
/**
* Returns the pending group request table.
*
* @return Map representing group key table.
*/
private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
getPendingGroupKeyTable() {
return auditPendingReqQueue;
}
/**
* Returns the extraneous group id table for specified device.
*
* @param deviceId identifier of the device
* @return Map representing group key table of given device.
*/
private ConcurrentMap<GroupId, Group>
getExtraneousGroupIdTable(DeviceId deviceId) {
return createIfAbsentUnchecked(extraneousGroupEntriesById,
deviceId,
lazyEmptyExtraneousGroupIdTable());
}
/**
* Returns the number of groups for the specified device in the store.
*
* @return number of groups for the specified device
*/
@Override
public int getGroupCount(DeviceId deviceId) {
return (getGroups(deviceId) != null) ?
Iterables.size(getGroups(deviceId)) : 0;
}
/**
* Returns the groups associated with a device.
*
* @param deviceId the device ID
*
* @return the group entries
*/
@Override
public Iterable<Group> getGroups(DeviceId deviceId) {
// flatten and make iterator unmodifiable
log.trace("getGroups: for device {} total number of groups {}",
deviceId, getGroupStoreKeyMap().values().size());
return FluentIterable.from(getGroupStoreKeyMap().values())
.filter(input -> input.deviceId().equals(deviceId))
.transform(input -> input);
}
/**
* Returns the stored group entry.
*
* @param deviceId the device ID
* @param appCookie the group key
*
* @return a group associated with the key
*/
@Override
public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
return getStoredGroupEntry(deviceId, appCookie);
}
private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
GroupKey appCookie) {
return getGroupStoreKeyMap().get(new GroupStoreKeyMapKey(deviceId,
appCookie));
}
@Override
public Group getGroup(DeviceId deviceId, GroupId groupId) {
return getStoredGroupEntry(deviceId, groupId);
}
private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
GroupId groupId) {
return getGroupStoreIdMap().get(new GroupStoreIdMapKey(deviceId,
groupId));
}
private int getFreeGroupIdValue(DeviceId deviceId) {
int freeId = groupIdGen.incrementAndGet();
while (true) {
Group existing = getGroup(deviceId, new DefaultGroupId(freeId));
if (existing == null) {
existing = (
extraneousGroupEntriesById.get(deviceId) != null) ?
extraneousGroupEntriesById.get(deviceId).
get(new DefaultGroupId(freeId)) :
null;
}
if (existing != null) {
freeId = groupIdGen.incrementAndGet();
} else {
break;
}
}
return freeId;
}
/**
* Stores a new group entry using the information from group description.
*
* @param groupDesc group description to be used to create group entry
*/
@Override
public void storeGroupDescription(GroupDescription groupDesc) {
log.trace("In storeGroupDescription");
// Check if a group is existing with the same key
if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
log.warn("Group already exists with the same key {}",
groupDesc.appCookie());
return;
}
// Check if group to be created by a remote instance
if (mastershipService.getLocalRole(
groupDesc.deviceId()) != MastershipRole.MASTER) {
log.debug("Device {} local role is not MASTER",
groupDesc.deviceId());
GroupStoreMessage groupOp = GroupStoreMessage.
createGroupAddRequestMsg(groupDesc.deviceId(),
groupDesc);
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
GroupStoreMessageSubjects.
REMOTE_GROUP_OP_REQUEST,
kryoBuilder.build().serialize(groupOp));
if (!clusterCommunicator.unicast(message,
mastershipService.
getMasterFor(
groupDesc.deviceId()))) {
log.warn("Failed to send request to master: {} to {}",
message,
mastershipService.getMasterFor(groupDesc.deviceId()));
//TODO: Send Group operation failure event
}
log.debug("Sent Group operation request for device {} "
+ "to remote MASTER {}",
groupDesc.deviceId(),
mastershipService.getMasterFor(groupDesc.deviceId()));
return;
}
log.debug("Store group for device {} is getting handled locally",
groupDesc.deviceId());
storeGroupDescriptionInternal(groupDesc);
}
private void storeGroupDescriptionInternal(GroupDescription groupDesc) {
// Check if a group is existing with the same key
if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
return;
}
if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
// Device group audit has not completed yet
// Add this group description to pending group key table
// Create a group entry object with Dummy Group ID
log.debug("storeGroupDescriptionInternal: Device {} AUDIT "
+ "pending...Queuing Group ADD request",
groupDesc.deviceId());
StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
group.setState(GroupState.WAITING_AUDIT_COMPLETE);
EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
getPendingGroupKeyTable();
pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
groupDesc.appCookie()),
group);
return;
}
// Get a new group identifier
GroupId id = new DefaultGroupId(getFreeGroupIdValue(groupDesc.deviceId()));
// Create a group entry object
StoredGroupEntry group = new DefaultGroup(id, groupDesc);
// Insert the newly created group entry into key and id maps
getGroupStoreKeyMap().
put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
groupDesc.appCookie()), group);
getGroupStoreIdMap().
put(new GroupStoreIdMapKey(groupDesc.deviceId(),
id), group);
notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
group));
}
/**
* Updates the existing group entry with the information
* from group description.
*
* @param deviceId the device ID
* @param oldAppCookie the current group key
* @param type update type
* @param newBuckets group buckets for updates
* @param newAppCookie optional new group key
*/
@Override
public void updateGroupDescription(DeviceId deviceId,
GroupKey oldAppCookie,
UpdateType type,
GroupBuckets newBuckets,
GroupKey newAppCookie) {
// Check if group update to be done by a remote instance
if (mastershipService.
getLocalRole(deviceId) != MastershipRole.MASTER) {
GroupStoreMessage groupOp = GroupStoreMessage.
createGroupUpdateRequestMsg(deviceId,
oldAppCookie,
type,
newBuckets,
newAppCookie);
ClusterMessage message =
new ClusterMessage(clusterService.getLocalNode().id(),
GroupStoreMessageSubjects.
REMOTE_GROUP_OP_REQUEST,
kryoBuilder.build().serialize(groupOp));
if (!clusterCommunicator.unicast(message,
mastershipService.
getMasterFor(deviceId))) {
log.warn("Failed to send request to master: {} to {}",
message,
mastershipService.getMasterFor(deviceId));
//TODO: Send Group operation failure event
}
return;
}
updateGroupDescriptionInternal(deviceId,
oldAppCookie,
type,
newBuckets,
newAppCookie);
}
private void updateGroupDescriptionInternal(DeviceId deviceId,
GroupKey oldAppCookie,
UpdateType type,
GroupBuckets newBuckets,
GroupKey newAppCookie) {
// Check if a group is existing with the provided key
Group oldGroup = getGroup(deviceId, oldAppCookie);
if (oldGroup == null) {
return;
}
List<GroupBucket> newBucketList = getUpdatedBucketList(oldGroup,
type,
newBuckets);
if (newBucketList != null) {
// Create a new group object from the old group
GroupBuckets updatedBuckets = new GroupBuckets(newBucketList);
GroupKey newCookie = (newAppCookie != null) ? newAppCookie : oldAppCookie;
GroupDescription updatedGroupDesc = new DefaultGroupDescription(
oldGroup.deviceId(),
oldGroup.type(),
updatedBuckets,
newCookie,
oldGroup.appId());
StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(),
updatedGroupDesc);
newGroup.setState(GroupState.PENDING_UPDATE);
newGroup.setLife(oldGroup.life());
newGroup.setPackets(oldGroup.packets());
newGroup.setBytes(oldGroup.bytes());
// Remove the old entry from maps and add new entry using new key
getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(oldGroup.deviceId(),
oldGroup.appCookie()));
getGroupStoreIdMap().remove(new GroupStoreIdMapKey(oldGroup.deviceId(),
oldGroup.id()));
getGroupStoreKeyMap().
put(new GroupStoreKeyMapKey(newGroup.deviceId(),
newGroup.appCookie()), newGroup);
getGroupStoreIdMap().
put(new GroupStoreIdMapKey(newGroup.deviceId(),
newGroup.id()), newGroup);
notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
}
}
private List<GroupBucket> getUpdatedBucketList(Group oldGroup,
UpdateType type,
GroupBuckets buckets) {
GroupBuckets oldBuckets = oldGroup.buckets();
List<GroupBucket> newBucketList = new ArrayList<GroupBucket>(
oldBuckets.buckets());
boolean groupDescUpdated = false;
if (type == UpdateType.ADD) {
// Check if the any of the new buckets are part of
// the old bucket list
for (GroupBucket addBucket:buckets.buckets()) {
if (!newBucketList.contains(addBucket)) {
newBucketList.add(addBucket);
groupDescUpdated = true;
}
}
} else if (type == UpdateType.REMOVE) {
// Check if the to be removed buckets are part of the
// old bucket list
for (GroupBucket removeBucket:buckets.buckets()) {
if (newBucketList.contains(removeBucket)) {
newBucketList.remove(removeBucket);
groupDescUpdated = true;
}
}
}
if (groupDescUpdated) {
return newBucketList;
} else {
return null;
}
}
/**
* Triggers deleting the existing group entry.
*
* @param deviceId the device ID
* @param appCookie the group key
*/
@Override
public void deleteGroupDescription(DeviceId deviceId,
GroupKey appCookie) {
// Check if group to be deleted by a remote instance
if (mastershipService.
getLocalRole(deviceId) != MastershipRole.MASTER) {
GroupStoreMessage groupOp = GroupStoreMessage.
createGroupDeleteRequestMsg(deviceId,
appCookie);
ClusterMessage message =
new ClusterMessage(clusterService.getLocalNode().id(),
GroupStoreMessageSubjects.
REMOTE_GROUP_OP_REQUEST,
kryoBuilder.build().serialize(groupOp));
if (!clusterCommunicator.unicast(message,
mastershipService.
getMasterFor(deviceId))) {
log.warn("Failed to send request to master: {} to {}",
message,
mastershipService.getMasterFor(deviceId));
//TODO: Send Group operation failure event
}
return;
}
deleteGroupDescriptionInternal(deviceId, appCookie);
}
private void deleteGroupDescriptionInternal(DeviceId deviceId,
GroupKey appCookie) {
// Check if a group is existing with the provided key
StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie);
if (existing == null) {
return;
}
synchronized (existing) {
existing.setState(GroupState.PENDING_DELETE);
}
notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, existing));
}
/**
* Stores a new group entry, or updates an existing entry.
*
* @param group group entry
*/
@Override
public void addOrUpdateGroupEntry(Group group) {
// check if this new entry is an update to an existing entry
StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
group.id());
GroupEvent event = null;
if (existing != null) {
log.trace("addOrUpdateGroupEntry: updating group "
+ "entry {} in device {}",
group.id(),
group.deviceId());
synchronized (existing) {
existing.setLife(group.life());
existing.setPackets(group.packets());
existing.setBytes(group.bytes());
if (existing.state() == GroupState.PENDING_ADD) {
existing.setState(GroupState.ADDED);
existing.setIsGroupStateAddedFirstTime(true);
event = new GroupEvent(Type.GROUP_ADDED, existing);
} else {
existing.setState(GroupState.ADDED);
existing.setIsGroupStateAddedFirstTime(false);
event = new GroupEvent(Type.GROUP_UPDATED, existing);
}
//Re-PUT map entries to trigger map update events
getGroupStoreKeyMap().
put(new GroupStoreKeyMapKey(existing.deviceId(),
existing.appCookie()), existing);
getGroupStoreIdMap().
put(new GroupStoreIdMapKey(existing.deviceId(),
existing.id()), existing);
}
}
if (event != null) {
notifyDelegate(event);
}
}
/**
* Removes the group entry from store.
*
* @param group group entry
*/
@Override
public void removeGroupEntry(Group group) {
StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
group.id());
if (existing != null) {
log.trace("removeGroupEntry: removing group "
+ "entry {} in device {}",
group.id(),
group.deviceId());
getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
existing.appCookie()));
getGroupStoreIdMap().remove(new GroupStoreIdMapKey(existing.deviceId(),
existing.id()));
notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
}
}
@Override
public void deviceInitialAuditCompleted(DeviceId deviceId,
boolean completed) {
synchronized (deviceAuditStatus) {
if (completed) {
log.debug("deviceInitialAuditCompleted: AUDIT "
+ "completed for device {}", deviceId);
deviceAuditStatus.put(deviceId, true);
// Execute all pending group requests
List<StoredGroupEntry> pendingGroupRequests =
getPendingGroupKeyTable().values()
.stream()
.filter(g-> g.deviceId().equals(deviceId))
.collect(Collectors.toList());
log.trace("deviceInitialAuditCompleted: processing "
+ "pending group add requests for device {} and "
+ "number of pending requests {}",
deviceId,
pendingGroupRequests.size());
for (Group group:pendingGroupRequests) {
GroupDescription tmp = new DefaultGroupDescription(
group.deviceId(),
group.type(),
group.buckets(),
group.appCookie(),
group.appId());
storeGroupDescriptionInternal(tmp);
getPendingGroupKeyTable().
remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
}
} else {
if (deviceAuditStatus.get(deviceId)) {
log.debug("deviceInitialAuditCompleted: Clearing AUDIT "
+ "status for device {}", deviceId);
deviceAuditStatus.put(deviceId, false);
}
}
}
}
@Override
public boolean deviceInitialAuditStatus(DeviceId deviceId) {
synchronized (deviceAuditStatus) {
return (deviceAuditStatus.get(deviceId) != null)
? deviceAuditStatus.get(deviceId) : false;
}
}
@Override
public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
StoredGroupEntry existing = getStoredGroupEntry(deviceId,
operation.groupId());
if (existing == null) {
log.warn("No group entry with ID {} found ", operation.groupId());
return;
}
switch (operation.opType()) {
case ADD:
notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
break;
case MODIFY:
notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing));
break;
case DELETE:
notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_FAILED, existing));
break;
default:
log.warn("Unknown group operation type {}", operation.opType());
}
getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
existing.appCookie()));
getGroupStoreIdMap().remove(new GroupStoreIdMapKey(existing.deviceId(),
existing.id()));
}
@Override
public void addOrUpdateExtraneousGroupEntry(Group group) {
log.trace("addOrUpdateExtraneousGroupEntry: add/update extraneous "
+ "group entry {} in device {}",
group.id(),
group.deviceId());
ConcurrentMap<GroupId, Group> extraneousIdTable =
getExtraneousGroupIdTable(group.deviceId());
extraneousIdTable.put(group.id(), group);
// Check the reference counter
if (group.referenceCount() == 0) {
log.trace("addOrUpdateExtraneousGroupEntry: Flow reference "
+ "counter is zero and triggering remove",
group.id(),
group.deviceId());
notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, group));
}
}
@Override
public void removeExtraneousGroupEntry(Group group) {
log.trace("removeExtraneousGroupEntry: remove extraneous "
+ "group entry {} of device {} from store",
group.id(),
group.deviceId());
ConcurrentMap<GroupId, Group> extraneousIdTable =
getExtraneousGroupIdTable(group.deviceId());
extraneousIdTable.remove(group.id());
}
@Override
public Iterable<Group> getExtraneousGroups(DeviceId deviceId) {
// flatten and make iterator unmodifiable
return FluentIterable.from(
getExtraneousGroupIdTable(deviceId).values());
}
/**
* ClockService that generates wallclock based timestamps.
*/
private class GroupStoreLogicalClockManager<T, U>
implements ClockService<T, U> {
private final AtomicLong sequenceNumber = new AtomicLong(0);
@Override
public Timestamp getTimestamp(T t1, U u1) {
return new MultiValuedTimestamp<>(System.currentTimeMillis(),
sequenceNumber.getAndIncrement());
}
}
/**
* Map handler to receive any events when the group map is updated.
*/
private class GroupStoreIdMapListener implements
EventuallyConsistentMapListener<GroupStoreIdMapKey, StoredGroupEntry> {
@Override
public void event(EventuallyConsistentMapEvent<GroupStoreIdMapKey,
StoredGroupEntry> mapEvent) {
GroupEvent groupEvent = null;
log.trace("GroupStoreIdMapListener: received groupid map event {}",
mapEvent.type());
if (mapEvent.type() == EventuallyConsistentMapEvent.Type.PUT) {
log.trace("GroupIdMapListener: Received PUT event");
if (mapEvent.value().state() == Group.GroupState.ADDED) {
if (mapEvent.value().isGroupStateAddedFirstTime()) {
groupEvent = new GroupEvent(Type.GROUP_ADDED,
mapEvent.value());
log.trace("GroupIdMapListener: Received first time "
+ "GROUP_ADDED state update");
} else {
groupEvent = new GroupEvent(Type.GROUP_UPDATED,
mapEvent.value());
log.trace("GroupIdMapListener: Received following "
+ "GROUP_ADDED state update");
}
}
} else if (mapEvent.type() == EventuallyConsistentMapEvent.Type.REMOVE) {
log.trace("GroupIdMapListener: Received REMOVE event");
groupEvent = new GroupEvent(Type.GROUP_REMOVED, mapEvent.value());
}
if (groupEvent != null) {
notifyDelegate(groupEvent);
}
}
}
/**
* Message handler to receive messages from group subsystems of
* other cluster members.
*/
private final class ClusterGroupMsgHandler
implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.trace("ClusterGroupMsgHandler: received remote group message");
if (message.subject() ==
GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST) {
GroupStoreMessage groupOp = kryoBuilder.
build().deserialize(message.payload());
log.trace("received remote group operation request");
if (!(mastershipService.
getLocalRole(groupOp.deviceId()) !=
MastershipRole.MASTER)) {
log.warn("ClusterGroupMsgHandler: This node is not "
+ "MASTER for device {}", groupOp.deviceId());
return;
}
if (groupOp.type() == GroupStoreMessage.Type.ADD) {
log.trace("processing remote group "
+ "add operation request");
storeGroupDescriptionInternal(groupOp.groupDesc());
} else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
log.trace("processing remote group "
+ "update operation request");
updateGroupDescriptionInternal(groupOp.deviceId(),
groupOp.appCookie(),
groupOp.updateType(),
groupOp.updateBuckets(),
groupOp.newAppCookie());
} else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
log.trace("processing remote group "
+ "delete operation request");
deleteGroupDescriptionInternal(groupOp.deviceId(),
groupOp.appCookie());
}
}
}
}
/**
* Flattened map key to be used to store group entries.
*/
private class GroupStoreMapKey {
private final DeviceId deviceId;
public GroupStoreMapKey(DeviceId deviceId) {
this.deviceId = deviceId;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof GroupStoreMapKey)) {
return false;
}
GroupStoreMapKey that = (GroupStoreMapKey) o;
return this.deviceId.equals(that.deviceId);
}
@Override
public int hashCode() {
int result = 17;
result = 31 * result + Objects.hash(this.deviceId);
return result;
}
}
private class GroupStoreKeyMapKey extends GroupStoreMapKey {
private final GroupKey appCookie;
public GroupStoreKeyMapKey(DeviceId deviceId,
GroupKey appCookie) {
super(deviceId);
this.appCookie = appCookie;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof GroupStoreKeyMapKey)) {
return false;
}
GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o;
return (super.equals(that) &&
this.appCookie.equals(that.appCookie));
}
@Override
public int hashCode() {
int result = 17;
result = 31 * result + super.hashCode() + Objects.hash(this.appCookie);
return result;
}
}
private class GroupStoreIdMapKey extends GroupStoreMapKey {
private final GroupId groupId;
public GroupStoreIdMapKey(DeviceId deviceId,
GroupId groupId) {
super(deviceId);
this.groupId = groupId;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof GroupStoreIdMapKey)) {
return false;
}
GroupStoreIdMapKey that = (GroupStoreIdMapKey) o;
return (super.equals(that) &&
this.groupId.equals(that.groupId));
}
@Override
public int hashCode() {
int result = 17;
result = 31 * result + super.hashCode() + Objects.hash(this.groupId);
return result;
}
}
}