blob: 32226fd744b6adda41887ddb13c62126981b054c [file] [log] [blame]
Jordan Haltermanf7554092017-07-30 15:05:51 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2014-present Open Networking Foundation
Madan Jampani86940d92015-05-06 11:47:57 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.flow.impl;
17
Ray Milkeyb68bbbc2017-12-18 10:05:49 -080018import java.security.SecureRandom;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070019import java.util.Collections;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070020import java.util.List;
21import java.util.Map;
22import java.util.Objects;
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -070023import java.util.Random;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070024import java.util.Set;
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -070025import java.util.concurrent.CompletableFuture;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070026import java.util.concurrent.ExecutorService;
27import java.util.concurrent.Executors;
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -070028import java.util.concurrent.ScheduledExecutorService;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070029import java.util.concurrent.TimeUnit;
Jordan Haltermanf7554092017-07-30 15:05:51 -070030import java.util.function.Supplier;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070031import java.util.stream.Collectors;
Ray Milkey2b6ff422016-08-26 13:03:15 -070032
Jordan Haltermanf7554092017-07-30 15:05:51 -070033import com.google.common.collect.ImmutableList;
34import com.google.common.collect.Iterables;
35import com.google.common.collect.Maps;
36import com.google.common.collect.Sets;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070037import com.google.common.collect.Streams;
38import org.apache.felix.scr.annotations.Activate;
39import org.apache.felix.scr.annotations.Component;
40import org.apache.felix.scr.annotations.Deactivate;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070041import org.apache.felix.scr.annotations.Reference;
42import org.apache.felix.scr.annotations.ReferenceCardinality;
43import org.apache.felix.scr.annotations.Service;
44import org.onlab.util.KryoNamespace;
45import org.onlab.util.Tools;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070046import org.onosproject.cluster.ClusterService;
47import org.onosproject.cluster.NodeId;
48import org.onosproject.core.CoreService;
49import org.onosproject.core.IdGenerator;
50import org.onosproject.mastership.MastershipService;
51import org.onosproject.net.DeviceId;
52import org.onosproject.net.device.DeviceService;
53import org.onosproject.net.flow.CompletedBatchOperation;
54import org.onosproject.net.flow.DefaultFlowEntry;
55import org.onosproject.net.flow.FlowEntry;
56import org.onosproject.net.flow.FlowEntry.FlowEntryState;
57import org.onosproject.net.flow.FlowId;
58import org.onosproject.net.flow.FlowRule;
Ray Milkey7bf273c2017-09-27 16:15:15 -070059import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
60import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry.FlowRuleOperation;
61import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
62import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
63import org.onosproject.net.flow.oldbatch.FlowRuleBatchRequest;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070064import org.onosproject.net.flow.FlowRuleEvent;
65import org.onosproject.net.flow.FlowRuleEvent.Type;
66import org.onosproject.net.flow.FlowRuleService;
67import org.onosproject.net.flow.FlowRuleStore;
68import org.onosproject.net.flow.FlowRuleStoreDelegate;
69import org.onosproject.net.flow.StoredFlowEntry;
70import org.onosproject.net.flow.TableStatisticsEntry;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070071import org.onosproject.store.AbstractStore;
72import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Jordan Haltermanf7554092017-07-30 15:05:51 -070073import org.onosproject.store.cluster.messaging.MessageSubject;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070074import org.onosproject.store.impl.MastershipBasedTimestamp;
75import org.onosproject.store.serializers.KryoNamespaces;
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -070076import org.onosproject.store.service.AsyncDocumentTree;
Jordan Haltermanf7554092017-07-30 15:05:51 -070077import org.onosproject.store.service.DocumentPath;
78import org.onosproject.store.service.DocumentTree;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070079import org.onosproject.store.service.EventuallyConsistentMap;
80import org.onosproject.store.service.EventuallyConsistentMapEvent;
81import org.onosproject.store.service.EventuallyConsistentMapListener;
Jordan Haltermanf5295f62017-09-12 15:07:18 -070082import org.onosproject.store.service.IllegalDocumentModificationException;
Jordan Haltermanf7554092017-07-30 15:05:51 -070083import org.onosproject.store.service.NoSuchDocumentPathException;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070084import org.onosproject.store.service.Serializer;
Jordan Haltermanf7554092017-07-30 15:05:51 -070085import org.onosproject.store.service.StorageException;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070086import org.onosproject.store.service.StorageService;
Jordan Haltermanf7554092017-07-30 15:05:51 -070087import org.onosproject.store.service.Versioned;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070088import org.onosproject.store.service.WallClockTimestamp;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070089import org.slf4j.Logger;
Ray Milkey2b6ff422016-08-26 13:03:15 -070090
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070091import static org.onlab.util.Tools.groupedThreads;
92import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
Jordan Haltermanf7554092017-07-30 15:05:51 -070093import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_UPDATED;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070094import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani86940d92015-05-06 11:47:57 -070095
96/**
97 * Manages inventory of flow rules using a distributed state management protocol.
Ray Milkeyed9bace2018-03-20 10:05:56 -070098 *
99 * @deprecated in Nightingale Release (1.13)
Madan Jampani86940d92015-05-06 11:47:57 -0700100 */
Ray Milkeyed9bace2018-03-20 10:05:56 -0700101@Deprecated
Thomas Vachuska71026b22018-01-05 16:01:44 -0800102@Component(enabled = false)
Madan Jampani86940d92015-05-06 11:47:57 -0700103@Service
Madan Jampani37d04c62016-04-25 15:53:55 -0700104public class DistributedFlowRuleStore
Madan Jampani86940d92015-05-06 11:47:57 -0700105 extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
106 implements FlowRuleStore {
107
108 private final Logger log = getLogger(getClass());
109
Jordan Haltermanf7554092017-07-30 15:05:51 -0700110 // Constant exception used to indicate an atomic read-modify-write operation needs to be retried.
111 // We don't want to populate a stack trace every time an optimistic lock is retried.
112 private static final StorageException.ConcurrentModification RETRY;
113
114 // Initialize retry exception with an empty stack trace.
115 static {
116 RETRY = new StorageException.ConcurrentModification();
117 RETRY.setStackTrace(new StackTraceElement[0]);
118 }
119
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700120 private static final int SCHEDULED_THREAD_POOL_SIZE = 8;
Madan Jampani86940d92015-05-06 11:47:57 -0700121 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
Jordan Haltermanf7554092017-07-30 15:05:51 -0700122 private static final int MAX_RETRY_DELAY_MILLIS = 50;
Madan Jampani86940d92015-05-06 11:47:57 -0700123
Jordan Haltermanf7554092017-07-30 15:05:51 -0700124 private static final String FLOW_TABLE = "onos-flow-table";
Madan Jampani86940d92015-05-06 11:47:57 -0700125
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700126 private static final MessageSubject APPLY_BATCH_FLOWS = new MessageSubject("onos-flow-apply");
Jordan Haltermanf7554092017-07-30 15:05:51 -0700127 private static final MessageSubject COMPLETE_BATCH = new MessageSubject("onos-flow-batch-complete");
Madan Jampani86940d92015-05-06 11:47:57 -0700128
129 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
130 protected DeviceService deviceService;
131
132 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
133 protected CoreService coreService;
134
135 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani86940d92015-05-06 11:47:57 -0700136 protected MastershipService mastershipService;
137
Aaron Kruglikova62fdbb2015-10-27 16:32:06 -0700138 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jordan Haltermanf7554092017-07-30 15:05:51 -0700139 protected ClusterCommunicationService clusterCommunicator;
Aaron Kruglikova62fdbb2015-10-27 16:32:06 -0700140
Jordan Haltermanf7554092017-07-30 15:05:51 -0700141 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
142 protected ClusterService clusterService;
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700143
144 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
145 protected StorageService storageService;
146
Madan Jampani884d4432016-08-23 10:46:55 -0700147 protected final Serializer serializer = Serializer.using(KryoNamespaces.API);
Madan Jampani86940d92015-05-06 11:47:57 -0700148
Madan Jampani884d4432016-08-23 10:46:55 -0700149 protected final KryoNamespace.Builder serializerBuilder = KryoNamespace.newBuilder()
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700150 .register(KryoNamespaces.API)
151 .register(MastershipBasedTimestamp.class);
152
Jordan Haltermanf7554092017-07-30 15:05:51 -0700153 private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
154 private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
155 new InternalTableStatsListener();
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700156
Jordan Haltermanf7554092017-07-30 15:05:51 -0700157 private Set<Long> pendingBatches = Sets.newConcurrentHashSet();
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700158 private ScheduledExecutorService scheduledExecutor;
Jordan Haltermanf7554092017-07-30 15:05:51 -0700159 private ExecutorService messageHandlingExecutor;
Ray Milkeyb68bbbc2017-12-18 10:05:49 -0800160 private final Random random = new SecureRandom();
Jordan Haltermanf7554092017-07-30 15:05:51 -0700161
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700162 private AsyncDocumentTree<Map<StoredFlowEntry, StoredFlowEntry>> asyncFlows;
Jordan Haltermanf7554092017-07-30 15:05:51 -0700163 private DocumentTree<Map<StoredFlowEntry, StoredFlowEntry>> flows;
Madan Jampani86940d92015-05-06 11:47:57 -0700164 private IdGenerator idGenerator;
165 private NodeId local;
166
167 @Activate
Jordan Haltermanf7554092017-07-30 15:05:51 -0700168 public void activate() {
Madan Jampani86940d92015-05-06 11:47:57 -0700169 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
170
171 local = clusterService.getLocalNode().id();
172
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700173 scheduledExecutor = Executors.newScheduledThreadPool(
174 SCHEDULED_THREAD_POOL_SIZE,
175 groupedThreads("onos/store/flow", "schedulers", log));
176
Madan Jampani86940d92015-05-06 11:47:57 -0700177 messageHandlingExecutor = Executors.newFixedThreadPool(
Jordan Haltermanf7554092017-07-30 15:05:51 -0700178 MESSAGE_HANDLER_THREAD_POOL_SIZE,
179 groupedThreads("onos/store/flow", "message-handlers", log));
Madan Jampani86940d92015-05-06 11:47:57 -0700180
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700181 deviceTableStats = storageService.<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder()
182 .withName("onos-flow-table-stats")
Madan Jampani884d4432016-08-23 10:46:55 -0700183 .withSerializer(serializerBuilder)
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700184 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
185 .withTimestampProvider((k, v) -> new WallClockTimestamp())
186 .withTombstonesDisabled()
187 .build();
188 deviceTableStats.addListener(tableStatsListener);
189
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700190 asyncFlows = storageService.<Map<StoredFlowEntry, StoredFlowEntry>>documentTreeBuilder()
Jordan Haltermanf7554092017-07-30 15:05:51 -0700191 .withName(FLOW_TABLE)
192 .withSerializer(serializer)
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700193 .buildDocumentTree();
194 flows = asyncFlows.asDocumentTree();
Jordan Haltermanf7554092017-07-30 15:05:51 -0700195
196 clusterCommunicator.addSubscriber(
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700197 APPLY_BATCH_FLOWS,
Jordan Haltermanf7554092017-07-30 15:05:51 -0700198 serializer::decode,
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700199 this::applyBatchFlows,
Jordan Haltermanf7554092017-07-30 15:05:51 -0700200 messageHandlingExecutor);
201 clusterCommunicator.addSubscriber(
202 COMPLETE_BATCH,
203 serializer::decode,
204 this::completeBatch,
205 messageHandlingExecutor);
206
207 log.info("Started");
Madan Jampani86940d92015-05-06 11:47:57 -0700208 }
209
210 @Deactivate
Jordan Haltermanf7554092017-07-30 15:05:51 -0700211 public void deactivate() {
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700212 deviceTableStats.removeListener(tableStatsListener);
213 deviceTableStats.destroy();
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700214 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
Jordan Haltermanf7554092017-07-30 15:05:51 -0700215 clusterCommunicator.removeSubscriber(COMPLETE_BATCH);
Madan Jampani86940d92015-05-06 11:47:57 -0700216 messageHandlingExecutor.shutdownNow();
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700217 scheduledExecutor.shutdownNow();
Madan Jampani86940d92015-05-06 11:47:57 -0700218 log.info("Stopped");
219 }
220
Jordan Haltermanf7554092017-07-30 15:05:51 -0700221 /**
222 * Retries the given supplier until successful.
223 * <p>
224 * This method retries the given supplier until no {@code ConcurrentModification} exceptions are thrown. In
225 * between retries, it waits a semi-random interval to attempt to avoid transaction conflicts with other processes.
226 *
227 * @param supplier the supplier to retry
228 * @param <T> the return type
229 * @return the return value of the given supplier once it runs successfully
230 */
231 private <T> T retryUntilSuccess(Supplier<T> supplier) {
232 return Tools.retryable(
233 supplier,
234 StorageException.ConcurrentModification.class,
235 Integer.MAX_VALUE,
236 MAX_RETRY_DELAY_MILLIS)
237 .get();
Madan Jampani86940d92015-05-06 11:47:57 -0700238 }
239
Jordan Haltermanf7554092017-07-30 15:05:51 -0700240 /**
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700241 * Retries the given asynchronous supplier until successful.
242 * <p>
243 * This method retries the given supplier until no {@code ConcurrentModification} exceptions are thrown. In
244 * between retries, it waits a semi-random interval to attempt to avoid transaction conflicts with other processes.
245 *
246 * @param supplier the supplier to retry
247 * @param <T> the return type
248 * @return the return value of the given supplier once it runs successfully
249 */
250 private <T> CompletableFuture<T> retryAsyncUntilSuccess(Supplier<CompletableFuture<T>> supplier) {
251 return retryAsyncUntilSuccess(supplier, new CompletableFuture<>());
252 }
253
254 /**
255 * Retries the given asynchronous supplier until successful.
256 * <p>
257 * This method retries the given supplier until no {@code ConcurrentModification} exceptions are thrown. In
258 * between retries, it waits a semi-random interval to attempt to avoid transaction conflicts with other processes.
259 *
260 * @param supplier the supplier to retry
261 * @param future future to be completed once the operation has been successful
262 * @param <T> the return type
263 * @return the return value of the given supplier once it runs successfully
264 */
265 private <T> CompletableFuture<T> retryAsyncUntilSuccess(
266 Supplier<CompletableFuture<T>> supplier,
267 CompletableFuture<T> future) {
268 supplier.get().whenComplete((result, error) -> {
269 if (error == null) {
270 future.complete(result);
271 } else {
272 Throwable cause = error.getCause() != null ? error.getCause() : error;
273 if (cause instanceof StorageException.ConcurrentModification) {
274 scheduledExecutor.schedule(
275 () -> retryAsyncUntilSuccess(supplier, future),
276 random.nextInt(50),
277 TimeUnit.MILLISECONDS);
278 } else {
279 future.completeExceptionally(error);
280 }
281 }
282 });
283 return future;
284 }
285
286 /**
Jordan Haltermanf7554092017-07-30 15:05:51 -0700287 * Return method for {@link #retryUntilSuccess(Supplier)} callbacks to indicate that the callback needs to be
288 * retried after a randomized delay.
289 *
290 * @param <T> the return type
291 * @return nothing
292 * @throws StorageException.ConcurrentModification to force a retry of the callback
293 */
294 private <T> T retry() {
295 throw RETRY;
Madan Jampani86940d92015-05-06 11:47:57 -0700296 }
297
Jordan Haltermanf7554092017-07-30 15:05:51 -0700298 /**
Jordan Haltermanf7554092017-07-30 15:05:51 -0700299 * Handles a completed batch event received from the master node.
300 * <p>
301 * If this node is the source of the batch, notifies event listeners to complete the operations.
302 *
303 * @param event the event to handle
304 */
305 private void completeBatch(FlowRuleBatchEvent event) {
306 if (pendingBatches.remove(event.subject().batchId())) {
307 notifyDelegate(event);
308 }
Madan Jampani86940d92015-05-06 11:47:57 -0700309 }
310
311 // This is not a efficient operation on a distributed sharded
312 // flow store. We need to revisit the need for this operation or at least
313 // make it device specific.
314 @Override
315 public int getFlowRuleCount() {
Yuta HIGUCHI10e91fb2017-06-15 13:30:50 -0700316 return Streams.stream(deviceService.getDevices()).parallel()
Jordan Haltermanf7554092017-07-30 15:05:51 -0700317 .mapToInt(device -> Iterables.size(getFlowEntries(device.id())))
318 .sum();
319 }
320
321 /**
322 * Returns the {@link DocumentPath} for the given {@link DeviceId}.
323 *
324 * @param deviceId the device identifier for which to return a path
325 * @return the path for the given device
326 */
327 private DocumentPath getPathFor(DeviceId deviceId) {
328 return DocumentPath.from("root", deviceId.toString());
329 }
330
331 /**
332 * Returns the {@link DocumentPath} for the given {@link DeviceId} and {@link FlowId}.
333 *
334 * @param deviceId the device identifier for which to return the path
335 * @param flowId the flow identifier for which to return the path
336 * @return the path for the given device/flow
337 */
338 private DocumentPath getPathFor(DeviceId deviceId, FlowId flowId) {
339 return DocumentPath.from("root", deviceId.toString(), flowId.toString());
Madan Jampani86940d92015-05-06 11:47:57 -0700340 }
341
342 @Override
Jordan Haltermanf7554092017-07-30 15:05:51 -0700343 @SuppressWarnings("unchecked")
Madan Jampani86940d92015-05-06 11:47:57 -0700344 public FlowEntry getFlowEntry(FlowRule rule) {
Deepa Vaddireddy2e85cfc2017-11-07 12:58:00 +0000345 DeviceId deviceId = rule.deviceId();
346 if (mastershipService.getMasterFor(deviceId) != null) {
347 DocumentPath path = getPathFor(deviceId, rule.id());
348 Versioned<Map<StoredFlowEntry, StoredFlowEntry>> flowEntries = flows.get(path);
349 return flowEntries != null ? flowEntries.value().get(rule) : null;
350 } else {
351 log.debug("Failed to getFlowEntries: No master for {}", deviceId);
352 return null;
353 }
354
355
Madan Jampani86940d92015-05-06 11:47:57 -0700356 }
357
358 @Override
359 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Deepa Vaddireddy2e85cfc2017-11-07 12:58:00 +0000360 if (mastershipService.getMasterFor(deviceId) != null) {
361 DocumentPath path = getPathFor(deviceId);
362 try {
363 return getFlowEntries(path);
364 } catch (NoSuchDocumentPathException e) {
365 return Collections.emptyList();
366 }
367 } else {
368 log.debug("Failed to getFlowEntries: No master for {}", deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700369 return Collections.emptyList();
370 }
Deepa Vaddireddy2e85cfc2017-11-07 12:58:00 +0000371
Jordan Haltermanf7554092017-07-30 15:05:51 -0700372 }
Madan Jampani86940d92015-05-06 11:47:57 -0700373
Jordan Haltermanf7554092017-07-30 15:05:51 -0700374 @SuppressWarnings("unchecked")
375 private Iterable<FlowEntry> getFlowEntries(DocumentPath path) {
376 return flows.getChildren(path)
377 .values()
378 .stream()
379 .flatMap(v -> v.value().values().stream())
380 .collect(Collectors.toList());
Madan Jampani86940d92015-05-06 11:47:57 -0700381 }
382
383 @Override
Jordan Haltermanf7554092017-07-30 15:05:51 -0700384 @SuppressWarnings("unchecked")
Madan Jampani86940d92015-05-06 11:47:57 -0700385 public void storeFlowRule(FlowRule rule) {
386 storeBatch(new FlowRuleBatchOperation(
Sho SHIMIZU98ffca82015-05-11 08:39:24 -0700387 Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
Madan Jampani86940d92015-05-06 11:47:57 -0700388 rule.deviceId(), idGenerator.getNewId()));
389 }
390
391 @Override
392 public void storeBatch(FlowRuleBatchOperation operation) {
393 if (operation.getOperations().isEmpty()) {
394 notifyDelegate(FlowRuleBatchEvent.completed(
395 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
396 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
397 return;
398 }
399
400 DeviceId deviceId = operation.deviceId();
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700401 NodeId master = mastershipService.getMasterFor(deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700402
403 if (master == null) {
Sivachidambaram Subramanian605104e2017-06-21 07:40:04 +0530404 log.warn("No master for {} ", deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700405
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700406 updateStoreInternal(operation).whenComplete((result, error) -> {
407 notifyDelegate(FlowRuleBatchEvent.completed(
408 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
409 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
410 });
Madan Jampani86940d92015-05-06 11:47:57 -0700411 return;
412 }
413
Jordan Haltermanf7554092017-07-30 15:05:51 -0700414 pendingBatches.add(operation.id());
Madan Jampani86940d92015-05-06 11:47:57 -0700415
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700416 // If the local node is the master, apply the flows. Otherwise, send them to the master.
417 if (Objects.equals(local, master)) {
418 applyBatchFlows(operation);
Jordan Haltermanf7554092017-07-30 15:05:51 -0700419 } else {
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700420 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}", master, deviceId);
421 clusterCommunicator.unicast(
422 operation,
423 APPLY_BATCH_FLOWS,
Jordan Haltermanf7554092017-07-30 15:05:51 -0700424 serializer::encode,
425 master);
Madan Jampani86940d92015-05-06 11:47:57 -0700426 }
Madan Jampani86940d92015-05-06 11:47:57 -0700427 }
428
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700429 /**
430 * Asynchronously applies a batch of flows to the store.
431 * <p>
432 * This operation is performed on the master node to ensure that events occur <em>after</em> flows have been stored
433 * and are visible to the master node. If a non-master node stores flows and then triggers events on the master,
434 * the flows may not yet be visible to the master node due to the nature of sequentially consistent reads on the
435 * underlying {@code DocumentTree} primitive.
436 */
437 private void applyBatchFlows(FlowRuleBatchOperation operation) {
438 updateStoreInternal(operation).whenComplete((operations, error) -> {
439 if (error == null) {
440 if (operations.isEmpty()) {
441 batchOperationComplete(FlowRuleBatchEvent.completed(
442 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
443 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
Jordan Haltermanf7554092017-07-30 15:05:51 -0700444 } else {
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700445 notifyDelegate(FlowRuleBatchEvent.requested(
446 new FlowRuleBatchRequest(operation.id(), operations),
447 operation.deviceId()));
Jordan Haltermanf7554092017-07-30 15:05:51 -0700448 }
Jordan Haltermanf7554092017-07-30 15:05:51 -0700449 }
450 });
451 }
452
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700453 private CompletableFuture<Set<FlowRuleBatchEntry>> updateStoreInternal(FlowRuleBatchOperation operation) {
454 return Tools.allOf(operation.getOperations().stream().map(op -> {
455 switch (op.operator()) {
456 case ADD:
Yi Tsengc29d8822017-10-25 16:19:25 -0700457 case MODIFY:
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700458 return addBatchEntry(op).thenApply(succeeded -> succeeded ? op : null);
459 case REMOVE:
460 return removeBatchEntry(op).thenApply(succeeded -> succeeded ? op : null);
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700461 default:
462 log.warn("Unknown flow operation operator: {}", op.operator());
463 return CompletableFuture.<FlowRuleBatchEntry>completedFuture(null);
464 }
465 }).collect(Collectors.toList()))
466 .thenApply(results -> results.stream()
467 .filter(Objects::nonNull)
468 .collect(Collectors.toSet()));
469 }
470
Jordan Haltermanf7554092017-07-30 15:05:51 -0700471 @SuppressWarnings("unchecked")
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700472 private CompletableFuture<Boolean> addBatchEntry(FlowRuleBatchEntry batchEntry) {
473 StoredFlowEntry entry = new DefaultFlowEntry(batchEntry.target());
474 DocumentPath path = getPathFor(entry.deviceId(), entry.id());
475 return retryAsyncUntilSuccess(() -> {
476 CompletableFuture<Boolean> future = new CompletableFuture<>();
477 asyncFlows.get(path).whenComplete((value, getError) -> {
478 if (getError == null) {
479 if (value != null) {
480 Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
481 entries.put(entry, entry);
482 asyncFlows.replace(path, entries, value.version()).whenComplete((succeeded, replaceError) -> {
483 if (replaceError == null) {
484 if (succeeded) {
485 log.trace("Stored new flow rule: {}", entry);
486 future.complete(true);
487 } else {
488 log.trace("Failed to store new flow rule: {}", entry);
489 future.completeExceptionally(RETRY);
490 }
491 } else {
492 future.completeExceptionally(replaceError);
493 }
494 });
Jordan Haltermanf7554092017-07-30 15:05:51 -0700495 } else {
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700496 // If there are no entries stored for the device, initialize the device's flows.
497 Map<StoredFlowEntry, StoredFlowEntry> map = Maps.newHashMap();
498 map.put(entry, entry);
499 asyncFlows.createRecursive(path, map).whenComplete((succeeded, createError) -> {
500 if (createError == null) {
501 if (succeeded) {
502 log.trace("Stored new flow rule: {}", entry);
503 future.complete(true);
504 } else {
505 log.trace("Failed to store new flow rule: {}", entry);
506 future.completeExceptionally(RETRY);
507 }
508 } else {
509 future.completeExceptionally(createError);
510 }
511 });
Jordan Haltermanf7554092017-07-30 15:05:51 -0700512 }
513 } else {
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700514 future.completeExceptionally(getError);
Jordan Haltermanf7554092017-07-30 15:05:51 -0700515 }
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700516 });
517 return future;
518 });
519 }
520
521 @SuppressWarnings("unchecked")
522 private CompletableFuture<Boolean> removeBatchEntry(FlowRuleBatchEntry batchEntry) {
523 FlowRule rule = batchEntry.target();
524 DocumentPath path = getPathFor(rule.deviceId(), rule.id());
525 return retryAsyncUntilSuccess(() -> {
526 CompletableFuture<Boolean> future = new CompletableFuture<>();
527 asyncFlows.get(path).whenComplete((value, getError) -> {
528 if (getError == null) {
529 if (value != null) {
530 Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
531 StoredFlowEntry entry = entries.get(rule);
532 if (entry != null) {
533 entry.setState(FlowEntryState.PENDING_REMOVE);
534 asyncFlows.replace(path, entries, value.version()).whenComplete((succeeded, error) -> {
535 if (error == null) {
536 if (succeeded) {
537 log.trace("Updated flow rule state to PENDING_REMOVE: {}", entry);
538 future.complete(true);
539 } else {
540 log.trace("Failed to update flow rule state to PENDING_REMOVE: {}", entry);
541 future.completeExceptionally(RETRY);
542 }
543 } else {
544 future.completeExceptionally(error);
545 }
546 });
547 } else {
548 future.complete(false);
549 }
550 } else {
551 future.complete(false);
552 }
553 } else {
554 future.completeExceptionally(getError);
555 }
556 });
557 return future;
Jordan Haltermanf7554092017-07-30 15:05:51 -0700558 });
559 }
560
561 @Override
562 public void batchOperationComplete(FlowRuleBatchEvent event) {
563 if (pendingBatches.remove(event.subject().batchId())) {
564 notifyDelegate(event);
565 } else {
566 clusterCommunicator.broadcast(event, COMPLETE_BATCH, serializer::encode);
567 }
568 }
569
Madan Jampani86940d92015-05-06 11:47:57 -0700570 @Override
571 public void deleteFlowRule(FlowRule rule) {
572 storeBatch(
573 new FlowRuleBatchOperation(
Sho SHIMIZU98ffca82015-05-11 08:39:24 -0700574 Collections.singletonList(
Madan Jampani86940d92015-05-06 11:47:57 -0700575 new FlowRuleBatchEntry(
576 FlowRuleOperation.REMOVE,
577 rule)), rule.deviceId(), idGenerator.getNewId()));
578 }
579
580 @Override
Charles Chan93fa7272016-01-26 22:27:02 -0800581 public FlowRuleEvent pendingFlowRule(FlowEntry rule) {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700582 DocumentPath path = getPathFor(rule.deviceId(), rule.id());
583 return retryUntilSuccess(() -> {
584 Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
585 if (value != null) {
586 Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
587 StoredFlowEntry entry = entries.get(rule);
588 if (entry != null && entry.state() != FlowEntryState.PENDING_ADD) {
589 entry.setState(FlowEntryState.PENDING_ADD);
590 if (flows.replace(path, entries, value.version())) {
591 log.trace("Updated flow rule state to PENDING_ADD: {}", entry);
592 return new FlowRuleEvent(RULE_UPDATED, rule);
593 } else {
594 log.trace("Failed to update flow rule state to PENDING_ADD: {}", entry);
595 return retry();
596 }
597 } else {
598 return null;
599 }
600 } else {
601 return null;
Charles Chan93fa7272016-01-26 22:27:02 -0800602 }
Jordan Haltermanf7554092017-07-30 15:05:51 -0700603 });
Charles Chan93fa7272016-01-26 22:27:02 -0800604 }
605
606 @Override
Jordan Haltermanf7554092017-07-30 15:05:51 -0700607 @SuppressWarnings("unchecked")
Madan Jampani86940d92015-05-06 11:47:57 -0700608 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700609 DocumentPath path = getPathFor(rule.deviceId(), rule.id());
610 return retryUntilSuccess(() -> {
611 Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
612 if (value != null) {
613 Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
614 StoredFlowEntry entry = entries.get(rule);
615 if (entry != null) {
616 FlowRuleEvent event;
617 String message;
Madan Jampani86940d92015-05-06 11:47:57 -0700618
Jordan Haltermanf7554092017-07-30 15:05:51 -0700619 entry.setBytes(rule.bytes());
620 entry.setLife(rule.life(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
621 entry.setLiveType(rule.liveType());
622 entry.setPackets(rule.packets());
623 entry.setLastSeen();
Madan Jampani86940d92015-05-06 11:47:57 -0700624
Jordan Haltermanf7554092017-07-30 15:05:51 -0700625 // If the entry state is PENDING_ADD, set it to ADDED. Otherwise, just update the rule.
626 if (entry.state() == FlowEntryState.PENDING_ADD) {
627 entry.setState(FlowEntryState.ADDED);
628 event = new FlowRuleEvent(Type.RULE_ADDED, rule);
629 message = "Updated flow rule state to ADDED: {}";
630 } else {
631 event = new FlowRuleEvent(Type.RULE_UPDATED, rule);
632 message = "Updated flow rule: {}";
633 }
634
635 if (flows.replace(path, entries, value.version())) {
636 log.trace(message, entry);
637 return event;
638 } else {
639 log.trace("Failed to update flow rule: {}", entry);
640 return retry();
641 }
642 } else {
643 // If the rule does not exist, return null. Inserting the rule risks race conditions
644 // that can result in removed rules being retained.
645 return null;
646 }
647 } else {
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700648 return null;
Madan Jampani86940d92015-05-06 11:47:57 -0700649 }
Jordan Haltermanf7554092017-07-30 15:05:51 -0700650 });
Madan Jampani86940d92015-05-06 11:47:57 -0700651 }
652
653 @Override
Jordan Haltermanf7554092017-07-30 15:05:51 -0700654 @SuppressWarnings("unchecked")
Madan Jampani86940d92015-05-06 11:47:57 -0700655 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700656 DocumentPath path = getPathFor(rule.deviceId(), rule.id());
657 return retryUntilSuccess(() -> {
658 Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
659 if (value != null) {
660 Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
661 StoredFlowEntry entry = entries.remove(rule);
662 if (entry != null) {
663 if (flows.replace(path, entries, value.version())) {
664 log.trace("Removed flow rule: {}", entry);
665 return new FlowRuleEvent(RULE_REMOVED, entry);
666 } else {
667 log.trace("Failed to remove flow rule: {}", entry);
668 return retry();
669 }
670 } else {
671 return null;
672 }
673 } else {
674 return null;
675 }
676 });
Madan Jampani86940d92015-05-06 11:47:57 -0700677 }
678
679 @Override
Charles Chan0c7c43b2016-01-14 17:39:20 -0800680 public void purgeFlowRule(DeviceId deviceId) {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700681 DocumentPath path = getPathFor(deviceId);
Jordan Haltermanf5295f62017-09-12 15:07:18 -0700682 retryUntilSuccess(() -> {
683 try {
684 for (String flowId : flows.getChildren(path).keySet()) {
685 flows.removeNode(DocumentPath.from("root", deviceId.toString(), flowId));
686 }
687 } catch (NoSuchDocumentPathException e) {
688 // Do nothing. There are no flows for the device.
Jordan Haltermanf7554092017-07-30 15:05:51 -0700689 }
Jordan Haltermanf5295f62017-09-12 15:07:18 -0700690
691 // New children may have been created since they were removed above. Catch
692 // IllegalDocumentModificationException and retry if necessary.
693 try {
694 flows.removeNode(path);
695 } catch (NoSuchDocumentPathException e) {
696 return null;
697 } catch (IllegalDocumentModificationException e) {
698 return retry();
699 }
700 return null;
701 });
Charles Chan0c7c43b2016-01-14 17:39:20 -0800702 }
703
704 @Override
Victor Silva139bca42016-08-18 11:46:35 -0300705 public void purgeFlowRules() {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700706 try {
707 for (String deviceId : flows.getChildren(flows.root()).keySet()) {
708 purgeFlowRule(DeviceId.deviceId(deviceId));
Madan Jampani86940d92015-05-06 11:47:57 -0700709 }
Jordan Haltermanf7554092017-07-30 15:05:51 -0700710 } catch (NoSuchDocumentPathException e) {
Jordan Haltermanf5295f62017-09-12 15:07:18 -0700711 // Do nothing if no children exist.
Madan Jampani86940d92015-05-06 11:47:57 -0700712 }
713 }
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700714
715 @Override
716 public FlowRuleEvent updateTableStatistics(DeviceId deviceId,
Jordan Haltermanf7554092017-07-30 15:05:51 -0700717 List<TableStatisticsEntry> tableStats) {
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700718 deviceTableStats.put(deviceId, tableStats);
719 return null;
720 }
721
722 @Override
723 public Iterable<TableStatisticsEntry> getTableStatistics(DeviceId deviceId) {
Deepa Vaddireddy2e85cfc2017-11-07 12:58:00 +0000724 if (mastershipService.getMasterFor(deviceId) != null) {
725 List<TableStatisticsEntry> tableStats = deviceTableStats.get(deviceId);
726 if (tableStats == null) {
727 return Collections.emptyList();
728 }
729 return ImmutableList.copyOf(tableStats);
730 } else {
731 log.debug("Failed to getTableStatistics: No master for {}", deviceId);
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700732 return Collections.emptyList();
733 }
Deepa Vaddireddy2e85cfc2017-11-07 12:58:00 +0000734
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700735 }
736
Patryk Konopka7e40c012017-06-06 13:38:06 +0200737 @Override
738 public long getActiveFlowRuleCount(DeviceId deviceId) {
Deepa Vaddireddy2e85cfc2017-11-07 12:58:00 +0000739 if (mastershipService.getMasterFor(deviceId) != null) {
740 return Streams.stream(getTableStatistics(deviceId))
741 .mapToLong(TableStatisticsEntry::activeFlowEntries)
742 .sum();
743 } else {
744 log.debug("Failed to getActiveFlowRuleCount: No master for {}", deviceId);
745 return 0;
746 }
Patryk Konopka7e40c012017-06-06 13:38:06 +0200747 }
748
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700749 private class InternalTableStatsListener
Jordan Haltermanf7554092017-07-30 15:05:51 -0700750 implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> {
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700751 @Override
752 public void event(EventuallyConsistentMapEvent<DeviceId,
Jordan Haltermanf7554092017-07-30 15:05:51 -0700753 List<TableStatisticsEntry>> event) {
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700754 //TODO: Generate an event to listeners (do we need?)
755 }
756 }
Jordan Haltermanf7554092017-07-30 15:05:51 -0700757}