DistributedFlowRuleStore related fixes

- handle no master for Device case
- Changed failed item type to FlowRule

Change-Id: If6c85751759cf6ba9ab0ed0384cbe1bf08a5d572
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java b/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
index e78ebe0..3758ea9 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
@@ -19,13 +19,13 @@
 
 import com.google.common.collect.ImmutableSet;
 
-public class CompletedBatchOperation implements BatchOperationResult<FlowEntry> {
+public class CompletedBatchOperation implements BatchOperationResult<FlowRule> {
 
 
     private final boolean success;
-    private final Set<FlowEntry> failures;
+    private final Set<FlowRule> failures;
 
-    public CompletedBatchOperation(boolean success, Set<FlowEntry> failures) {
+    public CompletedBatchOperation(boolean success, Set<? extends FlowRule> failures) {
         this.success = success;
         this.failures = ImmutableSet.copyOf(failures);
     }
@@ -36,7 +36,7 @@
     }
 
     @Override
-    public Set<FlowEntry> failedItems() {
+    public Set<FlowRule> failedItems() {
         return failures;
     }
 
diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
index 3791296..4fe9022 100644
--- a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
@@ -395,7 +395,7 @@
 
                 break;
             case BATCH_OPERATION_COMPLETED:
-                Set<FlowEntry> failedItems = event.result().failedItems();
+                Set<FlowRule> failedItems = event.result().failedItems();
                 for (FlowEntry entry : request.toAdd()) {
                     if (!failedItems.contains(entry)) {
                         eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADDED, entry));
@@ -463,7 +463,7 @@
             }
 
             boolean success = true;
-            Set<FlowEntry> failed = Sets.newHashSet();
+            Set<FlowRule> failed = Sets.newHashSet();
             CompletedBatchOperation completed;
             for (Future<CompletedBatchOperation> future : futures) {
                 completed = future.get();
@@ -483,7 +483,7 @@
                 return overall;
             }
             boolean success = true;
-            Set<FlowEntry> failed = Sets.newHashSet();
+            Set<FlowRule> failed = Sets.newHashSet();
             CompletedBatchOperation completed;
             long start = System.nanoTime();
             long end = start + unit.toNanos(timeout);
@@ -497,7 +497,7 @@
             return finalizeBatchOperation(success, failed);
         }
 
-        private boolean validateBatchOperation(Set<FlowEntry> failed,
+        private boolean validateBatchOperation(Set<FlowRule> failed,
                 CompletedBatchOperation completed) {
 
             if (isCancelled()) {
@@ -519,7 +519,7 @@
         }
 
         private CompletedBatchOperation finalizeBatchOperation(boolean success,
-                Set<FlowEntry> failed) {
+                Set<FlowRule> failed) {
             synchronized (this) {
                 if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) {
                     if (state.get() == BatchState.FINISHED) {
diff --git a/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java b/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
index 2bb0965..b986d6d 100644
--- a/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
+++ b/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
@@ -555,7 +555,7 @@
             @Override
             public CompletedBatchOperation get()
                     throws InterruptedException, ExecutionException {
-                return new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet());
+                return new CompletedBatchOperation(true, Collections.<FlowRule>emptySet());
             }
 
             @Override
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index 18d5f1d..7a917a7 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -26,6 +26,7 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
@@ -172,12 +173,36 @@
                     .softValues()
                     .build(new SMapLoader());
 
+        final NodeId local = clusterService.getLocalNode().id();
+
         clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new ClusterMessageHandler() {
 
             @Override
             public void handle(final ClusterMessage message) {
                 FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
                 log.info("received batch request {}", operation);
+
+                final DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
+                ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
+                if (!local.equals(replicaInfo.master().orNull())) {
+
+                    Set<FlowRule> failures = new HashSet<>(operation.size());
+                    for (FlowRuleBatchEntry op : operation.getOperations()) {
+                        failures.add(op.getTarget());
+                    }
+                    CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures);
+                    // This node is no longer the master, respond as all failed.
+                    // TODO: we might want to wrap response in envelope
+                    // to distinguish sw programming failure and hand over
+                    // it make sense in the latter case to retry immediately.
+                    try {
+                        message.respond(SERIALIZER.encode(allFailed));
+                    } catch (IOException e) {
+                        log.error("Failed to respond back", e);
+                    }
+                    return;
+                }
+
                 final ListenableFuture<CompletedBatchOperation> f = storeBatchInternal(operation);
 
                 f.addListener(new Runnable() {
@@ -256,6 +281,14 @@
     @Override
     public synchronized FlowEntry getFlowEntry(FlowRule rule) {
         ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
+
+        if (!replicaInfo.master().isPresent()) {
+            log.warn("No master for {}", rule);
+            // TODO: revisit if this should be returning null.
+            // FIXME: throw a FlowStoreException
+            throw new RuntimeException("No master for " + rule);
+        }
+
         if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
             return getFlowEntryInternal(rule);
         }
@@ -290,6 +323,14 @@
     public synchronized Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
 
         ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
+
+        if (!replicaInfo.master().isPresent()) {
+            log.warn("No master for {}", deviceId);
+            // TODO: revisit if this should be returning empty collection.
+            // FIXME: throw a FlowStoreException
+            throw new RuntimeException("No master for " + deviceId);
+        }
+
         if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
             return getFlowEntriesInternal(deviceId);
         }
@@ -329,14 +370,22 @@
     public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
 
         if (operation.getOperations().isEmpty()) {
-            return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
+            return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
         }
 
         DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
 
         ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
 
-        if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
+        if (!replicaInfo.master().isPresent()) {
+            log.warn("No master for {}", deviceId);
+            // TODO: revisit if this should be "success" from Future point of view
+            // with every FlowEntry failed
+            return Futures.immediateFailedFuture(new IOException("No master to forward to"));
+        }
+
+        final NodeId local = clusterService.getLocalNode().id();
+        if (replicaInfo.master().get().equals(local)) {
             return storeBatchInternal(operation);
         }
 
@@ -344,7 +393,7 @@
                 replicaInfo.master().orNull(), deviceId);
 
         ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
+                local,
                 APPLY_BATCH_FLOWS,
                 SERIALIZER.encode(operation));
 
@@ -367,7 +416,6 @@
         final List<StoredFlowEntry> toAdd = new ArrayList<>();
         DeviceId did = null;
 
-
         for (FlowRuleBatchEntry batchEntry : operation.getOperations()) {
             FlowRule flowRule = batchEntry.getTarget();
             FlowRuleOperation op = batchEntry.getOperator();
@@ -390,7 +438,7 @@
             }
         }
         if (toAdd.isEmpty() && toRemove.isEmpty()) {
-            return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
+            return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
         }
 
         // create remote backup copies
@@ -434,7 +482,8 @@
     @Override
     public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
         ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
-        if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
+        final NodeId localId = clusterService.getLocalNode().id();
+        if (localId.equals(replicaInfo.master().orNull())) {
             return addOrUpdateFlowRuleInternal(rule);
         }
 
@@ -471,7 +520,9 @@
     @Override
     public FlowRuleEvent removeFlowRule(FlowEntry rule) {
         ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
-        if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
+
+        final NodeId localId = clusterService.getLocalNode().id();
+        if (localId.equals(replicaInfo.master().orNull())) {
             // bypass and handle it locally
             return removeFlowRuleInternal(rule);
         }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java
index bd28e47..a7a3a02 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java
@@ -28,6 +28,7 @@
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.onos.cluster.ClusterService;
 import org.onlab.onos.net.ConnectPoint;
+import org.onlab.onos.net.DeviceId;
 import org.onlab.onos.net.PortNumber;
 import org.onlab.onos.net.flow.FlowEntry;
 import org.onlab.onos.net.flow.FlowRule;
@@ -191,7 +192,14 @@
 
     @Override
     public Set<FlowEntry> getCurrentStatistic(ConnectPoint connectPoint) {
-        ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(connectPoint.deviceId());
+        final DeviceId deviceId = connectPoint.deviceId();
+        ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
+        if (!replicaInfo.master().isPresent()) {
+            log.warn("No master for {}", deviceId);
+            // TODO: revisit if this should be returning empty collection.
+            // FIXME: throw a StatsStoreException
+            throw new RuntimeException("No master for " + deviceId);
+        }
         if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
             return getCurrentStatisticInternal(connectPoint);
         } else {
@@ -219,7 +227,14 @@
 
     @Override
     public Set<FlowEntry> getPreviousStatistic(ConnectPoint connectPoint) {
-        ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(connectPoint.deviceId());
+        final DeviceId deviceId = connectPoint.deviceId();
+        ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
+        if (!replicaInfo.master().isPresent()) {
+            log.warn("No master for {}", deviceId);
+            // TODO: revisit if this should be returning empty collection.
+            // FIXME: throw a StatsStoreException
+            throw new RuntimeException("No master for " + deviceId);
+        }
         if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
             return getPreviousStatisticInternal(connectPoint);
         } else {