blob: c12044d30714dca7fce4dcb0ba503684e9bd9ca6 [file] [log] [blame]
yoonseonbd8a93d2016-12-07 15:51:21 -08001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
yoonseonbd8a93d2016-12-07 15:51:21 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.incubator.store.virtual.impl;
18
19import com.google.common.cache.Cache;
20import com.google.common.cache.CacheBuilder;
21import com.google.common.cache.RemovalListener;
22import com.google.common.cache.RemovalNotification;
23import com.google.common.collect.FluentIterable;
24import com.google.common.collect.Sets;
25import com.google.common.util.concurrent.SettableFuture;
yoonseonbd8a93d2016-12-07 15:51:21 -080026import org.onlab.util.Tools;
27import org.onosproject.incubator.net.virtual.NetworkId;
28import org.onosproject.incubator.net.virtual.VirtualNetworkFlowRuleStore;
29import org.onosproject.net.DeviceId;
30import org.onosproject.net.flow.CompletedBatchOperation;
31import org.onosproject.net.flow.DefaultFlowEntry;
32import org.onosproject.net.flow.FlowEntry;
33import org.onosproject.net.flow.FlowId;
34import org.onosproject.net.flow.FlowRule;
yoonseonbd8a93d2016-12-07 15:51:21 -080035import org.onosproject.net.flow.FlowRuleEvent;
36import org.onosproject.net.flow.FlowRuleStoreDelegate;
37import org.onosproject.net.flow.StoredFlowEntry;
38import org.onosproject.net.flow.TableStatisticsEntry;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070039import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
40import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
41import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
42import org.onosproject.net.flow.oldbatch.FlowRuleBatchRequest;
yoonseonbd8a93d2016-12-07 15:51:21 -080043import org.onosproject.store.service.StorageService;
44import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070045import org.osgi.service.component.annotations.Activate;
46import org.osgi.service.component.annotations.Component;
47import org.osgi.service.component.annotations.Deactivate;
48import org.osgi.service.component.annotations.Modified;
49import org.osgi.service.component.annotations.Reference;
50import org.osgi.service.component.annotations.ReferenceCardinality;
yoonseonbd8a93d2016-12-07 15:51:21 -080051import org.slf4j.Logger;
52
53import java.util.ArrayList;
54import java.util.Collections;
55import java.util.Dictionary;
56import java.util.List;
57import java.util.concurrent.ConcurrentHashMap;
58import java.util.concurrent.ConcurrentMap;
59import java.util.concurrent.CopyOnWriteArrayList;
60import java.util.concurrent.ExecutionException;
61import java.util.concurrent.TimeUnit;
62import java.util.concurrent.TimeoutException;
63import java.util.concurrent.atomic.AtomicInteger;
64
65import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
66import static org.slf4j.LoggerFactory.getLogger;
67
yoonseondc3210d2017-01-25 16:03:10 -080068/**
yoonseon97b9b592017-01-31 14:35:06 -080069 * Implementation of the virtual network flow rule store to manage inventory of
70 * virtual flow rules using trivial in-memory implementation.
yoonseondc3210d2017-01-25 16:03:10 -080071 */
yoonseondc3210d2017-01-25 16:03:10 -080072//TODO: support distributed flowrule store for virtual networks
yoonseon97b9b592017-01-31 14:35:06 -080073
Ray Milkeyd84f89b2018-08-17 14:54:17 -070074@Component(immediate = true, service = VirtualNetworkFlowRuleStore.class)
yoonseonbd8a93d2016-12-07 15:51:21 -080075public class SimpleVirtualFlowRuleStore
76 extends AbstractVirtualStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
77 implements VirtualNetworkFlowRuleStore {
78
79 private final Logger log = getLogger(getClass());
80
Ray Milkeyd84f89b2018-08-17 14:54:17 -070081 @Reference(cardinality = ReferenceCardinality.MANDATORY)
yoonseonbd8a93d2016-12-07 15:51:21 -080082 protected StorageService storageService;
83
84 private final ConcurrentMap<NetworkId,
85 ConcurrentMap<DeviceId, ConcurrentMap<FlowId, List<StoredFlowEntry>>>>
86 flowEntries = new ConcurrentHashMap<>();
87
88
89 private final AtomicInteger localBatchIdGen = new AtomicInteger();
90
91 private static final int DEFAULT_PENDING_FUTURE_TIMEOUT_MINUTES = 5;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070092 //@Property(name = "pendingFutureTimeoutMinutes", intValue = DEFAULT_PENDING_FUTURE_TIMEOUT_MINUTES,
93 // label = "Expiration time after an entry is created that it should be automatically removed")
yoonseonbd8a93d2016-12-07 15:51:21 -080094 private int pendingFutureTimeoutMinutes = DEFAULT_PENDING_FUTURE_TIMEOUT_MINUTES;
95
96 private Cache<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures =
97 CacheBuilder.newBuilder()
98 .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
99 .removalListener(new TimeoutFuture())
100 .build();
101
102 @Activate
103 public void activate() {
104 log.info("Started");
105 }
106
107 @Deactivate
108 public void deactivate() {
109 flowEntries.clear();
110 log.info("Stopped");
111 }
112
113 @Modified
114 public void modified(ComponentContext context) {
115
116 readComponentConfiguration(context);
117
118 // Reset Cache and copy all.
119 Cache<Integer, SettableFuture<CompletedBatchOperation>> prevFutures = pendingFutures;
120 pendingFutures = CacheBuilder.newBuilder()
121 .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
122 .removalListener(new TimeoutFuture())
123 .build();
124
125 pendingFutures.putAll(prevFutures.asMap());
126 }
127
128 /**
129 * Extracts properties from the component configuration context.
130 *
131 * @param context the component context
132 */
133 private void readComponentConfiguration(ComponentContext context) {
134 Dictionary<?, ?> properties = context.getProperties();
135
136 Integer newPendingFutureTimeoutMinutes =
137 Tools.getIntegerProperty(properties, "pendingFutureTimeoutMinutes");
138 if (newPendingFutureTimeoutMinutes == null) {
139 pendingFutureTimeoutMinutes = DEFAULT_PENDING_FUTURE_TIMEOUT_MINUTES;
140 log.info("Pending future timeout is not configured, " +
141 "using current value of {}", pendingFutureTimeoutMinutes);
142 } else {
143 pendingFutureTimeoutMinutes = newPendingFutureTimeoutMinutes;
144 log.info("Configured. Pending future timeout is configured to {}",
145 pendingFutureTimeoutMinutes);
146 }
147 }
148
149 @Override
150 public int getFlowRuleCount(NetworkId networkId) {
yoonseonbd8a93d2016-12-07 15:51:21 -0800151 int sum = 0;
yoonseon86bebed2017-02-03 15:23:57 -0800152
153 if (flowEntries.get(networkId) == null) {
154 return 0;
155 }
156
yoonseonbd8a93d2016-12-07 15:51:21 -0800157 for (ConcurrentMap<FlowId, List<StoredFlowEntry>> ft :
158 flowEntries.get(networkId).values()) {
159 for (List<StoredFlowEntry> fes : ft.values()) {
160 sum += fes.size();
161 }
162 }
163 return sum;
164 }
165
166 @Override
167 public FlowEntry getFlowEntry(NetworkId networkId, FlowRule rule) {
168 return getFlowEntryInternal(networkId, rule.deviceId(), rule);
169 }
170
171 @Override
172 public Iterable<FlowEntry> getFlowEntries(NetworkId networkId, DeviceId deviceId) {
173 return FluentIterable.from(getFlowTable(networkId, deviceId).values())
174 .transformAndConcat(Collections::unmodifiableList);
175 }
176
Ray Milkey7639bf92017-05-12 13:39:23 -0700177 private void storeFlowRule(NetworkId networkId, FlowRule rule) {
yoonseonbd8a93d2016-12-07 15:51:21 -0800178 storeFlowRuleInternal(networkId, rule);
179 }
180
181 @Override
182 public void storeBatch(NetworkId networkId, FlowRuleBatchOperation batchOperation) {
183 List<FlowRuleBatchEntry> toAdd = new ArrayList<>();
184 List<FlowRuleBatchEntry> toRemove = new ArrayList<>();
185
186 for (FlowRuleBatchEntry entry : batchOperation.getOperations()) {
187 final FlowRule flowRule = entry.target();
188 if (entry.operator().equals(FlowRuleBatchEntry.FlowRuleOperation.ADD)) {
189 if (!getFlowEntries(networkId, flowRule.deviceId(),
190 flowRule.id()).contains(flowRule)) {
191 storeFlowRule(networkId, flowRule);
192 toAdd.add(entry);
193 }
194 } else if (entry.operator().equals(FlowRuleBatchEntry.FlowRuleOperation.REMOVE)) {
195 if (getFlowEntries(networkId, flowRule.deviceId(), flowRule.id()).contains(flowRule)) {
196 deleteFlowRule(networkId, flowRule);
197 toRemove.add(entry);
198 }
199 } else {
200 throw new UnsupportedOperationException("Unsupported operation type");
201 }
202 }
203
204 if (toAdd.isEmpty() && toRemove.isEmpty()) {
205 notifyDelegate(networkId, FlowRuleBatchEvent.completed(
206 new FlowRuleBatchRequest(batchOperation.id(), Collections.emptySet()),
207 new CompletedBatchOperation(true, Collections.emptySet(),
208 batchOperation.deviceId())));
209 return;
210 }
211
212 SettableFuture<CompletedBatchOperation> r = SettableFuture.create();
Yoonseon Han997c8422017-04-24 16:20:03 -0700213 final int futureId = localBatchIdGen.incrementAndGet();
yoonseonbd8a93d2016-12-07 15:51:21 -0800214
Yoonseon Han997c8422017-04-24 16:20:03 -0700215 pendingFutures.put(futureId, r);
yoonseonbd8a93d2016-12-07 15:51:21 -0800216
217 toAdd.addAll(toRemove);
218 notifyDelegate(networkId, FlowRuleBatchEvent.requested(
Yoonseon Han997c8422017-04-24 16:20:03 -0700219 new FlowRuleBatchRequest(batchOperation.id(),
220 Sets.newHashSet(toAdd)), batchOperation.deviceId()));
yoonseonbd8a93d2016-12-07 15:51:21 -0800221
222 }
223
224 @Override
225 public void batchOperationComplete(NetworkId networkId, FlowRuleBatchEvent event) {
226 final Long batchId = event.subject().batchId();
227 SettableFuture<CompletedBatchOperation> future
228 = pendingFutures.getIfPresent(batchId);
229 if (future != null) {
230 future.set(event.result());
231 pendingFutures.invalidate(batchId);
232 }
233 notifyDelegate(networkId, event);
234 }
235
236 @Override
237 public void deleteFlowRule(NetworkId networkId, FlowRule rule) {
238 List<StoredFlowEntry> entries = getFlowEntries(networkId, rule.deviceId(), rule.id());
239
240 synchronized (entries) {
241 for (StoredFlowEntry entry : entries) {
242 if (entry.equals(rule)) {
243 synchronized (entry) {
244 entry.setState(FlowEntry.FlowEntryState.PENDING_REMOVE);
245 }
246 }
247 }
248 }
249 }
250
251 @Override
252 public FlowRuleEvent addOrUpdateFlowRule(NetworkId networkId, FlowEntry rule) {
253 // check if this new rule is an update to an existing entry
254 List<StoredFlowEntry> entries = getFlowEntries(networkId, rule.deviceId(), rule.id());
255 synchronized (entries) {
256 for (StoredFlowEntry stored : entries) {
257 if (stored.equals(rule)) {
258 synchronized (stored) {
259 //FIXME modification of "stored" flow entry outside of flow table
260 stored.setBytes(rule.bytes());
261 stored.setLife(rule.life());
262 stored.setPackets(rule.packets());
263 if (stored.state() == FlowEntry.FlowEntryState.PENDING_ADD) {
264 stored.setState(FlowEntry.FlowEntryState.ADDED);
265 // TODO: Do we need to change `rule` state?
266 return new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADDED, rule);
267 }
268 return new FlowRuleEvent(FlowRuleEvent.Type.RULE_UPDATED, rule);
269 }
270 }
271 }
272 }
273
274 // should not reach here
275 // storeFlowRule was expected to be called
276 log.error("FlowRule was not found in store {} to update", rule);
277
278 return null;
279 }
280
281 @Override
282 public FlowRuleEvent removeFlowRule(NetworkId networkId, FlowEntry rule) {
283 // This is where one could mark a rule as removed and still keep it in the store.
284 final DeviceId did = rule.deviceId();
285
286 List<StoredFlowEntry> entries = getFlowEntries(networkId, did, rule.id());
287 synchronized (entries) {
288 if (entries.remove(rule)) {
289 return new FlowRuleEvent(RULE_REMOVED, rule);
290 }
291 }
292 return null;
293 }
294
295 @Override
296 public FlowRuleEvent pendingFlowRule(NetworkId networkId, FlowEntry rule) {
297 List<StoredFlowEntry> entries = getFlowEntries(networkId, rule.deviceId(), rule.id());
298 synchronized (entries) {
299 for (StoredFlowEntry entry : entries) {
300 if (entry.equals(rule) &&
301 entry.state() != FlowEntry.FlowEntryState.PENDING_ADD) {
302 synchronized (entry) {
303 entry.setState(FlowEntry.FlowEntryState.PENDING_ADD);
304 return new FlowRuleEvent(FlowRuleEvent.Type.RULE_UPDATED, rule);
305 }
306 }
307 }
308 }
309 return null;
310 }
311
312 @Override
313 public void purgeFlowRule(NetworkId networkId, DeviceId deviceId) {
314 flowEntries.get(networkId).remove(deviceId);
315 }
316
317 @Override
318 public void purgeFlowRules(NetworkId networkId) {
319 flowEntries.get(networkId).clear();
320 }
321
322 @Override
323 public FlowRuleEvent
324 updateTableStatistics(NetworkId networkId, DeviceId deviceId, List<TableStatisticsEntry> tableStats) {
325 //TODO: Table operations are not supported yet
326 return null;
327 }
328
329 @Override
330 public Iterable<TableStatisticsEntry>
331 getTableStatistics(NetworkId networkId, DeviceId deviceId) {
332 //TODO: Table operations are not supported yet
333 return null;
334 }
335
336 /**
337 * Returns the flow table for specified device.
338 *
339 * @param networkId identifier of the virtual network
340 * @param deviceId identifier of the virtual device
341 * @return Map representing Flow Table of given device.
342 */
343 private ConcurrentMap<FlowId, List<StoredFlowEntry>>
344 getFlowTable(NetworkId networkId, DeviceId deviceId) {
345 return flowEntries
346 .computeIfAbsent(networkId, n -> new ConcurrentHashMap<>())
347 .computeIfAbsent(deviceId, k -> new ConcurrentHashMap<>());
348 }
349
350 private List<StoredFlowEntry>
351 getFlowEntries(NetworkId networkId, DeviceId deviceId, FlowId flowId) {
352 final ConcurrentMap<FlowId, List<StoredFlowEntry>> flowTable
353 = getFlowTable(networkId, deviceId);
354
355 List<StoredFlowEntry> r = flowTable.get(flowId);
356 if (r == null) {
357 final List<StoredFlowEntry> concurrentlyAdded;
358 r = new CopyOnWriteArrayList<>();
359 concurrentlyAdded = flowTable.putIfAbsent(flowId, r);
360 if (concurrentlyAdded != null) {
361 return concurrentlyAdded;
362 }
363 }
364 return r;
365 }
366
367 private FlowEntry
368 getFlowEntryInternal(NetworkId networkId, DeviceId deviceId, FlowRule rule) {
369 List<StoredFlowEntry> fes = getFlowEntries(networkId, deviceId, rule.id());
370 for (StoredFlowEntry fe : fes) {
371 if (fe.equals(rule)) {
372 return fe;
373 }
374 }
375 return null;
376 }
377
378 private void storeFlowRuleInternal(NetworkId networkId, FlowRule rule) {
379 StoredFlowEntry f = new DefaultFlowEntry(rule);
380 final DeviceId did = f.deviceId();
381 final FlowId fid = f.id();
382 List<StoredFlowEntry> existing = getFlowEntries(networkId, did, fid);
383 synchronized (existing) {
384 for (StoredFlowEntry fe : existing) {
385 if (fe.equals(rule)) {
386 // was already there? ignore
387 return;
388 }
389 }
390 // new flow rule added
391 existing.add(f);
392 }
393 }
394
395 private static final class TimeoutFuture
396 implements RemovalListener<Integer, SettableFuture<CompletedBatchOperation>> {
397 @Override
398 public void onRemoval(RemovalNotification<Integer,
399 SettableFuture<CompletedBatchOperation>> notification) {
400 // wrapping in ExecutionException to support Future.get
401 if (notification.wasEvicted()) {
402 notification.getValue()
403 .setException(new ExecutionException("Timed out",
404 new TimeoutException()));
405 }
406 }
407 }
408}