blob: b3eaeb4e90a1fff044a7e85cb4b0db9919a9a61f [file] [log] [blame]
Madan Jampani9b19a822014-11-04 21:37:13 -08001package org.onlab.onos.store.service.impl;
2
3import static org.slf4j.LoggerFactory.getLogger;
4
5import java.util.concurrent.CompletableFuture;
Madan Jampani778f7ad2014-11-05 22:46:15 -08006import java.util.function.BiConsumer;
Madan Jampani9b19a822014-11-04 21:37:13 -08007
8import net.kuujo.copycat.protocol.PingRequest;
Madan Jampani778f7ad2014-11-05 22:46:15 -08009import net.kuujo.copycat.protocol.PingResponse;
Madan Jampani9b19a822014-11-04 21:37:13 -080010import net.kuujo.copycat.protocol.PollRequest;
Madan Jampani778f7ad2014-11-05 22:46:15 -080011import net.kuujo.copycat.protocol.PollResponse;
Madan Jampani9b19a822014-11-04 21:37:13 -080012import net.kuujo.copycat.protocol.RequestHandler;
13import net.kuujo.copycat.protocol.SubmitRequest;
Madan Jampani778f7ad2014-11-05 22:46:15 -080014import net.kuujo.copycat.protocol.SubmitResponse;
Madan Jampani9b19a822014-11-04 21:37:13 -080015import net.kuujo.copycat.protocol.SyncRequest;
Madan Jampani778f7ad2014-11-05 22:46:15 -080016import net.kuujo.copycat.protocol.SyncResponse;
Madan Jampani9b19a822014-11-04 21:37:13 -080017import net.kuujo.copycat.spi.protocol.ProtocolServer;
18
19import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
20import org.onlab.onos.store.cluster.messaging.ClusterMessage;
21import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
22import org.slf4j.Logger;
23
24/**
Madan Jampanidfbfa182014-11-04 22:06:41 -080025 * ONOS Cluster messaging based Copycat protocol server.
Madan Jampani9b19a822014-11-04 21:37:13 -080026 */
Madan Jampani9b19a822014-11-04 21:37:13 -080027public class ClusterMessagingProtocolServer implements ProtocolServer {
28
29 private final Logger log = getLogger(getClass());
30 private RequestHandler handler;
31
32 public ClusterMessagingProtocolServer(ClusterCommunicationService clusterCommunicator) {
33
34 clusterCommunicator.addSubscriber(
35 ClusterMessagingProtocol.COPYCAT_PING, new CopycatMessageHandler<PingRequest>());
36 clusterCommunicator.addSubscriber(
37 ClusterMessagingProtocol.COPYCAT_SYNC, new CopycatMessageHandler<SyncRequest>());
38 clusterCommunicator.addSubscriber(
39 ClusterMessagingProtocol.COPYCAT_POLL, new CopycatMessageHandler<PollRequest>());
40 clusterCommunicator.addSubscriber(
41 ClusterMessagingProtocol.COPYCAT_SUBMIT, new CopycatMessageHandler<SubmitRequest>());
42 }
43
44 @Override
45 public void requestHandler(RequestHandler handler) {
46 this.handler = handler;
47 }
48
49 @Override
50 public CompletableFuture<Void> listen() {
51 return CompletableFuture.completedFuture(null);
52 }
53
54 @Override
55 public CompletableFuture<Void> close() {
56 return CompletableFuture.completedFuture(null);
57 }
58
59 private class CopycatMessageHandler<T> implements ClusterMessageHandler {
60
61 @Override
62 public void handle(ClusterMessage message) {
63 T request = ClusterMessagingProtocol.SERIALIZER.decode(message.payload());
64 if (request.getClass().equals(PingRequest.class)) {
Madan Jampani778f7ad2014-11-05 22:46:15 -080065 handler.ping((PingRequest) request).whenComplete(new PostExecutionTask<PingResponse>(message));
Madan Jampani9b19a822014-11-04 21:37:13 -080066 } else if (request.getClass().equals(PollRequest.class)) {
Madan Jampani778f7ad2014-11-05 22:46:15 -080067 handler.poll((PollRequest) request).whenComplete(new PostExecutionTask<PollResponse>(message));
Madan Jampani9b19a822014-11-04 21:37:13 -080068 } else if (request.getClass().equals(SyncRequest.class)) {
Madan Jampani778f7ad2014-11-05 22:46:15 -080069 handler.sync((SyncRequest) request).whenComplete(new PostExecutionTask<SyncResponse>(message));
Madan Jampani9b19a822014-11-04 21:37:13 -080070 } else if (request.getClass().equals(SubmitRequest.class)) {
Madan Jampani778f7ad2014-11-05 22:46:15 -080071 handler.submit((SubmitRequest) request).whenComplete(new PostExecutionTask<SubmitResponse>(message));
72 } else {
73 throw new IllegalStateException("Unknown request type: " + request.getClass().getName());
74 }
75 }
76
77 private class PostExecutionTask<R> implements BiConsumer<R, Throwable> {
78
79 private final ClusterMessage message;
80
81 public PostExecutionTask(ClusterMessage message) {
82 this.message = message;
83 }
84
85 @Override
86 public void accept(R response, Throwable t) {
87 if (t != null) {
88 log.error("Processing for " + message.subject() + " failed.", t);
89 } else {
Madan Jampani9b19a822014-11-04 21:37:13 -080090 try {
91 message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
92 } catch (Exception e) {
Madan Jampani778f7ad2014-11-05 22:46:15 -080093 log.error("Failed to respond to " + response.getClass().getName(), e);
Madan Jampani9b19a822014-11-04 21:37:13 -080094 }
Madan Jampani778f7ad2014-11-05 22:46:15 -080095 }
Madan Jampani9b19a822014-11-04 21:37:13 -080096 }
97 }
98 }
99}