Updates to ECM interface

Change-Id: Ie0cae42ac2b361cf3b94e5047c157cb0945f4209

Adding origin to IntentData and use it to pick GossipIntentStore peer

Change-Id: I50e9621a69a35ec02b8c8dd79cc926591e5a73e9
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 8a178cb..8f99d0e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -35,7 +35,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -51,6 +50,7 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -87,6 +87,7 @@
     private final ExecutorService executor;
 
     private final ScheduledExecutorService backgroundExecutor;
+    private final BiFunction<K, V, Iterable<NodeId>> peerUpdateFunction;
 
     private ExecutorService broadcastMessageExecutor;
 
@@ -140,14 +141,18 @@
      *                            both K and V
      * @param clockService        a clock service able to generate timestamps
      *                            for K
+     * @param peerUpdateFunction  function that provides a set of nodes to immediately
+     *                            update to when there writes to the map
      */
     public EventuallyConsistentMapImpl(String mapName,
                                        ClusterService clusterService,
                                        ClusterCommunicationService clusterCommunicator,
                                        KryoNamespace.Builder serializerBuilder,
-                                       ClockService<K, V> clockService) {
+                                       ClockService<K, V> clockService,
+                                       BiFunction<K, V, Iterable<NodeId>> peerUpdateFunction) {
         this.clusterService = checkNotNull(clusterService);
         this.clusterCommunicator = checkNotNull(clusterCommunicator);
+        this.peerUpdateFunction = checkNotNull(peerUpdateFunction);
 
         serializer = createSerializer(checkNotNull(serializerBuilder));
         destroyedMessage = mapName + ERROR_DESTROYED;
@@ -189,6 +194,34 @@
                                           new InternalAntiEntropyListener(), backgroundExecutor);
     }
 
+    /**
+     * Creates a new eventually consistent map shared amongst multiple instances.
+     * <p>
+     * Take a look at the other constructor for usage information. The only difference
+     * is that a BiFunction is provided that returns all nodes in the cluster, so
+     * all nodes will be sent write updates immediately.
+     * </p>
+     *
+     * @param mapName             a String identifier for the map.
+     * @param clusterService      the cluster service
+     * @param clusterCommunicator the cluster communications service
+     * @param serializerBuilder   a Kryo namespace builder that can serialize
+     *                            both K and V
+     * @param clockService        a clock service able to generate timestamps
+     *                            for K
+     */
+    public EventuallyConsistentMapImpl(String mapName,
+                                       ClusterService clusterService,
+                                       ClusterCommunicationService clusterCommunicator,
+                                       KryoNamespace.Builder serializerBuilder,
+                                       ClockService<K, V> clockService) {
+        this(mapName, clusterService, clusterCommunicator, serializerBuilder, clockService,
+             (key, value) -> clusterService.getNodes().stream()
+                     .map(ControllerNode::id)
+                     .filter(nodeId -> !nodeId.equals(clusterService.getLocalNode().id()))
+                     .collect(Collectors.toList()));
+    }
+
     private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
         return new KryoSerializer() {
             @Override
@@ -270,11 +303,10 @@
         Timestamp timestamp = clockService.getTimestamp(key, value);
 
         if (putInternal(key, value, timestamp)) {
-            notifyPeers(new InternalPutEvent<>(key, value, timestamp));
-            EventuallyConsistentMapEvent<K, V> externalEvent
-                    = new EventuallyConsistentMapEvent<>(
-                    EventuallyConsistentMapEvent.Type.PUT, key, value);
-            notifyListeners(externalEvent);
+            notifyPeers(new InternalPutEvent<>(key, value, timestamp),
+                        peerUpdateFunction.apply(key, value));
+            notifyListeners(new EventuallyConsistentMapEvent<>(
+                    EventuallyConsistentMapEvent.Type.PUT, key, value));
         }
     }
 
@@ -318,11 +350,10 @@
         Timestamp timestamp = clockService.getTimestamp(key, null);
 
         if (removeInternal(key, timestamp)) {
-            notifyPeers(new InternalRemoveEvent<>(key, timestamp));
-            EventuallyConsistentMapEvent<K, V> externalEvent
-                    = new EventuallyConsistentMapEvent<>(
-                    EventuallyConsistentMapEvent.Type.REMOVE, key, null);
-            notifyListeners(externalEvent);
+            notifyPeers(new InternalRemoveEvent<>(key, timestamp),
+                        peerUpdateFunction.apply(key, null));
+            notifyListeners(new EventuallyConsistentMapEvent<>(
+                    EventuallyConsistentMapEvent.Type.REMOVE, key, null));
         }
     }
 
@@ -364,11 +395,10 @@
         Timestamp timestamp = clockService.getTimestamp(key, value);
 
         if (removeInternal(key, timestamp)) {
-            notifyPeers(new InternalRemoveEvent<>(key, timestamp));
-            EventuallyConsistentMapEvent<K, V> externalEvent
-                    = new EventuallyConsistentMapEvent<>(
-                    EventuallyConsistentMapEvent.Type.REMOVE, key, value);
-            notifyListeners(externalEvent);
+            notifyPeers(new InternalRemoveEvent<>(key, timestamp),
+                        peerUpdateFunction.apply(key, value));
+            notifyListeners(new EventuallyConsistentMapEvent<>(
+                    EventuallyConsistentMapEvent.Type.REMOVE, key, value));
         }
     }
 
@@ -393,7 +423,7 @@
         }
 
         if (!updates.isEmpty()) {
-            notifyPeers(new InternalPutEvent<>(updates));
+            broadcastMessage(updateMessageSubject, new InternalPutEvent<>(updates));
 
             for (PutEntry<K, V> entry : updates) {
                 EventuallyConsistentMapEvent<K, V> externalEvent =
@@ -421,7 +451,7 @@
         }
 
         if (!removed.isEmpty()) {
-            notifyPeers(new InternalRemoveEvent<>(removed));
+            broadcastMessage(removeMessageSubject, new InternalRemoveEvent<>(removed));
 
             for (RemoveEntry<K> entry : removed) {
                 EventuallyConsistentMapEvent<K, V> externalEvent
@@ -493,16 +523,26 @@
         }
     }
 
-    private void notifyPeers(InternalPutEvent event) {
+    private void notifyPeers(InternalPutEvent event, Iterable<NodeId> peers) {
         // FIXME extremely memory expensive when we are overrun
 //        broadcastMessageExecutor.execute(() -> broadcastMessage(updateMessageSubject, event));
-        broadcastMessage(updateMessageSubject, event);
+        multicastMessage(updateMessageSubject, event, peers);
     }
 
-    private void notifyPeers(InternalRemoveEvent event) {
+    private void notifyPeers(InternalRemoveEvent event, Iterable<NodeId> peers) {
         // FIXME extremely memory expensive when we are overrun
 //        broadcastMessageExecutor.execute(() -> broadcastMessage(removeMessageSubject, event));
-        broadcastMessage(removeMessageSubject, event);
+        multicastMessage(removeMessageSubject, event, peers);
+    }
+
+    private void multicastMessage(MessageSubject subject, Object event, Iterable<NodeId> peers) {
+        // FIXME can we parallelize the serialization... use the caller???
+        ClusterMessage message = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                subject,
+                serializer.encode(event));
+        broadcastMessageExecutor.execute(() -> clusterCommunicator.multicast(message, peers));
+//        clusterCommunicator.broadcast(message);
     }
 
     private void broadcastMessage(MessageSubject subject, Object event) {
@@ -515,14 +555,13 @@
 //        clusterCommunicator.broadcast(message);
     }
 
-    private void unicastMessage(NodeId peer,
-                                MessageSubject subject,
-                                Object event) throws IOException {
+    private void unicastMessage(NodeId peer, MessageSubject subject, Object event) {
         ClusterMessage message = new ClusterMessage(
                 clusterService.getLocalNode().id(),
                 subject,
                 serializer.encode(event));
-        clusterCommunicator.unicast(message, peer);
+//        clusterCommunicator.unicast(message, peer);
+        broadcastMessageExecutor.execute(() -> clusterCommunicator.unicast(message, peer));
     }
 
     private boolean underHighLoad() {
@@ -567,11 +606,9 @@
 
                 AntiEntropyAdvertisement<K> ad = createAdvertisement();
 
-                try {
-                    unicastMessage(peer, antiEntropyAdvertisementSubject, ad);
-                } catch (IOException e) {
-                    log.debug("Failed to send anti-entropy advertisement to {}", peer);
-                }
+                // TODO check the return value?
+                unicastMessage(peer, antiEntropyAdvertisementSubject, ad);
+                // error log: log.debug("Failed to send anti-entropy advertisement to {}", peer);
             } catch (Exception e) {
                 // Catch all exceptions to avoid scheduled task being suppressed.
                 log.error("Exception thrown while sending advertisement", e);
@@ -607,14 +644,9 @@
                     // Send the advertisement back if this peer is out-of-sync
                     final NodeId sender = ad.sender();
                     AntiEntropyAdvertisement<K> myAd = createAdvertisement();
-                    try {
-                        unicastMessage(sender, antiEntropyAdvertisementSubject, myAd);
-                    } catch (IOException e) {
-                        log.debug(
-                                "Failed to send reactive anti-entropy advertisement to {}",
-                                sender);
-                    }
-
+                    // TODO check the return value?
+                    unicastMessage(sender, antiEntropyAdvertisementSubject, myAd);
+                    // error log: log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
                     break;
                 }
             }
@@ -669,12 +701,10 @@
 
         // Send all updates to the peer at once
         if (!updatesToSend.isEmpty()) {
-            try {
-                unicastMessage(sender, updateMessageSubject,
-                               new InternalPutEvent<>(updatesToSend));
-            } catch (IOException e) {
-                log.warn("Failed to send advertisement response", e);
-            }
+            // TODO check the return value?
+            unicastMessage(sender, updateMessageSubject,
+                           new InternalPutEvent<>(updatesToSend));
+            //error log: log.warn("Failed to send advertisement response", e);
         }
 
         return externalEvents;
@@ -707,12 +737,10 @@
 
         // Send all removes to the peer at once
         if (!removesToSend.isEmpty()) {
-            try {
-                unicastMessage(sender, removeMessageSubject,
-                               new InternalRemoveEvent<>(removesToSend));
-            } catch (IOException e) {
-                log.warn("Failed to send advertisement response", e);
-            }
+            // TODO check the return value
+            unicastMessage(sender, removeMessageSubject,
+                           new InternalRemoveEvent<>(removesToSend));
+            // error log: log.warn("Failed to send advertisement response", e);
         }
     }