blob: 9d7edd063ed5c20dca4e1ab4009f61bf832e685c [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.impl;
17
Jonathan Hartad0c3022017-02-22 14:06:01 -080018import com.google.common.base.Throwables;
Madan Jampani630e7ac2016-05-31 11:34:05 -070019import io.atomix.catalyst.concurrent.Listener;
20import io.atomix.catalyst.concurrent.Listeners;
21import io.atomix.catalyst.concurrent.ThreadContext;
22import io.atomix.catalyst.serializer.SerializationException;
Madan Jampani630e7ac2016-05-31 11:34:05 -070023import io.atomix.catalyst.transport.Connection;
24import io.atomix.catalyst.transport.MessageHandler;
25import io.atomix.catalyst.transport.TransportException;
Madan Jampani630e7ac2016-05-31 11:34:05 -070026import io.atomix.catalyst.util.reference.ReferenceCounted;
Jonathan Hartad0c3022017-02-22 14:06:01 -080027import org.apache.commons.io.IOUtils;
28import org.onlab.util.Tools;
29import org.onosproject.cluster.PartitionId;
Jordan Haltermane9c37092017-03-21 11:16:14 -070030import org.onosproject.store.cluster.messaging.Endpoint;
Jonathan Hartad0c3022017-02-22 14:06:01 -080031import org.onosproject.store.cluster.messaging.MessagingException;
32import org.onosproject.store.cluster.messaging.MessagingService;
Jordan Haltermane9c37092017-03-21 11:16:14 -070033import org.slf4j.Logger;
34import org.slf4j.LoggerFactory;
Madan Jampani630e7ac2016-05-31 11:34:05 -070035
Madan Jampani3289fbf2016-01-13 14:14:27 -080036import java.io.ByteArrayInputStream;
37import java.io.ByteArrayOutputStream;
38import java.io.DataInputStream;
39import java.io.DataOutputStream;
40import java.io.IOException;
41import java.io.InputStream;
Jordan Haltermane9c37092017-03-21 11:16:14 -070042import java.nio.ByteBuffer;
Madan Jampani3289fbf2016-01-13 14:14:27 -080043import java.util.Map;
Madan Jampani3289fbf2016-01-13 14:14:27 -080044import java.util.concurrent.CompletableFuture;
Jordan Haltermane9c37092017-03-21 11:16:14 -070045import java.util.concurrent.ConcurrentHashMap;
Madan Jampani3289fbf2016-01-13 14:14:27 -080046import java.util.function.Consumer;
47
Jonathan Hartad0c3022017-02-22 14:06:01 -080048import static com.google.common.base.Preconditions.checkNotNull;
Jordan Haltermane9c37092017-03-21 11:16:14 -070049import static org.onosproject.store.primitives.impl.CopycatTransport.CLOSE;
50import static org.onosproject.store.primitives.impl.CopycatTransport.FAILURE;
51import static org.onosproject.store.primitives.impl.CopycatTransport.MESSAGE;
52import static org.onosproject.store.primitives.impl.CopycatTransport.SUCCESS;
Madan Jampani3289fbf2016-01-13 14:14:27 -080053
Madan Jampani3289fbf2016-01-13 14:14:27 -080054/**
Jordan Haltermane9c37092017-03-21 11:16:14 -070055 * Base Copycat Transport connection.
Madan Jampani3289fbf2016-01-13 14:14:27 -080056 */
57public class CopycatTransportConnection implements Connection {
Jordan Haltermane9c37092017-03-21 11:16:14 -070058 private final Logger log = LoggerFactory.getLogger(getClass());
59 private final long connectionId;
60 private final String localSubject;
61 private final String remoteSubject;
62 private final PartitionId partitionId;
63 private final Endpoint endpoint;
64 private final MessagingService messagingService;
65 private final ThreadContext context;
66 private final Map<Class, InternalHandler> handlers = new ConcurrentHashMap<>();
Madan Jampani3289fbf2016-01-13 14:14:27 -080067 private final Listeners<Throwable> exceptionListeners = new Listeners<>();
68 private final Listeners<Connection> closeListeners = new Listeners<>();
69
Jordan Haltermane9c37092017-03-21 11:16:14 -070070 CopycatTransportConnection(
71 long connectionId,
72 Mode mode,
Madan Jampanif778c962016-01-31 22:56:38 -080073 PartitionId partitionId,
Jordan Haltermane9c37092017-03-21 11:16:14 -070074 Endpoint endpoint,
Madan Jampani3289fbf2016-01-13 14:14:27 -080075 MessagingService messagingService,
76 ThreadContext context) {
77 this.connectionId = connectionId;
Jordan Haltermane9c37092017-03-21 11:16:14 -070078 this.partitionId = checkNotNull(partitionId, "partitionId cannot be null");
79 this.localSubject = mode.getLocalSubject(partitionId, connectionId);
80 this.remoteSubject = mode.getRemoteSubject(partitionId, connectionId);
81 this.endpoint = checkNotNull(endpoint, "endpoint cannot be null");
82 this.messagingService = checkNotNull(messagingService, "messagingService cannot be null");
83 this.context = checkNotNull(context, "context cannot be null");
84 messagingService.registerHandler(localSubject, this::handle);
Madan Jampani3289fbf2016-01-13 14:14:27 -080085 }
86
87 @Override
88 public <T, U> CompletableFuture<U> send(T message) {
89 ThreadContext context = ThreadContext.currentContextOrThrow();
Jordan Haltermane9c37092017-03-21 11:16:14 -070090 CompletableFuture<U> future = new CompletableFuture<>();
Madan Jampani3289fbf2016-01-13 14:14:27 -080091 try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
Jordan Haltermane9c37092017-03-21 11:16:14 -070092 DataOutputStream dos = new DataOutputStream(baos);
93 dos.writeByte(MESSAGE);
Madan Jampani3289fbf2016-01-13 14:14:27 -080094 context.serializer().writeObject(message, baos);
95 if (message instanceof ReferenceCounted) {
96 ((ReferenceCounted<?>) message).release();
97 }
Jordan Haltermane9c37092017-03-21 11:16:14 -070098 messagingService.sendAndReceive(endpoint,
99 remoteSubject,
Madan Jampani3289fbf2016-01-13 14:14:27 -0800100 baos.toByteArray(),
101 context.executor())
102 .whenComplete((r, e) -> {
Madan Jampani6f743712016-03-26 11:20:25 -0700103 Throwable wrappedError = e;
104 if (e != null) {
105 Throwable rootCause = Throwables.getRootCause(e);
106 if (MessagingException.class.isAssignableFrom(rootCause.getClass())) {
107 wrappedError = new TransportException(e);
108 }
Madan Jampani3289fbf2016-01-13 14:14:27 -0800109 }
Jordan Haltermane9c37092017-03-21 11:16:14 -0700110 handleResponse(r, wrappedError, future);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800111 });
Madan Jampani0da01a42016-03-18 14:33:18 -0700112 } catch (SerializationException | IOException e) {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700113 future.completeExceptionally(e);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800114 }
Jordan Haltermane9c37092017-03-21 11:16:14 -0700115 return future;
Madan Jampani3289fbf2016-01-13 14:14:27 -0800116 }
117
Jordan Haltermane9c37092017-03-21 11:16:14 -0700118 /**
119 * Handles a response received from the other side of the connection.
120 */
121 private <T> void handleResponse(
122 byte[] response,
123 Throwable error,
124 CompletableFuture<T> future) {
Madan Jampani3289fbf2016-01-13 14:14:27 -0800125 if (error != null) {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700126 future.completeExceptionally(error);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800127 return;
128 }
129 checkNotNull(response);
130 InputStream input = new ByteArrayInputStream(response);
131 try {
132 byte status = (byte) input.read();
133 if (status == FAILURE) {
134 Throwable t = context.serializer().readObject(input);
Jordan Haltermane9c37092017-03-21 11:16:14 -0700135 future.completeExceptionally(t);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800136 } else {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700137 try {
138 future.complete(context.serializer().readObject(input));
139 } catch (SerializationException e) {
140 future.completeExceptionally(e);
141 }
Madan Jampani3289fbf2016-01-13 14:14:27 -0800142 }
143 } catch (IOException e) {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700144 future.completeExceptionally(e);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800145 }
146 }
147
Jordan Haltermane9c37092017-03-21 11:16:14 -0700148 /**
149 * Handles a message sent to the connection.
150 */
151 private CompletableFuture<byte[]> handle(Endpoint sender, byte[] payload) {
152 try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
153 byte type = input.readByte();
154 switch (type) {
155 case MESSAGE:
156 return handleMessage(IOUtils.toByteArray(input));
157 case CLOSE:
158 return handleClose();
159 default:
160 throw new IllegalStateException("Invalid message type");
161 }
162 } catch (IOException e) {
163 Throwables.propagate(e);
164 return null;
165 }
Madan Jampani3289fbf2016-01-13 14:14:27 -0800166 }
167
Jordan Haltermane9c37092017-03-21 11:16:14 -0700168 /**
169 * Handles a message from the other side of the connection.
170 */
171 @SuppressWarnings("unchecked")
172 private CompletableFuture<byte[]> handleMessage(byte[] message) {
Madan Jampani3289fbf2016-01-13 14:14:27 -0800173 try {
174 Object request = context.serializer().readObject(new ByteArrayInputStream(message));
175 InternalHandler handler = handlers.get(request.getClass());
176 if (handler == null) {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700177 log.warn("No handler registered on connection {}-{} for type {}",
178 partitionId, connectionId, request.getClass());
Madan Jampani3289fbf2016-01-13 14:14:27 -0800179 return Tools.exceptionalFuture(new IllegalStateException(
180 "No handler registered for " + request.getClass()));
181 }
Jordan Haltermane9c37092017-03-21 11:16:14 -0700182
Madan Jampani3289fbf2016-01-13 14:14:27 -0800183 return handler.handle(request).handle((result, error) -> {
Madan Jampani3289fbf2016-01-13 14:14:27 -0800184 try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
185 baos.write(error != null ? FAILURE : SUCCESS);
186 context.serializer().writeObject(error != null ? error : result, baos);
187 return baos.toByteArray();
188 } catch (IOException e) {
189 Throwables.propagate(e);
190 return null;
191 }
192 });
193 } catch (Exception e) {
194 return Tools.exceptionalFuture(e);
195 }
196 }
197
Jordan Haltermane9c37092017-03-21 11:16:14 -0700198 /**
199 * Handles a close request from the other side of the connection.
200 */
201 private CompletableFuture<byte[]> handleClose() {
202 CompletableFuture<byte[]> future = new CompletableFuture<>();
203 context.executor().execute(() -> {
204 cleanup();
205 ByteBuffer responseBuffer = ByteBuffer.allocate(1);
206 responseBuffer.put(SUCCESS);
207 future.complete(responseBuffer.array());
208 });
209 return future;
Madan Jampani3289fbf2016-01-13 14:14:27 -0800210 }
211
212 @Override
Jordan Haltermane9c37092017-03-21 11:16:14 -0700213 public <T, U> Connection handler(Class<T> type, MessageHandler<T, U> handler) {
214 if (log.isTraceEnabled()) {
215 log.trace("Registered handler on connection {}-{}: {}", partitionId, connectionId, type);
216 }
217 handlers.put(type, new InternalHandler(handler, ThreadContext.currentContextOrThrow()));
218 return this;
219 }
220
221 @Override
222 public Listener<Throwable> exceptionListener(Consumer<Throwable> consumer) {
223 return exceptionListeners.add(consumer);
224 }
225
226 @Override
227 public Listener<Connection> closeListener(Consumer<Connection> consumer) {
228 return closeListeners.add(consumer);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800229 }
230
231 @Override
232 public CompletableFuture<Void> close() {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700233 log.debug("Closing connection {}-{}", partitionId, connectionId);
234
235 ByteBuffer requestBuffer = ByteBuffer.allocate(1);
236 requestBuffer.put(CLOSE);
237
238 ThreadContext context = ThreadContext.currentContextOrThrow();
239 CompletableFuture<Void> future = new CompletableFuture<>();
240 messagingService.sendAndReceive(endpoint, remoteSubject, requestBuffer.array(), context.executor())
241 .whenComplete((payload, error) -> {
242 cleanup();
243 Throwable wrappedError = error;
244 if (error != null) {
245 Throwable rootCause = Throwables.getRootCause(error);
246 if (MessagingException.class.isAssignableFrom(rootCause.getClass())) {
247 wrappedError = new TransportException(error);
248 }
249 future.completeExceptionally(wrappedError);
250 } else {
251 ByteBuffer responseBuffer = ByteBuffer.wrap(payload);
252 if (responseBuffer.get() == SUCCESS) {
253 future.complete(null);
254 } else {
255 future.completeExceptionally(new TransportException("Failed to close connection"));
256 }
257 }
258 });
259 return future;
Madan Jampani3289fbf2016-01-13 14:14:27 -0800260 }
261
Jordan Haltermane9c37092017-03-21 11:16:14 -0700262 /**
263 * Cleans up the connection, unregistering handlers registered on the MessagingService.
264 */
265 private void cleanup() {
266 log.debug("Connection {}-{} closed", partitionId, connectionId);
267 messagingService.unregisterHandler(localSubject);
268 closeListeners.accept(this);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800269 }
270
Jordan Haltermane9c37092017-03-21 11:16:14 -0700271 /**
272 * Connection mode used to indicate whether this side of the connection is
273 * a client or server.
274 */
275 enum Mode {
276
277 /**
278 * Represents the client side of a bi-directional connection.
279 */
280 CLIENT {
281 @Override
282 String getLocalSubject(PartitionId partitionId, long connectionId) {
283 return String.format("onos-copycat-%s-%d-client", partitionId, connectionId);
284 }
285
286 @Override
287 String getRemoteSubject(PartitionId partitionId, long connectionId) {
288 return String.format("onos-copycat-%s-%d-server", partitionId, connectionId);
289 }
290 },
291
292 /**
293 * Represents the server side of a bi-directional connection.
294 */
295 SERVER {
296 @Override
297 String getLocalSubject(PartitionId partitionId, long connectionId) {
298 return String.format("onos-copycat-%s-%d-server", partitionId, connectionId);
299 }
300
301 @Override
302 String getRemoteSubject(PartitionId partitionId, long connectionId) {
303 return String.format("onos-copycat-%s-%d-client", partitionId, connectionId);
304 }
305 };
306
307 /**
308 * Returns the local messaging service subject for the connection in this mode.
309 * Subjects generated by the connection mode are guaranteed to be globally unique.
310 *
311 * @param partitionId the partition ID to which the connection belongs.
312 * @param connectionId the connection ID.
313 * @return the globally unique local subject for the connection.
314 */
315 abstract String getLocalSubject(PartitionId partitionId, long connectionId);
316
317 /**
318 * Returns the remote messaging service subject for the connection in this mode.
319 * Subjects generated by the connection mode are guaranteed to be globally unique.
320 *
321 * @param partitionId the partition ID to which the connection belongs.
322 * @param connectionId the connection ID.
323 * @return the globally unique remote subject for the connection.
324 */
325 abstract String getRemoteSubject(PartitionId partitionId, long connectionId);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800326 }
327
Jordan Haltermane9c37092017-03-21 11:16:14 -0700328 /**
329 * Internal container for a handler/context pair.
330 */
331 private static class InternalHandler {
Madan Jampani3289fbf2016-01-13 14:14:27 -0800332 private final MessageHandler handler;
333 private final ThreadContext context;
334
Jordan Haltermane9c37092017-03-21 11:16:14 -0700335 InternalHandler(MessageHandler handler, ThreadContext context) {
Madan Jampani3289fbf2016-01-13 14:14:27 -0800336 this.handler = handler;
337 this.context = context;
338 }
339
340 @SuppressWarnings("unchecked")
Jordan Haltermane9c37092017-03-21 11:16:14 -0700341 CompletableFuture<Object> handle(Object message) {
342 CompletableFuture<Object> future = new CompletableFuture<>();
Madan Jampani3289fbf2016-01-13 14:14:27 -0800343 context.execute(() -> handler.handle(message).whenComplete((r, e) -> {
344 if (e != null) {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700345 future.completeExceptionally((Throwable) e);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800346 } else {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700347 future.complete(r);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800348 }
349 }));
Jordan Haltermane9c37092017-03-21 11:16:14 -0700350 return future;
Madan Jampani3289fbf2016-01-13 14:14:27 -0800351 }
352 }
353}