DistributedFlowRuleStore: synchronized -> Reader/Writer lock

fix for ONOS-195

Change-Id: I3e15104225878d1616fa790095695400bcc43697
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleStore.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleStore.java
index 958dc91..bba5a3c 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleStore.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleStore.java
@@ -60,6 +60,7 @@
      * Stores a batch of flow rules.
      *
      * @param batchOperation batch of flow rules.
+     *           A batch can contain flow rules for a single device only.
      * @return Future response indicating success/failure of the batch operation
      * all the way down to the device.
      */
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index c94b6c0..14dfd0d 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -36,6 +36,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.List;
 
 import org.apache.felix.scr.annotations.Activate;
@@ -109,7 +110,8 @@
     private final Logger log = getLogger(getClass());
 
     // primary data:
-    //  read/write needs to be synchronized
+    //  read/write needs to be locked
+    private final ReentrantReadWriteLock flowEntriesLock = new ReentrantReadWriteLock();
     // store entries as a pile of rules, no info about device tables
     private final Multimap<DeviceId, StoredFlowEntry> flowEntries
         = ArrayListMultimap.<DeviceId, StoredFlowEntry>create();
@@ -186,7 +188,7 @@
             @Override
             public void handle(ClusterMessage message) {
                 FlowRule rule = SERIALIZER.decode(message.payload());
-                log.info("received get flow entry request for {}", rule);
+                log.debug("received get flow entry request for {}", rule);
                 FlowEntry flowEntry = getFlowEntryInternal(rule);
                 try {
                     message.respond(SERIALIZER.encode(flowEntry));
@@ -201,7 +203,7 @@
             @Override
             public void handle(ClusterMessage message) {
                 DeviceId deviceId = SERIALIZER.decode(message.payload());
-                log.info("Received get flow entries request for {} from {}", deviceId, message.sender());
+                log.debug("Received get flow entries request for {} from {}", deviceId, message.sender());
                 Set<FlowEntry> flowEntries = getFlowEntriesInternal(deviceId);
                 try {
                     message.respond(SERIALIZER.encode(flowEntries));
@@ -240,21 +242,20 @@
     }
 
     @Override
-    public synchronized FlowEntry getFlowEntry(FlowRule rule) {
+    public FlowEntry getFlowEntry(FlowRule rule) {
         ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
 
         if (!replicaInfo.master().isPresent()) {
             log.warn("No master for {}", rule);
-            // TODO: revisit if this should be returning null.
-            // FIXME: throw a FlowStoreException
-            throw new RuntimeException("No master for " + rule);
+            // TODO: should we try returning from backup?
+            return null;
         }
 
         if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
             return getFlowEntryInternal(rule);
         }
 
-        log.info("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
+        log.debug("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
                 replicaInfo.master().orNull(), rule.deviceId());
 
         ClusterMessage message = new ClusterMessage(
@@ -271,25 +272,28 @@
         }
     }
 
-    private synchronized StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
-        for (StoredFlowEntry f : flowEntries.get(rule.deviceId())) {
-            if (f.equals(rule)) {
-                return f;
+    private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
+        flowEntriesLock.readLock().lock();
+        try {
+            for (StoredFlowEntry f : flowEntries.get(rule.deviceId())) {
+                if (f.equals(rule)) {
+                    return f;
+                }
             }
+        } finally {
+            flowEntriesLock.readLock().unlock();
         }
         return null;
     }
 
     @Override
-    public synchronized Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
+    public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
 
         ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
 
         if (!replicaInfo.master().isPresent()) {
             log.warn("No master for {}", deviceId);
-            // TODO: revisit if this should be returning empty collection or throwing exception.
-            // FIXME: throw a FlowStoreException
-            //throw new RuntimeException("No master for " + deviceId);
+            // TODO: should we try returning from backup?
             return Collections.emptyList();
         }
 
@@ -297,7 +301,7 @@
             return getFlowEntriesInternal(deviceId);
         }
 
-        log.info("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
+        log.debug("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
                 replicaInfo.master().orNull(), deviceId);
 
         ClusterMessage message = new ClusterMessage(
@@ -314,12 +318,17 @@
         }
     }
 
-    private synchronized Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
-        Collection<? extends FlowEntry> rules = flowEntries.get(deviceId);
-        if (rules == null) {
-            return Collections.emptySet();
+    private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
+        flowEntriesLock.readLock().lock();
+        try {
+            Collection<? extends FlowEntry> rules = flowEntries.get(deviceId);
+            if (rules == null) {
+                return Collections.emptySet();
+            }
+            return ImmutableSet.copyOf(rules);
+        } finally {
+            flowEntriesLock.readLock().unlock();
         }
-        return ImmutableSet.copyOf(rules);
     }
 
     @Override
@@ -327,7 +336,6 @@
         storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule))));
     }
 
-    // FIXME document that all of the FlowEntries must be about same device
     @Override
     public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
 
@@ -351,7 +359,7 @@
             return storeBatchInternal(operation);
         }
 
-        log.info("Forwarding storeBatch to {}, which is the primary (master) for device {}",
+        log.debug("Forwarding storeBatch to {}, which is the primary (master) for device {}",
                 replicaInfo.master().orNull(), deviceId);
 
         ClusterMessage message = new ClusterMessage(
@@ -368,41 +376,46 @@
         }
     }
 
-    private synchronized ListenableFuture<CompletedBatchOperation>
+    private ListenableFuture<CompletedBatchOperation>
                         storeBatchInternal(FlowRuleBatchOperation operation) {
 
         final List<StoredFlowEntry> toRemove = new ArrayList<>();
         final List<StoredFlowEntry> toAdd = new ArrayList<>();
         DeviceId did = null;
 
-        for (FlowRuleBatchEntry batchEntry : operation.getOperations()) {
-            FlowRule flowRule = batchEntry.getTarget();
-            FlowRuleOperation op = batchEntry.getOperator();
-            if (did == null) {
-                did = flowRule.deviceId();
-            }
-            if (op.equals(FlowRuleOperation.REMOVE)) {
-                StoredFlowEntry entry = getFlowEntryInternal(flowRule);
-                if (entry != null) {
-                    entry.setState(FlowEntryState.PENDING_REMOVE);
-                    toRemove.add(entry);
-                }
-            } else if (op.equals(FlowRuleOperation.ADD)) {
-                StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
-                DeviceId deviceId = flowRule.deviceId();
-                if (!flowEntries.containsEntry(deviceId, flowEntry)) {
-                    flowEntries.put(deviceId, flowEntry);
-                    toAdd.add(flowEntry);
-                }
-            }
-        }
-        if (toAdd.isEmpty() && toRemove.isEmpty()) {
-            return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
-        }
 
-        // create remote backup copies
-        final DeviceId deviceId = did;
-        updateBackup(deviceId, toAdd, toRemove);
+        flowEntriesLock.writeLock().lock();
+        try {
+            for (FlowRuleBatchEntry batchEntry : operation.getOperations()) {
+                FlowRule flowRule = batchEntry.getTarget();
+                FlowRuleOperation op = batchEntry.getOperator();
+                if (did == null) {
+                    did = flowRule.deviceId();
+                }
+                if (op.equals(FlowRuleOperation.REMOVE)) {
+                    StoredFlowEntry entry = getFlowEntryInternal(flowRule);
+                    if (entry != null) {
+                        entry.setState(FlowEntryState.PENDING_REMOVE);
+                        toRemove.add(entry);
+                    }
+                } else if (op.equals(FlowRuleOperation.ADD)) {
+                    StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
+                    DeviceId deviceId = flowRule.deviceId();
+                    if (!flowEntries.containsEntry(deviceId, flowEntry)) {
+                        flowEntries.put(deviceId, flowEntry);
+                        toAdd.add(flowEntry);
+                    }
+                }
+            }
+            if (toAdd.isEmpty() && toRemove.isEmpty()) {
+                return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
+            }
+
+            // create remote backup copies
+            updateBackup(did, toAdd, toRemove);
+        } finally {
+            flowEntriesLock.writeLock().unlock();
+        }
 
         SettableFuture<CompletedBatchOperation> r = SettableFuture.create();
         final int batchId = localBatchIdGen.incrementAndGet();
@@ -451,27 +464,32 @@
         return null;
     }
 
-    private synchronized FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
+    private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
         final DeviceId did = rule.deviceId();
 
-        // check if this new rule is an update to an existing entry
-        StoredFlowEntry stored = getFlowEntryInternal(rule);
-        if (stored != null) {
-            stored.setBytes(rule.bytes());
-            stored.setLife(rule.life());
-            stored.setPackets(rule.packets());
-            if (stored.state() == FlowEntryState.PENDING_ADD) {
-                stored.setState(FlowEntryState.ADDED);
-                // update backup.
-                updateBackup(did, Arrays.asList(stored));
-                return new FlowRuleEvent(Type.RULE_ADDED, rule);
+        flowEntriesLock.writeLock().lock();
+        try {
+            // check if this new rule is an update to an existing entry
+            StoredFlowEntry stored = getFlowEntryInternal(rule);
+            if (stored != null) {
+                stored.setBytes(rule.bytes());
+                stored.setLife(rule.life());
+                stored.setPackets(rule.packets());
+                if (stored.state() == FlowEntryState.PENDING_ADD) {
+                    stored.setState(FlowEntryState.ADDED);
+                    // update backup.
+                    updateBackup(did, Arrays.asList(stored));
+                    return new FlowRuleEvent(Type.RULE_ADDED, rule);
+                }
+                return new FlowRuleEvent(Type.RULE_UPDATED, rule);
             }
-            return new FlowRuleEvent(Type.RULE_UPDATED, rule);
-        }
 
-        // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
-        // TODO: also update backup.
-        flowEntries.put(did, new DefaultFlowEntry(rule));
+            // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
+            // TODO: also update backup.
+            flowEntries.put(did, new DefaultFlowEntry(rule));
+        } finally {
+            flowEntriesLock.writeLock().unlock();
+        }
         return null;
 
     }
@@ -491,15 +509,20 @@
         return null;
     }
 
-    private synchronized FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
+    private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
         final DeviceId deviceId = rule.deviceId();
-        // This is where one could mark a rule as removed and still keep it in the store.
-        final boolean removed = flowEntries.remove(deviceId, rule);
-        updateBackup(deviceId, Collections.<StoredFlowEntry>emptyList(), Arrays.asList(rule));
-        if (removed) {
-            return new FlowRuleEvent(RULE_REMOVED, rule);
-        } else {
-            return null;
+        flowEntriesLock.writeLock().lock();
+        try {
+            // This is where one could mark a rule as removed and still keep it in the store.
+            final boolean removed = flowEntries.remove(deviceId, rule);
+            updateBackup(deviceId, Collections.<StoredFlowEntry>emptyList(), Arrays.asList(rule));
+            if (removed) {
+                return new FlowRuleEvent(RULE_REMOVED, rule);
+            } else {
+                return null;
+            }
+        } finally {
+            flowEntriesLock.writeLock().unlock();
         }
     }
 
@@ -515,9 +538,9 @@
         notifyDelegate(event);
     }
 
-    private synchronized void loadFromBackup(final DeviceId did) {
-        // should relax synchronized condition
+    private void loadFromBackup(final DeviceId did) {
 
+        flowEntriesLock.writeLock().lock();
         try {
             log.info("Loading FlowRules for {} from backups", did);
             SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(did);
@@ -534,11 +557,19 @@
             }
         } catch (ExecutionException e) {
             log.error("Failed to load backup flowtable for {}", did, e);
+        } finally {
+            flowEntriesLock.writeLock().unlock();
         }
     }
 
-    private synchronized void removeFromPrimary(final DeviceId did) {
-        Collection<StoredFlowEntry> removed = flowEntries.removeAll(did);
+    private void removeFromPrimary(final DeviceId did) {
+        Collection<StoredFlowEntry> removed = null;
+        flowEntriesLock.writeLock().lock();
+        try {
+            removed = flowEntries.removeAll(did);
+        } finally {
+            flowEntriesLock.writeLock().unlock();
+        }
         log.debug("removedFromPrimary {}", removed);
     }