Fixed race condition in bmv2 SafeThriftClient
This was due to the fact that multiple clients were instantiated over
the same tranposrt. Now SafeThriftClient locks over the transport.
Also fixed a bug where some exceptions where uncaught in
SafeThriftClient.
Change-Id: I841ef64e64c28fc78016a0e0dc33e59052cd983e
diff --git a/protocols/bmv2/src/main/java/org/onosproject/bmv2/ctl/Bmv2ThriftClient.java b/protocols/bmv2/src/main/java/org/onosproject/bmv2/ctl/Bmv2ThriftClient.java
index 5e6cbd9..28d88e9 100644
--- a/protocols/bmv2/src/main/java/org/onosproject/bmv2/ctl/Bmv2ThriftClient.java
+++ b/protocols/bmv2/src/main/java/org/onosproject/bmv2/ctl/Bmv2ThriftClient.java
@@ -236,16 +236,6 @@
return buffers;
}
- private void closeTransport() {
- LOG.debug("Closing transport session... > deviceId={}", deviceId);
- if (this.transport.isOpen()) {
- this.transport.close();
- LOG.debug("Transport session closed! > deviceId={}", deviceId);
- } else {
- LOG.debug("Transport session was already closed! deviceId={}", deviceId);
- }
- }
-
@Override
public final long addTableEntry(Bmv2TableEntry entry) throws Bmv2RuntimeException {
@@ -527,7 +517,7 @@
TTransport transport = new TSocket(
info.getLeft(), info.getRight());
TProtocol protocol = new TBinaryProtocol(transport);
- // Our BMv2 device implements multiple Thrift services, create a client for each one.
+ // Our BMv2 device implements multiple Thrift services, create a client for each one on the same transport.
Standard.Client standardClient = new Standard.Client(
new TMultiplexedProtocol(protocol, "standard"));
SimpleSwitch.Client simpleSwitch = new SimpleSwitch.Client(
@@ -551,11 +541,20 @@
RemovalListener<DeviceId, Bmv2ThriftClient> {
@Override
- public void onRemoval(
- RemovalNotification<DeviceId, Bmv2ThriftClient> notification) {
+ public void onRemoval(RemovalNotification<DeviceId, Bmv2ThriftClient> notification) {
// close the transport connection
- LOG.debug("Removing client from cache... > deviceId={}", notification.getKey());
- notification.getValue().closeTransport();
+ Bmv2ThriftClient client = notification.getValue();
+ // Locking here is ugly, but needed (see SafeThriftClient).
+ synchronized (client.transport) {
+ LOG.debug("Closing transport session... > deviceId={}", client.deviceId);
+ if (client.transport.isOpen()) {
+ client.transport.close();
+ LOG.debug("Transport session closed! > deviceId={}", client.deviceId);
+ } else {
+ LOG.debug("Transport session was already closed! deviceId={}", client.deviceId);
+ }
+ }
+ LOG.debug("Removing client from cache... > deviceId={}", client.deviceId);
}
}
}
diff --git a/protocols/bmv2/src/main/java/org/onosproject/bmv2/ctl/SafeThriftClient.java b/protocols/bmv2/src/main/java/org/onosproject/bmv2/ctl/SafeThriftClient.java
index 95a052a..98813f9 100644
--- a/protocols/bmv2/src/main/java/org/onosproject/bmv2/ctl/SafeThriftClient.java
+++ b/protocols/bmv2/src/main/java/org/onosproject/bmv2/ctl/SafeThriftClient.java
@@ -35,8 +35,8 @@
import java.util.Set;
/**
- * Thrift client wrapper that attempts a few reconnects before giving up a method call execution. It al provides
- * synchronization between calls (automatically serialize multiple calls to the same client from different threads).
+ * Thrift client wrapper that attempts a few reconnects before giving up a method call execution. It also provides
+ * synchronization between calls over the same transport.
*/
public final class SafeThriftClient {
@@ -161,23 +161,28 @@
*/
private static class ReconnectingClientProxy<T extends TServiceClient> implements InvocationHandler {
private final T baseClient;
+ private final TTransport transport;
private final int maxRetries;
private final long timeBetweenRetries;
public ReconnectingClientProxy(T baseClient, int maxRetries, long timeBetweenRetries) {
this.baseClient = baseClient;
+ this.transport = baseClient.getInputProtocol().getTransport();
this.maxRetries = maxRetries;
this.timeBetweenRetries = timeBetweenRetries;
}
- private static void reconnectOrThrowException(TTransport transport, int maxRetries, long timeBetweenRetries)
+ private void reconnectOrThrowException()
throws TTransportException {
int errors = 0;
try {
- transport.close();
+ if (transport.isOpen()) {
+ transport.close();
+ }
} catch (Exception e) {
// Thrift seems to have a bug where if the transport is already closed a SocketException is thrown.
// However, such an exception is not advertised by .close(), hence the general-purpose catch.
+ LOG.debug("Exception while closing transport", e);
}
while (errors < maxRetries) {
@@ -210,42 +215,37 @@
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
- // With Thrift clients must be instantiated for each different transport session, i.e. server instance.
- // Hence, using baseClient as lock, only calls towards the same server will be synchronized.
-
- synchronized (baseClient) {
+ // Thrift transport layer is not thread-safe (it's a wrapper on a socket), hence we need locking.
+ synchronized (transport) {
LOG.debug("Invoking client method... > method={}, fromThread={}",
method.getName(), Thread.currentThread().getId());
- Object result = null;
-
try {
- result = method.invoke(baseClient, args);
+ return method.invoke(baseClient, args);
} catch (InvocationTargetException e) {
if (e.getTargetException() instanceof TTransportException) {
TTransportException cause = (TTransportException) e.getTargetException();
if (RESTARTABLE_CAUSES.contains(cause.getType())) {
- reconnectOrThrowException(baseClient.getInputProtocol().getTransport(),
- maxRetries,
- timeBetweenRetries);
- result = method.invoke(baseClient, args);
+ // Try to reconnect. If fail, a TTransportException will be thrown.
+ reconnectOrThrowException();
+ try {
+ // If here, transport has been successfully open, hence new exceptions will be thrown.
+ return method.invoke(baseClient, args);
+ } catch (InvocationTargetException e1) {
+ LOG.debug("Exception while invoking client method: {} > method={}, fromThread={}",
+ e1, method.getName(), Thread.currentThread().getId());
+ throw e1.getTargetException();
+ }
}
}
-
- if (result == null) {
- LOG.debug("Exception while invoking client method: {} > method={}, fromThread={}",
- e, method.getName(), Thread.currentThread().getId());
- throw e.getTargetException();
- }
+ // Target exception is neither a TTransportException nor a restartable cause.
+ LOG.debug("Exception while invoking client method: {} > method={}, fromThread={}",
+ e, method.getName(), Thread.currentThread().getId());
+ throw e.getTargetException();
}
-
- LOG.debug("Method invoke complete! > method={}, fromThread={}",
- method.getName(), Thread.currentThread().getId());
-
- return result;
}
}
}