blob: 9dc2f60d990005f86fc78d1b9f388a2a15f99fbd [file] [log] [blame]
Jon Hallfa132292017-10-24 11:11:24 -07001 /*
2 * Copyright 2014-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.flow.impl;
17
18import java.util.Collections;
19import java.util.Dictionary;
20import java.util.HashSet;
21import java.util.List;
22import java.util.Map;
23import java.util.Objects;
24import java.util.Set;
Jordan Halterman3664e8e2018-03-21 12:52:37 -070025import java.util.concurrent.CompletableFuture;
Jon Hallfa132292017-10-24 11:11:24 -070026import java.util.concurrent.ExecutorService;
27import java.util.concurrent.Executors;
28import java.util.concurrent.ScheduledExecutorService;
29import java.util.concurrent.ScheduledFuture;
30import java.util.concurrent.TimeUnit;
31import java.util.concurrent.atomic.AtomicReference;
32import java.util.stream.Collectors;
Jordan Halterman000e4102018-06-12 15:34:19 -070033import java.util.stream.IntStream;
Jon Hallfa132292017-10-24 11:11:24 -070034
Jordan Halterman7ae81b82018-06-12 11:23:33 -070035import com.google.common.collect.ImmutableList;
Jordan Halterman000e4102018-06-12 15:34:19 -070036import com.google.common.collect.ImmutableSet;
Jordan Halterman7ae81b82018-06-12 11:23:33 -070037import com.google.common.collect.Maps;
38import com.google.common.collect.Sets;
Jon Hallfa132292017-10-24 11:11:24 -070039import com.google.common.collect.Streams;
Jordan Halterman7ae81b82018-06-12 11:23:33 -070040import com.google.common.util.concurrent.Futures;
Jon Hallfa132292017-10-24 11:11:24 -070041import org.apache.felix.scr.annotations.Activate;
42import org.apache.felix.scr.annotations.Component;
43import org.apache.felix.scr.annotations.Deactivate;
44import org.apache.felix.scr.annotations.Modified;
45import org.apache.felix.scr.annotations.Property;
46import org.apache.felix.scr.annotations.Reference;
47import org.apache.felix.scr.annotations.ReferenceCardinality;
48import org.apache.felix.scr.annotations.Service;
49import org.onlab.util.KryoNamespace;
50import org.onlab.util.Tools;
51import org.onosproject.cfg.ComponentConfigService;
52import org.onosproject.cluster.ClusterService;
53import org.onosproject.cluster.NodeId;
54import org.onosproject.core.CoreService;
55import org.onosproject.core.IdGenerator;
56import org.onosproject.mastership.MastershipService;
57import org.onosproject.net.DeviceId;
58import org.onosproject.net.device.DeviceService;
59import org.onosproject.net.flow.CompletedBatchOperation;
60import org.onosproject.net.flow.DefaultFlowEntry;
61import org.onosproject.net.flow.FlowEntry;
62import org.onosproject.net.flow.FlowEntry.FlowEntryState;
63import org.onosproject.net.flow.FlowId;
64import org.onosproject.net.flow.FlowRule;
Jon Hallfa132292017-10-24 11:11:24 -070065import org.onosproject.net.flow.FlowRuleEvent;
66import org.onosproject.net.flow.FlowRuleEvent.Type;
67import org.onosproject.net.flow.FlowRuleService;
68import org.onosproject.net.flow.FlowRuleStore;
69import org.onosproject.net.flow.FlowRuleStoreDelegate;
70import org.onosproject.net.flow.StoredFlowEntry;
71import org.onosproject.net.flow.TableStatisticsEntry;
Jordan Halterman7ae81b82018-06-12 11:23:33 -070072import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
73import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry.FlowRuleOperation;
74import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
75import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
76import org.onosproject.net.flow.oldbatch.FlowRuleBatchRequest;
Jon Hallfa132292017-10-24 11:11:24 -070077import org.onosproject.persistence.PersistenceService;
78import org.onosproject.store.AbstractStore;
79import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
80import org.onosproject.store.cluster.messaging.ClusterMessage;
81import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
82import org.onosproject.store.flow.ReplicaInfoEvent;
83import org.onosproject.store.flow.ReplicaInfoEventListener;
84import org.onosproject.store.flow.ReplicaInfoService;
85import org.onosproject.store.impl.MastershipBasedTimestamp;
86import org.onosproject.store.serializers.KryoNamespaces;
87import org.onosproject.store.service.EventuallyConsistentMap;
88import org.onosproject.store.service.EventuallyConsistentMapEvent;
89import org.onosproject.store.service.EventuallyConsistentMapListener;
90import org.onosproject.store.service.Serializer;
91import org.onosproject.store.service.StorageService;
92import org.onosproject.store.service.WallClockTimestamp;
93import org.osgi.service.component.ComponentContext;
94import org.slf4j.Logger;
95
Jon Hallfa132292017-10-24 11:11:24 -070096import static com.google.common.base.Strings.isNullOrEmpty;
97import static org.onlab.util.Tools.get;
98import static org.onlab.util.Tools.groupedThreads;
99import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
100import static org.onosproject.store.flow.ReplicaInfoEvent.Type.MASTER_CHANGED;
101import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.APPLY_BATCH_FLOWS;
Jordan Halterman000e4102018-06-12 15:34:19 -0700102import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_ANTI_ENTROPY;
Jon Hallfa132292017-10-24 11:11:24 -0700103import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP;
104import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES;
105import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY;
106import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOTE_APPLY_COMPLETED;
107import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOVE_FLOW_ENTRY;
108import static org.slf4j.LoggerFactory.getLogger;
109
110/**
111 * Manages inventory of flow rules using a distributed state management protocol.
112 */
Thomas Vachuska71026b22018-01-05 16:01:44 -0800113@Component(immediate = true)
Jon Hallfa132292017-10-24 11:11:24 -0700114@Service
115public class ECFlowRuleStore
116 extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
117 implements FlowRuleStore {
118
119 private final Logger log = getLogger(getClass());
120
121 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
122 private static final int DEFAULT_MAX_BACKUP_COUNT = 2;
123 private static final boolean DEFAULT_PERSISTENCE_ENABLED = false;
124 private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
Jordan Halterman000e4102018-06-12 15:34:19 -0700125 private static final int DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS = 5000;
Jon Hallfa132292017-10-24 11:11:24 -0700126 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
Jordan Halterman3664e8e2018-03-21 12:52:37 -0700127 private static final int NUM_BUCKETS = 1024;
Jon Hallfa132292017-10-24 11:11:24 -0700128
129 @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
130 label = "Number of threads in the message handler pool")
131 private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
132
133 @Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS,
134 label = "Delay in ms between successive backup runs")
135 private int backupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
Jordan Halterman000e4102018-06-12 15:34:19 -0700136
137 @Property(name = "antiEntropyPeriod", intValue = DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS,
138 label = "Delay in ms between anti-entropy runs")
139 private int antiEntropyPeriod = DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS;
140
Jon Hallfa132292017-10-24 11:11:24 -0700141 @Property(name = "persistenceEnabled", boolValue = false,
142 label = "Indicates whether or not changes in the flow table should be persisted to disk.")
143 private boolean persistenceEnabled = DEFAULT_PERSISTENCE_ENABLED;
144
145 @Property(name = "backupCount", intValue = DEFAULT_MAX_BACKUP_COUNT,
146 label = "Max number of backup copies for each device")
147 private volatile int backupCount = DEFAULT_MAX_BACKUP_COUNT;
148
149 private InternalFlowTable flowTable = new InternalFlowTable();
150
151 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
152 protected ReplicaInfoService replicaInfoManager;
153
154 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
155 protected ClusterCommunicationService clusterCommunicator;
156
157 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
158 protected ClusterService clusterService;
159
160 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
161 protected DeviceService deviceService;
162
163 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
164 protected CoreService coreService;
165
166 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
167 protected ComponentConfigService configService;
168
169 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
170 protected MastershipService mastershipService;
171
172 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
173 protected PersistenceService persistenceService;
174
175 private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
176 private ExecutorService messageHandlingExecutor;
177 private ExecutorService eventHandler;
178
179 private ScheduledFuture<?> backupTask;
Jordan Halterman000e4102018-06-12 15:34:19 -0700180 private ScheduledFuture<?> antiEntropyTask;
Jon Hallfa132292017-10-24 11:11:24 -0700181 private final ScheduledExecutorService backupSenderExecutor =
182 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender", log));
183
184 private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
185 private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
186 new InternalTableStatsListener();
187
188 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
189 protected StorageService storageService;
190
Jordan Halterman3664e8e2018-03-21 12:52:37 -0700191 protected final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
192 .register(KryoNamespaces.API)
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -0700193 .register(BucketId.class)
Jordan Halterman3664e8e2018-03-21 12:52:37 -0700194 .register(FlowBucket.class)
195 .build());
Jon Hallfa132292017-10-24 11:11:24 -0700196
197 protected final KryoNamespace.Builder serializerBuilder = KryoNamespace.newBuilder()
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -0700198 .register(KryoNamespaces.API)
199 .register(BucketId.class)
200 .register(MastershipBasedTimestamp.class);
Jon Hallfa132292017-10-24 11:11:24 -0700201
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -0700202 private EventuallyConsistentMap<BucketId, Integer> flowCounts;
Jon Hallfa132292017-10-24 11:11:24 -0700203
204 private IdGenerator idGenerator;
205 private NodeId local;
206
207 @Activate
208 public void activate(ComponentContext context) {
209 configService.registerProperties(getClass());
210
211 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
212
213 local = clusterService.getLocalNode().id();
214
215 eventHandler = Executors.newSingleThreadExecutor(
216 groupedThreads("onos/flow", "event-handler", log));
217 messageHandlingExecutor = Executors.newFixedThreadPool(
218 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
219
220 registerMessageHandlers(messageHandlingExecutor);
221
222 replicaInfoManager.addListener(flowTable);
223 backupTask = backupSenderExecutor.scheduleWithFixedDelay(
224 flowTable::backup,
225 0,
226 backupPeriod,
227 TimeUnit.MILLISECONDS);
Jordan Halterman000e4102018-06-12 15:34:19 -0700228 antiEntropyTask = backupSenderExecutor.scheduleWithFixedDelay(
229 flowTable::runAntiEntropy,
230 0,
231 antiEntropyPeriod,
232 TimeUnit.MILLISECONDS);
Jon Hallfa132292017-10-24 11:11:24 -0700233
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -0700234 flowCounts = storageService.<BucketId, Integer>eventuallyConsistentMapBuilder()
Thomas Vachuskaa8e74772018-02-26 11:33:35 -0800235 .withName("onos-flow-counts")
236 .withSerializer(serializerBuilder)
237 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
238 .withTimestampProvider((k, v) -> new WallClockTimestamp())
239 .withTombstonesDisabled()
240 .build();
241
Jon Hallfa132292017-10-24 11:11:24 -0700242 deviceTableStats = storageService.<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder()
243 .withName("onos-flow-table-stats")
244 .withSerializer(serializerBuilder)
245 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
246 .withTimestampProvider((k, v) -> new WallClockTimestamp())
247 .withTombstonesDisabled()
248 .build();
249 deviceTableStats.addListener(tableStatsListener);
250
251 logConfig("Started");
252 }
253
254 @Deactivate
255 public void deactivate(ComponentContext context) {
256 replicaInfoManager.removeListener(flowTable);
257 backupTask.cancel(true);
258 configService.unregisterProperties(getClass(), false);
259 unregisterMessageHandlers();
260 deviceTableStats.removeListener(tableStatsListener);
261 deviceTableStats.destroy();
262 eventHandler.shutdownNow();
263 messageHandlingExecutor.shutdownNow();
264 backupSenderExecutor.shutdownNow();
265 log.info("Stopped");
266 }
267
268 @SuppressWarnings("rawtypes")
269 @Modified
270 public void modified(ComponentContext context) {
271 if (context == null) {
272 logConfig("Default config");
273 return;
274 }
275
276 Dictionary properties = context.getProperties();
277 int newPoolSize;
278 int newBackupPeriod;
279 int newBackupCount;
Jordan Halterman000e4102018-06-12 15:34:19 -0700280 int newAntiEntropyPeriod;
Jon Hallfa132292017-10-24 11:11:24 -0700281 try {
282 String s = get(properties, "msgHandlerPoolSize");
283 newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
284
285 s = get(properties, "backupPeriod");
286 newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());
287
288 s = get(properties, "backupCount");
289 newBackupCount = isNullOrEmpty(s) ? backupCount : Integer.parseInt(s.trim());
Jordan Halterman000e4102018-06-12 15:34:19 -0700290
291 s = get(properties, "antiEntropyPeriod");
292 newAntiEntropyPeriod = isNullOrEmpty(s) ? antiEntropyPeriod : Integer.parseInt(s.trim());
Jon Hallfa132292017-10-24 11:11:24 -0700293 } catch (NumberFormatException | ClassCastException e) {
294 newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
295 newBackupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
296 newBackupCount = DEFAULT_MAX_BACKUP_COUNT;
Jordan Halterman000e4102018-06-12 15:34:19 -0700297 newAntiEntropyPeriod = DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS;
Jon Hallfa132292017-10-24 11:11:24 -0700298 }
299
300 boolean restartBackupTask = false;
Jordan Halterman000e4102018-06-12 15:34:19 -0700301 boolean restartAntiEntropyTask = false;
Jon Hallfa132292017-10-24 11:11:24 -0700302
303 if (newBackupPeriod != backupPeriod) {
304 backupPeriod = newBackupPeriod;
305 restartBackupTask = true;
306 }
Jordan Halterman000e4102018-06-12 15:34:19 -0700307
308 if (newAntiEntropyPeriod != antiEntropyPeriod) {
309 antiEntropyPeriod = newAntiEntropyPeriod;
310 restartAntiEntropyTask = true;
311 }
312
Jon Hallfa132292017-10-24 11:11:24 -0700313 if (restartBackupTask) {
314 if (backupTask != null) {
315 // cancel previously running task
316 backupTask.cancel(false);
317 }
318 backupTask = backupSenderExecutor.scheduleWithFixedDelay(
319 flowTable::backup,
320 0,
321 backupPeriod,
322 TimeUnit.MILLISECONDS);
323 }
Jordan Halterman000e4102018-06-12 15:34:19 -0700324
325 if (restartAntiEntropyTask) {
326 if (antiEntropyTask != null) {
327 // cancel previously running task
328 antiEntropyTask.cancel(false);
329 }
330 antiEntropyTask = backupSenderExecutor.scheduleWithFixedDelay(
331 flowTable::runAntiEntropy,
332 0,
333 antiEntropyPeriod,
334 TimeUnit.MILLISECONDS);
335 }
336
Jon Hallfa132292017-10-24 11:11:24 -0700337 if (newPoolSize != msgHandlerPoolSize) {
338 msgHandlerPoolSize = newPoolSize;
339 ExecutorService oldMsgHandler = messageHandlingExecutor;
340 messageHandlingExecutor = Executors.newFixedThreadPool(
341 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
342
343 // replace previously registered handlers.
344 registerMessageHandlers(messageHandlingExecutor);
345 oldMsgHandler.shutdown();
346 }
Jordan Halterman000e4102018-06-12 15:34:19 -0700347
Jon Hallfa132292017-10-24 11:11:24 -0700348 if (backupCount != newBackupCount) {
349 backupCount = newBackupCount;
350 }
351 logConfig("Reconfigured");
352 }
353
354 private void registerMessageHandlers(ExecutorService executor) {
355
356 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
357 clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
358 REMOTE_APPLY_COMPLETED, serializer::decode, this::notifyDelegate, executor);
359 clusterCommunicator.addSubscriber(
360 GET_FLOW_ENTRY, serializer::decode, flowTable::getFlowEntry, serializer::encode, executor);
361 clusterCommunicator.addSubscriber(
362 GET_DEVICE_FLOW_ENTRIES, serializer::decode, flowTable::getFlowEntries, serializer::encode, executor);
363 clusterCommunicator.addSubscriber(
364 REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
365 clusterCommunicator.addSubscriber(
Jordan Halterman7ae81b82018-06-12 11:23:33 -0700366 FLOW_TABLE_BACKUP, serializer::decode, flowTable::onBackup, serializer::encode, executor);
Jordan Halterman000e4102018-06-12 15:34:19 -0700367 clusterCommunicator.addSubscriber(
368 FLOW_TABLE_ANTI_ENTROPY, serializer::decode, flowTable::onAntiEntropy, serializer::encode, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700369 }
370
371 private void unregisterMessageHandlers() {
372 clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
373 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
374 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
375 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
376 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
377 clusterCommunicator.removeSubscriber(FLOW_TABLE_BACKUP);
Jordan Halterman000e4102018-06-12 15:34:19 -0700378 clusterCommunicator.removeSubscriber(FLOW_TABLE_ANTI_ENTROPY);
Jon Hallfa132292017-10-24 11:11:24 -0700379 }
380
381 private void logConfig(String prefix) {
382 log.info("{} with msgHandlerPoolSize = {}; backupPeriod = {}, backupCount = {}",
383 prefix, msgHandlerPoolSize, backupPeriod, backupCount);
384 }
385
Jon Hallfa132292017-10-24 11:11:24 -0700386 @Override
387 public int getFlowRuleCount() {
388 return Streams.stream(deviceService.getDevices()).parallel()
Thomas Vachuskaa8e74772018-02-26 11:33:35 -0800389 .mapToInt(device -> getFlowRuleCount(device.id()))
390 .sum();
391 }
392
393 @Override
394 public int getFlowRuleCount(DeviceId deviceId) {
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -0700395 return flowCounts.entrySet().stream()
396 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
397 .mapToInt(entry -> entry.getValue())
398 .sum();
Jon Hallfa132292017-10-24 11:11:24 -0700399 }
400
401 @Override
402 public FlowEntry getFlowEntry(FlowRule rule) {
403 NodeId master = mastershipService.getMasterFor(rule.deviceId());
404
405 if (master == null) {
406 log.debug("Failed to getFlowEntry: No master for {}", rule.deviceId());
407 return null;
408 }
409
410 if (Objects.equals(local, master)) {
411 return flowTable.getFlowEntry(rule);
412 }
413
414 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
415 master, rule.deviceId());
416
417 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
418 ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY,
419 serializer::encode,
420 serializer::decode,
421 master),
422 FLOW_RULE_STORE_TIMEOUT_MILLIS,
423 TimeUnit.MILLISECONDS,
424 null);
425 }
426
427 @Override
428 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
429 NodeId master = mastershipService.getMasterFor(deviceId);
430
431 if (master == null) {
432 log.debug("Failed to getFlowEntries: No master for {}", deviceId);
433 return Collections.emptyList();
434 }
435
436 if (Objects.equals(local, master)) {
437 return flowTable.getFlowEntries(deviceId);
438 }
439
440 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
441 master, deviceId);
442
443 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
444 ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
445 serializer::encode,
446 serializer::decode,
447 master),
448 FLOW_RULE_STORE_TIMEOUT_MILLIS,
449 TimeUnit.MILLISECONDS,
450 Collections.emptyList());
451 }
452
453 @Override
454 public void storeFlowRule(FlowRule rule) {
455 storeBatch(new FlowRuleBatchOperation(
456 Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
457 rule.deviceId(), idGenerator.getNewId()));
458 }
459
460 @Override
461 public void storeBatch(FlowRuleBatchOperation operation) {
462 if (operation.getOperations().isEmpty()) {
463 notifyDelegate(FlowRuleBatchEvent.completed(
464 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
465 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
466 return;
467 }
468
469 DeviceId deviceId = operation.deviceId();
470 NodeId master = mastershipService.getMasterFor(deviceId);
471
472 if (master == null) {
473 log.warn("No master for {} ", deviceId);
474
475 updateStoreInternal(operation);
476
477 notifyDelegate(FlowRuleBatchEvent.completed(
478 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
479 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
480 return;
481 }
482
483 if (Objects.equals(local, master)) {
484 storeBatchInternal(operation);
485 return;
486 }
487
488 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
489 master, deviceId);
490
491 clusterCommunicator.unicast(operation,
492 APPLY_BATCH_FLOWS,
493 serializer::encode,
494 master)
495 .whenComplete((result, error) -> {
496 if (error != null) {
497 log.warn("Failed to storeBatch: {} to {}", operation, master, error);
498
499 Set<FlowRule> allFailures = operation.getOperations()
500 .stream()
501 .map(op -> op.target())
502 .collect(Collectors.toSet());
503
504 notifyDelegate(FlowRuleBatchEvent.completed(
505 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
506 new CompletedBatchOperation(false, allFailures, deviceId)));
507 }
508 });
509 }
510
511 private void storeBatchInternal(FlowRuleBatchOperation operation) {
512
513 final DeviceId did = operation.deviceId();
514 //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
515 Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
516 if (currentOps.isEmpty()) {
517 batchOperationComplete(FlowRuleBatchEvent.completed(
518 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
519 new CompletedBatchOperation(true, Collections.emptySet(), did)));
520 return;
521 }
522
523 notifyDelegate(FlowRuleBatchEvent.requested(new
524 FlowRuleBatchRequest(operation.id(),
525 currentOps), operation.deviceId()));
526 }
527
528 private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
529 return operation.getOperations().stream().map(
530 op -> {
531 StoredFlowEntry entry;
532 switch (op.operator()) {
533 case ADD:
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800534 entry = new DefaultFlowEntry(op.target());
535 flowTable.add(entry);
536 return op;
Thomas Vachuska914b0b12018-01-09 11:54:52 -0800537 case MODIFY:
Jon Hallfa132292017-10-24 11:11:24 -0700538 entry = new DefaultFlowEntry(op.target());
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800539 flowTable.update(entry);
Jon Hallfa132292017-10-24 11:11:24 -0700540 return op;
541 case REMOVE:
542 entry = flowTable.getFlowEntry(op.target());
543 if (entry != null) {
Jon Hallfa132292017-10-24 11:11:24 -0700544 entry.setState(FlowEntryState.PENDING_REMOVE);
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800545 flowTable.update(entry);
Jon Hallfa132292017-10-24 11:11:24 -0700546 log.debug("Setting state of rule to pending remove: {}", entry);
547 return op;
548 }
549 break;
Jon Hallfa132292017-10-24 11:11:24 -0700550 default:
551 log.warn("Unknown flow operation operator: {}", op.operator());
552 }
553 return null;
554 }
555 ).filter(Objects::nonNull).collect(Collectors.toSet());
556 }
557
558 @Override
559 public void deleteFlowRule(FlowRule rule) {
560 storeBatch(
561 new FlowRuleBatchOperation(
562 Collections.singletonList(
563 new FlowRuleBatchEntry(
564 FlowRuleOperation.REMOVE,
565 rule)), rule.deviceId(), idGenerator.getNewId()));
566 }
567
568 @Override
569 public FlowRuleEvent pendingFlowRule(FlowEntry rule) {
570 if (mastershipService.isLocalMaster(rule.deviceId())) {
571 StoredFlowEntry stored = flowTable.getFlowEntry(rule);
572 if (stored != null &&
573 stored.state() != FlowEntryState.PENDING_ADD) {
574 stored.setState(FlowEntryState.PENDING_ADD);
575 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
576 }
577 }
578 return null;
579 }
580
581 @Override
582 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
583 NodeId master = mastershipService.getMasterFor(rule.deviceId());
584 if (Objects.equals(local, master)) {
585 return addOrUpdateFlowRuleInternal(rule);
586 }
587
588 log.warn("Tried to update FlowRule {} state,"
589 + " while the Node was not the master.", rule);
590 return null;
591 }
592
593 private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
594 // check if this new rule is an update to an existing entry
595 StoredFlowEntry stored = flowTable.getFlowEntry(rule);
596 if (stored != null) {
Jon Hallfa132292017-10-24 11:11:24 -0700597 stored.setBytes(rule.bytes());
598 stored.setLife(rule.life(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
599 stored.setLiveType(rule.liveType());
600 stored.setPackets(rule.packets());
601 stored.setLastSeen();
602 if (stored.state() == FlowEntryState.PENDING_ADD) {
603 stored.setState(FlowEntryState.ADDED);
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800604 // Update the flow table to ensure the changes are replicated
605 flowTable.update(stored);
Jon Hallfa132292017-10-24 11:11:24 -0700606 return new FlowRuleEvent(Type.RULE_ADDED, rule);
607 }
608 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
609 }
610
611 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
612 // TODO: also update backup if the behavior is correct.
613 flowTable.add(rule);
614 return null;
615 }
616
617 @Override
618 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
619 final DeviceId deviceId = rule.deviceId();
620 NodeId master = mastershipService.getMasterFor(deviceId);
621
622 if (Objects.equals(local, master)) {
623 // bypass and handle it locally
624 return removeFlowRuleInternal(rule);
625 }
626
627 if (master == null) {
628 log.warn("Failed to removeFlowRule: No master for {}", deviceId);
629 // TODO: revisit if this should be null (="no-op") or Exception
630 return null;
631 }
632
633 log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
634 master, deviceId);
635
636 return Futures.getUnchecked(clusterCommunicator.sendAndReceive(
637 rule,
638 REMOVE_FLOW_ENTRY,
639 serializer::encode,
640 serializer::decode,
641 master));
642 }
643
644 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
Jon Hallfa132292017-10-24 11:11:24 -0700645 // This is where one could mark a rule as removed and still keep it in the store.
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800646 final FlowEntry removed = flowTable.remove(rule);
Jon Hallfa132292017-10-24 11:11:24 -0700647 // rule may be partial rule that is missing treatment, we should use rule from store instead
648 return removed != null ? new FlowRuleEvent(RULE_REMOVED, removed) : null;
649 }
650
651 @Override
652 public void purgeFlowRule(DeviceId deviceId) {
653 flowTable.purgeFlowRule(deviceId);
654 }
655
656 @Override
657 public void purgeFlowRules() {
658 flowTable.purgeFlowRules();
659 }
660
661 @Override
662 public void batchOperationComplete(FlowRuleBatchEvent event) {
663 //FIXME: need a per device pending response
664 NodeId nodeId = pendingResponses.remove(event.subject().batchId());
665 if (nodeId == null) {
666 notifyDelegate(event);
667 } else {
668 // TODO check unicast return value
669 clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, serializer::encode, nodeId);
670 //error log: log.warn("Failed to respond to peer for batch operation result");
671 }
672 }
673
674 private final class OnStoreBatch implements ClusterMessageHandler {
675
676 @Override
677 public void handle(final ClusterMessage message) {
678 FlowRuleBatchOperation operation = serializer.decode(message.payload());
679 log.debug("received batch request {}", operation);
680
681 final DeviceId deviceId = operation.deviceId();
682 NodeId master = mastershipService.getMasterFor(deviceId);
683 if (!Objects.equals(local, master)) {
684 Set<FlowRule> failures = new HashSet<>(operation.size());
685 for (FlowRuleBatchEntry op : operation.getOperations()) {
686 failures.add(op.target());
687 }
688 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
689 // This node is no longer the master, respond as all failed.
690 // TODO: we might want to wrap response in envelope
691 // to distinguish sw programming failure and hand over
692 // it make sense in the latter case to retry immediately.
693 message.respond(serializer.encode(allFailed));
694 return;
695 }
696
697 pendingResponses.put(operation.id(), message.sender());
698 storeBatchInternal(operation);
699 }
700 }
701
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -0700702 /**
703 * Represents a backup to a of a distinct bucket to a distinct node.
704 */
Jon Hallfa132292017-10-24 11:11:24 -0700705 private class BackupOperation {
706 private final NodeId nodeId;
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -0700707 private final BucketId bucketId;
Jon Hallfa132292017-10-24 11:11:24 -0700708
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -0700709 BackupOperation(NodeId nodeId, BucketId bucketId) {
Jon Hallfa132292017-10-24 11:11:24 -0700710 this.nodeId = nodeId;
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -0700711 this.bucketId = bucketId;
712 }
713
714 NodeId nodeId() {
715 return nodeId;
716 }
717
718 BucketId bucketId() {
719 return bucketId;
Jon Hallfa132292017-10-24 11:11:24 -0700720 }
721
722 @Override
723 public int hashCode() {
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -0700724 return Objects.hash(nodeId, bucketId);
Jon Hallfa132292017-10-24 11:11:24 -0700725 }
726
727 @Override
728 public boolean equals(Object other) {
729 if (other != null && other instanceof BackupOperation) {
730 BackupOperation that = (BackupOperation) other;
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -0700731 return this.nodeId.equals(that.nodeId)
732 && this.bucketId.equals(that.bucketId);
Jon Hallfa132292017-10-24 11:11:24 -0700733 }
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -0700734 return false;
Jon Hallfa132292017-10-24 11:11:24 -0700735 }
736 }
737
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -0700738 /**
739 * Represents a distinct device flow bucket.
740 */
741 private class BucketId {
Jordan Halterman3664e8e2018-03-21 12:52:37 -0700742 private final DeviceId deviceId;
743 private final int bucket;
Jordan Halterman3664e8e2018-03-21 12:52:37 -0700744
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -0700745 BucketId(DeviceId deviceId, int bucket) {
Jordan Halterman3664e8e2018-03-21 12:52:37 -0700746 this.deviceId = deviceId;
747 this.bucket = bucket;
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -0700748 }
749
750 DeviceId deviceId() {
751 return deviceId;
752 }
753
754 int bucket() {
755 return bucket;
756 }
757
758 @Override
759 public int hashCode() {
760 return Objects.hash(deviceId, bucket);
761 }
762
763 @Override
764 public boolean equals(Object other) {
765 if (other != null && other instanceof BucketId) {
766 BucketId that = (BucketId) other;
767 return this.deviceId.equals(that.deviceId)
768 && this.bucket == that.bucket;
769 }
770 return false;
771 }
772 }
773
774 /**
775 * Container for flows in a specific bucket.
776 */
777 private class FlowBucket {
778 private final BucketId bucketId;
779 private final Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> table;
Jordan Halterman000e4102018-06-12 15:34:19 -0700780 private final long timestamp;
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -0700781
782 BucketId bucketId() {
783 return bucketId;
784 }
785
786 Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> table() {
787 return table;
788 }
789
Jordan Halterman000e4102018-06-12 15:34:19 -0700790 long timestamp() {
791 return timestamp;
792 }
793
794 FlowBucket(BucketId bucketId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> table, long timestamp) {
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -0700795 this.bucketId = bucketId;
Jordan Halterman3664e8e2018-03-21 12:52:37 -0700796 this.table = table;
Jordan Halterman000e4102018-06-12 15:34:19 -0700797 this.timestamp = timestamp;
798 }
799 }
800
801 /**
802 * Device digest.
803 */
804 private class DeviceDigest {
805 private final DeviceId deviceId;
806 private final Set<FlowBucketDigest> digests;
807
808 DeviceDigest(DeviceId deviceId, Set<FlowBucketDigest> digests) {
809 this.deviceId = deviceId;
810 this.digests = digests;
811 }
812
813 DeviceId deviceId() {
814 return deviceId;
815 }
816
817 Set<FlowBucketDigest> digests() {
818 return digests;
819 }
820
821 @Override
822 public int hashCode() {
823 return Objects.hash(deviceId, digests);
824 }
825
826 @Override
827 public boolean equals(Object object) {
828 return object instanceof DeviceDigest
829 && ((DeviceDigest) object).deviceId.equals(deviceId);
830 }
831 }
832
833 /**
834 * Flow bucket digest.
835 */
836 private class FlowBucketDigest {
837 private final BucketId bucketId;
838 private final long timestamp;
839
840 FlowBucketDigest(BucketId bucketId, long timestamp) {
841 this.bucketId = bucketId;
842 this.timestamp = timestamp;
843 }
844
845 BucketId bucketId() {
846 return bucketId;
847 }
848
849 long timestamp() {
850 return timestamp;
851 }
852
853 @Override
854 public int hashCode() {
855 return Objects.hash(bucketId);
856 }
857
858 @Override
859 public boolean equals(Object object) {
860 return object instanceof FlowBucketDigest
861 && ((FlowBucketDigest) object).bucketId.equals(bucketId);
Jordan Halterman3664e8e2018-03-21 12:52:37 -0700862 }
863 }
864
Jon Hallfa132292017-10-24 11:11:24 -0700865 private class InternalFlowTable implements ReplicaInfoEventListener {
866
867 //TODO replace the Map<V,V> with ExtendedSet
868 private final Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
869 flowEntries = Maps.newConcurrentMap();
870
871 private final Map<BackupOperation, Long> lastBackupTimes = Maps.newConcurrentMap();
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -0700872 private final Map<BucketId, Long> lastUpdateTimes = Maps.newConcurrentMap();
873 private final Set<BackupOperation> inFlightUpdates = Sets.newConcurrentHashSet();
Jon Hallfa132292017-10-24 11:11:24 -0700874
875 @Override
876 public void event(ReplicaInfoEvent event) {
877 eventHandler.execute(() -> handleEvent(event));
878 }
879
880 private void handleEvent(ReplicaInfoEvent event) {
881 DeviceId deviceId = event.subject();
882 if (!mastershipService.isLocalMaster(deviceId)) {
883 return;
884 }
885 if (event.type() == MASTER_CHANGED) {
Jordan Halterman3664e8e2018-03-21 12:52:37 -0700886 for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -0700887 recordUpdate(new BucketId(deviceId, bucket));
Jordan Halterman3664e8e2018-03-21 12:52:37 -0700888 }
Jon Hallfa132292017-10-24 11:11:24 -0700889 }
Jordan Halterman3664e8e2018-03-21 12:52:37 -0700890 backupSenderExecutor.execute(this::backup);
Jon Hallfa132292017-10-24 11:11:24 -0700891 }
892
Jon Hallfa132292017-10-24 11:11:24 -0700893 /**
Jordan Halterman000e4102018-06-12 15:34:19 -0700894 * Returns the set of devices in the flow table.
895 *
896 * @return the set of devices in the flow table
897 */
898 private Set<DeviceId> getDevices() {
899 return flowEntries.keySet();
900 }
901
902 /**
903 * Returns the digests for all buckets in the flow table for the given device.
904 *
905 * @param deviceId the device for which to return digests
906 * @return the set of digests for all buckets for the given device
907 */
908 private Set<FlowBucketDigest> getDigests(DeviceId deviceId) {
909 return IntStream.range(0, NUM_BUCKETS)
910 .mapToObj(bucket -> {
911 BucketId bucketId = new BucketId(deviceId, bucket);
912 long timestamp = lastUpdateTimes.getOrDefault(bucketId, 0L);
913 return new FlowBucketDigest(bucketId, timestamp);
914 }).collect(Collectors.toSet());
915 }
916
917 /**
Jon Hallfa132292017-10-24 11:11:24 -0700918 * Returns the flow table for specified device.
919 *
920 * @param deviceId identifier of the device
921 * @return Map representing Flow Table of given device.
922 */
923 private Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
Jordan Halterman343995a2018-06-12 11:40:21 -0700924 // Use an external get/null check to avoid locks.
925 // https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8161372
Jon Hallfa132292017-10-24 11:11:24 -0700926 if (persistenceEnabled) {
Jordan Halterman343995a2018-06-12 11:40:21 -0700927 Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowTable = flowEntries.get(deviceId);
928 return flowTable != null ? flowTable
929 : flowEntries.computeIfAbsent(deviceId, id ->
930 persistenceService.<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>persistentMapBuilder()
Jon Hallfa132292017-10-24 11:11:24 -0700931 .withName("FlowTable:" + deviceId.toString())
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -0700932 .withSerializer(serializer)
Jon Hallfa132292017-10-24 11:11:24 -0700933 .build());
934 } else {
Jordan Halterman343995a2018-06-12 11:40:21 -0700935 Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowTable = flowEntries.get(deviceId);
936 return flowTable != null ? flowTable
937 : flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap());
Jon Hallfa132292017-10-24 11:11:24 -0700938 }
939 }
940
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -0700941 private FlowBucket getFlowBucket(BucketId bucketId) {
Jordan Halterman000e4102018-06-12 15:34:19 -0700942 long timestamp = lastUpdateTimes.getOrDefault(bucketId, 0L);
Jordan Halterman343995a2018-06-12 11:40:21 -0700943 return new FlowBucket(bucketId, getFlowTable(bucketId.deviceId())
944 .entrySet()
945 .stream()
946 .filter(entry -> isInBucket(entry.getKey(), bucketId.bucket()))
Jordan Halterman000e4102018-06-12 15:34:19 -0700947 .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)),
948 timestamp);
Jon Hallfa132292017-10-24 11:11:24 -0700949 }
950
951 private Map<StoredFlowEntry, StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
Jordan Halterman343995a2018-06-12 11:40:21 -0700952 // Use an external get/null check to avoid locks.
953 // https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8161372
954 Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowTable = getFlowTable(deviceId);
955 Map<StoredFlowEntry, StoredFlowEntry> flowEntries = flowTable.get(flowId);
956 return flowEntries != null ? flowEntries : flowTable.computeIfAbsent(flowId, id -> Maps.newConcurrentMap());
Jon Hallfa132292017-10-24 11:11:24 -0700957 }
958
959 private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
960 return getFlowEntriesInternal(rule.deviceId(), rule.id()).get(rule);
961 }
962
963 private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
964 return getFlowTable(deviceId).values().stream()
965 .flatMap(m -> m.values().stream())
966 .collect(Collectors.toSet());
967 }
968
969 public StoredFlowEntry getFlowEntry(FlowRule rule) {
970 return getFlowEntryInternal(rule);
971 }
972
973 public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
974 return getFlowEntriesInternal(deviceId);
975 }
976
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -0700977 private boolean isInBucket(FlowId flowId, int bucket) {
Jordan Halterman3664e8e2018-03-21 12:52:37 -0700978 return bucket(flowId) == bucket;
979 }
980
981 private int bucket(FlowId flowId) {
982 return (int) (flowId.id() % NUM_BUCKETS);
983 }
984
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -0700985 private void recordUpdate(BucketId bucketId) {
986 lastUpdateTimes.put(bucketId, System.currentTimeMillis());
Jordan Halterman3664e8e2018-03-21 12:52:37 -0700987 }
988
Jon Hallfa132292017-10-24 11:11:24 -0700989 public void add(FlowEntry rule) {
Devin Limcdca1952018-03-28 18:13:33 -0700990 getFlowEntriesInternal(rule.deviceId(), rule.id())
991 .compute((StoredFlowEntry) rule, (k, stored) -> {
992 return (StoredFlowEntry) rule;
993 });
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -0700994 recordUpdate(new BucketId(rule.deviceId(), bucket(rule.id())));
Jon Hallfa132292017-10-24 11:11:24 -0700995 }
996
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800997 public void update(FlowEntry rule) {
998 getFlowEntriesInternal(rule.deviceId(), rule.id())
999 .computeIfPresent((StoredFlowEntry) rule, (k, stored) -> {
1000 if (rule instanceof DefaultFlowEntry) {
1001 DefaultFlowEntry updated = (DefaultFlowEntry) rule;
1002 if (stored instanceof DefaultFlowEntry) {
1003 DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
1004 if (updated.created() >= storedEntry.created()) {
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -07001005 recordUpdate(new BucketId(rule.deviceId(), bucket(rule.id())));
Jordan Halterman2edfeef2018-01-16 14:59:49 -08001006 return updated;
1007 } else {
1008 log.debug("Trying to update more recent flow entry {} (stored: {})", updated, stored);
1009 return stored;
1010 }
1011 }
1012 }
1013 return stored;
1014 });
1015 }
1016
1017 public FlowEntry remove(FlowEntry rule) {
Jon Hallfa132292017-10-24 11:11:24 -07001018 final AtomicReference<FlowEntry> removedRule = new AtomicReference<>();
Jordan Halterman2edfeef2018-01-16 14:59:49 -08001019 final Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowTable = getFlowTable(rule.deviceId());
Jordan Haltermance336f72018-01-16 17:08:09 -08001020 flowTable.computeIfPresent(rule.id(), (flowId, flowEntries) -> {
1021 flowEntries.computeIfPresent((StoredFlowEntry) rule, (k, stored) -> {
Jon Hallfa132292017-10-24 11:11:24 -07001022 if (rule instanceof DefaultFlowEntry) {
1023 DefaultFlowEntry toRemove = (DefaultFlowEntry) rule;
1024 if (stored instanceof DefaultFlowEntry) {
1025 DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
1026 if (toRemove.created() < storedEntry.created()) {
Jordan Halterman2edfeef2018-01-16 14:59:49 -08001027 log.debug("Trying to remove more recent flow entry {} (stored: {})", toRemove, stored);
Jon Hallfa132292017-10-24 11:11:24 -07001028 // the key is not updated, removedRule remains null
1029 return stored;
1030 }
1031 }
1032 }
1033 removedRule.set(stored);
1034 return null;
1035 });
Jordan Haltermance336f72018-01-16 17:08:09 -08001036 return flowEntries.isEmpty() ? null : flowEntries;
1037 });
Jon Hallfa132292017-10-24 11:11:24 -07001038
1039 if (removedRule.get() != null) {
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -07001040 recordUpdate(new BucketId(rule.deviceId(), bucket(rule.id())));
Jon Hallfa132292017-10-24 11:11:24 -07001041 return removedRule.get();
1042 } else {
1043 return null;
1044 }
1045 }
1046
1047 public void purgeFlowRule(DeviceId deviceId) {
1048 flowEntries.remove(deviceId);
1049 }
1050
1051 public void purgeFlowRules() {
1052 flowEntries.clear();
1053 }
1054
Jordan Halterman7ae81b82018-06-12 11:23:33 -07001055 /**
1056 * Returns a boolean indicating whether the local node is the current master for the given device.
1057 *
1058 * @param deviceId the device for which to indicate whether the local node is the current master
1059 * @return indicates whether the local node is the current master for the given device
1060 */
Jordan Halterman3664e8e2018-03-21 12:52:37 -07001061 private boolean isMasterNode(DeviceId deviceId) {
1062 NodeId master = replicaInfoManager.getReplicaInfoFor(deviceId).master().orElse(null);
1063 return Objects.equals(master, clusterService.getLocalNode().id());
1064 }
1065
Jordan Halterman7ae81b82018-06-12 11:23:33 -07001066 /**
1067 * Backs up all devices to all backup nodes.
1068 */
Jon Hallfa132292017-10-24 11:11:24 -07001069 private void backup() {
Jordan Halterman000e4102018-06-12 15:34:19 -07001070 for (DeviceId deviceId : getDevices()) {
Jordan Halterman7ae81b82018-06-12 11:23:33 -07001071 backup(deviceId);
1072 }
Jordan Halterman3664e8e2018-03-21 12:52:37 -07001073 }
1074
Jordan Halterman7ae81b82018-06-12 11:23:33 -07001075 /**
1076 * Backs up all buckets in the given device to each of its backup nodes.
1077 *
1078 * @param deviceId the device to back up
1079 */
1080 private void backup(DeviceId deviceId) {
1081 if (!isMasterNode(deviceId)) {
1082 return;
1083 }
1084
1085 // Get a list of backup nodes for the device.
1086 List<NodeId> backupNodes = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
1087 int availableBackupCount = Math.min(backupCount, backupNodes.size());
1088
1089 // If the list of backup nodes is empty, update the flow count.
1090 if (availableBackupCount == 0) {
1091 updateDeviceFlowCounts(deviceId);
1092 } else {
1093 // Otherwise, iterate through backup nodes and backup the device.
1094 for (int index = 0; index < availableBackupCount; index++) {
1095 NodeId backupNode = backupNodes.get(index);
1096 try {
1097 backup(deviceId, backupNode);
1098 } catch (Exception e) {
1099 log.error("Backup of " + deviceId + " to " + backupNode + " failed", e);
1100 }
Jordan Halterman3664e8e2018-03-21 12:52:37 -07001101 }
Jon Hallfa132292017-10-24 11:11:24 -07001102 }
1103 }
1104
Jordan Halterman7ae81b82018-06-12 11:23:33 -07001105 /**
1106 * Backs up all buckets for the given device to the given node.
1107 *
1108 * @param deviceId the device to back up
1109 * @param nodeId the node to which to back up the device
1110 */
1111 private void backup(DeviceId deviceId, NodeId nodeId) {
Jordan Halterman3664e8e2018-03-21 12:52:37 -07001112 final long timestamp = System.currentTimeMillis();
1113 for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -07001114 BucketId bucketId = new BucketId(deviceId, bucket);
1115 BackupOperation operation = new BackupOperation(nodeId, bucketId);
1116 if (startBackup(operation)) {
1117 backup(operation).whenCompleteAsync((succeeded, error) -> {
1118 if (error == null && succeeded) {
1119 succeedBackup(operation, timestamp);
1120 } else {
1121 failBackup(operation);
1122 }
Jordan Halterman7ae81b82018-06-12 11:23:33 -07001123 backup(deviceId, nodeId);
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -07001124 }, backupSenderExecutor);
Jordan Halterman3664e8e2018-03-21 12:52:37 -07001125 }
1126 }
1127 }
1128
Jordan Halterman7ae81b82018-06-12 11:23:33 -07001129 /**
1130 * Returns a boolean indicating whether the given {@link BackupOperation} can be started.
1131 * <p>
1132 * The backup can be started if no backup for the same device/bucket/node is already in progress and changes
1133 * are pending replication for the backup operation.
1134 *
1135 * @param operation the operation to start
1136 * @return indicates whether the given backup operation should be started
1137 */
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -07001138 private boolean startBackup(BackupOperation operation) {
1139 long lastBackupTime = lastBackupTimes.getOrDefault(operation, 0L);
1140 long lastUpdateTime = lastUpdateTimes.getOrDefault(operation.bucketId(), 0L);
1141 return lastUpdateTime > 0 && lastBackupTime <= lastUpdateTime && inFlightUpdates.add(operation);
Jordan Halterman3664e8e2018-03-21 12:52:37 -07001142 }
1143
Jordan Halterman7ae81b82018-06-12 11:23:33 -07001144 /**
1145 * Fails the given backup operation.
1146 *
1147 * @param operation the backup operation to fail
1148 */
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -07001149 private void failBackup(BackupOperation operation) {
1150 inFlightUpdates.remove(operation);
Jordan Halterman3664e8e2018-03-21 12:52:37 -07001151 }
1152
Jordan Halterman7ae81b82018-06-12 11:23:33 -07001153 /**
1154 * Succeeds the given backup operation.
1155 * <p>
1156 * The last backup time for the operation will be updated and the operation will be removed from
1157 * in-flight updates.
1158 *
1159 * @param operation the operation to succeed
1160 * @param timestamp the timestamp at which the operation was <em>started</em>
1161 */
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -07001162 private void succeedBackup(BackupOperation operation, long timestamp) {
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -07001163 lastBackupTimes.put(operation, timestamp);
Jordan Halterman7ae81b82018-06-12 11:23:33 -07001164 inFlightUpdates.remove(operation);
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -07001165 }
1166
Jordan Halterman7ae81b82018-06-12 11:23:33 -07001167 /**
1168 * Performs the given backup operation.
1169 *
1170 * @param operation the operation to perform
1171 * @return a future to be completed with a boolean indicating whether the backup operation was successful
1172 */
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -07001173 private CompletableFuture<Boolean> backup(BackupOperation operation) {
1174 log.debug("Sending flowEntries in bucket {} for device {} to {} for backup.",
1175 operation.bucketId().bucket(), operation.bucketId().deviceId(), operation.nodeId());
1176 FlowBucket flowBucket = getFlowBucket(operation.bucketId());
1177
1178 CompletableFuture<Boolean> future = new CompletableFuture<>();
Jordan Halterman000e4102018-06-12 15:34:19 -07001179 clusterCommunicator.<FlowBucket, Set<FlowId>>sendAndReceive(
1180 flowBucket,
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -07001181 FLOW_TABLE_BACKUP,
1182 serializer::encode,
1183 serializer::decode,
1184 operation.nodeId())
1185 .whenComplete((backedupFlows, error) -> {
1186 Set<FlowId> flowsNotBackedUp = error != null ?
1187 flowBucket.table().keySet() :
1188 Sets.difference(flowBucket.table().keySet(), backedupFlows);
1189 if (flowsNotBackedUp.size() > 0) {
1190 log.warn("Failed to backup flows: {}. Reason: {}, Node: {}",
1191 flowsNotBackedUp, error != null ? error.getMessage() : "none", operation.nodeId());
1192 }
1193 future.complete(backedupFlows != null);
1194 });
Jordan Halterman7ae81b82018-06-12 11:23:33 -07001195
1196 updateFlowCounts(flowBucket);
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -07001197 return future;
1198 }
1199
Jordan Halterman7ae81b82018-06-12 11:23:33 -07001200 /**
1201 * Handles a flow bucket backup from a remote peer.
1202 *
1203 * @param flowBucket the flow bucket to back up
1204 * @return the set of flows that were backed up
1205 */
1206 private Set<FlowId> onBackup(FlowBucket flowBucket) {
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -07001207 log.debug("Received flowEntries for {} bucket {} to backup",
1208 flowBucket.bucketId().deviceId(), flowBucket.bucketId);
Jordan Halterman3664e8e2018-03-21 12:52:37 -07001209 Set<FlowId> backedupFlows = Sets.newHashSet();
Jon Hallfa132292017-10-24 11:11:24 -07001210 try {
Jordan Halterman3664e8e2018-03-21 12:52:37 -07001211 // Only process those devices are that not managed by the local node.
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -07001212 NodeId master = replicaInfoManager.getReplicaInfoFor(flowBucket.bucketId().deviceId())
1213 .master()
1214 .orElse(null);
Jordan Halterman3664e8e2018-03-21 12:52:37 -07001215 if (!Objects.equals(local, master)) {
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -07001216 Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> backupFlowTable =
1217 getFlowTable(flowBucket.bucketId().deviceId());
1218 backupFlowTable.putAll(flowBucket.table());
Jordan Halterman3664e8e2018-03-21 12:52:37 -07001219 backupFlowTable.entrySet()
Jordan Haltermanfd73c6e2018-06-01 00:40:56 -07001220 .removeIf(entry -> isInBucket(entry.getKey(), flowBucket.bucketId().bucket())
1221 && !flowBucket.table().containsKey(entry.getKey()));
1222 backedupFlows.addAll(flowBucket.table().keySet());
Jordan Halterman000e4102018-06-12 15:34:19 -07001223 lastUpdateTimes.put(flowBucket.bucketId(), flowBucket.timestamp());
Jordan Halterman3664e8e2018-03-21 12:52:37 -07001224 }
Jon Hallfa132292017-10-24 11:11:24 -07001225 } catch (Exception e) {
1226 log.warn("Failure processing backup request", e);
1227 }
Jordan Halterman3664e8e2018-03-21 12:52:37 -07001228 return backedupFlows;
Jon Hallfa132292017-10-24 11:11:24 -07001229 }
Jordan Halterman7ae81b82018-06-12 11:23:33 -07001230
1231 /**
Jordan Halterman000e4102018-06-12 15:34:19 -07001232 * Runs the anti-entropy protocol.
1233 */
1234 private void runAntiEntropy() {
1235 for (DeviceId deviceId : getDevices()) {
1236 runAntiEntropy(deviceId);
1237 }
1238 }
1239
1240 /**
1241 * Runs the anti-entropy protocol for the given device.
1242 *
1243 * @param deviceId the device for which to run the anti-entropy protocol
1244 */
1245 private void runAntiEntropy(DeviceId deviceId) {
1246 if (!isMasterNode(deviceId)) {
1247 return;
1248 }
1249
1250 // Get the set of digests for the node.
1251 Set<FlowBucketDigest> digests = getDigests(deviceId);
1252
1253 // Get a list of backup nodes for the device and compute the real backup count.
1254 List<NodeId> backupNodes = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
1255 int availableBackupCount = Math.min(backupCount, backupNodes.size());
1256
1257 // Iterate through backup nodes and run the anti-entropy protocol.
1258 for (int index = 0; index < availableBackupCount; index++) {
1259 NodeId backupNode = backupNodes.get(index);
1260 try {
1261 runAntiEntropy(deviceId, backupNode, digests);
1262 } catch (Exception e) {
1263 log.error("Anti-entropy for " + deviceId + " to " + backupNode + " failed", e);
1264 }
1265 }
1266 }
1267
1268 /**
1269 * Sends an anti-entropy advertisement to the given node.
1270 *
1271 * @param deviceId the device ID for which to send the advertisement
1272 * @param nodeId the node to which to send the advertisement
1273 * @param digests the digests to send to the given node
1274 */
1275 private void runAntiEntropy(DeviceId deviceId, NodeId nodeId, Set<FlowBucketDigest> digests) {
1276 log.trace("Sending anti-entropy advertisement for device {} to {}", deviceId, nodeId);
1277 clusterCommunicator.<Set<FlowBucketDigest>, Set<BucketId>>sendAndReceive(
1278 digests,
1279 FLOW_TABLE_ANTI_ENTROPY,
1280 serializer::encode,
1281 serializer::decode,
1282 nodeId)
1283 .whenComplete((missingBuckets, error) -> {
1284 if (error == null) {
1285 log.debug("Detected {} missing buckets on node {} for device {}",
1286 missingBuckets.size(), nodeId, deviceId);
1287 } else {
1288 log.trace("Anti-entropy advertisement for device {} to {} failed", deviceId, nodeId, error);
1289 }
1290 });
1291 }
1292
1293 /**
1294 * Handles a device anti-entropy request from a remote peer.
1295 *
1296 * @param digest the device digest
1297 * @return the set of flow buckets to update
1298 */
1299 private Set<BucketId> onAntiEntropy(DeviceDigest digest) {
1300 // If the local node is the master, reject the anti-entropy request.
1301 // TODO: We really should be using mastership terms in anti-entropy requests to determine whether
1302 // this node is a newer master, but that would only reduce the time it takes to resolve missing flows
1303 // as a later anti-entropy request will still succeed once this node recognizes it's no longer the master.
1304 NodeId master = replicaInfoManager.getReplicaInfoFor(digest.deviceId())
1305 .master()
1306 .orElse(null);
1307 if (Objects.equals(master, local)) {
1308 return ImmutableSet.of();
1309 }
1310
1311 // Compute a set of missing BucketIds based on digest times and send them back to the master.
1312 Set<BucketId> missingBuckets = new HashSet<>();
1313 for (FlowBucketDigest flowBucketDigest : digest.digests()) {
1314 long lastUpdated = lastUpdateTimes.getOrDefault(flowBucketDigest.bucketId(), 0L);
1315 if (lastUpdated < flowBucketDigest.timestamp()) {
1316 missingBuckets.add(flowBucketDigest.bucketId());
1317 }
1318 }
1319 return missingBuckets;
1320 }
1321
1322 /**
Jordan Halterman7ae81b82018-06-12 11:23:33 -07001323 * Updates all flow counts for the given device.
1324 *
1325 * @param deviceId the device for which to update flow counts
1326 */
1327 private void updateDeviceFlowCounts(DeviceId deviceId) {
1328 for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
1329 BucketId bucketId = new BucketId(deviceId, bucket);
1330 FlowBucket flowBucket = getFlowBucket(bucketId);
1331 updateFlowCounts(flowBucket);
1332 }
1333 }
1334
1335 /**
1336 * Updates the eventually consistent flow count for the given bucket.
1337 *
1338 * @param flowBucket the flow bucket for which to update flow counts
1339 */
1340 private void updateFlowCounts(FlowBucket flowBucket) {
1341 int flowCount = flowBucket.table().entrySet()
1342 .stream()
1343 .mapToInt(e -> e.getValue().values().size())
1344 .sum();
1345 flowCounts.put(flowBucket.bucketId(), flowCount);
1346 }
Jon Hallfa132292017-10-24 11:11:24 -07001347 }
1348
1349 @Override
Jordan Halterman000e4102018-06-12 15:34:19 -07001350 public FlowRuleEvent updateTableStatistics(DeviceId deviceId, List<TableStatisticsEntry> tableStats) {
Jon Hallfa132292017-10-24 11:11:24 -07001351 deviceTableStats.put(deviceId, tableStats);
1352 return null;
1353 }
1354
1355 @Override
1356 public Iterable<TableStatisticsEntry> getTableStatistics(DeviceId deviceId) {
1357 NodeId master = mastershipService.getMasterFor(deviceId);
1358
1359 if (master == null) {
1360 log.debug("Failed to getTableStats: No master for {}", deviceId);
1361 return Collections.emptyList();
1362 }
1363
1364 List<TableStatisticsEntry> tableStats = deviceTableStats.get(deviceId);
1365 if (tableStats == null) {
1366 return Collections.emptyList();
1367 }
1368 return ImmutableList.copyOf(tableStats);
1369 }
1370
1371 @Override
1372 public long getActiveFlowRuleCount(DeviceId deviceId) {
1373 return Streams.stream(getTableStatistics(deviceId))
1374 .mapToLong(TableStatisticsEntry::activeFlowEntries)
1375 .sum();
1376 }
1377
1378 private class InternalTableStatsListener
1379 implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> {
1380 @Override
1381 public void event(EventuallyConsistentMapEvent<DeviceId,
1382 List<TableStatisticsEntry>> event) {
1383 //TODO: Generate an event to listeners (do we need?)
1384 }
1385 }
Jordan Halterman000e4102018-06-12 15:34:19 -07001386}