blob: a791990c2d585459eeb81c4ac46693d323ff5fa1 [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
3import static org.slf4j.LoggerFactory.getLogger;
4
5import java.io.IOException;
6import java.util.concurrent.CompletableFuture;
7import java.util.concurrent.ExecutionException;
8import java.util.concurrent.ScheduledExecutorService;
9import java.util.concurrent.ScheduledThreadPoolExecutor;
10import java.util.concurrent.ThreadFactory;
11import java.util.concurrent.TimeUnit;
12import java.util.concurrent.TimeoutException;
13
14import net.kuujo.copycat.cluster.TcpMember;
15import net.kuujo.copycat.protocol.PingRequest;
16import net.kuujo.copycat.protocol.PingResponse;
17import net.kuujo.copycat.protocol.PollRequest;
18import net.kuujo.copycat.protocol.PollResponse;
19import net.kuujo.copycat.protocol.SubmitRequest;
20import net.kuujo.copycat.protocol.SubmitResponse;
21import net.kuujo.copycat.protocol.SyncRequest;
22import net.kuujo.copycat.protocol.SyncResponse;
23import net.kuujo.copycat.spi.protocol.ProtocolClient;
24
25import org.onlab.netty.Endpoint;
26import org.onlab.netty.NettyMessagingService;
27import org.slf4j.Logger;
28
29import com.google.common.util.concurrent.ThreadFactoryBuilder;
30
31/**
32 * {@link NettyMessagingService} based Copycat protocol client.
33 */
34public class NettyProtocolClient implements ProtocolClient {
35
36 private final Logger log = getLogger(getClass());
37 private static final ThreadFactory THREAD_FACTORY =
38 new ThreadFactoryBuilder().setNameFormat("copycat-netty-messaging-%d").build();
39
40 // Remote endpoint, this client instance is used
41 // for communicating with.
42 private final Endpoint remoteEp;
43 private final NettyMessagingService messagingService;
44
45 // TODO: Is 10 the right number of threads?
46 private static final ScheduledExecutorService THREAD_POOL =
47 new ScheduledThreadPoolExecutor(10, THREAD_FACTORY);
48
49 public NettyProtocolClient(NettyProtocol protocol, TcpMember member) {
50 this(new Endpoint(member.host(), member.port()), protocol.getServer().getNettyMessagingService());
51 }
52
53 public NettyProtocolClient(Endpoint remoteEp, NettyMessagingService messagingService) {
54 this.remoteEp = remoteEp;
55 this.messagingService = messagingService;
56 }
57
58 @Override
59 public CompletableFuture<PingResponse> ping(PingRequest request) {
60 return requestReply(request);
61 }
62
63 @Override
64 public CompletableFuture<SyncResponse> sync(SyncRequest request) {
65 return requestReply(request);
66 }
67
68 @Override
69 public CompletableFuture<PollResponse> poll(PollRequest request) {
70 return requestReply(request);
71 }
72
73 @Override
74 public CompletableFuture<SubmitResponse> submit(SubmitRequest request) {
75 return requestReply(request);
76 }
77
78 @Override
79 public CompletableFuture<Void> connect() {
80 return CompletableFuture.completedFuture(null);
81 }
82
83 @Override
84 public CompletableFuture<Void> close() {
85 return CompletableFuture.completedFuture(null);
86 }
87
88 public <I> String messageType(I input) {
89 Class<?> clazz = input.getClass();
90 if (clazz.equals(PollRequest.class)) {
91 return NettyProtocol.COPYCAT_POLL;
92 } else if (clazz.equals(SyncRequest.class)) {
93 return NettyProtocol.COPYCAT_SYNC;
94 } else if (clazz.equals(SubmitRequest.class)) {
95 return NettyProtocol.COPYCAT_SUBMIT;
96 } else if (clazz.equals(PingRequest.class)) {
97 return NettyProtocol.COPYCAT_PING;
98 } else {
99 throw new IllegalArgumentException("Unknown class " + clazz.getName());
100 }
101
102 }
103
104 private <I, O> CompletableFuture<O> requestReply(I request) {
105 CompletableFuture<O> future = new CompletableFuture<>();
106 THREAD_POOL.schedule(new RPCTask<I, O>(request, future), 0, TimeUnit.MILLISECONDS);
107 return future;
108 }
109
110 private class RPCTask<I, O> implements Runnable {
111
112 private final String messageType;
113 private final byte[] payload;
114
115 private final CompletableFuture<O> future;
116
117 public RPCTask(I request, CompletableFuture<O> future) {
118 this.messageType = messageType(request);
119 this.payload = NettyProtocol.SERIALIZER.encode(request);
120 this.future = future;
121 }
122
123 @Override
124 public void run() {
125 try {
126 byte[] response = messagingService
127 .sendAndReceive(remoteEp, messageType, payload)
128 .get(NettyProtocol.RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
129 future.complete(NettyProtocol.SERIALIZER.decode(response));
130
131 } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) {
132 if (messageType.equals(NettyProtocol.COPYCAT_SYNC) ||
133 messageType.equals(NettyProtocol.COPYCAT_PING)) {
134 log.warn("Request to {} failed. Will retry "
135 + "in {} ms", remoteEp, NettyProtocol.RETRY_INTERVAL_MILLIS);
136 THREAD_POOL.schedule(
137 this,
138 NettyProtocol.RETRY_INTERVAL_MILLIS,
139 TimeUnit.MILLISECONDS);
140 } else {
141 future.completeExceptionally(e);
142 }
143 } catch (Exception e) {
144 future.completeExceptionally(e);
145 }
146 }
147 }
148}