blob: b260e1c1fafcc5c6c8b0b218ca2577ae60e90cec [file] [log] [blame]
Madan Jampani890bc352014-10-01 22:35:29 -07001package org.onlab.onos.store.cluster.messaging.impl;
2
3import static com.google.common.base.Preconditions.checkArgument;
4
5import java.io.IOException;
6import java.util.HashMap;
7import java.util.Map;
8import java.util.Set;
9import java.util.Timer;
10import java.util.TimerTask;
11
12import org.apache.felix.scr.annotations.Activate;
13import org.apache.felix.scr.annotations.Component;
14import org.apache.felix.scr.annotations.Deactivate;
15import org.apache.felix.scr.annotations.Reference;
16import org.apache.felix.scr.annotations.ReferenceCardinality;
17import org.apache.felix.scr.annotations.Service;
18import org.onlab.onos.cluster.ControllerNode;
19import org.onlab.onos.cluster.NodeId;
Yuta HIGUCHIc057c632014-10-06 18:38:14 -070020import org.onlab.onos.store.cluster.impl.ClusterMembershipEvent;
21import org.onlab.onos.store.cluster.impl.ClusterMembershipEventType;
Madan Jampani890bc352014-10-01 22:35:29 -070022import org.onlab.onos.store.cluster.impl.ClusterNodesDelegate;
23import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
24import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
25import org.onlab.onos.store.cluster.messaging.ClusterMessage;
26import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
27import org.onlab.onos.store.cluster.messaging.MessageSubject;
Madan Jampanic9ed9be2014-10-02 16:13:11 -070028import org.onlab.netty.Endpoint;
29import org.onlab.netty.Message;
30import org.onlab.netty.MessageHandler;
31import org.onlab.netty.MessagingService;
Madan Jampani890bc352014-10-01 22:35:29 -070032import org.slf4j.Logger;
33import org.slf4j.LoggerFactory;
34
35@Component(immediate = true)
36@Service
Madan Jampani3b0dfd52014-10-02 16:48:13 -070037public class ClusterCommunicationManager
Madan Jampani890bc352014-10-01 22:35:29 -070038 implements ClusterCommunicationService, ClusterCommunicationAdminService {
39
40 private final Logger log = LoggerFactory.getLogger(getClass());
41
42 private ControllerNode localNode;
43 private ClusterNodesDelegate nodesDelegate;
Yuta HIGUCHI3f4c2b42014-10-06 16:53:56 -070044 // FIXME: `members` should go away and should be using ClusterService
Madan Jampani890bc352014-10-01 22:35:29 -070045 private Map<NodeId, ControllerNode> members = new HashMap<>();
46 private final Timer timer = new Timer("onos-controller-heatbeats");
47 public static final long HEART_BEAT_INTERVAL_MILLIS = 1000L;
48
49 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
50 private MessagingService messagingService;
51
52 @Activate
53 public void activate() {
54 log.info("Started");
55 }
56
57 @Deactivate
58 public void deactivate() {
59 log.info("Stopped");
60 }
61
62 @Override
63 public boolean broadcast(ClusterMessage message) {
64 boolean ok = true;
65 for (ControllerNode node : members.values()) {
66 if (!node.equals(localNode)) {
67 ok = unicast(message, node.id()) && ok;
68 }
69 }
70 return ok;
71 }
72
73 @Override
74 public boolean multicast(ClusterMessage message, Set<NodeId> nodes) {
75 boolean ok = true;
76 for (NodeId nodeId : nodes) {
77 if (!nodeId.equals(localNode.id())) {
78 ok = unicast(message, nodeId) && ok;
79 }
80 }
81 return ok;
82 }
83
84 @Override
85 public boolean unicast(ClusterMessage message, NodeId toNodeId) {
86 ControllerNode node = members.get(toNodeId);
87 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
88 Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
89 try {
90 messagingService.sendAsync(nodeEp, message.subject().value(), message);
91 return true;
92 } catch (IOException e) {
93 log.error("Failed to send cluster message to nodeId: " + toNodeId, e);
94 }
95
96 return false;
97 }
98
99 @Override
100 public void addSubscriber(MessageSubject subject,
101 ClusterMessageHandler subscriber) {
102 messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
103 }
104
105 @Override
106 public void initialize(ControllerNode localNode,
107 ClusterNodesDelegate delegate) {
108 this.localNode = localNode;
109 this.nodesDelegate = delegate;
110 this.addSubscriber(new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"), new ClusterMemebershipEventHandler());
111 timer.schedule(new KeepAlive(), 0, HEART_BEAT_INTERVAL_MILLIS);
112 }
113
114 @Override
115 public void addNode(ControllerNode node) {
116 members.put(node.id(), node);
117 }
118
119 @Override
120 public void removeNode(ControllerNode node) {
121 broadcast(new ClusterMessage(
122 localNode.id(),
123 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
124 new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node)));
125 members.remove(node.id());
126 }
127
128 // Sends a heart beat to all peers.
129 private class KeepAlive extends TimerTask {
130
131 @Override
132 public void run() {
133 broadcast(new ClusterMessage(
134 localNode.id(),
135 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
136 new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode)));
137 }
138 }
139
140 private class ClusterMemebershipEventHandler implements ClusterMessageHandler {
141
142 @Override
143 public void handle(ClusterMessage message) {
144
145 ClusterMembershipEvent event = (ClusterMembershipEvent) message.payload();
146 ControllerNode node = event.node();
147 if (event.type() == ClusterMembershipEventType.HEART_BEAT) {
148 log.info("Node {} sent a hearbeat", node.id());
149 nodesDelegate.nodeDetected(node.id(), node.ip(), node.tcpPort());
150 } else if (event.type() == ClusterMembershipEventType.LEAVING_MEMBER) {
151 log.info("Node {} is leaving", node.id());
152 nodesDelegate.nodeRemoved(node.id());
153 } else if (event.type() == ClusterMembershipEventType.UNREACHABLE_MEMBER) {
154 log.info("Node {} is unreachable", node.id());
155 nodesDelegate.nodeVanished(node.id());
156 }
157 }
158 }
159
160 private static class InternalClusterMessageHandler implements MessageHandler {
161
162 private final ClusterMessageHandler handler;
163
164 public InternalClusterMessageHandler(ClusterMessageHandler handler) {
165 this.handler = handler;
166 }
167
168 @Override
169 public void handle(Message message) {
170 handler.handle((ClusterMessage) message.payload());
171 }
172 }
173}