Support for running copycat on a subset of ONOS cluster nodes.
This change ensures the DatabaseService on each node behaves correctly whether or not that node is a member of the Raft cluster.

Change-Id: I1e8976d56b3a2892d5c7ecbb46c247770a633860
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java
index 97cf50e..b2f5c2b 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java
@@ -13,6 +13,7 @@
 
 import net.kuujo.copycat.cluster.TcpClusterConfig;
 import net.kuujo.copycat.cluster.TcpMember;
+import net.kuujo.copycat.event.LeaderElectEvent;
 import net.kuujo.copycat.internal.log.ConfigurationEntry;
 import net.kuujo.copycat.internal.log.CopycatEntry;
 import net.kuujo.copycat.internal.log.OperationEntry;
@@ -103,6 +104,7 @@
             .register(OperationEntry.class)
             .register(TcpClusterConfig.class)
             .register(TcpMember.class)
+            .register(LeaderElectEvent.class)
             .build();
 
     private static final KryoNamespace DATABASE = KryoNamespace.newBuilder()
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
index 27e04c4..71608ef 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
@@ -13,13 +13,15 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import net.kuujo.copycat.cluster.Member;
 import net.kuujo.copycat.cluster.TcpMember;
-import net.kuujo.copycat.event.EventHandler;
 import net.kuujo.copycat.event.LeaderElectEvent;
 import net.kuujo.copycat.protocol.SubmitRequest;
 import net.kuujo.copycat.protocol.SubmitResponse;
 import net.kuujo.copycat.spi.protocol.ProtocolClient;
 
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
 import org.onlab.onos.store.service.BatchReadRequest;
 import org.onlab.onos.store.service.BatchWriteRequest;
 import org.onlab.onos.store.service.DatabaseException;
@@ -31,7 +33,7 @@
 /**
  * Client for interacting with the Copycat Raft cluster.
  */
-public class DatabaseClient implements EventHandler<LeaderElectEvent> {
+public class DatabaseClient implements ClusterMessageHandler {
 
     private static final int RETRIES = 5;
 
@@ -41,24 +43,28 @@
 
     private final DatabaseProtocolService protocol;
     private volatile ProtocolClient client = null;
-    private volatile TcpMember currentLeader = null;
-
+    private volatile Member currentLeader = null;
+    private volatile long currentLeaderTerm = 0;
 
     public DatabaseClient(DatabaseProtocolService protocol) {
         this.protocol = checkNotNull(protocol);
     }
 
-    // FIXME This handler relies on a fact that local node is part of Raft cluster
     @Override
-    public void handle(LeaderElectEvent event) {
-        final TcpMember newLeader = event.leader();
-        if (newLeader != null && !newLeader.equals(currentLeader)) {
-            log.info("{} became the new leader", newLeader);
+    public void handle(ClusterMessage message) {
+        LeaderElectEvent event =
+                ClusterMessagingProtocol.SERIALIZER.decode(message.payload());
+        TcpMember newLeader = event.leader();
+        long newLeaderTerm = event.term();
+        if (newLeader != null && !newLeader.equals(currentLeader) && newLeaderTerm > currentLeaderTerm) {
+            log.info("New leader detected. Leader: {}, term: {}", newLeader, newLeaderTerm);
             ProtocolClient prevClient = client;
-            ProtocolClient newclient = protocol.createClient(newLeader);
-            newclient.connect();
-            client = newclient;
+            ProtocolClient newClient = protocol.createClient(newLeader);
+            newClient.connect();
+            client = newClient;
             currentLeader = newLeader;
+            currentLeaderTerm = newLeaderTerm;
+
             if (prevClient != null) {
                 prevClient.close();
             }
@@ -80,7 +86,6 @@
             while (currentLeader == null) {
                 Thread.sleep(200);
             }
-            log.info("Leader appeared: {}", currentLeader);
             return;
         } catch (InterruptedException e) {
             log.error("Interrupted while waiting for Leader", e);
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
index 6fcd6b9..37e75cf 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
@@ -19,6 +19,7 @@
 import net.kuujo.copycat.cluster.TcpCluster;
 import net.kuujo.copycat.cluster.TcpClusterConfig;
 import net.kuujo.copycat.cluster.TcpMember;
+import net.kuujo.copycat.event.EventHandler;
 import net.kuujo.copycat.event.LeaderElectEvent;
 import net.kuujo.copycat.log.Log;
 
@@ -35,6 +36,8 @@
 import org.onlab.onos.cluster.DefaultControllerNode;
 import org.onlab.onos.cluster.NodeId;
 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
 import org.onlab.onos.store.service.BatchReadRequest;
 import org.onlab.onos.store.service.BatchReadResult;
 import org.onlab.onos.store.service.BatchWriteRequest;
@@ -86,6 +89,9 @@
     // initial member configuration file path
     private String initialMemberConfig = DEFAULT_MEMBER_FILE;
 
+    public static final MessageSubject RAFT_LEADER_ELECTION_EVENT =
+            new MessageSubject("raft-leader-election-event");
+
     private Copycat copycat;
     private DatabaseClient client;
 
@@ -102,8 +108,6 @@
     @Activate
     public void activate() throws InterruptedException, ExecutionException {
 
-        // TODO: Not every node should be part of the consensus ring.
-
         // load tablet configuration
         File file = new File(CONFIG_DIR, initialMemberConfig);
         log.info("Loading config: {}", file.getAbsolutePath());
@@ -117,16 +121,16 @@
 
         // load default tablet configuration and start copycat
         clusterConfig = new TcpClusterConfig();
-        Set<DefaultControllerNode> defaultMember = tabletMembers.get(DEFAULT_TABLET);
-        if (defaultMember == null || defaultMember.isEmpty()) {
-            log.error("No member found in [{}] tablet configuration.",
+        Set<DefaultControllerNode> defaultMembers = tabletMembers.get(DEFAULT_TABLET);
+        if (defaultMembers == null || defaultMembers.isEmpty()) {
+            log.error("No members found in [{}] tablet configuration.",
                       DEFAULT_TABLET);
             throw new IllegalStateException("No member found in tablet configuration");
 
         }
 
         final ControllerNode localNode = clusterService.getLocalNode();
-        for (ControllerNode member : defaultMember) {
+        for (ControllerNode member : defaultMembers) {
             final TcpMember tcpMember = new TcpMember(member.ip().toString(),
                                                       member.tcpPort());
             if (localNode.equals(member)) {
@@ -136,6 +140,61 @@
             }
         }
 
+        if (clusterConfig.getLocalMember() != null) {
+
+            // Wait for a minimum viable Raft cluster to boot up.
+            waitForClusterQuorum();
+
+            final TcpCluster cluster;
+            synchronized (clusterConfig) {
+                // Create the cluster.
+                cluster = new TcpCluster(clusterConfig);
+            }
+            log.info("Starting cluster: {}", cluster);
+
+            DatabaseEntryExpirationTracker expirationTracker =
+                    new DatabaseEntryExpirationTracker(
+                            clusterConfig.getLocalMember(),
+                            clusterService.getLocalNode(),
+                            clusterCommunicator,
+                            this);
+
+            DatabaseStateMachine stateMachine = new DatabaseStateMachine();
+            stateMachine.addEventListener(expirationTracker);
+            Log consensusLog = new MapDBLog(LOG_FILE_PREFIX + localNode.id(),
+                    ClusterMessagingProtocol.SERIALIZER);
+
+            copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
+            copycat.event(LeaderElectEvent.class).registerHandler(new RaftLeaderElectionMonitor());
+            copycat.event(LeaderElectEvent.class).registerHandler(expirationTracker);
+        }
+
+        client = new DatabaseClient(copycatMessagingProtocol);
+        clusterCommunicator.addSubscriber(RAFT_LEADER_ELECTION_EVENT, client);
+
+        // Starts copycat if this node is a participant
+        // of the Raft cluster.
+        if (copycat != null) {
+            copycat.start().get();
+        }
+
+        client.waitForLeader();
+        log.info("Started.");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        clusterService.removeListener(clusterEventListener);
+        // TODO: ClusterCommunicationService must support more than one
+        // handler per message subject.
+        clusterCommunicator.removeSubscriber(RAFT_LEADER_ELECTION_EVENT);
+        if (copycat != null) {
+            copycat.stop();
+        }
+        log.info("Stopped.");
+    }
+
+    private void waitForClusterQuorum() {
         // note: from this point beyond, clusterConfig requires synchronization
         clusterEventLatch = new CountDownLatch(1);
         clusterEventListener = new InternalClusterEventListener();
@@ -153,46 +212,6 @@
                 log.info("Interrupted waiting for others", e);
             }
         }
-
-        final TcpCluster cluster;
-        synchronized (clusterConfig) {
-            // Create the cluster.
-            cluster = new TcpCluster(clusterConfig);
-        }
-        log.info("Starting cluster: {}", cluster);
-
-        DatabaseEntryExpirationTracker expirationTracker =
-                new DatabaseEntryExpirationTracker(
-                        clusterConfig.getLocalMember(),
-                        clusterService.getLocalNode(),
-                        clusterCommunicator,
-                        this);
-
-        DatabaseStateMachine stateMachine = new DatabaseStateMachine();
-        stateMachine.addEventListener(expirationTracker);
-        Log consensusLog = new MapDBLog(LOG_FILE_PREFIX + localNode.id(),
-                                        ClusterMessagingProtocol.SERIALIZER);
-
-        copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
-
-        client = new DatabaseClient(copycatMessagingProtocol);
-
-
-        copycat.event(LeaderElectEvent.class).registerHandler(client);
-        copycat.event(LeaderElectEvent.class).registerHandler(expirationTracker);
-
-        copycat.start().get();
-
-        client.waitForLeader();
-
-        log.info("Started.");
-    }
-
-    @Deactivate
-    public void deactivate() {
-        clusterService.removeListener(clusterEventListener);
-        copycat.stop();
-        log.info("Stopped.");
     }
 
     @Override
@@ -353,6 +372,24 @@
         }
     }
 
+    private final class RaftLeaderElectionMonitor implements EventHandler<LeaderElectEvent> {
+        @Override
+        public void handle(LeaderElectEvent event) {
+            try {
+                if (clusterConfig.getLocalMember() != null && event.leader().equals(clusterConfig.getLocalMember())) {
+                    // This node just became the leader.
+                    clusterCommunicator.broadcastIncludeSelf(
+                            new ClusterMessage(
+                                    clusterService.getLocalNode().id(),
+                                    RAFT_LEADER_ELECTION_EVENT,
+                                    ClusterMessagingProtocol.SERIALIZER.encode(event)));
+                }
+            } catch (IOException e) {
+                log.error("Failed to broadcast raft leadership change event", e);
+            }
+        }
+    }
+
     private final class InternalClusterEventListener
     implements ClusterEventListener {