DistributedFlowRuleStore related fixes

- handle no master for Device case
- Changed failed item type to FlowRule

Change-Id: If6c85751759cf6ba9ab0ed0384cbe1bf08a5d572
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index 18d5f1d..7a917a7 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -26,6 +26,7 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
@@ -172,12 +173,36 @@
                     .softValues()
                     .build(new SMapLoader());
 
+        final NodeId local = clusterService.getLocalNode().id();
+
         clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new ClusterMessageHandler() {
 
             @Override
             public void handle(final ClusterMessage message) {
                 FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
                 log.info("received batch request {}", operation);
+
+                final DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
+                ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
+                if (!local.equals(replicaInfo.master().orNull())) {
+
+                    Set<FlowRule> failures = new HashSet<>(operation.size());
+                    for (FlowRuleBatchEntry op : operation.getOperations()) {
+                        failures.add(op.getTarget());
+                    }
+                    CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures);
+                    // This node is no longer the master, respond as all failed.
+                    // TODO: we might want to wrap response in envelope
+                    // to distinguish sw programming failure and hand over
+                    // it make sense in the latter case to retry immediately.
+                    try {
+                        message.respond(SERIALIZER.encode(allFailed));
+                    } catch (IOException e) {
+                        log.error("Failed to respond back", e);
+                    }
+                    return;
+                }
+
                 final ListenableFuture<CompletedBatchOperation> f = storeBatchInternal(operation);
 
                 f.addListener(new Runnable() {
@@ -256,6 +281,14 @@
     @Override
     public synchronized FlowEntry getFlowEntry(FlowRule rule) {
         ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
+
+        if (!replicaInfo.master().isPresent()) {
+            log.warn("No master for {}", rule);
+            // TODO: revisit if this should be returning null.
+            // FIXME: throw a FlowStoreException
+            throw new RuntimeException("No master for " + rule);
+        }
+
         if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
             return getFlowEntryInternal(rule);
         }
@@ -290,6 +323,14 @@
     public synchronized Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
 
         ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
+
+        if (!replicaInfo.master().isPresent()) {
+            log.warn("No master for {}", deviceId);
+            // TODO: revisit if this should be returning empty collection.
+            // FIXME: throw a FlowStoreException
+            throw new RuntimeException("No master for " + deviceId);
+        }
+
         if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
             return getFlowEntriesInternal(deviceId);
         }
@@ -329,14 +370,22 @@
     public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
 
         if (operation.getOperations().isEmpty()) {
-            return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
+            return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
         }
 
         DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
 
         ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
 
-        if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
+        if (!replicaInfo.master().isPresent()) {
+            log.warn("No master for {}", deviceId);
+            // TODO: revisit if this should be "success" from Future point of view
+            // with every FlowEntry failed
+            return Futures.immediateFailedFuture(new IOException("No master to forward to"));
+        }
+
+        final NodeId local = clusterService.getLocalNode().id();
+        if (replicaInfo.master().get().equals(local)) {
             return storeBatchInternal(operation);
         }
 
@@ -344,7 +393,7 @@
                 replicaInfo.master().orNull(), deviceId);
 
         ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
+                local,
                 APPLY_BATCH_FLOWS,
                 SERIALIZER.encode(operation));
 
@@ -367,7 +416,6 @@
         final List<StoredFlowEntry> toAdd = new ArrayList<>();
         DeviceId did = null;
 
-
         for (FlowRuleBatchEntry batchEntry : operation.getOperations()) {
             FlowRule flowRule = batchEntry.getTarget();
             FlowRuleOperation op = batchEntry.getOperator();
@@ -390,7 +438,7 @@
             }
         }
         if (toAdd.isEmpty() && toRemove.isEmpty()) {
-            return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
+            return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
         }
 
         // create remote backup copies
@@ -434,7 +482,8 @@
     @Override
     public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
         ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
-        if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
+        final NodeId localId = clusterService.getLocalNode().id();
+        if (localId.equals(replicaInfo.master().orNull())) {
             return addOrUpdateFlowRuleInternal(rule);
         }
 
@@ -471,7 +520,9 @@
     @Override
     public FlowRuleEvent removeFlowRule(FlowEntry rule) {
         ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
-        if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
+
+        final NodeId localId = clusterService.getLocalNode().id();
+        if (localId.equals(replicaInfo.master().orNull())) {
             // bypass and handle it locally
             return removeFlowRuleInternal(rule);
         }