[CORD-3119] Fix missing flows on node restart

Change-Id: I51cf3ee0682873beb7f9334dac1e77ed20022bfb
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index 3080615..8efb40a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -181,14 +181,16 @@
 
     protected final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
         .register(KryoNamespaces.API)
+        .register(BucketId.class)
         .register(FlowBucket.class)
         .build());
 
     protected final KryoNamespace.Builder serializerBuilder = KryoNamespace.newBuilder()
-            .register(KryoNamespaces.API)
-            .register(MastershipBasedTimestamp.class);
+        .register(KryoNamespaces.API)
+        .register(BucketId.class)
+        .register(MastershipBasedTimestamp.class);
 
-    private EventuallyConsistentMap<DeviceId, Map<Integer, Integer>> flowCounts;
+    private EventuallyConsistentMap<BucketId, Integer> flowCounts;
 
     private IdGenerator idGenerator;
     private NodeId local;
@@ -215,7 +217,7 @@
                 backupPeriod,
                 TimeUnit.MILLISECONDS);
 
-        flowCounts = storageService.<DeviceId, Map<Integer, Integer>>eventuallyConsistentMapBuilder()
+        flowCounts = storageService.<BucketId, Integer>eventuallyConsistentMapBuilder()
                 .withName("onos-flow-counts")
                 .withSerializer(serializerBuilder)
                 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
@@ -338,9 +340,6 @@
                  prefix, msgHandlerPoolSize, backupPeriod, backupCount);
     }
 
-    // This is not a efficient operation on a distributed sharded
-    // flow store. We need to revisit the need for this operation or at least
-    // make it device specific.
     @Override
     public int getFlowRuleCount() {
         return Streams.stream(deviceService.getDevices()).parallel()
@@ -350,11 +349,10 @@
 
     @Override
     public int getFlowRuleCount(DeviceId deviceId) {
-        Map<Integer, Integer> counts = flowCounts.get(deviceId);
-        return counts != null
-            ? counts.values().stream().mapToInt(v -> v).sum()
-            : flowTable.flowEntries.get(deviceId) != null
-            ? flowTable.flowEntries.get(deviceId).keySet().size() : 0;
+        return flowCounts.entrySet().stream()
+            .filter(entry -> entry.getKey().deviceId().equals(deviceId))
+            .mapToInt(entry -> entry.getValue())
+            .sum();
     }
 
     @Override
@@ -658,43 +656,95 @@
         }
     }
 
+    /**
+     * Represents a backup to a of a distinct bucket to a distinct node.
+     */
     private class BackupOperation {
         private final NodeId nodeId;
-        private final DeviceId deviceId;
-        private final int bucket;
+        private final BucketId bucketId;
 
-        public BackupOperation(NodeId nodeId, DeviceId deviceId, int bucket) {
+        BackupOperation(NodeId nodeId, BucketId bucketId) {
             this.nodeId = nodeId;
-            this.deviceId = deviceId;
-            this.bucket = bucket;
+            this.bucketId = bucketId;
+        }
+
+        NodeId nodeId() {
+            return nodeId;
+        }
+
+        BucketId bucketId() {
+            return bucketId;
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(nodeId, deviceId, bucket);
+            return Objects.hash(nodeId, bucketId);
         }
 
         @Override
         public boolean equals(Object other) {
             if (other != null && other instanceof BackupOperation) {
                 BackupOperation that = (BackupOperation) other;
-                return this.nodeId.equals(that.nodeId) &&
-                        this.deviceId.equals(that.deviceId) &&
-                        this.bucket == that.bucket;
-            } else {
-                return false;
+                return this.nodeId.equals(that.nodeId)
+                    && this.bucketId.equals(that.bucketId);
             }
+            return false;
         }
     }
 
-    private class FlowBucket {
+    /**
+     * Represents a distinct device flow bucket.
+     */
+    private class BucketId {
         private final DeviceId deviceId;
         private final int bucket;
-        private final Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> table;
 
-        FlowBucket(DeviceId deviceId, int bucket, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> table) {
+        BucketId(DeviceId deviceId, int bucket) {
             this.deviceId = deviceId;
             this.bucket = bucket;
+        }
+
+        DeviceId deviceId() {
+            return deviceId;
+        }
+
+        int bucket() {
+            return bucket;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(deviceId, bucket);
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            if (other != null && other instanceof BucketId) {
+                BucketId that = (BucketId) other;
+                return this.deviceId.equals(that.deviceId)
+                    && this.bucket == that.bucket;
+            }
+            return false;
+        }
+    }
+
+    /**
+     * Container for flows in a specific bucket.
+     */
+    private class FlowBucket {
+        private final BucketId bucketId;
+        private final Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> table;
+
+        BucketId bucketId() {
+            return bucketId;
+        }
+
+        Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> table() {
+            return table;
+        }
+
+        FlowBucket(BucketId bucketId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> table) {
+            this.bucketId = bucketId;
             this.table = table;
         }
     }
@@ -706,8 +756,8 @@
                 flowEntries = Maps.newConcurrentMap();
 
         private final Map<BackupOperation, Long> lastBackupTimes = Maps.newConcurrentMap();
-        private final Map<DeviceId, Map<Integer, Long>> lastUpdateTimes = Maps.newConcurrentMap();
-        private final Map<NodeId, Set<DeviceId>> inFlightUpdates = Maps.newConcurrentMap();
+        private final Map<BucketId, Long> lastUpdateTimes = Maps.newConcurrentMap();
+        private final Set<BackupOperation> inFlightUpdates = Sets.newConcurrentHashSet();
 
         @Override
         public void event(ReplicaInfoEvent event) {
@@ -721,49 +771,12 @@
             }
             if (event.type() == MASTER_CHANGED) {
                 for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
-                    recordUpdate(deviceId, bucket);
+                    recordUpdate(new BucketId(deviceId, bucket));
                 }
             }
             backupSenderExecutor.execute(this::backup);
         }
 
-        private CompletableFuture<Void> backupFlowEntries(
-            NodeId nodeId, DeviceId deviceId, int bucket, long timestamp) {
-            log.debug("Sending flowEntries in bucket {} for device {} to {} for backup.", bucket, deviceId, nodeId);
-            FlowBucket flowBucket = getFlowBucket(deviceId, bucket);
-            int flowCount = flowBucket.table.entrySet().stream()
-                .mapToInt(e -> e.getValue().values().size()).sum();
-            flowCounts.compute(deviceId, (id, counts) -> {
-                if (counts == null) {
-                    counts = Maps.newConcurrentMap();
-                }
-                counts.put(bucket, flowCount);
-                return counts;
-            });
-
-            CompletableFuture<Void> future = new CompletableFuture<>();
-            clusterCommunicator.<FlowBucket, Set<FlowId>>
-                    sendAndReceive(flowBucket,
-                                   FLOW_TABLE_BACKUP,
-                                   serializer::encode,
-                                   serializer::decode,
-                                   nodeId)
-                    .whenComplete((backedupFlows, error) -> {
-                        Set<FlowId> flowsNotBackedUp = error != null ?
-                            flowBucket.table.keySet() :
-                            Sets.difference(flowBucket.table.keySet(), backedupFlows);
-                        if (flowsNotBackedUp.size() > 0) {
-                            log.warn("Failed to backup flows: {}. Reason: {}, Node: {}",
-                                     flowsNotBackedUp, error != null ? error.getMessage() : "none", nodeId);
-                        }
-                        if (backedupFlows != null) {
-                            lastBackupTimes.put(new BackupOperation(nodeId, deviceId, bucket), timestamp);
-                        }
-                        future.complete(null);
-                    });
-            return future;
-        }
-
         /**
          * Returns the flow table for specified device.
          *
@@ -775,49 +788,34 @@
                 return flowEntries.computeIfAbsent(deviceId, id -> persistenceService
                         .<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>persistentMapBuilder()
                         .withName("FlowTable:" + deviceId.toString())
-                        .withSerializer(new Serializer() {
-                            @Override
-                            public <T> byte[] encode(T object) {
-                                return serializer.encode(object);
-                            }
-
-                            @Override
-                            public <T> T decode(byte[] bytes) {
-                                return serializer.decode(bytes);
-                            }
-
-                            @Override
-                            public <T> T copy(T object) {
-                                return serializer.copy(object);
-                            }
-                        })
+                        .withSerializer(serializer)
                         .build());
             } else {
                 return flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap());
             }
         }
 
-        private FlowBucket getFlowBucket(DeviceId deviceId, int bucket) {
+        private FlowBucket getFlowBucket(BucketId bucketId) {
             if (persistenceEnabled) {
-                return new FlowBucket(deviceId, bucket, flowEntries.computeIfAbsent(deviceId, id ->
+                return new FlowBucket(bucketId, flowEntries.computeIfAbsent(bucketId.deviceId(), id ->
                     persistenceService.<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>persistentMapBuilder()
-                        .withName("FlowTable:" + deviceId.toString())
+                        .withName("FlowTable:" + bucketId.deviceId().toString())
                         .withSerializer(serializer)
                         .build())
                     .entrySet()
                     .stream()
-                    .filter(entry -> isBucket(entry.getKey(), bucket))
+                    .filter(entry -> isInBucket(entry.getKey(), bucketId.bucket()))
                     .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
             } else {
                 Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> copy = Maps.newHashMap();
-                flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap())
+                flowEntries.computeIfAbsent(bucketId.deviceId(), id -> Maps.newConcurrentMap())
                     .entrySet()
                     .stream()
-                    .filter(entry -> isBucket(entry.getKey(), bucket))
+                    .filter(entry -> isInBucket(entry.getKey(), bucketId.bucket()))
                     .forEach(entry -> {
                         copy.put(entry.getKey(), Maps.newHashMap(entry.getValue()));
                     });
-                return new FlowBucket(deviceId, bucket, copy);
+                return new FlowBucket(bucketId, copy);
             }
         }
 
@@ -843,7 +841,7 @@
             return getFlowEntriesInternal(deviceId);
         }
 
-        private boolean isBucket(FlowId flowId, int bucket) {
+        private boolean isInBucket(FlowId flowId, int bucket) {
             return bucket(flowId) == bucket;
         }
 
@@ -851,9 +849,14 @@
             return (int) (flowId.id() % NUM_BUCKETS);
         }
 
-        private void recordUpdate(DeviceId deviceId, int bucket) {
-            lastUpdateTimes.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap())
-                .put(bucket, System.currentTimeMillis());
+        private void recordUpdate(BucketId bucketId) {
+            lastUpdateTimes.put(bucketId, System.currentTimeMillis());
+            FlowBucket flowBucket = getFlowBucket(bucketId);
+            int flowCount = flowBucket.table().entrySet()
+                .stream()
+                .mapToInt(e -> e.getValue().values().size())
+                .sum();
+            flowCounts.put(bucketId, flowCount);
         }
 
         public void add(FlowEntry rule) {
@@ -861,7 +864,7 @@
                     .compute((StoredFlowEntry) rule, (k, stored) -> {
                         return (StoredFlowEntry) rule;
                     });
-            recordUpdate(rule.deviceId(), bucket(rule.id()));
+            recordUpdate(new BucketId(rule.deviceId(), bucket(rule.id())));
         }
 
         public void update(FlowEntry rule) {
@@ -872,7 +875,7 @@
                         if (stored instanceof DefaultFlowEntry) {
                             DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
                             if (updated.created() >= storedEntry.created()) {
-                                recordUpdate(rule.deviceId(), bucket(rule.id()));
+                                recordUpdate(new BucketId(rule.deviceId(), bucket(rule.id())));
                                 return updated;
                             } else {
                                 log.debug("Trying to update more recent flow entry {} (stored: {})", updated, stored);
@@ -907,7 +910,7 @@
             });
 
             if (removedRule.get() != null) {
-                recordUpdate(rule.deviceId(), bucket(rule.id()));
+                recordUpdate(new BucketId(rule.deviceId(), bucket(rule.id())));
                 return removedRule.get();
             } else {
                 return null;
@@ -955,41 +958,78 @@
         private void backup(NodeId nodeId, DeviceId deviceId) {
             final long timestamp = System.currentTimeMillis();
             for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
-                long lastBackupTime = lastBackupTimes.getOrDefault(new BackupOperation(nodeId, deviceId, bucket), 0L);
-                long lastUpdateTime = lastUpdateTimes.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap())
-                    .getOrDefault(bucket, 0L);
-                if (lastBackupTime < lastUpdateTime && startBackup(nodeId, deviceId)) {
-                    backupFlowEntries(nodeId, deviceId, bucket, timestamp)
-                        .thenRunAsync(() -> {
-                            finishBackup(nodeId, deviceId);
-                            backup(nodeId);
-                        }, backupSenderExecutor);
-                    return;
+                BucketId bucketId = new BucketId(deviceId, bucket);
+                BackupOperation operation = new BackupOperation(nodeId, bucketId);
+                if (startBackup(operation)) {
+                    backup(operation).whenCompleteAsync((succeeded, error) -> {
+                        if (error == null && succeeded) {
+                            succeedBackup(operation, timestamp);
+                        } else {
+                            failBackup(operation);
+                        }
+                        backup(nodeId);
+                    }, backupSenderExecutor);
                 }
             }
         }
 
-        private boolean startBackup(NodeId nodeId, DeviceId deviceId) {
-            return inFlightUpdates.computeIfAbsent(nodeId, id -> Sets.newConcurrentHashSet()).add(deviceId);
+        private boolean startBackup(BackupOperation operation) {
+            long lastBackupTime = lastBackupTimes.getOrDefault(operation, 0L);
+            long lastUpdateTime = lastUpdateTimes.getOrDefault(operation.bucketId(), 0L);
+            return lastUpdateTime > 0 && lastBackupTime <= lastUpdateTime && inFlightUpdates.add(operation);
         }
 
-        private void finishBackup(NodeId nodeId, DeviceId deviceId) {
-            inFlightUpdates.computeIfAbsent(nodeId, id -> Sets.newConcurrentHashSet()).remove(deviceId);
+        private void failBackup(BackupOperation operation) {
+            inFlightUpdates.remove(operation);
         }
 
-        private Set<FlowId> onBackupReceipt(FlowBucket bucket) {
-            log.debug("Received flowEntries for {} bucket {} to backup", bucket.deviceId, bucket.bucket);
+        private void succeedBackup(BackupOperation operation, long timestamp) {
+            inFlightUpdates.remove(operation);
+            lastBackupTimes.put(operation, timestamp);
+        }
+
+        private CompletableFuture<Boolean> backup(BackupOperation operation) {
+            log.debug("Sending flowEntries in bucket {} for device {} to {} for backup.",
+                operation.bucketId().bucket(), operation.bucketId().deviceId(), operation.nodeId());
+            FlowBucket flowBucket = getFlowBucket(operation.bucketId());
+
+            CompletableFuture<Boolean> future = new CompletableFuture<>();
+            clusterCommunicator.<FlowBucket, Set<FlowId>>
+                sendAndReceive(flowBucket,
+                FLOW_TABLE_BACKUP,
+                serializer::encode,
+                serializer::decode,
+                operation.nodeId())
+                .whenComplete((backedupFlows, error) -> {
+                    Set<FlowId> flowsNotBackedUp = error != null ?
+                        flowBucket.table().keySet() :
+                        Sets.difference(flowBucket.table().keySet(), backedupFlows);
+                    if (flowsNotBackedUp.size() > 0) {
+                        log.warn("Failed to backup flows: {}. Reason: {}, Node: {}",
+                            flowsNotBackedUp, error != null ? error.getMessage() : "none", operation.nodeId());
+                    }
+                    future.complete(backedupFlows != null);
+                });
+            return future;
+        }
+
+        private Set<FlowId> onBackupReceipt(FlowBucket flowBucket) {
+            log.debug("Received flowEntries for {} bucket {} to backup",
+                flowBucket.bucketId().deviceId(), flowBucket.bucketId);
             Set<FlowId> backedupFlows = Sets.newHashSet();
             try {
                 // Only process those devices are that not managed by the local node.
-                NodeId master = replicaInfoManager.getReplicaInfoFor(bucket.deviceId).master().orElse(null);
+                NodeId master = replicaInfoManager.getReplicaInfoFor(flowBucket.bucketId().deviceId())
+                    .master()
+                    .orElse(null);
                 if (!Objects.equals(local, master)) {
-                    Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> backupFlowTable = getFlowTable(bucket.deviceId);
-                    backupFlowTable.putAll(bucket.table);
+                    Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> backupFlowTable =
+                        getFlowTable(flowBucket.bucketId().deviceId());
+                    backupFlowTable.putAll(flowBucket.table());
                     backupFlowTable.entrySet()
-                        .removeIf(entry -> isBucket(entry.getKey(), bucket.bucket)
-                            && !bucket.table.containsKey(entry.getKey()));
-                    backedupFlows.addAll(bucket.table.keySet());
+                        .removeIf(entry -> isInBucket(entry.getKey(), flowBucket.bucketId().bucket())
+                            && !flowBucket.table().containsKey(entry.getKey()));
+                    backedupFlows.addAll(flowBucket.table().keySet());
                 }
             } catch (Exception e) {
                 log.warn("Failure processing backup request", e);