Support for running copycat on a subset of ONOS cluster nodes.
This change ensures the DatabaseService on each node behaves correctly,
irrespective of whether the node is part of the Raft cluster.
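
The mechanism, in outline: only the nodes listed in the tablet
configuration join the Copycat Raft cluster, and whichever member wins a
leader election re-broadcasts the LeaderElectEvent to all ONOS nodes over
ClusterCommunicationService. DatabaseClient on every node, Raft member or
not, subscribes to that broadcast and points its ProtocolClient at the
announced leader. A condensed sketch of the wiring (names match the diff
below; this is a simplification, not the literal code):

    // Raft members only: re-broadcast copycat's election event cluster-wide.
    copycat.event(LeaderElectEvent.class)
           .registerHandler(new RaftLeaderElectionMonitor());

    // Every node: learn the leader from the broadcast and (re)connect to it.
    clusterCommunicator.addSubscriber(RAFT_LEADER_ELECTION_EVENT, client);
    client.waitForLeader();

Note that the leader broadcasts with broadcastIncludeSelf: its own
DatabaseClient also depends on the message, since the client no longer
registers with copycat directly.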
Change-Id: I1e8976d56b3a2892d5c7ecbb46c247770a633860
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java
index 97cf50e..b2f5c2b 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java
@@ -13,6 +13,7 @@
import net.kuujo.copycat.cluster.TcpClusterConfig;
import net.kuujo.copycat.cluster.TcpMember;
+import net.kuujo.copycat.event.LeaderElectEvent;
import net.kuujo.copycat.internal.log.ConfigurationEntry;
import net.kuujo.copycat.internal.log.CopycatEntry;
import net.kuujo.copycat.internal.log.OperationEntry;
@@ -103,6 +104,7 @@
.register(OperationEntry.class)
.register(TcpClusterConfig.class)
.register(TcpMember.class)
+ .register(LeaderElectEvent.class)
.build();
private static final KryoNamespace DATABASE = KryoNamespace.newBuilder()
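
LeaderElectEvent must now be registered with the serializer because it
leaves copycat's own protocol: the leader encodes it into a ClusterMessage
payload and every DatabaseClient decodes it on receipt. Schematically (a
round-trip sketch using SERIALIZER from this file):

    byte[] payload = ClusterMessagingProtocol.SERIALIZER.encode(event);          // leader side
    LeaderElectEvent copy = ClusterMessagingProtocol.SERIALIZER.decode(payload); // subscriber side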
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
index 27e04c4..71608ef 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
@@ -13,13 +13,15 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import net.kuujo.copycat.cluster.Member;
import net.kuujo.copycat.cluster.TcpMember;
-import net.kuujo.copycat.event.EventHandler;
import net.kuujo.copycat.event.LeaderElectEvent;
import net.kuujo.copycat.protocol.SubmitRequest;
import net.kuujo.copycat.protocol.SubmitResponse;
import net.kuujo.copycat.spi.protocol.ProtocolClient;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.service.BatchReadRequest;
import org.onlab.onos.store.service.BatchWriteRequest;
import org.onlab.onos.store.service.DatabaseException;
@@ -31,7 +33,7 @@
/**
* Client for interacting with the Copycat Raft cluster.
*/
-public class DatabaseClient implements EventHandler<LeaderElectEvent> {
+public class DatabaseClient implements ClusterMessageHandler {
private static final int RETRIES = 5;
@@ -41,24 +43,28 @@
private final DatabaseProtocolService protocol;
private volatile ProtocolClient client = null;
- private volatile TcpMember currentLeader = null;
-
+ private volatile Member currentLeader = null;
+ private volatile long currentLeaderTerm = 0;
public DatabaseClient(DatabaseProtocolService protocol) {
this.protocol = checkNotNull(protocol);
}
- // FIXME This handler relies on a fact that local node is part of Raft cluster
@Override
- public void handle(LeaderElectEvent event) {
- final TcpMember newLeader = event.leader();
- if (newLeader != null && !newLeader.equals(currentLeader)) {
- log.info("{} became the new leader", newLeader);
+ public void handle(ClusterMessage message) {
+ LeaderElectEvent event =
+ ClusterMessagingProtocol.SERIALIZER.decode(message.payload());
+ TcpMember newLeader = event.leader();
+ long newLeaderTerm = event.term();
+ if (newLeader != null && !newLeader.equals(currentLeader) && newLeaderTerm > currentLeaderTerm) {
+ log.info("New leader detected. Leader: {}, term: {}", newLeader, newLeaderTerm);
ProtocolClient prevClient = client;
- ProtocolClient newclient = protocol.createClient(newLeader);
- newclient.connect();
- client = newclient;
+ ProtocolClient newClient = protocol.createClient(newLeader);
+ newClient.connect();
+ client = newClient;
currentLeader = newLeader;
+ currentLeaderTerm = newLeaderTerm;
+
if (prevClient != null) {
prevClient.close();
}
@@ -80,7 +86,6 @@
while (currentLeader == null) {
Thread.sleep(200);
}
- log.info("Leader appeared: {}", currentLeader);
return;
} catch (InterruptedException e) {
log.error("Interrupted while waiting for Leader", e);
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
index 6fcd6b9..37e75cf 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
@@ -19,6 +19,7 @@
import net.kuujo.copycat.cluster.TcpCluster;
import net.kuujo.copycat.cluster.TcpClusterConfig;
import net.kuujo.copycat.cluster.TcpMember;
+import net.kuujo.copycat.event.EventHandler;
import net.kuujo.copycat.event.LeaderElectEvent;
import net.kuujo.copycat.log.Log;
@@ -35,6 +36,8 @@
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.service.BatchReadRequest;
import org.onlab.onos.store.service.BatchReadResult;
import org.onlab.onos.store.service.BatchWriteRequest;
@@ -86,6 +89,9 @@
// initial member configuration file path
private String initialMemberConfig = DEFAULT_MEMBER_FILE;
+ public static final MessageSubject RAFT_LEADER_ELECTION_EVENT =
+ new MessageSubject("raft-leader-election-event");
+
private Copycat copycat;
private DatabaseClient client;
@@ -102,8 +108,6 @@
@Activate
public void activate() throws InterruptedException, ExecutionException {
- // TODO: Not every node should be part of the consensus ring.
-
// load tablet configuration
File file = new File(CONFIG_DIR, initialMemberConfig);
log.info("Loading config: {}", file.getAbsolutePath());
@@ -117,16 +121,16 @@
// load default tablet configuration and start copycat
clusterConfig = new TcpClusterConfig();
- Set<DefaultControllerNode> defaultMember = tabletMembers.get(DEFAULT_TABLET);
- if (defaultMember == null || defaultMember.isEmpty()) {
- log.error("No member found in [{}] tablet configuration.",
+ Set<DefaultControllerNode> defaultMembers = tabletMembers.get(DEFAULT_TABLET);
+ if (defaultMembers == null || defaultMembers.isEmpty()) {
+ log.error("No members found in [{}] tablet configuration.",
DEFAULT_TABLET);
throw new IllegalStateException("No member found in tablet configuration");
}
final ControllerNode localNode = clusterService.getLocalNode();
- for (ControllerNode member : defaultMember) {
+ for (ControllerNode member : defaultMembers) {
final TcpMember tcpMember = new TcpMember(member.ip().toString(),
member.tcpPort());
if (localNode.equals(member)) {
@@ -136,6 +140,61 @@
}
}
+ if (clusterConfig.getLocalMember() != null) {
+
+ // Wait for a minimum viable Raft cluster to boot up.
+ waitForClusterQuorum();
+
+ final TcpCluster cluster;
+ synchronized (clusterConfig) {
+ // Create the cluster.
+ cluster = new TcpCluster(clusterConfig);
+ }
+ log.info("Starting cluster: {}", cluster);
+
+ DatabaseEntryExpirationTracker expirationTracker =
+ new DatabaseEntryExpirationTracker(
+ clusterConfig.getLocalMember(),
+ clusterService.getLocalNode(),
+ clusterCommunicator,
+ this);
+
+ DatabaseStateMachine stateMachine = new DatabaseStateMachine();
+ stateMachine.addEventListener(expirationTracker);
+ Log consensusLog = new MapDBLog(LOG_FILE_PREFIX + localNode.id(),
+ ClusterMessagingProtocol.SERIALIZER);
+
+ copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
+ copycat.event(LeaderElectEvent.class).registerHandler(new RaftLeaderElectionMonitor());
+ copycat.event(LeaderElectEvent.class).registerHandler(expirationTracker);
+ }
+
+ client = new DatabaseClient(copycatMessagingProtocol);
+ clusterCommunicator.addSubscriber(RAFT_LEADER_ELECTION_EVENT, client);
+
+ // Starts copycat if this node is a participant
+ // of the Raft cluster.
+ if (copycat != null) {
+ copycat.start().get();
+ }
+
+ client.waitForLeader();
+ log.info("Started.");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ clusterService.removeListener(clusterEventListener);
+ // TODO: ClusterCommunicationService must support more than one
+ // handler per message subject.
+ clusterCommunicator.removeSubscriber(RAFT_LEADER_ELECTION_EVENT);
+ if (copycat != null) {
+ copycat.stop();
+ }
+ log.info("Stopped.");
+ }
+
+ private void waitForClusterQuorum() {
// note: from this point beyond, clusterConfig requires synchronization
clusterEventLatch = new CountDownLatch(1);
clusterEventListener = new InternalClusterEventListener();
@@ -153,46 +212,6 @@
log.info("Interrupted waiting for others", e);
}
}
-
- final TcpCluster cluster;
- synchronized (clusterConfig) {
- // Create the cluster.
- cluster = new TcpCluster(clusterConfig);
- }
- log.info("Starting cluster: {}", cluster);
-
- DatabaseEntryExpirationTracker expirationTracker =
- new DatabaseEntryExpirationTracker(
- clusterConfig.getLocalMember(),
- clusterService.getLocalNode(),
- clusterCommunicator,
- this);
-
- DatabaseStateMachine stateMachine = new DatabaseStateMachine();
- stateMachine.addEventListener(expirationTracker);
- Log consensusLog = new MapDBLog(LOG_FILE_PREFIX + localNode.id(),
- ClusterMessagingProtocol.SERIALIZER);
-
- copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
-
- client = new DatabaseClient(copycatMessagingProtocol);
-
-
- copycat.event(LeaderElectEvent.class).registerHandler(client);
- copycat.event(LeaderElectEvent.class).registerHandler(expirationTracker);
-
- copycat.start().get();
-
- client.waitForLeader();
-
- log.info("Started.");
- }
-
- @Deactivate
- public void deactivate() {
- clusterService.removeListener(clusterEventListener);
- copycat.stop();
- log.info("Stopped.");
}
@Override
@@ -353,6 +372,24 @@
}
}
+ private final class RaftLeaderElectionMonitor implements EventHandler<LeaderElectEvent> {
+ @Override
+ public void handle(LeaderElectEvent event) {
+ try {
+ if (clusterConfig.getLocalMember() != null && event.leader().equals(clusterConfig.getLocalMember())) {
+ // This node just became the leader.
+ clusterCommunicator.broadcastIncludeSelf(
+ new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ RAFT_LEADER_ELECTION_EVENT,
+ ClusterMessagingProtocol.SERIALIZER.encode(event)));
+ }
+ } catch (IOException e) {
+ log.error("Failed to broadcast raft leadership change event", e);
+ }
+ }
+ }
+
private final class InternalClusterEventListener
implements ClusterEventListener {