ClusterService implementation that relies on accrual failure detector for determining node up/down status.
Initially off by default, until futher testing is done.

Change-Id: I0ac8850d76af717e7804d4503bedb227d5894a0a
diff --git a/core/store/dist/pom.xml b/core/store/dist/pom.xml
index 2f0f977..ead1ab0 100644
--- a/core/store/dist/pom.xml
+++ b/core/store/dist/pom.xml
@@ -32,6 +32,13 @@
     <description>ONOS Gossip based distributed store subsystems</description>
 
     <dependencies>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-math3</artifactId>
+            <version>3.2</version>
+        </dependency>
+
         <dependency>
             <groupId>org.onosproject</groupId>
             <artifactId>onos-core-serializers</artifactId>
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterManager.java
new file mode 100644
index 0000000..4abcbc5
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterManager.java
@@ -0,0 +1,320 @@
+package org.onosproject.store.cluster.impl;
+
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.Enumeration;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.netty.Endpoint;
+import org.onlab.netty.Message;
+import org.onlab.netty.MessageHandler;
+import org.onlab.netty.NettyMessagingService;
+import org.onlab.packet.IpAddress;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterAdminService;
+import org.onosproject.cluster.ClusterEvent;
+import org.onosproject.cluster.ClusterEventListener;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.ControllerNode.State;
+import org.onosproject.cluster.DefaultControllerNode;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.event.AbstractListenerRegistry;
+import org.onosproject.event.EventDeliveryService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.serializers.KryoSerializer;
+import org.slf4j.Logger;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * ClusterService implementation that employs an accrual failure
+ * detector to identify cluster member up/down status.
+ */
+@Component(immediate = true, enabled = false)
+@Service
+public class ClusterManager implements ClusterService, ClusterAdminService {
+
+    private final Logger log = getLogger(getClass());
+
+    protected final AbstractListenerRegistry<ClusterEvent, ClusterEventListener>
+    listenerRegistry = new AbstractListenerRegistry<>();
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected EventDeliveryService eventDispatcher;
+
+    // TODO: make these configurable.
+    private static final int HEARTBEAT_FD_PORT = 2419;
+    private static final int HEARTBEAT_INTERVAL_MS = 100;
+    private static final int PHI_FAILURE_THRESHOLD = 10;
+
+    private static final String CONFIG_DIR = "../config";
+    private static final String CLUSTER_DEFINITION_FILE = "cluster.json";
+
+    private ClusterDefinitionStore clusterDefinition;
+
+    private Set<ControllerNode> seedNodes;
+    private final Map<NodeId, ControllerNode> allNodes = Maps.newConcurrentMap();
+    private final Map<NodeId, State> nodeStates = Maps.newConcurrentMap();
+    private NettyMessagingService messagingService = new NettyMessagingService();
+    private ScheduledExecutorService heartBeatSender = Executors.newSingleThreadScheduledExecutor(
+            groupedThreads("onos/cluster/membership", "heartbeat-sender"));
+    private ExecutorService heartBeatMessageHandler = Executors.newSingleThreadExecutor(
+            groupedThreads("onos/cluster/membership", "heartbeat-receiver"));
+
+    private static final String HEARTBEAT_MESSAGE = "onos-cluster-heartbeat";
+
+
+    private PhiAccrualFailureDetector failureDetector;
+
+    private ControllerNode localNode;
+
+    private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+        @Override
+        protected void setupKryoPool() {
+            serializerPool = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .register(HeartbeatMessage.class)
+                .build()
+                .populate(1);
+        }
+    };
+
+    private static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
+
+    @Activate
+    public void activate() {
+
+        File clusterDefinitionFile = new File(CONFIG_DIR, CLUSTER_DEFINITION_FILE);
+        clusterDefinition = new ClusterDefinitionStore(clusterDefinitionFile.getPath());
+        try {
+            seedNodes = ImmutableSet.copyOf(clusterDefinition.read());
+        } catch (IOException e) {
+            log.warn("Failed to read cluster definition.", e);
+        }
+
+        seedNodes.forEach(node -> {
+            allNodes.put(node.id(), node);
+            nodeStates.put(node.id(), State.INACTIVE);
+        });
+
+        establishSelfIdentity();
+
+        messagingService = new NettyMessagingService(HEARTBEAT_FD_PORT);
+
+        try {
+            messagingService.activate();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.warn("Failed to cleanly initialize membership and"
+                    + " failure detector communication channel.", e);
+        }
+        messagingService.registerHandler(
+                HEARTBEAT_MESSAGE,
+                new HeartbeatMessageHandler(),
+                heartBeatMessageHandler);
+
+        eventDispatcher.addSink(ClusterEvent.class, listenerRegistry);
+        failureDetector = new PhiAccrualFailureDetector();
+
+        heartBeatSender.scheduleWithFixedDelay(
+                this::heartbeat,
+                0,
+                HEARTBEAT_INTERVAL_MS,
+                TimeUnit.MILLISECONDS);
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        try {
+            messagingService.deactivate();
+        } catch (Exception e) {
+            log.trace("Failed to cleanly shutdown cluster membership messaging", e);
+        }
+
+        heartBeatSender.shutdown();
+        heartBeatMessageHandler.shutdown();
+        eventDispatcher.removeSink(ClusterEvent.class);
+
+        log.info("Stopped");
+    }
+
+    @Override
+    public ControllerNode getLocalNode() {
+        return localNode;
+    }
+
+    @Override
+    public Set<ControllerNode> getNodes() {
+        return ImmutableSet.copyOf(allNodes.values());
+    }
+
+    @Override
+    public ControllerNode getNode(NodeId nodeId) {
+        checkNotNull(nodeId, INSTANCE_ID_NULL);
+        return allNodes.get(nodeId);
+    }
+
+    @Override
+    public State getState(NodeId nodeId) {
+        checkNotNull(nodeId, INSTANCE_ID_NULL);
+        return nodeStates.get(nodeId);
+    }
+
+    @Override
+    public void addListener(ClusterEventListener listener) {
+        checkNotNull(listener, "Listener must not be null");
+        listenerRegistry.addListener(listener);
+    }
+
+    @Override
+    public void removeListener(ClusterEventListener listener) {
+        checkNotNull(listener, "Listener must not be null");
+        listenerRegistry.removeListener(listener);
+    }
+
+    @Override
+    public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
+        checkNotNull(nodeId, INSTANCE_ID_NULL);
+        checkNotNull(ip, "IP address must not be null");
+        checkArgument(tcpPort > 5000, "Tcp port must be greater than 5000");
+        ControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
+        allNodes.put(node.id(), node);
+        nodeStates.put(nodeId, State.INACTIVE);
+        eventDispatcher.post(new ClusterEvent(ClusterEvent.Type.INSTANCE_ADDED, node));
+        return node;
+    }
+
+    @Override
+    public void removeNode(NodeId nodeId) {
+        checkNotNull(nodeId, INSTANCE_ID_NULL);
+        ControllerNode node = allNodes.remove(nodeId);
+        if (node != null) {
+            nodeStates.remove(nodeId);
+            eventDispatcher.post(new ClusterEvent(ClusterEvent.Type.INSTANCE_REMOVED, node));
+        }
+    }
+
+    private void establishSelfIdentity() {
+        try {
+            IpAddress ip = findLocalIp();
+            localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
+            allNodes.put(localNode.id(), localNode);
+            nodeStates.put(localNode.id(), State.ACTIVE);
+            log.info("Local Node: {}", localNode);
+        } catch (SocketException e) {
+            throw new IllegalStateException("Cannot determine local IP", e);
+        }
+    }
+
+    private void heartbeat() {
+        try {
+            Set<ControllerNode> peers = allNodes.values()
+                    .stream()
+                    .filter(node -> !(node.id().equals(localNode.id())))
+                    .collect(Collectors.toSet());
+            byte[] hbMessagePayload = SERIALIZER.encode(new HeartbeatMessage(localNode, peers));
+            peers.forEach((node) -> {
+                heartbeatToPeer(hbMessagePayload, node);
+                State currentState = nodeStates.get(node.id());
+                double phi = failureDetector.phi(node.id());
+                if (phi >= PHI_FAILURE_THRESHOLD) {
+                    if (currentState == State.ACTIVE) {
+                        nodeStates.put(node.id(), State.INACTIVE);
+                        notifyStateChange(node.id(), State.ACTIVE, State.INACTIVE);
+                    }
+                } else {
+                    if (currentState == State.INACTIVE) {
+                        nodeStates.put(node.id(), State.ACTIVE);
+                        notifyStateChange(node.id(), State.INACTIVE, State.ACTIVE);
+                    }
+                }
+            });
+        } catch (Exception e) {
+            log.trace("Failed to send heartbeat", e);
+        }
+    }
+
+    private void notifyStateChange(NodeId nodeId, State oldState, State newState) {
+        if (newState == State.ACTIVE) {
+            eventDispatcher.post(new ClusterEvent(ClusterEvent.Type.INSTANCE_ACTIVATED, allNodes.get(nodeId)));
+        } else {
+            eventDispatcher.post(new ClusterEvent(ClusterEvent.Type.INSTANCE_DEACTIVATED, allNodes.get(nodeId)));
+        }
+    }
+
+    private void heartbeatToPeer(byte[] messagePayload, ControllerNode peer) {
+        Endpoint remoteEp = new Endpoint(peer.ip(), HEARTBEAT_FD_PORT);
+        try {
+            messagingService.sendAsync(remoteEp, HEARTBEAT_MESSAGE, messagePayload);
+        } catch (IOException e) {
+            log.trace("Sending heartbeat to {} failed", remoteEp, e);
+        }
+    }
+
+    private class HeartbeatMessageHandler implements MessageHandler {
+        @Override
+        public void handle(Message message) throws IOException {
+            HeartbeatMessage hb = SERIALIZER.decode(message.payload());
+            failureDetector.report(hb.source().id());
+            hb.knownPeers().forEach(node -> {
+                allNodes.put(node.id(), node);
+            });
+        }
+    }
+
+    private class HeartbeatMessage {
+        private ControllerNode source;
+        private Set<ControllerNode> knownPeers;
+
+        public HeartbeatMessage(ControllerNode source, Set<ControllerNode> members) {
+            this.source = source;
+            this.knownPeers = ImmutableSet.copyOf(members);
+        }
+
+        public ControllerNode source() {
+            return source;
+        }
+
+        public Set<ControllerNode> knownPeers() {
+            return knownPeers;
+        }
+    }
+
+    private IpAddress findLocalIp() throws SocketException {
+        NetworkInterface ni = NetworkInterface.getByName("eth0");
+        Enumeration<InetAddress> inetAddresses =  ni.getInetAddresses();
+
+        while (inetAddresses.hasMoreElements()) {
+            InetAddress ia = inetAddresses.nextElement();
+            if (!ia.isLinkLocalAddress()) {
+                return IpAddress.valueOf(ia);
+            }
+        }
+        throw new IllegalStateException("Unable to determine local ip");
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/PhiAccrualFailureDetector.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/PhiAccrualFailureDetector.java
new file mode 100644
index 0000000..2ba1066
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/PhiAccrualFailureDetector.java
@@ -0,0 +1,103 @@
+package org.onosproject.store.cluster.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Map;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.onosproject.cluster.NodeId;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Phi Accrual failure detector.
+ * <p>
+ * Based on a paper titled: "The φ Accrual Failure Detector" by Hayashibara, et al.
+ */
+public class PhiAccrualFailureDetector {
+    private final Map<NodeId, History> states = Maps.newConcurrentMap();
+
+    // TODO: make these configurable.
+    private static final int WINDOW_SIZE = 250;
+    private static final int MIN_SAMPLES = 25;
+
+    // If a node does not have any heartbeats, this is the phi
+    // value to report. Indicates the node is inactive (from the
+    // detectors perspective.
+    private static final double BOOTSTRAP_PHI_VALUE = 100.0;
+
+    /**
+     * Report a new heart beat for the specified node id.
+     * @param nodeId node id
+     */
+    public void report(NodeId nodeId) {
+        report(nodeId, System.currentTimeMillis());
+    }
+
+    /**
+     * Report a new heart beat for the specified node id.
+     * @param nodeId node id
+     * @param arrivalTime arrival time
+     */
+    public void report(NodeId nodeId, long arrivalTime) {
+        checkNotNull(nodeId, "NodeId must not be null");
+        checkArgument(arrivalTime >= 0, "arrivalTime must not be negative");
+        History nodeState =
+                states.computeIfAbsent(nodeId, key -> new History());
+        synchronized (nodeState) {
+            long latestHeartbeat = nodeState.latestHeartbeatTime();
+            if (latestHeartbeat != -1) {
+                nodeState.samples().addValue(arrivalTime - latestHeartbeat);
+            }
+            nodeState.setLatestHeartbeatTime(arrivalTime);
+        }
+    }
+
+    /**
+     * Compute phi for the specified node id.
+     * @param nodeId node id
+     * @return phi value
+     */
+    public Double phi(NodeId nodeId) {
+        if (!states.containsKey(nodeId)) {
+            return BOOTSTRAP_PHI_VALUE;
+        }
+        checkNotNull(nodeId, "NodeId must not be null");
+        History nodeState = states.get(nodeId);
+        synchronized (nodeState) {
+            long latestHeartbeat = nodeState.latestHeartbeatTime();
+            DescriptiveStatistics samples = nodeState.samples();
+            if (latestHeartbeat == -1 || samples.getN() < MIN_SAMPLES) {
+                return 0.0;
+            }
+            return computePhi(samples, latestHeartbeat, System.currentTimeMillis());
+        }
+    }
+
+    private double computePhi(DescriptiveStatistics samples, long tLast, long tNow) {
+        long size = samples.getN();
+        long t = tNow - tLast;
+        return (size > 0)
+               ? (1.0 / Math.log(10.0)) * t / samples.getMean()
+               : BOOTSTRAP_PHI_VALUE;
+    }
+
+    private static class History {
+        DescriptiveStatistics samples =
+                new DescriptiveStatistics(WINDOW_SIZE);
+        long lastHeartbeatTime = -1;
+
+        public DescriptiveStatistics samples() {
+            return samples;
+        }
+
+        public long latestHeartbeatTime() {
+            return lastHeartbeatTime;
+        }
+
+        public void setLatestHeartbeatTime(long value) {
+            lastHeartbeatTime = value;
+        }
+    }
+}
\ No newline at end of file
diff --git a/features/features.xml b/features/features.xml
index 3a5d5e5..0aabf66 100644
--- a/features/features.xml
+++ b/features/features.xml
@@ -33,6 +33,7 @@
         <bundle>mvn:io.netty/netty-codec/4.0.23.Final</bundle>
         <bundle>mvn:io.netty/netty-transport-native-epoll/4.0.23.Final</bundle>
         <bundle>mvn:commons-pool/commons-pool/1.6</bundle>
+        <bundle>mvn:org.apache.commons/commons-math3/3.2</bundle>
 
         <bundle>mvn:joda-time/joda-time/2.5</bundle>