DatabaseManager: try to wait for other nodes on startup
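
When a node activates and finds no other cluster members, it now waits up to
120 seconds for membership events (via a CountDownLatch counted down by the
cluster event listener) before falling back to a single-node Copycat cluster,
to reduce the chance of forming multiple consensus rings. Access to
clusterConfig is synchronized, since the event listener can modify it
concurrently, and the copycat reference is null-checked before logging.

A minimal, self-contained sketch of the latch-based wait pattern this change
uses (class and method names below are illustrative only, not ONOS or Copycat
APIs):

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    public class PeerWaitSketch {

        // One permit: released as soon as any peer membership event arrives.
        private final CountDownLatch peerLatch = new CountDownLatch(1);

        // Called from the membership-event listener thread when a peer shows up.
        void onPeerJoined() {
            peerLatch.countDown();
        }

        // Called during activation; blocks for up to 120s before going solo.
        boolean waitForPeers() throws InterruptedException {
            return peerLatch.await(120, TimeUnit.SECONDS);
        }

        public static void main(String[] args) throws Exception {
            PeerWaitSketch sketch = new PeerWaitSketch();
            // Simulate another node joining after 2 seconds.
            new Thread(() -> {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException ignored) {
                }
                sketch.onPeerJoined();
            }).start();
            System.out.println(sketch.waitForPeers()
                    ? "Peer appeared; joining multi-node cluster"
                    : "Timed out; starting as single node cluster");
        }
    }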
Change-Id: I90acfa10be7430509a459b456658dc8838d4e44b
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java
index c561221..56dba79 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java
@@ -149,12 +149,12 @@
@Activate
public void activate() {
- log.info("Started.");
+ log.info("Started");
}
@Deactivate
public void deactivate() {
- log.info("Stopped.");
+ log.info("Stopped");
}
@Override
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
index b1e1949..2779b35 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
@@ -5,6 +5,8 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import net.kuujo.copycat.Copycat;
import net.kuujo.copycat.StateMachine;
@@ -60,9 +62,11 @@
private Copycat copycat;
private DatabaseClient client;
- // TODO: check if synchronization is required to read/modify this
+ // guarded by synchronized (clusterConfig) blocks
private ClusterConfig<TcpMember> clusterConfig;
+ private CountDownLatch clusterEventLatch;
+
private ClusterEventListener clusterEventListener;
@Activate
@@ -81,22 +85,45 @@
List<TcpMember> remoteMembers = new ArrayList<>(clusterService.getNodes().size());
+ clusterEventLatch = new CountDownLatch(1);
clusterEventListener = new InternalClusterEventListener();
clusterService.addListener(clusterEventListener);
+ // note: from this point onward, clusterConfig requires synchronization
+
for (ControllerNode node : clusterService.getNodes()) {
TcpMember member = new TcpMember(node.ip().toString(), node.tcpPort());
if (!member.equals(localMember)) {
remoteMembers.add(member);
}
}
- clusterConfig.addRemoteMembers(remoteMembers);
- log.info("Starting cluster with Local:[{}], Remote:{}", localMember, remoteMembers);
+ if (remoteMembers.isEmpty()) {
+ log.info("This node is the only node in the cluster. "
+ + "Waiting for others to show up.");
+ // FIXME: hack to reduce the chance of forming multiple consensus rings;
+ //        add a seed node configuration to avoid this.
+ // If this node is alone, wait for some time in the hope that
+ // other nodes will come up soon.
+ try {
+ if (!clusterEventLatch.await(120, TimeUnit.SECONDS)) {
+ log.info("Starting as single node cluster");
+ }
+ } catch (InterruptedException e) {
+ log.info("Interrupted waiting for others", e);
+ }
+ }
- // Create the cluster.
- TcpCluster cluster = new TcpCluster(clusterConfig);
+ final TcpCluster cluster;
+ synchronized (clusterConfig) {
+ clusterConfig.addRemoteMembers(remoteMembers);
+
+ // Create the cluster.
+ cluster = new TcpCluster(clusterConfig);
+ }
+ log.info("Starting cluster: {}", cluster);
+
StateMachine stateMachine = new DatabaseStateMachine();
// FIXME resolve Chronicle + OSGi issue
@@ -207,17 +234,24 @@
case INSTANCE_ACTIVATED:
case INSTANCE_ADDED:
log.info("{} was added to the cluster", tcpMember);
- clusterConfig.addRemoteMember(tcpMember);
+ synchronized (clusterConfig) {
+ clusterConfig.addRemoteMember(tcpMember);
+ }
break;
case INSTANCE_DEACTIVATED:
case INSTANCE_REMOVED:
log.info("{} was removed from the cluster", tcpMember);
- clusterConfig.removeRemoteMember(tcpMember);
+ synchronized (clusterConfig) {
+ clusterConfig.removeRemoteMember(tcpMember);
+ }
break;
default:
break;
}
- log.debug("Current cluster: {}", copycat.cluster());
+ if (copycat != null) {
+ log.debug("Current cluster: {}", copycat.cluster());
+ }
+ clusterEventLatch.countDown();
}
}