Implemented a Builder pattern for EventuallyConsistentMaps.
EventuallyConsistentMap has been moved to the API package, so it is now available outside the stores.
ONOS-1357
Change-Id: I1c892eb3dbefa72cb3f3eb3ccc74e9a02c7e2ac9
diff --git a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
index a571b4e..1d30607 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
@@ -15,23 +15,8 @@
*/
package org.onosproject.store.group.impl;
-import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
-
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -75,17 +60,32 @@
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
-import org.onosproject.store.ecmap.EventuallyConsistentMap;
-import org.onosproject.store.ecmap.EventuallyConsistentMapEvent;
-import org.onosproject.store.ecmap.EventuallyConsistentMapImpl;
-import org.onosproject.store.ecmap.EventuallyConsistentMapListener;
-import org.onosproject.store.impl.ClockService;
import org.onosproject.store.impl.MultiValuedTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ClockService;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapBuilder;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.Iterables;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
/**
* Manages inventory of group entries using trivial in-memory implementation.
@@ -108,6 +108,9 @@
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
// Per device group table with (device id + app cookie) as key
@@ -192,31 +195,38 @@
messageHandlingExecutor);
log.debug("Creating EC map groupstorekeymap");
- groupStoreEntriesByKey =
- new EventuallyConsistentMapImpl<>("groupstorekeymap",
- clusterService,
- clusterCommunicator,
- kryoBuilder,
- new GroupStoreLogicalClockManager<>());
+ EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
+ keyMapBuilder = storageService.eventuallyConsistentMapBuilder();
+
+ groupStoreEntriesByKey = keyMapBuilder
+ .withName("groupstorekeymap")
+ .withSerializer(kryoBuilder)
+ .withClockService(new GroupStoreLogicalClockManager<>())
+ .build();
log.trace("Current size {}", groupStoreEntriesByKey.size());
log.debug("Creating EC map groupstoreidmap");
- groupStoreEntriesById =
- new EventuallyConsistentMapImpl<>("groupstoreidmap",
- clusterService,
- clusterCommunicator,
- kryoBuilder,
- new GroupStoreLogicalClockManager<>());
+ EventuallyConsistentMapBuilder<GroupStoreIdMapKey, StoredGroupEntry>
+ idMapBuilder = storageService.eventuallyConsistentMapBuilder();
+
+ groupStoreEntriesById = idMapBuilder
+ .withName("groupstoreidmap")
+ .withSerializer(kryoBuilder)
+ .withClockService(new GroupStoreLogicalClockManager<>())
+ .build();
+
groupStoreEntriesById.addListener(new GroupStoreIdMapListener());
log.trace("Current size {}", groupStoreEntriesById.size());
log.debug("Creating EC map pendinggroupkeymap");
- auditPendingReqQueue =
- new EventuallyConsistentMapImpl<>("pendinggroupkeymap",
- clusterService,
- clusterCommunicator,
- kryoBuilder,
- new GroupStoreLogicalClockManager<>());
+ EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
+ auditMapBuilder = storageService.eventuallyConsistentMapBuilder();
+
+ auditPendingReqQueue = auditMapBuilder
+ .withName("pendinggroupkeymap")
+ .withSerializer(kryoBuilder)
+ .withClockService(new GroupStoreLogicalClockManager<>())
+ .build();
log.trace("Current size {}", auditPendingReqQueue.size());
log.info("Started");
@@ -819,11 +829,11 @@
* Map handler to receive any events when the group map is updated.
*/
private class GroupStoreIdMapListener implements
- EventuallyConsistentMapListener<GroupStoreIdMapKey, StoredGroupEntry> {
+ EventuallyConsistentMapListener<GroupStoreIdMapKey, StoredGroupEntry> {
@Override
public void event(EventuallyConsistentMapEvent<GroupStoreIdMapKey,
- StoredGroupEntry> mapEvent) {
+ StoredGroupEntry> mapEvent) {
GroupEvent groupEvent = null;
log.trace("GroupStoreIdMapListener: received groupid map event {}",
mapEvent.type());