ONOS-7251 - Initial implementation of fabric.p4 L2 broadcast feature.
Thrift client cherry-picked from the commit dd5792ac9ee38a702c3128a34224852b5c284687
Change-Id: I989f2b2074485a892195889a7c976b518510da88
diff --git a/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/ctl/Bmv2DeviceThriftClient.java b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/ctl/Bmv2DeviceThriftClient.java
new file mode 100644
index 0000000..cdcfb0c
--- /dev/null
+++ b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/ctl/Bmv2DeviceThriftClient.java
@@ -0,0 +1,406 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.onosproject.drivers.bmv2.ctl;
+
+import org.apache.thrift.TException;
+import org.onosproject.bmv2.thriftapi.SimplePreLAG;
+import org.onosproject.drivers.bmv2.api.Bmv2DeviceAgent;
+import org.onosproject.drivers.bmv2.api.runtime.Bmv2PreGroup;
+import org.onosproject.drivers.bmv2.api.runtime.Bmv2PreNode;
+import org.onosproject.drivers.bmv2.api.runtime.Bmv2RuntimeException;
+import org.onosproject.drivers.bmv2.impl.Bmv2PreGroupTranslatorImpl;
+import org.onosproject.net.DeviceId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.drivers.bmv2.ctl.Bmv2TExceptionParser.parseTException;
+
+/**
+ * Implementation of a Thrift client to control a BMv2 device.
+ */
+final class Bmv2DeviceThriftClient implements Bmv2DeviceAgent {
+
+ // FIXME: make context_id arbitrary for each call
+ // See: https://github.com/p4lang/behavioral-model/blob/master/modules/bm_sim/include/bm_sim/context.h
+ private static final int CONTEXT_ID = 0;
+ private static final String DEFAULT_LAG_MAP = "";
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+ private final SimplePreLAG.Iface simplePreLagClient;
+ private final DeviceId deviceId;
+
+ // ban constructor
+ protected Bmv2DeviceThriftClient(DeviceId deviceId, SimplePreLAG.Iface simplePreLagClient) {
+ this.deviceId = deviceId;
+ this.simplePreLagClient = simplePreLagClient;
+ }
+
+ @Override
+ public DeviceId deviceId() {
+ return deviceId;
+ }
+
+ @Override
+ public Bmv2PreGroup writePreGroup(Bmv2PreGroup preGroup) throws Bmv2RuntimeException {
+ log.debug("Creating a multicast group... > deviceId={}, {}", deviceId, preGroup);
+
+ GroupRollbackMachine groupRollbackMachine = new GroupRollbackMachine(preGroup);
+ try {
+ //first create mc group
+ preGroup.setNativeGroupHandle(createMcGroup(preGroup.groupId()));
+ groupRollbackMachine.setState(GroupOperationState.GROUP_CREATED);
+ //create mc nodes
+ createMcNodesOfGroup(preGroup);
+ groupRollbackMachine.setState(GroupOperationState.NODES_CREATED);
+ //associate nodes with group
+ associateMcNodesOfGroup(preGroup);
+ groupRollbackMachine.setState(GroupOperationState.NODES_ASSOCIATED);
+
+ log.debug("Multicast group created successfully. deviceId={}, {}", deviceId, preGroup);
+
+ return preGroup;
+ } finally {
+ groupRollbackMachine.rollbackIfNecessary();
+ }
+ }
+
+ @Override
+ public void deletePreGroup(Bmv2PreGroup preGroup) throws Bmv2RuntimeException {
+ log.debug("Deleting a multicast group... > deviceId={}, {}", deviceId, preGroup);
+ //disassociate mc nodes from group
+ disassociateMcNodesOfGroup(preGroup);
+ //delete mc nodes
+ deleteMcNodesOfGroup(preGroup);
+ //delete group
+ deleteMcGroup(preGroup);
+
+ log.debug("Multicast group deleted. deviceId={}, {}", deviceId, preGroup);
+ }
+
+ @Override
+ public List<Bmv2PreGroup> getPreGroups() throws Bmv2RuntimeException {
+ try {
+ String entries = simplePreLagClient.bm_mc_get_entries(CONTEXT_ID);
+ return Bmv2PreGroupTranslatorImpl.translate(entries);
+
+ } catch (TException | IOException e) {
+ log.debug("Exception while getting multicast groups. deviceId={}", deviceId, e);
+
+ if (e instanceof TException) {
+ throw parseTException((TException) e);
+ } else {
+ throw new Bmv2RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * Creates multicast nodes one by one.
+ * Node handles obtained as the results of node creation operations are stored
+ * in given Bmv2PreGroup object.
+ *
+ * @param preGroup Bmv2PreGroup object
+ * @throws Bmv2RuntimeException
+ */
+ private void createMcNodesOfGroup(Bmv2PreGroup preGroup) throws Bmv2RuntimeException {
+ for (Bmv2PreNode node : preGroup.nodes().nodes()) {
+ node.setL1Handle(createMcNode(node));
+ }
+ }
+
+ /**
+ * Associates multicast nodes with a group one by one.
+ *
+ * @param preGroup Bmv2PreGroup object
+ * @throws Bmv2RuntimeException
+ */
+ private void associateMcNodesOfGroup(Bmv2PreGroup preGroup) throws Bmv2RuntimeException {
+ int nativeGroupHandle = preGroup.nativeGroupHandle();
+ for (Bmv2PreNode node : preGroup.nodes().nodes()) {
+ associateMcNode(nativeGroupHandle, node);
+ }
+ }
+
+ /**
+ * Deletes multicast nodes one by one.
+ *
+ * @param preGroup Bmv2PreGroup object
+ * @throws Bmv2RuntimeException
+ */
+ private void deleteMcNodesOfGroup(Bmv2PreGroup preGroup) throws Bmv2RuntimeException {
+ for (Bmv2PreNode node : preGroup.nodes().nodes()) {
+ destroyMcNode(node);
+ }
+ }
+
+ /**
+ * Disassociates multicast nodes from a group one by one.
+ *
+ * @param preGroup Bmv2PreGroup object
+ * @throws Bmv2RuntimeException
+ */
+ private void disassociateMcNodesOfGroup(Bmv2PreGroup preGroup) throws Bmv2RuntimeException {
+ int nativeGroupHandle = preGroup.nativeGroupHandle();
+ for (Bmv2PreNode node : preGroup.nodes().nodes()) {
+ disassociateMcNode(nativeGroupHandle, node);
+ }
+ }
+
+
+ /**
+ * Creates a multicast group with specified group Id.
+ *
+ * @param groupId identifier of a group
+ * @return group handle (BMv2 specific identifier associated with the group)
+ * @throws Bmv2RuntimeException
+ */
+ private int createMcGroup(int groupId) throws Bmv2RuntimeException {
+ log.debug("Creating the multicast group... > deviceId={}, groupId={}", deviceId, groupId);
+ try {
+ return simplePreLagClient.bm_mc_mgrp_create(CONTEXT_ID, groupId);
+ } catch (TException e) {
+ log.debug("Exception during creating multicast group. deviceId={}, groupId={}", deviceId, groupId);
+ throw parseTException(e);
+ }
+ }
+
+ /**
+ * Deletes a multicast group from a BMv2 device.
+ *
+ * @param preGroup
+ * @throws Bmv2RuntimeException
+ */
+ private void deleteMcGroup(Bmv2PreGroup preGroup) throws Bmv2RuntimeException {
+ log.debug("Destroying the multicast group... > deviceId={}, groupId={}, groupHandle={}",
+ deviceId, preGroup.groupId(), preGroup.nativeGroupHandle());
+ try {
+ simplePreLagClient.bm_mc_mgrp_destroy(CONTEXT_ID, preGroup.nativeGroupHandle());
+ } catch (TException e) {
+ log.debug("Exception during destroying multicast group. deviceId={}, groupId={}, groupHandle={}",
+ deviceId, preGroup.groupId(), preGroup.nativeGroupHandle());
+ throw parseTException(e);
+ }
+ }
+
+ /**
+ * Creates a multicast node on the BMv2 device.
+ *
+ * @param node Bmv2PreNode
+ * @return L1 handle
+ * @throws Bmv2RuntimeException
+ */
+ private int createMcNode(Bmv2PreNode node) throws Bmv2RuntimeException {
+ log.debug("Creating the multicast node... > deviceId={}, {}", deviceId, node);
+ try {
+ return simplePreLagClient.bm_mc_node_create(CONTEXT_ID, node.rid(), node.portMap(), DEFAULT_LAG_MAP);
+ } catch (TException e) {
+ log.debug("Exception during creating multicast node: {}", node);
+ throw parseTException(e);
+ }
+ }
+
+ /**
+ * Associates a multicast node with a group.
+ *
+ * @param groupHandle handle of the group that the node will be associated with
+ * @param node Bmv2PreNode
+ * @throws Bmv2RuntimeException
+ */
+ private void associateMcNode(int groupHandle, Bmv2PreNode node) throws Bmv2RuntimeException {
+ log.debug("Associating the multicast node with the group... > deviceId={}, groupHandle:{}, node:{}",
+ deviceId, groupHandle, node);
+ try {
+ simplePreLagClient.bm_mc_node_associate(CONTEXT_ID, groupHandle, node.l1Handle());
+ } catch (TException e) {
+ log.debug("Exception during associating multicast node with group. deviceId={} groupHandle:{}, node:{}",
+ deviceId, groupHandle, node);
+ throw parseTException(e);
+ }
+ }
+
+ /**
+ * Disassociates a multicast node from a group.
+ *
+ * @param groupHandle handle of the group that the node will be disassociated from
+ * @param node Bmv2PreNode
+ * @throws Bmv2RuntimeException
+ */
+ private void disassociateMcNode(int groupHandle, Bmv2PreNode node) throws Bmv2RuntimeException {
+ log.debug("Disassociating the multicast node from the group... > deviceId={}, groupHandle:{}, node:{}",
+ deviceId, groupHandle, node);
+ try {
+ simplePreLagClient.bm_mc_node_dissociate(CONTEXT_ID, groupHandle, node.l1Handle());
+ } catch (TException e) {
+ log.debug("Failed to disassociate multicast node from group. deviceId={} groupHandle:{}, node:{}",
+ deviceId, groupHandle, node);
+ throw parseTException(e);
+ }
+ }
+
+ /**
+ * Destroys the multicast node in a BMv2 device.
+ *
+ * @param node PRE node which is about to be destroyed
+ * @throws Bmv2RuntimeException
+ */
+ private void destroyMcNode(Bmv2PreNode node) throws Bmv2RuntimeException {
+ log.debug("Destroying the multicast node... > deviceId={}, node:{}", deviceId, node);
+ try {
+ simplePreLagClient.bm_mc_node_destroy(CONTEXT_ID, node.l1Handle());
+ } catch (TException e) {
+ log.debug("Exception during destroying multicast node. deviceId={}, node:{}", deviceId, node);
+ throw parseTException(e);
+ }
+ }
+
+
+ /**
+ * Defines identifiers of main group operation steps.
+ */
+ private enum GroupOperationState {
+ IDLE, // nothing has been done
+ GROUP_CREATED, //the last successful step is group creation
+ NODES_CREATED, //the last successful step is node creation
+ NODES_ASSOCIATED //the last successful step is node association.
+ }
+
+ /**
+ * Implementation of a simple state machine to keep track of complex (non-atomic) operations on groups and
+ * to execute essential rollback steps accordingly.
+ * For example, creating a multicast group is composed of multiple steps:
+ * 1- Group creation
+ * 2- Node creation
+ * 3- Node association
+ * Each step associates with a GroupOperationState to keep track of group creation operation.
+ * A rollback flow is executed with respect to the current state.
+ */
+ private class GroupRollbackMachine {
+ Bmv2PreGroup preGroup;
+ //indicates the last successful step
+ GroupOperationState state = GroupOperationState.IDLE;
+
+ private GroupRollbackMachine() {
+ //hidden constructor
+ }
+
+ public GroupRollbackMachine(Bmv2PreGroup preGroup) {
+ this.preGroup = preGroup;
+ }
+
+ GroupOperationState state() {
+ return state;
+ }
+
+ void setState(GroupOperationState state) {
+ this.state = checkNotNull(state);
+ }
+
+ /**
+ * Checks the state and executes necessary rollback flow if necessary.
+ */
+ void rollbackIfNecessary() {
+ switch (state) {
+ case GROUP_CREATED:
+ //means node creation failed. Delete already created nodes and the group
+ onGroupCreated();
+ break;
+ case NODES_CREATED:
+ //means node association failed.
+ //Disassociate already associated nodes then delete nodes and the group.
+ onNodesCreated();
+ break;
+ default:
+ //do nothing in IDLE and NODES_ASSOCIATED states. They do not signify a failure.
+ break;
+ }
+ }
+
+ /**
+ * Executes necessary steps in case the last successful step is group creation.
+ * This means one of the node creation operations has been failed and all previous steps should rollback.
+ */
+ private void onGroupCreated() {
+ log.warn("One of the steps of mc group creation operation has been failed." +
+ "Rolling back in state {}...> deviceId={}, groupId={}",
+ state, deviceId, preGroup.groupId());
+ deleteNodes(preGroup);
+ deleteGroup(preGroup);
+ }
+
+ /**
+ * Executes necessary steps in case the last successful step is node creation.
+ * This means one of the node association operations has been failed and all previous steps should rollback.
+ */
+ private void onNodesCreated() {
+ log.warn("One of the steps of mc group creation operation has been failed." +
+ "Rolling back in state {}...> deviceId={}, groupId={}",
+ state, deviceId, preGroup.groupId());
+ disassociateNodes(preGroup);
+ deleteNodes(preGroup);
+ deleteGroup(preGroup);
+ }
+
+ /**
+ * Deletes a group in the scope of rollback operation.
+ */
+ private void deleteGroup(Bmv2PreGroup preGroup) {
+ try {
+ deleteMcGroup(preGroup);
+ } catch (Bmv2RuntimeException e) {
+ log.error("Unable to destroy multicast group in the scope of rollback operation." +
+ "deviceId={}, groupId={}", deviceId, preGroup.groupId());
+ }
+ }
+
+
+ /**
+ * Disassociates all nodes from their group in the scope of rollback operation.
+ */
+ private void disassociateNodes(Bmv2PreGroup preGroup) {
+ preGroup.nodes().nodes().forEach(node -> {
+ try {
+ disassociateMcNode(preGroup.nativeGroupHandle(), node);
+ } catch (Bmv2RuntimeException e) {
+ log.error("Unable to disassociate the node in the scope of rollback operation." +
+ "deviceId={}, groupHandle={}, l1Handle={}",
+ deviceId, preGroup.nativeGroupHandle(), node.l1Handle(), e);
+ }
+ });
+ }
+
+ /**
+ * Deletes all nodes of a group in the scope of rollback operation.
+ */
+ private void deleteNodes(Bmv2PreGroup preGroup) {
+ //filter created nodes and destroy them
+ preGroup.nodes().nodes().stream()
+ .filter(node -> node.l1Handle() != null)
+ .forEach(node -> {
+ try {
+ destroyMcNode(node);
+ } catch (Bmv2RuntimeException e) {
+ log.error("Unable to destroy the node in the scope of rollback operation." +
+ "deviceId={}, l1Handle={}", deviceId, node.l1Handle(), e);
+ }
+ });
+ }
+ }
+}
diff --git a/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/ctl/Bmv2PreControllerImpl.java b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/ctl/Bmv2PreControllerImpl.java
new file mode 100644
index 0000000..4c80e79
--- /dev/null
+++ b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/ctl/Bmv2PreControllerImpl.java
@@ -0,0 +1,237 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.onosproject.drivers.bmv2.ctl;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TMultiplexedProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.onlab.util.Tools;
+import org.onosproject.bmv2.thriftapi.SimplePreLAG;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.drivers.bmv2.api.Bmv2DeviceAgent;
+import org.onosproject.drivers.bmv2.api.Bmv2PreController;
+import org.onosproject.net.DeviceId;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+
+import java.util.Dictionary;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.lang.String.format;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * BMv2 PRE controller implementation.
+ */
+@Component(immediate = true)
+@Service
+public class Bmv2PreControllerImpl implements Bmv2PreController {
+
+ private static final int DEVICE_LOCK_CACHE_EXPIRE_TIME_IN_MIN = 10;
+ private static final int DEVICE_LOCK_WAITING_TIME_IN_SEC = 60;
+ private static final int DEFAULT_NUM_CONNECTION_RETRIES = 2;
+ private static final int DEFAULT_TIME_BETWEEN_RETRIES = 10;
+ private static final String THRIFT_SERVICE_NAME = "simple_pre_lag";
+ private final Logger log = getLogger(getClass());
+ private final Map<DeviceId, Pair<TTransport, Bmv2DeviceThriftClient>> clients = Maps.newHashMap();
+ //TODO consider a timeout mechanism for locks to limit the retention time
+ private final LoadingCache<DeviceId, ReadWriteLock> deviceLocks = CacheBuilder.newBuilder()
+ .expireAfterAccess(DEVICE_LOCK_CACHE_EXPIRE_TIME_IN_MIN, TimeUnit.MINUTES)
+ .build(new CacheLoader<DeviceId, ReadWriteLock>() {
+ @Override
+ public ReadWriteLock load(DeviceId deviceId) {
+ return new ReentrantReadWriteLock();
+ }
+ });
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ComponentConfigService cfgService;
+ @Property(name = "numConnectionRetries", intValue = DEFAULT_NUM_CONNECTION_RETRIES,
+ label = "Number of connection retries after a network error")
+ private int numConnectionRetries = DEFAULT_NUM_CONNECTION_RETRIES;
+ @Property(name = "timeBetweenRetries", intValue = DEFAULT_TIME_BETWEEN_RETRIES,
+ label = "Time between retries in milliseconds")
+ private int timeBetweenRetries = DEFAULT_TIME_BETWEEN_RETRIES;
+ @Property(name = "deviceLockWaitingTime", intValue = DEVICE_LOCK_WAITING_TIME_IN_SEC,
+ label = "Waiting time for a read/write lock in seconds")
+ private int deviceLockWaitingTime = DEVICE_LOCK_WAITING_TIME_IN_SEC;
+
+ @Activate
+ public void activate() {
+ cfgService.registerProperties(getClass());
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ cfgService.unregisterProperties(getClass(), false);
+ log.info("Stopped");
+ }
+
+ @Modified
+ protected void modified(ComponentContext context) {
+ Dictionary<?, ?> properties = context.getProperties();
+
+ Integer newNumConnRetries = Tools.getIntegerProperty(properties, "numConnectionRetries");
+ if (newNumConnRetries != null && newNumConnRetries >= 0) {
+ numConnectionRetries = newNumConnRetries;
+ } else {
+ log.warn("numConnectionRetries must be equal to or greater than 0");
+ }
+
+ Integer newTimeBtwRetries = Tools.getIntegerProperty(properties, "timeBetweenRetries");
+ if (newTimeBtwRetries != null && newTimeBtwRetries >= 0) {
+ timeBetweenRetries = newTimeBtwRetries;
+ } else {
+ log.warn("timeBetweenRetries must be equal to or greater than 0");
+ }
+
+ Integer newDeviceLockWaitingTime = Tools.getIntegerProperty(properties, "deviceLockWaitingTime");
+ if (newDeviceLockWaitingTime != null && newDeviceLockWaitingTime >= 0) {
+ deviceLockWaitingTime = newDeviceLockWaitingTime;
+ } else {
+ log.warn("deviceLockWaitingTime must be equal to or greater than 0");
+ }
+ }
+
+ @Override
+ public boolean createPreClient(DeviceId deviceId, String thriftServerIp, Integer thriftServerPort) {
+ checkNotNull(deviceId);
+ checkNotNull(thriftServerIp);
+ checkNotNull(thriftServerPort);
+
+ if (!acquireWriteLock(deviceId)) {
+ //reason is already logged during the lock acquisition
+ return false;
+ }
+
+ log.info("Creating PRE client for {} through Thrift server {}:{}", deviceId, thriftServerIp, thriftServerPort);
+
+ try {
+ if (clients.containsKey(deviceId)) {
+ throw new IllegalStateException(format("A client already exists for %s", deviceId));
+ } else {
+ return doCreateClient(deviceId, thriftServerIp, thriftServerPort);
+ }
+ } finally {
+ releaseWriteLock(deviceId);
+ }
+ }
+
+ @Override
+ public Bmv2DeviceAgent getPreClient(DeviceId deviceId) {
+ if (!acquireReadLock(deviceId)) {
+ return null;
+ }
+ try {
+ return clients.containsKey(deviceId) ? clients.get(deviceId).getRight() : null;
+ } finally {
+ releaseReadLock(deviceId);
+ }
+ }
+
+ @Override
+ public void removePreClient(DeviceId deviceId) {
+ if (!acquireWriteLock(deviceId)) {
+ //reason is already logged during the lock acquisition
+ return;
+ }
+
+ try {
+ if (clients.containsKey(deviceId)) {
+ TTransport transport = clients.get(deviceId).getLeft();
+ if (transport.isOpen()) {
+ transport.close();
+ }
+ clients.remove(deviceId);
+ }
+ } finally {
+ releaseWriteLock(deviceId);
+ }
+ }
+
+ private boolean acquireWriteLock(DeviceId deviceId) {
+ try {
+ return deviceLocks.getUnchecked(deviceId).writeLock().tryLock(deviceLockWaitingTime, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ log.error("Unable to acquire write lock for device {} due to {}", deviceId, e.toString());
+ }
+ return false;
+ }
+
+ private boolean acquireReadLock(DeviceId deviceId) {
+ try {
+ return deviceLocks.getUnchecked(deviceId).readLock().tryLock(deviceLockWaitingTime, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ log.error("Unable to acquire read lock for device {} due to {}", deviceId, e.toString());
+ }
+ return false;
+ }
+
+ private void releaseWriteLock(DeviceId deviceId) {
+ deviceLocks.getUnchecked(deviceId).writeLock().unlock();
+ }
+
+ private void releaseReadLock(DeviceId deviceId) {
+ deviceLocks.getUnchecked(deviceId).readLock().unlock();
+ }
+
+ private boolean doCreateClient(DeviceId deviceId, String thriftServerIp, Integer thriftServerPort) {
+ SafeThriftClient.Options options = new SafeThriftClient.Options(numConnectionRetries, timeBetweenRetries);
+
+ try {
+ // Make the expensive call
+ TTransport transport = new TSocket(thriftServerIp, thriftServerPort);
+
+ TProtocol protocol = new TBinaryProtocol(transport);
+ // Create a client for simple_pre service.
+ SimplePreLAG.Client simplePreClient = new SimplePreLAG.Client(
+ new TMultiplexedProtocol(protocol, THRIFT_SERVICE_NAME));
+
+ SimplePreLAG.Iface safeSimplePreClient = SafeThriftClient.wrap(simplePreClient,
+ SimplePreLAG.Iface.class,
+ options);
+
+ Bmv2DeviceThriftClient client = new Bmv2DeviceThriftClient(deviceId, safeSimplePreClient);
+ clients.put(deviceId, Pair.of(transport, client));
+ return true;
+
+ } catch (RuntimeException e) {
+ log.warn("Failed to create Thrift client for BMv2 device. deviceId={}, cause={}", deviceId, e);
+ return false;
+ }
+ }
+}
\ No newline at end of file
diff --git a/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/ctl/Bmv2TExceptionParser.java b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/ctl/Bmv2TExceptionParser.java
new file mode 100644
index 0000000..b765eca
--- /dev/null
+++ b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/ctl/Bmv2TExceptionParser.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.onosproject.drivers.bmv2.ctl;
+
+
+import org.apache.thrift.TException;
+import org.onosproject.bmv2.thriftapi.InvalidMcOperation;
+import org.onosproject.drivers.bmv2.api.runtime.Bmv2RuntimeException;
+
+import static org.onosproject.drivers.bmv2.api.runtime.Bmv2RuntimeException.Code;
+
+/**
+ * Utility class to translate a Thrift exception into a Bmv2RuntimeException.
+ */
+final class Bmv2TExceptionParser {
+
+ private Bmv2TExceptionParser() {
+ // ban constructor.
+ }
+
+ static Bmv2RuntimeException parseTException(TException cause) {
+ try {
+ return new Bmv2RuntimeException(getCode(cause));
+ } catch (ParserException e) {
+ return new Bmv2RuntimeException(e.codeString);
+ }
+ }
+
+ private static Code getCode(TException e) throws ParserException {
+ if (e instanceof InvalidMcOperation) {
+ switch (((InvalidMcOperation) e).getCode()) {
+ case TABLE_FULL:
+ return Code.TABLE_FULL;
+ case INVALID_MGID:
+ return Code.INVALID_MGID;
+ case INVALID_L1_HANDLE:
+ return Code.INVALID_L1_HANDLE;
+ case INVALID_MGRP_HANDLE:
+ return Code.INVALID_MGRP_HANDLE;
+ case ERROR:
+ return Code.MC_GENERAL_ERROR;
+ default:
+ return Code.MC_UNKNOWN_ERROR;
+ }
+ } else {
+ throw new ParserException(e.toString());
+ }
+ }
+
+ private static class ParserException extends Exception {
+
+ private String codeString;
+
+ public ParserException(String codeString) {
+ this.codeString = codeString;
+ }
+ }
+}
diff --git a/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/ctl/SafeThriftClient.java b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/ctl/SafeThriftClient.java
new file mode 100644
index 0000000..a510954
--- /dev/null
+++ b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/ctl/SafeThriftClient.java
@@ -0,0 +1,251 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/*
+ * Most of the code of this class was copied from:
+ * http://liveramp.com/engineering/reconnecting-thrift-client/
+ */
+
+package org.onosproject.drivers.bmv2.ctl;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.thrift.TServiceClient;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.Set;
+
+/**
+ * Thrift client wrapper that attempts a few reconnects before giving up a method call execution. It also provides
+ * synchronization between calls over the same transport.
+ */
+final class SafeThriftClient {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SafeThriftClient.class);
+
+ /**
+ * List of causes which suggest a restart might fix things (defined as constants in {@link TTransportException}).
+ */
+ private static final Set<Integer> RESTARTABLE_CAUSES = ImmutableSet.of(TTransportException.NOT_OPEN,
+ TTransportException.END_OF_FILE,
+ TTransportException.TIMED_OUT,
+ TTransportException.UNKNOWN);
+
+ private SafeThriftClient() {
+ // ban constructor.
+ }
+
+ /**
+ * Reflectively wraps an already existing Thrift client.
+ *
+ * @param baseClient the client to wrap
+ * @param clientInterface the interface that the client implements
+ * @param options options that control behavior of the reconnecting client
+ * @param <T> a class extending TServiceClient
+ * @param <C> a client interface
+ * @return a wrapped client interface
+ */
+ public static <T extends TServiceClient, C> C wrap(T baseClient, Class<C> clientInterface, Options options) {
+ Object proxyObject = Proxy.newProxyInstance(clientInterface.getClassLoader(),
+ new Class<?>[]{clientInterface},
+ new ReconnectingClientProxy<T>(baseClient,
+ options.getNumRetries(),
+ options.getTimeBetweenRetries()));
+
+ return (C) proxyObject;
+ }
+
+ /**
+ * Reflectively wraps an already existing Thrift client.
+ *
+ * @param baseClient the client to wrap
+ * @param options options that control behavior of the reconnecting client
+ * @param <T> a class that extends TServiceClient
+ * @param <C> a client interface
+ * @return a wrapped client interface
+ */
+ public static <T extends TServiceClient, C> C wrap(T baseClient, Options options) {
+ Class<?>[] interfaces = baseClient.getClass().getInterfaces();
+
+ for (Class<?> iface : interfaces) {
+ if (iface.getSimpleName().equals("Iface")
+ && iface.getEnclosingClass().equals(baseClient.getClass().getEnclosingClass())) {
+ return (C) wrap(baseClient, iface, options);
+ }
+ }
+
+ throw new RuntimeException("Class needs to implement Iface directly. Use wrap(TServiceClient, Class) instead.");
+ }
+
+ /**
+ * Reflectively wraps an already existing Thrift client.
+ *
+ * @param baseClient the client to wrap
+ * @param clientInterface the interface that the client implements
+ * @param <T> a class that extends TServiceClient
+ * @param <C> a client interface
+ * @return a wrapped client interface
+ */
+ public static <T extends TServiceClient, C> C wrap(T baseClient, Class<C> clientInterface) {
+ return wrap(baseClient, clientInterface, Options.defaults());
+ }
+
+ /**
+ * Reflectively wraps an already existing Thrift client.
+ *
+ * @param baseClient the client to wrap
+ * @param <T> a class that extends TServiceClient
+ * @param <C> a client interface
+ * @return a wrapped client interface
+ */
+ public static <T extends TServiceClient, C> C wrap(T baseClient) {
+ return wrap(baseClient, Options.defaults());
+ }
+
+ /**
+ * Reconnection options for {@link SafeThriftClient}.
+ */
+ public static class Options {
+ private int numRetries;
+ private long timeBetweenRetries;
+
+ /**
+ * Creates new options with the given parameters.
+ *
+ * @param numRetries the maximum number of times to try reconnecting before giving up and throwing an
+ * exception
+ * @param timeBetweenRetries the number of milliseconds to wait in between reconnection attempts.
+ */
+ public Options(int numRetries, long timeBetweenRetries) {
+ this.numRetries = numRetries;
+ this.timeBetweenRetries = timeBetweenRetries;
+ }
+
+ private static Options defaults() {
+ return new Options(5, 10000L);
+ }
+
+ private int getNumRetries() {
+ return numRetries;
+ }
+
+ private long getTimeBetweenRetries() {
+ return timeBetweenRetries;
+ }
+ }
+
+ /**
+ * Helper proxy class. Attempts to call method on proxy object wrapped in try/catch. If it fails, it attempts a
+ * reconnect and tries the method again.
+ *
+ * @param <T> a class that extends TServiceClient
+ */
+ private static class ReconnectingClientProxy<T extends TServiceClient> implements InvocationHandler {
+ private final T baseClient;
+ private final TTransport transport;
+ private final int maxRetries;
+ private final long timeBetweenRetries;
+
+ public ReconnectingClientProxy(T baseClient, int maxRetries, long timeBetweenRetries) {
+ this.baseClient = baseClient;
+ this.transport = baseClient.getInputProtocol().getTransport();
+ this.maxRetries = maxRetries;
+ this.timeBetweenRetries = timeBetweenRetries;
+ }
+
+ private void reconnectOrThrowException()
+ throws TTransportException {
+ int errors = 0;
+ try {
+ if (transport.isOpen()) {
+ transport.close();
+ }
+ } catch (Exception e) {
+ // Thrift seems to have a bug where if the transport is already closed a SocketException is thrown.
+ // However, such an exception is not advertised by .close(), hence the general-purpose catch.
+ LOG.debug("Exception while closing transport", e);
+ }
+
+ while (errors < maxRetries) {
+ try {
+ LOG.debug("Attempting to reconnect...");
+ transport.open();
+ LOG.debug("Reconnection successful");
+ break;
+ } catch (TTransportException e) {
+ LOG.debug("Error while reconnecting:", e);
+ errors++;
+
+ if (errors < maxRetries) {
+ try {
+ LOG.debug("Sleeping for {} milliseconds before retrying", timeBetweenRetries);
+ Thread.sleep(timeBetweenRetries);
+ } catch (InterruptedException e2) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ if (errors >= maxRetries) {
+ throw new TTransportException("Failed to reconnect");
+ }
+ }
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+
+ // Thrift transport layer is not thread-safe (it's a wrapper on a socket), hence we need locking.
+ synchronized (transport) {
+
+ LOG.debug("Invoking method... > fromThread={}, method={}, args={}",
+ Thread.currentThread().getId(), method.getName(), args);
+
+ try {
+
+ return method.invoke(baseClient, args);
+ } catch (InvocationTargetException e) {
+ if (e.getTargetException() instanceof TTransportException) {
+ TTransportException cause = (TTransportException) e.getTargetException();
+
+ if (RESTARTABLE_CAUSES.contains(cause.getType())) {
+ // Try to reconnect. If fail, a TTransportException will be thrown.
+ reconnectOrThrowException();
+ try {
+ // If here, transport has been successfully open, hence new exceptions will be thrown.
+ return method.invoke(baseClient, args);
+ } catch (InvocationTargetException e1) {
+ LOG.debug("Exception: {}", e1.getTargetException());
+ throw e1.getTargetException();
+ }
+ }
+ }
+ // Target exception is neither a TTransportException nor a restartable cause.
+ LOG.debug("Exception: {}", e.getTargetException());
+ throw e.getTargetException();
+ }
+ }
+ }
+ }
+}
diff --git a/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/ctl/package-info.java b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/ctl/package-info.java
new file mode 100644
index 0000000..f273b19
--- /dev/null
+++ b/drivers/bmv2/src/main/java/org/onosproject/drivers/bmv2/ctl/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+/**
+ * Package for BMv2 controller.
+ */
+package org.onosproject.drivers.bmv2.ctl;
\ No newline at end of file