blob: eebbf9c8d3176ce68a025731573ff72375e17a1b [file] [log] [blame]
Madan Jampani3289fbf2016-01-13 14:14:27 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2016-present Open Networking Laboratory
Madan Jampani3289fbf2016-01-13 14:14:27 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.impl;
17
18import java.io.ByteArrayInputStream;
19import java.io.ByteArrayOutputStream;
20import java.io.DataInputStream;
21import java.io.DataOutputStream;
22import java.io.IOException;
23import java.io.InputStream;
Jordan Haltermane9c37092017-03-21 11:16:14 -070024import java.nio.ByteBuffer;
Madan Jampani3289fbf2016-01-13 14:14:27 -080025import java.util.Map;
Madan Jampani3289fbf2016-01-13 14:14:27 -080026import java.util.concurrent.CompletableFuture;
Jordan Haltermane9c37092017-03-21 11:16:14 -070027import java.util.concurrent.ConcurrentHashMap;
Madan Jampani3289fbf2016-01-13 14:14:27 -080028import java.util.function.Consumer;
Jordan Haltermanfda46f92017-04-14 10:49:44 -070029import java.util.function.Function;
30
31import com.google.common.base.Throwables;
32import io.atomix.catalyst.concurrent.Listener;
33import io.atomix.catalyst.concurrent.Listeners;
34import io.atomix.catalyst.concurrent.ThreadContext;
35import io.atomix.catalyst.serializer.SerializationException;
36import io.atomix.catalyst.transport.Connection;
37import io.atomix.catalyst.transport.TransportException;
38import io.atomix.catalyst.util.reference.ReferenceCounted;
39import org.apache.commons.io.IOUtils;
40import org.onlab.util.Tools;
41import org.onosproject.cluster.PartitionId;
42import org.onosproject.store.cluster.messaging.Endpoint;
43import org.onosproject.store.cluster.messaging.MessagingException;
44import org.onosproject.store.cluster.messaging.MessagingService;
45import org.slf4j.Logger;
46import org.slf4j.LoggerFactory;
Madan Jampani3289fbf2016-01-13 14:14:27 -080047
Jonathan Hartad0c3022017-02-22 14:06:01 -080048import static com.google.common.base.Preconditions.checkNotNull;
Jordan Haltermane9c37092017-03-21 11:16:14 -070049import static org.onosproject.store.primitives.impl.CopycatTransport.CLOSE;
50import static org.onosproject.store.primitives.impl.CopycatTransport.FAILURE;
51import static org.onosproject.store.primitives.impl.CopycatTransport.MESSAGE;
52import static org.onosproject.store.primitives.impl.CopycatTransport.SUCCESS;
Madan Jampani3289fbf2016-01-13 14:14:27 -080053
Madan Jampani3289fbf2016-01-13 14:14:27 -080054/**
Jordan Haltermane9c37092017-03-21 11:16:14 -070055 * Base Copycat Transport connection.
Madan Jampani3289fbf2016-01-13 14:14:27 -080056 */
57public class CopycatTransportConnection implements Connection {
Jordan Haltermane9c37092017-03-21 11:16:14 -070058 private final Logger log = LoggerFactory.getLogger(getClass());
59 private final long connectionId;
60 private final String localSubject;
61 private final String remoteSubject;
62 private final PartitionId partitionId;
63 private final Endpoint endpoint;
64 private final MessagingService messagingService;
65 private final ThreadContext context;
66 private final Map<Class, InternalHandler> handlers = new ConcurrentHashMap<>();
Madan Jampani3289fbf2016-01-13 14:14:27 -080067 private final Listeners<Throwable> exceptionListeners = new Listeners<>();
68 private final Listeners<Connection> closeListeners = new Listeners<>();
69
Jordan Haltermane9c37092017-03-21 11:16:14 -070070 CopycatTransportConnection(
71 long connectionId,
72 Mode mode,
Madan Jampanif778c962016-01-31 22:56:38 -080073 PartitionId partitionId,
Jordan Haltermane9c37092017-03-21 11:16:14 -070074 Endpoint endpoint,
Madan Jampani3289fbf2016-01-13 14:14:27 -080075 MessagingService messagingService,
76 ThreadContext context) {
77 this.connectionId = connectionId;
Jordan Haltermane9c37092017-03-21 11:16:14 -070078 this.partitionId = checkNotNull(partitionId, "partitionId cannot be null");
79 this.localSubject = mode.getLocalSubject(partitionId, connectionId);
80 this.remoteSubject = mode.getRemoteSubject(partitionId, connectionId);
81 this.endpoint = checkNotNull(endpoint, "endpoint cannot be null");
82 this.messagingService = checkNotNull(messagingService, "messagingService cannot be null");
83 this.context = checkNotNull(context, "context cannot be null");
84 messagingService.registerHandler(localSubject, this::handle);
Madan Jampani3289fbf2016-01-13 14:14:27 -080085 }
86
87 @Override
Jordan Haltermanfda46f92017-04-14 10:49:44 -070088 public CompletableFuture<Void> send(Object message) {
89 ThreadContext context = ThreadContext.currentContextOrThrow();
90 CompletableFuture<Void> future = new CompletableFuture<>();
91 try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
92 DataOutputStream dos = new DataOutputStream(baos);
93 dos.writeByte(MESSAGE);
94 context.serializer().writeObject(message, baos);
95 if (message instanceof ReferenceCounted) {
96 ((ReferenceCounted<?>) message).release();
97 }
98
99 messagingService.sendAsync(endpoint, remoteSubject, baos.toByteArray())
100 .whenComplete((r, e) -> {
101 if (e != null) {
102 context.executor().execute(() -> future.completeExceptionally(e));
103 } else {
104 context.executor().execute(() -> future.complete(null));
105 }
106 });
107 } catch (SerializationException | IOException e) {
108 future.completeExceptionally(e);
109 }
110 return future;
111 }
112
113 @Override
114 public <T, U> CompletableFuture<U> sendAndReceive(T message) {
Madan Jampani3289fbf2016-01-13 14:14:27 -0800115 ThreadContext context = ThreadContext.currentContextOrThrow();
Jordan Haltermane9c37092017-03-21 11:16:14 -0700116 CompletableFuture<U> future = new CompletableFuture<>();
Madan Jampani3289fbf2016-01-13 14:14:27 -0800117 try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700118 DataOutputStream dos = new DataOutputStream(baos);
119 dos.writeByte(MESSAGE);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800120 context.serializer().writeObject(message, baos);
121 if (message instanceof ReferenceCounted) {
122 ((ReferenceCounted<?>) message).release();
123 }
Jordan Haltermane9c37092017-03-21 11:16:14 -0700124 messagingService.sendAndReceive(endpoint,
125 remoteSubject,
Madan Jampani3289fbf2016-01-13 14:14:27 -0800126 baos.toByteArray(),
127 context.executor())
128 .whenComplete((r, e) -> {
Madan Jampani6f743712016-03-26 11:20:25 -0700129 Throwable wrappedError = e;
130 if (e != null) {
131 Throwable rootCause = Throwables.getRootCause(e);
132 if (MessagingException.class.isAssignableFrom(rootCause.getClass())) {
133 wrappedError = new TransportException(e);
134 }
Madan Jampani3289fbf2016-01-13 14:14:27 -0800135 }
Jordan Haltermane9c37092017-03-21 11:16:14 -0700136 handleResponse(r, wrappedError, future);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800137 });
Madan Jampani0da01a42016-03-18 14:33:18 -0700138 } catch (SerializationException | IOException e) {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700139 future.completeExceptionally(e);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800140 }
Jordan Haltermane9c37092017-03-21 11:16:14 -0700141 return future;
Madan Jampani3289fbf2016-01-13 14:14:27 -0800142 }
143
Jordan Haltermane9c37092017-03-21 11:16:14 -0700144 /**
145 * Handles a response received from the other side of the connection.
146 */
147 private <T> void handleResponse(
148 byte[] response,
149 Throwable error,
150 CompletableFuture<T> future) {
Madan Jampani3289fbf2016-01-13 14:14:27 -0800151 if (error != null) {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700152 future.completeExceptionally(error);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800153 return;
154 }
155 checkNotNull(response);
156 InputStream input = new ByteArrayInputStream(response);
157 try {
158 byte status = (byte) input.read();
159 if (status == FAILURE) {
160 Throwable t = context.serializer().readObject(input);
Jordan Haltermane9c37092017-03-21 11:16:14 -0700161 future.completeExceptionally(t);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800162 } else {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700163 try {
164 future.complete(context.serializer().readObject(input));
165 } catch (SerializationException e) {
166 future.completeExceptionally(e);
167 }
Madan Jampani3289fbf2016-01-13 14:14:27 -0800168 }
169 } catch (IOException e) {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700170 future.completeExceptionally(e);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800171 }
172 }
173
Jordan Haltermane9c37092017-03-21 11:16:14 -0700174 /**
175 * Handles a message sent to the connection.
176 */
177 private CompletableFuture<byte[]> handle(Endpoint sender, byte[] payload) {
178 try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
179 byte type = input.readByte();
180 switch (type) {
181 case MESSAGE:
182 return handleMessage(IOUtils.toByteArray(input));
183 case CLOSE:
184 return handleClose();
185 default:
186 throw new IllegalStateException("Invalid message type");
187 }
188 } catch (IOException e) {
189 Throwables.propagate(e);
190 return null;
191 }
Madan Jampani3289fbf2016-01-13 14:14:27 -0800192 }
193
Jordan Haltermane9c37092017-03-21 11:16:14 -0700194 /**
195 * Handles a message from the other side of the connection.
196 */
197 @SuppressWarnings("unchecked")
198 private CompletableFuture<byte[]> handleMessage(byte[] message) {
Madan Jampani3289fbf2016-01-13 14:14:27 -0800199 try {
200 Object request = context.serializer().readObject(new ByteArrayInputStream(message));
201 InternalHandler handler = handlers.get(request.getClass());
202 if (handler == null) {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700203 log.warn("No handler registered on connection {}-{} for type {}",
204 partitionId, connectionId, request.getClass());
Madan Jampani3289fbf2016-01-13 14:14:27 -0800205 return Tools.exceptionalFuture(new IllegalStateException(
206 "No handler registered for " + request.getClass()));
207 }
Jordan Haltermane9c37092017-03-21 11:16:14 -0700208
Madan Jampani3289fbf2016-01-13 14:14:27 -0800209 return handler.handle(request).handle((result, error) -> {
Madan Jampani3289fbf2016-01-13 14:14:27 -0800210 try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
211 baos.write(error != null ? FAILURE : SUCCESS);
212 context.serializer().writeObject(error != null ? error : result, baos);
213 return baos.toByteArray();
214 } catch (IOException e) {
215 Throwables.propagate(e);
216 return null;
217 }
218 });
219 } catch (Exception e) {
220 return Tools.exceptionalFuture(e);
221 }
222 }
223
Jordan Haltermane9c37092017-03-21 11:16:14 -0700224 /**
225 * Handles a close request from the other side of the connection.
226 */
227 private CompletableFuture<byte[]> handleClose() {
228 CompletableFuture<byte[]> future = new CompletableFuture<>();
229 context.executor().execute(() -> {
230 cleanup();
231 ByteBuffer responseBuffer = ByteBuffer.allocate(1);
232 responseBuffer.put(SUCCESS);
233 future.complete(responseBuffer.array());
234 });
235 return future;
Madan Jampani3289fbf2016-01-13 14:14:27 -0800236 }
237
238 @Override
Jordan Haltermanfda46f92017-04-14 10:49:44 -0700239 public <T, U> Connection handler(Class<T> type, Consumer<T> handler) {
240 return handler(type, r -> {
241 handler.accept(r);
242 return null;
243 });
244 }
245
246 @Override
247 public <T, U> Connection handler(Class<T> type, Function<T, CompletableFuture<U>> handler) {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700248 if (log.isTraceEnabled()) {
249 log.trace("Registered handler on connection {}-{}: {}", partitionId, connectionId, type);
250 }
251 handlers.put(type, new InternalHandler(handler, ThreadContext.currentContextOrThrow()));
252 return this;
253 }
254
255 @Override
Jordan Haltermanfda46f92017-04-14 10:49:44 -0700256 public Listener<Throwable> onException(Consumer<Throwable> consumer) {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700257 return exceptionListeners.add(consumer);
258 }
259
260 @Override
Jordan Haltermanfda46f92017-04-14 10:49:44 -0700261 public Listener<Connection> onClose(Consumer<Connection> consumer) {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700262 return closeListeners.add(consumer);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800263 }
264
265 @Override
266 public CompletableFuture<Void> close() {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700267 log.debug("Closing connection {}-{}", partitionId, connectionId);
268
269 ByteBuffer requestBuffer = ByteBuffer.allocate(1);
270 requestBuffer.put(CLOSE);
271
272 ThreadContext context = ThreadContext.currentContextOrThrow();
273 CompletableFuture<Void> future = new CompletableFuture<>();
274 messagingService.sendAndReceive(endpoint, remoteSubject, requestBuffer.array(), context.executor())
275 .whenComplete((payload, error) -> {
276 cleanup();
277 Throwable wrappedError = error;
278 if (error != null) {
279 Throwable rootCause = Throwables.getRootCause(error);
280 if (MessagingException.class.isAssignableFrom(rootCause.getClass())) {
281 wrappedError = new TransportException(error);
282 }
283 future.completeExceptionally(wrappedError);
284 } else {
285 ByteBuffer responseBuffer = ByteBuffer.wrap(payload);
286 if (responseBuffer.get() == SUCCESS) {
287 future.complete(null);
288 } else {
289 future.completeExceptionally(new TransportException("Failed to close connection"));
290 }
291 }
292 });
293 return future;
Madan Jampani3289fbf2016-01-13 14:14:27 -0800294 }
295
Jordan Haltermane9c37092017-03-21 11:16:14 -0700296 /**
297 * Cleans up the connection, unregistering handlers registered on the MessagingService.
298 */
299 private void cleanup() {
300 log.debug("Connection {}-{} closed", partitionId, connectionId);
301 messagingService.unregisterHandler(localSubject);
302 closeListeners.accept(this);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800303 }
304
Jordan Haltermane9c37092017-03-21 11:16:14 -0700305 /**
306 * Connection mode used to indicate whether this side of the connection is
307 * a client or server.
308 */
309 enum Mode {
310
311 /**
312 * Represents the client side of a bi-directional connection.
313 */
314 CLIENT {
315 @Override
316 String getLocalSubject(PartitionId partitionId, long connectionId) {
317 return String.format("onos-copycat-%s-%d-client", partitionId, connectionId);
318 }
319
320 @Override
321 String getRemoteSubject(PartitionId partitionId, long connectionId) {
322 return String.format("onos-copycat-%s-%d-server", partitionId, connectionId);
323 }
324 },
325
326 /**
327 * Represents the server side of a bi-directional connection.
328 */
329 SERVER {
330 @Override
331 String getLocalSubject(PartitionId partitionId, long connectionId) {
332 return String.format("onos-copycat-%s-%d-server", partitionId, connectionId);
333 }
334
335 @Override
336 String getRemoteSubject(PartitionId partitionId, long connectionId) {
337 return String.format("onos-copycat-%s-%d-client", partitionId, connectionId);
338 }
339 };
340
341 /**
342 * Returns the local messaging service subject for the connection in this mode.
343 * Subjects generated by the connection mode are guaranteed to be globally unique.
344 *
345 * @param partitionId the partition ID to which the connection belongs.
346 * @param connectionId the connection ID.
347 * @return the globally unique local subject for the connection.
348 */
349 abstract String getLocalSubject(PartitionId partitionId, long connectionId);
350
351 /**
352 * Returns the remote messaging service subject for the connection in this mode.
353 * Subjects generated by the connection mode are guaranteed to be globally unique.
354 *
355 * @param partitionId the partition ID to which the connection belongs.
356 * @param connectionId the connection ID.
357 * @return the globally unique remote subject for the connection.
358 */
359 abstract String getRemoteSubject(PartitionId partitionId, long connectionId);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800360 }
361
Jordan Haltermane9c37092017-03-21 11:16:14 -0700362 /**
363 * Internal container for a handler/context pair.
364 */
365 private static class InternalHandler {
Jordan Haltermanfda46f92017-04-14 10:49:44 -0700366 private final Function handler;
Madan Jampani3289fbf2016-01-13 14:14:27 -0800367 private final ThreadContext context;
368
Jordan Haltermanfda46f92017-04-14 10:49:44 -0700369 InternalHandler(Function handler, ThreadContext context) {
Madan Jampani3289fbf2016-01-13 14:14:27 -0800370 this.handler = handler;
371 this.context = context;
372 }
373
374 @SuppressWarnings("unchecked")
Jordan Haltermane9c37092017-03-21 11:16:14 -0700375 CompletableFuture<Object> handle(Object message) {
376 CompletableFuture<Object> future = new CompletableFuture<>();
Jordan Haltermanfda46f92017-04-14 10:49:44 -0700377 context.executor().execute(() -> {
378 CompletableFuture<Object> responseFuture = (CompletableFuture<Object>) handler.apply(message);
379 if (responseFuture != null) {
380 responseFuture.whenComplete((r, e) -> {
381 if (e != null) {
382 future.completeExceptionally((Throwable) e);
383 } else {
384 future.complete(r);
385 }
386 });
Madan Jampani3289fbf2016-01-13 14:14:27 -0800387 }
Jordan Haltermanfda46f92017-04-14 10:49:44 -0700388 });
Jordan Haltermane9c37092017-03-21 11:16:14 -0700389 return future;
Madan Jampani3289fbf2016-01-13 14:14:27 -0800390 }
391 }
392}