blob: b796ccdfa9ad43c1449681c7f312c72e06012828 [file] [log] [blame]
alshabibab984662014-12-04 18:56:18 -08001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.service.impl;
Madan Jampani9b19a822014-11-04 21:37:13 -080017
Yuta HIGUCHIa3982c12014-11-10 09:47:08 -080018import static com.google.common.base.Verify.verifyNotNull;
Brian O'Connorabafb502014-12-02 22:26:20 -080019import static org.onosproject.store.service.impl.ClusterMessagingProtocol.DB_SERIALIZER;
Yuta HIGUCHI2b75f1a2014-11-14 16:07:34 -080020import static org.onlab.util.Tools.namedThreads;
Madan Jampani2b6ca912014-11-21 14:44:45 -080021import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -080022import static java.util.concurrent.Executors.newCachedThreadPool;
Madan Jampani9b19a822014-11-04 21:37:13 -080023
24import java.io.IOException;
Sho SHIMIZU09082752014-12-04 12:21:56 -080025import java.time.Duration;
Madan Jampani9b19a822014-11-04 21:37:13 -080026import java.util.concurrent.CompletableFuture;
27import java.util.concurrent.ExecutionException;
Yuta HIGUCHI2b75f1a2014-11-14 16:07:34 -080028import java.util.concurrent.ExecutorService;
Madan Jampani9b19a822014-11-04 21:37:13 -080029import java.util.concurrent.TimeUnit;
30import java.util.concurrent.TimeoutException;
Madan Jampani6234fd42014-11-21 22:34:28 -080031import java.util.concurrent.atomic.AtomicBoolean;
Madan Jampani2b6ca912014-11-21 14:44:45 -080032
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080033import net.kuujo.copycat.cluster.TcpMember;
Madan Jampani9b19a822014-11-04 21:37:13 -080034import net.kuujo.copycat.protocol.PingRequest;
35import net.kuujo.copycat.protocol.PingResponse;
36import net.kuujo.copycat.protocol.PollRequest;
37import net.kuujo.copycat.protocol.PollResponse;
38import net.kuujo.copycat.protocol.SubmitRequest;
39import net.kuujo.copycat.protocol.SubmitResponse;
40import net.kuujo.copycat.protocol.SyncRequest;
41import net.kuujo.copycat.protocol.SyncResponse;
42import net.kuujo.copycat.spi.protocol.ProtocolClient;
43
Brian O'Connorabafb502014-12-02 22:26:20 -080044import org.onosproject.cluster.ClusterService;
45import org.onosproject.cluster.ControllerNode;
46import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
47import org.onosproject.store.cluster.messaging.ClusterMessage;
48import org.onosproject.store.cluster.messaging.MessageSubject;
Madan Jampani9b19a822014-11-04 21:37:13 -080049import org.slf4j.Logger;
50
Madan Jampani9b19a822014-11-04 21:37:13 -080051/**
Madan Jampanidfbfa182014-11-04 22:06:41 -080052 * ONOS Cluster messaging based Copycat protocol client.
Madan Jampani9b19a822014-11-04 21:37:13 -080053 */
Madan Jampani9b19a822014-11-04 21:37:13 -080054public class ClusterMessagingProtocolClient implements ProtocolClient {
55
56 private final Logger log = getLogger(getClass());
57
Sho SHIMIZU09082752014-12-04 12:21:56 -080058 public static final Duration RETRY_INTERVAL = Duration.ofMillis(2000);
Madan Jampani9b19a822014-11-04 21:37:13 -080059
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080060 private final ClusterService clusterService;
Madan Jampani9b19a822014-11-04 21:37:13 -080061 private final ClusterCommunicationService clusterCommunicator;
Madan Jampani515865d2014-11-09 22:29:34 -080062 private final ControllerNode localNode;
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080063 private final TcpMember remoteMember;
Yuta HIGUCHId2499432014-11-20 12:07:43 -080064
Madan Jampani6234fd42014-11-21 22:34:28 -080065 private ControllerNode remoteNode;
66 private final AtomicBoolean connectionOK = new AtomicBoolean(true);
Madan Jampani9b19a822014-11-04 21:37:13 -080067
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -080068 private ExecutorService pool;
Madan Jampani9b19a822014-11-04 21:37:13 -080069
70 public ClusterMessagingProtocolClient(
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080071 ClusterService clusterService,
Madan Jampani9b19a822014-11-04 21:37:13 -080072 ClusterCommunicationService clusterCommunicator,
Madan Jampani515865d2014-11-09 22:29:34 -080073 ControllerNode localNode,
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080074 TcpMember remoteMember) {
75
76 this.clusterService = clusterService;
Madan Jampani9b19a822014-11-04 21:37:13 -080077 this.clusterCommunicator = clusterCommunicator;
Madan Jampani515865d2014-11-09 22:29:34 -080078 this.localNode = localNode;
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -080079 this.remoteMember = remoteMember;
Madan Jampani9b19a822014-11-04 21:37:13 -080080 }
81
82 @Override
83 public CompletableFuture<PingResponse> ping(PingRequest request) {
Madan Jampani6234fd42014-11-21 22:34:28 -080084 return requestReply(request);
Madan Jampani9b19a822014-11-04 21:37:13 -080085 }
86
87 @Override
88 public CompletableFuture<SyncResponse> sync(SyncRequest request) {
Madan Jampani6234fd42014-11-21 22:34:28 -080089 return requestReply(request);
Madan Jampani9b19a822014-11-04 21:37:13 -080090 }
91
92 @Override
93 public CompletableFuture<PollResponse> poll(PollRequest request) {
Madan Jampani6234fd42014-11-21 22:34:28 -080094 return requestReply(request);
Madan Jampani9b19a822014-11-04 21:37:13 -080095 }
96
97 @Override
98 public CompletableFuture<SubmitResponse> submit(SubmitRequest request) {
Madan Jampani6234fd42014-11-21 22:34:28 -080099 return requestReply(request);
Madan Jampani9b19a822014-11-04 21:37:13 -0800100 }
101
102 @Override
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -0800103 public synchronized CompletableFuture<Void> connect() {
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -0800104 if (pool == null || pool.isShutdown()) {
105 // TODO include remote name?
106 pool = newCachedThreadPool(namedThreads("copycat-netty-messaging-client-%d"));
107 }
Madan Jampani6234fd42014-11-21 22:34:28 -0800108 return CompletableFuture.completedFuture(null);
Madan Jampani9b19a822014-11-04 21:37:13 -0800109 }
110
111 @Override
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -0800112 public synchronized CompletableFuture<Void> close() {
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -0800113 if (pool != null) {
114 pool.shutdownNow();
115 pool = null;
116 }
Madan Jampani9b19a822014-11-04 21:37:13 -0800117 return CompletableFuture.completedFuture(null);
118 }
119
Yuta HIGUCHI3b016732014-11-07 15:32:18 -0800120 private <I> MessageSubject messageType(I input) {
Madan Jampani9b19a822014-11-04 21:37:13 -0800121 Class<?> clazz = input.getClass();
122 if (clazz.equals(PollRequest.class)) {
123 return ClusterMessagingProtocol.COPYCAT_POLL;
124 } else if (clazz.equals(SyncRequest.class)) {
125 return ClusterMessagingProtocol.COPYCAT_SYNC;
126 } else if (clazz.equals(SubmitRequest.class)) {
127 return ClusterMessagingProtocol.COPYCAT_SUBMIT;
128 } else if (clazz.equals(PingRequest.class)) {
129 return ClusterMessagingProtocol.COPYCAT_PING;
130 } else {
131 throw new IllegalArgumentException("Unknown class " + clazz.getName());
132 }
Madan Jampani9b19a822014-11-04 21:37:13 -0800133 }
134
135 private <I, O> CompletableFuture<O> requestReply(I request) {
136 CompletableFuture<O> future = new CompletableFuture<>();
Yuta HIGUCHI03e77e92014-11-22 17:53:01 -0800137 if (pool == null) {
138 log.info("Attempted to use closed client, connecting now. {}", request);
139 connect();
140 }
141 pool.submit(new RPCTask<I, O>(request, future));
Madan Jampani9b19a822014-11-04 21:37:13 -0800142 return future;
143 }
144
Yuta HIGUCHI71b9d092014-11-12 13:31:11 -0800145 private ControllerNode getControllerNode(TcpMember remoteMember) {
146 final String host = remoteMember.host();
147 final int port = remoteMember.port();
148 for (ControllerNode node : clusterService.getNodes()) {
149 if (node.ip().toString().equals(host) && node.tcpPort() == port) {
150 return node;
151 }
152 }
153 return null;
154 }
155
Madan Jampani9b19a822014-11-04 21:37:13 -0800156 private class RPCTask<I, O> implements Runnable {
157
Yuta HIGUCHIe59656d2014-11-07 01:57:32 -0800158 private final I request;
Madan Jampani9b19a822014-11-04 21:37:13 -0800159 private final ClusterMessage message;
160 private final CompletableFuture<O> future;
161
162 public RPCTask(I request, CompletableFuture<O> future) {
Yuta HIGUCHIe59656d2014-11-07 01:57:32 -0800163 this.request = request;
Madan Jampani9b19a822014-11-04 21:37:13 -0800164 this.message =
165 new ClusterMessage(
Madan Jampani515865d2014-11-09 22:29:34 -0800166 localNode.id(),
Madan Jampani9b19a822014-11-04 21:37:13 -0800167 messageType(request),
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800168 verifyNotNull(DB_SERIALIZER.encode(request)));
Madan Jampani9b19a822014-11-04 21:37:13 -0800169 this.future = future;
170 }
171
172 @Override
173 public void run() {
174 try {
Madan Jampani6234fd42014-11-21 22:34:28 -0800175 if (remoteNode == null) {
176 remoteNode = getControllerNode(remoteMember);
177 if (remoteNode == null) {
178 throw new IOException("Remote node is offline!");
179 }
Yuta HIGUCHId2499432014-11-20 12:07:43 -0800180 }
Madan Jampani9b19a822014-11-04 21:37:13 -0800181 byte[] response = clusterCommunicator
Madan Jampani6234fd42014-11-21 22:34:28 -0800182 .sendAndReceive(message, remoteNode.id())
Sho SHIMIZU09082752014-12-04 12:21:56 -0800183 .get(RETRY_INTERVAL.toMillis(), TimeUnit.MILLISECONDS);
Madan Jampani6234fd42014-11-21 22:34:28 -0800184 if (!connectionOK.getAndSet(true)) {
185 log.info("Connectivity to {} restored", remoteNode);
186 }
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800187 future.complete(verifyNotNull(DB_SERIALIZER.decode(response)));
188
Yuta HIGUCHId2499432014-11-20 12:07:43 -0800189 } catch (IOException | TimeoutException e) {
Madan Jampani6234fd42014-11-21 22:34:28 -0800190 if (connectionOK.getAndSet(false)) {
191 log.warn("Detected connectivity issues with {}. Reason: {}", remoteNode, e.getMessage());
192 }
Yuta HIGUCHI45207162014-11-17 15:25:13 -0800193 log.debug("RPCTask for {} failed.", request, e);
Madan Jampani515865d2014-11-09 22:29:34 -0800194 future.completeExceptionally(e);
Yuta HIGUCHId2499432014-11-20 12:07:43 -0800195 } catch (ExecutionException e) {
196 log.warn("RPCTask execution for {} failed: {}", request, e.getMessage());
197 log.debug("RPCTask execution for {} failed.", request, e);
198 future.completeExceptionally(e);
Yuta HIGUCHI2b75f1a2014-11-14 16:07:34 -0800199 } catch (InterruptedException e) {
Yuta HIGUCHI45207162014-11-17 15:25:13 -0800200 log.warn("RPCTask for {} was interrupted: {}", request, e.getMessage());
201 log.debug("RPCTask for {} was interrupted.", request, e);
Yuta HIGUCHI2b75f1a2014-11-14 16:07:34 -0800202 future.completeExceptionally(e);
203 Thread.currentThread().interrupt();
Madan Jampani9b19a822014-11-04 21:37:13 -0800204 } catch (Exception e) {
Yuta HIGUCHI9bbaca32014-11-07 13:08:31 -0800205 log.warn("RPCTask for {} terribly failed.", request, e);
Madan Jampani9b19a822014-11-04 21:37:13 -0800206 future.completeExceptionally(e);
207 }
208 }
209 }
Madan Jampani515865d2014-11-09 22:29:34 -0800210}