blob: 23c34b24d6985e32bd8ded7412af5afb50f93de5 [file] [log] [blame]
Madan Jampani9b19a822014-11-04 21:37:13 -08001package org.onlab.onos.store.service.impl;
2
Yuta HIGUCHIa3982c12014-11-10 09:47:08 -08003import static com.google.common.base.Verify.verifyNotNull;
Madan Jampani9b19a822014-11-04 21:37:13 -08004import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHIa3982c12014-11-10 09:47:08 -08005import static org.onlab.onos.store.service.impl.ClusterMessagingProtocol.SERIALIZER;
Madan Jampani9b19a822014-11-04 21:37:13 -08006
7import java.io.IOException;
8import java.util.concurrent.CompletableFuture;
9import java.util.concurrent.ExecutionException;
10import java.util.concurrent.ScheduledExecutorService;
11import java.util.concurrent.ScheduledThreadPoolExecutor;
12import java.util.concurrent.ThreadFactory;
13import java.util.concurrent.TimeUnit;
14import java.util.concurrent.TimeoutException;
15
16import net.kuujo.copycat.protocol.PingRequest;
17import net.kuujo.copycat.protocol.PingResponse;
18import net.kuujo.copycat.protocol.PollRequest;
19import net.kuujo.copycat.protocol.PollResponse;
20import net.kuujo.copycat.protocol.SubmitRequest;
21import net.kuujo.copycat.protocol.SubmitResponse;
22import net.kuujo.copycat.protocol.SyncRequest;
23import net.kuujo.copycat.protocol.SyncResponse;
24import net.kuujo.copycat.spi.protocol.ProtocolClient;
25
26import org.onlab.onos.cluster.ControllerNode;
27import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
28import org.onlab.onos.store.cluster.messaging.ClusterMessage;
29import org.onlab.onos.store.cluster.messaging.MessageSubject;
30import org.slf4j.Logger;
31
32import com.google.common.util.concurrent.ThreadFactoryBuilder;
33
34/**
Madan Jampanidfbfa182014-11-04 22:06:41 -080035 * ONOS Cluster messaging based Copycat protocol client.
Madan Jampani9b19a822014-11-04 21:37:13 -080036 */
Madan Jampani9b19a822014-11-04 21:37:13 -080037public class ClusterMessagingProtocolClient implements ProtocolClient {
38
39 private final Logger log = getLogger(getClass());
40
41 private static final ThreadFactory THREAD_FACTORY =
42 new ThreadFactoryBuilder().setNameFormat("copycat-netty-messaging-%d").build();
43
44 public static final long RETRY_INTERVAL_MILLIS = 2000;
45
46 private final ClusterCommunicationService clusterCommunicator;
Madan Jampani515865d2014-11-09 22:29:34 -080047 private final ControllerNode localNode;
Madan Jampani9b19a822014-11-04 21:37:13 -080048 private final ControllerNode remoteNode;
49
50 // FIXME: Thread pool sizing.
51 private static final ScheduledExecutorService THREAD_POOL =
52 new ScheduledThreadPoolExecutor(10, THREAD_FACTORY);
53
54 public ClusterMessagingProtocolClient(
55 ClusterCommunicationService clusterCommunicator,
Madan Jampani515865d2014-11-09 22:29:34 -080056 ControllerNode localNode,
Madan Jampani9b19a822014-11-04 21:37:13 -080057 ControllerNode remoteNode) {
58 this.clusterCommunicator = clusterCommunicator;
Madan Jampani515865d2014-11-09 22:29:34 -080059 this.localNode = localNode;
Madan Jampani9b19a822014-11-04 21:37:13 -080060 this.remoteNode = remoteNode;
61 }
62
63 @Override
64 public CompletableFuture<PingResponse> ping(PingRequest request) {
65 return requestReply(request);
66 }
67
68 @Override
69 public CompletableFuture<SyncResponse> sync(SyncRequest request) {
70 return requestReply(request);
71 }
72
73 @Override
74 public CompletableFuture<PollResponse> poll(PollRequest request) {
75 return requestReply(request);
76 }
77
78 @Override
79 public CompletableFuture<SubmitResponse> submit(SubmitRequest request) {
80 return requestReply(request);
81 }
82
83 @Override
84 public CompletableFuture<Void> connect() {
85 return CompletableFuture.completedFuture(null);
86 }
87
88 @Override
89 public CompletableFuture<Void> close() {
90 return CompletableFuture.completedFuture(null);
91 }
92
Yuta HIGUCHI3b016732014-11-07 15:32:18 -080093 private <I> MessageSubject messageType(I input) {
Madan Jampani9b19a822014-11-04 21:37:13 -080094 Class<?> clazz = input.getClass();
95 if (clazz.equals(PollRequest.class)) {
96 return ClusterMessagingProtocol.COPYCAT_POLL;
97 } else if (clazz.equals(SyncRequest.class)) {
98 return ClusterMessagingProtocol.COPYCAT_SYNC;
99 } else if (clazz.equals(SubmitRequest.class)) {
100 return ClusterMessagingProtocol.COPYCAT_SUBMIT;
101 } else if (clazz.equals(PingRequest.class)) {
102 return ClusterMessagingProtocol.COPYCAT_PING;
103 } else {
104 throw new IllegalArgumentException("Unknown class " + clazz.getName());
105 }
106
107 }
108
109 private <I, O> CompletableFuture<O> requestReply(I request) {
110 CompletableFuture<O> future = new CompletableFuture<>();
111 THREAD_POOL.schedule(new RPCTask<I, O>(request, future), 0, TimeUnit.MILLISECONDS);
112 return future;
113 }
114
115 private class RPCTask<I, O> implements Runnable {
116
Yuta HIGUCHIe59656d2014-11-07 01:57:32 -0800117 private final I request;
Madan Jampani9b19a822014-11-04 21:37:13 -0800118 private final ClusterMessage message;
119 private final CompletableFuture<O> future;
120
121 public RPCTask(I request, CompletableFuture<O> future) {
Yuta HIGUCHIe59656d2014-11-07 01:57:32 -0800122 this.request = request;
Madan Jampani9b19a822014-11-04 21:37:13 -0800123 this.message =
124 new ClusterMessage(
Madan Jampani515865d2014-11-09 22:29:34 -0800125 localNode.id(),
Madan Jampani9b19a822014-11-04 21:37:13 -0800126 messageType(request),
Yuta HIGUCHIa3982c12014-11-10 09:47:08 -0800127 verifyNotNull(SERIALIZER.encode(request)));
Madan Jampani9b19a822014-11-04 21:37:13 -0800128 this.future = future;
129 }
130
131 @Override
132 public void run() {
133 try {
134 byte[] response = clusterCommunicator
135 .sendAndReceive(message, remoteNode.id())
136 .get(RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
Yuta HIGUCHIa3982c12014-11-10 09:47:08 -0800137 future.complete(verifyNotNull(SERIALIZER.decode(response)));
Madan Jampani9b19a822014-11-04 21:37:13 -0800138
139 } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) {
Madan Jampani515865d2014-11-09 22:29:34 -0800140 log.warn("RPCTask for {} failed.", request, e);
141 future.completeExceptionally(e);
Madan Jampani9b19a822014-11-04 21:37:13 -0800142 } catch (Exception e) {
Yuta HIGUCHI9bbaca32014-11-07 13:08:31 -0800143 log.warn("RPCTask for {} terribly failed.", request, e);
Madan Jampani9b19a822014-11-04 21:37:13 -0800144 future.completeExceptionally(e);
145 }
146 }
147 }
Madan Jampani515865d2014-11-09 22:29:34 -0800148}