blob: 097ee469c68877e01251a3087c1450a9b29cda68 [file] [log] [blame]
Jordan Halterman2bf177c2017-06-29 01:49:08 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Jordan Halterman2bf177c2017-06-29 01:49:08 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.impl;
17
18import java.util.concurrent.CompletableFuture;
19import java.util.concurrent.Executor;
20import java.util.function.Consumer;
21import java.util.function.Function;
22
23import io.atomix.protocols.raft.cluster.MemberId;
24import io.atomix.protocols.raft.protocol.AppendRequest;
25import io.atomix.protocols.raft.protocol.AppendResponse;
26import io.atomix.protocols.raft.protocol.CloseSessionRequest;
27import io.atomix.protocols.raft.protocol.CloseSessionResponse;
28import io.atomix.protocols.raft.protocol.CommandRequest;
29import io.atomix.protocols.raft.protocol.CommandResponse;
30import io.atomix.protocols.raft.protocol.ConfigureRequest;
31import io.atomix.protocols.raft.protocol.ConfigureResponse;
32import io.atomix.protocols.raft.protocol.InstallRequest;
33import io.atomix.protocols.raft.protocol.InstallResponse;
34import io.atomix.protocols.raft.protocol.JoinRequest;
35import io.atomix.protocols.raft.protocol.JoinResponse;
36import io.atomix.protocols.raft.protocol.KeepAliveRequest;
37import io.atomix.protocols.raft.protocol.KeepAliveResponse;
38import io.atomix.protocols.raft.protocol.LeaveRequest;
39import io.atomix.protocols.raft.protocol.LeaveResponse;
40import io.atomix.protocols.raft.protocol.MetadataRequest;
41import io.atomix.protocols.raft.protocol.MetadataResponse;
42import io.atomix.protocols.raft.protocol.OpenSessionRequest;
43import io.atomix.protocols.raft.protocol.OpenSessionResponse;
44import io.atomix.protocols.raft.protocol.PollRequest;
45import io.atomix.protocols.raft.protocol.PollResponse;
46import io.atomix.protocols.raft.protocol.PublishRequest;
47import io.atomix.protocols.raft.protocol.QueryRequest;
48import io.atomix.protocols.raft.protocol.QueryResponse;
49import io.atomix.protocols.raft.protocol.RaftServerProtocol;
50import io.atomix.protocols.raft.protocol.ReconfigureRequest;
51import io.atomix.protocols.raft.protocol.ReconfigureResponse;
52import io.atomix.protocols.raft.protocol.ResetRequest;
Jordan Halterman6807c8f2017-08-28 20:58:24 -070053import io.atomix.protocols.raft.protocol.TransferRequest;
54import io.atomix.protocols.raft.protocol.TransferResponse;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070055import io.atomix.protocols.raft.protocol.VoteRequest;
56import io.atomix.protocols.raft.protocol.VoteResponse;
57import io.atomix.protocols.raft.session.SessionId;
58import org.onosproject.cluster.NodeId;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070059import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
60import org.onosproject.store.service.Serializer;
61
62/**
63 * Raft server protocol that uses a {@link ClusterCommunicationService}.
64 */
65public class RaftServerCommunicator extends RaftCommunicator implements RaftServerProtocol {
66
67 public RaftServerCommunicator(
Jordan Halterman980a8c12017-09-22 18:01:19 -070068 String prefix,
Jordan Halterman2bf177c2017-06-29 01:49:08 -070069 Serializer serializer,
Jordan Halterman28183ee2017-10-17 17:29:10 -070070 ClusterCommunicationService clusterCommunicator) {
Jordan Halterman980a8c12017-09-22 18:01:19 -070071 super(new RaftMessageContext(prefix), serializer, clusterCommunicator);
Jordan Halterman2bf177c2017-06-29 01:49:08 -070072 }
73
74 @Override
75 public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId, OpenSessionRequest request) {
76 return sendAndReceive(context.openSessionSubject, request, memberId);
77 }
78
79 @Override
80 public CompletableFuture<CloseSessionResponse> closeSession(MemberId memberId, CloseSessionRequest request) {
81 return sendAndReceive(context.closeSessionSubject, request, memberId);
82 }
83
84 @Override
85 public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId, KeepAliveRequest request) {
86 return sendAndReceive(context.keepAliveSubject, request, memberId);
87 }
88
89 @Override
90 public CompletableFuture<QueryResponse> query(MemberId memberId, QueryRequest request) {
91 return sendAndReceive(context.querySubject, request, memberId);
92 }
93
94 @Override
95 public CompletableFuture<CommandResponse> command(MemberId memberId, CommandRequest request) {
96 return sendAndReceive(context.commandSubject, request, memberId);
97 }
98
99 @Override
100 public CompletableFuture<MetadataResponse> metadata(MemberId memberId, MetadataRequest request) {
101 return sendAndReceive(context.metadataSubject, request, memberId);
102 }
103
104 @Override
105 public CompletableFuture<JoinResponse> join(MemberId memberId, JoinRequest request) {
106 return sendAndReceive(context.joinSubject, request, memberId);
107 }
108
109 @Override
110 public CompletableFuture<LeaveResponse> leave(MemberId memberId, LeaveRequest request) {
111 return sendAndReceive(context.leaveSubject, request, memberId);
112 }
113
114 @Override
115 public CompletableFuture<ConfigureResponse> configure(MemberId memberId, ConfigureRequest request) {
116 return sendAndReceive(context.configureSubject, request, memberId);
117 }
118
119 @Override
120 public CompletableFuture<ReconfigureResponse> reconfigure(MemberId memberId, ReconfigureRequest request) {
121 return sendAndReceive(context.reconfigureSubject, request, memberId);
122 }
123
124 @Override
125 public CompletableFuture<InstallResponse> install(MemberId memberId, InstallRequest request) {
126 return sendAndReceive(context.installSubject, request, memberId);
127 }
128
129 @Override
130 public CompletableFuture<PollResponse> poll(MemberId memberId, PollRequest request) {
131 return sendAndReceive(context.pollSubject, request, memberId);
132 }
133
134 @Override
135 public CompletableFuture<VoteResponse> vote(MemberId memberId, VoteRequest request) {
136 return sendAndReceive(context.voteSubject, request, memberId);
137 }
138
139 @Override
140 public CompletableFuture<AppendResponse> append(MemberId memberId, AppendRequest request) {
141 return sendAndReceive(context.appendSubject, request, memberId);
142 }
143
144 @Override
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700145 public CompletableFuture<TransferResponse> transfer(MemberId memberId, TransferRequest request) {
146 return sendAndReceive(context.transferSubject, request, memberId);
147 }
148
149 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700150 public void publish(MemberId memberId, PublishRequest request) {
151 clusterCommunicator.unicast(request,
152 context.publishSubject(request.session()), serializer::encode, NodeId.nodeId(memberId.id()));
153 }
154
155 @Override
156 public void registerOpenSessionHandler(
157 Function<OpenSessionRequest, CompletableFuture<OpenSessionResponse>> handler) {
158 clusterCommunicator.addSubscriber(context.openSessionSubject, serializer::decode, handler, serializer::encode);
159 }
160
161 @Override
162 public void unregisterOpenSessionHandler() {
163 clusterCommunicator.removeSubscriber(context.openSessionSubject);
164 }
165
166 @Override
167 public void registerCloseSessionHandler(
168 Function<CloseSessionRequest, CompletableFuture<CloseSessionResponse>> handler) {
169 clusterCommunicator.addSubscriber(context.closeSessionSubject, serializer::decode, handler, serializer::encode);
170 }
171
172 @Override
173 public void unregisterCloseSessionHandler() {
174 clusterCommunicator.removeSubscriber(context.closeSessionSubject);
175 }
176
177 @Override
178 public void registerKeepAliveHandler(Function<KeepAliveRequest, CompletableFuture<KeepAliveResponse>> handler) {
179 clusterCommunicator.addSubscriber(context.keepAliveSubject, serializer::decode, handler, serializer::encode);
180 }
181
182 @Override
183 public void unregisterKeepAliveHandler() {
184 clusterCommunicator.removeSubscriber(context.keepAliveSubject);
185 }
186
187 @Override
188 public void registerQueryHandler(Function<QueryRequest, CompletableFuture<QueryResponse>> handler) {
189 clusterCommunicator.addSubscriber(context.querySubject, serializer::decode, handler, serializer::encode);
190 }
191
192 @Override
193 public void unregisterQueryHandler() {
194 clusterCommunicator.removeSubscriber(context.querySubject);
195 }
196
197 @Override
198 public void registerCommandHandler(Function<CommandRequest, CompletableFuture<CommandResponse>> handler) {
199 clusterCommunicator.addSubscriber(context.commandSubject, serializer::decode, handler, serializer::encode);
200 }
201
202 @Override
203 public void unregisterCommandHandler() {
204 clusterCommunicator.removeSubscriber(context.commandSubject);
205 }
206
207 @Override
208 public void registerMetadataHandler(Function<MetadataRequest, CompletableFuture<MetadataResponse>> handler) {
209 clusterCommunicator.addSubscriber(context.metadataSubject, serializer::decode, handler, serializer::encode);
210 }
211
212 @Override
213 public void unregisterMetadataHandler() {
214 clusterCommunicator.removeSubscriber(context.metadataSubject);
215 }
216
217 @Override
218 public void registerJoinHandler(Function<JoinRequest, CompletableFuture<JoinResponse>> handler) {
219 clusterCommunicator.addSubscriber(context.joinSubject, serializer::decode, handler, serializer::encode);
220 }
221
222 @Override
223 public void unregisterJoinHandler() {
224 clusterCommunicator.removeSubscriber(context.joinSubject);
225 }
226
227 @Override
228 public void registerLeaveHandler(Function<LeaveRequest, CompletableFuture<LeaveResponse>> handler) {
229 clusterCommunicator.addSubscriber(context.leaveSubject, serializer::decode, handler, serializer::encode);
230 }
231
232 @Override
233 public void unregisterLeaveHandler() {
234 clusterCommunicator.removeSubscriber(context.leaveSubject);
235 }
236
237 @Override
238 public void registerConfigureHandler(Function<ConfigureRequest, CompletableFuture<ConfigureResponse>> handler) {
239 clusterCommunicator.addSubscriber(context.configureSubject, serializer::decode, handler, serializer::encode);
240 }
241
242 @Override
243 public void unregisterConfigureHandler() {
244 clusterCommunicator.removeSubscriber(context.configureSubject);
245 }
246
247 @Override
248 public void registerReconfigureHandler(
249 Function<ReconfigureRequest, CompletableFuture<ReconfigureResponse>> handler) {
250 clusterCommunicator.addSubscriber(context.reconfigureSubject, serializer::decode, handler, serializer::encode);
251 }
252
253 @Override
254 public void unregisterReconfigureHandler() {
255 clusterCommunicator.removeSubscriber(context.reconfigureSubject);
256 }
257
258 @Override
259 public void registerInstallHandler(Function<InstallRequest, CompletableFuture<InstallResponse>> handler) {
260 clusterCommunicator.addSubscriber(context.installSubject, serializer::decode, handler, serializer::encode);
261 }
262
263 @Override
264 public void unregisterInstallHandler() {
265 clusterCommunicator.removeSubscriber(context.installSubject);
266 }
267
268 @Override
269 public void registerPollHandler(Function<PollRequest, CompletableFuture<PollResponse>> handler) {
270 clusterCommunicator.addSubscriber(context.pollSubject, serializer::decode, handler, serializer::encode);
271 }
272
273 @Override
274 public void unregisterPollHandler() {
275 clusterCommunicator.removeSubscriber(context.pollSubject);
276 }
277
278 @Override
279 public void registerVoteHandler(Function<VoteRequest, CompletableFuture<VoteResponse>> handler) {
280 clusterCommunicator.addSubscriber(context.voteSubject, serializer::decode, handler, serializer::encode);
281 }
282
283 @Override
284 public void unregisterVoteHandler() {
285 clusterCommunicator.removeSubscriber(context.voteSubject);
286 }
287
288 @Override
289 public void registerAppendHandler(Function<AppendRequest, CompletableFuture<AppendResponse>> handler) {
290 clusterCommunicator.addSubscriber(context.appendSubject, serializer::decode, handler, serializer::encode);
291 }
292
293 @Override
294 public void unregisterAppendHandler() {
295 clusterCommunicator.removeSubscriber(context.appendSubject);
296 }
297
298 @Override
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700299 public void registerTransferHandler(Function<TransferRequest, CompletableFuture<TransferResponse>> handler) {
300 clusterCommunicator.addSubscriber(context.transferSubject, serializer::decode, handler, serializer::encode);
301 }
302
303 @Override
304 public void unregisterTransferHandler() {
305 clusterCommunicator.removeSubscriber(context.transferSubject);
306 }
307
308 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700309 public void registerResetListener(SessionId sessionId, Consumer<ResetRequest> listener, Executor executor) {
310 clusterCommunicator.addSubscriber(context.resetSubject(sessionId.id()), serializer::decode, listener, executor);
311 }
312
313 @Override
314 public void unregisterResetListener(SessionId sessionId) {
315 clusterCommunicator.removeSubscriber(context.resetSubject(sessionId.id()));
316 }
317}