Implemented a Builder pattern for EventuallyConsistentMaps.
EventuallyConsistentMap has been moved to the API package so is now available outside the stores.
ONOS-1357
Change-Id: I1c892eb3dbefa72cb3f3eb3ccc74e9a02c7e2ac9
diff --git a/core/store/dist/src/main/java/org/onosproject/store/impl/ClockService.java b/core/api/src/main/java/org/onosproject/store/service/ClockService.java
similarity index 96%
rename from core/store/dist/src/main/java/org/onosproject/store/impl/ClockService.java
rename to core/api/src/main/java/org/onosproject/store/service/ClockService.java
index 3ed89f8..a0de0a9 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/impl/ClockService.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ClockService.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onosproject.store.impl;
+package org.onosproject.store.service;
import org.onosproject.store.Timestamp;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMap.java
similarity index 97%
rename from core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMap.java
rename to core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMap.java
index 8cda45d..58a81cc 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMap.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onosproject.store.ecmap;
+package org.onosproject.store.service;
import java.util.Collection;
import java.util.Map;
@@ -114,7 +114,7 @@
* Removes the given key-value mapping from the map, if it exists.
* <p>
* This actually means remove any values up to and including the timestamp
- * given by {@link org.onosproject.store.impl.ClockService#getTimestamp(Object, Object)}.
+ * given by {@link org.onosproject.store.service.ClockService#getTimestamp(Object, Object)}.
* Any mappings that produce an earlier timestamp than this given key-value
* pair will be removed, and any mappings that produce a later timestamp
* will supersede this remove.
diff --git a/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMapBuilder.java b/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMapBuilder.java
new file mode 100644
index 0000000..779c329
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMapBuilder.java
@@ -0,0 +1,176 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.service;
+
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.NodeId;
+
+import java.util.Collection;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+
+/**
+ * Builder for eventually consistent maps.
+ *
+ * @param <K> type for map keys
+ * @param <V> type for map values
+ */
+public interface EventuallyConsistentMapBuilder<K, V> {
+
+ /**
+ * Sets the name of the map.
+ * <p>
+ * Each map is identified by a string map name. EventuallyConsistentMapImpl
+ * objects in different JVMs that use the same map name will form a
+ * distributed map across JVMs (provided the cluster service is aware of
+ * both nodes).
+ * </p>
+ * <p>
+ * Note: This is a mandatory parameter.
+ * </p>
+ *
+ * @param name name of the map
+ * @return this EventuallyConsistentMapBuilder
+ */
+ public EventuallyConsistentMapBuilder<K, V> withName(String name);
+
+ /**
+ * Sets a serializer builder that can be used to create a serializer that
+ * can serialize both the keys and values put into the map. The serializer
+ * builder should be pre-populated with any classes that will be put into
+ * the map.
+ * <p>
+ * Note: This is a mandatory parameter.
+ * </p>
+ *
+ * @param serializerBuilder serializer builder
+ * @return this EventuallyConsistentMapBuilder
+ */
+ public EventuallyConsistentMapBuilder<K, V> withSerializer(
+ KryoNamespace.Builder serializerBuilder);
+
+ /**
+ * Sets the clock service to use for generating timestamps for map updates.
+ * <p>
+ * The client must provide an {@link org.onosproject.store.service.ClockService}
+ * which can generate timestamps for a given key. The clock service is free
+ * to generate timestamps however it wishes, however these timestamps will
+ * be used to serialize updates to the map so they must be strict enough
+ * to ensure updates are properly ordered for the use case (i.e. in some
+ * cases wallclock time will suffice, whereas in other cases logical time
+ * will be necessary).
+ * </p>
+ * <p>
+ * Note: This is a mandatory parameter.
+ * </p>
+ *
+ * @param clockService clock service
+ * @return this EventuallyConsistentMapBuilder
+ */
+ public EventuallyConsistentMapBuilder<K, V> withClockService(
+ ClockService<K, V> clockService);
+
+ /**
+ * Sets the executor to use for processing events coming in from peers.
+ *
+ * @param executor event executor
+ * @return this EventuallyConsistentMapBuilder
+ */
+ public EventuallyConsistentMapBuilder<K, V> withEventExecutor(
+ ExecutorService executor);
+
+ /**
+ * Sets the executor to use for sending events to peers.
+ *
+ * @param executor event executor
+ * @return this EventuallyConsistentMapBuilder
+ */
+ public EventuallyConsistentMapBuilder<K, V> withCommunicationExecutor(
+ ExecutorService executor);
+
+ /**
+ * Sets the executor to use for background anti-entropy tasks.
+ *
+ * @param executor event executor
+ * @return this EventuallyConsistentMapBuilder
+ */
+ public EventuallyConsistentMapBuilder<K, V> withBackgroundExecutor(
+ ScheduledExecutorService executor);
+
+ /**
+ * Sets a function that can determine which peers to replicate updates to.
+ * <p>
+ * The default function replicates to all nodes.
+ * </p>
+ *
+ * @param peerUpdateFunction function that takes a K, V input and returns
+ * a collection of NodeIds to replicate the event
+ * to
+ * @return this EventuallyConsistentMapBuilder
+ */
+ public EventuallyConsistentMapBuilder<K, V> withPeerUpdateFunction(
+ BiFunction<K, V, Collection<NodeId>> peerUpdateFunction);
+
+ /**
+ * Prevents this map from writing tombstones of items that have been
+ * removed. This may result in zombie items reappearing after they have
+ * been removed.
+ * <p>
+ * The default behavior is tombstones are enabled.
+ * </p>
+ *
+ * @return this EventuallyConsistentMapBuilder
+ */
+ public EventuallyConsistentMapBuilder<K, V> withTombstonesDisabled();
+
+ /**
+ * Configures how often to run the anti-entropy background task.
+ * <p>
+ * The default anti-entropy period is 5 seconds.
+ * </p>
+ *
+ * @param period anti-entropy period
+ * @param unit time unit for the period
+ * @return this EventuallyConsistentMapBuilder
+ */
+ public EventuallyConsistentMapBuilder<K, V> withAntiEntropyPeriod(
+ long period, TimeUnit unit);
+
+ /**
+ * Configure anti-entropy to converge faster at the cost of doing more work
+ * for each anti-entropy cycle. Suited to maps with low update rate where
+ * convergence time is more important than throughput.
+ * <p>
+ * The default behavior is to do less anti-entropy work at the cost of
+ * slower convergence.
+ * </p>
+ *
+ * @return this EventuallyConsistentMapBuilder
+ */
+ public EventuallyConsistentMapBuilder<K, V> withFasterConvergence();
+
+ /**
+ * Builds an eventually consistent map based on the configuration options
+ * supplied to this builder.
+ *
+ * @return new eventually consistent map
+ * @throws java.lang.RuntimeException if a mandatory parameter is missing
+ */
+ public EventuallyConsistentMap<K, V> build();
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapEvent.java b/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMapEvent.java
similarity index 98%
rename from core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapEvent.java
rename to core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMapEvent.java
index ee7d42e..8f63325 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapEvent.java
+++ b/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMapEvent.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onosproject.store.ecmap;
+package org.onosproject.store.service;
import com.google.common.base.MoreObjects;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapListener.java b/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMapListener.java
similarity index 95%
rename from core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapListener.java
rename to core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMapListener.java
index 52642a1..686255e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapListener.java
+++ b/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMapListener.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onosproject.store.ecmap;
+package org.onosproject.store.service;
/**
* Listener interested in receiving modification events for an
diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageService.java b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
index 7e447cc..a59e376 100644
--- a/core/api/src/main/java/org/onosproject/store/service/StorageService.java
+++ b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
@@ -30,6 +30,7 @@
/**
* Creates a ConsistentMap.
+ *
* @param name map name
* @param serializer serializer to use for serializing keys and values
* @return consistent map.
@@ -40,6 +41,7 @@
/**
* Creates a AsyncConsistentMap.
+ *
* @param name map name
* @param serializer serializer to use for serializing keys and values
* @return async consistent map
@@ -50,7 +52,18 @@
/**
* Creates a new transaction context.
+ *
* @return transaction context
*/
TransactionContext createTransactionContext();
-}
\ No newline at end of file
+
+ /**
+ * Creates a new EventuallyConsistentMapBuilder.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ * @return builder for an eventually consistent map
+ */
+ <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder();
+
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
index 935561d..1279c0e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
@@ -43,14 +43,14 @@
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.ecmap.EventuallyConsistentMap;
-import org.onosproject.store.ecmap.EventuallyConsistentMapEvent;
-import org.onosproject.store.ecmap.EventuallyConsistentMapImpl;
-import org.onosproject.store.ecmap.EventuallyConsistentMapListener;
-import org.onosproject.store.impl.ClockService;
import org.onosproject.store.impl.MultiValuedTimestamp;
import org.onosproject.store.impl.WallclockClockManager;
import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ClockService;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import java.io.ByteArrayInputStream;
@@ -66,10 +66,16 @@
import static com.google.common.io.ByteStreams.toByteArray;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.app.ApplicationEvent.Type.*;
-import static org.onosproject.store.app.GossipApplicationStore.InternalState.*;
-import static org.onosproject.store.ecmap.EventuallyConsistentMapEvent.Type.PUT;
-import static org.onosproject.store.ecmap.EventuallyConsistentMapEvent.Type.REMOVE;
+import static org.onosproject.app.ApplicationEvent.Type.APP_ACTIVATED;
+import static org.onosproject.app.ApplicationEvent.Type.APP_DEACTIVATED;
+import static org.onosproject.app.ApplicationEvent.Type.APP_INSTALLED;
+import static org.onosproject.app.ApplicationEvent.Type.APP_PERMISSIONS_CHANGED;
+import static org.onosproject.app.ApplicationEvent.Type.APP_UNINSTALLED;
+import static org.onosproject.store.app.GossipApplicationStore.InternalState.ACTIVATED;
+import static org.onosproject.store.app.GossipApplicationStore.InternalState.DEACTIVATED;
+import static org.onosproject.store.app.GossipApplicationStore.InternalState.INSTALLED;
+import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
+import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -106,6 +112,9 @@
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ApplicationIdStore idStore;
private final AtomicLong sequence = new AtomicLong();
@@ -130,27 +139,29 @@
new MultiValuedTimestamp<>(getUpdateTime(appId.name()),
sequence.incrementAndGet());
- apps = new EventuallyConsistentMapImpl<>("apps", clusterService,
- clusterCommunicator,
- serializer,
- appsClockService);
+ apps = storageService.<ApplicationId, Application>eventuallyConsistentMapBuilder()
+ .withName("apps")
+ .withSerializer(serializer)
+ .withClockService(appsClockService)
+ .build();
ClockService<Application, InternalState> statesClockService = (app, state) ->
new MultiValuedTimestamp<>(getUpdateTime(app.id().name()),
sequence.incrementAndGet());
- states = new EventuallyConsistentMapImpl<>("app-states",
- clusterService,
- clusterCommunicator,
- serializer,
- statesClockService);
+ states = storageService.<Application, InternalState>eventuallyConsistentMapBuilder()
+ .withName("app-states")
+ .withSerializer(serializer)
+ .withClockService(statesClockService)
+ .build();
+
states.addListener(new InternalAppStatesListener());
- permissions = new EventuallyConsistentMapImpl<>("app-permissions",
- clusterService,
- clusterCommunicator,
- serializer,
- new WallclockClockManager<>());
+ permissions = storageService.<Application, Set<Permission>>eventuallyConsistentMapBuilder()
+ .withName("app-permissions")
+ .withSerializer(serializer)
+ .withClockService(new WallclockClockManager<>())
+ .build();
log.info("Started");
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cfg/GossipComponentConfigStore.java b/core/store/dist/src/main/java/org/onosproject/store/cfg/GossipComponentConfigStore.java
index b70a291..94e17b1 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cfg/GossipComponentConfigStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cfg/GossipComponentConfigStore.java
@@ -25,21 +25,19 @@
import org.onosproject.cfg.ComponentConfigEvent;
import org.onosproject.cfg.ComponentConfigStore;
import org.onosproject.cfg.ComponentConfigStoreDelegate;
-import org.onosproject.cluster.ClusterService;
import org.onosproject.store.AbstractStore;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.ecmap.EventuallyConsistentMap;
-import org.onosproject.store.ecmap.EventuallyConsistentMapEvent;
-import org.onosproject.store.ecmap.EventuallyConsistentMapImpl;
-import org.onosproject.store.ecmap.EventuallyConsistentMapListener;
import org.onosproject.store.impl.WallclockClockManager;
import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import static org.onosproject.cfg.ComponentConfigEvent.Type.PROPERTY_SET;
import static org.onosproject.cfg.ComponentConfigEvent.Type.PROPERTY_UNSET;
-import static org.onosproject.store.ecmap.EventuallyConsistentMapEvent.Type.PUT;
-import static org.onosproject.store.ecmap.EventuallyConsistentMapEvent.Type.REMOVE;
+import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
+import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -59,20 +57,19 @@
private EventuallyConsistentMap<String, String> properties;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterCommunicationService clusterCommunicator;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterService clusterService;
+ protected StorageService storageService;
@Activate
public void activate() {
KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
.register(KryoNamespaces.API);
- properties = new EventuallyConsistentMapImpl<>("cfg", clusterService,
- clusterCommunicator,
- serializer,
- new WallclockClockManager<>());
+ properties = storageService.<String, String>eventuallyConsistentMapBuilder()
+ .withName("cfg")
+ .withSerializer(serializer)
+ .withClockService(new WallclockClockManager<>())
+ .build();
+
properties.addListener(new InternalPropertiesListener());
log.info("Started");
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
index 6ddeea9..4ef317f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -17,13 +17,11 @@
package org.onosproject.store.consistent.impl;
import com.google.common.collect.Sets;
-
import net.kuujo.copycat.cluster.ClusterConfig;
import net.kuujo.copycat.cluster.Member;
import net.kuujo.copycat.log.FileLog;
import net.kuujo.copycat.netty.NettyTcpProtocol;
import net.kuujo.copycat.protocol.Consistency;
-
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -32,8 +30,11 @@
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterService;
import org.onosproject.store.cluster.impl.NodeInfo;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.ecmap.EventuallyConsistentMapBuilderImpl;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapBuilder;
import org.onosproject.store.service.PartitionInfo;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageAdminService;
@@ -71,6 +72,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterCommunicationService clusterCommunicator;
+
protected String nodeToUri(NodeInfo node) {
return String.format("tcp://%s:%d", node.getIp(), COPYCAT_TCP_PORT);
}
@@ -169,12 +173,12 @@
@Override
public <K, V> ConsistentMap<K , V> createConsistentMap(String name, Serializer serializer) {
- return new DefaultConsistentMap<K, V>(name, partitionedDatabase, serializer);
+ return new DefaultConsistentMap<>(name, partitionedDatabase, serializer);
}
@Override
public <K, V> AsyncConsistentMap<K , V> createAsyncConsistentMap(String name, Serializer serializer) {
- return new DefaultAsyncConsistentMap<K, V>(name, partitionedDatabase, serializer);
+ return new DefaultAsyncConsistentMap<>(name, partitionedDatabase, serializer);
}
@Override
@@ -207,4 +211,12 @@
database.cluster().leader() != null ?
database.cluster().leader().uri() : null);
}
+
+
+ @Override
+ public <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder() {
+ return new EventuallyConsistentMapBuilderImpl<>(clusterService,
+ clusterCommunicator);
+ }
+
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java
new file mode 100644
index 0000000..9d60143
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.ecmap;
+
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.service.ClockService;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapBuilder;
+
+import java.util.Collection;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Eventually consistent map builder.
+ */
+public class EventuallyConsistentMapBuilderImpl<K, V>
+ implements EventuallyConsistentMapBuilder<K, V> {
+ private final ClusterService clusterService;
+ private final ClusterCommunicationService clusterCommunicator;
+
+ private String name;
+ private KryoNamespace.Builder serializerBuilder;
+ private ExecutorService eventExecutor;
+ private ExecutorService communicationExecutor;
+ private ScheduledExecutorService backgroundExecutor;
+ private ClockService<K, V> clockService;
+ private BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
+ private boolean tombstonesDisabled = false;
+ private long antiEntropyPeriod = 5;
+ private TimeUnit antiEntropyTimeUnit = TimeUnit.SECONDS;
+ private boolean convergeFaster = false;
+
+ /**
+ * Creates a new eventually consistent map builder.
+ *
+ * @param clusterService cluster service
+ * @param clusterCommunicator cluster communication service
+ */
+ public EventuallyConsistentMapBuilderImpl(ClusterService clusterService,
+ ClusterCommunicationService clusterCommunicator) {
+ this.clusterService = checkNotNull(clusterService);
+ this.clusterCommunicator = checkNotNull(clusterCommunicator);
+ }
+
+ @Override
+ public EventuallyConsistentMapBuilder withName(String name) {
+ this.name = checkNotNull(name);
+ return this;
+ }
+
+ @Override
+ public EventuallyConsistentMapBuilder withSerializer(
+ KryoNamespace.Builder serializerBuilder) {
+ this.serializerBuilder = checkNotNull(serializerBuilder);
+ return this;
+ }
+
+ @Override
+ public EventuallyConsistentMapBuilder withClockService(
+ ClockService<K, V> clockService) {
+ this.clockService = checkNotNull(clockService);
+ return this;
+ }
+
+ @Override
+ public EventuallyConsistentMapBuilder withEventExecutor(ExecutorService executor) {
+ this.eventExecutor = checkNotNull(executor);
+ return this;
+ }
+
+ @Override
+ public EventuallyConsistentMapBuilder<K, V> withCommunicationExecutor(
+ ExecutorService executor) {
+ communicationExecutor = checkNotNull(executor);
+ return this;
+ }
+
+ @Override
+ public EventuallyConsistentMapBuilder withBackgroundExecutor(ScheduledExecutorService executor) {
+ this.backgroundExecutor = checkNotNull(executor);
+ return this;
+ }
+
+ @Override
+ public EventuallyConsistentMapBuilder withPeerUpdateFunction(
+ BiFunction<K, V, Collection<NodeId>> peerUpdateFunction) {
+ this.peerUpdateFunction = checkNotNull(peerUpdateFunction);
+ return this;
+ }
+
+ @Override
+ public EventuallyConsistentMapBuilder<K, V> withTombstonesDisabled() {
+ tombstonesDisabled = true;
+ return this;
+ }
+
+ @Override
+ public EventuallyConsistentMapBuilder<K, V> withAntiEntropyPeriod(long period, TimeUnit unit) {
+ checkArgument(period > 0, "anti-entropy period must be greater than 0");
+ antiEntropyPeriod = period;
+ antiEntropyTimeUnit = checkNotNull(unit);
+ return this;
+ }
+
+ @Override
+ public EventuallyConsistentMapBuilder<K, V> withFasterConvergence() {
+ convergeFaster = true;
+ return this;
+ }
+
+ @Override
+ public EventuallyConsistentMap<K, V> build() {
+ checkNotNull(name, "name is a mandatory parameter");
+ checkNotNull(serializerBuilder, "serializerBuilder is a mandatory parameter");
+ checkNotNull(clockService, "clockService is a mandatory parameter");
+
+ return new EventuallyConsistentMapImpl<>(name,
+ clusterService,
+ clusterCommunicator,
+ serializerBuilder,
+ clockService,
+ peerUpdateFunction,
+ eventExecutor,
+ communicationExecutor,
+ backgroundExecutor,
+ tombstonesDisabled,
+ antiEntropyPeriod,
+ antiEntropyTimeUnit,
+ convergeFaster);
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 2987529..62d145d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -32,10 +32,13 @@
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.impl.ClockService;
import org.onosproject.store.impl.Timestamped;
import org.onosproject.store.impl.WallClockTimestamp;
import org.onosproject.store.serializers.KryoSerializer;
+import org.onosproject.store.service.ClockService;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,7 +57,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
@@ -93,8 +95,8 @@
private final ScheduledExecutorService backgroundExecutor;
private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
- private ExecutorService communicationExecutor;
- private Map<NodeId, EventAccumulator> senderPending;
+ private final ExecutorService communicationExecutor;
+ private final Map<NodeId, EventAccumulator> senderPending;
private volatile boolean destroyed = false;
private static final String ERROR_DESTROYED = " map is already destroyed";
@@ -103,130 +105,115 @@
private static final String ERROR_NULL_KEY = "Key cannot be null";
private static final String ERROR_NULL_VALUE = "Null values are not allowed";
- // TODO: Make these anti-entropy params configurable
- private long initialDelaySec = 5;
- private long periodSec = 5;
- private boolean lightweightAntiEntropy = true;
- private boolean tombstonesDisabled = false;
+ private final long initialDelaySec = 5;
+ private final boolean lightweightAntiEntropy;
+ private final boolean tombstonesDisabled;
private static final int WINDOW_SIZE = 5;
private static final int HIGH_LOAD_THRESHOLD = 0;
private static final int LOAD_WINDOW = 2;
- SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
- AtomicLong operations = new AtomicLong();
+ private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
/**
* Creates a new eventually consistent map shared amongst multiple instances.
* <p>
- * Each map is identified by a string map name. EventuallyConsistentMapImpl
- * objects in different JVMs that use the same map name will form a
- * distributed map across JVMs (provided the cluster service is aware of
- * both nodes).
- * </p>
- * <p>
- * The client is expected to provide an
- * {@link org.onlab.util.KryoNamespace.Builder} with which all classes that
- * will be stored in this map have been registered (including referenced
- * classes). This serializer will be used to serialize both K and V for
- * inter-node notifications.
- * </p>
- * <p>
- * The client must provide an {@link org.onosproject.store.impl.ClockService}
- * which can generate timestamps for a given key. The clock service is free
- * to generate timestamps however it wishes, however these timestamps will
- * be used to serialize updates to the map so they must be strict enough
- * to ensure updates are properly ordered for the use case (i.e. in some
- * cases wallclock time will suffice, whereas in other cases logical time
- * will be necessary).
+ * See {@link org.onosproject.store.service.EventuallyConsistentMapBuilder}
+ * for more description of the parameters expected by the map.
* </p>
*
- * @param mapName a String identifier for the map.
- * @param clusterService the cluster service
- * @param clusterCommunicator the cluster communications service
- * @param serializerBuilder a Kryo namespace builder that can serialize
- * both K and V
- * @param clockService a clock service able to generate timestamps
- * for K
- * @param peerUpdateFunction function that provides a set of nodes to immediately
- * update to when there writes to the map
+ * @param mapName a String identifier for the map.
+ * @param clusterService the cluster service
+ * @param clusterCommunicator the cluster communications service
+ * @param serializerBuilder a Kryo namespace builder that can serialize
+ * both K and V
+ * @param clockService a clock service able to generate timestamps
+ * for K and V
+ * @param peerUpdateFunction function that provides a set of nodes to immediately
+ * update to when there writes to the map
+ * @param eventExecutor executor to use for processing incoming
+ * events from peers
+ * @param communicationExecutor executor to use for sending events to peers
+ * @param backgroundExecutor executor to use for background anti-entropy
+ * tasks
+ * @param tombstonesDisabled true if this map should not maintain
+ * tombstones
+ * @param antiEntropyPeriod period that the anti-entropy task should run
+ * in seconds
+ * @param convergeFaster make anti-entropy try to converge faster
*/
- public EventuallyConsistentMapImpl(String mapName,
- ClusterService clusterService,
- ClusterCommunicationService clusterCommunicator,
- KryoNamespace.Builder serializerBuilder,
- ClockService<K, V> clockService,
- BiFunction<K, V, Collection<NodeId>> peerUpdateFunction) {
- this.clusterService = checkNotNull(clusterService);
- this.clusterCommunicator = checkNotNull(clusterCommunicator);
- this.peerUpdateFunction = checkNotNull(peerUpdateFunction);
-
- serializer = createSerializer(checkNotNull(serializerBuilder));
- destroyedMessage = mapName + ERROR_DESTROYED;
-
- this.clockService = checkNotNull(clockService);
-
+ EventuallyConsistentMapImpl(String mapName,
+ ClusterService clusterService,
+ ClusterCommunicationService clusterCommunicator,
+ KryoNamespace.Builder serializerBuilder,
+ ClockService<K, V> clockService,
+ BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
+ ExecutorService eventExecutor,
+ ExecutorService communicationExecutor,
+ ScheduledExecutorService backgroundExecutor,
+ boolean tombstonesDisabled,
+ long antiEntropyPeriod,
+ TimeUnit antiEntropyTimeUnit,
+ boolean convergeFaster) {
items = new ConcurrentHashMap<>();
removedItems = new ConcurrentHashMap<>();
-
- // should be a normal executor; it's used for receiving messages
- //TODO make # of threads configurable
- executor = Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
-
- // sending executor; should be capped
- //TODO make # of threads configurable
- //TODO this probably doesn't need to be bounded anymore
- communicationExecutor =
- newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
senderPending = Maps.newConcurrentMap();
+ destroyedMessage = mapName + ERROR_DESTROYED;
- backgroundExecutor =
- newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
+ this.clusterService = clusterService;
+ this.clusterCommunicator = clusterCommunicator;
+
+ this.serializer = createSerializer(serializerBuilder);
+
+ this.clockService = clockService;
+
+ if (peerUpdateFunction != null) {
+ this.peerUpdateFunction = peerUpdateFunction;
+ } else {
+ this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
+ .map(ControllerNode::id)
+ .filter(nodeId -> !nodeId.equals(clusterService.getLocalNode().id()))
+ .collect(Collectors.toList());
+ }
+
+ if (eventExecutor != null) {
+ this.executor = eventExecutor;
+ } else {
+ // should be a normal executor; it's used for receiving messages
+ this.executor =
+ Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
+ }
+
+ if (communicationExecutor != null) {
+ this.communicationExecutor = communicationExecutor;
+ } else {
+ // sending executor; should be capped
+ //TODO this probably doesn't need to be bounded anymore
+ this.communicationExecutor =
+ newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
+ }
+
+ if (backgroundExecutor != null) {
+ this.backgroundExecutor = backgroundExecutor;
+ } else {
+ this.backgroundExecutor =
+ newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
+ }
// start anti-entropy thread
- backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
- initialDelaySec, periodSec,
- TimeUnit.SECONDS);
+ this.backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
+ initialDelaySec, antiEntropyPeriod,
+ antiEntropyTimeUnit);
updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
clusterCommunicator.addSubscriber(updateMessageSubject,
- new InternalEventListener(), executor);
+ new InternalEventListener(), this.executor);
antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
- new InternalAntiEntropyListener(), backgroundExecutor);
- }
+ new InternalAntiEntropyListener(), this.backgroundExecutor);
- /**
- * Creates a new eventually consistent map shared amongst multiple instances.
- * <p>
- * Take a look at the other constructor for usage information. The only difference
- * is that a BiFunction is provided that returns all nodes in the cluster, so
- * all nodes will be sent write updates immediately.
- * </p>
- *
- * @param mapName a String identifier for the map.
- * @param clusterService the cluster service
- * @param clusterCommunicator the cluster communications service
- * @param serializerBuilder a Kryo namespace builder that can serialize
- * both K and V
- * @param clockService a clock service able to generate timestamps
- * for K
- */
- public EventuallyConsistentMapImpl(String mapName,
- ClusterService clusterService,
- ClusterCommunicationService clusterCommunicator,
- KryoNamespace.Builder serializerBuilder,
- ClockService<K, V> clockService) {
- this(mapName, clusterService, clusterCommunicator, serializerBuilder, clockService,
- (key, value) -> clusterService.getNodes().stream()
- .map(ControllerNode::id)
- .filter(nodeId -> !nodeId.equals(clusterService.getLocalNode().id()))
- .collect(Collectors.toList()));
- }
-
- public EventuallyConsistentMapImpl<K, V> withTombstonesDisabled(boolean status) {
- tombstonesDisabled = status;
- return this;
+ this.tombstonesDisabled = tombstonesDisabled;
+ this.lightweightAntiEntropy = !convergeFaster;
}
private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
@@ -246,19 +233,6 @@
};
}
- /**
- * Sets the executor to use for broadcasting messages and returns this
- * instance for method chaining.
- *
- * @param executor executor service
- * @return this instance
- */
- public EventuallyConsistentMapImpl<K, V> withBroadcastMessageExecutor(ExecutorService executor) {
- checkNotNull(executor, "Null executor");
- communicationExecutor = executor;
- return this;
- }
-
@Override
public int size() {
checkState(!destroyed, destroyedMessage);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
index a571b4e..1d30607 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
@@ -15,23 +15,8 @@
*/
package org.onosproject.store.group.impl;
-import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
-
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -75,17 +60,32 @@
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
-import org.onosproject.store.ecmap.EventuallyConsistentMap;
-import org.onosproject.store.ecmap.EventuallyConsistentMapEvent;
-import org.onosproject.store.ecmap.EventuallyConsistentMapImpl;
-import org.onosproject.store.ecmap.EventuallyConsistentMapListener;
-import org.onosproject.store.impl.ClockService;
import org.onosproject.store.impl.MultiValuedTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ClockService;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapBuilder;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.Iterables;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
/**
* Manages inventory of group entries using trivial in-memory implementation.
@@ -108,6 +108,9 @@
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
// Per device group table with (device id + app cookie) as key
@@ -192,31 +195,38 @@
messageHandlingExecutor);
log.debug("Creating EC map groupstorekeymap");
- groupStoreEntriesByKey =
- new EventuallyConsistentMapImpl<>("groupstorekeymap",
- clusterService,
- clusterCommunicator,
- kryoBuilder,
- new GroupStoreLogicalClockManager<>());
+ EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
+ keyMapBuilder = storageService.eventuallyConsistentMapBuilder();
+
+ groupStoreEntriesByKey = keyMapBuilder
+ .withName("groupstorekeymap")
+ .withSerializer(kryoBuilder)
+ .withClockService(new GroupStoreLogicalClockManager<>())
+ .build();
log.trace("Current size {}", groupStoreEntriesByKey.size());
log.debug("Creating EC map groupstoreidmap");
- groupStoreEntriesById =
- new EventuallyConsistentMapImpl<>("groupstoreidmap",
- clusterService,
- clusterCommunicator,
- kryoBuilder,
- new GroupStoreLogicalClockManager<>());
+ EventuallyConsistentMapBuilder<GroupStoreIdMapKey, StoredGroupEntry>
+ idMapBuilder = storageService.eventuallyConsistentMapBuilder();
+
+ groupStoreEntriesById = idMapBuilder
+ .withName("groupstoreidmap")
+ .withSerializer(kryoBuilder)
+ .withClockService(new GroupStoreLogicalClockManager<>())
+ .build();
+
groupStoreEntriesById.addListener(new GroupStoreIdMapListener());
log.trace("Current size {}", groupStoreEntriesById.size());
log.debug("Creating EC map pendinggroupkeymap");
- auditPendingReqQueue =
- new EventuallyConsistentMapImpl<>("pendinggroupkeymap",
- clusterService,
- clusterCommunicator,
- kryoBuilder,
- new GroupStoreLogicalClockManager<>());
+ EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
+ auditMapBuilder = storageService.eventuallyConsistentMapBuilder();
+
+ auditPendingReqQueue = auditMapBuilder
+ .withName("pendinggroupkeymap")
+ .withSerializer(kryoBuilder)
+ .withClockService(new GroupStoreLogicalClockManager<>())
+ .build();
log.trace("Current size {}", auditPendingReqQueue.size());
log.info("Started");
@@ -819,11 +829,11 @@
* Map handler to receive any events when the group map is updated.
*/
private class GroupStoreIdMapListener implements
- EventuallyConsistentMapListener<GroupStoreIdMapKey, StoredGroupEntry> {
+ EventuallyConsistentMapListener<GroupStoreIdMapKey, StoredGroupEntry> {
@Override
public void event(EventuallyConsistentMapEvent<GroupStoreIdMapKey,
- StoredGroupEntry> mapEvent) {
+ StoredGroupEntry> mapEvent) {
GroupEvent groupEvent = null;
log.trace("GroupStoreIdMapListener: received groupid map event {}",
mapEvent.type());
diff --git a/core/store/dist/src/main/java/org/onosproject/store/impl/WallclockClockManager.java b/core/store/dist/src/main/java/org/onosproject/store/impl/WallclockClockManager.java
index ec90a5f..fd25ebf 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/impl/WallclockClockManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/impl/WallclockClockManager.java
@@ -16,6 +16,7 @@
package org.onosproject.store.impl;
import org.onosproject.store.Timestamp;
+import org.onosproject.store.service.ClockService;
/**
* A clock service which hands out wallclock-based timestamps.
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
index 370b0b5..fc2e8e3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
@@ -36,14 +36,13 @@
import org.onosproject.net.intent.Key;
import org.onosproject.net.intent.PartitionService;
import org.onosproject.store.AbstractStore;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.ecmap.EventuallyConsistentMap;
-import org.onosproject.store.ecmap.EventuallyConsistentMapEvent;
-import org.onosproject.store.ecmap.EventuallyConsistentMapImpl;
-import org.onosproject.store.ecmap.EventuallyConsistentMapListener;
import org.onosproject.store.impl.MultiValuedTimestamp;
import org.onosproject.store.impl.WallClockTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import java.util.Collection;
@@ -52,7 +51,7 @@
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onosproject.net.intent.IntentState.*;
+import static org.onosproject.net.intent.IntentState.PURGE_REQ;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -61,7 +60,7 @@
*/
//FIXME we should listen for leadership changes. if the local instance has just
// ... become a leader, scan the pending map and process those
-@Component(immediate = false, enabled = true)
+@Component(immediate = true, enabled = true)
@Service
public class GossipIntentStore
extends AbstractStore<IntentEvent, IntentStoreDelegate>
@@ -76,10 +75,10 @@
private EventuallyConsistentMap<Key, IntentData> pendingMap;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterCommunicationService clusterCommunicator;
+ protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterService clusterService;
+ protected StorageService storageService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PartitionService partitionService;
@@ -92,19 +91,19 @@
.register(MultiValuedTimestamp.class)
.register(WallClockTimestamp.class);
- currentMap = new EventuallyConsistentMapImpl<>("intent-current",
- clusterService,
- clusterCommunicator,
- intentSerializer,
- new IntentDataLogicalClockManager<>(),
- (key, intentData) -> getPeerNodes(key, intentData));
+ currentMap = storageService.<Key, IntentData>eventuallyConsistentMapBuilder()
+ .withName("intent-current")
+ .withSerializer(intentSerializer)
+ .withClockService(new IntentDataLogicalClockManager<>())
+ .withPeerUpdateFunction((key, intentData) -> getPeerNodes(key, intentData))
+ .build();
- pendingMap = new EventuallyConsistentMapImpl<>("intent-pending",
- clusterService,
- clusterCommunicator,
- intentSerializer,
- new IntentDataClockManager<>(),
- (key, intentData) -> getPeerNodes(key, intentData));
+ pendingMap = storageService.<Key, IntentData>eventuallyConsistentMapBuilder()
+ .withName("intent-pending")
+ .withSerializer(intentSerializer)
+ .withClockService(new IntentDataClockManager<>())
+ .withPeerUpdateFunction((key, intentData) -> getPeerNodes(key, intentData))
+ .build();
currentMap.addListener(new InternalCurrentListener());
pendingMap.addListener(new InternalPendingListener());
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentDataClockManager.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentDataClockManager.java
index f64f6b7..1c4ffbf 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentDataClockManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentDataClockManager.java
@@ -17,7 +17,7 @@
import org.onosproject.net.intent.IntentData;
import org.onosproject.store.Timestamp;
-import org.onosproject.store.impl.ClockService;
+import org.onosproject.store.service.ClockService;
import org.onosproject.store.impl.MultiValuedTimestamp;
/**
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentDataLogicalClockManager.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentDataLogicalClockManager.java
index 4636cd7..9d21223 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentDataLogicalClockManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentDataLogicalClockManager.java
@@ -17,7 +17,7 @@
import org.onosproject.net.intent.IntentData;
import org.onosproject.store.Timestamp;
-import org.onosproject.store.impl.ClockService;
+import org.onosproject.store.service.ClockService;
import org.onosproject.store.impl.MultiValuedTimestamp;
import java.util.concurrent.atomic.AtomicLong;
diff --git a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
index f7c9323..8629fcd 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
@@ -34,10 +34,13 @@
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.impl.ClockService;
+import org.onosproject.store.service.ClockService;
import org.onosproject.store.impl.WallClockTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
import java.io.IOException;
import java.util.ArrayList;
@@ -134,10 +137,13 @@
.register(KryoNamespaces.API)
.register(TestTimestamp.class);
- ecMap = new EventuallyConsistentMapImpl<>(MAP_NAME, clusterService,
- clusterCommunicator,
- serializer, clockService)
- .withBroadcastMessageExecutor(MoreExecutors.newDirectExecutorService());
+ ecMap = new EventuallyConsistentMapBuilderImpl<>(
+ clusterService, clusterCommunicator)
+ .withName(MAP_NAME)
+ .withSerializer(serializer)
+ .withClockService(clockService)
+ .withCommunicationExecutor(MoreExecutors.newDirectExecutorService())
+ .build();
// Reset ready for tests to add their own expectations
reset(clusterCommunicator);