blob: 42a79b3ccdab6bea17ff8d37ac638e8bda5ee0b7 [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.cluster.messaging.impl;
Madan Jampani890bc352014-10-01 22:35:29 -070017
Jonathan Hart7d656f42015-01-27 14:07:23 -080018import com.google.common.util.concurrent.ListenableFuture;
Madan Jampani890bc352014-10-01 22:35:29 -070019import org.apache.felix.scr.annotations.Activate;
20import org.apache.felix.scr.annotations.Component;
21import org.apache.felix.scr.annotations.Deactivate;
Madan Jampania5d0d782014-10-07 14:36:00 -070022import org.apache.felix.scr.annotations.Reference;
23import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampani890bc352014-10-01 22:35:29 -070024import org.apache.felix.scr.annotations.Service;
Madan Jampani4e729af2014-11-19 11:11:37 -080025import org.onlab.netty.Endpoint;
26import org.onlab.netty.Message;
27import org.onlab.netty.MessageHandler;
28import org.onlab.netty.MessagingService;
29import org.onlab.netty.NettyMessagingService;
Brian O'Connorabafb502014-12-02 22:26:20 -080030import org.onosproject.cluster.ClusterService;
31import org.onosproject.cluster.ControllerNode;
32import org.onosproject.cluster.NodeId;
33import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
34import org.onosproject.store.cluster.messaging.ClusterMessage;
35import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
36import org.onosproject.store.cluster.messaging.MessageSubject;
Madan Jampani890bc352014-10-01 22:35:29 -070037import org.slf4j.Logger;
38import org.slf4j.LoggerFactory;
39
Jonathan Hart7d656f42015-01-27 14:07:23 -080040import java.io.IOException;
Madan Jampani2af244a2015-02-22 13:12:01 -080041import java.util.concurrent.ExecutorService;
Jonathan Hart7d656f42015-01-27 14:07:23 -080042
43import static com.google.common.base.Preconditions.checkArgument;
Madan Jampani24f9efb2014-10-24 18:56:23 -070044
Madan Jampani890bc352014-10-01 22:35:29 -070045@Component(immediate = true)
46@Service
Madan Jampani3b0dfd52014-10-02 16:48:13 -070047public class ClusterCommunicationManager
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -070048 implements ClusterCommunicationService {
Madan Jampani890bc352014-10-01 22:35:29 -070049
50 private final Logger log = LoggerFactory.getLogger(getClass());
51
Madan Jampania5d0d782014-10-07 14:36:00 -070052 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
53 private ClusterService clusterService;
54
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070055 // TODO: This probably should not be an OSGi service.
Madan Jampani890bc352014-10-01 22:35:29 -070056 private MessagingService messagingService;
57
58 @Activate
59 public void activate() {
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -070060 ControllerNode localNode = clusterService.getLocalNode();
Madan Jampani2e5f87b2015-02-22 10:37:15 -080061 NettyMessagingService netty = new NettyMessagingService(localNode.ip(), localNode.tcpPort());
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070062 // FIXME: workaround until it becomes a service.
63 try {
64 netty.activate();
65 } catch (Exception e) {
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070066 log.error("NettyMessagingService#activate", e);
67 }
68 messagingService = netty;
Yuta HIGUCHI1f8cd5f2014-11-04 23:48:55 -080069 log.info("Started on {}:{}", localNode.ip(), localNode.tcpPort());
Madan Jampani890bc352014-10-01 22:35:29 -070070 }
71
72 @Deactivate
73 public void deactivate() {
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070074 // TODO: cleanup messageingService if needed.
Yuta HIGUCHI6c6fb9c2014-10-31 20:21:15 -070075 // FIXME: workaround until it becomes a service.
76 try {
77 ((NettyMessagingService) messagingService).deactivate();
78 } catch (Exception e) {
79 log.error("NettyMessagingService#deactivate", e);
80 }
Madan Jampani890bc352014-10-01 22:35:29 -070081 log.info("Stopped");
82 }
83
84 @Override
Jonathan Hart7d656f42015-01-27 14:07:23 -080085 public boolean broadcast(ClusterMessage message) {
Madan Jampani890bc352014-10-01 22:35:29 -070086 boolean ok = true;
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -070087 final ControllerNode localNode = clusterService.getLocalNode();
Madan Jampaniba472232015-03-04 13:00:50 -080088 byte[] payload = message.getBytes();
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070089 for (ControllerNode node : clusterService.getNodes()) {
Madan Jampani890bc352014-10-01 22:35:29 -070090 if (!node.equals(localNode)) {
Brian O'Connor9b7a32d2015-02-19 16:03:45 -080091 ok = unicastUnchecked(message.subject(), payload, node.id()) && ok;
Madan Jampani890bc352014-10-01 22:35:29 -070092 }
93 }
94 return ok;
95 }
96
97 @Override
Jonathan Hart7d656f42015-01-27 14:07:23 -080098 public boolean broadcastIncludeSelf(ClusterMessage message) {
Madan Jampanif5d263b2014-11-13 10:04:40 -080099 boolean ok = true;
Madan Jampaniba472232015-03-04 13:00:50 -0800100 byte[] payload = message.getBytes();
Madan Jampanif5d263b2014-11-13 10:04:40 -0800101 for (ControllerNode node : clusterService.getNodes()) {
Brian O'Connor9b7a32d2015-02-19 16:03:45 -0800102 ok = unicastUnchecked(message.subject(), payload, node.id()) && ok;
Madan Jampanif5d263b2014-11-13 10:04:40 -0800103 }
104 return ok;
105 }
106
107 @Override
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800108 public boolean multicast(ClusterMessage message, Iterable<NodeId> nodes) {
Madan Jampani890bc352014-10-01 22:35:29 -0700109 boolean ok = true;
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -0700110 final ControllerNode localNode = clusterService.getLocalNode();
Madan Jampaniba472232015-03-04 13:00:50 -0800111 byte[] payload = message.getBytes();
Madan Jampani890bc352014-10-01 22:35:29 -0700112 for (NodeId nodeId : nodes) {
113 if (!nodeId.equals(localNode.id())) {
Brian O'Connor9b7a32d2015-02-19 16:03:45 -0800114 ok = unicastUnchecked(message.subject(), payload, nodeId) && ok;
Madan Jampani890bc352014-10-01 22:35:29 -0700115 }
116 }
117 return ok;
118 }
119
120 @Override
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800121 public boolean unicast(ClusterMessage message, NodeId toNodeId) {
122 return unicastUnchecked(message.subject(), message.getBytes(), toNodeId);
Brian O'Connor9b7a32d2015-02-19 16:03:45 -0800123 }
124
125 private boolean unicast(MessageSubject subject, byte[] payload, NodeId toNodeId) throws IOException {
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -0700126 ControllerNode node = clusterService.getNode(toNodeId);
Madan Jampani890bc352014-10-01 22:35:29 -0700127 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Madan Jampani2e5f87b2015-02-22 10:37:15 -0800128 Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
Madan Jampani890bc352014-10-01 22:35:29 -0700129 try {
Brian O'Connor9b7a32d2015-02-19 16:03:45 -0800130 messagingService.sendAsync(nodeEp, subject.value(), payload);
Madan Jampani890bc352014-10-01 22:35:29 -0700131 return true;
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700132 } catch (IOException e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800133 log.debug("Failed to send cluster message to nodeId: " + toNodeId, e);
Yuta HIGUCHI9cff53b2014-10-13 15:21:16 -0700134 throw e;
Madan Jampani890bc352014-10-01 22:35:29 -0700135 }
Madan Jampani890bc352014-10-01 22:35:29 -0700136 }
137
Brian O'Connor9b7a32d2015-02-19 16:03:45 -0800138 private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
Madan Jampani98c17602014-10-23 15:33:23 -0700139 try {
Brian O'Connor9b7a32d2015-02-19 16:03:45 -0800140 return unicast(subject, payload, toNodeId);
Madan Jampani98c17602014-10-23 15:33:23 -0700141 } catch (IOException e) {
142 return false;
143 }
144 }
145
Madan Jampani890bc352014-10-01 22:35:29 -0700146 @Override
Madan Jampani24f9efb2014-10-24 18:56:23 -0700147 public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException {
Madan Jampani4a9cb6d2014-10-17 10:48:50 -0700148 ControllerNode node = clusterService.getNode(toNodeId);
149 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Madan Jampani2e5f87b2015-02-22 10:37:15 -0800150 Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
Madan Jampani4a9cb6d2014-10-17 10:48:50 -0700151 try {
Madan Jampaniba472232015-03-04 13:00:50 -0800152 return messagingService.sendAndReceive(nodeEp, message.subject().value(), message.getBytes());
Madan Jampani4a9cb6d2014-10-17 10:48:50 -0700153
154 } catch (IOException e) {
Yuta HIGUCHI39ae5502014-11-05 16:42:12 -0800155 log.trace("Failed interaction with remote nodeId: " + toNodeId, e);
Madan Jampani4a9cb6d2014-10-17 10:48:50 -0700156 throw e;
157 }
158 }
159
160 @Override
Madan Jampania14047d2015-02-25 12:23:02 -0800161 @Deprecated
Madan Jampani890bc352014-10-01 22:35:29 -0700162 public void addSubscriber(MessageSubject subject,
Yuta HIGUCHI76b54bf2014-11-07 01:56:55 -0800163 ClusterMessageHandler subscriber) {
Madan Jampani890bc352014-10-01 22:35:29 -0700164 messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
165 }
166
Yuta HIGUCHI76b54bf2014-11-07 01:56:55 -0800167 @Override
Madan Jampani2af244a2015-02-22 13:12:01 -0800168 public void addSubscriber(MessageSubject subject,
169 ClusterMessageHandler subscriber,
170 ExecutorService executor) {
171 messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber), executor);
172 }
173
174 @Override
Yuta HIGUCHI76b54bf2014-11-07 01:56:55 -0800175 public void removeSubscriber(MessageSubject subject) {
176 messagingService.unregisterHandler(subject.value());
177 }
178
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700179 private final class InternalClusterMessageHandler implements MessageHandler {
Madan Jampani890bc352014-10-01 22:35:29 -0700180
181 private final ClusterMessageHandler handler;
182
183 public InternalClusterMessageHandler(ClusterMessageHandler handler) {
184 this.handler = handler;
185 }
186
187 @Override
188 public void handle(Message message) {
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800189 final ClusterMessage clusterMessage;
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700190 try {
Madan Jampaniba472232015-03-04 13:00:50 -0800191 clusterMessage = ClusterMessage.fromBytes(message.payload());
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800192 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800193 log.error("Failed decoding {}", message, e);
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800194 throw e;
195 }
196 try {
Madan Jampani8a895092014-10-17 16:55:50 -0700197 handler.handle(new InternalClusterMessage(clusterMessage, message));
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700198 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800199 log.trace("Failed handling {}", clusterMessage, e);
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700200 throw e;
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700201 }
Madan Jampani890bc352014-10-01 22:35:29 -0700202 }
203 }
Madan Jampani4a9cb6d2014-10-17 10:48:50 -0700204
Madan Jampani8a895092014-10-17 16:55:50 -0700205 public static final class InternalClusterMessage extends ClusterMessage {
206
207 private final Message rawMessage;
208
209 public InternalClusterMessage(ClusterMessage clusterMessage, Message rawMessage) {
210 super(clusterMessage.sender(), clusterMessage.subject(), clusterMessage.payload());
211 this.rawMessage = rawMessage;
212 }
213
214 @Override
215 public void respond(byte[] response) throws IOException {
216 rawMessage.respond(response);
217 }
218 }
Madan Jampanif5d263b2014-11-13 10:04:40 -0800219}