/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.store.primitives.impl;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;

import io.atomix.protocols.raft.cluster.MemberId;
import io.atomix.protocols.raft.protocol.AppendRequest;
import io.atomix.protocols.raft.protocol.AppendResponse;
import io.atomix.protocols.raft.protocol.CloseSessionRequest;
import io.atomix.protocols.raft.protocol.CloseSessionResponse;
import io.atomix.protocols.raft.protocol.CommandRequest;
import io.atomix.protocols.raft.protocol.CommandResponse;
import io.atomix.protocols.raft.protocol.ConfigureRequest;
import io.atomix.protocols.raft.protocol.ConfigureResponse;
import io.atomix.protocols.raft.protocol.HeartbeatRequest;
import io.atomix.protocols.raft.protocol.HeartbeatResponse;
import io.atomix.protocols.raft.protocol.InstallRequest;
import io.atomix.protocols.raft.protocol.InstallResponse;
import io.atomix.protocols.raft.protocol.JoinRequest;
import io.atomix.protocols.raft.protocol.JoinResponse;
import io.atomix.protocols.raft.protocol.KeepAliveRequest;
import io.atomix.protocols.raft.protocol.KeepAliveResponse;
import io.atomix.protocols.raft.protocol.LeaveRequest;
import io.atomix.protocols.raft.protocol.LeaveResponse;
import io.atomix.protocols.raft.protocol.MetadataRequest;
import io.atomix.protocols.raft.protocol.MetadataResponse;
import io.atomix.protocols.raft.protocol.OpenSessionRequest;
import io.atomix.protocols.raft.protocol.OpenSessionResponse;
import io.atomix.protocols.raft.protocol.PollRequest;
import io.atomix.protocols.raft.protocol.PollResponse;
import io.atomix.protocols.raft.protocol.PublishRequest;
import io.atomix.protocols.raft.protocol.QueryRequest;
import io.atomix.protocols.raft.protocol.QueryResponse;
import io.atomix.protocols.raft.protocol.RaftServerProtocol;
import io.atomix.protocols.raft.protocol.ReconfigureRequest;
import io.atomix.protocols.raft.protocol.ReconfigureResponse;
import io.atomix.protocols.raft.protocol.ResetRequest;
import io.atomix.protocols.raft.protocol.TransferRequest;
import io.atomix.protocols.raft.protocol.TransferResponse;
import io.atomix.protocols.raft.protocol.VoteRequest;
import io.atomix.protocols.raft.protocol.VoteResponse;
import io.atomix.protocols.raft.session.SessionId;
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.service.Serializer;

65/**
66 * Raft server protocol that uses a {@link ClusterCommunicationService}.
67 */
68public class RaftServerCommunicator extends RaftCommunicator implements RaftServerProtocol {
69
70 public RaftServerCommunicator(
71 PartitionId partitionId,
72 Serializer serializer,
73 ClusterCommunicationService clusterCommunicator) {
74 super(new RaftMessageContext(String.format("partition-%d", partitionId.id())), serializer, clusterCommunicator);
75 }
76
77 @Override
78 public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId, OpenSessionRequest request) {
79 return sendAndReceive(context.openSessionSubject, request, memberId);
80 }
81
82 @Override
83 public CompletableFuture<CloseSessionResponse> closeSession(MemberId memberId, CloseSessionRequest request) {
84 return sendAndReceive(context.closeSessionSubject, request, memberId);
85 }
86
87 @Override
88 public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId, KeepAliveRequest request) {
89 return sendAndReceive(context.keepAliveSubject, request, memberId);
90 }
91
92 @Override
93 public CompletableFuture<QueryResponse> query(MemberId memberId, QueryRequest request) {
94 return sendAndReceive(context.querySubject, request, memberId);
95 }
96
97 @Override
98 public CompletableFuture<CommandResponse> command(MemberId memberId, CommandRequest request) {
99 return sendAndReceive(context.commandSubject, request, memberId);
100 }
101
102 @Override
103 public CompletableFuture<MetadataResponse> metadata(MemberId memberId, MetadataRequest request) {
104 return sendAndReceive(context.metadataSubject, request, memberId);
105 }
106
107 @Override
108 public CompletableFuture<JoinResponse> join(MemberId memberId, JoinRequest request) {
109 return sendAndReceive(context.joinSubject, request, memberId);
110 }
111
112 @Override
113 public CompletableFuture<LeaveResponse> leave(MemberId memberId, LeaveRequest request) {
114 return sendAndReceive(context.leaveSubject, request, memberId);
115 }
116
117 @Override
118 public CompletableFuture<ConfigureResponse> configure(MemberId memberId, ConfigureRequest request) {
119 return sendAndReceive(context.configureSubject, request, memberId);
120 }
121
122 @Override
123 public CompletableFuture<ReconfigureResponse> reconfigure(MemberId memberId, ReconfigureRequest request) {
124 return sendAndReceive(context.reconfigureSubject, request, memberId);
125 }
126
127 @Override
128 public CompletableFuture<InstallResponse> install(MemberId memberId, InstallRequest request) {
129 return sendAndReceive(context.installSubject, request, memberId);
130 }
131
132 @Override
133 public CompletableFuture<PollResponse> poll(MemberId memberId, PollRequest request) {
134 return sendAndReceive(context.pollSubject, request, memberId);
135 }
136
137 @Override
138 public CompletableFuture<VoteResponse> vote(MemberId memberId, VoteRequest request) {
139 return sendAndReceive(context.voteSubject, request, memberId);
140 }
141
142 @Override
143 public CompletableFuture<AppendResponse> append(MemberId memberId, AppendRequest request) {
144 return sendAndReceive(context.appendSubject, request, memberId);
145 }
146
147 @Override
Jordan Haltermand3e02d32017-08-28 20:58:24 -0700148 public CompletableFuture<TransferResponse> transfer(MemberId memberId, TransferRequest request) {
149 return sendAndReceive(context.transferSubject, request, memberId);
150 }
151
152 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700153 public void publish(MemberId memberId, PublishRequest request) {
154 clusterCommunicator.unicast(request,
155 context.publishSubject(request.session()), serializer::encode, NodeId.nodeId(memberId.id()));
156 }
157
158 @Override
Jordan Haltermane372d852017-11-02 15:00:06 -0700159 public CompletableFuture<HeartbeatResponse> heartbeat(MemberId memberId, HeartbeatRequest request) {
160 return sendAndReceive(context.heartbeatSubject, request, memberId);
161 }
162
163 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700164 public void registerOpenSessionHandler(
165 Function<OpenSessionRequest, CompletableFuture<OpenSessionResponse>> handler) {
166 clusterCommunicator.addSubscriber(context.openSessionSubject, serializer::decode, handler, serializer::encode);
167 }
168
169 @Override
170 public void unregisterOpenSessionHandler() {
171 clusterCommunicator.removeSubscriber(context.openSessionSubject);
172 }
173
174 @Override
175 public void registerCloseSessionHandler(
176 Function<CloseSessionRequest, CompletableFuture<CloseSessionResponse>> handler) {
177 clusterCommunicator.addSubscriber(context.closeSessionSubject, serializer::decode, handler, serializer::encode);
178 }
179
180 @Override
181 public void unregisterCloseSessionHandler() {
182 clusterCommunicator.removeSubscriber(context.closeSessionSubject);
183 }
184
185 @Override
186 public void registerKeepAliveHandler(Function<KeepAliveRequest, CompletableFuture<KeepAliveResponse>> handler) {
187 clusterCommunicator.addSubscriber(context.keepAliveSubject, serializer::decode, handler, serializer::encode);
188 }
189
190 @Override
191 public void unregisterKeepAliveHandler() {
192 clusterCommunicator.removeSubscriber(context.keepAliveSubject);
193 }
194
195 @Override
196 public void registerQueryHandler(Function<QueryRequest, CompletableFuture<QueryResponse>> handler) {
197 clusterCommunicator.addSubscriber(context.querySubject, serializer::decode, handler, serializer::encode);
198 }
199
200 @Override
201 public void unregisterQueryHandler() {
202 clusterCommunicator.removeSubscriber(context.querySubject);
203 }
204
205 @Override
206 public void registerCommandHandler(Function<CommandRequest, CompletableFuture<CommandResponse>> handler) {
207 clusterCommunicator.addSubscriber(context.commandSubject, serializer::decode, handler, serializer::encode);
208 }
209
210 @Override
211 public void unregisterCommandHandler() {
212 clusterCommunicator.removeSubscriber(context.commandSubject);
213 }
214
215 @Override
216 public void registerMetadataHandler(Function<MetadataRequest, CompletableFuture<MetadataResponse>> handler) {
217 clusterCommunicator.addSubscriber(context.metadataSubject, serializer::decode, handler, serializer::encode);
218 }
219
220 @Override
221 public void unregisterMetadataHandler() {
222 clusterCommunicator.removeSubscriber(context.metadataSubject);
223 }
224
225 @Override
226 public void registerJoinHandler(Function<JoinRequest, CompletableFuture<JoinResponse>> handler) {
227 clusterCommunicator.addSubscriber(context.joinSubject, serializer::decode, handler, serializer::encode);
228 }
229
230 @Override
231 public void unregisterJoinHandler() {
232 clusterCommunicator.removeSubscriber(context.joinSubject);
233 }
234
235 @Override
236 public void registerLeaveHandler(Function<LeaveRequest, CompletableFuture<LeaveResponse>> handler) {
237 clusterCommunicator.addSubscriber(context.leaveSubject, serializer::decode, handler, serializer::encode);
238 }
239
240 @Override
241 public void unregisterLeaveHandler() {
242 clusterCommunicator.removeSubscriber(context.leaveSubject);
243 }
244
245 @Override
246 public void registerConfigureHandler(Function<ConfigureRequest, CompletableFuture<ConfigureResponse>> handler) {
247 clusterCommunicator.addSubscriber(context.configureSubject, serializer::decode, handler, serializer::encode);
248 }
249
250 @Override
251 public void unregisterConfigureHandler() {
252 clusterCommunicator.removeSubscriber(context.configureSubject);
253 }
254
255 @Override
256 public void registerReconfigureHandler(
257 Function<ReconfigureRequest, CompletableFuture<ReconfigureResponse>> handler) {
258 clusterCommunicator.addSubscriber(context.reconfigureSubject, serializer::decode, handler, serializer::encode);
259 }
260
261 @Override
262 public void unregisterReconfigureHandler() {
263 clusterCommunicator.removeSubscriber(context.reconfigureSubject);
264 }
265
266 @Override
267 public void registerInstallHandler(Function<InstallRequest, CompletableFuture<InstallResponse>> handler) {
268 clusterCommunicator.addSubscriber(context.installSubject, serializer::decode, handler, serializer::encode);
269 }
270
271 @Override
272 public void unregisterInstallHandler() {
273 clusterCommunicator.removeSubscriber(context.installSubject);
274 }
275
276 @Override
277 public void registerPollHandler(Function<PollRequest, CompletableFuture<PollResponse>> handler) {
278 clusterCommunicator.addSubscriber(context.pollSubject, serializer::decode, handler, serializer::encode);
279 }
280
281 @Override
282 public void unregisterPollHandler() {
283 clusterCommunicator.removeSubscriber(context.pollSubject);
284 }
285
286 @Override
287 public void registerVoteHandler(Function<VoteRequest, CompletableFuture<VoteResponse>> handler) {
288 clusterCommunicator.addSubscriber(context.voteSubject, serializer::decode, handler, serializer::encode);
289 }
290
291 @Override
292 public void unregisterVoteHandler() {
293 clusterCommunicator.removeSubscriber(context.voteSubject);
294 }
295
296 @Override
297 public void registerAppendHandler(Function<AppendRequest, CompletableFuture<AppendResponse>> handler) {
298 clusterCommunicator.addSubscriber(context.appendSubject, serializer::decode, handler, serializer::encode);
299 }
300
301 @Override
302 public void unregisterAppendHandler() {
303 clusterCommunicator.removeSubscriber(context.appendSubject);
304 }
305
306 @Override
Jordan Haltermand3e02d32017-08-28 20:58:24 -0700307 public void registerTransferHandler(Function<TransferRequest, CompletableFuture<TransferResponse>> handler) {
308 clusterCommunicator.addSubscriber(context.transferSubject, serializer::decode, handler, serializer::encode);
309 }
310
311 @Override
312 public void unregisterTransferHandler() {
313 clusterCommunicator.removeSubscriber(context.transferSubject);
314 }
315
316 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700317 public void registerResetListener(SessionId sessionId, Consumer<ResetRequest> listener, Executor executor) {
318 clusterCommunicator.addSubscriber(context.resetSubject(sessionId.id()), serializer::decode, listener, executor);
319 }
320
321 @Override
322 public void unregisterResetListener(SessionId sessionId) {
323 clusterCommunicator.removeSubscriber(context.resetSubject(sessionId.id()));
324 }
325}