blob: 03a5a71a13f4c4161ef6e2bec7cd010dd4ffe532 [file] [log] [blame]
Madan Jampani3289fbf2016-01-13 14:14:27 -08001/*
2 * Copyright 2016 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.impl;
17
18import static com.google.common.base.Preconditions.checkNotNull;
Madan Jampani2f9cc712016-02-15 19:36:21 -080019import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani3289fbf2016-01-13 14:14:27 -080020
21import java.io.ByteArrayInputStream;
22import java.io.DataInputStream;
23import java.io.IOException;
Madan Jampani3289fbf2016-01-13 14:14:27 -080024import java.util.Map;
25import java.util.concurrent.CompletableFuture;
26import java.util.concurrent.atomic.AtomicBoolean;
27import java.util.function.Consumer;
28
29import org.apache.commons.io.IOUtils;
30import org.onlab.util.Tools;
Madan Jampanif778c962016-01-31 22:56:38 -080031import org.onosproject.cluster.PartitionId;
Madan Jampani3289fbf2016-01-13 14:14:27 -080032import org.onosproject.store.cluster.messaging.MessagingService;
Madan Jampani2f9cc712016-02-15 19:36:21 -080033import org.slf4j.Logger;
Madan Jampani3289fbf2016-01-13 14:14:27 -080034
35import com.google.common.collect.Maps;
36
37import io.atomix.catalyst.transport.Address;
38import io.atomix.catalyst.transport.Connection;
39import io.atomix.catalyst.transport.Server;
40import io.atomix.catalyst.util.concurrent.SingleThreadContext;
41import io.atomix.catalyst.util.concurrent.ThreadContext;
42
43/**
44 * {@link Server} implementation for {@link CopycatTransport}.
45 */
46public class CopycatTransportServer implements Server {
47
Madan Jampani2f9cc712016-02-15 19:36:21 -080048 private final Logger log = getLogger(getClass());
Madan Jampani3289fbf2016-01-13 14:14:27 -080049 private final AtomicBoolean listening = new AtomicBoolean(false);
Madan Jampani71d13e12016-01-13 17:14:35 -080050 private CompletableFuture<Void> listenFuture = new CompletableFuture<>();
Madan Jampanif778c962016-01-31 22:56:38 -080051 private final PartitionId partitionId;
Madan Jampani3289fbf2016-01-13 14:14:27 -080052 private final MessagingService messagingService;
53 private final String messageSubject;
54 private final Map<Long, CopycatTransportConnection> connections = Maps.newConcurrentMap();
55
Madan Jampanif778c962016-01-31 22:56:38 -080056 CopycatTransportServer(PartitionId partitionId, MessagingService messagingService) {
57 this.partitionId = checkNotNull(partitionId);
Madan Jampani3289fbf2016-01-13 14:14:27 -080058 this.messagingService = checkNotNull(messagingService);
Madan Jampanif778c962016-01-31 22:56:38 -080059 this.messageSubject = String.format("onos-copycat-%s", partitionId);
Madan Jampani3289fbf2016-01-13 14:14:27 -080060 }
61
62 @Override
63 public CompletableFuture<Void> listen(Address address, Consumer<Connection> listener) {
Madan Jampani71d13e12016-01-13 17:14:35 -080064 if (listening.compareAndSet(false, true)) {
65 ThreadContext context = ThreadContext.currentContextOrThrow();
66 listen(address, listener, context);
Madan Jampani3289fbf2016-01-13 14:14:27 -080067 }
68 return listenFuture;
69 }
70
Madan Jampani71d13e12016-01-13 17:14:35 -080071 private void listen(Address address, Consumer<Connection> listener, ThreadContext context) {
Madan Jampani3289fbf2016-01-13 14:14:27 -080072 messagingService.registerHandler(messageSubject, (sender, payload) -> {
73 try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
74 long connectionId = input.readLong();
Madan Jampani2f9cc712016-02-15 19:36:21 -080075 AtomicBoolean newConnectionCreated = new AtomicBoolean(false);
Madan Jampani3289fbf2016-01-13 14:14:27 -080076 CopycatTransportConnection connection = connections.computeIfAbsent(connectionId, k -> {
Madan Jampani2f9cc712016-02-15 19:36:21 -080077 newConnectionCreated.set(true);
78 CopycatTransportConnection newConnection = new CopycatTransportConnection(connectionId,
79 CopycatTransport.Mode.SERVER,
80 partitionId,
81 CopycatTransport.toAddress(sender),
82 messagingService,
83 getOrCreateContext(context));
84 log.debug("Created new incoming connection {}", connectionId);
85 newConnection.closeListener(c -> connections.remove(connectionId, c));
86 return newConnection;
Madan Jampani3289fbf2016-01-13 14:14:27 -080087 });
88 byte[] request = IOUtils.toByteArray(input);
89 return CompletableFuture.supplyAsync(
90 () -> {
Madan Jampani2f9cc712016-02-15 19:36:21 -080091 if (newConnectionCreated.get()) {
Madan Jampani3289fbf2016-01-13 14:14:27 -080092 listener.accept(connection);
93 }
94 return connection;
95 }, context.executor()).thenCompose(c -> c.handle(request));
96 } catch (IOException e) {
97 return Tools.exceptionalFuture(e);
98 }
99 });
Madan Jampani3289fbf2016-01-13 14:14:27 -0800100 context.execute(() -> {
101 listenFuture.complete(null);
102 });
103 }
104
105 @Override
106 public CompletableFuture<Void> close() {
107 messagingService.unregisterHandler(messageSubject);
108 return CompletableFuture.completedFuture(null);
109 }
110
111 /**
112 * Returns the current execution context or creates one.
113 */
114 private ThreadContext getOrCreateContext(ThreadContext parentContext) {
115 ThreadContext context = ThreadContext.currentContext();
116 if (context != null) {
117 return context;
118 }
Madan Jampanif778c962016-01-31 22:56:38 -0800119 return new SingleThreadContext("copycat-transport-server-" + partitionId, parentContext.serializer().clone());
Madan Jampani3289fbf2016-01-13 14:14:27 -0800120 }
121}