blob: 46b27bfd0cdfbb682846376e93a21854b99087e3 [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.cluster.messaging.impl;
17
18import java.util.concurrent.CompletableFuture;
19import java.util.concurrent.Executor;
20import java.util.function.BiConsumer;
21import java.util.function.BiFunction;
22
23import io.atomix.utils.net.Address;
24import org.apache.felix.scr.annotations.Activate;
25import org.apache.felix.scr.annotations.Component;
26import org.apache.felix.scr.annotations.Deactivate;
27import org.apache.felix.scr.annotations.Reference;
28import org.apache.felix.scr.annotations.ReferenceCardinality;
29import org.apache.felix.scr.annotations.Service;
30import org.onlab.packet.IpAddress;
31import org.onosproject.store.cluster.messaging.Endpoint;
32import org.onosproject.store.cluster.messaging.MessagingService;
33import org.onosproject.store.impl.AtomixManager;
34import org.slf4j.Logger;
35import org.slf4j.LoggerFactory;
36
37/**
38 * Atomix messaging manager.
39 */
40@Component(immediate = true)
41@Service
42public class AtomixMessagingManager implements MessagingService {
43
44 private final Logger log = LoggerFactory.getLogger(getClass());
45
46 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
47 protected AtomixManager atomixManager;
48
49 private io.atomix.cluster.messaging.MessagingService messagingService;
50
51 @Activate
52 public void activate() {
53 messagingService = atomixManager.getAtomix().getMessagingService();
54 log.info("Started");
55 }
56
57 @Deactivate
58 public void deactivate() {
59 log.info("Stopped");
60 }
61
62 private Address toAddress(Endpoint ep) {
63 return new Address(ep.host().toString(), ep.port(), ep.host().toInetAddress());
64 }
65
66 private Endpoint toEndpoint(Address address) {
67 return new Endpoint(IpAddress.valueOf(address.address()), address.port());
68 }
69
70 @Override
71 public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
72 return messagingService.sendAsync(toAddress(ep), type, payload);
73 }
74
75 @Override
76 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
77 return messagingService.sendAndReceive(toAddress(ep), type, payload);
78 }
79
80 @Override
81 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
82 return messagingService.sendAndReceive(toAddress(ep), type, payload, executor);
83 }
84
85 @Override
86 public void registerHandler(String type, BiConsumer<Endpoint, byte[]> handler, Executor executor) {
87 BiConsumer<Address, byte[]> consumer = (address, payload) -> handler.accept(toEndpoint(address), payload);
88 messagingService.registerHandler(type, consumer, executor);
89 }
90
91 @Override
92 public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
93 BiFunction<Address, byte[], byte[]> function = (address, payload) ->
94 handler.apply(toEndpoint(address), payload);
95 messagingService.registerHandler(type, function, executor);
96 }
97
98 @Override
99 public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
100 BiFunction<Address, byte[], CompletableFuture<byte[]>> function = (address, payload) ->
101 handler.apply(toEndpoint(address), payload);
102 messagingService.registerHandler(type, function);
103 }
104
105 @Override
106 public void unregisterHandler(String type) {
107 messagingService.unregisterHandler(type);
108 }
109}