blob: 54d49d1ec675eeee0f4e46163f55b6f0723b5628 [file] [log] [blame]
Jordan Halterman00e92da2018-05-22 23:05:52 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Thomas Vachuskab6d31672018-07-27 17:03:46 -070016package org.onosproject.store.atomix.cluster.messaging.impl;
Jordan Halterman00e92da2018-05-22 23:05:52 -070017
Ray Milkeyd84f89b2018-08-17 14:54:17 -070018import io.atomix.utils.net.Address;
19import org.onlab.packet.IpAddress;
20import org.onosproject.store.atomix.impl.AtomixManager;
21import org.onosproject.store.cluster.messaging.Endpoint;
22import org.onosproject.store.cluster.messaging.MessagingService;
23import org.osgi.service.component.annotations.Activate;
24import org.osgi.service.component.annotations.Component;
25import org.osgi.service.component.annotations.Deactivate;
26import org.osgi.service.component.annotations.Reference;
27import org.osgi.service.component.annotations.ReferenceCardinality;
28import org.slf4j.Logger;
29import org.slf4j.LoggerFactory;
30
Jordan Halterman00e92da2018-05-22 23:05:52 -070031import java.util.concurrent.CompletableFuture;
32import java.util.concurrent.Executor;
33import java.util.function.BiConsumer;
34import java.util.function.BiFunction;
35
Jordan Halterman00e92da2018-05-22 23:05:52 -070036/**
37 * Atomix messaging manager.
38 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070039@Component(immediate = true, service = MessagingService.class)
Jordan Halterman00e92da2018-05-22 23:05:52 -070040public class AtomixMessagingManager implements MessagingService {
41
42 private final Logger log = LoggerFactory.getLogger(getClass());
43
Ray Milkeyd84f89b2018-08-17 14:54:17 -070044 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jordan Halterman00e92da2018-05-22 23:05:52 -070045 protected AtomixManager atomixManager;
46
47 private io.atomix.cluster.messaging.MessagingService messagingService;
48
49 @Activate
50 public void activate() {
51 messagingService = atomixManager.getAtomix().getMessagingService();
52 log.info("Started");
53 }
54
55 @Deactivate
56 public void deactivate() {
57 log.info("Stopped");
58 }
59
60 private Address toAddress(Endpoint ep) {
61 return new Address(ep.host().toString(), ep.port(), ep.host().toInetAddress());
62 }
63
64 private Endpoint toEndpoint(Address address) {
65 return new Endpoint(IpAddress.valueOf(address.address()), address.port());
66 }
67
68 @Override
69 public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
70 return messagingService.sendAsync(toAddress(ep), type, payload);
71 }
72
73 @Override
74 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
75 return messagingService.sendAndReceive(toAddress(ep), type, payload);
76 }
77
78 @Override
79 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
80 return messagingService.sendAndReceive(toAddress(ep), type, payload, executor);
81 }
82
83 @Override
84 public void registerHandler(String type, BiConsumer<Endpoint, byte[]> handler, Executor executor) {
85 BiConsumer<Address, byte[]> consumer = (address, payload) -> handler.accept(toEndpoint(address), payload);
86 messagingService.registerHandler(type, consumer, executor);
87 }
88
89 @Override
90 public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
91 BiFunction<Address, byte[], byte[]> function = (address, payload) ->
92 handler.apply(toEndpoint(address), payload);
93 messagingService.registerHandler(type, function, executor);
94 }
95
96 @Override
97 public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
98 BiFunction<Address, byte[], CompletableFuture<byte[]>> function = (address, payload) ->
99 handler.apply(toEndpoint(address), payload);
100 messagingService.registerHandler(type, function);
101 }
102
103 @Override
104 public void unregisterHandler(String type) {
105 messagingService.unregisterHandler(type);
106 }
107}