blob: 6acb7db8edfd9c05d5c634a625be1a6f9845ab13 [file] [log] [blame]
/*
 * Copyright 2016 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.impl;
17
18import static com.google.common.base.Preconditions.checkNotNull;
19
20import java.io.ByteArrayInputStream;
21import java.io.DataInputStream;
22import java.io.IOException;
23import java.net.InetAddress;
24import java.net.InetSocketAddress;
Madan Jampani39fff102016-02-14 13:17:28 -080025import java.net.UnknownHostException;
Madan Jampani3289fbf2016-01-13 14:14:27 -080026import java.util.Map;
27import java.util.concurrent.CompletableFuture;
28import java.util.concurrent.atomic.AtomicBoolean;
29import java.util.function.Consumer;
30
31import org.apache.commons.io.IOUtils;
32import org.onlab.util.Tools;
Madan Jampanif778c962016-01-31 22:56:38 -080033import org.onosproject.cluster.PartitionId;
Madan Jampani3289fbf2016-01-13 14:14:27 -080034import org.onosproject.store.cluster.messaging.MessagingService;
35
Madan Jampani39fff102016-02-14 13:17:28 -080036import com.google.common.base.Throwables;
Madan Jampani3289fbf2016-01-13 14:14:27 -080037import com.google.common.collect.Maps;
38
39import io.atomix.catalyst.transport.Address;
40import io.atomix.catalyst.transport.Connection;
41import io.atomix.catalyst.transport.Server;
42import io.atomix.catalyst.util.concurrent.SingleThreadContext;
43import io.atomix.catalyst.util.concurrent.ThreadContext;
44
45/**
46 * {@link Server} implementation for {@link CopycatTransport}.
47 */
48public class CopycatTransportServer implements Server {
49
50 private final AtomicBoolean listening = new AtomicBoolean(false);
Madan Jampani71d13e12016-01-13 17:14:35 -080051 private CompletableFuture<Void> listenFuture = new CompletableFuture<>();
Madan Jampanif778c962016-01-31 22:56:38 -080052 private final PartitionId partitionId;
Madan Jampani3289fbf2016-01-13 14:14:27 -080053 private final MessagingService messagingService;
54 private final String messageSubject;
55 private final Map<Long, CopycatTransportConnection> connections = Maps.newConcurrentMap();
56
Madan Jampanif778c962016-01-31 22:56:38 -080057 CopycatTransportServer(PartitionId partitionId, MessagingService messagingService) {
58 this.partitionId = checkNotNull(partitionId);
Madan Jampani3289fbf2016-01-13 14:14:27 -080059 this.messagingService = checkNotNull(messagingService);
Madan Jampanif778c962016-01-31 22:56:38 -080060 this.messageSubject = String.format("onos-copycat-%s", partitionId);
Madan Jampani3289fbf2016-01-13 14:14:27 -080061 }
62
63 @Override
64 public CompletableFuture<Void> listen(Address address, Consumer<Connection> listener) {
Madan Jampani71d13e12016-01-13 17:14:35 -080065 if (listening.compareAndSet(false, true)) {
66 ThreadContext context = ThreadContext.currentContextOrThrow();
67 listen(address, listener, context);
Madan Jampani3289fbf2016-01-13 14:14:27 -080068 }
69 return listenFuture;
70 }
71
Madan Jampani71d13e12016-01-13 17:14:35 -080072 private void listen(Address address, Consumer<Connection> listener, ThreadContext context) {
Madan Jampani3289fbf2016-01-13 14:14:27 -080073 messagingService.registerHandler(messageSubject, (sender, payload) -> {
74 try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
75 long connectionId = input.readLong();
Madan Jampani3289fbf2016-01-13 14:14:27 -080076 AtomicBoolean newConnection = new AtomicBoolean(false);
77 CopycatTransportConnection connection = connections.computeIfAbsent(connectionId, k -> {
78 newConnection.set(true);
Madan Jampani39fff102016-02-14 13:17:28 -080079 try {
80 InetAddress senderHost = InetAddress.getByAddress(sender.host().toOctets());
81 int senderPort = sender.port();
82 Address senderAddress = new Address(new InetSocketAddress(senderHost, senderPort));
83 return new CopycatTransportConnection(connectionId,
84 CopycatTransport.Mode.SERVER,
85 partitionId,
86 senderAddress,
87 messagingService,
88 getOrCreateContext(context));
89 } catch (UnknownHostException e) {
90 Throwables.propagate(e);
91 return null;
92 }
Madan Jampani3289fbf2016-01-13 14:14:27 -080093 });
94 byte[] request = IOUtils.toByteArray(input);
95 return CompletableFuture.supplyAsync(
96 () -> {
97 if (newConnection.get()) {
98 listener.accept(connection);
99 }
100 return connection;
101 }, context.executor()).thenCompose(c -> c.handle(request));
102 } catch (IOException e) {
103 return Tools.exceptionalFuture(e);
104 }
105 });
Madan Jampani3289fbf2016-01-13 14:14:27 -0800106 context.execute(() -> {
107 listenFuture.complete(null);
108 });
109 }
110
111 @Override
112 public CompletableFuture<Void> close() {
113 messagingService.unregisterHandler(messageSubject);
114 return CompletableFuture.completedFuture(null);
115 }
116
117 /**
118 * Returns the current execution context or creates one.
119 */
120 private ThreadContext getOrCreateContext(ThreadContext parentContext) {
121 ThreadContext context = ThreadContext.currentContext();
122 if (context != null) {
123 return context;
124 }
Madan Jampanif778c962016-01-31 22:56:38 -0800125 return new SingleThreadContext("copycat-transport-server-" + partitionId, parentContext.serializer().clone());
Madan Jampani3289fbf2016-01-13 14:14:27 -0800126 }
127}