blob: 333ec3d748694bce4c1cbbd5abecd43d7f44044d [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.impl;
17
18import java.io.ByteArrayInputStream;
19import java.io.ByteArrayOutputStream;
20import java.io.DataInputStream;
21import java.io.DataOutputStream;
22import java.io.IOException;
23import java.io.InputStream;
Madan Jampani3289fbf2016-01-13 14:14:27 -080024import java.util.Map;
25import java.util.Objects;
26import java.util.concurrent.CompletableFuture;
Madan Jampani3289fbf2016-01-13 14:14:27 -080027import java.util.function.Consumer;
28
Madan Jampani6f743712016-03-26 11:20:25 -070029
30
Madan Jampani3289fbf2016-01-13 14:14:27 -080031import org.apache.commons.io.IOUtils;
Madan Jampani3289fbf2016-01-13 14:14:27 -080032import org.onlab.util.Tools;
Madan Jampanif778c962016-01-31 22:56:38 -080033import org.onosproject.cluster.PartitionId;
Madan Jampani6f743712016-03-26 11:20:25 -070034import org.onosproject.store.cluster.messaging.MessagingException;
Madan Jampani3289fbf2016-01-13 14:14:27 -080035import org.onosproject.store.cluster.messaging.MessagingService;
36
37import com.google.common.base.MoreObjects;
38import com.google.common.base.Throwables;
39import com.google.common.collect.Maps;
40
41import static com.google.common.base.Preconditions.checkNotNull;
Madan Jampani0da01a42016-03-18 14:33:18 -070042import io.atomix.catalyst.serializer.SerializationException;
Madan Jampani3289fbf2016-01-13 14:14:27 -080043import io.atomix.catalyst.transport.Address;
44import io.atomix.catalyst.transport.Connection;
45import io.atomix.catalyst.transport.MessageHandler;
Madan Jampani6f743712016-03-26 11:20:25 -070046import io.atomix.catalyst.transport.TransportException;
Madan Jampani3289fbf2016-01-13 14:14:27 -080047import io.atomix.catalyst.util.Assert;
48import io.atomix.catalyst.util.Listener;
49import io.atomix.catalyst.util.Listeners;
50import io.atomix.catalyst.util.ReferenceCounted;
51import io.atomix.catalyst.util.concurrent.ThreadContext;
52
53/**
54 * {@link Connection} implementation for CopycatTransport.
55 */
56public class CopycatTransportConnection implements Connection {
57
58 private final Listeners<Throwable> exceptionListeners = new Listeners<>();
59 private final Listeners<Connection> closeListeners = new Listeners<>();
60
61 static final byte SUCCESS = 0x03;
62 static final byte FAILURE = 0x04;
63
64 private final long connectionId;
Madan Jampani71d13e12016-01-13 17:14:35 -080065 private final CopycatTransport.Mode mode;
Madan Jampani3289fbf2016-01-13 14:14:27 -080066 private final Address remoteAddress;
67 private final MessagingService messagingService;
68 private final String outboundMessageSubject;
69 private final String inboundMessageSubject;
70 private final ThreadContext context;
71 private final Map<Class<?>, InternalHandler> handlers = Maps.newConcurrentMap();
Madan Jampani3289fbf2016-01-13 14:14:27 -080072
73 CopycatTransportConnection(long connectionId,
74 CopycatTransport.Mode mode,
Madan Jampanif778c962016-01-31 22:56:38 -080075 PartitionId partitionId,
Madan Jampani3289fbf2016-01-13 14:14:27 -080076 Address address,
77 MessagingService messagingService,
78 ThreadContext context) {
79 this.connectionId = connectionId;
80 this.mode = checkNotNull(mode);
81 this.remoteAddress = checkNotNull(address);
82 this.messagingService = checkNotNull(messagingService);
83 if (mode == CopycatTransport.Mode.CLIENT) {
Madan Jampani3a9911c2016-02-21 11:25:45 -080084 this.outboundMessageSubject = String.format("onos-copycat-%s", partitionId);
85 this.inboundMessageSubject = String.format("onos-copycat-%s-%d", partitionId, connectionId);
Madan Jampani3289fbf2016-01-13 14:14:27 -080086 } else {
Madan Jampani3a9911c2016-02-21 11:25:45 -080087 this.outboundMessageSubject = String.format("onos-copycat-%s-%d", partitionId, connectionId);
88 this.inboundMessageSubject = String.format("onos-copycat-%s", partitionId);
Madan Jampani3289fbf2016-01-13 14:14:27 -080089 }
90 this.context = checkNotNull(context);
91 }
92
93 public void setBidirectional() {
94 messagingService.registerHandler(inboundMessageSubject, (sender, payload) -> {
95 try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
96 if (input.readLong() != connectionId) {
97 throw new IllegalStateException("Invalid connection Id");
98 }
99 return handle(IOUtils.toByteArray(input));
100 } catch (IOException e) {
101 Throwables.propagate(e);
102 return null;
103 }
104 });
105 }
106
107 @Override
108 public <T, U> CompletableFuture<U> send(T message) {
109 ThreadContext context = ThreadContext.currentContextOrThrow();
110 CompletableFuture<U> result = new CompletableFuture<>();
111 try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
112 new DataOutputStream(baos).writeLong(connectionId);
113 context.serializer().writeObject(message, baos);
114 if (message instanceof ReferenceCounted) {
115 ((ReferenceCounted<?>) message).release();
116 }
Madan Jampani2f9cc712016-02-15 19:36:21 -0800117 messagingService.sendAndReceive(CopycatTransport.toEndpoint(remoteAddress),
Madan Jampani3289fbf2016-01-13 14:14:27 -0800118 outboundMessageSubject,
119 baos.toByteArray(),
120 context.executor())
121 .whenComplete((r, e) -> {
Madan Jampani6f743712016-03-26 11:20:25 -0700122 Throwable wrappedError = e;
123 if (e != null) {
124 Throwable rootCause = Throwables.getRootCause(e);
125 if (MessagingException.class.isAssignableFrom(rootCause.getClass())) {
126 wrappedError = new TransportException(e);
127 }
Madan Jampani3289fbf2016-01-13 14:14:27 -0800128 }
Madan Jampani6f743712016-03-26 11:20:25 -0700129 handleResponse(r, wrappedError, result, context);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800130 });
Madan Jampani0da01a42016-03-18 14:33:18 -0700131 } catch (SerializationException | IOException e) {
132 result.completeExceptionally(e);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800133 }
134 return result;
135 }
136
137 private <T> void handleResponse(byte[] response,
138 Throwable error,
139 CompletableFuture<T> future,
140 ThreadContext context) {
141 if (error != null) {
142 context.execute(() -> future.completeExceptionally(error));
143 return;
144 }
145 checkNotNull(response);
146 InputStream input = new ByteArrayInputStream(response);
147 try {
148 byte status = (byte) input.read();
149 if (status == FAILURE) {
150 Throwable t = context.serializer().readObject(input);
151 context.execute(() -> future.completeExceptionally(t));
152 } else {
153 context.execute(() -> future.complete(context.serializer().readObject(input)));
154 }
155 } catch (IOException e) {
156 context.execute(() -> future.completeExceptionally(e));
157 }
158 }
159
160 @Override
161 public <T, U> Connection handler(Class<T> type, MessageHandler<T, U> handler) {
162 Assert.notNull(type, "type");
163 handlers.put(type, new InternalHandler(handler, ThreadContext.currentContextOrThrow()));
164 return null;
165 }
166
167 public CompletableFuture<byte[]> handle(byte[] message) {
168 try {
169 Object request = context.serializer().readObject(new ByteArrayInputStream(message));
170 InternalHandler handler = handlers.get(request.getClass());
171 if (handler == null) {
172 return Tools.exceptionalFuture(new IllegalStateException(
173 "No handler registered for " + request.getClass()));
174 }
175 return handler.handle(request).handle((result, error) -> {
Madan Jampani3289fbf2016-01-13 14:14:27 -0800176 try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
177 baos.write(error != null ? FAILURE : SUCCESS);
178 context.serializer().writeObject(error != null ? error : result, baos);
179 return baos.toByteArray();
180 } catch (IOException e) {
181 Throwables.propagate(e);
182 return null;
183 }
184 });
185 } catch (Exception e) {
186 return Tools.exceptionalFuture(e);
187 }
188 }
189
190 @Override
191 public Listener<Throwable> exceptionListener(Consumer<Throwable> listener) {
192 return exceptionListeners.add(listener);
193 }
194
195 @Override
196 public Listener<Connection> closeListener(Consumer<Connection> listener) {
197 return closeListeners.add(listener);
198 }
199
200 @Override
201 public CompletableFuture<Void> close() {
Madan Jampani3289fbf2016-01-13 14:14:27 -0800202 closeListeners.forEach(listener -> listener.accept(this));
203 if (mode == CopycatTransport.Mode.CLIENT) {
204 messagingService.unregisterHandler(inboundMessageSubject);
205 }
206 return CompletableFuture.completedFuture(null);
207 }
208
209 @Override
210 public int hashCode() {
211 return Objects.hash(connectionId);
212 }
213
214 @Override
215 public boolean equals(Object other) {
216 if (!(other instanceof CopycatTransportConnection)) {
217 return false;
218 }
Madan Jampani3289fbf2016-01-13 14:14:27 -0800219 return connectionId == ((CopycatTransportConnection) other).connectionId;
220 }
221
222 @Override
223 public String toString() {
224 return MoreObjects.toStringHelper(getClass())
225 .add("id", connectionId)
Madan Jampani3289fbf2016-01-13 14:14:27 -0800226 .toString();
227 }
228
Madan Jampani3289fbf2016-01-13 14:14:27 -0800229 @SuppressWarnings("rawtypes")
230 private final class InternalHandler {
231
232 private final MessageHandler handler;
233 private final ThreadContext context;
234
235 private InternalHandler(MessageHandler handler, ThreadContext context) {
236 this.handler = handler;
237 this.context = context;
238 }
239
240 @SuppressWarnings("unchecked")
241 public CompletableFuture<Object> handle(Object message) {
242 CompletableFuture<Object> answer = new CompletableFuture<>();
243 context.execute(() -> handler.handle(message).whenComplete((r, e) -> {
244 if (e != null) {
245 answer.completeExceptionally((Throwable) e);
246 } else {
247 answer.complete(r);
248 }
249 }));
250 return answer;
251 }
252 }
253}