Allow flows to be marked for removal from the flow store when a device is disconnected

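When no master is available for a device, storeBatch() now applies the
batch to the local flow table via updateStoreInternal() and reports the
batch as completed, instead of failing every operation in the batch.
The batch request to the master, and the REMOTE_APPLY_COMPLETED reply
carrying its result, are sent with unicast() rather than
sendAndReceive(), as the response is not consumed on the sending side.

Roughly, the no-master branch of storeBatch() now reads as follows
(excerpted from the change below; the comments are annotations only):

    if (!replicaInfo.master().isPresent()) {
        log.warn("No master for {}: flows will be marked for removal", deviceId);

        // Apply the operations to the local flow table so the affected
        // flows are marked for removal in the store.
        updateStoreInternal(operation);

        // Report the batch as completed with no failures.
        notifyDelegate(FlowRuleBatchEvent.completed(
                new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
                new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
        return;
    }
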
Change-Id: I0f60ff4f010d0d149be31272b9e592c5d812bef9
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index 3d66386..d6e5d40 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -22,7 +22,6 @@
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ListenableFuture;
 import com.hazelcast.core.IMap;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -242,6 +241,7 @@
         clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
         clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
         clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
+        clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
         replicaInfoManager.removeListener(replicaInfoEventListener);
         log.info("Stopped");
     }
@@ -346,20 +346,18 @@
             return;
         }
 
-        DeviceId deviceId = operation.getOperations().get(0).target().deviceId();
+        DeviceId deviceId = operation.deviceId();
 
         ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
 
         if (!replicaInfo.master().isPresent()) {
-            log.warn("Failed to storeBatch: No master for {}", deviceId);
+            log.warn("No master for {} : flows will be marked for removal", deviceId);
 
-            Set<FlowRule> allFailures = operation.getOperations().stream()
-                    .map(op -> op.getTarget())
-                    .collect(Collectors.toSet());
+            updateStoreInternal(operation);
 
             notifyDelegate(FlowRuleBatchEvent.completed(
                     new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
-                    new CompletedBatchOperation(false, allFailures, operation.deviceId())));
+                    new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
             return;
         }
 
@@ -377,19 +375,12 @@
                 APPLY_BATCH_FLOWS,
                 SERIALIZER.encode(operation));
 
-        //CompletedBatchOperation response;
+
         try {
-            ListenableFuture<byte[]> responseFuture =
-                    clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
-            /*response =
-                    Futures.transform(responseFuture,
-                                      new DecodeTo<CompletedBatchOperation>(SERIALIZER))
-                            .get(500 * operation.size(), TimeUnit.MILLISECONDS);
 
-            notifyDelegate(FlowRuleBatchEvent.completed(
-                    new FlowRuleBatchRequest(operation.id(), Collections.emptySet()), response));*/
+            clusterCommunicator.unicast(message, replicaInfo.master().get());
 
-        } catch (IOException /*| InterruptedException | ExecutionException | TimeoutException*/ e) {
+        } catch (IOException e) {
             log.warn("Failed to storeBatch: {}", e.getMessage());
 
             Set<FlowRule> allFailures = operation.getOperations().stream()
@@ -408,10 +399,23 @@
 
         final DeviceId did = operation.deviceId();
         //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
-        Set<FlowRuleBatchEntry> currentOps;
+        Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
+        if (currentOps.isEmpty()) {
+            batchOperationComplete(FlowRuleBatchEvent.completed(
+                    new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+                    new CompletedBatchOperation(true, Collections.emptySet(), did)));
+            return;
+        }
+        updateBackup(did, currentOps);
 
+        notifyDelegate(FlowRuleBatchEvent.requested(
+                new FlowRuleBatchRequest(operation.id(), currentOps),
+                operation.deviceId()));
 
-        currentOps = operation.getOperations().stream().map(
+    }
+
+    private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
+        return operation.getOperations().stream().map(
                 op -> {
                     StoredFlowEntry entry;
                     switch (op.getOperator()) {
@@ -439,19 +443,6 @@
                     return null;
                 }
         ).filter(op -> op != null).collect(Collectors.toSet());
-        if (currentOps.isEmpty()) {
-            batchOperationComplete(FlowRuleBatchEvent.completed(
-                    new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
-                    new CompletedBatchOperation(true, Collections.emptySet(), did)));
-            return;
-        }
-        updateBackup(did, currentOps);
-
-
-        notifyDelegate(FlowRuleBatchEvent.requested(new
-             FlowRuleBatchRequest(operation.id(), currentOps), operation.deviceId()));
-
-
     }
 
     private void updateBackup(DeviceId deviceId, final Set<FlowRuleBatchEntry> entries) {
@@ -581,7 +572,7 @@
                         clusterService.getLocalNode().id(),
                         REMOTE_APPLY_COMPLETED,
                         SERIALIZER.encode(event));
-                clusterCommunicator.sendAndReceive(message, nodeId);
+                clusterCommunicator.unicast(message, nodeId);
             } catch (IOException e) {
                 log.warn("Failed to respond to peer for batch operation result");
             }