/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.store.primitives.impl;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;

import io.atomix.protocols.raft.cluster.MemberId;
import io.atomix.protocols.raft.protocol.AppendRequest;
import io.atomix.protocols.raft.protocol.AppendResponse;
import io.atomix.protocols.raft.protocol.CloseSessionRequest;
import io.atomix.protocols.raft.protocol.CloseSessionResponse;
import io.atomix.protocols.raft.protocol.CommandRequest;
import io.atomix.protocols.raft.protocol.CommandResponse;
import io.atomix.protocols.raft.protocol.ConfigureRequest;
import io.atomix.protocols.raft.protocol.ConfigureResponse;
import io.atomix.protocols.raft.protocol.InstallRequest;
import io.atomix.protocols.raft.protocol.InstallResponse;
import io.atomix.protocols.raft.protocol.JoinRequest;
import io.atomix.protocols.raft.protocol.JoinResponse;
import io.atomix.protocols.raft.protocol.KeepAliveRequest;
import io.atomix.protocols.raft.protocol.KeepAliveResponse;
import io.atomix.protocols.raft.protocol.LeaveRequest;
import io.atomix.protocols.raft.protocol.LeaveResponse;
import io.atomix.protocols.raft.protocol.MetadataRequest;
import io.atomix.protocols.raft.protocol.MetadataResponse;
import io.atomix.protocols.raft.protocol.OpenSessionRequest;
import io.atomix.protocols.raft.protocol.OpenSessionResponse;
import io.atomix.protocols.raft.protocol.PollRequest;
import io.atomix.protocols.raft.protocol.PollResponse;
import io.atomix.protocols.raft.protocol.PublishRequest;
import io.atomix.protocols.raft.protocol.QueryRequest;
import io.atomix.protocols.raft.protocol.QueryResponse;
import io.atomix.protocols.raft.protocol.RaftServerProtocol;
import io.atomix.protocols.raft.protocol.ReconfigureRequest;
import io.atomix.protocols.raft.protocol.ReconfigureResponse;
import io.atomix.protocols.raft.protocol.ResetRequest;
import io.atomix.protocols.raft.protocol.VoteRequest;
import io.atomix.protocols.raft.protocol.VoteResponse;
import io.atomix.protocols.raft.session.SessionId;
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.service.Serializer;

61/**
62 * Raft server protocol that uses a {@link ClusterCommunicationService}.
63 */
64public class RaftServerCommunicator extends RaftCommunicator implements RaftServerProtocol {
65
66 public RaftServerCommunicator(
67 PartitionId partitionId,
68 Serializer serializer,
69 ClusterCommunicationService clusterCommunicator) {
70 super(new RaftMessageContext(String.format("partition-%d", partitionId.id())), serializer, clusterCommunicator);
71 }
72
73 @Override
74 public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId, OpenSessionRequest request) {
75 return sendAndReceive(context.openSessionSubject, request, memberId);
76 }
77
78 @Override
79 public CompletableFuture<CloseSessionResponse> closeSession(MemberId memberId, CloseSessionRequest request) {
80 return sendAndReceive(context.closeSessionSubject, request, memberId);
81 }
82
83 @Override
84 public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId, KeepAliveRequest request) {
85 return sendAndReceive(context.keepAliveSubject, request, memberId);
86 }
87
88 @Override
89 public CompletableFuture<QueryResponse> query(MemberId memberId, QueryRequest request) {
90 return sendAndReceive(context.querySubject, request, memberId);
91 }
92
93 @Override
94 public CompletableFuture<CommandResponse> command(MemberId memberId, CommandRequest request) {
95 return sendAndReceive(context.commandSubject, request, memberId);
96 }
97
98 @Override
99 public CompletableFuture<MetadataResponse> metadata(MemberId memberId, MetadataRequest request) {
100 return sendAndReceive(context.metadataSubject, request, memberId);
101 }
102
103 @Override
104 public CompletableFuture<JoinResponse> join(MemberId memberId, JoinRequest request) {
105 return sendAndReceive(context.joinSubject, request, memberId);
106 }
107
108 @Override
109 public CompletableFuture<LeaveResponse> leave(MemberId memberId, LeaveRequest request) {
110 return sendAndReceive(context.leaveSubject, request, memberId);
111 }
112
113 @Override
114 public CompletableFuture<ConfigureResponse> configure(MemberId memberId, ConfigureRequest request) {
115 return sendAndReceive(context.configureSubject, request, memberId);
116 }
117
118 @Override
119 public CompletableFuture<ReconfigureResponse> reconfigure(MemberId memberId, ReconfigureRequest request) {
120 return sendAndReceive(context.reconfigureSubject, request, memberId);
121 }
122
123 @Override
124 public CompletableFuture<InstallResponse> install(MemberId memberId, InstallRequest request) {
125 return sendAndReceive(context.installSubject, request, memberId);
126 }
127
128 @Override
129 public CompletableFuture<PollResponse> poll(MemberId memberId, PollRequest request) {
130 return sendAndReceive(context.pollSubject, request, memberId);
131 }
132
133 @Override
134 public CompletableFuture<VoteResponse> vote(MemberId memberId, VoteRequest request) {
135 return sendAndReceive(context.voteSubject, request, memberId);
136 }
137
138 @Override
139 public CompletableFuture<AppendResponse> append(MemberId memberId, AppendRequest request) {
140 return sendAndReceive(context.appendSubject, request, memberId);
141 }
142
143 @Override
144 public void publish(MemberId memberId, PublishRequest request) {
145 clusterCommunicator.unicast(request,
146 context.publishSubject(request.session()), serializer::encode, NodeId.nodeId(memberId.id()));
147 }
148
149 @Override
150 public void registerOpenSessionHandler(
151 Function<OpenSessionRequest, CompletableFuture<OpenSessionResponse>> handler) {
152 clusterCommunicator.addSubscriber(context.openSessionSubject, serializer::decode, handler, serializer::encode);
153 }
154
155 @Override
156 public void unregisterOpenSessionHandler() {
157 clusterCommunicator.removeSubscriber(context.openSessionSubject);
158 }
159
160 @Override
161 public void registerCloseSessionHandler(
162 Function<CloseSessionRequest, CompletableFuture<CloseSessionResponse>> handler) {
163 clusterCommunicator.addSubscriber(context.closeSessionSubject, serializer::decode, handler, serializer::encode);
164 }
165
166 @Override
167 public void unregisterCloseSessionHandler() {
168 clusterCommunicator.removeSubscriber(context.closeSessionSubject);
169 }
170
171 @Override
172 public void registerKeepAliveHandler(Function<KeepAliveRequest, CompletableFuture<KeepAliveResponse>> handler) {
173 clusterCommunicator.addSubscriber(context.keepAliveSubject, serializer::decode, handler, serializer::encode);
174 }
175
176 @Override
177 public void unregisterKeepAliveHandler() {
178 clusterCommunicator.removeSubscriber(context.keepAliveSubject);
179 }
180
181 @Override
182 public void registerQueryHandler(Function<QueryRequest, CompletableFuture<QueryResponse>> handler) {
183 clusterCommunicator.addSubscriber(context.querySubject, serializer::decode, handler, serializer::encode);
184 }
185
186 @Override
187 public void unregisterQueryHandler() {
188 clusterCommunicator.removeSubscriber(context.querySubject);
189 }
190
191 @Override
192 public void registerCommandHandler(Function<CommandRequest, CompletableFuture<CommandResponse>> handler) {
193 clusterCommunicator.addSubscriber(context.commandSubject, serializer::decode, handler, serializer::encode);
194 }
195
196 @Override
197 public void unregisterCommandHandler() {
198 clusterCommunicator.removeSubscriber(context.commandSubject);
199 }
200
201 @Override
202 public void registerMetadataHandler(Function<MetadataRequest, CompletableFuture<MetadataResponse>> handler) {
203 clusterCommunicator.addSubscriber(context.metadataSubject, serializer::decode, handler, serializer::encode);
204 }
205
206 @Override
207 public void unregisterMetadataHandler() {
208 clusterCommunicator.removeSubscriber(context.metadataSubject);
209 }
210
211 @Override
212 public void registerJoinHandler(Function<JoinRequest, CompletableFuture<JoinResponse>> handler) {
213 clusterCommunicator.addSubscriber(context.joinSubject, serializer::decode, handler, serializer::encode);
214 }
215
216 @Override
217 public void unregisterJoinHandler() {
218 clusterCommunicator.removeSubscriber(context.joinSubject);
219 }
220
221 @Override
222 public void registerLeaveHandler(Function<LeaveRequest, CompletableFuture<LeaveResponse>> handler) {
223 clusterCommunicator.addSubscriber(context.leaveSubject, serializer::decode, handler, serializer::encode);
224 }
225
226 @Override
227 public void unregisterLeaveHandler() {
228 clusterCommunicator.removeSubscriber(context.leaveSubject);
229 }
230
231 @Override
232 public void registerConfigureHandler(Function<ConfigureRequest, CompletableFuture<ConfigureResponse>> handler) {
233 clusterCommunicator.addSubscriber(context.configureSubject, serializer::decode, handler, serializer::encode);
234 }
235
236 @Override
237 public void unregisterConfigureHandler() {
238 clusterCommunicator.removeSubscriber(context.configureSubject);
239 }
240
241 @Override
242 public void registerReconfigureHandler(
243 Function<ReconfigureRequest, CompletableFuture<ReconfigureResponse>> handler) {
244 clusterCommunicator.addSubscriber(context.reconfigureSubject, serializer::decode, handler, serializer::encode);
245 }
246
247 @Override
248 public void unregisterReconfigureHandler() {
249 clusterCommunicator.removeSubscriber(context.reconfigureSubject);
250 }
251
252 @Override
253 public void registerInstallHandler(Function<InstallRequest, CompletableFuture<InstallResponse>> handler) {
254 clusterCommunicator.addSubscriber(context.installSubject, serializer::decode, handler, serializer::encode);
255 }
256
257 @Override
258 public void unregisterInstallHandler() {
259 clusterCommunicator.removeSubscriber(context.installSubject);
260 }
261
262 @Override
263 public void registerPollHandler(Function<PollRequest, CompletableFuture<PollResponse>> handler) {
264 clusterCommunicator.addSubscriber(context.pollSubject, serializer::decode, handler, serializer::encode);
265 }
266
267 @Override
268 public void unregisterPollHandler() {
269 clusterCommunicator.removeSubscriber(context.pollSubject);
270 }
271
272 @Override
273 public void registerVoteHandler(Function<VoteRequest, CompletableFuture<VoteResponse>> handler) {
274 clusterCommunicator.addSubscriber(context.voteSubject, serializer::decode, handler, serializer::encode);
275 }
276
277 @Override
278 public void unregisterVoteHandler() {
279 clusterCommunicator.removeSubscriber(context.voteSubject);
280 }
281
282 @Override
283 public void registerAppendHandler(Function<AppendRequest, CompletableFuture<AppendResponse>> handler) {
284 clusterCommunicator.addSubscriber(context.appendSubject, serializer::decode, handler, serializer::encode);
285 }
286
287 @Override
288 public void unregisterAppendHandler() {
289 clusterCommunicator.removeSubscriber(context.appendSubject);
290 }
291
292 @Override
293 public void registerResetListener(SessionId sessionId, Consumer<ResetRequest> listener, Executor executor) {
294 clusterCommunicator.addSubscriber(context.resetSubject(sessionId.id()), serializer::decode, listener, executor);
295 }
296
297 @Override
298 public void unregisterResetListener(SessionId sessionId) {
299 clusterCommunicator.removeSubscriber(context.resetSubject(sessionId.id()));
300 }
301}