Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame] | 1 | /* |
Brian O'Connor | a09fe5b | 2017-08-03 21:12:30 -0700 | [diff] [blame] | 2 | * Copyright 2017-present Open Networking Foundation |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame] | 3 | * |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | * |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | * |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
| 16 | package org.onosproject.store.primitives.impl; |
| 17 | |
| 18 | import java.util.concurrent.CompletableFuture; |
| 19 | import java.util.concurrent.Executor; |
| 20 | import java.util.function.Consumer; |
| 21 | import java.util.function.Function; |
| 22 | |
| 23 | import io.atomix.protocols.raft.cluster.MemberId; |
| 24 | import io.atomix.protocols.raft.protocol.AppendRequest; |
| 25 | import io.atomix.protocols.raft.protocol.AppendResponse; |
| 26 | import io.atomix.protocols.raft.protocol.CloseSessionRequest; |
| 27 | import io.atomix.protocols.raft.protocol.CloseSessionResponse; |
| 28 | import io.atomix.protocols.raft.protocol.CommandRequest; |
| 29 | import io.atomix.protocols.raft.protocol.CommandResponse; |
| 30 | import io.atomix.protocols.raft.protocol.ConfigureRequest; |
| 31 | import io.atomix.protocols.raft.protocol.ConfigureResponse; |
| 32 | import io.atomix.protocols.raft.protocol.InstallRequest; |
| 33 | import io.atomix.protocols.raft.protocol.InstallResponse; |
| 34 | import io.atomix.protocols.raft.protocol.JoinRequest; |
| 35 | import io.atomix.protocols.raft.protocol.JoinResponse; |
| 36 | import io.atomix.protocols.raft.protocol.KeepAliveRequest; |
| 37 | import io.atomix.protocols.raft.protocol.KeepAliveResponse; |
| 38 | import io.atomix.protocols.raft.protocol.LeaveRequest; |
| 39 | import io.atomix.protocols.raft.protocol.LeaveResponse; |
| 40 | import io.atomix.protocols.raft.protocol.MetadataRequest; |
| 41 | import io.atomix.protocols.raft.protocol.MetadataResponse; |
| 42 | import io.atomix.protocols.raft.protocol.OpenSessionRequest; |
| 43 | import io.atomix.protocols.raft.protocol.OpenSessionResponse; |
| 44 | import io.atomix.protocols.raft.protocol.PollRequest; |
| 45 | import io.atomix.protocols.raft.protocol.PollResponse; |
| 46 | import io.atomix.protocols.raft.protocol.PublishRequest; |
| 47 | import io.atomix.protocols.raft.protocol.QueryRequest; |
| 48 | import io.atomix.protocols.raft.protocol.QueryResponse; |
| 49 | import io.atomix.protocols.raft.protocol.RaftServerProtocol; |
| 50 | import io.atomix.protocols.raft.protocol.ReconfigureRequest; |
| 51 | import io.atomix.protocols.raft.protocol.ReconfigureResponse; |
| 52 | import io.atomix.protocols.raft.protocol.ResetRequest; |
Jordan Halterman | 6807c8f | 2017-08-28 20:58:24 -0700 | [diff] [blame] | 53 | import io.atomix.protocols.raft.protocol.TransferRequest; |
| 54 | import io.atomix.protocols.raft.protocol.TransferResponse; |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame] | 55 | import io.atomix.protocols.raft.protocol.VoteRequest; |
| 56 | import io.atomix.protocols.raft.protocol.VoteResponse; |
| 57 | import io.atomix.protocols.raft.session.SessionId; |
| 58 | import org.onosproject.cluster.NodeId; |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame] | 59 | import org.onosproject.store.cluster.messaging.ClusterCommunicationService; |
| 60 | import org.onosproject.store.service.Serializer; |
| 61 | |
| 62 | /** |
| 63 | * Raft server protocol that uses a {@link ClusterCommunicationService}. |
| 64 | */ |
| 65 | public class RaftServerCommunicator extends RaftCommunicator implements RaftServerProtocol { |
| 66 | |
| 67 | public RaftServerCommunicator( |
Jordan Halterman | 980a8c1 | 2017-09-22 18:01:19 -0700 | [diff] [blame] | 68 | String prefix, |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame] | 69 | Serializer serializer, |
Jordan Halterman | 28183ee | 2017-10-17 17:29:10 -0700 | [diff] [blame] | 70 | ClusterCommunicationService clusterCommunicator) { |
Jordan Halterman | 980a8c1 | 2017-09-22 18:01:19 -0700 | [diff] [blame] | 71 | super(new RaftMessageContext(prefix), serializer, clusterCommunicator); |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame] | 72 | } |
| 73 | |
| 74 | @Override |
| 75 | public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId, OpenSessionRequest request) { |
| 76 | return sendAndReceive(context.openSessionSubject, request, memberId); |
| 77 | } |
| 78 | |
| 79 | @Override |
| 80 | public CompletableFuture<CloseSessionResponse> closeSession(MemberId memberId, CloseSessionRequest request) { |
| 81 | return sendAndReceive(context.closeSessionSubject, request, memberId); |
| 82 | } |
| 83 | |
| 84 | @Override |
| 85 | public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId, KeepAliveRequest request) { |
| 86 | return sendAndReceive(context.keepAliveSubject, request, memberId); |
| 87 | } |
| 88 | |
| 89 | @Override |
| 90 | public CompletableFuture<QueryResponse> query(MemberId memberId, QueryRequest request) { |
| 91 | return sendAndReceive(context.querySubject, request, memberId); |
| 92 | } |
| 93 | |
| 94 | @Override |
| 95 | public CompletableFuture<CommandResponse> command(MemberId memberId, CommandRequest request) { |
| 96 | return sendAndReceive(context.commandSubject, request, memberId); |
| 97 | } |
| 98 | |
| 99 | @Override |
| 100 | public CompletableFuture<MetadataResponse> metadata(MemberId memberId, MetadataRequest request) { |
| 101 | return sendAndReceive(context.metadataSubject, request, memberId); |
| 102 | } |
| 103 | |
| 104 | @Override |
| 105 | public CompletableFuture<JoinResponse> join(MemberId memberId, JoinRequest request) { |
| 106 | return sendAndReceive(context.joinSubject, request, memberId); |
| 107 | } |
| 108 | |
| 109 | @Override |
| 110 | public CompletableFuture<LeaveResponse> leave(MemberId memberId, LeaveRequest request) { |
| 111 | return sendAndReceive(context.leaveSubject, request, memberId); |
| 112 | } |
| 113 | |
| 114 | @Override |
| 115 | public CompletableFuture<ConfigureResponse> configure(MemberId memberId, ConfigureRequest request) { |
| 116 | return sendAndReceive(context.configureSubject, request, memberId); |
| 117 | } |
| 118 | |
| 119 | @Override |
| 120 | public CompletableFuture<ReconfigureResponse> reconfigure(MemberId memberId, ReconfigureRequest request) { |
| 121 | return sendAndReceive(context.reconfigureSubject, request, memberId); |
| 122 | } |
| 123 | |
| 124 | @Override |
| 125 | public CompletableFuture<InstallResponse> install(MemberId memberId, InstallRequest request) { |
| 126 | return sendAndReceive(context.installSubject, request, memberId); |
| 127 | } |
| 128 | |
| 129 | @Override |
| 130 | public CompletableFuture<PollResponse> poll(MemberId memberId, PollRequest request) { |
| 131 | return sendAndReceive(context.pollSubject, request, memberId); |
| 132 | } |
| 133 | |
| 134 | @Override |
| 135 | public CompletableFuture<VoteResponse> vote(MemberId memberId, VoteRequest request) { |
| 136 | return sendAndReceive(context.voteSubject, request, memberId); |
| 137 | } |
| 138 | |
| 139 | @Override |
| 140 | public CompletableFuture<AppendResponse> append(MemberId memberId, AppendRequest request) { |
| 141 | return sendAndReceive(context.appendSubject, request, memberId); |
| 142 | } |
| 143 | |
| 144 | @Override |
Jordan Halterman | 6807c8f | 2017-08-28 20:58:24 -0700 | [diff] [blame] | 145 | public CompletableFuture<TransferResponse> transfer(MemberId memberId, TransferRequest request) { |
| 146 | return sendAndReceive(context.transferSubject, request, memberId); |
| 147 | } |
| 148 | |
| 149 | @Override |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame] | 150 | public void publish(MemberId memberId, PublishRequest request) { |
| 151 | clusterCommunicator.unicast(request, |
| 152 | context.publishSubject(request.session()), serializer::encode, NodeId.nodeId(memberId.id())); |
| 153 | } |
| 154 | |
| 155 | @Override |
| 156 | public void registerOpenSessionHandler( |
| 157 | Function<OpenSessionRequest, CompletableFuture<OpenSessionResponse>> handler) { |
| 158 | clusterCommunicator.addSubscriber(context.openSessionSubject, serializer::decode, handler, serializer::encode); |
| 159 | } |
| 160 | |
| 161 | @Override |
| 162 | public void unregisterOpenSessionHandler() { |
| 163 | clusterCommunicator.removeSubscriber(context.openSessionSubject); |
| 164 | } |
| 165 | |
| 166 | @Override |
| 167 | public void registerCloseSessionHandler( |
| 168 | Function<CloseSessionRequest, CompletableFuture<CloseSessionResponse>> handler) { |
| 169 | clusterCommunicator.addSubscriber(context.closeSessionSubject, serializer::decode, handler, serializer::encode); |
| 170 | } |
| 171 | |
| 172 | @Override |
| 173 | public void unregisterCloseSessionHandler() { |
| 174 | clusterCommunicator.removeSubscriber(context.closeSessionSubject); |
| 175 | } |
| 176 | |
| 177 | @Override |
| 178 | public void registerKeepAliveHandler(Function<KeepAliveRequest, CompletableFuture<KeepAliveResponse>> handler) { |
| 179 | clusterCommunicator.addSubscriber(context.keepAliveSubject, serializer::decode, handler, serializer::encode); |
| 180 | } |
| 181 | |
| 182 | @Override |
| 183 | public void unregisterKeepAliveHandler() { |
| 184 | clusterCommunicator.removeSubscriber(context.keepAliveSubject); |
| 185 | } |
| 186 | |
| 187 | @Override |
| 188 | public void registerQueryHandler(Function<QueryRequest, CompletableFuture<QueryResponse>> handler) { |
| 189 | clusterCommunicator.addSubscriber(context.querySubject, serializer::decode, handler, serializer::encode); |
| 190 | } |
| 191 | |
| 192 | @Override |
| 193 | public void unregisterQueryHandler() { |
| 194 | clusterCommunicator.removeSubscriber(context.querySubject); |
| 195 | } |
| 196 | |
| 197 | @Override |
| 198 | public void registerCommandHandler(Function<CommandRequest, CompletableFuture<CommandResponse>> handler) { |
| 199 | clusterCommunicator.addSubscriber(context.commandSubject, serializer::decode, handler, serializer::encode); |
| 200 | } |
| 201 | |
| 202 | @Override |
| 203 | public void unregisterCommandHandler() { |
| 204 | clusterCommunicator.removeSubscriber(context.commandSubject); |
| 205 | } |
| 206 | |
| 207 | @Override |
| 208 | public void registerMetadataHandler(Function<MetadataRequest, CompletableFuture<MetadataResponse>> handler) { |
| 209 | clusterCommunicator.addSubscriber(context.metadataSubject, serializer::decode, handler, serializer::encode); |
| 210 | } |
| 211 | |
| 212 | @Override |
| 213 | public void unregisterMetadataHandler() { |
| 214 | clusterCommunicator.removeSubscriber(context.metadataSubject); |
| 215 | } |
| 216 | |
| 217 | @Override |
| 218 | public void registerJoinHandler(Function<JoinRequest, CompletableFuture<JoinResponse>> handler) { |
| 219 | clusterCommunicator.addSubscriber(context.joinSubject, serializer::decode, handler, serializer::encode); |
| 220 | } |
| 221 | |
| 222 | @Override |
| 223 | public void unregisterJoinHandler() { |
| 224 | clusterCommunicator.removeSubscriber(context.joinSubject); |
| 225 | } |
| 226 | |
| 227 | @Override |
| 228 | public void registerLeaveHandler(Function<LeaveRequest, CompletableFuture<LeaveResponse>> handler) { |
| 229 | clusterCommunicator.addSubscriber(context.leaveSubject, serializer::decode, handler, serializer::encode); |
| 230 | } |
| 231 | |
| 232 | @Override |
| 233 | public void unregisterLeaveHandler() { |
| 234 | clusterCommunicator.removeSubscriber(context.leaveSubject); |
| 235 | } |
| 236 | |
| 237 | @Override |
| 238 | public void registerConfigureHandler(Function<ConfigureRequest, CompletableFuture<ConfigureResponse>> handler) { |
| 239 | clusterCommunicator.addSubscriber(context.configureSubject, serializer::decode, handler, serializer::encode); |
| 240 | } |
| 241 | |
| 242 | @Override |
| 243 | public void unregisterConfigureHandler() { |
| 244 | clusterCommunicator.removeSubscriber(context.configureSubject); |
| 245 | } |
| 246 | |
| 247 | @Override |
| 248 | public void registerReconfigureHandler( |
| 249 | Function<ReconfigureRequest, CompletableFuture<ReconfigureResponse>> handler) { |
| 250 | clusterCommunicator.addSubscriber(context.reconfigureSubject, serializer::decode, handler, serializer::encode); |
| 251 | } |
| 252 | |
| 253 | @Override |
| 254 | public void unregisterReconfigureHandler() { |
| 255 | clusterCommunicator.removeSubscriber(context.reconfigureSubject); |
| 256 | } |
| 257 | |
| 258 | @Override |
| 259 | public void registerInstallHandler(Function<InstallRequest, CompletableFuture<InstallResponse>> handler) { |
| 260 | clusterCommunicator.addSubscriber(context.installSubject, serializer::decode, handler, serializer::encode); |
| 261 | } |
| 262 | |
| 263 | @Override |
| 264 | public void unregisterInstallHandler() { |
| 265 | clusterCommunicator.removeSubscriber(context.installSubject); |
| 266 | } |
| 267 | |
| 268 | @Override |
| 269 | public void registerPollHandler(Function<PollRequest, CompletableFuture<PollResponse>> handler) { |
| 270 | clusterCommunicator.addSubscriber(context.pollSubject, serializer::decode, handler, serializer::encode); |
| 271 | } |
| 272 | |
| 273 | @Override |
| 274 | public void unregisterPollHandler() { |
| 275 | clusterCommunicator.removeSubscriber(context.pollSubject); |
| 276 | } |
| 277 | |
| 278 | @Override |
| 279 | public void registerVoteHandler(Function<VoteRequest, CompletableFuture<VoteResponse>> handler) { |
| 280 | clusterCommunicator.addSubscriber(context.voteSubject, serializer::decode, handler, serializer::encode); |
| 281 | } |
| 282 | |
| 283 | @Override |
| 284 | public void unregisterVoteHandler() { |
| 285 | clusterCommunicator.removeSubscriber(context.voteSubject); |
| 286 | } |
| 287 | |
| 288 | @Override |
| 289 | public void registerAppendHandler(Function<AppendRequest, CompletableFuture<AppendResponse>> handler) { |
| 290 | clusterCommunicator.addSubscriber(context.appendSubject, serializer::decode, handler, serializer::encode); |
| 291 | } |
| 292 | |
| 293 | @Override |
| 294 | public void unregisterAppendHandler() { |
| 295 | clusterCommunicator.removeSubscriber(context.appendSubject); |
| 296 | } |
| 297 | |
| 298 | @Override |
Jordan Halterman | 6807c8f | 2017-08-28 20:58:24 -0700 | [diff] [blame] | 299 | public void registerTransferHandler(Function<TransferRequest, CompletableFuture<TransferResponse>> handler) { |
| 300 | clusterCommunicator.addSubscriber(context.transferSubject, serializer::decode, handler, serializer::encode); |
| 301 | } |
| 302 | |
| 303 | @Override |
| 304 | public void unregisterTransferHandler() { |
| 305 | clusterCommunicator.removeSubscriber(context.transferSubject); |
| 306 | } |
| 307 | |
| 308 | @Override |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame] | 309 | public void registerResetListener(SessionId sessionId, Consumer<ResetRequest> listener, Executor executor) { |
| 310 | clusterCommunicator.addSubscriber(context.resetSubject(sessionId.id()), serializer::decode, listener, executor); |
| 311 | } |
| 312 | |
| 313 | @Override |
| 314 | public void unregisterResetListener(SessionId sessionId) { |
| 315 | clusterCommunicator.removeSubscriber(context.resetSubject(sessionId.id())); |
| 316 | } |
| 317 | } |