blob: 8d4b577b92542b94fe57063953516ca3d0619193 [file] [log] [blame]
Madan Jampani3289fbf2016-01-13 14:14:27 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2016-present Open Networking Laboratory
Madan Jampani3289fbf2016-01-13 14:14:27 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.impl;
17
Madan Jampani630e7ac2016-05-31 11:34:05 -070018import static com.google.common.base.Preconditions.checkNotNull;
19import io.atomix.catalyst.concurrent.Listener;
20import io.atomix.catalyst.concurrent.Listeners;
21import io.atomix.catalyst.concurrent.ThreadContext;
22import io.atomix.catalyst.serializer.SerializationException;
23import io.atomix.catalyst.transport.Address;
24import io.atomix.catalyst.transport.Connection;
25import io.atomix.catalyst.transport.MessageHandler;
26import io.atomix.catalyst.transport.TransportException;
27import io.atomix.catalyst.util.Assert;
28import io.atomix.catalyst.util.reference.ReferenceCounted;
29
Madan Jampani3289fbf2016-01-13 14:14:27 -080030import java.io.ByteArrayInputStream;
31import java.io.ByteArrayOutputStream;
32import java.io.DataInputStream;
33import java.io.DataOutputStream;
34import java.io.IOException;
35import java.io.InputStream;
Madan Jampani3289fbf2016-01-13 14:14:27 -080036import java.util.Map;
37import java.util.Objects;
38import java.util.concurrent.CompletableFuture;
Madan Jampani3289fbf2016-01-13 14:14:27 -080039import java.util.function.Consumer;
40
41import org.apache.commons.io.IOUtils;
Madan Jampani3289fbf2016-01-13 14:14:27 -080042import org.onlab.util.Tools;
Madan Jampanif778c962016-01-31 22:56:38 -080043import org.onosproject.cluster.PartitionId;
Madan Jampani6f743712016-03-26 11:20:25 -070044import org.onosproject.store.cluster.messaging.MessagingException;
Madan Jampani3289fbf2016-01-13 14:14:27 -080045import org.onosproject.store.cluster.messaging.MessagingService;
46
47import com.google.common.base.MoreObjects;
48import com.google.common.base.Throwables;
49import com.google.common.collect.Maps;
50
Madan Jampani3289fbf2016-01-13 14:14:27 -080051/**
52 * {@link Connection} implementation for CopycatTransport.
53 */
54public class CopycatTransportConnection implements Connection {
55
56 private final Listeners<Throwable> exceptionListeners = new Listeners<>();
57 private final Listeners<Connection> closeListeners = new Listeners<>();
58
59 static final byte SUCCESS = 0x03;
60 static final byte FAILURE = 0x04;
61
62 private final long connectionId;
Madan Jampani71d13e12016-01-13 17:14:35 -080063 private final CopycatTransport.Mode mode;
Madan Jampani3289fbf2016-01-13 14:14:27 -080064 private final Address remoteAddress;
65 private final MessagingService messagingService;
66 private final String outboundMessageSubject;
67 private final String inboundMessageSubject;
68 private final ThreadContext context;
69 private final Map<Class<?>, InternalHandler> handlers = Maps.newConcurrentMap();
Madan Jampani3289fbf2016-01-13 14:14:27 -080070
71 CopycatTransportConnection(long connectionId,
72 CopycatTransport.Mode mode,
Madan Jampanif778c962016-01-31 22:56:38 -080073 PartitionId partitionId,
Madan Jampani3289fbf2016-01-13 14:14:27 -080074 Address address,
75 MessagingService messagingService,
76 ThreadContext context) {
77 this.connectionId = connectionId;
78 this.mode = checkNotNull(mode);
79 this.remoteAddress = checkNotNull(address);
80 this.messagingService = checkNotNull(messagingService);
81 if (mode == CopycatTransport.Mode.CLIENT) {
Madan Jampani3a9911c2016-02-21 11:25:45 -080082 this.outboundMessageSubject = String.format("onos-copycat-%s", partitionId);
83 this.inboundMessageSubject = String.format("onos-copycat-%s-%d", partitionId, connectionId);
Madan Jampani3289fbf2016-01-13 14:14:27 -080084 } else {
Madan Jampani3a9911c2016-02-21 11:25:45 -080085 this.outboundMessageSubject = String.format("onos-copycat-%s-%d", partitionId, connectionId);
86 this.inboundMessageSubject = String.format("onos-copycat-%s", partitionId);
Madan Jampani3289fbf2016-01-13 14:14:27 -080087 }
88 this.context = checkNotNull(context);
89 }
90
91 public void setBidirectional() {
92 messagingService.registerHandler(inboundMessageSubject, (sender, payload) -> {
93 try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
94 if (input.readLong() != connectionId) {
95 throw new IllegalStateException("Invalid connection Id");
96 }
97 return handle(IOUtils.toByteArray(input));
98 } catch (IOException e) {
99 Throwables.propagate(e);
100 return null;
101 }
102 });
103 }
104
105 @Override
106 public <T, U> CompletableFuture<U> send(T message) {
107 ThreadContext context = ThreadContext.currentContextOrThrow();
108 CompletableFuture<U> result = new CompletableFuture<>();
109 try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
110 new DataOutputStream(baos).writeLong(connectionId);
111 context.serializer().writeObject(message, baos);
112 if (message instanceof ReferenceCounted) {
113 ((ReferenceCounted<?>) message).release();
114 }
Madan Jampani2f9cc712016-02-15 19:36:21 -0800115 messagingService.sendAndReceive(CopycatTransport.toEndpoint(remoteAddress),
Madan Jampani3289fbf2016-01-13 14:14:27 -0800116 outboundMessageSubject,
117 baos.toByteArray(),
118 context.executor())
119 .whenComplete((r, e) -> {
Madan Jampani6f743712016-03-26 11:20:25 -0700120 Throwable wrappedError = e;
121 if (e != null) {
122 Throwable rootCause = Throwables.getRootCause(e);
123 if (MessagingException.class.isAssignableFrom(rootCause.getClass())) {
124 wrappedError = new TransportException(e);
125 }
Madan Jampani3289fbf2016-01-13 14:14:27 -0800126 }
Madan Jampani6f743712016-03-26 11:20:25 -0700127 handleResponse(r, wrappedError, result, context);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800128 });
Madan Jampani0da01a42016-03-18 14:33:18 -0700129 } catch (SerializationException | IOException e) {
130 result.completeExceptionally(e);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800131 }
132 return result;
133 }
134
135 private <T> void handleResponse(byte[] response,
136 Throwable error,
137 CompletableFuture<T> future,
138 ThreadContext context) {
139 if (error != null) {
140 context.execute(() -> future.completeExceptionally(error));
141 return;
142 }
143 checkNotNull(response);
144 InputStream input = new ByteArrayInputStream(response);
145 try {
146 byte status = (byte) input.read();
147 if (status == FAILURE) {
148 Throwable t = context.serializer().readObject(input);
149 context.execute(() -> future.completeExceptionally(t));
150 } else {
151 context.execute(() -> future.complete(context.serializer().readObject(input)));
152 }
153 } catch (IOException e) {
154 context.execute(() -> future.completeExceptionally(e));
155 }
156 }
157
158 @Override
159 public <T, U> Connection handler(Class<T> type, MessageHandler<T, U> handler) {
160 Assert.notNull(type, "type");
161 handlers.put(type, new InternalHandler(handler, ThreadContext.currentContextOrThrow()));
162 return null;
163 }
164
165 public CompletableFuture<byte[]> handle(byte[] message) {
166 try {
167 Object request = context.serializer().readObject(new ByteArrayInputStream(message));
168 InternalHandler handler = handlers.get(request.getClass());
169 if (handler == null) {
170 return Tools.exceptionalFuture(new IllegalStateException(
171 "No handler registered for " + request.getClass()));
172 }
173 return handler.handle(request).handle((result, error) -> {
Madan Jampani3289fbf2016-01-13 14:14:27 -0800174 try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
175 baos.write(error != null ? FAILURE : SUCCESS);
176 context.serializer().writeObject(error != null ? error : result, baos);
177 return baos.toByteArray();
178 } catch (IOException e) {
179 Throwables.propagate(e);
180 return null;
181 }
182 });
183 } catch (Exception e) {
184 return Tools.exceptionalFuture(e);
185 }
186 }
187
188 @Override
189 public Listener<Throwable> exceptionListener(Consumer<Throwable> listener) {
190 return exceptionListeners.add(listener);
191 }
192
193 @Override
194 public Listener<Connection> closeListener(Consumer<Connection> listener) {
195 return closeListeners.add(listener);
196 }
197
198 @Override
199 public CompletableFuture<Void> close() {
Madan Jampani3289fbf2016-01-13 14:14:27 -0800200 closeListeners.forEach(listener -> listener.accept(this));
201 if (mode == CopycatTransport.Mode.CLIENT) {
202 messagingService.unregisterHandler(inboundMessageSubject);
203 }
204 return CompletableFuture.completedFuture(null);
205 }
206
207 @Override
208 public int hashCode() {
209 return Objects.hash(connectionId);
210 }
211
212 @Override
213 public boolean equals(Object other) {
214 if (!(other instanceof CopycatTransportConnection)) {
215 return false;
216 }
Madan Jampani3289fbf2016-01-13 14:14:27 -0800217 return connectionId == ((CopycatTransportConnection) other).connectionId;
218 }
219
220 @Override
221 public String toString() {
222 return MoreObjects.toStringHelper(getClass())
223 .add("id", connectionId)
Madan Jampani3289fbf2016-01-13 14:14:27 -0800224 .toString();
225 }
226
Madan Jampani3289fbf2016-01-13 14:14:27 -0800227 @SuppressWarnings("rawtypes")
228 private final class InternalHandler {
229
230 private final MessageHandler handler;
231 private final ThreadContext context;
232
233 private InternalHandler(MessageHandler handler, ThreadContext context) {
234 this.handler = handler;
235 this.context = context;
236 }
237
238 @SuppressWarnings("unchecked")
239 public CompletableFuture<Object> handle(Object message) {
240 CompletableFuture<Object> answer = new CompletableFuture<>();
241 context.execute(() -> handler.handle(message).whenComplete((r, e) -> {
242 if (e != null) {
243 answer.completeExceptionally((Throwable) e);
244 } else {
245 answer.complete(r);
246 }
247 }));
248 return answer;
249 }
250 }
251}