blob: 070ae8c64965028984edb51206b258ddcacd6dfe [file] [log] [blame]
Madan Jampani9b19a822014-11-04 21:37:13 -08001package org.onlab.onos.store.service.impl;
2
Yuta HIGUCHIa3982c12014-11-10 09:47:08 -08003import static com.google.common.base.Verify.verifyNotNull;
Madan Jampani9b19a822014-11-04 21:37:13 -08004import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHIa3982c12014-11-10 09:47:08 -08005import static org.onlab.onos.store.service.impl.ClusterMessagingProtocol.SERIALIZER;
Yuta HIGUCHI2b75f1a2014-11-14 16:07:34 -08006import static org.onlab.util.Tools.namedThreads;
Madan Jampani9b19a822014-11-04 21:37:13 -08007
8import java.io.IOException;
9import java.util.concurrent.CompletableFuture;
10import java.util.concurrent.ExecutionException;
Yuta HIGUCHI2b75f1a2014-11-14 16:07:34 -080011import java.util.concurrent.ExecutorService;
12import java.util.concurrent.Executors;
Madan Jampani9b19a822014-11-04 21:37:13 -080013import java.util.concurrent.TimeUnit;
14import java.util.concurrent.TimeoutException;
15
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080016import net.kuujo.copycat.cluster.TcpMember;
Madan Jampani9b19a822014-11-04 21:37:13 -080017import net.kuujo.copycat.protocol.PingRequest;
18import net.kuujo.copycat.protocol.PingResponse;
19import net.kuujo.copycat.protocol.PollRequest;
20import net.kuujo.copycat.protocol.PollResponse;
21import net.kuujo.copycat.protocol.SubmitRequest;
22import net.kuujo.copycat.protocol.SubmitResponse;
23import net.kuujo.copycat.protocol.SyncRequest;
24import net.kuujo.copycat.protocol.SyncResponse;
25import net.kuujo.copycat.spi.protocol.ProtocolClient;
26
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080027import org.onlab.onos.cluster.ClusterEvent;
28import org.onlab.onos.cluster.ClusterEventListener;
29import org.onlab.onos.cluster.ClusterService;
Madan Jampani9b19a822014-11-04 21:37:13 -080030import org.onlab.onos.cluster.ControllerNode;
31import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
32import org.onlab.onos.store.cluster.messaging.ClusterMessage;
33import org.onlab.onos.store.cluster.messaging.MessageSubject;
34import org.slf4j.Logger;
35
Madan Jampani9b19a822014-11-04 21:37:13 -080036/**
Madan Jampanidfbfa182014-11-04 22:06:41 -080037 * ONOS Cluster messaging based Copycat protocol client.
Madan Jampani9b19a822014-11-04 21:37:13 -080038 */
Madan Jampani9b19a822014-11-04 21:37:13 -080039public class ClusterMessagingProtocolClient implements ProtocolClient {
40
41 private final Logger log = getLogger(getClass());
42
Madan Jampani9b19a822014-11-04 21:37:13 -080043 public static final long RETRY_INTERVAL_MILLIS = 2000;
44
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080045 private final ClusterService clusterService;
Madan Jampani9b19a822014-11-04 21:37:13 -080046 private final ClusterCommunicationService clusterCommunicator;
Madan Jampani515865d2014-11-09 22:29:34 -080047 private final ControllerNode localNode;
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080048 private final TcpMember remoteMember;
49 private ControllerNode remoteNode;
Madan Jampani9b19a822014-11-04 21:37:13 -080050
Yuta HIGUCHI2b75f1a2014-11-14 16:07:34 -080051 // TODO: make this non-static and stop on close
52 private static final ExecutorService THREAD_POOL
53 = Executors.newCachedThreadPool(namedThreads("copycat-netty-messaging-%d"));
Madan Jampani9b19a822014-11-04 21:37:13 -080054
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080055 private volatile CompletableFuture<Void> appeared;
56
57 private volatile InternalClusterEventListener listener;
58
Madan Jampani9b19a822014-11-04 21:37:13 -080059 public ClusterMessagingProtocolClient(
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080060 ClusterService clusterService,
Madan Jampani9b19a822014-11-04 21:37:13 -080061 ClusterCommunicationService clusterCommunicator,
Madan Jampani515865d2014-11-09 22:29:34 -080062 ControllerNode localNode,
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080063 TcpMember remoteMember) {
64
65 this.clusterService = clusterService;
Madan Jampani9b19a822014-11-04 21:37:13 -080066 this.clusterCommunicator = clusterCommunicator;
Madan Jampani515865d2014-11-09 22:29:34 -080067 this.localNode = localNode;
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080068 this.remoteMember = remoteMember;
Madan Jampani9b19a822014-11-04 21:37:13 -080069 }
70
71 @Override
72 public CompletableFuture<PingResponse> ping(PingRequest request) {
73 return requestReply(request);
74 }
75
76 @Override
77 public CompletableFuture<SyncResponse> sync(SyncRequest request) {
78 return requestReply(request);
79 }
80
81 @Override
82 public CompletableFuture<PollResponse> poll(PollRequest request) {
83 return requestReply(request);
84 }
85
86 @Override
87 public CompletableFuture<SubmitResponse> submit(SubmitRequest request) {
88 return requestReply(request);
89 }
90
91 @Override
Ray Milkeyc5cd0d92014-11-14 14:05:36 -080092// @edu.umd.cs.findbugs.annotations.SuppressWarnings
93// (value="NP_NONNULL_PARAM_VIOLATION",
94// justification="False positives on completedFuture call, passing null as a parameter is allowed")
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080095 public synchronized CompletableFuture<Void> connect() {
96 if (remoteNode != null) {
97 // done
98 return CompletableFuture.completedFuture(null);
99 }
100
101 remoteNode = getControllerNode(remoteMember);
102
103 if (remoteNode != null) {
104 // done
105 return CompletableFuture.completedFuture(null);
106 }
107
108 if (appeared != null) {
109 // already waiting for member to appear
110 return appeared;
111 }
112
113 appeared = new CompletableFuture<>();
114 listener = new InternalClusterEventListener();
115 clusterService.addListener(listener);
116
117 // wait for specified controller node to come up
118 return null;
Madan Jampani9b19a822014-11-04 21:37:13 -0800119 }
120
121 @Override
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -0800122 public synchronized CompletableFuture<Void> close() {
123 if (listener != null) {
124 clusterService.removeListener(listener);
125 listener = null;
126 }
127 if (appeared != null) {
128 appeared.cancel(true);
129 appeared = null;
130 }
Madan Jampani9b19a822014-11-04 21:37:13 -0800131 return CompletableFuture.completedFuture(null);
132 }
133
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -0800134 private synchronized void checkIfMemberAppeared() {
135 final ControllerNode controllerNode = getControllerNode(remoteMember);
136 if (controllerNode == null) {
137 // still not there: no-op
138 return;
139 }
140
141 // found
142 remoteNode = controllerNode;
143 if (appeared != null) {
144 appeared.complete(null);
145 }
146
147 if (listener != null) {
148 clusterService.removeListener(listener);
149 listener = null;
150 }
151 }
152
Yuta HIGUCHI3b016732014-11-07 15:32:18 -0800153 private <I> MessageSubject messageType(I input) {
Madan Jampani9b19a822014-11-04 21:37:13 -0800154 Class<?> clazz = input.getClass();
155 if (clazz.equals(PollRequest.class)) {
156 return ClusterMessagingProtocol.COPYCAT_POLL;
157 } else if (clazz.equals(SyncRequest.class)) {
158 return ClusterMessagingProtocol.COPYCAT_SYNC;
159 } else if (clazz.equals(SubmitRequest.class)) {
160 return ClusterMessagingProtocol.COPYCAT_SUBMIT;
161 } else if (clazz.equals(PingRequest.class)) {
162 return ClusterMessagingProtocol.COPYCAT_PING;
163 } else {
164 throw new IllegalArgumentException("Unknown class " + clazz.getName());
165 }
166
167 }
168
169 private <I, O> CompletableFuture<O> requestReply(I request) {
170 CompletableFuture<O> future = new CompletableFuture<>();
Yuta HIGUCHI2b75f1a2014-11-14 16:07:34 -0800171 THREAD_POOL.submit(new RPCTask<I, O>(request, future));
Madan Jampani9b19a822014-11-04 21:37:13 -0800172 return future;
173 }
174
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -0800175 private ControllerNode getControllerNode(TcpMember remoteMember) {
176 final String host = remoteMember.host();
177 final int port = remoteMember.port();
178 for (ControllerNode node : clusterService.getNodes()) {
179 if (node.ip().toString().equals(host) && node.tcpPort() == port) {
180 return node;
181 }
182 }
183 return null;
184 }
185
186 private final class InternalClusterEventListener
187 implements ClusterEventListener {
188
189 public InternalClusterEventListener() {
190 }
191
192 @Override
193 public void event(ClusterEvent event) {
194 checkIfMemberAppeared();
195 }
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -0800196 }
197
Madan Jampani9b19a822014-11-04 21:37:13 -0800198 private class RPCTask<I, O> implements Runnable {
199
Yuta HIGUCHIe59656d2014-11-07 01:57:32 -0800200 private final I request;
Madan Jampani9b19a822014-11-04 21:37:13 -0800201 private final ClusterMessage message;
202 private final CompletableFuture<O> future;
203
204 public RPCTask(I request, CompletableFuture<O> future) {
Yuta HIGUCHIe59656d2014-11-07 01:57:32 -0800205 this.request = request;
Madan Jampani9b19a822014-11-04 21:37:13 -0800206 this.message =
207 new ClusterMessage(
Madan Jampani515865d2014-11-09 22:29:34 -0800208 localNode.id(),
Madan Jampani9b19a822014-11-04 21:37:13 -0800209 messageType(request),
Yuta HIGUCHIa3982c12014-11-10 09:47:08 -0800210 verifyNotNull(SERIALIZER.encode(request)));
Madan Jampani9b19a822014-11-04 21:37:13 -0800211 this.future = future;
212 }
213
214 @Override
215 public void run() {
216 try {
217 byte[] response = clusterCommunicator
218 .sendAndReceive(message, remoteNode.id())
219 .get(RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
Yuta HIGUCHIa3982c12014-11-10 09:47:08 -0800220 future.complete(verifyNotNull(SERIALIZER.decode(response)));
Madan Jampani9b19a822014-11-04 21:37:13 -0800221
Yuta HIGUCHI2b75f1a2014-11-14 16:07:34 -0800222 } catch (IOException | ExecutionException | TimeoutException e) {
Madan Jampani515865d2014-11-09 22:29:34 -0800223 log.warn("RPCTask for {} failed.", request, e);
224 future.completeExceptionally(e);
Yuta HIGUCHI2b75f1a2014-11-14 16:07:34 -0800225 } catch (InterruptedException e) {
226 log.warn("RPCTask for {} was interrupted.", request, e);
227 future.completeExceptionally(e);
228 Thread.currentThread().interrupt();
Madan Jampani9b19a822014-11-04 21:37:13 -0800229 } catch (Exception e) {
Yuta HIGUCHI9bbaca32014-11-07 13:08:31 -0800230 log.warn("RPCTask for {} terribly failed.", request, e);
Madan Jampani9b19a822014-11-04 21:37:13 -0800231 future.completeExceptionally(e);
232 }
233 }
234 }
Madan Jampani515865d2014-11-09 22:29:34 -0800235}