blob: 98be0b10205ae3e43fbc08de9d13e07452143044 [file] [log] [blame]
Madan Jampani890bc352014-10-01 22:35:29 -07001package org.onlab.onos.store.cluster.messaging.impl;
2
3import static com.google.common.base.Preconditions.checkArgument;
4
5import java.io.IOException;
6import java.util.HashMap;
7import java.util.Map;
8import java.util.Set;
9import java.util.Timer;
10import java.util.TimerTask;
11
12import org.apache.felix.scr.annotations.Activate;
13import org.apache.felix.scr.annotations.Component;
14import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani890bc352014-10-01 22:35:29 -070015import org.apache.felix.scr.annotations.Service;
16import org.onlab.onos.cluster.ControllerNode;
17import org.onlab.onos.cluster.NodeId;
Yuta HIGUCHIc057c632014-10-06 18:38:14 -070018import org.onlab.onos.store.cluster.impl.ClusterMembershipEvent;
19import org.onlab.onos.store.cluster.impl.ClusterMembershipEventType;
Madan Jampani890bc352014-10-01 22:35:29 -070020import org.onlab.onos.store.cluster.impl.ClusterNodesDelegate;
21import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
22import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
23import org.onlab.onos.store.cluster.messaging.ClusterMessage;
24import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
25import org.onlab.onos.store.cluster.messaging.MessageSubject;
Madan Jampanic9ed9be2014-10-02 16:13:11 -070026import org.onlab.netty.Endpoint;
27import org.onlab.netty.Message;
28import org.onlab.netty.MessageHandler;
29import org.onlab.netty.MessagingService;
Madan Jampani890bc352014-10-01 22:35:29 -070030import org.slf4j.Logger;
31import org.slf4j.LoggerFactory;
32
33@Component(immediate = true)
34@Service
Madan Jampani3b0dfd52014-10-02 16:48:13 -070035public class ClusterCommunicationManager
Madan Jampani890bc352014-10-01 22:35:29 -070036 implements ClusterCommunicationService, ClusterCommunicationAdminService {
37
38 private final Logger log = LoggerFactory.getLogger(getClass());
39
40 private ControllerNode localNode;
41 private ClusterNodesDelegate nodesDelegate;
Yuta HIGUCHI3f4c2b42014-10-06 16:53:56 -070042 // FIXME: `members` should go away and should be using ClusterService
Madan Jampani890bc352014-10-01 22:35:29 -070043 private Map<NodeId, ControllerNode> members = new HashMap<>();
44 private final Timer timer = new Timer("onos-controller-heatbeats");
45 public static final long HEART_BEAT_INTERVAL_MILLIS = 1000L;
46
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070047 // TODO: This probably should not be a OSGi service.
48 //@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani890bc352014-10-01 22:35:29 -070049 private MessagingService messagingService;
50
51 @Activate
52 public void activate() {
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070053 // TODO: initialize messagingService
54 // TODO: setPayloadSerializer, which is capable of
55 // (1) serialize ClusterMessage - ClusterMessage.payload
56 // (2) serialize ClusterMessage.payload using user specified serializer
57// messagingService.setPayloadSerializer(...);
Madan Jampani890bc352014-10-01 22:35:29 -070058 log.info("Started");
59 }
60
61 @Deactivate
62 public void deactivate() {
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070063 // TODO: cleanup messageingService if needed.
Madan Jampani890bc352014-10-01 22:35:29 -070064 log.info("Stopped");
65 }
66
67 @Override
68 public boolean broadcast(ClusterMessage message) {
69 boolean ok = true;
70 for (ControllerNode node : members.values()) {
71 if (!node.equals(localNode)) {
72 ok = unicast(message, node.id()) && ok;
73 }
74 }
75 return ok;
76 }
77
78 @Override
79 public boolean multicast(ClusterMessage message, Set<NodeId> nodes) {
80 boolean ok = true;
81 for (NodeId nodeId : nodes) {
82 if (!nodeId.equals(localNode.id())) {
83 ok = unicast(message, nodeId) && ok;
84 }
85 }
86 return ok;
87 }
88
89 @Override
90 public boolean unicast(ClusterMessage message, NodeId toNodeId) {
91 ControllerNode node = members.get(toNodeId);
92 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
93 Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
94 try {
95 messagingService.sendAsync(nodeEp, message.subject().value(), message);
96 return true;
97 } catch (IOException e) {
98 log.error("Failed to send cluster message to nodeId: " + toNodeId, e);
99 }
100
101 return false;
102 }
103
104 @Override
105 public void addSubscriber(MessageSubject subject,
106 ClusterMessageHandler subscriber) {
107 messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
108 }
109
110 @Override
111 public void initialize(ControllerNode localNode,
112 ClusterNodesDelegate delegate) {
113 this.localNode = localNode;
114 this.nodesDelegate = delegate;
115 this.addSubscriber(new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"), new ClusterMemebershipEventHandler());
116 timer.schedule(new KeepAlive(), 0, HEART_BEAT_INTERVAL_MILLIS);
117 }
118
119 @Override
120 public void addNode(ControllerNode node) {
121 members.put(node.id(), node);
122 }
123
124 @Override
125 public void removeNode(ControllerNode node) {
126 broadcast(new ClusterMessage(
127 localNode.id(),
128 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
129 new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node)));
130 members.remove(node.id());
131 }
132
133 // Sends a heart beat to all peers.
134 private class KeepAlive extends TimerTask {
135
136 @Override
137 public void run() {
138 broadcast(new ClusterMessage(
139 localNode.id(),
140 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
141 new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode)));
142 }
143 }
144
145 private class ClusterMemebershipEventHandler implements ClusterMessageHandler {
146
147 @Override
148 public void handle(ClusterMessage message) {
149
150 ClusterMembershipEvent event = (ClusterMembershipEvent) message.payload();
151 ControllerNode node = event.node();
152 if (event.type() == ClusterMembershipEventType.HEART_BEAT) {
153 log.info("Node {} sent a hearbeat", node.id());
154 nodesDelegate.nodeDetected(node.id(), node.ip(), node.tcpPort());
155 } else if (event.type() == ClusterMembershipEventType.LEAVING_MEMBER) {
156 log.info("Node {} is leaving", node.id());
157 nodesDelegate.nodeRemoved(node.id());
158 } else if (event.type() == ClusterMembershipEventType.UNREACHABLE_MEMBER) {
159 log.info("Node {} is unreachable", node.id());
160 nodesDelegate.nodeVanished(node.id());
161 }
162 }
163 }
164
165 private static class InternalClusterMessageHandler implements MessageHandler {
166
167 private final ClusterMessageHandler handler;
168
169 public InternalClusterMessageHandler(ClusterMessageHandler handler) {
170 this.handler = handler;
171 }
172
173 @Override
174 public void handle(Message message) {
175 handler.handle((ClusterMessage) message.payload());
176 }
177 }
178}