blob: 5b15d33ca45b4a5cec6b029636a5d7c565d693ea [file] [log] [blame]
Madan Jampani3289fbf2016-01-13 14:14:27 -08001/*
2 * Copyright 2016 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.impl;
17
18import static com.google.common.base.Preconditions.checkNotNull;
Madan Jampani86cb2432016-02-17 11:07:56 -080019import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani3289fbf2016-01-13 14:14:27 -080020
21import java.util.Set;
22import java.util.concurrent.CompletableFuture;
Madan Jampanif778c962016-01-31 22:56:38 -080023
Madan Jampani3289fbf2016-01-13 14:14:27 -080024import org.apache.commons.lang.math.RandomUtils;
Madan Jampanif778c962016-01-31 22:56:38 -080025import org.onosproject.cluster.PartitionId;
Madan Jampani3289fbf2016-01-13 14:14:27 -080026import org.onosproject.store.cluster.messaging.MessagingService;
Madan Jampani86cb2432016-02-17 11:07:56 -080027import org.slf4j.Logger;
Madan Jampani3289fbf2016-01-13 14:14:27 -080028
29import com.google.common.collect.Sets;
Madan Jampani86cb2432016-02-17 11:07:56 -080030import com.google.common.primitives.Longs;
Madan Jampani3289fbf2016-01-13 14:14:27 -080031
32import io.atomix.catalyst.transport.Address;
33import io.atomix.catalyst.transport.Client;
34import io.atomix.catalyst.transport.Connection;
35import io.atomix.catalyst.util.concurrent.ThreadContext;
36
37/**
38 * {@link Client} implementation for {@link CopycatTransport}.
39 */
40public class CopycatTransportClient implements Client {
41
Madan Jampani86cb2432016-02-17 11:07:56 -080042 private final Logger log = getLogger(getClass());
Madan Jampanif778c962016-01-31 22:56:38 -080043 private final PartitionId partitionId;
Madan Jampani3289fbf2016-01-13 14:14:27 -080044 private final MessagingService messagingService;
45 private final CopycatTransport.Mode mode;
Madan Jampani86cb2432016-02-17 11:07:56 -080046 private final String newConnectionMessageSubject;
Madan Jampani3289fbf2016-01-13 14:14:27 -080047 private final Set<CopycatTransportConnection> connections = Sets.newConcurrentHashSet();
48
Madan Jampanif778c962016-01-31 22:56:38 -080049 CopycatTransportClient(PartitionId partitionId, MessagingService messagingService, CopycatTransport.Mode mode) {
50 this.partitionId = checkNotNull(partitionId);
Madan Jampani3289fbf2016-01-13 14:14:27 -080051 this.messagingService = checkNotNull(messagingService);
52 this.mode = checkNotNull(mode);
Madan Jampani86cb2432016-02-17 11:07:56 -080053 this.newConnectionMessageSubject = String.format("onos-copycat-server-connection-%s", partitionId);
Madan Jampani3289fbf2016-01-13 14:14:27 -080054 }
55
56 @Override
57 public CompletableFuture<Connection> connect(Address remoteAddress) {
Madan Jampanib06ccef2016-01-25 10:51:16 -080058 ThreadContext context = ThreadContext.currentContextOrThrow();
Madan Jampani2f9cc712016-02-15 19:36:21 -080059 return messagingService.sendAndReceive(CopycatTransport.toEndpoint(remoteAddress),
Madan Jampani86cb2432016-02-17 11:07:56 -080060 newConnectionMessageSubject,
61 Longs.toByteArray(nextConnectionId()))
62 .thenApplyAsync(bytes -> {
63 long connectionId = Longs.fromByteArray(bytes);
Madan Jampani2f9cc712016-02-15 19:36:21 -080064 CopycatTransportConnection connection = new CopycatTransportConnection(
Madan Jampani86cb2432016-02-17 11:07:56 -080065 connectionId,
Madan Jampani2f9cc712016-02-15 19:36:21 -080066 CopycatTransport.Mode.CLIENT,
67 partitionId,
68 remoteAddress,
69 messagingService,
70 context);
71 if (mode == CopycatTransport.Mode.CLIENT) {
72 connection.setBidirectional();
73 }
Madan Jampani86cb2432016-02-17 11:07:56 -080074 log.debug("Created new outgoing connection[id={}] to {}", connectionId, remoteAddress);
Madan Jampani2f9cc712016-02-15 19:36:21 -080075 connections.add(connection);
76 return connection;
77 }, context.executor());
Madan Jampani3289fbf2016-01-13 14:14:27 -080078 }
79
80 @Override
81 public CompletableFuture<Void> close() {
82 return CompletableFuture.allOf(connections.stream().map(Connection::close).toArray(CompletableFuture[]::new));
83 }
84
85 private long nextConnectionId() {
86 return RandomUtils.nextLong();
87 }
88 }