blob: ff641121e66f5023b99b00925e4f4263d717e33f [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.impl;
17
18import static com.google.common.base.Preconditions.checkNotNull;
Madan Jampani2f9cc712016-02-15 19:36:21 -080019import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani630e7ac2016-05-31 11:34:05 -070020import io.atomix.catalyst.concurrent.CatalystThreadFactory;
21import io.atomix.catalyst.concurrent.SingleThreadContext;
22import io.atomix.catalyst.concurrent.ThreadContext;
23import io.atomix.catalyst.transport.Address;
24import io.atomix.catalyst.transport.Connection;
25import io.atomix.catalyst.transport.Server;
Madan Jampani3289fbf2016-01-13 14:14:27 -080026
27import java.io.ByteArrayInputStream;
28import java.io.DataInputStream;
29import java.io.IOException;
Madan Jampani3289fbf2016-01-13 14:14:27 -080030import java.util.Map;
31import java.util.concurrent.CompletableFuture;
Madan Jampani7efc8dd2016-02-16 13:11:53 -080032import java.util.concurrent.Executors;
33import java.util.concurrent.ScheduledExecutorService;
Madan Jampani3289fbf2016-01-13 14:14:27 -080034import java.util.concurrent.atomic.AtomicBoolean;
35import java.util.function.Consumer;
36
37import org.apache.commons.io.IOUtils;
38import org.onlab.util.Tools;
Madan Jampanif778c962016-01-31 22:56:38 -080039import org.onosproject.cluster.PartitionId;
Madan Jampani3289fbf2016-01-13 14:14:27 -080040import org.onosproject.store.cluster.messaging.MessagingService;
Madan Jampani2f9cc712016-02-15 19:36:21 -080041import org.slf4j.Logger;
Madan Jampani3289fbf2016-01-13 14:14:27 -080042
43import com.google.common.collect.Maps;
44
Madan Jampani3289fbf2016-01-13 14:14:27 -080045/**
46 * {@link Server} implementation for {@link CopycatTransport}.
47 */
48public class CopycatTransportServer implements Server {
49
Madan Jampani2f9cc712016-02-15 19:36:21 -080050 private final Logger log = getLogger(getClass());
Madan Jampani3289fbf2016-01-13 14:14:27 -080051 private final AtomicBoolean listening = new AtomicBoolean(false);
Madan Jampani71d13e12016-01-13 17:14:35 -080052 private CompletableFuture<Void> listenFuture = new CompletableFuture<>();
Madan Jampani7efc8dd2016-02-16 13:11:53 -080053 private final ScheduledExecutorService executorService;
Madan Jampanif778c962016-01-31 22:56:38 -080054 private final PartitionId partitionId;
Madan Jampani3289fbf2016-01-13 14:14:27 -080055 private final MessagingService messagingService;
Madan Jampani3a9911c2016-02-21 11:25:45 -080056 private final String messageSubject;
Madan Jampani3289fbf2016-01-13 14:14:27 -080057 private final Map<Long, CopycatTransportConnection> connections = Maps.newConcurrentMap();
58
Madan Jampanif778c962016-01-31 22:56:38 -080059 CopycatTransportServer(PartitionId partitionId, MessagingService messagingService) {
60 this.partitionId = checkNotNull(partitionId);
Madan Jampani3289fbf2016-01-13 14:14:27 -080061 this.messagingService = checkNotNull(messagingService);
Madan Jampani3a9911c2016-02-21 11:25:45 -080062 this.messageSubject = String.format("onos-copycat-%s", partitionId);
Madan Jampani1172da72016-02-17 18:40:36 -080063 this.executorService = Executors.newScheduledThreadPool(Math.min(4, Runtime.getRuntime().availableProcessors()),
Madan Jampani7efc8dd2016-02-16 13:11:53 -080064 new CatalystThreadFactory("copycat-server-p" + partitionId + "-%d"));
Madan Jampani3289fbf2016-01-13 14:14:27 -080065 }
66
67 @Override
68 public CompletableFuture<Void> listen(Address address, Consumer<Connection> listener) {
Madan Jampani71d13e12016-01-13 17:14:35 -080069 if (listening.compareAndSet(false, true)) {
70 ThreadContext context = ThreadContext.currentContextOrThrow();
Madan Jampani3a9911c2016-02-21 11:25:45 -080071 listen(address, listener, context);
Madan Jampani3289fbf2016-01-13 14:14:27 -080072 }
73 return listenFuture;
74 }
75
Madan Jampani3a9911c2016-02-21 11:25:45 -080076 private void listen(Address address, Consumer<Connection> listener, ThreadContext context) {
77 messagingService.registerHandler(messageSubject, (sender, payload) -> {
78 try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
79 long connectionId = input.readLong();
80 AtomicBoolean newConnectionCreated = new AtomicBoolean(false);
81 CopycatTransportConnection connection = connections.computeIfAbsent(connectionId, k -> {
82 newConnectionCreated.set(true);
83 CopycatTransportConnection newConnection = new CopycatTransportConnection(connectionId,
84 CopycatTransport.Mode.SERVER,
85 partitionId,
86 CopycatTransport.toAddress(sender),
87 messagingService,
88 getOrCreateContext(context));
89 log.debug("Created new incoming connection {}", connectionId);
90 newConnection.closeListener(c -> connections.remove(connectionId, c));
91 return newConnection;
92 });
93 byte[] request = IOUtils.toByteArray(input);
94 return CompletableFuture.supplyAsync(
95 () -> {
96 if (newConnectionCreated.get()) {
97 listener.accept(connection);
98 }
99 return connection;
100 }, context.executor()).thenCompose(c -> c.handle(request));
101 } catch (IOException e) {
102 return Tools.exceptionalFuture(e);
103 }
104 });
105 context.execute(() -> {
106 listenFuture.complete(null);
107 });
108 }
109
Madan Jampani3289fbf2016-01-13 14:14:27 -0800110 @Override
111 public CompletableFuture<Void> close() {
Madan Jampani3a9911c2016-02-21 11:25:45 -0800112 messagingService.unregisterHandler(messageSubject);
Madan Jampani7efc8dd2016-02-16 13:11:53 -0800113 executorService.shutdown();
Madan Jampani3289fbf2016-01-13 14:14:27 -0800114 return CompletableFuture.completedFuture(null);
115 }
116
117 /**
118 * Returns the current execution context or creates one.
119 */
120 private ThreadContext getOrCreateContext(ThreadContext parentContext) {
121 ThreadContext context = ThreadContext.currentContext();
122 if (context != null) {
123 return context;
124 }
Madan Jampani7efc8dd2016-02-16 13:11:53 -0800125 return new SingleThreadContext(executorService, parentContext.serializer().clone());
Madan Jampani3289fbf2016-01-13 14:14:27 -0800126 }
127}