blob: 8de05a36f379d7ca8c792fa3a16e5251397c8f95 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.impl;
17
Jordan Haltermane9c37092017-03-21 11:16:14 -070018import com.google.common.collect.Sets;
Madan Jampani630e7ac2016-05-31 11:34:05 -070019import io.atomix.catalyst.concurrent.ThreadContext;
20import io.atomix.catalyst.transport.Address;
21import io.atomix.catalyst.transport.Connection;
22import io.atomix.catalyst.transport.Server;
Jordan Haltermane9c37092017-03-21 11:16:14 -070023import org.apache.commons.lang3.RandomUtils;
Madan Jampanif778c962016-01-31 22:56:38 -080024import org.onosproject.cluster.PartitionId;
Madan Jampani3289fbf2016-01-13 14:14:27 -080025import org.onosproject.store.cluster.messaging.MessagingService;
Madan Jampani2f9cc712016-02-15 19:36:21 -080026import org.slf4j.Logger;
Jordan Haltermane9c37092017-03-21 11:16:14 -070027import org.slf4j.LoggerFactory;
Madan Jampani3289fbf2016-01-13 14:14:27 -080028
Jordan Haltermane9c37092017-03-21 11:16:14 -070029import java.nio.ByteBuffer;
30import java.util.Set;
31import java.util.concurrent.CompletableFuture;
32import java.util.function.Consumer;
33
34import static com.google.common.base.MoreObjects.toStringHelper;
35import static com.google.common.base.Preconditions.checkNotNull;
36import static org.onosproject.store.primitives.impl.CopycatTransport.CONNECT;
37import static org.onosproject.store.primitives.impl.CopycatTransport.FAILURE;
38import static org.onosproject.store.primitives.impl.CopycatTransport.SUCCESS;
Madan Jampani3289fbf2016-01-13 14:14:27 -080039
Madan Jampani3289fbf2016-01-13 14:14:27 -080040/**
Jordan Haltermane9c37092017-03-21 11:16:14 -070041 * Copycat transport server implementation.
Madan Jampani3289fbf2016-01-13 14:14:27 -080042 */
43public class CopycatTransportServer implements Server {
Jordan Haltermane9c37092017-03-21 11:16:14 -070044 private final Logger log = LoggerFactory.getLogger(getClass());
Madan Jampanif778c962016-01-31 22:56:38 -080045 private final PartitionId partitionId;
Jordan Haltermane9c37092017-03-21 11:16:14 -070046 private final String serverSubject;
Madan Jampani3289fbf2016-01-13 14:14:27 -080047 private final MessagingService messagingService;
Jordan Haltermane9c37092017-03-21 11:16:14 -070048 private final Set<CopycatTransportConnection> connections = Sets.newConcurrentHashSet();
Madan Jampani3289fbf2016-01-13 14:14:27 -080049
Jordan Haltermane9c37092017-03-21 11:16:14 -070050 public CopycatTransportServer(PartitionId partitionId, MessagingService messagingService) {
51 this.partitionId = checkNotNull(partitionId, "partitionId cannot be null");
52 this.serverSubject = String.format("onos-copycat-%s", partitionId);
53 this.messagingService = checkNotNull(messagingService, "messagingService cannot be null");
Madan Jampani3289fbf2016-01-13 14:14:27 -080054 }
55
56 @Override
Jordan Haltermane9c37092017-03-21 11:16:14 -070057 public CompletableFuture<Void> listen(Address address, Consumer<Connection> consumer) {
58 ThreadContext context = ThreadContext.currentContextOrThrow();
59 messagingService.registerHandler(serverSubject, (sender, payload) -> {
Madan Jampani3289fbf2016-01-13 14:14:27 -080060
Jordan Haltermane9c37092017-03-21 11:16:14 -070061 // Only connect messages can be sent to the server. Once a connect message
62 // is received, the connection will register a separate handler for messaging.
63 ByteBuffer requestBuffer = ByteBuffer.wrap(payload);
64 if (requestBuffer.get() != CONNECT) {
65 ByteBuffer responseBuffer = ByteBuffer.allocate(1);
66 responseBuffer.put(FAILURE);
67 return CompletableFuture.completedFuture(responseBuffer.array());
Madan Jampani3a9911c2016-02-21 11:25:45 -080068 }
Jordan Haltermane9c37092017-03-21 11:16:14 -070069
70 // Create the connection and ensure state is cleaned up when the connection is closed.
71 long connectionId = RandomUtils.nextLong();
72 CopycatTransportConnection connection = new CopycatTransportConnection(
73 connectionId,
74 CopycatTransportConnection.Mode.SERVER,
75 partitionId,
76 sender,
77 messagingService,
78 context);
Jordan Haltermanfda46f92017-04-14 10:49:44 -070079 connection.onClose(connections::remove);
Jordan Haltermane9c37092017-03-21 11:16:14 -070080 connections.add(connection);
81
82 CompletableFuture<byte[]> future = new CompletableFuture<>();
83
84 // We need to ensure the connection event is called on the Copycat thread
85 // and that the future is not completed until the Copycat server has been
86 // able to register message handlers, otherwise some messages can be received
87 // prior to any handlers being registered.
88 context.executor().execute(() -> {
89 log.debug("Created connection {}-{}", partitionId, connectionId);
90 consumer.accept(connection);
91
92 ByteBuffer responseBuffer = ByteBuffer.allocate(9);
93 responseBuffer.put(SUCCESS);
94 responseBuffer.putLong(connectionId);
95 future.complete(responseBuffer.array());
96 });
97 return future;
Madan Jampani3a9911c2016-02-21 11:25:45 -080098 });
Jordan Haltermane9c37092017-03-21 11:16:14 -070099 return CompletableFuture.completedFuture(null);
Madan Jampani3a9911c2016-02-21 11:25:45 -0800100 }
101
Madan Jampani3289fbf2016-01-13 14:14:27 -0800102 @Override
103 public CompletableFuture<Void> close() {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700104 return CompletableFuture.allOf(connections.stream().map(Connection::close).toArray(CompletableFuture[]::new));
Madan Jampani3289fbf2016-01-13 14:14:27 -0800105 }
106
Jordan Haltermane9c37092017-03-21 11:16:14 -0700107 @Override
108 public String toString() {
109 return toStringHelper(this)
110 .add("partitionId", partitionId)
111 .toString();
Madan Jampani3289fbf2016-01-13 14:14:27 -0800112 }
113}