blob: 7d94847771ad4dc8b5f1a3c8590197213936e323 [file] [log] [blame]
Madan Jampani9b19a822014-11-04 21:37:13 -08001package org.onlab.onos.store.service.impl;
2
3import static org.slf4j.LoggerFactory.getLogger;
4
5import java.util.concurrent.CompletableFuture;
6
7import net.kuujo.copycat.protocol.PingRequest;
8import net.kuujo.copycat.protocol.PollRequest;
9import net.kuujo.copycat.protocol.RequestHandler;
10import net.kuujo.copycat.protocol.SubmitRequest;
11import net.kuujo.copycat.protocol.SyncRequest;
12import net.kuujo.copycat.spi.protocol.ProtocolServer;
13
14import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
15import org.onlab.onos.store.cluster.messaging.ClusterMessage;
16import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
17import org.slf4j.Logger;
18
19/**
Madan Jampanidfbfa182014-11-04 22:06:41 -080020 * ONOS Cluster messaging based Copycat protocol server.
Madan Jampani9b19a822014-11-04 21:37:13 -080021 */
Madan Jampani9b19a822014-11-04 21:37:13 -080022public class ClusterMessagingProtocolServer implements ProtocolServer {
23
24 private final Logger log = getLogger(getClass());
25 private RequestHandler handler;
26
27 public ClusterMessagingProtocolServer(ClusterCommunicationService clusterCommunicator) {
28
29 clusterCommunicator.addSubscriber(
30 ClusterMessagingProtocol.COPYCAT_PING, new CopycatMessageHandler<PingRequest>());
31 clusterCommunicator.addSubscriber(
32 ClusterMessagingProtocol.COPYCAT_SYNC, new CopycatMessageHandler<SyncRequest>());
33 clusterCommunicator.addSubscriber(
34 ClusterMessagingProtocol.COPYCAT_POLL, new CopycatMessageHandler<PollRequest>());
35 clusterCommunicator.addSubscriber(
36 ClusterMessagingProtocol.COPYCAT_SUBMIT, new CopycatMessageHandler<SubmitRequest>());
37 }
38
39 @Override
40 public void requestHandler(RequestHandler handler) {
41 this.handler = handler;
42 }
43
44 @Override
45 public CompletableFuture<Void> listen() {
46 return CompletableFuture.completedFuture(null);
47 }
48
49 @Override
50 public CompletableFuture<Void> close() {
51 return CompletableFuture.completedFuture(null);
52 }
53
54 private class CopycatMessageHandler<T> implements ClusterMessageHandler {
55
56 @Override
57 public void handle(ClusterMessage message) {
58 T request = ClusterMessagingProtocol.SERIALIZER.decode(message.payload());
59 if (request.getClass().equals(PingRequest.class)) {
60 handler.ping((PingRequest) request).whenComplete((response, error) -> {
61 try {
62 message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
63 } catch (Exception e) {
64 log.error("Failed to respond to ping request", e);
65 }
66 });
67 } else if (request.getClass().equals(PollRequest.class)) {
68 handler.poll((PollRequest) request).whenComplete((response, error) -> {
69 try {
70 message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
71 } catch (Exception e) {
72 log.error("Failed to respond to poll request", e);
73 }
74 });
75 } else if (request.getClass().equals(SyncRequest.class)) {
76 handler.sync((SyncRequest) request).whenComplete((response, error) -> {
77 try {
78 message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
79 } catch (Exception e) {
80 log.error("Failed to respond to sync request", e);
81 }
82 });
83 } else if (request.getClass().equals(SubmitRequest.class)) {
84 handler.submit((SubmitRequest) request).whenComplete((response, error) -> {
85 try {
86 message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
87 } catch (Exception e) {
88 log.error("Failed to respond to submit request", e);
89 }
90 });
91 }
92 }
93 }
94}