Initial implementation of EventuallyConsistentMap.
The map uses the gossip schemes to replicate data between instances. It seems
to work for basic add and remove use cases right now, no anti-entropy yet.
ONOS-844.
Change-Id: I7d05a7b532e40c95ab14e2c8911f18514bd0a8ca
diff --git a/core/store/dist/src/main/java/org/onosproject/store/impl/ClockService.java b/core/store/dist/src/main/java/org/onosproject/store/impl/ClockService.java
new file mode 100644
index 0000000..4fbfc22
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/impl/ClockService.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.impl;
+
+import org.onosproject.store.Timestamp;
+
+/**
+ * Clock service that can generate timestamps per object.
+ */
+public interface ClockService<T> {
+
+ /**
+ * Gets a new timestamp for the given object.
+ *
+ * @param object Object to get a timestamp for
+ * @return the new timestamp
+ */
+ public Timestamp getTimestamp(T object);
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMap.java
new file mode 100644
index 0000000..bbddd1d
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMap.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.impl;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A distributed, eventually consistent map.
+ *
+ * This map does not offer read after writes consistency. Operations are
+ * serialized via the timestamps issued by the clock service. If two updates
+ * are in conflict, the update with the more recent timestamp will endure.
+ *
+ * The interface is mostly similar to {@link java.util.Map} with some minor
+ * semantic changes and the addition of a listener framework (because the map
+ * can be mutated by clients on other instances, not only through the local Java
+ * API).
+ *
+ * Clients are expected to register an
+ * {@link org.onosproject.store.impl.EventuallyConsistentMapListener} if they
+ * are interested in receiving notifications of update to the map.
+ */
+public interface EventuallyConsistentMap<K, V> {
+
+ /**
+ * Returns the number of key-value mappings in this map.
+ *
+ * @return number of key-value mappings
+ */
+ public int size();
+
+ /**
+ * Returns true if this map is empty.
+ *
+ * @return true if this map is empty, otherwise false
+ */
+ public boolean isEmpty();
+
+ /**
+ * Returns true if the map contains a mapping for the specified key.
+ *
+ * @param key the key to check if this map contains
+ * @return true if this map has a mapping for the key, otherwise false
+ */
+ public boolean containsKey(K key);
+
+ /**
+ * Returns true if the map contains a mapping from any key to the specified
+ * value.
+ *
+ * @param value the value to check if this map has a mapping for
+ * @return true if this map has a mapping to this value, otherwise false
+ */
+ public boolean containsValue(V value);
+
+ /**
+ * Returns the value mapped to the specified key.
+ *
+ * @param key the key to look up in this map
+ * @return the value mapped to the key, or null if no mapping is found
+ */
+ public V get(K key);
+
+ /**
+ * Associates the specified value to the specified key in this map.
+ * <p>
+ * Note: this differs from the specification of {@link java.util.Map}
+ * because it does not return the previous value associated with the key.
+ * Clients are expected to register an
+ * {@link org.onosproject.store.impl.EventuallyConsistentMapListener} if
+ * they are interested in receiving notification of updates to the map.
+ * </p>
+ *
+ * @param key the key to add a mapping for in this map
+ * @param value the value to associate with the key in this map
+ */
+ public void put(K key, V value);
+
+ /**
+ * Removes the mapping associated with the specified key from the map.
+ * <p>
+ * Note: this differs from the specification of {@link java.util.Map}
+ * because it does not return the previous value associated with the key.
+ * Clients are expected to register an
+ * {@link org.onosproject.store.impl.EventuallyConsistentMapListener} if
+ * they are interested in receiving notification of updates to the map.
+ * </p>
+ *
+ * @param key the key to remove the mapping for
+ */
+ public void remove(K key);
+
+ /**
+ * Adds mappings for all key-value pairs in the specified map to this map.
+ * <p>
+ * This will be more efficient in communication than calling individual put
+ * operations.
+ * </p>
+ *
+ * @param m a map of values to add to this map
+ */
+ public void putAll(Map<? extends K, ? extends V> m);
+
+ /**
+ * Removes all mappings from this map.
+ */
+ public void clear();
+
+ /**
+ * Returns a set of the keys in this map. Changes to the set are not
+ * reflected back to the map.
+ *
+ * @return set of keys in the map
+ */
+ public Set<K> keySet();
+
+ /**
+ * Returns a collections of values in this map. Changes to the collection
+ * are not reflected back to the map.
+ *
+ * @return collection of values in the map
+ */
+ public Collection<V> values();
+
+ /**
+ * Returns a set of mappings contained in this map. Changes to the set are
+ * not reflected back to the map.
+ *
+ * @return set of key-value mappings in this map
+ */
+ public Set<Map.Entry<K, V>> entrySet();
+
+ /**
+ * Adds the specified listener to the map which will be notified whenever
+ * the mappings in the map are changed.
+ *
+ * @param listener listener to register for events
+ */
+ public void addListener(EventuallyConsistentMapListener listener);
+
+ /**
+ * Removes the specified listener from the map such that it will no longer
+ * receive change notifications.
+ *
+ * @param listener listener to deregister for events
+ */
+ public void removeListener(EventuallyConsistentMapListener listener);
+
+ /**
+ * Shuts down the map and breaks communication between different instances.
+ * This allows the map objects to be cleaned up and garbage collected.
+ * Calls to any methods on the map subsequent to calling destroy() will
+ * throw a {@link java.lang.RuntimeException}.
+ */
+ public void destroy();
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapEvent.java b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapEvent.java
new file mode 100644
index 0000000..fd38e64
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapEvent.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.impl;
+
+/**
+ * Event object signalling that the map was modified.
+ */
+public class EventuallyConsistentMapEvent<K, V> {
+
+ public enum Type {
+ PUT,
+ REMOVE
+ }
+
+ private final Type type;
+ private final K key;
+ private final V value;
+
+ /**
+ * Creates a new event object.
+ *
+ * @param type the type of the event
+ * @param key the key the event concerns
+ * @param value the value related to the key, or null for remove events
+ */
+ public EventuallyConsistentMapEvent(Type type, K key, V value) {
+ this.type = type;
+ this.key = key;
+ this.value = value;
+ }
+
+ /**
+ * Returns the type of the event.
+ *
+ * @return the type of the event
+ */
+ public Type type() {
+ return type;
+ }
+
+ /**
+ * Returns the key this event concerns.
+ *
+ * @return the key
+ */
+ public K key() {
+ return key;
+ }
+
+ /**
+ * Returns the value associated with this event.
+ *
+ * @return the value, or null if the event was REMOVE
+ */
+ public V value() {
+ return value;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapImpl.java
new file mode 100644
index 0000000..b011517
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapImpl.java
@@ -0,0 +1,598 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.impl;
+
+import com.google.common.base.MoreObjects;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.Timestamp;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.minPriority;
+import static org.onlab.util.Tools.namedThreads;
+
+/**
+ * Distributed Map implementation which uses optimistic replication and gossip
+ * based techniques to provide an eventually consistent data store.
+ */
+public class EventuallyConsistentMapImpl<K, V>
+ implements EventuallyConsistentMap<K, V> {
+
+ private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
+
+ private final Map<K, Timestamped<V>> items;
+ private final Map<K, Timestamp> removedItems;
+
+ private final String mapName;
+ private final ClusterService clusterService;
+ private final ClusterCommunicationService clusterCommunicator;
+ private final KryoSerializer serializer;
+
+ private final ClockService<K> clockService;
+
+ private final MessageSubject updateMessageSubject;
+ private final MessageSubject removeMessageSubject;
+
+ private final Set<EventuallyConsistentMapListener> listeners
+ = new CopyOnWriteArraySet<>();
+
+ private final ExecutorService executor;
+
+ private final ScheduledExecutorService backgroundExecutor;
+
+ private volatile boolean destroyed = false;
+ private static final String ERROR_DESTROYED = " is already destroyed";
+
+ // TODO: Make these anti-entropy params configurable
+ private long initialDelaySec = 5;
+ private long periodSec = 5;
+
+ /**
+ * Creates a new eventually consistent map shared amongst multiple instances.
+ *
+ * Each map is identified by a string map name. EventuallyConsistentMapImpl
+ * objects in different JVMs that use the same map name will form a
+ * distributed map across JVMs (provided the cluster service is aware of
+ * both nodes).
+ *
+ * The client is expected to provide an
+ * {@link org.onlab.util.KryoNamespace.Builder} with which all classes that
+ * will be stored in this map have been registered (including referenced
+ * classes). This serializer will be used to serialize both K and V for
+ * inter-node notifications.
+ *
+ * The client must provide an {@link org.onosproject.store.impl.ClockService}
+ * which can generate timestamps for a given key. The clock service is free
+ * to generate timestamps however it wishes, however these timestamps will
+ * be used to serialize updates to the map so they must be strict enough
+ * to ensure updates are properly ordered for the use case (i.e. in some
+ * cases wallclock time will suffice, whereas in other cases logical time
+ * will be necessary).
+ *
+ * @param mapName a String identifier for the map.
+ * @param clusterService the cluster service
+ * @param clusterCommunicator the cluster communications service
+ * @param serializerBuilder a Kryo namespace builder that can serialize
+ * both K and V
+ * @param clockService a clock service able to generate timestamps
+ * for K
+ */
+ public EventuallyConsistentMapImpl(String mapName,
+ ClusterService clusterService,
+ ClusterCommunicationService clusterCommunicator,
+ KryoNamespace.Builder serializerBuilder,
+ ClockService<K> clockService) {
+
+ this.mapName = checkNotNull(mapName);
+ this.clusterService = checkNotNull(clusterService);
+ this.clusterCommunicator = checkNotNull(clusterCommunicator);
+
+ serializer = createSerializer(checkNotNull(serializerBuilder));
+
+ this.clockService = checkNotNull(clockService);
+
+ items = new ConcurrentHashMap<>();
+ removedItems = new ConcurrentHashMap<>();
+
+ executor = Executors
+ .newCachedThreadPool(namedThreads("onos-ecm-" + mapName + "-fg-%d"));
+
+ backgroundExecutor =
+ newSingleThreadScheduledExecutor(minPriority(
+ namedThreads("onos-ecm-" + mapName + "-bg-%d")));
+
+ updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
+ clusterCommunicator.addSubscriber(updateMessageSubject,
+ new InternalPutEventListener());
+ removeMessageSubject = new MessageSubject("ecm-" + mapName + "-remove");
+ clusterCommunicator.addSubscriber(removeMessageSubject,
+ new InternalRemoveEventListener());
+ }
+
+ private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
+ return new KryoSerializer() {
+ @Override
+ protected void setupKryoPool() {
+ // Add the map's internal helper classes to the user-supplied serializer
+ serializerPool = builder
+ .register(WallClockTimestamp.class)
+ .register(PutEntry.class)
+ .register(ArrayList.class)
+ .register(InternalPutEvent.class)
+ .register(InternalRemoveEvent.class)
+ .build();
+
+ // TODO anti-entropy classes
+ }
+ };
+ }
+
+ @Override
+ public int size() {
+ checkState(destroyed, mapName + ERROR_DESTROYED);
+ return items.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ checkState(destroyed, mapName + ERROR_DESTROYED);
+ return items.isEmpty();
+ }
+
+ @Override
+ public boolean containsKey(K key) {
+ checkState(destroyed, mapName + ERROR_DESTROYED);
+ return items.containsKey(key);
+ }
+
+ @Override
+ public boolean containsValue(V value) {
+ checkState(destroyed, mapName + ERROR_DESTROYED);
+
+ return items.values().stream()
+ .anyMatch(timestamped -> timestamped.value().equals(value));
+ }
+
+ @Override
+ public V get(K key) {
+ checkState(destroyed, mapName + ERROR_DESTROYED);
+
+ Timestamped<V> value = items.get(key);
+ if (value != null) {
+ return value.value();
+ }
+ return null;
+ }
+
+ @Override
+ public void put(K key, V value) {
+ checkState(destroyed, mapName + ERROR_DESTROYED);
+
+ Timestamp timestamp = clockService.getTimestamp(key);
+ if (putInternal(key, value, timestamp)) {
+ notifyPeers(new InternalPutEvent<>(key, value, timestamp));
+ EventuallyConsistentMapEvent<K, V> externalEvent
+ = new EventuallyConsistentMapEvent<>(
+ EventuallyConsistentMapEvent.Type.PUT, key, value);
+ notifyListeners(externalEvent);
+ }
+ }
+
+ private boolean putInternal(K key, V value, Timestamp timestamp) {
+ synchronized (this) {
+ Timestamp removed = removedItems.get(key);
+ if (removed != null && removed.compareTo(timestamp) > 0) {
+ return false;
+ }
+
+ Timestamped<V> existing = items.get(key);
+ if (existing != null && existing.isNewer(timestamp)) {
+ return false;
+ } else {
+ items.put(key, new Timestamped<>(value, timestamp));
+ removedItems.remove(key);
+ return true;
+ }
+ }
+ }
+
+ @Override
+ public void remove(K key) {
+ checkState(destroyed, mapName + ERROR_DESTROYED);
+
+ Timestamp timestamp = clockService.getTimestamp(key);
+ if (removeInternal(key, timestamp)) {
+ notifyPeers(new InternalRemoveEvent<>(key, timestamp));
+ EventuallyConsistentMapEvent<K, V> externalEvent
+ = new EventuallyConsistentMapEvent<>(
+ EventuallyConsistentMapEvent.Type.REMOVE, key, null);
+ notifyListeners(externalEvent);
+ }
+ }
+
+ private boolean removeInternal(K key, Timestamp timestamp) {
+ synchronized (this) {
+ if (items.get(key) != null && items.get(key).isNewer(timestamp)) {
+ return false;
+ }
+
+ items.remove(key);
+ removedItems.put(key, timestamp);
+ return true;
+ }
+ }
+
+ @Override
+ public void putAll(Map<? extends K, ? extends V> m) {
+ checkState(destroyed, mapName + ERROR_DESTROYED);
+
+ List<PutEntry<K, V>> updates = new ArrayList<>(m.size());
+
+ for (Map.Entry<? extends K, ? extends V> entry : m.entrySet()) {
+ K key = entry.getKey();
+ V value = entry.getValue();
+ Timestamp timestamp = clockService.getTimestamp(entry.getKey());
+
+ if (putInternal(key, value, timestamp)) {
+ updates.add(new PutEntry<>(key, value, timestamp));
+ }
+ }
+
+ notifyPeers(new InternalPutEvent<>(updates));
+
+ for (PutEntry<K, V> entry : updates) {
+ EventuallyConsistentMapEvent<K, V> externalEvent =
+ new EventuallyConsistentMapEvent<>(
+ EventuallyConsistentMapEvent.Type.PUT, entry.key(), entry.value());
+ notifyListeners(externalEvent);
+ }
+ }
+
+ @Override
+ public void clear() {
+ checkState(destroyed, mapName + ERROR_DESTROYED);
+
+ List<RemoveEntry<K>> removed = new ArrayList<>(items.size());
+
+ for (K key : items.keySet()) {
+ Timestamp timestamp = clockService.getTimestamp(key);
+
+ if (removeInternal(key, timestamp)) {
+ removed.add(new RemoveEntry<>(key, timestamp));
+ }
+ }
+
+ notifyPeers(new InternalRemoveEvent<>(removed));
+
+ for (RemoveEntry<K> entry : removed) {
+ EventuallyConsistentMapEvent<K, V> externalEvent =
+ new EventuallyConsistentMapEvent<>(
+ EventuallyConsistentMapEvent.Type.REMOVE, entry.key(), null);
+ notifyListeners(externalEvent);
+ }
+ }
+
+ @Override
+ public Set<K> keySet() {
+ checkState(destroyed, mapName + ERROR_DESTROYED);
+
+ return items.keySet();
+ }
+
+ @Override
+ public Collection<V> values() {
+ checkState(destroyed, mapName + ERROR_DESTROYED);
+
+ return items.values().stream()
+ .map(Timestamped::value)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public Set<Map.Entry<K, V>> entrySet() {
+ checkState(destroyed, mapName + ERROR_DESTROYED);
+
+ return items.entrySet().stream()
+ .map(e -> new Entry(e.getKey(), e.getValue().value()))
+ .collect(Collectors.toSet());
+ }
+
+ @Override
+ public void addListener(EventuallyConsistentMapListener listener) {
+ checkState(destroyed, mapName + ERROR_DESTROYED);
+
+ listeners.add(checkNotNull(listener));
+ }
+
+ @Override
+ public void removeListener(EventuallyConsistentMapListener listener) {
+ checkState(destroyed, mapName + ERROR_DESTROYED);
+
+ listeners.remove(checkNotNull(listener));
+ }
+
+ @Override
+ public void destroy() {
+ destroyed = true;
+
+ executor.shutdown();
+ backgroundExecutor.shutdown();
+
+ clusterCommunicator.removeSubscriber(updateMessageSubject);
+ clusterCommunicator.removeSubscriber(removeMessageSubject);
+ }
+
+ private void notifyListeners(EventuallyConsistentMapEvent event) {
+ for (EventuallyConsistentMapListener listener : listeners) {
+ listener.event(event);
+ }
+ }
+
+ private void notifyPeers(InternalPutEvent event) {
+ try {
+ log.debug("sending put {}", event);
+ broadcastMessage(updateMessageSubject, event);
+ } catch (IOException e) {
+ // TODO this won't happen; remove from API
+ log.debug("IOException broadcasting update", e);
+ }
+ }
+
+ private void notifyPeers(InternalRemoveEvent event) {
+ try {
+ broadcastMessage(removeMessageSubject, event);
+ } catch (IOException e) {
+ // TODO this won't happen; remove from API
+ log.debug("IOException broadcasting update", e);
+ }
+ }
+
+ private void broadcastMessage(MessageSubject subject, Object event) throws
+ IOException {
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ subject,
+ serializer.encode(event));
+ clusterCommunicator.broadcast(message);
+ }
+
+ private void unicastMessage(NodeId peer,
+ MessageSubject subject,
+ Object event) throws IOException {
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ subject,
+ serializer.encode(event));
+ clusterCommunicator.unicast(message, peer);
+ }
+
+ private final class Entry implements Map.Entry<K, V> {
+
+ private final K key;
+ private final V value;
+
+ public Entry(K key, V value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ @Override
+ public K getKey() {
+ return key;
+ }
+
+ @Override
+ public V getValue() {
+ return value;
+ }
+
+ @Override
+ public V setValue(V value) {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private final class InternalPutEventListener implements
+ ClusterMessageHandler {
+ @Override
+ public void handle(ClusterMessage message) {
+ log.debug("Received put event from peer: {}", message.sender());
+ InternalPutEvent<K, V> event = serializer.decode(message.payload());
+
+ executor.submit(() -> {
+ try {
+ for (PutEntry<K, V> entry : event.entries()) {
+ K key = entry.key();
+ V value = entry.value();
+ Timestamp timestamp = entry.timestamp();
+
+ if (putInternal(key, value, timestamp)) {
+ EventuallyConsistentMapEvent externalEvent =
+ new EventuallyConsistentMapEvent<>(
+ EventuallyConsistentMapEvent.Type.PUT, key,
+ value);
+ notifyListeners(externalEvent);
+ }
+ }
+ } catch (Exception e) {
+ log.warn("Exception thrown handling put", e);
+ }
+ });
+ }
+ }
+
+ private final class InternalRemoveEventListener implements
+ ClusterMessageHandler {
+ @Override
+ public void handle(ClusterMessage message) {
+ log.debug("Received remove event from peer: {}", message.sender());
+ InternalRemoveEvent<K> event = serializer.decode(message.payload());
+
+ executor.submit(() -> {
+ try {
+ for (RemoveEntry<K> entry : event.entries()) {
+ K key = entry.key();
+ Timestamp timestamp = entry.timestamp();
+
+ if (removeInternal(key, timestamp)) {
+ EventuallyConsistentMapEvent externalEvent = new EventuallyConsistentMapEvent<K, V>(
+ EventuallyConsistentMapEvent.Type.REMOVE,
+ key, null);
+ notifyListeners(externalEvent);
+ }
+ }
+ } catch (Exception e) {
+ log.warn("Exception thrown handling remove", e);
+ }
+ });
+ }
+ }
+
+ private static final class InternalPutEvent<K, V> {
+ private final List<PutEntry<K, V>> entries;
+
+ public InternalPutEvent(K key, V value, Timestamp timestamp) {
+ entries = Collections
+ .singletonList(new PutEntry<>(key, value, timestamp));
+ }
+
+ public InternalPutEvent(List<PutEntry<K, V>> entries) {
+ this.entries = checkNotNull(entries);
+ }
+
+ // Needed for serialization.
+ @SuppressWarnings("unused")
+ private InternalPutEvent() {
+ entries = null;
+ }
+
+ public List<PutEntry<K, V>> entries() {
+ return entries;
+ }
+ }
+
+ private static final class PutEntry<K, V> {
+ private final K key;
+ private final V value;
+ private final Timestamp timestamp;
+
+ public PutEntry(K key, V value, Timestamp timestamp) {
+ this.key = checkNotNull(key);
+ this.value = checkNotNull(value);
+ this.timestamp = checkNotNull(timestamp);
+ }
+
+ // Needed for serialization.
+ @SuppressWarnings("unused")
+ private PutEntry() {
+ this.key = null;
+ this.value = null;
+ this.timestamp = null;
+ }
+
+ public K key() {
+ return key;
+ }
+
+ public V value() {
+ return value;
+ }
+
+ public Timestamp timestamp() {
+ return timestamp;
+ }
+
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("key", key)
+ .add("value", value)
+ .add("timestamp", timestamp)
+ .toString();
+ }
+ }
+
+ private static final class InternalRemoveEvent<K> {
+ private final List<RemoveEntry<K>> entries;
+
+ public InternalRemoveEvent(K key, Timestamp timestamp) {
+ entries = Collections.singletonList(
+ new RemoveEntry<>(key, timestamp));
+ }
+
+ public InternalRemoveEvent(List<RemoveEntry<K>> entries) {
+ this.entries = checkNotNull(entries);
+ }
+
+ // Needed for serialization.
+ @SuppressWarnings("unused")
+ private InternalRemoveEvent() {
+ entries = null;
+ }
+
+ public List<RemoveEntry<K>> entries() {
+ return entries;
+ }
+ }
+
+ private static final class RemoveEntry<K> {
+ private final K key;
+ private final Timestamp timestamp;
+
+ public RemoveEntry(K key, Timestamp timestamp) {
+ this.key = checkNotNull(key);
+ this.timestamp = checkNotNull(timestamp);
+ }
+
+ // Needed for serialization.
+ @SuppressWarnings("unused")
+ private RemoveEntry() {
+ this.key = null;
+ this.timestamp = null;
+ }
+
+ public K key() {
+ return key;
+ }
+
+ public Timestamp timestamp() {
+ return timestamp;
+ }
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapListener.java b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapListener.java
new file mode 100644
index 0000000..289f46c
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapListener.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.impl;
+
+/**
+ * Listener interested in receiving modification events for an
+ * EventuallyConsistentMap.
+ */
+public interface EventuallyConsistentMapListener {
+
+ /**
+ * Reacts to the specified event.
+ *
+ * @param event the event
+ */
+ public void event(EventuallyConsistentMapEvent event);
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/impl/WallclockClockManager.java b/core/store/dist/src/main/java/org/onosproject/store/impl/WallclockClockManager.java
new file mode 100644
index 0000000..0103ee4
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/impl/WallclockClockManager.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.impl;
+
+import org.onosproject.store.Timestamp;
+
+/**
+ * A clock service which hands out wallclock-based timestamps.
+ */
+public class WallclockClockManager<T> implements ClockService<T> {
+ @Override
+ public Timestamp getTimestamp(T object) {
+ return new WallClockTimestamp();
+ }
+}