blob: 3c5b64905cc2184c0dc37b76fdac32624b4008c8 [file] [log] [blame]
/*
 * Copyright 2016 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.impl;
17
18import java.io.ByteArrayInputStream;
19import java.io.ByteArrayOutputStream;
20import java.io.DataInputStream;
21import java.io.DataOutputStream;
22import java.io.IOException;
23import java.io.InputStream;
24import java.net.InetAddress;
25import java.net.UnknownHostException;
26import java.util.Map;
27import java.util.Objects;
28import java.util.concurrent.CompletableFuture;
29import java.util.concurrent.atomic.AtomicInteger;
30import java.util.function.Consumer;
31
32import org.apache.commons.io.IOUtils;
33import org.onlab.packet.IpAddress;
34import org.onlab.util.Tools;
Madan Jampanif778c962016-01-31 22:56:38 -080035import org.onosproject.cluster.PartitionId;
Madan Jampani3289fbf2016-01-13 14:14:27 -080036import org.onosproject.store.cluster.messaging.Endpoint;
37import org.onosproject.store.cluster.messaging.MessagingService;
38
39import com.google.common.base.MoreObjects;
40import com.google.common.base.Throwables;
41import com.google.common.collect.Maps;
42
43import static com.google.common.base.Preconditions.checkNotNull;
44import io.atomix.catalyst.transport.Address;
45import io.atomix.catalyst.transport.Connection;
46import io.atomix.catalyst.transport.MessageHandler;
47import io.atomix.catalyst.transport.TransportException;
48import io.atomix.catalyst.util.Assert;
49import io.atomix.catalyst.util.Listener;
50import io.atomix.catalyst.util.Listeners;
51import io.atomix.catalyst.util.ReferenceCounted;
52import io.atomix.catalyst.util.concurrent.ThreadContext;
53
54/**
55 * {@link Connection} implementation for CopycatTransport.
56 */
57public class CopycatTransportConnection implements Connection {
58
59 private final Listeners<Throwable> exceptionListeners = new Listeners<>();
60 private final Listeners<Connection> closeListeners = new Listeners<>();
61
62 static final byte SUCCESS = 0x03;
63 static final byte FAILURE = 0x04;
64
65 private final long connectionId;
Madan Jampani71d13e12016-01-13 17:14:35 -080066 private final CopycatTransport.Mode mode;
Madan Jampani3289fbf2016-01-13 14:14:27 -080067 private final Address remoteAddress;
68 private final MessagingService messagingService;
69 private final String outboundMessageSubject;
70 private final String inboundMessageSubject;
71 private final ThreadContext context;
72 private final Map<Class<?>, InternalHandler> handlers = Maps.newConcurrentMap();
73 private final AtomicInteger messagesSent = new AtomicInteger(0);
74 private final AtomicInteger sendFailures = new AtomicInteger(0);
75 private final AtomicInteger messagesReceived = new AtomicInteger(0);
76 private final AtomicInteger receiveFailures = new AtomicInteger(0);
Madan Jampani71d13e12016-01-13 17:14:35 -080077 private final Map<Address, Endpoint> endpointLookupCache = Maps.newConcurrentMap();
Madan Jampani3289fbf2016-01-13 14:14:27 -080078
79 CopycatTransportConnection(long connectionId,
80 CopycatTransport.Mode mode,
Madan Jampanif778c962016-01-31 22:56:38 -080081 PartitionId partitionId,
Madan Jampani3289fbf2016-01-13 14:14:27 -080082 Address address,
83 MessagingService messagingService,
84 ThreadContext context) {
85 this.connectionId = connectionId;
86 this.mode = checkNotNull(mode);
87 this.remoteAddress = checkNotNull(address);
88 this.messagingService = checkNotNull(messagingService);
89 if (mode == CopycatTransport.Mode.CLIENT) {
Madan Jampanif778c962016-01-31 22:56:38 -080090 this.outboundMessageSubject = String.format("onos-copycat-%s", partitionId);
91 this.inboundMessageSubject = String.format("onos-copycat-%s-%d", partitionId, connectionId);
Madan Jampani3289fbf2016-01-13 14:14:27 -080092 } else {
Madan Jampanif778c962016-01-31 22:56:38 -080093 this.outboundMessageSubject = String.format("onos-copycat-%s-%d", partitionId, connectionId);
94 this.inboundMessageSubject = String.format("onos-copycat-%s", partitionId);
Madan Jampani3289fbf2016-01-13 14:14:27 -080095 }
96 this.context = checkNotNull(context);
97 }
98
99 public void setBidirectional() {
100 messagingService.registerHandler(inboundMessageSubject, (sender, payload) -> {
101 try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
102 if (input.readLong() != connectionId) {
103 throw new IllegalStateException("Invalid connection Id");
104 }
105 return handle(IOUtils.toByteArray(input));
106 } catch (IOException e) {
107 Throwables.propagate(e);
108 return null;
109 }
110 });
111 }
112
113 @Override
114 public <T, U> CompletableFuture<U> send(T message) {
115 ThreadContext context = ThreadContext.currentContextOrThrow();
116 CompletableFuture<U> result = new CompletableFuture<>();
117 try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
118 new DataOutputStream(baos).writeLong(connectionId);
119 context.serializer().writeObject(message, baos);
120 if (message instanceof ReferenceCounted) {
121 ((ReferenceCounted<?>) message).release();
122 }
123 messagingService.sendAndReceive(toEndpoint(remoteAddress),
124 outboundMessageSubject,
125 baos.toByteArray(),
126 context.executor())
127 .whenComplete((r, e) -> {
128 if (e == null) {
129 messagesSent.incrementAndGet();
130 } else {
131 sendFailures.incrementAndGet();
132 }
133 handleResponse(r, e, result, context);
134 });
135 } catch (Exception e) {
136 result.completeExceptionally(new TransportException("Failed to send request", e));
137 }
138 return result;
139 }
140
141 private <T> void handleResponse(byte[] response,
142 Throwable error,
143 CompletableFuture<T> future,
144 ThreadContext context) {
145 if (error != null) {
146 context.execute(() -> future.completeExceptionally(error));
147 return;
148 }
149 checkNotNull(response);
150 InputStream input = new ByteArrayInputStream(response);
151 try {
152 byte status = (byte) input.read();
153 if (status == FAILURE) {
154 Throwable t = context.serializer().readObject(input);
155 context.execute(() -> future.completeExceptionally(t));
156 } else {
157 context.execute(() -> future.complete(context.serializer().readObject(input)));
158 }
159 } catch (IOException e) {
160 context.execute(() -> future.completeExceptionally(e));
161 }
162 }
163
164 @Override
165 public <T, U> Connection handler(Class<T> type, MessageHandler<T, U> handler) {
166 Assert.notNull(type, "type");
167 handlers.put(type, new InternalHandler(handler, ThreadContext.currentContextOrThrow()));
168 return null;
169 }
170
171 public CompletableFuture<byte[]> handle(byte[] message) {
172 try {
173 Object request = context.serializer().readObject(new ByteArrayInputStream(message));
174 InternalHandler handler = handlers.get(request.getClass());
175 if (handler == null) {
176 return Tools.exceptionalFuture(new IllegalStateException(
177 "No handler registered for " + request.getClass()));
178 }
179 return handler.handle(request).handle((result, error) -> {
180 if (error == null) {
181 messagesReceived.incrementAndGet();
182 } else {
183 receiveFailures.incrementAndGet();
184 }
185 try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
186 baos.write(error != null ? FAILURE : SUCCESS);
187 context.serializer().writeObject(error != null ? error : result, baos);
188 return baos.toByteArray();
189 } catch (IOException e) {
190 Throwables.propagate(e);
191 return null;
192 }
193 });
194 } catch (Exception e) {
195 return Tools.exceptionalFuture(e);
196 }
197 }
198
199 @Override
200 public Listener<Throwable> exceptionListener(Consumer<Throwable> listener) {
201 return exceptionListeners.add(listener);
202 }
203
204 @Override
205 public Listener<Connection> closeListener(Consumer<Connection> listener) {
206 return closeListeners.add(listener);
207 }
208
209 @Override
210 public CompletableFuture<Void> close() {
Madan Jampani3289fbf2016-01-13 14:14:27 -0800211 closeListeners.forEach(listener -> listener.accept(this));
212 if (mode == CopycatTransport.Mode.CLIENT) {
213 messagingService.unregisterHandler(inboundMessageSubject);
214 }
215 return CompletableFuture.completedFuture(null);
216 }
217
218 @Override
219 public int hashCode() {
220 return Objects.hash(connectionId);
221 }
222
223 @Override
224 public boolean equals(Object other) {
225 if (!(other instanceof CopycatTransportConnection)) {
226 return false;
227 }
228
229 return connectionId == ((CopycatTransportConnection) other).connectionId;
230 }
231
232 @Override
233 public String toString() {
234 return MoreObjects.toStringHelper(getClass())
235 .add("id", connectionId)
236 .add("sent", messagesSent.get())
237 .add("received", messagesReceived.get())
238 .add("sendFailures", sendFailures.get())
239 .add("receiveFailures", receiveFailures.get())
240 .toString();
241 }
242
243 private Endpoint toEndpoint(Address address) {
Madan Jampani71d13e12016-01-13 17:14:35 -0800244 return endpointLookupCache.computeIfAbsent(address, a -> {
245 try {
246 return new Endpoint(IpAddress.valueOf(InetAddress.getByName(a.host())), a.port());
247 } catch (UnknownHostException e) {
248 Throwables.propagate(e);
249 return null;
250 }
251 });
Madan Jampani3289fbf2016-01-13 14:14:27 -0800252 }
253
254 @SuppressWarnings("rawtypes")
255 private final class InternalHandler {
256
257 private final MessageHandler handler;
258 private final ThreadContext context;
259
260 private InternalHandler(MessageHandler handler, ThreadContext context) {
261 this.handler = handler;
262 this.context = context;
263 }
264
265 @SuppressWarnings("unchecked")
266 public CompletableFuture<Object> handle(Object message) {
267 CompletableFuture<Object> answer = new CompletableFuture<>();
268 context.execute(() -> handler.handle(message).whenComplete((r, e) -> {
269 if (e != null) {
270 answer.completeExceptionally((Throwable) e);
271 } else {
272 answer.complete(r);
273 }
274 }));
275 return answer;
276 }
277 }
278}