blob: bbe0546a086b27115e7c5d7a092ad54fd3bc16ac [file] [log] [blame]
Carmelo Cascone37d5dbf2016-04-18 15:15:48 -07001/*
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17/*
18 * Most of the code of this class was copied from:
19 * http://liveramp.com/engineering/reconnecting-thrift-client/
20 */
21
22package org.onosproject.bmv2.ctl;
23
24import com.google.common.collect.ImmutableSet;
25import org.apache.thrift.TServiceClient;
26import org.apache.thrift.transport.TTransport;
27import org.apache.thrift.transport.TTransportException;
28import org.slf4j.Logger;
29import org.slf4j.LoggerFactory;
30
31import java.lang.reflect.InvocationHandler;
32import java.lang.reflect.InvocationTargetException;
33import java.lang.reflect.Method;
34import java.lang.reflect.Proxy;
35import java.util.Set;
36
37/**
38 * Thrift client wrapper that attempts a few reconnects before giving up a method call execution. It al provides
39 * synchronization between calls (automatically serialize multiple calls to the same client from different threads).
40 */
41public final class SafeThriftClient {
42
43 private static final Logger LOG = LoggerFactory.getLogger(SafeThriftClient.class);
44
45 /**
46 * List of causes which suggest a restart might fix things (defined as constants in {@link TTransportException}).
47 */
48 private static final Set<Integer> RESTARTABLE_CAUSES = ImmutableSet.of(TTransportException.NOT_OPEN,
49 TTransportException.END_OF_FILE,
50 TTransportException.TIMED_OUT,
51 TTransportException.UNKNOWN);
52
53 private SafeThriftClient() {
54 // ban constructor.
55 }
56
57 /**
58 * Reflectively wraps an already existing Thrift client.
59 *
60 * @param baseClient the client to wrap
61 * @param clientInterface the interface that the client implements
62 * @param options options that control behavior of the reconnecting client
63 * @param <T>
64 * @param <C>
65 * @return
66 */
67 public static <T extends TServiceClient, C> C wrap(T baseClient, Class<C> clientInterface, Options options) {
68 Object proxyObject = Proxy.newProxyInstance(clientInterface.getClassLoader(),
69 new Class<?>[]{clientInterface},
70 new ReconnectingClientProxy<T>(baseClient,
71 options.getNumRetries(),
72 options.getTimeBetweenRetries()));
73
74 return (C) proxyObject;
75 }
76
77 /**
78 * Reflectively wraps an already existing Thrift client.
79 *
80 * @param baseClient the client to wrap
81 * @param options options that control behavior of the reconnecting client
82 * @param <T>
83 * @param <C>
84 * @return
85 */
86 public static <T extends TServiceClient, C> C wrap(T baseClient, Options options) {
87 Class<?>[] interfaces = baseClient.getClass().getInterfaces();
88
89 for (Class<?> iface : interfaces) {
90 if (iface.getSimpleName().equals("Iface")
91 && iface.getEnclosingClass().equals(baseClient.getClass().getEnclosingClass())) {
92 return (C) wrap(baseClient, iface, options);
93 }
94 }
95
96 throw new RuntimeException("Class needs to implement Iface directly. Use wrap(TServiceClient, Class) instead.");
97 }
98
99 /**
100 * Reflectively wraps an already existing Thrift client.
101 *
102 * @param baseClient the client to wrap
103 * @param clientInterface the interface that the client implements
104 * @param <T>
105 * @param <C>
106 * @return
107 */
108 public static <T extends TServiceClient, C> C wrap(T baseClient, Class<C> clientInterface) {
109 return wrap(baseClient, clientInterface, Options.defaults());
110 }
111
112 /**
113 * Reflectively wraps an already existing Thrift client.
114 *
115 * @param baseClient the client to wrap
116 * @param <T>
117 * @param <C>
118 * @return
119 */
120 public static <T extends TServiceClient, C> C wrap(T baseClient) {
121 return wrap(baseClient, Options.defaults());
122 }
123
124 /**
125 * Reconnection options for {@link SafeThriftClient}.
126 */
127 public static class Options {
128 private int numRetries;
129 private long timeBetweenRetries;
130
131 /**
132 * Creates new options with the given parameters.
133 *
134 * @param numRetries the maximum number of times to try reconnecting before giving up and throwing an
135 * exception
136 * @param timeBetweenRetries the number of milliseconds to wait in between reconnection attempts.
137 */
138 public Options(int numRetries, long timeBetweenRetries) {
139 this.numRetries = numRetries;
140 this.timeBetweenRetries = timeBetweenRetries;
141 }
142
143 private static Options defaults() {
144 return new Options(5, 10000L);
145 }
146
147 private int getNumRetries() {
148 return numRetries;
149 }
150
151 private long getTimeBetweenRetries() {
152 return timeBetweenRetries;
153 }
154 }
155
156 /**
157 * Helper proxy class. Attempts to call method on proxy object wrapped in try/catch. If it fails, it attempts a
158 * reconnect and tries the method again.
159 *
160 * @param <T>
161 */
162 private static class ReconnectingClientProxy<T extends TServiceClient> implements InvocationHandler {
163 private final T baseClient;
164 private final int maxRetries;
165 private final long timeBetweenRetries;
166
167 public ReconnectingClientProxy(T baseClient, int maxRetries, long timeBetweenRetries) {
168 this.baseClient = baseClient;
169 this.maxRetries = maxRetries;
170 this.timeBetweenRetries = timeBetweenRetries;
171 }
172
173 private static void reconnectOrThrowException(TTransport transport, int maxRetries, long timeBetweenRetries)
174 throws TTransportException {
175 int errors = 0;
176 transport.close();
177
178 while (errors < maxRetries) {
179 try {
180 LOG.debug("Attempting to reconnect...");
181 transport.open();
182 LOG.debug("Reconnection successful");
183 break;
184 } catch (TTransportException e) {
185 LOG.error("Error while reconnecting:", e);
186 errors++;
187
188 if (errors < maxRetries) {
189 try {
190 LOG.debug("Sleeping for {} milliseconds before retrying", timeBetweenRetries);
191 Thread.sleep(timeBetweenRetries);
192 } catch (InterruptedException e2) {
193 Thread.currentThread().interrupt();
194 throw new RuntimeException(e);
195 }
196 }
197 }
198 }
199
200 if (errors >= maxRetries) {
201 throw new TTransportException("Failed to reconnect");
202 }
203 }
204
205 @Override
206 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
207
208 // With Thrift clients must be instantiated for each different transport session, i.e. server instance.
209 // Hence, using baseClient as lock, only calls towards the same server will be synchronized.
210
211 synchronized (baseClient) {
212
213 LOG.debug("Invoking client method... > method={}, fromThread={}",
214 method.getName(), Thread.currentThread().getId());
215
216 Object result = null;
217
218 try {
219 result = method.invoke(baseClient, args);
220
221 } catch (InvocationTargetException e) {
222 if (e.getTargetException() instanceof TTransportException) {
223 TTransportException cause = (TTransportException) e.getTargetException();
224
225 if (RESTARTABLE_CAUSES.contains(cause.getType())) {
226 reconnectOrThrowException(baseClient.getInputProtocol().getTransport(),
227 maxRetries,
228 timeBetweenRetries);
229 result = method.invoke(baseClient, args);
230 }
231 }
232
233 if (result == null) {
234 LOG.debug("Exception while invoking client method: {} > method={}, fromThread={}",
235 e, method.getName(), Thread.currentThread().getId());
236 throw e.getTargetException();
237 }
238 }
239
240 LOG.debug("Method invoke complete! > method={}, fromThread={}",
241 method.getName(), Thread.currentThread().getId());
242
243 return result;
244 }
245 }
246 }
247}