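Add support for running Copycat on a subset of ONOS cluster nodes.
This change ensures that the DatabaseService on each node works correctly regardless of whether the node is part of the Raft cluster: only member nodes start Copycat, while every node's DatabaseClient learns the current Raft leader from a leader-election event that the leader broadcasts to the whole ONOS cluster.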
Change-Id: I1e8976d56b3a2892d5c7ecbb46c247770a633860
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
index 6fcd6b9..37e75cf 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
@@ -19,6 +19,7 @@
import net.kuujo.copycat.cluster.TcpCluster;
import net.kuujo.copycat.cluster.TcpClusterConfig;
import net.kuujo.copycat.cluster.TcpMember;
+import net.kuujo.copycat.event.EventHandler;
import net.kuujo.copycat.event.LeaderElectEvent;
import net.kuujo.copycat.log.Log;
@@ -35,6 +36,8 @@
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.service.BatchReadRequest;
import org.onlab.onos.store.service.BatchReadResult;
import org.onlab.onos.store.service.BatchWriteRequest;
@@ -86,6 +89,9 @@
// initial member configuration file path
private String initialMemberConfig = DEFAULT_MEMBER_FILE;
+ /**
+ * Cluster message subject on which the node that becomes Raft leader
+ * broadcasts the serialized {@link LeaderElectEvent} to all ONOS nodes,
+ * including itself; every node's DatabaseClient subscribes to it in activate().
+ */
+ public static final MessageSubject RAFT_LEADER_ELECTION_EVENT =
+ new MessageSubject("raft-leader-election-event");
+
private Copycat copycat;
private DatabaseClient client;
@@ -102,8 +108,6 @@
@Activate
public void activate() throws InterruptedException, ExecutionException {
- // TODO: Not every node should be part of the consensus ring.
-
// load tablet configuration
File file = new File(CONFIG_DIR, initialMemberConfig);
log.info("Loading config: {}", file.getAbsolutePath());
@@ -117,16 +121,16 @@
// load default tablet configuration and start copycat
clusterConfig = new TcpClusterConfig();
- Set<DefaultControllerNode> defaultMember = tabletMembers.get(DEFAULT_TABLET);
- if (defaultMember == null || defaultMember.isEmpty()) {
- log.error("No member found in [{}] tablet configuration.",
+ Set<DefaultControllerNode> defaultMembers = tabletMembers.get(DEFAULT_TABLET);
+ if (defaultMembers == null || defaultMembers.isEmpty()) {
+ log.error("No members found in [{}] tablet configuration.",
DEFAULT_TABLET);
throw new IllegalStateException("No member found in tablet configuration");
}
final ControllerNode localNode = clusterService.getLocalNode();
- for (ControllerNode member : defaultMember) {
+ for (ControllerNode member : defaultMembers) {
final TcpMember tcpMember = new TcpMember(member.ip().toString(),
member.tcpPort());
if (localNode.equals(member)) {
@@ -136,6 +140,61 @@
}
}
+ if (clusterConfig.getLocalMember() != null) {
+
+ // Wait for a minimum viable Raft cluster to boot up.
+ waitForClusterQuorum();
+
+ final TcpCluster cluster;
+ synchronized (clusterConfig) {
+ // Create the cluster.
+ cluster = new TcpCluster(clusterConfig);
+ }
+ log.info("Starting cluster: {}", cluster);
+
+ DatabaseEntryExpirationTracker expirationTracker =
+ new DatabaseEntryExpirationTracker(
+ clusterConfig.getLocalMember(),
+ clusterService.getLocalNode(),
+ clusterCommunicator,
+ this);
+
+ DatabaseStateMachine stateMachine = new DatabaseStateMachine();
+ stateMachine.addEventListener(expirationTracker);
+ Log consensusLog = new MapDBLog(LOG_FILE_PREFIX + localNode.id(),
+ ClusterMessagingProtocol.SERIALIZER);
+
+ copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
+ copycat.event(LeaderElectEvent.class).registerHandler(new RaftLeaderElectionMonitor());
+ copycat.event(LeaderElectEvent.class).registerHandler(expirationTracker);
+ }
+
+ client = new DatabaseClient(copycatMessagingProtocol);
+ clusterCommunicator.addSubscriber(RAFT_LEADER_ELECTION_EVENT, client);
+
+ // Starts copycat if this node is a participant
+ // of the Raft cluster.
+ if (copycat != null) {
+ copycat.start().get();
+ }
+
+ client.waitForLeader();
+ log.info("Started.");
+ }
+
+ /**
+ * Unregisters the cluster event listener (if one was created), removes the
+ * leader-election message subscription, and stops Copycat if this node is a
+ * member of the Raft cluster.
+ */
+ @Deactivate
+ public void deactivate() {
+ // clusterEventListener is only assigned in waitForClusterQuorum(), which
+ // activate() calls only when this node has a local Raft member; on other
+ // nodes it is still null and must not be handed to removeListener().
+ if (clusterEventListener != null) {
+ clusterService.removeListener(clusterEventListener);
+ }
+ // TODO: ClusterCommunicationService must support more than one
+ // handler per message subject.
+ clusterCommunicator.removeSubscriber(RAFT_LEADER_ELECTION_EVENT);
+ // copycat is only created on nodes that are part of the Raft cluster.
+ if (copycat != null) {
+ copycat.stop();
+ }
+ log.info("Stopped.");
+ }
+
+ private void waitForClusterQuorum() {
// note: from this point beyond, clusterConfig requires synchronization
clusterEventLatch = new CountDownLatch(1);
clusterEventListener = new InternalClusterEventListener();
@@ -153,46 +212,6 @@
log.info("Interrupted waiting for others", e);
}
}
-
- final TcpCluster cluster;
- synchronized (clusterConfig) {
- // Create the cluster.
- cluster = new TcpCluster(clusterConfig);
- }
- log.info("Starting cluster: {}", cluster);
-
- DatabaseEntryExpirationTracker expirationTracker =
- new DatabaseEntryExpirationTracker(
- clusterConfig.getLocalMember(),
- clusterService.getLocalNode(),
- clusterCommunicator,
- this);
-
- DatabaseStateMachine stateMachine = new DatabaseStateMachine();
- stateMachine.addEventListener(expirationTracker);
- Log consensusLog = new MapDBLog(LOG_FILE_PREFIX + localNode.id(),
- ClusterMessagingProtocol.SERIALIZER);
-
- copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
-
- client = new DatabaseClient(copycatMessagingProtocol);
-
-
- copycat.event(LeaderElectEvent.class).registerHandler(client);
- copycat.event(LeaderElectEvent.class).registerHandler(expirationTracker);
-
- copycat.start().get();
-
- client.waitForLeader();
-
- log.info("Started.");
- }
-
- @Deactivate
- public void deactivate() {
- clusterService.removeListener(clusterEventListener);
- copycat.stop();
- log.info("Stopped.");
}
@Override
@@ -353,6 +372,24 @@
}
}
+ /**
+ * Copycat leader-election handler (registered only on nodes that run Copycat).
+ * When the newly elected leader is this node's local Raft member, it broadcasts
+ * the serialized election event to every ONOS node, including this one, under
+ * {@link #RAFT_LEADER_ELECTION_EVENT}. Nodes outside the Raft cluster can thus
+ * learn the leader through their subscribed DatabaseClient.
+ */
+ private final class RaftLeaderElectionMonitor implements EventHandler<LeaderElectEvent> {
+ @Override
+ public void handle(LeaderElectEvent event) {
+ try {
+ // NOTE(review): event.leader() is compared via equals() with the
+ // local member object; confirm both have the same type (if leader()
+ // returns a URI string, this check would never succeed).
+ if (clusterConfig.getLocalMember() != null && event.leader().equals(clusterConfig.getLocalMember())) {
+ // This node just became the leader.
+ clusterCommunicator.broadcastIncludeSelf(
+ new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ RAFT_LEADER_ELECTION_EVENT,
+ ClusterMessagingProtocol.SERIALIZER.encode(event)));
+ }
+ } catch (IOException e) {
+ // A failed broadcast is logged, not rethrown, so the Copycat event
+ // dispatch is not interrupted.
+ log.error("Failed to broadcast raft leadership change event", e);
+ }
+ }
+ }
+
private final class InternalClusterEventListener
implements ClusterEventListener {