blob: a60f125694bad73f7261d64dd661d7541911de12 [file] [log] [blame]
/*
 * Copyright 2014 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.service.impl;
Madan Jampani9b19a822014-11-04 21:37:13 -080017
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -080018import static java.util.concurrent.Executors.newCachedThreadPool;
19import static org.onlab.util.Tools.namedThreads;
Madan Jampani9b19a822014-11-04 21:37:13 -080020import static org.slf4j.LoggerFactory.getLogger;
Brian O'Connorabafb502014-12-02 22:26:20 -080021import static org.onosproject.store.service.impl.ClusterMessagingProtocol.*;
22import static org.onosproject.store.service.impl.ClusterMessagingProtocol.DB_SERIALIZER;
Madan Jampani9b19a822014-11-04 21:37:13 -080023
24import java.util.concurrent.CompletableFuture;
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -080025import java.util.concurrent.ExecutorService;
Madan Jampani778f7ad2014-11-05 22:46:15 -080026import java.util.function.BiConsumer;
Madan Jampani9b19a822014-11-04 21:37:13 -080027
28import net.kuujo.copycat.protocol.PingRequest;
29import net.kuujo.copycat.protocol.PollRequest;
30import net.kuujo.copycat.protocol.RequestHandler;
31import net.kuujo.copycat.protocol.SubmitRequest;
32import net.kuujo.copycat.protocol.SyncRequest;
33import net.kuujo.copycat.spi.protocol.ProtocolServer;
34
Brian O'Connorabafb502014-12-02 22:26:20 -080035import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
36import org.onosproject.store.cluster.messaging.ClusterMessage;
37import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
Madan Jampani9b19a822014-11-04 21:37:13 -080038import org.slf4j.Logger;
39
40/**
Madan Jampanidfbfa182014-11-04 22:06:41 -080041 * ONOS Cluster messaging based Copycat protocol server.
Madan Jampani9b19a822014-11-04 21:37:13 -080042 */
Madan Jampani9b19a822014-11-04 21:37:13 -080043public class ClusterMessagingProtocolServer implements ProtocolServer {
44
45 private final Logger log = getLogger(getClass());
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -080046
47 private final ClusterCommunicationService clusterCommunicator;
48
Yuta HIGUCHI76b54bf2014-11-07 01:56:55 -080049 private volatile RequestHandler handler;
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -080050
51 private ExecutorService pool;
Madan Jampani9b19a822014-11-04 21:37:13 -080052
53 public ClusterMessagingProtocolServer(ClusterCommunicationService clusterCommunicator) {
Yuta HIGUCHI76b54bf2014-11-07 01:56:55 -080054 this.clusterCommunicator = clusterCommunicator;
Madan Jampani9b19a822014-11-04 21:37:13 -080055 }
56
57 @Override
58 public void requestHandler(RequestHandler handler) {
59 this.handler = handler;
60 }
61
62 @Override
63 public CompletableFuture<Void> listen() {
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -080064 if (pool == null || pool.isShutdown()) {
65 pool = newCachedThreadPool(namedThreads("copycat-netty-messaging-server-%d"));
66 }
67
68 clusterCommunicator.addSubscriber(COPYCAT_PING, new PingHandler());
69 clusterCommunicator.addSubscriber(COPYCAT_SYNC, new SyncHandler());
70 clusterCommunicator.addSubscriber(COPYCAT_POLL, new PollHandler());
71 clusterCommunicator.addSubscriber(COPYCAT_SUBMIT, new SubmitHandler());
Madan Jampani9b19a822014-11-04 21:37:13 -080072 return CompletableFuture.completedFuture(null);
73 }
74
75 @Override
76 public CompletableFuture<Void> close() {
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -080077 clusterCommunicator.removeSubscriber(COPYCAT_PING);
78 clusterCommunicator.removeSubscriber(COPYCAT_SYNC);
79 clusterCommunicator.removeSubscriber(COPYCAT_POLL);
80 clusterCommunicator.removeSubscriber(COPYCAT_SUBMIT);
81 if (pool != null) {
82 pool.shutdownNow();
83 pool = null;
84 }
Madan Jampani9b19a822014-11-04 21:37:13 -080085 return CompletableFuture.completedFuture(null);
86 }
87
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -080088 private final class PingHandler extends CopycatMessageHandler<PingRequest> {
89
90 @Override
91 public void raftHandle(PingRequest request, ClusterMessage message) {
92 pool.submit(new Runnable() {
93
94 @Override
95 public void run() {
96 currentHandler().ping(request)
97 .whenComplete(new PostExecutionTask<>(message));
98 }
99 });
100 }
101 }
102
103 private final class SyncHandler extends CopycatMessageHandler<SyncRequest> {
104
105 @Override
106 public void raftHandle(SyncRequest request, ClusterMessage message) {
107 pool.submit(new Runnable() {
108
109 @Override
110 public void run() {
111 currentHandler().sync(request)
112 .whenComplete(new PostExecutionTask<>(message));
113 }
114 });
115 }
116 }
117
118 private final class PollHandler extends CopycatMessageHandler<PollRequest> {
119
120 @Override
121 public void raftHandle(PollRequest request, ClusterMessage message) {
122 pool.submit(new Runnable() {
123
124 @Override
125 public void run() {
126 currentHandler().poll(request)
127 .whenComplete(new PostExecutionTask<>(message));
128 }
129 });
130 }
131 }
132
133 private final class SubmitHandler extends CopycatMessageHandler<SubmitRequest> {
134
135 @Override
136 public void raftHandle(SubmitRequest request, ClusterMessage message) {
137 pool.submit(new Runnable() {
138
139 @Override
140 public void run() {
141 currentHandler().submit(request)
142 .whenComplete(new PostExecutionTask<>(message));
143 }
144 });
145 }
146 }
147
148 private abstract class CopycatMessageHandler<T> implements ClusterMessageHandler {
149
150 public abstract void raftHandle(T request, ClusterMessage message);
Madan Jampani9b19a822014-11-04 21:37:13 -0800151
152 @Override
153 public void handle(ClusterMessage message) {
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -0800154 T request = DB_SERIALIZER.decode(message.payload());
155 raftHandle(request, message);
156 }
157
158 RequestHandler currentHandler() {
159 RequestHandler currentHandler = handler;
160 if (currentHandler == null) {
Yuta HIGUCHI5fb6c962014-11-07 13:07:40 -0800161 // there is a slight window of time during state transition,
162 // where handler becomes null
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -0800163 long sleepMs = 1;
Yuta HIGUCHI5fb6c962014-11-07 13:07:40 -0800164 for (int i = 0; i < 10; ++i) {
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -0800165 currentHandler = handler;
166 if (currentHandler != null) {
Yuta HIGUCHI5fb6c962014-11-07 13:07:40 -0800167 break;
168 }
169 try {
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -0800170 sleepMs <<= 1;
171 Thread.sleep(sleepMs);
Yuta HIGUCHI5fb6c962014-11-07 13:07:40 -0800172 } catch (InterruptedException e) {
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -0800173 log.error("Interrupted", e);
174 return handler;
Yuta HIGUCHI5fb6c962014-11-07 13:07:40 -0800175 }
176 }
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -0800177 if (currentHandler == null) {
Yuta HIGUCHI2fe63342014-11-17 21:33:38 -0800178 log.error("There was no handler registered!");
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -0800179 return handler;
Yuta HIGUCHI5fb6c962014-11-07 13:07:40 -0800180 }
181 }
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -0800182 return currentHandler;
Madan Jampani778f7ad2014-11-05 22:46:15 -0800183 }
184
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -0800185 final class PostExecutionTask<R> implements BiConsumer<R, Throwable> {
Madan Jampani778f7ad2014-11-05 22:46:15 -0800186
187 private final ClusterMessage message;
188
189 public PostExecutionTask(ClusterMessage message) {
190 this.message = message;
191 }
192
193 @Override
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -0800194 public void accept(R response, Throwable error) {
195 if (error != null) {
196 log.error("Processing {} failed.", message.subject(), error);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800197 } else {
Madan Jampani9b19a822014-11-04 21:37:13 -0800198 try {
Yuta HIGUCHI5fb6c962014-11-07 13:07:40 -0800199 log.trace("responding to {}", message.subject());
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -0800200 message.respond(DB_SERIALIZER.encode(response));
Madan Jampani9b19a822014-11-04 21:37:13 -0800201 } catch (Exception e) {
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -0800202 log.error("Failed responding with {}", response.getClass().getName(), e);
Madan Jampani9b19a822014-11-04 21:37:13 -0800203 }
Madan Jampani778f7ad2014-11-05 22:46:15 -0800204 }
Madan Jampani9b19a822014-11-04 21:37:13 -0800205 }
206 }
207 }
Brian O'Connorabafb502014-12-02 22:26:20 -0800208}