CopyCat: Dynamic cluster support
Change-Id: I887c52b35811abf37a2b59db034b07ccf01eed2c
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
index 183a6db..19ee882 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
@@ -8,17 +8,21 @@
import net.kuujo.copycat.Copycat;
import net.kuujo.copycat.StateMachine;
+import net.kuujo.copycat.cluster.ClusterConfig;
import net.kuujo.copycat.cluster.TcpCluster;
import net.kuujo.copycat.cluster.TcpClusterConfig;
import net.kuujo.copycat.cluster.TcpMember;
import net.kuujo.copycat.log.InMemoryLog;
import net.kuujo.copycat.log.Log;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterEvent;
+import org.onlab.onos.cluster.ClusterEventListener;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.store.service.DatabaseAdminService;
@@ -35,8 +39,6 @@
import org.onlab.onos.store.service.WriteResult;
import org.slf4j.Logger;
-import com.google.common.collect.Lists;
-
/**
* Strongly consistent and durable state management service based on
* Copycat implementation of Raft consensus protocol.
@@ -58,17 +60,29 @@
private Copycat copycat;
private DatabaseClient client;
+ // TODO: check if synchronization is required to read/modify this
+ private ClusterConfig<TcpMember> clusterConfig;
+
+ private ClusterEventListener clusterEventListener;
+
@Activate
public void activate() {
- log.info("Starting.");
- // TODO: Not every node can be part of the consensus ring.
+ // TODO: Not every node should be part of the consensus ring.
+ final ControllerNode localNode = clusterService.getLocalNode();
TcpMember localMember =
new TcpMember(
- clusterService.getLocalNode().ip().toString(),
- clusterService.getLocalNode().tcpPort());
- List<TcpMember> remoteMembers = Lists.newArrayList();
+ localNode.ip().toString(),
+ localNode.tcpPort());
+
+ clusterConfig = new TcpClusterConfig();
+ clusterConfig.setLocalMember(localMember);
+
+ List<TcpMember> remoteMembers = new ArrayList<>(clusterService.getNodes().size());
+
+ clusterEventListener = new InternalClusterEventListener();
+ clusterService.addListener(clusterEventListener);
for (ControllerNode node : clusterService.getNodes()) {
TcpMember member = new TcpMember(node.ip().toString(), node.tcpPort());
@@ -76,21 +90,18 @@
remoteMembers.add(member);
}
}
+ clusterConfig.addRemoteMembers(remoteMembers);
- // Configure the cluster.
- TcpClusterConfig config = new TcpClusterConfig();
+ log.info("Starting cluster with Local:[{}], Remote:{}", localMember, remoteMembers);
- config.setLocalMember(localMember);
- config.setRemoteMembers(remoteMembers.toArray(new TcpMember[]{}));
// Create the cluster.
- TcpCluster cluster = new TcpCluster(config);
+ TcpCluster cluster = new TcpCluster(clusterConfig);
StateMachine stateMachine = new DatabaseStateMachine();
- ControllerNode thisNode = clusterService.getLocalNode();
// FIXME resolve Chronicle + OSGi issue
//Log consensusLog = new ChronicleLog(LOG_FILE_PREFIX + "_" + thisNode.id());
- Log consensusLog = new InMemoryLog();
+ Log consensusLog = new KryoRegisteredInMemoryLog();
copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
copycat.start();
@@ -102,6 +113,7 @@
@Deactivate
public void deactivate() {
+ clusterService.removeListener(clusterEventListener);
copycat.stop();
log.info("Stopped.");
}
@@ -179,6 +191,46 @@
}
+ private final class InternalClusterEventListener
+ implements ClusterEventListener {
+
+ @Override
+ public void event(ClusterEvent event) {
+ // TODO: Not every node should be part of the consensus ring.
+
+ final ControllerNode node = event.subject();
+ final TcpMember tcpMember = new TcpMember(node.ip().toString(),
+ node.tcpPort());
+
+ log.trace("{}", event);
+ switch (event.type()) {
+ case INSTANCE_ACTIVATED:
+ case INSTANCE_ADDED:
+ log.info("{} was added to the cluster", tcpMember);
+ clusterConfig.addRemoteMember(tcpMember);
+ break;
+ case INSTANCE_DEACTIVATED:
+ case INSTANCE_REMOVED:
+ log.info("{} was removed from the cluster", tcpMember);
+ clusterConfig.removeRemoteMember(tcpMember);
+ break;
+ default:
+ break;
+ }
+ log.info("Current cluster: {}", clusterConfig.getMembers());
+ }
+
+ }
+
+ public static final class KryoRegisteredInMemoryLog extends InMemoryLog {
+ public KryoRegisteredInMemoryLog() {
+ super();
+ // required to deserialize object across bundles
+ super.kryo.register(TcpMember.class, new TcpMemberSerializer());
+ super.kryo.register(TcpClusterConfig.class, new TcpClusterConfigSerializer());
+ }
+ }
+
private class DatabaseOperationResult<R, E extends DatabaseException> implements OptionalResult<R, E> {
private final R result;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TcpClusterConfigSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TcpClusterConfigSerializer.java
new file mode 100644
index 0000000..48887b9
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TcpClusterConfigSerializer.java
@@ -0,0 +1,30 @@
+package org.onlab.onos.store.service.impl;
+
+import java.util.Collection;
+
+import net.kuujo.copycat.cluster.TcpClusterConfig;
+import net.kuujo.copycat.cluster.TcpMember;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+public class TcpClusterConfigSerializer extends Serializer<TcpClusterConfig> {
+
+ @Override
+ public void write(Kryo kryo, Output output, TcpClusterConfig object) {
+ kryo.writeClassAndObject(output, object.getLocalMember());
+ kryo.writeClassAndObject(output, object.getRemoteMembers());
+ }
+
+ @Override
+ public TcpClusterConfig read(Kryo kryo, Input input,
+ Class<TcpClusterConfig> type) {
+ TcpMember localMember = (TcpMember) kryo.readClassAndObject(input);
+ @SuppressWarnings("unchecked")
+ Collection<TcpMember> remoteMembers = (Collection<TcpMember>) kryo.readClassAndObject(input);
+ return new TcpClusterConfig(localMember, remoteMembers);
+ }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TcpMemberSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TcpMemberSerializer.java
new file mode 100644
index 0000000..e729f9b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TcpMemberSerializer.java
@@ -0,0 +1,24 @@
+package org.onlab.onos.store.service.impl;
+
+import net.kuujo.copycat.cluster.TcpMember;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+public class TcpMemberSerializer extends Serializer<TcpMember> {
+
+ @Override
+ public void write(Kryo kryo, Output output, TcpMember object) {
+ output.writeString(object.host());
+ output.writeInt(object.port());
+ }
+
+ @Override
+ public TcpMember read(Kryo kryo, Input input, Class<TcpMember> type) {
+ String host = input.readString();
+ int port = input.readInt();
+ return new TcpMember(host, port);
+ }
+}