blob: 53c3f634510b14d076fd723f1b1ae8b7ad04ad20 [file] [log] [blame]
Madan Jampani9b19a822014-11-04 21:37:13 -08001package org.onlab.onos.store.service.impl;
2
3import static org.slf4j.LoggerFactory.getLogger;
4
5import java.util.concurrent.CompletableFuture;
Madan Jampani778f7ad2014-11-05 22:46:15 -08006import java.util.function.BiConsumer;
Madan Jampani9b19a822014-11-04 21:37:13 -08007
8import net.kuujo.copycat.protocol.PingRequest;
Madan Jampani778f7ad2014-11-05 22:46:15 -08009import net.kuujo.copycat.protocol.PingResponse;
Madan Jampani9b19a822014-11-04 21:37:13 -080010import net.kuujo.copycat.protocol.PollRequest;
Madan Jampani778f7ad2014-11-05 22:46:15 -080011import net.kuujo.copycat.protocol.PollResponse;
Madan Jampani9b19a822014-11-04 21:37:13 -080012import net.kuujo.copycat.protocol.RequestHandler;
13import net.kuujo.copycat.protocol.SubmitRequest;
Madan Jampani778f7ad2014-11-05 22:46:15 -080014import net.kuujo.copycat.protocol.SubmitResponse;
Madan Jampani9b19a822014-11-04 21:37:13 -080015import net.kuujo.copycat.protocol.SyncRequest;
Madan Jampani778f7ad2014-11-05 22:46:15 -080016import net.kuujo.copycat.protocol.SyncResponse;
Madan Jampani9b19a822014-11-04 21:37:13 -080017import net.kuujo.copycat.spi.protocol.ProtocolServer;
18
19import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
20import org.onlab.onos.store.cluster.messaging.ClusterMessage;
21import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
22import org.slf4j.Logger;
23
24/**
Madan Jampanidfbfa182014-11-04 22:06:41 -080025 * ONOS Cluster messaging based Copycat protocol server.
Madan Jampani9b19a822014-11-04 21:37:13 -080026 */
Madan Jampani9b19a822014-11-04 21:37:13 -080027public class ClusterMessagingProtocolServer implements ProtocolServer {
28
29 private final Logger log = getLogger(getClass());
Yuta HIGUCHI76b54bf2014-11-07 01:56:55 -080030 private volatile RequestHandler handler;
31 private ClusterCommunicationService clusterCommunicator;
Madan Jampani9b19a822014-11-04 21:37:13 -080032
33 public ClusterMessagingProtocolServer(ClusterCommunicationService clusterCommunicator) {
Yuta HIGUCHI76b54bf2014-11-07 01:56:55 -080034 this.clusterCommunicator = clusterCommunicator;
Madan Jampani9b19a822014-11-04 21:37:13 -080035
Madan Jampani9b19a822014-11-04 21:37:13 -080036 }
37
38 @Override
39 public void requestHandler(RequestHandler handler) {
40 this.handler = handler;
41 }
42
43 @Override
44 public CompletableFuture<Void> listen() {
Yuta HIGUCHI76b54bf2014-11-07 01:56:55 -080045 clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_PING,
46 new CopycatMessageHandler<PingRequest>());
47 clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_SYNC,
48 new CopycatMessageHandler<SyncRequest>());
49 clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_POLL,
50 new CopycatMessageHandler<PollRequest>());
51 clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_SUBMIT,
52 new CopycatMessageHandler<SubmitRequest>());
Madan Jampani9b19a822014-11-04 21:37:13 -080053 return CompletableFuture.completedFuture(null);
54 }
55
56 @Override
57 public CompletableFuture<Void> close() {
Yuta HIGUCHI76b54bf2014-11-07 01:56:55 -080058 clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_PING);
59 clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_SYNC);
60 clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_POLL);
61 clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_SUBMIT);
Madan Jampani9b19a822014-11-04 21:37:13 -080062 return CompletableFuture.completedFuture(null);
63 }
64
65 private class CopycatMessageHandler<T> implements ClusterMessageHandler {
66
67 @Override
68 public void handle(ClusterMessage message) {
69 T request = ClusterMessagingProtocol.SERIALIZER.decode(message.payload());
Yuta HIGUCHI5fb6c962014-11-07 13:07:40 -080070 if (handler == null) {
71 // there is a slight window of time during state transition,
72 // where handler becomes null
73 for (int i = 0; i < 10; ++i) {
74 if (handler != null) {
75 break;
76 }
77 try {
78 Thread.sleep(1);
79 } catch (InterruptedException e) {
80 log.trace("Exception", e);
81 }
82 }
83 if (handler == null) {
84 log.error("There was no handler for registered!");
85 return;
86 }
87 }
Madan Jampani9b19a822014-11-04 21:37:13 -080088 if (request.getClass().equals(PingRequest.class)) {
Madan Jampani778f7ad2014-11-05 22:46:15 -080089 handler.ping((PingRequest) request).whenComplete(new PostExecutionTask<PingResponse>(message));
Madan Jampani9b19a822014-11-04 21:37:13 -080090 } else if (request.getClass().equals(PollRequest.class)) {
Madan Jampani778f7ad2014-11-05 22:46:15 -080091 handler.poll((PollRequest) request).whenComplete(new PostExecutionTask<PollResponse>(message));
Madan Jampani9b19a822014-11-04 21:37:13 -080092 } else if (request.getClass().equals(SyncRequest.class)) {
Madan Jampani778f7ad2014-11-05 22:46:15 -080093 handler.sync((SyncRequest) request).whenComplete(new PostExecutionTask<SyncResponse>(message));
Madan Jampani9b19a822014-11-04 21:37:13 -080094 } else if (request.getClass().equals(SubmitRequest.class)) {
Madan Jampani778f7ad2014-11-05 22:46:15 -080095 handler.submit((SubmitRequest) request).whenComplete(new PostExecutionTask<SubmitResponse>(message));
96 } else {
97 throw new IllegalStateException("Unknown request type: " + request.getClass().getName());
98 }
99 }
100
101 private class PostExecutionTask<R> implements BiConsumer<R, Throwable> {
102
103 private final ClusterMessage message;
104
105 public PostExecutionTask(ClusterMessage message) {
106 this.message = message;
107 }
108
109 @Override
110 public void accept(R response, Throwable t) {
111 if (t != null) {
112 log.error("Processing for " + message.subject() + " failed.", t);
113 } else {
Madan Jampani9b19a822014-11-04 21:37:13 -0800114 try {
Yuta HIGUCHI5fb6c962014-11-07 13:07:40 -0800115 log.trace("responding to {}", message.subject());
Madan Jampani9b19a822014-11-04 21:37:13 -0800116 message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
117 } catch (Exception e) {
Madan Jampani778f7ad2014-11-05 22:46:15 -0800118 log.error("Failed to respond to " + response.getClass().getName(), e);
Madan Jampani9b19a822014-11-04 21:37:13 -0800119 }
Madan Jampani778f7ad2014-11-05 22:46:15 -0800120 }
Madan Jampani9b19a822014-11-04 21:37:13 -0800121 }
122 }
123 }
124}