blob: be0da209bf98feae640e2d0d7fa1ec2826fd5170 [file] [log] [blame]
Madan Jampani9b19a822014-11-04 21:37:13 -08001package org.onlab.onos.store.service.impl;
2
3import static org.slf4j.LoggerFactory.getLogger;
4
5import java.io.IOException;
6import java.util.concurrent.CompletableFuture;
7import java.util.concurrent.ExecutionException;
8import java.util.concurrent.ScheduledExecutorService;
9import java.util.concurrent.ScheduledThreadPoolExecutor;
10import java.util.concurrent.ThreadFactory;
11import java.util.concurrent.TimeUnit;
12import java.util.concurrent.TimeoutException;
13
14import net.kuujo.copycat.protocol.PingRequest;
15import net.kuujo.copycat.protocol.PingResponse;
16import net.kuujo.copycat.protocol.PollRequest;
17import net.kuujo.copycat.protocol.PollResponse;
18import net.kuujo.copycat.protocol.SubmitRequest;
19import net.kuujo.copycat.protocol.SubmitResponse;
20import net.kuujo.copycat.protocol.SyncRequest;
21import net.kuujo.copycat.protocol.SyncResponse;
22import net.kuujo.copycat.spi.protocol.ProtocolClient;
23
24import org.onlab.onos.cluster.ControllerNode;
25import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
26import org.onlab.onos.store.cluster.messaging.ClusterMessage;
27import org.onlab.onos.store.cluster.messaging.MessageSubject;
28import org.slf4j.Logger;
29
30import com.google.common.util.concurrent.ThreadFactoryBuilder;
31
32/**
Madan Jampanidfbfa182014-11-04 22:06:41 -080033 * ONOS Cluster messaging based Copycat protocol client.
Madan Jampani9b19a822014-11-04 21:37:13 -080034 */
Madan Jampani9b19a822014-11-04 21:37:13 -080035public class ClusterMessagingProtocolClient implements ProtocolClient {
36
37 private final Logger log = getLogger(getClass());
38
39 private static final ThreadFactory THREAD_FACTORY =
40 new ThreadFactoryBuilder().setNameFormat("copycat-netty-messaging-%d").build();
41
42 public static final long RETRY_INTERVAL_MILLIS = 2000;
43
44 private final ClusterCommunicationService clusterCommunicator;
45 private final ControllerNode remoteNode;
46
47 // FIXME: Thread pool sizing.
48 private static final ScheduledExecutorService THREAD_POOL =
49 new ScheduledThreadPoolExecutor(10, THREAD_FACTORY);
50
51 public ClusterMessagingProtocolClient(
52 ClusterCommunicationService clusterCommunicator,
53 ControllerNode remoteNode) {
54 this.clusterCommunicator = clusterCommunicator;
55 this.remoteNode = remoteNode;
56 }
57
58 @Override
59 public CompletableFuture<PingResponse> ping(PingRequest request) {
60 return requestReply(request);
61 }
62
63 @Override
64 public CompletableFuture<SyncResponse> sync(SyncRequest request) {
65 return requestReply(request);
66 }
67
68 @Override
69 public CompletableFuture<PollResponse> poll(PollRequest request) {
70 return requestReply(request);
71 }
72
73 @Override
74 public CompletableFuture<SubmitResponse> submit(SubmitRequest request) {
75 return requestReply(request);
76 }
77
78 @Override
79 public CompletableFuture<Void> connect() {
80 return CompletableFuture.completedFuture(null);
81 }
82
83 @Override
84 public CompletableFuture<Void> close() {
85 return CompletableFuture.completedFuture(null);
86 }
87
88 public <I> MessageSubject messageType(I input) {
89 Class<?> clazz = input.getClass();
90 if (clazz.equals(PollRequest.class)) {
91 return ClusterMessagingProtocol.COPYCAT_POLL;
92 } else if (clazz.equals(SyncRequest.class)) {
93 return ClusterMessagingProtocol.COPYCAT_SYNC;
94 } else if (clazz.equals(SubmitRequest.class)) {
95 return ClusterMessagingProtocol.COPYCAT_SUBMIT;
96 } else if (clazz.equals(PingRequest.class)) {
97 return ClusterMessagingProtocol.COPYCAT_PING;
98 } else {
99 throw new IllegalArgumentException("Unknown class " + clazz.getName());
100 }
101
102 }
103
104 private <I, O> CompletableFuture<O> requestReply(I request) {
105 CompletableFuture<O> future = new CompletableFuture<>();
106 THREAD_POOL.schedule(new RPCTask<I, O>(request, future), 0, TimeUnit.MILLISECONDS);
107 return future;
108 }
109
110 private class RPCTask<I, O> implements Runnable {
111
Yuta HIGUCHIe59656d2014-11-07 01:57:32 -0800112 private final I request;
Madan Jampani9b19a822014-11-04 21:37:13 -0800113 private final ClusterMessage message;
114 private final CompletableFuture<O> future;
115
116 public RPCTask(I request, CompletableFuture<O> future) {
Yuta HIGUCHIe59656d2014-11-07 01:57:32 -0800117 this.request = request;
Madan Jampani9b19a822014-11-04 21:37:13 -0800118 this.message =
119 new ClusterMessage(
120 null,
121 messageType(request),
122 ClusterMessagingProtocol.SERIALIZER.encode(request));
123 this.future = future;
124 }
125
126 @Override
127 public void run() {
128 try {
129 byte[] response = clusterCommunicator
130 .sendAndReceive(message, remoteNode.id())
131 .get(RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
132 future.complete(ClusterMessagingProtocol.SERIALIZER.decode(response));
133
134 } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) {
135 if (message.subject().equals(ClusterMessagingProtocol.COPYCAT_SYNC) ||
136 message.subject().equals(ClusterMessagingProtocol.COPYCAT_PING)) {
Yuta HIGUCHI4248bee2014-11-05 18:12:46 -0800137 log.warn("{} Request to {} failed. Will retry in {} ms",
138 message.subject(), remoteNode, RETRY_INTERVAL_MILLIS);
Madan Jampani9b19a822014-11-04 21:37:13 -0800139 THREAD_POOL.schedule(
140 this,
141 RETRY_INTERVAL_MILLIS,
142 TimeUnit.MILLISECONDS);
143 } else {
Yuta HIGUCHIe59656d2014-11-07 01:57:32 -0800144 log.warn("RPCTask for {} failed.", request, e);
Madan Jampani9b19a822014-11-04 21:37:13 -0800145 future.completeExceptionally(e);
146 }
147 } catch (Exception e) {
Yuta HIGUCHI9bbaca32014-11-07 13:08:31 -0800148 log.warn("RPCTask for {} terribly failed.", request, e);
Madan Jampani9b19a822014-11-04 21:37:13 -0800149 future.completeExceptionally(e);
150 }
151 }
152 }
153}