blob: 7b054015240675cebc3eb28ebec3989f692ca4d1 [file] [log] [blame]
Madan Jampani890bc352014-10-01 22:35:29 -07001package org.onlab.onos.store.cluster.messaging.impl;
2
3import static com.google.common.base.Preconditions.checkArgument;
4
5import java.io.IOException;
6import java.util.HashMap;
7import java.util.Map;
8import java.util.Set;
9import java.util.Timer;
10import java.util.TimerTask;
11
12import org.apache.felix.scr.annotations.Activate;
13import org.apache.felix.scr.annotations.Component;
14import org.apache.felix.scr.annotations.Deactivate;
Madan Jampania5d0d782014-10-07 14:36:00 -070015import org.apache.felix.scr.annotations.Reference;
16import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampani890bc352014-10-01 22:35:29 -070017import org.apache.felix.scr.annotations.Service;
Madan Jampania5d0d782014-10-07 14:36:00 -070018import org.onlab.onos.cluster.ClusterService;
Madan Jampani890bc352014-10-01 22:35:29 -070019import org.onlab.onos.cluster.ControllerNode;
20import org.onlab.onos.cluster.NodeId;
Yuta HIGUCHIc057c632014-10-06 18:38:14 -070021import org.onlab.onos.store.cluster.impl.ClusterMembershipEvent;
22import org.onlab.onos.store.cluster.impl.ClusterMembershipEventType;
Madan Jampani890bc352014-10-01 22:35:29 -070023import org.onlab.onos.store.cluster.impl.ClusterNodesDelegate;
24import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
25import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
26import org.onlab.onos.store.cluster.messaging.ClusterMessage;
27import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
28import org.onlab.onos.store.cluster.messaging.MessageSubject;
Madan Jampani53e44e62014-10-07 12:39:51 -070029import org.onlab.onos.store.serializers.KryoPoolUtil;
30import org.onlab.onos.store.serializers.KryoSerializer;
31import org.onlab.util.KryoPool;
Madan Jampanic9ed9be2014-10-02 16:13:11 -070032import org.onlab.netty.Endpoint;
33import org.onlab.netty.Message;
34import org.onlab.netty.MessageHandler;
35import org.onlab.netty.MessagingService;
Madan Jampanida1a6b02014-10-07 14:16:15 -070036import org.onlab.netty.NettyMessagingService;
Madan Jampani890bc352014-10-01 22:35:29 -070037import org.slf4j.Logger;
38import org.slf4j.LoggerFactory;
39
40@Component(immediate = true)
41@Service
Madan Jampani3b0dfd52014-10-02 16:48:13 -070042public class ClusterCommunicationManager
Madan Jampani890bc352014-10-01 22:35:29 -070043 implements ClusterCommunicationService, ClusterCommunicationAdminService {
44
45 private final Logger log = LoggerFactory.getLogger(getClass());
46
47 private ControllerNode localNode;
Madan Jampania5d0d782014-10-07 14:36:00 -070048
49 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
50 private ClusterService clusterService;
51
Madan Jampani890bc352014-10-01 22:35:29 -070052 private ClusterNodesDelegate nodesDelegate;
Yuta HIGUCHI3f4c2b42014-10-06 16:53:56 -070053 // FIXME: `members` should go away and should be using ClusterService
Madan Jampani890bc352014-10-01 22:35:29 -070054 private Map<NodeId, ControllerNode> members = new HashMap<>();
55 private final Timer timer = new Timer("onos-controller-heatbeats");
56 public static final long HEART_BEAT_INTERVAL_MILLIS = 1000L;
57
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070058 // TODO: This probably should not be a OSGi service.
Madan Jampani890bc352014-10-01 22:35:29 -070059 private MessagingService messagingService;
60
Madan Jampani53e44e62014-10-07 12:39:51 -070061 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
62 protected void setupKryoPool() {
63 serializerPool = KryoPool.newBuilder()
64 .register(KryoPoolUtil.API)
65 .register(ClusterMessage.class)
66 .register(ClusterMembershipEvent.class)
67 .build()
68 .populate(1);
69 }
70
71 };
72
Madan Jampani890bc352014-10-01 22:35:29 -070073 @Activate
74 public void activate() {
Madan Jampania5d0d782014-10-07 14:36:00 -070075 localNode = clusterService.getLocalNode();
Madan Jampanida1a6b02014-10-07 14:16:15 -070076 messagingService = new NettyMessagingService(localNode.tcpPort());
Madan Jampani890bc352014-10-01 22:35:29 -070077 log.info("Started");
78 }
79
80 @Deactivate
81 public void deactivate() {
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070082 // TODO: cleanup messageingService if needed.
Madan Jampani890bc352014-10-01 22:35:29 -070083 log.info("Stopped");
84 }
85
86 @Override
87 public boolean broadcast(ClusterMessage message) {
88 boolean ok = true;
89 for (ControllerNode node : members.values()) {
90 if (!node.equals(localNode)) {
91 ok = unicast(message, node.id()) && ok;
92 }
93 }
94 return ok;
95 }
96
97 @Override
98 public boolean multicast(ClusterMessage message, Set<NodeId> nodes) {
99 boolean ok = true;
100 for (NodeId nodeId : nodes) {
101 if (!nodeId.equals(localNode.id())) {
102 ok = unicast(message, nodeId) && ok;
103 }
104 }
105 return ok;
106 }
107
108 @Override
109 public boolean unicast(ClusterMessage message, NodeId toNodeId) {
110 ControllerNode node = members.get(toNodeId);
111 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
112 Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
113 try {
Madan Jampani53e44e62014-10-07 12:39:51 -0700114 messagingService.sendAsync(nodeEp, message.subject().value(), SERIALIZER.encode(message));
Madan Jampani890bc352014-10-01 22:35:29 -0700115 return true;
116 } catch (IOException e) {
117 log.error("Failed to send cluster message to nodeId: " + toNodeId, e);
118 }
119
120 return false;
121 }
122
123 @Override
124 public void addSubscriber(MessageSubject subject,
125 ClusterMessageHandler subscriber) {
126 messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
127 }
128
129 @Override
130 public void initialize(ControllerNode localNode,
131 ClusterNodesDelegate delegate) {
132 this.localNode = localNode;
133 this.nodesDelegate = delegate;
134 this.addSubscriber(new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"), new ClusterMemebershipEventHandler());
135 timer.schedule(new KeepAlive(), 0, HEART_BEAT_INTERVAL_MILLIS);
136 }
137
138 @Override
139 public void addNode(ControllerNode node) {
140 members.put(node.id(), node);
141 }
142
143 @Override
144 public void removeNode(ControllerNode node) {
145 broadcast(new ClusterMessage(
146 localNode.id(),
147 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
Madan Jampani53e44e62014-10-07 12:39:51 -0700148 SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node))));
Madan Jampani890bc352014-10-01 22:35:29 -0700149 members.remove(node.id());
150 }
151
152 // Sends a heart beat to all peers.
153 private class KeepAlive extends TimerTask {
154
155 @Override
156 public void run() {
157 broadcast(new ClusterMessage(
158 localNode.id(),
159 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
Madan Jampani53e44e62014-10-07 12:39:51 -0700160 SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode))));
Madan Jampani890bc352014-10-01 22:35:29 -0700161 }
162 }
163
164 private class ClusterMemebershipEventHandler implements ClusterMessageHandler {
165
166 @Override
167 public void handle(ClusterMessage message) {
168
Madan Jampani53e44e62014-10-07 12:39:51 -0700169 ClusterMembershipEvent event = SERIALIZER.decode(message.payload());
Madan Jampani890bc352014-10-01 22:35:29 -0700170 ControllerNode node = event.node();
171 if (event.type() == ClusterMembershipEventType.HEART_BEAT) {
172 log.info("Node {} sent a hearbeat", node.id());
173 nodesDelegate.nodeDetected(node.id(), node.ip(), node.tcpPort());
174 } else if (event.type() == ClusterMembershipEventType.LEAVING_MEMBER) {
175 log.info("Node {} is leaving", node.id());
176 nodesDelegate.nodeRemoved(node.id());
177 } else if (event.type() == ClusterMembershipEventType.UNREACHABLE_MEMBER) {
178 log.info("Node {} is unreachable", node.id());
179 nodesDelegate.nodeVanished(node.id());
180 }
181 }
182 }
183
184 private static class InternalClusterMessageHandler implements MessageHandler {
185
186 private final ClusterMessageHandler handler;
187
188 public InternalClusterMessageHandler(ClusterMessageHandler handler) {
189 this.handler = handler;
190 }
191
192 @Override
193 public void handle(Message message) {
Madan Jampani53e44e62014-10-07 12:39:51 -0700194 ClusterMessage clusterMessage = SERIALIZER.decode(message.payload());
195 handler.handle(clusterMessage);
Madan Jampani890bc352014-10-01 22:35:29 -0700196 }
197 }
198}