blob: babe4d31f63201487e94a9e9a491d9eeed525697 [file] [log] [blame]
Madan Jampani890bc352014-10-01 22:35:29 -07001package org.onlab.onos.store.cluster.messaging.impl;
2
3import static com.google.common.base.Preconditions.checkArgument;
4
5import java.io.IOException;
6import java.util.HashMap;
7import java.util.Map;
8import java.util.Set;
9import java.util.Timer;
10import java.util.TimerTask;
11
12import org.apache.felix.scr.annotations.Activate;
13import org.apache.felix.scr.annotations.Component;
14import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani890bc352014-10-01 22:35:29 -070015import org.apache.felix.scr.annotations.Service;
16import org.onlab.onos.cluster.ControllerNode;
17import org.onlab.onos.cluster.NodeId;
Yuta HIGUCHIc057c632014-10-06 18:38:14 -070018import org.onlab.onos.store.cluster.impl.ClusterMembershipEvent;
19import org.onlab.onos.store.cluster.impl.ClusterMembershipEventType;
Madan Jampani890bc352014-10-01 22:35:29 -070020import org.onlab.onos.store.cluster.impl.ClusterNodesDelegate;
21import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
22import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
23import org.onlab.onos.store.cluster.messaging.ClusterMessage;
24import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
25import org.onlab.onos.store.cluster.messaging.MessageSubject;
Madan Jampani53e44e62014-10-07 12:39:51 -070026import org.onlab.onos.store.serializers.KryoPoolUtil;
27import org.onlab.onos.store.serializers.KryoSerializer;
28import org.onlab.util.KryoPool;
Madan Jampanic9ed9be2014-10-02 16:13:11 -070029import org.onlab.netty.Endpoint;
30import org.onlab.netty.Message;
31import org.onlab.netty.MessageHandler;
32import org.onlab.netty.MessagingService;
Madan Jampani890bc352014-10-01 22:35:29 -070033import org.slf4j.Logger;
34import org.slf4j.LoggerFactory;
35
36@Component(immediate = true)
37@Service
Madan Jampani3b0dfd52014-10-02 16:48:13 -070038public class ClusterCommunicationManager
Madan Jampani890bc352014-10-01 22:35:29 -070039 implements ClusterCommunicationService, ClusterCommunicationAdminService {
40
41 private final Logger log = LoggerFactory.getLogger(getClass());
42
43 private ControllerNode localNode;
44 private ClusterNodesDelegate nodesDelegate;
Yuta HIGUCHI3f4c2b42014-10-06 16:53:56 -070045 // FIXME: `members` should go away and should be using ClusterService
Madan Jampani890bc352014-10-01 22:35:29 -070046 private Map<NodeId, ControllerNode> members = new HashMap<>();
47 private final Timer timer = new Timer("onos-controller-heatbeats");
48 public static final long HEART_BEAT_INTERVAL_MILLIS = 1000L;
49
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070050 // TODO: This probably should not be a OSGi service.
51 //@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani890bc352014-10-01 22:35:29 -070052 private MessagingService messagingService;
53
Madan Jampani53e44e62014-10-07 12:39:51 -070054 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
55 protected void setupKryoPool() {
56 serializerPool = KryoPool.newBuilder()
57 .register(KryoPoolUtil.API)
58 .register(ClusterMessage.class)
59 .register(ClusterMembershipEvent.class)
60 .build()
61 .populate(1);
62 }
63
64 };
65
Madan Jampani890bc352014-10-01 22:35:29 -070066 @Activate
67 public void activate() {
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070068 // TODO: initialize messagingService
69 // TODO: setPayloadSerializer, which is capable of
70 // (1) serialize ClusterMessage - ClusterMessage.payload
71 // (2) serialize ClusterMessage.payload using user specified serializer
72// messagingService.setPayloadSerializer(...);
Madan Jampani890bc352014-10-01 22:35:29 -070073 log.info("Started");
74 }
75
76 @Deactivate
77 public void deactivate() {
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070078 // TODO: cleanup messageingService if needed.
Madan Jampani890bc352014-10-01 22:35:29 -070079 log.info("Stopped");
80 }
81
82 @Override
83 public boolean broadcast(ClusterMessage message) {
84 boolean ok = true;
85 for (ControllerNode node : members.values()) {
86 if (!node.equals(localNode)) {
87 ok = unicast(message, node.id()) && ok;
88 }
89 }
90 return ok;
91 }
92
93 @Override
94 public boolean multicast(ClusterMessage message, Set<NodeId> nodes) {
95 boolean ok = true;
96 for (NodeId nodeId : nodes) {
97 if (!nodeId.equals(localNode.id())) {
98 ok = unicast(message, nodeId) && ok;
99 }
100 }
101 return ok;
102 }
103
104 @Override
105 public boolean unicast(ClusterMessage message, NodeId toNodeId) {
106 ControllerNode node = members.get(toNodeId);
107 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
108 Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
109 try {
Madan Jampani53e44e62014-10-07 12:39:51 -0700110 messagingService.sendAsync(nodeEp, message.subject().value(), SERIALIZER.encode(message));
Madan Jampani890bc352014-10-01 22:35:29 -0700111 return true;
112 } catch (IOException e) {
113 log.error("Failed to send cluster message to nodeId: " + toNodeId, e);
114 }
115
116 return false;
117 }
118
119 @Override
120 public void addSubscriber(MessageSubject subject,
121 ClusterMessageHandler subscriber) {
122 messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
123 }
124
125 @Override
126 public void initialize(ControllerNode localNode,
127 ClusterNodesDelegate delegate) {
128 this.localNode = localNode;
129 this.nodesDelegate = delegate;
130 this.addSubscriber(new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"), new ClusterMemebershipEventHandler());
131 timer.schedule(new KeepAlive(), 0, HEART_BEAT_INTERVAL_MILLIS);
132 }
133
134 @Override
135 public void addNode(ControllerNode node) {
136 members.put(node.id(), node);
137 }
138
139 @Override
140 public void removeNode(ControllerNode node) {
141 broadcast(new ClusterMessage(
142 localNode.id(),
143 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
Madan Jampani53e44e62014-10-07 12:39:51 -0700144 SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node))));
Madan Jampani890bc352014-10-01 22:35:29 -0700145 members.remove(node.id());
146 }
147
148 // Sends a heart beat to all peers.
149 private class KeepAlive extends TimerTask {
150
151 @Override
152 public void run() {
153 broadcast(new ClusterMessage(
154 localNode.id(),
155 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
Madan Jampani53e44e62014-10-07 12:39:51 -0700156 SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode))));
Madan Jampani890bc352014-10-01 22:35:29 -0700157 }
158 }
159
160 private class ClusterMemebershipEventHandler implements ClusterMessageHandler {
161
162 @Override
163 public void handle(ClusterMessage message) {
164
Madan Jampani53e44e62014-10-07 12:39:51 -0700165 ClusterMembershipEvent event = SERIALIZER.decode(message.payload());
Madan Jampani890bc352014-10-01 22:35:29 -0700166 ControllerNode node = event.node();
167 if (event.type() == ClusterMembershipEventType.HEART_BEAT) {
168 log.info("Node {} sent a hearbeat", node.id());
169 nodesDelegate.nodeDetected(node.id(), node.ip(), node.tcpPort());
170 } else if (event.type() == ClusterMembershipEventType.LEAVING_MEMBER) {
171 log.info("Node {} is leaving", node.id());
172 nodesDelegate.nodeRemoved(node.id());
173 } else if (event.type() == ClusterMembershipEventType.UNREACHABLE_MEMBER) {
174 log.info("Node {} is unreachable", node.id());
175 nodesDelegate.nodeVanished(node.id());
176 }
177 }
178 }
179
180 private static class InternalClusterMessageHandler implements MessageHandler {
181
182 private final ClusterMessageHandler handler;
183
184 public InternalClusterMessageHandler(ClusterMessageHandler handler) {
185 this.handler = handler;
186 }
187
188 @Override
189 public void handle(Message message) {
Madan Jampani53e44e62014-10-07 12:39:51 -0700190 ClusterMessage clusterMessage = SERIALIZER.decode(message.payload());
191 handler.handle(clusterMessage);
Madan Jampani890bc352014-10-01 22:35:29 -0700192 }
193 }
194}