FlowRuleStore: Consider errors when updating state of all current backups

Change-Id: I3bf4d20d79dc37c7040648ec6379794b8c93aad2
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
index 3b82655..888f95b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
@@ -290,7 +290,7 @@
         clusterCommunicator.addSubscriber(
                 REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
         clusterCommunicator.addSubscriber(
-                FLOW_TABLE_BACKUP, SERIALIZER::decode, flowTable::onBackupReceipt, executor);
+                FLOW_TABLE_BACKUP, SERIALIZER::decode, flowTable::onBackupReceipt, SERIALIZER::encode, executor);
     }
 
     private void unregisterMessageHandlers() {
@@ -644,21 +644,33 @@
 
         private void backupFlowEntries(NodeId nodeId, Set<DeviceId> deviceIds) {
             log.debug("Sending flowEntries for devices {} to {} as backup.", deviceIds, nodeId);
-            Map<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>> deviceFlowEntries =
+            Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> deviceFlowEntries =
                     Maps.newConcurrentMap();
             flowEntries.forEach((key, value) -> {
                 if (deviceIds.contains(key)) {
                     deviceFlowEntries.put(key, value);
                 }
             });
-            clusterCommunicator.unicast(deviceFlowEntries,
-                    FLOW_TABLE_BACKUP,
-                    SERIALIZER::encode,
-                    nodeId);
-            deviceIds.forEach(id -> {
-                lastBackupTimes.put(id, System.currentTimeMillis());
-                lastBackupNodes.put(id, nodeId);
-            });
+            clusterCommunicator.<Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>>, Set<DeviceId>>sendAndReceive(
+                                        deviceFlowEntries,
+                                        FLOW_TABLE_BACKUP,
+                                        SERIALIZER::encode,
+                                        SERIALIZER::decode,
+                                        nodeId)
+                               .whenComplete((backedupDevices, error) -> {
+                                   Set<DeviceId> devicesNotBackedup = error != null ?
+                                           deviceFlowEntries.keySet() :
+                                           Sets.difference(deviceFlowEntries.keySet(), backedupDevices);
+                                   if (devicesNotBackedup.size() > 0) {
+                                       log.warn("Failed to backup devices: {}", devicesNotBackedup, error);
+                                   }
+                                   if (backedupDevices != null) {
+                                       backedupDevices.forEach(id -> {
+                                           lastBackupTimes.put(id, System.currentTimeMillis());
+                                           lastBackupNodes.put(id, nodeId);
+                                       });
+                                   }
+                               });
         }
 
         /**
@@ -751,16 +763,23 @@
             }
         }
 
-        private void onBackupReceipt(Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> flowTables) {
+        private Set<DeviceId> onBackupReceipt(Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> flowTables) {
             log.debug("Received flowEntries for {} to backup", flowTables.keySet());
-            Set<DeviceId> managedDevices = mastershipService.getDevicesOf(local);
-            // Only process those devices are that not managed by the local node.
-            Maps.filterKeys(flowTables, deviceId -> !managedDevices.contains(deviceId))
+            Set<DeviceId> backedupDevices = Sets.newHashSet();
+            try {
+                Set<DeviceId> managedDevices = mastershipService.getDevicesOf(local);
+                // Only process those devices are that not managed by the local node.
+                Maps.filterKeys(flowTables, deviceId -> !managedDevices.contains(deviceId))
                 .forEach((deviceId, flowTable) -> {
                     Map<FlowId, Set<StoredFlowEntry>> deviceFlowTable = getFlowTable(deviceId);
                     deviceFlowTable.clear();
                     deviceFlowTable.putAll(flowTable);
+                    backedupDevices.add(deviceId);
                 });
+            } catch (Exception e) {
+                log.warn("Failure processing backup request", e);
+            }
+            return backedupDevices;
         }
     }
 }