blob: 2f24d6b4f341b9d660a493897403e2e7e2cd82fe [file] [log] [blame]
Madan Jampani3289fbf2016-01-13 14:14:27 -08001/*
2 * Copyright 2016 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.impl;
17
18import static com.google.common.base.Preconditions.checkNotNull;
Madan Jampani2f9cc712016-02-15 19:36:21 -080019import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani3289fbf2016-01-13 14:14:27 -080020
21import java.io.ByteArrayInputStream;
22import java.io.DataInputStream;
23import java.io.IOException;
Madan Jampani3289fbf2016-01-13 14:14:27 -080024import java.util.Map;
25import java.util.concurrent.CompletableFuture;
Madan Jampani7efc8dd2016-02-16 13:11:53 -080026import java.util.concurrent.Executors;
27import java.util.concurrent.ScheduledExecutorService;
Madan Jampani3289fbf2016-01-13 14:14:27 -080028import java.util.concurrent.atomic.AtomicBoolean;
29import java.util.function.Consumer;
30
31import org.apache.commons.io.IOUtils;
32import org.onlab.util.Tools;
Madan Jampanif778c962016-01-31 22:56:38 -080033import org.onosproject.cluster.PartitionId;
Madan Jampani3289fbf2016-01-13 14:14:27 -080034import org.onosproject.store.cluster.messaging.MessagingService;
Madan Jampani2f9cc712016-02-15 19:36:21 -080035import org.slf4j.Logger;
Madan Jampani3289fbf2016-01-13 14:14:27 -080036
37import com.google.common.collect.Maps;
Madan Jampani86cb2432016-02-17 11:07:56 -080038import com.google.common.primitives.Longs;
Madan Jampani3289fbf2016-01-13 14:14:27 -080039
40import io.atomix.catalyst.transport.Address;
41import io.atomix.catalyst.transport.Connection;
42import io.atomix.catalyst.transport.Server;
Madan Jampani7efc8dd2016-02-16 13:11:53 -080043import io.atomix.catalyst.util.concurrent.CatalystThreadFactory;
Madan Jampani3289fbf2016-01-13 14:14:27 -080044import io.atomix.catalyst.util.concurrent.SingleThreadContext;
45import io.atomix.catalyst.util.concurrent.ThreadContext;
46
47/**
48 * {@link Server} implementation for {@link CopycatTransport}.
49 */
50public class CopycatTransportServer implements Server {
51
Madan Jampani2f9cc712016-02-15 19:36:21 -080052 private final Logger log = getLogger(getClass());
Madan Jampani3289fbf2016-01-13 14:14:27 -080053 private final AtomicBoolean listening = new AtomicBoolean(false);
Madan Jampani71d13e12016-01-13 17:14:35 -080054 private CompletableFuture<Void> listenFuture = new CompletableFuture<>();
Madan Jampani7efc8dd2016-02-16 13:11:53 -080055 private final ScheduledExecutorService executorService;
Madan Jampanif778c962016-01-31 22:56:38 -080056 private final PartitionId partitionId;
Madan Jampani3289fbf2016-01-13 14:14:27 -080057 private final MessagingService messagingService;
Madan Jampani86cb2432016-02-17 11:07:56 -080058 private final String protocolMessageSubject;
59 private final String newConnectionMessageSubject;
Madan Jampani3289fbf2016-01-13 14:14:27 -080060 private final Map<Long, CopycatTransportConnection> connections = Maps.newConcurrentMap();
61
Madan Jampanif778c962016-01-31 22:56:38 -080062 CopycatTransportServer(PartitionId partitionId, MessagingService messagingService) {
63 this.partitionId = checkNotNull(partitionId);
Madan Jampani3289fbf2016-01-13 14:14:27 -080064 this.messagingService = checkNotNull(messagingService);
Madan Jampani86cb2432016-02-17 11:07:56 -080065 this.protocolMessageSubject = String.format("onos-copycat-server-%s", partitionId);
66 this.newConnectionMessageSubject = String.format("onos-copycat-server-connection-%s", partitionId);
Madan Jampani7efc8dd2016-02-16 13:11:53 -080067 this.executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(),
68 new CatalystThreadFactory("copycat-server-p" + partitionId + "-%d"));
Madan Jampani3289fbf2016-01-13 14:14:27 -080069 }
70
71 @Override
72 public CompletableFuture<Void> listen(Address address, Consumer<Connection> listener) {
Madan Jampani71d13e12016-01-13 17:14:35 -080073 if (listening.compareAndSet(false, true)) {
Madan Jampani86cb2432016-02-17 11:07:56 -080074 // message handler for all non-connection-establishment messages.
75 messagingService.registerHandler(protocolMessageSubject, (sender, payload) -> {
76 try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
77 long connectionId = input.readLong();
78 CopycatTransportConnection connection = connections.get(connectionId);
79 if (connection == null) {
80 throw new IOException("Closed connection");
81 }
82 byte[] messagePayload = IOUtils.toByteArray(input);
83 return connection.handle(messagePayload);
84 } catch (IOException e) {
85 return Tools.exceptionalFuture(e);
86 }
87 });
88
89 // message handler for new connection attempts.
Madan Jampani71d13e12016-01-13 17:14:35 -080090 ThreadContext context = ThreadContext.currentContextOrThrow();
Madan Jampani86cb2432016-02-17 11:07:56 -080091 messagingService.registerHandler(newConnectionMessageSubject, (sender, payload) -> {
92 long connectionId = Longs.fromByteArray(payload);
93 CopycatTransportConnection connection = new CopycatTransportConnection(connectionId,
94 CopycatTransport.Mode.SERVER,
95 partitionId,
96 CopycatTransport.toAddress(sender),
97 messagingService,
98 getOrCreateContext(context));
99 connections.put(connectionId, connection);
100 connection.closeListener(c -> connections.remove(connectionId, c));
101 log.debug("Created new incoming connection[id={}] from {}", connectionId, sender);
102 return CompletableFuture.supplyAsync(() -> {
103 listener.accept(connection);
104 // echo the connectionId back to indicate successful completion.
105 return payload;
106 }, context.executor());
107 });
108 context.execute(() -> listenFuture.complete(null));
Madan Jampani3289fbf2016-01-13 14:14:27 -0800109 }
110 return listenFuture;
111 }
112
Madan Jampani3289fbf2016-01-13 14:14:27 -0800113 @Override
114 public CompletableFuture<Void> close() {
Madan Jampani86cb2432016-02-17 11:07:56 -0800115 messagingService.unregisterHandler(newConnectionMessageSubject);
116 messagingService.unregisterHandler(protocolMessageSubject);
Madan Jampani7efc8dd2016-02-16 13:11:53 -0800117 executorService.shutdown();
Madan Jampani3289fbf2016-01-13 14:14:27 -0800118 return CompletableFuture.completedFuture(null);
119 }
120
121 /**
122 * Returns the current execution context or creates one.
123 */
124 private ThreadContext getOrCreateContext(ThreadContext parentContext) {
125 ThreadContext context = ThreadContext.currentContext();
126 if (context != null) {
127 return context;
128 }
Madan Jampani7efc8dd2016-02-16 13:11:53 -0800129 return new SingleThreadContext(executorService, parentContext.serializer().clone());
Madan Jampani3289fbf2016-01-13 14:14:27 -0800130 }
131}