blob: d06999e9e47d588b657ab6c3affe73bc1898de46 [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
3import static org.slf4j.LoggerFactory.getLogger;
4
5import java.io.IOException;
6import java.util.concurrent.CompletableFuture;
7
8import net.kuujo.copycat.cluster.TcpMember;
9import net.kuujo.copycat.protocol.PingRequest;
10import net.kuujo.copycat.protocol.PollRequest;
11import net.kuujo.copycat.protocol.RequestHandler;
12import net.kuujo.copycat.protocol.SubmitRequest;
13import net.kuujo.copycat.protocol.SyncRequest;
14import net.kuujo.copycat.spi.protocol.ProtocolServer;
15
16import org.onlab.netty.Message;
17import org.onlab.netty.MessageHandler;
18import org.onlab.netty.NettyMessagingService;
19import org.slf4j.Logger;
20
21/**
22 * {@link NettyMessagingService} based Copycat protocol server.
23 */
24public class NettyProtocolServer implements ProtocolServer {
25
26 private final Logger log = getLogger(getClass());
27
28 private final NettyMessagingService messagingService;
29 private RequestHandler handler;
30
31
32 public NettyProtocolServer(TcpMember member) {
33 messagingService = new NettyMessagingService(member.host(), member.port());
34
35 messagingService.registerHandler(NettyProtocol.COPYCAT_PING, new CopycatMessageHandler<PingRequest>());
36 messagingService.registerHandler(NettyProtocol.COPYCAT_SYNC, new CopycatMessageHandler<SyncRequest>());
37 messagingService.registerHandler(NettyProtocol.COPYCAT_POLL, new CopycatMessageHandler<PollRequest>());
38 messagingService.registerHandler(NettyProtocol.COPYCAT_SUBMIT, new CopycatMessageHandler<SubmitRequest>());
39 }
40
41 protected NettyMessagingService getNettyMessagingService() {
42 return messagingService;
43 }
44
45 @Override
46 public void requestHandler(RequestHandler handler) {
47 this.handler = handler;
48 }
49
50 @Override
51 public CompletableFuture<Void> listen() {
52 try {
53 messagingService.activate();
54 return CompletableFuture.completedFuture(null);
55 } catch (Exception e) {
56 CompletableFuture<Void> future = new CompletableFuture<>();
57 future.completeExceptionally(e);
58 return future;
59 }
60 }
61
62 @Override
63 public CompletableFuture<Void> close() {
64 CompletableFuture<Void> future = new CompletableFuture<>();
65 try {
66 messagingService.deactivate();
67 future.complete(null);
68 return future;
69 } catch (Exception e) {
70 future.completeExceptionally(e);
71 return future;
72 }
73 }
74
75 private class CopycatMessageHandler<T> implements MessageHandler {
76
77 @Override
78 public void handle(Message message) throws IOException {
79 T request = NettyProtocol.SERIALIZER.decode(message.payload());
80 if (request.getClass().equals(PingRequest.class)) {
81 handler.ping((PingRequest) request).whenComplete((response, error) -> {
82 try {
83 message.respond(NettyProtocol.SERIALIZER.encode(response));
84 } catch (Exception e) {
85 log.error("Failed to respond to ping request", e);
86 }
87 });
88 } else if (request.getClass().equals(PollRequest.class)) {
89 handler.poll((PollRequest) request).whenComplete((response, error) -> {
90 try {
91 message.respond(NettyProtocol.SERIALIZER.encode(response));
92 } catch (Exception e) {
93 log.error("Failed to respond to poll request", e);
94 }
95 });
96 } else if (request.getClass().equals(SyncRequest.class)) {
97 handler.sync((SyncRequest) request).whenComplete((response, error) -> {
98 try {
99 message.respond(NettyProtocol.SERIALIZER.encode(response));
100 } catch (Exception e) {
101 log.error("Failed to respond to sync request", e);
102 }
103 });
104 } else if (request.getClass().equals(SubmitRequest.class)) {
105 handler.submit((SubmitRequest) request).whenComplete((response, error) -> {
106 try {
107 message.respond(NettyProtocol.SERIALIZER.encode(response));
108 } catch (Exception e) {
109 log.error("Failed to respond to submit request", e);
110 }
111 });
112 }
113 }
114 }
115}