blob: e3f419bdf5af32c22689557bd38956ebcb28490d [file] [log] [blame]
Jordan Haltermane0355ff2017-07-30 15:05:51 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2014-present Open Networking Foundation
Madan Jampani86940d92015-05-06 11:47:57 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.flow.impl;
17
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070018import java.util.Collections;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070019import java.util.List;
20import java.util.Map;
21import java.util.Objects;
Jordan Haltermanb8925dc2017-08-25 15:12:54 -070022import java.util.Random;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070023import java.util.Set;
Jordan Haltermanb8925dc2017-08-25 15:12:54 -070024import java.util.concurrent.CompletableFuture;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070025import java.util.concurrent.ExecutorService;
26import java.util.concurrent.Executors;
Jordan Haltermanb8925dc2017-08-25 15:12:54 -070027import java.util.concurrent.ScheduledExecutorService;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070028import java.util.concurrent.TimeUnit;
Jordan Haltermane0355ff2017-07-30 15:05:51 -070029import java.util.function.Supplier;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070030import java.util.stream.Collectors;
Ray Milkey2b6ff422016-08-26 13:03:15 -070031
Jordan Haltermane0355ff2017-07-30 15:05:51 -070032import com.google.common.collect.ImmutableList;
Jordan Haltermane0355ff2017-07-30 15:05:51 -070033import com.google.common.collect.Maps;
34import com.google.common.collect.Sets;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070035import com.google.common.collect.Streams;
36import org.apache.felix.scr.annotations.Activate;
37import org.apache.felix.scr.annotations.Component;
38import org.apache.felix.scr.annotations.Deactivate;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070039import org.apache.felix.scr.annotations.Reference;
40import org.apache.felix.scr.annotations.ReferenceCardinality;
41import org.apache.felix.scr.annotations.Service;
42import org.onlab.util.KryoNamespace;
43import org.onlab.util.Tools;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070044import org.onosproject.cluster.ClusterService;
45import org.onosproject.cluster.NodeId;
46import org.onosproject.core.CoreService;
47import org.onosproject.core.IdGenerator;
48import org.onosproject.mastership.MastershipService;
49import org.onosproject.net.DeviceId;
50import org.onosproject.net.device.DeviceService;
51import org.onosproject.net.flow.CompletedBatchOperation;
52import org.onosproject.net.flow.DefaultFlowEntry;
53import org.onosproject.net.flow.FlowEntry;
54import org.onosproject.net.flow.FlowEntry.FlowEntryState;
55import org.onosproject.net.flow.FlowId;
56import org.onosproject.net.flow.FlowRule;
57import org.onosproject.net.flow.FlowRuleBatchEntry;
58import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
59import org.onosproject.net.flow.FlowRuleBatchEvent;
60import org.onosproject.net.flow.FlowRuleBatchOperation;
61import org.onosproject.net.flow.FlowRuleBatchRequest;
62import org.onosproject.net.flow.FlowRuleEvent;
63import org.onosproject.net.flow.FlowRuleEvent.Type;
64import org.onosproject.net.flow.FlowRuleService;
65import org.onosproject.net.flow.FlowRuleStore;
66import org.onosproject.net.flow.FlowRuleStoreDelegate;
67import org.onosproject.net.flow.StoredFlowEntry;
68import org.onosproject.net.flow.TableStatisticsEntry;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070069import org.onosproject.store.AbstractStore;
70import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Jordan Haltermane0355ff2017-07-30 15:05:51 -070071import org.onosproject.store.cluster.messaging.MessageSubject;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070072import org.onosproject.store.impl.MastershipBasedTimestamp;
73import org.onosproject.store.serializers.KryoNamespaces;
Jordan Haltermanb8925dc2017-08-25 15:12:54 -070074import org.onosproject.store.service.AsyncDocumentTree;
Jordan Haltermane0355ff2017-07-30 15:05:51 -070075import org.onosproject.store.service.DocumentPath;
76import org.onosproject.store.service.DocumentTree;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070077import org.onosproject.store.service.EventuallyConsistentMap;
78import org.onosproject.store.service.EventuallyConsistentMapEvent;
79import org.onosproject.store.service.EventuallyConsistentMapListener;
Jordan Halterman6fd32912017-09-12 15:07:18 -070080import org.onosproject.store.service.IllegalDocumentModificationException;
Jordan Haltermane0355ff2017-07-30 15:05:51 -070081import org.onosproject.store.service.NoSuchDocumentPathException;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070082import org.onosproject.store.service.Serializer;
Jordan Haltermane0355ff2017-07-30 15:05:51 -070083import org.onosproject.store.service.StorageException;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070084import org.onosproject.store.service.StorageService;
Jordan Haltermane0355ff2017-07-30 15:05:51 -070085import org.onosproject.store.service.Versioned;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070086import org.onosproject.store.service.WallClockTimestamp;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070087import org.slf4j.Logger;
Ray Milkey2b6ff422016-08-26 13:03:15 -070088
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070089import static org.onlab.util.Tools.groupedThreads;
90import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
Jordan Haltermane0355ff2017-07-30 15:05:51 -070091import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_UPDATED;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070092import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani86940d92015-05-06 11:47:57 -070093
94/**
95 * Manages inventory of flow rules using a distributed state management protocol.
96 */
Sho SHIMIZU5c396e32016-08-12 15:19:12 -070097@Component(immediate = true)
Madan Jampani86940d92015-05-06 11:47:57 -070098@Service
Madan Jampani37d04c62016-04-25 15:53:55 -070099public class DistributedFlowRuleStore
Madan Jampani86940d92015-05-06 11:47:57 -0700100 extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
101 implements FlowRuleStore {
102
103 private final Logger log = getLogger(getClass());
104
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700105 // Constant exception used to indicate an atomic read-modify-write operation needs to be retried.
106 // We don't want to populate a stack trace every time an optimistic lock is retried.
107 private static final StorageException.ConcurrentModification RETRY;
108
109 // Initialize retry exception with an empty stack trace.
110 static {
111 RETRY = new StorageException.ConcurrentModification();
112 RETRY.setStackTrace(new StackTraceElement[0]);
113 }
114
Jordan Haltermanb8925dc2017-08-25 15:12:54 -0700115 private static final int SCHEDULED_THREAD_POOL_SIZE = 8;
Madan Jampani86940d92015-05-06 11:47:57 -0700116 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700117 private static final int MAX_RETRY_DELAY_MILLIS = 50;
Madan Jampani86940d92015-05-06 11:47:57 -0700118
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700119 private static final String FLOW_TABLE = "onos-flow-table";
Madan Jampani86940d92015-05-06 11:47:57 -0700120
Jordan Haltermanb8925dc2017-08-25 15:12:54 -0700121 private static final MessageSubject APPLY_BATCH_FLOWS = new MessageSubject("onos-flow-apply");
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700122 private static final MessageSubject COMPLETE_BATCH = new MessageSubject("onos-flow-batch-complete");
Madan Jampani86940d92015-05-06 11:47:57 -0700123
124 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
125 protected DeviceService deviceService;
126
127 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
128 protected CoreService coreService;
129
130 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani86940d92015-05-06 11:47:57 -0700131 protected MastershipService mastershipService;
132
Aaron Kruglikova62fdbb2015-10-27 16:32:06 -0700133 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700134 protected ClusterCommunicationService clusterCommunicator;
Aaron Kruglikova62fdbb2015-10-27 16:32:06 -0700135
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700136 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
137 protected ClusterService clusterService;
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700138
139 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
140 protected StorageService storageService;
141
Madan Jampani884d4432016-08-23 10:46:55 -0700142 protected final Serializer serializer = Serializer.using(KryoNamespaces.API);
Madan Jampani86940d92015-05-06 11:47:57 -0700143
Madan Jampani884d4432016-08-23 10:46:55 -0700144 protected final KryoNamespace.Builder serializerBuilder = KryoNamespace.newBuilder()
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700145 .register(KryoNamespaces.API)
146 .register(MastershipBasedTimestamp.class);
147
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700148 private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
149 private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
150 new InternalTableStatsListener();
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700151
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700152 private Set<Long> pendingBatches = Sets.newConcurrentHashSet();
Jordan Haltermanb8925dc2017-08-25 15:12:54 -0700153 private ScheduledExecutorService scheduledExecutor;
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700154 private ExecutorService messageHandlingExecutor;
Jordan Haltermanb8925dc2017-08-25 15:12:54 -0700155 private final Random random = new Random();
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700156
Jordan Haltermanb8925dc2017-08-25 15:12:54 -0700157 private AsyncDocumentTree<Map<StoredFlowEntry, StoredFlowEntry>> asyncFlows;
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700158 private DocumentTree<Map<StoredFlowEntry, StoredFlowEntry>> flows;
Madan Jampani86940d92015-05-06 11:47:57 -0700159 private IdGenerator idGenerator;
160 private NodeId local;
161
162 @Activate
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700163 public void activate() {
Madan Jampani86940d92015-05-06 11:47:57 -0700164 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
165
166 local = clusterService.getLocalNode().id();
167
Jordan Haltermanb8925dc2017-08-25 15:12:54 -0700168 scheduledExecutor = Executors.newScheduledThreadPool(
169 SCHEDULED_THREAD_POOL_SIZE,
170 groupedThreads("onos/store/flow", "schedulers", log));
171
Madan Jampani86940d92015-05-06 11:47:57 -0700172 messageHandlingExecutor = Executors.newFixedThreadPool(
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700173 MESSAGE_HANDLER_THREAD_POOL_SIZE,
174 groupedThreads("onos/store/flow", "message-handlers", log));
Madan Jampani86940d92015-05-06 11:47:57 -0700175
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700176 deviceTableStats = storageService.<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder()
177 .withName("onos-flow-table-stats")
Madan Jampani884d4432016-08-23 10:46:55 -0700178 .withSerializer(serializerBuilder)
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700179 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
180 .withTimestampProvider((k, v) -> new WallClockTimestamp())
181 .withTombstonesDisabled()
182 .build();
183 deviceTableStats.addListener(tableStatsListener);
184
Jordan Haltermanb8925dc2017-08-25 15:12:54 -0700185 asyncFlows = storageService.<Map<StoredFlowEntry, StoredFlowEntry>>documentTreeBuilder()
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700186 .withName(FLOW_TABLE)
187 .withSerializer(serializer)
Jordan Haltermanb8925dc2017-08-25 15:12:54 -0700188 .buildDocumentTree();
189 flows = asyncFlows.asDocumentTree();
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700190
191 clusterCommunicator.addSubscriber(
Jordan Haltermanb8925dc2017-08-25 15:12:54 -0700192 APPLY_BATCH_FLOWS,
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700193 serializer::decode,
Jordan Haltermanb8925dc2017-08-25 15:12:54 -0700194 this::applyBatchFlows,
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700195 messageHandlingExecutor);
196 clusterCommunicator.addSubscriber(
197 COMPLETE_BATCH,
198 serializer::decode,
199 this::completeBatch,
200 messageHandlingExecutor);
201
202 log.info("Started");
Madan Jampani86940d92015-05-06 11:47:57 -0700203 }
204
205 @Deactivate
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700206 public void deactivate() {
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700207 deviceTableStats.removeListener(tableStatsListener);
208 deviceTableStats.destroy();
Jordan Haltermanb8925dc2017-08-25 15:12:54 -0700209 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700210 clusterCommunicator.removeSubscriber(COMPLETE_BATCH);
Madan Jampani86940d92015-05-06 11:47:57 -0700211 messageHandlingExecutor.shutdownNow();
Jordan Haltermanb8925dc2017-08-25 15:12:54 -0700212 scheduledExecutor.shutdownNow();
Madan Jampani86940d92015-05-06 11:47:57 -0700213 log.info("Stopped");
214 }
215
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700216 /**
217 * Retries the given supplier until successful.
218 * <p>
219 * This method retries the given supplier until no {@code ConcurrentModification} exceptions are thrown. In
220 * between retries, it waits a semi-random interval to attempt to avoid transaction conflicts with other processes.
221 *
222 * @param supplier the supplier to retry
223 * @param <T> the return type
224 * @return the return value of the given supplier once it runs successfully
225 */
226 private <T> T retryUntilSuccess(Supplier<T> supplier) {
227 return Tools.retryable(
228 supplier,
229 StorageException.ConcurrentModification.class,
230 Integer.MAX_VALUE,
231 MAX_RETRY_DELAY_MILLIS)
232 .get();
Madan Jampani86940d92015-05-06 11:47:57 -0700233 }
234
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700235 /**
Jordan Haltermanb8925dc2017-08-25 15:12:54 -0700236 * Retries the given asynchronous supplier until successful.
237 * <p>
238 * This method retries the given supplier until no {@code ConcurrentModification} exceptions are thrown. In
239 * between retries, it waits a semi-random interval to attempt to avoid transaction conflicts with other processes.
240 *
241 * @param supplier the supplier to retry
242 * @param <T> the return type
243 * @return the return value of the given supplier once it runs successfully
244 */
245 private <T> CompletableFuture<T> retryAsyncUntilSuccess(Supplier<CompletableFuture<T>> supplier) {
246 return retryAsyncUntilSuccess(supplier, new CompletableFuture<>());
247 }
248
249 /**
250 * Retries the given asynchronous supplier until successful.
251 * <p>
252 * This method retries the given supplier until no {@code ConcurrentModification} exceptions are thrown. In
253 * between retries, it waits a semi-random interval to attempt to avoid transaction conflicts with other processes.
254 *
255 * @param supplier the supplier to retry
256 * @param future future to be completed once the operation has been successful
257 * @param <T> the return type
258 * @return the return value of the given supplier once it runs successfully
259 */
260 private <T> CompletableFuture<T> retryAsyncUntilSuccess(
261 Supplier<CompletableFuture<T>> supplier,
262 CompletableFuture<T> future) {
263 supplier.get().whenComplete((result, error) -> {
264 if (error == null) {
265 future.complete(result);
266 } else {
267 Throwable cause = error.getCause() != null ? error.getCause() : error;
268 if (cause instanceof StorageException.ConcurrentModification) {
269 scheduledExecutor.schedule(
270 () -> retryAsyncUntilSuccess(supplier, future),
271 random.nextInt(50),
272 TimeUnit.MILLISECONDS);
273 } else {
274 future.completeExceptionally(error);
275 }
276 }
277 });
278 return future;
279 }
280
281 /**
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700282 * Return method for {@link #retryUntilSuccess(Supplier)} callbacks to indicate that the callback needs to be
283 * retried after a randomized delay.
284 *
285 * @param <T> the return type
286 * @return nothing
287 * @throws StorageException.ConcurrentModification to force a retry of the callback
288 */
289 private <T> T retry() {
290 throw RETRY;
Madan Jampani86940d92015-05-06 11:47:57 -0700291 }
292
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700293 /**
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700294 * Handles a completed batch event received from the master node.
295 * <p>
296 * If this node is the source of the batch, notifies event listeners to complete the operations.
297 *
298 * @param event the event to handle
299 */
300 private void completeBatch(FlowRuleBatchEvent event) {
301 if (pendingBatches.remove(event.subject().batchId())) {
302 notifyDelegate(event);
303 }
Madan Jampani86940d92015-05-06 11:47:57 -0700304 }
305
306 // This is not a efficient operation on a distributed sharded
307 // flow store. We need to revisit the need for this operation or at least
308 // make it device specific.
309 @Override
310 public int getFlowRuleCount() {
Yuta HIGUCHI10e91fb2017-06-15 13:30:50 -0700311 return Streams.stream(deviceService.getDevices()).parallel()
Thomas Vachuska62a379c2018-03-05 18:03:40 -0800312 .mapToInt(device -> getFlowRuleCount(device.id()))
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700313 .sum();
314 }
315
Thomas Vachuska62a379c2018-03-05 18:03:40 -0800316 @Override
317 public int getFlowRuleCount(DeviceId deviceId) {
318 DocumentPath path = getPathFor(deviceId);
319 try {
320 return flows.getChildren(path).values().stream()
321 .mapToInt(v -> v.value().values().size())
322 .sum();
323 } catch (NoSuchDocumentPathException e) {
324 return 0;
325 }
326 }
327
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700328 /**
329 * Returns the {@link DocumentPath} for the given {@link DeviceId}.
330 *
331 * @param deviceId the device identifier for which to return a path
332 * @return the path for the given device
333 */
334 private DocumentPath getPathFor(DeviceId deviceId) {
335 return DocumentPath.from("root", deviceId.toString());
336 }
337
338 /**
339 * Returns the {@link DocumentPath} for the given {@link DeviceId} and {@link FlowId}.
340 *
341 * @param deviceId the device identifier for which to return the path
342 * @param flowId the flow identifier for which to return the path
343 * @return the path for the given device/flow
344 */
345 private DocumentPath getPathFor(DeviceId deviceId, FlowId flowId) {
346 return DocumentPath.from("root", deviceId.toString(), flowId.toString());
Madan Jampani86940d92015-05-06 11:47:57 -0700347 }
348
349 @Override
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700350 @SuppressWarnings("unchecked")
Madan Jampani86940d92015-05-06 11:47:57 -0700351 public FlowEntry getFlowEntry(FlowRule rule) {
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700352 DocumentPath path = getPathFor(rule.deviceId(), rule.id());
353 Versioned<Map<StoredFlowEntry, StoredFlowEntry>> flowEntries = flows.get(path);
354 return flowEntries != null ? flowEntries.value().get(rule) : null;
Madan Jampani86940d92015-05-06 11:47:57 -0700355 }
356
357 @Override
358 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700359 DocumentPath path = getPathFor(deviceId);
360 try {
361 return getFlowEntries(path);
362 } catch (NoSuchDocumentPathException e) {
Madan Jampani86940d92015-05-06 11:47:57 -0700363 return Collections.emptyList();
364 }
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700365 }
Madan Jampani86940d92015-05-06 11:47:57 -0700366
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700367 @SuppressWarnings("unchecked")
368 private Iterable<FlowEntry> getFlowEntries(DocumentPath path) {
369 return flows.getChildren(path)
370 .values()
371 .stream()
372 .flatMap(v -> v.value().values().stream())
373 .collect(Collectors.toList());
Madan Jampani86940d92015-05-06 11:47:57 -0700374 }
375
376 @Override
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700377 @SuppressWarnings("unchecked")
Madan Jampani86940d92015-05-06 11:47:57 -0700378 public void storeFlowRule(FlowRule rule) {
379 storeBatch(new FlowRuleBatchOperation(
Sho SHIMIZU98ffca82015-05-11 08:39:24 -0700380 Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
Madan Jampani86940d92015-05-06 11:47:57 -0700381 rule.deviceId(), idGenerator.getNewId()));
382 }
383
384 @Override
385 public void storeBatch(FlowRuleBatchOperation operation) {
386 if (operation.getOperations().isEmpty()) {
387 notifyDelegate(FlowRuleBatchEvent.completed(
388 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
389 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
390 return;
391 }
392
393 DeviceId deviceId = operation.deviceId();
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700394 NodeId master = mastershipService.getMasterFor(deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700395
396 if (master == null) {
Sivachidambaram Subramanian605104e2017-06-21 07:40:04 +0530397 log.warn("No master for {} ", deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700398
Jordan Haltermanb8925dc2017-08-25 15:12:54 -0700399 updateStoreInternal(operation).whenComplete((result, error) -> {
400 notifyDelegate(FlowRuleBatchEvent.completed(
401 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
402 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
403 });
Madan Jampani86940d92015-05-06 11:47:57 -0700404 return;
405 }
406
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700407 pendingBatches.add(operation.id());
Madan Jampani86940d92015-05-06 11:47:57 -0700408
Jordan Haltermanb8925dc2017-08-25 15:12:54 -0700409 // If the local node is the master, apply the flows. Otherwise, send them to the master.
410 if (Objects.equals(local, master)) {
411 applyBatchFlows(operation);
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700412 } else {
Jordan Haltermanb8925dc2017-08-25 15:12:54 -0700413 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}", master, deviceId);
414 clusterCommunicator.unicast(
415 operation,
416 APPLY_BATCH_FLOWS,
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700417 serializer::encode,
418 master);
Madan Jampani86940d92015-05-06 11:47:57 -0700419 }
Madan Jampani86940d92015-05-06 11:47:57 -0700420 }
421
Jordan Haltermanb8925dc2017-08-25 15:12:54 -0700422 /**
423 * Asynchronously applies a batch of flows to the store.
424 * <p>
425 * This operation is performed on the master node to ensure that events occur <em>after</em> flows have been stored
426 * and are visible to the master node. If a non-master node stores flows and then triggers events on the master,
427 * the flows may not yet be visible to the master node due to the nature of sequentially consistent reads on the
428 * underlying {@code DocumentTree} primitive.
429 */
430 private void applyBatchFlows(FlowRuleBatchOperation operation) {
431 updateStoreInternal(operation).whenComplete((operations, error) -> {
432 if (error == null) {
433 if (operations.isEmpty()) {
434 batchOperationComplete(FlowRuleBatchEvent.completed(
435 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
436 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700437 } else {
Jordan Haltermanb8925dc2017-08-25 15:12:54 -0700438 notifyDelegate(FlowRuleBatchEvent.requested(
439 new FlowRuleBatchRequest(operation.id(), operations),
440 operation.deviceId()));
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700441 }
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700442 }
443 });
444 }
445
Jordan Haltermanb8925dc2017-08-25 15:12:54 -0700446 private CompletableFuture<Set<FlowRuleBatchEntry>> updateStoreInternal(FlowRuleBatchOperation operation) {
447 return Tools.allOf(operation.getOperations().stream().map(op -> {
448 switch (op.operator()) {
449 case ADD:
Yi Tsengcb4fbe52017-10-25 16:19:25 -0700450 case MODIFY:
Jordan Haltermanb8925dc2017-08-25 15:12:54 -0700451 return addBatchEntry(op).thenApply(succeeded -> succeeded ? op : null);
452 case REMOVE:
453 return removeBatchEntry(op).thenApply(succeeded -> succeeded ? op : null);
Jordan Haltermanb8925dc2017-08-25 15:12:54 -0700454 default:
455 log.warn("Unknown flow operation operator: {}", op.operator());
456 return CompletableFuture.<FlowRuleBatchEntry>completedFuture(null);
457 }
458 }).collect(Collectors.toList()))
459 .thenApply(results -> results.stream()
460 .filter(Objects::nonNull)
461 .collect(Collectors.toSet()));
462 }
463
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700464 @SuppressWarnings("unchecked")
Jordan Haltermanb8925dc2017-08-25 15:12:54 -0700465 private CompletableFuture<Boolean> addBatchEntry(FlowRuleBatchEntry batchEntry) {
466 StoredFlowEntry entry = new DefaultFlowEntry(batchEntry.target());
467 DocumentPath path = getPathFor(entry.deviceId(), entry.id());
468 return retryAsyncUntilSuccess(() -> {
469 CompletableFuture<Boolean> future = new CompletableFuture<>();
470 asyncFlows.get(path).whenComplete((value, getError) -> {
471 if (getError == null) {
472 if (value != null) {
473 Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
474 entries.put(entry, entry);
475 asyncFlows.replace(path, entries, value.version()).whenComplete((succeeded, replaceError) -> {
476 if (replaceError == null) {
477 if (succeeded) {
478 log.trace("Stored new flow rule: {}", entry);
479 future.complete(true);
480 } else {
481 log.trace("Failed to store new flow rule: {}", entry);
482 future.completeExceptionally(RETRY);
483 }
484 } else {
485 future.completeExceptionally(replaceError);
486 }
487 });
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700488 } else {
Jordan Haltermanb8925dc2017-08-25 15:12:54 -0700489 // If there are no entries stored for the device, initialize the device's flows.
490 Map<StoredFlowEntry, StoredFlowEntry> map = Maps.newHashMap();
491 map.put(entry, entry);
492 asyncFlows.createRecursive(path, map).whenComplete((succeeded, createError) -> {
493 if (createError == null) {
494 if (succeeded) {
495 log.trace("Stored new flow rule: {}", entry);
496 future.complete(true);
497 } else {
498 log.trace("Failed to store new flow rule: {}", entry);
499 future.completeExceptionally(RETRY);
500 }
501 } else {
502 future.completeExceptionally(createError);
503 }
504 });
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700505 }
506 } else {
Jordan Haltermanb8925dc2017-08-25 15:12:54 -0700507 future.completeExceptionally(getError);
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700508 }
Jordan Haltermanb8925dc2017-08-25 15:12:54 -0700509 });
510 return future;
511 });
512 }
513
514 @SuppressWarnings("unchecked")
515 private CompletableFuture<Boolean> removeBatchEntry(FlowRuleBatchEntry batchEntry) {
516 FlowRule rule = batchEntry.target();
517 DocumentPath path = getPathFor(rule.deviceId(), rule.id());
518 return retryAsyncUntilSuccess(() -> {
519 CompletableFuture<Boolean> future = new CompletableFuture<>();
520 asyncFlows.get(path).whenComplete((value, getError) -> {
521 if (getError == null) {
522 if (value != null) {
523 Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
524 StoredFlowEntry entry = entries.get(rule);
525 if (entry != null) {
526 entry.setState(FlowEntryState.PENDING_REMOVE);
527 asyncFlows.replace(path, entries, value.version()).whenComplete((succeeded, error) -> {
528 if (error == null) {
529 if (succeeded) {
530 log.trace("Updated flow rule state to PENDING_REMOVE: {}", entry);
531 future.complete(true);
532 } else {
533 log.trace("Failed to update flow rule state to PENDING_REMOVE: {}", entry);
534 future.completeExceptionally(RETRY);
535 }
536 } else {
537 future.completeExceptionally(error);
538 }
539 });
540 } else {
541 future.complete(false);
542 }
543 } else {
544 future.complete(false);
545 }
546 } else {
547 future.completeExceptionally(getError);
548 }
549 });
550 return future;
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700551 });
552 }
553
554 @Override
555 public void batchOperationComplete(FlowRuleBatchEvent event) {
556 if (pendingBatches.remove(event.subject().batchId())) {
557 notifyDelegate(event);
558 } else {
559 clusterCommunicator.broadcast(event, COMPLETE_BATCH, serializer::encode);
560 }
561 }
562
Madan Jampani86940d92015-05-06 11:47:57 -0700563 @Override
564 public void deleteFlowRule(FlowRule rule) {
565 storeBatch(
566 new FlowRuleBatchOperation(
Sho SHIMIZU98ffca82015-05-11 08:39:24 -0700567 Collections.singletonList(
Madan Jampani86940d92015-05-06 11:47:57 -0700568 new FlowRuleBatchEntry(
569 FlowRuleOperation.REMOVE,
570 rule)), rule.deviceId(), idGenerator.getNewId()));
571 }
572
573 @Override
Charles Chan93fa7272016-01-26 22:27:02 -0800574 public FlowRuleEvent pendingFlowRule(FlowEntry rule) {
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700575 DocumentPath path = getPathFor(rule.deviceId(), rule.id());
576 return retryUntilSuccess(() -> {
577 Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
578 if (value != null) {
579 Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
580 StoredFlowEntry entry = entries.get(rule);
581 if (entry != null && entry.state() != FlowEntryState.PENDING_ADD) {
582 entry.setState(FlowEntryState.PENDING_ADD);
583 if (flows.replace(path, entries, value.version())) {
584 log.trace("Updated flow rule state to PENDING_ADD: {}", entry);
585 return new FlowRuleEvent(RULE_UPDATED, rule);
586 } else {
587 log.trace("Failed to update flow rule state to PENDING_ADD: {}", entry);
588 return retry();
589 }
590 } else {
591 return null;
592 }
593 } else {
594 return null;
Charles Chan93fa7272016-01-26 22:27:02 -0800595 }
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700596 });
Charles Chan93fa7272016-01-26 22:27:02 -0800597 }
598
599 @Override
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700600 @SuppressWarnings("unchecked")
Madan Jampani86940d92015-05-06 11:47:57 -0700601 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700602 DocumentPath path = getPathFor(rule.deviceId(), rule.id());
603 return retryUntilSuccess(() -> {
604 Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
605 if (value != null) {
606 Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
607 StoredFlowEntry entry = entries.get(rule);
608 if (entry != null) {
609 FlowRuleEvent event;
610 String message;
Madan Jampani86940d92015-05-06 11:47:57 -0700611
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700612 entry.setBytes(rule.bytes());
613 entry.setLife(rule.life(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
614 entry.setLiveType(rule.liveType());
615 entry.setPackets(rule.packets());
616 entry.setLastSeen();
Madan Jampani86940d92015-05-06 11:47:57 -0700617
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700618 // If the entry state is PENDING_ADD, set it to ADDED. Otherwise, just update the rule.
619 if (entry.state() == FlowEntryState.PENDING_ADD) {
620 entry.setState(FlowEntryState.ADDED);
621 event = new FlowRuleEvent(Type.RULE_ADDED, rule);
622 message = "Updated flow rule state to ADDED: {}";
623 } else {
624 event = new FlowRuleEvent(Type.RULE_UPDATED, rule);
625 message = "Updated flow rule: {}";
626 }
627
628 if (flows.replace(path, entries, value.version())) {
629 log.trace(message, entry);
630 return event;
631 } else {
632 log.trace("Failed to update flow rule: {}", entry);
633 return retry();
634 }
635 } else {
636 // If the rule does not exist, return null. Inserting the rule risks race conditions
637 // that can result in removed rules being retained.
638 return null;
639 }
640 } else {
Jordan Haltermanb8925dc2017-08-25 15:12:54 -0700641 return null;
Madan Jampani86940d92015-05-06 11:47:57 -0700642 }
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700643 });
Madan Jampani86940d92015-05-06 11:47:57 -0700644 }
645
646 @Override
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700647 @SuppressWarnings("unchecked")
Madan Jampani86940d92015-05-06 11:47:57 -0700648 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700649 DocumentPath path = getPathFor(rule.deviceId(), rule.id());
650 return retryUntilSuccess(() -> {
651 Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
652 if (value != null) {
653 Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
654 StoredFlowEntry entry = entries.remove(rule);
655 if (entry != null) {
656 if (flows.replace(path, entries, value.version())) {
657 log.trace("Removed flow rule: {}", entry);
658 return new FlowRuleEvent(RULE_REMOVED, entry);
659 } else {
660 log.trace("Failed to remove flow rule: {}", entry);
661 return retry();
662 }
663 } else {
664 return null;
665 }
666 } else {
667 return null;
668 }
669 });
Madan Jampani86940d92015-05-06 11:47:57 -0700670 }
671
672 @Override
Charles Chan0c7c43b2016-01-14 17:39:20 -0800673 public void purgeFlowRule(DeviceId deviceId) {
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700674 DocumentPath path = getPathFor(deviceId);
Jordan Halterman6fd32912017-09-12 15:07:18 -0700675 retryUntilSuccess(() -> {
676 try {
677 for (String flowId : flows.getChildren(path).keySet()) {
678 flows.removeNode(DocumentPath.from("root", deviceId.toString(), flowId));
679 }
680 } catch (NoSuchDocumentPathException e) {
681 // Do nothing. There are no flows for the device.
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700682 }
Jordan Halterman6fd32912017-09-12 15:07:18 -0700683
684 // New children may have been created since they were removed above. Catch
685 // IllegalDocumentModificationException and retry if necessary.
686 try {
687 flows.removeNode(path);
688 } catch (NoSuchDocumentPathException e) {
689 return null;
690 } catch (IllegalDocumentModificationException e) {
691 return retry();
692 }
693 return null;
694 });
Charles Chan0c7c43b2016-01-14 17:39:20 -0800695 }
696
697 @Override
Victor Silva139bca42016-08-18 11:46:35 -0300698 public void purgeFlowRules() {
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700699 try {
700 for (String deviceId : flows.getChildren(flows.root()).keySet()) {
701 purgeFlowRule(DeviceId.deviceId(deviceId));
Madan Jampani86940d92015-05-06 11:47:57 -0700702 }
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700703 } catch (NoSuchDocumentPathException e) {
Jordan Halterman6fd32912017-09-12 15:07:18 -0700704 // Do nothing if no children exist.
Madan Jampani86940d92015-05-06 11:47:57 -0700705 }
706 }
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700707
708 @Override
709 public FlowRuleEvent updateTableStatistics(DeviceId deviceId,
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700710 List<TableStatisticsEntry> tableStats) {
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700711 deviceTableStats.put(deviceId, tableStats);
712 return null;
713 }
714
715 @Override
716 public Iterable<TableStatisticsEntry> getTableStatistics(DeviceId deviceId) {
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700717 List<TableStatisticsEntry> tableStats = deviceTableStats.get(deviceId);
718 if (tableStats == null) {
719 return Collections.emptyList();
720 }
721 return ImmutableList.copyOf(tableStats);
722 }
723
Patryk Konopka7e40c012017-06-06 13:38:06 +0200724 @Override
725 public long getActiveFlowRuleCount(DeviceId deviceId) {
726 return Streams.stream(getTableStatistics(deviceId))
727 .mapToLong(TableStatisticsEntry::activeFlowEntries)
728 .sum();
729 }
730
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700731 private class InternalTableStatsListener
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700732 implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> {
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700733 @Override
734 public void event(EventuallyConsistentMapEvent<DeviceId,
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700735 List<TableStatisticsEntry>> event) {
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700736 //TODO: Generate an event to listeners (do we need?)
737 }
738 }
Jordan Haltermane0355ff2017-07-30 15:05:51 -0700739}