Modified GossipIntentStore to use EventuallyConsistentMaps.

All IntentStore operations are now implemented.

ONOS-858

Change-Id: I5081805b61c7e25e28707b90093cae12b5a4374b
diff --git a/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMap.java
index bbddd1d..5417505 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMap.java
@@ -151,7 +151,7 @@
      *
      * @param listener listener to register for events
      */
-    public void addListener(EventuallyConsistentMapListener listener);
+    public void addListener(EventuallyConsistentMapListener<K, V> listener);
 
     /**
      * Removes the specified listener from the map such that it will no longer
@@ -159,7 +159,7 @@
      *
      * @param listener listener to deregister for events
      */
-    public void removeListener(EventuallyConsistentMapListener listener);
+    public void removeListener(EventuallyConsistentMapListener<K, V> listener);
 
     /**
      * Shuts down the map and breaks communication between different instances.
diff --git a/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapImpl.java
index 19786f8..0acef1a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapImpl.java
@@ -78,7 +78,7 @@
     private final ScheduledExecutorService backgroundExecutor;
 
     private volatile boolean destroyed = false;
-    private static final String ERROR_DESTROYED = " is already destroyed";
+    private static final String ERROR_DESTROYED = " map is already destroyed";
 
     // TODO: Make these anti-entropy params configurable
     private long initialDelaySec = 5;
@@ -154,6 +154,7 @@
                 serializerPool = builder
                         .register(WallClockTimestamp.class)
                         .register(PutEntry.class)
+                        .register(RemoveEntry.class)
                         .register(ArrayList.class)
                         .register(InternalPutEvent.class)
                         .register(InternalRemoveEvent.class)
@@ -166,25 +167,25 @@
 
     @Override
     public int size() {
-        checkState(destroyed, mapName + ERROR_DESTROYED);
+        checkState(!destroyed, mapName + ERROR_DESTROYED);
         return items.size();
     }
 
     @Override
     public boolean isEmpty() {
-        checkState(destroyed, mapName + ERROR_DESTROYED);
+        checkState(!destroyed, mapName + ERROR_DESTROYED);
         return items.isEmpty();
     }
 
     @Override
     public boolean containsKey(K key) {
-        checkState(destroyed, mapName + ERROR_DESTROYED);
+        checkState(!destroyed, mapName + ERROR_DESTROYED);
         return items.containsKey(key);
     }
 
     @Override
     public boolean containsValue(V value) {
-        checkState(destroyed, mapName + ERROR_DESTROYED);
+        checkState(!destroyed, mapName + ERROR_DESTROYED);
 
         return items.values().stream()
                 .anyMatch(timestamped -> timestamped.value().equals(value));
@@ -192,7 +193,7 @@
 
     @Override
     public V get(K key) {
-        checkState(destroyed, mapName + ERROR_DESTROYED);
+        checkState(!destroyed, mapName + ERROR_DESTROYED);
 
         Timestamped<V> value = items.get(key);
         if (value != null) {
@@ -203,7 +204,7 @@
 
     @Override
     public void put(K key, V value) {
-        checkState(destroyed, mapName + ERROR_DESTROYED);
+        checkState(!destroyed, mapName + ERROR_DESTROYED);
 
         Timestamp timestamp = clockService.getTimestamp(key);
         if (putInternal(key, value, timestamp)) {
@@ -235,7 +236,7 @@
 
     @Override
     public void remove(K key) {
-        checkState(destroyed, mapName + ERROR_DESTROYED);
+        checkState(!destroyed, mapName + ERROR_DESTROYED);
 
         Timestamp timestamp = clockService.getTimestamp(key);
         if (removeInternal(key, timestamp)) {
@@ -261,7 +262,7 @@
 
     @Override
     public void putAll(Map<? extends K, ? extends V> m) {
-        checkState(destroyed, mapName + ERROR_DESTROYED);
+        checkState(!destroyed, mapName + ERROR_DESTROYED);
 
         List<PutEntry<K, V>> updates = new ArrayList<>(m.size());
 
@@ -287,7 +288,7 @@
 
     @Override
     public void clear() {
-        checkState(destroyed, mapName + ERROR_DESTROYED);
+        checkState(!destroyed, mapName + ERROR_DESTROYED);
 
         List<RemoveEntry<K>> removed = new ArrayList<>(items.size());
 
@@ -311,14 +312,14 @@
 
     @Override
     public Set<K> keySet() {
-        checkState(destroyed, mapName + ERROR_DESTROYED);
+        checkState(!destroyed, mapName + ERROR_DESTROYED);
 
         return items.keySet();
     }
 
     @Override
     public Collection<V> values() {
-        checkState(destroyed, mapName + ERROR_DESTROYED);
+        checkState(!destroyed, mapName + ERROR_DESTROYED);
 
         return items.values().stream()
                 .map(Timestamped::value)
@@ -327,7 +328,7 @@
 
     @Override
     public Set<Map.Entry<K, V>> entrySet() {
-        checkState(destroyed, mapName + ERROR_DESTROYED);
+        checkState(!destroyed, mapName + ERROR_DESTROYED);
 
         return items.entrySet().stream()
                 .map(e -> new Entry(e.getKey(), e.getValue().value()))
@@ -335,15 +336,15 @@
     }
 
     @Override
-    public void addListener(EventuallyConsistentMapListener listener) {
-        checkState(destroyed, mapName + ERROR_DESTROYED);
+    public void addListener(EventuallyConsistentMapListener<K, V> listener) {
+        checkState(!destroyed, mapName + ERROR_DESTROYED);
 
         listeners.add(checkNotNull(listener));
     }
 
     @Override
-    public void removeListener(EventuallyConsistentMapListener listener) {
-        checkState(destroyed, mapName + ERROR_DESTROYED);
+    public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
+        checkState(!destroyed, mapName + ERROR_DESTROYED);
 
         listeners.remove(checkNotNull(listener));
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapListener.java b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapListener.java
index 289f46c..d77445a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapListener.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/impl/EventuallyConsistentMapListener.java
@@ -19,12 +19,12 @@
  * Listener interested in receiving modification events for an
  * EventuallyConsistentMap.
  */
-public interface EventuallyConsistentMapListener {
+public interface EventuallyConsistentMapListener<K, V> {
 
     /**
      * Reacts to the specified event.
      *
      * @param event the event
      */
-    public void event(EventuallyConsistentMapEvent event);
+    public void event(EventuallyConsistentMapEvent<K, V> event);
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
index 4ffd4bf..157d66c 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
@@ -16,8 +16,6 @@
 package org.onosproject.store.intent.impl;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -26,46 +24,29 @@
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.util.KryoNamespace;
 import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.NodeId;
 import org.onosproject.net.intent.BatchWrite;
 import org.onosproject.net.intent.Intent;
-import org.onosproject.net.intent.IntentClockService;
 import org.onosproject.net.intent.IntentEvent;
 import org.onosproject.net.intent.IntentId;
 import org.onosproject.net.intent.IntentState;
 import org.onosproject.net.intent.IntentStore;
 import org.onosproject.net.intent.IntentStoreDelegate;
 import org.onosproject.store.AbstractStore;
-import org.onosproject.store.Timestamp;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
-import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.impl.Timestamped;
-import org.onosproject.store.serializers.KryoSerializer;
-import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
+import org.onosproject.store.impl.EventuallyConsistentMap;
+import org.onosproject.store.impl.EventuallyConsistentMapEvent;
+import org.onosproject.store.impl.EventuallyConsistentMapImpl;
+import org.onosproject.store.impl.EventuallyConsistentMapListener;
+import org.onosproject.store.impl.WallclockClockManager;
+import org.onosproject.store.serializers.KryoNamespaces;
 import org.slf4j.Logger;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
-import static org.onlab.util.Tools.minPriority;
-import static org.onlab.util.Tools.namedThreads;
 import static org.onosproject.net.intent.IntentState.INSTALL_REQ;
-import static org.onosproject.store.intent.impl.GossipIntentStoreMessageSubjects.INTENT_ANTI_ENTROPY_ADVERTISEMENT;
-import static org.onosproject.store.intent.impl.GossipIntentStoreMessageSubjects.INTENT_SET_INSTALLABLES_MSG;
-import static org.onosproject.store.intent.impl.GossipIntentStoreMessageSubjects.INTENT_UPDATED_MSG;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -80,20 +61,11 @@
 
     private final Logger log = getLogger(getClass());
 
-    private final ConcurrentMap<IntentId, Intent> intents =
-            new ConcurrentHashMap<>();
+    private EventuallyConsistentMap<IntentId, Intent> intents;
 
-    private final ConcurrentMap<IntentId, Timestamped<IntentState>> intentStates
-            = new ConcurrentHashMap<>();
+    private EventuallyConsistentMap<IntentId, IntentState> intentStates;
 
-    private final Set<IntentId> withdrawRequestedIntents
-            = Sets.newConcurrentHashSet();
-
-    private ConcurrentMap<IntentId, Timestamped<List<Intent>>> installables
-            = new ConcurrentHashMap<>();
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected IntentClockService intentClockService;
+    private EventuallyConsistentMap<IntentId, List<Intent>> installables;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterCommunicationService clusterCommunicator;
@@ -101,64 +73,39 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
 
-    private static final KryoSerializer SERIALIZER = new KryoSerializer() {
-        @Override
-        protected void setupKryoPool() {
-            serializerPool = KryoNamespace.newBuilder()
-                    .register(DistributedStoreSerializers.STORE_COMMON)
-                    .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
-                    .register(InternalIntentEvent.class)
-                    .register(InternalSetInstallablesEvent.class)
-                    .register(Collections.emptyList().getClass())
-                    //.register(InternalIntentAntiEntropyEvent.class)
-                    //.register(IntentAntiEntropyAdvertisement.class)
-                    .build();
-        }
-    };
-
-    private ExecutorService executor;
-
-    private ScheduledExecutorService backgroundExecutor;
-
-    // TODO: Make these anti-entropy params configurable
-    private long initialDelaySec = 5;
-    private long periodSec = 5;
-
     @Activate
     public void activate() {
-        clusterCommunicator.addSubscriber(INTENT_UPDATED_MSG,
-                new InternalIntentCreateOrUpdateEventListener());
-        clusterCommunicator.addSubscriber(INTENT_SET_INSTALLABLES_MSG,
-                                          new InternalIntentSetInstallablesListener());
-        clusterCommunicator.addSubscriber(
-                INTENT_ANTI_ENTROPY_ADVERTISEMENT,
-                new InternalIntentAntiEntropyAdvertisementListener());
+        KryoNamespace.Builder intentSerializer = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                // TODO this should be in BASIC namespace
+                .register(Collections.emptyList().getClass());
+        intents = new EventuallyConsistentMapImpl<>("intents", clusterService,
+                                                clusterCommunicator,
+                                                intentSerializer,
+                                                new WallclockClockManager<>());
 
-        executor = Executors.newCachedThreadPool(namedThreads("onos-intent-fg-%d"));
+        intentStates = new EventuallyConsistentMapImpl<>("intent-states",
+                                                       clusterService,
+                                                       clusterCommunicator,
+                                                       intentSerializer,
+                                                       new WallclockClockManager<>());
 
-        backgroundExecutor =
-                newSingleThreadScheduledExecutor(minPriority(namedThreads("onos-intent-bg-%d")));
+        installables = new EventuallyConsistentMapImpl<>("intent-installables",
+                                                         clusterService,
+                                                         clusterCommunicator,
+                                                         intentSerializer,
+                                                         new WallclockClockManager<>());
 
-        // start anti-entropy thread
-        //backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
-                    //initialDelaySec, periodSec, TimeUnit.SECONDS);
+        intentStates.addListener(new InternalIntentStatesListener());
 
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
-        executor.shutdownNow();
-        backgroundExecutor.shutdownNow();
-        try {
-            if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
-                log.error("Timeout during executor shutdown");
-            }
-        } catch (InterruptedException e) {
-            log.error("Error during executor shutdown", e);
-        }
-
-        intents.clear();
+        intents.destroy();
+        intentStates.destroy();
+        installables.destroy();
 
         log.info("Stopped");
     }
@@ -181,76 +128,19 @@
 
     @Override
     public IntentState getIntentState(IntentId intentId) {
-        Timestamped<IntentState> state = intentStates.get(intentId);
-        if (state != null) {
-            return state.value();
-        }
-        return null;
-    }
-
-    private IntentEvent setStateInternal(IntentId intentId, IntentState newState, Timestamp timestamp) {
-        switch (newState) {
-        case WITHDRAW_REQ:
-            withdrawRequestedIntents.add(intentId);
-            break;
-        case INSTALL_REQ:
-        case COMPILING:
-        case INSTALLING:
-        case INSTALLED:
-        case RECOMPILING:
-        case WITHDRAWING:
-        case WITHDRAWN:
-        case FAILED:
-            synchronized (intentStates) {
-                Timestamped<IntentState> existing = intentStates.get(intentId);
-                if (existing == null || !existing.isNewer(timestamp)) {
-                    intentStates.put(intentId, new Timestamped<>(newState, timestamp));
-                }
-            }
-            break;
-        default:
-            log.warn("Unknown intent state {}", newState);
-            break;
-        }
-
-        try {
-            // TODO make sure it's OK if the intent is null
-            return IntentEvent.getEvent(newState, intents.get(intentId));
-        } catch (IllegalArgumentException e) {
-            // Transient states can't be used for events, so don't send one
-            return null;
-        }
-    }
-
-    private void setInstallableIntentsInternal(IntentId intentId,
-                                               List<Intent> installableIntents,
-                                               Timestamp timestamp) {
-        synchronized (installables) {
-            Timestamped<List<Intent>> existing = installables.get(intentId);
-            if (existing == null || !existing.isNewer(timestamp)) {
-                installables.put(intentId,
-                                 new Timestamped<>(installableIntents, timestamp));
-            }
-        }
+        return intentStates.get(intentId);
     }
 
     @Override
     public List<Intent> getInstallableIntents(IntentId intentId) {
-        Timestamped<List<Intent>> tInstallables = installables.get(intentId);
-        if (tInstallables != null) {
-            return tInstallables.value();
-        }
-        return null;
+        return installables.get(intentId);
     }
 
     @Override
     public List<BatchWrite.Operation> batchWrite(BatchWrite batch) {
 
-        List<IntentEvent> events = Lists.newArrayList();
         List<BatchWrite.Operation> failed = new ArrayList<>();
 
-        Timestamp timestamp = null;
-
         for (BatchWrite.Operation op : batch.operations()) {
             switch (op.type()) {
             case CREATE_INTENT:
@@ -258,19 +148,18 @@
                               "CREATE_INTENT takes 1 argument. %s", op);
                 Intent intent = op.arg(0);
 
-                timestamp = intentClockService.getTimestamp(intent.id());
-                if (createIntentInternal(intent)) {
-                    events.add(setStateInternal(intent.id(), INSTALL_REQ, timestamp));
-                    notifyPeers(new InternalIntentEvent(intent.id(), intent,
-                                                        INSTALL_REQ, timestamp));
-                }
+                intents.put(intent.id(), intent);
+                intentStates.put(intent.id(), INSTALL_REQ);
 
                 break;
             case REMOVE_INTENT:
                 checkArgument(op.args().size() == 1,
                               "REMOVE_INTENT takes 1 argument. %s", op);
-                IntentId intentId = (IntentId) op.arg(0);
-                // TODO implement
+                IntentId intentId = op.arg(0);
+
+                intents.remove(intentId);
+                intentStates.remove(intentId);
+                installables.remove(intentId);
 
                 break;
             case SET_STATE:
@@ -279,10 +168,7 @@
                 intent = op.arg(0);
                 IntentState newState = op.arg(1);
 
-                timestamp = intentClockService.getTimestamp(intent.id());
-                IntentEvent externalEvent = setStateInternal(intent.id(), newState, timestamp);
-                events.add(externalEvent);
-                notifyPeers(new InternalIntentEvent(intent.id(), null, newState, timestamp));
+                intentStates.put(intent.id(), newState);
 
                 break;
             case SET_INSTALLABLE:
@@ -291,18 +177,14 @@
                 intentId = op.arg(0);
                 List<Intent> installableIntents = op.arg(1);
 
-                timestamp = intentClockService.getTimestamp(intentId);
-                setInstallableIntentsInternal(
-                        intentId, installableIntents, timestamp);
-
-                notifyPeers(new InternalSetInstallablesEvent(intentId, installableIntents, timestamp));
+                installables.put(intentId, installableIntents);
 
                 break;
             case REMOVE_INSTALLED:
                 checkArgument(op.args().size() == 1,
                               "REMOVE_INSTALLED takes 1 argument. %s", op);
                 intentId = op.arg(0);
-                // TODO implement
+                installables.remove(intentId);
                 break;
             default:
                 log.warn("Unknown Operation encountered: {}", op);
@@ -311,121 +193,34 @@
             }
         }
 
-        notifyDelegate(events);
         return failed;
     }
 
-    private boolean createIntentInternal(Intent intent) {
-        Intent oldValue = intents.putIfAbsent(intent.id(), intent);
-        if (oldValue == null) {
-            return true;
-        }
-
-        log.warn("Intent ID {} already in store, throwing new update away",
-                 intent.id());
-        return false;
-    }
-
-    private void notifyPeers(InternalIntentEvent event) {
-        broadcastMessage(INTENT_UPDATED_MSG, event);
-    }
-
-    private void notifyPeers(InternalSetInstallablesEvent event) {
-        broadcastMessage(INTENT_SET_INSTALLABLES_MSG, event);
-    }
-
-    private void broadcastMessage(MessageSubject subject, Object event) {
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                subject,
-                SERIALIZER.encode(event));
-        clusterCommunicator.broadcast(message);
-    }
-
-    private void unicastMessage(NodeId peer,
-                                MessageSubject subject,
-                                Object event) throws IOException {
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                subject,
-                SERIALIZER.encode(event));
-        clusterCommunicator.unicast(message, peer);
-    }
-
     private void notifyDelegateIfNotNull(IntentEvent event) {
         if (event != null) {
             notifyDelegate(event);
         }
     }
 
-    private final class InternalIntentCreateOrUpdateEventListener
-            implements ClusterMessageHandler {
+    private final class InternalIntentStatesListener implements
+            EventuallyConsistentMapListener<IntentId, IntentState> {
         @Override
-        public void handle(ClusterMessage message) {
+        public void event(
+                EventuallyConsistentMapEvent<IntentId, IntentState> event) {
+            if (event.type() == EventuallyConsistentMapEvent.Type.PUT) {
+                IntentEvent externalEvent;
+                Intent intent = intents.get(event.key()); // TODO OK if this is null?
 
-            log.debug("Received intent update event from peer: {}", message.sender());
-            InternalIntentEvent event = SERIALIZER.decode(message.payload());
-
-            IntentId intentId = event.intentId();
-            Intent intent = event.intent();
-            IntentState state = event.state();
-            Timestamp timestamp = event.timestamp();
-
-            executor.submit(() -> {
                 try {
-                    switch (state) {
-                    case INSTALL_REQ:
-                        createIntentInternal(intent);
-                        // Fallthrough to setStateInternal for INSTALL_REQ
-                    default:
-                        notifyDelegateIfNotNull(setStateInternal(intentId, state, timestamp));
-                        break;
-                    }
-                } catch (Exception e) {
-                    log.warn("Exception thrown handling intent create or update", e);
+                    externalEvent = IntentEvent.getEvent(event.value(), intent);
+                } catch (IllegalArgumentException e) {
+                    externalEvent = null;
                 }
-            });
+
+                notifyDelegateIfNotNull(externalEvent);
+            }
         }
     }
 
-    private final class InternalIntentSetInstallablesListener
-            implements ClusterMessageHandler {
-        @Override
-        public void handle(ClusterMessage message) {
-            log.debug("Received intent set installables event from peer: {}", message.sender());
-            InternalSetInstallablesEvent event = SERIALIZER.decode(message.payload());
-
-            IntentId intentId = event.intentId();
-            List<Intent> installables = event.installables();
-            Timestamp timestamp = event.timestamp();
-
-            executor.submit(() -> {
-                try {
-                    setInstallableIntentsInternal(intentId, installables, timestamp);
-                } catch (Exception e) {
-                    log.warn("Exception thrown handling intent set installables", e);
-                }
-            });
-        }
-    }
-
-    private final class InternalIntentAntiEntropyAdvertisementListener
-            implements ClusterMessageHandler {
-
-        @Override
-        public void handle(ClusterMessage message) {
-            log.trace("Received intent Anti-Entropy advertisement from peer: {}", message.sender());
-            // TODO implement
-            //IntentAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
-            backgroundExecutor.submit(() -> {
-                try {
-                    log.debug("something");
-                    //handleAntiEntropyAdvertisement(advertisement);
-                } catch (Exception e) {
-                    log.warn("Exception thrown handling intent advertisements", e);
-                }
-            });
-        }
-    }
 }