Refactored ClusterManager as a proper fix for the Karaf clean issue (Topic phi-fd-on)
Change-Id: Ibb328d73412855dd2d44ca6b734f738ae2996873
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
index 4da1b46..213d0f4 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
@@ -15,167 +15,323 @@
*/
package org.onosproject.store.cluster.impl;
-import com.google.common.base.Optional;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableSet;
-import com.hazelcast.core.IMap;
-import com.hazelcast.core.Member;
-import com.hazelcast.core.MemberAttributeEvent;
-import com.hazelcast.core.MembershipEvent;
-import com.hazelcast.core.MembershipListener;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.Enumeration;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.netty.Endpoint;
+import org.onlab.netty.Message;
+import org.onlab.netty.MessageHandler;
+import org.onlab.netty.NettyMessagingService;
+import org.onlab.packet.IpAddress;
+import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterStore;
import org.onosproject.cluster.ClusterStoreDelegate;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
+import org.onosproject.cluster.ControllerNode.State;
import org.onosproject.cluster.NodeId;
-import org.onosproject.store.hz.AbsentInvalidatingLoadingCache;
-import org.onosproject.store.hz.AbstractHazelcastStore;
-import org.onosproject.store.hz.OptionalCacheLoader;
-import org.onlab.packet.IpAddress;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.serializers.KryoSerializer;
+import org.slf4j.Logger;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.hazelcast.util.AddressUtil;
-import static com.google.common.cache.CacheBuilder.newBuilder;
-import static org.onosproject.cluster.ClusterEvent.Type.INSTANCE_ACTIVATED;
-import static org.onosproject.cluster.ClusterEvent.Type.INSTANCE_DEACTIVATED;
-import static org.onosproject.cluster.ControllerNode.State;
-
-/**
- * Distributed implementation of the cluster nodes store.
- */
@Component(immediate = true)
@Service
+/**
+ * Distributed cluster nodes store that employs an accrual failure
+ * detector to identify cluster member up/down status.
+ */
public class DistributedClusterStore
- extends AbstractHazelcastStore<ClusterEvent, ClusterStoreDelegate>
+ extends AbstractStore<ClusterEvent, ClusterStoreDelegate>
implements ClusterStore {
- private IMap<byte[], byte[]> rawNodes;
- private LoadingCache<NodeId, Optional<DefaultControllerNode>> nodes;
+ private final Logger log = getLogger(DistributedClusterStore.class);
- private String listenerId;
- private final MembershipListener listener = new InternalMembershipListener();
- private final Map<NodeId, State> states = new ConcurrentHashMap<>();
+ // TODO: make these configurable.
+ private static final int HEARTBEAT_FD_PORT = 2419;
+ private static final int HEARTBEAT_INTERVAL_MS = 100;
+ private static final int PHI_FAILURE_THRESHOLD = 10;
- private String nodesListenerId;
+ private static final String CONFIG_DIR = "../config";
+ private static final String CLUSTER_DEFINITION_FILE = "cluster.json";
+ private static final String HEARTBEAT_MESSAGE = "onos-cluster-heartbeat";
- @Override
+ private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+ @Override
+ protected void setupKryoPool() {
+ serializerPool = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(HeartbeatMessage.class)
+ .build()
+ .populate(1);
+ }
+ };
+
+ private static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
+
+ private ClusterDefinition clusterDefinition;
+
+ private Set<ControllerNode> seedNodes;
+ private final Map<NodeId, ControllerNode> allNodes = Maps.newConcurrentMap();
+ private final Map<NodeId, State> nodeStates = Maps.newConcurrentMap();
+ private NettyMessagingService messagingService = new NettyMessagingService();
+ private ScheduledExecutorService heartBeatSender = Executors.newSingleThreadScheduledExecutor(
+ groupedThreads("onos/cluster/membership", "heartbeat-sender"));
+ private ExecutorService heartBeatMessageHandler = Executors.newSingleThreadExecutor(
+ groupedThreads("onos/cluster/membership", "heartbeat-receiver"));
+
+ private PhiAccrualFailureDetector failureDetector;
+
+ private ControllerNode localNode;
+
@Activate
public void activate() {
- super.activate();
- listenerId = theInstance.getCluster().addMembershipListener(listener);
+ File clusterDefinitionFile = new File(CONFIG_DIR,
+ CLUSTER_DEFINITION_FILE);
- rawNodes = theInstance.getMap("nodes");
- OptionalCacheLoader<NodeId, DefaultControllerNode> nodeLoader
- = new OptionalCacheLoader<>(serializer, rawNodes);
- nodes = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
- nodesListenerId = rawNodes.addEntryListener(new RemoteCacheEventHandler<>(nodes), true);
+ try {
+ clusterDefinition = new ClusterDefinitionStore(
+ clusterDefinitionFile.getPath()).read();
+ seedNodes = ImmutableSet
+ .copyOf(clusterDefinition.getNodes())
+ .stream()
+ .map(nodeInfo -> new DefaultControllerNode(new NodeId(
+ nodeInfo.getId()), IpAddress.valueOf(nodeInfo
+ .getIp()), nodeInfo.getTcpPort()))
+ .collect(Collectors.toSet());
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ "Failed to read cluster definition.", e);
+ }
- loadClusterNodes();
+ seedNodes.forEach(node -> {
+ allNodes.put(node.id(), node);
+ nodeStates.put(node.id(), State.INACTIVE);
+ });
+
+ establishSelfIdentity();
+
+ messagingService = new NettyMessagingService(HEARTBEAT_FD_PORT);
+
+ try {
+ messagingService.activate();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException(
+ "Failed to cleanly initialize membership and"
+ + " failure detector communication channel.", e);
+ }
+ messagingService.registerHandler(HEARTBEAT_MESSAGE,
+ new HeartbeatMessageHandler(), heartBeatMessageHandler);
+
+ failureDetector = new PhiAccrualFailureDetector();
+
+ heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0,
+ HEARTBEAT_INTERVAL_MS, TimeUnit.MILLISECONDS);
log.info("Started");
}
- // Loads the initial set of cluster nodes
- private void loadClusterNodes() {
- for (Member member : theInstance.getCluster().getMembers()) {
- addNode(node(member));
- }
- }
-
@Deactivate
public void deactivate() {
- rawNodes.removeEntryListener(nodesListenerId);
- theInstance.getCluster().removeMembershipListener(listenerId);
+ try {
+ messagingService.deactivate();
+ } catch (Exception e) {
+ log.trace("Failed to cleanly shutdown cluster membership messaging", e);
+ }
+
+ heartBeatSender.shutdownNow();
+ heartBeatMessageHandler.shutdownNow();
+
log.info("Stopped");
}
@Override
+ public void setDelegate(ClusterStoreDelegate delegate) {
+ checkNotNull(delegate, "Delegate cannot be null");
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void unsetDelegate(ClusterStoreDelegate delegate) {
+ this.delegate = null;
+ }
+
+ @Override
+ public boolean hasDelegate() {
+ return this.delegate != null;
+ }
+
+ @Override
public ControllerNode getLocalNode() {
- return node(theInstance.getCluster().getLocalMember());
+ return localNode;
}
@Override
public Set<ControllerNode> getNodes() {
- ImmutableSet.Builder<ControllerNode> builder = ImmutableSet.builder();
- for (Optional<DefaultControllerNode> optional : nodes.asMap().values()) {
- builder.add(optional.get());
- }
- return builder.build();
+ return ImmutableSet.copyOf(allNodes.values());
}
@Override
public ControllerNode getNode(NodeId nodeId) {
- return nodes.getUnchecked(nodeId).orNull();
+ checkNotNull(nodeId, INSTANCE_ID_NULL);
+ return allNodes.get(nodeId);
}
@Override
public State getState(NodeId nodeId) {
- State state = states.get(nodeId);
- return state == null ? State.INACTIVE : state;
+ checkNotNull(nodeId, INSTANCE_ID_NULL);
+ return nodeStates.get(nodeId);
}
@Override
public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
- return addNode(new DefaultControllerNode(nodeId, ip, tcpPort));
+ checkNotNull(nodeId, INSTANCE_ID_NULL);
+ checkNotNull(ip, "IP address must not be null");
+ checkArgument(tcpPort > 5000, "Tcp port must be greater than 5000");
+ ControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
+ allNodes.put(node.id(), node);
+ nodeStates.put(nodeId, State.INACTIVE);
+ delegate.notify(new ClusterEvent(ClusterEvent.Type.INSTANCE_ADDED, node));
+ return node;
}
@Override
public void removeNode(NodeId nodeId) {
- synchronized (this) {
- rawNodes.remove(serialize(nodeId));
- nodes.invalidate(nodeId);
+ checkNotNull(nodeId, INSTANCE_ID_NULL);
+ ControllerNode node = allNodes.remove(nodeId);
+ if (node != null) {
+ nodeStates.remove(nodeId);
+ delegate.notify(new ClusterEvent(ClusterEvent.Type.INSTANCE_REMOVED, node));
}
}
- // Adds a new node based on the specified member
- private synchronized ControllerNode addNode(DefaultControllerNode node) {
- rawNodes.put(serialize(node.id()), serialize(node));
- nodes.put(node.id(), Optional.of(node));
- states.put(node.id(), State.ACTIVE);
- return node;
+ private void establishSelfIdentity() {
+ try {
+ IpAddress ip = findLocalIp();
+ localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
+ allNodes.put(localNode.id(), localNode);
+ nodeStates.put(localNode.id(), State.ACTIVE);
+ log.info("Local Node: {}", localNode);
+ } catch (SocketException e) {
+ throw new IllegalStateException("Cannot determine local IP", e);
+ }
}
- // Creates a controller node descriptor from the Hazelcast member.
- private DefaultControllerNode node(Member member) {
- IpAddress ip = memberAddress(member);
- return new DefaultControllerNode(new NodeId(ip.toString()), ip);
+ private void heartbeat() {
+ try {
+ Set<ControllerNode> peers = allNodes.values()
+ .stream()
+ .filter(node -> !(node.id().equals(localNode.id())))
+ .collect(Collectors.toSet());
+ byte[] hbMessagePayload = SERIALIZER.encode(new HeartbeatMessage(localNode, peers));
+ peers.forEach((node) -> {
+ heartbeatToPeer(hbMessagePayload, node);
+ State currentState = nodeStates.get(node.id());
+ double phi = failureDetector.phi(node.id());
+ if (phi >= PHI_FAILURE_THRESHOLD) {
+ if (currentState == State.ACTIVE) {
+ nodeStates.put(node.id(), State.INACTIVE);
+ notifyStateChange(node.id(), State.ACTIVE, State.INACTIVE);
+ }
+ } else {
+ if (currentState == State.INACTIVE) {
+ nodeStates.put(node.id(), State.ACTIVE);
+ notifyStateChange(node.id(), State.INACTIVE, State.ACTIVE);
+ }
+ }
+ });
+ } catch (Exception e) {
+ log.debug("Failed to send heartbeat", e);
+ }
}
- private IpAddress memberAddress(Member member) {
- return IpAddress.valueOf(member.getSocketAddress().getAddress());
+ private void notifyStateChange(NodeId nodeId, State oldState, State newState) {
+ ControllerNode node = allNodes.get(nodeId);
+ if (newState == State.ACTIVE) {
+ delegate.notify(new ClusterEvent(ClusterEvent.Type.INSTANCE_ACTIVATED, node));
+ } else {
+ delegate.notify(new ClusterEvent(ClusterEvent.Type.INSTANCE_DEACTIVATED, node));
+ }
}
- // Interceptor for membership events.
- private class InternalMembershipListener implements MembershipListener {
+ private void heartbeatToPeer(byte[] messagePayload, ControllerNode peer) {
+ Endpoint remoteEp = new Endpoint(peer.ip(), HEARTBEAT_FD_PORT);
+ try {
+ messagingService.sendAsync(remoteEp, HEARTBEAT_MESSAGE, messagePayload);
+ } catch (IOException e) {
+ log.debug("Sending heartbeat to {} failed", remoteEp, e);
+ }
+ }
+
+ private IpAddress findLocalIp() throws SocketException {
+ Enumeration<NetworkInterface> interfaces =
+ NetworkInterface.getNetworkInterfaces();
+ while (interfaces.hasMoreElements()) {
+ NetworkInterface iface = interfaces.nextElement();
+ Enumeration<InetAddress> inetAddresses = iface.getInetAddresses();
+ while (inetAddresses.hasMoreElements()) {
+ IpAddress ip = IpAddress.valueOf(inetAddresses.nextElement());
+ if (AddressUtil.matchInterface(ip.toString(), clusterDefinition.getIpPrefix())) {
+ return ip;
+ }
+ }
+ }
+ throw new IllegalStateException("Unable to determine local ip");
+ }
+
+ private class HeartbeatMessageHandler implements MessageHandler {
@Override
- public void memberAdded(MembershipEvent membershipEvent) {
- log.info("Member {} added", membershipEvent.getMember());
- ControllerNode node = addNode(node(membershipEvent.getMember()));
- notifyDelegate(new ClusterEvent(INSTANCE_ACTIVATED, node));
- }
-
- @Override
- public void memberRemoved(MembershipEvent membershipEvent) {
- log.info("Member {} removed", membershipEvent.getMember());
- NodeId nodeId = new NodeId(memberAddress(membershipEvent.getMember()).toString());
- states.put(nodeId, State.INACTIVE);
- notifyDelegate(new ClusterEvent(INSTANCE_DEACTIVATED, getNode(nodeId)));
- }
-
- @Override
- public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) {
- log.info("Member {} attribute {} changed to {}",
- memberAttributeEvent.getMember(),
- memberAttributeEvent.getKey(),
- memberAttributeEvent.getValue());
+ public void handle(Message message) throws IOException {
+ HeartbeatMessage hb = SERIALIZER.decode(message.payload());
+ failureDetector.report(hb.source().id());
+ hb.knownPeers().forEach(node -> {
+ allNodes.put(node.id(), node);
+ });
}
}
+
+ private static class HeartbeatMessage {
+ private ControllerNode source;
+ private Set<ControllerNode> knownPeers;
+
+ public HeartbeatMessage(ControllerNode source, Set<ControllerNode> members) {
+ this.source = source;
+ this.knownPeers = ImmutableSet.copyOf(members);
+ }
+
+ public ControllerNode source() {
+ return source;
+ }
+
+ public Set<ControllerNode> knownPeers() {
+ return knownPeers;
+ }
+ }
+
}