blob: 540de77f69c1680f754133931777b6a73a9b0827 [file] [log] [blame]
/*
 * Copyright 2014 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Madan Jampani890bc352014-10-01 22:35:29 -070016package org.onlab.onos.store.cluster.messaging.impl;
17
18import static com.google.common.base.Preconditions.checkArgument;
19
20import java.io.IOException;
Madan Jampani890bc352014-10-01 22:35:29 -070021import java.util.Set;
Madan Jampani24f9efb2014-10-24 18:56:23 -070022
Madan Jampani890bc352014-10-01 22:35:29 -070023import org.apache.felix.scr.annotations.Activate;
24import org.apache.felix.scr.annotations.Component;
25import org.apache.felix.scr.annotations.Deactivate;
Madan Jampania5d0d782014-10-07 14:36:00 -070026import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampani890bc352014-10-01 22:35:29 -070028import org.apache.felix.scr.annotations.Service;
Madan Jampania5d0d782014-10-07 14:36:00 -070029import org.onlab.onos.cluster.ClusterService;
Madan Jampani890bc352014-10-01 22:35:29 -070030import org.onlab.onos.cluster.ControllerNode;
31import org.onlab.onos.cluster.NodeId;
Yuta HIGUCHIc057c632014-10-06 18:38:14 -070032import org.onlab.onos.store.cluster.impl.ClusterMembershipEvent;
Madan Jampani890bc352014-10-01 22:35:29 -070033import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
34import org.onlab.onos.store.cluster.messaging.ClusterMessage;
35import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
36import org.onlab.onos.store.cluster.messaging.MessageSubject;
Madan Jampanie0ec3292014-10-20 16:10:51 -070037import org.onlab.onos.store.serializers.ClusterMessageSerializer;
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070038import org.onlab.onos.store.serializers.KryoNamespaces;
Madan Jampani53e44e62014-10-07 12:39:51 -070039import org.onlab.onos.store.serializers.KryoSerializer;
Madan Jampanie0ec3292014-10-20 16:10:51 -070040import org.onlab.onos.store.serializers.MessageSubjectSerializer;
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070041import org.onlab.util.KryoNamespace;
Madan Jampanic9ed9be2014-10-02 16:13:11 -070042import org.onlab.netty.Endpoint;
43import org.onlab.netty.Message;
44import org.onlab.netty.MessageHandler;
45import org.onlab.netty.MessagingService;
Madan Jampanida1a6b02014-10-07 14:16:15 -070046import org.onlab.netty.NettyMessagingService;
Madan Jampani890bc352014-10-01 22:35:29 -070047import org.slf4j.Logger;
48import org.slf4j.LoggerFactory;
49
Madan Jampani24f9efb2014-10-24 18:56:23 -070050import com.google.common.util.concurrent.ListenableFuture;
51
Madan Jampani890bc352014-10-01 22:35:29 -070052@Component(immediate = true)
53@Service
Madan Jampani3b0dfd52014-10-02 16:48:13 -070054public class ClusterCommunicationManager
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -070055 implements ClusterCommunicationService {
Madan Jampani890bc352014-10-01 22:35:29 -070056
57 private final Logger log = LoggerFactory.getLogger(getClass());
58
Madan Jampania5d0d782014-10-07 14:36:00 -070059 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
60 private ClusterService clusterService;
61
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070062 // TODO: This probably should not be a OSGi service.
Madan Jampani890bc352014-10-01 22:35:29 -070063 private MessagingService messagingService;
64
Madan Jampani53e44e62014-10-07 12:39:51 -070065 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070066 @Override
Madan Jampani53e44e62014-10-07 12:39:51 -070067 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070068 serializerPool = KryoNamespace.newBuilder()
69 .register(KryoNamespaces.API)
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070070 .register(ClusterMessage.class, new ClusterMessageSerializer())
Madan Jampani53e44e62014-10-07 12:39:51 -070071 .register(ClusterMembershipEvent.class)
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070072 .register(byte[].class)
Yuta HIGUCHI3e0b6512014-10-07 17:43:56 -070073 .register(MessageSubject.class, new MessageSubjectSerializer())
Madan Jampani53e44e62014-10-07 12:39:51 -070074 .build()
75 .populate(1);
76 }
77
78 };
79
Madan Jampani890bc352014-10-01 22:35:29 -070080 @Activate
81 public void activate() {
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -070082 ControllerNode localNode = clusterService.getLocalNode();
Madan Jampani87100932014-10-21 16:46:12 -070083 NettyMessagingService netty = new NettyMessagingService(localNode.ip().toString(), localNode.tcpPort());
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070084 // FIXME: workaround until it becomes a service.
85 try {
86 netty.activate();
87 } catch (Exception e) {
88 // TODO Auto-generated catch block
89 log.error("NettyMessagingService#activate", e);
90 }
91 messagingService = netty;
Madan Jampani890bc352014-10-01 22:35:29 -070092 log.info("Started");
93 }
94
95 @Deactivate
96 public void deactivate() {
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070097 // TODO: cleanup messageingService if needed.
Madan Jampani890bc352014-10-01 22:35:29 -070098 log.info("Stopped");
99 }
100
101 @Override
Yuta HIGUCHI9cff53b2014-10-13 15:21:16 -0700102 public boolean broadcast(ClusterMessage message) throws IOException {
Madan Jampani890bc352014-10-01 22:35:29 -0700103 boolean ok = true;
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -0700104 final ControllerNode localNode = clusterService.getLocalNode();
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -0700105 for (ControllerNode node : clusterService.getNodes()) {
Madan Jampani890bc352014-10-01 22:35:29 -0700106 if (!node.equals(localNode)) {
107 ok = unicast(message, node.id()) && ok;
108 }
109 }
110 return ok;
111 }
112
113 @Override
Yuta HIGUCHI9cff53b2014-10-13 15:21:16 -0700114 public boolean multicast(ClusterMessage message, Set<NodeId> nodes) throws IOException {
Madan Jampani890bc352014-10-01 22:35:29 -0700115 boolean ok = true;
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -0700116 final ControllerNode localNode = clusterService.getLocalNode();
Madan Jampani890bc352014-10-01 22:35:29 -0700117 for (NodeId nodeId : nodes) {
118 if (!nodeId.equals(localNode.id())) {
Madan Jampani98c17602014-10-23 15:33:23 -0700119 ok = unicastUnchecked(message, nodeId) && ok;
Madan Jampani890bc352014-10-01 22:35:29 -0700120 }
121 }
122 return ok;
123 }
124
125 @Override
Yuta HIGUCHI9cff53b2014-10-13 15:21:16 -0700126 public boolean unicast(ClusterMessage message, NodeId toNodeId) throws IOException {
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -0700127 ControllerNode node = clusterService.getNode(toNodeId);
Madan Jampani890bc352014-10-01 22:35:29 -0700128 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
129 Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
130 try {
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700131 messagingService.sendAsync(nodeEp,
Yuta HIGUCHI68d90472014-10-07 17:55:50 -0700132 message.subject().value(), SERIALIZER.encode(message));
Madan Jampani890bc352014-10-01 22:35:29 -0700133 return true;
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700134 } catch (IOException e) {
Yuta HIGUCHIa58451f2014-10-16 15:44:59 -0700135 log.trace("Failed to send cluster message to nodeId: " + toNodeId, e);
Yuta HIGUCHI9cff53b2014-10-13 15:21:16 -0700136 throw e;
Madan Jampani890bc352014-10-01 22:35:29 -0700137 }
Madan Jampani890bc352014-10-01 22:35:29 -0700138 }
139
Madan Jampani98c17602014-10-23 15:33:23 -0700140 private boolean unicastUnchecked(ClusterMessage message, NodeId toNodeId) throws IOException {
141 try {
142 return unicast(message, toNodeId);
143 } catch (IOException e) {
144 return false;
145 }
146 }
147
Madan Jampani890bc352014-10-01 22:35:29 -0700148 @Override
Madan Jampani24f9efb2014-10-24 18:56:23 -0700149 public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException {
Madan Jampani4a9cb6d2014-10-17 10:48:50 -0700150 ControllerNode node = clusterService.getNode(toNodeId);
151 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
152 Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
153 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700154 return messagingService.sendAndReceive(nodeEp, message.subject().value(), SERIALIZER.encode(message));
Madan Jampani4a9cb6d2014-10-17 10:48:50 -0700155
156 } catch (IOException e) {
157 log.error("Failed interaction with remote nodeId: " + toNodeId, e);
158 throw e;
159 }
160 }
161
162 @Override
Madan Jampani890bc352014-10-01 22:35:29 -0700163 public void addSubscriber(MessageSubject subject,
164 ClusterMessageHandler subscriber) {
165 messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
166 }
167
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700168 private final class InternalClusterMessageHandler implements MessageHandler {
Madan Jampani890bc352014-10-01 22:35:29 -0700169
170 private final ClusterMessageHandler handler;
171
172 public InternalClusterMessageHandler(ClusterMessageHandler handler) {
173 this.handler = handler;
174 }
175
176 @Override
177 public void handle(Message message) {
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700178 try {
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700179 ClusterMessage clusterMessage = SERIALIZER.decode(message.payload());
Madan Jampani8a895092014-10-17 16:55:50 -0700180 handler.handle(new InternalClusterMessage(clusterMessage, message));
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700181 } catch (Exception e) {
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700182 log.error("Exception caught during ClusterMessageHandler", e);
183 throw e;
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700184 }
Madan Jampani890bc352014-10-01 22:35:29 -0700185 }
186 }
Madan Jampani4a9cb6d2014-10-17 10:48:50 -0700187
Madan Jampani8a895092014-10-17 16:55:50 -0700188 public static final class InternalClusterMessage extends ClusterMessage {
189
190 private final Message rawMessage;
191
192 public InternalClusterMessage(ClusterMessage clusterMessage, Message rawMessage) {
193 super(clusterMessage.sender(), clusterMessage.subject(), clusterMessage.payload());
194 this.rawMessage = rawMessage;
195 }
196
197 @Override
198 public void respond(byte[] response) throws IOException {
199 rawMessage.respond(response);
200 }
201 }
Madan Jampani24f9efb2014-10-24 18:56:23 -0700202}