blob: 040abed883685f1f25255aa70c64ed14dcaea881 [file] [log] [blame]
Madan Jampani9b19a822014-11-04 21:37:13 -08001package org.onlab.onos.store.service.impl;
2
Yuta HIGUCHIa3982c12014-11-10 09:47:08 -08003import static com.google.common.base.Verify.verifyNotNull;
Yuta HIGUCHIa3982c12014-11-10 09:47:08 -08004import static org.onlab.onos.store.service.impl.ClusterMessagingProtocol.SERIALIZER;
Yuta HIGUCHI2b75f1a2014-11-14 16:07:34 -08005import static org.onlab.util.Tools.namedThreads;
Madan Jampani2b6ca912014-11-21 14:44:45 -08006import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani9b19a822014-11-04 21:37:13 -08007
8import java.io.IOException;
9import java.util.concurrent.CompletableFuture;
10import java.util.concurrent.ExecutionException;
Yuta HIGUCHI2b75f1a2014-11-14 16:07:34 -080011import java.util.concurrent.ExecutorService;
12import java.util.concurrent.Executors;
Madan Jampani9b19a822014-11-04 21:37:13 -080013import java.util.concurrent.TimeUnit;
14import java.util.concurrent.TimeoutException;
Madan Jampani2b6ca912014-11-21 14:44:45 -080015
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080016import net.kuujo.copycat.cluster.TcpMember;
Madan Jampani9b19a822014-11-04 21:37:13 -080017import net.kuujo.copycat.protocol.PingRequest;
18import net.kuujo.copycat.protocol.PingResponse;
19import net.kuujo.copycat.protocol.PollRequest;
20import net.kuujo.copycat.protocol.PollResponse;
21import net.kuujo.copycat.protocol.SubmitRequest;
22import net.kuujo.copycat.protocol.SubmitResponse;
23import net.kuujo.copycat.protocol.SyncRequest;
24import net.kuujo.copycat.protocol.SyncResponse;
25import net.kuujo.copycat.spi.protocol.ProtocolClient;
26
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080027import org.onlab.onos.cluster.ClusterEvent;
28import org.onlab.onos.cluster.ClusterEventListener;
29import org.onlab.onos.cluster.ClusterService;
Madan Jampani9b19a822014-11-04 21:37:13 -080030import org.onlab.onos.cluster.ControllerNode;
31import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
32import org.onlab.onos.store.cluster.messaging.ClusterMessage;
33import org.onlab.onos.store.cluster.messaging.MessageSubject;
34import org.slf4j.Logger;
35
Madan Jampani9b19a822014-11-04 21:37:13 -080036/**
Madan Jampanidfbfa182014-11-04 22:06:41 -080037 * ONOS Cluster messaging based Copycat protocol client.
Madan Jampani9b19a822014-11-04 21:37:13 -080038 */
Madan Jampani9b19a822014-11-04 21:37:13 -080039public class ClusterMessagingProtocolClient implements ProtocolClient {
40
41 private final Logger log = getLogger(getClass());
42
Madan Jampani9b19a822014-11-04 21:37:13 -080043 public static final long RETRY_INTERVAL_MILLIS = 2000;
44
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080045 private final ClusterService clusterService;
Madan Jampani9b19a822014-11-04 21:37:13 -080046 private final ClusterCommunicationService clusterCommunicator;
Madan Jampani515865d2014-11-09 22:29:34 -080047 private final ControllerNode localNode;
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080048 private final TcpMember remoteMember;
Yuta HIGUCHId2499432014-11-20 12:07:43 -080049
50 // (remoteNode == null) => disconnected state
51 private volatile ControllerNode remoteNode;
Madan Jampani9b19a822014-11-04 21:37:13 -080052
Yuta HIGUCHI2b75f1a2014-11-14 16:07:34 -080053 // TODO: make this non-static and stop on close
54 private static final ExecutorService THREAD_POOL
55 = Executors.newCachedThreadPool(namedThreads("copycat-netty-messaging-%d"));
Madan Jampani9b19a822014-11-04 21:37:13 -080056
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080057 private volatile CompletableFuture<Void> appeared;
58
59 private volatile InternalClusterEventListener listener;
60
Madan Jampani9b19a822014-11-04 21:37:13 -080061 public ClusterMessagingProtocolClient(
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080062 ClusterService clusterService,
Madan Jampani9b19a822014-11-04 21:37:13 -080063 ClusterCommunicationService clusterCommunicator,
Madan Jampani515865d2014-11-09 22:29:34 -080064 ControllerNode localNode,
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080065 TcpMember remoteMember) {
66
67 this.clusterService = clusterService;
Madan Jampani9b19a822014-11-04 21:37:13 -080068 this.clusterCommunicator = clusterCommunicator;
Madan Jampani515865d2014-11-09 22:29:34 -080069 this.localNode = localNode;
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080070 this.remoteMember = remoteMember;
Madan Jampani9b19a822014-11-04 21:37:13 -080071 }
72
73 @Override
74 public CompletableFuture<PingResponse> ping(PingRequest request) {
Yuta HIGUCHId2499432014-11-20 12:07:43 -080075 return connect().thenCompose((connected) -> { return requestReply(request); });
Madan Jampani9b19a822014-11-04 21:37:13 -080076 }
77
78 @Override
79 public CompletableFuture<SyncResponse> sync(SyncRequest request) {
Yuta HIGUCHId2499432014-11-20 12:07:43 -080080 return connect().thenCompose((connected) -> { return requestReply(request); });
Madan Jampani9b19a822014-11-04 21:37:13 -080081 }
82
83 @Override
84 public CompletableFuture<PollResponse> poll(PollRequest request) {
Yuta HIGUCHId2499432014-11-20 12:07:43 -080085 return connect().thenCompose((connected) -> { return requestReply(request); });
Madan Jampani9b19a822014-11-04 21:37:13 -080086 }
87
88 @Override
89 public CompletableFuture<SubmitResponse> submit(SubmitRequest request) {
Yuta HIGUCHId2499432014-11-20 12:07:43 -080090 return connect().thenCompose((connected) -> { return requestReply(request); });
Madan Jampani9b19a822014-11-04 21:37:13 -080091 }
92
93 @Override
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080094 public synchronized CompletableFuture<Void> connect() {
95 if (remoteNode != null) {
96 // done
97 return CompletableFuture.completedFuture(null);
98 }
99
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -0800100 if (appeared != null) {
101 // already waiting for member to appear
102 return appeared;
103 }
104
105 appeared = new CompletableFuture<>();
106 listener = new InternalClusterEventListener();
107 clusterService.addListener(listener);
108
Yuta HIGUCHId2499432014-11-20 12:07:43 -0800109 remoteNode = getControllerNode(remoteMember);
110
111 if (remoteNode != null) {
112 // done
113 return CompletableFuture.completedFuture(null);
114 }
115
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -0800116 // wait for specified controller node to come up
Yuta HIGUCHI2fe63342014-11-17 21:33:38 -0800117 return appeared;
Madan Jampani9b19a822014-11-04 21:37:13 -0800118 }
119
120 @Override
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -0800121 public synchronized CompletableFuture<Void> close() {
122 if (listener != null) {
123 clusterService.removeListener(listener);
124 listener = null;
125 }
126 if (appeared != null) {
127 appeared.cancel(true);
128 appeared = null;
129 }
Madan Jampani9b19a822014-11-04 21:37:13 -0800130 return CompletableFuture.completedFuture(null);
131 }
132
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -0800133 private synchronized void checkIfMemberAppeared() {
134 final ControllerNode controllerNode = getControllerNode(remoteMember);
135 if (controllerNode == null) {
136 // still not there: no-op
137 return;
138 }
139
140 // found
141 remoteNode = controllerNode;
142 if (appeared != null) {
143 appeared.complete(null);
144 }
145
146 if (listener != null) {
147 clusterService.removeListener(listener);
148 listener = null;
149 }
150 }
151
Yuta HIGUCHI3b016732014-11-07 15:32:18 -0800152 private <I> MessageSubject messageType(I input) {
Madan Jampani9b19a822014-11-04 21:37:13 -0800153 Class<?> clazz = input.getClass();
154 if (clazz.equals(PollRequest.class)) {
155 return ClusterMessagingProtocol.COPYCAT_POLL;
156 } else if (clazz.equals(SyncRequest.class)) {
157 return ClusterMessagingProtocol.COPYCAT_SYNC;
158 } else if (clazz.equals(SubmitRequest.class)) {
159 return ClusterMessagingProtocol.COPYCAT_SUBMIT;
160 } else if (clazz.equals(PingRequest.class)) {
161 return ClusterMessagingProtocol.COPYCAT_PING;
162 } else {
163 throw new IllegalArgumentException("Unknown class " + clazz.getName());
164 }
165
166 }
167
168 private <I, O> CompletableFuture<O> requestReply(I request) {
169 CompletableFuture<O> future = new CompletableFuture<>();
Yuta HIGUCHI2b75f1a2014-11-14 16:07:34 -0800170 THREAD_POOL.submit(new RPCTask<I, O>(request, future));
Madan Jampani9b19a822014-11-04 21:37:13 -0800171 return future;
172 }
173
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -0800174 private ControllerNode getControllerNode(TcpMember remoteMember) {
175 final String host = remoteMember.host();
176 final int port = remoteMember.port();
177 for (ControllerNode node : clusterService.getNodes()) {
178 if (node.ip().toString().equals(host) && node.tcpPort() == port) {
179 return node;
180 }
181 }
182 return null;
183 }
184
185 private final class InternalClusterEventListener
186 implements ClusterEventListener {
187
188 public InternalClusterEventListener() {
189 }
190
191 @Override
192 public void event(ClusterEvent event) {
193 checkIfMemberAppeared();
194 }
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -0800195 }
196
Madan Jampani9b19a822014-11-04 21:37:13 -0800197 private class RPCTask<I, O> implements Runnable {
198
Yuta HIGUCHIe59656d2014-11-07 01:57:32 -0800199 private final I request;
Madan Jampani9b19a822014-11-04 21:37:13 -0800200 private final ClusterMessage message;
201 private final CompletableFuture<O> future;
202
203 public RPCTask(I request, CompletableFuture<O> future) {
Yuta HIGUCHIe59656d2014-11-07 01:57:32 -0800204 this.request = request;
Madan Jampani9b19a822014-11-04 21:37:13 -0800205 this.message =
206 new ClusterMessage(
Madan Jampani515865d2014-11-09 22:29:34 -0800207 localNode.id(),
Madan Jampani9b19a822014-11-04 21:37:13 -0800208 messageType(request),
Yuta HIGUCHIa3982c12014-11-10 09:47:08 -0800209 verifyNotNull(SERIALIZER.encode(request)));
Madan Jampani9b19a822014-11-04 21:37:13 -0800210 this.future = future;
211 }
212
213 @Override
214 public void run() {
215 try {
Yuta HIGUCHId2499432014-11-20 12:07:43 -0800216 ControllerNode node = remoteNode;
217 if (node == null) {
218 throw new IOException("Remote node disappeared");
219 }
Madan Jampani9b19a822014-11-04 21:37:13 -0800220 byte[] response = clusterCommunicator
Yuta HIGUCHId2499432014-11-20 12:07:43 -0800221 .sendAndReceive(message, node.id())
Madan Jampani9b19a822014-11-04 21:37:13 -0800222 .get(RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
Yuta HIGUCHIa3982c12014-11-10 09:47:08 -0800223 future.complete(verifyNotNull(SERIALIZER.decode(response)));
Madan Jampani9b19a822014-11-04 21:37:13 -0800224
Yuta HIGUCHId2499432014-11-20 12:07:43 -0800225 } catch (IOException | TimeoutException e) {
Yuta HIGUCHI45207162014-11-17 15:25:13 -0800226 log.warn("RPCTask for {} failed: {}", request, e.getMessage());
227 log.debug("RPCTask for {} failed.", request, e);
Madan Jampani515865d2014-11-09 22:29:34 -0800228 future.completeExceptionally(e);
Yuta HIGUCHId2499432014-11-20 12:07:43 -0800229 // Treating this client as disconnected
230 remoteNode = null;
Madan Jampani2b6ca912014-11-21 14:44:45 -0800231 appeared = null;
Yuta HIGUCHId2499432014-11-20 12:07:43 -0800232 } catch (ExecutionException e) {
233 log.warn("RPCTask execution for {} failed: {}", request, e.getMessage());
234 log.debug("RPCTask execution for {} failed.", request, e);
235 future.completeExceptionally(e);
Yuta HIGUCHI2b75f1a2014-11-14 16:07:34 -0800236 } catch (InterruptedException e) {
Yuta HIGUCHI45207162014-11-17 15:25:13 -0800237 log.warn("RPCTask for {} was interrupted: {}", request, e.getMessage());
238 log.debug("RPCTask for {} was interrupted.", request, e);
Yuta HIGUCHI2b75f1a2014-11-14 16:07:34 -0800239 future.completeExceptionally(e);
240 Thread.currentThread().interrupt();
Madan Jampani9b19a822014-11-04 21:37:13 -0800241 } catch (Exception e) {
Yuta HIGUCHI9bbaca32014-11-07 13:08:31 -0800242 log.warn("RPCTask for {} terribly failed.", request, e);
Madan Jampani9b19a822014-11-04 21:37:13 -0800243 future.completeExceptionally(e);
244 }
245 }
246 }
Madan Jampani515865d2014-11-09 22:29:34 -0800247}