/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| 16 | package org.onosproject.store.primitives.impl; |
| 17 | |
| 18 | import java.util.Collection; |
| 19 | import java.util.Set; |
| 20 | import java.util.concurrent.CompletableFuture; |
| 21 | import java.util.concurrent.Executor; |
| 22 | import java.util.function.Consumer; |
Jordan Halterman | 19486e3 | 2017-11-02 15:00:06 -0700 | [diff] [blame^] | 23 | import java.util.function.Function; |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame] | 24 | import java.util.stream.Collectors; |
| 25 | |
| 26 | import io.atomix.protocols.raft.cluster.MemberId; |
| 27 | import io.atomix.protocols.raft.protocol.CloseSessionRequest; |
| 28 | import io.atomix.protocols.raft.protocol.CloseSessionResponse; |
| 29 | import io.atomix.protocols.raft.protocol.CommandRequest; |
| 30 | import io.atomix.protocols.raft.protocol.CommandResponse; |
Jordan Halterman | 19486e3 | 2017-11-02 15:00:06 -0700 | [diff] [blame^] | 31 | import io.atomix.protocols.raft.protocol.HeartbeatRequest; |
| 32 | import io.atomix.protocols.raft.protocol.HeartbeatResponse; |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame] | 33 | import io.atomix.protocols.raft.protocol.KeepAliveRequest; |
| 34 | import io.atomix.protocols.raft.protocol.KeepAliveResponse; |
| 35 | import io.atomix.protocols.raft.protocol.MetadataRequest; |
| 36 | import io.atomix.protocols.raft.protocol.MetadataResponse; |
| 37 | import io.atomix.protocols.raft.protocol.OpenSessionRequest; |
| 38 | import io.atomix.protocols.raft.protocol.OpenSessionResponse; |
| 39 | import io.atomix.protocols.raft.protocol.PublishRequest; |
| 40 | import io.atomix.protocols.raft.protocol.QueryRequest; |
| 41 | import io.atomix.protocols.raft.protocol.QueryResponse; |
| 42 | import io.atomix.protocols.raft.protocol.RaftClientProtocol; |
| 43 | import io.atomix.protocols.raft.protocol.ResetRequest; |
| 44 | import io.atomix.protocols.raft.session.SessionId; |
| 45 | import org.onosproject.cluster.NodeId; |
Jordan Halterman | 28183ee | 2017-10-17 17:29:10 -0700 | [diff] [blame] | 46 | import org.onosproject.store.cluster.messaging.ClusterCommunicationService; |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame] | 47 | import org.onosproject.store.service.Serializer; |
| 48 | |
| 49 | /** |
| 50 | * Raft client protocol that uses a cluster communicator. |
| 51 | */ |
| 52 | public class RaftClientCommunicator extends RaftCommunicator implements RaftClientProtocol { |
| 53 | |
| 54 | public RaftClientCommunicator( |
Jordan Halterman | 980a8c1 | 2017-09-22 18:01:19 -0700 | [diff] [blame] | 55 | String prefix, |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame] | 56 | Serializer serializer, |
Jordan Halterman | 28183ee | 2017-10-17 17:29:10 -0700 | [diff] [blame] | 57 | ClusterCommunicationService clusterCommunicator) { |
Jordan Halterman | 980a8c1 | 2017-09-22 18:01:19 -0700 | [diff] [blame] | 58 | super(new RaftMessageContext(prefix), serializer, clusterCommunicator); |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame] | 59 | } |
| 60 | |
| 61 | @Override |
| 62 | public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId, OpenSessionRequest request) { |
| 63 | return sendAndReceive(context.openSessionSubject, request, memberId); |
| 64 | } |
| 65 | |
| 66 | @Override |
| 67 | public CompletableFuture<CloseSessionResponse> closeSession(MemberId memberId, CloseSessionRequest request) { |
| 68 | return sendAndReceive(context.closeSessionSubject, request, memberId); |
| 69 | } |
| 70 | |
| 71 | @Override |
| 72 | public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId, KeepAliveRequest request) { |
| 73 | return sendAndReceive(context.keepAliveSubject, request, memberId); |
| 74 | } |
| 75 | |
| 76 | @Override |
| 77 | public CompletableFuture<QueryResponse> query(MemberId memberId, QueryRequest request) { |
| 78 | return sendAndReceive(context.querySubject, request, memberId); |
| 79 | } |
| 80 | |
| 81 | @Override |
| 82 | public CompletableFuture<CommandResponse> command(MemberId memberId, CommandRequest request) { |
| 83 | return sendAndReceive(context.commandSubject, request, memberId); |
| 84 | } |
| 85 | |
| 86 | @Override |
| 87 | public CompletableFuture<MetadataResponse> metadata(MemberId memberId, MetadataRequest request) { |
| 88 | return sendAndReceive(context.metadataSubject, request, memberId); |
| 89 | } |
| 90 | |
| 91 | @Override |
Jordan Halterman | 19486e3 | 2017-11-02 15:00:06 -0700 | [diff] [blame^] | 92 | public void registerHeartbeatHandler(Function<HeartbeatRequest, CompletableFuture<HeartbeatResponse>> function) { |
| 93 | clusterCommunicator.addSubscriber(context.heartbeatSubject, serializer::decode, function, serializer::encode); |
| 94 | } |
| 95 | |
| 96 | @Override |
| 97 | public void unregisterHeartbeatHandler() { |
| 98 | clusterCommunicator.removeSubscriber(context.heartbeatSubject); |
| 99 | } |
| 100 | |
| 101 | @Override |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame] | 102 | public void reset(Collection<MemberId> members, ResetRequest request) { |
| 103 | Set<NodeId> nodes = members.stream().map(m -> NodeId.nodeId(m.id())).collect(Collectors.toSet()); |
| 104 | clusterCommunicator.multicast( |
| 105 | request, |
| 106 | context.resetSubject(request.session()), |
| 107 | serializer::encode, |
| 108 | nodes); |
| 109 | } |
| 110 | |
| 111 | @Override |
| 112 | public void registerPublishListener(SessionId sessionId, Consumer<PublishRequest> listener, Executor executor) { |
| 113 | clusterCommunicator.addSubscriber( |
| 114 | context.publishSubject(sessionId.id()), |
| 115 | serializer::decode, |
| 116 | listener, |
| 117 | executor); |
| 118 | } |
| 119 | |
| 120 | @Override |
| 121 | public void unregisterPublishListener(SessionId sessionId) { |
| 122 | clusterCommunicator.removeSubscriber(context.publishSubject(sessionId.id())); |
| 123 | } |
| 124 | } |