Moving group store to consistent map
Change-Id: Id3c23c0cd9d7c713bceffc7bd9125aa3de99c45e
diff --git a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
index 1d7ded1..64b7c87 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
@@ -56,11 +56,12 @@
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.service.MultiValuedTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.EventuallyConsistentMap;
-import org.onosproject.store.service.EventuallyConsistentMapBuilder;
-import org.onosproject.store.service.EventuallyConsistentMapEvent;
-import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import java.util.ArrayList;
@@ -70,6 +71,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
@@ -79,7 +81,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
@@ -113,12 +114,12 @@
protected MastershipService mastershipService;
// Per device group table with (device id + app cookie) as key
- private EventuallyConsistentMap<GroupStoreKeyMapKey,
+ private ConsistentMap<GroupStoreKeyMapKey,
StoredGroupEntry> groupStoreEntriesByKey = null;
// Per device group table with (device id + group id) as key
private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
groupEntriesById = new ConcurrentHashMap<>();
- private EventuallyConsistentMap<GroupStoreKeyMapKey,
+ private ConsistentMap<GroupStoreKeyMapKey,
StoredGroupEntry> auditPendingReqQueue = null;
private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
extraneousGroupEntriesById = new ConcurrentHashMap<>();
@@ -131,8 +132,6 @@
private KryoNamespace.Builder kryoBuilder = null;
- private final AtomicLong sequenceNumber = new AtomicLong(0);
-
private KryoNamespace clusterMsgSerializer;
@Activate
@@ -169,29 +168,21 @@
this::process,
messageHandlingExecutor);
- log.debug("Creating EC map groupstorekeymap");
- EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
- keyMapBuilder = storageService.eventuallyConsistentMapBuilder();
+ log.debug("Creating Consistent map onos-group-store-keymap");
- groupStoreEntriesByKey = keyMapBuilder
- .withName("groupstorekeymap")
- .withSerializer(kryoBuilder)
- .withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
- sequenceNumber.getAndIncrement()))
+ groupStoreEntriesByKey = storageService.<GroupStoreKeyMapKey, StoredGroupEntry>consistentMapBuilder()
+ .withName("onos-group-store-keymap")
+ .withSerializer(Serializer.using(kryoBuilder.build()))
.build();
groupStoreEntriesByKey.addListener(new GroupStoreKeyMapListener());
log.debug("Current size of groupstorekeymap:{}",
groupStoreEntriesByKey.size());
- log.debug("Creating EC map pendinggroupkeymap");
- EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
- auditMapBuilder = storageService.eventuallyConsistentMapBuilder();
+ log.debug("Creating Consistent map pendinggroupkeymap");
- auditPendingReqQueue = auditMapBuilder
- .withName("pendinggroupkeymap")
- .withSerializer(kryoBuilder)
- .withTimestampProvider((k, v) -> new MultiValuedTimestamp<>(System.currentTimeMillis(),
- sequenceNumber.getAndIncrement()))
+ auditPendingReqQueue = storageService.<GroupStoreKeyMapKey, StoredGroupEntry>consistentMapBuilder()
+ .withName("onos-pending-group-keymap")
+ .withSerializer(Serializer.using(kryoBuilder.build()))
.build();
log.debug("Current size of pendinggroupkeymap:{}",
auditPendingReqQueue.size());
@@ -222,9 +213,9 @@
*
* @return Map representing group key table.
*/
- private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
+ private Map<GroupStoreKeyMapKey, StoredGroupEntry>
getGroupStoreKeyMap() {
- return groupStoreEntriesByKey;
+ return groupStoreEntriesByKey.asJavaMap();
}
/**
@@ -243,9 +234,9 @@
*
* @return Map representing group key table.
*/
- private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
+ private Map<GroupStoreKeyMapKey, StoredGroupEntry>
getPendingGroupKeyTable() {
- return auditPendingReqQueue;
+ return auditPendingReqQueue.asJavaMap();
}
/**
@@ -445,7 +436,7 @@
groupDesc.deviceId());
StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
group.setState(GroupState.WAITING_AUDIT_COMPLETE);
- EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
+ Map<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
getPendingGroupKeyTable();
pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
groupDesc.appCookie()),
@@ -851,7 +842,7 @@
Set<Entry<GroupStoreKeyMapKey, StoredGroupEntry>> entryPendingRemove =
new HashSet<>();
- groupStoreEntriesByKey.entrySet().stream()
+ getGroupStoreKeyMap().entrySet().stream()
.filter(entry -> entry.getKey().deviceId().equals(deviceId))
.forEach(entryPendingRemove::add);
@@ -987,14 +978,13 @@
* Map handler to receive any events when the group key map is updated.
*/
private class GroupStoreKeyMapListener implements
- EventuallyConsistentMapListener<GroupStoreKeyMapKey, StoredGroupEntry> {
+ MapEventListener<GroupStoreKeyMapKey, StoredGroupEntry> {
@Override
- public void event(EventuallyConsistentMapEvent<GroupStoreKeyMapKey,
- StoredGroupEntry> mapEvent) {
+ public void event(MapEvent<GroupStoreKeyMapKey, StoredGroupEntry> mapEvent) {
GroupEvent groupEvent = null;
GroupStoreKeyMapKey key = mapEvent.key();
- StoredGroupEntry group = mapEvent.value();
+ StoredGroupEntry group = Versioned.valueOrNull(mapEvent.newValue());
if ((key == null) && (group == null)) {
log.error("GroupStoreKeyMapListener: Received "
+ "event {} with null entry", mapEvent.type());
@@ -1014,25 +1004,24 @@
mapEvent.type(),
group.id(),
key.deviceId());
- if (mapEvent.type() == EventuallyConsistentMapEvent.Type.PUT) {
+ if (mapEvent.type() == MapEvent.Type.INSERT || mapEvent.type() == MapEvent.Type.UPDATE) {
// Update the group ID table
getGroupIdTable(group.deviceId()).put(group.id(), group);
- if (mapEvent.value().state() == Group.GroupState.ADDED) {
- if (mapEvent.value().isGroupStateAddedFirstTime()) {
- groupEvent = new GroupEvent(Type.GROUP_ADDED,
- mapEvent.value());
+ StoredGroupEntry value = Versioned.valueOrNull(mapEvent.newValue());
+ if (value.state() == Group.GroupState.ADDED) {
+ if (value.isGroupStateAddedFirstTime()) {
+ groupEvent = new GroupEvent(Type.GROUP_ADDED, value);
log.trace("Received first time GROUP_ADDED state update for id {} in device {}",
group.id(),
group.deviceId());
} else {
- groupEvent = new GroupEvent(Type.GROUP_UPDATED,
- mapEvent.value());
+ groupEvent = new GroupEvent(Type.GROUP_UPDATED, value);
log.trace("Received following GROUP_ADDED state update for id {} in device {}",
group.id(),
group.deviceId());
}
}
- } else if (mapEvent.type() == EventuallyConsistentMapEvent.Type.REMOVE) {
+ } else if (mapEvent.type() == MapEvent.Type.REMOVE) {
groupEvent = new GroupEvent(Type.GROUP_REMOVED, group);
// Remove the entry from the group ID table
getGroupIdTable(group.deviceId()).remove(group.id(), group);
diff --git a/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
index 066fced..4f37504 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
@@ -46,7 +46,7 @@
import org.onosproject.net.group.GroupStore;
import org.onosproject.net.group.GroupStoreDelegate;
import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
-import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.TestStorageService;
import com.google.common.collect.ImmutableList;
@@ -109,7 +109,7 @@
DistributedGroupStore groupStoreImpl;
GroupStore groupStore;
- EventuallyConsistentMap auditPendingReqQueue;
+ ConsistentMap auditPendingReqQueue;
static class MasterOfAll extends MastershipServiceAdapter {
@Override