DatabaseService subsystem: add admin commands (add/remove/list members), load initial membership from tablets.json, etc.

Change-Id: I24124579f5e0b03ccbf35a03230ae5a7aff95f22
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
index 6a84e04..f328842 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
@@ -2,21 +2,30 @@
 
 import static org.slf4j.LoggerFactory.getLogger;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import net.kuujo.copycat.Copycat;
 import net.kuujo.copycat.StateMachine;
 import net.kuujo.copycat.cluster.ClusterConfig;
+import net.kuujo.copycat.cluster.Member;
 import net.kuujo.copycat.cluster.TcpCluster;
 import net.kuujo.copycat.cluster.TcpClusterConfig;
 import net.kuujo.copycat.cluster.TcpMember;
 import net.kuujo.copycat.log.InMemoryLog;
 import net.kuujo.copycat.log.Log;
 
+import org.apache.commons.lang3.RandomUtils;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -27,6 +36,8 @@
 import org.onlab.onos.cluster.ClusterEventListener;
 import org.onlab.onos.cluster.ClusterService;
 import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.DefaultControllerNode;
+import org.onlab.onos.cluster.NodeId;
 import org.onlab.onos.store.service.DatabaseAdminService;
 import org.onlab.onos.store.service.DatabaseException;
 import org.onlab.onos.store.service.DatabaseService;
@@ -38,8 +49,12 @@
 import org.onlab.onos.store.service.WriteAborted;
 import org.onlab.onos.store.service.WriteRequest;
 import org.onlab.onos.store.service.WriteResult;
+import org.onlab.packet.IpAddress;
 import org.slf4j.Logger;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
 /**
  * Strongly consistent and durable state management service based on
  * Copycat implementation of Raft consensus protocol.
@@ -56,7 +71,19 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DatabaseProtocolService copycatMessagingProtocol;
 
-    public static final String LOG_FILE_PREFIX = "/tmp/onos-copy-cat-log";
+    public static final String LOG_FILE_PREFIX = "/tmp/onos-copy-cat-log_";
+
+    // Current working dir seems to be /opt/onos/apache-karaf-3.0.2
+    // TODO: Get the path to /opt/onos/config
+    private static final String CONFIG_DIR = "../config";
+
+    private static final String DEFAULT_MEMBER_FILE = "tablets.json";
+
+    private static final String DEFAULT_TABLET = "default";
+
+    // TODO: make this configurable
+    // initial member configuration file path
+    private String initialMemberConfig = DEFAULT_MEMBER_FILE;
 
     private Copycat copycat;
     private DatabaseClient client;
@@ -65,49 +92,75 @@
     private ClusterConfig<TcpMember> clusterConfig;
 
     private CountDownLatch clusterEventLatch;
-
     private ClusterEventListener clusterEventListener;
 
+    private Map<String, Set<DefaultControllerNode>> tabletMembers;
+
+    private boolean autoAddMember = false;
+
     @Activate
     public void activate() {
 
         // TODO: Not every node should be part of the consensus ring.
 
-        final ControllerNode localNode = clusterService.getLocalNode();
-        TcpMember localMember =
-                new TcpMember(
-                        localNode.ip().toString(),
-                        localNode.tcpPort());
+        // load tablet configuration
+        File file = new File(CONFIG_DIR, initialMemberConfig);
+        log.info("Loading config: {}", file.getAbsolutePath());
+        TabletDefinitionStore tabletDef = new TabletDefinitionStore(file);
+        try {
+            tabletMembers = tabletDef.read();
+        } catch (IOException e) {
+            log.error("Failed to load tablet config {}", file);
+            throw new IllegalStateException("Failed to load tablet config", e);
+        }
 
+        // load default tablet configuration and start copycat
         clusterConfig = new TcpClusterConfig();
-        clusterConfig.setLocalMember(localMember);
+        Set<DefaultControllerNode> defaultMember = tabletMembers.get(DEFAULT_TABLET);
+        if (defaultMember == null || defaultMember.isEmpty()) {
+            log.error("No member found in [{}] tablet configuration.",
+                      DEFAULT_TABLET);
+            throw new IllegalStateException("No member found in tablet configuration");
 
-        List<TcpMember> remoteMembers = new ArrayList<>(clusterService.getNodes().size());
+        }
 
+        final ControllerNode localNode = clusterService.getLocalNode();
+        TcpMember clientHandler = null;
+        for (ControllerNode member : defaultMember) {
+            final TcpMember tcpMember = new TcpMember(member.ip().toString(),
+                                                      member.tcpPort());
+            if (localNode.equals(member)) {
+                clientHandler = tcpMember;
+                clusterConfig.setLocalMember(tcpMember);
+            } else {
+                clusterConfig.addRemoteMember(tcpMember);
+            }
+        }
+
+        // TODO should be removed after DatabaseClient refactoring
+        if (clientHandler == null) {
+            Set<TcpMember> members = clusterConfig.getMembers();
+            if (members.isEmpty()) {
+                log.error("No member found in [{}] tablet configuration.",
+                          DEFAULT_TABLET);
+                throw new IllegalStateException("No member found in tablet configuration");
+            }
+            int position = RandomUtils.nextInt(0, members.size());
+            clientHandler = Iterables.get(members, position);
+        }
+
+        // note: from this point beyond, clusterConfig requires synchronization
         clusterEventLatch = new CountDownLatch(1);
         clusterEventListener = new InternalClusterEventListener();
         clusterService.addListener(clusterEventListener);
 
-        // note: from this point beyond, clusterConfig requires synchronization
-
-        for (ControllerNode node : clusterService.getNodes()) {
-            TcpMember member = new TcpMember(node.ip().toString(), node.tcpPort());
-            if (!member.equals(localMember)) {
-                remoteMembers.add(member);
-            }
-        }
-
-        if (remoteMembers.isEmpty()) {
-            log.info("This node is the only node in the cluster.  "
-                    + "Waiting for others to show up.");
-            // FIXME: hack trying to relax cases forming multiple consensus rings.
-            // add seed node configuration to avoid this
-
-            // If the node is alone on it's own, wait some time
-            // hoping other will come up soon
+        if (clusterService.getNodes().size() < clusterConfig.getMembers().size()) {
+            // current cluster size smaller than expected
             try {
                 if (!clusterEventLatch.await(120, TimeUnit.SECONDS)) {
-                    log.info("Starting as single node cluster");
+                    log.info("Starting with {}/{} nodes cluster",
+                             clusterService.getNodes().size(),
+                             clusterConfig.getMembers().size());
                 }
             } catch (InterruptedException e) {
                 log.info("Interrupted waiting for others", e);
@@ -116,8 +169,6 @@
 
         final TcpCluster cluster;
         synchronized (clusterConfig) {
-            clusterConfig.addRemoteMembers(remoteMembers);
-
             // Create the cluster.
             cluster = new TcpCluster(clusterConfig);
         }
@@ -131,7 +182,8 @@
         copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
         copycat.start();
 
-        client = new DatabaseClient(copycatMessagingProtocol.createClient(localMember));
+        // FIXME Redo DatabaseClient. Needs fall back mechanism etc.
+        client = new DatabaseClient(copycatMessagingProtocol.createClient(clientHandler));
 
         log.info("Started.");
     }
@@ -233,22 +285,34 @@
             final TcpMember tcpMember = new TcpMember(node.ip().toString(),
                                                       node.tcpPort());
 
-            log.trace("{}", event);
             switch (event.type()) {
             case INSTANCE_ACTIVATED:
             case INSTANCE_ADDED:
-                log.info("{} was added to the cluster", tcpMember);
-                synchronized (clusterConfig) {
-                    clusterConfig.addRemoteMember(tcpMember);
+                if (autoAddMember) {
+                    synchronized (clusterConfig) {
+                        if (!clusterConfig.getMembers().contains(tcpMember)) {
+                            log.info("{} was automatically added to the cluster", tcpMember);
+                            clusterConfig.addRemoteMember(tcpMember);
+                        }
+                    }
                 }
                 break;
             case INSTANCE_DEACTIVATED:
             case INSTANCE_REMOVED:
-                // FIXME to be replaced with admin interface
-//                log.info("{} was removed from the cluster", tcpMember);
-//                synchronized (clusterConfig) {
-//                    clusterConfig.removeRemoteMember(tcpMember);
-//                }
+                if (autoAddMember) {
+                    Set<DefaultControllerNode> members
+                        = tabletMembers.getOrDefault(DEFAULT_TABLET,
+                                                     Collections.emptySet());
+                    // remove only if not the initial members
+                    if (!members.contains(node)) {
+                        synchronized (clusterConfig) {
+                            if (clusterConfig.getMembers().contains(tcpMember)) {
+                                log.info("{} was automatically removed from the cluster", tcpMember);
+                                clusterConfig.removeRemoteMember(tcpMember);
+                            }
+                        }
+                    }
+                }
                 break;
             default:
                 break;
@@ -307,4 +371,103 @@
             }
         }
     }
+
+    /**
+     * Adds the given controller node as a remote member of the consensus
+     * cluster configuration.
+     * <p>
+     * Nodes that are already members are left untouched.
+     *
+     * @param node controller node to add
+     */
+    @Override
+    public void addMember(final ControllerNode node) {
+        final TcpMember tcpMember = new TcpMember(node.ip().toString(),
+                                                  node.tcpPort());
+        synchronized (clusterConfig) {
+            if (clusterConfig.getMembers().contains(tcpMember)) {
+                log.info("{} is already a member of the cluster", tcpMember);
+                return;
+            }
+            clusterConfig.addRemoteMember(tcpMember);
+        }
+        log.info("{} was added to the cluster", tcpMember);
+    }
+
+    /**
+     * Removes the given controller node from the remote members of the
+     * consensus cluster configuration.
+     * <p>
+     * Nodes that are not currently members are left untouched.
+     *
+     * @param node controller node to remove
+     */
+    @Override
+    public void removeMember(final ControllerNode node) {
+        final TcpMember tcpMember = new TcpMember(node.ip().toString(),
+                                                  node.tcpPort());
+        synchronized (clusterConfig) {
+            if (!clusterConfig.getMembers().contains(tcpMember)) {
+                log.info("{} is not a member of the cluster", tcpMember);
+                return;
+            }
+            clusterConfig.removeRemoteMember(tcpMember);
+        }
+        log.info("{} was removed from the cluster", tcpMember);
+    }
+
+    /**
+     * Lists the members of the running consensus cluster as controller nodes.
+     * <p>
+     * Members whose host cannot be parsed as an IP address, or that do not
+     * match any node known to the cluster service, are logged and skipped.
+     *
+     * @return immutable collection of members; empty if Copycat has not
+     *         been started
+     */
+    @Override
+    public Collection<ControllerNode> listMembers() {
+        if (copycat == null) {
+            return ImmutableList.of();
+        }
+        Set<ControllerNode> members = new HashSet<>();
+        for (Member member : copycat.cluster().members()) {
+            if (!(member instanceof TcpMember)) {
+                continue;
+            }
+            final TcpMember tcpMember = (TcpMember) member;
+            final IpAddress ip;
+            try {
+                // TODO tcpMember#host may be a hostname; lookup DNS, etc. first
+                ip = IpAddress.valueOf(tcpMember.host());
+            } catch (IllegalArgumentException e) {
+                log.warn("Skipping member with unparsable host {}", tcpMember.host(), e);
+                continue;
+            }
+            final int tcpPort = tcpMember.port();
+            final NodeId id = getNodeIdFromIp(ip, tcpPort);
+            if (id == null) {
+                log.info("No NodeId found for {}:{}", ip, tcpPort);
+                continue;
+            }
+            members.add(new DefaultControllerNode(id, ip, tcpPort));
+        }
+        return ImmutableList.copyOf(members);
+    }
+
+    /**
+     * Looks up the id of the cluster node with the given IP address and port.
+     *
+     * @param ip      IP address to match
+     * @param tcpPort TCP port to match
+     * @return id of the matching node, or {@code null} if none matches
+     */
+    private NodeId getNodeIdFromIp(IpAddress ip, int tcpPort) {
+        for (ControllerNode node : clusterService.getNodes()) {
+            if (node.ip().equals(ip) && node.tcpPort() == tcpPort) {
+                return node.id();
+            }
+        }
+        return null;
+    }
 }