blob: f58db45228c3360ee5479ab7340e35def8516924 [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.impl;
17
18import java.util.concurrent.CompletableFuture;
19import java.util.concurrent.Executor;
20import java.util.function.Consumer;
21import java.util.function.Function;
22
23import io.atomix.protocols.raft.cluster.MemberId;
24import io.atomix.protocols.raft.protocol.AppendRequest;
25import io.atomix.protocols.raft.protocol.AppendResponse;
26import io.atomix.protocols.raft.protocol.CloseSessionRequest;
27import io.atomix.protocols.raft.protocol.CloseSessionResponse;
28import io.atomix.protocols.raft.protocol.CommandRequest;
29import io.atomix.protocols.raft.protocol.CommandResponse;
30import io.atomix.protocols.raft.protocol.ConfigureRequest;
31import io.atomix.protocols.raft.protocol.ConfigureResponse;
Jordan Halterman19486e32017-11-02 15:00:06 -070032import io.atomix.protocols.raft.protocol.HeartbeatRequest;
33import io.atomix.protocols.raft.protocol.HeartbeatResponse;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070034import io.atomix.protocols.raft.protocol.InstallRequest;
35import io.atomix.protocols.raft.protocol.InstallResponse;
36import io.atomix.protocols.raft.protocol.JoinRequest;
37import io.atomix.protocols.raft.protocol.JoinResponse;
38import io.atomix.protocols.raft.protocol.KeepAliveRequest;
39import io.atomix.protocols.raft.protocol.KeepAliveResponse;
40import io.atomix.protocols.raft.protocol.LeaveRequest;
41import io.atomix.protocols.raft.protocol.LeaveResponse;
42import io.atomix.protocols.raft.protocol.MetadataRequest;
43import io.atomix.protocols.raft.protocol.MetadataResponse;
44import io.atomix.protocols.raft.protocol.OpenSessionRequest;
45import io.atomix.protocols.raft.protocol.OpenSessionResponse;
46import io.atomix.protocols.raft.protocol.PollRequest;
47import io.atomix.protocols.raft.protocol.PollResponse;
48import io.atomix.protocols.raft.protocol.PublishRequest;
49import io.atomix.protocols.raft.protocol.QueryRequest;
50import io.atomix.protocols.raft.protocol.QueryResponse;
51import io.atomix.protocols.raft.protocol.RaftServerProtocol;
52import io.atomix.protocols.raft.protocol.ReconfigureRequest;
53import io.atomix.protocols.raft.protocol.ReconfigureResponse;
54import io.atomix.protocols.raft.protocol.ResetRequest;
Jordan Halterman6807c8f2017-08-28 20:58:24 -070055import io.atomix.protocols.raft.protocol.TransferRequest;
56import io.atomix.protocols.raft.protocol.TransferResponse;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070057import io.atomix.protocols.raft.protocol.VoteRequest;
58import io.atomix.protocols.raft.protocol.VoteResponse;
59import io.atomix.protocols.raft.session.SessionId;
60import org.onosproject.cluster.NodeId;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070061import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
62import org.onosproject.store.service.Serializer;
63
64/**
65 * Raft server protocol that uses a {@link ClusterCommunicationService}.
66 */
67public class RaftServerCommunicator extends RaftCommunicator implements RaftServerProtocol {
68
69 public RaftServerCommunicator(
Jordan Halterman980a8c12017-09-22 18:01:19 -070070 String prefix,
Jordan Halterman2bf177c2017-06-29 01:49:08 -070071 Serializer serializer,
Jordan Halterman28183ee2017-10-17 17:29:10 -070072 ClusterCommunicationService clusterCommunicator) {
Jordan Halterman980a8c12017-09-22 18:01:19 -070073 super(new RaftMessageContext(prefix), serializer, clusterCommunicator);
Jordan Halterman2bf177c2017-06-29 01:49:08 -070074 }
75
76 @Override
77 public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId, OpenSessionRequest request) {
78 return sendAndReceive(context.openSessionSubject, request, memberId);
79 }
80
81 @Override
82 public CompletableFuture<CloseSessionResponse> closeSession(MemberId memberId, CloseSessionRequest request) {
83 return sendAndReceive(context.closeSessionSubject, request, memberId);
84 }
85
86 @Override
87 public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId, KeepAliveRequest request) {
88 return sendAndReceive(context.keepAliveSubject, request, memberId);
89 }
90
91 @Override
92 public CompletableFuture<QueryResponse> query(MemberId memberId, QueryRequest request) {
93 return sendAndReceive(context.querySubject, request, memberId);
94 }
95
96 @Override
97 public CompletableFuture<CommandResponse> command(MemberId memberId, CommandRequest request) {
98 return sendAndReceive(context.commandSubject, request, memberId);
99 }
100
101 @Override
102 public CompletableFuture<MetadataResponse> metadata(MemberId memberId, MetadataRequest request) {
103 return sendAndReceive(context.metadataSubject, request, memberId);
104 }
105
106 @Override
107 public CompletableFuture<JoinResponse> join(MemberId memberId, JoinRequest request) {
108 return sendAndReceive(context.joinSubject, request, memberId);
109 }
110
111 @Override
112 public CompletableFuture<LeaveResponse> leave(MemberId memberId, LeaveRequest request) {
113 return sendAndReceive(context.leaveSubject, request, memberId);
114 }
115
116 @Override
117 public CompletableFuture<ConfigureResponse> configure(MemberId memberId, ConfigureRequest request) {
118 return sendAndReceive(context.configureSubject, request, memberId);
119 }
120
121 @Override
122 public CompletableFuture<ReconfigureResponse> reconfigure(MemberId memberId, ReconfigureRequest request) {
123 return sendAndReceive(context.reconfigureSubject, request, memberId);
124 }
125
126 @Override
127 public CompletableFuture<InstallResponse> install(MemberId memberId, InstallRequest request) {
128 return sendAndReceive(context.installSubject, request, memberId);
129 }
130
131 @Override
132 public CompletableFuture<PollResponse> poll(MemberId memberId, PollRequest request) {
133 return sendAndReceive(context.pollSubject, request, memberId);
134 }
135
136 @Override
137 public CompletableFuture<VoteResponse> vote(MemberId memberId, VoteRequest request) {
138 return sendAndReceive(context.voteSubject, request, memberId);
139 }
140
141 @Override
142 public CompletableFuture<AppendResponse> append(MemberId memberId, AppendRequest request) {
143 return sendAndReceive(context.appendSubject, request, memberId);
144 }
145
146 @Override
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700147 public CompletableFuture<TransferResponse> transfer(MemberId memberId, TransferRequest request) {
148 return sendAndReceive(context.transferSubject, request, memberId);
149 }
150
151 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700152 public void publish(MemberId memberId, PublishRequest request) {
153 clusterCommunicator.unicast(request,
154 context.publishSubject(request.session()), serializer::encode, NodeId.nodeId(memberId.id()));
155 }
156
157 @Override
Jordan Halterman19486e32017-11-02 15:00:06 -0700158 public CompletableFuture<HeartbeatResponse> heartbeat(MemberId memberId, HeartbeatRequest request) {
159 return sendAndReceive(context.heartbeatSubject, request, memberId);
160 }
161
162 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700163 public void registerOpenSessionHandler(
164 Function<OpenSessionRequest, CompletableFuture<OpenSessionResponse>> handler) {
165 clusterCommunicator.addSubscriber(context.openSessionSubject, serializer::decode, handler, serializer::encode);
166 }
167
168 @Override
169 public void unregisterOpenSessionHandler() {
170 clusterCommunicator.removeSubscriber(context.openSessionSubject);
171 }
172
173 @Override
174 public void registerCloseSessionHandler(
175 Function<CloseSessionRequest, CompletableFuture<CloseSessionResponse>> handler) {
176 clusterCommunicator.addSubscriber(context.closeSessionSubject, serializer::decode, handler, serializer::encode);
177 }
178
179 @Override
180 public void unregisterCloseSessionHandler() {
181 clusterCommunicator.removeSubscriber(context.closeSessionSubject);
182 }
183
184 @Override
185 public void registerKeepAliveHandler(Function<KeepAliveRequest, CompletableFuture<KeepAliveResponse>> handler) {
186 clusterCommunicator.addSubscriber(context.keepAliveSubject, serializer::decode, handler, serializer::encode);
187 }
188
189 @Override
190 public void unregisterKeepAliveHandler() {
191 clusterCommunicator.removeSubscriber(context.keepAliveSubject);
192 }
193
194 @Override
195 public void registerQueryHandler(Function<QueryRequest, CompletableFuture<QueryResponse>> handler) {
196 clusterCommunicator.addSubscriber(context.querySubject, serializer::decode, handler, serializer::encode);
197 }
198
199 @Override
200 public void unregisterQueryHandler() {
201 clusterCommunicator.removeSubscriber(context.querySubject);
202 }
203
204 @Override
205 public void registerCommandHandler(Function<CommandRequest, CompletableFuture<CommandResponse>> handler) {
206 clusterCommunicator.addSubscriber(context.commandSubject, serializer::decode, handler, serializer::encode);
207 }
208
209 @Override
210 public void unregisterCommandHandler() {
211 clusterCommunicator.removeSubscriber(context.commandSubject);
212 }
213
214 @Override
215 public void registerMetadataHandler(Function<MetadataRequest, CompletableFuture<MetadataResponse>> handler) {
216 clusterCommunicator.addSubscriber(context.metadataSubject, serializer::decode, handler, serializer::encode);
217 }
218
219 @Override
220 public void unregisterMetadataHandler() {
221 clusterCommunicator.removeSubscriber(context.metadataSubject);
222 }
223
224 @Override
225 public void registerJoinHandler(Function<JoinRequest, CompletableFuture<JoinResponse>> handler) {
226 clusterCommunicator.addSubscriber(context.joinSubject, serializer::decode, handler, serializer::encode);
227 }
228
229 @Override
230 public void unregisterJoinHandler() {
231 clusterCommunicator.removeSubscriber(context.joinSubject);
232 }
233
234 @Override
235 public void registerLeaveHandler(Function<LeaveRequest, CompletableFuture<LeaveResponse>> handler) {
236 clusterCommunicator.addSubscriber(context.leaveSubject, serializer::decode, handler, serializer::encode);
237 }
238
239 @Override
240 public void unregisterLeaveHandler() {
241 clusterCommunicator.removeSubscriber(context.leaveSubject);
242 }
243
244 @Override
245 public void registerConfigureHandler(Function<ConfigureRequest, CompletableFuture<ConfigureResponse>> handler) {
246 clusterCommunicator.addSubscriber(context.configureSubject, serializer::decode, handler, serializer::encode);
247 }
248
249 @Override
250 public void unregisterConfigureHandler() {
251 clusterCommunicator.removeSubscriber(context.configureSubject);
252 }
253
254 @Override
255 public void registerReconfigureHandler(
256 Function<ReconfigureRequest, CompletableFuture<ReconfigureResponse>> handler) {
257 clusterCommunicator.addSubscriber(context.reconfigureSubject, serializer::decode, handler, serializer::encode);
258 }
259
260 @Override
261 public void unregisterReconfigureHandler() {
262 clusterCommunicator.removeSubscriber(context.reconfigureSubject);
263 }
264
265 @Override
266 public void registerInstallHandler(Function<InstallRequest, CompletableFuture<InstallResponse>> handler) {
267 clusterCommunicator.addSubscriber(context.installSubject, serializer::decode, handler, serializer::encode);
268 }
269
270 @Override
271 public void unregisterInstallHandler() {
272 clusterCommunicator.removeSubscriber(context.installSubject);
273 }
274
275 @Override
276 public void registerPollHandler(Function<PollRequest, CompletableFuture<PollResponse>> handler) {
277 clusterCommunicator.addSubscriber(context.pollSubject, serializer::decode, handler, serializer::encode);
278 }
279
280 @Override
281 public void unregisterPollHandler() {
282 clusterCommunicator.removeSubscriber(context.pollSubject);
283 }
284
285 @Override
286 public void registerVoteHandler(Function<VoteRequest, CompletableFuture<VoteResponse>> handler) {
287 clusterCommunicator.addSubscriber(context.voteSubject, serializer::decode, handler, serializer::encode);
288 }
289
290 @Override
291 public void unregisterVoteHandler() {
292 clusterCommunicator.removeSubscriber(context.voteSubject);
293 }
294
295 @Override
296 public void registerAppendHandler(Function<AppendRequest, CompletableFuture<AppendResponse>> handler) {
297 clusterCommunicator.addSubscriber(context.appendSubject, serializer::decode, handler, serializer::encode);
298 }
299
300 @Override
301 public void unregisterAppendHandler() {
302 clusterCommunicator.removeSubscriber(context.appendSubject);
303 }
304
305 @Override
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700306 public void registerTransferHandler(Function<TransferRequest, CompletableFuture<TransferResponse>> handler) {
307 clusterCommunicator.addSubscriber(context.transferSubject, serializer::decode, handler, serializer::encode);
308 }
309
310 @Override
311 public void unregisterTransferHandler() {
312 clusterCommunicator.removeSubscriber(context.transferSubject);
313 }
314
315 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700316 public void registerResetListener(SessionId sessionId, Consumer<ResetRequest> listener, Executor executor) {
317 clusterCommunicator.addSubscriber(context.resetSubject(sessionId.id()), serializer::decode, listener, executor);
318 }
319
320 @Override
321 public void unregisterResetListener(SessionId sessionId) {
322 clusterCommunicator.removeSubscriber(context.resetSubject(sessionId.id()));
323 }
324}