/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
17
18package org.onosproject.drivers.bmv2.ctl;
19
20import com.google.common.cache.CacheBuilder;
21import com.google.common.cache.CacheLoader;
22import com.google.common.cache.LoadingCache;
23import com.google.common.collect.Maps;
24import org.apache.commons.lang3.tuple.Pair;
25import org.apache.felix.scr.annotations.Activate;
26import org.apache.felix.scr.annotations.Component;
27import org.apache.felix.scr.annotations.Deactivate;
28import org.apache.felix.scr.annotations.Modified;
29import org.apache.felix.scr.annotations.Property;
30import org.apache.felix.scr.annotations.Reference;
31import org.apache.felix.scr.annotations.ReferenceCardinality;
32import org.apache.felix.scr.annotations.Service;
33import org.apache.thrift.protocol.TBinaryProtocol;
34import org.apache.thrift.protocol.TMultiplexedProtocol;
35import org.apache.thrift.protocol.TProtocol;
36import org.apache.thrift.transport.TSocket;
37import org.apache.thrift.transport.TTransport;
38import org.onlab.util.Tools;
39import org.onosproject.bmv2.thriftapi.SimplePreLAG;
40import org.onosproject.cfg.ComponentConfigService;
41import org.onosproject.drivers.bmv2.api.Bmv2DeviceAgent;
42import org.onosproject.drivers.bmv2.api.Bmv2PreController;
43import org.onosproject.net.DeviceId;
44import org.osgi.service.component.ComponentContext;
45import org.slf4j.Logger;
46
47import java.util.Dictionary;
48import java.util.Map;
49import java.util.concurrent.TimeUnit;
50import java.util.concurrent.locks.ReadWriteLock;
51import java.util.concurrent.locks.ReentrantReadWriteLock;
52
53import static com.google.common.base.Preconditions.checkNotNull;
54import static java.lang.String.format;
55import static org.slf4j.LoggerFactory.getLogger;
56
57/**
58 * BMv2 PRE controller implementation.
59 */
60@Component(immediate = true)
61@Service
62public class Bmv2PreControllerImpl implements Bmv2PreController {
63
64 private static final int DEVICE_LOCK_CACHE_EXPIRE_TIME_IN_MIN = 10;
65 private static final int DEVICE_LOCK_WAITING_TIME_IN_SEC = 60;
66 private static final int DEFAULT_NUM_CONNECTION_RETRIES = 2;
67 private static final int DEFAULT_TIME_BETWEEN_RETRIES = 10;
68 private static final String THRIFT_SERVICE_NAME = "simple_pre_lag";
69 private final Logger log = getLogger(getClass());
70 private final Map<DeviceId, Pair<TTransport, Bmv2DeviceThriftClient>> clients = Maps.newHashMap();
71 //TODO consider a timeout mechanism for locks to limit the retention time
72 private final LoadingCache<DeviceId, ReadWriteLock> deviceLocks = CacheBuilder.newBuilder()
73 .expireAfterAccess(DEVICE_LOCK_CACHE_EXPIRE_TIME_IN_MIN, TimeUnit.MINUTES)
74 .build(new CacheLoader<DeviceId, ReadWriteLock>() {
75 @Override
76 public ReadWriteLock load(DeviceId deviceId) {
77 return new ReentrantReadWriteLock();
78 }
79 });
80 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
81 protected ComponentConfigService cfgService;
82 @Property(name = "numConnectionRetries", intValue = DEFAULT_NUM_CONNECTION_RETRIES,
83 label = "Number of connection retries after a network error")
84 private int numConnectionRetries = DEFAULT_NUM_CONNECTION_RETRIES;
85 @Property(name = "timeBetweenRetries", intValue = DEFAULT_TIME_BETWEEN_RETRIES,
86 label = "Time between retries in milliseconds")
87 private int timeBetweenRetries = DEFAULT_TIME_BETWEEN_RETRIES;
88 @Property(name = "deviceLockWaitingTime", intValue = DEVICE_LOCK_WAITING_TIME_IN_SEC,
89 label = "Waiting time for a read/write lock in seconds")
90 private int deviceLockWaitingTime = DEVICE_LOCK_WAITING_TIME_IN_SEC;
91
92 @Activate
93 public void activate() {
94 cfgService.registerProperties(getClass());
95 log.info("Started");
96 }
97
98 @Deactivate
99 public void deactivate() {
100 cfgService.unregisterProperties(getClass(), false);
101 log.info("Stopped");
102 }
103
104 @Modified
105 protected void modified(ComponentContext context) {
106 Dictionary<?, ?> properties = context.getProperties();
107
108 Integer newNumConnRetries = Tools.getIntegerProperty(properties, "numConnectionRetries");
109 if (newNumConnRetries != null && newNumConnRetries >= 0) {
110 numConnectionRetries = newNumConnRetries;
111 } else {
112 log.warn("numConnectionRetries must be equal to or greater than 0");
113 }
114
115 Integer newTimeBtwRetries = Tools.getIntegerProperty(properties, "timeBetweenRetries");
116 if (newTimeBtwRetries != null && newTimeBtwRetries >= 0) {
117 timeBetweenRetries = newTimeBtwRetries;
118 } else {
119 log.warn("timeBetweenRetries must be equal to or greater than 0");
120 }
121
122 Integer newDeviceLockWaitingTime = Tools.getIntegerProperty(properties, "deviceLockWaitingTime");
123 if (newDeviceLockWaitingTime != null && newDeviceLockWaitingTime >= 0) {
124 deviceLockWaitingTime = newDeviceLockWaitingTime;
125 } else {
126 log.warn("deviceLockWaitingTime must be equal to or greater than 0");
127 }
128 }
129
130 @Override
131 public boolean createPreClient(DeviceId deviceId, String thriftServerIp, Integer thriftServerPort) {
132 checkNotNull(deviceId);
133 checkNotNull(thriftServerIp);
134 checkNotNull(thriftServerPort);
135
136 if (!acquireWriteLock(deviceId)) {
137 //reason is already logged during the lock acquisition
138 return false;
139 }
140
141 log.info("Creating PRE client for {} through Thrift server {}:{}", deviceId, thriftServerIp, thriftServerPort);
142
143 try {
144 if (clients.containsKey(deviceId)) {
145 throw new IllegalStateException(format("A client already exists for %s", deviceId));
146 } else {
147 return doCreateClient(deviceId, thriftServerIp, thriftServerPort);
148 }
149 } finally {
150 releaseWriteLock(deviceId);
151 }
152 }
153
154 @Override
155 public Bmv2DeviceAgent getPreClient(DeviceId deviceId) {
156 if (!acquireReadLock(deviceId)) {
157 return null;
158 }
159 try {
160 return clients.containsKey(deviceId) ? clients.get(deviceId).getRight() : null;
161 } finally {
162 releaseReadLock(deviceId);
163 }
164 }
165
166 @Override
167 public void removePreClient(DeviceId deviceId) {
168 if (!acquireWriteLock(deviceId)) {
169 //reason is already logged during the lock acquisition
170 return;
171 }
172
173 try {
174 if (clients.containsKey(deviceId)) {
175 TTransport transport = clients.get(deviceId).getLeft();
176 if (transport.isOpen()) {
177 transport.close();
178 }
179 clients.remove(deviceId);
180 }
181 } finally {
182 releaseWriteLock(deviceId);
183 }
184 }
185
186 private boolean acquireWriteLock(DeviceId deviceId) {
187 try {
188 return deviceLocks.getUnchecked(deviceId).writeLock().tryLock(deviceLockWaitingTime, TimeUnit.SECONDS);
189 } catch (InterruptedException e) {
190 log.error("Unable to acquire write lock for device {} due to {}", deviceId, e.toString());
191 }
192 return false;
193 }
194
195 private boolean acquireReadLock(DeviceId deviceId) {
196 try {
197 return deviceLocks.getUnchecked(deviceId).readLock().tryLock(deviceLockWaitingTime, TimeUnit.SECONDS);
198 } catch (InterruptedException e) {
199 log.error("Unable to acquire read lock for device {} due to {}", deviceId, e.toString());
200 }
201 return false;
202 }
203
204 private void releaseWriteLock(DeviceId deviceId) {
205 deviceLocks.getUnchecked(deviceId).writeLock().unlock();
206 }
207
208 private void releaseReadLock(DeviceId deviceId) {
209 deviceLocks.getUnchecked(deviceId).readLock().unlock();
210 }
211
212 private boolean doCreateClient(DeviceId deviceId, String thriftServerIp, Integer thriftServerPort) {
213 SafeThriftClient.Options options = new SafeThriftClient.Options(numConnectionRetries, timeBetweenRetries);
214
215 try {
216 // Make the expensive call
217 TTransport transport = new TSocket(thriftServerIp, thriftServerPort);
218
219 TProtocol protocol = new TBinaryProtocol(transport);
220 // Create a client for simple_pre service.
221 SimplePreLAG.Client simplePreClient = new SimplePreLAG.Client(
222 new TMultiplexedProtocol(protocol, THRIFT_SERVICE_NAME));
223
224 SimplePreLAG.Iface safeSimplePreClient = SafeThriftClient.wrap(simplePreClient,
225 SimplePreLAG.Iface.class,
226 options);
227
228 Bmv2DeviceThriftClient client = new Bmv2DeviceThriftClient(deviceId, safeSimplePreClient);
229 clients.put(deviceId, Pair.of(transport, client));
230 return true;
231
232 } catch (RuntimeException e) {
233 log.warn("Failed to create Thrift client for BMv2 device. deviceId={}, cause={}", deviceId, e);
234 return false;
235 }
236 }
237}