Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame] | 1 | /* |
Brian O'Connor | a09fe5b | 2017-08-03 21:12:30 -0700 | [diff] [blame] | 2 | * Copyright 2017-present Open Networking Foundation |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame] | 3 | * |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | * |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | * |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
| 16 | package org.onosproject.store.primitives.impl; |
| 17 | |
| 18 | import java.util.Collection; |
| 19 | import java.util.Set; |
| 20 | import java.util.concurrent.CompletableFuture; |
| 21 | import java.util.concurrent.Executor; |
| 22 | import java.util.function.Consumer; |
| 23 | import java.util.stream.Collectors; |
| 24 | |
| 25 | import io.atomix.protocols.raft.cluster.MemberId; |
| 26 | import io.atomix.protocols.raft.protocol.CloseSessionRequest; |
| 27 | import io.atomix.protocols.raft.protocol.CloseSessionResponse; |
| 28 | import io.atomix.protocols.raft.protocol.CommandRequest; |
| 29 | import io.atomix.protocols.raft.protocol.CommandResponse; |
| 30 | import io.atomix.protocols.raft.protocol.KeepAliveRequest; |
| 31 | import io.atomix.protocols.raft.protocol.KeepAliveResponse; |
| 32 | import io.atomix.protocols.raft.protocol.MetadataRequest; |
| 33 | import io.atomix.protocols.raft.protocol.MetadataResponse; |
| 34 | import io.atomix.protocols.raft.protocol.OpenSessionRequest; |
| 35 | import io.atomix.protocols.raft.protocol.OpenSessionResponse; |
| 36 | import io.atomix.protocols.raft.protocol.PublishRequest; |
| 37 | import io.atomix.protocols.raft.protocol.QueryRequest; |
| 38 | import io.atomix.protocols.raft.protocol.QueryResponse; |
| 39 | import io.atomix.protocols.raft.protocol.RaftClientProtocol; |
| 40 | import io.atomix.protocols.raft.protocol.ResetRequest; |
| 41 | import io.atomix.protocols.raft.session.SessionId; |
| 42 | import org.onosproject.cluster.NodeId; |
Jordan Halterman | 980a8c1 | 2017-09-22 18:01:19 -0700 | [diff] [blame^] | 43 | import org.onosproject.store.cluster.messaging.ClusterCommunicator; |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame] | 44 | import org.onosproject.store.service.Serializer; |
| 45 | |
| 46 | /** |
| 47 | * Raft client protocol that uses a cluster communicator. |
| 48 | */ |
| 49 | public class RaftClientCommunicator extends RaftCommunicator implements RaftClientProtocol { |
| 50 | |
| 51 | public RaftClientCommunicator( |
Jordan Halterman | 980a8c1 | 2017-09-22 18:01:19 -0700 | [diff] [blame^] | 52 | String prefix, |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame] | 53 | Serializer serializer, |
Jordan Halterman | 980a8c1 | 2017-09-22 18:01:19 -0700 | [diff] [blame^] | 54 | ClusterCommunicator clusterCommunicator) { |
| 55 | super(new RaftMessageContext(prefix), serializer, clusterCommunicator); |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame] | 56 | } |
| 57 | |
| 58 | @Override |
| 59 | public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId, OpenSessionRequest request) { |
| 60 | return sendAndReceive(context.openSessionSubject, request, memberId); |
| 61 | } |
| 62 | |
| 63 | @Override |
| 64 | public CompletableFuture<CloseSessionResponse> closeSession(MemberId memberId, CloseSessionRequest request) { |
| 65 | return sendAndReceive(context.closeSessionSubject, request, memberId); |
| 66 | } |
| 67 | |
| 68 | @Override |
| 69 | public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId, KeepAliveRequest request) { |
| 70 | return sendAndReceive(context.keepAliveSubject, request, memberId); |
| 71 | } |
| 72 | |
| 73 | @Override |
| 74 | public CompletableFuture<QueryResponse> query(MemberId memberId, QueryRequest request) { |
| 75 | return sendAndReceive(context.querySubject, request, memberId); |
| 76 | } |
| 77 | |
| 78 | @Override |
| 79 | public CompletableFuture<CommandResponse> command(MemberId memberId, CommandRequest request) { |
| 80 | return sendAndReceive(context.commandSubject, request, memberId); |
| 81 | } |
| 82 | |
| 83 | @Override |
| 84 | public CompletableFuture<MetadataResponse> metadata(MemberId memberId, MetadataRequest request) { |
| 85 | return sendAndReceive(context.metadataSubject, request, memberId); |
| 86 | } |
| 87 | |
| 88 | @Override |
| 89 | public void reset(Collection<MemberId> members, ResetRequest request) { |
| 90 | Set<NodeId> nodes = members.stream().map(m -> NodeId.nodeId(m.id())).collect(Collectors.toSet()); |
| 91 | clusterCommunicator.multicast( |
| 92 | request, |
| 93 | context.resetSubject(request.session()), |
| 94 | serializer::encode, |
| 95 | nodes); |
| 96 | } |
| 97 | |
| 98 | @Override |
| 99 | public void registerPublishListener(SessionId sessionId, Consumer<PublishRequest> listener, Executor executor) { |
| 100 | clusterCommunicator.addSubscriber( |
| 101 | context.publishSubject(sessionId.id()), |
| 102 | serializer::decode, |
| 103 | listener, |
| 104 | executor); |
| 105 | } |
| 106 | |
| 107 | @Override |
| 108 | public void unregisterPublishListener(SessionId sessionId) { |
| 109 | clusterCommunicator.removeSubscriber(context.publishSubject(sessionId.id())); |
| 110 | } |
| 111 | } |