CopyCat: Dynamic cluster support

Change-Id: I887c52b35811abf37a2b59db034b07ccf01eed2c
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
index 183a6db..19ee882 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
@@ -8,17 +8,21 @@
 
 import net.kuujo.copycat.Copycat;
 import net.kuujo.copycat.StateMachine;
+import net.kuujo.copycat.cluster.ClusterConfig;
 import net.kuujo.copycat.cluster.TcpCluster;
 import net.kuujo.copycat.cluster.TcpClusterConfig;
 import net.kuujo.copycat.cluster.TcpMember;
 import net.kuujo.copycat.log.InMemoryLog;
 import net.kuujo.copycat.log.Log;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterEvent;
+import org.onlab.onos.cluster.ClusterEventListener;
 import org.onlab.onos.cluster.ClusterService;
 import org.onlab.onos.cluster.ControllerNode;
 import org.onlab.onos.store.service.DatabaseAdminService;
@@ -35,8 +39,6 @@
 import org.onlab.onos.store.service.WriteResult;
 import org.slf4j.Logger;
 
-import com.google.common.collect.Lists;
-
 /**
  * Strongly consistent and durable state management service based on
  * Copycat implementation of Raft consensus protocol.
@@ -58,17 +60,29 @@
     private Copycat copycat;
     private DatabaseClient client;
 
+    // TODO: check if synchronization is required to read/modify this
+    private ClusterConfig<TcpMember> clusterConfig;
+
+    private ClusterEventListener clusterEventListener;
+
     @Activate
     public void activate() {
-        log.info("Starting.");
 
-        // TODO: Not every node can be part of the consensus ring.
+        // TODO: Not every node should be part of the consensus ring.
 
+        final ControllerNode localNode = clusterService.getLocalNode();
         TcpMember localMember =
                 new TcpMember(
-                        clusterService.getLocalNode().ip().toString(),
-                        clusterService.getLocalNode().tcpPort());
-        List<TcpMember> remoteMembers = Lists.newArrayList();
+                        localNode.ip().toString(),
+                        localNode.tcpPort());
+
+        clusterConfig = new TcpClusterConfig();
+        clusterConfig.setLocalMember(localMember);
+
+        List<TcpMember> remoteMembers = new ArrayList<>(clusterService.getNodes().size());
+
+        clusterEventListener = new InternalClusterEventListener();
+        clusterService.addListener(clusterEventListener);
 
         for (ControllerNode node : clusterService.getNodes()) {
             TcpMember member = new TcpMember(node.ip().toString(), node.tcpPort());
@@ -76,21 +90,18 @@
                 remoteMembers.add(member);
             }
         }
+        clusterConfig.addRemoteMembers(remoteMembers);
 
-        // Configure the cluster.
-        TcpClusterConfig config = new TcpClusterConfig();
+        log.info("Starting cluster with Local:[{}], Remote:{}", localMember, remoteMembers);
 
-        config.setLocalMember(localMember);
-        config.setRemoteMembers(remoteMembers.toArray(new TcpMember[]{}));
 
         // Create the cluster.
-        TcpCluster cluster = new TcpCluster(config);
+        TcpCluster cluster = new TcpCluster(clusterConfig);
 
         StateMachine stateMachine = new DatabaseStateMachine();
-        ControllerNode thisNode = clusterService.getLocalNode();
         // FIXME resolve Chronicle + OSGi issue
         //Log consensusLog = new ChronicleLog(LOG_FILE_PREFIX + "_" + thisNode.id());
-        Log consensusLog = new InMemoryLog();
+        Log consensusLog = new KryoRegisteredInMemoryLog();
 
         copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
         copycat.start();
@@ -102,6 +113,7 @@
 
     @Deactivate
     public void deactivate() {
+        clusterService.removeListener(clusterEventListener);
         copycat.stop();
         log.info("Stopped.");
     }
@@ -179,6 +191,46 @@
 
     }
 
+    private final class InternalClusterEventListener
+            implements ClusterEventListener {
+
+        @Override
+        public void event(ClusterEvent event) {
+            // TODO: Not every node should be part of the consensus ring.
+
+            final ControllerNode node = event.subject();
+            final TcpMember tcpMember = new TcpMember(node.ip().toString(),
+                                                      node.tcpPort());
+
+            log.trace("{}", event);
+            switch (event.type()) {
+            case INSTANCE_ACTIVATED:
+            case INSTANCE_ADDED:
+                log.info("{} was added to the cluster", tcpMember);
+                clusterConfig.addRemoteMember(tcpMember);
+                break;
+            case INSTANCE_DEACTIVATED:
+            case INSTANCE_REMOVED:
+                log.info("{} was removed from the cluster", tcpMember);
+                clusterConfig.removeRemoteMember(tcpMember);
+                break;
+            default:
+                break;
+            }
+            log.info("Current cluster: {}", clusterConfig.getMembers());
+        }
+
+    }
+
+    public static final class KryoRegisteredInMemoryLog extends InMemoryLog {
+        public KryoRegisteredInMemoryLog() {
+            super();
+            // required to deserialize object across bundles
+            super.kryo.register(TcpMember.class, new TcpMemberSerializer());
+            super.kryo.register(TcpClusterConfig.class, new TcpClusterConfigSerializer());
+        }
+    }
+
     private class DatabaseOperationResult<R, E extends DatabaseException> implements OptionalResult<R, E> {
 
         private final R result;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TcpClusterConfigSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TcpClusterConfigSerializer.java
new file mode 100644
index 0000000..48887b9
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TcpClusterConfigSerializer.java
@@ -0,0 +1,30 @@
+package org.onlab.onos.store.service.impl;
+
+import java.util.Collection;
+
+import net.kuujo.copycat.cluster.TcpClusterConfig;
+import net.kuujo.copycat.cluster.TcpMember;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+public class TcpClusterConfigSerializer extends Serializer<TcpClusterConfig> {
+
+    @Override
+    public void write(Kryo kryo, Output output, TcpClusterConfig object) {
+        kryo.writeClassAndObject(output, object.getLocalMember());
+        kryo.writeClassAndObject(output, object.getRemoteMembers());
+    }
+
+    @Override
+    public TcpClusterConfig read(Kryo kryo, Input input,
+                                 Class<TcpClusterConfig> type) {
+        TcpMember localMember = (TcpMember) kryo.readClassAndObject(input);
+        @SuppressWarnings("unchecked")
+        Collection<TcpMember> remoteMembers = (Collection<TcpMember>) kryo.readClassAndObject(input);
+        return new TcpClusterConfig(localMember, remoteMembers);
+    }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TcpMemberSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TcpMemberSerializer.java
new file mode 100644
index 0000000..e729f9b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TcpMemberSerializer.java
@@ -0,0 +1,24 @@
+package org.onlab.onos.store.service.impl;
+
+import net.kuujo.copycat.cluster.TcpMember;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+public class TcpMemberSerializer extends Serializer<TcpMember> {
+
+    @Override
+    public void write(Kryo kryo, Output output, TcpMember object) {
+        output.writeString(object.host());
+        output.writeInt(object.port());
+    }
+
+    @Override
+    public TcpMember read(Kryo kryo, Input input, Class<TcpMember> type) {
+        String host = input.readString();
+        int port = input.readInt();
+        return new TcpMember(host, port);
+    }
+}