Support for running copycat on a subset of ONOS cluster nodes.
This change ensures that the DatabaseService on each node behaves correctly, regardless of whether the node is part of the Raft cluster.
Change-Id: I1e8976d56b3a2892d5c7ecbb46c247770a633860
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
index 27e04c4..71608ef 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
@@ -13,13 +13,15 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import net.kuujo.copycat.cluster.Member;
import net.kuujo.copycat.cluster.TcpMember;
-import net.kuujo.copycat.event.EventHandler;
import net.kuujo.copycat.event.LeaderElectEvent;
import net.kuujo.copycat.protocol.SubmitRequest;
import net.kuujo.copycat.protocol.SubmitResponse;
import net.kuujo.copycat.spi.protocol.ProtocolClient;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.service.BatchReadRequest;
import org.onlab.onos.store.service.BatchWriteRequest;
import org.onlab.onos.store.service.DatabaseException;
@@ -31,7 +33,7 @@
/**
* Client for interacting with the Copycat Raft cluster.
*/
-public class DatabaseClient implements EventHandler<LeaderElectEvent> {
+public class DatabaseClient implements ClusterMessageHandler {
private static final int RETRIES = 5;
@@ -41,24 +43,28 @@
private final DatabaseProtocolService protocol;
private volatile ProtocolClient client = null;
- private volatile TcpMember currentLeader = null;
-
+ private volatile Member currentLeader = null;
+ private volatile long currentLeaderTerm = 0;
public DatabaseClient(DatabaseProtocolService protocol) {
this.protocol = checkNotNull(protocol);
}
- // FIXME This handler relies on a fact that local node is part of Raft cluster
@Override
- public void handle(LeaderElectEvent event) {
- final TcpMember newLeader = event.leader();
- if (newLeader != null && !newLeader.equals(currentLeader)) {
- log.info("{} became the new leader", newLeader);
+ public void handle(ClusterMessage message) {
+ LeaderElectEvent event =
+ ClusterMessagingProtocol.SERIALIZER.decode(message.payload());
+ TcpMember newLeader = event.leader();
+ long newLeaderTerm = event.term();
+ if (newLeader != null && !newLeader.equals(currentLeader) && newLeaderTerm > currentLeaderTerm) {
+ log.info("New leader detected. Leader: {}, term: {}", newLeader, newLeaderTerm);
ProtocolClient prevClient = client;
- ProtocolClient newclient = protocol.createClient(newLeader);
- newclient.connect();
- client = newclient;
+ ProtocolClient newClient = protocol.createClient(newLeader);
+ newClient.connect();
+ client = newClient;
currentLeader = newLeader;
+ currentLeaderTerm = newLeaderTerm;
+
if (prevClient != null) {
prevClient.close();
}
@@ -80,7 +86,6 @@
while (currentLeader == null) {
Thread.sleep(200);
}
- log.info("Leader appeared: {}", currentLeader);
return;
} catch (InterruptedException e) {
log.error("Interrupted while waiting for Leader", e);