DistributedFlowRuleStore: synchronized -> Reader/Writer lock
fix for ONOS-195
Change-Id: I3e15104225878d1616fa790095695400bcc43697
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleStore.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleStore.java
index 958dc91..bba5a3c 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleStore.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleStore.java
@@ -60,6 +60,7 @@
* Stores a batch of flow rules.
*
* @param batchOperation batch of flow rules.
+ * A batch can contain flow rules for a single device only.
* @return Future response indicating success/failure of the batch operation
* all the way down to the device.
*/
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index c94b6c0..14dfd0d 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -36,6 +36,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.List;
import org.apache.felix.scr.annotations.Activate;
@@ -109,7 +110,8 @@
private final Logger log = getLogger(getClass());
// primary data:
- // read/write needs to be synchronized
+ // read/write needs to be locked
+ private final ReentrantReadWriteLock flowEntriesLock = new ReentrantReadWriteLock();
// store entries as a pile of rules, no info about device tables
private final Multimap<DeviceId, StoredFlowEntry> flowEntries
= ArrayListMultimap.<DeviceId, StoredFlowEntry>create();
@@ -186,7 +188,7 @@
@Override
public void handle(ClusterMessage message) {
FlowRule rule = SERIALIZER.decode(message.payload());
- log.info("received get flow entry request for {}", rule);
+ log.debug("received get flow entry request for {}", rule);
FlowEntry flowEntry = getFlowEntryInternal(rule);
try {
message.respond(SERIALIZER.encode(flowEntry));
@@ -201,7 +203,7 @@
@Override
public void handle(ClusterMessage message) {
DeviceId deviceId = SERIALIZER.decode(message.payload());
- log.info("Received get flow entries request for {} from {}", deviceId, message.sender());
+ log.debug("Received get flow entries request for {} from {}", deviceId, message.sender());
Set<FlowEntry> flowEntries = getFlowEntriesInternal(deviceId);
try {
message.respond(SERIALIZER.encode(flowEntries));
@@ -240,21 +242,20 @@
}
@Override
- public synchronized FlowEntry getFlowEntry(FlowRule rule) {
+ public FlowEntry getFlowEntry(FlowRule rule) {
ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
if (!replicaInfo.master().isPresent()) {
log.warn("No master for {}", rule);
- // TODO: revisit if this should be returning null.
- // FIXME: throw a FlowStoreException
- throw new RuntimeException("No master for " + rule);
+ // TODO: should we try returning from backup?
+ return null;
}
if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
return getFlowEntryInternal(rule);
}
- log.info("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
+ log.debug("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
replicaInfo.master().orNull(), rule.deviceId());
ClusterMessage message = new ClusterMessage(
@@ -271,25 +272,28 @@
}
}
- private synchronized StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
- for (StoredFlowEntry f : flowEntries.get(rule.deviceId())) {
- if (f.equals(rule)) {
- return f;
+ private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
+ flowEntriesLock.readLock().lock();
+ try {
+ for (StoredFlowEntry f : flowEntries.get(rule.deviceId())) {
+ if (f.equals(rule)) {
+ return f;
+ }
}
+ } finally {
+ flowEntriesLock.readLock().unlock();
}
return null;
}
@Override
- public synchronized Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
+ public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
if (!replicaInfo.master().isPresent()) {
log.warn("No master for {}", deviceId);
- // TODO: revisit if this should be returning empty collection or throwing exception.
- // FIXME: throw a FlowStoreException
- //throw new RuntimeException("No master for " + deviceId);
+ // TODO: should we try returning from backup?
return Collections.emptyList();
}
@@ -297,7 +301,7 @@
return getFlowEntriesInternal(deviceId);
}
- log.info("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
+ log.debug("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
replicaInfo.master().orNull(), deviceId);
ClusterMessage message = new ClusterMessage(
@@ -314,12 +318,17 @@
}
}
- private synchronized Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
- Collection<? extends FlowEntry> rules = flowEntries.get(deviceId);
- if (rules == null) {
- return Collections.emptySet();
+ private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
+ flowEntriesLock.readLock().lock();
+ try {
+ Collection<? extends FlowEntry> rules = flowEntries.get(deviceId);
+ if (rules == null) {
+ return Collections.emptySet();
+ }
+ return ImmutableSet.copyOf(rules);
+ } finally {
+ flowEntriesLock.readLock().unlock();
}
- return ImmutableSet.copyOf(rules);
}
@Override
@@ -327,7 +336,6 @@
storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule))));
}
- // FIXME document that all of the FlowEntries must be about same device
@Override
public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
@@ -351,7 +359,7 @@
return storeBatchInternal(operation);
}
- log.info("Forwarding storeBatch to {}, which is the primary (master) for device {}",
+ log.debug("Forwarding storeBatch to {}, which is the primary (master) for device {}",
replicaInfo.master().orNull(), deviceId);
ClusterMessage message = new ClusterMessage(
@@ -368,41 +376,46 @@
}
}
- private synchronized ListenableFuture<CompletedBatchOperation>
+ private ListenableFuture<CompletedBatchOperation>
storeBatchInternal(FlowRuleBatchOperation operation) {
final List<StoredFlowEntry> toRemove = new ArrayList<>();
final List<StoredFlowEntry> toAdd = new ArrayList<>();
DeviceId did = null;
- for (FlowRuleBatchEntry batchEntry : operation.getOperations()) {
- FlowRule flowRule = batchEntry.getTarget();
- FlowRuleOperation op = batchEntry.getOperator();
- if (did == null) {
- did = flowRule.deviceId();
- }
- if (op.equals(FlowRuleOperation.REMOVE)) {
- StoredFlowEntry entry = getFlowEntryInternal(flowRule);
- if (entry != null) {
- entry.setState(FlowEntryState.PENDING_REMOVE);
- toRemove.add(entry);
- }
- } else if (op.equals(FlowRuleOperation.ADD)) {
- StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
- DeviceId deviceId = flowRule.deviceId();
- if (!flowEntries.containsEntry(deviceId, flowEntry)) {
- flowEntries.put(deviceId, flowEntry);
- toAdd.add(flowEntry);
- }
- }
- }
- if (toAdd.isEmpty() && toRemove.isEmpty()) {
- return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
- }
- // create remote backup copies
- final DeviceId deviceId = did;
- updateBackup(deviceId, toAdd, toRemove);
+ flowEntriesLock.writeLock().lock();
+ try {
+ for (FlowRuleBatchEntry batchEntry : operation.getOperations()) {
+ FlowRule flowRule = batchEntry.getTarget();
+ FlowRuleOperation op = batchEntry.getOperator();
+ if (did == null) {
+ did = flowRule.deviceId();
+ }
+ if (op.equals(FlowRuleOperation.REMOVE)) {
+ StoredFlowEntry entry = getFlowEntryInternal(flowRule);
+ if (entry != null) {
+ entry.setState(FlowEntryState.PENDING_REMOVE);
+ toRemove.add(entry);
+ }
+ } else if (op.equals(FlowRuleOperation.ADD)) {
+ StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
+ DeviceId deviceId = flowRule.deviceId();
+ if (!flowEntries.containsEntry(deviceId, flowEntry)) {
+ flowEntries.put(deviceId, flowEntry);
+ toAdd.add(flowEntry);
+ }
+ }
+ }
+ if (toAdd.isEmpty() && toRemove.isEmpty()) {
+ return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
+ }
+
+ // create remote backup copies
+ updateBackup(did, toAdd, toRemove);
+ } finally {
+ flowEntriesLock.writeLock().unlock();
+ }
SettableFuture<CompletedBatchOperation> r = SettableFuture.create();
final int batchId = localBatchIdGen.incrementAndGet();
@@ -451,27 +464,32 @@
return null;
}
- private synchronized FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
+ private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
final DeviceId did = rule.deviceId();
- // check if this new rule is an update to an existing entry
- StoredFlowEntry stored = getFlowEntryInternal(rule);
- if (stored != null) {
- stored.setBytes(rule.bytes());
- stored.setLife(rule.life());
- stored.setPackets(rule.packets());
- if (stored.state() == FlowEntryState.PENDING_ADD) {
- stored.setState(FlowEntryState.ADDED);
- // update backup.
- updateBackup(did, Arrays.asList(stored));
- return new FlowRuleEvent(Type.RULE_ADDED, rule);
+ flowEntriesLock.writeLock().lock();
+ try {
+ // check if this new rule is an update to an existing entry
+ StoredFlowEntry stored = getFlowEntryInternal(rule);
+ if (stored != null) {
+ stored.setBytes(rule.bytes());
+ stored.setLife(rule.life());
+ stored.setPackets(rule.packets());
+ if (stored.state() == FlowEntryState.PENDING_ADD) {
+ stored.setState(FlowEntryState.ADDED);
+ // update backup.
+ updateBackup(did, Arrays.asList(stored));
+ return new FlowRuleEvent(Type.RULE_ADDED, rule);
+ }
+ return new FlowRuleEvent(Type.RULE_UPDATED, rule);
}
- return new FlowRuleEvent(Type.RULE_UPDATED, rule);
- }
- // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
- // TODO: also update backup.
- flowEntries.put(did, new DefaultFlowEntry(rule));
+ // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
+ // TODO: also update backup.
+ flowEntries.put(did, new DefaultFlowEntry(rule));
+ } finally {
+ flowEntriesLock.writeLock().unlock();
+ }
return null;
}
@@ -491,15 +509,20 @@
return null;
}
- private synchronized FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
+ private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
final DeviceId deviceId = rule.deviceId();
- // This is where one could mark a rule as removed and still keep it in the store.
- final boolean removed = flowEntries.remove(deviceId, rule);
- updateBackup(deviceId, Collections.<StoredFlowEntry>emptyList(), Arrays.asList(rule));
- if (removed) {
- return new FlowRuleEvent(RULE_REMOVED, rule);
- } else {
- return null;
+ flowEntriesLock.writeLock().lock();
+ try {
+ // This is where one could mark a rule as removed and still keep it in the store.
+ final boolean removed = flowEntries.remove(deviceId, rule);
+ updateBackup(deviceId, Collections.<StoredFlowEntry>emptyList(), Arrays.asList(rule));
+ if (removed) {
+ return new FlowRuleEvent(RULE_REMOVED, rule);
+ } else {
+ return null;
+ }
+ } finally {
+ flowEntriesLock.writeLock().unlock();
}
}
@@ -515,9 +538,9 @@
notifyDelegate(event);
}
- private synchronized void loadFromBackup(final DeviceId did) {
- // should relax synchronized condition
+ private void loadFromBackup(final DeviceId did) {
+ flowEntriesLock.writeLock().lock();
try {
log.info("Loading FlowRules for {} from backups", did);
SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(did);
@@ -534,11 +557,19 @@
}
} catch (ExecutionException e) {
log.error("Failed to load backup flowtable for {}", did, e);
+ } finally {
+ flowEntriesLock.writeLock().unlock();
}
}
- private synchronized void removeFromPrimary(final DeviceId did) {
- Collection<StoredFlowEntry> removed = flowEntries.removeAll(did);
+ private void removeFromPrimary(final DeviceId did) {
+ Collection<StoredFlowEntry> removed = null;
+ flowEntriesLock.writeLock().lock();
+ try {
+ removed = flowEntries.removeAll(did);
+ } finally {
+ flowEntriesLock.writeLock().unlock();
+ }
log.debug("removedFromPrimary {}", removed);
}