blob: d5532d503c2b22bb3d0368e2c508cdaf8ae0a2be [file] [log] [blame]
Madan Jampani9b19a822014-11-04 21:37:13 -08001package org.onlab.onos.store.service.impl;
2
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -08003import static java.util.concurrent.Executors.newCachedThreadPool;
4import static org.onlab.util.Tools.namedThreads;
Madan Jampani9b19a822014-11-04 21:37:13 -08005import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -08006import static org.onlab.onos.store.service.impl.ClusterMessagingProtocol.*;
7import static org.onlab.onos.store.service.impl.ClusterMessagingProtocol.DB_SERIALIZER;
Madan Jampani9b19a822014-11-04 21:37:13 -08008
9import java.util.concurrent.CompletableFuture;
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -080010import java.util.concurrent.ExecutorService;
Madan Jampani778f7ad2014-11-05 22:46:15 -080011import java.util.function.BiConsumer;
Madan Jampani9b19a822014-11-04 21:37:13 -080012
13import net.kuujo.copycat.protocol.PingRequest;
14import net.kuujo.copycat.protocol.PollRequest;
15import net.kuujo.copycat.protocol.RequestHandler;
16import net.kuujo.copycat.protocol.SubmitRequest;
17import net.kuujo.copycat.protocol.SyncRequest;
18import net.kuujo.copycat.spi.protocol.ProtocolServer;
19
20import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
21import org.onlab.onos.store.cluster.messaging.ClusterMessage;
22import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
23import org.slf4j.Logger;
24
25/**
Madan Jampanidfbfa182014-11-04 22:06:41 -080026 * ONOS Cluster messaging based Copycat protocol server.
Madan Jampani9b19a822014-11-04 21:37:13 -080027 */
Madan Jampani9b19a822014-11-04 21:37:13 -080028public class ClusterMessagingProtocolServer implements ProtocolServer {
29
30 private final Logger log = getLogger(getClass());
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -080031
32 private final ClusterCommunicationService clusterCommunicator;
33
Yuta HIGUCHI76b54bf2014-11-07 01:56:55 -080034 private volatile RequestHandler handler;
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -080035
36 private ExecutorService pool;
Madan Jampani9b19a822014-11-04 21:37:13 -080037
38 public ClusterMessagingProtocolServer(ClusterCommunicationService clusterCommunicator) {
Yuta HIGUCHI76b54bf2014-11-07 01:56:55 -080039 this.clusterCommunicator = clusterCommunicator;
Madan Jampani9b19a822014-11-04 21:37:13 -080040 }
41
42 @Override
43 public void requestHandler(RequestHandler handler) {
44 this.handler = handler;
45 }
46
47 @Override
48 public CompletableFuture<Void> listen() {
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -080049 if (pool == null || pool.isShutdown()) {
50 pool = newCachedThreadPool(namedThreads("copycat-netty-messaging-server-%d"));
51 }
52
53 clusterCommunicator.addSubscriber(COPYCAT_PING, new PingHandler());
54 clusterCommunicator.addSubscriber(COPYCAT_SYNC, new SyncHandler());
55 clusterCommunicator.addSubscriber(COPYCAT_POLL, new PollHandler());
56 clusterCommunicator.addSubscriber(COPYCAT_SUBMIT, new SubmitHandler());
Madan Jampani9b19a822014-11-04 21:37:13 -080057 return CompletableFuture.completedFuture(null);
58 }
59
60 @Override
61 public CompletableFuture<Void> close() {
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -080062 clusterCommunicator.removeSubscriber(COPYCAT_PING);
63 clusterCommunicator.removeSubscriber(COPYCAT_SYNC);
64 clusterCommunicator.removeSubscriber(COPYCAT_POLL);
65 clusterCommunicator.removeSubscriber(COPYCAT_SUBMIT);
66 if (pool != null) {
67 pool.shutdownNow();
68 pool = null;
69 }
Madan Jampani9b19a822014-11-04 21:37:13 -080070 return CompletableFuture.completedFuture(null);
71 }
72
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -080073 private final class PingHandler extends CopycatMessageHandler<PingRequest> {
74
75 @Override
76 public void raftHandle(PingRequest request, ClusterMessage message) {
77 pool.submit(new Runnable() {
78
79 @Override
80 public void run() {
81 currentHandler().ping(request)
82 .whenComplete(new PostExecutionTask<>(message));
83 }
84 });
85 }
86 }
87
88 private final class SyncHandler extends CopycatMessageHandler<SyncRequest> {
89
90 @Override
91 public void raftHandle(SyncRequest request, ClusterMessage message) {
92 pool.submit(new Runnable() {
93
94 @Override
95 public void run() {
96 currentHandler().sync(request)
97 .whenComplete(new PostExecutionTask<>(message));
98 }
99 });
100 }
101 }
102
103 private final class PollHandler extends CopycatMessageHandler<PollRequest> {
104
105 @Override
106 public void raftHandle(PollRequest request, ClusterMessage message) {
107 pool.submit(new Runnable() {
108
109 @Override
110 public void run() {
111 currentHandler().poll(request)
112 .whenComplete(new PostExecutionTask<>(message));
113 }
114 });
115 }
116 }
117
118 private final class SubmitHandler extends CopycatMessageHandler<SubmitRequest> {
119
120 @Override
121 public void raftHandle(SubmitRequest request, ClusterMessage message) {
122 pool.submit(new Runnable() {
123
124 @Override
125 public void run() {
126 currentHandler().submit(request)
127 .whenComplete(new PostExecutionTask<>(message));
128 }
129 });
130 }
131 }
132
133 private abstract class CopycatMessageHandler<T> implements ClusterMessageHandler {
134
135 public abstract void raftHandle(T request, ClusterMessage message);
Madan Jampani9b19a822014-11-04 21:37:13 -0800136
137 @Override
138 public void handle(ClusterMessage message) {
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -0800139 T request = DB_SERIALIZER.decode(message.payload());
140 raftHandle(request, message);
141 }
142
143 RequestHandler currentHandler() {
144 RequestHandler currentHandler = handler;
145 if (currentHandler == null) {
Yuta HIGUCHI5fb6c962014-11-07 13:07:40 -0800146 // there is a slight window of time during state transition,
147 // where handler becomes null
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -0800148 long sleepMs = 1;
Yuta HIGUCHI5fb6c962014-11-07 13:07:40 -0800149 for (int i = 0; i < 10; ++i) {
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -0800150 currentHandler = handler;
151 if (currentHandler != null) {
Yuta HIGUCHI5fb6c962014-11-07 13:07:40 -0800152 break;
153 }
154 try {
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -0800155 sleepMs <<= 1;
156 Thread.sleep(sleepMs);
Yuta HIGUCHI5fb6c962014-11-07 13:07:40 -0800157 } catch (InterruptedException e) {
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -0800158 log.error("Interrupted", e);
159 return handler;
Yuta HIGUCHI5fb6c962014-11-07 13:07:40 -0800160 }
161 }
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -0800162 if (currentHandler == null) {
Yuta HIGUCHI2fe63342014-11-17 21:33:38 -0800163 log.error("There was no handler registered!");
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -0800164 return handler;
Yuta HIGUCHI5fb6c962014-11-07 13:07:40 -0800165 }
166 }
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -0800167 return currentHandler;
Madan Jampani778f7ad2014-11-05 22:46:15 -0800168 }
169
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -0800170 final class PostExecutionTask<R> implements BiConsumer<R, Throwable> {
Madan Jampani778f7ad2014-11-05 22:46:15 -0800171
172 private final ClusterMessage message;
173
174 public PostExecutionTask(ClusterMessage message) {
175 this.message = message;
176 }
177
178 @Override
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -0800179 public void accept(R response, Throwable error) {
180 if (error != null) {
181 log.error("Processing {} failed.", message.subject(), error);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800182 } else {
Madan Jampani9b19a822014-11-04 21:37:13 -0800183 try {
Yuta HIGUCHI5fb6c962014-11-07 13:07:40 -0800184 log.trace("responding to {}", message.subject());
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -0800185 message.respond(DB_SERIALIZER.encode(response));
Madan Jampani9b19a822014-11-04 21:37:13 -0800186 } catch (Exception e) {
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -0800187 log.error("Failed responding with {}", response.getClass().getName(), e);
Madan Jampani9b19a822014-11-04 21:37:13 -0800188 }
Madan Jampani778f7ad2014-11-05 22:46:15 -0800189 }
Madan Jampani9b19a822014-11-04 21:37:13 -0800190 }
191 }
192 }
193}