blob: f2752cd9dcd7b6b24d17aaf684a78c898bde4ad4 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.impl;
17
Jonathan Hartad0c3022017-02-22 14:06:01 -080018import com.google.common.base.MoreObjects;
19import com.google.common.base.Throwables;
20import com.google.common.collect.Maps;
Madan Jampani630e7ac2016-05-31 11:34:05 -070021import io.atomix.catalyst.concurrent.Listener;
22import io.atomix.catalyst.concurrent.Listeners;
23import io.atomix.catalyst.concurrent.ThreadContext;
24import io.atomix.catalyst.serializer.SerializationException;
25import io.atomix.catalyst.transport.Address;
26import io.atomix.catalyst.transport.Connection;
27import io.atomix.catalyst.transport.MessageHandler;
28import io.atomix.catalyst.transport.TransportException;
29import io.atomix.catalyst.util.Assert;
30import io.atomix.catalyst.util.reference.ReferenceCounted;
Jonathan Hartad0c3022017-02-22 14:06:01 -080031import org.apache.commons.io.IOUtils;
32import org.onlab.util.Tools;
33import org.onosproject.cluster.PartitionId;
34import org.onosproject.store.cluster.messaging.MessagingException;
35import org.onosproject.store.cluster.messaging.MessagingService;
Madan Jampani630e7ac2016-05-31 11:34:05 -070036
Madan Jampani3289fbf2016-01-13 14:14:27 -080037import java.io.ByteArrayInputStream;
38import java.io.ByteArrayOutputStream;
39import java.io.DataInputStream;
40import java.io.DataOutputStream;
41import java.io.IOException;
42import java.io.InputStream;
Madan Jampani3289fbf2016-01-13 14:14:27 -080043import java.util.Map;
44import java.util.Objects;
45import java.util.concurrent.CompletableFuture;
Madan Jampani3289fbf2016-01-13 14:14:27 -080046import java.util.function.Consumer;
47
Jonathan Hartad0c3022017-02-22 14:06:01 -080048import static com.google.common.base.Preconditions.checkNotNull;
Madan Jampani3289fbf2016-01-13 14:14:27 -080049
Madan Jampani3289fbf2016-01-13 14:14:27 -080050/**
51 * {@link Connection} implementation for CopycatTransport.
52 */
53public class CopycatTransportConnection implements Connection {
54
55 private final Listeners<Throwable> exceptionListeners = new Listeners<>();
56 private final Listeners<Connection> closeListeners = new Listeners<>();
57
58 static final byte SUCCESS = 0x03;
59 static final byte FAILURE = 0x04;
60
61 private final long connectionId;
Madan Jampani71d13e12016-01-13 17:14:35 -080062 private final CopycatTransport.Mode mode;
Madan Jampani3289fbf2016-01-13 14:14:27 -080063 private final Address remoteAddress;
64 private final MessagingService messagingService;
65 private final String outboundMessageSubject;
66 private final String inboundMessageSubject;
67 private final ThreadContext context;
68 private final Map<Class<?>, InternalHandler> handlers = Maps.newConcurrentMap();
Madan Jampani3289fbf2016-01-13 14:14:27 -080069
70 CopycatTransportConnection(long connectionId,
71 CopycatTransport.Mode mode,
Madan Jampanif778c962016-01-31 22:56:38 -080072 PartitionId partitionId,
Madan Jampani3289fbf2016-01-13 14:14:27 -080073 Address address,
74 MessagingService messagingService,
75 ThreadContext context) {
76 this.connectionId = connectionId;
77 this.mode = checkNotNull(mode);
78 this.remoteAddress = checkNotNull(address);
79 this.messagingService = checkNotNull(messagingService);
80 if (mode == CopycatTransport.Mode.CLIENT) {
Madan Jampani3a9911c2016-02-21 11:25:45 -080081 this.outboundMessageSubject = String.format("onos-copycat-%s", partitionId);
82 this.inboundMessageSubject = String.format("onos-copycat-%s-%d", partitionId, connectionId);
Madan Jampani3289fbf2016-01-13 14:14:27 -080083 } else {
Madan Jampani3a9911c2016-02-21 11:25:45 -080084 this.outboundMessageSubject = String.format("onos-copycat-%s-%d", partitionId, connectionId);
85 this.inboundMessageSubject = String.format("onos-copycat-%s", partitionId);
Madan Jampani3289fbf2016-01-13 14:14:27 -080086 }
87 this.context = checkNotNull(context);
88 }
89
90 public void setBidirectional() {
91 messagingService.registerHandler(inboundMessageSubject, (sender, payload) -> {
92 try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
93 if (input.readLong() != connectionId) {
94 throw new IllegalStateException("Invalid connection Id");
95 }
96 return handle(IOUtils.toByteArray(input));
97 } catch (IOException e) {
98 Throwables.propagate(e);
99 return null;
100 }
101 });
102 }
103
104 @Override
105 public <T, U> CompletableFuture<U> send(T message) {
106 ThreadContext context = ThreadContext.currentContextOrThrow();
107 CompletableFuture<U> result = new CompletableFuture<>();
108 try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
109 new DataOutputStream(baos).writeLong(connectionId);
110 context.serializer().writeObject(message, baos);
111 if (message instanceof ReferenceCounted) {
112 ((ReferenceCounted<?>) message).release();
113 }
Madan Jampani2f9cc712016-02-15 19:36:21 -0800114 messagingService.sendAndReceive(CopycatTransport.toEndpoint(remoteAddress),
Madan Jampani3289fbf2016-01-13 14:14:27 -0800115 outboundMessageSubject,
116 baos.toByteArray(),
117 context.executor())
118 .whenComplete((r, e) -> {
Madan Jampani6f743712016-03-26 11:20:25 -0700119 Throwable wrappedError = e;
120 if (e != null) {
121 Throwable rootCause = Throwables.getRootCause(e);
122 if (MessagingException.class.isAssignableFrom(rootCause.getClass())) {
123 wrappedError = new TransportException(e);
124 }
Madan Jampani3289fbf2016-01-13 14:14:27 -0800125 }
Madan Jampani6f743712016-03-26 11:20:25 -0700126 handleResponse(r, wrappedError, result, context);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800127 });
Madan Jampani0da01a42016-03-18 14:33:18 -0700128 } catch (SerializationException | IOException e) {
129 result.completeExceptionally(e);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800130 }
131 return result;
132 }
133
134 private <T> void handleResponse(byte[] response,
135 Throwable error,
136 CompletableFuture<T> future,
137 ThreadContext context) {
138 if (error != null) {
139 context.execute(() -> future.completeExceptionally(error));
140 return;
141 }
142 checkNotNull(response);
143 InputStream input = new ByteArrayInputStream(response);
144 try {
145 byte status = (byte) input.read();
146 if (status == FAILURE) {
147 Throwable t = context.serializer().readObject(input);
148 context.execute(() -> future.completeExceptionally(t));
149 } else {
Jonathan Hartad0c3022017-02-22 14:06:01 -0800150 context.execute(() -> {
151 try {
152 future.complete(context.serializer().readObject(input));
153 } catch (SerializationException e) {
154 future.completeExceptionally(e);
155 }
156 });
Madan Jampani3289fbf2016-01-13 14:14:27 -0800157 }
158 } catch (IOException e) {
159 context.execute(() -> future.completeExceptionally(e));
160 }
161 }
162
163 @Override
164 public <T, U> Connection handler(Class<T> type, MessageHandler<T, U> handler) {
165 Assert.notNull(type, "type");
166 handlers.put(type, new InternalHandler(handler, ThreadContext.currentContextOrThrow()));
167 return null;
168 }
169
170 public CompletableFuture<byte[]> handle(byte[] message) {
171 try {
172 Object request = context.serializer().readObject(new ByteArrayInputStream(message));
173 InternalHandler handler = handlers.get(request.getClass());
174 if (handler == null) {
175 return Tools.exceptionalFuture(new IllegalStateException(
176 "No handler registered for " + request.getClass()));
177 }
178 return handler.handle(request).handle((result, error) -> {
Madan Jampani3289fbf2016-01-13 14:14:27 -0800179 try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
180 baos.write(error != null ? FAILURE : SUCCESS);
181 context.serializer().writeObject(error != null ? error : result, baos);
182 return baos.toByteArray();
183 } catch (IOException e) {
184 Throwables.propagate(e);
185 return null;
186 }
187 });
188 } catch (Exception e) {
189 return Tools.exceptionalFuture(e);
190 }
191 }
192
193 @Override
194 public Listener<Throwable> exceptionListener(Consumer<Throwable> listener) {
195 return exceptionListeners.add(listener);
196 }
197
198 @Override
199 public Listener<Connection> closeListener(Consumer<Connection> listener) {
200 return closeListeners.add(listener);
201 }
202
203 @Override
204 public CompletableFuture<Void> close() {
Madan Jampani3289fbf2016-01-13 14:14:27 -0800205 closeListeners.forEach(listener -> listener.accept(this));
206 if (mode == CopycatTransport.Mode.CLIENT) {
207 messagingService.unregisterHandler(inboundMessageSubject);
208 }
209 return CompletableFuture.completedFuture(null);
210 }
211
212 @Override
213 public int hashCode() {
214 return Objects.hash(connectionId);
215 }
216
217 @Override
218 public boolean equals(Object other) {
219 if (!(other instanceof CopycatTransportConnection)) {
220 return false;
221 }
Madan Jampani3289fbf2016-01-13 14:14:27 -0800222 return connectionId == ((CopycatTransportConnection) other).connectionId;
223 }
224
225 @Override
226 public String toString() {
227 return MoreObjects.toStringHelper(getClass())
228 .add("id", connectionId)
Madan Jampani3289fbf2016-01-13 14:14:27 -0800229 .toString();
230 }
231
Madan Jampani3289fbf2016-01-13 14:14:27 -0800232 @SuppressWarnings("rawtypes")
233 private final class InternalHandler {
234
235 private final MessageHandler handler;
236 private final ThreadContext context;
237
238 private InternalHandler(MessageHandler handler, ThreadContext context) {
239 this.handler = handler;
240 this.context = context;
241 }
242
243 @SuppressWarnings("unchecked")
244 public CompletableFuture<Object> handle(Object message) {
245 CompletableFuture<Object> answer = new CompletableFuture<>();
246 context.execute(() -> handler.handle(message).whenComplete((r, e) -> {
247 if (e != null) {
248 answer.completeExceptionally((Throwable) e);
249 } else {
250 answer.complete(r);
251 }
252 }));
253 return answer;
254 }
255 }
256}