Merge "Compare ConnectPoints rather than Interfaces."
diff --git a/core/net/src/test/java/org/onlab/onos/net/flow/DefaultFlowEntryTest.java b/core/api/src/test/java/org/onlab/onos/net/flow/DefaultFlowEntryTest.java
similarity index 100%
rename from core/net/src/test/java/org/onlab/onos/net/flow/DefaultFlowEntryTest.java
rename to core/api/src/test/java/org/onlab/onos/net/flow/DefaultFlowEntryTest.java
diff --git a/core/net/src/test/java/org/onlab/onos/net/flow/DefaultFlowRuleTest.java b/core/api/src/test/java/org/onlab/onos/net/flow/DefaultFlowRuleTest.java
similarity index 100%
rename from core/net/src/test/java/org/onlab/onos/net/flow/DefaultFlowRuleTest.java
rename to core/api/src/test/java/org/onlab/onos/net/flow/DefaultFlowRuleTest.java
diff --git a/core/net/src/test/java/org/onlab/onos/net/intent/IntentTestsMocks.java b/core/api/src/test/java/org/onlab/onos/net/intent/IntentTestsMocks.java
similarity index 100%
rename from core/net/src/test/java/org/onlab/onos/net/intent/IntentTestsMocks.java
rename to core/api/src/test/java/org/onlab/onos/net/intent/IntentTestsMocks.java
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java
index dc913bf..97cf50e 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java
@@ -1,6 +1,5 @@
package org.onlab.onos.store.service.impl;
-import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.ArrayList;
@@ -38,7 +37,6 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
-import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.serializers.ImmutableListSerializer;
@@ -172,20 +170,9 @@
@Override
public ProtocolClient createClient(TcpMember member) {
- ControllerNode remoteNode = getControllerNode(member.host(), member.port());
- checkNotNull(remoteNode,
- "No matching ONOS Node for %s:%s",
- member.host(), member.port());
- return new ClusterMessagingProtocolClient(
- clusterCommunicator, clusterService.getLocalNode(), remoteNode);
- }
-
- private ControllerNode getControllerNode(String host, int port) {
- for (ControllerNode node : clusterService.getNodes()) {
- if (node.ip().toString().equals(host) && node.tcpPort() == port) {
- return node;
- }
- }
- return null;
+ return new ClusterMessagingProtocolClient(clusterService,
+ clusterCommunicator,
+ clusterService.getLocalNode(),
+ member);
}
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
index 23c34b2..3dd93b9 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
@@ -13,6 +13,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import net.kuujo.copycat.cluster.TcpMember;
import net.kuujo.copycat.protocol.PingRequest;
import net.kuujo.copycat.protocol.PingResponse;
import net.kuujo.copycat.protocol.PollRequest;
@@ -23,6 +24,9 @@
import net.kuujo.copycat.protocol.SyncResponse;
import net.kuujo.copycat.spi.protocol.ProtocolClient;
+import org.onlab.onos.cluster.ClusterEvent;
+import org.onlab.onos.cluster.ClusterEventListener;
+import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
@@ -43,21 +47,30 @@
public static final long RETRY_INTERVAL_MILLIS = 2000;
+ private final ClusterService clusterService;
private final ClusterCommunicationService clusterCommunicator;
private final ControllerNode localNode;
- private final ControllerNode remoteNode;
+ private final TcpMember remoteMember;
+ private ControllerNode remoteNode;
// FIXME: Thread pool sizing.
private static final ScheduledExecutorService THREAD_POOL =
new ScheduledThreadPoolExecutor(10, THREAD_FACTORY);
+ private volatile CompletableFuture<Void> appeared;
+
+ private volatile InternalClusterEventListener listener;
+
public ClusterMessagingProtocolClient(
+ ClusterService clusterService,
ClusterCommunicationService clusterCommunicator,
ControllerNode localNode,
- ControllerNode remoteNode) {
+ TcpMember remoteMember) {
+
+ this.clusterService = clusterService;
this.clusterCommunicator = clusterCommunicator;
this.localNode = localNode;
- this.remoteNode = remoteNode;
+ this.remoteMember = remoteMember;
}
@Override
@@ -81,15 +94,64 @@
}
@Override
- public CompletableFuture<Void> connect() {
- return CompletableFuture.completedFuture(null);
+ public synchronized CompletableFuture<Void> connect() {
+ if (remoteNode != null) {
+ // done
+ return CompletableFuture.completedFuture(null);
+ }
+
+ remoteNode = getControllerNode(remoteMember);
+
+ if (remoteNode != null) {
+ // done
+ return CompletableFuture.completedFuture(null);
+ }
+
+ if (appeared != null) {
+ // already waiting for member to appear
+ return appeared;
+ }
+
+ appeared = new CompletableFuture<>();
+ listener = new InternalClusterEventListener();
+ clusterService.addListener(listener);
+
+ // wait for specified controller node to come up
+ return null;
}
@Override
- public CompletableFuture<Void> close() {
+ public synchronized CompletableFuture<Void> close() {
+ if (listener != null) {
+ clusterService.removeListener(listener);
+ listener = null;
+ }
+ if (appeared != null) {
+ appeared.cancel(true);
+ appeared = null;
+ }
return CompletableFuture.completedFuture(null);
}
+ private synchronized void checkIfMemberAppeared() {
+ final ControllerNode controllerNode = getControllerNode(remoteMember);
+ if (controllerNode == null) {
+ // still not there: no-op
+ return;
+ }
+
+ // found
+ remoteNode = controllerNode;
+ if (appeared != null) {
+ appeared.complete(null);
+ }
+
+ if (listener != null) {
+ clusterService.removeListener(listener);
+ listener = null;
+ }
+ }
+
private <I> MessageSubject messageType(I input) {
Class<?> clazz = input.getClass();
if (clazz.equals(PollRequest.class)) {
@@ -112,6 +174,30 @@
return future;
}
+ private ControllerNode getControllerNode(TcpMember remoteMember) {
+ final String host = remoteMember.host();
+ final int port = remoteMember.port();
+ for (ControllerNode node : clusterService.getNodes()) {
+ if (node.ip().toString().equals(host) && node.tcpPort() == port) {
+ return node;
+ }
+ }
+ return null;
+ }
+
+ private final class InternalClusterEventListener
+ implements ClusterEventListener {
+
+ public InternalClusterEventListener() {
+ }
+
+ @Override
+ public void event(ClusterEvent event) {
+ checkIfMemberAppeared();
+ }
+
+ }
+
private class RPCTask<I, O> implements Runnable {
private final I request;