blob: 61ec4673af627ef33cbae306c878d5141f8f3ad4 [file] [log] [blame]
Madan Jampani9b19a822014-11-04 21:37:13 -08001package org.onlab.onos.store.service.impl;
2
3import static org.slf4j.LoggerFactory.getLogger;
4
5import java.io.IOException;
6import java.util.concurrent.CompletableFuture;
7import java.util.concurrent.ExecutionException;
8import java.util.concurrent.ScheduledExecutorService;
9import java.util.concurrent.ScheduledThreadPoolExecutor;
10import java.util.concurrent.ThreadFactory;
11import java.util.concurrent.TimeUnit;
12import java.util.concurrent.TimeoutException;
13
14import net.kuujo.copycat.protocol.PingRequest;
15import net.kuujo.copycat.protocol.PingResponse;
16import net.kuujo.copycat.protocol.PollRequest;
17import net.kuujo.copycat.protocol.PollResponse;
18import net.kuujo.copycat.protocol.SubmitRequest;
19import net.kuujo.copycat.protocol.SubmitResponse;
20import net.kuujo.copycat.protocol.SyncRequest;
21import net.kuujo.copycat.protocol.SyncResponse;
22import net.kuujo.copycat.spi.protocol.ProtocolClient;
23
24import org.onlab.onos.cluster.ControllerNode;
25import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
26import org.onlab.onos.store.cluster.messaging.ClusterMessage;
27import org.onlab.onos.store.cluster.messaging.MessageSubject;
28import org.slf4j.Logger;
29
30import com.google.common.util.concurrent.ThreadFactoryBuilder;
31
32/**
Madan Jampanidfbfa182014-11-04 22:06:41 -080033 * ONOS Cluster messaging based Copycat protocol client.
Madan Jampani9b19a822014-11-04 21:37:13 -080034 */
Madan Jampani9b19a822014-11-04 21:37:13 -080035public class ClusterMessagingProtocolClient implements ProtocolClient {
36
37 private final Logger log = getLogger(getClass());
38
39 private static final ThreadFactory THREAD_FACTORY =
40 new ThreadFactoryBuilder().setNameFormat("copycat-netty-messaging-%d").build();
41
42 public static final long RETRY_INTERVAL_MILLIS = 2000;
43
44 private final ClusterCommunicationService clusterCommunicator;
Madan Jampani515865d2014-11-09 22:29:34 -080045 private final ControllerNode localNode;
Madan Jampani9b19a822014-11-04 21:37:13 -080046 private final ControllerNode remoteNode;
47
48 // FIXME: Thread pool sizing.
49 private static final ScheduledExecutorService THREAD_POOL =
50 new ScheduledThreadPoolExecutor(10, THREAD_FACTORY);
51
52 public ClusterMessagingProtocolClient(
53 ClusterCommunicationService clusterCommunicator,
Madan Jampani515865d2014-11-09 22:29:34 -080054 ControllerNode localNode,
Madan Jampani9b19a822014-11-04 21:37:13 -080055 ControllerNode remoteNode) {
56 this.clusterCommunicator = clusterCommunicator;
Madan Jampani515865d2014-11-09 22:29:34 -080057 this.localNode = localNode;
Madan Jampani9b19a822014-11-04 21:37:13 -080058 this.remoteNode = remoteNode;
59 }
60
61 @Override
62 public CompletableFuture<PingResponse> ping(PingRequest request) {
63 return requestReply(request);
64 }
65
66 @Override
67 public CompletableFuture<SyncResponse> sync(SyncRequest request) {
68 return requestReply(request);
69 }
70
71 @Override
72 public CompletableFuture<PollResponse> poll(PollRequest request) {
73 return requestReply(request);
74 }
75
76 @Override
77 public CompletableFuture<SubmitResponse> submit(SubmitRequest request) {
78 return requestReply(request);
79 }
80
81 @Override
82 public CompletableFuture<Void> connect() {
83 return CompletableFuture.completedFuture(null);
84 }
85
86 @Override
87 public CompletableFuture<Void> close() {
88 return CompletableFuture.completedFuture(null);
89 }
90
Yuta HIGUCHI3b016732014-11-07 15:32:18 -080091 private <I> MessageSubject messageType(I input) {
Madan Jampani9b19a822014-11-04 21:37:13 -080092 Class<?> clazz = input.getClass();
93 if (clazz.equals(PollRequest.class)) {
94 return ClusterMessagingProtocol.COPYCAT_POLL;
95 } else if (clazz.equals(SyncRequest.class)) {
96 return ClusterMessagingProtocol.COPYCAT_SYNC;
97 } else if (clazz.equals(SubmitRequest.class)) {
98 return ClusterMessagingProtocol.COPYCAT_SUBMIT;
99 } else if (clazz.equals(PingRequest.class)) {
100 return ClusterMessagingProtocol.COPYCAT_PING;
101 } else {
102 throw new IllegalArgumentException("Unknown class " + clazz.getName());
103 }
104
105 }
106
107 private <I, O> CompletableFuture<O> requestReply(I request) {
108 CompletableFuture<O> future = new CompletableFuture<>();
109 THREAD_POOL.schedule(new RPCTask<I, O>(request, future), 0, TimeUnit.MILLISECONDS);
110 return future;
111 }
112
113 private class RPCTask<I, O> implements Runnable {
114
Yuta HIGUCHIe59656d2014-11-07 01:57:32 -0800115 private final I request;
Madan Jampani9b19a822014-11-04 21:37:13 -0800116 private final ClusterMessage message;
117 private final CompletableFuture<O> future;
118
119 public RPCTask(I request, CompletableFuture<O> future) {
Yuta HIGUCHIe59656d2014-11-07 01:57:32 -0800120 this.request = request;
Madan Jampani9b19a822014-11-04 21:37:13 -0800121 this.message =
122 new ClusterMessage(
Madan Jampani515865d2014-11-09 22:29:34 -0800123 localNode.id(),
Madan Jampani9b19a822014-11-04 21:37:13 -0800124 messageType(request),
125 ClusterMessagingProtocol.SERIALIZER.encode(request));
126 this.future = future;
127 }
128
129 @Override
130 public void run() {
131 try {
132 byte[] response = clusterCommunicator
133 .sendAndReceive(message, remoteNode.id())
134 .get(RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
135 future.complete(ClusterMessagingProtocol.SERIALIZER.decode(response));
136
137 } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) {
Madan Jampani515865d2014-11-09 22:29:34 -0800138 log.warn("RPCTask for {} failed.", request, e);
139 future.completeExceptionally(e);
Madan Jampani9b19a822014-11-04 21:37:13 -0800140 } catch (Exception e) {
Yuta HIGUCHI9bbaca32014-11-07 13:08:31 -0800141 log.warn("RPCTask for {} terribly failed.", request, e);
Madan Jampani9b19a822014-11-04 21:37:13 -0800142 future.completeExceptionally(e);
143 }
144 }
145 }
Madan Jampani515865d2014-11-09 22:29:34 -0800146}