Add basic backup to DistributedFlowRuleStore
Change-Id: I8eedf0cf30a2555d45145889b5ef210e826b0ac0
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchRequest.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchRequest.java
index 34e3d31..f8a25cb 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchRequest.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchRequest.java
@@ -13,7 +13,7 @@
private final List<FlowEntry> toAdd;
private final List<FlowEntry> toRemove;
- public FlowRuleBatchRequest(int batchId, List<FlowEntry> toAdd, List<FlowEntry> toRemove) {
+ public FlowRuleBatchRequest(int batchId, List<? extends FlowEntry> toAdd, List<? extends FlowEntry> toRemove) {
this.batchId = batchId;
this.toAdd = Collections.unmodifiableList(toAdd);
this.toRemove = Collections.unmodifiableList(toRemove);
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index 30b9008..69b6743 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -1,5 +1,6 @@
package org.onlab.onos.store.flow.impl;
+import static com.google.common.base.Preconditions.checkNotNull;
import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
import static org.slf4j.LoggerFactory.getLogger;
import static org.onlab.onos.store.flow.impl.FlowStoreMessageSubjects.*;
@@ -10,6 +11,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -27,6 +29,7 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.device.DeviceService;
@@ -34,6 +37,7 @@
import org.onlab.onos.net.flow.DefaultFlowEntry;
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
+import org.onlab.onos.net.flow.FlowId;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRuleBatchEntry;
import org.onlab.onos.net.flow.FlowRuleBatchEvent;
@@ -45,12 +49,15 @@
import org.onlab.onos.net.flow.FlowRuleStore;
import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
import org.onlab.onos.net.flow.StoredFlowEntry;
-import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.flow.ReplicaInfo;
+import org.onlab.onos.store.flow.ReplicaInfoEvent;
+import org.onlab.onos.store.flow.ReplicaInfoEventListener;
import org.onlab.onos.store.flow.ReplicaInfoService;
+import org.onlab.onos.store.hz.AbstractHazelcastStore;
+import org.onlab.onos.store.hz.SMap;
import org.onlab.onos.store.serializers.DistributedStoreSerializers;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.util.KryoNamespace;
@@ -59,13 +66,17 @@
import com.google.common.base.Function;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
+import com.hazelcast.core.IMap;
/**
* Manages inventory of flow rules using a distributed state management protocol.
@@ -73,7 +84,7 @@
@Component(immediate = true)
@Service
public class DistributedFlowRuleStore
- extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
+ extends AbstractHazelcastStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
implements FlowRuleStore {
private final Logger log = getLogger(getClass());
@@ -82,8 +93,6 @@
private final Multimap<DeviceId, StoredFlowEntry> flowEntries =
ArrayListMultimap.<DeviceId, StoredFlowEntry>create();
- private final Multimap<Short, FlowRule> flowEntriesById =
- ArrayListMultimap.<Short, FlowRule>create();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ReplicaInfoService replicaInfoManager;
@@ -109,10 +118,17 @@
//.removalListener(listener)
.build();
+ private LoadingCache<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> smaps;
+
private final ExecutorService futureListeners =
Executors.newCachedThreadPool(namedThreads("flowstore-peer-responders"));
+ private final ExecutorService backupExecutors =
+ Executors.newSingleThreadExecutor(namedThreads("async-backups"));
+
+ // TODO: make this configurable
+ private boolean syncBackup = false;
protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
@@ -127,8 +143,20 @@
// TODO: make this configurable
private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
+ private ReplicaInfoEventListener replicaInfoEventListener;
+
+ @Override
@Activate
public void activate() {
+
+ super.serializer = SERIALIZER;
+ super.theInstance = storeService.getHazelcastInstance();
+
+ // Cache to create SMap on demand
+ smaps = CacheBuilder.newBuilder()
+ .softValues()
+ .build(new SMapLoader());
+
clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new ClusterMessageHandler() {
@Override
@@ -182,11 +210,16 @@
}
});
+ replicaInfoEventListener = new InternalReplicaInfoEventListener();
+
+ replicaInfoManager.addListener(replicaInfoEventListener);
+
log.info("Started");
}
@Deactivate
public void deactivate() {
+ replicaInfoManager.removeListener(replicaInfoEventListener);
log.info("Stopped");
}
@@ -276,8 +309,10 @@
storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule))));
}
+ // FIXME: document that all of the FlowEntries must belong to the same device
@Override
public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
+
if (operation.getOperations().isEmpty()) {
return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
}
@@ -313,12 +348,17 @@
}
private ListenableFuture<CompletedBatchOperation> storeBatchInternal(FlowRuleBatchOperation operation) {
- List<FlowEntry> toRemove = new ArrayList<>();
- List<FlowEntry> toAdd = new ArrayList<>();
- // TODO: backup changes to hazelcast map
+ final List<StoredFlowEntry> toRemove = new ArrayList<>();
+ final List<StoredFlowEntry> toAdd = new ArrayList<>();
+ DeviceId did = null;
+
+
for (FlowRuleBatchEntry batchEntry : operation.getOperations()) {
FlowRule flowRule = batchEntry.getTarget();
FlowRuleOperation op = batchEntry.getOperator();
+ if (did == null) {
+ did = flowRule.deviceId();
+ }
if (op.equals(FlowRuleOperation.REMOVE)) {
StoredFlowEntry entry = getFlowEntryInternal(flowRule);
if (entry != null) {
@@ -330,7 +370,6 @@
DeviceId deviceId = flowRule.deviceId();
if (!flowEntries.containsEntry(deviceId, flowEntry)) {
flowEntries.put(deviceId, flowEntry);
- flowEntriesById.put(flowRule.appId(), flowEntry);
toAdd.add(flowEntry);
}
}
@@ -339,14 +378,39 @@
return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
}
+ // create remote backup copies
+ final DeviceId deviceId = did;
+ updateBackup(deviceId, toAdd, toRemove);
+
SettableFuture<CompletedBatchOperation> r = SettableFuture.create();
final int batchId = localBatchIdGen.incrementAndGet();
pendingFutures.put(batchId, r);
notifyDelegate(FlowRuleBatchEvent.requested(new FlowRuleBatchRequest(batchId, toAdd, toRemove)));
+
return r;
}
+ private void updateBackup(final DeviceId deviceId,
+ final List<StoredFlowEntry> toAdd,
+ final List<? extends FlowRule> list) {
+
+ Future<?> submit = backupExecutors.submit(new UpdateBackup(deviceId, toAdd, list));
+
+ if (syncBackup) {
+ // wait for backup to complete
+ try {
+ submit.get();
+ } catch (InterruptedException | ExecutionException e) {
+ log.error("Failed to create backups", e);
+ }
+ }
+ }
+
+ private void updateBackup(DeviceId deviceId, List<StoredFlowEntry> toAdd) {
+ updateBackup(deviceId, toAdd, Collections.<FlowEntry>emptyList());
+ }
+
@Override
public void deleteFlowRule(FlowRule rule) {
storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule))));
@@ -365,7 +429,7 @@
}
private synchronized FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
- DeviceId did = rule.deviceId();
+ final DeviceId did = rule.deviceId();
// check if this new rule is an update to an existing entry
StoredFlowEntry stored = getFlowEntryInternal(rule);
@@ -375,16 +439,18 @@
stored.setPackets(rule.packets());
if (stored.state() == FlowEntryState.PENDING_ADD) {
stored.setState(FlowEntryState.ADDED);
+ // update backup.
+ updateBackup(did, Arrays.asList(stored));
return new FlowRuleEvent(Type.RULE_ADDED, rule);
}
return new FlowRuleEvent(Type.RULE_UPDATED, rule);
}
// TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
+ // TODO: also update backup.
flowEntries.put(did, new DefaultFlowEntry(rule));
return null;
- // TODO: also update backup.
}
@Override
@@ -401,13 +467,15 @@
}
private synchronized FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
+ final DeviceId deviceId = rule.deviceId();
// This is where one could mark a rule as removed and still keep it in the store.
- if (flowEntries.remove(rule.deviceId(), rule)) {
+ final boolean removed = flowEntries.remove(deviceId, rule);
+ updateBackup(deviceId, Collections.<StoredFlowEntry>emptyList(), Arrays.asList(rule));
+ if (removed) {
return new FlowRuleEvent(RULE_REMOVED, rule);
} else {
return null;
}
- // TODO: also update backup.
}
@Override
@@ -421,4 +489,145 @@
}
notifyDelegate(event);
}
+
+ private synchronized void loadFromBackup(final DeviceId did) {
+ // TODO: relax the synchronized scope; locking the whole load may be too coarse
+
+ try {
+ log.info("Loading FlowRules for {} from backups", did);
+ SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(did);
+ for (Entry<FlowId, ImmutableList<StoredFlowEntry>> e
+ : backupFlowTable.entrySet()) {
+
+ // TODO: should we be directly updating internal structure or
+ // should we be triggering event?
+ log.debug("loading {}", e.getValue());
+ for (StoredFlowEntry entry : e.getValue()) {
+ flowEntries.remove(did, entry);
+ flowEntries.put(did, entry);
+ }
+ }
+ } catch (ExecutionException e) {
+ log.error("Failed to load backup flowtable for {}", did, e);
+ }
+ }
+
+ private synchronized void removeFromPrimary(final DeviceId did) {
+ Collection<StoredFlowEntry> removed = flowEntries.removeAll(did);
+ log.debug("removedFromPrimary {}", removed);
+ }
+
+ private final class SMapLoader
+ extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> {
+
+ @Override
+ public SMap<FlowId, ImmutableList<StoredFlowEntry>> load(DeviceId id)
+ throws Exception {
+ IMap<byte[], byte[]> map = theInstance.getMap("flowtable_" + id.toString());
+ return new SMap<FlowId, ImmutableList<StoredFlowEntry>>(map, SERIALIZER);
+ }
+ }
+
+ private final class InternalReplicaInfoEventListener
+ implements ReplicaInfoEventListener {
+
+ @Override
+ public void event(ReplicaInfoEvent event) {
+ final NodeId local = clusterService.getLocalNode().id();
+ final DeviceId did = event.subject();
+ final ReplicaInfo rInfo = event.replicaInfo();
+
+ switch (event.type()) {
+ case MASTER_CHANGED:
+ if (local.equals(rInfo.master().orNull())) {
+ // This node is the new master, populate local structure
+ // from backup
+ loadFromBackup(did);
+ } else {
+ // This node is no longer the master holder,
+ // clean local structure
+ removeFromPrimary(did);
+ // FIXME: probably should stop pending backup activities in
+ // executors to avoid overwriting with old value
+ }
+ break;
+ default:
+ break;
+
+ }
+ }
+ }
+
+ // Task to update FlowEntries in the backup Hazelcast store
+ private final class UpdateBackup implements Runnable {
+
+ private final DeviceId deviceId;
+ private final List<StoredFlowEntry> toAdd;
+ private final List<? extends FlowRule> toRemove;
+
+ public UpdateBackup(DeviceId deviceId,
+ List<StoredFlowEntry> toAdd,
+ List<? extends FlowRule> list) {
+ this.deviceId = checkNotNull(deviceId);
+ this.toAdd = checkNotNull(toAdd);
+ this.toRemove = checkNotNull(list);
+ }
+
+ @Override
+ public void run() {
+ try {
+ log.debug("update backup {} +{} -{}", deviceId, toAdd, toRemove);
+ final SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(deviceId);
+ // TODO: rewrite the following using async APIs
+ for (StoredFlowEntry entry : toAdd) {
+ final FlowId id = entry.id();
+ ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
+ List<StoredFlowEntry> list = new ArrayList<>();
+ if (original != null) {
+ list.addAll(original);
+ }
+
+ list.remove(entry);
+ list.add(entry);
+
+ ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
+ boolean success;
+ if (original == null) {
+ success = (backupFlowTable.putIfAbsent(id, newValue) == null);
+ } else {
+ success = backupFlowTable.replace(id, original, newValue);
+ }
+ // TODO retry?
+ if (!success) {
+ log.error("Updating backup failed.");
+ }
+ }
+ for (FlowRule entry : toRemove) {
+ final FlowId id = entry.id();
+ ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
+ List<StoredFlowEntry> list = new ArrayList<>();
+ if (original != null) {
+ list.addAll(original);
+ }
+
+ list.remove(entry);
+
+ ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
+ boolean success;
+ if (original == null) {
+ success = (backupFlowTable.putIfAbsent(id, newValue) == null);
+ } else {
+ success = backupFlowTable.replace(id, original, newValue);
+ }
+ // TODO retry?
+ if (!success) {
+ log.error("Updating backup failed.");
+ }
+ }
+ } catch (ExecutionException e) {
+ log.error("Failed to write to backups", e);
+ }
+
+ }
+ }
}