Updates to ECM interface

Change-Id: Ie0cae42ac2b361cf3b94e5047c157cb0945f4209

Add origin to IntentData and use it to pick the GossipIntentStore peer

Change-Id: I50e9621a69a35ec02b8c8dd79cc926591e5a73e9
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
index bbdbdb7..2bc992b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.store.intent.impl;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -23,6 +24,8 @@
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.util.KryoNamespace;
 import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.net.intent.Intent;
 import org.onosproject.net.intent.IntentData;
 import org.onosproject.net.intent.IntentEvent;
@@ -41,7 +44,10 @@
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.slf4j.Logger;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 import static org.onosproject.net.intent.IntentState.*;
@@ -51,6 +57,8 @@
  * Manages inventory of Intents in a distributed data store that uses optimistic
  * replication and gossip based techniques.
  */
+// FIXME: We should listen for leadership changes. If the local instance has just
+// become a leader, scan the pending map and process its entries.
 @Component(immediate = false, enabled = true)
 @Service
 public class GossipIntentStore
@@ -86,15 +94,17 @@
                                                        clusterService,
                                                        clusterCommunicator,
                                                        intentSerializer,
-                                                       new IntentDataLogicalClockManager<>());
+                                                       new IntentDataLogicalClockManager<>(),
+                                                       (key, intentData) -> getPeerNodes(key, intentData));
 
         pendingMap = new EventuallyConsistentMapImpl<>("intent-pending",
                                                        clusterService,
                                                        clusterCommunicator,
                                                        intentSerializer, // TODO
-                                                       new IntentDataClockManager<>());
+                                                       new IntentDataClockManager<>(),
+                                                       (key, intentData) -> getPeerNodes(key, intentData));
 
-        currentMap.addListener(new InternalIntentStatesListener());
+        currentMap.addListener(new InternalCurrentListener());
         pendingMap.addListener(new InternalPendingListener());
 
         log.info("Started");
@@ -226,7 +236,6 @@
     @Override
     public void write(IntentData newData) {
         IntentData currentData = currentMap.get(newData.key());
-
         if (isUpdateAcceptable(currentData, newData)) {
             // Only the master is modifying the current state. Therefore assume
             // this always succeeds
@@ -239,6 +248,34 @@
         }
     }
 
+    /** Picks the peer node for the given intent key: its origin, its master, or a random node. */
+    private Iterable<NodeId> getPeerNodes(Key key, IntentData data) {
+        NodeId master = partitionService.getLeader(key);
+        NodeId origin = (data != null) ? data.origin() : null;
+        NodeId me = clusterService.getLocalNode().id();
+        boolean isMaster = Objects.equals(master, me);
+        boolean isOrigin = Objects.equals(origin, me);
+        // Default covers origin-only and the neither-role case (FIXME: why are we here? log error?).
+        NodeId peer = master;
+        if (isMaster && isOrigin) {
+            peer = getRandomNode();
+        } else if (isMaster) {
+            peer = origin;
+        }
+        // ImmutableList.of() rejects null, so an unknown master or origin falls back to a random node.
+        return ImmutableList.of(peer != null ? peer : getRandomNode());
+    }
+
+    /** Returns a random cluster node other than this one, or this node if it has no peers. */
+    private NodeId getRandomNode() {
+        NodeId me = clusterService.getLocalNode().id();
+        List<NodeId> peers = clusterService.getNodes().stream()
+                .map(ControllerNode::id).filter(id -> !Objects.equals(id, me))
+                .collect(Collectors.toCollection(ArrayList::new));
+        Collections.shuffle(peers);
+        return peers.isEmpty() ? me : peers.get(0);
+    }
+
     @Override
     public void batchWrite(Iterable<IntentData> updates) {
         updates.forEach(this::write);
@@ -263,6 +300,7 @@
         if (data.version() == null) {
             data.setVersion(new WallClockTimestamp());
         }
+        data.setOrigin(clusterService.getLocalNode().id());
         pendingMap.put(data.key(), copyData(data));
     }
 
@@ -292,7 +330,7 @@
         }
     }
 
-    private final class InternalIntentStatesListener implements
+    private final class InternalCurrentListener implements
             EventuallyConsistentMapListener<Key, IntentData> {
         @Override
         public void event(