blob: a510954e94ddfdec0e52e03bde815bdde1c09f3b [file] [log] [blame]
Esin Karaman971fb7f2017-12-28 13:44:52 +00001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18/*
19 * Most of the code of this class was copied from:
20 * http://liveramp.com/engineering/reconnecting-thrift-client/
21 */
22
23package org.onosproject.drivers.bmv2.ctl;
24
25import com.google.common.collect.ImmutableSet;
26import org.apache.thrift.TServiceClient;
27import org.apache.thrift.transport.TTransport;
28import org.apache.thrift.transport.TTransportException;
29import org.slf4j.Logger;
30import org.slf4j.LoggerFactory;
31
32import java.lang.reflect.InvocationHandler;
33import java.lang.reflect.InvocationTargetException;
34import java.lang.reflect.Method;
35import java.lang.reflect.Proxy;
36import java.util.Set;
37
38/**
39 * Thrift client wrapper that attempts a few reconnects before giving up a method call execution. It also provides
40 * synchronization between calls over the same transport.
41 */
42final class SafeThriftClient {
43
44 private static final Logger LOG = LoggerFactory.getLogger(SafeThriftClient.class);
45
46 /**
47 * List of causes which suggest a restart might fix things (defined as constants in {@link TTransportException}).
48 */
49 private static final Set<Integer> RESTARTABLE_CAUSES = ImmutableSet.of(TTransportException.NOT_OPEN,
50 TTransportException.END_OF_FILE,
51 TTransportException.TIMED_OUT,
52 TTransportException.UNKNOWN);
53
54 private SafeThriftClient() {
55 // ban constructor.
56 }
57
58 /**
59 * Reflectively wraps an already existing Thrift client.
60 *
61 * @param baseClient the client to wrap
62 * @param clientInterface the interface that the client implements
63 * @param options options that control behavior of the reconnecting client
64 * @param <T> a class extending TServiceClient
65 * @param <C> a client interface
66 * @return a wrapped client interface
67 */
68 public static <T extends TServiceClient, C> C wrap(T baseClient, Class<C> clientInterface, Options options) {
69 Object proxyObject = Proxy.newProxyInstance(clientInterface.getClassLoader(),
70 new Class<?>[]{clientInterface},
71 new ReconnectingClientProxy<T>(baseClient,
72 options.getNumRetries(),
73 options.getTimeBetweenRetries()));
74
75 return (C) proxyObject;
76 }
77
78 /**
79 * Reflectively wraps an already existing Thrift client.
80 *
81 * @param baseClient the client to wrap
82 * @param options options that control behavior of the reconnecting client
83 * @param <T> a class that extends TServiceClient
84 * @param <C> a client interface
85 * @return a wrapped client interface
86 */
87 public static <T extends TServiceClient, C> C wrap(T baseClient, Options options) {
88 Class<?>[] interfaces = baseClient.getClass().getInterfaces();
89
90 for (Class<?> iface : interfaces) {
91 if (iface.getSimpleName().equals("Iface")
92 && iface.getEnclosingClass().equals(baseClient.getClass().getEnclosingClass())) {
93 return (C) wrap(baseClient, iface, options);
94 }
95 }
96
97 throw new RuntimeException("Class needs to implement Iface directly. Use wrap(TServiceClient, Class) instead.");
98 }
99
100 /**
101 * Reflectively wraps an already existing Thrift client.
102 *
103 * @param baseClient the client to wrap
104 * @param clientInterface the interface that the client implements
105 * @param <T> a class that extends TServiceClient
106 * @param <C> a client interface
107 * @return a wrapped client interface
108 */
109 public static <T extends TServiceClient, C> C wrap(T baseClient, Class<C> clientInterface) {
110 return wrap(baseClient, clientInterface, Options.defaults());
111 }
112
113 /**
114 * Reflectively wraps an already existing Thrift client.
115 *
116 * @param baseClient the client to wrap
117 * @param <T> a class that extends TServiceClient
118 * @param <C> a client interface
119 * @return a wrapped client interface
120 */
121 public static <T extends TServiceClient, C> C wrap(T baseClient) {
122 return wrap(baseClient, Options.defaults());
123 }
124
125 /**
126 * Reconnection options for {@link SafeThriftClient}.
127 */
128 public static class Options {
129 private int numRetries;
130 private long timeBetweenRetries;
131
132 /**
133 * Creates new options with the given parameters.
134 *
135 * @param numRetries the maximum number of times to try reconnecting before giving up and throwing an
136 * exception
137 * @param timeBetweenRetries the number of milliseconds to wait in between reconnection attempts.
138 */
139 public Options(int numRetries, long timeBetweenRetries) {
140 this.numRetries = numRetries;
141 this.timeBetweenRetries = timeBetweenRetries;
142 }
143
144 private static Options defaults() {
145 return new Options(5, 10000L);
146 }
147
148 private int getNumRetries() {
149 return numRetries;
150 }
151
152 private long getTimeBetweenRetries() {
153 return timeBetweenRetries;
154 }
155 }
156
157 /**
158 * Helper proxy class. Attempts to call method on proxy object wrapped in try/catch. If it fails, it attempts a
159 * reconnect and tries the method again.
160 *
161 * @param <T> a class that extends TServiceClient
162 */
163 private static class ReconnectingClientProxy<T extends TServiceClient> implements InvocationHandler {
164 private final T baseClient;
165 private final TTransport transport;
166 private final int maxRetries;
167 private final long timeBetweenRetries;
168
169 public ReconnectingClientProxy(T baseClient, int maxRetries, long timeBetweenRetries) {
170 this.baseClient = baseClient;
171 this.transport = baseClient.getInputProtocol().getTransport();
172 this.maxRetries = maxRetries;
173 this.timeBetweenRetries = timeBetweenRetries;
174 }
175
176 private void reconnectOrThrowException()
177 throws TTransportException {
178 int errors = 0;
179 try {
180 if (transport.isOpen()) {
181 transport.close();
182 }
183 } catch (Exception e) {
184 // Thrift seems to have a bug where if the transport is already closed a SocketException is thrown.
185 // However, such an exception is not advertised by .close(), hence the general-purpose catch.
186 LOG.debug("Exception while closing transport", e);
187 }
188
189 while (errors < maxRetries) {
190 try {
191 LOG.debug("Attempting to reconnect...");
192 transport.open();
193 LOG.debug("Reconnection successful");
194 break;
195 } catch (TTransportException e) {
196 LOG.debug("Error while reconnecting:", e);
197 errors++;
198
199 if (errors < maxRetries) {
200 try {
201 LOG.debug("Sleeping for {} milliseconds before retrying", timeBetweenRetries);
202 Thread.sleep(timeBetweenRetries);
203 } catch (InterruptedException e2) {
204 Thread.currentThread().interrupt();
205 throw new RuntimeException(e);
206 }
207 }
208 }
209 }
210
211 if (errors >= maxRetries) {
212 throw new TTransportException("Failed to reconnect");
213 }
214 }
215
216 @Override
217 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
218
219 // Thrift transport layer is not thread-safe (it's a wrapper on a socket), hence we need locking.
220 synchronized (transport) {
221
222 LOG.debug("Invoking method... > fromThread={}, method={}, args={}",
223 Thread.currentThread().getId(), method.getName(), args);
224
225 try {
226
227 return method.invoke(baseClient, args);
228 } catch (InvocationTargetException e) {
229 if (e.getTargetException() instanceof TTransportException) {
230 TTransportException cause = (TTransportException) e.getTargetException();
231
232 if (RESTARTABLE_CAUSES.contains(cause.getType())) {
233 // Try to reconnect. If fail, a TTransportException will be thrown.
234 reconnectOrThrowException();
235 try {
236 // If here, transport has been successfully open, hence new exceptions will be thrown.
237 return method.invoke(baseClient, args);
238 } catch (InvocationTargetException e1) {
239 LOG.debug("Exception: {}", e1.getTargetException());
240 throw e1.getTargetException();
241 }
242 }
243 }
244 // Target exception is neither a TTransportException nor a restartable cause.
245 LOG.debug("Exception: {}", e.getTargetException());
246 throw e.getTargetException();
247 }
248 }
249 }
250 }
251}