blob: 1f5a20d041762158fcef5f43ec6da6d6983aa90c [file] [log] [blame]
Madan Jampani9b19a822014-11-04 21:37:13 -08001package org.onlab.onos.store.service.impl;
2
Yuta HIGUCHIa3982c12014-11-10 09:47:08 -08003import static com.google.common.base.Verify.verifyNotNull;
Madan Jampani9b19a822014-11-04 21:37:13 -08004import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHIa3982c12014-11-10 09:47:08 -08005import static org.onlab.onos.store.service.impl.ClusterMessagingProtocol.SERIALIZER;
Yuta HIGUCHI2b75f1a2014-11-14 16:07:34 -08006import static org.onlab.util.Tools.namedThreads;
Madan Jampani9b19a822014-11-04 21:37:13 -08007
8import java.io.IOException;
9import java.util.concurrent.CompletableFuture;
10import java.util.concurrent.ExecutionException;
Yuta HIGUCHI2b75f1a2014-11-14 16:07:34 -080011import java.util.concurrent.ExecutorService;
12import java.util.concurrent.Executors;
Madan Jampani9b19a822014-11-04 21:37:13 -080013import java.util.concurrent.TimeUnit;
14import java.util.concurrent.TimeoutException;
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080015import net.kuujo.copycat.cluster.TcpMember;
Madan Jampani9b19a822014-11-04 21:37:13 -080016import net.kuujo.copycat.protocol.PingRequest;
17import net.kuujo.copycat.protocol.PingResponse;
18import net.kuujo.copycat.protocol.PollRequest;
19import net.kuujo.copycat.protocol.PollResponse;
20import net.kuujo.copycat.protocol.SubmitRequest;
21import net.kuujo.copycat.protocol.SubmitResponse;
22import net.kuujo.copycat.protocol.SyncRequest;
23import net.kuujo.copycat.protocol.SyncResponse;
24import net.kuujo.copycat.spi.protocol.ProtocolClient;
25
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080026import org.onlab.onos.cluster.ClusterEvent;
27import org.onlab.onos.cluster.ClusterEventListener;
28import org.onlab.onos.cluster.ClusterService;
Madan Jampani9b19a822014-11-04 21:37:13 -080029import org.onlab.onos.cluster.ControllerNode;
30import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
31import org.onlab.onos.store.cluster.messaging.ClusterMessage;
32import org.onlab.onos.store.cluster.messaging.MessageSubject;
33import org.slf4j.Logger;
34
Madan Jampani9b19a822014-11-04 21:37:13 -080035/**
Madan Jampanidfbfa182014-11-04 22:06:41 -080036 * ONOS Cluster messaging based Copycat protocol client.
Madan Jampani9b19a822014-11-04 21:37:13 -080037 */
Madan Jampani9b19a822014-11-04 21:37:13 -080038public class ClusterMessagingProtocolClient implements ProtocolClient {
39
40 private final Logger log = getLogger(getClass());
41
Madan Jampani9b19a822014-11-04 21:37:13 -080042 public static final long RETRY_INTERVAL_MILLIS = 2000;
43
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080044 private final ClusterService clusterService;
Madan Jampani9b19a822014-11-04 21:37:13 -080045 private final ClusterCommunicationService clusterCommunicator;
Madan Jampani515865d2014-11-09 22:29:34 -080046 private final ControllerNode localNode;
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080047 private final TcpMember remoteMember;
Yuta HIGUCHId2499432014-11-20 12:07:43 -080048
49 // (remoteNode == null) => disconnected state
50 private volatile ControllerNode remoteNode;
Madan Jampani9b19a822014-11-04 21:37:13 -080051
Yuta HIGUCHI2b75f1a2014-11-14 16:07:34 -080052 // TODO: make this non-static and stop on close
53 private static final ExecutorService THREAD_POOL
54 = Executors.newCachedThreadPool(namedThreads("copycat-netty-messaging-%d"));
Madan Jampani9b19a822014-11-04 21:37:13 -080055
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080056 private volatile CompletableFuture<Void> appeared;
57
58 private volatile InternalClusterEventListener listener;
59
Madan Jampani9b19a822014-11-04 21:37:13 -080060 public ClusterMessagingProtocolClient(
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080061 ClusterService clusterService,
Madan Jampani9b19a822014-11-04 21:37:13 -080062 ClusterCommunicationService clusterCommunicator,
Madan Jampani515865d2014-11-09 22:29:34 -080063 ControllerNode localNode,
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080064 TcpMember remoteMember) {
65
66 this.clusterService = clusterService;
Madan Jampani9b19a822014-11-04 21:37:13 -080067 this.clusterCommunicator = clusterCommunicator;
Madan Jampani515865d2014-11-09 22:29:34 -080068 this.localNode = localNode;
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080069 this.remoteMember = remoteMember;
Madan Jampani9b19a822014-11-04 21:37:13 -080070 }
71
72 @Override
73 public CompletableFuture<PingResponse> ping(PingRequest request) {
Yuta HIGUCHId2499432014-11-20 12:07:43 -080074 return connect().thenCompose((connected) -> { return requestReply(request); });
Madan Jampani9b19a822014-11-04 21:37:13 -080075 }
76
77 @Override
78 public CompletableFuture<SyncResponse> sync(SyncRequest request) {
Yuta HIGUCHId2499432014-11-20 12:07:43 -080079 return connect().thenCompose((connected) -> { return requestReply(request); });
Madan Jampani9b19a822014-11-04 21:37:13 -080080 }
81
82 @Override
83 public CompletableFuture<PollResponse> poll(PollRequest request) {
Yuta HIGUCHId2499432014-11-20 12:07:43 -080084 return connect().thenCompose((connected) -> { return requestReply(request); });
Madan Jampani9b19a822014-11-04 21:37:13 -080085 }
86
87 @Override
88 public CompletableFuture<SubmitResponse> submit(SubmitRequest request) {
Yuta HIGUCHId2499432014-11-20 12:07:43 -080089 return connect().thenCompose((connected) -> { return requestReply(request); });
Madan Jampani9b19a822014-11-04 21:37:13 -080090 }
91
92 @Override
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080093 public synchronized CompletableFuture<Void> connect() {
94 if (remoteNode != null) {
95 // done
96 return CompletableFuture.completedFuture(null);
97 }
98
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080099 if (appeared != null) {
100 // already waiting for member to appear
101 return appeared;
102 }
103
104 appeared = new CompletableFuture<>();
105 listener = new InternalClusterEventListener();
106 clusterService.addListener(listener);
107
Yuta HIGUCHId2499432014-11-20 12:07:43 -0800108 remoteNode = getControllerNode(remoteMember);
109
110 if (remoteNode != null) {
111 // done
112 return CompletableFuture.completedFuture(null);
113 }
114
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -0800115 // wait for specified controller node to come up
Yuta HIGUCHI2fe63342014-11-17 21:33:38 -0800116 return appeared;
Madan Jampani9b19a822014-11-04 21:37:13 -0800117 }
118
119 @Override
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -0800120 public synchronized CompletableFuture<Void> close() {
121 if (listener != null) {
122 clusterService.removeListener(listener);
123 listener = null;
124 }
125 if (appeared != null) {
126 appeared.cancel(true);
127 appeared = null;
128 }
Madan Jampani9b19a822014-11-04 21:37:13 -0800129 return CompletableFuture.completedFuture(null);
130 }
131
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -0800132 private synchronized void checkIfMemberAppeared() {
133 final ControllerNode controllerNode = getControllerNode(remoteMember);
134 if (controllerNode == null) {
135 // still not there: no-op
136 return;
137 }
138
139 // found
140 remoteNode = controllerNode;
141 if (appeared != null) {
142 appeared.complete(null);
143 }
144
145 if (listener != null) {
146 clusterService.removeListener(listener);
147 listener = null;
148 }
149 }
150
Yuta HIGUCHI3b016732014-11-07 15:32:18 -0800151 private <I> MessageSubject messageType(I input) {
Madan Jampani9b19a822014-11-04 21:37:13 -0800152 Class<?> clazz = input.getClass();
153 if (clazz.equals(PollRequest.class)) {
154 return ClusterMessagingProtocol.COPYCAT_POLL;
155 } else if (clazz.equals(SyncRequest.class)) {
156 return ClusterMessagingProtocol.COPYCAT_SYNC;
157 } else if (clazz.equals(SubmitRequest.class)) {
158 return ClusterMessagingProtocol.COPYCAT_SUBMIT;
159 } else if (clazz.equals(PingRequest.class)) {
160 return ClusterMessagingProtocol.COPYCAT_PING;
161 } else {
162 throw new IllegalArgumentException("Unknown class " + clazz.getName());
163 }
164
165 }
166
167 private <I, O> CompletableFuture<O> requestReply(I request) {
168 CompletableFuture<O> future = new CompletableFuture<>();
Yuta HIGUCHI2b75f1a2014-11-14 16:07:34 -0800169 THREAD_POOL.submit(new RPCTask<I, O>(request, future));
Madan Jampani9b19a822014-11-04 21:37:13 -0800170 return future;
171 }
172
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -0800173 private ControllerNode getControllerNode(TcpMember remoteMember) {
174 final String host = remoteMember.host();
175 final int port = remoteMember.port();
176 for (ControllerNode node : clusterService.getNodes()) {
177 if (node.ip().toString().equals(host) && node.tcpPort() == port) {
178 return node;
179 }
180 }
181 return null;
182 }
183
184 private final class InternalClusterEventListener
185 implements ClusterEventListener {
186
187 public InternalClusterEventListener() {
188 }
189
190 @Override
191 public void event(ClusterEvent event) {
192 checkIfMemberAppeared();
193 }
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -0800194 }
195
Madan Jampani9b19a822014-11-04 21:37:13 -0800196 private class RPCTask<I, O> implements Runnable {
197
Yuta HIGUCHIe59656d2014-11-07 01:57:32 -0800198 private final I request;
Madan Jampani9b19a822014-11-04 21:37:13 -0800199 private final ClusterMessage message;
200 private final CompletableFuture<O> future;
201
202 public RPCTask(I request, CompletableFuture<O> future) {
Yuta HIGUCHIe59656d2014-11-07 01:57:32 -0800203 this.request = request;
Madan Jampani9b19a822014-11-04 21:37:13 -0800204 this.message =
205 new ClusterMessage(
Madan Jampani515865d2014-11-09 22:29:34 -0800206 localNode.id(),
Madan Jampani9b19a822014-11-04 21:37:13 -0800207 messageType(request),
Yuta HIGUCHIa3982c12014-11-10 09:47:08 -0800208 verifyNotNull(SERIALIZER.encode(request)));
Madan Jampani9b19a822014-11-04 21:37:13 -0800209 this.future = future;
210 }
211
212 @Override
213 public void run() {
214 try {
Yuta HIGUCHId2499432014-11-20 12:07:43 -0800215 ControllerNode node = remoteNode;
216 if (node == null) {
217 throw new IOException("Remote node disappeared");
218 }
Madan Jampani9b19a822014-11-04 21:37:13 -0800219 byte[] response = clusterCommunicator
Yuta HIGUCHId2499432014-11-20 12:07:43 -0800220 .sendAndReceive(message, node.id())
Madan Jampani9b19a822014-11-04 21:37:13 -0800221 .get(RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
Yuta HIGUCHIa3982c12014-11-10 09:47:08 -0800222 future.complete(verifyNotNull(SERIALIZER.decode(response)));
Madan Jampani9b19a822014-11-04 21:37:13 -0800223
Yuta HIGUCHId2499432014-11-20 12:07:43 -0800224 } catch (IOException | TimeoutException e) {
Yuta HIGUCHI45207162014-11-17 15:25:13 -0800225 log.warn("RPCTask for {} failed: {}", request, e.getMessage());
226 log.debug("RPCTask for {} failed.", request, e);
Madan Jampani515865d2014-11-09 22:29:34 -0800227 future.completeExceptionally(e);
Yuta HIGUCHId2499432014-11-20 12:07:43 -0800228 // Treating this client as disconnected
229 remoteNode = null;
230 } catch (ExecutionException e) {
231 log.warn("RPCTask execution for {} failed: {}", request, e.getMessage());
232 log.debug("RPCTask execution for {} failed.", request, e);
233 future.completeExceptionally(e);
Yuta HIGUCHI2b75f1a2014-11-14 16:07:34 -0800234 } catch (InterruptedException e) {
Yuta HIGUCHI45207162014-11-17 15:25:13 -0800235 log.warn("RPCTask for {} was interrupted: {}", request, e.getMessage());
236 log.debug("RPCTask for {} was interrupted.", request, e);
Yuta HIGUCHI2b75f1a2014-11-14 16:07:34 -0800237 future.completeExceptionally(e);
238 Thread.currentThread().interrupt();
Madan Jampani9b19a822014-11-04 21:37:13 -0800239 } catch (Exception e) {
Yuta HIGUCHI9bbaca32014-11-07 13:08:31 -0800240 log.warn("RPCTask for {} terribly failed.", request, e);
Madan Jampani9b19a822014-11-04 21:37:13 -0800241 future.completeExceptionally(e);
242 }
243 }
244 }
Madan Jampani515865d2014-11-09 22:29:34 -0800245}