/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| 16 | package org.onosproject.store.primitives.impl; |
| 17 | |
| 18 | import java.util.concurrent.CompletableFuture; |
| 19 | import java.util.concurrent.Executor; |
| 20 | import java.util.function.Consumer; |
| 21 | import java.util.function.Function; |
| 22 | |
| 23 | import io.atomix.protocols.raft.cluster.MemberId; |
| 24 | import io.atomix.protocols.raft.protocol.AppendRequest; |
| 25 | import io.atomix.protocols.raft.protocol.AppendResponse; |
| 26 | import io.atomix.protocols.raft.protocol.CloseSessionRequest; |
| 27 | import io.atomix.protocols.raft.protocol.CloseSessionResponse; |
| 28 | import io.atomix.protocols.raft.protocol.CommandRequest; |
| 29 | import io.atomix.protocols.raft.protocol.CommandResponse; |
| 30 | import io.atomix.protocols.raft.protocol.ConfigureRequest; |
| 31 | import io.atomix.protocols.raft.protocol.ConfigureResponse; |
Jordan Halterman | e372d85 | 2017-11-02 15:00:06 -0700 | [diff] [blame] | 32 | import io.atomix.protocols.raft.protocol.HeartbeatRequest; |
| 33 | import io.atomix.protocols.raft.protocol.HeartbeatResponse; |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame] | 34 | import io.atomix.protocols.raft.protocol.InstallRequest; |
| 35 | import io.atomix.protocols.raft.protocol.InstallResponse; |
| 36 | import io.atomix.protocols.raft.protocol.JoinRequest; |
| 37 | import io.atomix.protocols.raft.protocol.JoinResponse; |
| 38 | import io.atomix.protocols.raft.protocol.KeepAliveRequest; |
| 39 | import io.atomix.protocols.raft.protocol.KeepAliveResponse; |
| 40 | import io.atomix.protocols.raft.protocol.LeaveRequest; |
| 41 | import io.atomix.protocols.raft.protocol.LeaveResponse; |
| 42 | import io.atomix.protocols.raft.protocol.MetadataRequest; |
| 43 | import io.atomix.protocols.raft.protocol.MetadataResponse; |
| 44 | import io.atomix.protocols.raft.protocol.OpenSessionRequest; |
| 45 | import io.atomix.protocols.raft.protocol.OpenSessionResponse; |
| 46 | import io.atomix.protocols.raft.protocol.PollRequest; |
| 47 | import io.atomix.protocols.raft.protocol.PollResponse; |
| 48 | import io.atomix.protocols.raft.protocol.PublishRequest; |
| 49 | import io.atomix.protocols.raft.protocol.QueryRequest; |
| 50 | import io.atomix.protocols.raft.protocol.QueryResponse; |
| 51 | import io.atomix.protocols.raft.protocol.RaftServerProtocol; |
| 52 | import io.atomix.protocols.raft.protocol.ReconfigureRequest; |
| 53 | import io.atomix.protocols.raft.protocol.ReconfigureResponse; |
| 54 | import io.atomix.protocols.raft.protocol.ResetRequest; |
Jordan Halterman | d3e02d3 | 2017-08-28 20:58:24 -0700 | [diff] [blame] | 55 | import io.atomix.protocols.raft.protocol.TransferRequest; |
| 56 | import io.atomix.protocols.raft.protocol.TransferResponse; |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame] | 57 | import io.atomix.protocols.raft.protocol.VoteRequest; |
| 58 | import io.atomix.protocols.raft.protocol.VoteResponse; |
| 59 | import io.atomix.protocols.raft.session.SessionId; |
| 60 | import org.onosproject.cluster.NodeId; |
| 61 | import org.onosproject.cluster.PartitionId; |
| 62 | import org.onosproject.store.cluster.messaging.ClusterCommunicationService; |
| 63 | import org.onosproject.store.service.Serializer; |
| 64 | |
| 65 | /** |
| 66 | * Raft server protocol that uses a {@link ClusterCommunicationService}. |
| 67 | */ |
| 68 | public class RaftServerCommunicator extends RaftCommunicator implements RaftServerProtocol { |
| 69 | |
| 70 | public RaftServerCommunicator( |
| 71 | PartitionId partitionId, |
| 72 | Serializer serializer, |
| 73 | ClusterCommunicationService clusterCommunicator) { |
| 74 | super(new RaftMessageContext(String.format("partition-%d", partitionId.id())), serializer, clusterCommunicator); |
| 75 | } |
| 76 | |
| 77 | @Override |
| 78 | public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId, OpenSessionRequest request) { |
| 79 | return sendAndReceive(context.openSessionSubject, request, memberId); |
| 80 | } |
| 81 | |
| 82 | @Override |
| 83 | public CompletableFuture<CloseSessionResponse> closeSession(MemberId memberId, CloseSessionRequest request) { |
| 84 | return sendAndReceive(context.closeSessionSubject, request, memberId); |
| 85 | } |
| 86 | |
| 87 | @Override |
| 88 | public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId, KeepAliveRequest request) { |
| 89 | return sendAndReceive(context.keepAliveSubject, request, memberId); |
| 90 | } |
| 91 | |
| 92 | @Override |
| 93 | public CompletableFuture<QueryResponse> query(MemberId memberId, QueryRequest request) { |
| 94 | return sendAndReceive(context.querySubject, request, memberId); |
| 95 | } |
| 96 | |
| 97 | @Override |
| 98 | public CompletableFuture<CommandResponse> command(MemberId memberId, CommandRequest request) { |
| 99 | return sendAndReceive(context.commandSubject, request, memberId); |
| 100 | } |
| 101 | |
| 102 | @Override |
| 103 | public CompletableFuture<MetadataResponse> metadata(MemberId memberId, MetadataRequest request) { |
| 104 | return sendAndReceive(context.metadataSubject, request, memberId); |
| 105 | } |
| 106 | |
| 107 | @Override |
| 108 | public CompletableFuture<JoinResponse> join(MemberId memberId, JoinRequest request) { |
| 109 | return sendAndReceive(context.joinSubject, request, memberId); |
| 110 | } |
| 111 | |
| 112 | @Override |
| 113 | public CompletableFuture<LeaveResponse> leave(MemberId memberId, LeaveRequest request) { |
| 114 | return sendAndReceive(context.leaveSubject, request, memberId); |
| 115 | } |
| 116 | |
| 117 | @Override |
| 118 | public CompletableFuture<ConfigureResponse> configure(MemberId memberId, ConfigureRequest request) { |
| 119 | return sendAndReceive(context.configureSubject, request, memberId); |
| 120 | } |
| 121 | |
| 122 | @Override |
| 123 | public CompletableFuture<ReconfigureResponse> reconfigure(MemberId memberId, ReconfigureRequest request) { |
| 124 | return sendAndReceive(context.reconfigureSubject, request, memberId); |
| 125 | } |
| 126 | |
| 127 | @Override |
| 128 | public CompletableFuture<InstallResponse> install(MemberId memberId, InstallRequest request) { |
| 129 | return sendAndReceive(context.installSubject, request, memberId); |
| 130 | } |
| 131 | |
| 132 | @Override |
| 133 | public CompletableFuture<PollResponse> poll(MemberId memberId, PollRequest request) { |
| 134 | return sendAndReceive(context.pollSubject, request, memberId); |
| 135 | } |
| 136 | |
| 137 | @Override |
| 138 | public CompletableFuture<VoteResponse> vote(MemberId memberId, VoteRequest request) { |
| 139 | return sendAndReceive(context.voteSubject, request, memberId); |
| 140 | } |
| 141 | |
| 142 | @Override |
| 143 | public CompletableFuture<AppendResponse> append(MemberId memberId, AppendRequest request) { |
| 144 | return sendAndReceive(context.appendSubject, request, memberId); |
| 145 | } |
| 146 | |
| 147 | @Override |
Jordan Halterman | d3e02d3 | 2017-08-28 20:58:24 -0700 | [diff] [blame] | 148 | public CompletableFuture<TransferResponse> transfer(MemberId memberId, TransferRequest request) { |
| 149 | return sendAndReceive(context.transferSubject, request, memberId); |
| 150 | } |
| 151 | |
| 152 | @Override |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame] | 153 | public void publish(MemberId memberId, PublishRequest request) { |
| 154 | clusterCommunicator.unicast(request, |
| 155 | context.publishSubject(request.session()), serializer::encode, NodeId.nodeId(memberId.id())); |
| 156 | } |
| 157 | |
| 158 | @Override |
Jordan Halterman | e372d85 | 2017-11-02 15:00:06 -0700 | [diff] [blame] | 159 | public CompletableFuture<HeartbeatResponse> heartbeat(MemberId memberId, HeartbeatRequest request) { |
| 160 | return sendAndReceive(context.heartbeatSubject, request, memberId); |
| 161 | } |
| 162 | |
| 163 | @Override |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame] | 164 | public void registerOpenSessionHandler( |
| 165 | Function<OpenSessionRequest, CompletableFuture<OpenSessionResponse>> handler) { |
| 166 | clusterCommunicator.addSubscriber(context.openSessionSubject, serializer::decode, handler, serializer::encode); |
| 167 | } |
| 168 | |
| 169 | @Override |
| 170 | public void unregisterOpenSessionHandler() { |
| 171 | clusterCommunicator.removeSubscriber(context.openSessionSubject); |
| 172 | } |
| 173 | |
| 174 | @Override |
| 175 | public void registerCloseSessionHandler( |
| 176 | Function<CloseSessionRequest, CompletableFuture<CloseSessionResponse>> handler) { |
| 177 | clusterCommunicator.addSubscriber(context.closeSessionSubject, serializer::decode, handler, serializer::encode); |
| 178 | } |
| 179 | |
| 180 | @Override |
| 181 | public void unregisterCloseSessionHandler() { |
| 182 | clusterCommunicator.removeSubscriber(context.closeSessionSubject); |
| 183 | } |
| 184 | |
| 185 | @Override |
| 186 | public void registerKeepAliveHandler(Function<KeepAliveRequest, CompletableFuture<KeepAliveResponse>> handler) { |
| 187 | clusterCommunicator.addSubscriber(context.keepAliveSubject, serializer::decode, handler, serializer::encode); |
| 188 | } |
| 189 | |
| 190 | @Override |
| 191 | public void unregisterKeepAliveHandler() { |
| 192 | clusterCommunicator.removeSubscriber(context.keepAliveSubject); |
| 193 | } |
| 194 | |
| 195 | @Override |
| 196 | public void registerQueryHandler(Function<QueryRequest, CompletableFuture<QueryResponse>> handler) { |
| 197 | clusterCommunicator.addSubscriber(context.querySubject, serializer::decode, handler, serializer::encode); |
| 198 | } |
| 199 | |
| 200 | @Override |
| 201 | public void unregisterQueryHandler() { |
| 202 | clusterCommunicator.removeSubscriber(context.querySubject); |
| 203 | } |
| 204 | |
| 205 | @Override |
| 206 | public void registerCommandHandler(Function<CommandRequest, CompletableFuture<CommandResponse>> handler) { |
| 207 | clusterCommunicator.addSubscriber(context.commandSubject, serializer::decode, handler, serializer::encode); |
| 208 | } |
| 209 | |
| 210 | @Override |
| 211 | public void unregisterCommandHandler() { |
| 212 | clusterCommunicator.removeSubscriber(context.commandSubject); |
| 213 | } |
| 214 | |
| 215 | @Override |
| 216 | public void registerMetadataHandler(Function<MetadataRequest, CompletableFuture<MetadataResponse>> handler) { |
| 217 | clusterCommunicator.addSubscriber(context.metadataSubject, serializer::decode, handler, serializer::encode); |
| 218 | } |
| 219 | |
| 220 | @Override |
| 221 | public void unregisterMetadataHandler() { |
| 222 | clusterCommunicator.removeSubscriber(context.metadataSubject); |
| 223 | } |
| 224 | |
| 225 | @Override |
| 226 | public void registerJoinHandler(Function<JoinRequest, CompletableFuture<JoinResponse>> handler) { |
| 227 | clusterCommunicator.addSubscriber(context.joinSubject, serializer::decode, handler, serializer::encode); |
| 228 | } |
| 229 | |
| 230 | @Override |
| 231 | public void unregisterJoinHandler() { |
| 232 | clusterCommunicator.removeSubscriber(context.joinSubject); |
| 233 | } |
| 234 | |
| 235 | @Override |
| 236 | public void registerLeaveHandler(Function<LeaveRequest, CompletableFuture<LeaveResponse>> handler) { |
| 237 | clusterCommunicator.addSubscriber(context.leaveSubject, serializer::decode, handler, serializer::encode); |
| 238 | } |
| 239 | |
| 240 | @Override |
| 241 | public void unregisterLeaveHandler() { |
| 242 | clusterCommunicator.removeSubscriber(context.leaveSubject); |
| 243 | } |
| 244 | |
| 245 | @Override |
| 246 | public void registerConfigureHandler(Function<ConfigureRequest, CompletableFuture<ConfigureResponse>> handler) { |
| 247 | clusterCommunicator.addSubscriber(context.configureSubject, serializer::decode, handler, serializer::encode); |
| 248 | } |
| 249 | |
| 250 | @Override |
| 251 | public void unregisterConfigureHandler() { |
| 252 | clusterCommunicator.removeSubscriber(context.configureSubject); |
| 253 | } |
| 254 | |
| 255 | @Override |
| 256 | public void registerReconfigureHandler( |
| 257 | Function<ReconfigureRequest, CompletableFuture<ReconfigureResponse>> handler) { |
| 258 | clusterCommunicator.addSubscriber(context.reconfigureSubject, serializer::decode, handler, serializer::encode); |
| 259 | } |
| 260 | |
| 261 | @Override |
| 262 | public void unregisterReconfigureHandler() { |
| 263 | clusterCommunicator.removeSubscriber(context.reconfigureSubject); |
| 264 | } |
| 265 | |
| 266 | @Override |
| 267 | public void registerInstallHandler(Function<InstallRequest, CompletableFuture<InstallResponse>> handler) { |
| 268 | clusterCommunicator.addSubscriber(context.installSubject, serializer::decode, handler, serializer::encode); |
| 269 | } |
| 270 | |
| 271 | @Override |
| 272 | public void unregisterInstallHandler() { |
| 273 | clusterCommunicator.removeSubscriber(context.installSubject); |
| 274 | } |
| 275 | |
| 276 | @Override |
| 277 | public void registerPollHandler(Function<PollRequest, CompletableFuture<PollResponse>> handler) { |
| 278 | clusterCommunicator.addSubscriber(context.pollSubject, serializer::decode, handler, serializer::encode); |
| 279 | } |
| 280 | |
| 281 | @Override |
| 282 | public void unregisterPollHandler() { |
| 283 | clusterCommunicator.removeSubscriber(context.pollSubject); |
| 284 | } |
| 285 | |
| 286 | @Override |
| 287 | public void registerVoteHandler(Function<VoteRequest, CompletableFuture<VoteResponse>> handler) { |
| 288 | clusterCommunicator.addSubscriber(context.voteSubject, serializer::decode, handler, serializer::encode); |
| 289 | } |
| 290 | |
| 291 | @Override |
| 292 | public void unregisterVoteHandler() { |
| 293 | clusterCommunicator.removeSubscriber(context.voteSubject); |
| 294 | } |
| 295 | |
| 296 | @Override |
| 297 | public void registerAppendHandler(Function<AppendRequest, CompletableFuture<AppendResponse>> handler) { |
| 298 | clusterCommunicator.addSubscriber(context.appendSubject, serializer::decode, handler, serializer::encode); |
| 299 | } |
| 300 | |
| 301 | @Override |
| 302 | public void unregisterAppendHandler() { |
| 303 | clusterCommunicator.removeSubscriber(context.appendSubject); |
| 304 | } |
| 305 | |
| 306 | @Override |
Jordan Halterman | d3e02d3 | 2017-08-28 20:58:24 -0700 | [diff] [blame] | 307 | public void registerTransferHandler(Function<TransferRequest, CompletableFuture<TransferResponse>> handler) { |
| 308 | clusterCommunicator.addSubscriber(context.transferSubject, serializer::decode, handler, serializer::encode); |
| 309 | } |
| 310 | |
| 311 | @Override |
| 312 | public void unregisterTransferHandler() { |
| 313 | clusterCommunicator.removeSubscriber(context.transferSubject); |
| 314 | } |
| 315 | |
| 316 | @Override |
Jordan Halterman | 2bf177c | 2017-06-29 01:49:08 -0700 | [diff] [blame] | 317 | public void registerResetListener(SessionId sessionId, Consumer<ResetRequest> listener, Executor executor) { |
| 318 | clusterCommunicator.addSubscriber(context.resetSubject(sessionId.id()), serializer::decode, listener, executor); |
| 319 | } |
| 320 | |
| 321 | @Override |
| 322 | public void unregisterResetListener(SessionId sessionId) { |
| 323 | clusterCommunicator.removeSubscriber(context.resetSubject(sessionId.id())); |
| 324 | } |
| 325 | } |