adding group garbage collection functionality
If a group has a reference count of zero for more than
a configurable timeout, it is garbage collected.
This feature can be deactivated by component config.
Change-Id: I254d62a90ef7ac8d2ce2f406b67957455a5bf4d0
diff --git a/core/api/src/main/java/org/onosproject/net/group/DefaultGroup.java b/core/api/src/main/java/org/onosproject/net/group/DefaultGroup.java
index 97f8aed..edd64e3 100644
--- a/core/api/src/main/java/org/onosproject/net/group/DefaultGroup.java
+++ b/core/api/src/main/java/org/onosproject/net/group/DefaultGroup.java
@@ -35,6 +35,7 @@
private long bytes;
private long referenceCount;
private GroupId id;
+ private int age;
/**
* Initializes default values.
@@ -48,6 +49,7 @@
packets = 0;
bytes = 0;
referenceCount = 0;
+ age = 0;
}
/**
@@ -128,6 +130,11 @@
return this.bytes;
}
+ @Override
+ public int age() {
+ return age;
+ }
+
/**
* Sets the new state for this entry.
*
@@ -171,6 +178,11 @@
@Override
public void setReferenceCount(long referenceCount) {
this.referenceCount = referenceCount;
+ if (referenceCount == 0) {
+ age++;
+ } else {
+ age = 0;
+ }
}
@Override
@@ -214,6 +226,7 @@
.add("description", super.toString())
.add("groupid", id)
.add("state", state)
+ .add("age", age)
.toString();
}
diff --git a/core/api/src/main/java/org/onosproject/net/group/Group.java b/core/api/src/main/java/org/onosproject/net/group/Group.java
index 5440775..d053029 100644
--- a/core/api/src/main/java/org/onosproject/net/group/Group.java
+++ b/core/api/src/main/java/org/onosproject/net/group/Group.java
@@ -96,4 +96,12 @@
* @return number of flow rules or other groups pointing to this group
*/
long referenceCount();
+
+ /**
+ * Obtains the age of a group. The age reflects the number of polling rounds
+ * the group has had a reference count of zero.
+ *
+ * @return the age of the group as an integer
+ */
+ int age();
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
index 0cf77b3..996356a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
@@ -19,15 +19,17 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
-
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onlab.util.NewConcurrentHashMap;
+import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.DefaultGroupId;
@@ -54,19 +56,21 @@
import org.onosproject.net.group.StoredGroupEntry;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.service.MultiValuedTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.MultiValuedTimestamp;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
+import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Dictionary;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -75,6 +79,7 @@
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
+import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -83,7 +88,9 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import static com.google.common.base.Strings.isNullOrEmpty;
import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
+import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
@@ -99,6 +106,9 @@
private final Logger log = getLogger(getClass());
+ private static final boolean GARBAGE_COLLECT = false;
+ private static final int GC_THRESH = 6;
+
private final int dummyId = 0xffffffff;
private final GroupId dummyGroupId = new DefaultGroupId(dummyId);
@@ -114,14 +124,17 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ComponentConfigService cfgService;
+
// Per device group table with (device id + app cookie) as key
private ConsistentMap<GroupStoreKeyMapKey,
- StoredGroupEntry> groupStoreEntriesByKey = null;
+ StoredGroupEntry> groupStoreEntriesByKey = null;
// Per device group table with (device id + group id) as key
private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
- groupEntriesById = new ConcurrentHashMap<>();
+ groupEntriesById = new ConcurrentHashMap<>();
private ConsistentMap<GroupStoreKeyMapKey,
- StoredGroupEntry> auditPendingReqQueue = null;
+ StoredGroupEntry> auditPendingReqQueue = null;
private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
extraneousGroupEntriesById = new ConcurrentHashMap<>();
private ExecutorService messageHandlingExecutor;
@@ -135,27 +148,37 @@
private KryoNamespace clusterMsgSerializer;
+ @Property(name = "garbageCollect", boolValue = GARBAGE_COLLECT,
+ label = "Enable group garbage collection")
+ private boolean garbageCollect = GARBAGE_COLLECT;
+
+ @Property(name = "gcThresh", intValue = GC_THRESH,
+ label = "Number of rounds for group garbage collection")
+ private int gcThresh = GC_THRESH;
+
+
@Activate
public void activate() {
+ cfgService.registerProperties(getClass());
kryoBuilder = new KryoNamespace.Builder()
- .register(KryoNamespaces.API)
- .register(DefaultGroup.class,
- DefaultGroupBucket.class,
- DefaultGroupDescription.class,
- DefaultGroupKey.class,
- GroupDescription.Type.class,
- Group.GroupState.class,
- GroupBuckets.class,
- DefaultGroupId.class,
- GroupStoreMessage.class,
- GroupStoreMessage.Type.class,
- UpdateType.class,
- GroupStoreMessageSubjects.class,
- MultiValuedTimestamp.class,
- GroupStoreKeyMapKey.class,
- GroupStoreIdMapKey.class,
- GroupStoreMapKey.class
- );
+ .register(KryoNamespaces.API)
+ .register(DefaultGroup.class,
+ DefaultGroupBucket.class,
+ DefaultGroupDescription.class,
+ DefaultGroupKey.class,
+ GroupDescription.Type.class,
+ Group.GroupState.class,
+ GroupBuckets.class,
+ DefaultGroupId.class,
+ GroupStoreMessage.class,
+ GroupStoreMessage.Type.class,
+ UpdateType.class,
+ GroupStoreMessageSubjects.class,
+ MultiValuedTimestamp.class,
+ GroupStoreKeyMapKey.class,
+ GroupStoreIdMapKey.class,
+ GroupStoreMapKey.class
+ );
clusterMsgSerializer = kryoBuilder.build();
@@ -165,9 +188,9 @@
"message-handlers"));
clusterCommunicator.addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
- clusterMsgSerializer::deserialize,
- this::process,
- messageHandlingExecutor);
+ clusterMsgSerializer::deserialize,
+ this::process,
+ messageHandlingExecutor);
log.debug("Creating Consistent map onos-group-store-keymap");
@@ -193,19 +216,36 @@
@Deactivate
public void deactivate() {
+ cfgService.unregisterProperties(getClass(), false);
clusterCommunicator.removeSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST);
groupStoreEntriesByKey.destroy();
auditPendingReqQueue.destroy();
log.info("Stopped");
}
+ @Modified
+ public void modified(ComponentContext context) {
+ Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
+
+ try {
+ String s = get(properties, "garbageCollect");
+ garbageCollect = isNullOrEmpty(s) ? GARBAGE_COLLECT : Boolean.parseBoolean(s.trim());
+
+ s = get(properties, "gcThresh");
+ gcThresh = isNullOrEmpty(s) ? GC_THRESH : Integer.parseInt(s.trim());
+ } catch (Exception e) {
+ gcThresh = GC_THRESH;
+ garbageCollect = GARBAGE_COLLECT;
+ }
+ }
+
private static NewConcurrentHashMap<GroupId, Group>
- lazyEmptyExtraneousGroupIdTable() {
+ lazyEmptyExtraneousGroupIdTable() {
return NewConcurrentHashMap.<GroupId, Group>ifNeeded();
}
private static NewConcurrentHashMap<GroupId, StoredGroupEntry>
- lazyEmptyGroupIdTable() {
+ lazyEmptyGroupIdTable() {
return NewConcurrentHashMap.<GroupId, StoredGroupEntry>ifNeeded();
}
@@ -215,7 +255,7 @@
* @return Map representing group key table.
*/
private Map<GroupStoreKeyMapKey, StoredGroupEntry>
- getGroupStoreKeyMap() {
+ getGroupStoreKeyMap() {
return groupStoreEntriesByKey.asJavaMap();
}
@@ -236,7 +276,7 @@
* @return Map representing group key table.
*/
private Map<GroupStoreKeyMapKey, StoredGroupEntry>
- getPendingGroupKeyTable() {
+ getPendingGroupKeyTable() {
return auditPendingReqQueue.asJavaMap();
}
@@ -261,14 +301,13 @@
@Override
public int getGroupCount(DeviceId deviceId) {
return (getGroups(deviceId) != null) ?
- Iterables.size(getGroups(deviceId)) : 0;
+ Iterables.size(getGroups(deviceId)) : 0;
}
/**
* Returns the groups associated with a device.
*
* @param deviceId the device ID
- *
* @return the group entries
*/
@Override
@@ -294,9 +333,8 @@
/**
* Returns the stored group entry.
*
- * @param deviceId the device ID
+ * @param deviceId the device ID
* @param appCookie the group key
- *
* @return a group associated with the key
*/
@Override
@@ -365,8 +403,8 @@
groupDesc.deviceId());
if (mastershipService.getMasterFor(groupDesc.deviceId()) == null) {
log.error("No Master for device {}..."
- + "Can not perform add group operation",
- groupDesc.deviceId());
+ + "Can not perform add group operation",
+ groupDesc.deviceId());
//TODO: Send Group operation failure event
return;
}
@@ -375,19 +413,20 @@
groupDesc);
clusterCommunicator.unicast(groupOp,
- GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
- clusterMsgSerializer::serialize,
- mastershipService.getMasterFor(groupDesc.deviceId())).whenComplete((result, error) -> {
+ GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
+ clusterMsgSerializer::serialize,
+ mastershipService.getMasterFor(groupDesc.deviceId()))
+ .whenComplete((result, error) -> {
if (error != null) {
log.warn("Failed to send request to master: {} to {}",
- groupOp,
- mastershipService.getMasterFor(groupDesc.deviceId()));
+ groupOp,
+ mastershipService.getMasterFor(groupDesc.deviceId()));
//TODO: Send Group operation failure event
} else {
log.debug("Sent Group operation request for device {} "
- + "to remote MASTER {}",
- groupDesc.deviceId(),
- mastershipService.getMasterFor(groupDesc.deviceId()));
+ + "to remote MASTER {}",
+ groupDesc.deviceId(),
+ mastershipService.getMasterFor(groupDesc.deviceId()));
}
});
return;
@@ -415,7 +454,7 @@
return null;
}
- for (Group extraneousGroup:extraneousMap.values()) {
+ for (Group extraneousGroup : extraneousMap.values()) {
if (extraneousGroup.buckets().equals(buckets)) {
return extraneousGroup;
}
@@ -434,7 +473,7 @@
// Add this group description to pending group key table
// Create a group entry object with Dummy Group ID
log.debug("storeGroupDescriptionInternal: Device {} AUDIT pending...Queuing Group ADD request",
- groupDesc.deviceId());
+ groupDesc.deviceId());
StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
group.setState(GroupState.WAITING_AUDIT_COMPLETE);
Map<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
@@ -449,10 +488,10 @@
if (groupDesc.givenGroupId() != null) {
//Check if there is a extraneous group existing with the same Id
matchingExtraneousGroup = getMatchingExtraneousGroupbyId(
- groupDesc.deviceId(), groupDesc.givenGroupId());
+ groupDesc.deviceId(), groupDesc.givenGroupId());
if (matchingExtraneousGroup != null) {
log.debug("storeGroupDescriptionInternal: Matching extraneous group "
- + "found in Device {} for group id 0x{}",
+ + "found in Device {} for group id 0x{}",
groupDesc.deviceId(),
Integer.toHexString(groupDesc.givenGroupId()));
//Check if the group buckets matches with user provided buckets
@@ -460,19 +499,19 @@
//Group is already existing with the same buckets and Id
// Create a group entry object
log.debug("storeGroupDescriptionInternal: Buckets also matching "
- + "in Device {} for group id 0x{}",
+ + "in Device {} for group id 0x{}",
groupDesc.deviceId(),
Integer.toHexString(groupDesc.givenGroupId()));
StoredGroupEntry group = new DefaultGroup(
- matchingExtraneousGroup.id(), groupDesc);
+ matchingExtraneousGroup.id(), groupDesc);
// Insert the newly created group entry into key and id maps
getGroupStoreKeyMap().
- put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
- groupDesc.appCookie()), group);
+ put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
+ groupDesc.appCookie()), group);
// Ensure it also inserted into group id based table to
// avoid any chances of duplication in group id generation
getGroupIdTable(groupDesc.deviceId()).
- put(matchingExtraneousGroup.id(), group);
+ put(matchingExtraneousGroup.id(), group);
addOrUpdateGroupEntry(matchingExtraneousGroup);
removeExtraneousGroupEntry(matchingExtraneousGroup);
return;
@@ -480,22 +519,22 @@
//Group buckets are not matching. Update group
//with user provided buckets.
log.debug("storeGroupDescriptionInternal: Buckets are not "
- + "matching in Device {} for group id 0x{}",
+ + "matching in Device {} for group id 0x{}",
groupDesc.deviceId(),
Integer.toHexString(groupDesc.givenGroupId()));
StoredGroupEntry modifiedGroup = new DefaultGroup(
- matchingExtraneousGroup.id(), groupDesc);
+ matchingExtraneousGroup.id(), groupDesc);
modifiedGroup.setState(GroupState.PENDING_UPDATE);
getGroupStoreKeyMap().
- put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
- groupDesc.appCookie()), modifiedGroup);
+ put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
+ groupDesc.appCookie()), modifiedGroup);
// Ensure it also inserted into group id based table to
// avoid any chances of duplication in group id generation
getGroupIdTable(groupDesc.deviceId()).
- put(matchingExtraneousGroup.id(), modifiedGroup);
+ put(matchingExtraneousGroup.id(), modifiedGroup);
removeExtraneousGroupEntry(matchingExtraneousGroup);
log.debug("storeGroupDescriptionInternal: Triggering Group "
- + "UPDATE request for {} in device {}",
+ + "UPDATE request for {} in device {}",
matchingExtraneousGroup.id(),
groupDesc.deviceId());
notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, modifiedGroup));
@@ -505,7 +544,7 @@
} else {
//Check if there is an extraneous group with user provided buckets
matchingExtraneousGroup = getMatchingExtraneousGroupbyBuckets(
- groupDesc.deviceId(), groupDesc.buckets());
+ groupDesc.deviceId(), groupDesc.buckets());
if (matchingExtraneousGroup != null) {
//Group is already existing with the same buckets.
//So reuse this group.
@@ -513,15 +552,15 @@
groupDesc.deviceId());
//Create a group entry object
StoredGroupEntry group = new DefaultGroup(
- matchingExtraneousGroup.id(), groupDesc);
+ matchingExtraneousGroup.id(), groupDesc);
// Insert the newly created group entry into key and id maps
getGroupStoreKeyMap().
- put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
- groupDesc.appCookie()), group);
+ put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
+ groupDesc.appCookie()), group);
// Ensure it also inserted into group id based table to
// avoid any chances of duplication in group id generation
getGroupIdTable(groupDesc.deviceId()).
- put(matchingExtraneousGroup.id(), group);
+ put(matchingExtraneousGroup.id(), group);
addOrUpdateGroupEntry(matchingExtraneousGroup);
removeExtraneousGroupEntry(matchingExtraneousGroup);
return;
@@ -543,11 +582,11 @@
new DefaultGroupId(groupDesc.givenGroupId()));
if (existing != null) {
log.warn("Group already exists with the same id: 0x{} in dev:{} "
- + "but with different key: {} (request gkey: {})",
- Integer.toHexString(groupDesc.givenGroupId()),
- groupDesc.deviceId(),
- existing.appCookie(),
- groupDesc.appCookie());
+ + "but with different key: {} (request gkey: {})",
+ Integer.toHexString(groupDesc.givenGroupId()),
+ groupDesc.deviceId(),
+ existing.appCookie(),
+ groupDesc.appCookie());
return;
}
id = new DefaultGroupId(groupDesc.givenGroupId());
@@ -556,15 +595,15 @@
StoredGroupEntry group = new DefaultGroup(id, groupDesc);
// Insert the newly created group entry into key and id maps
getGroupStoreKeyMap().
- put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
- groupDesc.appCookie()), group);
+ put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
+ groupDesc.appCookie()), group);
// Ensure it also inserted into group id based table to
// avoid any chances of duplication in group id generation
getGroupIdTable(groupDesc.deviceId()).
- put(id, group);
+ put(id, group);
log.debug("storeGroupDescriptionInternal: Processing Group ADD request for Id {} in device {}",
- id,
- groupDesc.deviceId());
+ id,
+ groupDesc.deviceId());
notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
group));
}
@@ -573,10 +612,10 @@
* Updates the existing group entry with the information
* from group description.
*
- * @param deviceId the device ID
+ * @param deviceId the device ID
* @param oldAppCookie the current group key
- * @param type update type
- * @param newBuckets group buckets for updates
+ * @param type update type
+ * @param newBuckets group buckets for updates
* @param newAppCookie optional new group key
*/
@Override
@@ -592,8 +631,8 @@
deviceId);
if (mastershipService.getMasterFor(deviceId) == null) {
log.error("No Master for device {}..."
- + "Can not perform update group operation",
- deviceId);
+ + "Can not perform update group operation",
+ deviceId);
//TODO: Send Group operation failure event
return;
}
@@ -605,16 +644,16 @@
newAppCookie);
clusterCommunicator.unicast(groupOp,
- GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
- clusterMsgSerializer::serialize,
- mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
- if (error != null) {
- log.warn("Failed to send request to master: {} to {}",
- groupOp,
- mastershipService.getMasterFor(deviceId), error);
- }
- //TODO: Send Group operation failure event
- });
+ GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
+ clusterMsgSerializer::serialize,
+ mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
+ if (error != null) {
+ log.warn("Failed to send request to master: {} to {}",
+ groupOp,
+ mastershipService.getMasterFor(deviceId), error);
+ }
+ //TODO: Send Group operation failure event
+ });
return;
}
log.debug("updateGroupDescription for device {} is getting handled locally",
@@ -627,15 +666,15 @@
}
private void updateGroupDescriptionInternal(DeviceId deviceId,
- GroupKey oldAppCookie,
- UpdateType type,
- GroupBuckets newBuckets,
- GroupKey newAppCookie) {
+ GroupKey oldAppCookie,
+ UpdateType type,
+ GroupBuckets newBuckets,
+ GroupKey newAppCookie) {
// Check if a group is existing with the provided key
Group oldGroup = getGroup(deviceId, oldAppCookie);
if (oldGroup == null) {
log.warn("updateGroupDescriptionInternal: Group not found...strange. "
- + "GroupKey:{} DeviceId:{}", oldAppCookie, deviceId);
+ + "GroupKey:{} DeviceId:{}", oldAppCookie, deviceId);
return;
}
@@ -656,9 +695,9 @@
StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(),
updatedGroupDesc);
log.debug("updateGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_UPDATE",
- oldGroup.id(),
- oldGroup.deviceId(),
- oldGroup.state());
+ oldGroup.id(),
+ oldGroup.deviceId(),
+ oldGroup.state());
newGroup.setState(GroupState.PENDING_UPDATE);
newGroup.setLife(oldGroup.life());
newGroup.setPackets(oldGroup.packets());
@@ -669,12 +708,12 @@
log.debug("updateGroupDescriptionInternal with type {}: Group updated with buckets",
type);
getGroupStoreKeyMap().
- put(new GroupStoreKeyMapKey(newGroup.deviceId(),
- newGroup.appCookie()), newGroup);
+ put(new GroupStoreKeyMapKey(newGroup.deviceId(),
+ newGroup.appCookie()), newGroup);
notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
} else {
log.warn("updateGroupDescriptionInternal with type {}: No "
- + "change in the buckets in update", type);
+ + "change in the buckets in update", type);
}
}
@@ -688,7 +727,7 @@
if (type == UpdateType.ADD) {
// Check if the any of the new buckets are part of
// the old bucket list
- for (GroupBucket addBucket:buckets.buckets()) {
+ for (GroupBucket addBucket : buckets.buckets()) {
if (!newBucketList.contains(addBucket)) {
newBucketList.add(addBucket);
groupDescUpdated = true;
@@ -697,7 +736,7 @@
} else if (type == UpdateType.REMOVE) {
// Check if the to be removed buckets are part of the
// old bucket list
- for (GroupBucket removeBucket:buckets.buckets()) {
+ for (GroupBucket removeBucket : buckets.buckets()) {
if (newBucketList.contains(removeBucket)) {
newBucketList.remove(removeBucket);
groupDescUpdated = true;
@@ -715,7 +754,7 @@
/**
* Triggers deleting the existing group entry.
*
- * @param deviceId the device ID
+ * @param deviceId the device ID
* @param appCookie the group key
*/
@Override
@@ -728,8 +767,8 @@
deviceId);
if (mastershipService.getMasterFor(deviceId) == null) {
log.error("No Master for device {}..."
- + "Can not perform delete group operation",
- deviceId);
+ + "Can not perform delete group operation",
+ deviceId);
//TODO: Send Group operation failure event
return;
}
@@ -738,16 +777,16 @@
appCookie);
clusterCommunicator.unicast(groupOp,
- GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
- clusterMsgSerializer::serialize,
- mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
- if (error != null) {
- log.warn("Failed to send request to master: {} to {}",
- groupOp,
- mastershipService.getMasterFor(deviceId), error);
- }
- //TODO: Send Group operation failure event
- });
+ GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
+ clusterMsgSerializer::serialize,
+ mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
+ if (error != null) {
+ log.warn("Failed to send request to master: {} to {}",
+ groupOp,
+ mastershipService.getMasterFor(deviceId), error);
+ }
+ //TODO: Send Group operation failure event
+ });
return;
}
log.debug("deleteGroupDescription in device {} is getting handled locally",
@@ -764,14 +803,14 @@
}
log.debug("deleteGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_DELETE",
- existing.id(),
- existing.deviceId(),
- existing.state());
+ existing.id(),
+ existing.deviceId(),
+ existing.state());
synchronized (existing) {
existing.setState(GroupState.PENDING_DELETE);
getGroupStoreKeyMap().
- put(new GroupStoreKeyMapKey(existing.deviceId(), existing.appCookie()),
- existing);
+ put(new GroupStoreKeyMapKey(existing.deviceId(), existing.appCookie()),
+ existing);
}
log.debug("deleteGroupDescriptionInternal: in device {} issuing GROUP_REMOVE_REQUESTED",
deviceId);
@@ -792,15 +831,15 @@
if (existing != null) {
log.trace("addOrUpdateGroupEntry: updating group entry {} in device {}",
- group.id(),
- group.deviceId());
+ group.id(),
+ group.deviceId());
synchronized (existing) {
- for (GroupBucket bucket:group.buckets().buckets()) {
+ for (GroupBucket bucket : group.buckets().buckets()) {
Optional<GroupBucket> matchingBucket =
existing.buckets().buckets()
- .stream()
- .filter((existingBucket)->(existingBucket.equals(bucket)))
- .findFirst();
+ .stream()
+ .filter((existingBucket) -> (existingBucket.equals(bucket)))
+ .findFirst();
if (matchingBucket.isPresent()) {
((StoredGroupBucketEntry) matchingBucket.
get()).setPackets(bucket.packets());
@@ -808,38 +847,39 @@
get()).setBytes(bucket.bytes());
} else {
log.warn("addOrUpdateGroupEntry: No matching "
- + "buckets to update stats");
+ + "buckets to update stats");
}
}
existing.setLife(group.life());
existing.setPackets(group.packets());
existing.setBytes(group.bytes());
+ existing.setReferenceCount(group.referenceCount());
if ((existing.state() == GroupState.PENDING_ADD) ||
- (existing.state() == GroupState.PENDING_ADD_RETRY)) {
+ (existing.state() == GroupState.PENDING_ADD_RETRY)) {
log.trace("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
- existing.id(),
- existing.deviceId(),
- existing.state());
+ existing.id(),
+ existing.deviceId(),
+ existing.state());
existing.setState(GroupState.ADDED);
existing.setIsGroupStateAddedFirstTime(true);
event = new GroupEvent(Type.GROUP_ADDED, existing);
} else {
log.trace("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
- existing.id(),
- existing.deviceId(),
- GroupState.PENDING_UPDATE);
+ existing.id(),
+ existing.deviceId(),
+ GroupState.PENDING_UPDATE);
existing.setState(GroupState.ADDED);
existing.setIsGroupStateAddedFirstTime(false);
event = new GroupEvent(Type.GROUP_UPDATED, existing);
}
//Re-PUT map entries to trigger map update events
getGroupStoreKeyMap().
- put(new GroupStoreKeyMapKey(existing.deviceId(),
- existing.appCookie()), existing);
+ put(new GroupStoreKeyMapKey(existing.deviceId(),
+ existing.appCookie()), existing);
}
} else {
log.warn("addOrUpdateGroupEntry: Group update "
- + "happening for a non-existing entry in the map");
+ + "happening for a non-existing entry in the map");
}
if (event != null) {
@@ -859,8 +899,8 @@
if (existing != null) {
log.debug("removeGroupEntry: removing group entry {} in device {}",
- group.id(),
- group.deviceId());
+ group.id(),
+ group.deviceId());
//Removal from groupid based map will happen in the
//map update listener
getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
@@ -868,9 +908,9 @@
notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
} else {
log.warn("removeGroupEntry for {} in device{} is "
- + "not existing in our maps",
- group.id(),
- group.deviceId());
+ + "not existing in our maps",
+ group.id(),
+ group.deviceId());
}
}
@@ -900,13 +940,13 @@
// Execute all pending group requests
List<StoredGroupEntry> pendingGroupRequests =
getPendingGroupKeyTable().values()
- .stream()
- .filter(g-> g.deviceId().equals(deviceId))
- .collect(Collectors.toList());
+ .stream()
+ .filter(g -> g.deviceId().equals(deviceId))
+ .collect(Collectors.toList());
log.debug("processing pending group add requests for device {} and number of pending requests {}",
- deviceId,
- pendingGroupRequests.size());
- for (Group group:pendingGroupRequests) {
+ deviceId,
+ pendingGroupRequests.size());
+ for (Group group : pendingGroupRequests) {
GroupDescription tmp = new DefaultGroupDescription(
group.deviceId(),
group.type(),
@@ -916,7 +956,7 @@
group.appId());
storeGroupDescriptionInternal(tmp);
getPendingGroupKeyTable().
- remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
+ remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
}
} else {
Boolean audited = deviceAuditStatus.get(deviceId);
@@ -948,11 +988,11 @@
}
log.warn("groupOperationFailed: group operation {} failed"
- + "for group {} in device {} with code {}",
- operation.opType(),
- existing.id(),
- existing.deviceId(),
- operation.failureCode());
+ + "for group {} in device {} with code {}",
+ operation.opType(),
+ existing.id(),
+ existing.deviceId(),
+ operation.failureCode());
if (operation.failureCode() == GroupOperation.GroupMsgErrorCode.GROUP_EXISTS) {
log.warn("Current extraneous groups in device:{} are: {}",
deviceId,
@@ -960,14 +1000,14 @@
if (operation.buckets().equals(existing.buckets())) {
if (existing.state() == GroupState.PENDING_ADD) {
log.info("GROUP_EXISTS: GroupID and Buckets match for group in pending "
- + "add state - moving to ADDED for group {} in device {}",
- existing.id(), deviceId);
+ + "add state - moving to ADDED for group {} in device {}",
+ existing.id(), deviceId);
addOrUpdateGroupEntry(existing);
return;
} else {
log.warn("GROUP EXISTS: Group ID matched but buckets did not. "
- + "Operation: {} Existing: {}", operation.buckets(),
- existing.buckets());
+ + "Operation: {} Existing: {}", operation.buckets(),
+ existing.buckets());
}
}
}
@@ -976,9 +1016,9 @@
if (existing.state() == GroupState.PENDING_ADD) {
notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
log.warn("groupOperationFailed: cleaningup "
- + "group {} from store in device {}....",
- existing.id(),
- existing.deviceId());
+ + "group {} from store in device {}....",
+ existing.id(),
+ existing.deviceId());
//Removal from groupid based map will happen in the
//map update listener
getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
@@ -999,8 +1039,8 @@
@Override
public void addOrUpdateExtraneousGroupEntry(Group group) {
log.debug("add/update extraneous group entry {} in device {}",
- group.id(),
- group.deviceId());
+ group.id(),
+ group.deviceId());
ConcurrentMap<GroupId, Group> extraneousIdTable =
getExtraneousGroupIdTable(group.deviceId());
extraneousIdTable.put(group.id(), group);
@@ -1011,8 +1051,8 @@
@Override
public void removeExtraneousGroupEntry(Group group) {
log.debug("remove extraneous group entry {} of device {} from store",
- group.id(),
- group.deviceId());
+ group.id(),
+ group.deviceId());
ConcurrentMap<GroupId, Group> extraneousIdTable =
getExtraneousGroupIdTable(group.deviceId());
extraneousIdTable.remove(group.id());
@@ -1038,7 +1078,7 @@
StoredGroupEntry group = Versioned.valueOrNull(mapEvent.newValue());
if ((key == null) && (group == null)) {
log.error("GroupStoreKeyMapListener: Received "
- + "event {} with null entry", mapEvent.type());
+ + "event {} with null entry", mapEvent.type());
return;
} else if (group == null) {
group = getGroupIdTable(key.deviceId()).values()
@@ -1047,7 +1087,7 @@
.findFirst().get();
if (group == null) {
log.error("GroupStoreKeyMapListener: Received "
- + "event {} with null entry... can not process", mapEvent.type());
+ + "event {} with null entry... can not process", mapEvent.type());
return;
}
}
@@ -1063,13 +1103,13 @@
if (value.isGroupStateAddedFirstTime()) {
groupEvent = new GroupEvent(Type.GROUP_ADDED, value);
log.trace("Received first time GROUP_ADDED state update for id {} in device {}",
- group.id(),
- group.deviceId());
+ group.id(),
+ group.deviceId());
} else {
groupEvent = new GroupEvent(Type.GROUP_UPDATED, value);
log.trace("Received following GROUP_ADDED state update for id {} in device {}",
- group.id(),
- group.deviceId());
+ group.id(),
+ group.deviceId());
}
}
} else if (mapEvent.type() == MapEvent.Type.REMOVE) {
@@ -1086,24 +1126,24 @@
private void process(GroupStoreMessage groupOp) {
log.debug("Received remote group operation {} request for device {}",
- groupOp.type(),
- groupOp.deviceId());
- if (!mastershipService.isLocalMaster(groupOp.deviceId())) {
- log.warn("This node is not MASTER for device {}", groupOp.deviceId());
- return;
- }
- if (groupOp.type() == GroupStoreMessage.Type.ADD) {
- storeGroupDescriptionInternal(groupOp.groupDesc());
- } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
- updateGroupDescriptionInternal(groupOp.deviceId(),
- groupOp.appCookie(),
- groupOp.updateType(),
- groupOp.updateBuckets(),
- groupOp.newAppCookie());
- } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
- deleteGroupDescriptionInternal(groupOp.deviceId(),
- groupOp.appCookie());
- }
+ groupOp.type(),
+ groupOp.deviceId());
+ if (!mastershipService.isLocalMaster(groupOp.deviceId())) {
+ log.warn("This node is not MASTER for device {}", groupOp.deviceId());
+ return;
+ }
+ if (groupOp.type() == GroupStoreMessage.Type.ADD) {
+ storeGroupDescriptionInternal(groupOp.groupDesc());
+ } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
+ updateGroupDescriptionInternal(groupOp.deviceId(),
+ groupOp.appCookie(),
+ groupOp.updateType(),
+ groupOp.updateBuckets(),
+ groupOp.newAppCookie());
+ } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
+ deleteGroupDescriptionInternal(groupOp.deviceId(),
+ groupOp.appCookie());
+ }
}
/**
@@ -1144,6 +1184,7 @@
protected static class GroupStoreKeyMapKey extends GroupStoreMapKey {
private final GroupKey appCookie;
+
public GroupStoreKeyMapKey(DeviceId deviceId,
GroupKey appCookie) {
super(deviceId);
@@ -1175,6 +1216,7 @@
protected static class GroupStoreIdMapKey extends GroupStoreMapKey {
private final GroupId groupId;
+
public GroupStoreIdMapKey(DeviceId deviceId,
GroupId groupId) {
super(deviceId);
@@ -1228,17 +1270,20 @@
storedGroupEntries.size(),
deviceId);
for (Iterator<StoredGroupEntry> it1 = storedGroupEntries.iterator();
- it1.hasNext();) {
+ it1.hasNext();) {
Group group = it1.next();
log.trace("Stored Group {} for device {}", group, deviceId);
}
+ garbageCollect(deviceId, southboundGroupEntries, storedGroupEntries);
+
for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
Group group = it2.next();
if (storedGroupEntries.remove(group)) {
// we both have the group, let's update some info then.
log.trace("Group AUDIT: group {} exists in both planes for device {}",
- group.id(), deviceId);
+ group.id(), deviceId);
+
groupAdded(group);
it2.remove();
}
@@ -1249,15 +1294,15 @@
// It is possible that group update is
// in progress while we got a stale info from switch
if (!storedGroupEntries.remove(getGroup(
- group.deviceId(), group.id()))) {
+ group.deviceId(), group.id()))) {
log.warn("Group AUDIT: Inconsistent state:"
- + "Group exists in ID based table while "
- + "not present in key based table");
+ + "Group exists in ID based table while "
+ + "not present in key based table");
}
} else {
// there are groups in the switch that aren't in the store
log.debug("Group AUDIT: extraneous group {} exists in data plane for device {}",
- group.id(), deviceId);
+ group.id(), deviceId);
extraneousStoredEntries.remove(group);
extraneousGroup(group);
}
@@ -1265,24 +1310,47 @@
for (Group group : storedGroupEntries) {
// there are groups in the store that aren't in the switch
log.debug("Group AUDIT: group {} missing in data plane for device {}",
- group.id(), deviceId);
+ group.id(), deviceId);
groupMissing(group);
}
for (Group group : extraneousStoredEntries) {
// there are groups in the extraneous store that
// aren't in the switch
log.debug("Group AUDIT: clearing extraneous group {} from store for device {}",
- group.id(), deviceId);
+ group.id(), deviceId);
removeExtraneousGroupEntry(group);
}
if (!deviceInitialAuditStatus) {
log.info("Group AUDIT: Setting device {} initial AUDIT completed",
- deviceId);
+ deviceId);
deviceInitialAuditCompleted(deviceId, true);
}
}
+ private void garbageCollect(DeviceId deviceId,
+ Set<Group> southboundGroupEntries,
+ Set<StoredGroupEntry> storedGroupEntries) {
+ if (!garbageCollect) {
+ return;
+ }
+
+ Iterator<StoredGroupEntry> it = storedGroupEntries.iterator();
+ while (it.hasNext()) {
+ StoredGroupEntry group = it.next();
+ if (group.state() != GroupState.PENDING_DELETE && checkGroupRefCount(group)) {
+ log.debug("Garbage collecting group {} on {}", group, deviceId);
+ deleteGroupDescription(deviceId, group.appCookie());
+ southboundGroupEntries.remove(group);
+ it.remove();
+ }
+ }
+ }
+
+ private boolean checkGroupRefCount(Group group) {
+ return (group.referenceCount() == 0 && group.age() >= gcThresh);
+ }
+
private void groupMissing(Group group) {
switch (group.state()) {
case PENDING_DELETE:
@@ -1299,14 +1367,14 @@
StoredGroupEntry existing =
getStoredGroupEntry(group.deviceId(), group.id());
log.debug("groupMissing: group entry {} in device {} moving from {} to PENDING_ADD_RETRY",
- existing.id(),
- existing.deviceId(),
- existing.state());
+ existing.id(),
+ existing.deviceId(),
+ existing.state());
existing.setState(Group.GroupState.PENDING_ADD_RETRY);
//Re-PUT map entries to trigger map update events
getGroupStoreKeyMap().
- put(new GroupStoreKeyMapKey(existing.deviceId(),
- existing.appCookie()), existing);
+ put(new GroupStoreKeyMapKey(existing.deviceId(),
+ existing.appCookie()), existing);
notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
group));
break;
diff --git a/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
index 4f37504..791b769 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
@@ -23,6 +23,7 @@
import org.junit.Before;
import org.junit.Test;
import org.onlab.junit.TestUtils;
+import org.onosproject.cfg.ComponentConfigAdapter;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.DefaultGroupId;
import org.onosproject.core.GroupId;
@@ -129,6 +130,7 @@
groupStoreImpl.storageService = new TestStorageService();
groupStoreImpl.clusterCommunicator = new ClusterCommunicationServiceAdapter();
groupStoreImpl.mastershipService = new MasterOfAll();
+ groupStoreImpl.cfgService = new ComponentConfigAdapter();
groupStoreImpl.activate();
groupStore = groupStoreImpl;
auditPendingReqQueue =
diff --git a/web/api/src/test/java/org/onosproject/rest/resources/GroupsResourceTest.java b/web/api/src/test/java/org/onosproject/rest/resources/GroupsResourceTest.java
index 1321893..f8048d4 100644
--- a/web/api/src/test/java/org/onosproject/rest/resources/GroupsResourceTest.java
+++ b/web/api/src/test/java/org/onosproject/rest/resources/GroupsResourceTest.java
@@ -156,6 +156,11 @@
}
@Override
+ public int age() {
+ return 0;
+ }
+
+ @Override
public Type type() {
return GroupDescription.Type.ALL;
}