blob: e2c06c70e68244258cd8f0581df85549a8aeb795 [file] [log] [blame]
Madan Jampani9b19a822014-11-04 21:37:13 -08001package org.onlab.onos.store.service.impl;
2
3import static org.slf4j.LoggerFactory.getLogger;
4
5import java.util.concurrent.CompletableFuture;
Madan Jampani778f7ad2014-11-05 22:46:15 -08006import java.util.function.BiConsumer;
Madan Jampani9b19a822014-11-04 21:37:13 -08007
8import net.kuujo.copycat.protocol.PingRequest;
Madan Jampani778f7ad2014-11-05 22:46:15 -08009import net.kuujo.copycat.protocol.PingResponse;
Madan Jampani9b19a822014-11-04 21:37:13 -080010import net.kuujo.copycat.protocol.PollRequest;
Madan Jampani778f7ad2014-11-05 22:46:15 -080011import net.kuujo.copycat.protocol.PollResponse;
Madan Jampani9b19a822014-11-04 21:37:13 -080012import net.kuujo.copycat.protocol.RequestHandler;
13import net.kuujo.copycat.protocol.SubmitRequest;
Madan Jampani778f7ad2014-11-05 22:46:15 -080014import net.kuujo.copycat.protocol.SubmitResponse;
Madan Jampani9b19a822014-11-04 21:37:13 -080015import net.kuujo.copycat.protocol.SyncRequest;
Madan Jampani778f7ad2014-11-05 22:46:15 -080016import net.kuujo.copycat.protocol.SyncResponse;
Madan Jampani9b19a822014-11-04 21:37:13 -080017import net.kuujo.copycat.spi.protocol.ProtocolServer;
18
19import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
20import org.onlab.onos.store.cluster.messaging.ClusterMessage;
21import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
22import org.slf4j.Logger;
23
24/**
Madan Jampanidfbfa182014-11-04 22:06:41 -080025 * ONOS Cluster messaging based Copycat protocol server.
Madan Jampani9b19a822014-11-04 21:37:13 -080026 */
Madan Jampani9b19a822014-11-04 21:37:13 -080027public class ClusterMessagingProtocolServer implements ProtocolServer {
28
29 private final Logger log = getLogger(getClass());
Yuta HIGUCHI76b54bf2014-11-07 01:56:55 -080030 private volatile RequestHandler handler;
31 private ClusterCommunicationService clusterCommunicator;
Madan Jampani9b19a822014-11-04 21:37:13 -080032
33 public ClusterMessagingProtocolServer(ClusterCommunicationService clusterCommunicator) {
Yuta HIGUCHI76b54bf2014-11-07 01:56:55 -080034 this.clusterCommunicator = clusterCommunicator;
Madan Jampani9b19a822014-11-04 21:37:13 -080035
Madan Jampani9b19a822014-11-04 21:37:13 -080036 }
37
38 @Override
39 public void requestHandler(RequestHandler handler) {
40 this.handler = handler;
41 }
42
43 @Override
44 public CompletableFuture<Void> listen() {
Yuta HIGUCHI76b54bf2014-11-07 01:56:55 -080045 clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_PING,
46 new CopycatMessageHandler<PingRequest>());
47 clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_SYNC,
48 new CopycatMessageHandler<SyncRequest>());
49 clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_POLL,
50 new CopycatMessageHandler<PollRequest>());
51 clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_SUBMIT,
52 new CopycatMessageHandler<SubmitRequest>());
Madan Jampani9b19a822014-11-04 21:37:13 -080053 return CompletableFuture.completedFuture(null);
54 }
55
56 @Override
57 public CompletableFuture<Void> close() {
Yuta HIGUCHI76b54bf2014-11-07 01:56:55 -080058 clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_PING);
59 clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_SYNC);
60 clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_POLL);
61 clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_SUBMIT);
Madan Jampani9b19a822014-11-04 21:37:13 -080062 return CompletableFuture.completedFuture(null);
63 }
64
65 private class CopycatMessageHandler<T> implements ClusterMessageHandler {
66
67 @Override
68 public void handle(ClusterMessage message) {
69 T request = ClusterMessagingProtocol.SERIALIZER.decode(message.payload());
70 if (request.getClass().equals(PingRequest.class)) {
Madan Jampani778f7ad2014-11-05 22:46:15 -080071 handler.ping((PingRequest) request).whenComplete(new PostExecutionTask<PingResponse>(message));
Madan Jampani9b19a822014-11-04 21:37:13 -080072 } else if (request.getClass().equals(PollRequest.class)) {
Madan Jampani778f7ad2014-11-05 22:46:15 -080073 handler.poll((PollRequest) request).whenComplete(new PostExecutionTask<PollResponse>(message));
Madan Jampani9b19a822014-11-04 21:37:13 -080074 } else if (request.getClass().equals(SyncRequest.class)) {
Madan Jampani778f7ad2014-11-05 22:46:15 -080075 handler.sync((SyncRequest) request).whenComplete(new PostExecutionTask<SyncResponse>(message));
Madan Jampani9b19a822014-11-04 21:37:13 -080076 } else if (request.getClass().equals(SubmitRequest.class)) {
Madan Jampani778f7ad2014-11-05 22:46:15 -080077 handler.submit((SubmitRequest) request).whenComplete(new PostExecutionTask<SubmitResponse>(message));
78 } else {
79 throw new IllegalStateException("Unknown request type: " + request.getClass().getName());
80 }
81 }
82
83 private class PostExecutionTask<R> implements BiConsumer<R, Throwable> {
84
85 private final ClusterMessage message;
86
87 public PostExecutionTask(ClusterMessage message) {
88 this.message = message;
89 }
90
91 @Override
92 public void accept(R response, Throwable t) {
93 if (t != null) {
94 log.error("Processing for " + message.subject() + " failed.", t);
95 } else {
Madan Jampani9b19a822014-11-04 21:37:13 -080096 try {
97 message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
98 } catch (Exception e) {
Madan Jampani778f7ad2014-11-05 22:46:15 -080099 log.error("Failed to respond to " + response.getClass().getName(), e);
Madan Jampani9b19a822014-11-04 21:37:13 -0800100 }
Madan Jampani778f7ad2014-11-05 22:46:15 -0800101 }
Madan Jampani9b19a822014-11-04 21:37:13 -0800102 }
103 }
104 }
105}