blob: 95a052ac56ada866d20f6e72805443cc83ea2c43 [file] [log] [blame]
Carmelo Cascone37d5dbf2016-04-18 15:15:48 -07001/*
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17/*
18 * Most of the code of this class was copied from:
19 * http://liveramp.com/engineering/reconnecting-thrift-client/
20 */
21
22package org.onosproject.bmv2.ctl;
23
24import com.google.common.collect.ImmutableSet;
25import org.apache.thrift.TServiceClient;
26import org.apache.thrift.transport.TTransport;
27import org.apache.thrift.transport.TTransportException;
28import org.slf4j.Logger;
29import org.slf4j.LoggerFactory;
30
31import java.lang.reflect.InvocationHandler;
32import java.lang.reflect.InvocationTargetException;
33import java.lang.reflect.Method;
34import java.lang.reflect.Proxy;
35import java.util.Set;
36
37/**
38 * Thrift client wrapper that attempts a few reconnects before giving up a method call execution. It al provides
39 * synchronization between calls (automatically serialize multiple calls to the same client from different threads).
40 */
41public final class SafeThriftClient {
42
43 private static final Logger LOG = LoggerFactory.getLogger(SafeThriftClient.class);
44
45 /**
46 * List of causes which suggest a restart might fix things (defined as constants in {@link TTransportException}).
47 */
48 private static final Set<Integer> RESTARTABLE_CAUSES = ImmutableSet.of(TTransportException.NOT_OPEN,
49 TTransportException.END_OF_FILE,
50 TTransportException.TIMED_OUT,
51 TTransportException.UNKNOWN);
52
53 private SafeThriftClient() {
54 // ban constructor.
55 }
56
57 /**
58 * Reflectively wraps an already existing Thrift client.
59 *
60 * @param baseClient the client to wrap
61 * @param clientInterface the interface that the client implements
62 * @param options options that control behavior of the reconnecting client
63 * @param <T>
64 * @param <C>
65 * @return
66 */
67 public static <T extends TServiceClient, C> C wrap(T baseClient, Class<C> clientInterface, Options options) {
68 Object proxyObject = Proxy.newProxyInstance(clientInterface.getClassLoader(),
69 new Class<?>[]{clientInterface},
70 new ReconnectingClientProxy<T>(baseClient,
71 options.getNumRetries(),
72 options.getTimeBetweenRetries()));
73
74 return (C) proxyObject;
75 }
76
77 /**
78 * Reflectively wraps an already existing Thrift client.
79 *
80 * @param baseClient the client to wrap
81 * @param options options that control behavior of the reconnecting client
82 * @param <T>
83 * @param <C>
84 * @return
85 */
86 public static <T extends TServiceClient, C> C wrap(T baseClient, Options options) {
87 Class<?>[] interfaces = baseClient.getClass().getInterfaces();
88
89 for (Class<?> iface : interfaces) {
90 if (iface.getSimpleName().equals("Iface")
91 && iface.getEnclosingClass().equals(baseClient.getClass().getEnclosingClass())) {
92 return (C) wrap(baseClient, iface, options);
93 }
94 }
95
96 throw new RuntimeException("Class needs to implement Iface directly. Use wrap(TServiceClient, Class) instead.");
97 }
98
99 /**
100 * Reflectively wraps an already existing Thrift client.
101 *
102 * @param baseClient the client to wrap
103 * @param clientInterface the interface that the client implements
104 * @param <T>
105 * @param <C>
106 * @return
107 */
108 public static <T extends TServiceClient, C> C wrap(T baseClient, Class<C> clientInterface) {
109 return wrap(baseClient, clientInterface, Options.defaults());
110 }
111
112 /**
113 * Reflectively wraps an already existing Thrift client.
114 *
115 * @param baseClient the client to wrap
116 * @param <T>
117 * @param <C>
118 * @return
119 */
120 public static <T extends TServiceClient, C> C wrap(T baseClient) {
121 return wrap(baseClient, Options.defaults());
122 }
123
124 /**
125 * Reconnection options for {@link SafeThriftClient}.
126 */
127 public static class Options {
128 private int numRetries;
129 private long timeBetweenRetries;
130
131 /**
132 * Creates new options with the given parameters.
133 *
134 * @param numRetries the maximum number of times to try reconnecting before giving up and throwing an
135 * exception
136 * @param timeBetweenRetries the number of milliseconds to wait in between reconnection attempts.
137 */
138 public Options(int numRetries, long timeBetweenRetries) {
139 this.numRetries = numRetries;
140 this.timeBetweenRetries = timeBetweenRetries;
141 }
142
143 private static Options defaults() {
144 return new Options(5, 10000L);
145 }
146
147 private int getNumRetries() {
148 return numRetries;
149 }
150
151 private long getTimeBetweenRetries() {
152 return timeBetweenRetries;
153 }
154 }
155
156 /**
157 * Helper proxy class. Attempts to call method on proxy object wrapped in try/catch. If it fails, it attempts a
158 * reconnect and tries the method again.
159 *
160 * @param <T>
161 */
162 private static class ReconnectingClientProxy<T extends TServiceClient> implements InvocationHandler {
163 private final T baseClient;
164 private final int maxRetries;
165 private final long timeBetweenRetries;
166
167 public ReconnectingClientProxy(T baseClient, int maxRetries, long timeBetweenRetries) {
168 this.baseClient = baseClient;
169 this.maxRetries = maxRetries;
170 this.timeBetweenRetries = timeBetweenRetries;
171 }
172
173 private static void reconnectOrThrowException(TTransport transport, int maxRetries, long timeBetweenRetries)
174 throws TTransportException {
175 int errors = 0;
Carmelo Cascone5fa651e2016-04-27 17:35:57 -0700176 try {
177 transport.close();
178 } catch (Exception e) {
179 // Thrift seems to have a bug where if the transport is already closed a SocketException is thrown.
180 // However, such an exception is not advertised by .close(), hence the general-purpose catch.
181 }
Carmelo Cascone37d5dbf2016-04-18 15:15:48 -0700182
183 while (errors < maxRetries) {
184 try {
185 LOG.debug("Attempting to reconnect...");
186 transport.open();
187 LOG.debug("Reconnection successful");
188 break;
189 } catch (TTransportException e) {
Carmelo Cascone5fa651e2016-04-27 17:35:57 -0700190 LOG.debug("Error while reconnecting:", e);
Carmelo Cascone37d5dbf2016-04-18 15:15:48 -0700191 errors++;
192
193 if (errors < maxRetries) {
194 try {
195 LOG.debug("Sleeping for {} milliseconds before retrying", timeBetweenRetries);
196 Thread.sleep(timeBetweenRetries);
197 } catch (InterruptedException e2) {
198 Thread.currentThread().interrupt();
199 throw new RuntimeException(e);
200 }
201 }
202 }
203 }
204
205 if (errors >= maxRetries) {
206 throw new TTransportException("Failed to reconnect");
207 }
208 }
209
210 @Override
211 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
212
213 // With Thrift clients must be instantiated for each different transport session, i.e. server instance.
214 // Hence, using baseClient as lock, only calls towards the same server will be synchronized.
215
216 synchronized (baseClient) {
217
218 LOG.debug("Invoking client method... > method={}, fromThread={}",
219 method.getName(), Thread.currentThread().getId());
220
221 Object result = null;
222
223 try {
224 result = method.invoke(baseClient, args);
225
226 } catch (InvocationTargetException e) {
227 if (e.getTargetException() instanceof TTransportException) {
228 TTransportException cause = (TTransportException) e.getTargetException();
229
230 if (RESTARTABLE_CAUSES.contains(cause.getType())) {
231 reconnectOrThrowException(baseClient.getInputProtocol().getTransport(),
232 maxRetries,
233 timeBetweenRetries);
234 result = method.invoke(baseClient, args);
235 }
236 }
237
238 if (result == null) {
239 LOG.debug("Exception while invoking client method: {} > method={}, fromThread={}",
240 e, method.getName(), Thread.currentThread().getId());
241 throw e.getTargetException();
242 }
243 }
244
245 LOG.debug("Method invoke complete! > method={}, fromThread={}",
246 method.getName(), Thread.currentThread().getId());
247
248 return result;
249 }
250 }
251 }
252}