Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame] | 1 | /* |
Brian O'Connor | a09fe5b | 2017-08-03 21:12:30 -0700 | [diff] [blame] | 2 | * Copyright 2017-present Open Networking Foundation |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame] | 3 | * |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | * |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | * |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
| 16 | package org.onosproject.store.primitives.impl; |
| 17 | |
| 18 | import java.util.Collection; |
| 19 | import java.util.Set; |
| 20 | import java.util.concurrent.CompletableFuture; |
| 21 | import java.util.concurrent.Executor; |
| 22 | import java.util.function.Consumer; |
Jordan Halterman | e372d85 | 2017-11-02 15:00:06 -0700 | [diff] [blame] | 23 | import java.util.function.Function; |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame] | 24 | import java.util.stream.Collectors; |
| 25 | |
| 26 | import io.atomix.protocols.raft.cluster.MemberId; |
| 27 | import io.atomix.protocols.raft.protocol.CloseSessionRequest; |
| 28 | import io.atomix.protocols.raft.protocol.CloseSessionResponse; |
| 29 | import io.atomix.protocols.raft.protocol.CommandRequest; |
| 30 | import io.atomix.protocols.raft.protocol.CommandResponse; |
Jordan Halterman | e372d85 | 2017-11-02 15:00:06 -0700 | [diff] [blame] | 31 | import io.atomix.protocols.raft.protocol.HeartbeatRequest; |
| 32 | import io.atomix.protocols.raft.protocol.HeartbeatResponse; |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame] | 33 | import io.atomix.protocols.raft.protocol.KeepAliveRequest; |
| 34 | import io.atomix.protocols.raft.protocol.KeepAliveResponse; |
| 35 | import io.atomix.protocols.raft.protocol.MetadataRequest; |
| 36 | import io.atomix.protocols.raft.protocol.MetadataResponse; |
| 37 | import io.atomix.protocols.raft.protocol.OpenSessionRequest; |
| 38 | import io.atomix.protocols.raft.protocol.OpenSessionResponse; |
| 39 | import io.atomix.protocols.raft.protocol.PublishRequest; |
| 40 | import io.atomix.protocols.raft.protocol.QueryRequest; |
| 41 | import io.atomix.protocols.raft.protocol.QueryResponse; |
| 42 | import io.atomix.protocols.raft.protocol.RaftClientProtocol; |
| 43 | import io.atomix.protocols.raft.protocol.ResetRequest; |
| 44 | import io.atomix.protocols.raft.session.SessionId; |
| 45 | import org.onosproject.cluster.NodeId; |
| 46 | import org.onosproject.cluster.PartitionId; |
| 47 | import org.onosproject.store.cluster.messaging.ClusterCommunicationService; |
| 48 | import org.onosproject.store.service.Serializer; |
| 49 | |
| 50 | /** |
| 51 | * Raft client protocol that uses a cluster communicator. |
| 52 | */ |
| 53 | public class RaftClientCommunicator extends RaftCommunicator implements RaftClientProtocol { |
| 54 | |
| 55 | public RaftClientCommunicator( |
| 56 | PartitionId partitionId, |
| 57 | Serializer serializer, |
| 58 | ClusterCommunicationService clusterCommunicator) { |
| 59 | super(new RaftMessageContext(String.format("partition-%d", partitionId.id())), serializer, clusterCommunicator); |
| 60 | } |
| 61 | |
| 62 | @Override |
| 63 | public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId, OpenSessionRequest request) { |
| 64 | return sendAndReceive(context.openSessionSubject, request, memberId); |
| 65 | } |
| 66 | |
| 67 | @Override |
| 68 | public CompletableFuture<CloseSessionResponse> closeSession(MemberId memberId, CloseSessionRequest request) { |
| 69 | return sendAndReceive(context.closeSessionSubject, request, memberId); |
| 70 | } |
| 71 | |
| 72 | @Override |
| 73 | public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId, KeepAliveRequest request) { |
| 74 | return sendAndReceive(context.keepAliveSubject, request, memberId); |
| 75 | } |
| 76 | |
| 77 | @Override |
| 78 | public CompletableFuture<QueryResponse> query(MemberId memberId, QueryRequest request) { |
| 79 | return sendAndReceive(context.querySubject, request, memberId); |
| 80 | } |
| 81 | |
| 82 | @Override |
| 83 | public CompletableFuture<CommandResponse> command(MemberId memberId, CommandRequest request) { |
| 84 | return sendAndReceive(context.commandSubject, request, memberId); |
| 85 | } |
| 86 | |
| 87 | @Override |
| 88 | public CompletableFuture<MetadataResponse> metadata(MemberId memberId, MetadataRequest request) { |
| 89 | return sendAndReceive(context.metadataSubject, request, memberId); |
| 90 | } |
| 91 | |
| 92 | @Override |
Jordan Halterman | e372d85 | 2017-11-02 15:00:06 -0700 | [diff] [blame] | 93 | public void registerHeartbeatHandler(Function<HeartbeatRequest, CompletableFuture<HeartbeatResponse>> function) { |
| 94 | clusterCommunicator.addSubscriber(context.heartbeatSubject, serializer::decode, function, serializer::encode); |
| 95 | } |
| 96 | |
| 97 | @Override |
| 98 | public void unregisterHeartbeatHandler() { |
| 99 | clusterCommunicator.removeSubscriber(context.heartbeatSubject); |
| 100 | } |
| 101 | |
| 102 | @Override |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame] | 103 | public void reset(Collection<MemberId> members, ResetRequest request) { |
| 104 | Set<NodeId> nodes = members.stream().map(m -> NodeId.nodeId(m.id())).collect(Collectors.toSet()); |
| 105 | clusterCommunicator.multicast( |
| 106 | request, |
| 107 | context.resetSubject(request.session()), |
| 108 | serializer::encode, |
| 109 | nodes); |
| 110 | } |
| 111 | |
| 112 | @Override |
| 113 | public void registerPublishListener(SessionId sessionId, Consumer<PublishRequest> listener, Executor executor) { |
| 114 | clusterCommunicator.addSubscriber( |
| 115 | context.publishSubject(sessionId.id()), |
| 116 | serializer::decode, |
| 117 | listener, |
| 118 | executor); |
| 119 | } |
| 120 | |
| 121 | @Override |
| 122 | public void unregisterPublishListener(SessionId sessionId) { |
| 123 | clusterCommunicator.removeSubscriber(context.publishSubject(sessionId.id())); |
| 124 | } |
| 125 | } |