blob: d96ecbf03376dde682b3fb685abc9da7a8cab714 [file] [log] [blame]
Madan Jampani9b19a822014-11-04 21:37:13 -08001package org.onlab.onos.store.service.impl;
2
Yuta HIGUCHIa3982c12014-11-10 09:47:08 -08003import static com.google.common.base.Verify.verifyNotNull;
Yuta HIGUCHI91768e32014-11-22 05:06:35 -08004import static org.onlab.onos.store.service.impl.ClusterMessagingProtocol.DB_SERIALIZER;
Yuta HIGUCHI2b75f1a2014-11-14 16:07:34 -08005import static org.onlab.util.Tools.namedThreads;
Madan Jampani2b6ca912014-11-21 14:44:45 -08006import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -08007import static java.util.concurrent.Executors.newCachedThreadPool;
Madan Jampani9b19a822014-11-04 21:37:13 -08008
9import java.io.IOException;
10import java.util.concurrent.CompletableFuture;
11import java.util.concurrent.ExecutionException;
Yuta HIGUCHI2b75f1a2014-11-14 16:07:34 -080012import java.util.concurrent.ExecutorService;
Madan Jampani9b19a822014-11-04 21:37:13 -080013import java.util.concurrent.TimeUnit;
14import java.util.concurrent.TimeoutException;
Madan Jampani6234fd42014-11-21 22:34:28 -080015import java.util.concurrent.atomic.AtomicBoolean;
Madan Jampani2b6ca912014-11-21 14:44:45 -080016
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080017import net.kuujo.copycat.cluster.TcpMember;
Madan Jampani9b19a822014-11-04 21:37:13 -080018import net.kuujo.copycat.protocol.PingRequest;
19import net.kuujo.copycat.protocol.PingResponse;
20import net.kuujo.copycat.protocol.PollRequest;
21import net.kuujo.copycat.protocol.PollResponse;
22import net.kuujo.copycat.protocol.SubmitRequest;
23import net.kuujo.copycat.protocol.SubmitResponse;
24import net.kuujo.copycat.protocol.SyncRequest;
25import net.kuujo.copycat.protocol.SyncResponse;
26import net.kuujo.copycat.spi.protocol.ProtocolClient;
27
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080028import org.onlab.onos.cluster.ClusterService;
Madan Jampani9b19a822014-11-04 21:37:13 -080029import org.onlab.onos.cluster.ControllerNode;
30import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
31import org.onlab.onos.store.cluster.messaging.ClusterMessage;
32import org.onlab.onos.store.cluster.messaging.MessageSubject;
33import org.slf4j.Logger;
34
Madan Jampani9b19a822014-11-04 21:37:13 -080035/**
Madan Jampanidfbfa182014-11-04 22:06:41 -080036 * ONOS Cluster messaging based Copycat protocol client.
Madan Jampani9b19a822014-11-04 21:37:13 -080037 */
Madan Jampani9b19a822014-11-04 21:37:13 -080038public class ClusterMessagingProtocolClient implements ProtocolClient {
39
40 private final Logger log = getLogger(getClass());
41
Madan Jampani9b19a822014-11-04 21:37:13 -080042 public static final long RETRY_INTERVAL_MILLIS = 2000;
43
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080044 private final ClusterService clusterService;
Madan Jampani9b19a822014-11-04 21:37:13 -080045 private final ClusterCommunicationService clusterCommunicator;
Madan Jampani515865d2014-11-09 22:29:34 -080046 private final ControllerNode localNode;
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080047 private final TcpMember remoteMember;
Yuta HIGUCHId2499432014-11-20 12:07:43 -080048
Madan Jampani6234fd42014-11-21 22:34:28 -080049 private ControllerNode remoteNode;
50 private final AtomicBoolean connectionOK = new AtomicBoolean(true);
Madan Jampani9b19a822014-11-04 21:37:13 -080051
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -080052 private ExecutorService pool;
Madan Jampani9b19a822014-11-04 21:37:13 -080053
54 public ClusterMessagingProtocolClient(
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080055 ClusterService clusterService,
Madan Jampani9b19a822014-11-04 21:37:13 -080056 ClusterCommunicationService clusterCommunicator,
Madan Jampani515865d2014-11-09 22:29:34 -080057 ControllerNode localNode,
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080058 TcpMember remoteMember) {
59
60 this.clusterService = clusterService;
Madan Jampani9b19a822014-11-04 21:37:13 -080061 this.clusterCommunicator = clusterCommunicator;
Madan Jampani515865d2014-11-09 22:29:34 -080062 this.localNode = localNode;
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080063 this.remoteMember = remoteMember;
Madan Jampani9b19a822014-11-04 21:37:13 -080064 }
65
66 @Override
67 public CompletableFuture<PingResponse> ping(PingRequest request) {
Madan Jampani6234fd42014-11-21 22:34:28 -080068 return requestReply(request);
Madan Jampani9b19a822014-11-04 21:37:13 -080069 }
70
71 @Override
72 public CompletableFuture<SyncResponse> sync(SyncRequest request) {
Madan Jampani6234fd42014-11-21 22:34:28 -080073 return requestReply(request);
Madan Jampani9b19a822014-11-04 21:37:13 -080074 }
75
76 @Override
77 public CompletableFuture<PollResponse> poll(PollRequest request) {
Madan Jampani6234fd42014-11-21 22:34:28 -080078 return requestReply(request);
Madan Jampani9b19a822014-11-04 21:37:13 -080079 }
80
81 @Override
82 public CompletableFuture<SubmitResponse> submit(SubmitRequest request) {
Madan Jampani6234fd42014-11-21 22:34:28 -080083 return requestReply(request);
Madan Jampani9b19a822014-11-04 21:37:13 -080084 }
85
86 @Override
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080087 public synchronized CompletableFuture<Void> connect() {
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -080088 if (pool == null || pool.isShutdown()) {
89 // TODO include remote name?
90 pool = newCachedThreadPool(namedThreads("copycat-netty-messaging-client-%d"));
91 }
Madan Jampani6234fd42014-11-21 22:34:28 -080092 return CompletableFuture.completedFuture(null);
Madan Jampani9b19a822014-11-04 21:37:13 -080093 }
94
95 @Override
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080096 public synchronized CompletableFuture<Void> close() {
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -080097 if (pool != null) {
98 pool.shutdownNow();
99 pool = null;
100 }
Madan Jampani9b19a822014-11-04 21:37:13 -0800101 return CompletableFuture.completedFuture(null);
102 }
103
Yuta HIGUCHI3b016732014-11-07 15:32:18 -0800104 private <I> MessageSubject messageType(I input) {
Madan Jampani9b19a822014-11-04 21:37:13 -0800105 Class<?> clazz = input.getClass();
106 if (clazz.equals(PollRequest.class)) {
107 return ClusterMessagingProtocol.COPYCAT_POLL;
108 } else if (clazz.equals(SyncRequest.class)) {
109 return ClusterMessagingProtocol.COPYCAT_SYNC;
110 } else if (clazz.equals(SubmitRequest.class)) {
111 return ClusterMessagingProtocol.COPYCAT_SUBMIT;
112 } else if (clazz.equals(PingRequest.class)) {
113 return ClusterMessagingProtocol.COPYCAT_PING;
114 } else {
115 throw new IllegalArgumentException("Unknown class " + clazz.getName());
116 }
Madan Jampani9b19a822014-11-04 21:37:13 -0800117 }
118
119 private <I, O> CompletableFuture<O> requestReply(I request) {
120 CompletableFuture<O> future = new CompletableFuture<>();
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -0800121 if (pool == null) {
122 log.info("Attempted to use closed client, connecting now. {}", request);
123 connect();
124 }
125 pool.submit(new RPCTask<I, O>(request, future));
Madan Jampani9b19a822014-11-04 21:37:13 -0800126 return future;
127 }
128
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -0800129 private ControllerNode getControllerNode(TcpMember remoteMember) {
130 final String host = remoteMember.host();
131 final int port = remoteMember.port();
132 for (ControllerNode node : clusterService.getNodes()) {
133 if (node.ip().toString().equals(host) && node.tcpPort() == port) {
134 return node;
135 }
136 }
137 return null;
138 }
139
Madan Jampani9b19a822014-11-04 21:37:13 -0800140 private class RPCTask<I, O> implements Runnable {
141
Yuta HIGUCHIe59656d2014-11-07 01:57:32 -0800142 private final I request;
Madan Jampani9b19a822014-11-04 21:37:13 -0800143 private final ClusterMessage message;
144 private final CompletableFuture<O> future;
145
146 public RPCTask(I request, CompletableFuture<O> future) {
Yuta HIGUCHIe59656d2014-11-07 01:57:32 -0800147 this.request = request;
Madan Jampani9b19a822014-11-04 21:37:13 -0800148 this.message =
149 new ClusterMessage(
Madan Jampani515865d2014-11-09 22:29:34 -0800150 localNode.id(),
Madan Jampani9b19a822014-11-04 21:37:13 -0800151 messageType(request),
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800152 verifyNotNull(DB_SERIALIZER.encode(request)));
Madan Jampani9b19a822014-11-04 21:37:13 -0800153 this.future = future;
154 }
155
156 @Override
157 public void run() {
158 try {
Madan Jampani6234fd42014-11-21 22:34:28 -0800159 if (remoteNode == null) {
160 remoteNode = getControllerNode(remoteMember);
161 if (remoteNode == null) {
162 throw new IOException("Remote node is offline!");
163 }
Yuta HIGUCHId2499432014-11-20 12:07:43 -0800164 }
Madan Jampani9b19a822014-11-04 21:37:13 -0800165 byte[] response = clusterCommunicator
Madan Jampani6234fd42014-11-21 22:34:28 -0800166 .sendAndReceive(message, remoteNode.id())
Madan Jampani9b19a822014-11-04 21:37:13 -0800167 .get(RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
Madan Jampani6234fd42014-11-21 22:34:28 -0800168 if (!connectionOK.getAndSet(true)) {
169 log.info("Connectivity to {} restored", remoteNode);
170 }
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800171 future.complete(verifyNotNull(DB_SERIALIZER.decode(response)));
172
Yuta HIGUCHId2499432014-11-20 12:07:43 -0800173 } catch (IOException | TimeoutException e) {
Madan Jampani6234fd42014-11-21 22:34:28 -0800174 if (connectionOK.getAndSet(false)) {
175 log.warn("Detected connectivity issues with {}. Reason: {}", remoteNode, e.getMessage());
176 }
Yuta HIGUCHI45207162014-11-17 15:25:13 -0800177 log.debug("RPCTask for {} failed.", request, e);
Madan Jampani515865d2014-11-09 22:29:34 -0800178 future.completeExceptionally(e);
Yuta HIGUCHId2499432014-11-20 12:07:43 -0800179 } catch (ExecutionException e) {
180 log.warn("RPCTask execution for {} failed: {}", request, e.getMessage());
181 log.debug("RPCTask execution for {} failed.", request, e);
182 future.completeExceptionally(e);
Yuta HIGUCHI2b75f1a2014-11-14 16:07:34 -0800183 } catch (InterruptedException e) {
Yuta HIGUCHI45207162014-11-17 15:25:13 -0800184 log.warn("RPCTask for {} was interrupted: {}", request, e.getMessage());
185 log.debug("RPCTask for {} was interrupted.", request, e);
Yuta HIGUCHI2b75f1a2014-11-14 16:07:34 -0800186 future.completeExceptionally(e);
187 Thread.currentThread().interrupt();
Madan Jampani9b19a822014-11-04 21:37:13 -0800188 } catch (Exception e) {
Yuta HIGUCHI9bbaca32014-11-07 13:08:31 -0800189 log.warn("RPCTask for {} terribly failed.", request, e);
Madan Jampani9b19a822014-11-04 21:37:13 -0800190 future.completeExceptionally(e);
191 }
192 }
193 }
Madan Jampani515865d2014-11-09 22:29:34 -0800194}