/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

/*
 * Most of the code of this class was copied from:
 * http://liveramp.com/engineering/reconnecting-thrift-client/
 */

23package org.onosproject.drivers.bmv2.ctl;
24
25import com.google.common.collect.ImmutableSet;
26import org.apache.thrift.TServiceClient;
27import org.apache.thrift.transport.TTransport;
28import org.apache.thrift.transport.TTransportException;
29import org.slf4j.Logger;
30import org.slf4j.LoggerFactory;
31
32import java.lang.reflect.InvocationHandler;
33import java.lang.reflect.InvocationTargetException;
34import java.lang.reflect.Method;
35import java.lang.reflect.Proxy;
36import java.util.Set;
37
38/**
39 * Thrift client wrapper that attempts a few reconnects before giving up a method call execution. It also provides
40 * synchronization between calls over the same transport.
41 */
42final class SafeThriftClient {
43
44 private static final Logger LOG = LoggerFactory.getLogger(SafeThriftClient.class);
45
46 /**
47 * List of causes which suggest a restart might fix things (defined as constants in {@link TTransportException}).
48 */
49 private static final Set<Integer> RESTARTABLE_CAUSES = ImmutableSet.of(TTransportException.NOT_OPEN,
50 TTransportException.END_OF_FILE,
51 TTransportException.TIMED_OUT,
52 TTransportException.UNKNOWN);
53
54 private SafeThriftClient() {
55 // ban constructor.
56 }
57
58 /**
59 * Reflectively wraps an already existing Thrift client.
60 *
61 * @param baseClient the client to wrap
62 * @param clientInterface the interface that the client implements
63 * @param options options that control behavior of the reconnecting client
64 * @param <T> a class extending TServiceClient
65 * @param <C> a client interface
66 * @return a wrapped client interface
67 */
68 public static <T extends TServiceClient, C> C wrap(T baseClient, Class<C> clientInterface, Options options) {
69 Object proxyObject = Proxy.newProxyInstance(clientInterface.getClassLoader(),
70 new Class<?>[]{clientInterface},
71 new ReconnectingClientProxy<T>(baseClient,
72 options.getNumRetries(),
73 options.getTimeBetweenRetries()));
74
75 return (C) proxyObject;
76 }
77
78 /**
79 * Reflectively wraps an already existing Thrift client.
80 *
81 * @param baseClient the client to wrap
82 * @param options options that control behavior of the reconnecting client
83 * @param <T> a class that extends TServiceClient
84 * @param <C> a client interface
85 * @return a wrapped client interface
86 */
87 public static <T extends TServiceClient, C> C wrap(T baseClient, Options options) {
88 Class<?>[] interfaces = baseClient.getClass().getInterfaces();
89
90 for (Class<?> iface : interfaces) {
91 if (iface.getSimpleName().equals("Iface")
92 && iface.getEnclosingClass().equals(baseClient.getClass().getEnclosingClass())) {
93 return (C) wrap(baseClient, iface, options);
94 }
95 }
96
Ray Milkeydbd38212018-07-02 09:18:09 -070097 throw new IllegalStateException(
98 "Class needs to implement Iface directly. Use wrap(TServiceClient, Class) instead.");
Esin Karaman971fb7f2017-12-28 13:44:52 +000099 }
100
101 /**
102 * Reflectively wraps an already existing Thrift client.
103 *
104 * @param baseClient the client to wrap
105 * @param clientInterface the interface that the client implements
106 * @param <T> a class that extends TServiceClient
107 * @param <C> a client interface
108 * @return a wrapped client interface
109 */
110 public static <T extends TServiceClient, C> C wrap(T baseClient, Class<C> clientInterface) {
111 return wrap(baseClient, clientInterface, Options.defaults());
112 }
113
114 /**
115 * Reflectively wraps an already existing Thrift client.
116 *
117 * @param baseClient the client to wrap
118 * @param <T> a class that extends TServiceClient
119 * @param <C> a client interface
120 * @return a wrapped client interface
121 */
122 public static <T extends TServiceClient, C> C wrap(T baseClient) {
123 return wrap(baseClient, Options.defaults());
124 }
125
126 /**
127 * Reconnection options for {@link SafeThriftClient}.
128 */
129 public static class Options {
130 private int numRetries;
131 private long timeBetweenRetries;
132
133 /**
134 * Creates new options with the given parameters.
135 *
136 * @param numRetries the maximum number of times to try reconnecting before giving up and throwing an
137 * exception
138 * @param timeBetweenRetries the number of milliseconds to wait in between reconnection attempts.
139 */
140 public Options(int numRetries, long timeBetweenRetries) {
141 this.numRetries = numRetries;
142 this.timeBetweenRetries = timeBetweenRetries;
143 }
144
145 private static Options defaults() {
146 return new Options(5, 10000L);
147 }
148
149 private int getNumRetries() {
150 return numRetries;
151 }
152
153 private long getTimeBetweenRetries() {
154 return timeBetweenRetries;
155 }
156 }
157
158 /**
159 * Helper proxy class. Attempts to call method on proxy object wrapped in try/catch. If it fails, it attempts a
160 * reconnect and tries the method again.
161 *
162 * @param <T> a class that extends TServiceClient
163 */
164 private static class ReconnectingClientProxy<T extends TServiceClient> implements InvocationHandler {
165 private final T baseClient;
166 private final TTransport transport;
167 private final int maxRetries;
168 private final long timeBetweenRetries;
169
170 public ReconnectingClientProxy(T baseClient, int maxRetries, long timeBetweenRetries) {
171 this.baseClient = baseClient;
172 this.transport = baseClient.getInputProtocol().getTransport();
173 this.maxRetries = maxRetries;
174 this.timeBetweenRetries = timeBetweenRetries;
175 }
176
177 private void reconnectOrThrowException()
178 throws TTransportException {
179 int errors = 0;
180 try {
181 if (transport.isOpen()) {
182 transport.close();
183 }
184 } catch (Exception e) {
185 // Thrift seems to have a bug where if the transport is already closed a SocketException is thrown.
186 // However, such an exception is not advertised by .close(), hence the general-purpose catch.
187 LOG.debug("Exception while closing transport", e);
188 }
189
190 while (errors < maxRetries) {
191 try {
192 LOG.debug("Attempting to reconnect...");
193 transport.open();
194 LOG.debug("Reconnection successful");
195 break;
196 } catch (TTransportException e) {
197 LOG.debug("Error while reconnecting:", e);
198 errors++;
199
200 if (errors < maxRetries) {
201 try {
202 LOG.debug("Sleeping for {} milliseconds before retrying", timeBetweenRetries);
203 Thread.sleep(timeBetweenRetries);
204 } catch (InterruptedException e2) {
205 Thread.currentThread().interrupt();
Ray Milkeydbd38212018-07-02 09:18:09 -0700206 throw new IllegalStateException(e);
Esin Karaman971fb7f2017-12-28 13:44:52 +0000207 }
208 }
209 }
210 }
211
212 if (errors >= maxRetries) {
213 throw new TTransportException("Failed to reconnect");
214 }
215 }
216
217 @Override
218 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
219
220 // Thrift transport layer is not thread-safe (it's a wrapper on a socket), hence we need locking.
221 synchronized (transport) {
222
223 LOG.debug("Invoking method... > fromThread={}, method={}, args={}",
224 Thread.currentThread().getId(), method.getName(), args);
225
226 try {
227
228 return method.invoke(baseClient, args);
229 } catch (InvocationTargetException e) {
230 if (e.getTargetException() instanceof TTransportException) {
231 TTransportException cause = (TTransportException) e.getTargetException();
232
233 if (RESTARTABLE_CAUSES.contains(cause.getType())) {
234 // Try to reconnect. If fail, a TTransportException will be thrown.
235 reconnectOrThrowException();
236 try {
237 // If here, transport has been successfully open, hence new exceptions will be thrown.
238 return method.invoke(baseClient, args);
239 } catch (InvocationTargetException e1) {
240 LOG.debug("Exception: {}", e1.getTargetException());
241 throw e1.getTargetException();
242 }
243 }
244 }
245 // Target exception is neither a TTransportException nor a restartable cause.
246 LOG.debug("Exception: {}", e.getTargetException());
247 throw e.getTargetException();
248 }
249 }
250 }
251 }
252}