blob: 2919f08e75857c32a9d5626ee196e092fed9262f [file] [log] [blame]
Jon Hallfa132292017-10-24 11:11:24 -07001 /*
2 * Copyright 2014-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.flow.impl;
17
18import java.util.Collections;
19import java.util.Dictionary;
20import java.util.HashSet;
21import java.util.List;
22import java.util.Map;
23import java.util.Objects;
24import java.util.Set;
Jordan Haltermanb2f57952018-03-21 12:52:37 -070025import java.util.concurrent.CompletableFuture;
Jon Hallfa132292017-10-24 11:11:24 -070026import java.util.concurrent.ExecutorService;
27import java.util.concurrent.Executors;
28import java.util.concurrent.ScheduledExecutorService;
29import java.util.concurrent.ScheduledFuture;
30import java.util.concurrent.TimeUnit;
Jordan Haltermanaeea0bb2018-06-13 12:34:06 -070031import java.util.concurrent.atomic.AtomicLong;
Jon Hallfa132292017-10-24 11:11:24 -070032import java.util.concurrent.atomic.AtomicReference;
33import java.util.stream.Collectors;
Jordan Halterman5259b332018-06-12 15:34:19 -070034import java.util.stream.IntStream;
Jon Hallfa132292017-10-24 11:11:24 -070035
Jordan Halterman8f90d6d2018-06-12 11:23:33 -070036import com.google.common.collect.ImmutableList;
Jordan Halterman5259b332018-06-12 15:34:19 -070037import com.google.common.collect.ImmutableSet;
Jordan Halterman8f90d6d2018-06-12 11:23:33 -070038import com.google.common.collect.Maps;
39import com.google.common.collect.Sets;
Jon Hallfa132292017-10-24 11:11:24 -070040import com.google.common.collect.Streams;
Jordan Halterman8f90d6d2018-06-12 11:23:33 -070041import com.google.common.util.concurrent.Futures;
Jon Hallfa132292017-10-24 11:11:24 -070042import org.apache.felix.scr.annotations.Activate;
43import org.apache.felix.scr.annotations.Component;
44import org.apache.felix.scr.annotations.Deactivate;
45import org.apache.felix.scr.annotations.Modified;
46import org.apache.felix.scr.annotations.Property;
47import org.apache.felix.scr.annotations.Reference;
48import org.apache.felix.scr.annotations.ReferenceCardinality;
49import org.apache.felix.scr.annotations.Service;
50import org.onlab.util.KryoNamespace;
51import org.onlab.util.Tools;
52import org.onosproject.cfg.ComponentConfigService;
53import org.onosproject.cluster.ClusterService;
54import org.onosproject.cluster.NodeId;
55import org.onosproject.core.CoreService;
56import org.onosproject.core.IdGenerator;
57import org.onosproject.mastership.MastershipService;
58import org.onosproject.net.DeviceId;
59import org.onosproject.net.device.DeviceService;
60import org.onosproject.net.flow.CompletedBatchOperation;
61import org.onosproject.net.flow.DefaultFlowEntry;
62import org.onosproject.net.flow.FlowEntry;
63import org.onosproject.net.flow.FlowEntry.FlowEntryState;
64import org.onosproject.net.flow.FlowId;
65import org.onosproject.net.flow.FlowRule;
Jon Hallfa132292017-10-24 11:11:24 -070066import org.onosproject.net.flow.FlowRuleEvent;
67import org.onosproject.net.flow.FlowRuleEvent.Type;
68import org.onosproject.net.flow.FlowRuleService;
69import org.onosproject.net.flow.FlowRuleStore;
70import org.onosproject.net.flow.FlowRuleStoreDelegate;
71import org.onosproject.net.flow.StoredFlowEntry;
72import org.onosproject.net.flow.TableStatisticsEntry;
Jordan Halterman8f90d6d2018-06-12 11:23:33 -070073import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
74import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry.FlowRuleOperation;
75import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
76import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
77import org.onosproject.net.flow.oldbatch.FlowRuleBatchRequest;
Jon Hallfa132292017-10-24 11:11:24 -070078import org.onosproject.persistence.PersistenceService;
79import org.onosproject.store.AbstractStore;
80import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
81import org.onosproject.store.cluster.messaging.ClusterMessage;
82import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
83import org.onosproject.store.flow.ReplicaInfoEvent;
84import org.onosproject.store.flow.ReplicaInfoEventListener;
85import org.onosproject.store.flow.ReplicaInfoService;
86import org.onosproject.store.impl.MastershipBasedTimestamp;
87import org.onosproject.store.serializers.KryoNamespaces;
88import org.onosproject.store.service.EventuallyConsistentMap;
89import org.onosproject.store.service.EventuallyConsistentMapEvent;
90import org.onosproject.store.service.EventuallyConsistentMapListener;
91import org.onosproject.store.service.Serializer;
92import org.onosproject.store.service.StorageService;
93import org.onosproject.store.service.WallClockTimestamp;
94import org.osgi.service.component.ComponentContext;
95import org.slf4j.Logger;
96
Jon Hallfa132292017-10-24 11:11:24 -070097import static com.google.common.base.Strings.isNullOrEmpty;
98import static org.onlab.util.Tools.get;
99import static org.onlab.util.Tools.groupedThreads;
100import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
Jon Hallfa132292017-10-24 11:11:24 -0700101import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.APPLY_BATCH_FLOWS;
Jordan Halterman5259b332018-06-12 15:34:19 -0700102import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_ANTI_ENTROPY;
Jon Hallfa132292017-10-24 11:11:24 -0700103import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP;
104import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES;
105import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY;
106import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOTE_APPLY_COMPLETED;
107import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOVE_FLOW_ENTRY;
108import static org.slf4j.LoggerFactory.getLogger;
109
110/**
111 * Manages inventory of flow rules using a distributed state management protocol.
112 */
Thomas Vachuska71026b22018-01-05 16:01:44 -0800113@Component(immediate = true)
Jon Hallfa132292017-10-24 11:11:24 -0700114@Service
115public class ECFlowRuleStore
116 extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
117 implements FlowRuleStore {
118
119 private final Logger log = getLogger(getClass());
120
121 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
122 private static final int DEFAULT_MAX_BACKUP_COUNT = 2;
123 private static final boolean DEFAULT_PERSISTENCE_ENABLED = false;
124 private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
Jordan Halterman5259b332018-06-12 15:34:19 -0700125 private static final int DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS = 5000;
Jon Hallfa132292017-10-24 11:11:24 -0700126 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700127 private static final int NUM_BUCKETS = 1024;
Jon Hallfa132292017-10-24 11:11:24 -0700128
129 @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
130 label = "Number of threads in the message handler pool")
131 private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
132
133 @Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS,
134 label = "Delay in ms between successive backup runs")
135 private int backupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
Jordan Halterman5259b332018-06-12 15:34:19 -0700136
137 @Property(name = "antiEntropyPeriod", intValue = DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS,
138 label = "Delay in ms between anti-entropy runs")
139 private int antiEntropyPeriod = DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS;
140
Jon Hallfa132292017-10-24 11:11:24 -0700141 @Property(name = "persistenceEnabled", boolValue = false,
142 label = "Indicates whether or not changes in the flow table should be persisted to disk.")
143 private boolean persistenceEnabled = DEFAULT_PERSISTENCE_ENABLED;
144
145 @Property(name = "backupCount", intValue = DEFAULT_MAX_BACKUP_COUNT,
146 label = "Max number of backup copies for each device")
147 private volatile int backupCount = DEFAULT_MAX_BACKUP_COUNT;
148
149 private InternalFlowTable flowTable = new InternalFlowTable();
150
151 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
152 protected ReplicaInfoService replicaInfoManager;
153
154 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
155 protected ClusterCommunicationService clusterCommunicator;
156
157 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
158 protected ClusterService clusterService;
159
160 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
161 protected DeviceService deviceService;
162
163 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
164 protected CoreService coreService;
165
166 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
167 protected ComponentConfigService configService;
168
169 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
170 protected MastershipService mastershipService;
171
172 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
173 protected PersistenceService persistenceService;
174
175 private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
176 private ExecutorService messageHandlingExecutor;
177 private ExecutorService eventHandler;
178
179 private ScheduledFuture<?> backupTask;
Jordan Halterman5259b332018-06-12 15:34:19 -0700180 private ScheduledFuture<?> antiEntropyTask;
Jon Hallfa132292017-10-24 11:11:24 -0700181 private final ScheduledExecutorService backupSenderExecutor =
182 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender", log));
183
184 private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
185 private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
186 new InternalTableStatsListener();
187
188 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
189 protected StorageService storageService;
190
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700191 protected final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
192 .register(KryoNamespaces.API)
Jordan Haltermana765d222018-06-01 00:40:56 -0700193 .register(BucketId.class)
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700194 .register(FlowBucket.class)
195 .build());
Jon Hallfa132292017-10-24 11:11:24 -0700196
197 protected final KryoNamespace.Builder serializerBuilder = KryoNamespace.newBuilder()
Jordan Haltermana765d222018-06-01 00:40:56 -0700198 .register(KryoNamespaces.API)
199 .register(BucketId.class)
200 .register(MastershipBasedTimestamp.class);
Jon Hallfa132292017-10-24 11:11:24 -0700201
Jordan Haltermana765d222018-06-01 00:40:56 -0700202 private EventuallyConsistentMap<BucketId, Integer> flowCounts;
Jon Hallfa132292017-10-24 11:11:24 -0700203
204 private IdGenerator idGenerator;
205 private NodeId local;
206
207 @Activate
208 public void activate(ComponentContext context) {
209 configService.registerProperties(getClass());
210
211 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
212
213 local = clusterService.getLocalNode().id();
214
215 eventHandler = Executors.newSingleThreadExecutor(
216 groupedThreads("onos/flow", "event-handler", log));
217 messageHandlingExecutor = Executors.newFixedThreadPool(
218 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
219
220 registerMessageHandlers(messageHandlingExecutor);
221
222 replicaInfoManager.addListener(flowTable);
223 backupTask = backupSenderExecutor.scheduleWithFixedDelay(
224 flowTable::backup,
225 0,
226 backupPeriod,
227 TimeUnit.MILLISECONDS);
Jordan Halterman5259b332018-06-12 15:34:19 -0700228 antiEntropyTask = backupSenderExecutor.scheduleWithFixedDelay(
229 flowTable::runAntiEntropy,
230 0,
231 antiEntropyPeriod,
232 TimeUnit.MILLISECONDS);
Jon Hallfa132292017-10-24 11:11:24 -0700233
Jordan Haltermana765d222018-06-01 00:40:56 -0700234 flowCounts = storageService.<BucketId, Integer>eventuallyConsistentMapBuilder()
Thomas Vachuskaa8e74772018-02-26 11:33:35 -0800235 .withName("onos-flow-counts")
236 .withSerializer(serializerBuilder)
237 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
238 .withTimestampProvider((k, v) -> new WallClockTimestamp())
239 .withTombstonesDisabled()
240 .build();
241
Jon Hallfa132292017-10-24 11:11:24 -0700242 deviceTableStats = storageService.<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder()
243 .withName("onos-flow-table-stats")
244 .withSerializer(serializerBuilder)
245 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
246 .withTimestampProvider((k, v) -> new WallClockTimestamp())
247 .withTombstonesDisabled()
248 .build();
249 deviceTableStats.addListener(tableStatsListener);
250
251 logConfig("Started");
252 }
253
254 @Deactivate
255 public void deactivate(ComponentContext context) {
256 replicaInfoManager.removeListener(flowTable);
257 backupTask.cancel(true);
258 configService.unregisterProperties(getClass(), false);
259 unregisterMessageHandlers();
260 deviceTableStats.removeListener(tableStatsListener);
261 deviceTableStats.destroy();
262 eventHandler.shutdownNow();
263 messageHandlingExecutor.shutdownNow();
264 backupSenderExecutor.shutdownNow();
265 log.info("Stopped");
266 }
267
268 @SuppressWarnings("rawtypes")
269 @Modified
270 public void modified(ComponentContext context) {
271 if (context == null) {
272 logConfig("Default config");
273 return;
274 }
275
276 Dictionary properties = context.getProperties();
277 int newPoolSize;
278 int newBackupPeriod;
279 int newBackupCount;
Jordan Halterman5259b332018-06-12 15:34:19 -0700280 int newAntiEntropyPeriod;
Jon Hallfa132292017-10-24 11:11:24 -0700281 try {
282 String s = get(properties, "msgHandlerPoolSize");
283 newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
284
285 s = get(properties, "backupPeriod");
286 newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());
287
288 s = get(properties, "backupCount");
289 newBackupCount = isNullOrEmpty(s) ? backupCount : Integer.parseInt(s.trim());
Jordan Halterman5259b332018-06-12 15:34:19 -0700290
291 s = get(properties, "antiEntropyPeriod");
292 newAntiEntropyPeriod = isNullOrEmpty(s) ? antiEntropyPeriod : Integer.parseInt(s.trim());
Jon Hallfa132292017-10-24 11:11:24 -0700293 } catch (NumberFormatException | ClassCastException e) {
294 newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
295 newBackupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
296 newBackupCount = DEFAULT_MAX_BACKUP_COUNT;
Jordan Halterman5259b332018-06-12 15:34:19 -0700297 newAntiEntropyPeriod = DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS;
Jon Hallfa132292017-10-24 11:11:24 -0700298 }
299
300 boolean restartBackupTask = false;
Jordan Halterman5259b332018-06-12 15:34:19 -0700301 boolean restartAntiEntropyTask = false;
Jon Hallfa132292017-10-24 11:11:24 -0700302
303 if (newBackupPeriod != backupPeriod) {
304 backupPeriod = newBackupPeriod;
305 restartBackupTask = true;
306 }
Jordan Halterman5259b332018-06-12 15:34:19 -0700307
308 if (newAntiEntropyPeriod != antiEntropyPeriod) {
309 antiEntropyPeriod = newAntiEntropyPeriod;
310 restartAntiEntropyTask = true;
311 }
312
Jon Hallfa132292017-10-24 11:11:24 -0700313 if (restartBackupTask) {
314 if (backupTask != null) {
315 // cancel previously running task
316 backupTask.cancel(false);
317 }
318 backupTask = backupSenderExecutor.scheduleWithFixedDelay(
319 flowTable::backup,
320 0,
321 backupPeriod,
322 TimeUnit.MILLISECONDS);
323 }
Jordan Halterman5259b332018-06-12 15:34:19 -0700324
325 if (restartAntiEntropyTask) {
326 if (antiEntropyTask != null) {
327 // cancel previously running task
328 antiEntropyTask.cancel(false);
329 }
330 antiEntropyTask = backupSenderExecutor.scheduleWithFixedDelay(
331 flowTable::runAntiEntropy,
332 0,
333 antiEntropyPeriod,
334 TimeUnit.MILLISECONDS);
335 }
336
Jon Hallfa132292017-10-24 11:11:24 -0700337 if (newPoolSize != msgHandlerPoolSize) {
338 msgHandlerPoolSize = newPoolSize;
339 ExecutorService oldMsgHandler = messageHandlingExecutor;
340 messageHandlingExecutor = Executors.newFixedThreadPool(
341 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
342
343 // replace previously registered handlers.
344 registerMessageHandlers(messageHandlingExecutor);
345 oldMsgHandler.shutdown();
346 }
Jordan Halterman5259b332018-06-12 15:34:19 -0700347
Jon Hallfa132292017-10-24 11:11:24 -0700348 if (backupCount != newBackupCount) {
349 backupCount = newBackupCount;
350 }
351 logConfig("Reconfigured");
352 }
353
354 private void registerMessageHandlers(ExecutorService executor) {
355
356 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
357 clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
358 REMOTE_APPLY_COMPLETED, serializer::decode, this::notifyDelegate, executor);
359 clusterCommunicator.addSubscriber(
360 GET_FLOW_ENTRY, serializer::decode, flowTable::getFlowEntry, serializer::encode, executor);
361 clusterCommunicator.addSubscriber(
362 GET_DEVICE_FLOW_ENTRIES, serializer::decode, flowTable::getFlowEntries, serializer::encode, executor);
363 clusterCommunicator.addSubscriber(
364 REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
365 clusterCommunicator.addSubscriber(
Jordan Halterman8f90d6d2018-06-12 11:23:33 -0700366 FLOW_TABLE_BACKUP, serializer::decode, flowTable::onBackup, serializer::encode, executor);
Jordan Halterman5259b332018-06-12 15:34:19 -0700367 clusterCommunicator.addSubscriber(
368 FLOW_TABLE_ANTI_ENTROPY, serializer::decode, flowTable::onAntiEntropy, serializer::encode, executor);
Jon Hallfa132292017-10-24 11:11:24 -0700369 }
370
371 private void unregisterMessageHandlers() {
372 clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
373 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
374 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
375 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
376 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
377 clusterCommunicator.removeSubscriber(FLOW_TABLE_BACKUP);
Jordan Halterman5259b332018-06-12 15:34:19 -0700378 clusterCommunicator.removeSubscriber(FLOW_TABLE_ANTI_ENTROPY);
Jon Hallfa132292017-10-24 11:11:24 -0700379 }
380
381 private void logConfig(String prefix) {
382 log.info("{} with msgHandlerPoolSize = {}; backupPeriod = {}, backupCount = {}",
383 prefix, msgHandlerPoolSize, backupPeriod, backupCount);
384 }
385
Jon Hallfa132292017-10-24 11:11:24 -0700386 @Override
387 public int getFlowRuleCount() {
388 return Streams.stream(deviceService.getDevices()).parallel()
Thomas Vachuskaa8e74772018-02-26 11:33:35 -0800389 .mapToInt(device -> getFlowRuleCount(device.id()))
390 .sum();
391 }
392
393 @Override
394 public int getFlowRuleCount(DeviceId deviceId) {
Jordan Haltermana765d222018-06-01 00:40:56 -0700395 return flowCounts.entrySet().stream()
396 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
397 .mapToInt(entry -> entry.getValue())
398 .sum();
Jon Hallfa132292017-10-24 11:11:24 -0700399 }
400
401 @Override
402 public FlowEntry getFlowEntry(FlowRule rule) {
403 NodeId master = mastershipService.getMasterFor(rule.deviceId());
404
405 if (master == null) {
406 log.debug("Failed to getFlowEntry: No master for {}", rule.deviceId());
407 return null;
408 }
409
410 if (Objects.equals(local, master)) {
411 return flowTable.getFlowEntry(rule);
412 }
413
414 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
415 master, rule.deviceId());
416
417 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
418 ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY,
419 serializer::encode,
420 serializer::decode,
421 master),
422 FLOW_RULE_STORE_TIMEOUT_MILLIS,
423 TimeUnit.MILLISECONDS,
424 null);
425 }
426
427 @Override
428 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
429 NodeId master = mastershipService.getMasterFor(deviceId);
430
431 if (master == null) {
432 log.debug("Failed to getFlowEntries: No master for {}", deviceId);
433 return Collections.emptyList();
434 }
435
436 if (Objects.equals(local, master)) {
437 return flowTable.getFlowEntries(deviceId);
438 }
439
440 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
441 master, deviceId);
442
443 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
444 ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
445 serializer::encode,
446 serializer::decode,
447 master),
448 FLOW_RULE_STORE_TIMEOUT_MILLIS,
449 TimeUnit.MILLISECONDS,
450 Collections.emptyList());
451 }
452
453 @Override
454 public void storeFlowRule(FlowRule rule) {
455 storeBatch(new FlowRuleBatchOperation(
456 Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
457 rule.deviceId(), idGenerator.getNewId()));
458 }
459
460 @Override
461 public void storeBatch(FlowRuleBatchOperation operation) {
462 if (operation.getOperations().isEmpty()) {
463 notifyDelegate(FlowRuleBatchEvent.completed(
464 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
465 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
466 return;
467 }
468
469 DeviceId deviceId = operation.deviceId();
470 NodeId master = mastershipService.getMasterFor(deviceId);
471
472 if (master == null) {
473 log.warn("No master for {} ", deviceId);
474
475 updateStoreInternal(operation);
476
477 notifyDelegate(FlowRuleBatchEvent.completed(
478 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
479 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
480 return;
481 }
482
483 if (Objects.equals(local, master)) {
484 storeBatchInternal(operation);
485 return;
486 }
487
488 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
489 master, deviceId);
490
491 clusterCommunicator.unicast(operation,
492 APPLY_BATCH_FLOWS,
493 serializer::encode,
494 master)
495 .whenComplete((result, error) -> {
496 if (error != null) {
497 log.warn("Failed to storeBatch: {} to {}", operation, master, error);
498
499 Set<FlowRule> allFailures = operation.getOperations()
500 .stream()
501 .map(op -> op.target())
502 .collect(Collectors.toSet());
503
504 notifyDelegate(FlowRuleBatchEvent.completed(
505 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
506 new CompletedBatchOperation(false, allFailures, deviceId)));
507 }
508 });
509 }
510
511 private void storeBatchInternal(FlowRuleBatchOperation operation) {
512
513 final DeviceId did = operation.deviceId();
514 //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
515 Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
516 if (currentOps.isEmpty()) {
517 batchOperationComplete(FlowRuleBatchEvent.completed(
518 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
519 new CompletedBatchOperation(true, Collections.emptySet(), did)));
520 return;
521 }
522
523 notifyDelegate(FlowRuleBatchEvent.requested(new
524 FlowRuleBatchRequest(operation.id(),
525 currentOps), operation.deviceId()));
526 }
527
528 private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
529 return operation.getOperations().stream().map(
530 op -> {
531 StoredFlowEntry entry;
532 switch (op.operator()) {
533 case ADD:
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800534 entry = new DefaultFlowEntry(op.target());
535 flowTable.add(entry);
536 return op;
Thomas Vachuska914b0b12018-01-09 11:54:52 -0800537 case MODIFY:
Jon Hallfa132292017-10-24 11:11:24 -0700538 entry = new DefaultFlowEntry(op.target());
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800539 flowTable.update(entry);
Jon Hallfa132292017-10-24 11:11:24 -0700540 return op;
541 case REMOVE:
542 entry = flowTable.getFlowEntry(op.target());
543 if (entry != null) {
Jon Hallfa132292017-10-24 11:11:24 -0700544 entry.setState(FlowEntryState.PENDING_REMOVE);
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800545 flowTable.update(entry);
Jon Hallfa132292017-10-24 11:11:24 -0700546 log.debug("Setting state of rule to pending remove: {}", entry);
547 return op;
548 }
549 break;
Jon Hallfa132292017-10-24 11:11:24 -0700550 default:
551 log.warn("Unknown flow operation operator: {}", op.operator());
552 }
553 return null;
554 }
555 ).filter(Objects::nonNull).collect(Collectors.toSet());
556 }
557
558 @Override
559 public void deleteFlowRule(FlowRule rule) {
560 storeBatch(
561 new FlowRuleBatchOperation(
562 Collections.singletonList(
563 new FlowRuleBatchEntry(
564 FlowRuleOperation.REMOVE,
565 rule)), rule.deviceId(), idGenerator.getNewId()));
566 }
567
568 @Override
569 public FlowRuleEvent pendingFlowRule(FlowEntry rule) {
570 if (mastershipService.isLocalMaster(rule.deviceId())) {
571 StoredFlowEntry stored = flowTable.getFlowEntry(rule);
572 if (stored != null &&
573 stored.state() != FlowEntryState.PENDING_ADD) {
574 stored.setState(FlowEntryState.PENDING_ADD);
575 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
576 }
577 }
578 return null;
579 }
580
581 @Override
582 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
583 NodeId master = mastershipService.getMasterFor(rule.deviceId());
584 if (Objects.equals(local, master)) {
585 return addOrUpdateFlowRuleInternal(rule);
586 }
587
588 log.warn("Tried to update FlowRule {} state,"
589 + " while the Node was not the master.", rule);
590 return null;
591 }
592
593 private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
594 // check if this new rule is an update to an existing entry
595 StoredFlowEntry stored = flowTable.getFlowEntry(rule);
596 if (stored != null) {
Jon Hallfa132292017-10-24 11:11:24 -0700597 stored.setBytes(rule.bytes());
598 stored.setLife(rule.life(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
599 stored.setLiveType(rule.liveType());
600 stored.setPackets(rule.packets());
601 stored.setLastSeen();
602 if (stored.state() == FlowEntryState.PENDING_ADD) {
603 stored.setState(FlowEntryState.ADDED);
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800604 // Update the flow table to ensure the changes are replicated
605 flowTable.update(stored);
Jon Hallfa132292017-10-24 11:11:24 -0700606 return new FlowRuleEvent(Type.RULE_ADDED, rule);
607 }
608 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
609 }
610
611 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
612 // TODO: also update backup if the behavior is correct.
613 flowTable.add(rule);
614 return null;
615 }
616
617 @Override
618 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
619 final DeviceId deviceId = rule.deviceId();
620 NodeId master = mastershipService.getMasterFor(deviceId);
621
622 if (Objects.equals(local, master)) {
623 // bypass and handle it locally
624 return removeFlowRuleInternal(rule);
625 }
626
627 if (master == null) {
628 log.warn("Failed to removeFlowRule: No master for {}", deviceId);
629 // TODO: revisit if this should be null (="no-op") or Exception
630 return null;
631 }
632
633 log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
634 master, deviceId);
635
636 return Futures.getUnchecked(clusterCommunicator.sendAndReceive(
637 rule,
638 REMOVE_FLOW_ENTRY,
639 serializer::encode,
640 serializer::decode,
641 master));
642 }
643
644 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
Jon Hallfa132292017-10-24 11:11:24 -0700645 // This is where one could mark a rule as removed and still keep it in the store.
Jordan Halterman2edfeef2018-01-16 14:59:49 -0800646 final FlowEntry removed = flowTable.remove(rule);
Jon Hallfa132292017-10-24 11:11:24 -0700647 // rule may be partial rule that is missing treatment, we should use rule from store instead
648 return removed != null ? new FlowRuleEvent(RULE_REMOVED, removed) : null;
649 }
650
651 @Override
652 public void purgeFlowRule(DeviceId deviceId) {
653 flowTable.purgeFlowRule(deviceId);
654 }
655
656 @Override
657 public void purgeFlowRules() {
658 flowTable.purgeFlowRules();
659 }
660
661 @Override
662 public void batchOperationComplete(FlowRuleBatchEvent event) {
663 //FIXME: need a per device pending response
664 NodeId nodeId = pendingResponses.remove(event.subject().batchId());
665 if (nodeId == null) {
666 notifyDelegate(event);
667 } else {
668 // TODO check unicast return value
669 clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, serializer::encode, nodeId);
670 //error log: log.warn("Failed to respond to peer for batch operation result");
671 }
672 }
673
674 private final class OnStoreBatch implements ClusterMessageHandler {
675
676 @Override
677 public void handle(final ClusterMessage message) {
678 FlowRuleBatchOperation operation = serializer.decode(message.payload());
679 log.debug("received batch request {}", operation);
680
681 final DeviceId deviceId = operation.deviceId();
682 NodeId master = mastershipService.getMasterFor(deviceId);
683 if (!Objects.equals(local, master)) {
684 Set<FlowRule> failures = new HashSet<>(operation.size());
685 for (FlowRuleBatchEntry op : operation.getOperations()) {
686 failures.add(op.target());
687 }
688 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
689 // This node is no longer the master, respond as all failed.
690 // TODO: we might want to wrap response in envelope
691 // to distinguish sw programming failure and hand over
692 // it make sense in the latter case to retry immediately.
693 message.respond(serializer.encode(allFailed));
694 return;
695 }
696
697 pendingResponses.put(operation.id(), message.sender());
698 storeBatchInternal(operation);
699 }
700 }
701
Jordan Haltermana765d222018-06-01 00:40:56 -0700702 /**
703 * Represents a backup to a of a distinct bucket to a distinct node.
704 */
Jon Hallfa132292017-10-24 11:11:24 -0700705 private class BackupOperation {
706 private final NodeId nodeId;
Jordan Haltermana765d222018-06-01 00:40:56 -0700707 private final BucketId bucketId;
Jon Hallfa132292017-10-24 11:11:24 -0700708
Jordan Haltermana765d222018-06-01 00:40:56 -0700709 BackupOperation(NodeId nodeId, BucketId bucketId) {
Jon Hallfa132292017-10-24 11:11:24 -0700710 this.nodeId = nodeId;
Jordan Haltermana765d222018-06-01 00:40:56 -0700711 this.bucketId = bucketId;
712 }
713
714 NodeId nodeId() {
715 return nodeId;
716 }
717
718 BucketId bucketId() {
719 return bucketId;
Jon Hallfa132292017-10-24 11:11:24 -0700720 }
721
722 @Override
723 public int hashCode() {
Jordan Haltermana765d222018-06-01 00:40:56 -0700724 return Objects.hash(nodeId, bucketId);
Jon Hallfa132292017-10-24 11:11:24 -0700725 }
726
727 @Override
728 public boolean equals(Object other) {
729 if (other != null && other instanceof BackupOperation) {
730 BackupOperation that = (BackupOperation) other;
Jordan Haltermana765d222018-06-01 00:40:56 -0700731 return this.nodeId.equals(that.nodeId)
732 && this.bucketId.equals(that.bucketId);
Jon Hallfa132292017-10-24 11:11:24 -0700733 }
Jordan Haltermana765d222018-06-01 00:40:56 -0700734 return false;
Jon Hallfa132292017-10-24 11:11:24 -0700735 }
736 }
737
Jordan Haltermana765d222018-06-01 00:40:56 -0700738 /**
739 * Represents a distinct device flow bucket.
740 */
741 private class BucketId {
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700742 private final DeviceId deviceId;
743 private final int bucket;
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700744
Jordan Haltermana765d222018-06-01 00:40:56 -0700745 BucketId(DeviceId deviceId, int bucket) {
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700746 this.deviceId = deviceId;
747 this.bucket = bucket;
Jordan Haltermana765d222018-06-01 00:40:56 -0700748 }
749
750 DeviceId deviceId() {
751 return deviceId;
752 }
753
754 int bucket() {
755 return bucket;
756 }
757
758 @Override
759 public int hashCode() {
760 return Objects.hash(deviceId, bucket);
761 }
762
763 @Override
764 public boolean equals(Object other) {
765 if (other != null && other instanceof BucketId) {
766 BucketId that = (BucketId) other;
767 return this.deviceId.equals(that.deviceId)
768 && this.bucket == that.bucket;
769 }
770 return false;
771 }
772 }
773
774 /**
775 * Container for flows in a specific bucket.
776 */
777 private class FlowBucket {
778 private final BucketId bucketId;
779 private final Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> table;
Jordan Halterman5259b332018-06-12 15:34:19 -0700780 private final long timestamp;
Jordan Haltermana765d222018-06-01 00:40:56 -0700781
782 BucketId bucketId() {
783 return bucketId;
784 }
785
786 Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> table() {
787 return table;
788 }
789
Jordan Halterman5259b332018-06-12 15:34:19 -0700790 long timestamp() {
791 return timestamp;
792 }
793
794 FlowBucket(BucketId bucketId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> table, long timestamp) {
Jordan Haltermana765d222018-06-01 00:40:56 -0700795 this.bucketId = bucketId;
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700796 this.table = table;
Jordan Halterman5259b332018-06-12 15:34:19 -0700797 this.timestamp = timestamp;
798 }
799 }
800
801 /**
802 * Device digest.
803 */
804 private class DeviceDigest {
805 private final DeviceId deviceId;
806 private final Set<FlowBucketDigest> digests;
807
808 DeviceDigest(DeviceId deviceId, Set<FlowBucketDigest> digests) {
809 this.deviceId = deviceId;
810 this.digests = digests;
811 }
812
813 DeviceId deviceId() {
814 return deviceId;
815 }
816
817 Set<FlowBucketDigest> digests() {
818 return digests;
819 }
820
821 @Override
822 public int hashCode() {
823 return Objects.hash(deviceId, digests);
824 }
825
826 @Override
827 public boolean equals(Object object) {
828 return object instanceof DeviceDigest
829 && ((DeviceDigest) object).deviceId.equals(deviceId);
830 }
831 }
832
833 /**
834 * Flow bucket digest.
835 */
836 private class FlowBucketDigest {
837 private final BucketId bucketId;
838 private final long timestamp;
839
840 FlowBucketDigest(BucketId bucketId, long timestamp) {
841 this.bucketId = bucketId;
842 this.timestamp = timestamp;
843 }
844
845 BucketId bucketId() {
846 return bucketId;
847 }
848
849 long timestamp() {
850 return timestamp;
851 }
852
853 @Override
854 public int hashCode() {
855 return Objects.hash(bucketId);
856 }
857
858 @Override
859 public boolean equals(Object object) {
860 return object instanceof FlowBucketDigest
861 && ((FlowBucketDigest) object).bucketId.equals(bucketId);
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700862 }
863 }
864
Jon Hallfa132292017-10-24 11:11:24 -0700865 private class InternalFlowTable implements ReplicaInfoEventListener {
866
867 //TODO replace the Map<V,V> with ExtendedSet
868 private final Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
869 flowEntries = Maps.newConcurrentMap();
870
871 private final Map<BackupOperation, Long> lastBackupTimes = Maps.newConcurrentMap();
Jordan Haltermana765d222018-06-01 00:40:56 -0700872 private final Map<BucketId, Long> lastUpdateTimes = Maps.newConcurrentMap();
873 private final Set<BackupOperation> inFlightUpdates = Sets.newConcurrentHashSet();
Jon Hallfa132292017-10-24 11:11:24 -0700874
Jordan Haltermanaeea0bb2018-06-13 12:34:06 -0700875 private final AtomicLong currentTimestamp = new AtomicLong();
876
Jon Hallfa132292017-10-24 11:11:24 -0700877 @Override
878 public void event(ReplicaInfoEvent event) {
879 eventHandler.execute(() -> handleEvent(event));
880 }
881
Jordan Halterman7851f6d2018-06-13 12:26:30 -0700882 /**
883 * Handles a replica change event.
884 *
885 * @param event the replica change event to handle
886 */
Jon Hallfa132292017-10-24 11:11:24 -0700887 private void handleEvent(ReplicaInfoEvent event) {
888 DeviceId deviceId = event.subject();
Jordan Halterman7851f6d2018-06-13 12:26:30 -0700889
890 // If the local node is not the master, return.
891 if (!isMasterNode(deviceId)) {
892 // If the local node is neither the master or a backup, remove flow tables for the device.
893 if (!isBackupNode(deviceId)) {
894 purgeFlowRule(deviceId);
895 }
Jon Hallfa132292017-10-24 11:11:24 -0700896 return;
897 }
Jordan Haltermanaeea0bb2018-06-13 12:34:06 -0700898 backupSenderExecutor.execute(this::runAntiEntropy);
Jon Hallfa132292017-10-24 11:11:24 -0700899 }
900
Jon Hallfa132292017-10-24 11:11:24 -0700901 /**
Jordan Halterman5259b332018-06-12 15:34:19 -0700902 * Returns the set of devices in the flow table.
903 *
904 * @return the set of devices in the flow table
905 */
906 private Set<DeviceId> getDevices() {
907 return flowEntries.keySet();
908 }
909
910 /**
911 * Returns the digests for all buckets in the flow table for the given device.
912 *
913 * @param deviceId the device for which to return digests
914 * @return the set of digests for all buckets for the given device
915 */
916 private Set<FlowBucketDigest> getDigests(DeviceId deviceId) {
917 return IntStream.range(0, NUM_BUCKETS)
918 .mapToObj(bucket -> {
919 BucketId bucketId = new BucketId(deviceId, bucket);
920 long timestamp = lastUpdateTimes.getOrDefault(bucketId, 0L);
921 return new FlowBucketDigest(bucketId, timestamp);
922 }).collect(Collectors.toSet());
923 }
924
925 /**
Jon Hallfa132292017-10-24 11:11:24 -0700926 * Returns the flow table for specified device.
927 *
928 * @param deviceId identifier of the device
929 * @return Map representing Flow Table of given device.
930 */
931 private Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
Jordan Halterman4127d392018-06-12 11:40:21 -0700932 // Use an external get/null check to avoid locks.
933 // https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8161372
Jon Hallfa132292017-10-24 11:11:24 -0700934 if (persistenceEnabled) {
Jordan Halterman4127d392018-06-12 11:40:21 -0700935 Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowTable = flowEntries.get(deviceId);
936 return flowTable != null ? flowTable
937 : flowEntries.computeIfAbsent(deviceId, id ->
938 persistenceService.<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>persistentMapBuilder()
Jon Hallfa132292017-10-24 11:11:24 -0700939 .withName("FlowTable:" + deviceId.toString())
Jordan Haltermana765d222018-06-01 00:40:56 -0700940 .withSerializer(serializer)
Jon Hallfa132292017-10-24 11:11:24 -0700941 .build());
942 } else {
Jordan Halterman4127d392018-06-12 11:40:21 -0700943 Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowTable = flowEntries.get(deviceId);
944 return flowTable != null ? flowTable
945 : flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap());
Jon Hallfa132292017-10-24 11:11:24 -0700946 }
947 }
948
Jordan Haltermana765d222018-06-01 00:40:56 -0700949 private FlowBucket getFlowBucket(BucketId bucketId) {
Jordan Halterman5259b332018-06-12 15:34:19 -0700950 long timestamp = lastUpdateTimes.getOrDefault(bucketId, 0L);
Jordan Halterman4127d392018-06-12 11:40:21 -0700951 return new FlowBucket(bucketId, getFlowTable(bucketId.deviceId())
952 .entrySet()
953 .stream()
954 .filter(entry -> isInBucket(entry.getKey(), bucketId.bucket()))
Jordan Halterman5259b332018-06-12 15:34:19 -0700955 .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)),
956 timestamp);
Jon Hallfa132292017-10-24 11:11:24 -0700957 }
958
959 private Map<StoredFlowEntry, StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
Jordan Halterman4127d392018-06-12 11:40:21 -0700960 // Use an external get/null check to avoid locks.
961 // https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8161372
962 Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowTable = getFlowTable(deviceId);
963 Map<StoredFlowEntry, StoredFlowEntry> flowEntries = flowTable.get(flowId);
964 return flowEntries != null ? flowEntries : flowTable.computeIfAbsent(flowId, id -> Maps.newConcurrentMap());
Jon Hallfa132292017-10-24 11:11:24 -0700965 }
966
967 private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
968 return getFlowEntriesInternal(rule.deviceId(), rule.id()).get(rule);
969 }
970
971 private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
972 return getFlowTable(deviceId).values().stream()
973 .flatMap(m -> m.values().stream())
974 .collect(Collectors.toSet());
975 }
976
977 public StoredFlowEntry getFlowEntry(FlowRule rule) {
978 return getFlowEntryInternal(rule);
979 }
980
981 public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
982 return getFlowEntriesInternal(deviceId);
983 }
984
Jordan Haltermana765d222018-06-01 00:40:56 -0700985 private boolean isInBucket(FlowId flowId, int bucket) {
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700986 return bucket(flowId) == bucket;
987 }
988
989 private int bucket(FlowId flowId) {
990 return (int) (flowId.id() % NUM_BUCKETS);
991 }
992
Jordan Haltermana765d222018-06-01 00:40:56 -0700993 private void recordUpdate(BucketId bucketId) {
Jordan Haltermanaeea0bb2018-06-13 12:34:06 -0700994 recordUpdate(bucketId, currentTimestamp.accumulateAndGet(System.currentTimeMillis(), Math::max));
995 }
996
997 private void recordUpdate(BucketId bucketId, long timestamp) {
998 lastUpdateTimes.put(bucketId, timestamp);
Jordan Haltermanb2f57952018-03-21 12:52:37 -0700999 }
1000
Jon Hallfa132292017-10-24 11:11:24 -07001001 public void add(FlowEntry rule) {
Devin Limcdca1952018-03-28 18:13:33 -07001002 getFlowEntriesInternal(rule.deviceId(), rule.id())
1003 .compute((StoredFlowEntry) rule, (k, stored) -> {
1004 return (StoredFlowEntry) rule;
1005 });
Jordan Haltermana765d222018-06-01 00:40:56 -07001006 recordUpdate(new BucketId(rule.deviceId(), bucket(rule.id())));
Jon Hallfa132292017-10-24 11:11:24 -07001007 }
1008
Jordan Halterman2edfeef2018-01-16 14:59:49 -08001009 public void update(FlowEntry rule) {
1010 getFlowEntriesInternal(rule.deviceId(), rule.id())
1011 .computeIfPresent((StoredFlowEntry) rule, (k, stored) -> {
1012 if (rule instanceof DefaultFlowEntry) {
1013 DefaultFlowEntry updated = (DefaultFlowEntry) rule;
1014 if (stored instanceof DefaultFlowEntry) {
1015 DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
1016 if (updated.created() >= storedEntry.created()) {
Jordan Haltermana765d222018-06-01 00:40:56 -07001017 recordUpdate(new BucketId(rule.deviceId(), bucket(rule.id())));
Jordan Halterman2edfeef2018-01-16 14:59:49 -08001018 return updated;
1019 } else {
1020 log.debug("Trying to update more recent flow entry {} (stored: {})", updated, stored);
1021 return stored;
1022 }
1023 }
1024 }
1025 return stored;
1026 });
1027 }
1028
1029 public FlowEntry remove(FlowEntry rule) {
Jon Hallfa132292017-10-24 11:11:24 -07001030 final AtomicReference<FlowEntry> removedRule = new AtomicReference<>();
Jordan Halterman2edfeef2018-01-16 14:59:49 -08001031 final Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowTable = getFlowTable(rule.deviceId());
Jordan Haltermance336f72018-01-16 17:08:09 -08001032 flowTable.computeIfPresent(rule.id(), (flowId, flowEntries) -> {
1033 flowEntries.computeIfPresent((StoredFlowEntry) rule, (k, stored) -> {
Jon Hallfa132292017-10-24 11:11:24 -07001034 if (rule instanceof DefaultFlowEntry) {
1035 DefaultFlowEntry toRemove = (DefaultFlowEntry) rule;
1036 if (stored instanceof DefaultFlowEntry) {
1037 DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
1038 if (toRemove.created() < storedEntry.created()) {
Jordan Halterman2edfeef2018-01-16 14:59:49 -08001039 log.debug("Trying to remove more recent flow entry {} (stored: {})", toRemove, stored);
Jon Hallfa132292017-10-24 11:11:24 -07001040 // the key is not updated, removedRule remains null
1041 return stored;
1042 }
1043 }
1044 }
1045 removedRule.set(stored);
1046 return null;
1047 });
Jordan Haltermance336f72018-01-16 17:08:09 -08001048 return flowEntries.isEmpty() ? null : flowEntries;
1049 });
Jon Hallfa132292017-10-24 11:11:24 -07001050
1051 if (removedRule.get() != null) {
Jordan Haltermana765d222018-06-01 00:40:56 -07001052 recordUpdate(new BucketId(rule.deviceId(), bucket(rule.id())));
Jon Hallfa132292017-10-24 11:11:24 -07001053 return removedRule.get();
1054 } else {
1055 return null;
1056 }
1057 }
1058
1059 public void purgeFlowRule(DeviceId deviceId) {
1060 flowEntries.remove(deviceId);
1061 }
1062
1063 public void purgeFlowRules() {
1064 flowEntries.clear();
1065 }
1066
Jordan Halterman8f90d6d2018-06-12 11:23:33 -07001067 /**
1068 * Returns a boolean indicating whether the local node is the current master for the given device.
1069 *
1070 * @param deviceId the device for which to indicate whether the local node is the current master
1071 * @return indicates whether the local node is the current master for the given device
1072 */
Jordan Haltermanb2f57952018-03-21 12:52:37 -07001073 private boolean isMasterNode(DeviceId deviceId) {
1074 NodeId master = replicaInfoManager.getReplicaInfoFor(deviceId).master().orElse(null);
1075 return Objects.equals(master, clusterService.getLocalNode().id());
1076 }
1077
Jordan Halterman8f90d6d2018-06-12 11:23:33 -07001078 /**
Jordan Halterman7851f6d2018-06-13 12:26:30 -07001079 * Returns a boolean indicating whether the local node is a backup for the given device.
1080 *
1081 * @param deviceId the device for which to indicate whether the local node is a backup
1082 * @return indicates whether the local node is a backup for the given device
1083 */
1084 private boolean isBackupNode(DeviceId deviceId) {
1085 List<NodeId> backupNodes = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
1086 int index = backupNodes.indexOf(local);
1087 return index != -1 && index < backupCount;
1088 }
1089
1090 /**
Jordan Halterman8f90d6d2018-06-12 11:23:33 -07001091 * Backs up all devices to all backup nodes.
1092 */
Jon Hallfa132292017-10-24 11:11:24 -07001093 private void backup() {
Jordan Halterman5259b332018-06-12 15:34:19 -07001094 for (DeviceId deviceId : getDevices()) {
Jordan Halterman8f90d6d2018-06-12 11:23:33 -07001095 backup(deviceId);
1096 }
Jordan Haltermanb2f57952018-03-21 12:52:37 -07001097 }
1098
Jordan Halterman8f90d6d2018-06-12 11:23:33 -07001099 /**
1100 * Backs up all buckets in the given device to the given node.
1101 *
1102 * @param deviceId the device to back up
1103 */
1104 private void backup(DeviceId deviceId) {
1105 if (!isMasterNode(deviceId)) {
1106 return;
1107 }
1108
1109 // Get a list of backup nodes for the device.
1110 List<NodeId> backupNodes = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
1111 int availableBackupCount = Math.min(backupCount, backupNodes.size());
1112
1113 // If the list of backup nodes is empty, update the flow count.
1114 if (availableBackupCount == 0) {
1115 updateDeviceFlowCounts(deviceId);
1116 } else {
1117 // Otherwise, iterate through backup nodes and backup the device.
1118 for (int index = 0; index < availableBackupCount; index++) {
1119 NodeId backupNode = backupNodes.get(index);
1120 try {
1121 backup(deviceId, backupNode);
1122 } catch (Exception e) {
1123 log.error("Backup of " + deviceId + " to " + backupNode + " failed", e);
1124 }
Jordan Haltermanb2f57952018-03-21 12:52:37 -07001125 }
Jon Hallfa132292017-10-24 11:11:24 -07001126 }
1127 }
1128
Jordan Halterman8f90d6d2018-06-12 11:23:33 -07001129 /**
1130 * Backs up all buckets for the given device to the given node.
1131 *
1132 * @param deviceId the device to back up
1133 * @param nodeId the node to which to back up the device
1134 */
1135 private void backup(DeviceId deviceId, NodeId nodeId) {
Jordan Haltermanb2f57952018-03-21 12:52:37 -07001136 final long timestamp = System.currentTimeMillis();
1137 for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
Jordan Haltermana765d222018-06-01 00:40:56 -07001138 BucketId bucketId = new BucketId(deviceId, bucket);
1139 BackupOperation operation = new BackupOperation(nodeId, bucketId);
1140 if (startBackup(operation)) {
1141 backup(operation).whenCompleteAsync((succeeded, error) -> {
1142 if (error == null && succeeded) {
1143 succeedBackup(operation, timestamp);
1144 } else {
1145 failBackup(operation);
1146 }
Jordan Halterman8f90d6d2018-06-12 11:23:33 -07001147 backup(deviceId, nodeId);
Jordan Haltermana765d222018-06-01 00:40:56 -07001148 }, backupSenderExecutor);
Jordan Haltermanb2f57952018-03-21 12:52:37 -07001149 }
1150 }
1151 }
1152
Jordan Halterman8f90d6d2018-06-12 11:23:33 -07001153 /**
1154 * Returns a boolean indicating whether the given {@link BackupOperation} can be started.
1155 * <p>
1156 * The backup can be started if no backup for the same device/bucket/node is already in progress and changes
1157 * are pending replication for the backup operation.
1158 *
1159 * @param operation the operation to start
1160 * @return indicates whether the given backup operation should be started
1161 */
Jordan Haltermana765d222018-06-01 00:40:56 -07001162 private boolean startBackup(BackupOperation operation) {
1163 long lastBackupTime = lastBackupTimes.getOrDefault(operation, 0L);
1164 long lastUpdateTime = lastUpdateTimes.getOrDefault(operation.bucketId(), 0L);
1165 return lastUpdateTime > 0 && lastBackupTime <= lastUpdateTime && inFlightUpdates.add(operation);
Jordan Haltermanb2f57952018-03-21 12:52:37 -07001166 }
1167
Jordan Halterman8f90d6d2018-06-12 11:23:33 -07001168 /**
1169 * Fails the given backup operation.
1170 *
1171 * @param operation the backup operation to fail
1172 */
Jordan Haltermana765d222018-06-01 00:40:56 -07001173 private void failBackup(BackupOperation operation) {
1174 inFlightUpdates.remove(operation);
Jordan Haltermanb2f57952018-03-21 12:52:37 -07001175 }
1176
Jordan Halterman8f90d6d2018-06-12 11:23:33 -07001177 /**
1178 * Succeeds the given backup operation.
1179 * <p>
1180 * The last backup time for the operation will be updated and the operation will be removed from
1181 * in-flight updates.
1182 *
1183 * @param operation the operation to succeed
1184 * @param timestamp the timestamp at which the operation was <em>started</em>
1185 */
Jordan Haltermana765d222018-06-01 00:40:56 -07001186 private void succeedBackup(BackupOperation operation, long timestamp) {
Jordan Haltermana765d222018-06-01 00:40:56 -07001187 lastBackupTimes.put(operation, timestamp);
Jordan Halterman8f90d6d2018-06-12 11:23:33 -07001188 inFlightUpdates.remove(operation);
Jordan Haltermana765d222018-06-01 00:40:56 -07001189 }
1190
Jordan Halterman8f90d6d2018-06-12 11:23:33 -07001191 /**
1192 * Performs the given backup operation.
1193 *
1194 * @param operation the operation to perform
1195 * @return a future to be completed with a boolean indicating whether the backup operation was successful
1196 */
Jordan Haltermana765d222018-06-01 00:40:56 -07001197 private CompletableFuture<Boolean> backup(BackupOperation operation) {
1198 log.debug("Sending flowEntries in bucket {} for device {} to {} for backup.",
1199 operation.bucketId().bucket(), operation.bucketId().deviceId(), operation.nodeId());
1200 FlowBucket flowBucket = getFlowBucket(operation.bucketId());
1201
1202 CompletableFuture<Boolean> future = new CompletableFuture<>();
Jordan Halterman5259b332018-06-12 15:34:19 -07001203 clusterCommunicator.<FlowBucket, Set<FlowId>>sendAndReceive(
1204 flowBucket,
Jordan Haltermana765d222018-06-01 00:40:56 -07001205 FLOW_TABLE_BACKUP,
1206 serializer::encode,
1207 serializer::decode,
1208 operation.nodeId())
1209 .whenComplete((backedupFlows, error) -> {
1210 Set<FlowId> flowsNotBackedUp = error != null ?
1211 flowBucket.table().keySet() :
1212 Sets.difference(flowBucket.table().keySet(), backedupFlows);
1213 if (flowsNotBackedUp.size() > 0) {
1214 log.warn("Failed to backup flows: {}. Reason: {}, Node: {}",
1215 flowsNotBackedUp, error != null ? error.getMessage() : "none", operation.nodeId());
1216 }
1217 future.complete(backedupFlows != null);
1218 });
Jordan Halterman8f90d6d2018-06-12 11:23:33 -07001219
1220 updateFlowCounts(flowBucket);
Jordan Haltermana765d222018-06-01 00:40:56 -07001221 return future;
1222 }
1223
Jordan Halterman8f90d6d2018-06-12 11:23:33 -07001224 /**
1225 * Handles a flow bucket backup from a remote peer.
1226 *
1227 * @param flowBucket the flow bucket to back up
1228 * @return the set of flows that could not be backed up
1229 */
1230 private Set<FlowId> onBackup(FlowBucket flowBucket) {
Jordan Haltermana765d222018-06-01 00:40:56 -07001231 log.debug("Received flowEntries for {} bucket {} to backup",
1232 flowBucket.bucketId().deviceId(), flowBucket.bucketId);
Jordan Haltermanb2f57952018-03-21 12:52:37 -07001233 Set<FlowId> backedupFlows = Sets.newHashSet();
Jon Hallfa132292017-10-24 11:11:24 -07001234 try {
Jordan Haltermanb2f57952018-03-21 12:52:37 -07001235 // Only process those devices are that not managed by the local node.
Jordan Haltermana765d222018-06-01 00:40:56 -07001236 NodeId master = replicaInfoManager.getReplicaInfoFor(flowBucket.bucketId().deviceId())
1237 .master()
1238 .orElse(null);
Jordan Haltermanb2f57952018-03-21 12:52:37 -07001239 if (!Objects.equals(local, master)) {
Jordan Haltermana765d222018-06-01 00:40:56 -07001240 Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> backupFlowTable =
1241 getFlowTable(flowBucket.bucketId().deviceId());
1242 backupFlowTable.putAll(flowBucket.table());
Jordan Haltermanb2f57952018-03-21 12:52:37 -07001243 backupFlowTable.entrySet()
Jordan Haltermana765d222018-06-01 00:40:56 -07001244 .removeIf(entry -> isInBucket(entry.getKey(), flowBucket.bucketId().bucket())
1245 && !flowBucket.table().containsKey(entry.getKey()));
1246 backedupFlows.addAll(flowBucket.table().keySet());
Jordan Haltermanaeea0bb2018-06-13 12:34:06 -07001247 recordUpdate(flowBucket.bucketId(), flowBucket.timestamp());
Jordan Haltermanb2f57952018-03-21 12:52:37 -07001248 }
Jon Hallfa132292017-10-24 11:11:24 -07001249 } catch (Exception e) {
1250 log.warn("Failure processing backup request", e);
1251 }
Jordan Haltermanb2f57952018-03-21 12:52:37 -07001252 return backedupFlows;
Jon Hallfa132292017-10-24 11:11:24 -07001253 }
Jordan Halterman8f90d6d2018-06-12 11:23:33 -07001254
1255 /**
Jordan Halterman5259b332018-06-12 15:34:19 -07001256 * Runs the anti-entropy protocol.
1257 */
1258 private void runAntiEntropy() {
1259 for (DeviceId deviceId : getDevices()) {
1260 runAntiEntropy(deviceId);
1261 }
1262 }
1263
1264 /**
1265 * Runs the anti-entropy protocol for the given device.
1266 *
1267 * @param deviceId the device for which to run the anti-entropy protocol
1268 */
1269 private void runAntiEntropy(DeviceId deviceId) {
1270 if (!isMasterNode(deviceId)) {
1271 return;
1272 }
1273
1274 // Get the set of digests for the node.
1275 Set<FlowBucketDigest> digests = getDigests(deviceId);
1276
1277 // Get a list of backup nodes for the device and compute the real backup count.
1278 List<NodeId> backupNodes = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
1279 int availableBackupCount = Math.min(backupCount, backupNodes.size());
1280
1281 // Iterate through backup nodes and run the anti-entropy protocol.
1282 for (int index = 0; index < availableBackupCount; index++) {
1283 NodeId backupNode = backupNodes.get(index);
1284 try {
1285 runAntiEntropy(deviceId, backupNode, digests);
1286 } catch (Exception e) {
1287 log.error("Anti-entropy for " + deviceId + " to " + backupNode + " failed", e);
1288 }
1289 }
1290 }
1291
1292 /**
1293 * Sends an anti-entropy advertisement to the given node.
1294 *
1295 * @param deviceId the device ID for which to send the advertisement
1296 * @param nodeId the node to which to send the advertisement
1297 * @param digests the digests to send to the given node
1298 */
1299 private void runAntiEntropy(DeviceId deviceId, NodeId nodeId, Set<FlowBucketDigest> digests) {
1300 log.trace("Sending anti-entropy advertisement for device {} to {}", deviceId, nodeId);
1301 clusterCommunicator.<Set<FlowBucketDigest>, Set<BucketId>>sendAndReceive(
1302 digests,
1303 FLOW_TABLE_ANTI_ENTROPY,
1304 serializer::encode,
1305 serializer::decode,
1306 nodeId)
1307 .whenComplete((missingBuckets, error) -> {
1308 if (error == null) {
1309 log.debug("Detected {} missing buckets on node {} for device {}",
1310 missingBuckets.size(), nodeId, deviceId);
1311 } else {
1312 log.trace("Anti-entropy advertisement for device {} to {} failed", deviceId, nodeId, error);
1313 }
1314 });
1315 }
1316
1317 /**
1318 * Handles a device anti-entropy request from a remote peer.
1319 *
1320 * @param digest the device digest
1321 * @return the set of flow buckets to update
1322 */
1323 private Set<BucketId> onAntiEntropy(DeviceDigest digest) {
1324 // If the local node is the master, reject the anti-entropy request.
1325 // TODO: We really should be using mastership terms in anti-entropy requests to determine whether
1326 // this node is a newer master, but that would only reduce the time it takes to resolve missing flows
1327 // as a later anti-entropy request will still succeed once this node recognizes it's no longer the master.
1328 NodeId master = replicaInfoManager.getReplicaInfoFor(digest.deviceId())
1329 .master()
1330 .orElse(null);
1331 if (Objects.equals(master, local)) {
1332 return ImmutableSet.of();
1333 }
1334
1335 // Compute a set of missing BucketIds based on digest times and send them back to the master.
1336 Set<BucketId> missingBuckets = new HashSet<>();
1337 for (FlowBucketDigest flowBucketDigest : digest.digests()) {
1338 long lastUpdated = lastUpdateTimes.getOrDefault(flowBucketDigest.bucketId(), 0L);
1339 if (lastUpdated < flowBucketDigest.timestamp()) {
1340 missingBuckets.add(flowBucketDigest.bucketId());
1341 }
1342 }
1343 return missingBuckets;
1344 }
1345
1346 /**
Jordan Halterman8f90d6d2018-06-12 11:23:33 -07001347 * Updates all flow counts for the given device.
1348 *
1349 * @param deviceId the device for which to update flow counts
1350 */
1351 private void updateDeviceFlowCounts(DeviceId deviceId) {
1352 for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
1353 BucketId bucketId = new BucketId(deviceId, bucket);
1354 FlowBucket flowBucket = getFlowBucket(bucketId);
1355 updateFlowCounts(flowBucket);
1356 }
1357 }
1358
1359 /**
1360 * Updates the eventually consistent flow count for the given bucket.
1361 *
1362 * @param flowBucket the flow bucket for which to update flow counts
1363 */
1364 private void updateFlowCounts(FlowBucket flowBucket) {
1365 int flowCount = flowBucket.table().entrySet()
1366 .stream()
1367 .mapToInt(e -> e.getValue().values().size())
1368 .sum();
1369 flowCounts.put(flowBucket.bucketId(), flowCount);
1370 }
Jon Hallfa132292017-10-24 11:11:24 -07001371 }
1372
1373 @Override
Jordan Halterman5259b332018-06-12 15:34:19 -07001374 public FlowRuleEvent updateTableStatistics(DeviceId deviceId, List<TableStatisticsEntry> tableStats) {
Jon Hallfa132292017-10-24 11:11:24 -07001375 deviceTableStats.put(deviceId, tableStats);
1376 return null;
1377 }
1378
1379 @Override
1380 public Iterable<TableStatisticsEntry> getTableStatistics(DeviceId deviceId) {
1381 NodeId master = mastershipService.getMasterFor(deviceId);
1382
1383 if (master == null) {
1384 log.debug("Failed to getTableStats: No master for {}", deviceId);
1385 return Collections.emptyList();
1386 }
1387
1388 List<TableStatisticsEntry> tableStats = deviceTableStats.get(deviceId);
1389 if (tableStats == null) {
1390 return Collections.emptyList();
1391 }
1392 return ImmutableList.copyOf(tableStats);
1393 }
1394
1395 @Override
1396 public long getActiveFlowRuleCount(DeviceId deviceId) {
1397 return Streams.stream(getTableStatistics(deviceId))
1398 .mapToLong(TableStatisticsEntry::activeFlowEntries)
1399 .sum();
1400 }
1401
1402 private class InternalTableStatsListener
1403 implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> {
1404 @Override
1405 public void event(EventuallyConsistentMapEvent<DeviceId,
1406 List<TableStatisticsEntry>> event) {
1407 //TODO: Generate an event to listeners (do we need?)
1408 }
1409 }
Jordan Halterman5259b332018-06-12 15:34:19 -07001410}