Updates to ECM interface

Change-Id: Ie0cae42ac2b361cf3b94e5047c157cb0945f4209

Adding origin to IntentData and use it to pick GossipIntentStore peer

Change-Id: I50e9621a69a35ec02b8c8dd79cc926591e5a73e9
diff --git a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
index e6670de..5ed6384 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
@@ -16,9 +16,9 @@
 package org.onosproject.store.ecmap;
 
 import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -53,10 +53,7 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static junit.framework.TestCase.assertFalse;
 import static org.easymock.EasyMock.*;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 
 /**
  * Unit tests for EventuallyConsistentMapImpl.
@@ -119,8 +116,8 @@
     @Before
     public void setUp() throws Exception {
         clusterService = createMock(ClusterService.class);
-        expect(clusterService.getLocalNode()).andReturn(self)
-                .anyTimes();
+        expect(clusterService.getLocalNode()).andReturn(self).anyTimes();
+        expect(clusterService.getNodes()).andReturn(ImmutableSet.of(self)).anyTimes();
         replay(clusterService);
 
         clusterCommunicator = createMock(ClusterCommunicationService.class);
@@ -163,7 +160,7 @@
 
     @Test
     public void testSize() throws Exception {
-        expectAnyMessage(clusterCommunicator);
+        expectPeerMessage(clusterCommunicator);
 
         assertEquals(0, ecMap.size());
         ecMap.put(KEY1, VALUE1);
@@ -184,7 +181,7 @@
 
     @Test
     public void testIsEmpty() throws Exception {
-        expectAnyMessage(clusterCommunicator);
+        expectPeerMessage(clusterCommunicator);
 
         assertTrue(ecMap.isEmpty());
         ecMap.put(KEY1, VALUE1);
@@ -195,7 +192,7 @@
 
     @Test
     public void testContainsKey() throws Exception {
-        expectAnyMessage(clusterCommunicator);
+        expectPeerMessage(clusterCommunicator);
 
         assertFalse(ecMap.containsKey(KEY1));
         ecMap.put(KEY1, VALUE1);
@@ -207,7 +204,7 @@
 
     @Test
     public void testContainsValue() throws Exception {
-        expectAnyMessage(clusterCommunicator);
+        expectPeerMessage(clusterCommunicator);
 
         assertFalse(ecMap.containsValue(VALUE1));
         ecMap.put(KEY1, VALUE1);
@@ -222,7 +219,7 @@
 
     @Test
     public void testGet() throws Exception {
-        expectAnyMessage(clusterCommunicator);
+        expectPeerMessage(clusterCommunicator);
 
         CountDownLatch latch;
 
@@ -278,7 +275,7 @@
         ecMap.addListener(listener);
 
         // Set up expected internal message to be broadcast to peers on first put
-        expectSpecificMessage(generatePutMessage(KEY1, VALUE1, clockService
+        expectSpecificMulticastMessage(generatePutMessage(KEY1, VALUE1, clockService
                 .peekAtNextTimestamp()), clusterCommunicator);
 
         // Put first value
@@ -289,7 +286,7 @@
         verify(clusterCommunicator);
 
         // Set up expected internal message to be broadcast to peers on second put
-        expectSpecificMessage(generatePutMessage(
+        expectSpecificMulticastMessage(generatePutMessage(
                 KEY1, VALUE2, clockService.peekAtNextTimestamp()), clusterCommunicator);
 
         // Update same key to a new value
@@ -332,14 +329,14 @@
         ecMap.addListener(listener);
 
         // Put in an initial value
-        expectAnyMessage(clusterCommunicator);
+        expectPeerMessage(clusterCommunicator);
         ecMap.put(KEY1, VALUE1);
         assertEquals(VALUE1, ecMap.get(KEY1));
 
         // Remove the value and check the correct internal cluster messages
         // are sent
-        expectSpecificMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
-                              clusterCommunicator);
+        expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
+                                       clusterCommunicator);
 
         ecMap.remove(KEY1);
         assertNull(ecMap.get(KEY1));
@@ -349,8 +346,8 @@
         // Remove the same value again. Even though the value is no longer in
         // the map, we expect that the tombstone is updated and another remove
         // event is sent to the cluster and external listeners.
-        expectSpecificMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
-                              clusterCommunicator);
+        expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
+                                       clusterCommunicator);
 
         ecMap.remove(KEY1);
         assertNull(ecMap.get(KEY1));
@@ -359,7 +356,7 @@
 
 
         // Put in a new value for us to try and remove
-        expectAnyMessage(clusterCommunicator);
+        expectPeerMessage(clusterCommunicator);
 
         ecMap.put(KEY2, VALUE2);
 
@@ -400,8 +397,8 @@
         ecMap.addListener(listener);
 
         // Expect a multi-update inter-instance message
-        expectSpecificMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2),
-                              clusterCommunicator);
+        expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2),
+                                       clusterCommunicator);
 
         Map<String, String> putAllValues = new HashMap<>();
         putAllValues.put(KEY1, VALUE1);
@@ -434,12 +431,12 @@
         verify(clusterCommunicator);
 
         // Put some items in the map
-        expectAnyMessage(clusterCommunicator);
+        expectPeerMessage(clusterCommunicator);
         ecMap.put(KEY1, VALUE1);
         ecMap.put(KEY2, VALUE2);
 
         ecMap.addListener(listener);
-        expectSpecificMessage(generateRemoveMessage(KEY1, KEY2), clusterCommunicator);
+        expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), clusterCommunicator);
 
         ecMap.clear();
 
@@ -449,7 +446,7 @@
 
     @Test
     public void testKeySet() throws Exception {
-        expectAnyMessage(clusterCommunicator);
+        expectPeerMessage(clusterCommunicator);
 
         assertTrue(ecMap.keySet().isEmpty());
 
@@ -482,7 +479,7 @@
 
     @Test
     public void testValues() throws Exception {
-        expectAnyMessage(clusterCommunicator);
+        expectPeerMessage(clusterCommunicator);
 
         assertTrue(ecMap.values().isEmpty());
 
@@ -520,7 +517,7 @@
 
     @Test
     public void testEntrySet() throws Exception {
-        expectAnyMessage(clusterCommunicator);
+        expectPeerMessage(clusterCommunicator);
 
         assertTrue(ecMap.entrySet().isEmpty());
 
@@ -658,21 +655,52 @@
      * @param m message we expect to be sent
      * @param clusterCommunicator a mock ClusterCommunicationService to set up
      */
-    private static void expectSpecificMessage(ClusterMessage m,
-            ClusterCommunicationService clusterCommunicator) {
+    private static void expectSpecificBroadcastMessage(ClusterMessage m,
+                           ClusterCommunicationService clusterCommunicator) {
         reset(clusterCommunicator);
         expect(clusterCommunicator.broadcast(m)).andReturn(true);
         replay(clusterCommunicator);
     }
 
     /**
-     * Sets up a mock ClusterCommunicationService to expect any cluster message
+     * Sets up a mock ClusterCommunicationService to expect a specific cluster
+     * message to be multicast to the cluster.
+     *
+     * @param m message we expect to be sent
+     * @param clusterCommunicator a mock ClusterCommunicationService to set up
+     */
+    private static void expectSpecificMulticastMessage(ClusterMessage m,
+                           ClusterCommunicationService clusterCommunicator) {
+        reset(clusterCommunicator);
+        expect(clusterCommunicator.multicast(eq(m), anyObject(Set.class))).andReturn(true);
+        replay(clusterCommunicator);
+    }
+
+
+    /**
+     * Sets up a mock ClusterCommunicationService to expect a multicast cluster message
      * that is sent to it. This is useful for unit tests where we aren't
      * interested in testing the messaging component.
      *
      * @param clusterCommunicator a mock ClusterCommunicationService to set up
      */
-    private void expectAnyMessage(ClusterCommunicationService clusterCommunicator) {
+    private void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
+        reset(clusterCommunicator);
+        expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class),
+                                             anyObject(Iterable.class)))
+                .andReturn(true)
+                .anyTimes();
+        replay(clusterCommunicator);
+    }
+
+    /**
+     * Sets up a mock ClusterCommunicationService to expect a broadcast cluster message
+     * that is sent to it. This is useful for unit tests where we aren't
+     * interested in testing the messaging component.
+     *
+     * @param clusterCommunicator a mock ClusterCommunicationService to set up
+     */
+    private void expectBroadcastMessage(ClusterCommunicationService clusterCommunicator) {
         reset(clusterCommunicator);
         expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
                 .andReturn(true)
@@ -700,13 +728,12 @@
         }
 
         @Override
-        public boolean unicast(ClusterMessage message, NodeId toNodeId)
-                throws IOException {
+        public boolean unicast(ClusterMessage message, NodeId toNodeId)  {
             return false;
         }
 
         @Override
-        public boolean multicast(ClusterMessage message, Set<NodeId> nodeIds) {
+        public boolean multicast(ClusterMessage message, Iterable<NodeId> nodeIds) {
             return false;
         }