blob: 43c2cfc0a8645db786044c3bd44d939310c33f38 [file] [log] [blame]
Madan Jampani3289fbf2016-01-13 14:14:27 -08001/*
2 * Copyright 2016 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.impl;
17
18import static com.google.common.base.Preconditions.checkNotNull;
19
20import java.io.ByteArrayInputStream;
21import java.io.DataInputStream;
22import java.io.IOException;
23import java.net.InetAddress;
24import java.net.InetSocketAddress;
25import java.util.Map;
26import java.util.concurrent.CompletableFuture;
27import java.util.concurrent.atomic.AtomicBoolean;
28import java.util.function.Consumer;
29
30import org.apache.commons.io.IOUtils;
31import org.onlab.util.Tools;
32import org.onosproject.store.cluster.messaging.MessagingService;
33
34import com.google.common.collect.Maps;
35
36import io.atomix.catalyst.transport.Address;
37import io.atomix.catalyst.transport.Connection;
38import io.atomix.catalyst.transport.Server;
39import io.atomix.catalyst.util.concurrent.SingleThreadContext;
40import io.atomix.catalyst.util.concurrent.ThreadContext;
41
42/**
43 * {@link Server} implementation for {@link CopycatTransport}.
44 */
45public class CopycatTransportServer implements Server {
46
47 private final AtomicBoolean listening = new AtomicBoolean(false);
48 private CompletableFuture<Void> listenFuture;
49 private final String clusterName;
50 private final MessagingService messagingService;
51 private final String messageSubject;
52 private final Map<Long, CopycatTransportConnection> connections = Maps.newConcurrentMap();
53
54 CopycatTransportServer(String clusterName, MessagingService messagingService) {
55 this.clusterName = checkNotNull(clusterName);
56 this.messagingService = checkNotNull(messagingService);
57 this.messageSubject = String.format("onos-copycat-%s", clusterName);
58 }
59
60 @Override
61 public CompletableFuture<Void> listen(Address address, Consumer<Connection> listener) {
62 if (listening.get()) {
63 return CompletableFuture.completedFuture(null);
64 }
65 ThreadContext context = ThreadContext.currentContextOrThrow();
66 synchronized (this) {
67 if (listenFuture == null) {
68 listenFuture = new CompletableFuture<>();
69 listen(address, listener, context);
70 }
71 }
72 return listenFuture;
73 }
74
75 public void listen(Address address, Consumer<Connection> listener, ThreadContext context) {
76 messagingService.registerHandler(messageSubject, (sender, payload) -> {
77 try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
78 long connectionId = input.readLong();
79 InetAddress senderHost = InetAddress.getByAddress(sender.host().toOctets());
80 int senderPort = sender.port();
81 Address senderAddress = new Address(new InetSocketAddress(senderHost, senderPort));
82 AtomicBoolean newConnection = new AtomicBoolean(false);
83 CopycatTransportConnection connection = connections.computeIfAbsent(connectionId, k -> {
84 newConnection.set(true);
85 return new CopycatTransportConnection(connectionId,
86 CopycatTransport.Mode.SERVER,
87 clusterName,
88 senderAddress,
89 messagingService,
90 getOrCreateContext(context));
91 });
92 byte[] request = IOUtils.toByteArray(input);
93 return CompletableFuture.supplyAsync(
94 () -> {
95 if (newConnection.get()) {
96 listener.accept(connection);
97 }
98 return connection;
99 }, context.executor()).thenCompose(c -> c.handle(request));
100 } catch (IOException e) {
101 return Tools.exceptionalFuture(e);
102 }
103 });
104 listening.set(true);
105 context.execute(() -> {
106 listenFuture.complete(null);
107 });
108 }
109
110 @Override
111 public CompletableFuture<Void> close() {
112 messagingService.unregisterHandler(messageSubject);
113 return CompletableFuture.completedFuture(null);
114 }
115
116 /**
117 * Returns the current execution context or creates one.
118 */
119 private ThreadContext getOrCreateContext(ThreadContext parentContext) {
120 ThreadContext context = ThreadContext.currentContext();
121 if (context != null) {
122 return context;
123 }
124 return new SingleThreadContext("copycat-transport-server-" + clusterName, parentContext.serializer().clone());
125 }
126}