blob: 3382abe6e4a83879d7fbd0cfc8ebe256257dd42a [file] [log] [blame]
Jordan Haltermanf7554092017-07-30 15:05:51 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2014-present Open Networking Foundation
Madan Jampani86940d92015-05-06 11:47:57 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.flow.impl;
17
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070018import java.util.Collections;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070019import java.util.List;
20import java.util.Map;
21import java.util.Objects;
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -070022import java.util.Random;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070023import java.util.Set;
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -070024import java.util.concurrent.CompletableFuture;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070025import java.util.concurrent.ExecutorService;
26import java.util.concurrent.Executors;
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -070027import java.util.concurrent.ScheduledExecutorService;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070028import java.util.concurrent.TimeUnit;
Jordan Haltermanf7554092017-07-30 15:05:51 -070029import java.util.function.Supplier;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070030import java.util.stream.Collectors;
Ray Milkey2b6ff422016-08-26 13:03:15 -070031
Jordan Haltermanf7554092017-07-30 15:05:51 -070032import com.google.common.collect.ImmutableList;
33import com.google.common.collect.Iterables;
34import com.google.common.collect.Maps;
35import com.google.common.collect.Sets;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070036import com.google.common.collect.Streams;
37import org.apache.felix.scr.annotations.Activate;
38import org.apache.felix.scr.annotations.Component;
39import org.apache.felix.scr.annotations.Deactivate;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070040import org.apache.felix.scr.annotations.Reference;
41import org.apache.felix.scr.annotations.ReferenceCardinality;
42import org.apache.felix.scr.annotations.Service;
43import org.onlab.util.KryoNamespace;
44import org.onlab.util.Tools;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070045import org.onosproject.cluster.ClusterService;
46import org.onosproject.cluster.NodeId;
47import org.onosproject.core.CoreService;
48import org.onosproject.core.IdGenerator;
49import org.onosproject.mastership.MastershipService;
50import org.onosproject.net.DeviceId;
51import org.onosproject.net.device.DeviceService;
52import org.onosproject.net.flow.CompletedBatchOperation;
53import org.onosproject.net.flow.DefaultFlowEntry;
54import org.onosproject.net.flow.FlowEntry;
55import org.onosproject.net.flow.FlowEntry.FlowEntryState;
56import org.onosproject.net.flow.FlowId;
57import org.onosproject.net.flow.FlowRule;
58import org.onosproject.net.flow.FlowRuleBatchEntry;
59import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
60import org.onosproject.net.flow.FlowRuleBatchEvent;
61import org.onosproject.net.flow.FlowRuleBatchOperation;
62import org.onosproject.net.flow.FlowRuleBatchRequest;
63import org.onosproject.net.flow.FlowRuleEvent;
64import org.onosproject.net.flow.FlowRuleEvent.Type;
65import org.onosproject.net.flow.FlowRuleService;
66import org.onosproject.net.flow.FlowRuleStore;
67import org.onosproject.net.flow.FlowRuleStoreDelegate;
68import org.onosproject.net.flow.StoredFlowEntry;
69import org.onosproject.net.flow.TableStatisticsEntry;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070070import org.onosproject.store.AbstractStore;
71import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Jordan Haltermanf7554092017-07-30 15:05:51 -070072import org.onosproject.store.cluster.messaging.MessageSubject;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070073import org.onosproject.store.impl.MastershipBasedTimestamp;
74import org.onosproject.store.serializers.KryoNamespaces;
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -070075import org.onosproject.store.service.AsyncDocumentTree;
Jordan Haltermanf7554092017-07-30 15:05:51 -070076import org.onosproject.store.service.DocumentPath;
77import org.onosproject.store.service.DocumentTree;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070078import org.onosproject.store.service.EventuallyConsistentMap;
79import org.onosproject.store.service.EventuallyConsistentMapEvent;
80import org.onosproject.store.service.EventuallyConsistentMapListener;
Jordan Haltermanf7554092017-07-30 15:05:51 -070081import org.onosproject.store.service.NoSuchDocumentPathException;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070082import org.onosproject.store.service.Serializer;
Jordan Haltermanf7554092017-07-30 15:05:51 -070083import org.onosproject.store.service.StorageException;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070084import org.onosproject.store.service.StorageService;
Jordan Haltermanf7554092017-07-30 15:05:51 -070085import org.onosproject.store.service.Versioned;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070086import org.onosproject.store.service.WallClockTimestamp;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070087import org.slf4j.Logger;
Ray Milkey2b6ff422016-08-26 13:03:15 -070088
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070089import static org.onlab.util.Tools.groupedThreads;
90import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
Jordan Haltermanf7554092017-07-30 15:05:51 -070091import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_UPDATED;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070092import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani86940d92015-05-06 11:47:57 -070093
94/**
95 * Manages inventory of flow rules using a distributed state management protocol.
96 */
Sho SHIMIZU5c396e32016-08-12 15:19:12 -070097@Component(immediate = true)
Madan Jampani86940d92015-05-06 11:47:57 -070098@Service
Madan Jampani37d04c62016-04-25 15:53:55 -070099public class DistributedFlowRuleStore
Madan Jampani86940d92015-05-06 11:47:57 -0700100 extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
101 implements FlowRuleStore {
102
103 private final Logger log = getLogger(getClass());
104
Jordan Haltermanf7554092017-07-30 15:05:51 -0700105 // Constant exception used to indicate an atomic read-modify-write operation needs to be retried.
106 // We don't want to populate a stack trace every time an optimistic lock is retried.
107 private static final StorageException.ConcurrentModification RETRY;
108
109 // Initialize retry exception with an empty stack trace.
110 static {
111 RETRY = new StorageException.ConcurrentModification();
112 RETRY.setStackTrace(new StackTraceElement[0]);
113 }
114
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700115 private static final int SCHEDULED_THREAD_POOL_SIZE = 8;
Madan Jampani86940d92015-05-06 11:47:57 -0700116 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
Jordan Haltermanf7554092017-07-30 15:05:51 -0700117 private static final int MAX_RETRY_DELAY_MILLIS = 50;
Madan Jampani86940d92015-05-06 11:47:57 -0700118
Jordan Haltermanf7554092017-07-30 15:05:51 -0700119 private static final String FLOW_TABLE = "onos-flow-table";
Madan Jampani86940d92015-05-06 11:47:57 -0700120
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700121 private static final MessageSubject APPLY_BATCH_FLOWS = new MessageSubject("onos-flow-apply");
Jordan Haltermanf7554092017-07-30 15:05:51 -0700122 private static final MessageSubject COMPLETE_BATCH = new MessageSubject("onos-flow-batch-complete");
Madan Jampani86940d92015-05-06 11:47:57 -0700123
124 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
125 protected DeviceService deviceService;
126
127 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
128 protected CoreService coreService;
129
130 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani86940d92015-05-06 11:47:57 -0700131 protected MastershipService mastershipService;
132
Aaron Kruglikova62fdbb2015-10-27 16:32:06 -0700133 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jordan Haltermanf7554092017-07-30 15:05:51 -0700134 protected ClusterCommunicationService clusterCommunicator;
Aaron Kruglikova62fdbb2015-10-27 16:32:06 -0700135
Jordan Haltermanf7554092017-07-30 15:05:51 -0700136 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
137 protected ClusterService clusterService;
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700138
139 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
140 protected StorageService storageService;
141
Madan Jampani884d4432016-08-23 10:46:55 -0700142 protected final Serializer serializer = Serializer.using(KryoNamespaces.API);
Madan Jampani86940d92015-05-06 11:47:57 -0700143
Madan Jampani884d4432016-08-23 10:46:55 -0700144 protected final KryoNamespace.Builder serializerBuilder = KryoNamespace.newBuilder()
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700145 .register(KryoNamespaces.API)
146 .register(MastershipBasedTimestamp.class);
147
Jordan Haltermanf7554092017-07-30 15:05:51 -0700148 private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
149 private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
150 new InternalTableStatsListener();
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700151
Jordan Haltermanf7554092017-07-30 15:05:51 -0700152 private Set<Long> pendingBatches = Sets.newConcurrentHashSet();
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700153 private ScheduledExecutorService scheduledExecutor;
Jordan Haltermanf7554092017-07-30 15:05:51 -0700154 private ExecutorService messageHandlingExecutor;
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700155 private final Random random = new Random();
Jordan Haltermanf7554092017-07-30 15:05:51 -0700156
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700157 private AsyncDocumentTree<Map<StoredFlowEntry, StoredFlowEntry>> asyncFlows;
Jordan Haltermanf7554092017-07-30 15:05:51 -0700158 private DocumentTree<Map<StoredFlowEntry, StoredFlowEntry>> flows;
Madan Jampani86940d92015-05-06 11:47:57 -0700159 private IdGenerator idGenerator;
160 private NodeId local;
161
162 @Activate
Jordan Haltermanf7554092017-07-30 15:05:51 -0700163 public void activate() {
Madan Jampani86940d92015-05-06 11:47:57 -0700164 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
165
166 local = clusterService.getLocalNode().id();
167
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700168 scheduledExecutor = Executors.newScheduledThreadPool(
169 SCHEDULED_THREAD_POOL_SIZE,
170 groupedThreads("onos/store/flow", "schedulers", log));
171
Madan Jampani86940d92015-05-06 11:47:57 -0700172 messageHandlingExecutor = Executors.newFixedThreadPool(
Jordan Haltermanf7554092017-07-30 15:05:51 -0700173 MESSAGE_HANDLER_THREAD_POOL_SIZE,
174 groupedThreads("onos/store/flow", "message-handlers", log));
Madan Jampani86940d92015-05-06 11:47:57 -0700175
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700176 deviceTableStats = storageService.<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder()
177 .withName("onos-flow-table-stats")
Madan Jampani884d4432016-08-23 10:46:55 -0700178 .withSerializer(serializerBuilder)
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700179 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
180 .withTimestampProvider((k, v) -> new WallClockTimestamp())
181 .withTombstonesDisabled()
182 .build();
183 deviceTableStats.addListener(tableStatsListener);
184
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700185 asyncFlows = storageService.<Map<StoredFlowEntry, StoredFlowEntry>>documentTreeBuilder()
Jordan Haltermanf7554092017-07-30 15:05:51 -0700186 .withName(FLOW_TABLE)
187 .withSerializer(serializer)
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700188 .buildDocumentTree();
189 flows = asyncFlows.asDocumentTree();
Jordan Haltermanf7554092017-07-30 15:05:51 -0700190
191 clusterCommunicator.addSubscriber(
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700192 APPLY_BATCH_FLOWS,
Jordan Haltermanf7554092017-07-30 15:05:51 -0700193 serializer::decode,
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700194 this::applyBatchFlows,
Jordan Haltermanf7554092017-07-30 15:05:51 -0700195 messageHandlingExecutor);
196 clusterCommunicator.addSubscriber(
197 COMPLETE_BATCH,
198 serializer::decode,
199 this::completeBatch,
200 messageHandlingExecutor);
201
202 log.info("Started");
Madan Jampani86940d92015-05-06 11:47:57 -0700203 }
204
205 @Deactivate
Jordan Haltermanf7554092017-07-30 15:05:51 -0700206 public void deactivate() {
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700207 deviceTableStats.removeListener(tableStatsListener);
208 deviceTableStats.destroy();
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700209 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
Jordan Haltermanf7554092017-07-30 15:05:51 -0700210 clusterCommunicator.removeSubscriber(COMPLETE_BATCH);
Madan Jampani86940d92015-05-06 11:47:57 -0700211 messageHandlingExecutor.shutdownNow();
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700212 scheduledExecutor.shutdownNow();
Madan Jampani86940d92015-05-06 11:47:57 -0700213 log.info("Stopped");
214 }
215
Jordan Haltermanf7554092017-07-30 15:05:51 -0700216 /**
217 * Retries the given supplier until successful.
218 * <p>
219 * This method retries the given supplier until no {@code ConcurrentModification} exceptions are thrown. In
220 * between retries, it waits a semi-random interval to attempt to avoid transaction conflicts with other processes.
221 *
222 * @param supplier the supplier to retry
223 * @param <T> the return type
224 * @return the return value of the given supplier once it runs successfully
225 */
226 private <T> T retryUntilSuccess(Supplier<T> supplier) {
227 return Tools.retryable(
228 supplier,
229 StorageException.ConcurrentModification.class,
230 Integer.MAX_VALUE,
231 MAX_RETRY_DELAY_MILLIS)
232 .get();
Madan Jampani86940d92015-05-06 11:47:57 -0700233 }
234
Jordan Haltermanf7554092017-07-30 15:05:51 -0700235 /**
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700236 * Retries the given asynchronous supplier until successful.
237 * <p>
238 * This method retries the given supplier until no {@code ConcurrentModification} exceptions are thrown. In
239 * between retries, it waits a semi-random interval to attempt to avoid transaction conflicts with other processes.
240 *
241 * @param supplier the supplier to retry
242 * @param <T> the return type
243 * @return the return value of the given supplier once it runs successfully
244 */
245 private <T> CompletableFuture<T> retryAsyncUntilSuccess(Supplier<CompletableFuture<T>> supplier) {
246 return retryAsyncUntilSuccess(supplier, new CompletableFuture<>());
247 }
248
249 /**
250 * Retries the given asynchronous supplier until successful.
251 * <p>
252 * This method retries the given supplier until no {@code ConcurrentModification} exceptions are thrown. In
253 * between retries, it waits a semi-random interval to attempt to avoid transaction conflicts with other processes.
254 *
255 * @param supplier the supplier to retry
256 * @param future future to be completed once the operation has been successful
257 * @param <T> the return type
258 * @return the return value of the given supplier once it runs successfully
259 */
260 private <T> CompletableFuture<T> retryAsyncUntilSuccess(
261 Supplier<CompletableFuture<T>> supplier,
262 CompletableFuture<T> future) {
263 supplier.get().whenComplete((result, error) -> {
264 if (error == null) {
265 future.complete(result);
266 } else {
267 Throwable cause = error.getCause() != null ? error.getCause() : error;
268 if (cause instanceof StorageException.ConcurrentModification) {
269 scheduledExecutor.schedule(
270 () -> retryAsyncUntilSuccess(supplier, future),
271 random.nextInt(50),
272 TimeUnit.MILLISECONDS);
273 } else {
274 future.completeExceptionally(error);
275 }
276 }
277 });
278 return future;
279 }
280
281 /**
Jordan Haltermanf7554092017-07-30 15:05:51 -0700282 * Return method for {@link #retryUntilSuccess(Supplier)} callbacks to indicate that the callback needs to be
283 * retried after a randomized delay.
284 *
285 * @param <T> the return type
286 * @return nothing
287 * @throws StorageException.ConcurrentModification to force a retry of the callback
288 */
289 private <T> T retry() {
290 throw RETRY;
Madan Jampani86940d92015-05-06 11:47:57 -0700291 }
292
Jordan Haltermanf7554092017-07-30 15:05:51 -0700293 /**
Jordan Haltermanf7554092017-07-30 15:05:51 -0700294 * Handles a completed batch event received from the master node.
295 * <p>
296 * If this node is the source of the batch, notifies event listeners to complete the operations.
297 *
298 * @param event the event to handle
299 */
300 private void completeBatch(FlowRuleBatchEvent event) {
301 if (pendingBatches.remove(event.subject().batchId())) {
302 notifyDelegate(event);
303 }
Madan Jampani86940d92015-05-06 11:47:57 -0700304 }
305
306 // This is not a efficient operation on a distributed sharded
307 // flow store. We need to revisit the need for this operation or at least
308 // make it device specific.
309 @Override
310 public int getFlowRuleCount() {
Yuta HIGUCHI10e91fb2017-06-15 13:30:50 -0700311 return Streams.stream(deviceService.getDevices()).parallel()
Jordan Haltermanf7554092017-07-30 15:05:51 -0700312 .mapToInt(device -> Iterables.size(getFlowEntries(device.id())))
313 .sum();
314 }
315
316 /**
317 * Returns the {@link DocumentPath} for the given {@link DeviceId}.
318 *
319 * @param deviceId the device identifier for which to return a path
320 * @return the path for the given device
321 */
322 private DocumentPath getPathFor(DeviceId deviceId) {
323 return DocumentPath.from("root", deviceId.toString());
324 }
325
326 /**
327 * Returns the {@link DocumentPath} for the given {@link DeviceId} and {@link FlowId}.
328 *
329 * @param deviceId the device identifier for which to return the path
330 * @param flowId the flow identifier for which to return the path
331 * @return the path for the given device/flow
332 */
333 private DocumentPath getPathFor(DeviceId deviceId, FlowId flowId) {
334 return DocumentPath.from("root", deviceId.toString(), flowId.toString());
Madan Jampani86940d92015-05-06 11:47:57 -0700335 }
336
337 @Override
Jordan Haltermanf7554092017-07-30 15:05:51 -0700338 @SuppressWarnings("unchecked")
Madan Jampani86940d92015-05-06 11:47:57 -0700339 public FlowEntry getFlowEntry(FlowRule rule) {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700340 DocumentPath path = getPathFor(rule.deviceId(), rule.id());
341 Versioned<Map<StoredFlowEntry, StoredFlowEntry>> flowEntries = flows.get(path);
342 return flowEntries != null ? flowEntries.value().get(rule) : null;
Madan Jampani86940d92015-05-06 11:47:57 -0700343 }
344
345 @Override
346 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700347 DocumentPath path = getPathFor(deviceId);
348 try {
349 return getFlowEntries(path);
350 } catch (NoSuchDocumentPathException e) {
Madan Jampani86940d92015-05-06 11:47:57 -0700351 return Collections.emptyList();
352 }
Jordan Haltermanf7554092017-07-30 15:05:51 -0700353 }
Madan Jampani86940d92015-05-06 11:47:57 -0700354
Jordan Haltermanf7554092017-07-30 15:05:51 -0700355 @SuppressWarnings("unchecked")
356 private Iterable<FlowEntry> getFlowEntries(DocumentPath path) {
357 return flows.getChildren(path)
358 .values()
359 .stream()
360 .flatMap(v -> v.value().values().stream())
361 .collect(Collectors.toList());
Madan Jampani86940d92015-05-06 11:47:57 -0700362 }
363
364 @Override
Jordan Haltermanf7554092017-07-30 15:05:51 -0700365 @SuppressWarnings("unchecked")
Madan Jampani86940d92015-05-06 11:47:57 -0700366 public void storeFlowRule(FlowRule rule) {
367 storeBatch(new FlowRuleBatchOperation(
Sho SHIMIZU98ffca82015-05-11 08:39:24 -0700368 Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
Madan Jampani86940d92015-05-06 11:47:57 -0700369 rule.deviceId(), idGenerator.getNewId()));
370 }
371
372 @Override
373 public void storeBatch(FlowRuleBatchOperation operation) {
374 if (operation.getOperations().isEmpty()) {
375 notifyDelegate(FlowRuleBatchEvent.completed(
376 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
377 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
378 return;
379 }
380
381 DeviceId deviceId = operation.deviceId();
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700382 NodeId master = mastershipService.getMasterFor(deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700383
384 if (master == null) {
Sivachidambaram Subramanian605104e2017-06-21 07:40:04 +0530385 log.warn("No master for {} ", deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700386
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700387 updateStoreInternal(operation).whenComplete((result, error) -> {
388 notifyDelegate(FlowRuleBatchEvent.completed(
389 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
390 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
391 });
Madan Jampani86940d92015-05-06 11:47:57 -0700392 return;
393 }
394
Jordan Haltermanf7554092017-07-30 15:05:51 -0700395 pendingBatches.add(operation.id());
Madan Jampani86940d92015-05-06 11:47:57 -0700396
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700397 // If the local node is the master, apply the flows. Otherwise, send them to the master.
398 if (Objects.equals(local, master)) {
399 applyBatchFlows(operation);
Jordan Haltermanf7554092017-07-30 15:05:51 -0700400 } else {
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700401 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}", master, deviceId);
402 clusterCommunicator.unicast(
403 operation,
404 APPLY_BATCH_FLOWS,
Jordan Haltermanf7554092017-07-30 15:05:51 -0700405 serializer::encode,
406 master);
Madan Jampani86940d92015-05-06 11:47:57 -0700407 }
Madan Jampani86940d92015-05-06 11:47:57 -0700408 }
409
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700410 /**
411 * Asynchronously applies a batch of flows to the store.
412 * <p>
413 * This operation is performed on the master node to ensure that events occur <em>after</em> flows have been stored
414 * and are visible to the master node. If a non-master node stores flows and then triggers events on the master,
415 * the flows may not yet be visible to the master node due to the nature of sequentially consistent reads on the
416 * underlying {@code DocumentTree} primitive.
417 */
418 private void applyBatchFlows(FlowRuleBatchOperation operation) {
419 updateStoreInternal(operation).whenComplete((operations, error) -> {
420 if (error == null) {
421 if (operations.isEmpty()) {
422 batchOperationComplete(FlowRuleBatchEvent.completed(
423 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
424 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
Jordan Haltermanf7554092017-07-30 15:05:51 -0700425 } else {
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700426 notifyDelegate(FlowRuleBatchEvent.requested(
427 new FlowRuleBatchRequest(operation.id(), operations),
428 operation.deviceId()));
Jordan Haltermanf7554092017-07-30 15:05:51 -0700429 }
Jordan Haltermanf7554092017-07-30 15:05:51 -0700430 }
431 });
432 }
433
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700434 private CompletableFuture<Set<FlowRuleBatchEntry>> updateStoreInternal(FlowRuleBatchOperation operation) {
435 return Tools.allOf(operation.getOperations().stream().map(op -> {
436 switch (op.operator()) {
437 case ADD:
438 return addBatchEntry(op).thenApply(succeeded -> succeeded ? op : null);
439 case REMOVE:
440 return removeBatchEntry(op).thenApply(succeeded -> succeeded ? op : null);
441 case MODIFY:
442 return CompletableFuture.<FlowRuleBatchEntry>completedFuture(null);
443 default:
444 log.warn("Unknown flow operation operator: {}", op.operator());
445 return CompletableFuture.<FlowRuleBatchEntry>completedFuture(null);
446 }
447 }).collect(Collectors.toList()))
448 .thenApply(results -> results.stream()
449 .filter(Objects::nonNull)
450 .collect(Collectors.toSet()));
451 }
452
Jordan Haltermanf7554092017-07-30 15:05:51 -0700453 @SuppressWarnings("unchecked")
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700454 private CompletableFuture<Boolean> addBatchEntry(FlowRuleBatchEntry batchEntry) {
455 StoredFlowEntry entry = new DefaultFlowEntry(batchEntry.target());
456 DocumentPath path = getPathFor(entry.deviceId(), entry.id());
457 return retryAsyncUntilSuccess(() -> {
458 CompletableFuture<Boolean> future = new CompletableFuture<>();
459 asyncFlows.get(path).whenComplete((value, getError) -> {
460 if (getError == null) {
461 if (value != null) {
462 Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
463 entries.put(entry, entry);
464 asyncFlows.replace(path, entries, value.version()).whenComplete((succeeded, replaceError) -> {
465 if (replaceError == null) {
466 if (succeeded) {
467 log.trace("Stored new flow rule: {}", entry);
468 future.complete(true);
469 } else {
470 log.trace("Failed to store new flow rule: {}", entry);
471 future.completeExceptionally(RETRY);
472 }
473 } else {
474 future.completeExceptionally(replaceError);
475 }
476 });
Jordan Haltermanf7554092017-07-30 15:05:51 -0700477 } else {
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700478 // If there are no entries stored for the device, initialize the device's flows.
479 Map<StoredFlowEntry, StoredFlowEntry> map = Maps.newHashMap();
480 map.put(entry, entry);
481 asyncFlows.createRecursive(path, map).whenComplete((succeeded, createError) -> {
482 if (createError == null) {
483 if (succeeded) {
484 log.trace("Stored new flow rule: {}", entry);
485 future.complete(true);
486 } else {
487 log.trace("Failed to store new flow rule: {}", entry);
488 future.completeExceptionally(RETRY);
489 }
490 } else {
491 future.completeExceptionally(createError);
492 }
493 });
Jordan Haltermanf7554092017-07-30 15:05:51 -0700494 }
495 } else {
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700496 future.completeExceptionally(getError);
Jordan Haltermanf7554092017-07-30 15:05:51 -0700497 }
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700498 });
499 return future;
500 });
501 }
502
503 @SuppressWarnings("unchecked")
504 private CompletableFuture<Boolean> removeBatchEntry(FlowRuleBatchEntry batchEntry) {
505 FlowRule rule = batchEntry.target();
506 DocumentPath path = getPathFor(rule.deviceId(), rule.id());
507 return retryAsyncUntilSuccess(() -> {
508 CompletableFuture<Boolean> future = new CompletableFuture<>();
509 asyncFlows.get(path).whenComplete((value, getError) -> {
510 if (getError == null) {
511 if (value != null) {
512 Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
513 StoredFlowEntry entry = entries.get(rule);
514 if (entry != null) {
515 entry.setState(FlowEntryState.PENDING_REMOVE);
516 asyncFlows.replace(path, entries, value.version()).whenComplete((succeeded, error) -> {
517 if (error == null) {
518 if (succeeded) {
519 log.trace("Updated flow rule state to PENDING_REMOVE: {}", entry);
520 future.complete(true);
521 } else {
522 log.trace("Failed to update flow rule state to PENDING_REMOVE: {}", entry);
523 future.completeExceptionally(RETRY);
524 }
525 } else {
526 future.completeExceptionally(error);
527 }
528 });
529 } else {
530 future.complete(false);
531 }
532 } else {
533 future.complete(false);
534 }
535 } else {
536 future.completeExceptionally(getError);
537 }
538 });
539 return future;
Jordan Haltermanf7554092017-07-30 15:05:51 -0700540 });
541 }
542
543 @Override
544 public void batchOperationComplete(FlowRuleBatchEvent event) {
545 if (pendingBatches.remove(event.subject().batchId())) {
546 notifyDelegate(event);
547 } else {
548 clusterCommunicator.broadcast(event, COMPLETE_BATCH, serializer::encode);
549 }
550 }
551
Madan Jampani86940d92015-05-06 11:47:57 -0700552 @Override
553 public void deleteFlowRule(FlowRule rule) {
554 storeBatch(
555 new FlowRuleBatchOperation(
Sho SHIMIZU98ffca82015-05-11 08:39:24 -0700556 Collections.singletonList(
Madan Jampani86940d92015-05-06 11:47:57 -0700557 new FlowRuleBatchEntry(
558 FlowRuleOperation.REMOVE,
559 rule)), rule.deviceId(), idGenerator.getNewId()));
560 }
561
562 @Override
Charles Chan93fa7272016-01-26 22:27:02 -0800563 public FlowRuleEvent pendingFlowRule(FlowEntry rule) {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700564 DocumentPath path = getPathFor(rule.deviceId(), rule.id());
565 return retryUntilSuccess(() -> {
566 Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
567 if (value != null) {
568 Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
569 StoredFlowEntry entry = entries.get(rule);
570 if (entry != null && entry.state() != FlowEntryState.PENDING_ADD) {
571 entry.setState(FlowEntryState.PENDING_ADD);
572 if (flows.replace(path, entries, value.version())) {
573 log.trace("Updated flow rule state to PENDING_ADD: {}", entry);
574 return new FlowRuleEvent(RULE_UPDATED, rule);
575 } else {
576 log.trace("Failed to update flow rule state to PENDING_ADD: {}", entry);
577 return retry();
578 }
579 } else {
580 return null;
581 }
582 } else {
583 return null;
Charles Chan93fa7272016-01-26 22:27:02 -0800584 }
Jordan Haltermanf7554092017-07-30 15:05:51 -0700585 });
Charles Chan93fa7272016-01-26 22:27:02 -0800586 }
587
588 @Override
Jordan Haltermanf7554092017-07-30 15:05:51 -0700589 @SuppressWarnings("unchecked")
Madan Jampani86940d92015-05-06 11:47:57 -0700590 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700591 DocumentPath path = getPathFor(rule.deviceId(), rule.id());
592 return retryUntilSuccess(() -> {
593 Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
594 if (value != null) {
595 Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
596 StoredFlowEntry entry = entries.get(rule);
597 if (entry != null) {
598 FlowRuleEvent event;
599 String message;
Madan Jampani86940d92015-05-06 11:47:57 -0700600
Jordan Haltermanf7554092017-07-30 15:05:51 -0700601 entry.setBytes(rule.bytes());
602 entry.setLife(rule.life(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
603 entry.setLiveType(rule.liveType());
604 entry.setPackets(rule.packets());
605 entry.setLastSeen();
Madan Jampani86940d92015-05-06 11:47:57 -0700606
Jordan Haltermanf7554092017-07-30 15:05:51 -0700607 // If the entry state is PENDING_ADD, set it to ADDED. Otherwise, just update the rule.
608 if (entry.state() == FlowEntryState.PENDING_ADD) {
609 entry.setState(FlowEntryState.ADDED);
610 event = new FlowRuleEvent(Type.RULE_ADDED, rule);
611 message = "Updated flow rule state to ADDED: {}";
612 } else {
613 event = new FlowRuleEvent(Type.RULE_UPDATED, rule);
614 message = "Updated flow rule: {}";
615 }
616
617 if (flows.replace(path, entries, value.version())) {
618 log.trace(message, entry);
619 return event;
620 } else {
621 log.trace("Failed to update flow rule: {}", entry);
622 return retry();
623 }
624 } else {
625 // If the rule does not exist, return null. Inserting the rule risks race conditions
626 // that can result in removed rules being retained.
627 return null;
628 }
629 } else {
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700630 return null;
Madan Jampani86940d92015-05-06 11:47:57 -0700631 }
Jordan Haltermanf7554092017-07-30 15:05:51 -0700632 });
Madan Jampani86940d92015-05-06 11:47:57 -0700633 }
634
635 @Override
Jordan Haltermanf7554092017-07-30 15:05:51 -0700636 @SuppressWarnings("unchecked")
Madan Jampani86940d92015-05-06 11:47:57 -0700637 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700638 DocumentPath path = getPathFor(rule.deviceId(), rule.id());
639 return retryUntilSuccess(() -> {
640 Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
641 if (value != null) {
642 Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
643 StoredFlowEntry entry = entries.remove(rule);
644 if (entry != null) {
645 if (flows.replace(path, entries, value.version())) {
646 log.trace("Removed flow rule: {}", entry);
647 return new FlowRuleEvent(RULE_REMOVED, entry);
648 } else {
649 log.trace("Failed to remove flow rule: {}", entry);
650 return retry();
651 }
652 } else {
653 return null;
654 }
655 } else {
656 return null;
657 }
658 });
Madan Jampani86940d92015-05-06 11:47:57 -0700659 }
660
661 @Override
Charles Chan0c7c43b2016-01-14 17:39:20 -0800662 public void purgeFlowRule(DeviceId deviceId) {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700663 DocumentPath path = getPathFor(deviceId);
664 try {
665 for (String flowId : flows.getChildren(path).keySet()) {
666 flows.removeNode(DocumentPath.from("root", deviceId.toString(), flowId));
667 }
668 } catch (NoSuchDocumentPathException e) {
669 // Do nothing
670 }
671 try {
672 flows.removeNode(path);
673 } catch (NoSuchDocumentPathException e) {
674 // Do nothing
675 }
Charles Chan0c7c43b2016-01-14 17:39:20 -0800676 }
677
678 @Override
Victor Silva139bca42016-08-18 11:46:35 -0300679 public void purgeFlowRules() {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700680 try {
681 for (String deviceId : flows.getChildren(flows.root()).keySet()) {
682 purgeFlowRule(DeviceId.deviceId(deviceId));
Madan Jampani86940d92015-05-06 11:47:57 -0700683 }
Jordan Haltermanf7554092017-07-30 15:05:51 -0700684 } catch (NoSuchDocumentPathException e) {
685 // Do nothing
Madan Jampani86940d92015-05-06 11:47:57 -0700686 }
687 }
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700688
689 @Override
690 public FlowRuleEvent updateTableStatistics(DeviceId deviceId,
Jordan Haltermanf7554092017-07-30 15:05:51 -0700691 List<TableStatisticsEntry> tableStats) {
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700692 deviceTableStats.put(deviceId, tableStats);
693 return null;
694 }
695
696 @Override
697 public Iterable<TableStatisticsEntry> getTableStatistics(DeviceId deviceId) {
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700698 List<TableStatisticsEntry> tableStats = deviceTableStats.get(deviceId);
699 if (tableStats == null) {
700 return Collections.emptyList();
701 }
702 return ImmutableList.copyOf(tableStats);
703 }
704
Patryk Konopka7e40c012017-06-06 13:38:06 +0200705 @Override
706 public long getActiveFlowRuleCount(DeviceId deviceId) {
707 return Streams.stream(getTableStatistics(deviceId))
708 .mapToLong(TableStatisticsEntry::activeFlowEntries)
709 .sum();
710 }
711
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700712 private class InternalTableStatsListener
Jordan Haltermanf7554092017-07-30 15:05:51 -0700713 implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> {
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700714 @Override
715 public void event(EventuallyConsistentMapEvent<DeviceId,
Jordan Haltermanf7554092017-07-30 15:05:51 -0700716 List<TableStatisticsEntry>> event) {
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700717 //TODO: Generate an event to listeners (do we need?)
718 }
719 }
Jordan Haltermanf7554092017-07-30 15:05:51 -0700720}