/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.store.primitives.impl;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;

import io.atomix.protocols.raft.cluster.MemberId;
import io.atomix.protocols.raft.protocol.AppendRequest;
import io.atomix.protocols.raft.protocol.AppendResponse;
import io.atomix.protocols.raft.protocol.CloseSessionRequest;
import io.atomix.protocols.raft.protocol.CloseSessionResponse;
import io.atomix.protocols.raft.protocol.CommandRequest;
import io.atomix.protocols.raft.protocol.CommandResponse;
import io.atomix.protocols.raft.protocol.ConfigureRequest;
import io.atomix.protocols.raft.protocol.ConfigureResponse;
import io.atomix.protocols.raft.protocol.InstallRequest;
import io.atomix.protocols.raft.protocol.InstallResponse;
import io.atomix.protocols.raft.protocol.JoinRequest;
import io.atomix.protocols.raft.protocol.JoinResponse;
import io.atomix.protocols.raft.protocol.KeepAliveRequest;
import io.atomix.protocols.raft.protocol.KeepAliveResponse;
import io.atomix.protocols.raft.protocol.LeaveRequest;
import io.atomix.protocols.raft.protocol.LeaveResponse;
import io.atomix.protocols.raft.protocol.MetadataRequest;
import io.atomix.protocols.raft.protocol.MetadataResponse;
import io.atomix.protocols.raft.protocol.OpenSessionRequest;
import io.atomix.protocols.raft.protocol.OpenSessionResponse;
import io.atomix.protocols.raft.protocol.PollRequest;
import io.atomix.protocols.raft.protocol.PollResponse;
import io.atomix.protocols.raft.protocol.PublishRequest;
import io.atomix.protocols.raft.protocol.QueryRequest;
import io.atomix.protocols.raft.protocol.QueryResponse;
import io.atomix.protocols.raft.protocol.RaftServerProtocol;
import io.atomix.protocols.raft.protocol.ReconfigureRequest;
import io.atomix.protocols.raft.protocol.ReconfigureResponse;
import io.atomix.protocols.raft.protocol.ResetRequest;
import io.atomix.protocols.raft.protocol.TransferRequest;
import io.atomix.protocols.raft.protocol.TransferResponse;
import io.atomix.protocols.raft.protocol.VoteRequest;
import io.atomix.protocols.raft.protocol.VoteResponse;
import io.atomix.protocols.raft.session.SessionId;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterCommunicator;
import org.onosproject.store.service.Serializer;

| 63 | /** |
| 64 | * Raft server protocol that uses a {@link ClusterCommunicationService}. |
| 65 | */ |
| 66 | public class RaftServerCommunicator extends RaftCommunicator implements RaftServerProtocol { |
| 67 | |
| 68 | public RaftServerCommunicator( |
Jordan Halterman | 980a8c1 | 2017-09-22 18:01:19 -0700 | [diff] [blame^] | 69 | String prefix, |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame] | 70 | Serializer serializer, |
Jordan Halterman | 980a8c1 | 2017-09-22 18:01:19 -0700 | [diff] [blame^] | 71 | ClusterCommunicator clusterCommunicator) { |
| 72 | super(new RaftMessageContext(prefix), serializer, clusterCommunicator); |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame] | 73 | } |
| 74 | |
| 75 | @Override |
| 76 | public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId, OpenSessionRequest request) { |
| 77 | return sendAndReceive(context.openSessionSubject, request, memberId); |
| 78 | } |
| 79 | |
| 80 | @Override |
| 81 | public CompletableFuture<CloseSessionResponse> closeSession(MemberId memberId, CloseSessionRequest request) { |
| 82 | return sendAndReceive(context.closeSessionSubject, request, memberId); |
| 83 | } |
| 84 | |
| 85 | @Override |
| 86 | public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId, KeepAliveRequest request) { |
| 87 | return sendAndReceive(context.keepAliveSubject, request, memberId); |
| 88 | } |
| 89 | |
| 90 | @Override |
| 91 | public CompletableFuture<QueryResponse> query(MemberId memberId, QueryRequest request) { |
| 92 | return sendAndReceive(context.querySubject, request, memberId); |
| 93 | } |
| 94 | |
| 95 | @Override |
| 96 | public CompletableFuture<CommandResponse> command(MemberId memberId, CommandRequest request) { |
| 97 | return sendAndReceive(context.commandSubject, request, memberId); |
| 98 | } |
| 99 | |
| 100 | @Override |
| 101 | public CompletableFuture<MetadataResponse> metadata(MemberId memberId, MetadataRequest request) { |
| 102 | return sendAndReceive(context.metadataSubject, request, memberId); |
| 103 | } |
| 104 | |
| 105 | @Override |
| 106 | public CompletableFuture<JoinResponse> join(MemberId memberId, JoinRequest request) { |
| 107 | return sendAndReceive(context.joinSubject, request, memberId); |
| 108 | } |
| 109 | |
| 110 | @Override |
| 111 | public CompletableFuture<LeaveResponse> leave(MemberId memberId, LeaveRequest request) { |
| 112 | return sendAndReceive(context.leaveSubject, request, memberId); |
| 113 | } |
| 114 | |
| 115 | @Override |
| 116 | public CompletableFuture<ConfigureResponse> configure(MemberId memberId, ConfigureRequest request) { |
| 117 | return sendAndReceive(context.configureSubject, request, memberId); |
| 118 | } |
| 119 | |
| 120 | @Override |
| 121 | public CompletableFuture<ReconfigureResponse> reconfigure(MemberId memberId, ReconfigureRequest request) { |
| 122 | return sendAndReceive(context.reconfigureSubject, request, memberId); |
| 123 | } |
| 124 | |
| 125 | @Override |
| 126 | public CompletableFuture<InstallResponse> install(MemberId memberId, InstallRequest request) { |
| 127 | return sendAndReceive(context.installSubject, request, memberId); |
| 128 | } |
| 129 | |
| 130 | @Override |
| 131 | public CompletableFuture<PollResponse> poll(MemberId memberId, PollRequest request) { |
| 132 | return sendAndReceive(context.pollSubject, request, memberId); |
| 133 | } |
| 134 | |
| 135 | @Override |
| 136 | public CompletableFuture<VoteResponse> vote(MemberId memberId, VoteRequest request) { |
| 137 | return sendAndReceive(context.voteSubject, request, memberId); |
| 138 | } |
| 139 | |
| 140 | @Override |
| 141 | public CompletableFuture<AppendResponse> append(MemberId memberId, AppendRequest request) { |
| 142 | return sendAndReceive(context.appendSubject, request, memberId); |
| 143 | } |
| 144 | |
| 145 | @Override |
Jordan Halterman | 6807c8f | 2017-08-28 20:58:24 -0700 | [diff] [blame] | 146 | public CompletableFuture<TransferResponse> transfer(MemberId memberId, TransferRequest request) { |
| 147 | return sendAndReceive(context.transferSubject, request, memberId); |
| 148 | } |
| 149 | |
| 150 | @Override |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame] | 151 | public void publish(MemberId memberId, PublishRequest request) { |
| 152 | clusterCommunicator.unicast(request, |
| 153 | context.publishSubject(request.session()), serializer::encode, NodeId.nodeId(memberId.id())); |
| 154 | } |
| 155 | |
| 156 | @Override |
| 157 | public void registerOpenSessionHandler( |
| 158 | Function<OpenSessionRequest, CompletableFuture<OpenSessionResponse>> handler) { |
| 159 | clusterCommunicator.addSubscriber(context.openSessionSubject, serializer::decode, handler, serializer::encode); |
| 160 | } |
| 161 | |
| 162 | @Override |
| 163 | public void unregisterOpenSessionHandler() { |
| 164 | clusterCommunicator.removeSubscriber(context.openSessionSubject); |
| 165 | } |
| 166 | |
| 167 | @Override |
| 168 | public void registerCloseSessionHandler( |
| 169 | Function<CloseSessionRequest, CompletableFuture<CloseSessionResponse>> handler) { |
| 170 | clusterCommunicator.addSubscriber(context.closeSessionSubject, serializer::decode, handler, serializer::encode); |
| 171 | } |
| 172 | |
| 173 | @Override |
| 174 | public void unregisterCloseSessionHandler() { |
| 175 | clusterCommunicator.removeSubscriber(context.closeSessionSubject); |
| 176 | } |
| 177 | |
| 178 | @Override |
| 179 | public void registerKeepAliveHandler(Function<KeepAliveRequest, CompletableFuture<KeepAliveResponse>> handler) { |
| 180 | clusterCommunicator.addSubscriber(context.keepAliveSubject, serializer::decode, handler, serializer::encode); |
| 181 | } |
| 182 | |
| 183 | @Override |
| 184 | public void unregisterKeepAliveHandler() { |
| 185 | clusterCommunicator.removeSubscriber(context.keepAliveSubject); |
| 186 | } |
| 187 | |
| 188 | @Override |
| 189 | public void registerQueryHandler(Function<QueryRequest, CompletableFuture<QueryResponse>> handler) { |
| 190 | clusterCommunicator.addSubscriber(context.querySubject, serializer::decode, handler, serializer::encode); |
| 191 | } |
| 192 | |
| 193 | @Override |
| 194 | public void unregisterQueryHandler() { |
| 195 | clusterCommunicator.removeSubscriber(context.querySubject); |
| 196 | } |
| 197 | |
| 198 | @Override |
| 199 | public void registerCommandHandler(Function<CommandRequest, CompletableFuture<CommandResponse>> handler) { |
| 200 | clusterCommunicator.addSubscriber(context.commandSubject, serializer::decode, handler, serializer::encode); |
| 201 | } |
| 202 | |
| 203 | @Override |
| 204 | public void unregisterCommandHandler() { |
| 205 | clusterCommunicator.removeSubscriber(context.commandSubject); |
| 206 | } |
| 207 | |
| 208 | @Override |
| 209 | public void registerMetadataHandler(Function<MetadataRequest, CompletableFuture<MetadataResponse>> handler) { |
| 210 | clusterCommunicator.addSubscriber(context.metadataSubject, serializer::decode, handler, serializer::encode); |
| 211 | } |
| 212 | |
| 213 | @Override |
| 214 | public void unregisterMetadataHandler() { |
| 215 | clusterCommunicator.removeSubscriber(context.metadataSubject); |
| 216 | } |
| 217 | |
| 218 | @Override |
| 219 | public void registerJoinHandler(Function<JoinRequest, CompletableFuture<JoinResponse>> handler) { |
| 220 | clusterCommunicator.addSubscriber(context.joinSubject, serializer::decode, handler, serializer::encode); |
| 221 | } |
| 222 | |
| 223 | @Override |
| 224 | public void unregisterJoinHandler() { |
| 225 | clusterCommunicator.removeSubscriber(context.joinSubject); |
| 226 | } |
| 227 | |
| 228 | @Override |
| 229 | public void registerLeaveHandler(Function<LeaveRequest, CompletableFuture<LeaveResponse>> handler) { |
| 230 | clusterCommunicator.addSubscriber(context.leaveSubject, serializer::decode, handler, serializer::encode); |
| 231 | } |
| 232 | |
| 233 | @Override |
| 234 | public void unregisterLeaveHandler() { |
| 235 | clusterCommunicator.removeSubscriber(context.leaveSubject); |
| 236 | } |
| 237 | |
| 238 | @Override |
| 239 | public void registerConfigureHandler(Function<ConfigureRequest, CompletableFuture<ConfigureResponse>> handler) { |
| 240 | clusterCommunicator.addSubscriber(context.configureSubject, serializer::decode, handler, serializer::encode); |
| 241 | } |
| 242 | |
| 243 | @Override |
| 244 | public void unregisterConfigureHandler() { |
| 245 | clusterCommunicator.removeSubscriber(context.configureSubject); |
| 246 | } |
| 247 | |
| 248 | @Override |
| 249 | public void registerReconfigureHandler( |
| 250 | Function<ReconfigureRequest, CompletableFuture<ReconfigureResponse>> handler) { |
| 251 | clusterCommunicator.addSubscriber(context.reconfigureSubject, serializer::decode, handler, serializer::encode); |
| 252 | } |
| 253 | |
| 254 | @Override |
| 255 | public void unregisterReconfigureHandler() { |
| 256 | clusterCommunicator.removeSubscriber(context.reconfigureSubject); |
| 257 | } |
| 258 | |
| 259 | @Override |
| 260 | public void registerInstallHandler(Function<InstallRequest, CompletableFuture<InstallResponse>> handler) { |
| 261 | clusterCommunicator.addSubscriber(context.installSubject, serializer::decode, handler, serializer::encode); |
| 262 | } |
| 263 | |
| 264 | @Override |
| 265 | public void unregisterInstallHandler() { |
| 266 | clusterCommunicator.removeSubscriber(context.installSubject); |
| 267 | } |
| 268 | |
| 269 | @Override |
| 270 | public void registerPollHandler(Function<PollRequest, CompletableFuture<PollResponse>> handler) { |
| 271 | clusterCommunicator.addSubscriber(context.pollSubject, serializer::decode, handler, serializer::encode); |
| 272 | } |
| 273 | |
| 274 | @Override |
| 275 | public void unregisterPollHandler() { |
| 276 | clusterCommunicator.removeSubscriber(context.pollSubject); |
| 277 | } |
| 278 | |
| 279 | @Override |
| 280 | public void registerVoteHandler(Function<VoteRequest, CompletableFuture<VoteResponse>> handler) { |
| 281 | clusterCommunicator.addSubscriber(context.voteSubject, serializer::decode, handler, serializer::encode); |
| 282 | } |
| 283 | |
| 284 | @Override |
| 285 | public void unregisterVoteHandler() { |
| 286 | clusterCommunicator.removeSubscriber(context.voteSubject); |
| 287 | } |
| 288 | |
| 289 | @Override |
| 290 | public void registerAppendHandler(Function<AppendRequest, CompletableFuture<AppendResponse>> handler) { |
| 291 | clusterCommunicator.addSubscriber(context.appendSubject, serializer::decode, handler, serializer::encode); |
| 292 | } |
| 293 | |
| 294 | @Override |
| 295 | public void unregisterAppendHandler() { |
| 296 | clusterCommunicator.removeSubscriber(context.appendSubject); |
| 297 | } |
| 298 | |
| 299 | @Override |
Jordan Halterman | 6807c8f | 2017-08-28 20:58:24 -0700 | [diff] [blame] | 300 | public void registerTransferHandler(Function<TransferRequest, CompletableFuture<TransferResponse>> handler) { |
| 301 | clusterCommunicator.addSubscriber(context.transferSubject, serializer::decode, handler, serializer::encode); |
| 302 | } |
| 303 | |
| 304 | @Override |
| 305 | public void unregisterTransferHandler() { |
| 306 | clusterCommunicator.removeSubscriber(context.transferSubject); |
| 307 | } |
| 308 | |
| 309 | @Override |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame] | 310 | public void registerResetListener(SessionId sessionId, Consumer<ResetRequest> listener, Executor executor) { |
| 311 | clusterCommunicator.addSubscriber(context.resetSubject(sessionId.id()), serializer::decode, listener, executor); |
| 312 | } |
| 313 | |
| 314 | @Override |
| 315 | public void unregisterResetListener(SessionId sessionId) { |
| 316 | clusterCommunicator.removeSubscriber(context.resetSubject(sessionId.id())); |
| 317 | } |
| 318 | } |