blob: 412ffd110a7571c62eac2db6fa7db7c8adc972a6 [file] [log] [blame]
yoonseonbd8a93d2016-12-07 15:51:21 -08001/*
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.incubator.store.virtual.impl;
18
19import com.google.common.cache.Cache;
20import com.google.common.cache.CacheBuilder;
21import com.google.common.cache.RemovalListener;
22import com.google.common.cache.RemovalNotification;
23import com.google.common.collect.FluentIterable;
24import com.google.common.collect.Sets;
25import com.google.common.util.concurrent.SettableFuture;
26import org.apache.felix.scr.annotations.Activate;
27import org.apache.felix.scr.annotations.Component;
28import org.apache.felix.scr.annotations.Deactivate;
29import org.apache.felix.scr.annotations.Modified;
30import org.apache.felix.scr.annotations.Property;
31import org.apache.felix.scr.annotations.Reference;
32import org.apache.felix.scr.annotations.ReferenceCardinality;
33import org.apache.felix.scr.annotations.Service;
34import org.onlab.util.Tools;
35import org.onosproject.incubator.net.virtual.NetworkId;
36import org.onosproject.incubator.net.virtual.VirtualNetworkFlowRuleStore;
37import org.onosproject.net.DeviceId;
38import org.onosproject.net.flow.CompletedBatchOperation;
39import org.onosproject.net.flow.DefaultFlowEntry;
40import org.onosproject.net.flow.FlowEntry;
41import org.onosproject.net.flow.FlowId;
42import org.onosproject.net.flow.FlowRule;
43import org.onosproject.net.flow.FlowRuleBatchEntry;
44import org.onosproject.net.flow.FlowRuleBatchEvent;
45import org.onosproject.net.flow.FlowRuleBatchOperation;
46import org.onosproject.net.flow.FlowRuleBatchRequest;
47import org.onosproject.net.flow.FlowRuleEvent;
48import org.onosproject.net.flow.FlowRuleStoreDelegate;
49import org.onosproject.net.flow.StoredFlowEntry;
50import org.onosproject.net.flow.TableStatisticsEntry;
51import org.onosproject.store.service.StorageService;
52import org.osgi.service.component.ComponentContext;
53import org.slf4j.Logger;
54
55import java.util.ArrayList;
56import java.util.Collections;
57import java.util.Dictionary;
58import java.util.List;
59import java.util.concurrent.ConcurrentHashMap;
60import java.util.concurrent.ConcurrentMap;
61import java.util.concurrent.CopyOnWriteArrayList;
62import java.util.concurrent.ExecutionException;
63import java.util.concurrent.TimeUnit;
64import java.util.concurrent.TimeoutException;
65import java.util.concurrent.atomic.AtomicInteger;
66
67import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
68import static org.slf4j.LoggerFactory.getLogger;
69
yoonseondc3210d2017-01-25 16:03:10 -080070/**
71 * Manages inventory of virtual flow rules using trivial in-memory implementation.
72 */
yoonseonbd8a93d2016-12-07 15:51:21 -080073
yoonseondc3210d2017-01-25 16:03:10 -080074//TODO: support distributed flowrule store for virtual networks
yoonseonbd8a93d2016-12-07 15:51:21 -080075@Component(immediate = true)
76@Service
77public class SimpleVirtualFlowRuleStore
78 extends AbstractVirtualStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
79 implements VirtualNetworkFlowRuleStore {
80
81 private final Logger log = getLogger(getClass());
82
83 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
84 protected StorageService storageService;
85
86 private final ConcurrentMap<NetworkId,
87 ConcurrentMap<DeviceId, ConcurrentMap<FlowId, List<StoredFlowEntry>>>>
88 flowEntries = new ConcurrentHashMap<>();
89
90
91 private final AtomicInteger localBatchIdGen = new AtomicInteger();
92
93 private static final int DEFAULT_PENDING_FUTURE_TIMEOUT_MINUTES = 5;
94 @Property(name = "pendingFutureTimeoutMinutes", intValue = DEFAULT_PENDING_FUTURE_TIMEOUT_MINUTES,
95 label = "Expiration time after an entry is created that it should be automatically removed")
96 private int pendingFutureTimeoutMinutes = DEFAULT_PENDING_FUTURE_TIMEOUT_MINUTES;
97
98 private Cache<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures =
99 CacheBuilder.newBuilder()
100 .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
101 .removalListener(new TimeoutFuture())
102 .build();
103
104 @Activate
105 public void activate() {
106 log.info("Started");
107 }
108
109 @Deactivate
110 public void deactivate() {
111 flowEntries.clear();
112 log.info("Stopped");
113 }
114
115 @Modified
116 public void modified(ComponentContext context) {
117
118 readComponentConfiguration(context);
119
120 // Reset Cache and copy all.
121 Cache<Integer, SettableFuture<CompletedBatchOperation>> prevFutures = pendingFutures;
122 pendingFutures = CacheBuilder.newBuilder()
123 .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
124 .removalListener(new TimeoutFuture())
125 .build();
126
127 pendingFutures.putAll(prevFutures.asMap());
128 }
129
130 /**
131 * Extracts properties from the component configuration context.
132 *
133 * @param context the component context
134 */
135 private void readComponentConfiguration(ComponentContext context) {
136 Dictionary<?, ?> properties = context.getProperties();
137
138 Integer newPendingFutureTimeoutMinutes =
139 Tools.getIntegerProperty(properties, "pendingFutureTimeoutMinutes");
140 if (newPendingFutureTimeoutMinutes == null) {
141 pendingFutureTimeoutMinutes = DEFAULT_PENDING_FUTURE_TIMEOUT_MINUTES;
142 log.info("Pending future timeout is not configured, " +
143 "using current value of {}", pendingFutureTimeoutMinutes);
144 } else {
145 pendingFutureTimeoutMinutes = newPendingFutureTimeoutMinutes;
146 log.info("Configured. Pending future timeout is configured to {}",
147 pendingFutureTimeoutMinutes);
148 }
149 }
150
151 @Override
152 public int getFlowRuleCount(NetworkId networkId) {
153
154 int sum = 0;
155 for (ConcurrentMap<FlowId, List<StoredFlowEntry>> ft :
156 flowEntries.get(networkId).values()) {
157 for (List<StoredFlowEntry> fes : ft.values()) {
158 sum += fes.size();
159 }
160 }
161 return sum;
162 }
163
164 @Override
165 public FlowEntry getFlowEntry(NetworkId networkId, FlowRule rule) {
166 return getFlowEntryInternal(networkId, rule.deviceId(), rule);
167 }
168
169 @Override
170 public Iterable<FlowEntry> getFlowEntries(NetworkId networkId, DeviceId deviceId) {
171 return FluentIterable.from(getFlowTable(networkId, deviceId).values())
172 .transformAndConcat(Collections::unmodifiableList);
173 }
174
175 @Override
176 public void storeFlowRule(NetworkId networkId, FlowRule rule) {
177 storeFlowRuleInternal(networkId, rule);
178 }
179
180 @Override
181 public void storeBatch(NetworkId networkId, FlowRuleBatchOperation batchOperation) {
182 List<FlowRuleBatchEntry> toAdd = new ArrayList<>();
183 List<FlowRuleBatchEntry> toRemove = new ArrayList<>();
184
185 for (FlowRuleBatchEntry entry : batchOperation.getOperations()) {
186 final FlowRule flowRule = entry.target();
187 if (entry.operator().equals(FlowRuleBatchEntry.FlowRuleOperation.ADD)) {
188 if (!getFlowEntries(networkId, flowRule.deviceId(),
189 flowRule.id()).contains(flowRule)) {
190 storeFlowRule(networkId, flowRule);
191 toAdd.add(entry);
192 }
193 } else if (entry.operator().equals(FlowRuleBatchEntry.FlowRuleOperation.REMOVE)) {
194 if (getFlowEntries(networkId, flowRule.deviceId(), flowRule.id()).contains(flowRule)) {
195 deleteFlowRule(networkId, flowRule);
196 toRemove.add(entry);
197 }
198 } else {
199 throw new UnsupportedOperationException("Unsupported operation type");
200 }
201 }
202
203 if (toAdd.isEmpty() && toRemove.isEmpty()) {
204 notifyDelegate(networkId, FlowRuleBatchEvent.completed(
205 new FlowRuleBatchRequest(batchOperation.id(), Collections.emptySet()),
206 new CompletedBatchOperation(true, Collections.emptySet(),
207 batchOperation.deviceId())));
208 return;
209 }
210
211 SettableFuture<CompletedBatchOperation> r = SettableFuture.create();
212 final int batchId = localBatchIdGen.incrementAndGet();
213
214 pendingFutures.put(batchId, r);
215
216 toAdd.addAll(toRemove);
217 notifyDelegate(networkId, FlowRuleBatchEvent.requested(
218 new FlowRuleBatchRequest(batchId, Sets.newHashSet(toAdd)), batchOperation.deviceId()));
219
220 }
221
222 @Override
223 public void batchOperationComplete(NetworkId networkId, FlowRuleBatchEvent event) {
224 final Long batchId = event.subject().batchId();
225 SettableFuture<CompletedBatchOperation> future
226 = pendingFutures.getIfPresent(batchId);
227 if (future != null) {
228 future.set(event.result());
229 pendingFutures.invalidate(batchId);
230 }
231 notifyDelegate(networkId, event);
232 }
233
234 @Override
235 public void deleteFlowRule(NetworkId networkId, FlowRule rule) {
236 List<StoredFlowEntry> entries = getFlowEntries(networkId, rule.deviceId(), rule.id());
237
238 synchronized (entries) {
239 for (StoredFlowEntry entry : entries) {
240 if (entry.equals(rule)) {
241 synchronized (entry) {
242 entry.setState(FlowEntry.FlowEntryState.PENDING_REMOVE);
243 }
244 }
245 }
246 }
247 }
248
249 @Override
250 public FlowRuleEvent addOrUpdateFlowRule(NetworkId networkId, FlowEntry rule) {
251 // check if this new rule is an update to an existing entry
252 List<StoredFlowEntry> entries = getFlowEntries(networkId, rule.deviceId(), rule.id());
253 synchronized (entries) {
254 for (StoredFlowEntry stored : entries) {
255 if (stored.equals(rule)) {
256 synchronized (stored) {
257 //FIXME modification of "stored" flow entry outside of flow table
258 stored.setBytes(rule.bytes());
259 stored.setLife(rule.life());
260 stored.setPackets(rule.packets());
261 if (stored.state() == FlowEntry.FlowEntryState.PENDING_ADD) {
262 stored.setState(FlowEntry.FlowEntryState.ADDED);
263 // TODO: Do we need to change `rule` state?
264 return new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADDED, rule);
265 }
266 return new FlowRuleEvent(FlowRuleEvent.Type.RULE_UPDATED, rule);
267 }
268 }
269 }
270 }
271
272 // should not reach here
273 // storeFlowRule was expected to be called
274 log.error("FlowRule was not found in store {} to update", rule);
275
276 return null;
277 }
278
279 @Override
280 public FlowRuleEvent removeFlowRule(NetworkId networkId, FlowEntry rule) {
281 // This is where one could mark a rule as removed and still keep it in the store.
282 final DeviceId did = rule.deviceId();
283
284 List<StoredFlowEntry> entries = getFlowEntries(networkId, did, rule.id());
285 synchronized (entries) {
286 if (entries.remove(rule)) {
287 return new FlowRuleEvent(RULE_REMOVED, rule);
288 }
289 }
290 return null;
291 }
292
293 @Override
294 public FlowRuleEvent pendingFlowRule(NetworkId networkId, FlowEntry rule) {
295 List<StoredFlowEntry> entries = getFlowEntries(networkId, rule.deviceId(), rule.id());
296 synchronized (entries) {
297 for (StoredFlowEntry entry : entries) {
298 if (entry.equals(rule) &&
299 entry.state() != FlowEntry.FlowEntryState.PENDING_ADD) {
300 synchronized (entry) {
301 entry.setState(FlowEntry.FlowEntryState.PENDING_ADD);
302 return new FlowRuleEvent(FlowRuleEvent.Type.RULE_UPDATED, rule);
303 }
304 }
305 }
306 }
307 return null;
308 }
309
310 @Override
311 public void purgeFlowRule(NetworkId networkId, DeviceId deviceId) {
312 flowEntries.get(networkId).remove(deviceId);
313 }
314
315 @Override
316 public void purgeFlowRules(NetworkId networkId) {
317 flowEntries.get(networkId).clear();
318 }
319
320 @Override
321 public FlowRuleEvent
322 updateTableStatistics(NetworkId networkId, DeviceId deviceId, List<TableStatisticsEntry> tableStats) {
323 //TODO: Table operations are not supported yet
324 return null;
325 }
326
327 @Override
328 public Iterable<TableStatisticsEntry>
329 getTableStatistics(NetworkId networkId, DeviceId deviceId) {
330 //TODO: Table operations are not supported yet
331 return null;
332 }
333
334 /**
335 * Returns the flow table for specified device.
336 *
337 * @param networkId identifier of the virtual network
338 * @param deviceId identifier of the virtual device
339 * @return Map representing Flow Table of given device.
340 */
341 private ConcurrentMap<FlowId, List<StoredFlowEntry>>
342 getFlowTable(NetworkId networkId, DeviceId deviceId) {
343 return flowEntries
344 .computeIfAbsent(networkId, n -> new ConcurrentHashMap<>())
345 .computeIfAbsent(deviceId, k -> new ConcurrentHashMap<>());
346 }
347
348 private List<StoredFlowEntry>
349 getFlowEntries(NetworkId networkId, DeviceId deviceId, FlowId flowId) {
350 final ConcurrentMap<FlowId, List<StoredFlowEntry>> flowTable
351 = getFlowTable(networkId, deviceId);
352
353 List<StoredFlowEntry> r = flowTable.get(flowId);
354 if (r == null) {
355 final List<StoredFlowEntry> concurrentlyAdded;
356 r = new CopyOnWriteArrayList<>();
357 concurrentlyAdded = flowTable.putIfAbsent(flowId, r);
358 if (concurrentlyAdded != null) {
359 return concurrentlyAdded;
360 }
361 }
362 return r;
363 }
364
365 private FlowEntry
366 getFlowEntryInternal(NetworkId networkId, DeviceId deviceId, FlowRule rule) {
367 List<StoredFlowEntry> fes = getFlowEntries(networkId, deviceId, rule.id());
368 for (StoredFlowEntry fe : fes) {
369 if (fe.equals(rule)) {
370 return fe;
371 }
372 }
373 return null;
374 }
375
376 private void storeFlowRuleInternal(NetworkId networkId, FlowRule rule) {
377 StoredFlowEntry f = new DefaultFlowEntry(rule);
378 final DeviceId did = f.deviceId();
379 final FlowId fid = f.id();
380 List<StoredFlowEntry> existing = getFlowEntries(networkId, did, fid);
381 synchronized (existing) {
382 for (StoredFlowEntry fe : existing) {
383 if (fe.equals(rule)) {
384 // was already there? ignore
385 return;
386 }
387 }
388 // new flow rule added
389 existing.add(f);
390 }
391 }
392
393 private static final class TimeoutFuture
394 implements RemovalListener<Integer, SettableFuture<CompletedBatchOperation>> {
395 @Override
396 public void onRemoval(RemovalNotification<Integer,
397 SettableFuture<CompletedBatchOperation>> notification) {
398 // wrapping in ExecutionException to support Future.get
399 if (notification.wasEvicted()) {
400 notification.getValue()
401 .setException(new ExecutionException("Timed out",
402 new TimeoutException()));
403 }
404 }
405 }
406}