Support for running copycat on a subset of ONOS cluster nodes.
This change ensures DatabaseService on each node does the right thing, irrespective of whether the node is part of Raft cluster or not.

Change-Id: I1e8976d56b3a2892d5c7ecbb46c247770a633860
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
index 6fcd6b9..37e75cf 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
@@ -19,6 +19,7 @@
 import net.kuujo.copycat.cluster.TcpCluster;
 import net.kuujo.copycat.cluster.TcpClusterConfig;
 import net.kuujo.copycat.cluster.TcpMember;
+import net.kuujo.copycat.event.EventHandler;
 import net.kuujo.copycat.event.LeaderElectEvent;
 import net.kuujo.copycat.log.Log;
 
@@ -35,6 +36,8 @@
 import org.onlab.onos.cluster.DefaultControllerNode;
 import org.onlab.onos.cluster.NodeId;
 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
 import org.onlab.onos.store.service.BatchReadRequest;
 import org.onlab.onos.store.service.BatchReadResult;
 import org.onlab.onos.store.service.BatchWriteRequest;
@@ -86,6 +89,9 @@
     // initial member configuration file path
     private String initialMemberConfig = DEFAULT_MEMBER_FILE;
 
+    public static final MessageSubject RAFT_LEADER_ELECTION_EVENT =
+            new MessageSubject("raft-leader-election-event");
+
     private Copycat copycat;
     private DatabaseClient client;
 
@@ -102,8 +108,6 @@
     @Activate
     public void activate() throws InterruptedException, ExecutionException {
 
-        // TODO: Not every node should be part of the consensus ring.
-
         // load tablet configuration
         File file = new File(CONFIG_DIR, initialMemberConfig);
         log.info("Loading config: {}", file.getAbsolutePath());
@@ -117,16 +121,16 @@
 
         // load default tablet configuration and start copycat
         clusterConfig = new TcpClusterConfig();
-        Set<DefaultControllerNode> defaultMember = tabletMembers.get(DEFAULT_TABLET);
-        if (defaultMember == null || defaultMember.isEmpty()) {
-            log.error("No member found in [{}] tablet configuration.",
+        Set<DefaultControllerNode> defaultMembers = tabletMembers.get(DEFAULT_TABLET);
+        if (defaultMembers == null || defaultMembers.isEmpty()) {
+            log.error("No members found in [{}] tablet configuration.",
                       DEFAULT_TABLET);
             throw new IllegalStateException("No member found in tablet configuration");
 
         }
 
         final ControllerNode localNode = clusterService.getLocalNode();
-        for (ControllerNode member : defaultMember) {
+        for (ControllerNode member : defaultMembers) {
             final TcpMember tcpMember = new TcpMember(member.ip().toString(),
                                                       member.tcpPort());
             if (localNode.equals(member)) {
@@ -136,6 +140,61 @@
             }
         }
 
+        if (clusterConfig.getLocalMember() != null) {
+
+            // Wait for a minimum viable Raft cluster to boot up.
+            waitForClusterQuorum();
+
+            final TcpCluster cluster;
+            synchronized (clusterConfig) {
+                // Create the cluster.
+                cluster = new TcpCluster(clusterConfig);
+            }
+            log.info("Starting cluster: {}", cluster);
+
+            DatabaseEntryExpirationTracker expirationTracker =
+                    new DatabaseEntryExpirationTracker(
+                            clusterConfig.getLocalMember(),
+                            clusterService.getLocalNode(),
+                            clusterCommunicator,
+                            this);
+
+            DatabaseStateMachine stateMachine = new DatabaseStateMachine();
+            stateMachine.addEventListener(expirationTracker);
+            Log consensusLog = new MapDBLog(LOG_FILE_PREFIX + localNode.id(),
+                    ClusterMessagingProtocol.SERIALIZER);
+
+            copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
+            copycat.event(LeaderElectEvent.class).registerHandler(new RaftLeaderElectionMonitor());
+            copycat.event(LeaderElectEvent.class).registerHandler(expirationTracker);
+        }
+
+        client = new DatabaseClient(copycatMessagingProtocol);
+        clusterCommunicator.addSubscriber(RAFT_LEADER_ELECTION_EVENT, client);
+
+        // Starts copycat if this node is a participant
+        // of the Raft cluster.
+        if (copycat != null) {
+            copycat.start().get();
+        }
+
+        client.waitForLeader();
+        log.info("Started.");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        clusterService.removeListener(clusterEventListener);
+        // TODO: ClusterCommunicationService must support more than one
+        // handler per message subject.
+        clusterCommunicator.removeSubscriber(RAFT_LEADER_ELECTION_EVENT);
+        if (copycat != null) {
+            copycat.stop();
+        }
+        log.info("Stopped.");
+    }
+
+    private void waitForClusterQuorum() {
         // note: from this point beyond, clusterConfig requires synchronization
         clusterEventLatch = new CountDownLatch(1);
         clusterEventListener = new InternalClusterEventListener();
@@ -153,46 +212,6 @@
                 log.info("Interrupted waiting for others", e);
             }
         }
-
-        final TcpCluster cluster;
-        synchronized (clusterConfig) {
-            // Create the cluster.
-            cluster = new TcpCluster(clusterConfig);
-        }
-        log.info("Starting cluster: {}", cluster);
-
-        DatabaseEntryExpirationTracker expirationTracker =
-                new DatabaseEntryExpirationTracker(
-                        clusterConfig.getLocalMember(),
-                        clusterService.getLocalNode(),
-                        clusterCommunicator,
-                        this);
-
-        DatabaseStateMachine stateMachine = new DatabaseStateMachine();
-        stateMachine.addEventListener(expirationTracker);
-        Log consensusLog = new MapDBLog(LOG_FILE_PREFIX + localNode.id(),
-                                        ClusterMessagingProtocol.SERIALIZER);
-
-        copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
-
-        client = new DatabaseClient(copycatMessagingProtocol);
-
-
-        copycat.event(LeaderElectEvent.class).registerHandler(client);
-        copycat.event(LeaderElectEvent.class).registerHandler(expirationTracker);
-
-        copycat.start().get();
-
-        client.waitForLeader();
-
-        log.info("Started.");
-    }
-
-    @Deactivate
-    public void deactivate() {
-        clusterService.removeListener(clusterEventListener);
-        copycat.stop();
-        log.info("Stopped.");
     }
 
     @Override
@@ -353,6 +372,24 @@
         }
     }
 
+    private final class RaftLeaderElectionMonitor implements EventHandler<LeaderElectEvent> {
+        @Override
+        public void handle(LeaderElectEvent event) {
+            try {
+                if (clusterConfig.getLocalMember() != null && event.leader().equals(clusterConfig.getLocalMember())) {
+                    // This node just became the leader.
+                    clusterCommunicator.broadcastIncludeSelf(
+                            new ClusterMessage(
+                                    clusterService.getLocalNode().id(),
+                                    RAFT_LEADER_ELECTION_EVENT,
+                                    ClusterMessagingProtocol.SERIALIZER.encode(event)));
+                }
+            } catch (IOException e) {
+                log.error("Failed to broadcast raft leadership change event", e);
+            }
+        }
+    }
+
     private final class InternalClusterEventListener
     implements ClusterEventListener {