Javadoc improvements
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
new file mode 100644
index 0000000..d4fd9c0
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -0,0 +1,170 @@
+package org.onlab.onos.store.cluster.messaging.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.store.cluster.impl.ClusterNodesDelegate;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.netty.Endpoint;
+import org.onlab.netty.Message;
+import org.onlab.netty.MessageHandler;
+import org.onlab.netty.MessagingService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Component(immediate = true)
+@Service
+public class ClusterCommunicationManager
+ implements ClusterCommunicationService, ClusterCommunicationAdminService {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private ControllerNode localNode;
+ private ClusterNodesDelegate nodesDelegate;
+ private Map<NodeId, ControllerNode> members = new HashMap<>();
+ private final Timer timer = new Timer("onos-controller-heatbeats");
+ public static final long HEART_BEAT_INTERVAL_MILLIS = 1000L;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private MessagingService messagingService;
+
+ @Activate
+ public void activate() {
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ log.info("Stopped");
+ }
+
+ @Override
+ public boolean broadcast(ClusterMessage message) {
+ boolean ok = true;
+ for (ControllerNode node : members.values()) {
+ if (!node.equals(localNode)) {
+ ok = unicast(message, node.id()) && ok;
+ }
+ }
+ return ok;
+ }
+
+ @Override
+ public boolean multicast(ClusterMessage message, Set<NodeId> nodes) {
+ boolean ok = true;
+ for (NodeId nodeId : nodes) {
+ if (!nodeId.equals(localNode.id())) {
+ ok = unicast(message, nodeId) && ok;
+ }
+ }
+ return ok;
+ }
+
+ @Override
+ public boolean unicast(ClusterMessage message, NodeId toNodeId) {
+ ControllerNode node = members.get(toNodeId);
+ checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
+ Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
+ try {
+ messagingService.sendAsync(nodeEp, message.subject().value(), message);
+ return true;
+ } catch (IOException e) {
+ log.error("Failed to send cluster message to nodeId: " + toNodeId, e);
+ }
+
+ return false;
+ }
+
+ @Override
+ public void addSubscriber(MessageSubject subject,
+ ClusterMessageHandler subscriber) {
+ messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
+ }
+
+ @Override
+ public void initialize(ControllerNode localNode,
+ ClusterNodesDelegate delegate) {
+ this.localNode = localNode;
+ this.nodesDelegate = delegate;
+ this.addSubscriber(new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"), new ClusterMemebershipEventHandler());
+ timer.schedule(new KeepAlive(), 0, HEART_BEAT_INTERVAL_MILLIS);
+ }
+
+ @Override
+ public void addNode(ControllerNode node) {
+ members.put(node.id(), node);
+ }
+
+ @Override
+ public void removeNode(ControllerNode node) {
+ broadcast(new ClusterMessage(
+ localNode.id(),
+ new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
+ new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node)));
+ members.remove(node.id());
+ }
+
+ // Sends a heart beat to all peers.
+ private class KeepAlive extends TimerTask {
+
+ @Override
+ public void run() {
+ broadcast(new ClusterMessage(
+ localNode.id(),
+ new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
+ new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode)));
+ }
+ }
+
+ private class ClusterMemebershipEventHandler implements ClusterMessageHandler {
+
+ @Override
+ public void handle(ClusterMessage message) {
+
+ ClusterMembershipEvent event = (ClusterMembershipEvent) message.payload();
+ ControllerNode node = event.node();
+ if (event.type() == ClusterMembershipEventType.HEART_BEAT) {
+ log.info("Node {} sent a hearbeat", node.id());
+ nodesDelegate.nodeDetected(node.id(), node.ip(), node.tcpPort());
+ } else if (event.type() == ClusterMembershipEventType.LEAVING_MEMBER) {
+ log.info("Node {} is leaving", node.id());
+ nodesDelegate.nodeRemoved(node.id());
+ } else if (event.type() == ClusterMembershipEventType.UNREACHABLE_MEMBER) {
+ log.info("Node {} is unreachable", node.id());
+ nodesDelegate.nodeVanished(node.id());
+ }
+ }
+ }
+
+ private static class InternalClusterMessageHandler implements MessageHandler {
+
+ private final ClusterMessageHandler handler;
+
+ public InternalClusterMessageHandler(ClusterMessageHandler handler) {
+ this.handler = handler;
+ }
+
+ @Override
+ public void handle(Message message) {
+ handler.handle((ClusterMessage) message.payload());
+ }
+ }
+}