blob: 9b8f3e611f55c8854fa55884b021024f9553397c [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.impl;
17
18import java.util.concurrent.CompletableFuture;
19import java.util.concurrent.Executor;
20import java.util.function.Consumer;
21import java.util.function.Function;
22
23import io.atomix.protocols.raft.cluster.MemberId;
24import io.atomix.protocols.raft.protocol.AppendRequest;
25import io.atomix.protocols.raft.protocol.AppendResponse;
26import io.atomix.protocols.raft.protocol.CloseSessionRequest;
27import io.atomix.protocols.raft.protocol.CloseSessionResponse;
28import io.atomix.protocols.raft.protocol.CommandRequest;
29import io.atomix.protocols.raft.protocol.CommandResponse;
30import io.atomix.protocols.raft.protocol.ConfigureRequest;
31import io.atomix.protocols.raft.protocol.ConfigureResponse;
32import io.atomix.protocols.raft.protocol.InstallRequest;
33import io.atomix.protocols.raft.protocol.InstallResponse;
34import io.atomix.protocols.raft.protocol.JoinRequest;
35import io.atomix.protocols.raft.protocol.JoinResponse;
36import io.atomix.protocols.raft.protocol.KeepAliveRequest;
37import io.atomix.protocols.raft.protocol.KeepAliveResponse;
38import io.atomix.protocols.raft.protocol.LeaveRequest;
39import io.atomix.protocols.raft.protocol.LeaveResponse;
40import io.atomix.protocols.raft.protocol.MetadataRequest;
41import io.atomix.protocols.raft.protocol.MetadataResponse;
42import io.atomix.protocols.raft.protocol.OpenSessionRequest;
43import io.atomix.protocols.raft.protocol.OpenSessionResponse;
44import io.atomix.protocols.raft.protocol.PollRequest;
45import io.atomix.protocols.raft.protocol.PollResponse;
46import io.atomix.protocols.raft.protocol.PublishRequest;
47import io.atomix.protocols.raft.protocol.QueryRequest;
48import io.atomix.protocols.raft.protocol.QueryResponse;
49import io.atomix.protocols.raft.protocol.RaftServerProtocol;
50import io.atomix.protocols.raft.protocol.ReconfigureRequest;
51import io.atomix.protocols.raft.protocol.ReconfigureResponse;
52import io.atomix.protocols.raft.protocol.ResetRequest;
Jordan Haltermand3e02d32017-08-28 20:58:24 -070053import io.atomix.protocols.raft.protocol.TransferRequest;
54import io.atomix.protocols.raft.protocol.TransferResponse;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070055import io.atomix.protocols.raft.protocol.VoteRequest;
56import io.atomix.protocols.raft.protocol.VoteResponse;
57import io.atomix.protocols.raft.session.SessionId;
58import org.onosproject.cluster.NodeId;
59import org.onosproject.cluster.PartitionId;
60import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
61import org.onosproject.store.service.Serializer;
62
63/**
64 * Raft server protocol that uses a {@link ClusterCommunicationService}.
65 */
66public class RaftServerCommunicator extends RaftCommunicator implements RaftServerProtocol {
67
68 public RaftServerCommunicator(
69 PartitionId partitionId,
70 Serializer serializer,
71 ClusterCommunicationService clusterCommunicator) {
72 super(new RaftMessageContext(String.format("partition-%d", partitionId.id())), serializer, clusterCommunicator);
73 }
74
75 @Override
76 public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId, OpenSessionRequest request) {
77 return sendAndReceive(context.openSessionSubject, request, memberId);
78 }
79
80 @Override
81 public CompletableFuture<CloseSessionResponse> closeSession(MemberId memberId, CloseSessionRequest request) {
82 return sendAndReceive(context.closeSessionSubject, request, memberId);
83 }
84
85 @Override
86 public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId, KeepAliveRequest request) {
87 return sendAndReceive(context.keepAliveSubject, request, memberId);
88 }
89
90 @Override
91 public CompletableFuture<QueryResponse> query(MemberId memberId, QueryRequest request) {
92 return sendAndReceive(context.querySubject, request, memberId);
93 }
94
95 @Override
96 public CompletableFuture<CommandResponse> command(MemberId memberId, CommandRequest request) {
97 return sendAndReceive(context.commandSubject, request, memberId);
98 }
99
100 @Override
101 public CompletableFuture<MetadataResponse> metadata(MemberId memberId, MetadataRequest request) {
102 return sendAndReceive(context.metadataSubject, request, memberId);
103 }
104
105 @Override
106 public CompletableFuture<JoinResponse> join(MemberId memberId, JoinRequest request) {
107 return sendAndReceive(context.joinSubject, request, memberId);
108 }
109
110 @Override
111 public CompletableFuture<LeaveResponse> leave(MemberId memberId, LeaveRequest request) {
112 return sendAndReceive(context.leaveSubject, request, memberId);
113 }
114
115 @Override
116 public CompletableFuture<ConfigureResponse> configure(MemberId memberId, ConfigureRequest request) {
117 return sendAndReceive(context.configureSubject, request, memberId);
118 }
119
120 @Override
121 public CompletableFuture<ReconfigureResponse> reconfigure(MemberId memberId, ReconfigureRequest request) {
122 return sendAndReceive(context.reconfigureSubject, request, memberId);
123 }
124
125 @Override
126 public CompletableFuture<InstallResponse> install(MemberId memberId, InstallRequest request) {
127 return sendAndReceive(context.installSubject, request, memberId);
128 }
129
130 @Override
131 public CompletableFuture<PollResponse> poll(MemberId memberId, PollRequest request) {
132 return sendAndReceive(context.pollSubject, request, memberId);
133 }
134
135 @Override
136 public CompletableFuture<VoteResponse> vote(MemberId memberId, VoteRequest request) {
137 return sendAndReceive(context.voteSubject, request, memberId);
138 }
139
140 @Override
141 public CompletableFuture<AppendResponse> append(MemberId memberId, AppendRequest request) {
142 return sendAndReceive(context.appendSubject, request, memberId);
143 }
144
145 @Override
Jordan Haltermand3e02d32017-08-28 20:58:24 -0700146 public CompletableFuture<TransferResponse> transfer(MemberId memberId, TransferRequest request) {
147 return sendAndReceive(context.transferSubject, request, memberId);
148 }
149
150 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700151 public void publish(MemberId memberId, PublishRequest request) {
152 clusterCommunicator.unicast(request,
153 context.publishSubject(request.session()), serializer::encode, NodeId.nodeId(memberId.id()));
154 }
155
156 @Override
157 public void registerOpenSessionHandler(
158 Function<OpenSessionRequest, CompletableFuture<OpenSessionResponse>> handler) {
159 clusterCommunicator.addSubscriber(context.openSessionSubject, serializer::decode, handler, serializer::encode);
160 }
161
162 @Override
163 public void unregisterOpenSessionHandler() {
164 clusterCommunicator.removeSubscriber(context.openSessionSubject);
165 }
166
167 @Override
168 public void registerCloseSessionHandler(
169 Function<CloseSessionRequest, CompletableFuture<CloseSessionResponse>> handler) {
170 clusterCommunicator.addSubscriber(context.closeSessionSubject, serializer::decode, handler, serializer::encode);
171 }
172
173 @Override
174 public void unregisterCloseSessionHandler() {
175 clusterCommunicator.removeSubscriber(context.closeSessionSubject);
176 }
177
178 @Override
179 public void registerKeepAliveHandler(Function<KeepAliveRequest, CompletableFuture<KeepAliveResponse>> handler) {
180 clusterCommunicator.addSubscriber(context.keepAliveSubject, serializer::decode, handler, serializer::encode);
181 }
182
183 @Override
184 public void unregisterKeepAliveHandler() {
185 clusterCommunicator.removeSubscriber(context.keepAliveSubject);
186 }
187
188 @Override
189 public void registerQueryHandler(Function<QueryRequest, CompletableFuture<QueryResponse>> handler) {
190 clusterCommunicator.addSubscriber(context.querySubject, serializer::decode, handler, serializer::encode);
191 }
192
193 @Override
194 public void unregisterQueryHandler() {
195 clusterCommunicator.removeSubscriber(context.querySubject);
196 }
197
198 @Override
199 public void registerCommandHandler(Function<CommandRequest, CompletableFuture<CommandResponse>> handler) {
200 clusterCommunicator.addSubscriber(context.commandSubject, serializer::decode, handler, serializer::encode);
201 }
202
203 @Override
204 public void unregisterCommandHandler() {
205 clusterCommunicator.removeSubscriber(context.commandSubject);
206 }
207
208 @Override
209 public void registerMetadataHandler(Function<MetadataRequest, CompletableFuture<MetadataResponse>> handler) {
210 clusterCommunicator.addSubscriber(context.metadataSubject, serializer::decode, handler, serializer::encode);
211 }
212
213 @Override
214 public void unregisterMetadataHandler() {
215 clusterCommunicator.removeSubscriber(context.metadataSubject);
216 }
217
218 @Override
219 public void registerJoinHandler(Function<JoinRequest, CompletableFuture<JoinResponse>> handler) {
220 clusterCommunicator.addSubscriber(context.joinSubject, serializer::decode, handler, serializer::encode);
221 }
222
223 @Override
224 public void unregisterJoinHandler() {
225 clusterCommunicator.removeSubscriber(context.joinSubject);
226 }
227
228 @Override
229 public void registerLeaveHandler(Function<LeaveRequest, CompletableFuture<LeaveResponse>> handler) {
230 clusterCommunicator.addSubscriber(context.leaveSubject, serializer::decode, handler, serializer::encode);
231 }
232
233 @Override
234 public void unregisterLeaveHandler() {
235 clusterCommunicator.removeSubscriber(context.leaveSubject);
236 }
237
238 @Override
239 public void registerConfigureHandler(Function<ConfigureRequest, CompletableFuture<ConfigureResponse>> handler) {
240 clusterCommunicator.addSubscriber(context.configureSubject, serializer::decode, handler, serializer::encode);
241 }
242
243 @Override
244 public void unregisterConfigureHandler() {
245 clusterCommunicator.removeSubscriber(context.configureSubject);
246 }
247
248 @Override
249 public void registerReconfigureHandler(
250 Function<ReconfigureRequest, CompletableFuture<ReconfigureResponse>> handler) {
251 clusterCommunicator.addSubscriber(context.reconfigureSubject, serializer::decode, handler, serializer::encode);
252 }
253
254 @Override
255 public void unregisterReconfigureHandler() {
256 clusterCommunicator.removeSubscriber(context.reconfigureSubject);
257 }
258
259 @Override
260 public void registerInstallHandler(Function<InstallRequest, CompletableFuture<InstallResponse>> handler) {
261 clusterCommunicator.addSubscriber(context.installSubject, serializer::decode, handler, serializer::encode);
262 }
263
264 @Override
265 public void unregisterInstallHandler() {
266 clusterCommunicator.removeSubscriber(context.installSubject);
267 }
268
269 @Override
270 public void registerPollHandler(Function<PollRequest, CompletableFuture<PollResponse>> handler) {
271 clusterCommunicator.addSubscriber(context.pollSubject, serializer::decode, handler, serializer::encode);
272 }
273
274 @Override
275 public void unregisterPollHandler() {
276 clusterCommunicator.removeSubscriber(context.pollSubject);
277 }
278
279 @Override
280 public void registerVoteHandler(Function<VoteRequest, CompletableFuture<VoteResponse>> handler) {
281 clusterCommunicator.addSubscriber(context.voteSubject, serializer::decode, handler, serializer::encode);
282 }
283
284 @Override
285 public void unregisterVoteHandler() {
286 clusterCommunicator.removeSubscriber(context.voteSubject);
287 }
288
289 @Override
290 public void registerAppendHandler(Function<AppendRequest, CompletableFuture<AppendResponse>> handler) {
291 clusterCommunicator.addSubscriber(context.appendSubject, serializer::decode, handler, serializer::encode);
292 }
293
294 @Override
295 public void unregisterAppendHandler() {
296 clusterCommunicator.removeSubscriber(context.appendSubject);
297 }
298
299 @Override
Jordan Haltermand3e02d32017-08-28 20:58:24 -0700300 public void registerTransferHandler(Function<TransferRequest, CompletableFuture<TransferResponse>> handler) {
301 clusterCommunicator.addSubscriber(context.transferSubject, serializer::decode, handler, serializer::encode);
302 }
303
304 @Override
305 public void unregisterTransferHandler() {
306 clusterCommunicator.removeSubscriber(context.transferSubject);
307 }
308
309 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700310 public void registerResetListener(SessionId sessionId, Consumer<ResetRequest> listener, Executor executor) {
311 clusterCommunicator.addSubscriber(context.resetSubject(sessionId.id()), serializer::decode, listener, executor);
312 }
313
314 @Override
315 public void unregisterResetListener(SessionId sessionId) {
316 clusterCommunicator.removeSubscriber(context.resetSubject(sessionId.id()));
317 }
318}