ONOS-7251 - Initial implementation of fabric.p4 L2 broadcast feature.

Thrift client cherry-picked from the commit dd5792ac9ee38a702c3128a34224852b5c284687

Change-Id: I989f2b2074485a892195889a7c976b518510da88
diff --git a/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/ctl/Bmv2DeviceThriftClient.java b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/ctl/Bmv2DeviceThriftClient.java
new file mode 100644
index 0000000..cdcfb0c
--- /dev/null
+++ b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/ctl/Bmv2DeviceThriftClient.java
@@ -0,0 +1,406 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.onosproject.drivers.bmv2.ctl;
+
+import org.apache.thrift.TException;
+import org.onosproject.bmv2.thriftapi.SimplePreLAG;
+import org.onosproject.drivers.bmv2.api.Bmv2DeviceAgent;
+import org.onosproject.drivers.bmv2.api.runtime.Bmv2PreGroup;
+import org.onosproject.drivers.bmv2.api.runtime.Bmv2PreNode;
+import org.onosproject.drivers.bmv2.api.runtime.Bmv2RuntimeException;
+import org.onosproject.drivers.bmv2.impl.Bmv2PreGroupTranslatorImpl;
+import org.onosproject.net.DeviceId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.drivers.bmv2.ctl.Bmv2TExceptionParser.parseTException;
+
+/**
+ * Implementation of a Thrift client to control a BMv2 device.
+ */
+final class Bmv2DeviceThriftClient implements Bmv2DeviceAgent {
+
+    // FIXME: make context_id arbitrary for each call
+    // See: https://github.com/p4lang/behavioral-model/blob/master/modules/bm_sim/include/bm_sim/context.h
+    private static final int CONTEXT_ID = 0;
+    private static final String DEFAULT_LAG_MAP = "";
+    private final Logger log = LoggerFactory.getLogger(this.getClass());
+    private final SimplePreLAG.Iface simplePreLagClient;
+    private final DeviceId deviceId;
+
+    // Despite "protected", this class is final, so only code in this package can instantiate it.
+    protected Bmv2DeviceThriftClient(DeviceId deviceId, SimplePreLAG.Iface simplePreLagClient) {
+        this.deviceId = deviceId;
+        this.simplePreLagClient = simplePreLagClient;
+    }
+
+    @Override
+    public DeviceId deviceId() {
+        return deviceId;
+    }
+
+    @Override
+    public Bmv2PreGroup writePreGroup(Bmv2PreGroup preGroup) throws Bmv2RuntimeException {
+        log.debug("Creating a multicast group... > deviceId={}, {}", deviceId, preGroup);
+
+        GroupRollbackMachine groupRollbackMachine = new GroupRollbackMachine(preGroup);
+        try {
+            // Step 1: create the group and remember its native handle on preGroup.
+            preGroup.setNativeGroupHandle(createMcGroup(preGroup.groupId()));
+            groupRollbackMachine.setState(GroupOperationState.GROUP_CREATED);
+            // Step 2: create every node; each returned L1 handle is stored on its node.
+            createMcNodesOfGroup(preGroup);
+            groupRollbackMachine.setState(GroupOperationState.NODES_CREATED);
+            // Step 3: attach the nodes to the group; reaching the final state disables the rollback below.
+            associateMcNodesOfGroup(preGroup);
+            groupRollbackMachine.setState(GroupOperationState.NODES_ASSOCIATED);
+
+            log.debug("Multicast group created successfully. deviceId={}, {}", deviceId, preGroup);
+
+            return preGroup;
+        } finally {
+            groupRollbackMachine.rollbackIfNecessary();
+        }
+    }
+
+    @Override
+    public void deletePreGroup(Bmv2PreGroup preGroup) throws Bmv2RuntimeException {
+        log.debug("Deleting a multicast group... > deviceId={}, {}", deviceId, preGroup);
+        // Reverse order of creation: detach the nodes first. No rollback here; any failure propagates.
+        disassociateMcNodesOfGroup(preGroup);
+        // Destroy the detached nodes.
+        deleteMcNodesOfGroup(preGroup);
+        // Finally destroy the group itself through its native handle.
+        deleteMcGroup(preGroup);
+
+        log.debug("Multicast group deleted. deviceId={}, {}", deviceId, preGroup);
+    }
+
+    @Override
+    public List<Bmv2PreGroup> getPreGroups() throws Bmv2RuntimeException {
+        // Fetch the raw multicast entries string from the device and translate it into PRE groups.
+        try {
+            String rawEntries = simplePreLagClient.bm_mc_get_entries(CONTEXT_ID);
+            return Bmv2PreGroupTranslatorImpl.translate(rawEntries);
+        } catch (TException thriftFailure) {
+            // Thrift failures go through the common TException-to-Bmv2RuntimeException parser.
+            log.debug("Exception while getting multicast groups. deviceId={}", deviceId, thriftFailure);
+            throw parseTException(thriftFailure);
+        } catch (IOException ioFailure) {
+            // I/O failures are wrapped directly.
+            log.debug("Exception while getting multicast groups. deviceId={}", deviceId, ioFailure);
+            throw new Bmv2RuntimeException(ioFailure);
+        }
+    }
+
+    /**
+     * Creates the multicast nodes of the given group one by one, in iteration order.
+     * The L1 handle returned by each creation is stored on the corresponding node.
+     * Creation stops at the first failure; nodes created before it keep their handles.
+     *
+     * @param preGroup group whose nodes are created
+     * @throws Bmv2RuntimeException if creating any node fails
+     */
+    private void createMcNodesOfGroup(Bmv2PreGroup preGroup) throws Bmv2RuntimeException {
+        for (Bmv2PreNode node : preGroup.nodes().nodes()) {
+            node.setL1Handle(createMcNode(node));
+        }
+    }
+
+    /**
+     * Associates every node of the group with the group's native handle, one by one.
+     *
+     * @param preGroup group whose native handle and node L1 handles are used
+     * @throws Bmv2RuntimeException if associating any node fails; the loop stops at that node
+     */
+    private void associateMcNodesOfGroup(Bmv2PreGroup preGroup) throws Bmv2RuntimeException {
+        int nativeGroupHandle = preGroup.nativeGroupHandle();
+        for (Bmv2PreNode node : preGroup.nodes().nodes()) {
+            associateMcNode(nativeGroupHandle, node);
+        }
+    }
+
+    /**
+     * Destroys every multicast node of the given group, one by one.
+     *
+     * @param preGroup group whose nodes are destroyed
+     * @throws Bmv2RuntimeException if destroying any node fails; the loop stops at that node
+     */
+    private void deleteMcNodesOfGroup(Bmv2PreGroup preGroup) throws Bmv2RuntimeException {
+        for (Bmv2PreNode node : preGroup.nodes().nodes()) {
+            destroyMcNode(node);
+        }
+    }
+
+    /**
+     * Disassociates every node of the group from the group's native handle, one by one.
+     *
+     * @param preGroup group whose native handle and node L1 handles are used
+     * @throws Bmv2RuntimeException if disassociating any node fails; the loop stops at that node
+     */
+    private void disassociateMcNodesOfGroup(Bmv2PreGroup preGroup) throws Bmv2RuntimeException {
+        int nativeGroupHandle = preGroup.nativeGroupHandle();
+        for (Bmv2PreNode node : preGroup.nodes().nodes()) {
+            disassociateMcNode(nativeGroupHandle, node);
+        }
+    }
+
+
+    /**
+     * Creates a multicast group with specified group Id.
+     *
+     * @param groupId identifier of a group
+     * @return group handle (BMv2 specific identifier associated with the group)
+     * @throws Bmv2RuntimeException if the Thrift call fails
+     */
+    private int createMcGroup(int groupId) throws Bmv2RuntimeException {
+        log.debug("Creating the multicast group... > deviceId={}, groupId={}", deviceId, groupId);
+        try {
+            return simplePreLagClient.bm_mc_mgrp_create(CONTEXT_ID, groupId);
+        } catch (TException e) {
+            log.debug("Exception during creating multicast group. deviceId={}, groupId={}", deviceId, groupId, e);
+            throw parseTException(e);
+        }
+    }
+
+    /**
+     * Deletes a multicast group from a BMv2 device.
+     *
+     * @param preGroup group to destroy; addressed on the device by its native group handle
+     * @throws Bmv2RuntimeException if the Thrift call fails
+     */
+    private void deleteMcGroup(Bmv2PreGroup preGroup) throws Bmv2RuntimeException {
+        log.debug("Destroying the multicast group... > deviceId={}, groupId={}, groupHandle={}",
+                  deviceId, preGroup.groupId(), preGroup.nativeGroupHandle());
+        try {
+            simplePreLagClient.bm_mc_mgrp_destroy(CONTEXT_ID, preGroup.nativeGroupHandle());
+        } catch (TException e) {
+            log.debug("Exception during destroying multicast group. deviceId={}, groupId={}, groupHandle={}",
+                      deviceId, preGroup.groupId(), preGroup.nativeGroupHandle(), e);
+            throw parseTException(e);
+        }
+    }
+
+    /**
+     * Creates a multicast node on the BMv2 device.
+     *
+     * @param node Bmv2PreNode whose rid and port map are sent to the device
+     * @return L1 handle
+     * @throws Bmv2RuntimeException if the Thrift call fails
+     */
+    private int createMcNode(Bmv2PreNode node) throws Bmv2RuntimeException {
+        log.debug("Creating the multicast node... > deviceId={}, {}", deviceId, node);
+        try {
+            return simplePreLagClient.bm_mc_node_create(CONTEXT_ID, node.rid(), node.portMap(), DEFAULT_LAG_MAP);
+        } catch (TException e) {
+            log.debug("Exception during creating multicast node: {}", node, e);
+            throw parseTException(e);
+        }
+    }
+
+    /**
+     * Associates a multicast node with a group.
+     *
+     * @param groupHandle handle of the group that the node will be associated with
+     * @param node        Bmv2PreNode
+     * @throws Bmv2RuntimeException if the Thrift call fails
+     */
+    private void associateMcNode(int groupHandle, Bmv2PreNode node) throws Bmv2RuntimeException {
+        log.debug("Associating the multicast node with the group... > deviceId={}, groupHandle:{}, node:{}",
+                  deviceId, groupHandle, node);
+        try {
+            simplePreLagClient.bm_mc_node_associate(CONTEXT_ID, groupHandle, node.l1Handle());
+        } catch (TException e) {
+            log.debug("Exception during associating multicast node with group. deviceId={} groupHandle:{}, node:{}",
+                      deviceId, groupHandle, node, e);
+            throw parseTException(e);
+        }
+    }
+
+    /**
+     * Disassociates a multicast node from a group.
+     *
+     * @param groupHandle handle of the group that the node will be disassociated from
+     * @param node        Bmv2PreNode
+     * @throws Bmv2RuntimeException if the Thrift call fails
+     */
+    private void disassociateMcNode(int groupHandle, Bmv2PreNode node) throws Bmv2RuntimeException {
+        log.debug("Disassociating the multicast node from the group... > deviceId={}, groupHandle:{}, node:{}",
+                  deviceId, groupHandle, node);
+        try {
+            simplePreLagClient.bm_mc_node_dissociate(CONTEXT_ID, groupHandle, node.l1Handle());
+        } catch (TException e) {
+            log.debug("Failed to disassociate multicast node from group. deviceId={} groupHandle:{}, node:{}",
+                      deviceId, groupHandle, node, e);
+            throw parseTException(e);
+        }
+    }
+
+    /**
+     * Destroys the multicast node in a BMv2 device.
+     *
+     * @param node PRE node which is about to be destroyed
+     * @throws Bmv2RuntimeException if the Thrift call fails
+     */
+    private void destroyMcNode(Bmv2PreNode node) throws Bmv2RuntimeException {
+        log.debug("Destroying the multicast node... > deviceId={}, node:{}", deviceId, node);
+        try {
+            simplePreLagClient.bm_mc_node_destroy(CONTEXT_ID, node.l1Handle());
+        } catch (TException e) {
+            log.debug("Exception during destroying multicast node. deviceId={}, node:{}", deviceId, node, e);
+            throw parseTException(e);
+        }
+    }
+
+
+    /**
+     * Defines identifiers of main group operation steps.
+     */
+    private enum GroupOperationState {
+        IDLE, // nothing has been done
+        GROUP_CREATED, //the last successful step is group creation
+        NODES_CREATED, //the last successful step is node creation
+        NODES_ASSOCIATED //the last successful step is node association.
+    }
+
+    /**
+     * Implementation of a simple state machine to keep track of complex (non-atomic) operations on groups and
+     * to execute essential rollback steps accordingly.
+     * For example, creating a multicast group is composed of multiple steps:
+     * 1- Group creation
+     * 2- Node creation
+     * 3- Node association
+     * Each step associates with a GroupOperationState to keep track of group creation operation.
+     * A rollback flow is executed with respect to the current state.
+     */
+    private class GroupRollbackMachine {
+        private final Bmv2PreGroup preGroup;
+        // Last step of the creation flow known to have succeeded.
+        private GroupOperationState state = GroupOperationState.IDLE;
+
+        /**
+         * Creates a rollback tracker that starts in the IDLE state.
+         *
+         * @param preGroup group being created, must not be null
+         */
+        GroupRollbackMachine(Bmv2PreGroup preGroup) {
+            this.preGroup = checkNotNull(preGroup);
+        }
+
+        /**
+         * Records the last step that completed successfully.
+         */
+        void setState(GroupOperationState state) {
+            this.state = checkNotNull(state);
+        }
+
+        /**
+         * Undoes the steps completed so far when the creation flow stopped half-way.
+         */
+        void rollbackIfNecessary() {
+            switch (state) {
+                case GROUP_CREATED:
+                    // Node creation failed: destroy the nodes created so far, then the group.
+                    onGroupCreated();
+                    break;
+                case NODES_CREATED:
+                    // Node association failed:
+                    // disassociate the nodes, then destroy the nodes and the group.
+                    onNodesCreated();
+                    break;
+                default:
+                    // IDLE: no step was recorded as done; NODES_ASSOCIATED: every step succeeded.
+                    break;
+            }
+        }
+
+        /**
+         * Rolls back when the last successful step is group creation,
+         * i.e. one of the node creation operations failed.
+         */
+        private void onGroupCreated() {
+            log.warn("One of the steps of mc group creation operation has been failed. " +
+                             "Rolling back in state {}...> deviceId={}, groupId={}",
+                     state, deviceId, preGroup.groupId());
+            deleteNodes(preGroup);
+            deleteGroup(preGroup);
+        }
+
+        /**
+         * Rolls back when the last successful step is node creation,
+         * i.e. one of the node association operations failed.
+         */
+        private void onNodesCreated() {
+            log.warn("One of the steps of mc group creation operation has been failed. " +
+                             "Rolling back in state {}...> deviceId={}, groupId={}",
+                     state, deviceId, preGroup.groupId());
+            disassociateNodes(preGroup);
+            deleteNodes(preGroup);
+            deleteGroup(preGroup);
+        }
+
+        /**
+         * Deletes a group in the scope of rollback operation; a failure is logged, not rethrown.
+         */
+        private void deleteGroup(Bmv2PreGroup preGroup) {
+            try {
+                deleteMcGroup(preGroup);
+            } catch (Bmv2RuntimeException e) {
+                log.error("Unable to destroy multicast group in the scope of rollback operation. " +
+                                  "deviceId={}, groupId={}", deviceId, preGroup.groupId(), e);
+            }
+        }
+
+        /**
+         * Disassociates all nodes from their group in the scope of rollback operation.
+         * Failures are logged per node and do not stop the loop.
+         */
+        private void disassociateNodes(Bmv2PreGroup preGroup) {
+            preGroup.nodes().nodes().forEach(node -> {
+                try {
+                    disassociateMcNode(preGroup.nativeGroupHandle(), node);
+                } catch (Bmv2RuntimeException e) {
+                    log.error("Unable to disassociate the node in the scope of rollback operation. " +
+                                      "deviceId={}, groupHandle={}, l1Handle={}",
+                              deviceId, preGroup.nativeGroupHandle(), node.l1Handle(), e);
+                }
+            });
+        }
+
+        /**
+         * Deletes all nodes of a group in the scope of rollback operation.
+         */
+        private void deleteNodes(Bmv2PreGroup preGroup) {
+            // Skip nodes without an L1 handle; presumably those were never created on the device.
+            preGroup.nodes().nodes().stream()
+                    .filter(node -> node.l1Handle() != null)
+                    .forEach(node -> {
+                        try {
+                            destroyMcNode(node);
+                        } catch (Bmv2RuntimeException e) {
+                            log.error("Unable to destroy the node in the scope of rollback operation. " +
+                                              "deviceId={}, l1Handle={}", deviceId, node.l1Handle(), e);
+                        }
+                    });
+        }
+    }
+}
diff --git a/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/ctl/Bmv2PreControllerImpl.java b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/ctl/Bmv2PreControllerImpl.java
new file mode 100644
index 0000000..4c80e79
--- /dev/null
+++ b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/ctl/Bmv2PreControllerImpl.java
@@ -0,0 +1,237 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.onosproject.drivers.bmv2.ctl;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TMultiplexedProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.onlab.util.Tools;
+import org.onosproject.bmv2.thriftapi.SimplePreLAG;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.drivers.bmv2.api.Bmv2DeviceAgent;
+import org.onosproject.drivers.bmv2.api.Bmv2PreController;
+import org.onosproject.net.DeviceId;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+
+import java.util.Dictionary;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.lang.String.format;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * BMv2 PRE controller implementation.
+ */
+@Component(immediate = true)
+@Service
+public class Bmv2PreControllerImpl implements Bmv2PreController {
+
+    private static final int DEVICE_LOCK_CACHE_EXPIRE_TIME_IN_MIN = 10;
+    private static final int DEVICE_LOCK_WAITING_TIME_IN_SEC = 60;
+    private static final int DEFAULT_NUM_CONNECTION_RETRIES = 2;
+    private static final int DEFAULT_TIME_BETWEEN_RETRIES = 10;
+    private static final String THRIFT_SERVICE_NAME = "simple_pre_lag";
+    private final Logger log = getLogger(getClass());
+    private final Map<DeviceId, Pair<TTransport, Bmv2DeviceThriftClient>> clients = Maps.newConcurrentMap();
+    // Locks are per device, hence the concurrent map above. TODO: consider a timeout to limit lock retention
+    private final LoadingCache<DeviceId, ReadWriteLock> deviceLocks = CacheBuilder.newBuilder()
+            .expireAfterAccess(DEVICE_LOCK_CACHE_EXPIRE_TIME_IN_MIN, TimeUnit.MINUTES)
+            .build(new CacheLoader<DeviceId, ReadWriteLock>() {
+                @Override
+                public ReadWriteLock load(DeviceId deviceId) {
+                    return new ReentrantReadWriteLock();
+                }
+            });
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ComponentConfigService cfgService;
+    @Property(name = "numConnectionRetries", intValue = DEFAULT_NUM_CONNECTION_RETRIES,
+            label = "Number of connection retries after a network error")
+    private int numConnectionRetries = DEFAULT_NUM_CONNECTION_RETRIES;
+    @Property(name = "timeBetweenRetries", intValue = DEFAULT_TIME_BETWEEN_RETRIES,
+            label = "Time between retries in milliseconds")
+    private int timeBetweenRetries = DEFAULT_TIME_BETWEEN_RETRIES;
+    @Property(name = "deviceLockWaitingTime", intValue = DEVICE_LOCK_WAITING_TIME_IN_SEC,
+            label = "Waiting time for a read/write lock in seconds")
+    private int deviceLockWaitingTime = DEVICE_LOCK_WAITING_TIME_IN_SEC;
+
+    @Activate
+    public void activate() {
+        cfgService.registerProperties(getClass());
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        cfgService.unregisterProperties(getClass(), false);
+        log.info("Stopped");
+    }
+
+    @Modified
+    protected void modified(ComponentContext context) {
+        Dictionary<?, ?> props = context.getProperties();
+
+        Integer retries = Tools.getIntegerProperty(props, "numConnectionRetries");
+        if (retries == null || retries < 0) {
+            log.warn("numConnectionRetries must be equal to or greater than 0");
+        } else {
+            numConnectionRetries = retries;
+        }
+
+        Integer retryDelay = Tools.getIntegerProperty(props, "timeBetweenRetries");
+        if (retryDelay == null || retryDelay < 0) {
+            log.warn("timeBetweenRetries must be equal to or greater than 0");
+        } else {
+            timeBetweenRetries = retryDelay;
+        }
+
+        Integer lockWait = Tools.getIntegerProperty(props, "deviceLockWaitingTime");
+        if (lockWait == null || lockWait < 0) {
+            log.warn("deviceLockWaitingTime must be equal to or greater than 0");
+        } else {
+            deviceLockWaitingTime = lockWait;
+        }
+    }
+
+    @Override
+    public boolean createPreClient(DeviceId deviceId, String thriftServerIp, Integer thriftServerPort) {
+        checkNotNull(deviceId);
+        checkNotNull(thriftServerIp);
+        checkNotNull(thriftServerPort);
+
+        if (!acquireWriteLock(deviceId)) {
+            // Lock not obtained (timed out or interrupted); nothing has been changed.
+            return false;
+        }
+
+        log.info("Creating PRE client for {} through Thrift server {}:{}", deviceId, thriftServerIp, thriftServerPort);
+
+        try {
+            // An already registered client for the same device is reported as an illegal state.
+            if (clients.containsKey(deviceId)) {
+                throw new IllegalStateException(format("A client already exists for %s", deviceId));
+            }
+            return doCreateClient(deviceId, thriftServerIp, thriftServerPort);
+        } finally {
+            releaseWriteLock(deviceId);
+        }
+    }
+
+    @Override
+    public Bmv2DeviceAgent getPreClient(DeviceId deviceId) {
+        if (!acquireReadLock(deviceId)) {
+            return null;
+        }
+        try {
+            return clients.containsKey(deviceId) ? clients.get(deviceId).getRight() : null;
+        } finally {
+            releaseReadLock(deviceId);
+        }
+    }
+
+    @Override
+    public void removePreClient(DeviceId deviceId) {
+        if (!acquireWriteLock(deviceId)) {
+            // Lock not obtained (timed out or interrupted); the client is left in place.
+            return;
+        }
+        try {
+            Pair<TTransport, Bmv2DeviceThriftClient> entry = clients.get(deviceId);
+            if (entry != null) {
+                TTransport transport = entry.getLeft();
+                if (transport.isOpen()) {
+                    transport.close();
+                }
+                clients.remove(deviceId);
+            }
+        } finally {
+            releaseWriteLock(deviceId);
+        }
+    }
+
+    private boolean acquireWriteLock(DeviceId deviceId) {
+        try {
+            return deviceLocks.getUnchecked(deviceId).writeLock().tryLock(deviceLockWaitingTime, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // restore the flag so callers can still observe the interrupt
+            log.error("Unable to acquire write lock for device {} due to {}", deviceId, e.toString());
+            return false;
+        }
+    }
+    private boolean acquireReadLock(DeviceId deviceId) {
+        try {
+            return deviceLocks.getUnchecked(deviceId).readLock().tryLock(deviceLockWaitingTime, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // restore the flag so callers can still observe the interrupt
+            log.error("Unable to acquire read lock for device {} due to {}", deviceId, e.toString());
+            return false;
+        }
+    }
+    private void releaseWriteLock(DeviceId deviceId) {
+        deviceLocks.getUnchecked(deviceId).writeLock().unlock();
+    }
+
+    private void releaseReadLock(DeviceId deviceId) {
+        deviceLocks.getUnchecked(deviceId).readLock().unlock();
+    }
+
+    private boolean doCreateClient(DeviceId deviceId, String thriftServerIp, Integer thriftServerPort) {
+        SafeThriftClient.Options options = new SafeThriftClient.Options(numConnectionRetries, timeBetweenRetries);
+
+        try {
+            // Only builds the transport object; nothing in this method calls open() on it.
+            TTransport transport = new TSocket(thriftServerIp, thriftServerPort);
+
+            TProtocol protocol = new TBinaryProtocol(transport);
+            // Client for the multiplexed Thrift service named by THRIFT_SERVICE_NAME.
+            SimplePreLAG.Client simplePreClient = new SimplePreLAG.Client(
+                    new TMultiplexedProtocol(protocol, THRIFT_SERVICE_NAME));
+
+            SimplePreLAG.Iface safeSimplePreClient = SafeThriftClient.wrap(simplePreClient,
+                                                                           SimplePreLAG.Iface.class,
+                                                                           options);
+
+            Bmv2DeviceThriftClient client = new Bmv2DeviceThriftClient(deviceId, safeSimplePreClient);
+            clients.put(deviceId, Pair.of(transport, client));
+            return true;
+
+        } catch (RuntimeException e) {
+            log.warn("Failed to create Thrift client for BMv2 device. deviceId={}, cause={}", deviceId, e);
+            return false;
+        }
+    }
+}
\ No newline at end of file
diff --git a/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/ctl/Bmv2TExceptionParser.java b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/ctl/Bmv2TExceptionParser.java
new file mode 100644
index 0000000..b765eca
--- /dev/null
+++ b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/ctl/Bmv2TExceptionParser.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.onosproject.drivers.bmv2.ctl;
+
+
+import org.apache.thrift.TException;
+import org.onosproject.bmv2.thriftapi.InvalidMcOperation;
+import org.onosproject.drivers.bmv2.api.runtime.Bmv2RuntimeException;
+
+import static org.onosproject.drivers.bmv2.api.runtime.Bmv2RuntimeException.Code;
+
+/**
+ * Utility class to translate a Thrift exception into a Bmv2RuntimeException.
+ */
+final class Bmv2TExceptionParser {
+
+    private Bmv2TExceptionParser() {
+        // ban constructor.
+    }
+
+    /**
+     * Converts a Thrift exception into a Bmv2RuntimeException.
+     * An InvalidMcOperation is mapped onto the matching error code; any other
+     * exception is represented by its string form.
+     *
+     * @param cause Thrift exception to translate
+     * @return equivalent BMv2 runtime exception (returned, not thrown)
+     */
+    static Bmv2RuntimeException parseTException(TException cause) {
+        if (cause instanceof InvalidMcOperation) {
+            return new Bmv2RuntimeException(getCode((InvalidMcOperation) cause));
+        }
+        return new Bmv2RuntimeException(cause.toString());
+    }
+
+    /**
+     * Maps the error code of a failed multicast operation onto a BMv2 code.
+     *
+     * @param failure multicast operation failure reported over Thrift
+     * @return corresponding code, MC_UNKNOWN_ERROR if the code is not recognized
+     */
+    private static Code getCode(InvalidMcOperation failure) {
+        switch (failure.getCode()) {
+            case TABLE_FULL:
+                return Code.TABLE_FULL;
+            case INVALID_MGID:
+                return Code.INVALID_MGID;
+            case INVALID_L1_HANDLE:
+                return Code.INVALID_L1_HANDLE;
+            case INVALID_MGRP_HANDLE:
+                return Code.INVALID_MGRP_HANDLE;
+            case ERROR:
+                return Code.MC_GENERAL_ERROR;
+            default:
+                return Code.MC_UNKNOWN_ERROR;
+        }
+    }
+}
diff --git a/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/ctl/SafeThriftClient.java b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/ctl/SafeThriftClient.java
new file mode 100644
index 0000000..a510954
--- /dev/null
+++ b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/ctl/SafeThriftClient.java
@@ -0,0 +1,251 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/*
+ * Most of the code of this class was copied from:
+ * http://liveramp.com/engineering/reconnecting-thrift-client/
+ */
+
+package org.onosproject.drivers.bmv2.ctl;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.thrift.TServiceClient;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.Set;
+
+/**
+ * Thrift client wrapper that attempts a few reconnects before giving up a method call execution. It also provides
+ * synchronization between calls over the same transport.
+ */
+final class SafeThriftClient {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SafeThriftClient.class);
+
+    /** Transport error types which suggest a reconnection might fix things (see {@link TTransportException}). */
+    private static final Set<Integer> RESTARTABLE_CAUSES = ImmutableSet.of(TTransportException.NOT_OPEN,
+                                                                           TTransportException.END_OF_FILE,
+                                                                           TTransportException.TIMED_OUT,
+                                                                           TTransportException.UNKNOWN);
+
+    private SafeThriftClient() {
+        // ban constructor.
+    }
+
+    /**
+     * Reflectively wraps an already existing Thrift client.
+     *
+     * @param baseClient      the client to wrap
+     * @param clientInterface the interface that the client implements
+     * @param options         options that control behavior of the reconnecting client
+     * @param <T>             a class extending TServiceClient
+     * @param <C>             a client interface
+     * @return a wrapped client interface
+     */
+    public static <T extends TServiceClient, C> C wrap(T baseClient, Class<C> clientInterface, Options options) {
+        InvocationHandler handler = new ReconnectingClientProxy<T>(baseClient,
+                                                                   options.getNumRetries(),
+                                                                   options.getTimeBetweenRetries());
+        Object proxyObject = Proxy.newProxyInstance(clientInterface.getClassLoader(),
+                                                    new Class<?>[]{clientInterface}, handler);
+        // Class.cast() performs a checked conversion, unlike a plain cast to the erased type C.
+        return clientInterface.cast(proxyObject);
+    }
+
+    /**
+     * Reflectively wraps an already existing Thrift client, exposing the interface named "Iface" that the client
+     * directly implements and that shares the client's enclosing class.
+     *
+     * @param baseClient the client to wrap
+     * @param options    options that control behavior of the reconnecting client
+     * @param <T>        a class that extends TServiceClient
+     * @param <C>        a client interface
+     * @return a wrapped client interface
+     * @throws RuntimeException if the client does not directly implement such an interface
+     */
+    @SuppressWarnings("unchecked") // C is chosen by the caller and cannot be verified against the interface found.
+    public static <T extends TServiceClient, C> C wrap(T baseClient, Options options) {
+        Class<?> clientOuter = baseClient.getClass().getEnclosingClass();
+        for (Class<?> iface : baseClient.getClass().getInterfaces()) {
+            Class<?> ifaceOuter = iface.getEnclosingClass();
+            // A top-level interface has no enclosing class: skip it rather than fail on a null reference.
+            if (iface.getSimpleName().equals("Iface") && ifaceOuter != null && ifaceOuter.equals(clientOuter)) {
+                return (C) wrap(baseClient, iface, options);
+            }
+        }
+        throw new RuntimeException("Class needs to implement Iface directly. Use wrap(TServiceClient, Class) instead.");
+    }
+
+    /**
+     * Reflectively wraps an already existing Thrift client, using the default reconnection options.
+     *
+     * @param baseClient      the client to wrap
+     * @param clientInterface the interface that the client implements
+     * @param <T>             a class that extends TServiceClient
+     * @param <C>             a client interface
+     * @return a wrapped client interface
+     */
+    public static <T extends TServiceClient, C> C wrap(T baseClient, Class<C> clientInterface) {
+        return wrap(baseClient, clientInterface, Options.defaults());
+    }
+
+    /**
+     * Reflectively wraps an already existing Thrift client, using the default reconnection options.
+     *
+     * @param baseClient the client to wrap
+     * @param <T>        a class that extends TServiceClient
+     * @param <C>        a client interface
+     * @return a wrapped client interface
+     */
+    public static <T extends TServiceClient, C> C wrap(T baseClient) {
+        return wrap(baseClient, Options.defaults());
+    }
+
+    /**
+     * Reconnection options for {@link SafeThriftClient}.
+     */
+    public static class Options {
+        private final int numRetries;
+        private final long timeBetweenRetries;
+
+        /**
+         * Creates new options with the given parameters.
+         *
+         * @param numRetries         the maximum number of reconnection attempts before giving up
+         * @param timeBetweenRetries the number of milliseconds to wait in between reconnection attempts.
+         */
+        public Options(int numRetries, long timeBetweenRetries) {
+            this.numRetries = numRetries;
+            this.timeBetweenRetries = timeBetweenRetries;
+        }
+
+        private static Options defaults() {
+            return new Options(5, 10000L);
+        }
+
+        private int getNumRetries() {
+            return numRetries;
+        }
+
+        private long getTimeBetweenRetries() {
+            return timeBetweenRetries;
+        }
+    }
+
+    /**
+     * Helper proxy class. Attempts to call method on proxy object wrapped in try/catch. If it fails, it attempts a
+     * reconnect and tries the method again.
+     *
+     * @param <T> a class that extends TServiceClient
+     */
+    private static class ReconnectingClientProxy<T extends TServiceClient> implements InvocationHandler {
+        private final T baseClient;
+        private final TTransport transport;
+        private final int maxRetries;
+        private final long timeBetweenRetries;
+
+        public ReconnectingClientProxy(T baseClient, int maxRetries, long timeBetweenRetries) {
+            this.baseClient = baseClient;
+            this.transport = baseClient.getInputProtocol().getTransport();
+            this.maxRetries = maxRetries;
+            this.timeBetweenRetries = timeBetweenRetries;
+        }
+
+        /**
+         * Closes the transport if open, then tries to open it again, for at most maxRetries attempts separated by
+         * timeBetweenRetries milliseconds.
+         *
+         * @throws TTransportException if none of the attempts succeeds
+         */
+        private void reconnectOrThrowException() throws TTransportException {
+            try {
+                if (transport.isOpen()) {
+                    transport.close();
+                }
+            } catch (Exception e) {
+                // Thrift seems to have a bug where if the transport is already closed a SocketException is thrown.
+                // However, such an exception is not advertised by .close(), hence the general-purpose catch.
+                LOG.debug("Exception while closing transport", e);
+            }
+
+            TTransportException lastFailure = null;
+            for (int attempt = 1; attempt <= maxRetries; attempt++) {
+                try {
+                    LOG.debug("Attempting to reconnect...");
+                    transport.open();
+                    LOG.debug("Reconnection successful");
+                    return;
+                } catch (TTransportException e) {
+                    LOG.debug("Error while reconnecting:", e);
+                    lastFailure = e;
+                }
+                if (attempt < maxRetries) {
+                    try {
+                        LOG.debug("Sleeping for {} milliseconds before retrying", timeBetweenRetries);
+                        Thread.sleep(timeBetweenRetries);
+                    } catch (InterruptedException interrupted) {
+                        // Report the interruption itself as the cause, after restoring the interrupt flag.
+                        Thread.currentThread().interrupt();
+                        throw new RuntimeException(interrupted);
+                    }
+                }
+            }
+            // Keep the last connection error (if any) as the cause to ease troubleshooting.
+            throw new TTransportException("Failed to reconnect", lastFailure);
+        }
+
+        /**
+         * Invokes the method on the wrapped client while holding the transport lock. If the call fails with a
+         * TTransportException of a restartable type, reconnects and invokes the method once more.
+         */
+        @Override
+        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+            // Thrift transport layer is not thread-safe (it's a wrapper on a socket), hence we need locking.
+            synchronized (transport) {
+                LOG.debug("Invoking method... > fromThread={}, method={}, args={}",
+                          Thread.currentThread().getId(), method.getName(), args);
+                try {
+                    try {
+                        return method.invoke(baseClient, args);
+                    } catch (InvocationTargetException e) {
+                        Throwable target = e.getTargetException();
+                        boolean restartable = target instanceof TTransportException
+                                && RESTARTABLE_CAUSES.contains(((TTransportException) target).getType());
+                        if (!restartable) {
+                            throw e;
+                        }
+                        // Try to reconnect. If fail, a TTransportException will be thrown.
+                        reconnectOrThrowException();
+                        // If here, transport has been successfully open, hence new exceptions will be thrown.
+                        return method.invoke(baseClient, args);
+                    }
+                } catch (InvocationTargetException e) {
+                    // Unwrap so that callers see the exception raised by the client, not the reflection wrapper.
+                    LOG.debug("Exception: {}", e.getTargetException());
+                    throw e.getTargetException();
+                }
+            }
+        }
+    }
+}
diff --git a/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/ctl/package-info.java b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/ctl/package-info.java
new file mode 100644
index 0000000..f273b19
--- /dev/null
+++ b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/ctl/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+/**
+ * Package for BMv2 controller.
+ */
+package org.onosproject.drivers.bmv2.ctl;
\ No newline at end of file