blob: a3a8539f00b87599dac7035aef516913024317b6 [file] [log] [blame]
Madan Jampani3289fbf2016-01-13 14:14:27 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2016-present Open Networking Laboratory
Madan Jampani3289fbf2016-01-13 14:14:27 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.impl;
17
18import java.io.ByteArrayInputStream;
19import java.io.ByteArrayOutputStream;
20import java.io.DataInputStream;
21import java.io.DataOutputStream;
22import java.io.IOException;
23import java.io.InputStream;
Jordan Halterman2f7a5d02017-05-03 16:56:53 -070024import java.net.SocketException;
Jordan Haltermane9c37092017-03-21 11:16:14 -070025import java.nio.ByteBuffer;
Madan Jampani3289fbf2016-01-13 14:14:27 -080026import java.util.Map;
Madan Jampani3289fbf2016-01-13 14:14:27 -080027import java.util.concurrent.CompletableFuture;
Jordan Haltermane9c37092017-03-21 11:16:14 -070028import java.util.concurrent.ConcurrentHashMap;
Madan Jampani3289fbf2016-01-13 14:14:27 -080029import java.util.function.Consumer;
Jordan Haltermanfda46f92017-04-14 10:49:44 -070030import java.util.function.Function;
31
32import com.google.common.base.Throwables;
33import io.atomix.catalyst.concurrent.Listener;
34import io.atomix.catalyst.concurrent.Listeners;
35import io.atomix.catalyst.concurrent.ThreadContext;
36import io.atomix.catalyst.serializer.SerializationException;
37import io.atomix.catalyst.transport.Connection;
38import io.atomix.catalyst.transport.TransportException;
39import io.atomix.catalyst.util.reference.ReferenceCounted;
40import org.apache.commons.io.IOUtils;
41import org.onlab.util.Tools;
42import org.onosproject.cluster.PartitionId;
43import org.onosproject.store.cluster.messaging.Endpoint;
44import org.onosproject.store.cluster.messaging.MessagingException;
45import org.onosproject.store.cluster.messaging.MessagingService;
46import org.slf4j.Logger;
47import org.slf4j.LoggerFactory;
Madan Jampani3289fbf2016-01-13 14:14:27 -080048
Jonathan Hartad0c3022017-02-22 14:06:01 -080049import static com.google.common.base.Preconditions.checkNotNull;
Jordan Haltermane9c37092017-03-21 11:16:14 -070050import static org.onosproject.store.primitives.impl.CopycatTransport.CLOSE;
51import static org.onosproject.store.primitives.impl.CopycatTransport.FAILURE;
52import static org.onosproject.store.primitives.impl.CopycatTransport.MESSAGE;
53import static org.onosproject.store.primitives.impl.CopycatTransport.SUCCESS;
Madan Jampani3289fbf2016-01-13 14:14:27 -080054
Madan Jampani3289fbf2016-01-13 14:14:27 -080055/**
Jordan Haltermane9c37092017-03-21 11:16:14 -070056 * Base Copycat Transport connection.
Madan Jampani3289fbf2016-01-13 14:14:27 -080057 */
58public class CopycatTransportConnection implements Connection {
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -070059 private static final int MAX_MESSAGE_SIZE = 1024 * 1024;
60
Jordan Haltermane9c37092017-03-21 11:16:14 -070061 private final Logger log = LoggerFactory.getLogger(getClass());
62 private final long connectionId;
63 private final String localSubject;
64 private final String remoteSubject;
65 private final PartitionId partitionId;
66 private final Endpoint endpoint;
67 private final MessagingService messagingService;
68 private final ThreadContext context;
69 private final Map<Class, InternalHandler> handlers = new ConcurrentHashMap<>();
Madan Jampani3289fbf2016-01-13 14:14:27 -080070 private final Listeners<Throwable> exceptionListeners = new Listeners<>();
71 private final Listeners<Connection> closeListeners = new Listeners<>();
72
Jordan Haltermane9c37092017-03-21 11:16:14 -070073 CopycatTransportConnection(
74 long connectionId,
75 Mode mode,
Madan Jampanif778c962016-01-31 22:56:38 -080076 PartitionId partitionId,
Jordan Haltermane9c37092017-03-21 11:16:14 -070077 Endpoint endpoint,
Madan Jampani3289fbf2016-01-13 14:14:27 -080078 MessagingService messagingService,
79 ThreadContext context) {
80 this.connectionId = connectionId;
Jordan Haltermane9c37092017-03-21 11:16:14 -070081 this.partitionId = checkNotNull(partitionId, "partitionId cannot be null");
82 this.localSubject = mode.getLocalSubject(partitionId, connectionId);
83 this.remoteSubject = mode.getRemoteSubject(partitionId, connectionId);
84 this.endpoint = checkNotNull(endpoint, "endpoint cannot be null");
85 this.messagingService = checkNotNull(messagingService, "messagingService cannot be null");
86 this.context = checkNotNull(context, "context cannot be null");
87 messagingService.registerHandler(localSubject, this::handle);
Madan Jampani3289fbf2016-01-13 14:14:27 -080088 }
89
90 @Override
Jordan Haltermanfda46f92017-04-14 10:49:44 -070091 public CompletableFuture<Void> send(Object message) {
92 ThreadContext context = ThreadContext.currentContextOrThrow();
93 CompletableFuture<Void> future = new CompletableFuture<>();
94 try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
95 DataOutputStream dos = new DataOutputStream(baos);
96 dos.writeByte(MESSAGE);
97 context.serializer().writeObject(message, baos);
98 if (message instanceof ReferenceCounted) {
99 ((ReferenceCounted<?>) message).release();
100 }
101
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -0700102 byte[] bytes = baos.toByteArray();
103 if (bytes.length > MAX_MESSAGE_SIZE) {
104 throw new IllegalArgumentException(message + " exceeds maximum message size " + MAX_MESSAGE_SIZE);
105 }
106 messagingService.sendAsync(endpoint, remoteSubject, bytes)
Jordan Haltermanfda46f92017-04-14 10:49:44 -0700107 .whenComplete((r, e) -> {
108 if (e != null) {
109 context.executor().execute(() -> future.completeExceptionally(e));
110 } else {
111 context.executor().execute(() -> future.complete(null));
112 }
113 });
114 } catch (SerializationException | IOException e) {
115 future.completeExceptionally(e);
116 }
117 return future;
118 }
119
120 @Override
121 public <T, U> CompletableFuture<U> sendAndReceive(T message) {
Madan Jampani3289fbf2016-01-13 14:14:27 -0800122 ThreadContext context = ThreadContext.currentContextOrThrow();
Jordan Haltermane9c37092017-03-21 11:16:14 -0700123 CompletableFuture<U> future = new CompletableFuture<>();
Madan Jampani3289fbf2016-01-13 14:14:27 -0800124 try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700125 DataOutputStream dos = new DataOutputStream(baos);
126 dos.writeByte(MESSAGE);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800127 context.serializer().writeObject(message, baos);
128 if (message instanceof ReferenceCounted) {
129 ((ReferenceCounted<?>) message).release();
130 }
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -0700131
132 byte[] bytes = baos.toByteArray();
133 if (bytes.length > MAX_MESSAGE_SIZE) {
134 throw new IllegalArgumentException(message + " exceeds maximum message size " + MAX_MESSAGE_SIZE);
135 }
Jordan Haltermane9c37092017-03-21 11:16:14 -0700136 messagingService.sendAndReceive(endpoint,
137 remoteSubject,
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -0700138 bytes,
Madan Jampani3289fbf2016-01-13 14:14:27 -0800139 context.executor())
Jordan Halterman2f7a5d02017-05-03 16:56:53 -0700140 .whenComplete((response, error) -> handleResponse(response, error, future));
Madan Jampani0da01a42016-03-18 14:33:18 -0700141 } catch (SerializationException | IOException e) {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700142 future.completeExceptionally(e);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800143 }
Jordan Haltermane9c37092017-03-21 11:16:14 -0700144 return future;
Madan Jampani3289fbf2016-01-13 14:14:27 -0800145 }
146
Jordan Haltermane9c37092017-03-21 11:16:14 -0700147 /**
148 * Handles a response received from the other side of the connection.
149 */
150 private <T> void handleResponse(
151 byte[] response,
152 Throwable error,
153 CompletableFuture<T> future) {
Madan Jampani3289fbf2016-01-13 14:14:27 -0800154 if (error != null) {
Jordan Halterman2f7a5d02017-05-03 16:56:53 -0700155 Throwable rootCause = Throwables.getRootCause(error);
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -0700156 if (rootCause instanceof MessagingException.NoRemoteHandler) {
Jordan Halterman2f7a5d02017-05-03 16:56:53 -0700157 future.completeExceptionally(new TransportException(error));
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -0700158 close(rootCause);
159 } else if (rootCause instanceof SocketException) {
160 future.completeExceptionally(new TransportException(error));
Jordan Halterman2f7a5d02017-05-03 16:56:53 -0700161 } else {
162 future.completeExceptionally(error);
163 }
Madan Jampani3289fbf2016-01-13 14:14:27 -0800164 return;
165 }
Jordan Halterman2f7a5d02017-05-03 16:56:53 -0700166
Madan Jampani3289fbf2016-01-13 14:14:27 -0800167 checkNotNull(response);
168 InputStream input = new ByteArrayInputStream(response);
169 try {
170 byte status = (byte) input.read();
171 if (status == FAILURE) {
172 Throwable t = context.serializer().readObject(input);
Jordan Haltermane9c37092017-03-21 11:16:14 -0700173 future.completeExceptionally(t);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800174 } else {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700175 try {
176 future.complete(context.serializer().readObject(input));
177 } catch (SerializationException e) {
178 future.completeExceptionally(e);
179 }
Madan Jampani3289fbf2016-01-13 14:14:27 -0800180 }
181 } catch (IOException e) {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700182 future.completeExceptionally(e);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800183 }
184 }
185
Jordan Haltermane9c37092017-03-21 11:16:14 -0700186 /**
187 * Handles a message sent to the connection.
188 */
189 private CompletableFuture<byte[]> handle(Endpoint sender, byte[] payload) {
190 try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
191 byte type = input.readByte();
192 switch (type) {
193 case MESSAGE:
194 return handleMessage(IOUtils.toByteArray(input));
195 case CLOSE:
196 return handleClose();
197 default:
198 throw new IllegalStateException("Invalid message type");
199 }
200 } catch (IOException e) {
201 Throwables.propagate(e);
202 return null;
203 }
Madan Jampani3289fbf2016-01-13 14:14:27 -0800204 }
205
Jordan Haltermane9c37092017-03-21 11:16:14 -0700206 /**
207 * Handles a message from the other side of the connection.
208 */
209 @SuppressWarnings("unchecked")
210 private CompletableFuture<byte[]> handleMessage(byte[] message) {
Madan Jampani3289fbf2016-01-13 14:14:27 -0800211 try {
212 Object request = context.serializer().readObject(new ByteArrayInputStream(message));
213 InternalHandler handler = handlers.get(request.getClass());
214 if (handler == null) {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700215 log.warn("No handler registered on connection {}-{} for type {}",
216 partitionId, connectionId, request.getClass());
Madan Jampani3289fbf2016-01-13 14:14:27 -0800217 return Tools.exceptionalFuture(new IllegalStateException(
218 "No handler registered for " + request.getClass()));
219 }
Jordan Haltermane9c37092017-03-21 11:16:14 -0700220
Madan Jampani3289fbf2016-01-13 14:14:27 -0800221 return handler.handle(request).handle((result, error) -> {
Madan Jampani3289fbf2016-01-13 14:14:27 -0800222 try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
223 baos.write(error != null ? FAILURE : SUCCESS);
224 context.serializer().writeObject(error != null ? error : result, baos);
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -0700225 byte[] bytes = baos.toByteArray();
226 if (bytes.length > MAX_MESSAGE_SIZE) {
227 throw new IllegalArgumentException("response exceeds maximum message size " + MAX_MESSAGE_SIZE);
228 }
229 return bytes;
Madan Jampani3289fbf2016-01-13 14:14:27 -0800230 } catch (IOException e) {
231 Throwables.propagate(e);
232 return null;
233 }
234 });
235 } catch (Exception e) {
236 return Tools.exceptionalFuture(e);
237 }
238 }
239
Jordan Haltermane9c37092017-03-21 11:16:14 -0700240 /**
241 * Handles a close request from the other side of the connection.
242 */
243 private CompletableFuture<byte[]> handleClose() {
244 CompletableFuture<byte[]> future = new CompletableFuture<>();
245 context.executor().execute(() -> {
Jordan Halterman2f7a5d02017-05-03 16:56:53 -0700246 close(null);
Jordan Haltermane9c37092017-03-21 11:16:14 -0700247 ByteBuffer responseBuffer = ByteBuffer.allocate(1);
248 responseBuffer.put(SUCCESS);
249 future.complete(responseBuffer.array());
250 });
251 return future;
Madan Jampani3289fbf2016-01-13 14:14:27 -0800252 }
253
254 @Override
Jordan Haltermanfda46f92017-04-14 10:49:44 -0700255 public <T, U> Connection handler(Class<T> type, Consumer<T> handler) {
256 return handler(type, r -> {
257 handler.accept(r);
258 return null;
259 });
260 }
261
262 @Override
263 public <T, U> Connection handler(Class<T> type, Function<T, CompletableFuture<U>> handler) {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700264 if (log.isTraceEnabled()) {
265 log.trace("Registered handler on connection {}-{}: {}", partitionId, connectionId, type);
266 }
267 handlers.put(type, new InternalHandler(handler, ThreadContext.currentContextOrThrow()));
268 return this;
269 }
270
271 @Override
Jordan Haltermanfda46f92017-04-14 10:49:44 -0700272 public Listener<Throwable> onException(Consumer<Throwable> consumer) {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700273 return exceptionListeners.add(consumer);
274 }
275
276 @Override
Jordan Haltermanfda46f92017-04-14 10:49:44 -0700277 public Listener<Connection> onClose(Consumer<Connection> consumer) {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700278 return closeListeners.add(consumer);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800279 }
280
281 @Override
282 public CompletableFuture<Void> close() {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700283 log.debug("Closing connection {}-{}", partitionId, connectionId);
284
285 ByteBuffer requestBuffer = ByteBuffer.allocate(1);
286 requestBuffer.put(CLOSE);
287
288 ThreadContext context = ThreadContext.currentContextOrThrow();
289 CompletableFuture<Void> future = new CompletableFuture<>();
290 messagingService.sendAndReceive(endpoint, remoteSubject, requestBuffer.array(), context.executor())
291 .whenComplete((payload, error) -> {
Jordan Halterman2f7a5d02017-05-03 16:56:53 -0700292 close(error);
Jordan Haltermane9c37092017-03-21 11:16:14 -0700293 Throwable wrappedError = error;
294 if (error != null) {
295 Throwable rootCause = Throwables.getRootCause(error);
Jordan Haltermanb6ee9e92017-06-21 15:26:28 -0700296 if (rootCause instanceof MessagingException.NoRemoteHandler) {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700297 wrappedError = new TransportException(error);
298 }
299 future.completeExceptionally(wrappedError);
300 } else {
301 ByteBuffer responseBuffer = ByteBuffer.wrap(payload);
302 if (responseBuffer.get() == SUCCESS) {
303 future.complete(null);
304 } else {
305 future.completeExceptionally(new TransportException("Failed to close connection"));
306 }
307 }
308 });
309 return future;
Madan Jampani3289fbf2016-01-13 14:14:27 -0800310 }
311
Jordan Haltermane9c37092017-03-21 11:16:14 -0700312 /**
313 * Cleans up the connection, unregistering handlers registered on the MessagingService.
314 */
Jordan Halterman2f7a5d02017-05-03 16:56:53 -0700315 private void close(Throwable error) {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700316 log.debug("Connection {}-{} closed", partitionId, connectionId);
317 messagingService.unregisterHandler(localSubject);
Jordan Halterman2f7a5d02017-05-03 16:56:53 -0700318 if (error != null) {
319 exceptionListeners.accept(error);
320 }
Jordan Haltermane9c37092017-03-21 11:16:14 -0700321 closeListeners.accept(this);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800322 }
323
Jordan Haltermane9c37092017-03-21 11:16:14 -0700324 /**
325 * Connection mode used to indicate whether this side of the connection is
326 * a client or server.
327 */
328 enum Mode {
329
330 /**
331 * Represents the client side of a bi-directional connection.
332 */
333 CLIENT {
334 @Override
335 String getLocalSubject(PartitionId partitionId, long connectionId) {
336 return String.format("onos-copycat-%s-%d-client", partitionId, connectionId);
337 }
338
339 @Override
340 String getRemoteSubject(PartitionId partitionId, long connectionId) {
341 return String.format("onos-copycat-%s-%d-server", partitionId, connectionId);
342 }
343 },
344
345 /**
346 * Represents the server side of a bi-directional connection.
347 */
348 SERVER {
349 @Override
350 String getLocalSubject(PartitionId partitionId, long connectionId) {
351 return String.format("onos-copycat-%s-%d-server", partitionId, connectionId);
352 }
353
354 @Override
355 String getRemoteSubject(PartitionId partitionId, long connectionId) {
356 return String.format("onos-copycat-%s-%d-client", partitionId, connectionId);
357 }
358 };
359
360 /**
361 * Returns the local messaging service subject for the connection in this mode.
362 * Subjects generated by the connection mode are guaranteed to be globally unique.
363 *
364 * @param partitionId the partition ID to which the connection belongs.
365 * @param connectionId the connection ID.
366 * @return the globally unique local subject for the connection.
367 */
368 abstract String getLocalSubject(PartitionId partitionId, long connectionId);
369
370 /**
371 * Returns the remote messaging service subject for the connection in this mode.
372 * Subjects generated by the connection mode are guaranteed to be globally unique.
373 *
374 * @param partitionId the partition ID to which the connection belongs.
375 * @param connectionId the connection ID.
376 * @return the globally unique remote subject for the connection.
377 */
378 abstract String getRemoteSubject(PartitionId partitionId, long connectionId);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800379 }
380
Jordan Haltermane9c37092017-03-21 11:16:14 -0700381 /**
382 * Internal container for a handler/context pair.
383 */
384 private static class InternalHandler {
Jordan Haltermanfda46f92017-04-14 10:49:44 -0700385 private final Function handler;
Madan Jampani3289fbf2016-01-13 14:14:27 -0800386 private final ThreadContext context;
387
Jordan Haltermanfda46f92017-04-14 10:49:44 -0700388 InternalHandler(Function handler, ThreadContext context) {
Madan Jampani3289fbf2016-01-13 14:14:27 -0800389 this.handler = handler;
390 this.context = context;
391 }
392
393 @SuppressWarnings("unchecked")
Jordan Haltermane9c37092017-03-21 11:16:14 -0700394 CompletableFuture<Object> handle(Object message) {
395 CompletableFuture<Object> future = new CompletableFuture<>();
Jordan Haltermanfda46f92017-04-14 10:49:44 -0700396 context.executor().execute(() -> {
397 CompletableFuture<Object> responseFuture = (CompletableFuture<Object>) handler.apply(message);
398 if (responseFuture != null) {
399 responseFuture.whenComplete((r, e) -> {
400 if (e != null) {
401 future.completeExceptionally((Throwable) e);
402 } else {
403 future.complete(r);
404 }
405 });
Madan Jampani3289fbf2016-01-13 14:14:27 -0800406 }
Jordan Haltermanfda46f92017-04-14 10:49:44 -0700407 });
Jordan Haltermane9c37092017-03-21 11:16:14 -0700408 return future;
Madan Jampani3289fbf2016-01-13 14:14:27 -0800409 }
410 }
411}