blob: d1b686f0c3d8ad63fe3eaa467166557f5d0784e6 [file] [log] [blame]
Madan Jampani3289fbf2016-01-13 14:14:27 -08001/*
2 * Copyright 2016 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.impl;
17
18import java.io.ByteArrayInputStream;
19import java.io.ByteArrayOutputStream;
20import java.io.DataInputStream;
21import java.io.DataOutputStream;
22import java.io.IOException;
23import java.io.InputStream;
Madan Jampani3289fbf2016-01-13 14:14:27 -080024import java.util.Map;
25import java.util.Objects;
26import java.util.concurrent.CompletableFuture;
27import java.util.concurrent.atomic.AtomicInteger;
28import java.util.function.Consumer;
29
30import org.apache.commons.io.IOUtils;
Madan Jampani3289fbf2016-01-13 14:14:27 -080031import org.onlab.util.Tools;
Madan Jampanif778c962016-01-31 22:56:38 -080032import org.onosproject.cluster.PartitionId;
Madan Jampani3289fbf2016-01-13 14:14:27 -080033import org.onosproject.store.cluster.messaging.MessagingService;
Madan Jampani86cb2432016-02-17 11:07:56 -080034import org.slf4j.Logger;
Madan Jampani3289fbf2016-01-13 14:14:27 -080035
36import com.google.common.base.MoreObjects;
37import com.google.common.base.Throwables;
38import com.google.common.collect.Maps;
39
40import static com.google.common.base.Preconditions.checkNotNull;
Madan Jampani86cb2432016-02-17 11:07:56 -080041import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani3289fbf2016-01-13 14:14:27 -080042import io.atomix.catalyst.transport.Address;
43import io.atomix.catalyst.transport.Connection;
44import io.atomix.catalyst.transport.MessageHandler;
45import io.atomix.catalyst.transport.TransportException;
46import io.atomix.catalyst.util.Assert;
47import io.atomix.catalyst.util.Listener;
48import io.atomix.catalyst.util.Listeners;
49import io.atomix.catalyst.util.ReferenceCounted;
50import io.atomix.catalyst.util.concurrent.ThreadContext;
51
52/**
53 * {@link Connection} implementation for CopycatTransport.
54 */
55public class CopycatTransportConnection implements Connection {
56
Madan Jampani86cb2432016-02-17 11:07:56 -080057 private final Logger log = getLogger(getClass());
Madan Jampani3289fbf2016-01-13 14:14:27 -080058 private final Listeners<Throwable> exceptionListeners = new Listeners<>();
59 private final Listeners<Connection> closeListeners = new Listeners<>();
60
61 static final byte SUCCESS = 0x03;
62 static final byte FAILURE = 0x04;
63
64 private final long connectionId;
Madan Jampani71d13e12016-01-13 17:14:35 -080065 private final CopycatTransport.Mode mode;
Madan Jampani3289fbf2016-01-13 14:14:27 -080066 private final Address remoteAddress;
67 private final MessagingService messagingService;
68 private final String outboundMessageSubject;
69 private final String inboundMessageSubject;
70 private final ThreadContext context;
71 private final Map<Class<?>, InternalHandler> handlers = Maps.newConcurrentMap();
72 private final AtomicInteger messagesSent = new AtomicInteger(0);
73 private final AtomicInteger sendFailures = new AtomicInteger(0);
74 private final AtomicInteger messagesReceived = new AtomicInteger(0);
75 private final AtomicInteger receiveFailures = new AtomicInteger(0);
76
77 CopycatTransportConnection(long connectionId,
78 CopycatTransport.Mode mode,
Madan Jampanif778c962016-01-31 22:56:38 -080079 PartitionId partitionId,
Madan Jampani3289fbf2016-01-13 14:14:27 -080080 Address address,
81 MessagingService messagingService,
82 ThreadContext context) {
83 this.connectionId = connectionId;
84 this.mode = checkNotNull(mode);
85 this.remoteAddress = checkNotNull(address);
86 this.messagingService = checkNotNull(messagingService);
87 if (mode == CopycatTransport.Mode.CLIENT) {
Madan Jampani86cb2432016-02-17 11:07:56 -080088 this.outboundMessageSubject = String.format("onos-copycat-server-%s", partitionId);
89 this.inboundMessageSubject = String.format("onos-copycat-client-%s-%d", partitionId, connectionId);
Madan Jampani3289fbf2016-01-13 14:14:27 -080090 } else {
Madan Jampani86cb2432016-02-17 11:07:56 -080091 this.outboundMessageSubject = String.format("onos-copycat-client-%s-%d", partitionId, connectionId);
92 this.inboundMessageSubject = String.format("onos-copycat-server-%s", partitionId);
Madan Jampani3289fbf2016-01-13 14:14:27 -080093 }
94 this.context = checkNotNull(context);
95 }
96
97 public void setBidirectional() {
98 messagingService.registerHandler(inboundMessageSubject, (sender, payload) -> {
99 try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
100 if (input.readLong() != connectionId) {
101 throw new IllegalStateException("Invalid connection Id");
102 }
103 return handle(IOUtils.toByteArray(input));
104 } catch (IOException e) {
105 Throwables.propagate(e);
106 return null;
107 }
108 });
109 }
110
111 @Override
112 public <T, U> CompletableFuture<U> send(T message) {
113 ThreadContext context = ThreadContext.currentContextOrThrow();
114 CompletableFuture<U> result = new CompletableFuture<>();
115 try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
116 new DataOutputStream(baos).writeLong(connectionId);
117 context.serializer().writeObject(message, baos);
118 if (message instanceof ReferenceCounted) {
119 ((ReferenceCounted<?>) message).release();
120 }
Madan Jampani2f9cc712016-02-15 19:36:21 -0800121 messagingService.sendAndReceive(CopycatTransport.toEndpoint(remoteAddress),
Madan Jampani3289fbf2016-01-13 14:14:27 -0800122 outboundMessageSubject,
123 baos.toByteArray(),
124 context.executor())
125 .whenComplete((r, e) -> {
126 if (e == null) {
127 messagesSent.incrementAndGet();
128 } else {
129 sendFailures.incrementAndGet();
130 }
131 handleResponse(r, e, result, context);
132 });
133 } catch (Exception e) {
134 result.completeExceptionally(new TransportException("Failed to send request", e));
135 }
136 return result;
137 }
138
139 private <T> void handleResponse(byte[] response,
140 Throwable error,
141 CompletableFuture<T> future,
142 ThreadContext context) {
143 if (error != null) {
144 context.execute(() -> future.completeExceptionally(error));
145 return;
146 }
147 checkNotNull(response);
148 InputStream input = new ByteArrayInputStream(response);
149 try {
150 byte status = (byte) input.read();
151 if (status == FAILURE) {
152 Throwable t = context.serializer().readObject(input);
153 context.execute(() -> future.completeExceptionally(t));
154 } else {
155 context.execute(() -> future.complete(context.serializer().readObject(input)));
156 }
157 } catch (IOException e) {
158 context.execute(() -> future.completeExceptionally(e));
159 }
160 }
161
162 @Override
163 public <T, U> Connection handler(Class<T> type, MessageHandler<T, U> handler) {
164 Assert.notNull(type, "type");
165 handlers.put(type, new InternalHandler(handler, ThreadContext.currentContextOrThrow()));
166 return null;
167 }
168
169 public CompletableFuture<byte[]> handle(byte[] message) {
170 try {
171 Object request = context.serializer().readObject(new ByteArrayInputStream(message));
172 InternalHandler handler = handlers.get(request.getClass());
173 if (handler == null) {
174 return Tools.exceptionalFuture(new IllegalStateException(
175 "No handler registered for " + request.getClass()));
176 }
177 return handler.handle(request).handle((result, error) -> {
178 if (error == null) {
179 messagesReceived.incrementAndGet();
180 } else {
181 receiveFailures.incrementAndGet();
182 }
183 try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
184 baos.write(error != null ? FAILURE : SUCCESS);
185 context.serializer().writeObject(error != null ? error : result, baos);
186 return baos.toByteArray();
187 } catch (IOException e) {
188 Throwables.propagate(e);
189 return null;
190 }
191 });
192 } catch (Exception e) {
193 return Tools.exceptionalFuture(e);
194 }
195 }
196
197 @Override
198 public Listener<Throwable> exceptionListener(Consumer<Throwable> listener) {
199 return exceptionListeners.add(listener);
200 }
201
202 @Override
203 public Listener<Connection> closeListener(Consumer<Connection> listener) {
204 return closeListeners.add(listener);
205 }
206
207 @Override
208 public CompletableFuture<Void> close() {
Madan Jampani86cb2432016-02-17 11:07:56 -0800209 log.debug("Closing connection[id={}, mode={}] to {}", connectionId, mode, remoteAddress);
Madan Jampani3289fbf2016-01-13 14:14:27 -0800210 closeListeners.forEach(listener -> listener.accept(this));
211 if (mode == CopycatTransport.Mode.CLIENT) {
212 messagingService.unregisterHandler(inboundMessageSubject);
213 }
214 return CompletableFuture.completedFuture(null);
215 }
216
217 @Override
218 public int hashCode() {
219 return Objects.hash(connectionId);
220 }
221
222 @Override
223 public boolean equals(Object other) {
224 if (!(other instanceof CopycatTransportConnection)) {
225 return false;
226 }
227
228 return connectionId == ((CopycatTransportConnection) other).connectionId;
229 }
230
231 @Override
232 public String toString() {
233 return MoreObjects.toStringHelper(getClass())
234 .add("id", connectionId)
235 .add("sent", messagesSent.get())
236 .add("received", messagesReceived.get())
237 .add("sendFailures", sendFailures.get())
238 .add("receiveFailures", receiveFailures.get())
239 .toString();
240 }
241
Madan Jampani3289fbf2016-01-13 14:14:27 -0800242 @SuppressWarnings("rawtypes")
243 private final class InternalHandler {
244
245 private final MessageHandler handler;
246 private final ThreadContext context;
247
248 private InternalHandler(MessageHandler handler, ThreadContext context) {
249 this.handler = handler;
250 this.context = context;
251 }
252
253 @SuppressWarnings("unchecked")
254 public CompletableFuture<Object> handle(Object message) {
255 CompletableFuture<Object> answer = new CompletableFuture<>();
256 context.execute(() -> handler.handle(message).whenComplete((r, e) -> {
257 if (e != null) {
258 answer.completeExceptionally((Throwable) e);
259 } else {
260 answer.complete(r);
261 }
262 }));
263 return answer;
264 }
265 }
266}