blob: 2e8937c49085c2b2781df34a748b1a3a0f5a8897 [file] [log] [blame]
Madan Jampani890bc352014-10-01 22:35:29 -07001package org.onlab.onos.store.cluster.messaging.impl;
2
3import static com.google.common.base.Preconditions.checkArgument;
4
5import java.io.IOException;
6import java.util.HashMap;
7import java.util.Map;
8import java.util.Set;
9import java.util.Timer;
10import java.util.TimerTask;
11
12import org.apache.felix.scr.annotations.Activate;
13import org.apache.felix.scr.annotations.Component;
14import org.apache.felix.scr.annotations.Deactivate;
15import org.apache.felix.scr.annotations.Reference;
16import org.apache.felix.scr.annotations.ReferenceCardinality;
17import org.apache.felix.scr.annotations.Service;
18import org.onlab.onos.cluster.ControllerNode;
19import org.onlab.onos.cluster.NodeId;
20import org.onlab.onos.store.cluster.impl.ClusterNodesDelegate;
21import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
22import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
23import org.onlab.onos.store.cluster.messaging.ClusterMessage;
24import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
25import org.onlab.onos.store.cluster.messaging.MessageSubject;
Madan Jampanic9ed9be2014-10-02 16:13:11 -070026import org.onlab.netty.Endpoint;
27import org.onlab.netty.Message;
28import org.onlab.netty.MessageHandler;
29import org.onlab.netty.MessagingService;
Madan Jampani890bc352014-10-01 22:35:29 -070030import org.slf4j.Logger;
31import org.slf4j.LoggerFactory;
32
33@Component(immediate = true)
34@Service
Madan Jampani3b0dfd52014-10-02 16:48:13 -070035public class ClusterCommunicationManager
Madan Jampani890bc352014-10-01 22:35:29 -070036 implements ClusterCommunicationService, ClusterCommunicationAdminService {
37
38 private final Logger log = LoggerFactory.getLogger(getClass());
39
40 private ControllerNode localNode;
41 private ClusterNodesDelegate nodesDelegate;
Yuta HIGUCHI3f4c2b42014-10-06 16:53:56 -070042 // FIXME: `members` should go away and should be using ClusterService
Madan Jampani890bc352014-10-01 22:35:29 -070043 private Map<NodeId, ControllerNode> members = new HashMap<>();
44 private final Timer timer = new Timer("onos-controller-heatbeats");
45 public static final long HEART_BEAT_INTERVAL_MILLIS = 1000L;
46
47 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
48 private MessagingService messagingService;
49
50 @Activate
51 public void activate() {
52 log.info("Started");
53 }
54
55 @Deactivate
56 public void deactivate() {
57 log.info("Stopped");
58 }
59
60 @Override
61 public boolean broadcast(ClusterMessage message) {
62 boolean ok = true;
63 for (ControllerNode node : members.values()) {
64 if (!node.equals(localNode)) {
65 ok = unicast(message, node.id()) && ok;
66 }
67 }
68 return ok;
69 }
70
71 @Override
72 public boolean multicast(ClusterMessage message, Set<NodeId> nodes) {
73 boolean ok = true;
74 for (NodeId nodeId : nodes) {
75 if (!nodeId.equals(localNode.id())) {
76 ok = unicast(message, nodeId) && ok;
77 }
78 }
79 return ok;
80 }
81
82 @Override
83 public boolean unicast(ClusterMessage message, NodeId toNodeId) {
84 ControllerNode node = members.get(toNodeId);
85 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
86 Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
87 try {
88 messagingService.sendAsync(nodeEp, message.subject().value(), message);
89 return true;
90 } catch (IOException e) {
91 log.error("Failed to send cluster message to nodeId: " + toNodeId, e);
92 }
93
94 return false;
95 }
96
97 @Override
98 public void addSubscriber(MessageSubject subject,
99 ClusterMessageHandler subscriber) {
100 messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
101 }
102
103 @Override
104 public void initialize(ControllerNode localNode,
105 ClusterNodesDelegate delegate) {
106 this.localNode = localNode;
107 this.nodesDelegate = delegate;
108 this.addSubscriber(new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"), new ClusterMemebershipEventHandler());
109 timer.schedule(new KeepAlive(), 0, HEART_BEAT_INTERVAL_MILLIS);
110 }
111
112 @Override
113 public void addNode(ControllerNode node) {
114 members.put(node.id(), node);
115 }
116
117 @Override
118 public void removeNode(ControllerNode node) {
119 broadcast(new ClusterMessage(
120 localNode.id(),
121 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
122 new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node)));
123 members.remove(node.id());
124 }
125
126 // Sends a heart beat to all peers.
127 private class KeepAlive extends TimerTask {
128
129 @Override
130 public void run() {
131 broadcast(new ClusterMessage(
132 localNode.id(),
133 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
134 new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode)));
135 }
136 }
137
138 private class ClusterMemebershipEventHandler implements ClusterMessageHandler {
139
140 @Override
141 public void handle(ClusterMessage message) {
142
143 ClusterMembershipEvent event = (ClusterMembershipEvent) message.payload();
144 ControllerNode node = event.node();
145 if (event.type() == ClusterMembershipEventType.HEART_BEAT) {
146 log.info("Node {} sent a hearbeat", node.id());
147 nodesDelegate.nodeDetected(node.id(), node.ip(), node.tcpPort());
148 } else if (event.type() == ClusterMembershipEventType.LEAVING_MEMBER) {
149 log.info("Node {} is leaving", node.id());
150 nodesDelegate.nodeRemoved(node.id());
151 } else if (event.type() == ClusterMembershipEventType.UNREACHABLE_MEMBER) {
152 log.info("Node {} is unreachable", node.id());
153 nodesDelegate.nodeVanished(node.id());
154 }
155 }
156 }
157
158 private static class InternalClusterMessageHandler implements MessageHandler {
159
160 private final ClusterMessageHandler handler;
161
162 public InternalClusterMessageHandler(ClusterMessageHandler handler) {
163 this.handler = handler;
164 }
165
166 @Override
167 public void handle(Message message) {
168 handler.handle((ClusterMessage) message.payload());
169 }
170 }
171}