Clean up implementation of buckets for flow rule replication
Change-Id: Ifc6aaed08c3a74e8b8f8fd9d69d5802351e5f47c
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index f998dad..3080615 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -22,6 +22,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -87,7 +88,6 @@
import org.slf4j.Logger;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
@@ -121,8 +121,7 @@
private static final boolean DEFAULT_PERSISTENCE_ENABLED = false;
private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
- // number of devices whose flow entries will be backed up in one communication round
- private static final int FLOW_TABLE_BACKUP_BATCH_SIZE = 1;
+ private static final int NUM_BUCKETS = 1024;
@Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
label = "Number of threads in the message handler pool")
@@ -180,13 +179,16 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
- protected final Serializer serializer = Serializer.using(KryoNamespaces.API);
+ protected final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(FlowBucket.class)
+ .build());
protected final KryoNamespace.Builder serializerBuilder = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.register(MastershipBasedTimestamp.class);
- private EventuallyConsistentMap<DeviceId, Integer> flowCounts;
+ private EventuallyConsistentMap<DeviceId, Map<Integer, Integer>> flowCounts;
private IdGenerator idGenerator;
private NodeId local;
@@ -213,7 +215,7 @@
backupPeriod,
TimeUnit.MILLISECONDS);
- flowCounts = storageService.<DeviceId, Integer>eventuallyConsistentMapBuilder()
+ flowCounts = storageService.<DeviceId, Map<Integer, Integer>>eventuallyConsistentMapBuilder()
.withName("onos-flow-counts")
.withSerializer(serializerBuilder)
.withAntiEntropyPeriod(5, TimeUnit.SECONDS)
@@ -348,9 +350,11 @@
@Override
public int getFlowRuleCount(DeviceId deviceId) {
- Integer count = flowCounts.get(deviceId);
- return count != null ? count : flowTable.flowEntries.get(deviceId) != null ?
- flowTable.flowEntries.get(deviceId).keySet().size() : 0;
+ Map<Integer, Integer> counts = flowCounts.get(deviceId);
+ return counts != null
+ ? counts.values().stream().mapToInt(v -> v).sum()
+ : flowTable.flowEntries.get(deviceId) != null
+ ? flowTable.flowEntries.get(deviceId).keySet().size() : 0;
}
@Override
@@ -657,15 +661,17 @@
private class BackupOperation {
private final NodeId nodeId;
private final DeviceId deviceId;
+ private final int bucket;
- public BackupOperation(NodeId nodeId, DeviceId deviceId) {
+ public BackupOperation(NodeId nodeId, DeviceId deviceId, int bucket) {
this.nodeId = nodeId;
this.deviceId = deviceId;
+ this.bucket = bucket;
}
@Override
public int hashCode() {
- return Objects.hash(nodeId, deviceId);
+ return Objects.hash(nodeId, deviceId, bucket);
}
@Override
@@ -673,13 +679,26 @@
if (other != null && other instanceof BackupOperation) {
BackupOperation that = (BackupOperation) other;
return this.nodeId.equals(that.nodeId) &&
- this.deviceId.equals(that.deviceId);
+ this.deviceId.equals(that.deviceId) &&
+ this.bucket == that.bucket;
} else {
return false;
}
}
}
+ private class FlowBucket {
+ private final DeviceId deviceId;
+ private final int bucket;
+ private final Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> table;
+
+ FlowBucket(DeviceId deviceId, int bucket, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> table) {
+ this.deviceId = deviceId;
+ this.bucket = bucket;
+ this.table = table;
+ }
+ }
+
private class InternalFlowTable implements ReplicaInfoEventListener {
//TODO replace the Map<V,V> with ExtendedSet
@@ -687,7 +706,8 @@
flowEntries = Maps.newConcurrentMap();
private final Map<BackupOperation, Long> lastBackupTimes = Maps.newConcurrentMap();
- private final Map<DeviceId, Long> lastUpdateTimes = Maps.newConcurrentMap();
+ private final Map<DeviceId, Map<Integer, Long>> lastUpdateTimes = Maps.newConcurrentMap();
+ private final Map<NodeId, Set<DeviceId>> inFlightUpdates = Maps.newConcurrentMap();
@Override
public void event(ReplicaInfoEvent event) {
@@ -700,54 +720,48 @@
return;
}
if (event.type() == MASTER_CHANGED) {
- lastUpdateTimes.put(deviceId, System.currentTimeMillis());
+ for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
+ recordUpdate(deviceId, bucket);
+ }
}
- backupSenderExecutor.schedule(this::backup, 0, TimeUnit.SECONDS);
+ backupSenderExecutor.execute(this::backup);
}
- private void sendBackups(NodeId nodeId, Set<DeviceId> deviceIds) {
- // split up the devices into smaller batches and send them separately.
- Iterables.partition(deviceIds, FLOW_TABLE_BACKUP_BATCH_SIZE)
- .forEach(ids -> backupFlowEntries(nodeId, Sets.newHashSet(ids)));
- }
-
- private void backupFlowEntries(NodeId nodeId, Set<DeviceId> deviceIds) {
- if (deviceIds.isEmpty()) {
- return;
- }
- log.debug("Sending flowEntries for devices {} to {} for backup.", deviceIds, nodeId);
- Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
- deviceFlowEntries = Maps.newConcurrentMap();
- deviceIds.forEach(id -> {
- Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> copy = getFlowTableCopy(id);
- int flowCount = copy.entrySet().stream()
- .mapToInt(e -> e.getValue().values().size()).sum();
- flowCounts.put(id, flowCount);
- deviceFlowEntries.put(id, copy);
+ private CompletableFuture<Void> backupFlowEntries(
+ NodeId nodeId, DeviceId deviceId, int bucket, long timestamp) {
+ log.debug("Sending flowEntries in bucket {} for device {} to {} for backup.", bucket, deviceId, nodeId);
+ FlowBucket flowBucket = getFlowBucket(deviceId, bucket);
+ int flowCount = flowBucket.table.entrySet().stream()
+ .mapToInt(e -> e.getValue().values().size()).sum();
+ flowCounts.compute(deviceId, (id, counts) -> {
+ if (counts == null) {
+ counts = Maps.newConcurrentMap();
+ }
+ counts.put(bucket, flowCount);
+ return counts;
});
- clusterCommunicator.<Map<DeviceId,
- Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>,
- Set<DeviceId>>
- sendAndReceive(deviceFlowEntries,
+
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ clusterCommunicator.<FlowBucket, Set<FlowId>>
+ sendAndReceive(flowBucket,
FLOW_TABLE_BACKUP,
serializer::encode,
serializer::decode,
nodeId)
- .whenComplete((backedupDevices, error) -> {
- Set<DeviceId> devicesNotBackedup = error != null ?
- deviceFlowEntries.keySet() :
- Sets.difference(deviceFlowEntries.keySet(), backedupDevices);
- if (devicesNotBackedup.size() > 0) {
- log.warn("Failed to backup devices: {}. Reason: {}, Node: {}",
- devicesNotBackedup, error != null ? error.getMessage() : "none",
- nodeId);
+ .whenComplete((backedupFlows, error) -> {
+ Set<FlowId> flowsNotBackedUp = error != null ?
+ flowBucket.table.keySet() :
+ Sets.difference(flowBucket.table.keySet(), backedupFlows);
+ if (flowsNotBackedUp.size() > 0) {
+ log.warn("Failed to backup flows: {}. Reason: {}, Node: {}",
+ flowsNotBackedUp, error != null ? error.getMessage() : "none", nodeId);
}
- if (backedupDevices != null) {
- backedupDevices.forEach(id -> {
- lastBackupTimes.put(new BackupOperation(nodeId, id), System.currentTimeMillis());
- });
+ if (backedupFlows != null) {
+ lastBackupTimes.put(new BackupOperation(nodeId, deviceId, bucket), timestamp);
}
+ future.complete(null);
});
+ return future;
}
/**
@@ -783,34 +797,27 @@
}
}
- private Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> getFlowTableCopy(DeviceId deviceId) {
- Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> copy = Maps.newHashMap();
+ private FlowBucket getFlowBucket(DeviceId deviceId, int bucket) {
if (persistenceEnabled) {
- return flowEntries.computeIfAbsent(deviceId, id -> persistenceService
- .<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>persistentMapBuilder()
+ return new FlowBucket(deviceId, bucket, flowEntries.computeIfAbsent(deviceId, id ->
+ persistenceService.<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>persistentMapBuilder()
.withName("FlowTable:" + deviceId.toString())
- .withSerializer(new Serializer() {
- @Override
- public <T> byte[] encode(T object) {
- return serializer.encode(object);
- }
-
- @Override
- public <T> T decode(byte[] bytes) {
- return serializer.decode(bytes);
- }
-
- @Override
- public <T> T copy(T object) {
- return serializer.copy(object);
- }
- })
- .build());
+ .withSerializer(serializer)
+ .build())
+ .entrySet()
+ .stream()
+ .filter(entry -> isBucket(entry.getKey(), bucket))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
} else {
- flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap()).forEach((k, v) -> {
- copy.put(k, Maps.newHashMap(v));
- });
- return copy;
+ Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> copy = Maps.newHashMap();
+ flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap())
+ .entrySet()
+ .stream()
+ .filter(entry -> isBucket(entry.getKey(), bucket))
+ .forEach(entry -> {
+ copy.put(entry.getKey(), Maps.newHashMap(entry.getValue()));
+ });
+ return new FlowBucket(deviceId, bucket, copy);
}
}
@@ -836,12 +843,25 @@
return getFlowEntriesInternal(deviceId);
}
+ private boolean isBucket(FlowId flowId, int bucket) {
+ return bucket(flowId) == bucket;
+ }
+
+ private int bucket(FlowId flowId) {
+ return (int) (flowId.id() % NUM_BUCKETS);
+ }
+
+ private void recordUpdate(DeviceId deviceId, int bucket) {
+ lastUpdateTimes.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap())
+ .put(bucket, System.currentTimeMillis());
+ }
+
public void add(FlowEntry rule) {
getFlowEntriesInternal(rule.deviceId(), rule.id())
.compute((StoredFlowEntry) rule, (k, stored) -> {
return (StoredFlowEntry) rule;
});
- lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
+ recordUpdate(rule.deviceId(), bucket(rule.id()));
}
public void update(FlowEntry rule) {
@@ -852,7 +872,7 @@
if (stored instanceof DefaultFlowEntry) {
DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
if (updated.created() >= storedEntry.created()) {
- lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
+ recordUpdate(rule.deviceId(), bucket(rule.id()));
return updated;
} else {
log.debug("Trying to update more recent flow entry {} (stored: {})", updated, stored);
@@ -887,7 +907,7 @@
});
if (removedRule.get() != null) {
- lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
+ recordUpdate(rule.deviceId(), bucket(rule.id()));
return removedRule.get();
} else {
return null;
@@ -902,53 +922,79 @@
flowEntries.clear();
}
- private List<NodeId> getBackupNodes(DeviceId deviceId) {
- // The returned backup node list is in the order of preference i.e. next likely master first.
+ private boolean isMasterNode(DeviceId deviceId) {
+ NodeId master = replicaInfoManager.getReplicaInfoFor(deviceId).master().orElse(null);
+ return Objects.equals(master, clusterService.getLocalNode().id());
+ }
+
+ private boolean isBackupNode(NodeId nodeId, DeviceId deviceId) {
List<NodeId> allPossibleBackupNodes = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
- return ImmutableList.copyOf(allPossibleBackupNodes)
- .subList(0, Math.min(allPossibleBackupNodes.size(), backupCount));
+ return allPossibleBackupNodes.indexOf(nodeId) < backupCount;
}
private void backup() {
- try {
- // compute a mapping from node to the set of devices whose flow entries it should backup
- Map<NodeId, Set<DeviceId>> devicesToBackupByNode = Maps.newHashMap();
- flowEntries.keySet().forEach(deviceId -> {
- List<NodeId> backupNodes = getBackupNodes(deviceId);
- backupNodes.forEach(backupNode -> {
- if (lastBackupTimes.getOrDefault(new BackupOperation(backupNode, deviceId), 0L)
- < lastUpdateTimes.getOrDefault(deviceId, 0L)) {
- devicesToBackupByNode.computeIfAbsent(backupNode,
- nodeId -> Sets.newHashSet()).add(deviceId);
- }
- });
+ clusterService.getNodes().stream()
+ .filter(node -> !node.id().equals(clusterService.getLocalNode().id()))
+ .forEach(node -> {
+ try {
+ backup(node.id());
+ } catch (Exception e) {
+ log.error("Backup failed.", e);
+ }
});
- // send the device flow entries to their respective backup nodes
- devicesToBackupByNode.forEach(this::sendBackups);
- } catch (Exception e) {
- log.error("Backup failed.", e);
+ }
+
+ private void backup(NodeId nodeId) {
+ for (DeviceId deviceId : flowEntries.keySet()) {
+ if (isMasterNode(deviceId) && isBackupNode(nodeId, deviceId)) {
+ backup(nodeId, deviceId);
+ }
}
}
- private Set<DeviceId> onBackupReceipt(Map<DeviceId,
- Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>> flowTables) {
- log.debug("Received flowEntries for {} to backup", flowTables.keySet());
- Set<DeviceId> backedupDevices = Sets.newHashSet();
+ private void backup(NodeId nodeId, DeviceId deviceId) {
+ final long timestamp = System.currentTimeMillis();
+ for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
+ long lastBackupTime = lastBackupTimes.getOrDefault(new BackupOperation(nodeId, deviceId, bucket), 0L);
+ long lastUpdateTime = lastUpdateTimes.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap())
+ .getOrDefault(bucket, 0L);
+ if (lastBackupTime < lastUpdateTime && startBackup(nodeId, deviceId)) {
+ backupFlowEntries(nodeId, deviceId, bucket, timestamp)
+ .thenRunAsync(() -> {
+ finishBackup(nodeId, deviceId);
+ backup(nodeId);
+ }, backupSenderExecutor);
+ return;
+ }
+ }
+ }
+
+ private boolean startBackup(NodeId nodeId, DeviceId deviceId) {
+ return inFlightUpdates.computeIfAbsent(nodeId, id -> Sets.newConcurrentHashSet()).add(deviceId);
+ }
+
+ private void finishBackup(NodeId nodeId, DeviceId deviceId) {
+ inFlightUpdates.computeIfAbsent(nodeId, id -> Sets.newConcurrentHashSet()).remove(deviceId);
+ }
+
+ private Set<FlowId> onBackupReceipt(FlowBucket bucket) {
+ log.debug("Received flowEntries for {} bucket {} to backup", bucket.deviceId, bucket.bucket);
+ Set<FlowId> backedupFlows = Sets.newHashSet();
try {
- flowTables.forEach((deviceId, deviceFlowTable) -> {
- // Only process those devices are that not managed by the local node.
- if (!Objects.equals(local, mastershipService.getMasterFor(deviceId))) {
- Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> backupFlowTable =
- getFlowTable(deviceId);
- backupFlowTable.clear();
- backupFlowTable.putAll(deviceFlowTable);
- backedupDevices.add(deviceId);
- }
- });
+ // Only process those devices are that not managed by the local node.
+ NodeId master = replicaInfoManager.getReplicaInfoFor(bucket.deviceId).master().orElse(null);
+ if (!Objects.equals(local, master)) {
+ Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> backupFlowTable = getFlowTable(bucket.deviceId);
+ backupFlowTable.putAll(bucket.table);
+ backupFlowTable.entrySet()
+ .removeIf(entry -> isBucket(entry.getKey(), bucket.bucket)
+ && !bucket.table.containsKey(entry.getKey()));
+ backedupFlows.addAll(bucket.table.keySet());
+ }
} catch (Exception e) {
log.warn("Failure processing backup request", e);
}
- return backedupDevices;
+ return backedupFlows;
}
}