Refactored ClusterManager as proper fix for Karaf clean issue (Topic phi-fd-on)

Change-Id: Ibb328d73412855dd2d44ca6b734f738ae2996873
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
index 4da1b46..213d0f4 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
@@ -15,167 +15,384 @@
  */
 package org.onosproject.store.cluster.impl;
 
-import com.google.common.base.Optional;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableSet;
-import com.hazelcast.core.IMap;
-import com.hazelcast.core.Member;
-import com.hazelcast.core.MemberAttributeEvent;
-import com.hazelcast.core.MembershipEvent;
-import com.hazelcast.core.MembershipListener;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.Enumeration;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Service;
+import org.onlab.netty.Endpoint;
+import org.onlab.netty.Message;
+import org.onlab.netty.MessageHandler;
+import org.onlab.netty.NettyMessagingService;
+import org.onlab.packet.IpAddress;
+import org.onlab.util.KryoNamespace;
 import org.onosproject.cluster.ClusterEvent;
 import org.onosproject.cluster.ClusterStore;
 import org.onosproject.cluster.ClusterStoreDelegate;
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.DefaultControllerNode;
+import org.onosproject.cluster.ControllerNode.State;
 import org.onosproject.cluster.NodeId;
-import org.onosproject.store.hz.AbsentInvalidatingLoadingCache;
-import org.onosproject.store.hz.AbstractHazelcastStore;
-import org.onosproject.store.hz.OptionalCacheLoader;
-import org.onlab.packet.IpAddress;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.serializers.KryoSerializer;
+import org.slf4j.Logger;
 
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.hazelcast.util.AddressUtil;
 
-import static com.google.common.cache.CacheBuilder.newBuilder;
-import static org.onosproject.cluster.ClusterEvent.Type.INSTANCE_ACTIVATED;
-import static org.onosproject.cluster.ClusterEvent.Type.INSTANCE_DEACTIVATED;
-import static org.onosproject.cluster.ControllerNode.State;
-
-/**
- * Distributed implementation of the cluster nodes store.
- */
+/**
+ * Distributed cluster nodes store that employs an accrual failure
+ * detector to identify cluster member up/down status.
+ */
 @Component(immediate = true)
 @Service
 public class DistributedClusterStore
-        extends AbstractHazelcastStore<ClusterEvent, ClusterStoreDelegate>
+        extends AbstractStore<ClusterEvent, ClusterStoreDelegate>
         implements ClusterStore {
 
-    private IMap<byte[], byte[]> rawNodes;
-    private LoadingCache<NodeId, Optional<DefaultControllerNode>> nodes;
+    private final Logger log = getLogger(DistributedClusterStore.class);
 
-    private String listenerId;
-    private final MembershipListener listener = new InternalMembershipListener();
-    private final Map<NodeId, State> states = new ConcurrentHashMap<>();
+    // TODO: make these configurable.
+    private static final int HEARTBEAT_FD_PORT = 2419;
+    private static final int HEARTBEAT_INTERVAL_MS = 100;
+    private static final int PHI_FAILURE_THRESHOLD = 10;
 
-    private String nodesListenerId;
+    private static final String CONFIG_DIR = "../config";
+    private static final String CLUSTER_DEFINITION_FILE = "cluster.json";
+    private static final String HEARTBEAT_MESSAGE = "onos-cluster-heartbeat";
 
-    @Override
+    private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+        @Override
+        protected void setupKryoPool() {
+            serializerPool = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .register(HeartbeatMessage.class)
+                .build()
+                .populate(1);
+        }
+    };
+
+    private static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
+
+    private ClusterDefinition clusterDefinition;
+
+    private Set<ControllerNode> seedNodes;
+    // All known nodes (seeds, self, gossiped or explicitly added) and their liveness.
+    private final Map<NodeId, ControllerNode> allNodes = Maps.newConcurrentMap();
+    private final Map<NodeId, State> nodeStates = Maps.newConcurrentMap();
+    // Created and started in activate(); no throw-away instance is built here.
+    private NettyMessagingService messagingService;
+    private final ScheduledExecutorService heartBeatSender = Executors.newSingleThreadScheduledExecutor(
+            groupedThreads("onos/cluster/membership", "heartbeat-sender"));
+    private final ExecutorService heartBeatMessageHandler = Executors.newSingleThreadExecutor(
+            groupedThreads("onos/cluster/membership", "heartbeat-receiver"));
+
+    private PhiAccrualFailureDetector failureDetector;
+
+    private ControllerNode localNode;
+
+    /**
+     * Reads the cluster definition, registers the seed nodes as INACTIVE and
+     * the local node as ACTIVE, then starts the heartbeat channel and sender.
+     */
     @Activate
     public void activate() {
-        super.activate();
-        listenerId = theInstance.getCluster().addMembershipListener(listener);
+        File clusterDefinitionFile = new File(CONFIG_DIR,
+                CLUSTER_DEFINITION_FILE);
 
-        rawNodes = theInstance.getMap("nodes");
-        OptionalCacheLoader<NodeId, DefaultControllerNode> nodeLoader
-                = new OptionalCacheLoader<>(serializer, rawNodes);
-        nodes = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
-        nodesListenerId = rawNodes.addEntryListener(new RemoteCacheEventHandler<>(nodes), true);
+        try {
+            clusterDefinition = new ClusterDefinitionStore(
+                    clusterDefinitionFile.getPath()).read();
+            seedNodes = ImmutableSet
+                    .copyOf(clusterDefinition.getNodes())
+                    .stream()
+                    .map(nodeInfo -> new DefaultControllerNode(new NodeId(
+                            nodeInfo.getId()), IpAddress.valueOf(nodeInfo
+                            .getIp()), nodeInfo.getTcpPort()))
+                    .collect(Collectors.toSet());
+        } catch (IOException e) {
+            throw new IllegalStateException(
+                    "Failed to read cluster definition.", e);
+        }
 
-        loadClusterNodes();
+        seedNodes.forEach(node -> {
+            allNodes.put(node.id(), node);
+            nodeStates.put(node.id(), State.INACTIVE);
+        });
+
+        establishSelfIdentity();
+
+        // Must exist before the handler is registered: incoming heartbeats report to it.
+        failureDetector = new PhiAccrualFailureDetector();
+
+        messagingService = new NettyMessagingService(HEARTBEAT_FD_PORT);
+
+        try {
+            messagingService.activate();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IllegalStateException(
+                    "Failed to cleanly initialize membership and"
+                            + " failure detector communication channel.", e);
+        }
+        messagingService.registerHandler(HEARTBEAT_MESSAGE,
+                new HeartbeatMessageHandler(), heartBeatMessageHandler);
+
+        heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0,
+                HEARTBEAT_INTERVAL_MS, TimeUnit.MILLISECONDS);
 
         log.info("Started");
     }
 
-    // Loads the initial set of cluster nodes
-    private void loadClusterNodes() {
-        for (Member member : theInstance.getCluster().getMembers()) {
-            addNode(node(member));
-        }
-    }
-
+    /**
+     * Stops heartbeating and shuts down the messaging channel and executors.
+     */
     @Deactivate
     public void deactivate() {
-        rawNodes.removeEntryListener(nodesListenerId);
-        theInstance.getCluster().removeMembershipListener(listenerId);
+        // Stop the periodic heartbeat before closing the channel it sends on.
+        heartBeatSender.shutdownNow();
+
+        try {
+            messagingService.deactivate();
+        } catch (Exception e) {
+            log.trace("Failed to cleanly shutdown cluster membership messaging", e);
+        }
+
+        heartBeatMessageHandler.shutdownNow();
+
         log.info("Stopped");
     }
 
     @Override
+    public void setDelegate(ClusterStoreDelegate delegate) {
+        checkNotNull(delegate, "Delegate cannot be null");
+        this.delegate = delegate;
+    }
+
+    @Override
+    public void unsetDelegate(ClusterStoreDelegate delegate) {
+        // Only detach the delegate that is currently registered.
+        if (this.delegate == delegate) {
+            this.delegate = null;
+        }
+    }
+
+    @Override
+    public boolean hasDelegate() {
+        return this.delegate != null;
+    }
+
+    @Override
     public ControllerNode getLocalNode() {
-        return node(theInstance.getCluster().getLocalMember());
+        return localNode;
     }
 
     @Override
     public Set<ControllerNode> getNodes() {
-        ImmutableSet.Builder<ControllerNode> builder = ImmutableSet.builder();
-        for (Optional<DefaultControllerNode> optional : nodes.asMap().values()) {
-            builder.add(optional.get());
-        }
-        return builder.build();
+        return ImmutableSet.copyOf(allNodes.values());
     }
 
     @Override
     public ControllerNode getNode(NodeId nodeId) {
-        return nodes.getUnchecked(nodeId).orNull();
+        checkNotNull(nodeId, INSTANCE_ID_NULL);
+        return allNodes.get(nodeId);
     }
 
     @Override
     public State getState(NodeId nodeId) {
-        State state = states.get(nodeId);
-        return state == null ? State.INACTIVE : state;
+        checkNotNull(nodeId, INSTANCE_ID_NULL);
+        // Unknown nodes, or nodes without a recorded state, are reported INACTIVE.
+        return nodeStates.getOrDefault(nodeId, State.INACTIVE);
     }
 
     @Override
     public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
-        return addNode(new DefaultControllerNode(nodeId, ip, tcpPort));
+        checkNotNull(nodeId, INSTANCE_ID_NULL);
+        checkNotNull(ip, "IP address must not be null");
+        checkArgument(tcpPort > 0 && tcpPort <= 65535, "Tcp port must be between 1 and 65535");
+        ControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
+        allNodes.put(node.id(), node);
+        nodeStates.put(nodeId, State.INACTIVE);
+        postEvent(new ClusterEvent(ClusterEvent.Type.INSTANCE_ADDED, node));
+        return node;
     }
 
     @Override
     public void removeNode(NodeId nodeId) {
-        synchronized (this) {
-            rawNodes.remove(serialize(nodeId));
-            nodes.invalidate(nodeId);
+        checkNotNull(nodeId, INSTANCE_ID_NULL);
+        ControllerNode node = allNodes.remove(nodeId);
+        if (node != null) {
+            nodeStates.remove(nodeId);
+            postEvent(new ClusterEvent(ClusterEvent.Type.INSTANCE_REMOVED, node));
         }
     }
 
-    // Adds a new node based on the specified member
-    private synchronized ControllerNode addNode(DefaultControllerNode node) {
-        rawNodes.put(serialize(node.id()), serialize(node));
-        nodes.put(node.id(), Optional.of(node));
-        states.put(node.id(), State.ACTIVE);
-        return node;
+    /**
+     * Identifies the local node by the interface address that matches the
+     * cluster IP prefix and registers it as an ACTIVE member.
+     */
+    private void establishSelfIdentity() {
+        try {
+            IpAddress ip = findLocalIp();
+            localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
+            allNodes.put(localNode.id(), localNode);
+            nodeStates.put(localNode.id(), State.ACTIVE);
+            log.info("Local Node: {}", localNode);
+        } catch (SocketException e) {
+            throw new IllegalStateException("Cannot determine local IP", e);
+        }
     }
 
-    // Creates a controller node descriptor from the Hazelcast member.
-    private DefaultControllerNode node(Member member) {
-        IpAddress ip = memberAddress(member);
-        return new DefaultControllerNode(new NodeId(ip.toString()), ip);
+    /**
+     * Sends a heartbeat to every peer and re-evaluates each peer's state from
+     * its phi value, notifying the delegate on ACTIVE/INACTIVE transitions.
+     */
+    private void heartbeat() {
+        try {
+            Set<ControllerNode> peers = allNodes.values()
+                    .stream()
+                    .filter(node -> !(node.id().equals(localNode.id())))
+                    .collect(Collectors.toSet());
+            byte[] hbMessagePayload = SERIALIZER.encode(new HeartbeatMessage(localNode, peers));
+            peers.forEach((node) -> {
+                heartbeatToPeer(hbMessagePayload, node);
+                State currentState = nodeStates.get(node.id());
+                double phi = failureDetector.phi(node.id());
+                if (phi >= PHI_FAILURE_THRESHOLD) {
+                    if (currentState == State.ACTIVE) {
+                        nodeStates.put(node.id(), State.INACTIVE);
+                        notifyStateChange(node.id(), State.ACTIVE, State.INACTIVE);
+                    }
+                } else {
+                    if (currentState == State.INACTIVE) {
+                        nodeStates.put(node.id(), State.ACTIVE);
+                        notifyStateChange(node.id(), State.INACTIVE, State.ACTIVE);
+                    }
+                }
+            });
+        } catch (Exception e) {
+            log.debug("Failed to send heartbeat", e);
+        }
     }
 
-    private IpAddress memberAddress(Member member) {
-        return IpAddress.valueOf(member.getSocketAddress().getAddress());
+    /**
+     * Posts INSTANCE_ACTIVATED or INSTANCE_DEACTIVATED for the node depending
+     * on newState; oldState is currently unused.
+     */
+    private void notifyStateChange(NodeId nodeId, State oldState, State newState) {
+        ControllerNode node = allNodes.get(nodeId);
+        if (newState == State.ACTIVE) {
+            postEvent(new ClusterEvent(ClusterEvent.Type.INSTANCE_ACTIVATED, node));
+        } else {
+            postEvent(new ClusterEvent(ClusterEvent.Type.INSTANCE_DEACTIVATED, node));
+        }
     }
+
+    /**
+     * Delivers an event to the store delegate. The event is dropped when no
+     * delegate is set, since heartbeat-driven state changes may begin before
+     * setDelegate() is called.
+     */
+    private void postEvent(ClusterEvent event) {
+        ClusterStoreDelegate currentDelegate = delegate;
+        if (currentDelegate != null) {
+            currentDelegate.notify(event);
+        }
+    }
 
-    // Interceptor for membership events.
-    private class InternalMembershipListener implements MembershipListener {
+    /**
+     * Asynchronously sends the serialized heartbeat to the peer's heartbeat port.
+     */
+    private void heartbeatToPeer(byte[] messagePayload, ControllerNode peer) {
+        Endpoint remoteEp = new Endpoint(peer.ip(), HEARTBEAT_FD_PORT);
+        try {
+            messagingService.sendAsync(remoteEp, HEARTBEAT_MESSAGE, messagePayload);
+        } catch (IOException e) {
+            log.debug("Sending heartbeat to {} failed", remoteEp, e);
+        }
+    }
+
+    /**
+     * Returns the first local interface address matching the cluster
+     * definition's IP prefix.
+     *
+     * @return local IP address within the cluster IP prefix
+     * @throws SocketException if the network interfaces cannot be listed
+     * @throws IllegalStateException if no local address matches the prefix
+     */
+    private IpAddress findLocalIp() throws SocketException {
+        Enumeration<NetworkInterface> interfaces =
+                NetworkInterface.getNetworkInterfaces();
+        // getNetworkInterfaces() may return null when no interfaces are found.
+        while (interfaces != null && interfaces.hasMoreElements()) {
+            NetworkInterface iface = interfaces.nextElement();
+            Enumeration<InetAddress> inetAddresses = iface.getInetAddresses();
+            while (inetAddresses.hasMoreElements()) {
+                IpAddress ip = IpAddress.valueOf(inetAddresses.nextElement());
+                if (AddressUtil.matchInterface(ip.toString(), clusterDefinition.getIpPrefix())) {
+                    return ip;
+                }
+            }
+        }
+        throw new IllegalStateException("Unable to determine local ip");
+    }
+
+    /**
+     * Reports each received heartbeat to the failure detector and merges the
+     * peers advertised by the sender into the local membership view.
+     */
+    private class HeartbeatMessageHandler implements MessageHandler {
         @Override
-        public void memberAdded(MembershipEvent membershipEvent) {
-            log.info("Member {} added", membershipEvent.getMember());
-            ControllerNode node = addNode(node(membershipEvent.getMember()));
-            notifyDelegate(new ClusterEvent(INSTANCE_ACTIVATED, node));
-        }
-
-        @Override
-        public void memberRemoved(MembershipEvent membershipEvent) {
-            log.info("Member {} removed", membershipEvent.getMember());
-            NodeId nodeId = new NodeId(memberAddress(membershipEvent.getMember()).toString());
-            states.put(nodeId, State.INACTIVE);
-            notifyDelegate(new ClusterEvent(INSTANCE_DEACTIVATED, getNode(nodeId)));
-        }
-
-        @Override
-        public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) {
-            log.info("Member {} attribute {} changed to {}",
-                     memberAttributeEvent.getMember(),
-                     memberAttributeEvent.getKey(),
-                     memberAttributeEvent.getValue());
+        public void handle(Message message) throws IOException {
+            HeartbeatMessage hb = SERIALIZER.decode(message.payload());
+            failureDetector.report(hb.source().id());
+            hb.knownPeers().forEach(node -> {
+                allNodes.put(node.id(), node);
+                // Give newly learned peers a state so heartbeat() can mark them ACTIVE.
+                nodeStates.putIfAbsent(node.id(), State.INACTIVE);
+            });
         }
     }
+
+    /**
+     * Heartbeat payload: the sending node and the peers it knows about.
+     */
+    private static class HeartbeatMessage {
+        private ControllerNode source;
+        private Set<ControllerNode> knownPeers;
+
+        public HeartbeatMessage(ControllerNode source, Set<ControllerNode> members) {
+            this.source = source;
+            this.knownPeers = ImmutableSet.copyOf(members);
+        }
+
+        public ControllerNode source() {
+            return source;
+        }
+
+        public Set<ControllerNode> knownPeers() {
+            return knownPeers;
+        }
+    }
+
 }