blob: 3dd93b9e7780624bb69b12787b4b057c1543a769 [file] [log] [blame]
Madan Jampani9b19a822014-11-04 21:37:13 -08001package org.onlab.onos.store.service.impl;
2
Yuta HIGUCHIa3982c12014-11-10 09:47:08 -08003import static com.google.common.base.Verify.verifyNotNull;
Madan Jampani9b19a822014-11-04 21:37:13 -08004import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHIa3982c12014-11-10 09:47:08 -08005import static org.onlab.onos.store.service.impl.ClusterMessagingProtocol.SERIALIZER;
Madan Jampani9b19a822014-11-04 21:37:13 -08006
7import java.io.IOException;
8import java.util.concurrent.CompletableFuture;
9import java.util.concurrent.ExecutionException;
10import java.util.concurrent.ScheduledExecutorService;
11import java.util.concurrent.ScheduledThreadPoolExecutor;
12import java.util.concurrent.ThreadFactory;
13import java.util.concurrent.TimeUnit;
14import java.util.concurrent.TimeoutException;
15
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080016import net.kuujo.copycat.cluster.TcpMember;
Madan Jampani9b19a822014-11-04 21:37:13 -080017import net.kuujo.copycat.protocol.PingRequest;
18import net.kuujo.copycat.protocol.PingResponse;
19import net.kuujo.copycat.protocol.PollRequest;
20import net.kuujo.copycat.protocol.PollResponse;
21import net.kuujo.copycat.protocol.SubmitRequest;
22import net.kuujo.copycat.protocol.SubmitResponse;
23import net.kuujo.copycat.protocol.SyncRequest;
24import net.kuujo.copycat.protocol.SyncResponse;
25import net.kuujo.copycat.spi.protocol.ProtocolClient;
26
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080027import org.onlab.onos.cluster.ClusterEvent;
28import org.onlab.onos.cluster.ClusterEventListener;
29import org.onlab.onos.cluster.ClusterService;
Madan Jampani9b19a822014-11-04 21:37:13 -080030import org.onlab.onos.cluster.ControllerNode;
31import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
32import org.onlab.onos.store.cluster.messaging.ClusterMessage;
33import org.onlab.onos.store.cluster.messaging.MessageSubject;
34import org.slf4j.Logger;
35
36import com.google.common.util.concurrent.ThreadFactoryBuilder;
37
38/**
Madan Jampanidfbfa182014-11-04 22:06:41 -080039 * ONOS Cluster messaging based Copycat protocol client.
Madan Jampani9b19a822014-11-04 21:37:13 -080040 */
Madan Jampani9b19a822014-11-04 21:37:13 -080041public class ClusterMessagingProtocolClient implements ProtocolClient {
42
43 private final Logger log = getLogger(getClass());
44
45 private static final ThreadFactory THREAD_FACTORY =
46 new ThreadFactoryBuilder().setNameFormat("copycat-netty-messaging-%d").build();
47
48 public static final long RETRY_INTERVAL_MILLIS = 2000;
49
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080050 private final ClusterService clusterService;
Madan Jampani9b19a822014-11-04 21:37:13 -080051 private final ClusterCommunicationService clusterCommunicator;
Madan Jampani515865d2014-11-09 22:29:34 -080052 private final ControllerNode localNode;
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080053 private final TcpMember remoteMember;
54 private ControllerNode remoteNode;
Madan Jampani9b19a822014-11-04 21:37:13 -080055
56 // FIXME: Thread pool sizing.
57 private static final ScheduledExecutorService THREAD_POOL =
58 new ScheduledThreadPoolExecutor(10, THREAD_FACTORY);
59
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080060 private volatile CompletableFuture<Void> appeared;
61
62 private volatile InternalClusterEventListener listener;
63
Madan Jampani9b19a822014-11-04 21:37:13 -080064 public ClusterMessagingProtocolClient(
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080065 ClusterService clusterService,
Madan Jampani9b19a822014-11-04 21:37:13 -080066 ClusterCommunicationService clusterCommunicator,
Madan Jampani515865d2014-11-09 22:29:34 -080067 ControllerNode localNode,
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080068 TcpMember remoteMember) {
69
70 this.clusterService = clusterService;
Madan Jampani9b19a822014-11-04 21:37:13 -080071 this.clusterCommunicator = clusterCommunicator;
Madan Jampani515865d2014-11-09 22:29:34 -080072 this.localNode = localNode;
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080073 this.remoteMember = remoteMember;
Madan Jampani9b19a822014-11-04 21:37:13 -080074 }
75
76 @Override
77 public CompletableFuture<PingResponse> ping(PingRequest request) {
78 return requestReply(request);
79 }
80
81 @Override
82 public CompletableFuture<SyncResponse> sync(SyncRequest request) {
83 return requestReply(request);
84 }
85
86 @Override
87 public CompletableFuture<PollResponse> poll(PollRequest request) {
88 return requestReply(request);
89 }
90
91 @Override
92 public CompletableFuture<SubmitResponse> submit(SubmitRequest request) {
93 return requestReply(request);
94 }
95
96 @Override
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080097 public synchronized CompletableFuture<Void> connect() {
98 if (remoteNode != null) {
99 // done
100 return CompletableFuture.completedFuture(null);
101 }
102
103 remoteNode = getControllerNode(remoteMember);
104
105 if (remoteNode != null) {
106 // done
107 return CompletableFuture.completedFuture(null);
108 }
109
110 if (appeared != null) {
111 // already waiting for member to appear
112 return appeared;
113 }
114
115 appeared = new CompletableFuture<>();
116 listener = new InternalClusterEventListener();
117 clusterService.addListener(listener);
118
119 // wait for specified controller node to come up
120 return null;
Madan Jampani9b19a822014-11-04 21:37:13 -0800121 }
122
123 @Override
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -0800124 public synchronized CompletableFuture<Void> close() {
125 if (listener != null) {
126 clusterService.removeListener(listener);
127 listener = null;
128 }
129 if (appeared != null) {
130 appeared.cancel(true);
131 appeared = null;
132 }
Madan Jampani9b19a822014-11-04 21:37:13 -0800133 return CompletableFuture.completedFuture(null);
134 }
135
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -0800136 private synchronized void checkIfMemberAppeared() {
137 final ControllerNode controllerNode = getControllerNode(remoteMember);
138 if (controllerNode == null) {
139 // still not there: no-op
140 return;
141 }
142
143 // found
144 remoteNode = controllerNode;
145 if (appeared != null) {
146 appeared.complete(null);
147 }
148
149 if (listener != null) {
150 clusterService.removeListener(listener);
151 listener = null;
152 }
153 }
154
Yuta HIGUCHI3b016732014-11-07 15:32:18 -0800155 private <I> MessageSubject messageType(I input) {
Madan Jampani9b19a822014-11-04 21:37:13 -0800156 Class<?> clazz = input.getClass();
157 if (clazz.equals(PollRequest.class)) {
158 return ClusterMessagingProtocol.COPYCAT_POLL;
159 } else if (clazz.equals(SyncRequest.class)) {
160 return ClusterMessagingProtocol.COPYCAT_SYNC;
161 } else if (clazz.equals(SubmitRequest.class)) {
162 return ClusterMessagingProtocol.COPYCAT_SUBMIT;
163 } else if (clazz.equals(PingRequest.class)) {
164 return ClusterMessagingProtocol.COPYCAT_PING;
165 } else {
166 throw new IllegalArgumentException("Unknown class " + clazz.getName());
167 }
168
169 }
170
171 private <I, O> CompletableFuture<O> requestReply(I request) {
172 CompletableFuture<O> future = new CompletableFuture<>();
173 THREAD_POOL.schedule(new RPCTask<I, O>(request, future), 0, TimeUnit.MILLISECONDS);
174 return future;
175 }
176
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -0800177 private ControllerNode getControllerNode(TcpMember remoteMember) {
178 final String host = remoteMember.host();
179 final int port = remoteMember.port();
180 for (ControllerNode node : clusterService.getNodes()) {
181 if (node.ip().toString().equals(host) && node.tcpPort() == port) {
182 return node;
183 }
184 }
185 return null;
186 }
187
188 private final class InternalClusterEventListener
189 implements ClusterEventListener {
190
191 public InternalClusterEventListener() {
192 }
193
194 @Override
195 public void event(ClusterEvent event) {
196 checkIfMemberAppeared();
197 }
198
199 }
200
Madan Jampani9b19a822014-11-04 21:37:13 -0800201 private class RPCTask<I, O> implements Runnable {
202
Yuta HIGUCHIe59656d2014-11-07 01:57:32 -0800203 private final I request;
Madan Jampani9b19a822014-11-04 21:37:13 -0800204 private final ClusterMessage message;
205 private final CompletableFuture<O> future;
206
207 public RPCTask(I request, CompletableFuture<O> future) {
Yuta HIGUCHIe59656d2014-11-07 01:57:32 -0800208 this.request = request;
Madan Jampani9b19a822014-11-04 21:37:13 -0800209 this.message =
210 new ClusterMessage(
Madan Jampani515865d2014-11-09 22:29:34 -0800211 localNode.id(),
Madan Jampani9b19a822014-11-04 21:37:13 -0800212 messageType(request),
Yuta HIGUCHIa3982c12014-11-10 09:47:08 -0800213 verifyNotNull(SERIALIZER.encode(request)));
Madan Jampani9b19a822014-11-04 21:37:13 -0800214 this.future = future;
215 }
216
217 @Override
218 public void run() {
219 try {
220 byte[] response = clusterCommunicator
221 .sendAndReceive(message, remoteNode.id())
222 .get(RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
Yuta HIGUCHIa3982c12014-11-10 09:47:08 -0800223 future.complete(verifyNotNull(SERIALIZER.decode(response)));
Madan Jampani9b19a822014-11-04 21:37:13 -0800224
225 } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) {
Madan Jampani515865d2014-11-09 22:29:34 -0800226 log.warn("RPCTask for {} failed.", request, e);
227 future.completeExceptionally(e);
Madan Jampani9b19a822014-11-04 21:37:13 -0800228 } catch (Exception e) {
Yuta HIGUCHI9bbaca32014-11-07 13:08:31 -0800229 log.warn("RPCTask for {} terribly failed.", request, e);
Madan Jampani9b19a822014-11-04 21:37:13 -0800230 future.completeExceptionally(e);
231 }
232 }
233 }
Madan Jampani515865d2014-11-09 22:29:34 -0800234}