ONOS-1286: Backing out changes to use ECMap for dist flow rule store backups
Change-Id: I93a60ef183aa335fecf63b97d300830369d2b9d7
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index 7d30f99..85015c9 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -1,4 +1,4 @@
-/*
+ /*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -15,11 +15,15 @@
*/
package org.onosproject.store.flow.impl;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import org.apache.commons.lang.math.RandomUtils;
+import com.hazelcast.core.IMap;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -28,7 +32,9 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.BoundedThreadPool;
import org.onlab.util.KryoNamespace;
+import org.onlab.util.NewConcurrentHashMap;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
@@ -36,7 +42,6 @@
import org.onosproject.core.IdGenerator;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
-import org.onosproject.net.device.DeviceClockService;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.CompletedBatchOperation;
import org.onosproject.net.flow.DefaultFlowEntry;
@@ -55,19 +60,15 @@
import org.onosproject.net.flow.FlowRuleStore;
import org.onosproject.net.flow.FlowRuleStoreDelegate;
import org.onosproject.net.flow.StoredFlowEntry;
-import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
-import org.onosproject.store.ecmap.EventuallyConsistentMap;
-import org.onosproject.store.ecmap.EventuallyConsistentMapImpl;
import org.onosproject.store.flow.ReplicaInfo;
import org.onosproject.store.flow.ReplicaInfoEvent;
import org.onosproject.store.flow.ReplicaInfoEventListener;
import org.onosproject.store.flow.ReplicaInfoService;
-import org.onosproject.store.impl.ClockService;
-import org.onosproject.store.impl.MastershipBasedTimestamp;
-import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.hz.AbstractHazelcastStore;
+import org.onosproject.store.hz.SMap;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.StoreSerializer;
import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
@@ -75,12 +76,14 @@
import org.slf4j.Logger;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.Dictionary;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -93,8 +96,9 @@
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
import static com.google.common.base.Strings.isNullOrEmpty;
-import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
@@ -107,13 +111,13 @@
@Component(immediate = true)
@Service
public class DistributedFlowRuleStore
- extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
+ extends AbstractHazelcastStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
implements FlowRuleStore {
private final Logger log = getLogger(getClass());
private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
- private static final boolean DEFAULT_BACKUP_ENABLED = false;
+ private static final boolean DEFAULT_BACKUP_ENABLED = true;
private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
@Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
@@ -124,7 +128,10 @@
label = "Indicates whether backups are enabled or not")
private boolean backupEnabled = DEFAULT_BACKUP_ENABLED;
- private InternalFlowTable flowTable;
+ private InternalFlowTable flowTable = new InternalFlowTable();
+
+ /*private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
+ flowEntries = new ConcurrentHashMap<>();*/
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ReplicaInfoService replicaInfoManager;
@@ -139,18 +146,24 @@
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected DeviceClockService deviceClockService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
- @Reference(cardinality = MANDATORY_UNARY)
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ComponentConfigService configService;
private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
+ // Cache of SMaps used for backup data. Each SMap contains one device's flow table.
+ private LoadingCache<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> smaps;
+
private ExecutorService messageHandlingExecutor;
+ private final ExecutorService backupExecutors =
+ BoundedThreadPool.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
+ //Executors.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
+
+ private boolean syncBackup = false;
+
protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
@@ -170,11 +183,16 @@
@Activate
public void activate(ComponentContext context) {
configService.registerProperties(getClass());
-
- flowTable = new InternalFlowTable().withBackupsEnabled(backupEnabled);
+ super.serializer = SERIALIZER;
+ super.theInstance = storeService.getHazelcastInstance();
idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
+ // Cache to create SMap on demand
+ smaps = CacheBuilder.newBuilder()
+ .softValues()
+ .build(new SMapLoader());
+
final NodeId local = clusterService.getLocalNode().id();
messageHandlingExecutor = Executors.newFixedThreadPool(
@@ -212,7 +230,7 @@
public void handle(ClusterMessage message) {
DeviceId deviceId = SERIALIZER.decode(message.payload());
log.trace("Received get flow entries request for {} from {}", deviceId, message.sender());
- Set<StoredFlowEntry> flowEntries = flowTable.getFlowEntries(deviceId);
+ Set<FlowEntry> flowEntries = flowTable.getFlowEntries(deviceId);
try {
message.respond(SERIALIZER.encode(flowEntries));
} catch (IOException e) {
@@ -282,8 +300,6 @@
if (newPoolSize != msgHandlerPoolSize || newBackupEnabled != backupEnabled) {
msgHandlerPoolSize = newPoolSize;
backupEnabled = newBackupEnabled;
- // reconfigure the store
- flowTable.withBackupsEnabled(backupEnabled);
ExecutorService oldMsgHandler = messageHandlingExecutor;
messageHandlingExecutor = Executors.newFixedThreadPool(
msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
@@ -297,6 +313,7 @@
prefix, msgHandlerPoolSize, backupEnabled);
}
+
// This is not a efficient operation on a distributed sharded
// flow store. We need to revisit the need for this operation or at least
// make it device specific.
@@ -354,7 +371,7 @@
}
if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
- return flowTable.getFlowEntries(deviceId).stream().collect(Collectors.toSet());
+ return flowTable.getFlowEntries(deviceId);
}
log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
@@ -451,6 +468,7 @@
new CompletedBatchOperation(true, Collections.emptySet(), did)));
return;
}
+ updateBackup(did, currentOps);
notifyDelegate(FlowRuleBatchEvent.requested(new
FlowRuleBatchRequest(operation.id(),
@@ -489,6 +507,23 @@
).filter(op -> op != null).collect(Collectors.toSet());
}
+    /**
+     * Schedules an update of the backup copy of a device's flow table.
+     * <p>
+     * Does nothing when backups are disabled. Otherwise an {@link UpdateBackup}
+     * task is submitted to {@code backupExecutors}; when {@code syncBackup} is
+     * set, the calling thread additionally waits for that task to finish.
+     * Failures while waiting are logged and not propagated to the caller.
+     *
+     * @param deviceId identifier of the device whose backup is updated
+     * @param entries  batch entries to apply to the backup
+     */
+    private void updateBackup(DeviceId deviceId, final Set<FlowRuleBatchEntry> entries) {
+        if (!backupEnabled) {
+            return;
+        }
+
+        Future<?> backup = backupExecutors.submit(new UpdateBackup(deviceId, entries));
+
+        if (!syncBackup) {
+            return;
+        }
+
+        // wait for backup to complete
+        try {
+            backup.get();
+        } catch (InterruptedException e) {
+            // Restore the interrupt status so code further up the stack can
+            // still observe that this thread was asked to stop.
+            Thread.currentThread().interrupt();
+            log.error("Failed to create backups", e);
+        } catch (ExecutionException e) {
+            log.error("Failed to create backups", e);
+        }
+    }
+
@Override
public void deleteFlowRule(FlowRule rule) {
storeBatch(
@@ -504,7 +539,7 @@
ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
final NodeId localId = clusterService.getLocalNode().id();
if (localId.equals(replicaInfo.master().orNull())) {
- return addOrUpdateFlowRuleInternal((StoredFlowEntry) rule);
+ return addOrUpdateFlowRuleInternal(rule);
}
log.warn("Tried to update FlowRule {} state,"
@@ -512,7 +547,10 @@
return null;
}
- private FlowRuleEvent addOrUpdateFlowRuleInternal(StoredFlowEntry rule) {
+ private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
+ final DeviceId did = rule.deviceId();
+
+
// check if this new rule is an update to an existing entry
StoredFlowEntry stored = flowTable.getFlowEntry(rule);
if (stored != null) {
@@ -521,15 +559,21 @@
stored.setPackets(rule.packets());
if (stored.state() == FlowEntryState.PENDING_ADD) {
stored.setState(FlowEntryState.ADDED);
+ FlowRuleBatchEntry entry =
+ new FlowRuleBatchEntry(FlowRuleOperation.ADD, stored);
+ updateBackup(did, Sets.newHashSet(entry));
return new FlowRuleEvent(Type.RULE_ADDED, rule);
}
return new FlowRuleEvent(Type.RULE_UPDATED, rule);
}
// TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
+ // TODO: also update backup if the behavior is correct.
flowTable.add(rule);
+
return null;
+
}
@Override
@@ -570,6 +614,9 @@
final DeviceId deviceId = rule.deviceId();
// This is where one could mark a rule as removed and still keep it in the store.
final boolean removed = flowTable.remove(deviceId, rule); //flowEntries.remove(deviceId, rule);
+ FlowRuleBatchEntry entry =
+ new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule);
+ updateBackup(deviceId, Sets.newHashSet(entry));
if (removed) {
return new FlowRuleEvent(RULE_REMOVED, rule);
} else {
@@ -596,10 +643,35 @@
}
}
+    /**
+     * Copies the backed-up flow entries of a device into the local flow table.
+     * <p>
+     * Does nothing when backups are disabled. A failure to obtain the backup
+     * map from the cache is logged and otherwise ignored.
+     *
+     * @param did identifier of the device whose backup is loaded
+     */
+    private void loadFromBackup(final DeviceId did) {
+        if (backupEnabled) {
+            log.info("We are now the master for {}. Will load flow rules from backup", did);
+            try {
+                log.debug("Loading FlowRules for {} from backups", did);
+                final SMap<FlowId, ImmutableList<StoredFlowEntry>> backup = smaps.get(did);
+                for (Entry<FlowId, ImmutableList<StoredFlowEntry>> perFlow : backup.entrySet()) {
+                    final ImmutableList<StoredFlowEntry> backedUp = perFlow.getValue();
+                    log.trace("loading {}", backedUp);
+                    for (StoredFlowEntry restored : backedUp) {
+                        // Remove-then-add: presumably so the backed-up instance
+                        // replaces any equal entry already held locally.
+                        flowTable.getFlowEntriesById(restored).remove(restored);
+                        flowTable.getFlowEntriesById(restored).add(restored);
+                    }
+                }
+            } catch (ExecutionException failure) {
+                log.error("Failed to load backup flowtable for {}", did, failure);
+            }
+        }
+    }
+
private void removeFromPrimary(final DeviceId did) {
flowTable.clearDevice(did);
}
+
private final class OnStoreBatch implements ClusterMessageHandler {
private final NodeId local;
@@ -616,11 +688,10 @@
ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
if (!local.equals(replicaInfo.master().orNull())) {
- Set<FlowRule> failures = operation.getOperations()
- .stream()
- .map(FlowRuleBatchEntry::target)
- .collect(Collectors.toSet());
-
+ Set<FlowRule> failures = new HashSet<>(operation.size());
+ for (FlowRuleBatchEntry op : operation.getOperations()) {
+ failures.add(op.target());
+ }
CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
// This node is no longer the master, respond as all failed.
// TODO: we might want to wrap response in envelope
@@ -641,6 +712,17 @@
}
}
+    /**
+     * Cache loader that builds the backup {@code SMap} for a device on demand.
+     * <p>
+     * The underlying Hazelcast map is named {@code "flowtable_"} followed by the
+     * device identifier's string form, and is wrapped with the store serializer.
+     */
+    private final class SMapLoader
+            extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> {
+
+        @Override
+        public SMap<FlowId, ImmutableList<StoredFlowEntry>> load(DeviceId id)
+                throws Exception {
+            final String mapName = "flowtable_" + id.toString();
+            final IMap<byte[], byte[]> rawMap = theInstance.getMap(mapName);
+            return new SMap<>(rawMap, SERIALIZER);
+        }
+    }
+
private final class InternalReplicaInfoEventListener
implements ReplicaInfoEventListener {
@@ -655,7 +737,7 @@
if (local.equals(rInfo.master().orNull())) {
// This node is the new master, populate local structure
// from backup
- flowTable.loadFromBackup(did);
+ loadFromBackup(did);
}
//else {
// This node is no longer the master holder,
@@ -672,133 +754,146 @@
}
}
+    /**
+     * Task that applies a set of batch operations to the backup Hazelcast
+     * store of a single device.
+     */
+    private final class UpdateBackup implements Runnable {
+
+        private final DeviceId deviceId;
+        private final Set<FlowRuleBatchEntry> ops;
+
+        /**
+         * Creates a backup task.
+         *
+         * @param deviceId identifier of the device whose backup is updated; must not be null
+         * @param ops      batch operations to apply; must not be null
+         */
+        public UpdateBackup(DeviceId deviceId,
+                            Set<FlowRuleBatchEntry> ops) {
+            this.deviceId = checkNotNull(deviceId);
+            this.ops = checkNotNull(ops);
+        }
+
+        /**
+         * Fetches the device's backup map and applies every operation to it.
+         * A failure to obtain the map is logged and ends the task.
+         */
+        @Override
+        public void run() {
+            try {
+                log.trace("update backup {} {}", deviceId, ops);
+                final SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(deviceId);
+                for (FlowRuleBatchEntry op : ops) {
+                    applyToBackup(backupFlowTable, op);
+                }
+            } catch (ExecutionException e) {
+                log.error("Failed to write to backups", e);
+            }
+        }
+
+        /**
+         * Applies one operation to the backup map entry keyed by the target's flow id.
+         * <p>
+         * The stored list is copied, the target is removed from the copy and, for
+         * ADD operations, appended again; the result is then written back with
+         * putIfAbsent (no previous value) or a compare-and-replace (previous value
+         * present). An unsuccessful write is only logged.
+         *
+         * @param backupFlowTable backup map of the device
+         * @param op              operation to apply
+         */
+        private void applyToBackup(SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable,
+                                   FlowRuleBatchEntry op) {
+            final FlowRule target = op.target();
+            final FlowId flowId = target.id();
+            final ImmutableList<StoredFlowEntry> previous = backupFlowTable.get(flowId);
+
+            final List<StoredFlowEntry> updated = new ArrayList<>();
+            if (previous != null) {
+                updated.addAll(previous);
+            }
+            updated.remove(op.target());
+            if (op.operator() == FlowRuleOperation.ADD) {
+                updated.add((StoredFlowEntry) target);
+            }
+
+            final ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(updated);
+            final boolean success;
+            if (previous != null) {
+                success = backupFlowTable.replace(flowId, previous, newValue);
+            } else {
+                success = (backupFlowTable.putIfAbsent(flowId, newValue) == null);
+            }
+            if (!success) {
+                log.error("Updating backup failed.");
+            }
+        }
+    }
+
private class InternalFlowTable {
- private boolean backupsEnabled = true;
+ /*
+ TODO: This needs to be cleaned up. Perhaps use the eventually consistent
+ map once it supports distribution to a sequence of instances.
+ */
+
+
+ private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
+ flowEntries = new ConcurrentHashMap<>();
+
+
+ private NewConcurrentHashMap<FlowId, Set<StoredFlowEntry>> lazyEmptyFlowTable() {
+ return NewConcurrentHashMap.<FlowId, Set<StoredFlowEntry>>ifNeeded();
+ }
/**
- * Turns backups on or off.
- * @param backupsEnabled whether backups should be enabled or not
- * @return this instance
+ * Returns the flow table for specified device.
+ *
+ * @param deviceId identifier of the device
+ * @return Map representing Flow Table of given device.
*/
- public InternalFlowTable withBackupsEnabled(boolean backupsEnabled) {
- this.backupsEnabled = backupsEnabled;
- return this;
- }
-
- private final Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>>
- flowEntries = Maps.newConcurrentMap();
-
- private final KryoNamespace.Builder flowSerializer = KryoNamespace.newBuilder()
- .register(KryoNamespaces.API)
- .register(MastershipBasedTimestamp.class);
-
- private final ClockService<FlowId, StoredFlowEntry> clockService =
- (flowId, flowEntry) ->
- (flowEntry == null) ? null : deviceClockService.getTimestamp(flowEntry.deviceId());
-
- private final EventuallyConsistentMap<FlowId, StoredFlowEntry> backupMap =
- new EventuallyConsistentMapImpl<>("flow-backup",
- clusterService,
- clusterCommunicator,
- flowSerializer,
- clockService,
- (key, flowEntry) -> getPeerNodes()).withTombstonesDisabled(true);
-
- private Collection<NodeId> getPeerNodes() {
- List<NodeId> nodes = clusterService.getNodes()
- .stream()
- .map(node -> node.id())
- .filter(id -> !id.equals(clusterService.getLocalNode().id()))
- .collect(Collectors.toList());
-
- if (nodes.isEmpty()) {
- return ImmutableList.of();
- } else {
- // get a random peer
- return ImmutableList.of(nodes.get(RandomUtils.nextInt(nodes.size())));
- }
- }
-
- public void loadFromBackup(DeviceId deviceId) {
- if (!backupsEnabled) {
- return;
- }
- log.info("We are now the master for {}. Will load flow rules from backup", deviceId);
-
- ConcurrentMap<FlowId, Set<StoredFlowEntry>> flowTable = new ConcurrentHashMap<>();
-
- backupMap.values()
- .stream()
- .filter(entry -> entry.deviceId().equals(deviceId))
- .forEach(entry -> flowTable.computeIfPresent(entry.id(), (k, v) -> {
- if (v == null) {
- return Sets.newHashSet(entry);
- } else {
- v.add(entry);
- }
- return v;
- }));
- flowEntries.putIfAbsent(deviceId, flowTable);
+ private ConcurrentMap<FlowId, Set<StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
+ return createIfAbsentUnchecked(flowEntries,
+ deviceId, lazyEmptyFlowTable());
}
private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
- return flowEntries
- .computeIfAbsent(deviceId, key -> Maps.newConcurrentMap())
- .computeIfAbsent(flowId, k -> new CopyOnWriteArraySet<>());
+ final ConcurrentMap<FlowId, Set<StoredFlowEntry>> flowTable = getFlowTable(deviceId);
+ Set<StoredFlowEntry> r = flowTable.get(flowId);
+ if (r == null) {
+ final Set<StoredFlowEntry> concurrentlyAdded;
+ r = new CopyOnWriteArraySet<>();
+ concurrentlyAdded = flowTable.putIfAbsent(flowId, r);
+ if (concurrentlyAdded != null) {
+ return concurrentlyAdded;
+ }
+ }
+ return r;
}
private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
- return getFlowEntriesInternal(rule.deviceId(), rule.id())
- .stream()
- .filter(element -> element.equals(rule))
- .findFirst()
- .orElse(null);
+ for (StoredFlowEntry f : getFlowEntriesInternal(rule.deviceId(), rule.id())) {
+ if (f.equals(rule)) {
+ return f;
+ }
+ }
+ return null;
}
- private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
- Set<StoredFlowEntry> entries = Sets.newHashSet();
- flowEntries.computeIfAbsent(deviceId, key -> Maps.newConcurrentMap())
- .values()
- .forEach(entries::addAll);
- return entries;
+ private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
+ return getFlowTable(deviceId).values().stream()
+ .flatMap((list -> list.stream())).collect(Collectors.toSet());
+
}
+
public StoredFlowEntry getFlowEntry(FlowRule rule) {
return getFlowEntryInternal(rule);
}
- public Set<StoredFlowEntry> getFlowEntries(DeviceId deviceId) {
+ public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
return getFlowEntriesInternal(deviceId);
}
- public void add(StoredFlowEntry rule) {
- getFlowEntriesInternal(rule.deviceId(), rule.id()).add(rule);
- if (backupsEnabled) {
- try {
- backupMap.put(rule.id(), rule);
- } catch (Exception e) {
- log.warn("Failed to backup flow rule", e);
- }
- }
+ public Set<StoredFlowEntry> getFlowEntriesById(FlowEntry entry) {
+ return getFlowEntriesInternal(entry.deviceId(), entry.id());
+ }
+
+ public void add(FlowEntry rule) {
+ ((CopyOnWriteArraySet)
+ getFlowEntriesInternal(rule.deviceId(), rule.id())).add(rule);
}
public boolean remove(DeviceId deviceId, FlowEntry rule) {
- boolean status =
- getFlowEntriesInternal(deviceId, rule.id()).remove(rule);
- if (backupsEnabled && status) {
- try {
- backupMap.remove(rule.id(), (DefaultFlowEntry) rule);
- } catch (Exception e) {
- log.warn("Failed to remove backup of flow rule", e);
- }
- }
- return status;
+ return ((CopyOnWriteArraySet)
+ getFlowEntriesInternal(deviceId, rule.id())).remove(rule);
+ //return flowEntries.remove(deviceId, rule);
}
public void clearDevice(DeviceId did) {
flowEntries.remove(did);
- // Flow entries should continue to remain in backup map.
}
}
+
+
}