yoonseon | bd8a93d | 2016-12-07 15:51:21 -0800 | [diff] [blame] | 1 | /* |
Brian O'Connor | a09fe5b | 2017-08-03 21:12:30 -0700 | [diff] [blame] | 2 | * Copyright 2016-present Open Networking Foundation |
yoonseon | bd8a93d | 2016-12-07 15:51:21 -0800 | [diff] [blame] | 3 | * |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | * |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | * |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
| 16 | |
| 17 | package org.onosproject.incubator.store.virtual.impl; |
| 18 | |
| 19 | import com.google.common.cache.Cache; |
| 20 | import com.google.common.cache.CacheBuilder; |
| 21 | import com.google.common.cache.RemovalListener; |
| 22 | import com.google.common.cache.RemovalNotification; |
| 23 | import com.google.common.collect.FluentIterable; |
| 24 | import com.google.common.collect.Sets; |
| 25 | import com.google.common.util.concurrent.SettableFuture; |
yoonseon | bd8a93d | 2016-12-07 15:51:21 -0800 | [diff] [blame] | 26 | import org.onlab.util.Tools; |
| 27 | import org.onosproject.incubator.net.virtual.NetworkId; |
| 28 | import org.onosproject.incubator.net.virtual.VirtualNetworkFlowRuleStore; |
| 29 | import org.onosproject.net.DeviceId; |
| 30 | import org.onosproject.net.flow.CompletedBatchOperation; |
| 31 | import org.onosproject.net.flow.DefaultFlowEntry; |
| 32 | import org.onosproject.net.flow.FlowEntry; |
| 33 | import org.onosproject.net.flow.FlowId; |
| 34 | import org.onosproject.net.flow.FlowRule; |
yoonseon | bd8a93d | 2016-12-07 15:51:21 -0800 | [diff] [blame] | 35 | import org.onosproject.net.flow.FlowRuleEvent; |
| 36 | import org.onosproject.net.flow.FlowRuleStoreDelegate; |
| 37 | import org.onosproject.net.flow.StoredFlowEntry; |
| 38 | import org.onosproject.net.flow.TableStatisticsEntry; |
Ray Milkey | d84f89b | 2018-08-17 14:54:17 -0700 | [diff] [blame] | 39 | import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry; |
| 40 | import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent; |
| 41 | import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation; |
| 42 | import org.onosproject.net.flow.oldbatch.FlowRuleBatchRequest; |
yoonseon | bd8a93d | 2016-12-07 15:51:21 -0800 | [diff] [blame] | 43 | import org.onosproject.store.service.StorageService; |
| 44 | import org.osgi.service.component.ComponentContext; |
Ray Milkey | d84f89b | 2018-08-17 14:54:17 -0700 | [diff] [blame] | 45 | import org.osgi.service.component.annotations.Activate; |
| 46 | import org.osgi.service.component.annotations.Component; |
| 47 | import org.osgi.service.component.annotations.Deactivate; |
| 48 | import org.osgi.service.component.annotations.Modified; |
| 49 | import org.osgi.service.component.annotations.Reference; |
| 50 | import org.osgi.service.component.annotations.ReferenceCardinality; |
yoonseon | bd8a93d | 2016-12-07 15:51:21 -0800 | [diff] [blame] | 51 | import org.slf4j.Logger; |
| 52 | |
| 53 | import java.util.ArrayList; |
| 54 | import java.util.Collections; |
| 55 | import java.util.Dictionary; |
| 56 | import java.util.List; |
| 57 | import java.util.concurrent.ConcurrentHashMap; |
| 58 | import java.util.concurrent.ConcurrentMap; |
| 59 | import java.util.concurrent.CopyOnWriteArrayList; |
| 60 | import java.util.concurrent.ExecutionException; |
| 61 | import java.util.concurrent.TimeUnit; |
| 62 | import java.util.concurrent.TimeoutException; |
| 63 | import java.util.concurrent.atomic.AtomicInteger; |
| 64 | |
Thomas Vachuska | 00b5d4f | 2018-10-30 15:13:20 -0700 | [diff] [blame] | 65 | import static org.onosproject.incubator.store.virtual.impl.OsgiPropertyConstants.PENDING_FUTURE_TIMEOUT_MINUTES; |
| 66 | import static org.onosproject.incubator.store.virtual.impl.OsgiPropertyConstants.PENDING_FUTURE_TIMEOUT_MINUTES_DEFAULT; |
yoonseon | bd8a93d | 2016-12-07 15:51:21 -0800 | [diff] [blame] | 67 | import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED; |
| 68 | import static org.slf4j.LoggerFactory.getLogger; |
| 69 | |
yoonseon | dc3210d | 2017-01-25 16:03:10 -0800 | [diff] [blame] | 70 | /** |
yoonseon | 97b9b59 | 2017-01-31 14:35:06 -0800 | [diff] [blame] | 71 | * Implementation of the virtual network flow rule store to manage inventory of |
| 72 | * virtual flow rules using trivial in-memory implementation. |
yoonseon | dc3210d | 2017-01-25 16:03:10 -0800 | [diff] [blame] | 73 | */ |
yoonseon | dc3210d | 2017-01-25 16:03:10 -0800 | [diff] [blame] | 74 | //TODO: support distributed flowrule store for virtual networks |
yoonseon | 97b9b59 | 2017-01-31 14:35:06 -0800 | [diff] [blame] | 75 | |
Thomas Vachuska | edf9c4d | 2018-10-15 19:14:19 -0700 | [diff] [blame] | 76 | @Component(immediate = true, service = VirtualNetworkFlowRuleStore.class, |
| 77 | property = { |
Thomas Vachuska | 00b5d4f | 2018-10-30 15:13:20 -0700 | [diff] [blame] | 78 | PENDING_FUTURE_TIMEOUT_MINUTES + ":Integer=" + PENDING_FUTURE_TIMEOUT_MINUTES_DEFAULT, |
Thomas Vachuska | edf9c4d | 2018-10-15 19:14:19 -0700 | [diff] [blame] | 79 | }) |
yoonseon | bd8a93d | 2016-12-07 15:51:21 -0800 | [diff] [blame] | 80 | public class SimpleVirtualFlowRuleStore |
| 81 | extends AbstractVirtualStore<FlowRuleBatchEvent, FlowRuleStoreDelegate> |
| 82 | implements VirtualNetworkFlowRuleStore { |
| 83 | |
| 84 | private final Logger log = getLogger(getClass()); |
| 85 | |
Ray Milkey | d84f89b | 2018-08-17 14:54:17 -0700 | [diff] [blame] | 86 | @Reference(cardinality = ReferenceCardinality.MANDATORY) |
yoonseon | bd8a93d | 2016-12-07 15:51:21 -0800 | [diff] [blame] | 87 | protected StorageService storageService; |
| 88 | |
| 89 | private final ConcurrentMap<NetworkId, |
| 90 | ConcurrentMap<DeviceId, ConcurrentMap<FlowId, List<StoredFlowEntry>>>> |
| 91 | flowEntries = new ConcurrentHashMap<>(); |
| 92 | |
| 93 | |
| 94 | private final AtomicInteger localBatchIdGen = new AtomicInteger(); |
| 95 | |
Thomas Vachuska | 00b5d4f | 2018-10-30 15:13:20 -0700 | [diff] [blame] | 96 | /** Expiration time after an entry is created that it should be automatically removed. */ |
Thomas Vachuska | edf9c4d | 2018-10-15 19:14:19 -0700 | [diff] [blame] | 97 | private int pendingFutureTimeoutMinutes = PENDING_FUTURE_TIMEOUT_MINUTES_DEFAULT; |
yoonseon | bd8a93d | 2016-12-07 15:51:21 -0800 | [diff] [blame] | 98 | |
| 99 | private Cache<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures = |
| 100 | CacheBuilder.newBuilder() |
| 101 | .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES) |
| 102 | .removalListener(new TimeoutFuture()) |
| 103 | .build(); |
| 104 | |
| 105 | @Activate |
| 106 | public void activate() { |
| 107 | log.info("Started"); |
| 108 | } |
| 109 | |
| 110 | @Deactivate |
| 111 | public void deactivate() { |
| 112 | flowEntries.clear(); |
| 113 | log.info("Stopped"); |
| 114 | } |
| 115 | |
| 116 | @Modified |
| 117 | public void modified(ComponentContext context) { |
| 118 | |
| 119 | readComponentConfiguration(context); |
| 120 | |
| 121 | // Reset Cache and copy all. |
| 122 | Cache<Integer, SettableFuture<CompletedBatchOperation>> prevFutures = pendingFutures; |
| 123 | pendingFutures = CacheBuilder.newBuilder() |
| 124 | .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES) |
| 125 | .removalListener(new TimeoutFuture()) |
| 126 | .build(); |
| 127 | |
| 128 | pendingFutures.putAll(prevFutures.asMap()); |
| 129 | } |
| 130 | |
| 131 | /** |
| 132 | * Extracts properties from the component configuration context. |
| 133 | * |
| 134 | * @param context the component context |
| 135 | */ |
| 136 | private void readComponentConfiguration(ComponentContext context) { |
| 137 | Dictionary<?, ?> properties = context.getProperties(); |
| 138 | |
| 139 | Integer newPendingFutureTimeoutMinutes = |
| 140 | Tools.getIntegerProperty(properties, "pendingFutureTimeoutMinutes"); |
| 141 | if (newPendingFutureTimeoutMinutes == null) { |
Thomas Vachuska | edf9c4d | 2018-10-15 19:14:19 -0700 | [diff] [blame] | 142 | pendingFutureTimeoutMinutes = PENDING_FUTURE_TIMEOUT_MINUTES_DEFAULT; |
yoonseon | bd8a93d | 2016-12-07 15:51:21 -0800 | [diff] [blame] | 143 | log.info("Pending future timeout is not configured, " + |
| 144 | "using current value of {}", pendingFutureTimeoutMinutes); |
| 145 | } else { |
| 146 | pendingFutureTimeoutMinutes = newPendingFutureTimeoutMinutes; |
| 147 | log.info("Configured. Pending future timeout is configured to {}", |
| 148 | pendingFutureTimeoutMinutes); |
| 149 | } |
| 150 | } |
| 151 | |
| 152 | @Override |
| 153 | public int getFlowRuleCount(NetworkId networkId) { |
yoonseon | bd8a93d | 2016-12-07 15:51:21 -0800 | [diff] [blame] | 154 | int sum = 0; |
yoonseon | 86bebed | 2017-02-03 15:23:57 -0800 | [diff] [blame] | 155 | |
| 156 | if (flowEntries.get(networkId) == null) { |
| 157 | return 0; |
| 158 | } |
| 159 | |
yoonseon | bd8a93d | 2016-12-07 15:51:21 -0800 | [diff] [blame] | 160 | for (ConcurrentMap<FlowId, List<StoredFlowEntry>> ft : |
| 161 | flowEntries.get(networkId).values()) { |
| 162 | for (List<StoredFlowEntry> fes : ft.values()) { |
| 163 | sum += fes.size(); |
| 164 | } |
| 165 | } |
| 166 | return sum; |
| 167 | } |
| 168 | |
| 169 | @Override |
| 170 | public FlowEntry getFlowEntry(NetworkId networkId, FlowRule rule) { |
| 171 | return getFlowEntryInternal(networkId, rule.deviceId(), rule); |
| 172 | } |
| 173 | |
| 174 | @Override |
| 175 | public Iterable<FlowEntry> getFlowEntries(NetworkId networkId, DeviceId deviceId) { |
| 176 | return FluentIterable.from(getFlowTable(networkId, deviceId).values()) |
| 177 | .transformAndConcat(Collections::unmodifiableList); |
| 178 | } |
| 179 | |
Ray Milkey | 7639bf9 | 2017-05-12 13:39:23 -0700 | [diff] [blame] | 180 | private void storeFlowRule(NetworkId networkId, FlowRule rule) { |
yoonseon | bd8a93d | 2016-12-07 15:51:21 -0800 | [diff] [blame] | 181 | storeFlowRuleInternal(networkId, rule); |
| 182 | } |
| 183 | |
| 184 | @Override |
| 185 | public void storeBatch(NetworkId networkId, FlowRuleBatchOperation batchOperation) { |
| 186 | List<FlowRuleBatchEntry> toAdd = new ArrayList<>(); |
| 187 | List<FlowRuleBatchEntry> toRemove = new ArrayList<>(); |
| 188 | |
| 189 | for (FlowRuleBatchEntry entry : batchOperation.getOperations()) { |
| 190 | final FlowRule flowRule = entry.target(); |
| 191 | if (entry.operator().equals(FlowRuleBatchEntry.FlowRuleOperation.ADD)) { |
| 192 | if (!getFlowEntries(networkId, flowRule.deviceId(), |
| 193 | flowRule.id()).contains(flowRule)) { |
| 194 | storeFlowRule(networkId, flowRule); |
| 195 | toAdd.add(entry); |
| 196 | } |
| 197 | } else if (entry.operator().equals(FlowRuleBatchEntry.FlowRuleOperation.REMOVE)) { |
| 198 | if (getFlowEntries(networkId, flowRule.deviceId(), flowRule.id()).contains(flowRule)) { |
| 199 | deleteFlowRule(networkId, flowRule); |
| 200 | toRemove.add(entry); |
| 201 | } |
| 202 | } else { |
| 203 | throw new UnsupportedOperationException("Unsupported operation type"); |
| 204 | } |
| 205 | } |
| 206 | |
| 207 | if (toAdd.isEmpty() && toRemove.isEmpty()) { |
| 208 | notifyDelegate(networkId, FlowRuleBatchEvent.completed( |
| 209 | new FlowRuleBatchRequest(batchOperation.id(), Collections.emptySet()), |
| 210 | new CompletedBatchOperation(true, Collections.emptySet(), |
| 211 | batchOperation.deviceId()))); |
| 212 | return; |
| 213 | } |
| 214 | |
| 215 | SettableFuture<CompletedBatchOperation> r = SettableFuture.create(); |
Yoonseon Han | 997c842 | 2017-04-24 16:20:03 -0700 | [diff] [blame] | 216 | final int futureId = localBatchIdGen.incrementAndGet(); |
yoonseon | bd8a93d | 2016-12-07 15:51:21 -0800 | [diff] [blame] | 217 | |
Yoonseon Han | 997c842 | 2017-04-24 16:20:03 -0700 | [diff] [blame] | 218 | pendingFutures.put(futureId, r); |
yoonseon | bd8a93d | 2016-12-07 15:51:21 -0800 | [diff] [blame] | 219 | |
| 220 | toAdd.addAll(toRemove); |
| 221 | notifyDelegate(networkId, FlowRuleBatchEvent.requested( |
Yoonseon Han | 997c842 | 2017-04-24 16:20:03 -0700 | [diff] [blame] | 222 | new FlowRuleBatchRequest(batchOperation.id(), |
| 223 | Sets.newHashSet(toAdd)), batchOperation.deviceId())); |
yoonseon | bd8a93d | 2016-12-07 15:51:21 -0800 | [diff] [blame] | 224 | |
| 225 | } |
| 226 | |
| 227 | @Override |
| 228 | public void batchOperationComplete(NetworkId networkId, FlowRuleBatchEvent event) { |
| 229 | final Long batchId = event.subject().batchId(); |
| 230 | SettableFuture<CompletedBatchOperation> future |
| 231 | = pendingFutures.getIfPresent(batchId); |
| 232 | if (future != null) { |
| 233 | future.set(event.result()); |
| 234 | pendingFutures.invalidate(batchId); |
| 235 | } |
| 236 | notifyDelegate(networkId, event); |
| 237 | } |
| 238 | |
| 239 | @Override |
| 240 | public void deleteFlowRule(NetworkId networkId, FlowRule rule) { |
| 241 | List<StoredFlowEntry> entries = getFlowEntries(networkId, rule.deviceId(), rule.id()); |
| 242 | |
| 243 | synchronized (entries) { |
| 244 | for (StoredFlowEntry entry : entries) { |
| 245 | if (entry.equals(rule)) { |
| 246 | synchronized (entry) { |
| 247 | entry.setState(FlowEntry.FlowEntryState.PENDING_REMOVE); |
| 248 | } |
| 249 | } |
| 250 | } |
| 251 | } |
| 252 | } |
| 253 | |
| 254 | @Override |
| 255 | public FlowRuleEvent addOrUpdateFlowRule(NetworkId networkId, FlowEntry rule) { |
| 256 | // check if this new rule is an update to an existing entry |
| 257 | List<StoredFlowEntry> entries = getFlowEntries(networkId, rule.deviceId(), rule.id()); |
| 258 | synchronized (entries) { |
| 259 | for (StoredFlowEntry stored : entries) { |
| 260 | if (stored.equals(rule)) { |
| 261 | synchronized (stored) { |
| 262 | //FIXME modification of "stored" flow entry outside of flow table |
| 263 | stored.setBytes(rule.bytes()); |
| 264 | stored.setLife(rule.life()); |
| 265 | stored.setPackets(rule.packets()); |
| 266 | if (stored.state() == FlowEntry.FlowEntryState.PENDING_ADD) { |
| 267 | stored.setState(FlowEntry.FlowEntryState.ADDED); |
| 268 | // TODO: Do we need to change `rule` state? |
| 269 | return new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADDED, rule); |
| 270 | } |
| 271 | return new FlowRuleEvent(FlowRuleEvent.Type.RULE_UPDATED, rule); |
| 272 | } |
| 273 | } |
| 274 | } |
| 275 | } |
| 276 | |
| 277 | // should not reach here |
| 278 | // storeFlowRule was expected to be called |
| 279 | log.error("FlowRule was not found in store {} to update", rule); |
| 280 | |
| 281 | return null; |
| 282 | } |
| 283 | |
| 284 | @Override |
| 285 | public FlowRuleEvent removeFlowRule(NetworkId networkId, FlowEntry rule) { |
| 286 | // This is where one could mark a rule as removed and still keep it in the store. |
| 287 | final DeviceId did = rule.deviceId(); |
| 288 | |
| 289 | List<StoredFlowEntry> entries = getFlowEntries(networkId, did, rule.id()); |
| 290 | synchronized (entries) { |
| 291 | if (entries.remove(rule)) { |
| 292 | return new FlowRuleEvent(RULE_REMOVED, rule); |
| 293 | } |
| 294 | } |
| 295 | return null; |
| 296 | } |
| 297 | |
| 298 | @Override |
| 299 | public FlowRuleEvent pendingFlowRule(NetworkId networkId, FlowEntry rule) { |
| 300 | List<StoredFlowEntry> entries = getFlowEntries(networkId, rule.deviceId(), rule.id()); |
| 301 | synchronized (entries) { |
| 302 | for (StoredFlowEntry entry : entries) { |
| 303 | if (entry.equals(rule) && |
| 304 | entry.state() != FlowEntry.FlowEntryState.PENDING_ADD) { |
| 305 | synchronized (entry) { |
| 306 | entry.setState(FlowEntry.FlowEntryState.PENDING_ADD); |
| 307 | return new FlowRuleEvent(FlowRuleEvent.Type.RULE_UPDATED, rule); |
| 308 | } |
| 309 | } |
| 310 | } |
| 311 | } |
| 312 | return null; |
| 313 | } |
| 314 | |
| 315 | @Override |
| 316 | public void purgeFlowRule(NetworkId networkId, DeviceId deviceId) { |
| 317 | flowEntries.get(networkId).remove(deviceId); |
| 318 | } |
| 319 | |
| 320 | @Override |
| 321 | public void purgeFlowRules(NetworkId networkId) { |
| 322 | flowEntries.get(networkId).clear(); |
| 323 | } |
| 324 | |
| 325 | @Override |
| 326 | public FlowRuleEvent |
| 327 | updateTableStatistics(NetworkId networkId, DeviceId deviceId, List<TableStatisticsEntry> tableStats) { |
| 328 | //TODO: Table operations are not supported yet |
| 329 | return null; |
| 330 | } |
| 331 | |
| 332 | @Override |
| 333 | public Iterable<TableStatisticsEntry> |
| 334 | getTableStatistics(NetworkId networkId, DeviceId deviceId) { |
| 335 | //TODO: Table operations are not supported yet |
| 336 | return null; |
| 337 | } |
| 338 | |
| 339 | /** |
| 340 | * Returns the flow table for specified device. |
| 341 | * |
| 342 | * @param networkId identifier of the virtual network |
| 343 | * @param deviceId identifier of the virtual device |
| 344 | * @return Map representing Flow Table of given device. |
| 345 | */ |
| 346 | private ConcurrentMap<FlowId, List<StoredFlowEntry>> |
| 347 | getFlowTable(NetworkId networkId, DeviceId deviceId) { |
| 348 | return flowEntries |
| 349 | .computeIfAbsent(networkId, n -> new ConcurrentHashMap<>()) |
| 350 | .computeIfAbsent(deviceId, k -> new ConcurrentHashMap<>()); |
| 351 | } |
| 352 | |
| 353 | private List<StoredFlowEntry> |
| 354 | getFlowEntries(NetworkId networkId, DeviceId deviceId, FlowId flowId) { |
| 355 | final ConcurrentMap<FlowId, List<StoredFlowEntry>> flowTable |
| 356 | = getFlowTable(networkId, deviceId); |
| 357 | |
| 358 | List<StoredFlowEntry> r = flowTable.get(flowId); |
| 359 | if (r == null) { |
| 360 | final List<StoredFlowEntry> concurrentlyAdded; |
| 361 | r = new CopyOnWriteArrayList<>(); |
| 362 | concurrentlyAdded = flowTable.putIfAbsent(flowId, r); |
| 363 | if (concurrentlyAdded != null) { |
| 364 | return concurrentlyAdded; |
| 365 | } |
| 366 | } |
| 367 | return r; |
| 368 | } |
| 369 | |
| 370 | private FlowEntry |
| 371 | getFlowEntryInternal(NetworkId networkId, DeviceId deviceId, FlowRule rule) { |
| 372 | List<StoredFlowEntry> fes = getFlowEntries(networkId, deviceId, rule.id()); |
| 373 | for (StoredFlowEntry fe : fes) { |
| 374 | if (fe.equals(rule)) { |
| 375 | return fe; |
| 376 | } |
| 377 | } |
| 378 | return null; |
| 379 | } |
| 380 | |
| 381 | private void storeFlowRuleInternal(NetworkId networkId, FlowRule rule) { |
| 382 | StoredFlowEntry f = new DefaultFlowEntry(rule); |
| 383 | final DeviceId did = f.deviceId(); |
| 384 | final FlowId fid = f.id(); |
| 385 | List<StoredFlowEntry> existing = getFlowEntries(networkId, did, fid); |
| 386 | synchronized (existing) { |
| 387 | for (StoredFlowEntry fe : existing) { |
| 388 | if (fe.equals(rule)) { |
| 389 | // was already there? ignore |
| 390 | return; |
| 391 | } |
| 392 | } |
| 393 | // new flow rule added |
| 394 | existing.add(f); |
| 395 | } |
| 396 | } |
| 397 | |
| 398 | private static final class TimeoutFuture |
| 399 | implements RemovalListener<Integer, SettableFuture<CompletedBatchOperation>> { |
| 400 | @Override |
| 401 | public void onRemoval(RemovalNotification<Integer, |
| 402 | SettableFuture<CompletedBatchOperation>> notification) { |
| 403 | // wrapping in ExecutionException to support Future.get |
| 404 | if (notification.wasEvicted()) { |
| 405 | notification.getValue() |
| 406 | .setException(new ExecutionException("Timed out", |
| 407 | new TimeoutException())); |
| 408 | } |
| 409 | } |
| 410 | } |
| 411 | } |