Initial implementation of EventuallyConsistentMap.

The map uses the gossip schemes to replicate data between instances. It seems
to work for basic add and remove use cases right now, no anti-entropy yet.

ONOS-844.

Change-Id: I7d05a7b532e40c95ab14e2c8911f18514bd0a8ca
diff --git a/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapImpl.java
new file mode 100644
index 0000000..b011517
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapImpl.java
@@ -0,0 +1,598 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.impl;
+
+import com.google.common.base.MoreObjects;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.Timestamp;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.minPriority;
+import static org.onlab.util.Tools.namedThreads;
+
+/**
+ * Distributed Map implementation which uses optimistic replication and gossip
+ * based techniques to provide an eventually consistent data store.
+ */
+public class EventuallyConsistentMapImpl<K, V>
+        implements EventuallyConsistentMap<K, V> {
+
+    private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
+
+    private final Map<K, Timestamped<V>> items;
+    private final Map<K, Timestamp> removedItems;
+
+    private final String mapName;
+    private final ClusterService clusterService;
+    private final ClusterCommunicationService clusterCommunicator;
+    private final KryoSerializer serializer;
+
+    private final ClockService<K> clockService;
+
+    private final MessageSubject updateMessageSubject;
+    private final MessageSubject removeMessageSubject;
+
+    private final Set<EventuallyConsistentMapListener> listeners
+            = new CopyOnWriteArraySet<>();
+
+    private final ExecutorService executor;
+
+    private final ScheduledExecutorService backgroundExecutor;
+
+    private volatile boolean destroyed = false;
+    private static final String ERROR_DESTROYED = " is already destroyed";
+
+    // TODO: Make these anti-entropy params configurable
+    private long initialDelaySec = 5;
+    private long periodSec = 5;
+
+    /**
+     * Creates a new eventually consistent map shared amongst multiple instances.
+     *
+     * Each map is identified by a string map name. EventuallyConsistentMapImpl
+     * objects in different JVMs that use the same map name will form a
+     * distributed map across JVMs (provided the cluster service is aware of
+     * both nodes).
+     *
+     * The client is expected to provide an
+     * {@link org.onlab.util.KryoNamespace.Builder} with which all classes that
+     * will be stored in this map have been registered (including referenced
+     * classes). This serializer will be used to serialize both K and V for
+     * inter-node notifications.
+     *
+     * The client must provide an {@link org.onosproject.store.impl.ClockService}
+     * which can generate timestamps for a given key. The clock service is free
+     * to generate timestamps however it wishes, however these timestamps will
+     * be used to serialize updates to the map so they must be strict enough
+     * to ensure updates are properly ordered for the use case (i.e. in some
+     * cases wallclock time will suffice, whereas in other cases logical time
+     * will be necessary).
+     *
+     * @param mapName             a String identifier for the map.
+     * @param clusterService      the cluster service
+     * @param clusterCommunicator the cluster communications service
+     * @param serializerBuilder   a Kryo namespace builder that can serialize
+     *                            both K and V
+     * @param clockService        a clock service able to generate timestamps
+     *                            for K
+     */
+    public EventuallyConsistentMapImpl(String mapName,
+                                       ClusterService clusterService,
+                                       ClusterCommunicationService clusterCommunicator,
+                                       KryoNamespace.Builder serializerBuilder,
+                                       ClockService<K> clockService) {
+
+        this.mapName = checkNotNull(mapName);
+        this.clusterService = checkNotNull(clusterService);
+        this.clusterCommunicator = checkNotNull(clusterCommunicator);
+
+        serializer = createSerializer(checkNotNull(serializerBuilder));
+
+        this.clockService = checkNotNull(clockService);
+
+        items = new ConcurrentHashMap<>();
+        removedItems = new ConcurrentHashMap<>();
+
+        executor = Executors
+                .newCachedThreadPool(namedThreads("onos-ecm-" + mapName + "-fg-%d"));
+
+        backgroundExecutor =
+                newSingleThreadScheduledExecutor(minPriority(
+                        namedThreads("onos-ecm-" + mapName + "-bg-%d")));
+
+        updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
+        clusterCommunicator.addSubscriber(updateMessageSubject,
+                                          new InternalPutEventListener());
+        removeMessageSubject = new MessageSubject("ecm-" + mapName + "-remove");
+        clusterCommunicator.addSubscriber(removeMessageSubject,
+                                          new InternalRemoveEventListener());
+    }
+
+    private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
+        return new KryoSerializer() {
+            @Override
+            protected void setupKryoPool() {
+                // Add the map's internal helper classes to the user-supplied serializer
+                serializerPool = builder
+                        .register(WallClockTimestamp.class)
+                        .register(PutEntry.class)
+                        .register(ArrayList.class)
+                        .register(InternalPutEvent.class)
+                        .register(InternalRemoveEvent.class)
+                        .build();
+
+                // TODO anti-entropy classes
+            }
+        };
+    }
+
+    @Override
+    public int size() {
+        checkState(destroyed, mapName + ERROR_DESTROYED);
+        return items.size();
+    }
+
+    @Override
+    public boolean isEmpty() {
+        checkState(destroyed, mapName + ERROR_DESTROYED);
+        return items.isEmpty();
+    }
+
+    @Override
+    public boolean containsKey(K key) {
+        checkState(destroyed, mapName + ERROR_DESTROYED);
+        return items.containsKey(key);
+    }
+
+    @Override
+    public boolean containsValue(V value) {
+        checkState(destroyed, mapName + ERROR_DESTROYED);
+
+        return items.values().stream()
+                .anyMatch(timestamped -> timestamped.value().equals(value));
+    }
+
+    @Override
+    public V get(K key) {
+        checkState(destroyed, mapName + ERROR_DESTROYED);
+
+        Timestamped<V> value = items.get(key);
+        if (value != null) {
+            return value.value();
+        }
+        return null;
+    }
+
+    @Override
+    public void put(K key, V value) {
+        checkState(destroyed, mapName + ERROR_DESTROYED);
+
+        Timestamp timestamp = clockService.getTimestamp(key);
+        if (putInternal(key, value, timestamp)) {
+            notifyPeers(new InternalPutEvent<>(key, value, timestamp));
+            EventuallyConsistentMapEvent<K, V> externalEvent
+                    = new EventuallyConsistentMapEvent<>(
+                    EventuallyConsistentMapEvent.Type.PUT, key, value);
+            notifyListeners(externalEvent);
+        }
+    }
+
+    private boolean putInternal(K key, V value, Timestamp timestamp) {
+        synchronized (this) {
+            Timestamp removed = removedItems.get(key);
+            if (removed != null && removed.compareTo(timestamp) > 0) {
+                return false;
+            }
+
+            Timestamped<V> existing = items.get(key);
+            if (existing != null && existing.isNewer(timestamp)) {
+                return false;
+            } else {
+                items.put(key, new Timestamped<>(value, timestamp));
+                removedItems.remove(key);
+                return true;
+            }
+        }
+    }
+
+    @Override
+    public void remove(K key) {
+        checkState(destroyed, mapName + ERROR_DESTROYED);
+
+        Timestamp timestamp = clockService.getTimestamp(key);
+        if (removeInternal(key, timestamp)) {
+            notifyPeers(new InternalRemoveEvent<>(key, timestamp));
+            EventuallyConsistentMapEvent<K, V> externalEvent
+                    = new EventuallyConsistentMapEvent<>(
+                    EventuallyConsistentMapEvent.Type.REMOVE, key, null);
+            notifyListeners(externalEvent);
+        }
+    }
+
+    private boolean removeInternal(K key, Timestamp timestamp) {
+        synchronized (this) {
+            if (items.get(key) != null && items.get(key).isNewer(timestamp)) {
+                return false;
+            }
+
+            items.remove(key);
+            removedItems.put(key, timestamp);
+            return true;
+        }
+    }
+
+    @Override
+    public void putAll(Map<? extends K, ? extends V> m) {
+        checkState(destroyed, mapName + ERROR_DESTROYED);
+
+        List<PutEntry<K, V>> updates = new ArrayList<>(m.size());
+
+        for (Map.Entry<? extends K, ? extends V> entry : m.entrySet()) {
+            K key = entry.getKey();
+            V value = entry.getValue();
+            Timestamp timestamp = clockService.getTimestamp(entry.getKey());
+
+            if (putInternal(key, value, timestamp)) {
+                updates.add(new PutEntry<>(key, value, timestamp));
+            }
+        }
+
+        notifyPeers(new InternalPutEvent<>(updates));
+
+        for (PutEntry<K, V> entry : updates) {
+            EventuallyConsistentMapEvent<K, V> externalEvent =
+                    new EventuallyConsistentMapEvent<>(
+                    EventuallyConsistentMapEvent.Type.PUT, entry.key(), entry.value());
+            notifyListeners(externalEvent);
+        }
+    }
+
+    @Override
+    public void clear() {
+        checkState(destroyed, mapName + ERROR_DESTROYED);
+
+        List<RemoveEntry<K>> removed = new ArrayList<>(items.size());
+
+        for (K key : items.keySet()) {
+            Timestamp timestamp = clockService.getTimestamp(key);
+
+            if (removeInternal(key, timestamp)) {
+                removed.add(new RemoveEntry<>(key, timestamp));
+            }
+        }
+
+        notifyPeers(new InternalRemoveEvent<>(removed));
+
+        for (RemoveEntry<K> entry : removed) {
+            EventuallyConsistentMapEvent<K, V> externalEvent =
+                    new EventuallyConsistentMapEvent<>(
+                            EventuallyConsistentMapEvent.Type.REMOVE, entry.key(), null);
+            notifyListeners(externalEvent);
+        }
+    }
+
+    @Override
+    public Set<K> keySet() {
+        checkState(destroyed, mapName + ERROR_DESTROYED);
+
+        return items.keySet();
+    }
+
+    @Override
+    public Collection<V> values() {
+        checkState(destroyed, mapName + ERROR_DESTROYED);
+
+        return items.values().stream()
+                .map(Timestamped::value)
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public Set<Map.Entry<K, V>> entrySet() {
+        checkState(destroyed, mapName + ERROR_DESTROYED);
+
+        return items.entrySet().stream()
+                .map(e -> new Entry(e.getKey(), e.getValue().value()))
+                .collect(Collectors.toSet());
+    }
+
+    @Override
+    public void addListener(EventuallyConsistentMapListener listener) {
+        checkState(destroyed, mapName + ERROR_DESTROYED);
+
+        listeners.add(checkNotNull(listener));
+    }
+
+    @Override
+    public void removeListener(EventuallyConsistentMapListener listener) {
+        checkState(destroyed, mapName + ERROR_DESTROYED);
+
+        listeners.remove(checkNotNull(listener));
+    }
+
+    @Override
+    public void destroy() {
+        destroyed = true;
+
+        executor.shutdown();
+        backgroundExecutor.shutdown();
+
+        clusterCommunicator.removeSubscriber(updateMessageSubject);
+        clusterCommunicator.removeSubscriber(removeMessageSubject);
+    }
+
+    private void notifyListeners(EventuallyConsistentMapEvent event) {
+        for (EventuallyConsistentMapListener listener : listeners) {
+            listener.event(event);
+        }
+    }
+
+    private void notifyPeers(InternalPutEvent event) {
+        try {
+            log.debug("sending put {}", event);
+            broadcastMessage(updateMessageSubject, event);
+        } catch (IOException e) {
+            // TODO this won't happen; remove from API
+            log.debug("IOException broadcasting update", e);
+        }
+    }
+
+    private void notifyPeers(InternalRemoveEvent event) {
+        try {
+            broadcastMessage(removeMessageSubject, event);
+        } catch (IOException e) {
+            // TODO this won't happen; remove from API
+            log.debug("IOException broadcasting update", e);
+        }
+    }
+
+    private void broadcastMessage(MessageSubject subject, Object event) throws
+            IOException {
+        ClusterMessage message = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                subject,
+                serializer.encode(event));
+        clusterCommunicator.broadcast(message);
+    }
+
+    private void unicastMessage(NodeId peer,
+                                MessageSubject subject,
+                                Object event) throws IOException {
+        ClusterMessage message = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                subject,
+                serializer.encode(event));
+        clusterCommunicator.unicast(message, peer);
+    }
+
+    private final class Entry implements Map.Entry<K, V> {
+
+        private final K key;
+        private final V value;
+
+        public Entry(K key, V value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        @Override
+        public K getKey() {
+            return key;
+        }
+
+        @Override
+        public V getValue() {
+            return value;
+        }
+
+        @Override
+        public V setValue(V value) {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    private final class InternalPutEventListener implements
+            ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+            log.debug("Received put event from peer: {}", message.sender());
+            InternalPutEvent<K, V> event = serializer.decode(message.payload());
+
+            executor.submit(() -> {
+                try {
+                    for (PutEntry<K, V> entry : event.entries()) {
+                        K key = entry.key();
+                        V value = entry.value();
+                        Timestamp timestamp = entry.timestamp();
+
+                        if (putInternal(key, value, timestamp)) {
+                            EventuallyConsistentMapEvent externalEvent =
+                                    new EventuallyConsistentMapEvent<>(
+                                    EventuallyConsistentMapEvent.Type.PUT, key,
+                                    value);
+                            notifyListeners(externalEvent);
+                        }
+                    }
+                } catch (Exception e) {
+                    log.warn("Exception thrown handling put", e);
+                }
+            });
+        }
+    }
+
+    private final class InternalRemoveEventListener implements
+            ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+            log.debug("Received remove event from peer: {}", message.sender());
+            InternalRemoveEvent<K> event = serializer.decode(message.payload());
+
+            executor.submit(() -> {
+                try {
+                    for (RemoveEntry<K> entry : event.entries()) {
+                        K key = entry.key();
+                        Timestamp timestamp = entry.timestamp();
+
+                        if (removeInternal(key, timestamp)) {
+                            EventuallyConsistentMapEvent externalEvent = new EventuallyConsistentMapEvent<K, V>(
+                                    EventuallyConsistentMapEvent.Type.REMOVE,
+                                    key, null);
+                            notifyListeners(externalEvent);
+                        }
+                    }
+                } catch (Exception e) {
+                    log.warn("Exception thrown handling remove", e);
+                }
+            });
+        }
+    }
+
+    private static final class InternalPutEvent<K, V> {
+        private final List<PutEntry<K, V>> entries;
+
+        public InternalPutEvent(K key, V value, Timestamp timestamp) {
+            entries = Collections
+                    .singletonList(new PutEntry<>(key, value, timestamp));
+        }
+
+        public InternalPutEvent(List<PutEntry<K, V>> entries) {
+            this.entries = checkNotNull(entries);
+        }
+
+        // Needed for serialization.
+        @SuppressWarnings("unused")
+        private InternalPutEvent() {
+            entries = null;
+        }
+
+        public List<PutEntry<K, V>> entries() {
+            return entries;
+        }
+    }
+
+    private static final class PutEntry<K, V> {
+        private final K key;
+        private final V value;
+        private final Timestamp timestamp;
+
+        public PutEntry(K key, V value, Timestamp timestamp) {
+            this.key = checkNotNull(key);
+            this.value = checkNotNull(value);
+            this.timestamp = checkNotNull(timestamp);
+        }
+
+        // Needed for serialization.
+        @SuppressWarnings("unused")
+        private PutEntry() {
+            this.key = null;
+            this.value = null;
+            this.timestamp = null;
+        }
+
+        public K key() {
+            return key;
+        }
+
+        public V value() {
+            return value;
+        }
+
+        public Timestamp timestamp() {
+            return timestamp;
+        }
+
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("key", key)
+                    .add("value", value)
+                    .add("timestamp", timestamp)
+                    .toString();
+        }
+    }
+
+    private static final class InternalRemoveEvent<K> {
+        private final List<RemoveEntry<K>> entries;
+
+        public InternalRemoveEvent(K key, Timestamp timestamp) {
+            entries = Collections.singletonList(
+                    new RemoveEntry<>(key, timestamp));
+        }
+
+        public InternalRemoveEvent(List<RemoveEntry<K>> entries) {
+            this.entries = checkNotNull(entries);
+        }
+
+        // Needed for serialization.
+        @SuppressWarnings("unused")
+        private InternalRemoveEvent() {
+            entries = null;
+        }
+
+        public List<RemoveEntry<K>> entries() {
+            return entries;
+        }
+    }
+
+    private static final class RemoveEntry<K> {
+        private final K key;
+        private final Timestamp timestamp;
+
+        public RemoveEntry(K key, Timestamp timestamp) {
+            this.key = checkNotNull(key);
+            this.timestamp = checkNotNull(timestamp);
+        }
+
+        // Needed for serialization.
+        @SuppressWarnings("unused")
+        private RemoveEntry() {
+            this.key = null;
+            this.timestamp = null;
+        }
+
+        public K key() {
+            return key;
+        }
+
+        public Timestamp timestamp() {
+            return timestamp;
+        }
+    }
+}