blob: c4c748583a341ebd3beb22284512a0d7885597f0 [file] [log] [blame]
Jordan Haltermanf7554092017-07-30 15:05:51 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2014-present Open Networking Foundation
Madan Jampani86940d92015-05-06 11:47:57 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.flow.impl;
17
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070018import java.util.Collections;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070019import java.util.List;
20import java.util.Map;
21import java.util.Objects;
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -070022import java.util.Random;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070023import java.util.Set;
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -070024import java.util.concurrent.CompletableFuture;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070025import java.util.concurrent.ExecutorService;
26import java.util.concurrent.Executors;
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -070027import java.util.concurrent.ScheduledExecutorService;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070028import java.util.concurrent.TimeUnit;
Jordan Haltermanf7554092017-07-30 15:05:51 -070029import java.util.function.Supplier;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070030import java.util.stream.Collectors;
Ray Milkey2b6ff422016-08-26 13:03:15 -070031
Jordan Haltermanf7554092017-07-30 15:05:51 -070032import com.google.common.collect.ImmutableList;
33import com.google.common.collect.Iterables;
34import com.google.common.collect.Maps;
35import com.google.common.collect.Sets;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070036import com.google.common.collect.Streams;
37import org.apache.felix.scr.annotations.Activate;
38import org.apache.felix.scr.annotations.Component;
39import org.apache.felix.scr.annotations.Deactivate;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070040import org.apache.felix.scr.annotations.Reference;
41import org.apache.felix.scr.annotations.ReferenceCardinality;
42import org.apache.felix.scr.annotations.Service;
43import org.onlab.util.KryoNamespace;
44import org.onlab.util.Tools;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070045import org.onosproject.cluster.ClusterService;
46import org.onosproject.cluster.NodeId;
47import org.onosproject.core.CoreService;
48import org.onosproject.core.IdGenerator;
49import org.onosproject.mastership.MastershipService;
50import org.onosproject.net.DeviceId;
51import org.onosproject.net.device.DeviceService;
52import org.onosproject.net.flow.CompletedBatchOperation;
53import org.onosproject.net.flow.DefaultFlowEntry;
54import org.onosproject.net.flow.FlowEntry;
55import org.onosproject.net.flow.FlowEntry.FlowEntryState;
56import org.onosproject.net.flow.FlowId;
57import org.onosproject.net.flow.FlowRule;
Ray Milkey7bf273c2017-09-27 16:15:15 -070058import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
59import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry.FlowRuleOperation;
60import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
61import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
62import org.onosproject.net.flow.oldbatch.FlowRuleBatchRequest;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070063import org.onosproject.net.flow.FlowRuleEvent;
64import org.onosproject.net.flow.FlowRuleEvent.Type;
65import org.onosproject.net.flow.FlowRuleService;
66import org.onosproject.net.flow.FlowRuleStore;
67import org.onosproject.net.flow.FlowRuleStoreDelegate;
68import org.onosproject.net.flow.StoredFlowEntry;
69import org.onosproject.net.flow.TableStatisticsEntry;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070070import org.onosproject.store.AbstractStore;
71import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Jordan Haltermanf7554092017-07-30 15:05:51 -070072import org.onosproject.store.cluster.messaging.MessageSubject;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070073import org.onosproject.store.impl.MastershipBasedTimestamp;
74import org.onosproject.store.serializers.KryoNamespaces;
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -070075import org.onosproject.store.service.AsyncDocumentTree;
Jordan Haltermanf7554092017-07-30 15:05:51 -070076import org.onosproject.store.service.DocumentPath;
77import org.onosproject.store.service.DocumentTree;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070078import org.onosproject.store.service.EventuallyConsistentMap;
79import org.onosproject.store.service.EventuallyConsistentMapEvent;
80import org.onosproject.store.service.EventuallyConsistentMapListener;
Jordan Haltermanf5295f62017-09-12 15:07:18 -070081import org.onosproject.store.service.IllegalDocumentModificationException;
Jordan Haltermanf7554092017-07-30 15:05:51 -070082import org.onosproject.store.service.NoSuchDocumentPathException;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070083import org.onosproject.store.service.Serializer;
Jordan Haltermanf7554092017-07-30 15:05:51 -070084import org.onosproject.store.service.StorageException;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070085import org.onosproject.store.service.StorageService;
Jordan Haltermanf7554092017-07-30 15:05:51 -070086import org.onosproject.store.service.Versioned;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070087import org.onosproject.store.service.WallClockTimestamp;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070088import org.slf4j.Logger;
Ray Milkey2b6ff422016-08-26 13:03:15 -070089
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070090import static org.onlab.util.Tools.groupedThreads;
91import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
Jordan Haltermanf7554092017-07-30 15:05:51 -070092import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_UPDATED;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070093import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani86940d92015-05-06 11:47:57 -070094
95/**
96 * Manages inventory of flow rules using a distributed state management protocol.
97 */
Sho SHIMIZU5c396e32016-08-12 15:19:12 -070098@Component(immediate = true)
Madan Jampani86940d92015-05-06 11:47:57 -070099@Service
Madan Jampani37d04c62016-04-25 15:53:55 -0700100public class DistributedFlowRuleStore
Madan Jampani86940d92015-05-06 11:47:57 -0700101 extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
102 implements FlowRuleStore {
103
104 private final Logger log = getLogger(getClass());
105
Jordan Haltermanf7554092017-07-30 15:05:51 -0700106 // Constant exception used to indicate an atomic read-modify-write operation needs to be retried.
107 // We don't want to populate a stack trace every time an optimistic lock is retried.
108 private static final StorageException.ConcurrentModification RETRY;
109
110 // Initialize retry exception with an empty stack trace.
111 static {
112 RETRY = new StorageException.ConcurrentModification();
113 RETRY.setStackTrace(new StackTraceElement[0]);
114 }
115
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700116 private static final int SCHEDULED_THREAD_POOL_SIZE = 8;
Madan Jampani86940d92015-05-06 11:47:57 -0700117 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
Jordan Haltermanf7554092017-07-30 15:05:51 -0700118 private static final int MAX_RETRY_DELAY_MILLIS = 50;
Madan Jampani86940d92015-05-06 11:47:57 -0700119
Jordan Haltermanf7554092017-07-30 15:05:51 -0700120 private static final String FLOW_TABLE = "onos-flow-table";
Madan Jampani86940d92015-05-06 11:47:57 -0700121
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700122 private static final MessageSubject APPLY_BATCH_FLOWS = new MessageSubject("onos-flow-apply");
Jordan Haltermanf7554092017-07-30 15:05:51 -0700123 private static final MessageSubject COMPLETE_BATCH = new MessageSubject("onos-flow-batch-complete");
Madan Jampani86940d92015-05-06 11:47:57 -0700124
125 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
126 protected DeviceService deviceService;
127
128 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
129 protected CoreService coreService;
130
131 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani86940d92015-05-06 11:47:57 -0700132 protected MastershipService mastershipService;
133
Aaron Kruglikova62fdbb2015-10-27 16:32:06 -0700134 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jordan Haltermanf7554092017-07-30 15:05:51 -0700135 protected ClusterCommunicationService clusterCommunicator;
Aaron Kruglikova62fdbb2015-10-27 16:32:06 -0700136
Jordan Haltermanf7554092017-07-30 15:05:51 -0700137 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
138 protected ClusterService clusterService;
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700139
140 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
141 protected StorageService storageService;
142
Madan Jampani884d4432016-08-23 10:46:55 -0700143 protected final Serializer serializer = Serializer.using(KryoNamespaces.API);
Madan Jampani86940d92015-05-06 11:47:57 -0700144
Madan Jampani884d4432016-08-23 10:46:55 -0700145 protected final KryoNamespace.Builder serializerBuilder = KryoNamespace.newBuilder()
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700146 .register(KryoNamespaces.API)
147 .register(MastershipBasedTimestamp.class);
148
Jordan Haltermanf7554092017-07-30 15:05:51 -0700149 private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
150 private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
151 new InternalTableStatsListener();
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700152
Jordan Haltermanf7554092017-07-30 15:05:51 -0700153 private Set<Long> pendingBatches = Sets.newConcurrentHashSet();
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700154 private ScheduledExecutorService scheduledExecutor;
Jordan Haltermanf7554092017-07-30 15:05:51 -0700155 private ExecutorService messageHandlingExecutor;
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700156 private final Random random = new Random();
Jordan Haltermanf7554092017-07-30 15:05:51 -0700157
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700158 private AsyncDocumentTree<Map<StoredFlowEntry, StoredFlowEntry>> asyncFlows;
Jordan Haltermanf7554092017-07-30 15:05:51 -0700159 private DocumentTree<Map<StoredFlowEntry, StoredFlowEntry>> flows;
Madan Jampani86940d92015-05-06 11:47:57 -0700160 private IdGenerator idGenerator;
161 private NodeId local;
162
163 @Activate
Jordan Haltermanf7554092017-07-30 15:05:51 -0700164 public void activate() {
Madan Jampani86940d92015-05-06 11:47:57 -0700165 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
166
167 local = clusterService.getLocalNode().id();
168
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700169 scheduledExecutor = Executors.newScheduledThreadPool(
170 SCHEDULED_THREAD_POOL_SIZE,
171 groupedThreads("onos/store/flow", "schedulers", log));
172
Madan Jampani86940d92015-05-06 11:47:57 -0700173 messageHandlingExecutor = Executors.newFixedThreadPool(
Jordan Haltermanf7554092017-07-30 15:05:51 -0700174 MESSAGE_HANDLER_THREAD_POOL_SIZE,
175 groupedThreads("onos/store/flow", "message-handlers", log));
Madan Jampani86940d92015-05-06 11:47:57 -0700176
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700177 deviceTableStats = storageService.<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder()
178 .withName("onos-flow-table-stats")
Madan Jampani884d4432016-08-23 10:46:55 -0700179 .withSerializer(serializerBuilder)
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700180 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
181 .withTimestampProvider((k, v) -> new WallClockTimestamp())
182 .withTombstonesDisabled()
183 .build();
184 deviceTableStats.addListener(tableStatsListener);
185
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700186 asyncFlows = storageService.<Map<StoredFlowEntry, StoredFlowEntry>>documentTreeBuilder()
Jordan Haltermanf7554092017-07-30 15:05:51 -0700187 .withName(FLOW_TABLE)
188 .withSerializer(serializer)
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700189 .buildDocumentTree();
190 flows = asyncFlows.asDocumentTree();
Jordan Haltermanf7554092017-07-30 15:05:51 -0700191
192 clusterCommunicator.addSubscriber(
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700193 APPLY_BATCH_FLOWS,
Jordan Haltermanf7554092017-07-30 15:05:51 -0700194 serializer::decode,
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700195 this::applyBatchFlows,
Jordan Haltermanf7554092017-07-30 15:05:51 -0700196 messageHandlingExecutor);
197 clusterCommunicator.addSubscriber(
198 COMPLETE_BATCH,
199 serializer::decode,
200 this::completeBatch,
201 messageHandlingExecutor);
202
203 log.info("Started");
Madan Jampani86940d92015-05-06 11:47:57 -0700204 }
205
206 @Deactivate
Jordan Haltermanf7554092017-07-30 15:05:51 -0700207 public void deactivate() {
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700208 deviceTableStats.removeListener(tableStatsListener);
209 deviceTableStats.destroy();
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700210 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
Jordan Haltermanf7554092017-07-30 15:05:51 -0700211 clusterCommunicator.removeSubscriber(COMPLETE_BATCH);
Madan Jampani86940d92015-05-06 11:47:57 -0700212 messageHandlingExecutor.shutdownNow();
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700213 scheduledExecutor.shutdownNow();
Madan Jampani86940d92015-05-06 11:47:57 -0700214 log.info("Stopped");
215 }
216
Jordan Haltermanf7554092017-07-30 15:05:51 -0700217 /**
218 * Retries the given supplier until successful.
219 * <p>
220 * This method retries the given supplier until no {@code ConcurrentModification} exceptions are thrown. In
221 * between retries, it waits a semi-random interval to attempt to avoid transaction conflicts with other processes.
222 *
223 * @param supplier the supplier to retry
224 * @param <T> the return type
225 * @return the return value of the given supplier once it runs successfully
226 */
227 private <T> T retryUntilSuccess(Supplier<T> supplier) {
228 return Tools.retryable(
229 supplier,
230 StorageException.ConcurrentModification.class,
231 Integer.MAX_VALUE,
232 MAX_RETRY_DELAY_MILLIS)
233 .get();
Madan Jampani86940d92015-05-06 11:47:57 -0700234 }
235
Jordan Haltermanf7554092017-07-30 15:05:51 -0700236 /**
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700237 * Retries the given asynchronous supplier until successful.
238 * <p>
239 * This method retries the given supplier until no {@code ConcurrentModification} exceptions are thrown. In
240 * between retries, it waits a semi-random interval to attempt to avoid transaction conflicts with other processes.
241 *
242 * @param supplier the supplier to retry
243 * @param <T> the return type
244 * @return the return value of the given supplier once it runs successfully
245 */
246 private <T> CompletableFuture<T> retryAsyncUntilSuccess(Supplier<CompletableFuture<T>> supplier) {
247 return retryAsyncUntilSuccess(supplier, new CompletableFuture<>());
248 }
249
250 /**
251 * Retries the given asynchronous supplier until successful.
252 * <p>
253 * This method retries the given supplier until no {@code ConcurrentModification} exceptions are thrown. In
254 * between retries, it waits a semi-random interval to attempt to avoid transaction conflicts with other processes.
255 *
256 * @param supplier the supplier to retry
257 * @param future future to be completed once the operation has been successful
258 * @param <T> the return type
259 * @return the return value of the given supplier once it runs successfully
260 */
261 private <T> CompletableFuture<T> retryAsyncUntilSuccess(
262 Supplier<CompletableFuture<T>> supplier,
263 CompletableFuture<T> future) {
264 supplier.get().whenComplete((result, error) -> {
265 if (error == null) {
266 future.complete(result);
267 } else {
268 Throwable cause = error.getCause() != null ? error.getCause() : error;
269 if (cause instanceof StorageException.ConcurrentModification) {
270 scheduledExecutor.schedule(
271 () -> retryAsyncUntilSuccess(supplier, future),
272 random.nextInt(50),
273 TimeUnit.MILLISECONDS);
274 } else {
275 future.completeExceptionally(error);
276 }
277 }
278 });
279 return future;
280 }
281
282 /**
Jordan Haltermanf7554092017-07-30 15:05:51 -0700283 * Return method for {@link #retryUntilSuccess(Supplier)} callbacks to indicate that the callback needs to be
284 * retried after a randomized delay.
285 *
286 * @param <T> the return type
287 * @return nothing
288 * @throws StorageException.ConcurrentModification to force a retry of the callback
289 */
290 private <T> T retry() {
291 throw RETRY;
Madan Jampani86940d92015-05-06 11:47:57 -0700292 }
293
Jordan Haltermanf7554092017-07-30 15:05:51 -0700294 /**
Jordan Haltermanf7554092017-07-30 15:05:51 -0700295 * Handles a completed batch event received from the master node.
296 * <p>
297 * If this node is the source of the batch, notifies event listeners to complete the operations.
298 *
299 * @param event the event to handle
300 */
301 private void completeBatch(FlowRuleBatchEvent event) {
302 if (pendingBatches.remove(event.subject().batchId())) {
303 notifyDelegate(event);
304 }
Madan Jampani86940d92015-05-06 11:47:57 -0700305 }
306
307 // This is not a efficient operation on a distributed sharded
308 // flow store. We need to revisit the need for this operation or at least
309 // make it device specific.
310 @Override
311 public int getFlowRuleCount() {
Yuta HIGUCHI10e91fb2017-06-15 13:30:50 -0700312 return Streams.stream(deviceService.getDevices()).parallel()
Jordan Haltermanf7554092017-07-30 15:05:51 -0700313 .mapToInt(device -> Iterables.size(getFlowEntries(device.id())))
314 .sum();
315 }
316
317 /**
318 * Returns the {@link DocumentPath} for the given {@link DeviceId}.
319 *
320 * @param deviceId the device identifier for which to return a path
321 * @return the path for the given device
322 */
323 private DocumentPath getPathFor(DeviceId deviceId) {
324 return DocumentPath.from("root", deviceId.toString());
325 }
326
327 /**
328 * Returns the {@link DocumentPath} for the given {@link DeviceId} and {@link FlowId}.
329 *
330 * @param deviceId the device identifier for which to return the path
331 * @param flowId the flow identifier for which to return the path
332 * @return the path for the given device/flow
333 */
334 private DocumentPath getPathFor(DeviceId deviceId, FlowId flowId) {
335 return DocumentPath.from("root", deviceId.toString(), flowId.toString());
Madan Jampani86940d92015-05-06 11:47:57 -0700336 }
337
338 @Override
Jordan Haltermanf7554092017-07-30 15:05:51 -0700339 @SuppressWarnings("unchecked")
Madan Jampani86940d92015-05-06 11:47:57 -0700340 public FlowEntry getFlowEntry(FlowRule rule) {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700341 DocumentPath path = getPathFor(rule.deviceId(), rule.id());
342 Versioned<Map<StoredFlowEntry, StoredFlowEntry>> flowEntries = flows.get(path);
343 return flowEntries != null ? flowEntries.value().get(rule) : null;
Madan Jampani86940d92015-05-06 11:47:57 -0700344 }
345
346 @Override
347 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700348 DocumentPath path = getPathFor(deviceId);
349 try {
350 return getFlowEntries(path);
351 } catch (NoSuchDocumentPathException e) {
Madan Jampani86940d92015-05-06 11:47:57 -0700352 return Collections.emptyList();
353 }
Jordan Haltermanf7554092017-07-30 15:05:51 -0700354 }
Madan Jampani86940d92015-05-06 11:47:57 -0700355
Jordan Haltermanf7554092017-07-30 15:05:51 -0700356 @SuppressWarnings("unchecked")
357 private Iterable<FlowEntry> getFlowEntries(DocumentPath path) {
358 return flows.getChildren(path)
359 .values()
360 .stream()
361 .flatMap(v -> v.value().values().stream())
362 .collect(Collectors.toList());
Madan Jampani86940d92015-05-06 11:47:57 -0700363 }
364
365 @Override
Jordan Haltermanf7554092017-07-30 15:05:51 -0700366 @SuppressWarnings("unchecked")
Madan Jampani86940d92015-05-06 11:47:57 -0700367 public void storeFlowRule(FlowRule rule) {
368 storeBatch(new FlowRuleBatchOperation(
Sho SHIMIZU98ffca82015-05-11 08:39:24 -0700369 Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
Madan Jampani86940d92015-05-06 11:47:57 -0700370 rule.deviceId(), idGenerator.getNewId()));
371 }
372
373 @Override
374 public void storeBatch(FlowRuleBatchOperation operation) {
375 if (operation.getOperations().isEmpty()) {
376 notifyDelegate(FlowRuleBatchEvent.completed(
377 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
378 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
379 return;
380 }
381
382 DeviceId deviceId = operation.deviceId();
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700383 NodeId master = mastershipService.getMasterFor(deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700384
385 if (master == null) {
Sivachidambaram Subramanian605104e2017-06-21 07:40:04 +0530386 log.warn("No master for {} ", deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700387
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700388 updateStoreInternal(operation).whenComplete((result, error) -> {
389 notifyDelegate(FlowRuleBatchEvent.completed(
390 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
391 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
392 });
Madan Jampani86940d92015-05-06 11:47:57 -0700393 return;
394 }
395
Jordan Haltermanf7554092017-07-30 15:05:51 -0700396 pendingBatches.add(operation.id());
Madan Jampani86940d92015-05-06 11:47:57 -0700397
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700398 // If the local node is the master, apply the flows. Otherwise, send them to the master.
399 if (Objects.equals(local, master)) {
400 applyBatchFlows(operation);
Jordan Haltermanf7554092017-07-30 15:05:51 -0700401 } else {
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700402 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}", master, deviceId);
403 clusterCommunicator.unicast(
404 operation,
405 APPLY_BATCH_FLOWS,
Jordan Haltermanf7554092017-07-30 15:05:51 -0700406 serializer::encode,
407 master);
Madan Jampani86940d92015-05-06 11:47:57 -0700408 }
Madan Jampani86940d92015-05-06 11:47:57 -0700409 }
410
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700411 /**
412 * Asynchronously applies a batch of flows to the store.
413 * <p>
414 * This operation is performed on the master node to ensure that events occur <em>after</em> flows have been stored
415 * and are visible to the master node. If a non-master node stores flows and then triggers events on the master,
416 * the flows may not yet be visible to the master node due to the nature of sequentially consistent reads on the
417 * underlying {@code DocumentTree} primitive.
418 */
419 private void applyBatchFlows(FlowRuleBatchOperation operation) {
420 updateStoreInternal(operation).whenComplete((operations, error) -> {
421 if (error == null) {
422 if (operations.isEmpty()) {
423 batchOperationComplete(FlowRuleBatchEvent.completed(
424 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
425 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
Jordan Haltermanf7554092017-07-30 15:05:51 -0700426 } else {
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700427 notifyDelegate(FlowRuleBatchEvent.requested(
428 new FlowRuleBatchRequest(operation.id(), operations),
429 operation.deviceId()));
Jordan Haltermanf7554092017-07-30 15:05:51 -0700430 }
Jordan Haltermanf7554092017-07-30 15:05:51 -0700431 }
432 });
433 }
434
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700435 private CompletableFuture<Set<FlowRuleBatchEntry>> updateStoreInternal(FlowRuleBatchOperation operation) {
436 return Tools.allOf(operation.getOperations().stream().map(op -> {
437 switch (op.operator()) {
438 case ADD:
439 return addBatchEntry(op).thenApply(succeeded -> succeeded ? op : null);
440 case REMOVE:
441 return removeBatchEntry(op).thenApply(succeeded -> succeeded ? op : null);
442 case MODIFY:
443 return CompletableFuture.<FlowRuleBatchEntry>completedFuture(null);
444 default:
445 log.warn("Unknown flow operation operator: {}", op.operator());
446 return CompletableFuture.<FlowRuleBatchEntry>completedFuture(null);
447 }
448 }).collect(Collectors.toList()))
449 .thenApply(results -> results.stream()
450 .filter(Objects::nonNull)
451 .collect(Collectors.toSet()));
452 }
453
Jordan Haltermanf7554092017-07-30 15:05:51 -0700454 @SuppressWarnings("unchecked")
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700455 private CompletableFuture<Boolean> addBatchEntry(FlowRuleBatchEntry batchEntry) {
456 StoredFlowEntry entry = new DefaultFlowEntry(batchEntry.target());
457 DocumentPath path = getPathFor(entry.deviceId(), entry.id());
458 return retryAsyncUntilSuccess(() -> {
459 CompletableFuture<Boolean> future = new CompletableFuture<>();
460 asyncFlows.get(path).whenComplete((value, getError) -> {
461 if (getError == null) {
462 if (value != null) {
463 Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
464 entries.put(entry, entry);
465 asyncFlows.replace(path, entries, value.version()).whenComplete((succeeded, replaceError) -> {
466 if (replaceError == null) {
467 if (succeeded) {
468 log.trace("Stored new flow rule: {}", entry);
469 future.complete(true);
470 } else {
471 log.trace("Failed to store new flow rule: {}", entry);
472 future.completeExceptionally(RETRY);
473 }
474 } else {
475 future.completeExceptionally(replaceError);
476 }
477 });
Jordan Haltermanf7554092017-07-30 15:05:51 -0700478 } else {
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700479 // If there are no entries stored for the device, initialize the device's flows.
480 Map<StoredFlowEntry, StoredFlowEntry> map = Maps.newHashMap();
481 map.put(entry, entry);
482 asyncFlows.createRecursive(path, map).whenComplete((succeeded, createError) -> {
483 if (createError == null) {
484 if (succeeded) {
485 log.trace("Stored new flow rule: {}", entry);
486 future.complete(true);
487 } else {
488 log.trace("Failed to store new flow rule: {}", entry);
489 future.completeExceptionally(RETRY);
490 }
491 } else {
492 future.completeExceptionally(createError);
493 }
494 });
Jordan Haltermanf7554092017-07-30 15:05:51 -0700495 }
496 } else {
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700497 future.completeExceptionally(getError);
Jordan Haltermanf7554092017-07-30 15:05:51 -0700498 }
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700499 });
500 return future;
501 });
502 }
503
504 @SuppressWarnings("unchecked")
505 private CompletableFuture<Boolean> removeBatchEntry(FlowRuleBatchEntry batchEntry) {
506 FlowRule rule = batchEntry.target();
507 DocumentPath path = getPathFor(rule.deviceId(), rule.id());
508 return retryAsyncUntilSuccess(() -> {
509 CompletableFuture<Boolean> future = new CompletableFuture<>();
510 asyncFlows.get(path).whenComplete((value, getError) -> {
511 if (getError == null) {
512 if (value != null) {
513 Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
514 StoredFlowEntry entry = entries.get(rule);
515 if (entry != null) {
516 entry.setState(FlowEntryState.PENDING_REMOVE);
517 asyncFlows.replace(path, entries, value.version()).whenComplete((succeeded, error) -> {
518 if (error == null) {
519 if (succeeded) {
520 log.trace("Updated flow rule state to PENDING_REMOVE: {}", entry);
521 future.complete(true);
522 } else {
523 log.trace("Failed to update flow rule state to PENDING_REMOVE: {}", entry);
524 future.completeExceptionally(RETRY);
525 }
526 } else {
527 future.completeExceptionally(error);
528 }
529 });
530 } else {
531 future.complete(false);
532 }
533 } else {
534 future.complete(false);
535 }
536 } else {
537 future.completeExceptionally(getError);
538 }
539 });
540 return future;
Jordan Haltermanf7554092017-07-30 15:05:51 -0700541 });
542 }
543
544 @Override
545 public void batchOperationComplete(FlowRuleBatchEvent event) {
546 if (pendingBatches.remove(event.subject().batchId())) {
547 notifyDelegate(event);
548 } else {
549 clusterCommunicator.broadcast(event, COMPLETE_BATCH, serializer::encode);
550 }
551 }
552
Madan Jampani86940d92015-05-06 11:47:57 -0700553 @Override
554 public void deleteFlowRule(FlowRule rule) {
555 storeBatch(
556 new FlowRuleBatchOperation(
Sho SHIMIZU98ffca82015-05-11 08:39:24 -0700557 Collections.singletonList(
Madan Jampani86940d92015-05-06 11:47:57 -0700558 new FlowRuleBatchEntry(
559 FlowRuleOperation.REMOVE,
560 rule)), rule.deviceId(), idGenerator.getNewId()));
561 }
562
563 @Override
Charles Chan93fa7272016-01-26 22:27:02 -0800564 public FlowRuleEvent pendingFlowRule(FlowEntry rule) {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700565 DocumentPath path = getPathFor(rule.deviceId(), rule.id());
566 return retryUntilSuccess(() -> {
567 Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
568 if (value != null) {
569 Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
570 StoredFlowEntry entry = entries.get(rule);
571 if (entry != null && entry.state() != FlowEntryState.PENDING_ADD) {
572 entry.setState(FlowEntryState.PENDING_ADD);
573 if (flows.replace(path, entries, value.version())) {
574 log.trace("Updated flow rule state to PENDING_ADD: {}", entry);
575 return new FlowRuleEvent(RULE_UPDATED, rule);
576 } else {
577 log.trace("Failed to update flow rule state to PENDING_ADD: {}", entry);
578 return retry();
579 }
580 } else {
581 return null;
582 }
583 } else {
584 return null;
Charles Chan93fa7272016-01-26 22:27:02 -0800585 }
Jordan Haltermanf7554092017-07-30 15:05:51 -0700586 });
Charles Chan93fa7272016-01-26 22:27:02 -0800587 }
588
589 @Override
Jordan Haltermanf7554092017-07-30 15:05:51 -0700590 @SuppressWarnings("unchecked")
Madan Jampani86940d92015-05-06 11:47:57 -0700591 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700592 DocumentPath path = getPathFor(rule.deviceId(), rule.id());
593 return retryUntilSuccess(() -> {
594 Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
595 if (value != null) {
596 Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
597 StoredFlowEntry entry = entries.get(rule);
598 if (entry != null) {
599 FlowRuleEvent event;
600 String message;
Madan Jampani86940d92015-05-06 11:47:57 -0700601
Jordan Haltermanf7554092017-07-30 15:05:51 -0700602 entry.setBytes(rule.bytes());
603 entry.setLife(rule.life(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
604 entry.setLiveType(rule.liveType());
605 entry.setPackets(rule.packets());
606 entry.setLastSeen();
Madan Jampani86940d92015-05-06 11:47:57 -0700607
Jordan Haltermanf7554092017-07-30 15:05:51 -0700608 // If the entry state is PENDING_ADD, set it to ADDED. Otherwise, just update the rule.
609 if (entry.state() == FlowEntryState.PENDING_ADD) {
610 entry.setState(FlowEntryState.ADDED);
611 event = new FlowRuleEvent(Type.RULE_ADDED, rule);
612 message = "Updated flow rule state to ADDED: {}";
613 } else {
614 event = new FlowRuleEvent(Type.RULE_UPDATED, rule);
615 message = "Updated flow rule: {}";
616 }
617
618 if (flows.replace(path, entries, value.version())) {
619 log.trace(message, entry);
620 return event;
621 } else {
622 log.trace("Failed to update flow rule: {}", entry);
623 return retry();
624 }
625 } else {
626 // If the rule does not exist, return null. Inserting the rule risks race conditions
627 // that can result in removed rules being retained.
628 return null;
629 }
630 } else {
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700631 return null;
Madan Jampani86940d92015-05-06 11:47:57 -0700632 }
Jordan Haltermanf7554092017-07-30 15:05:51 -0700633 });
Madan Jampani86940d92015-05-06 11:47:57 -0700634 }
635
636 @Override
Jordan Haltermanf7554092017-07-30 15:05:51 -0700637 @SuppressWarnings("unchecked")
Madan Jampani86940d92015-05-06 11:47:57 -0700638 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700639 DocumentPath path = getPathFor(rule.deviceId(), rule.id());
640 return retryUntilSuccess(() -> {
641 Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
642 if (value != null) {
643 Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
644 StoredFlowEntry entry = entries.remove(rule);
645 if (entry != null) {
646 if (flows.replace(path, entries, value.version())) {
647 log.trace("Removed flow rule: {}", entry);
648 return new FlowRuleEvent(RULE_REMOVED, entry);
649 } else {
650 log.trace("Failed to remove flow rule: {}", entry);
651 return retry();
652 }
653 } else {
654 return null;
655 }
656 } else {
657 return null;
658 }
659 });
Madan Jampani86940d92015-05-06 11:47:57 -0700660 }
661
662 @Override
Charles Chan0c7c43b2016-01-14 17:39:20 -0800663 public void purgeFlowRule(DeviceId deviceId) {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700664 DocumentPath path = getPathFor(deviceId);
Jordan Haltermanf5295f62017-09-12 15:07:18 -0700665 retryUntilSuccess(() -> {
666 try {
667 for (String flowId : flows.getChildren(path).keySet()) {
668 flows.removeNode(DocumentPath.from("root", deviceId.toString(), flowId));
669 }
670 } catch (NoSuchDocumentPathException e) {
671 // Do nothing. There are no flows for the device.
Jordan Haltermanf7554092017-07-30 15:05:51 -0700672 }
Jordan Haltermanf5295f62017-09-12 15:07:18 -0700673
674 // New children may have been created since they were removed above. Catch
675 // IllegalDocumentModificationException and retry if necessary.
676 try {
677 flows.removeNode(path);
678 } catch (NoSuchDocumentPathException e) {
679 return null;
680 } catch (IllegalDocumentModificationException e) {
681 return retry();
682 }
683 return null;
684 });
Charles Chan0c7c43b2016-01-14 17:39:20 -0800685 }
686
687 @Override
Victor Silva139bca42016-08-18 11:46:35 -0300688 public void purgeFlowRules() {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700689 try {
690 for (String deviceId : flows.getChildren(flows.root()).keySet()) {
691 purgeFlowRule(DeviceId.deviceId(deviceId));
Madan Jampani86940d92015-05-06 11:47:57 -0700692 }
Jordan Haltermanf7554092017-07-30 15:05:51 -0700693 } catch (NoSuchDocumentPathException e) {
Jordan Haltermanf5295f62017-09-12 15:07:18 -0700694 // Do nothing if no children exist.
Madan Jampani86940d92015-05-06 11:47:57 -0700695 }
696 }
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700697
698 @Override
699 public FlowRuleEvent updateTableStatistics(DeviceId deviceId,
Jordan Haltermanf7554092017-07-30 15:05:51 -0700700 List<TableStatisticsEntry> tableStats) {
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700701 deviceTableStats.put(deviceId, tableStats);
702 return null;
703 }
704
705 @Override
706 public Iterable<TableStatisticsEntry> getTableStatistics(DeviceId deviceId) {
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700707 List<TableStatisticsEntry> tableStats = deviceTableStats.get(deviceId);
708 if (tableStats == null) {
709 return Collections.emptyList();
710 }
711 return ImmutableList.copyOf(tableStats);
712 }
713
Patryk Konopka7e40c012017-06-06 13:38:06 +0200714 @Override
715 public long getActiveFlowRuleCount(DeviceId deviceId) {
716 return Streams.stream(getTableStatistics(deviceId))
717 .mapToLong(TableStatisticsEntry::activeFlowEntries)
718 .sum();
719 }
720
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700721 private class InternalTableStatsListener
Jordan Haltermanf7554092017-07-30 15:05:51 -0700722 implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> {
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700723 @Override
724 public void event(EventuallyConsistentMapEvent<DeviceId,
Jordan Haltermanf7554092017-07-30 15:05:51 -0700725 List<TableStatisticsEntry>> event) {
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700726 //TODO: Generate an event to listeners (do we need?)
727 }
728 }
Jordan Haltermanf7554092017-07-30 15:05:51 -0700729}