blob: 2868270f1d55464179d4a0c3c00a04009481286e [file] [log] [blame]
/*
 * Copyright 2014 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.cluster.messaging.impl;
Madan Jampani890bc352014-10-01 22:35:29 -070017
Jonathan Hart7d656f42015-01-27 14:07:23 -080018import com.google.common.util.concurrent.ListenableFuture;
Madan Jampani890bc352014-10-01 22:35:29 -070019import org.apache.felix.scr.annotations.Activate;
20import org.apache.felix.scr.annotations.Component;
21import org.apache.felix.scr.annotations.Deactivate;
Madan Jampania5d0d782014-10-07 14:36:00 -070022import org.apache.felix.scr.annotations.Reference;
23import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampani890bc352014-10-01 22:35:29 -070024import org.apache.felix.scr.annotations.Service;
Madan Jampani4e729af2014-11-19 11:11:37 -080025import org.onlab.netty.Endpoint;
26import org.onlab.netty.Message;
27import org.onlab.netty.MessageHandler;
28import org.onlab.netty.MessagingService;
29import org.onlab.netty.NettyMessagingService;
Jonathan Hart7d656f42015-01-27 14:07:23 -080030import org.onlab.util.KryoNamespace;
Brian O'Connorabafb502014-12-02 22:26:20 -080031import org.onosproject.cluster.ClusterService;
32import org.onosproject.cluster.ControllerNode;
33import org.onosproject.cluster.NodeId;
34import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
35import org.onosproject.store.cluster.messaging.ClusterMessage;
36import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
37import org.onosproject.store.cluster.messaging.MessageSubject;
38import org.onosproject.store.serializers.KryoNamespaces;
39import org.onosproject.store.serializers.KryoSerializer;
40import org.onosproject.store.serializers.impl.ClusterMessageSerializer;
41import org.onosproject.store.serializers.impl.MessageSubjectSerializer;
Madan Jampani890bc352014-10-01 22:35:29 -070042import org.slf4j.Logger;
43import org.slf4j.LoggerFactory;
44
Jonathan Hart7d656f42015-01-27 14:07:23 -080045import java.io.IOException;
46import java.util.Set;
47
48import static com.google.common.base.Preconditions.checkArgument;
Madan Jampani24f9efb2014-10-24 18:56:23 -070049
Madan Jampani890bc352014-10-01 22:35:29 -070050@Component(immediate = true)
51@Service
Madan Jampani3b0dfd52014-10-02 16:48:13 -070052public class ClusterCommunicationManager
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -070053 implements ClusterCommunicationService {
Madan Jampani890bc352014-10-01 22:35:29 -070054
55 private final Logger log = LoggerFactory.getLogger(getClass());
56
Madan Jampania5d0d782014-10-07 14:36:00 -070057 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
58 private ClusterService clusterService;
59
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070060 // TODO: This probably should not be a OSGi service.
Madan Jampani890bc352014-10-01 22:35:29 -070061 private MessagingService messagingService;
62
Madan Jampani53e44e62014-10-07 12:39:51 -070063 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070064 @Override
Madan Jampani53e44e62014-10-07 12:39:51 -070065 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070066 serializerPool = KryoNamespace.newBuilder()
67 .register(KryoNamespaces.API)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -080068 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
69 .register(new ClusterMessageSerializer(), ClusterMessage.class)
70 .register(new MessageSubjectSerializer(), MessageSubject.class)
71 .build();
Madan Jampani53e44e62014-10-07 12:39:51 -070072 }
73
74 };
75
Madan Jampani890bc352014-10-01 22:35:29 -070076 @Activate
77 public void activate() {
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -070078 ControllerNode localNode = clusterService.getLocalNode();
Madan Jampani87100932014-10-21 16:46:12 -070079 NettyMessagingService netty = new NettyMessagingService(localNode.ip().toString(), localNode.tcpPort());
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070080 // FIXME: workaround until it becomes a service.
81 try {
82 netty.activate();
83 } catch (Exception e) {
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070084 log.error("NettyMessagingService#activate", e);
85 }
86 messagingService = netty;
Yuta HIGUCHI1f8cd5f2014-11-04 23:48:55 -080087 log.info("Started on {}:{}", localNode.ip(), localNode.tcpPort());
Madan Jampani890bc352014-10-01 22:35:29 -070088 }
89
90 @Deactivate
91 public void deactivate() {
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070092 // TODO: cleanup messageingService if needed.
Yuta HIGUCHI6c6fb9c2014-10-31 20:21:15 -070093 // FIXME: workaround until it becomes a service.
94 try {
95 ((NettyMessagingService) messagingService).deactivate();
96 } catch (Exception e) {
97 log.error("NettyMessagingService#deactivate", e);
98 }
Madan Jampani890bc352014-10-01 22:35:29 -070099 log.info("Stopped");
100 }
101
102 @Override
Jonathan Hart7d656f42015-01-27 14:07:23 -0800103 public boolean broadcast(ClusterMessage message) {
Madan Jampani890bc352014-10-01 22:35:29 -0700104 boolean ok = true;
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -0700105 final ControllerNode localNode = clusterService.getLocalNode();
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -0700106 for (ControllerNode node : clusterService.getNodes()) {
Madan Jampani890bc352014-10-01 22:35:29 -0700107 if (!node.equals(localNode)) {
Madan Jampani4e729af2014-11-19 11:11:37 -0800108 ok = unicastUnchecked(message, node.id()) && ok;
Madan Jampani890bc352014-10-01 22:35:29 -0700109 }
110 }
111 return ok;
112 }
113
114 @Override
Jonathan Hart7d656f42015-01-27 14:07:23 -0800115 public boolean broadcastIncludeSelf(ClusterMessage message) {
Madan Jampanif5d263b2014-11-13 10:04:40 -0800116 boolean ok = true;
117 for (ControllerNode node : clusterService.getNodes()) {
Madan Jampani4e729af2014-11-19 11:11:37 -0800118 ok = unicastUnchecked(message, node.id()) && ok;
Madan Jampanif5d263b2014-11-13 10:04:40 -0800119 }
120 return ok;
121 }
122
123 @Override
Jonathan Hart7d656f42015-01-27 14:07:23 -0800124 public boolean multicast(ClusterMessage message, Set<NodeId> nodes) {
Madan Jampani890bc352014-10-01 22:35:29 -0700125 boolean ok = true;
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -0700126 final ControllerNode localNode = clusterService.getLocalNode();
Madan Jampani890bc352014-10-01 22:35:29 -0700127 for (NodeId nodeId : nodes) {
128 if (!nodeId.equals(localNode.id())) {
Madan Jampani98c17602014-10-23 15:33:23 -0700129 ok = unicastUnchecked(message, nodeId) && ok;
Madan Jampani890bc352014-10-01 22:35:29 -0700130 }
131 }
132 return ok;
133 }
134
135 @Override
Yuta HIGUCHI9cff53b2014-10-13 15:21:16 -0700136 public boolean unicast(ClusterMessage message, NodeId toNodeId) throws IOException {
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -0700137 ControllerNode node = clusterService.getNode(toNodeId);
Madan Jampani890bc352014-10-01 22:35:29 -0700138 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
139 Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
140 try {
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700141 messagingService.sendAsync(nodeEp,
Yuta HIGUCHI68d90472014-10-07 17:55:50 -0700142 message.subject().value(), SERIALIZER.encode(message));
Madan Jampani890bc352014-10-01 22:35:29 -0700143 return true;
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700144 } catch (IOException e) {
Yuta HIGUCHIa58451f2014-10-16 15:44:59 -0700145 log.trace("Failed to send cluster message to nodeId: " + toNodeId, e);
Yuta HIGUCHI9cff53b2014-10-13 15:21:16 -0700146 throw e;
Madan Jampani890bc352014-10-01 22:35:29 -0700147 }
Madan Jampani890bc352014-10-01 22:35:29 -0700148 }
149
Jonathan Hart7d656f42015-01-27 14:07:23 -0800150 private boolean unicastUnchecked(ClusterMessage message, NodeId toNodeId) {
Madan Jampani98c17602014-10-23 15:33:23 -0700151 try {
152 return unicast(message, toNodeId);
153 } catch (IOException e) {
154 return false;
155 }
156 }
157
Madan Jampani890bc352014-10-01 22:35:29 -0700158 @Override
Madan Jampani24f9efb2014-10-24 18:56:23 -0700159 public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException {
Madan Jampani4a9cb6d2014-10-17 10:48:50 -0700160 ControllerNode node = clusterService.getNode(toNodeId);
161 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
162 Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
163 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700164 return messagingService.sendAndReceive(nodeEp, message.subject().value(), SERIALIZER.encode(message));
Madan Jampani4a9cb6d2014-10-17 10:48:50 -0700165
166 } catch (IOException e) {
Yuta HIGUCHI39ae5502014-11-05 16:42:12 -0800167 log.trace("Failed interaction with remote nodeId: " + toNodeId, e);
Madan Jampani4a9cb6d2014-10-17 10:48:50 -0700168 throw e;
169 }
170 }
171
172 @Override
Madan Jampani890bc352014-10-01 22:35:29 -0700173 public void addSubscriber(MessageSubject subject,
Yuta HIGUCHI76b54bf2014-11-07 01:56:55 -0800174 ClusterMessageHandler subscriber) {
Madan Jampani890bc352014-10-01 22:35:29 -0700175 messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
176 }
177
Yuta HIGUCHI76b54bf2014-11-07 01:56:55 -0800178 @Override
179 public void removeSubscriber(MessageSubject subject) {
180 messagingService.unregisterHandler(subject.value());
181 }
182
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700183 private final class InternalClusterMessageHandler implements MessageHandler {
Madan Jampani890bc352014-10-01 22:35:29 -0700184
185 private final ClusterMessageHandler handler;
186
187 public InternalClusterMessageHandler(ClusterMessageHandler handler) {
188 this.handler = handler;
189 }
190
191 @Override
192 public void handle(Message message) {
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800193 final ClusterMessage clusterMessage;
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700194 try {
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800195 clusterMessage = SERIALIZER.decode(message.payload());
196 } catch (Exception e) {
197 log.error("Failed decoding ClusterMessage", e);
198 throw e;
199 }
200 try {
Madan Jampani8a895092014-10-17 16:55:50 -0700201 handler.handle(new InternalClusterMessage(clusterMessage, message));
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700202 } catch (Exception e) {
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800203 log.error("Exception caught handling {}", clusterMessage, e);
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700204 throw e;
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700205 }
Madan Jampani890bc352014-10-01 22:35:29 -0700206 }
207 }
Madan Jampani4a9cb6d2014-10-17 10:48:50 -0700208
Madan Jampani8a895092014-10-17 16:55:50 -0700209 public static final class InternalClusterMessage extends ClusterMessage {
210
211 private final Message rawMessage;
212
213 public InternalClusterMessage(ClusterMessage clusterMessage, Message rawMessage) {
214 super(clusterMessage.sender(), clusterMessage.subject(), clusterMessage.payload());
215 this.rawMessage = rawMessage;
216 }
217
218 @Override
219 public void respond(byte[] response) throws IOException {
220 rawMessage.respond(response);
221 }
222 }
Madan Jampanif5d263b2014-11-13 10:04:40 -0800223}