Support for running copycat on a subset of ONOS cluster nodes.
This change ensures DatabaseService on each node does the right thing, irrespective of whether the node is part of Raft cluster or not.

Change-Id: I1e8976d56b3a2892d5c7ecbb46c247770a633860
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
index 27e04c4..71608ef 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
@@ -13,13 +13,15 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import net.kuujo.copycat.cluster.Member;
 import net.kuujo.copycat.cluster.TcpMember;
-import net.kuujo.copycat.event.EventHandler;
 import net.kuujo.copycat.event.LeaderElectEvent;
 import net.kuujo.copycat.protocol.SubmitRequest;
 import net.kuujo.copycat.protocol.SubmitResponse;
 import net.kuujo.copycat.spi.protocol.ProtocolClient;
 
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
 import org.onlab.onos.store.service.BatchReadRequest;
 import org.onlab.onos.store.service.BatchWriteRequest;
 import org.onlab.onos.store.service.DatabaseException;
@@ -31,7 +33,7 @@
 /**
  * Client for interacting with the Copycat Raft cluster.
  */
-public class DatabaseClient implements EventHandler<LeaderElectEvent> {
+public class DatabaseClient implements ClusterMessageHandler {
 
     private static final int RETRIES = 5;
 
@@ -41,24 +43,28 @@
 
     private final DatabaseProtocolService protocol;
     private volatile ProtocolClient client = null;
-    private volatile TcpMember currentLeader = null;
-
+    private volatile Member currentLeader = null;
+    private volatile long currentLeaderTerm = 0;
 
     public DatabaseClient(DatabaseProtocolService protocol) {
         this.protocol = checkNotNull(protocol);
     }
 
-    // FIXME This handler relies on a fact that local node is part of Raft cluster
     @Override
-    public void handle(LeaderElectEvent event) {
-        final TcpMember newLeader = event.leader();
-        if (newLeader != null && !newLeader.equals(currentLeader)) {
-            log.info("{} became the new leader", newLeader);
+    public void handle(ClusterMessage message) {
+        LeaderElectEvent event =
+                ClusterMessagingProtocol.SERIALIZER.decode(message.payload());
+        TcpMember newLeader = event.leader();
+        long newLeaderTerm = event.term();
+        if (newLeader != null && !newLeader.equals(currentLeader) && newLeaderTerm > currentLeaderTerm) {
+            log.info("New leader detected. Leader: {}, term: {}", newLeader, newLeaderTerm);
             ProtocolClient prevClient = client;
-            ProtocolClient newclient = protocol.createClient(newLeader);
-            newclient.connect();
-            client = newclient;
+            ProtocolClient newClient = protocol.createClient(newLeader);
+            newClient.connect();
+            client = newClient;
             currentLeader = newLeader;
+            currentLeaderTerm = newLeaderTerm;
+
             if (prevClient != null) {
                 prevClient.close();
             }
@@ -80,7 +86,6 @@
             while (currentLeader == null) {
                 Thread.sleep(200);
             }
-            log.info("Leader appeared: {}", currentLeader);
             return;
         } catch (InterruptedException e) {
             log.error("Interrupted while waiting for Leader", e);