ClusterService implementation that relies on accrual failure detector for determining node up/down status.
Initially off by default, until futher testing is done.
Change-Id: I0ac8850d76af717e7804d4503bedb227d5894a0a
diff --git a/core/store/dist/pom.xml b/core/store/dist/pom.xml
index 2f0f977..ead1ab0 100644
--- a/core/store/dist/pom.xml
+++ b/core/store/dist/pom.xml
@@ -32,6 +32,13 @@
<description>ONOS Gossip based distributed store subsystems</description>
<dependencies>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-math3</artifactId>
+ <version>3.2</version>
+ </dependency>
+
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-core-serializers</artifactId>
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterManager.java
new file mode 100644
index 0000000..4abcbc5
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterManager.java
@@ -0,0 +1,320 @@
+package org.onosproject.store.cluster.impl;
+
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.Enumeration;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.netty.Endpoint;
+import org.onlab.netty.Message;
+import org.onlab.netty.MessageHandler;
+import org.onlab.netty.NettyMessagingService;
+import org.onlab.packet.IpAddress;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterAdminService;
+import org.onosproject.cluster.ClusterEvent;
+import org.onosproject.cluster.ClusterEventListener;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.ControllerNode.State;
+import org.onosproject.cluster.DefaultControllerNode;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.event.AbstractListenerRegistry;
+import org.onosproject.event.EventDeliveryService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.serializers.KryoSerializer;
+import org.slf4j.Logger;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * ClusterService implementation that employs an accrual failure
+ * detector to identify cluster member up/down status.
+ */
+@Component(immediate = true, enabled = false)
+@Service
+public class ClusterManager implements ClusterService, ClusterAdminService {
+
+ private final Logger log = getLogger(getClass());
+
+ protected final AbstractListenerRegistry<ClusterEvent, ClusterEventListener>
+ listenerRegistry = new AbstractListenerRegistry<>();
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected EventDeliveryService eventDispatcher;
+
+ // TODO: make these configurable.
+ private static final int HEARTBEAT_FD_PORT = 2419;
+ private static final int HEARTBEAT_INTERVAL_MS = 100;
+ private static final int PHI_FAILURE_THRESHOLD = 10;
+
+ private static final String CONFIG_DIR = "../config";
+ private static final String CLUSTER_DEFINITION_FILE = "cluster.json";
+
+ private ClusterDefinitionStore clusterDefinition;
+
+ private Set<ControllerNode> seedNodes;
+ private final Map<NodeId, ControllerNode> allNodes = Maps.newConcurrentMap();
+ private final Map<NodeId, State> nodeStates = Maps.newConcurrentMap();
+ private NettyMessagingService messagingService = new NettyMessagingService();
+ private ScheduledExecutorService heartBeatSender = Executors.newSingleThreadScheduledExecutor(
+ groupedThreads("onos/cluster/membership", "heartbeat-sender"));
+ private ExecutorService heartBeatMessageHandler = Executors.newSingleThreadExecutor(
+ groupedThreads("onos/cluster/membership", "heartbeat-receiver"));
+
+ private static final String HEARTBEAT_MESSAGE = "onos-cluster-heartbeat";
+
+
+ private PhiAccrualFailureDetector failureDetector;
+
+ private ControllerNode localNode;
+
+ private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+ @Override
+ protected void setupKryoPool() {
+ serializerPool = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(HeartbeatMessage.class)
+ .build()
+ .populate(1);
+ }
+ };
+
+ private static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
+
+ @Activate
+ public void activate() {
+
+ File clusterDefinitionFile = new File(CONFIG_DIR, CLUSTER_DEFINITION_FILE);
+ clusterDefinition = new ClusterDefinitionStore(clusterDefinitionFile.getPath());
+ try {
+ seedNodes = ImmutableSet.copyOf(clusterDefinition.read());
+ } catch (IOException e) {
+ log.warn("Failed to read cluster definition.", e);
+ }
+
+ seedNodes.forEach(node -> {
+ allNodes.put(node.id(), node);
+ nodeStates.put(node.id(), State.INACTIVE);
+ });
+
+ establishSelfIdentity();
+
+ messagingService = new NettyMessagingService(HEARTBEAT_FD_PORT);
+
+ try {
+ messagingService.activate();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.warn("Failed to cleanly initialize membership and"
+ + " failure detector communication channel.", e);
+ }
+ messagingService.registerHandler(
+ HEARTBEAT_MESSAGE,
+ new HeartbeatMessageHandler(),
+ heartBeatMessageHandler);
+
+ eventDispatcher.addSink(ClusterEvent.class, listenerRegistry);
+ failureDetector = new PhiAccrualFailureDetector();
+
+ heartBeatSender.scheduleWithFixedDelay(
+ this::heartbeat,
+ 0,
+ HEARTBEAT_INTERVAL_MS,
+ TimeUnit.MILLISECONDS);
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ try {
+ messagingService.deactivate();
+ } catch (Exception e) {
+ log.trace("Failed to cleanly shutdown cluster membership messaging", e);
+ }
+
+ heartBeatSender.shutdown();
+ heartBeatMessageHandler.shutdown();
+ eventDispatcher.removeSink(ClusterEvent.class);
+
+ log.info("Stopped");
+ }
+
+ @Override
+ public ControllerNode getLocalNode() {
+ return localNode;
+ }
+
+ @Override
+ public Set<ControllerNode> getNodes() {
+ return ImmutableSet.copyOf(allNodes.values());
+ }
+
+ @Override
+ public ControllerNode getNode(NodeId nodeId) {
+ checkNotNull(nodeId, INSTANCE_ID_NULL);
+ return allNodes.get(nodeId);
+ }
+
+ @Override
+ public State getState(NodeId nodeId) {
+ checkNotNull(nodeId, INSTANCE_ID_NULL);
+ return nodeStates.get(nodeId);
+ }
+
+ @Override
+ public void addListener(ClusterEventListener listener) {
+ checkNotNull(listener, "Listener must not be null");
+ listenerRegistry.addListener(listener);
+ }
+
+ @Override
+ public void removeListener(ClusterEventListener listener) {
+ checkNotNull(listener, "Listener must not be null");
+ listenerRegistry.removeListener(listener);
+ }
+
+ @Override
+ public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
+ checkNotNull(nodeId, INSTANCE_ID_NULL);
+ checkNotNull(ip, "IP address must not be null");
+ checkArgument(tcpPort > 5000, "Tcp port must be greater than 5000");
+ ControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
+ allNodes.put(node.id(), node);
+ nodeStates.put(nodeId, State.INACTIVE);
+ eventDispatcher.post(new ClusterEvent(ClusterEvent.Type.INSTANCE_ADDED, node));
+ return node;
+ }
+
+ @Override
+ public void removeNode(NodeId nodeId) {
+ checkNotNull(nodeId, INSTANCE_ID_NULL);
+ ControllerNode node = allNodes.remove(nodeId);
+ if (node != null) {
+ nodeStates.remove(nodeId);
+ eventDispatcher.post(new ClusterEvent(ClusterEvent.Type.INSTANCE_REMOVED, node));
+ }
+ }
+
+ private void establishSelfIdentity() {
+ try {
+ IpAddress ip = findLocalIp();
+ localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
+ allNodes.put(localNode.id(), localNode);
+ nodeStates.put(localNode.id(), State.ACTIVE);
+ log.info("Local Node: {}", localNode);
+ } catch (SocketException e) {
+ throw new IllegalStateException("Cannot determine local IP", e);
+ }
+ }
+
+ private void heartbeat() {
+ try {
+ Set<ControllerNode> peers = allNodes.values()
+ .stream()
+ .filter(node -> !(node.id().equals(localNode.id())))
+ .collect(Collectors.toSet());
+ byte[] hbMessagePayload = SERIALIZER.encode(new HeartbeatMessage(localNode, peers));
+ peers.forEach((node) -> {
+ heartbeatToPeer(hbMessagePayload, node);
+ State currentState = nodeStates.get(node.id());
+ double phi = failureDetector.phi(node.id());
+ if (phi >= PHI_FAILURE_THRESHOLD) {
+ if (currentState == State.ACTIVE) {
+ nodeStates.put(node.id(), State.INACTIVE);
+ notifyStateChange(node.id(), State.ACTIVE, State.INACTIVE);
+ }
+ } else {
+ if (currentState == State.INACTIVE) {
+ nodeStates.put(node.id(), State.ACTIVE);
+ notifyStateChange(node.id(), State.INACTIVE, State.ACTIVE);
+ }
+ }
+ });
+ } catch (Exception e) {
+ log.trace("Failed to send heartbeat", e);
+ }
+ }
+
+ private void notifyStateChange(NodeId nodeId, State oldState, State newState) {
+ if (newState == State.ACTIVE) {
+ eventDispatcher.post(new ClusterEvent(ClusterEvent.Type.INSTANCE_ACTIVATED, allNodes.get(nodeId)));
+ } else {
+ eventDispatcher.post(new ClusterEvent(ClusterEvent.Type.INSTANCE_DEACTIVATED, allNodes.get(nodeId)));
+ }
+ }
+
+ private void heartbeatToPeer(byte[] messagePayload, ControllerNode peer) {
+ Endpoint remoteEp = new Endpoint(peer.ip(), HEARTBEAT_FD_PORT);
+ try {
+ messagingService.sendAsync(remoteEp, HEARTBEAT_MESSAGE, messagePayload);
+ } catch (IOException e) {
+ log.trace("Sending heartbeat to {} failed", remoteEp, e);
+ }
+ }
+
+ private class HeartbeatMessageHandler implements MessageHandler {
+ @Override
+ public void handle(Message message) throws IOException {
+ HeartbeatMessage hb = SERIALIZER.decode(message.payload());
+ failureDetector.report(hb.source().id());
+ hb.knownPeers().forEach(node -> {
+ allNodes.put(node.id(), node);
+ });
+ }
+ }
+
+ private class HeartbeatMessage {
+ private ControllerNode source;
+ private Set<ControllerNode> knownPeers;
+
+ public HeartbeatMessage(ControllerNode source, Set<ControllerNode> members) {
+ this.source = source;
+ this.knownPeers = ImmutableSet.copyOf(members);
+ }
+
+ public ControllerNode source() {
+ return source;
+ }
+
+ public Set<ControllerNode> knownPeers() {
+ return knownPeers;
+ }
+ }
+
+ private IpAddress findLocalIp() throws SocketException {
+ NetworkInterface ni = NetworkInterface.getByName("eth0");
+ Enumeration<InetAddress> inetAddresses = ni.getInetAddresses();
+
+ while (inetAddresses.hasMoreElements()) {
+ InetAddress ia = inetAddresses.nextElement();
+ if (!ia.isLinkLocalAddress()) {
+ return IpAddress.valueOf(ia);
+ }
+ }
+ throw new IllegalStateException("Unable to determine local ip");
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/PhiAccrualFailureDetector.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/PhiAccrualFailureDetector.java
new file mode 100644
index 0000000..2ba1066
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/PhiAccrualFailureDetector.java
@@ -0,0 +1,103 @@
+package org.onosproject.store.cluster.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Map;
+
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+import org.onosproject.cluster.NodeId;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Phi Accrual failure detector.
+ * <p>
+ * Based on a paper titled: "The φ Accrual Failure Detector" by Hayashibara, et al.
+ */
+public class PhiAccrualFailureDetector {
+ private final Map<NodeId, History> states = Maps.newConcurrentMap();
+
+ // TODO: make these configurable.
+ private static final int WINDOW_SIZE = 250;
+ private static final int MIN_SAMPLES = 25;
+
+ // If a node does not have any heartbeats, this is the phi
+ // value to report. Indicates the node is inactive (from the
+ // detectors perspective.
+ private static final double BOOTSTRAP_PHI_VALUE = 100.0;
+
+ /**
+ * Report a new heart beat for the specified node id.
+ * @param nodeId node id
+ */
+ public void report(NodeId nodeId) {
+ report(nodeId, System.currentTimeMillis());
+ }
+
+ /**
+ * Report a new heart beat for the specified node id.
+ * @param nodeId node id
+ * @param arrivalTime arrival time
+ */
+ public void report(NodeId nodeId, long arrivalTime) {
+ checkNotNull(nodeId, "NodeId must not be null");
+ checkArgument(arrivalTime >= 0, "arrivalTime must not be negative");
+ History nodeState =
+ states.computeIfAbsent(nodeId, key -> new History());
+ synchronized (nodeState) {
+ long latestHeartbeat = nodeState.latestHeartbeatTime();
+ if (latestHeartbeat != -1) {
+ nodeState.samples().addValue(arrivalTime - latestHeartbeat);
+ }
+ nodeState.setLatestHeartbeatTime(arrivalTime);
+ }
+ }
+
+ /**
+ * Compute phi for the specified node id.
+ * @param nodeId node id
+ * @return phi value
+ */
+ public Double phi(NodeId nodeId) {
+ if (!states.containsKey(nodeId)) {
+ return BOOTSTRAP_PHI_VALUE;
+ }
+ checkNotNull(nodeId, "NodeId must not be null");
+ History nodeState = states.get(nodeId);
+ synchronized (nodeState) {
+ long latestHeartbeat = nodeState.latestHeartbeatTime();
+ DescriptiveStatistics samples = nodeState.samples();
+ if (latestHeartbeat == -1 || samples.getN() < MIN_SAMPLES) {
+ return 0.0;
+ }
+ return computePhi(samples, latestHeartbeat, System.currentTimeMillis());
+ }
+ }
+
+ private double computePhi(DescriptiveStatistics samples, long tLast, long tNow) {
+ long size = samples.getN();
+ long t = tNow - tLast;
+ return (size > 0)
+ ? (1.0 / Math.log(10.0)) * t / samples.getMean()
+ : BOOTSTRAP_PHI_VALUE;
+ }
+
+ private static class History {
+ DescriptiveStatistics samples =
+ new DescriptiveStatistics(WINDOW_SIZE);
+ long lastHeartbeatTime = -1;
+
+ public DescriptiveStatistics samples() {
+ return samples;
+ }
+
+ public long latestHeartbeatTime() {
+ return lastHeartbeatTime;
+ }
+
+ public void setLatestHeartbeatTime(long value) {
+ lastHeartbeatTime = value;
+ }
+ }
+}
\ No newline at end of file
diff --git a/features/features.xml b/features/features.xml
index 3a5d5e5..0aabf66 100644
--- a/features/features.xml
+++ b/features/features.xml
@@ -33,6 +33,7 @@
<bundle>mvn:io.netty/netty-codec/4.0.23.Final</bundle>
<bundle>mvn:io.netty/netty-transport-native-epoll/4.0.23.Final</bundle>
<bundle>mvn:commons-pool/commons-pool/1.6</bundle>
+ <bundle>mvn:org.apache.commons/commons-math3/3.2</bundle>
<bundle>mvn:joda-time/joda-time/2.5</bundle>