blob: 39ce10f2852e87b4af70cff106135aa52b9a1d5e [file] [log] [blame]
Madan Jampani3289fbf2016-01-13 14:14:27 -08001/*
2 * Copyright 2016 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.impl;
17
18import java.io.ByteArrayInputStream;
19import java.io.ByteArrayOutputStream;
20import java.io.DataInputStream;
21import java.io.DataOutputStream;
22import java.io.IOException;
23import java.io.InputStream;
Madan Jampani3289fbf2016-01-13 14:14:27 -080024import java.util.Map;
25import java.util.Objects;
26import java.util.concurrent.CompletableFuture;
27import java.util.concurrent.atomic.AtomicInteger;
28import java.util.function.Consumer;
29
30import org.apache.commons.io.IOUtils;
Madan Jampani3289fbf2016-01-13 14:14:27 -080031import org.onlab.util.Tools;
Madan Jampanif778c962016-01-31 22:56:38 -080032import org.onosproject.cluster.PartitionId;
Madan Jampani3289fbf2016-01-13 14:14:27 -080033import org.onosproject.store.cluster.messaging.MessagingService;
34
35import com.google.common.base.MoreObjects;
36import com.google.common.base.Throwables;
37import com.google.common.collect.Maps;
38
39import static com.google.common.base.Preconditions.checkNotNull;
40import io.atomix.catalyst.transport.Address;
41import io.atomix.catalyst.transport.Connection;
42import io.atomix.catalyst.transport.MessageHandler;
43import io.atomix.catalyst.transport.TransportException;
44import io.atomix.catalyst.util.Assert;
45import io.atomix.catalyst.util.Listener;
46import io.atomix.catalyst.util.Listeners;
47import io.atomix.catalyst.util.ReferenceCounted;
48import io.atomix.catalyst.util.concurrent.ThreadContext;
49
50/**
51 * {@link Connection} implementation for CopycatTransport.
52 */
53public class CopycatTransportConnection implements Connection {
54
55 private final Listeners<Throwable> exceptionListeners = new Listeners<>();
56 private final Listeners<Connection> closeListeners = new Listeners<>();
57
58 static final byte SUCCESS = 0x03;
59 static final byte FAILURE = 0x04;
60
61 private final long connectionId;
Madan Jampani71d13e12016-01-13 17:14:35 -080062 private final CopycatTransport.Mode mode;
Madan Jampani3289fbf2016-01-13 14:14:27 -080063 private final Address remoteAddress;
64 private final MessagingService messagingService;
65 private final String outboundMessageSubject;
66 private final String inboundMessageSubject;
67 private final ThreadContext context;
68 private final Map<Class<?>, InternalHandler> handlers = Maps.newConcurrentMap();
69 private final AtomicInteger messagesSent = new AtomicInteger(0);
70 private final AtomicInteger sendFailures = new AtomicInteger(0);
71 private final AtomicInteger messagesReceived = new AtomicInteger(0);
72 private final AtomicInteger receiveFailures = new AtomicInteger(0);
73
74 CopycatTransportConnection(long connectionId,
75 CopycatTransport.Mode mode,
Madan Jampanif778c962016-01-31 22:56:38 -080076 PartitionId partitionId,
Madan Jampani3289fbf2016-01-13 14:14:27 -080077 Address address,
78 MessagingService messagingService,
79 ThreadContext context) {
80 this.connectionId = connectionId;
81 this.mode = checkNotNull(mode);
82 this.remoteAddress = checkNotNull(address);
83 this.messagingService = checkNotNull(messagingService);
84 if (mode == CopycatTransport.Mode.CLIENT) {
Madan Jampani3a9911c2016-02-21 11:25:45 -080085 this.outboundMessageSubject = String.format("onos-copycat-%s", partitionId);
86 this.inboundMessageSubject = String.format("onos-copycat-%s-%d", partitionId, connectionId);
Madan Jampani3289fbf2016-01-13 14:14:27 -080087 } else {
Madan Jampani3a9911c2016-02-21 11:25:45 -080088 this.outboundMessageSubject = String.format("onos-copycat-%s-%d", partitionId, connectionId);
89 this.inboundMessageSubject = String.format("onos-copycat-%s", partitionId);
Madan Jampani3289fbf2016-01-13 14:14:27 -080090 }
91 this.context = checkNotNull(context);
92 }
93
94 public void setBidirectional() {
95 messagingService.registerHandler(inboundMessageSubject, (sender, payload) -> {
96 try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
97 if (input.readLong() != connectionId) {
98 throw new IllegalStateException("Invalid connection Id");
99 }
100 return handle(IOUtils.toByteArray(input));
101 } catch (IOException e) {
102 Throwables.propagate(e);
103 return null;
104 }
105 });
106 }
107
108 @Override
109 public <T, U> CompletableFuture<U> send(T message) {
110 ThreadContext context = ThreadContext.currentContextOrThrow();
111 CompletableFuture<U> result = new CompletableFuture<>();
112 try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
113 new DataOutputStream(baos).writeLong(connectionId);
114 context.serializer().writeObject(message, baos);
115 if (message instanceof ReferenceCounted) {
116 ((ReferenceCounted<?>) message).release();
117 }
Madan Jampani2f9cc712016-02-15 19:36:21 -0800118 messagingService.sendAndReceive(CopycatTransport.toEndpoint(remoteAddress),
Madan Jampani3289fbf2016-01-13 14:14:27 -0800119 outboundMessageSubject,
120 baos.toByteArray(),
121 context.executor())
122 .whenComplete((r, e) -> {
123 if (e == null) {
124 messagesSent.incrementAndGet();
125 } else {
126 sendFailures.incrementAndGet();
127 }
128 handleResponse(r, e, result, context);
129 });
130 } catch (Exception e) {
131 result.completeExceptionally(new TransportException("Failed to send request", e));
132 }
133 return result;
134 }
135
136 private <T> void handleResponse(byte[] response,
137 Throwable error,
138 CompletableFuture<T> future,
139 ThreadContext context) {
140 if (error != null) {
141 context.execute(() -> future.completeExceptionally(error));
142 return;
143 }
144 checkNotNull(response);
145 InputStream input = new ByteArrayInputStream(response);
146 try {
147 byte status = (byte) input.read();
148 if (status == FAILURE) {
149 Throwable t = context.serializer().readObject(input);
150 context.execute(() -> future.completeExceptionally(t));
151 } else {
152 context.execute(() -> future.complete(context.serializer().readObject(input)));
153 }
154 } catch (IOException e) {
155 context.execute(() -> future.completeExceptionally(e));
156 }
157 }
158
159 @Override
160 public <T, U> Connection handler(Class<T> type, MessageHandler<T, U> handler) {
161 Assert.notNull(type, "type");
162 handlers.put(type, new InternalHandler(handler, ThreadContext.currentContextOrThrow()));
163 return null;
164 }
165
166 public CompletableFuture<byte[]> handle(byte[] message) {
167 try {
168 Object request = context.serializer().readObject(new ByteArrayInputStream(message));
169 InternalHandler handler = handlers.get(request.getClass());
170 if (handler == null) {
171 return Tools.exceptionalFuture(new IllegalStateException(
172 "No handler registered for " + request.getClass()));
173 }
174 return handler.handle(request).handle((result, error) -> {
175 if (error == null) {
176 messagesReceived.incrementAndGet();
177 } else {
178 receiveFailures.incrementAndGet();
179 }
180 try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
181 baos.write(error != null ? FAILURE : SUCCESS);
182 context.serializer().writeObject(error != null ? error : result, baos);
183 return baos.toByteArray();
184 } catch (IOException e) {
185 Throwables.propagate(e);
186 return null;
187 }
188 });
189 } catch (Exception e) {
190 return Tools.exceptionalFuture(e);
191 }
192 }
193
194 @Override
195 public Listener<Throwable> exceptionListener(Consumer<Throwable> listener) {
196 return exceptionListeners.add(listener);
197 }
198
199 @Override
200 public Listener<Connection> closeListener(Consumer<Connection> listener) {
201 return closeListeners.add(listener);
202 }
203
204 @Override
205 public CompletableFuture<Void> close() {
Madan Jampani3289fbf2016-01-13 14:14:27 -0800206 closeListeners.forEach(listener -> listener.accept(this));
207 if (mode == CopycatTransport.Mode.CLIENT) {
208 messagingService.unregisterHandler(inboundMessageSubject);
209 }
210 return CompletableFuture.completedFuture(null);
211 }
212
213 @Override
214 public int hashCode() {
215 return Objects.hash(connectionId);
216 }
217
218 @Override
219 public boolean equals(Object other) {
220 if (!(other instanceof CopycatTransportConnection)) {
221 return false;
222 }
223
224 return connectionId == ((CopycatTransportConnection) other).connectionId;
225 }
226
227 @Override
228 public String toString() {
229 return MoreObjects.toStringHelper(getClass())
230 .add("id", connectionId)
231 .add("sent", messagesSent.get())
232 .add("received", messagesReceived.get())
233 .add("sendFailures", sendFailures.get())
234 .add("receiveFailures", receiveFailures.get())
235 .toString();
236 }
237
Madan Jampani3289fbf2016-01-13 14:14:27 -0800238 @SuppressWarnings("rawtypes")
239 private final class InternalHandler {
240
241 private final MessageHandler handler;
242 private final ThreadContext context;
243
244 private InternalHandler(MessageHandler handler, ThreadContext context) {
245 this.handler = handler;
246 this.context = context;
247 }
248
249 @SuppressWarnings("unchecked")
250 public CompletableFuture<Object> handle(Object message) {
251 CompletableFuture<Object> answer = new CompletableFuture<>();
252 context.execute(() -> handler.handle(message).whenComplete((r, e) -> {
253 if (e != null) {
254 answer.completeExceptionally((Throwable) e);
255 } else {
256 answer.complete(r);
257 }
258 }));
259 return answer;
260 }
261 }
262}