blob: 7b6ae55f933bd6c4d32bb468ff063445ae8ecd93 [file] [log] [blame]
/*
 * Copyright 2014-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.flow.impl;
17
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070018import java.util.Collections;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070019import java.util.List;
20import java.util.Map;
21import java.util.Objects;
22import java.util.Set;
23import java.util.concurrent.ExecutorService;
24import java.util.concurrent.Executors;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070025import java.util.concurrent.TimeUnit;
Jordan Haltermanf7554092017-07-30 15:05:51 -070026import java.util.function.Supplier;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070027import java.util.stream.Collectors;
Ray Milkey2b6ff422016-08-26 13:03:15 -070028
Jordan Haltermanf7554092017-07-30 15:05:51 -070029import com.google.common.collect.ImmutableList;
30import com.google.common.collect.Iterables;
31import com.google.common.collect.Maps;
32import com.google.common.collect.Sets;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070033import com.google.common.collect.Streams;
34import org.apache.felix.scr.annotations.Activate;
35import org.apache.felix.scr.annotations.Component;
36import org.apache.felix.scr.annotations.Deactivate;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070037import org.apache.felix.scr.annotations.Reference;
38import org.apache.felix.scr.annotations.ReferenceCardinality;
39import org.apache.felix.scr.annotations.Service;
40import org.onlab.util.KryoNamespace;
41import org.onlab.util.Tools;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070042import org.onosproject.cluster.ClusterService;
43import org.onosproject.cluster.NodeId;
44import org.onosproject.core.CoreService;
45import org.onosproject.core.IdGenerator;
46import org.onosproject.mastership.MastershipService;
47import org.onosproject.net.DeviceId;
48import org.onosproject.net.device.DeviceService;
49import org.onosproject.net.flow.CompletedBatchOperation;
50import org.onosproject.net.flow.DefaultFlowEntry;
51import org.onosproject.net.flow.FlowEntry;
52import org.onosproject.net.flow.FlowEntry.FlowEntryState;
53import org.onosproject.net.flow.FlowId;
54import org.onosproject.net.flow.FlowRule;
55import org.onosproject.net.flow.FlowRuleBatchEntry;
56import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
57import org.onosproject.net.flow.FlowRuleBatchEvent;
58import org.onosproject.net.flow.FlowRuleBatchOperation;
59import org.onosproject.net.flow.FlowRuleBatchRequest;
60import org.onosproject.net.flow.FlowRuleEvent;
61import org.onosproject.net.flow.FlowRuleEvent.Type;
62import org.onosproject.net.flow.FlowRuleService;
63import org.onosproject.net.flow.FlowRuleStore;
64import org.onosproject.net.flow.FlowRuleStoreDelegate;
65import org.onosproject.net.flow.StoredFlowEntry;
66import org.onosproject.net.flow.TableStatisticsEntry;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070067import org.onosproject.store.AbstractStore;
68import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Jordan Haltermanf7554092017-07-30 15:05:51 -070069import org.onosproject.store.cluster.messaging.MessageSubject;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070070import org.onosproject.store.impl.MastershipBasedTimestamp;
71import org.onosproject.store.serializers.KryoNamespaces;
Jordan Haltermanf7554092017-07-30 15:05:51 -070072import org.onosproject.store.service.DocumentPath;
73import org.onosproject.store.service.DocumentTree;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070074import org.onosproject.store.service.EventuallyConsistentMap;
75import org.onosproject.store.service.EventuallyConsistentMapEvent;
76import org.onosproject.store.service.EventuallyConsistentMapListener;
Jordan Haltermanf7554092017-07-30 15:05:51 -070077import org.onosproject.store.service.NoSuchDocumentPathException;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070078import org.onosproject.store.service.Serializer;
Jordan Haltermanf7554092017-07-30 15:05:51 -070079import org.onosproject.store.service.StorageException;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070080import org.onosproject.store.service.StorageService;
Jordan Haltermanf7554092017-07-30 15:05:51 -070081import org.onosproject.store.service.Versioned;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070082import org.onosproject.store.service.WallClockTimestamp;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070083import org.slf4j.Logger;
Ray Milkey2b6ff422016-08-26 13:03:15 -070084
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070085import static org.onlab.util.Tools.groupedThreads;
86import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
Jordan Haltermanf7554092017-07-30 15:05:51 -070087import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_UPDATED;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070088import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani86940d92015-05-06 11:47:57 -070089
90/**
91 * Manages inventory of flow rules using a distributed state management protocol.
92 */
Sho SHIMIZU5c396e32016-08-12 15:19:12 -070093@Component(immediate = true)
Madan Jampani86940d92015-05-06 11:47:57 -070094@Service
Madan Jampani37d04c62016-04-25 15:53:55 -070095public class DistributedFlowRuleStore
Madan Jampani86940d92015-05-06 11:47:57 -070096 extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
97 implements FlowRuleStore {
98
99 private final Logger log = getLogger(getClass());
100
Jordan Haltermanf7554092017-07-30 15:05:51 -0700101 // Constant exception used to indicate an atomic read-modify-write operation needs to be retried.
102 // We don't want to populate a stack trace every time an optimistic lock is retried.
103 private static final StorageException.ConcurrentModification RETRY;
104
105 // Initialize retry exception with an empty stack trace.
106 static {
107 RETRY = new StorageException.ConcurrentModification();
108 RETRY.setStackTrace(new StackTraceElement[0]);
109 }
110
Madan Jampani86940d92015-05-06 11:47:57 -0700111 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
Jordan Haltermanf7554092017-07-30 15:05:51 -0700112 private static final int MAX_RETRY_DELAY_MILLIS = 50;
Madan Jampani86940d92015-05-06 11:47:57 -0700113
Jordan Haltermanf7554092017-07-30 15:05:51 -0700114 private static final String FLOW_TABLE = "onos-flow-table";
Madan Jampani86940d92015-05-06 11:47:57 -0700115
Jordan Haltermanf7554092017-07-30 15:05:51 -0700116 private static final MessageSubject APPLY_FLOWS = new MessageSubject("onos-flow-apply");
117 private static final MessageSubject COMPLETE_BATCH = new MessageSubject("onos-flow-batch-complete");
Madan Jampani86940d92015-05-06 11:47:57 -0700118
119 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
120 protected DeviceService deviceService;
121
122 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
123 protected CoreService coreService;
124
125 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani86940d92015-05-06 11:47:57 -0700126 protected MastershipService mastershipService;
127
Aaron Kruglikova62fdbb2015-10-27 16:32:06 -0700128 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jordan Haltermanf7554092017-07-30 15:05:51 -0700129 protected ClusterCommunicationService clusterCommunicator;
Aaron Kruglikova62fdbb2015-10-27 16:32:06 -0700130
Jordan Haltermanf7554092017-07-30 15:05:51 -0700131 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
132 protected ClusterService clusterService;
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700133
134 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
135 protected StorageService storageService;
136
Madan Jampani884d4432016-08-23 10:46:55 -0700137 protected final Serializer serializer = Serializer.using(KryoNamespaces.API);
Madan Jampani86940d92015-05-06 11:47:57 -0700138
Madan Jampani884d4432016-08-23 10:46:55 -0700139 protected final KryoNamespace.Builder serializerBuilder = KryoNamespace.newBuilder()
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700140 .register(KryoNamespaces.API)
141 .register(MastershipBasedTimestamp.class);
142
Jordan Haltermanf7554092017-07-30 15:05:51 -0700143 private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
144 private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
145 new InternalTableStatsListener();
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700146
Jordan Haltermanf7554092017-07-30 15:05:51 -0700147 private Set<Long> pendingBatches = Sets.newConcurrentHashSet();
148 private ExecutorService messageHandlingExecutor;
149
150 private DocumentTree<Map<StoredFlowEntry, StoredFlowEntry>> flows;
Madan Jampani86940d92015-05-06 11:47:57 -0700151 private IdGenerator idGenerator;
152 private NodeId local;
153
154 @Activate
Jordan Haltermanf7554092017-07-30 15:05:51 -0700155 public void activate() {
Madan Jampani86940d92015-05-06 11:47:57 -0700156 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
157
158 local = clusterService.getLocalNode().id();
159
160 messageHandlingExecutor = Executors.newFixedThreadPool(
Jordan Haltermanf7554092017-07-30 15:05:51 -0700161 MESSAGE_HANDLER_THREAD_POOL_SIZE,
162 groupedThreads("onos/store/flow", "message-handlers", log));
Madan Jampani86940d92015-05-06 11:47:57 -0700163
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700164 deviceTableStats = storageService.<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder()
165 .withName("onos-flow-table-stats")
Madan Jampani884d4432016-08-23 10:46:55 -0700166 .withSerializer(serializerBuilder)
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700167 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
168 .withTimestampProvider((k, v) -> new WallClockTimestamp())
169 .withTombstonesDisabled()
170 .build();
171 deviceTableStats.addListener(tableStatsListener);
172
Jordan Haltermanf7554092017-07-30 15:05:51 -0700173 flows = storageService.<Map<StoredFlowEntry, StoredFlowEntry>>documentTreeBuilder()
174 .withName(FLOW_TABLE)
175 .withSerializer(serializer)
176 .buildDocumentTree()
177 .asDocumentTree();
178
179 clusterCommunicator.addSubscriber(
180 APPLY_FLOWS,
181 serializer::decode,
182 this::applyFlows,
183 messageHandlingExecutor);
184 clusterCommunicator.addSubscriber(
185 COMPLETE_BATCH,
186 serializer::decode,
187 this::completeBatch,
188 messageHandlingExecutor);
189
190 log.info("Started");
Madan Jampani86940d92015-05-06 11:47:57 -0700191 }
192
193 @Deactivate
Jordan Haltermanf7554092017-07-30 15:05:51 -0700194 public void deactivate() {
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700195 deviceTableStats.removeListener(tableStatsListener);
196 deviceTableStats.destroy();
Jordan Haltermanf7554092017-07-30 15:05:51 -0700197 clusterCommunicator.removeSubscriber(APPLY_FLOWS);
198 clusterCommunicator.removeSubscriber(COMPLETE_BATCH);
Madan Jampani86940d92015-05-06 11:47:57 -0700199 messageHandlingExecutor.shutdownNow();
Madan Jampani86940d92015-05-06 11:47:57 -0700200 log.info("Stopped");
201 }
202
Jordan Haltermanf7554092017-07-30 15:05:51 -0700203 /**
204 * Retries the given supplier until successful.
205 * <p>
206 * This method retries the given supplier until no {@code ConcurrentModification} exceptions are thrown. In
207 * between retries, it waits a semi-random interval to attempt to avoid transaction conflicts with other processes.
208 *
209 * @param supplier the supplier to retry
210 * @param <T> the return type
211 * @return the return value of the given supplier once it runs successfully
212 */
213 private <T> T retryUntilSuccess(Supplier<T> supplier) {
214 return Tools.retryable(
215 supplier,
216 StorageException.ConcurrentModification.class,
217 Integer.MAX_VALUE,
218 MAX_RETRY_DELAY_MILLIS)
219 .get();
Madan Jampani86940d92015-05-06 11:47:57 -0700220 }
221
Jordan Haltermanf7554092017-07-30 15:05:51 -0700222 /**
223 * Return method for {@link #retryUntilSuccess(Supplier)} callbacks to indicate that the callback needs to be
224 * retried after a randomized delay.
225 *
226 * @param <T> the return type
227 * @return nothing
228 * @throws StorageException.ConcurrentModification to force a retry of the callback
229 */
230 private <T> T retry() {
231 throw RETRY;
Madan Jampani86940d92015-05-06 11:47:57 -0700232 }
233
Jordan Haltermanf7554092017-07-30 15:05:51 -0700234 /**
235 * Handles a flow rule batch event forwarded to the master node.
236 * <p>
237 * If this node is the master for the associated device, notifies event listeners to install flow rules.
238 *
239 * @param event the event to handle
240 */
241 private void applyFlows(FlowRuleBatchEvent event) {
242 if (mastershipService.isLocalMaster(event.deviceId())) {
243 notifyDelegate(event);
244 }
Madan Jampani86940d92015-05-06 11:47:57 -0700245 }
246
Jordan Haltermanf7554092017-07-30 15:05:51 -0700247 /**
248 * Handles a completed batch event received from the master node.
249 * <p>
250 * If this node is the source of the batch, notifies event listeners to complete the operations.
251 *
252 * @param event the event to handle
253 */
254 private void completeBatch(FlowRuleBatchEvent event) {
255 if (pendingBatches.remove(event.subject().batchId())) {
256 notifyDelegate(event);
257 }
Madan Jampani86940d92015-05-06 11:47:57 -0700258 }
259
260 // This is not a efficient operation on a distributed sharded
261 // flow store. We need to revisit the need for this operation or at least
262 // make it device specific.
263 @Override
264 public int getFlowRuleCount() {
Yuta HIGUCHI10e91fb2017-06-15 13:30:50 -0700265 return Streams.stream(deviceService.getDevices()).parallel()
Jordan Haltermanf7554092017-07-30 15:05:51 -0700266 .mapToInt(device -> Iterables.size(getFlowEntries(device.id())))
267 .sum();
268 }
269
270 /**
271 * Returns the {@link DocumentPath} for the given {@link DeviceId}.
272 *
273 * @param deviceId the device identifier for which to return a path
274 * @return the path for the given device
275 */
276 private DocumentPath getPathFor(DeviceId deviceId) {
277 return DocumentPath.from("root", deviceId.toString());
278 }
279
280 /**
281 * Returns the {@link DocumentPath} for the given {@link DeviceId} and {@link FlowId}.
282 *
283 * @param deviceId the device identifier for which to return the path
284 * @param flowId the flow identifier for which to return the path
285 * @return the path for the given device/flow
286 */
287 private DocumentPath getPathFor(DeviceId deviceId, FlowId flowId) {
288 return DocumentPath.from("root", deviceId.toString(), flowId.toString());
Madan Jampani86940d92015-05-06 11:47:57 -0700289 }
290
291 @Override
Jordan Haltermanf7554092017-07-30 15:05:51 -0700292 @SuppressWarnings("unchecked")
Madan Jampani86940d92015-05-06 11:47:57 -0700293 public FlowEntry getFlowEntry(FlowRule rule) {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700294 DocumentPath path = getPathFor(rule.deviceId(), rule.id());
295 Versioned<Map<StoredFlowEntry, StoredFlowEntry>> flowEntries = flows.get(path);
296 return flowEntries != null ? flowEntries.value().get(rule) : null;
Madan Jampani86940d92015-05-06 11:47:57 -0700297 }
298
299 @Override
300 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700301 DocumentPath path = getPathFor(deviceId);
302 try {
303 return getFlowEntries(path);
304 } catch (NoSuchDocumentPathException e) {
Madan Jampani86940d92015-05-06 11:47:57 -0700305 return Collections.emptyList();
306 }
Jordan Haltermanf7554092017-07-30 15:05:51 -0700307 }
Madan Jampani86940d92015-05-06 11:47:57 -0700308
Jordan Haltermanf7554092017-07-30 15:05:51 -0700309 @SuppressWarnings("unchecked")
310 private Iterable<FlowEntry> getFlowEntries(DocumentPath path) {
311 return flows.getChildren(path)
312 .values()
313 .stream()
314 .flatMap(v -> v.value().values().stream())
315 .collect(Collectors.toList());
Madan Jampani86940d92015-05-06 11:47:57 -0700316 }
317
318 @Override
Jordan Haltermanf7554092017-07-30 15:05:51 -0700319 @SuppressWarnings("unchecked")
Madan Jampani86940d92015-05-06 11:47:57 -0700320 public void storeFlowRule(FlowRule rule) {
321 storeBatch(new FlowRuleBatchOperation(
Sho SHIMIZU98ffca82015-05-11 08:39:24 -0700322 Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
Madan Jampani86940d92015-05-06 11:47:57 -0700323 rule.deviceId(), idGenerator.getNewId()));
324 }
325
326 @Override
327 public void storeBatch(FlowRuleBatchOperation operation) {
328 if (operation.getOperations().isEmpty()) {
329 notifyDelegate(FlowRuleBatchEvent.completed(
330 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
331 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
332 return;
333 }
334
335 DeviceId deviceId = operation.deviceId();
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700336 NodeId master = mastershipService.getMasterFor(deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700337
338 if (master == null) {
Sivachidambaram Subramanian605104e2017-06-21 07:40:04 +0530339 log.warn("No master for {} ", deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700340
341 updateStoreInternal(operation);
342
343 notifyDelegate(FlowRuleBatchEvent.completed(
344 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
345 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
346 return;
347 }
348
Jordan Haltermanf7554092017-07-30 15:05:51 -0700349 pendingBatches.add(operation.id());
Madan Jampani86940d92015-05-06 11:47:57 -0700350
Madan Jampani86940d92015-05-06 11:47:57 -0700351 Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
352 if (currentOps.isEmpty()) {
353 batchOperationComplete(FlowRuleBatchEvent.completed(
354 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
Jordan Haltermanf7554092017-07-30 15:05:51 -0700355 new CompletedBatchOperation(true, Collections.emptySet(), deviceId)));
356 } else if (Objects.equals(local, master)) {
357 notifyDelegate(FlowRuleBatchEvent.requested(
358 new FlowRuleBatchRequest(operation.id(), currentOps),
359 operation.deviceId()));
360 } else {
361 clusterCommunicator.unicast(FlowRuleBatchEvent.requested(
362 new FlowRuleBatchRequest(operation.id(), currentOps),
363 operation.deviceId()),
364 APPLY_FLOWS,
365 serializer::encode,
366 master);
Madan Jampani86940d92015-05-06 11:47:57 -0700367 }
Madan Jampani86940d92015-05-06 11:47:57 -0700368 }
369
370 private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
371 return operation.getOperations().stream().map(
372 op -> {
Madan Jampani86940d92015-05-06 11:47:57 -0700373 switch (op.operator()) {
374 case ADD:
Jordan Haltermanf7554092017-07-30 15:05:51 -0700375 addBatchEntry(op);
Madan Jampani86940d92015-05-06 11:47:57 -0700376 return op;
377 case REMOVE:
Jordan Haltermanf7554092017-07-30 15:05:51 -0700378 if (removeBatchEntry(op)) {
Madan Jampani86940d92015-05-06 11:47:57 -0700379 return op;
380 }
Jordan Haltermanf7554092017-07-30 15:05:51 -0700381 return null;
Madan Jampani86940d92015-05-06 11:47:57 -0700382 case MODIFY:
383 //TODO: figure this out at some point
384 break;
385 default:
386 log.warn("Unknown flow operation operator: {}", op.operator());
387 }
388 return null;
389 }
Sho SHIMIZU828bc162016-01-13 23:10:43 -0800390 ).filter(Objects::nonNull).collect(Collectors.toSet());
Madan Jampani86940d92015-05-06 11:47:57 -0700391 }
392
Jordan Haltermanf7554092017-07-30 15:05:51 -0700393 @SuppressWarnings("unchecked")
394 private void addBatchEntry(FlowRuleBatchEntry batchEntry) {
395 StoredFlowEntry entry = new DefaultFlowEntry(batchEntry.target());
396 DocumentPath path = getPathFor(entry.deviceId(), entry.id());
397 retryUntilSuccess(() -> {
398 Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
399 if (value != null) {
400 Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
401 entries.put(entry, entry);
402 if (flows.replace(path, entries, value.version())) {
403 log.trace("Stored new flow rule: {}", entry);
404 return null;
405 } else {
406 log.trace("Failed to store new flow rule: {}", entry);
407 return retry();
408 }
409 } else {
410 // If there are no entries stored for the device, initialize the device's flows.
411 flows.createRecursive(path, Maps.newHashMap());
412 return retry();
413 }
414 });
415 }
416
417 @SuppressWarnings("unchecked")
418 private boolean removeBatchEntry(FlowRuleBatchEntry batchEntry) {
419 FlowRule rule = batchEntry.target();
420 DocumentPath path = getPathFor(rule.deviceId(), rule.id());
421 return retryUntilSuccess(() -> {
422 Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
423 if (value != null) {
424 Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
425 StoredFlowEntry entry = entries.get(rule);
426 if (entry != null) {
427 entry.setState(FlowEntryState.PENDING_REMOVE);
428 if (flows.replace(path, entries, value.version())) {
429 log.trace("Updated flow rule state to PENDING_REMOVE: {}", entry);
430 return true;
431 } else {
432 log.trace("Failed to update flow rule state to PENDING_REMOVE: {}", entry);
433 return retry();
434 }
435 } else {
436 return false;
437 }
438 } else {
439 return false;
440 }
441 });
442 }
443
444 @Override
445 public void batchOperationComplete(FlowRuleBatchEvent event) {
446 if (pendingBatches.remove(event.subject().batchId())) {
447 notifyDelegate(event);
448 } else {
449 clusterCommunicator.broadcast(event, COMPLETE_BATCH, serializer::encode);
450 }
451 }
452
Madan Jampani86940d92015-05-06 11:47:57 -0700453 @Override
454 public void deleteFlowRule(FlowRule rule) {
455 storeBatch(
456 new FlowRuleBatchOperation(
Sho SHIMIZU98ffca82015-05-11 08:39:24 -0700457 Collections.singletonList(
Madan Jampani86940d92015-05-06 11:47:57 -0700458 new FlowRuleBatchEntry(
459 FlowRuleOperation.REMOVE,
460 rule)), rule.deviceId(), idGenerator.getNewId()));
461 }
462
463 @Override
Charles Chan93fa7272016-01-26 22:27:02 -0800464 public FlowRuleEvent pendingFlowRule(FlowEntry rule) {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700465 DocumentPath path = getPathFor(rule.deviceId(), rule.id());
466 return retryUntilSuccess(() -> {
467 Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
468 if (value != null) {
469 Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
470 StoredFlowEntry entry = entries.get(rule);
471 if (entry != null && entry.state() != FlowEntryState.PENDING_ADD) {
472 entry.setState(FlowEntryState.PENDING_ADD);
473 if (flows.replace(path, entries, value.version())) {
474 log.trace("Updated flow rule state to PENDING_ADD: {}", entry);
475 return new FlowRuleEvent(RULE_UPDATED, rule);
476 } else {
477 log.trace("Failed to update flow rule state to PENDING_ADD: {}", entry);
478 return retry();
479 }
480 } else {
481 return null;
482 }
483 } else {
484 return null;
Charles Chan93fa7272016-01-26 22:27:02 -0800485 }
Jordan Haltermanf7554092017-07-30 15:05:51 -0700486 });
Charles Chan93fa7272016-01-26 22:27:02 -0800487 }
488
489 @Override
Jordan Haltermanf7554092017-07-30 15:05:51 -0700490 @SuppressWarnings("unchecked")
Madan Jampani86940d92015-05-06 11:47:57 -0700491 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700492 DocumentPath path = getPathFor(rule.deviceId(), rule.id());
493 return retryUntilSuccess(() -> {
494 Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
495 if (value != null) {
496 Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
497 StoredFlowEntry entry = entries.get(rule);
498 if (entry != null) {
499 FlowRuleEvent event;
500 String message;
Madan Jampani86940d92015-05-06 11:47:57 -0700501
Jordan Haltermanf7554092017-07-30 15:05:51 -0700502 entry.setBytes(rule.bytes());
503 entry.setLife(rule.life(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
504 entry.setLiveType(rule.liveType());
505 entry.setPackets(rule.packets());
506 entry.setLastSeen();
Madan Jampani86940d92015-05-06 11:47:57 -0700507
Jordan Haltermanf7554092017-07-30 15:05:51 -0700508 // If the entry state is PENDING_ADD, set it to ADDED. Otherwise, just update the rule.
509 if (entry.state() == FlowEntryState.PENDING_ADD) {
510 entry.setState(FlowEntryState.ADDED);
511 event = new FlowRuleEvent(Type.RULE_ADDED, rule);
512 message = "Updated flow rule state to ADDED: {}";
513 } else {
514 event = new FlowRuleEvent(Type.RULE_UPDATED, rule);
515 message = "Updated flow rule: {}";
516 }
517
518 if (flows.replace(path, entries, value.version())) {
519 log.trace(message, entry);
520 return event;
521 } else {
522 log.trace("Failed to update flow rule: {}", entry);
523 return retry();
524 }
525 } else {
526 // If the rule does not exist, return null. Inserting the rule risks race conditions
527 // that can result in removed rules being retained.
528 return null;
529 }
530 } else {
531 // If there are no entries stored for the device, initialize the device's flows.
532 flows.createRecursive(path, Maps.newHashMap());
533 return retry();
Madan Jampani86940d92015-05-06 11:47:57 -0700534 }
Jordan Haltermanf7554092017-07-30 15:05:51 -0700535 });
Madan Jampani86940d92015-05-06 11:47:57 -0700536 }
537
538 @Override
Jordan Haltermanf7554092017-07-30 15:05:51 -0700539 @SuppressWarnings("unchecked")
Madan Jampani86940d92015-05-06 11:47:57 -0700540 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700541 DocumentPath path = getPathFor(rule.deviceId(), rule.id());
542 return retryUntilSuccess(() -> {
543 Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
544 if (value != null) {
545 Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
546 StoredFlowEntry entry = entries.remove(rule);
547 if (entry != null) {
548 if (flows.replace(path, entries, value.version())) {
549 log.trace("Removed flow rule: {}", entry);
550 return new FlowRuleEvent(RULE_REMOVED, entry);
551 } else {
552 log.trace("Failed to remove flow rule: {}", entry);
553 return retry();
554 }
555 } else {
556 return null;
557 }
558 } else {
559 return null;
560 }
561 });
Madan Jampani86940d92015-05-06 11:47:57 -0700562 }
563
564 @Override
Charles Chan0c7c43b2016-01-14 17:39:20 -0800565 public void purgeFlowRule(DeviceId deviceId) {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700566 DocumentPath path = getPathFor(deviceId);
567 try {
568 for (String flowId : flows.getChildren(path).keySet()) {
569 flows.removeNode(DocumentPath.from("root", deviceId.toString(), flowId));
570 }
571 } catch (NoSuchDocumentPathException e) {
572 // Do nothing
573 }
574 try {
575 flows.removeNode(path);
576 } catch (NoSuchDocumentPathException e) {
577 // Do nothing
578 }
Charles Chan0c7c43b2016-01-14 17:39:20 -0800579 }
580
581 @Override
Victor Silva139bca42016-08-18 11:46:35 -0300582 public void purgeFlowRules() {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700583 try {
584 for (String deviceId : flows.getChildren(flows.root()).keySet()) {
585 purgeFlowRule(DeviceId.deviceId(deviceId));
Madan Jampani86940d92015-05-06 11:47:57 -0700586 }
Jordan Haltermanf7554092017-07-30 15:05:51 -0700587 } catch (NoSuchDocumentPathException e) {
588 // Do nothing
Madan Jampani86940d92015-05-06 11:47:57 -0700589 }
590 }
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700591
592 @Override
593 public FlowRuleEvent updateTableStatistics(DeviceId deviceId,
Jordan Haltermanf7554092017-07-30 15:05:51 -0700594 List<TableStatisticsEntry> tableStats) {
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700595 deviceTableStats.put(deviceId, tableStats);
596 return null;
597 }
598
599 @Override
600 public Iterable<TableStatisticsEntry> getTableStatistics(DeviceId deviceId) {
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700601 List<TableStatisticsEntry> tableStats = deviceTableStats.get(deviceId);
602 if (tableStats == null) {
603 return Collections.emptyList();
604 }
605 return ImmutableList.copyOf(tableStats);
606 }
607
Patryk Konopka7e40c012017-06-06 13:38:06 +0200608 @Override
609 public long getActiveFlowRuleCount(DeviceId deviceId) {
610 return Streams.stream(getTableStatistics(deviceId))
611 .mapToLong(TableStatisticsEntry::activeFlowEntries)
612 .sum();
613 }
614
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700615 private class InternalTableStatsListener
Jordan Haltermanf7554092017-07-30 15:05:51 -0700616 implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> {
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700617 @Override
618 public void event(EventuallyConsistentMapEvent<DeviceId,
Jordan Haltermanf7554092017-07-30 15:05:51 -0700619 List<TableStatisticsEntry>> event) {
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700620 //TODO: Generate an event to listeners (do we need?)
621 }
622 }
Jordan Haltermanf7554092017-07-30 15:05:51 -0700623}