blob: 59671c0ddd3e3b437bf93d1a71b8dab80ea79b26 [file] [log] [blame]
/*
 * Copyright 2014 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.cluster.messaging.impl;
Madan Jampani890bc352014-10-01 22:35:29 -070017
Jonathan Hart7d656f42015-01-27 14:07:23 -080018import com.google.common.util.concurrent.ListenableFuture;
Madan Jampani2af244a2015-02-22 13:12:01 -080019
Madan Jampani890bc352014-10-01 22:35:29 -070020import org.apache.felix.scr.annotations.Activate;
21import org.apache.felix.scr.annotations.Component;
22import org.apache.felix.scr.annotations.Deactivate;
Madan Jampania5d0d782014-10-07 14:36:00 -070023import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampani890bc352014-10-01 22:35:29 -070025import org.apache.felix.scr.annotations.Service;
Madan Jampani4e729af2014-11-19 11:11:37 -080026import org.onlab.netty.Endpoint;
27import org.onlab.netty.Message;
28import org.onlab.netty.MessageHandler;
29import org.onlab.netty.MessagingService;
30import org.onlab.netty.NettyMessagingService;
Brian O'Connorabafb502014-12-02 22:26:20 -080031import org.onosproject.cluster.ClusterService;
32import org.onosproject.cluster.ControllerNode;
33import org.onosproject.cluster.NodeId;
34import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
35import org.onosproject.store.cluster.messaging.ClusterMessage;
36import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
37import org.onosproject.store.cluster.messaging.MessageSubject;
Madan Jampani890bc352014-10-01 22:35:29 -070038import org.slf4j.Logger;
39import org.slf4j.LoggerFactory;
40
Jonathan Hart7d656f42015-01-27 14:07:23 -080041import java.io.IOException;
42import java.util.Set;
Madan Jampani2af244a2015-02-22 13:12:01 -080043import java.util.concurrent.ExecutorService;
Jonathan Hart7d656f42015-01-27 14:07:23 -080044
45import static com.google.common.base.Preconditions.checkArgument;
Madan Jampani24f9efb2014-10-24 18:56:23 -070046
Madan Jampani890bc352014-10-01 22:35:29 -070047@Component(immediate = true)
48@Service
Madan Jampani3b0dfd52014-10-02 16:48:13 -070049public class ClusterCommunicationManager
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -070050 implements ClusterCommunicationService {
Madan Jampani890bc352014-10-01 22:35:29 -070051
52 private final Logger log = LoggerFactory.getLogger(getClass());
53
Madan Jampania5d0d782014-10-07 14:36:00 -070054 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
55 private ClusterService clusterService;
56
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070057 // TODO: This probably should not be a OSGi service.
Madan Jampani890bc352014-10-01 22:35:29 -070058 private MessagingService messagingService;
59
60 @Activate
61 public void activate() {
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -070062 ControllerNode localNode = clusterService.getLocalNode();
Madan Jampani2e5f87b2015-02-22 10:37:15 -080063 NettyMessagingService netty = new NettyMessagingService(localNode.ip(), localNode.tcpPort());
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070064 // FIXME: workaround until it becomes a service.
65 try {
66 netty.activate();
67 } catch (Exception e) {
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070068 log.error("NettyMessagingService#activate", e);
69 }
70 messagingService = netty;
Yuta HIGUCHI1f8cd5f2014-11-04 23:48:55 -080071 log.info("Started on {}:{}", localNode.ip(), localNode.tcpPort());
Madan Jampani890bc352014-10-01 22:35:29 -070072 }
73
74 @Deactivate
75 public void deactivate() {
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070076 // TODO: cleanup messageingService if needed.
Yuta HIGUCHI6c6fb9c2014-10-31 20:21:15 -070077 // FIXME: workaround until it becomes a service.
78 try {
79 ((NettyMessagingService) messagingService).deactivate();
80 } catch (Exception e) {
81 log.error("NettyMessagingService#deactivate", e);
82 }
Madan Jampani890bc352014-10-01 22:35:29 -070083 log.info("Stopped");
84 }
85
86 @Override
Jonathan Hart7d656f42015-01-27 14:07:23 -080087 public boolean broadcast(ClusterMessage message) {
Madan Jampani890bc352014-10-01 22:35:29 -070088 boolean ok = true;
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -070089 final ControllerNode localNode = clusterService.getLocalNode();
Madan Jampaniba472232015-03-04 13:00:50 -080090 byte[] payload = message.getBytes();
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070091 for (ControllerNode node : clusterService.getNodes()) {
Madan Jampani890bc352014-10-01 22:35:29 -070092 if (!node.equals(localNode)) {
Brian O'Connor9b7a32d2015-02-19 16:03:45 -080093 ok = unicastUnchecked(message.subject(), payload, node.id()) && ok;
Madan Jampani890bc352014-10-01 22:35:29 -070094 }
95 }
96 return ok;
97 }
98
99 @Override
Jonathan Hart7d656f42015-01-27 14:07:23 -0800100 public boolean broadcastIncludeSelf(ClusterMessage message) {
Madan Jampanif5d263b2014-11-13 10:04:40 -0800101 boolean ok = true;
Madan Jampaniba472232015-03-04 13:00:50 -0800102 byte[] payload = message.getBytes();
Madan Jampanif5d263b2014-11-13 10:04:40 -0800103 for (ControllerNode node : clusterService.getNodes()) {
Brian O'Connor9b7a32d2015-02-19 16:03:45 -0800104 ok = unicastUnchecked(message.subject(), payload, node.id()) && ok;
Madan Jampanif5d263b2014-11-13 10:04:40 -0800105 }
106 return ok;
107 }
108
109 @Override
Jonathan Hart7d656f42015-01-27 14:07:23 -0800110 public boolean multicast(ClusterMessage message, Set<NodeId> nodes) {
Madan Jampani890bc352014-10-01 22:35:29 -0700111 boolean ok = true;
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -0700112 final ControllerNode localNode = clusterService.getLocalNode();
Madan Jampaniba472232015-03-04 13:00:50 -0800113 byte[] payload = message.getBytes();
Madan Jampani890bc352014-10-01 22:35:29 -0700114 for (NodeId nodeId : nodes) {
115 if (!nodeId.equals(localNode.id())) {
Brian O'Connor9b7a32d2015-02-19 16:03:45 -0800116 ok = unicastUnchecked(message.subject(), payload, nodeId) && ok;
Madan Jampani890bc352014-10-01 22:35:29 -0700117 }
118 }
119 return ok;
120 }
121
122 @Override
Yuta HIGUCHI9cff53b2014-10-13 15:21:16 -0700123 public boolean unicast(ClusterMessage message, NodeId toNodeId) throws IOException {
Madan Jampaniba472232015-03-04 13:00:50 -0800124 return unicast(message.subject(), message.getBytes(), toNodeId);
Brian O'Connor9b7a32d2015-02-19 16:03:45 -0800125 }
126
127 private boolean unicast(MessageSubject subject, byte[] payload, NodeId toNodeId) throws IOException {
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -0700128 ControllerNode node = clusterService.getNode(toNodeId);
Madan Jampani890bc352014-10-01 22:35:29 -0700129 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Madan Jampani2e5f87b2015-02-22 10:37:15 -0800130 Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
Madan Jampani890bc352014-10-01 22:35:29 -0700131 try {
Brian O'Connor9b7a32d2015-02-19 16:03:45 -0800132 messagingService.sendAsync(nodeEp, subject.value(), payload);
Madan Jampani890bc352014-10-01 22:35:29 -0700133 return true;
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700134 } catch (IOException e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800135 log.debug("Failed to send cluster message to nodeId: " + toNodeId, e);
Yuta HIGUCHI9cff53b2014-10-13 15:21:16 -0700136 throw e;
Madan Jampani890bc352014-10-01 22:35:29 -0700137 }
Madan Jampani890bc352014-10-01 22:35:29 -0700138 }
139
Brian O'Connor9b7a32d2015-02-19 16:03:45 -0800140
141 private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
Madan Jampani98c17602014-10-23 15:33:23 -0700142 try {
Brian O'Connor9b7a32d2015-02-19 16:03:45 -0800143 return unicast(subject, payload, toNodeId);
Madan Jampani98c17602014-10-23 15:33:23 -0700144 } catch (IOException e) {
145 return false;
146 }
147 }
148
Madan Jampani890bc352014-10-01 22:35:29 -0700149 @Override
Madan Jampani24f9efb2014-10-24 18:56:23 -0700150 public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException {
Madan Jampani4a9cb6d2014-10-17 10:48:50 -0700151 ControllerNode node = clusterService.getNode(toNodeId);
152 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Madan Jampani2e5f87b2015-02-22 10:37:15 -0800153 Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
Madan Jampani4a9cb6d2014-10-17 10:48:50 -0700154 try {
Madan Jampaniba472232015-03-04 13:00:50 -0800155 return messagingService.sendAndReceive(nodeEp, message.subject().value(), message.getBytes());
Madan Jampani4a9cb6d2014-10-17 10:48:50 -0700156
157 } catch (IOException e) {
Yuta HIGUCHI39ae5502014-11-05 16:42:12 -0800158 log.trace("Failed interaction with remote nodeId: " + toNodeId, e);
Madan Jampani4a9cb6d2014-10-17 10:48:50 -0700159 throw e;
160 }
161 }
162
163 @Override
Madan Jampania14047d2015-02-25 12:23:02 -0800164 @Deprecated
Madan Jampani890bc352014-10-01 22:35:29 -0700165 public void addSubscriber(MessageSubject subject,
Yuta HIGUCHI76b54bf2014-11-07 01:56:55 -0800166 ClusterMessageHandler subscriber) {
Madan Jampani890bc352014-10-01 22:35:29 -0700167 messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
168 }
169
Yuta HIGUCHI76b54bf2014-11-07 01:56:55 -0800170 @Override
Madan Jampani2af244a2015-02-22 13:12:01 -0800171 public void addSubscriber(MessageSubject subject,
172 ClusterMessageHandler subscriber,
173 ExecutorService executor) {
174 messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber), executor);
175 }
176
177 @Override
Yuta HIGUCHI76b54bf2014-11-07 01:56:55 -0800178 public void removeSubscriber(MessageSubject subject) {
179 messagingService.unregisterHandler(subject.value());
180 }
181
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700182 private final class InternalClusterMessageHandler implements MessageHandler {
Madan Jampani890bc352014-10-01 22:35:29 -0700183
184 private final ClusterMessageHandler handler;
185
186 public InternalClusterMessageHandler(ClusterMessageHandler handler) {
187 this.handler = handler;
188 }
189
190 @Override
191 public void handle(Message message) {
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800192 final ClusterMessage clusterMessage;
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700193 try {
Madan Jampaniba472232015-03-04 13:00:50 -0800194 clusterMessage = ClusterMessage.fromBytes(message.payload());
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800195 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800196 log.error("Failed decoding {}", message, e);
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800197 throw e;
198 }
199 try {
Madan Jampani8a895092014-10-17 16:55:50 -0700200 handler.handle(new InternalClusterMessage(clusterMessage, message));
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700201 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800202 log.trace("Failed handling {}", clusterMessage, e);
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700203 throw e;
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700204 }
Madan Jampani890bc352014-10-01 22:35:29 -0700205 }
206 }
Madan Jampani4a9cb6d2014-10-17 10:48:50 -0700207
Madan Jampani8a895092014-10-17 16:55:50 -0700208 public static final class InternalClusterMessage extends ClusterMessage {
209
210 private final Message rawMessage;
211
212 public InternalClusterMessage(ClusterMessage clusterMessage, Message rawMessage) {
213 super(clusterMessage.sender(), clusterMessage.subject(), clusterMessage.payload());
214 this.rawMessage = rawMessage;
215 }
216
217 @Override
218 public void respond(byte[] response) throws IOException {
219 rawMessage.respond(response);
220 }
221 }
Madan Jampanif5d263b2014-11-13 10:04:40 -0800222}