blob: e4f7e11f5b9be49c95fd1120317b50cc3be0a3bc [file] [log] [blame]
Madan Jampani890bc352014-10-01 22:35:29 -07001package org.onlab.onos.store.cluster.messaging.impl;
2
import static com.google.common.base.Preconditions.checkArgument;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.netty.Endpoint;
import org.onlab.netty.Message;
import org.onlab.netty.MessageHandler;
import org.onlab.netty.MessagingService;
import org.onlab.netty.NettyMessagingService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.impl.ClusterMembershipEvent;
import org.onlab.onos.store.cluster.impl.ClusterMembershipEventType;
import org.onlab.onos.store.cluster.impl.ClusterNodesDelegate;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.serializers.KryoPoolUtil;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.util.KryoPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
36
37@Component(immediate = true)
38@Service
Madan Jampani3b0dfd52014-10-02 16:48:13 -070039public class ClusterCommunicationManager
Madan Jampani890bc352014-10-01 22:35:29 -070040 implements ClusterCommunicationService, ClusterCommunicationAdminService {
41
42 private final Logger log = LoggerFactory.getLogger(getClass());
43
44 private ControllerNode localNode;
45 private ClusterNodesDelegate nodesDelegate;
Yuta HIGUCHI3f4c2b42014-10-06 16:53:56 -070046 // FIXME: `members` should go away and should be using ClusterService
Madan Jampani890bc352014-10-01 22:35:29 -070047 private Map<NodeId, ControllerNode> members = new HashMap<>();
48 private final Timer timer = new Timer("onos-controller-heatbeats");
49 public static final long HEART_BEAT_INTERVAL_MILLIS = 1000L;
50
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070051 // TODO: This probably should not be a OSGi service.
Madan Jampani890bc352014-10-01 22:35:29 -070052 private MessagingService messagingService;
53
Madan Jampani53e44e62014-10-07 12:39:51 -070054 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
55 protected void setupKryoPool() {
56 serializerPool = KryoPool.newBuilder()
57 .register(KryoPoolUtil.API)
58 .register(ClusterMessage.class)
59 .register(ClusterMembershipEvent.class)
60 .build()
61 .populate(1);
62 }
63
64 };
65
Madan Jampani890bc352014-10-01 22:35:29 -070066 @Activate
67 public void activate() {
Madan Jampanida1a6b02014-10-07 14:16:15 -070068 messagingService = new NettyMessagingService(localNode.tcpPort());
Madan Jampani890bc352014-10-01 22:35:29 -070069 log.info("Started");
70 }
71
72 @Deactivate
73 public void deactivate() {
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070074 // TODO: cleanup messageingService if needed.
Madan Jampani890bc352014-10-01 22:35:29 -070075 log.info("Stopped");
76 }
77
78 @Override
79 public boolean broadcast(ClusterMessage message) {
80 boolean ok = true;
81 for (ControllerNode node : members.values()) {
82 if (!node.equals(localNode)) {
83 ok = unicast(message, node.id()) && ok;
84 }
85 }
86 return ok;
87 }
88
89 @Override
90 public boolean multicast(ClusterMessage message, Set<NodeId> nodes) {
91 boolean ok = true;
92 for (NodeId nodeId : nodes) {
93 if (!nodeId.equals(localNode.id())) {
94 ok = unicast(message, nodeId) && ok;
95 }
96 }
97 return ok;
98 }
99
100 @Override
101 public boolean unicast(ClusterMessage message, NodeId toNodeId) {
102 ControllerNode node = members.get(toNodeId);
103 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
104 Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
105 try {
Madan Jampani53e44e62014-10-07 12:39:51 -0700106 messagingService.sendAsync(nodeEp, message.subject().value(), SERIALIZER.encode(message));
Madan Jampani890bc352014-10-01 22:35:29 -0700107 return true;
108 } catch (IOException e) {
109 log.error("Failed to send cluster message to nodeId: " + toNodeId, e);
110 }
111
112 return false;
113 }
114
115 @Override
116 public void addSubscriber(MessageSubject subject,
117 ClusterMessageHandler subscriber) {
118 messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
119 }
120
121 @Override
122 public void initialize(ControllerNode localNode,
123 ClusterNodesDelegate delegate) {
124 this.localNode = localNode;
125 this.nodesDelegate = delegate;
126 this.addSubscriber(new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"), new ClusterMemebershipEventHandler());
127 timer.schedule(new KeepAlive(), 0, HEART_BEAT_INTERVAL_MILLIS);
128 }
129
130 @Override
131 public void addNode(ControllerNode node) {
132 members.put(node.id(), node);
133 }
134
135 @Override
136 public void removeNode(ControllerNode node) {
137 broadcast(new ClusterMessage(
138 localNode.id(),
139 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
Madan Jampani53e44e62014-10-07 12:39:51 -0700140 SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node))));
Madan Jampani890bc352014-10-01 22:35:29 -0700141 members.remove(node.id());
142 }
143
144 // Sends a heart beat to all peers.
145 private class KeepAlive extends TimerTask {
146
147 @Override
148 public void run() {
149 broadcast(new ClusterMessage(
150 localNode.id(),
151 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
Madan Jampani53e44e62014-10-07 12:39:51 -0700152 SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode))));
Madan Jampani890bc352014-10-01 22:35:29 -0700153 }
154 }
155
156 private class ClusterMemebershipEventHandler implements ClusterMessageHandler {
157
158 @Override
159 public void handle(ClusterMessage message) {
160
Madan Jampani53e44e62014-10-07 12:39:51 -0700161 ClusterMembershipEvent event = SERIALIZER.decode(message.payload());
Madan Jampani890bc352014-10-01 22:35:29 -0700162 ControllerNode node = event.node();
163 if (event.type() == ClusterMembershipEventType.HEART_BEAT) {
164 log.info("Node {} sent a hearbeat", node.id());
165 nodesDelegate.nodeDetected(node.id(), node.ip(), node.tcpPort());
166 } else if (event.type() == ClusterMembershipEventType.LEAVING_MEMBER) {
167 log.info("Node {} is leaving", node.id());
168 nodesDelegate.nodeRemoved(node.id());
169 } else if (event.type() == ClusterMembershipEventType.UNREACHABLE_MEMBER) {
170 log.info("Node {} is unreachable", node.id());
171 nodesDelegate.nodeVanished(node.id());
172 }
173 }
174 }
175
176 private static class InternalClusterMessageHandler implements MessageHandler {
177
178 private final ClusterMessageHandler handler;
179
180 public InternalClusterMessageHandler(ClusterMessageHandler handler) {
181 this.handler = handler;
182 }
183
184 @Override
185 public void handle(Message message) {
Madan Jampani53e44e62014-10-07 12:39:51 -0700186 ClusterMessage clusterMessage = SERIALIZER.decode(message.payload());
187 handler.handle(clusterMessage);
Madan Jampani890bc352014-10-01 22:35:29 -0700188 }
189 }
190}