ClusterMessagingProtocolClient: lazily bind to NodeId

fixes ONOS-185

Change-Id: Ibbe9624509964d7c3e7ac2c95c171e5cb20b0634
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java
index dc913bf..97cf50e 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java
@@ -1,6 +1,5 @@
 package org.onlab.onos.store.service.impl;
 
-import static com.google.common.base.Preconditions.checkNotNull;
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.util.ArrayList;
@@ -38,7 +37,6 @@
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.onos.cluster.ClusterService;
-import org.onlab.onos.cluster.ControllerNode;
 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
 import org.onlab.onos.store.cluster.messaging.MessageSubject;
 import org.onlab.onos.store.serializers.ImmutableListSerializer;
@@ -172,20 +170,9 @@
 
     @Override
     public ProtocolClient createClient(TcpMember member) {
-        ControllerNode remoteNode = getControllerNode(member.host(), member.port());
-        checkNotNull(remoteNode,
-                     "No matching ONOS Node for %s:%s",
-                     member.host(), member.port());
-        return new ClusterMessagingProtocolClient(
-                clusterCommunicator, clusterService.getLocalNode(), remoteNode);
-    }
-
-    private ControllerNode getControllerNode(String host, int port) {
-        for (ControllerNode node : clusterService.getNodes()) {
-            if (node.ip().toString().equals(host) && node.tcpPort() == port) {
-                return node;
-            }
-        }
-        return null;
+        return new ClusterMessagingProtocolClient(clusterService,
+                                                  clusterCommunicator,
+                                                  clusterService.getLocalNode(),
+                                                  member);
     }
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
index 23c34b2..3dd93b9 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
@@ -13,6 +13,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import net.kuujo.copycat.cluster.TcpMember;
 import net.kuujo.copycat.protocol.PingRequest;
 import net.kuujo.copycat.protocol.PingResponse;
 import net.kuujo.copycat.protocol.PollRequest;
@@ -23,6 +24,9 @@
 import net.kuujo.copycat.protocol.SyncResponse;
 import net.kuujo.copycat.spi.protocol.ProtocolClient;
 
+import org.onlab.onos.cluster.ClusterEvent;
+import org.onlab.onos.cluster.ClusterEventListener;
+import org.onlab.onos.cluster.ClusterService;
 import org.onlab.onos.cluster.ControllerNode;
 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
@@ -43,21 +47,30 @@
 
     public static final long RETRY_INTERVAL_MILLIS = 2000;
 
+    private final ClusterService clusterService;
     private final ClusterCommunicationService clusterCommunicator;
     private final ControllerNode localNode;
-    private final ControllerNode remoteNode;
+    private final TcpMember remoteMember;
+    private ControllerNode remoteNode;
 
     // FIXME: Thread pool sizing.
     private static final ScheduledExecutorService THREAD_POOL =
             new ScheduledThreadPoolExecutor(10, THREAD_FACTORY);
 
+    private volatile CompletableFuture<Void> appeared;
+
+    private volatile InternalClusterEventListener listener;
+
     public ClusterMessagingProtocolClient(
+            ClusterService clusterService,
             ClusterCommunicationService clusterCommunicator,
             ControllerNode localNode,
-            ControllerNode remoteNode) {
+            TcpMember remoteMember) {
+
+        this.clusterService = clusterService;
         this.clusterCommunicator = clusterCommunicator;
         this.localNode = localNode;
-        this.remoteNode = remoteNode;
+        this.remoteMember = remoteMember;
     }
 
     @Override
@@ -81,15 +94,64 @@
     }
 
     @Override
-    public CompletableFuture<Void> connect() {
-        return CompletableFuture.completedFuture(null);
+    public synchronized CompletableFuture<Void> connect() {
+        if (remoteNode != null) {
+            // done
+            return CompletableFuture.completedFuture(null);
+        }
+
+        remoteNode = getControllerNode(remoteMember);
+
+        if (remoteNode != null) {
+            // done
+            return CompletableFuture.completedFuture(null);
+        }
+
+        if (appeared != null) {
+            // already waiting for member to appear
+            return appeared;
+        }
+
+        appeared = new CompletableFuture<>();
+        listener = new InternalClusterEventListener();
+        clusterService.addListener(listener);
+
+        // wait for specified controller node to come up
+        return null;
     }
 
     @Override
-    public CompletableFuture<Void> close() {
+    public synchronized CompletableFuture<Void> close() {
+        if (listener != null) {
+            clusterService.removeListener(listener);
+            listener = null;
+        }
+        if (appeared != null) {
+            appeared.cancel(true);
+            appeared = null;
+        }
         return CompletableFuture.completedFuture(null);
     }
 
+    private synchronized void checkIfMemberAppeared() {
+        final ControllerNode controllerNode = getControllerNode(remoteMember);
+        if (controllerNode == null) {
+            // still not there: no-op
+            return;
+        }
+
+        // found
+        remoteNode = controllerNode;
+        if (appeared != null) {
+            appeared.complete(null);
+        }
+
+        if (listener != null) {
+            clusterService.removeListener(listener);
+            listener = null;
+        }
+    }
+
     private <I> MessageSubject messageType(I input) {
         Class<?> clazz = input.getClass();
         if (clazz.equals(PollRequest.class)) {
@@ -112,6 +174,30 @@
         return future;
     }
 
+    private ControllerNode getControllerNode(TcpMember remoteMember) {
+        final String host = remoteMember.host();
+        final int port = remoteMember.port();
+        for (ControllerNode node : clusterService.getNodes()) {
+            if (node.ip().toString().equals(host) && node.tcpPort() == port) {
+                return node;
+            }
+        }
+        return null;
+    }
+
+    private final class InternalClusterEventListener
+            implements ClusterEventListener {
+
+        public InternalClusterEventListener() {
+        }
+
+        @Override
+        public void event(ClusterEvent event) {
+            checkIfMemberAppeared();
+        }
+
+    }
+
     private class RPCTask<I, O> implements Runnable {
 
         private final I request;