ONOS-7251 - Initial implementation of fabric.p4 L2 broadcast feature.

Thrift client cherry-picked from the commit dd5792ac9ee38a702c3128a34224852b5c284687

Change-Id: I989f2b2074485a892195889a7c976b518510da88
diff --git a/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/ctl/SafeThriftClient.java b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/ctl/SafeThriftClient.java
new file mode 100644
index 0000000..a510954
--- /dev/null
+++ b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/ctl/SafeThriftClient.java
@@ -0,0 +1,251 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/*
+ * Most of the code of this class was copied from:
+ * http://liveramp.com/engineering/reconnecting-thrift-client/
+ */
+
+package org.onosproject.drivers.bmv2.ctl;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.thrift.TServiceClient;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.Set;
+
+/**
+ * Thrift client wrapper that attempts a few reconnects before giving up a method call execution. It also provides
+ * synchronization between calls over the same transport.
+ */
+final class SafeThriftClient {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SafeThriftClient.class);
+
+    /**
+     * List of causes which suggest a restart might fix things (defined as constants in {@link TTransportException}).
+     */
+    private static final Set<Integer> RESTARTABLE_CAUSES = ImmutableSet.of(TTransportException.NOT_OPEN,
+                                                                           TTransportException.END_OF_FILE,
+                                                                           TTransportException.TIMED_OUT,
+                                                                           TTransportException.UNKNOWN);
+
+    private SafeThriftClient() {
+        // ban constructor.
+    }
+
+    /**
+     * Reflectively wraps an already existing Thrift client.
+     *
+     * @param baseClient      the client to wrap
+     * @param clientInterface the interface that the client implements
+     * @param options         options that control behavior of the reconnecting client
+     * @param <T>             a class extending TServiceClient
+     * @param <C>             a client interface
+     * @return a wrapped client interface
+     */
+    public static <T extends TServiceClient, C> C wrap(T baseClient, Class<C> clientInterface, Options options) {
+        Object proxyObject = Proxy.newProxyInstance(clientInterface.getClassLoader(),
+                                                    new Class<?>[]{clientInterface},
+                                                    new ReconnectingClientProxy<T>(baseClient,
+                                                                                   options.getNumRetries(),
+                                                                                   options.getTimeBetweenRetries()));
+
+        return (C) proxyObject;
+    }
+
+    /**
+     * Reflectively wraps an already existing Thrift client.
+     *
+     * @param baseClient the client to wrap
+     * @param options    options that control behavior of the reconnecting client
+     * @param <T>        a class that extends TServiceClient
+     * @param <C>        a client interface
+     * @return a wrapped client interface
+     */
+    public static <T extends TServiceClient, C> C wrap(T baseClient, Options options) {
+        Class<?>[] interfaces = baseClient.getClass().getInterfaces();
+
+        for (Class<?> iface : interfaces) {
+            if (iface.getSimpleName().equals("Iface")
+                    && iface.getEnclosingClass().equals(baseClient.getClass().getEnclosingClass())) {
+                return (C) wrap(baseClient, iface, options);
+            }
+        }
+
+        throw new RuntimeException("Class needs to implement Iface directly. Use wrap(TServiceClient, Class) instead.");
+    }
+
+    /**
+     * Reflectively wraps an already existing Thrift client.
+     *
+     * @param baseClient      the client to wrap
+     * @param clientInterface the interface that the client implements
+     * @param <T>             a class that extends TServiceClient
+     * @param <C>             a client interface
+     * @return a wrapped client interface
+     */
+    public static <T extends TServiceClient, C> C wrap(T baseClient, Class<C> clientInterface) {
+        return wrap(baseClient, clientInterface, Options.defaults());
+    }
+
+    /**
+     * Reflectively wraps an already existing Thrift client.
+     *
+     * @param baseClient the client to wrap
+     * @param <T>        a class that extends TServiceClient
+     * @param <C>        a client interface
+     * @return a wrapped client interface
+     */
+    public static <T extends TServiceClient, C> C wrap(T baseClient) {
+        return wrap(baseClient, Options.defaults());
+    }
+
+    /**
+     * Reconnection options for {@link SafeThriftClient}.
+     */
+    public static class Options {
+        private int numRetries;
+        private long timeBetweenRetries;
+
+        /**
+         * Creates new options with the given parameters.
+         *
+         * @param numRetries         the maximum number of times to try reconnecting before giving up and throwing an
+         *                           exception
+         * @param timeBetweenRetries the number of milliseconds to wait in between reconnection attempts.
+         */
+        public Options(int numRetries, long timeBetweenRetries) {
+            this.numRetries = numRetries;
+            this.timeBetweenRetries = timeBetweenRetries;
+        }
+
+        private static Options defaults() {
+            return new Options(5, 10000L);
+        }
+
+        private int getNumRetries() {
+            return numRetries;
+        }
+
+        private long getTimeBetweenRetries() {
+            return timeBetweenRetries;
+        }
+    }
+
+    /**
+     * Helper proxy class. Attempts to call method on proxy object wrapped in try/catch. If it fails, it attempts a
+     * reconnect and tries the method again.
+     *
+     * @param <T> a class that extends TServiceClient
+     */
+    private static class ReconnectingClientProxy<T extends TServiceClient> implements InvocationHandler {
+        private final T baseClient;
+        private final TTransport transport;
+        private final int maxRetries;
+        private final long timeBetweenRetries;
+
+        public ReconnectingClientProxy(T baseClient, int maxRetries, long timeBetweenRetries) {
+            this.baseClient = baseClient;
+            this.transport = baseClient.getInputProtocol().getTransport();
+            this.maxRetries = maxRetries;
+            this.timeBetweenRetries = timeBetweenRetries;
+        }
+
+        private void reconnectOrThrowException()
+                throws TTransportException {
+            int errors = 0;
+            try {
+                if (transport.isOpen()) {
+                    transport.close();
+                }
+            } catch (Exception e) {
+                // Thrift seems to have a bug where if the transport is already closed a SocketException is thrown.
+                // However, such an exception is not advertised by .close(), hence the general-purpose catch.
+                LOG.debug("Exception while closing transport", e);
+            }
+
+            while (errors < maxRetries) {
+                try {
+                    LOG.debug("Attempting to reconnect...");
+                    transport.open();
+                    LOG.debug("Reconnection successful");
+                    break;
+                } catch (TTransportException e) {
+                    LOG.debug("Error while reconnecting:", e);
+                    errors++;
+
+                    if (errors < maxRetries) {
+                        try {
+                            LOG.debug("Sleeping for {} milliseconds before retrying", timeBetweenRetries);
+                            Thread.sleep(timeBetweenRetries);
+                        } catch (InterruptedException e2) {
+                            Thread.currentThread().interrupt();
+                            throw new RuntimeException(e);
+                        }
+                    }
+                }
+            }
+
+            if (errors >= maxRetries) {
+                throw new TTransportException("Failed to reconnect");
+            }
+        }
+
+        @Override
+        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+
+            // Thrift transport layer is not thread-safe (it's a wrapper on a socket), hence we need locking.
+            synchronized (transport) {
+
+                LOG.debug("Invoking method... > fromThread={}, method={}, args={}",
+                          Thread.currentThread().getId(), method.getName(), args);
+
+                try {
+
+                    return method.invoke(baseClient, args);
+                } catch (InvocationTargetException e) {
+                    if (e.getTargetException() instanceof TTransportException) {
+                        TTransportException cause = (TTransportException) e.getTargetException();
+
+                        if (RESTARTABLE_CAUSES.contains(cause.getType())) {
+                            // Try to reconnect. If fail, a TTransportException will be thrown.
+                            reconnectOrThrowException();
+                            try {
+                                // If here, transport has been successfully open, hence new exceptions will be thrown.
+                                return method.invoke(baseClient, args);
+                            } catch (InvocationTargetException e1) {
+                                LOG.debug("Exception: {}", e1.getTargetException());
+                                throw e1.getTargetException();
+                            }
+                        }
+                    }
+                    // Target exception is neither a TTransportException nor a restartable cause.
+                    LOG.debug("Exception: {}", e.getTargetException());
+                    throw e.getTargetException();
+                }
+            }
+        }
+    }
+}