blob: e0fe04d183bf8db9dac166f8e8c36e4edc06c03a [file] [log] [blame]
Jordan Haltermanf7554092017-07-30 15:05:51 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2014-present Open Networking Foundation
Madan Jampani86940d92015-05-06 11:47:57 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.flow.impl;
17
Jordan Haltermanf7554092017-07-30 15:05:51 -070018import com.google.common.collect.ImmutableList;
19import com.google.common.collect.Iterables;
20import com.google.common.collect.Maps;
21import com.google.common.collect.Sets;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070022import com.google.common.collect.Streams;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070023import org.onlab.util.KryoNamespace;
24import org.onlab.util.Tools;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070025import org.onosproject.cluster.ClusterService;
26import org.onosproject.cluster.NodeId;
27import org.onosproject.core.CoreService;
28import org.onosproject.core.IdGenerator;
29import org.onosproject.mastership.MastershipService;
30import org.onosproject.net.DeviceId;
31import org.onosproject.net.device.DeviceService;
32import org.onosproject.net.flow.CompletedBatchOperation;
33import org.onosproject.net.flow.DefaultFlowEntry;
34import org.onosproject.net.flow.FlowEntry;
35import org.onosproject.net.flow.FlowEntry.FlowEntryState;
36import org.onosproject.net.flow.FlowId;
37import org.onosproject.net.flow.FlowRule;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070038import org.onosproject.net.flow.FlowRuleEvent;
39import org.onosproject.net.flow.FlowRuleEvent.Type;
40import org.onosproject.net.flow.FlowRuleService;
41import org.onosproject.net.flow.FlowRuleStore;
42import org.onosproject.net.flow.FlowRuleStoreDelegate;
43import org.onosproject.net.flow.StoredFlowEntry;
44import org.onosproject.net.flow.TableStatisticsEntry;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070045import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
46import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry.FlowRuleOperation;
47import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
48import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
49import org.onosproject.net.flow.oldbatch.FlowRuleBatchRequest;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070050import org.onosproject.store.AbstractStore;
51import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Jordan Haltermanf7554092017-07-30 15:05:51 -070052import org.onosproject.store.cluster.messaging.MessageSubject;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070053import org.onosproject.store.impl.MastershipBasedTimestamp;
54import org.onosproject.store.serializers.KryoNamespaces;
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -070055import org.onosproject.store.service.AsyncDocumentTree;
Jordan Haltermanf7554092017-07-30 15:05:51 -070056import org.onosproject.store.service.DocumentPath;
57import org.onosproject.store.service.DocumentTree;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070058import org.onosproject.store.service.EventuallyConsistentMap;
59import org.onosproject.store.service.EventuallyConsistentMapEvent;
60import org.onosproject.store.service.EventuallyConsistentMapListener;
Jordan Haltermanf5295f62017-09-12 15:07:18 -070061import org.onosproject.store.service.IllegalDocumentModificationException;
Jordan Haltermanf7554092017-07-30 15:05:51 -070062import org.onosproject.store.service.NoSuchDocumentPathException;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070063import org.onosproject.store.service.Serializer;
Jordan Haltermanf7554092017-07-30 15:05:51 -070064import org.onosproject.store.service.StorageException;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070065import org.onosproject.store.service.StorageService;
Jordan Haltermanf7554092017-07-30 15:05:51 -070066import org.onosproject.store.service.Versioned;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070067import org.onosproject.store.service.WallClockTimestamp;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070068import org.osgi.service.component.annotations.Activate;
69import org.osgi.service.component.annotations.Component;
70import org.osgi.service.component.annotations.Deactivate;
71import org.osgi.service.component.annotations.Reference;
72import org.osgi.service.component.annotations.ReferenceCardinality;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070073import org.slf4j.Logger;
Ray Milkey2b6ff422016-08-26 13:03:15 -070074
Ray Milkeyd84f89b2018-08-17 14:54:17 -070075import java.security.SecureRandom;
76import java.util.Collections;
77import java.util.List;
78import java.util.Map;
79import java.util.Objects;
80import java.util.Random;
81import java.util.Set;
82import java.util.concurrent.CompletableFuture;
83import java.util.concurrent.ExecutorService;
84import java.util.concurrent.Executors;
85import java.util.concurrent.ScheduledExecutorService;
86import java.util.concurrent.TimeUnit;
87import java.util.function.Supplier;
88import java.util.stream.Collectors;
89
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070090import static org.onlab.util.Tools.groupedThreads;
91import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
Jordan Haltermanf7554092017-07-30 15:05:51 -070092import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_UPDATED;
Yuta HIGUCHIbaaf8dc2017-06-21 17:44:09 -070093import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani86940d92015-05-06 11:47:57 -070094
95/**
96 * Manages inventory of flow rules using a distributed state management protocol.
Ray Milkeyed9bace2018-03-20 10:05:56 -070097 *
98 * @deprecated in Nightingale Release (1.13)
Madan Jampani86940d92015-05-06 11:47:57 -070099 */
Ray Milkeyed9bace2018-03-20 10:05:56 -0700100@Deprecated
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700101@Component(enabled = false, service = FlowRuleStore.class)
Madan Jampani37d04c62016-04-25 15:53:55 -0700102public class DistributedFlowRuleStore
Madan Jampani86940d92015-05-06 11:47:57 -0700103 extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
104 implements FlowRuleStore {
105
106 private final Logger log = getLogger(getClass());
107
Jordan Haltermanf7554092017-07-30 15:05:51 -0700108 // Constant exception used to indicate an atomic read-modify-write operation needs to be retried.
109 // We don't want to populate a stack trace every time an optimistic lock is retried.
110 private static final StorageException.ConcurrentModification RETRY;
111
112 // Initialize retry exception with an empty stack trace.
113 static {
114 RETRY = new StorageException.ConcurrentModification();
115 RETRY.setStackTrace(new StackTraceElement[0]);
116 }
117
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700118 private static final int SCHEDULED_THREAD_POOL_SIZE = 8;
Madan Jampani86940d92015-05-06 11:47:57 -0700119 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
Jordan Haltermanf7554092017-07-30 15:05:51 -0700120 private static final int MAX_RETRY_DELAY_MILLIS = 50;
Madan Jampani86940d92015-05-06 11:47:57 -0700121
Jordan Haltermanf7554092017-07-30 15:05:51 -0700122 private static final String FLOW_TABLE = "onos-flow-table";
Madan Jampani86940d92015-05-06 11:47:57 -0700123
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700124 private static final MessageSubject APPLY_BATCH_FLOWS = new MessageSubject("onos-flow-apply");
Jordan Haltermanf7554092017-07-30 15:05:51 -0700125 private static final MessageSubject COMPLETE_BATCH = new MessageSubject("onos-flow-batch-complete");
Madan Jampani86940d92015-05-06 11:47:57 -0700126
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700127 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Madan Jampani86940d92015-05-06 11:47:57 -0700128 protected DeviceService deviceService;
129
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700130 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Madan Jampani86940d92015-05-06 11:47:57 -0700131 protected CoreService coreService;
132
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700133 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Madan Jampani86940d92015-05-06 11:47:57 -0700134 protected MastershipService mastershipService;
135
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700136 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jordan Haltermanf7554092017-07-30 15:05:51 -0700137 protected ClusterCommunicationService clusterCommunicator;
Aaron Kruglikova62fdbb2015-10-27 16:32:06 -0700138
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700139 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jordan Haltermanf7554092017-07-30 15:05:51 -0700140 protected ClusterService clusterService;
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700141
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700142 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700143 protected StorageService storageService;
144
Madan Jampani884d4432016-08-23 10:46:55 -0700145 protected final Serializer serializer = Serializer.using(KryoNamespaces.API);
Madan Jampani86940d92015-05-06 11:47:57 -0700146
Madan Jampani884d4432016-08-23 10:46:55 -0700147 protected final KryoNamespace.Builder serializerBuilder = KryoNamespace.newBuilder()
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700148 .register(KryoNamespaces.API)
149 .register(MastershipBasedTimestamp.class);
150
Jordan Haltermanf7554092017-07-30 15:05:51 -0700151 private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
152 private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
153 new InternalTableStatsListener();
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700154
Jordan Haltermanf7554092017-07-30 15:05:51 -0700155 private Set<Long> pendingBatches = Sets.newConcurrentHashSet();
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700156 private ScheduledExecutorService scheduledExecutor;
Jordan Haltermanf7554092017-07-30 15:05:51 -0700157 private ExecutorService messageHandlingExecutor;
Ray Milkeyb68bbbc2017-12-18 10:05:49 -0800158 private final Random random = new SecureRandom();
Jordan Haltermanf7554092017-07-30 15:05:51 -0700159
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700160 private AsyncDocumentTree<Map<StoredFlowEntry, StoredFlowEntry>> asyncFlows;
Jordan Haltermanf7554092017-07-30 15:05:51 -0700161 private DocumentTree<Map<StoredFlowEntry, StoredFlowEntry>> flows;
Madan Jampani86940d92015-05-06 11:47:57 -0700162 private IdGenerator idGenerator;
163 private NodeId local;
164
165 @Activate
Jordan Haltermanf7554092017-07-30 15:05:51 -0700166 public void activate() {
Madan Jampani86940d92015-05-06 11:47:57 -0700167 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
168
169 local = clusterService.getLocalNode().id();
170
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700171 scheduledExecutor = Executors.newScheduledThreadPool(
172 SCHEDULED_THREAD_POOL_SIZE,
173 groupedThreads("onos/store/flow", "schedulers", log));
174
Madan Jampani86940d92015-05-06 11:47:57 -0700175 messageHandlingExecutor = Executors.newFixedThreadPool(
Jordan Haltermanf7554092017-07-30 15:05:51 -0700176 MESSAGE_HANDLER_THREAD_POOL_SIZE,
177 groupedThreads("onos/store/flow", "message-handlers", log));
Madan Jampani86940d92015-05-06 11:47:57 -0700178
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700179 deviceTableStats = storageService.<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder()
180 .withName("onos-flow-table-stats")
Madan Jampani884d4432016-08-23 10:46:55 -0700181 .withSerializer(serializerBuilder)
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700182 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
183 .withTimestampProvider((k, v) -> new WallClockTimestamp())
184 .withTombstonesDisabled()
185 .build();
186 deviceTableStats.addListener(tableStatsListener);
187
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700188 asyncFlows = storageService.<Map<StoredFlowEntry, StoredFlowEntry>>documentTreeBuilder()
Jordan Haltermanf7554092017-07-30 15:05:51 -0700189 .withName(FLOW_TABLE)
190 .withSerializer(serializer)
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700191 .buildDocumentTree();
192 flows = asyncFlows.asDocumentTree();
Jordan Haltermanf7554092017-07-30 15:05:51 -0700193
194 clusterCommunicator.addSubscriber(
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700195 APPLY_BATCH_FLOWS,
Jordan Haltermanf7554092017-07-30 15:05:51 -0700196 serializer::decode,
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700197 this::applyBatchFlows,
Jordan Haltermanf7554092017-07-30 15:05:51 -0700198 messageHandlingExecutor);
199 clusterCommunicator.addSubscriber(
200 COMPLETE_BATCH,
201 serializer::decode,
202 this::completeBatch,
203 messageHandlingExecutor);
204
205 log.info("Started");
Madan Jampani86940d92015-05-06 11:47:57 -0700206 }
207
208 @Deactivate
Jordan Haltermanf7554092017-07-30 15:05:51 -0700209 public void deactivate() {
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700210 deviceTableStats.removeListener(tableStatsListener);
211 deviceTableStats.destroy();
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700212 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
Jordan Haltermanf7554092017-07-30 15:05:51 -0700213 clusterCommunicator.removeSubscriber(COMPLETE_BATCH);
Madan Jampani86940d92015-05-06 11:47:57 -0700214 messageHandlingExecutor.shutdownNow();
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700215 scheduledExecutor.shutdownNow();
Madan Jampani86940d92015-05-06 11:47:57 -0700216 log.info("Stopped");
217 }
218
Jordan Haltermanf7554092017-07-30 15:05:51 -0700219 /**
220 * Retries the given supplier until successful.
221 * <p>
222 * This method retries the given supplier until no {@code ConcurrentModification} exceptions are thrown. In
223 * between retries, it waits a semi-random interval to attempt to avoid transaction conflicts with other processes.
224 *
225 * @param supplier the supplier to retry
226 * @param <T> the return type
227 * @return the return value of the given supplier once it runs successfully
228 */
229 private <T> T retryUntilSuccess(Supplier<T> supplier) {
230 return Tools.retryable(
231 supplier,
232 StorageException.ConcurrentModification.class,
233 Integer.MAX_VALUE,
234 MAX_RETRY_DELAY_MILLIS)
235 .get();
Madan Jampani86940d92015-05-06 11:47:57 -0700236 }
237
Jordan Haltermanf7554092017-07-30 15:05:51 -0700238 /**
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700239 * Retries the given asynchronous supplier until successful.
240 * <p>
241 * This method retries the given supplier until no {@code ConcurrentModification} exceptions are thrown. In
242 * between retries, it waits a semi-random interval to attempt to avoid transaction conflicts with other processes.
243 *
244 * @param supplier the supplier to retry
245 * @param <T> the return type
246 * @return the return value of the given supplier once it runs successfully
247 */
248 private <T> CompletableFuture<T> retryAsyncUntilSuccess(Supplier<CompletableFuture<T>> supplier) {
249 return retryAsyncUntilSuccess(supplier, new CompletableFuture<>());
250 }
251
252 /**
253 * Retries the given asynchronous supplier until successful.
254 * <p>
255 * This method retries the given supplier until no {@code ConcurrentModification} exceptions are thrown. In
256 * between retries, it waits a semi-random interval to attempt to avoid transaction conflicts with other processes.
257 *
258 * @param supplier the supplier to retry
259 * @param future future to be completed once the operation has been successful
260 * @param <T> the return type
261 * @return the return value of the given supplier once it runs successfully
262 */
263 private <T> CompletableFuture<T> retryAsyncUntilSuccess(
264 Supplier<CompletableFuture<T>> supplier,
265 CompletableFuture<T> future) {
266 supplier.get().whenComplete((result, error) -> {
267 if (error == null) {
268 future.complete(result);
269 } else {
270 Throwable cause = error.getCause() != null ? error.getCause() : error;
271 if (cause instanceof StorageException.ConcurrentModification) {
272 scheduledExecutor.schedule(
273 () -> retryAsyncUntilSuccess(supplier, future),
274 random.nextInt(50),
275 TimeUnit.MILLISECONDS);
276 } else {
277 future.completeExceptionally(error);
278 }
279 }
280 });
281 return future;
282 }
283
284 /**
Jordan Haltermanf7554092017-07-30 15:05:51 -0700285 * Return method for {@link #retryUntilSuccess(Supplier)} callbacks to indicate that the callback needs to be
286 * retried after a randomized delay.
287 *
288 * @param <T> the return type
289 * @return nothing
290 * @throws StorageException.ConcurrentModification to force a retry of the callback
291 */
292 private <T> T retry() {
293 throw RETRY;
Madan Jampani86940d92015-05-06 11:47:57 -0700294 }
295
Jordan Haltermanf7554092017-07-30 15:05:51 -0700296 /**
Jordan Haltermanf7554092017-07-30 15:05:51 -0700297 * Handles a completed batch event received from the master node.
298 * <p>
299 * If this node is the source of the batch, notifies event listeners to complete the operations.
300 *
301 * @param event the event to handle
302 */
303 private void completeBatch(FlowRuleBatchEvent event) {
304 if (pendingBatches.remove(event.subject().batchId())) {
305 notifyDelegate(event);
306 }
Madan Jampani86940d92015-05-06 11:47:57 -0700307 }
308
309 // This is not a efficient operation on a distributed sharded
310 // flow store. We need to revisit the need for this operation or at least
311 // make it device specific.
312 @Override
313 public int getFlowRuleCount() {
Yuta HIGUCHI10e91fb2017-06-15 13:30:50 -0700314 return Streams.stream(deviceService.getDevices()).parallel()
Jordan Haltermanf7554092017-07-30 15:05:51 -0700315 .mapToInt(device -> Iterables.size(getFlowEntries(device.id())))
316 .sum();
317 }
318
319 /**
320 * Returns the {@link DocumentPath} for the given {@link DeviceId}.
321 *
322 * @param deviceId the device identifier for which to return a path
323 * @return the path for the given device
324 */
325 private DocumentPath getPathFor(DeviceId deviceId) {
326 return DocumentPath.from("root", deviceId.toString());
327 }
328
329 /**
330 * Returns the {@link DocumentPath} for the given {@link DeviceId} and {@link FlowId}.
331 *
332 * @param deviceId the device identifier for which to return the path
333 * @param flowId the flow identifier for which to return the path
334 * @return the path for the given device/flow
335 */
336 private DocumentPath getPathFor(DeviceId deviceId, FlowId flowId) {
337 return DocumentPath.from("root", deviceId.toString(), flowId.toString());
Madan Jampani86940d92015-05-06 11:47:57 -0700338 }
339
340 @Override
Jordan Haltermanf7554092017-07-30 15:05:51 -0700341 @SuppressWarnings("unchecked")
Madan Jampani86940d92015-05-06 11:47:57 -0700342 public FlowEntry getFlowEntry(FlowRule rule) {
Deepa Vaddireddy2e85cfc2017-11-07 12:58:00 +0000343 DeviceId deviceId = rule.deviceId();
344 if (mastershipService.getMasterFor(deviceId) != null) {
345 DocumentPath path = getPathFor(deviceId, rule.id());
346 Versioned<Map<StoredFlowEntry, StoredFlowEntry>> flowEntries = flows.get(path);
347 return flowEntries != null ? flowEntries.value().get(rule) : null;
348 } else {
349 log.debug("Failed to getFlowEntries: No master for {}", deviceId);
350 return null;
351 }
352
353
Madan Jampani86940d92015-05-06 11:47:57 -0700354 }
355
356 @Override
357 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Deepa Vaddireddy2e85cfc2017-11-07 12:58:00 +0000358 if (mastershipService.getMasterFor(deviceId) != null) {
359 DocumentPath path = getPathFor(deviceId);
360 try {
361 return getFlowEntries(path);
362 } catch (NoSuchDocumentPathException e) {
363 return Collections.emptyList();
364 }
365 } else {
366 log.debug("Failed to getFlowEntries: No master for {}", deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700367 return Collections.emptyList();
368 }
Deepa Vaddireddy2e85cfc2017-11-07 12:58:00 +0000369
Jordan Haltermanf7554092017-07-30 15:05:51 -0700370 }
Madan Jampani86940d92015-05-06 11:47:57 -0700371
Jordan Haltermanf7554092017-07-30 15:05:51 -0700372 @SuppressWarnings("unchecked")
373 private Iterable<FlowEntry> getFlowEntries(DocumentPath path) {
374 return flows.getChildren(path)
375 .values()
376 .stream()
377 .flatMap(v -> v.value().values().stream())
378 .collect(Collectors.toList());
Madan Jampani86940d92015-05-06 11:47:57 -0700379 }
380
381 @Override
Jordan Haltermanf7554092017-07-30 15:05:51 -0700382 @SuppressWarnings("unchecked")
Madan Jampani86940d92015-05-06 11:47:57 -0700383 public void storeFlowRule(FlowRule rule) {
384 storeBatch(new FlowRuleBatchOperation(
Sho SHIMIZU98ffca82015-05-11 08:39:24 -0700385 Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
Madan Jampani86940d92015-05-06 11:47:57 -0700386 rule.deviceId(), idGenerator.getNewId()));
387 }
388
389 @Override
390 public void storeBatch(FlowRuleBatchOperation operation) {
391 if (operation.getOperations().isEmpty()) {
392 notifyDelegate(FlowRuleBatchEvent.completed(
393 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
394 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
395 return;
396 }
397
398 DeviceId deviceId = operation.deviceId();
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700399 NodeId master = mastershipService.getMasterFor(deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700400
401 if (master == null) {
Sivachidambaram Subramanian605104e2017-06-21 07:40:04 +0530402 log.warn("No master for {} ", deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700403
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700404 updateStoreInternal(operation).whenComplete((result, error) -> {
405 notifyDelegate(FlowRuleBatchEvent.completed(
406 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
407 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
408 });
Madan Jampani86940d92015-05-06 11:47:57 -0700409 return;
410 }
411
Jordan Haltermanf7554092017-07-30 15:05:51 -0700412 pendingBatches.add(operation.id());
Madan Jampani86940d92015-05-06 11:47:57 -0700413
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700414 // If the local node is the master, apply the flows. Otherwise, send them to the master.
415 if (Objects.equals(local, master)) {
416 applyBatchFlows(operation);
Jordan Haltermanf7554092017-07-30 15:05:51 -0700417 } else {
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700418 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}", master, deviceId);
419 clusterCommunicator.unicast(
420 operation,
421 APPLY_BATCH_FLOWS,
Jordan Haltermanf7554092017-07-30 15:05:51 -0700422 serializer::encode,
423 master);
Madan Jampani86940d92015-05-06 11:47:57 -0700424 }
Madan Jampani86940d92015-05-06 11:47:57 -0700425 }
426
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700427 /**
428 * Asynchronously applies a batch of flows to the store.
429 * <p>
430 * This operation is performed on the master node to ensure that events occur <em>after</em> flows have been stored
431 * and are visible to the master node. If a non-master node stores flows and then triggers events on the master,
432 * the flows may not yet be visible to the master node due to the nature of sequentially consistent reads on the
433 * underlying {@code DocumentTree} primitive.
434 */
435 private void applyBatchFlows(FlowRuleBatchOperation operation) {
436 updateStoreInternal(operation).whenComplete((operations, error) -> {
437 if (error == null) {
438 if (operations.isEmpty()) {
439 batchOperationComplete(FlowRuleBatchEvent.completed(
440 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
441 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
Jordan Haltermanf7554092017-07-30 15:05:51 -0700442 } else {
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700443 notifyDelegate(FlowRuleBatchEvent.requested(
444 new FlowRuleBatchRequest(operation.id(), operations),
445 operation.deviceId()));
Jordan Haltermanf7554092017-07-30 15:05:51 -0700446 }
Jordan Haltermanf7554092017-07-30 15:05:51 -0700447 }
448 });
449 }
450
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700451 private CompletableFuture<Set<FlowRuleBatchEntry>> updateStoreInternal(FlowRuleBatchOperation operation) {
452 return Tools.allOf(operation.getOperations().stream().map(op -> {
453 switch (op.operator()) {
454 case ADD:
Yi Tsengc29d8822017-10-25 16:19:25 -0700455 case MODIFY:
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700456 return addBatchEntry(op).thenApply(succeeded -> succeeded ? op : null);
457 case REMOVE:
458 return removeBatchEntry(op).thenApply(succeeded -> succeeded ? op : null);
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700459 default:
460 log.warn("Unknown flow operation operator: {}", op.operator());
461 return CompletableFuture.<FlowRuleBatchEntry>completedFuture(null);
462 }
463 }).collect(Collectors.toList()))
464 .thenApply(results -> results.stream()
465 .filter(Objects::nonNull)
466 .collect(Collectors.toSet()));
467 }
468
Jordan Haltermanf7554092017-07-30 15:05:51 -0700469 @SuppressWarnings("unchecked")
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700470 private CompletableFuture<Boolean> addBatchEntry(FlowRuleBatchEntry batchEntry) {
471 StoredFlowEntry entry = new DefaultFlowEntry(batchEntry.target());
472 DocumentPath path = getPathFor(entry.deviceId(), entry.id());
473 return retryAsyncUntilSuccess(() -> {
474 CompletableFuture<Boolean> future = new CompletableFuture<>();
475 asyncFlows.get(path).whenComplete((value, getError) -> {
476 if (getError == null) {
477 if (value != null) {
478 Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
479 entries.put(entry, entry);
480 asyncFlows.replace(path, entries, value.version()).whenComplete((succeeded, replaceError) -> {
481 if (replaceError == null) {
482 if (succeeded) {
483 log.trace("Stored new flow rule: {}", entry);
484 future.complete(true);
485 } else {
486 log.trace("Failed to store new flow rule: {}", entry);
487 future.completeExceptionally(RETRY);
488 }
489 } else {
490 future.completeExceptionally(replaceError);
491 }
492 });
Jordan Haltermanf7554092017-07-30 15:05:51 -0700493 } else {
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700494 // If there are no entries stored for the device, initialize the device's flows.
495 Map<StoredFlowEntry, StoredFlowEntry> map = Maps.newHashMap();
496 map.put(entry, entry);
497 asyncFlows.createRecursive(path, map).whenComplete((succeeded, createError) -> {
498 if (createError == null) {
499 if (succeeded) {
500 log.trace("Stored new flow rule: {}", entry);
501 future.complete(true);
502 } else {
503 log.trace("Failed to store new flow rule: {}", entry);
504 future.completeExceptionally(RETRY);
505 }
506 } else {
507 future.completeExceptionally(createError);
508 }
509 });
Jordan Haltermanf7554092017-07-30 15:05:51 -0700510 }
511 } else {
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700512 future.completeExceptionally(getError);
Jordan Haltermanf7554092017-07-30 15:05:51 -0700513 }
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700514 });
515 return future;
516 });
517 }
518
519 @SuppressWarnings("unchecked")
520 private CompletableFuture<Boolean> removeBatchEntry(FlowRuleBatchEntry batchEntry) {
521 FlowRule rule = batchEntry.target();
522 DocumentPath path = getPathFor(rule.deviceId(), rule.id());
523 return retryAsyncUntilSuccess(() -> {
524 CompletableFuture<Boolean> future = new CompletableFuture<>();
525 asyncFlows.get(path).whenComplete((value, getError) -> {
526 if (getError == null) {
527 if (value != null) {
528 Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
529 StoredFlowEntry entry = entries.get(rule);
530 if (entry != null) {
531 entry.setState(FlowEntryState.PENDING_REMOVE);
532 asyncFlows.replace(path, entries, value.version()).whenComplete((succeeded, error) -> {
533 if (error == null) {
534 if (succeeded) {
535 log.trace("Updated flow rule state to PENDING_REMOVE: {}", entry);
536 future.complete(true);
537 } else {
538 log.trace("Failed to update flow rule state to PENDING_REMOVE: {}", entry);
539 future.completeExceptionally(RETRY);
540 }
541 } else {
542 future.completeExceptionally(error);
543 }
544 });
545 } else {
546 future.complete(false);
547 }
548 } else {
549 future.complete(false);
550 }
551 } else {
552 future.completeExceptionally(getError);
553 }
554 });
555 return future;
Jordan Haltermanf7554092017-07-30 15:05:51 -0700556 });
557 }
558
559 @Override
560 public void batchOperationComplete(FlowRuleBatchEvent event) {
561 if (pendingBatches.remove(event.subject().batchId())) {
562 notifyDelegate(event);
563 } else {
564 clusterCommunicator.broadcast(event, COMPLETE_BATCH, serializer::encode);
565 }
566 }
567
Madan Jampani86940d92015-05-06 11:47:57 -0700568 @Override
569 public void deleteFlowRule(FlowRule rule) {
570 storeBatch(
571 new FlowRuleBatchOperation(
Sho SHIMIZU98ffca82015-05-11 08:39:24 -0700572 Collections.singletonList(
Madan Jampani86940d92015-05-06 11:47:57 -0700573 new FlowRuleBatchEntry(
574 FlowRuleOperation.REMOVE,
575 rule)), rule.deviceId(), idGenerator.getNewId()));
576 }
577
578 @Override
Charles Chan93fa7272016-01-26 22:27:02 -0800579 public FlowRuleEvent pendingFlowRule(FlowEntry rule) {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700580 DocumentPath path = getPathFor(rule.deviceId(), rule.id());
581 return retryUntilSuccess(() -> {
582 Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
583 if (value != null) {
584 Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
585 StoredFlowEntry entry = entries.get(rule);
586 if (entry != null && entry.state() != FlowEntryState.PENDING_ADD) {
587 entry.setState(FlowEntryState.PENDING_ADD);
588 if (flows.replace(path, entries, value.version())) {
589 log.trace("Updated flow rule state to PENDING_ADD: {}", entry);
590 return new FlowRuleEvent(RULE_UPDATED, rule);
591 } else {
592 log.trace("Failed to update flow rule state to PENDING_ADD: {}", entry);
593 return retry();
594 }
595 } else {
596 return null;
597 }
598 } else {
599 return null;
Charles Chan93fa7272016-01-26 22:27:02 -0800600 }
Jordan Haltermanf7554092017-07-30 15:05:51 -0700601 });
Charles Chan93fa7272016-01-26 22:27:02 -0800602 }
603
604 @Override
Jordan Haltermanf7554092017-07-30 15:05:51 -0700605 @SuppressWarnings("unchecked")
Madan Jampani86940d92015-05-06 11:47:57 -0700606 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700607 DocumentPath path = getPathFor(rule.deviceId(), rule.id());
608 return retryUntilSuccess(() -> {
609 Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
610 if (value != null) {
611 Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
612 StoredFlowEntry entry = entries.get(rule);
613 if (entry != null) {
614 FlowRuleEvent event;
615 String message;
Madan Jampani86940d92015-05-06 11:47:57 -0700616
Jordan Haltermanf7554092017-07-30 15:05:51 -0700617 entry.setBytes(rule.bytes());
618 entry.setLife(rule.life(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
619 entry.setLiveType(rule.liveType());
620 entry.setPackets(rule.packets());
621 entry.setLastSeen();
Madan Jampani86940d92015-05-06 11:47:57 -0700622
Jordan Haltermanf7554092017-07-30 15:05:51 -0700623 // If the entry state is PENDING_ADD, set it to ADDED. Otherwise, just update the rule.
624 if (entry.state() == FlowEntryState.PENDING_ADD) {
625 entry.setState(FlowEntryState.ADDED);
626 event = new FlowRuleEvent(Type.RULE_ADDED, rule);
627 message = "Updated flow rule state to ADDED: {}";
628 } else {
629 event = new FlowRuleEvent(Type.RULE_UPDATED, rule);
630 message = "Updated flow rule: {}";
631 }
632
633 if (flows.replace(path, entries, value.version())) {
634 log.trace(message, entry);
635 return event;
636 } else {
637 log.trace("Failed to update flow rule: {}", entry);
638 return retry();
639 }
640 } else {
641 // If the rule does not exist, return null. Inserting the rule risks race conditions
642 // that can result in removed rules being retained.
643 return null;
644 }
645 } else {
Jordan Halterman9b3a7ce2017-08-25 15:12:54 -0700646 return null;
Madan Jampani86940d92015-05-06 11:47:57 -0700647 }
Jordan Haltermanf7554092017-07-30 15:05:51 -0700648 });
Madan Jampani86940d92015-05-06 11:47:57 -0700649 }
650
651 @Override
Jordan Haltermanf7554092017-07-30 15:05:51 -0700652 @SuppressWarnings("unchecked")
Madan Jampani86940d92015-05-06 11:47:57 -0700653 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700654 DocumentPath path = getPathFor(rule.deviceId(), rule.id());
655 return retryUntilSuccess(() -> {
656 Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
657 if (value != null) {
658 Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
659 StoredFlowEntry entry = entries.remove(rule);
660 if (entry != null) {
661 if (flows.replace(path, entries, value.version())) {
662 log.trace("Removed flow rule: {}", entry);
663 return new FlowRuleEvent(RULE_REMOVED, entry);
664 } else {
665 log.trace("Failed to remove flow rule: {}", entry);
666 return retry();
667 }
668 } else {
669 return null;
670 }
671 } else {
672 return null;
673 }
674 });
Madan Jampani86940d92015-05-06 11:47:57 -0700675 }
676
677 @Override
Charles Chan0c7c43b2016-01-14 17:39:20 -0800678 public void purgeFlowRule(DeviceId deviceId) {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700679 DocumentPath path = getPathFor(deviceId);
Jordan Haltermanf5295f62017-09-12 15:07:18 -0700680 retryUntilSuccess(() -> {
681 try {
682 for (String flowId : flows.getChildren(path).keySet()) {
683 flows.removeNode(DocumentPath.from("root", deviceId.toString(), flowId));
684 }
685 } catch (NoSuchDocumentPathException e) {
686 // Do nothing. There are no flows for the device.
Jordan Haltermanf7554092017-07-30 15:05:51 -0700687 }
Jordan Haltermanf5295f62017-09-12 15:07:18 -0700688
689 // New children may have been created since they were removed above. Catch
690 // IllegalDocumentModificationException and retry if necessary.
691 try {
692 flows.removeNode(path);
693 } catch (NoSuchDocumentPathException e) {
694 return null;
695 } catch (IllegalDocumentModificationException e) {
696 return retry();
697 }
698 return null;
699 });
Charles Chan0c7c43b2016-01-14 17:39:20 -0800700 }
701
702 @Override
Victor Silva139bca42016-08-18 11:46:35 -0300703 public void purgeFlowRules() {
Jordan Haltermanf7554092017-07-30 15:05:51 -0700704 try {
705 for (String deviceId : flows.getChildren(flows.root()).keySet()) {
706 purgeFlowRule(DeviceId.deviceId(deviceId));
Madan Jampani86940d92015-05-06 11:47:57 -0700707 }
Jordan Haltermanf7554092017-07-30 15:05:51 -0700708 } catch (NoSuchDocumentPathException e) {
Jordan Haltermanf5295f62017-09-12 15:07:18 -0700709 // Do nothing if no children exist.
Madan Jampani86940d92015-05-06 11:47:57 -0700710 }
711 }
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700712
713 @Override
714 public FlowRuleEvent updateTableStatistics(DeviceId deviceId,
Jordan Haltermanf7554092017-07-30 15:05:51 -0700715 List<TableStatisticsEntry> tableStats) {
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700716 deviceTableStats.put(deviceId, tableStats);
717 return null;
718 }
719
720 @Override
721 public Iterable<TableStatisticsEntry> getTableStatistics(DeviceId deviceId) {
Deepa Vaddireddy2e85cfc2017-11-07 12:58:00 +0000722 if (mastershipService.getMasterFor(deviceId) != null) {
723 List<TableStatisticsEntry> tableStats = deviceTableStats.get(deviceId);
724 if (tableStats == null) {
725 return Collections.emptyList();
726 }
727 return ImmutableList.copyOf(tableStats);
728 } else {
729 log.debug("Failed to getTableStatistics: No master for {}", deviceId);
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700730 return Collections.emptyList();
731 }
Deepa Vaddireddy2e85cfc2017-11-07 12:58:00 +0000732
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700733 }
734
Patryk Konopka7e40c012017-06-06 13:38:06 +0200735 @Override
736 public long getActiveFlowRuleCount(DeviceId deviceId) {
Deepa Vaddireddy2e85cfc2017-11-07 12:58:00 +0000737 if (mastershipService.getMasterFor(deviceId) != null) {
738 return Streams.stream(getTableStatistics(deviceId))
739 .mapToLong(TableStatisticsEntry::activeFlowEntries)
740 .sum();
741 } else {
742 log.debug("Failed to getActiveFlowRuleCount: No master for {}", deviceId);
743 return 0;
744 }
Patryk Konopka7e40c012017-06-06 13:38:06 +0200745 }
746
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700747 private class InternalTableStatsListener
Jordan Haltermanf7554092017-07-30 15:05:51 -0700748 implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> {
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700749 @Override
750 public void event(EventuallyConsistentMapEvent<DeviceId,
Jordan Haltermanf7554092017-07-30 15:05:51 -0700751 List<TableStatisticsEntry>> event) {
Srikanth Vavilapalli95810f52015-09-14 15:49:56 -0700752 //TODO: Generate an event to listeners (do we need?)
753 }
754 }
Jordan Haltermanf7554092017-07-30 15:05:51 -0700755}