blob: b8596ae7f33437d0b980ef9cfe6818a7676e2869 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.impl;
17
18import java.io.ByteArrayInputStream;
19import java.io.ByteArrayOutputStream;
20import java.io.DataInputStream;
21import java.io.DataOutputStream;
22import java.io.IOException;
23import java.io.InputStream;
Jordan Halterman2f7a5d02017-05-03 16:56:53 -070024import java.net.SocketException;
Jordan Haltermane9c37092017-03-21 11:16:14 -070025import java.nio.ByteBuffer;
Madan Jampani3289fbf2016-01-13 14:14:27 -080026import java.util.Map;
Madan Jampani3289fbf2016-01-13 14:14:27 -080027import java.util.concurrent.CompletableFuture;
Jordan Haltermane9c37092017-03-21 11:16:14 -070028import java.util.concurrent.ConcurrentHashMap;
Madan Jampani3289fbf2016-01-13 14:14:27 -080029import java.util.function.Consumer;
Jordan Haltermanfda46f92017-04-14 10:49:44 -070030import java.util.function.Function;
31
32import com.google.common.base.Throwables;
33import io.atomix.catalyst.concurrent.Listener;
34import io.atomix.catalyst.concurrent.Listeners;
35import io.atomix.catalyst.concurrent.ThreadContext;
36import io.atomix.catalyst.serializer.SerializationException;
37import io.atomix.catalyst.transport.Connection;
38import io.atomix.catalyst.transport.TransportException;
39import io.atomix.catalyst.util.reference.ReferenceCounted;
40import org.apache.commons.io.IOUtils;
41import org.onlab.util.Tools;
42import org.onosproject.cluster.PartitionId;
43import org.onosproject.store.cluster.messaging.Endpoint;
44import org.onosproject.store.cluster.messaging.MessagingException;
45import org.onosproject.store.cluster.messaging.MessagingService;
46import org.slf4j.Logger;
47import org.slf4j.LoggerFactory;
Madan Jampani3289fbf2016-01-13 14:14:27 -080048
Jonathan Hartad0c3022017-02-22 14:06:01 -080049import static com.google.common.base.Preconditions.checkNotNull;
Jordan Haltermane9c37092017-03-21 11:16:14 -070050import static org.onosproject.store.primitives.impl.CopycatTransport.CLOSE;
51import static org.onosproject.store.primitives.impl.CopycatTransport.FAILURE;
52import static org.onosproject.store.primitives.impl.CopycatTransport.MESSAGE;
53import static org.onosproject.store.primitives.impl.CopycatTransport.SUCCESS;
Madan Jampani3289fbf2016-01-13 14:14:27 -080054
Madan Jampani3289fbf2016-01-13 14:14:27 -080055/**
Jordan Haltermane9c37092017-03-21 11:16:14 -070056 * Base Copycat Transport connection.
Madan Jampani3289fbf2016-01-13 14:14:27 -080057 */
58public class CopycatTransportConnection implements Connection {
Jordan Haltermane9c37092017-03-21 11:16:14 -070059 private final Logger log = LoggerFactory.getLogger(getClass());
60 private final long connectionId;
61 private final String localSubject;
62 private final String remoteSubject;
63 private final PartitionId partitionId;
64 private final Endpoint endpoint;
65 private final MessagingService messagingService;
66 private final ThreadContext context;
67 private final Map<Class, InternalHandler> handlers = new ConcurrentHashMap<>();
Madan Jampani3289fbf2016-01-13 14:14:27 -080068 private final Listeners<Throwable> exceptionListeners = new Listeners<>();
69 private final Listeners<Connection> closeListeners = new Listeners<>();
70
Jordan Haltermane9c37092017-03-21 11:16:14 -070071 CopycatTransportConnection(
72 long connectionId,
73 Mode mode,
Madan Jampanif778c962016-01-31 22:56:38 -080074 PartitionId partitionId,
Jordan Haltermane9c37092017-03-21 11:16:14 -070075 Endpoint endpoint,
Madan Jampani3289fbf2016-01-13 14:14:27 -080076 MessagingService messagingService,
77 ThreadContext context) {
78 this.connectionId = connectionId;
Jordan Haltermane9c37092017-03-21 11:16:14 -070079 this.partitionId = checkNotNull(partitionId, "partitionId cannot be null");
80 this.localSubject = mode.getLocalSubject(partitionId, connectionId);
81 this.remoteSubject = mode.getRemoteSubject(partitionId, connectionId);
82 this.endpoint = checkNotNull(endpoint, "endpoint cannot be null");
83 this.messagingService = checkNotNull(messagingService, "messagingService cannot be null");
84 this.context = checkNotNull(context, "context cannot be null");
85 messagingService.registerHandler(localSubject, this::handle);
Madan Jampani3289fbf2016-01-13 14:14:27 -080086 }
87
88 @Override
Jordan Haltermanfda46f92017-04-14 10:49:44 -070089 public CompletableFuture<Void> send(Object message) {
90 ThreadContext context = ThreadContext.currentContextOrThrow();
91 CompletableFuture<Void> future = new CompletableFuture<>();
92 try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
93 DataOutputStream dos = new DataOutputStream(baos);
94 dos.writeByte(MESSAGE);
95 context.serializer().writeObject(message, baos);
96 if (message instanceof ReferenceCounted) {
97 ((ReferenceCounted<?>) message).release();
98 }
99
100 messagingService.sendAsync(endpoint, remoteSubject, baos.toByteArray())
101 .whenComplete((r, e) -> {
102 if (e != null) {
103 context.executor().execute(() -> future.completeExceptionally(e));
104 } else {
105 context.executor().execute(() -> future.complete(null));
106 }
107 });
108 } catch (SerializationException | IOException e) {
109 future.completeExceptionally(e);
110 }
111 return future;
112 }
113
114 @Override
115 public <T, U> CompletableFuture<U> sendAndReceive(T message) {
Madan Jampani3289fbf2016-01-13 14:14:27 -0800116 ThreadContext context = ThreadContext.currentContextOrThrow();
Jordan Haltermane9c37092017-03-21 11:16:14 -0700117 CompletableFuture<U> future = new CompletableFuture<>();
Madan Jampani3289fbf2016-01-13 14:14:27 -0800118 try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700119 DataOutputStream dos = new DataOutputStream(baos);
120 dos.writeByte(MESSAGE);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800121 context.serializer().writeObject(message, baos);
122 if (message instanceof ReferenceCounted) {
123 ((ReferenceCounted<?>) message).release();
124 }
Jordan Haltermane9c37092017-03-21 11:16:14 -0700125 messagingService.sendAndReceive(endpoint,
126 remoteSubject,
Madan Jampani3289fbf2016-01-13 14:14:27 -0800127 baos.toByteArray(),
128 context.executor())
Jordan Halterman2f7a5d02017-05-03 16:56:53 -0700129 .whenComplete((response, error) -> handleResponse(response, error, future));
Madan Jampani0da01a42016-03-18 14:33:18 -0700130 } catch (SerializationException | IOException e) {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700131 future.completeExceptionally(e);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800132 }
Jordan Haltermane9c37092017-03-21 11:16:14 -0700133 return future;
Madan Jampani3289fbf2016-01-13 14:14:27 -0800134 }
135
Jordan Haltermane9c37092017-03-21 11:16:14 -0700136 /**
137 * Handles a response received from the other side of the connection.
138 */
139 private <T> void handleResponse(
140 byte[] response,
141 Throwable error,
142 CompletableFuture<T> future) {
Madan Jampani3289fbf2016-01-13 14:14:27 -0800143 if (error != null) {
Jordan Halterman2f7a5d02017-05-03 16:56:53 -0700144 Throwable rootCause = Throwables.getRootCause(error);
145 if (rootCause instanceof MessagingException || rootCause instanceof SocketException) {
146 future.completeExceptionally(new TransportException(error));
147 if (rootCause instanceof MessagingException.NoRemoteHandler) {
148 close(rootCause);
149 }
150 } else {
151 future.completeExceptionally(error);
152 }
Madan Jampani3289fbf2016-01-13 14:14:27 -0800153 return;
154 }
Jordan Halterman2f7a5d02017-05-03 16:56:53 -0700155
Madan Jampani3289fbf2016-01-13 14:14:27 -0800156 checkNotNull(response);
157 InputStream input = new ByteArrayInputStream(response);
158 try {
159 byte status = (byte) input.read();
160 if (status == FAILURE) {
161 Throwable t = context.serializer().readObject(input);
Jordan Haltermane9c37092017-03-21 11:16:14 -0700162 future.completeExceptionally(t);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800163 } else {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700164 try {
165 future.complete(context.serializer().readObject(input));
166 } catch (SerializationException e) {
167 future.completeExceptionally(e);
168 }
Madan Jampani3289fbf2016-01-13 14:14:27 -0800169 }
170 } catch (IOException e) {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700171 future.completeExceptionally(e);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800172 }
173 }
174
Jordan Haltermane9c37092017-03-21 11:16:14 -0700175 /**
176 * Handles a message sent to the connection.
177 */
178 private CompletableFuture<byte[]> handle(Endpoint sender, byte[] payload) {
179 try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
180 byte type = input.readByte();
181 switch (type) {
182 case MESSAGE:
183 return handleMessage(IOUtils.toByteArray(input));
184 case CLOSE:
185 return handleClose();
186 default:
187 throw new IllegalStateException("Invalid message type");
188 }
189 } catch (IOException e) {
190 Throwables.propagate(e);
191 return null;
192 }
Madan Jampani3289fbf2016-01-13 14:14:27 -0800193 }
194
Jordan Haltermane9c37092017-03-21 11:16:14 -0700195 /**
196 * Handles a message from the other side of the connection.
197 */
198 @SuppressWarnings("unchecked")
199 private CompletableFuture<byte[]> handleMessage(byte[] message) {
Madan Jampani3289fbf2016-01-13 14:14:27 -0800200 try {
201 Object request = context.serializer().readObject(new ByteArrayInputStream(message));
202 InternalHandler handler = handlers.get(request.getClass());
203 if (handler == null) {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700204 log.warn("No handler registered on connection {}-{} for type {}",
205 partitionId, connectionId, request.getClass());
Madan Jampani3289fbf2016-01-13 14:14:27 -0800206 return Tools.exceptionalFuture(new IllegalStateException(
207 "No handler registered for " + request.getClass()));
208 }
Jordan Haltermane9c37092017-03-21 11:16:14 -0700209
Madan Jampani3289fbf2016-01-13 14:14:27 -0800210 return handler.handle(request).handle((result, error) -> {
Madan Jampani3289fbf2016-01-13 14:14:27 -0800211 try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
212 baos.write(error != null ? FAILURE : SUCCESS);
213 context.serializer().writeObject(error != null ? error : result, baos);
214 return baos.toByteArray();
215 } catch (IOException e) {
216 Throwables.propagate(e);
217 return null;
218 }
219 });
220 } catch (Exception e) {
221 return Tools.exceptionalFuture(e);
222 }
223 }
224
Jordan Haltermane9c37092017-03-21 11:16:14 -0700225 /**
226 * Handles a close request from the other side of the connection.
227 */
228 private CompletableFuture<byte[]> handleClose() {
229 CompletableFuture<byte[]> future = new CompletableFuture<>();
230 context.executor().execute(() -> {
Jordan Halterman2f7a5d02017-05-03 16:56:53 -0700231 close(null);
Jordan Haltermane9c37092017-03-21 11:16:14 -0700232 ByteBuffer responseBuffer = ByteBuffer.allocate(1);
233 responseBuffer.put(SUCCESS);
234 future.complete(responseBuffer.array());
235 });
236 return future;
Madan Jampani3289fbf2016-01-13 14:14:27 -0800237 }
238
239 @Override
Jordan Haltermanfda46f92017-04-14 10:49:44 -0700240 public <T, U> Connection handler(Class<T> type, Consumer<T> handler) {
241 return handler(type, r -> {
242 handler.accept(r);
243 return null;
244 });
245 }
246
247 @Override
248 public <T, U> Connection handler(Class<T> type, Function<T, CompletableFuture<U>> handler) {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700249 if (log.isTraceEnabled()) {
250 log.trace("Registered handler on connection {}-{}: {}", partitionId, connectionId, type);
251 }
252 handlers.put(type, new InternalHandler(handler, ThreadContext.currentContextOrThrow()));
253 return this;
254 }
255
256 @Override
Jordan Haltermanfda46f92017-04-14 10:49:44 -0700257 public Listener<Throwable> onException(Consumer<Throwable> consumer) {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700258 return exceptionListeners.add(consumer);
259 }
260
261 @Override
Jordan Haltermanfda46f92017-04-14 10:49:44 -0700262 public Listener<Connection> onClose(Consumer<Connection> consumer) {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700263 return closeListeners.add(consumer);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800264 }
265
266 @Override
267 public CompletableFuture<Void> close() {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700268 log.debug("Closing connection {}-{}", partitionId, connectionId);
269
270 ByteBuffer requestBuffer = ByteBuffer.allocate(1);
271 requestBuffer.put(CLOSE);
272
273 ThreadContext context = ThreadContext.currentContextOrThrow();
274 CompletableFuture<Void> future = new CompletableFuture<>();
275 messagingService.sendAndReceive(endpoint, remoteSubject, requestBuffer.array(), context.executor())
276 .whenComplete((payload, error) -> {
Jordan Halterman2f7a5d02017-05-03 16:56:53 -0700277 close(error);
Jordan Haltermane9c37092017-03-21 11:16:14 -0700278 Throwable wrappedError = error;
279 if (error != null) {
280 Throwable rootCause = Throwables.getRootCause(error);
281 if (MessagingException.class.isAssignableFrom(rootCause.getClass())) {
282 wrappedError = new TransportException(error);
283 }
284 future.completeExceptionally(wrappedError);
285 } else {
286 ByteBuffer responseBuffer = ByteBuffer.wrap(payload);
287 if (responseBuffer.get() == SUCCESS) {
288 future.complete(null);
289 } else {
290 future.completeExceptionally(new TransportException("Failed to close connection"));
291 }
292 }
293 });
294 return future;
Madan Jampani3289fbf2016-01-13 14:14:27 -0800295 }
296
Jordan Haltermane9c37092017-03-21 11:16:14 -0700297 /**
298 * Cleans up the connection, unregistering handlers registered on the MessagingService.
299 */
Jordan Halterman2f7a5d02017-05-03 16:56:53 -0700300 private void close(Throwable error) {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700301 log.debug("Connection {}-{} closed", partitionId, connectionId);
302 messagingService.unregisterHandler(localSubject);
Jordan Halterman2f7a5d02017-05-03 16:56:53 -0700303 if (error != null) {
304 exceptionListeners.accept(error);
305 }
Jordan Haltermane9c37092017-03-21 11:16:14 -0700306 closeListeners.accept(this);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800307 }
308
Jordan Haltermane9c37092017-03-21 11:16:14 -0700309 /**
310 * Connection mode used to indicate whether this side of the connection is
311 * a client or server.
312 */
313 enum Mode {
314
315 /**
316 * Represents the client side of a bi-directional connection.
317 */
318 CLIENT {
319 @Override
320 String getLocalSubject(PartitionId partitionId, long connectionId) {
321 return String.format("onos-copycat-%s-%d-client", partitionId, connectionId);
322 }
323
324 @Override
325 String getRemoteSubject(PartitionId partitionId, long connectionId) {
326 return String.format("onos-copycat-%s-%d-server", partitionId, connectionId);
327 }
328 },
329
330 /**
331 * Represents the server side of a bi-directional connection.
332 */
333 SERVER {
334 @Override
335 String getLocalSubject(PartitionId partitionId, long connectionId) {
336 return String.format("onos-copycat-%s-%d-server", partitionId, connectionId);
337 }
338
339 @Override
340 String getRemoteSubject(PartitionId partitionId, long connectionId) {
341 return String.format("onos-copycat-%s-%d-client", partitionId, connectionId);
342 }
343 };
344
345 /**
346 * Returns the local messaging service subject for the connection in this mode.
347 * Subjects generated by the connection mode are guaranteed to be globally unique.
348 *
349 * @param partitionId the partition ID to which the connection belongs.
350 * @param connectionId the connection ID.
351 * @return the globally unique local subject for the connection.
352 */
353 abstract String getLocalSubject(PartitionId partitionId, long connectionId);
354
355 /**
356 * Returns the remote messaging service subject for the connection in this mode.
357 * Subjects generated by the connection mode are guaranteed to be globally unique.
358 *
359 * @param partitionId the partition ID to which the connection belongs.
360 * @param connectionId the connection ID.
361 * @return the globally unique remote subject for the connection.
362 */
363 abstract String getRemoteSubject(PartitionId partitionId, long connectionId);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800364 }
365
Jordan Haltermane9c37092017-03-21 11:16:14 -0700366 /**
367 * Internal container for a handler/context pair.
368 */
369 private static class InternalHandler {
Jordan Haltermanfda46f92017-04-14 10:49:44 -0700370 private final Function handler;
Madan Jampani3289fbf2016-01-13 14:14:27 -0800371 private final ThreadContext context;
372
Jordan Haltermanfda46f92017-04-14 10:49:44 -0700373 InternalHandler(Function handler, ThreadContext context) {
Madan Jampani3289fbf2016-01-13 14:14:27 -0800374 this.handler = handler;
375 this.context = context;
376 }
377
378 @SuppressWarnings("unchecked")
Jordan Haltermane9c37092017-03-21 11:16:14 -0700379 CompletableFuture<Object> handle(Object message) {
380 CompletableFuture<Object> future = new CompletableFuture<>();
Jordan Haltermanfda46f92017-04-14 10:49:44 -0700381 context.executor().execute(() -> {
382 CompletableFuture<Object> responseFuture = (CompletableFuture<Object>) handler.apply(message);
383 if (responseFuture != null) {
384 responseFuture.whenComplete((r, e) -> {
385 if (e != null) {
386 future.completeExceptionally((Throwable) e);
387 } else {
388 future.complete(r);
389 }
390 });
Madan Jampani3289fbf2016-01-13 14:14:27 -0800391 }
Jordan Haltermanfda46f92017-04-14 10:49:44 -0700392 });
Jordan Haltermane9c37092017-03-21 11:16:14 -0700393 return future;
Madan Jampani3289fbf2016-01-13 14:14:27 -0800394 }
395 }
396}