blob: 68ba06830234f1d13ea2d139366c74d177a9945d [file] [log] [blame]
Jordan Haltermanf7554092017-07-30 15:05:51 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2014-present Open Networking Foundation
Madan Jampani86940d92015-05-06 11:47:57 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.flow.impl;
17
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070018import java.util.Collections;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070019import java.util.List;
20import java.util.Map;
21import java.util.Objects;
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -070022import java.util.Random;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070023import java.util.Set;
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -070024import java.util.concurrent.CompletableFuture;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070025import java.util.concurrent.ExecutorService;
26import java.util.concurrent.Executors;
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -070027import java.util.concurrent.ScheduledExecutorService;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070028import java.util.concurrent.TimeUnit;
Jordan Haltermanf7554092017-07-30 15:05:51 -070029import java.util.function.Supplier;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070030import java.util.stream.Collectors;
Ray Milkey2b6ff422016-08-26 13:03:15 -070031
Jordan Haltermanf7554092017-07-30 15:05:51 -070032import com.google.common.collect.ImmutableList;
33import com.google.common.collect.Iterables;
34import com.google.common.collect.Maps;
35import com.google.common.collect.Sets;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070036import com.google.common.collect.Streams;
37import org.apache.felix.scr.annotations.Activate;
38import org.apache.felix.scr.annotations.Component;
39import org.apache.felix.scr.annotations.Deactivate;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070040import org.apache.felix.scr.annotations.Reference;
41import org.apache.felix.scr.annotations.ReferenceCardinality;
42import org.apache.felix.scr.annotations.Service;
43import org.onlab.util.KryoNamespace;
44import org.onlab.util.Tools;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070045import org.onosproject.cluster.ClusterService;
46import org.onosproject.cluster.NodeId;
47import org.onosproject.core.CoreService;
48import org.onosproject.core.IdGenerator;
49import org.onosproject.mastership.MastershipService;
50import org.onosproject.net.DeviceId;
51import org.onosproject.net.device.DeviceService;
52import org.onosproject.net.flow.CompletedBatchOperation;
53import org.onosproject.net.flow.DefaultFlowEntry;
54import org.onosproject.net.flow.FlowEntry;
55import org.onosproject.net.flow.FlowEntry.FlowEntryState;
56import org.onosproject.net.flow.FlowId;
57import org.onosproject.net.flow.FlowRule;
Ray Milkey7bf273c2017-09-27 16:15:15 -070058import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
59import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry.FlowRuleOperation;
60import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
61import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
62import org.onosproject.net.flow.oldbatch.FlowRuleBatchRequest;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070063import org.onosproject.net.flow.FlowRuleEvent;
64import org.onosproject.net.flow.FlowRuleEvent.Type;
65import org.onosproject.net.flow.FlowRuleService;
66import org.onosproject.net.flow.FlowRuleStore;
67import org.onosproject.net.flow.FlowRuleStoreDelegate;
68import org.onosproject.net.flow.StoredFlowEntry;
69import org.onosproject.net.flow.TableStatisticsEntry;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070070import org.onosproject.store.AbstractStore;
71import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Jordan Haltermanf7554092017-07-30 15:05:51 -070072import org.onosproject.store.cluster.messaging.MessageSubject;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070073import org.onosproject.store.impl.MastershipBasedTimestamp;
74import org.onosproject.store.serializers.KryoNamespaces;
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -070075import org.onosproject.store.service.AsyncDocumentTree;
Jordan Haltermanf7554092017-07-30 15:05:51 -070076import org.onosproject.store.service.DocumentPath;
77import org.onosproject.store.service.DocumentTree;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070078import org.onosproject.store.service.EventuallyConsistentMap;
79import org.onosproject.store.service.EventuallyConsistentMapEvent;
80import org.onosproject.store.service.EventuallyConsistentMapListener;
Jordan Haltermanf5295f62017-09-12 15:07:18 -070081import org.onosproject.store.service.IllegalDocumentModificationException;
Jordan Haltermanf7554092017-07-30 15:05:51 -070082import org.onosproject.store.service.NoSuchDocumentPathException;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070083import org.onosproject.store.service.Serializer;
Jordan Haltermanf7554092017-07-30 15:05:51 -070084import org.onosproject.store.service.StorageException;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070085import org.onosproject.store.service.StorageService;
Jordan Haltermanf7554092017-07-30 15:05:51 -070086import org.onosproject.store.service.Versioned;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070087import org.onosproject.store.service.WallClockTimestamp;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070088import org.slf4j.Logger;
Ray Milkey2b6ff422016-08-26 13:03:15 -070089
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070090import static org.onlab.util.Tools.groupedThreads;
91import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
Jordan Haltermanf7554092017-07-30 15:05:51 -070092import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_UPDATED;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070093import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani86940d92015-05-06 11:47:57 -070094
95/**
96 * Manages inventory of flow rules using a distributed state management protocol.
97 */
Sho SHIMIZU5c396e32016-08-12 15:19:12 -070098@Component(immediate = true)
Madan Jampani86940d92015-05-06 11:47:57 -070099@Service
Madan Jampani37d04c62016-04-25 15:53:55 -0700100public class DistributedFlowRuleStore
Madan Jampani86940d92015-05-06 11:47:57 -0700101 extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
102 implements FlowRuleStore {
103
104 private final Logger log = getLogger(getClass());
105
Jordan Haltermanf7554092017-07-30 15:05:51 -0700106 // Constant exception used to indicate an atomic read-modify-write operation needs to be retried.
107 // We don't want to populate a stack trace every time an optimistic lock is retried.
108 private static final StorageException.ConcurrentModification RETRY;
109
110 // Initialize retry exception with an empty stack trace.
111 static {
112 RETRY = new StorageException.ConcurrentModification();
113 RETRY.setStackTrace(new StackTraceElement[0]);
114 }
115
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700116 private static final int SCHEDULED_THREAD_POOL_SIZE = 8;
Madan Jampani86940d92015-05-06 11:47:57 -0700117 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
Jordan Haltermanf7554092017-07-30 15:05:51 -0700118 private static final int MAX_RETRY_DELAY_MILLIS = 50;
Madan Jampani86940d92015-05-06 11:47:57 -0700119
Jordan Haltermanf7554092017-07-30 15:05:51 -0700120 private static final String FLOW_TABLE = "onos-flow-table";
Madan Jampani86940d92015-05-06 11:47:57 -0700121
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700122 private static final MessageSubject APPLY_BATCH_FLOWS = new MessageSubject("onos-flow-apply");
Jordan Haltermanf7554092017-07-30 15:05:51 -0700123 private static final MessageSubject COMPLETE_BATCH = new MessageSubject("onos-flow-batch-complete");
Madan Jampani86940d92015-05-06 11:47:57 -0700124
125 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
126 protected DeviceService deviceService;
127
128 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
129 protected CoreService coreService;
130
131 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani86940d92015-05-06 11:47:57 -0700132 protected MastershipService mastershipService;
133
Aaron Kruglikova62fdbb2015-10-27 16:32:06 -0700134 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jordan Haltermanf7554092017-07-30 15:05:51 -0700135 protected ClusterCommunicationService clusterCommunicator;
Aaron Kruglikova62fdbb2015-10-27 16:32:06 -0700136
Jordan Haltermanf7554092017-07-30 15:05:51 -0700137 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
138 protected ClusterService clusterService;
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700139
140 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
141 protected StorageService storageService;
142
Madan Jampani884d4432016-08-23 10:46:55 -0700143 protected final Serializer serializer = Serializer.using(KryoNamespaces.API);
Madan Jampani86940d92015-05-06 11:47:57 -0700144
Madan Jampani884d4432016-08-23 10:46:55 -0700145 protected final KryoNamespace.Builder serializerBuilder = KryoNamespace.newBuilder()
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700146 .register(KryoNamespaces.API)
147 .register(MastershipBasedTimestamp.class);
148
Jordan Haltermanf7554092017-07-30 15:05:51 -0700149 private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
150 private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
151 new InternalTableStatsListener();
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700152
Jordan Haltermanf7554092017-07-30 15:05:51 -0700153 private Set<Long> pendingBatches = Sets.newConcurrentHashSet();
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700154 private ScheduledExecutorService scheduledExecutor;
Jordan Haltermanf7554092017-07-30 15:05:51 -0700155 private ExecutorService messageHandlingExecutor;
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700156 private final Random random = new Random();
Jordan Haltermanf7554092017-07-30 15:05:51 -0700157
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700158 private AsyncDocumentTree<Map<StoredFlowEntry, StoredFlowEntry>> asyncFlows;
Jordan Haltermanf7554092017-07-30 15:05:51 -0700159 private DocumentTree<Map<StoredFlowEntry, StoredFlowEntry>> flows;
Madan Jampani86940d92015-05-06 11:47:57 -0700160 private IdGenerator idGenerator;
161 private NodeId local;
162
163 @Activate
Jordan Haltermanf7554092017-07-30 15:05:51 -0700164 public void activate() {
Madan Jampani86940d92015-05-06 11:47:57 -0700165 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
166
167 local = clusterService.getLocalNode().id();
168
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700169 scheduledExecutor = Executors.newScheduledThreadPool(
170 SCHEDULED_THREAD_POOL_SIZE,
171 groupedThreads("onos/store/flow", "schedulers", log));
172
Madan Jampani86940d92015-05-06 11:47:57 -0700173 messageHandlingExecutor = Executors.newFixedThreadPool(
Jordan Haltermanf7554092017-07-30 15:05:51 -0700174 MESSAGE_HANDLER_THREAD_POOL_SIZE,
175 groupedThreads("onos/store/flow", "message-handlers", log));
Madan Jampani86940d92015-05-06 11:47:57 -0700176
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700177 deviceTableStats = storageService.<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder()
178 .withName("onos-flow-table-stats")
Madan Jampani884d4432016-08-23 10:46:55 -0700179 .withSerializer(serializerBuilder)
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700180 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
181 .withTimestampProvider((k, v) -> new WallClockTimestamp())
182 .withTombstonesDisabled()
183 .build();
184 deviceTableStats.addListener(tableStatsListener);
185
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700186 asyncFlows = storageService.<Map<StoredFlowEntry, StoredFlowEntry>>documentTreeBuilder()
Jordan Haltermanf7554092017-07-30 15:05:51 -0700187 .withName(FLOW_TABLE)
188 .withSerializer(serializer)
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700189 .buildDocumentTree();
190 flows = asyncFlows.asDocumentTree();
Jordan Haltermanf7554092017-07-30 15:05:51 -0700191
192 clusterCommunicator.addSubscriber(
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700193 APPLY_BATCH_FLOWS,
Jordan Haltermanf7554092017-07-30 15:05:51 -0700194 serializer::decode,
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700195 this::applyBatchFlows,
Jordan Haltermanf7554092017-07-30 15:05:51 -0700196 messageHandlingExecutor);
197 clusterCommunicator.addSubscriber(
198 COMPLETE_BATCH,
199 serializer::decode,
200 this::completeBatch,
201 messageHandlingExecutor);
202
203 log.info("Started");
Madan Jampani86940d92015-05-06 11:47:57 -0700204 }
205
206 @Deactivate
Jordan Haltermanf7554092017-07-30 15:05:51 -0700207 public void deactivate() {
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700208 deviceTableStats.removeListener(tableStatsListener);
209 deviceTableStats.destroy();
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700210 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
Jordan Haltermanf7554092017-07-30 15:05:51 -0700211 clusterCommunicator.removeSubscriber(COMPLETE_BATCH);
Madan Jampani86940d92015-05-06 11:47:57 -0700212 messageHandlingExecutor.shutdownNow();
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700213 scheduledExecutor.shutdownNow();
Madan Jampani86940d92015-05-06 11:47:57 -0700214 log.info("Stopped");
215 }
216
Jordan Haltermanf7554092017-07-30 15:05:51 -0700217 /**
218 * Retries the given supplier until successful.
219 * <p>
220 * This method retries the given supplier until no {@code ConcurrentModification} exceptions are thrown. In
221 * between retries, it waits a semi-random interval to attempt to avoid transaction conflicts with other processes.
222 *
223 * @param supplier the supplier to retry
224 * @param <T> the return type
225 * @return the return value of the given supplier once it runs successfully
226 */
227 private <T> T retryUntilSuccess(Supplier<T> supplier) {
228 return Tools.retryable(
229 supplier,
230 StorageException.ConcurrentModification.class,
231 Integer.MAX_VALUE,
232 MAX_RETRY_DELAY_MILLIS)
233 .get();
Madan Jampani86940d92015-05-06 11:47:57 -0700234 }
235
Jordan Haltermanf7554092017-07-30 15:05:51 -0700236 /**
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700237 * Retries the given asynchronous supplier until successful.
238 * <p>
239 * This method retries the given supplier until no {@code ConcurrentModification} exceptions are thrown. In
240 * between retries, it waits a semi-random interval to attempt to avoid transaction conflicts with other processes.
241 *
242 * @param supplier the supplier to retry
243 * @param <T> the return type
244 * @return the return value of the given supplier once it runs successfully
245 */
246 private <T> CompletableFuture<T> retryAsyncUntilSuccess(Supplier<CompletableFuture<T>> supplier) {
247 return retryAsyncUntilSuccess(supplier, new CompletableFuture<>());
248 }
249
250 /**
251 * Retries the given asynchronous supplier until successful.
252 * <p>
253 * This method retries the given supplier until no {@code ConcurrentModification} exceptions are thrown. In
254 * between retries, it waits a semi-random interval to attempt to avoid transaction conflicts with other processes.
255 *
256 * @param supplier the supplier to retry
257 * @param future future to be completed once the operation has been successful
258 * @param <T> the return type
259 * @return the return value of the given supplier once it runs successfully
260 */
261 private <T> CompletableFuture<T> retryAsyncUntilSuccess(
262 Supplier<CompletableFuture<T>> supplier,
263 CompletableFuture<T> future) {
264 supplier.get().whenComplete((result, error) -> {
265 if (error == null) {
266 future.complete(result);
267 } else {
268 Throwable cause = error.getCause() != null ? error.getCause() : error;
269 if (cause instanceof StorageException.ConcurrentModification) {
270 scheduledExecutor.schedule(
271 () -> retryAsyncUntilSuccess(supplier, future),
272 random.nextInt(50),
273 TimeUnit.MILLISECONDS);
274 } else {
275 future.completeExceptionally(error);
276 }
277 }
278 });
279 return future;
280 }
281
282 /**
Jordan Haltermanf7554092017-07-30 15:05:51 -0700283 * Return method for {@link #retryUntilSuccess(Supplier)} callbacks to indicate that the callback needs to be
284 * retried after a randomized delay.
285 *
286 * @param <T> the return type
287 * @return nothing
288 * @throws StorageException.ConcurrentModification to force a retry of the callback
289 */
290 private <T> T retry() {
291 throw RETRY;
Madan Jampani86940d92015-05-06 11:47:57 -0700292 }
293
Jordan Haltermanf7554092017-07-30 15:05:51 -0700294 /**
Jordan Haltermanf7554092017-07-30 15:05:51 -0700295 * Handles a completed batch event received from the master node.
296 * <p>
297 * If this node is the source of the batch, notifies event listeners to complete the operations.
298 *
299 * @param event the event to handle
300 */
301 private void completeBatch(FlowRuleBatchEvent event) {
302 if (pendingBatches.remove(event.subject().batchId())) {
303 notifyDelegate(event);
304 }
Madan Jampani86940d92015-05-06 11:47:57 -0700305 }
306
307 // This is not a efficient operation on a distributed sharded
308 // flow store. We need to revisit the need for this operation or at least
309 // make it device specific.
310 @Override
311 public int getFlowRuleCount() {
Yuta HIGUCHI10e91fb2017-06-15 13:30:50 -0700312 return Streams.stream(deviceService.getDevices()).parallel()
Jordan Haltermanf7554092017-07-30 15:05:51 -0700313 .mapToInt(device -> Iterables.size(getFlowEntries(device.id())))
314 .sum();
315 }
316
317 /**
318 * Returns the {@link DocumentPath} for the given {@link DeviceId}.
319 *
320 * @param deviceId the device identifier for which to return a path
321 * @return the path for the given device
322 */
323 private DocumentPath getPathFor(DeviceId deviceId) {
324 return DocumentPath.from("root", deviceId.toString());
325 }
326
327 /**
328 * Returns the {@link DocumentPath} for the given {@link DeviceId} and {@link FlowId}.
329 *
330 * @param deviceId the device identifier for which to return the path
331 * @param flowId the flow identifier for which to return the path
332 * @return the path for the given device/flow
333 */
334 private DocumentPath getPathFor(DeviceId deviceId, FlowId flowId) {
335 return DocumentPath.from("root", deviceId.toString(), flowId.toString());
Madan Jampani86940d92015-05-06 11:47:57 -0700336 }
337
338 @Override
Jordan Haltermanf7554092017-07-30 15:05:51 -0700339 @SuppressWarnings("unchecked")
Madan Jampani86940d92015-05-06 11:47:57 -0700340 public FlowEntry getFlowEntry(FlowRule rule) {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700341 DocumentPath path = getPathFor(rule.deviceId(), rule.id());
342 Versioned<Map<StoredFlowEntry, StoredFlowEntry>> flowEntries = flows.get(path);
343 return flowEntries != null ? flowEntries.value().get(rule) : null;
Madan Jampani86940d92015-05-06 11:47:57 -0700344 }
345
346 @Override
347 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700348 DocumentPath path = getPathFor(deviceId);
349 try {
350 return getFlowEntries(path);
351 } catch (NoSuchDocumentPathException e) {
Madan Jampani86940d92015-05-06 11:47:57 -0700352 return Collections.emptyList();
353 }
Jordan Haltermanf7554092017-07-30 15:05:51 -0700354 }
Madan Jampani86940d92015-05-06 11:47:57 -0700355
Jordan Haltermanf7554092017-07-30 15:05:51 -0700356 @SuppressWarnings("unchecked")
357 private Iterable<FlowEntry> getFlowEntries(DocumentPath path) {
358 return flows.getChildren(path)
359 .values()
360 .stream()
361 .flatMap(v -> v.value().values().stream())
362 .collect(Collectors.toList());
Madan Jampani86940d92015-05-06 11:47:57 -0700363 }
364
365 @Override
Jordan Haltermanf7554092017-07-30 15:05:51 -0700366 @SuppressWarnings("unchecked")
Madan Jampani86940d92015-05-06 11:47:57 -0700367 public void storeFlowRule(FlowRule rule) {
368 storeBatch(new FlowRuleBatchOperation(
Sho SHIMIZU98ffca82015-05-11 08:39:24 -0700369 Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
Madan Jampani86940d92015-05-06 11:47:57 -0700370 rule.deviceId(), idGenerator.getNewId()));
371 }
372
373 @Override
374 public void storeBatch(FlowRuleBatchOperation operation) {
375 if (operation.getOperations().isEmpty()) {
376 notifyDelegate(FlowRuleBatchEvent.completed(
377 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
378 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
379 return;
380 }
381
382 DeviceId deviceId = operation.deviceId();
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700383 NodeId master = mastershipService.getMasterFor(deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700384
385 if (master == null) {
Sivachidambaram Subramanian605104e2017-06-21 07:40:04 +0530386 log.warn("No master for {} ", deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700387
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700388 updateStoreInternal(operation).whenComplete((result, error) -> {
389 notifyDelegate(FlowRuleBatchEvent.completed(
390 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
391 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
392 });
Madan Jampani86940d92015-05-06 11:47:57 -0700393 return;
394 }
395
Jordan Haltermanf7554092017-07-30 15:05:51 -0700396 pendingBatches.add(operation.id());
Madan Jampani86940d92015-05-06 11:47:57 -0700397
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700398 // If the local node is the master, apply the flows. Otherwise, send them to the master.
399 if (Objects.equals(local, master)) {
400 applyBatchFlows(operation);
Jordan Haltermanf7554092017-07-30 15:05:51 -0700401 } else {
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700402 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}", master, deviceId);
403 clusterCommunicator.unicast(
404 operation,
405 APPLY_BATCH_FLOWS,
Jordan Haltermanf7554092017-07-30 15:05:51 -0700406 serializer::encode,
407 master);
Madan Jampani86940d92015-05-06 11:47:57 -0700408 }
Madan Jampani86940d92015-05-06 11:47:57 -0700409 }
410
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700411 /**
412 * Asynchronously applies a batch of flows to the store.
413 * <p>
414 * This operation is performed on the master node to ensure that events occur <em>after</em> flows have been stored
415 * and are visible to the master node. If a non-master node stores flows and then triggers events on the master,
416 * the flows may not yet be visible to the master node due to the nature of sequentially consistent reads on the
417 * underlying {@code DocumentTree} primitive.
418 */
419 private void applyBatchFlows(FlowRuleBatchOperation operation) {
420 updateStoreInternal(operation).whenComplete((operations, error) -> {
421 if (error == null) {
422 if (operations.isEmpty()) {
423 batchOperationComplete(FlowRuleBatchEvent.completed(
424 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
425 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
Jordan Haltermanf7554092017-07-30 15:05:51 -0700426 } else {
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700427 notifyDelegate(FlowRuleBatchEvent.requested(
428 new FlowRuleBatchRequest(operation.id(), operations),
429 operation.deviceId()));
Jordan Haltermanf7554092017-07-30 15:05:51 -0700430 }
Jordan Haltermanf7554092017-07-30 15:05:51 -0700431 }
432 });
433 }
434
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700435 private CompletableFuture<Set<FlowRuleBatchEntry>> updateStoreInternal(FlowRuleBatchOperation operation) {
436 return Tools.allOf(operation.getOperations().stream().map(op -> {
437 switch (op.operator()) {
438 case ADD:
Yi Tsengc29d8822017-10-25 16:19:25 -0700439 case MODIFY:
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700440 return addBatchEntry(op).thenApply(succeeded -> succeeded ? op : null);
441 case REMOVE:
442 return removeBatchEntry(op).thenApply(succeeded -> succeeded ? op : null);
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700443 default:
444 log.warn("Unknown flow operation operator: {}", op.operator());
445 return CompletableFuture.<FlowRuleBatchEntry>completedFuture(null);
446 }
447 }).collect(Collectors.toList()))
448 .thenApply(results -> results.stream()
449 .filter(Objects::nonNull)
450 .collect(Collectors.toSet()));
451 }
452
Jordan Haltermanf7554092017-07-30 15:05:51 -0700453 @SuppressWarnings("unchecked")
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700454 private CompletableFuture<Boolean> addBatchEntry(FlowRuleBatchEntry batchEntry) {
455 StoredFlowEntry entry = new DefaultFlowEntry(batchEntry.target());
456 DocumentPath path = getPathFor(entry.deviceId(), entry.id());
457 return retryAsyncUntilSuccess(() -> {
458 CompletableFuture<Boolean> future = new CompletableFuture<>();
459 asyncFlows.get(path).whenComplete((value, getError) -> {
460 if (getError == null) {
461 if (value != null) {
462 Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
463 entries.put(entry, entry);
464 asyncFlows.replace(path, entries, value.version()).whenComplete((succeeded, replaceError) -> {
465 if (replaceError == null) {
466 if (succeeded) {
467 log.trace("Stored new flow rule: {}", entry);
468 future.complete(true);
469 } else {
470 log.trace("Failed to store new flow rule: {}", entry);
471 future.completeExceptionally(RETRY);
472 }
473 } else {
474 future.completeExceptionally(replaceError);
475 }
476 });
Jordan Haltermanf7554092017-07-30 15:05:51 -0700477 } else {
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700478 // If there are no entries stored for the device, initialize the device's flows.
479 Map<StoredFlowEntry, StoredFlowEntry> map = Maps.newHashMap();
480 map.put(entry, entry);
481 asyncFlows.createRecursive(path, map).whenComplete((succeeded, createError) -> {
482 if (createError == null) {
483 if (succeeded) {
484 log.trace("Stored new flow rule: {}", entry);
485 future.complete(true);
486 } else {
487 log.trace("Failed to store new flow rule: {}", entry);
488 future.completeExceptionally(RETRY);
489 }
490 } else {
491 future.completeExceptionally(createError);
492 }
493 });
Jordan Haltermanf7554092017-07-30 15:05:51 -0700494 }
495 } else {
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700496 future.completeExceptionally(getError);
Jordan Haltermanf7554092017-07-30 15:05:51 -0700497 }
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700498 });
499 return future;
500 });
501 }
502
503 @SuppressWarnings("unchecked")
504 private CompletableFuture<Boolean> removeBatchEntry(FlowRuleBatchEntry batchEntry) {
505 FlowRule rule = batchEntry.target();
506 DocumentPath path = getPathFor(rule.deviceId(), rule.id());
507 return retryAsyncUntilSuccess(() -> {
508 CompletableFuture<Boolean> future = new CompletableFuture<>();
509 asyncFlows.get(path).whenComplete((value, getError) -> {
510 if (getError == null) {
511 if (value != null) {
512 Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
513 StoredFlowEntry entry = entries.get(rule);
514 if (entry != null) {
515 entry.setState(FlowEntryState.PENDING_REMOVE);
516 asyncFlows.replace(path, entries, value.version()).whenComplete((succeeded, error) -> {
517 if (error == null) {
518 if (succeeded) {
519 log.trace("Updated flow rule state to PENDING_REMOVE: {}", entry);
520 future.complete(true);
521 } else {
522 log.trace("Failed to update flow rule state to PENDING_REMOVE: {}", entry);
523 future.completeExceptionally(RETRY);
524 }
525 } else {
526 future.completeExceptionally(error);
527 }
528 });
529 } else {
530 future.complete(false);
531 }
532 } else {
533 future.complete(false);
534 }
535 } else {
536 future.completeExceptionally(getError);
537 }
538 });
539 return future;
Jordan Haltermanf7554092017-07-30 15:05:51 -0700540 });
541 }
542
543 @Override
544 public void batchOperationComplete(FlowRuleBatchEvent event) {
545 if (pendingBatches.remove(event.subject().batchId())) {
546 notifyDelegate(event);
547 } else {
548 clusterCommunicator.broadcast(event, COMPLETE_BATCH, serializer::encode);
549 }
550 }
551
Madan Jampani86940d92015-05-06 11:47:57 -0700552 @Override
553 public void deleteFlowRule(FlowRule rule) {
554 storeBatch(
555 new FlowRuleBatchOperation(
Sho SHIMIZU98ffca82015-05-11 08:39:24 -0700556 Collections.singletonList(
Madan Jampani86940d92015-05-06 11:47:57 -0700557 new FlowRuleBatchEntry(
558 FlowRuleOperation.REMOVE,
559 rule)), rule.deviceId(), idGenerator.getNewId()));
560 }
561
562 @Override
Charles Chan93fa7272016-01-26 22:27:02 -0800563 public FlowRuleEvent pendingFlowRule(FlowEntry rule) {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700564 DocumentPath path = getPathFor(rule.deviceId(), rule.id());
565 return retryUntilSuccess(() -> {
566 Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
567 if (value != null) {
568 Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
569 StoredFlowEntry entry = entries.get(rule);
570 if (entry != null && entry.state() != FlowEntryState.PENDING_ADD) {
571 entry.setState(FlowEntryState.PENDING_ADD);
572 if (flows.replace(path, entries, value.version())) {
573 log.trace("Updated flow rule state to PENDING_ADD: {}", entry);
574 return new FlowRuleEvent(RULE_UPDATED, rule);
575 } else {
576 log.trace("Failed to update flow rule state to PENDING_ADD: {}", entry);
577 return retry();
578 }
579 } else {
580 return null;
581 }
582 } else {
583 return null;
Charles Chan93fa7272016-01-26 22:27:02 -0800584 }
Jordan Haltermanf7554092017-07-30 15:05:51 -0700585 });
Charles Chan93fa7272016-01-26 22:27:02 -0800586 }
587
588 @Override
Jordan Haltermanf7554092017-07-30 15:05:51 -0700589 @SuppressWarnings("unchecked")
Madan Jampani86940d92015-05-06 11:47:57 -0700590 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700591 DocumentPath path = getPathFor(rule.deviceId(), rule.id());
592 return retryUntilSuccess(() -> {
593 Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
594 if (value != null) {
595 Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
596 StoredFlowEntry entry = entries.get(rule);
597 if (entry != null) {
598 FlowRuleEvent event;
599 String message;
Madan Jampani86940d92015-05-06 11:47:57 -0700600
Jordan Haltermanf7554092017-07-30 15:05:51 -0700601 entry.setBytes(rule.bytes());
602 entry.setLife(rule.life(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
603 entry.setLiveType(rule.liveType());
604 entry.setPackets(rule.packets());
605 entry.setLastSeen();
Madan Jampani86940d92015-05-06 11:47:57 -0700606
Jordan Haltermanf7554092017-07-30 15:05:51 -0700607 // If the entry state is PENDING_ADD, set it to ADDED. Otherwise, just update the rule.
608 if (entry.state() == FlowEntryState.PENDING_ADD) {
609 entry.setState(FlowEntryState.ADDED);
610 event = new FlowRuleEvent(Type.RULE_ADDED, rule);
611 message = "Updated flow rule state to ADDED: {}";
612 } else {
613 event = new FlowRuleEvent(Type.RULE_UPDATED, rule);
614 message = "Updated flow rule: {}";
615 }
616
617 if (flows.replace(path, entries, value.version())) {
618 log.trace(message, entry);
619 return event;
620 } else {
621 log.trace("Failed to update flow rule: {}", entry);
622 return retry();
623 }
624 } else {
625 // If the rule does not exist, return null. Inserting the rule risks race conditions
626 // that can result in removed rules being retained.
627 return null;
628 }
629 } else {
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700630 return null;
Madan Jampani86940d92015-05-06 11:47:57 -0700631 }
Jordan Haltermanf7554092017-07-30 15:05:51 -0700632 });
Madan Jampani86940d92015-05-06 11:47:57 -0700633 }
634
635 @Override
Jordan Haltermanf7554092017-07-30 15:05:51 -0700636 @SuppressWarnings("unchecked")
Madan Jampani86940d92015-05-06 11:47:57 -0700637 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700638 DocumentPath path = getPathFor(rule.deviceId(), rule.id());
639 return retryUntilSuccess(() -> {
640 Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
641 if (value != null) {
642 Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
643 StoredFlowEntry entry = entries.remove(rule);
644 if (entry != null) {
645 if (flows.replace(path, entries, value.version())) {
646 log.trace("Removed flow rule: {}", entry);
647 return new FlowRuleEvent(RULE_REMOVED, entry);
648 } else {
649 log.trace("Failed to remove flow rule: {}", entry);
650 return retry();
651 }
652 } else {
653 return null;
654 }
655 } else {
656 return null;
657 }
658 });
Madan Jampani86940d92015-05-06 11:47:57 -0700659 }
660
661 @Override
Charles Chan0c7c43b2016-01-14 17:39:20 -0800662 public void purgeFlowRule(DeviceId deviceId) {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700663 DocumentPath path = getPathFor(deviceId);
Jordan Haltermanf5295f62017-09-12 15:07:18 -0700664 retryUntilSuccess(() -> {
665 try {
666 for (String flowId : flows.getChildren(path).keySet()) {
667 flows.removeNode(DocumentPath.from("root", deviceId.toString(), flowId));
668 }
669 } catch (NoSuchDocumentPathException e) {
670 // Do nothing. There are no flows for the device.
Jordan Haltermanf7554092017-07-30 15:05:51 -0700671 }
Jordan Haltermanf5295f62017-09-12 15:07:18 -0700672
673 // New children may have been created since they were removed above. Catch
674 // IllegalDocumentModificationException and retry if necessary.
675 try {
676 flows.removeNode(path);
677 } catch (NoSuchDocumentPathException e) {
678 return null;
679 } catch (IllegalDocumentModificationException e) {
680 return retry();
681 }
682 return null;
683 });
Charles Chan0c7c43b2016-01-14 17:39:20 -0800684 }
685
686 @Override
Victor Silva139bca42016-08-18 11:46:35 -0300687 public void purgeFlowRules() {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700688 try {
689 for (String deviceId : flows.getChildren(flows.root()).keySet()) {
690 purgeFlowRule(DeviceId.deviceId(deviceId));
Madan Jampani86940d92015-05-06 11:47:57 -0700691 }
Jordan Haltermanf7554092017-07-30 15:05:51 -0700692 } catch (NoSuchDocumentPathException e) {
Jordan Haltermanf5295f62017-09-12 15:07:18 -0700693 // Do nothing if no children exist.
Madan Jampani86940d92015-05-06 11:47:57 -0700694 }
695 }
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700696
697 @Override
698 public FlowRuleEvent updateTableStatistics(DeviceId deviceId,
Jordan Haltermanf7554092017-07-30 15:05:51 -0700699 List<TableStatisticsEntry> tableStats) {
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700700 deviceTableStats.put(deviceId, tableStats);
701 return null;
702 }
703
704 @Override
705 public Iterable<TableStatisticsEntry> getTableStatistics(DeviceId deviceId) {
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700706 List<TableStatisticsEntry> tableStats = deviceTableStats.get(deviceId);
707 if (tableStats == null) {
708 return Collections.emptyList();
709 }
710 return ImmutableList.copyOf(tableStats);
711 }
712
Patryk Konopka7e40c012017-06-06 13:38:06 +0200713 @Override
714 public long getActiveFlowRuleCount(DeviceId deviceId) {
715 return Streams.stream(getTableStatistics(deviceId))
716 .mapToLong(TableStatisticsEntry::activeFlowEntries)
717 .sum();
718 }
719
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700720 private class InternalTableStatsListener
Jordan Haltermanf7554092017-07-30 15:05:51 -0700721 implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> {
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700722 @Override
723 public void event(EventuallyConsistentMapEvent<DeviceId,
Jordan Haltermanf7554092017-07-30 15:05:51 -0700724 List<TableStatisticsEntry>> event) {
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700725 //TODO: Generate an event to listeners (do we need?)
726 }
727 }
Jordan Haltermanf7554092017-07-30 15:05:51 -0700728}