FlowRuleStore: Configurable backup count

Change-Id: Ida4d3669e28e66350f4809539a48a456b6ec43c7
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index b3a2e36..ed68887 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -16,90 +16,92 @@
 package org.onosproject.store.flow.impl;
 
  import com.google.common.collect.ImmutableList;
- import com.google.common.collect.ImmutableMap;
- import com.google.common.collect.Iterables;
- import com.google.common.collect.Maps;
- import com.google.common.collect.Sets;
- import com.google.common.util.concurrent.Futures;
- import org.apache.felix.scr.annotations.Activate;
- import org.apache.felix.scr.annotations.Component;
- import org.apache.felix.scr.annotations.Deactivate;
- import org.apache.felix.scr.annotations.Modified;
- import org.apache.felix.scr.annotations.Property;
- import org.apache.felix.scr.annotations.Reference;
- import org.apache.felix.scr.annotations.ReferenceCardinality;
- import org.apache.felix.scr.annotations.Service;
- import org.onlab.util.KryoNamespace;
- import org.onlab.util.Tools;
- import org.onosproject.cfg.ComponentConfigService;
- import org.onosproject.cluster.ClusterService;
- import org.onosproject.cluster.NodeId;
- import org.onosproject.core.CoreService;
- import org.onosproject.core.IdGenerator;
- import org.onosproject.mastership.MastershipService;
- import org.onosproject.net.DeviceId;
- import org.onosproject.net.device.DeviceService;
- import org.onosproject.net.flow.CompletedBatchOperation;
- import org.onosproject.net.flow.DefaultFlowEntry;
- import org.onosproject.net.flow.FlowEntry;
- import org.onosproject.net.flow.FlowEntry.FlowEntryState;
- import org.onosproject.net.flow.FlowId;
- import org.onosproject.net.flow.FlowRule;
- import org.onosproject.net.flow.FlowRuleBatchEntry;
- import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
- import org.onosproject.net.flow.FlowRuleBatchEvent;
- import org.onosproject.net.flow.FlowRuleBatchOperation;
- import org.onosproject.net.flow.FlowRuleBatchRequest;
- import org.onosproject.net.flow.FlowRuleEvent;
- import org.onosproject.net.flow.FlowRuleEvent.Type;
- import org.onosproject.net.flow.FlowRuleService;
- import org.onosproject.net.flow.FlowRuleStore;
- import org.onosproject.net.flow.FlowRuleStoreDelegate;
- import org.onosproject.net.flow.StoredFlowEntry;
- import org.onosproject.net.flow.TableStatisticsEntry;
- import org.onosproject.persistence.PersistenceService;
- import org.onosproject.store.AbstractStore;
- import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
- import org.onosproject.store.cluster.messaging.ClusterMessage;
- import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
- import org.onosproject.store.flow.ReplicaInfoEvent;
- import org.onosproject.store.flow.ReplicaInfoEventListener;
- import org.onosproject.store.flow.ReplicaInfoService;
- import org.onosproject.store.impl.MastershipBasedTimestamp;
- import org.onosproject.store.serializers.KryoNamespaces;
- import org.onosproject.store.serializers.StoreSerializer;
- import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
- import org.onosproject.store.service.EventuallyConsistentMap;
- import org.onosproject.store.service.EventuallyConsistentMapEvent;
- import org.onosproject.store.service.EventuallyConsistentMapListener;
- import org.onosproject.store.service.Serializer;
- import org.onosproject.store.service.StorageService;
- import org.onosproject.store.service.WallClockTimestamp;
- import org.osgi.service.component.ComponentContext;
- import org.slf4j.Logger;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.Tools;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.CoreService;
+import org.onosproject.core.IdGenerator;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.flow.CompletedBatchOperation;
+import org.onosproject.net.flow.DefaultFlowEntry;
+import org.onosproject.net.flow.FlowEntry;
+import org.onosproject.net.flow.FlowEntry.FlowEntryState;
+import org.onosproject.net.flow.FlowId;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.FlowRuleBatchEntry;
+import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
+import org.onosproject.net.flow.FlowRuleBatchEvent;
+import org.onosproject.net.flow.FlowRuleBatchOperation;
+import org.onosproject.net.flow.FlowRuleBatchRequest;
+import org.onosproject.net.flow.FlowRuleEvent;
+import org.onosproject.net.flow.FlowRuleEvent.Type;
+import org.onosproject.net.flow.FlowRuleService;
+import org.onosproject.net.flow.FlowRuleStore;
+import org.onosproject.net.flow.FlowRuleStoreDelegate;
+import org.onosproject.net.flow.StoredFlowEntry;
+import org.onosproject.net.flow.TableStatisticsEntry;
+import org.onosproject.persistence.PersistenceService;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.flow.ReplicaInfoEvent;
+import org.onosproject.store.flow.ReplicaInfoEventListener;
+import org.onosproject.store.flow.ReplicaInfoService;
+import org.onosproject.store.impl.MastershipBasedTimestamp;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.serializers.StoreSerializer;
+import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
 
  import java.util.Collections;
- import java.util.Dictionary;
- import java.util.HashSet;
- import java.util.List;
- import java.util.Map;
- import java.util.Objects;
- import java.util.Set;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.ScheduledExecutorService;
- import java.util.concurrent.ScheduledFuture;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.atomic.AtomicInteger;
- import java.util.concurrent.atomic.AtomicReference;
- import java.util.stream.Collectors;
+import java.util.Dictionary;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
  import static com.google.common.base.Strings.isNullOrEmpty;
- import static org.onlab.util.Tools.get;
- import static org.onlab.util.Tools.groupedThreads;
- import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
- import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
- import static org.slf4j.LoggerFactory.getLogger;
+import static org.onlab.util.Tools.get;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
+import static org.onosproject.store.flow.ReplicaInfoEvent.Type.MASTER_CHANGED;
+import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
+import static org.slf4j.LoggerFactory.getLogger;
 
 /**
  * Manages inventory of flow rules using a distributed state management protocol.
@@ -114,6 +116,7 @@
 
     private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
     private static final boolean DEFAULT_BACKUP_ENABLED = true;
+    private static final int DEFAULT_MAX_BACKUP_COUNT = 2;
     private static final boolean DEFAULT_PERSISTENCE_ENABLED = false;
     private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
     private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
@@ -126,7 +129,7 @@
 
     @Property(name = "backupEnabled", boolValue = DEFAULT_BACKUP_ENABLED,
             label = "Indicates whether backups are enabled or not")
-    private boolean backupEnabled = DEFAULT_BACKUP_ENABLED;
+    private volatile boolean backupEnabled = DEFAULT_BACKUP_ENABLED;
 
     @Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS,
             label = "Delay in ms between successive backup runs")
@@ -135,6 +138,10 @@
             label = "Indicates whether or not changes in the flow table should be persisted to disk.")
     private boolean persistenceEnabled = DEFAULT_PERSISTENCE_ENABLED;
 
+    @Property(name = "backupCount", intValue = DEFAULT_MAX_BACKUP_COUNT,
+            label = "Max number of backup copies for each device")
+    private volatile int backupCount = DEFAULT_MAX_BACKUP_COUNT;
+
     private InternalFlowTable flowTable = new InternalFlowTable();
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -255,6 +262,7 @@
         int newPoolSize;
         boolean newBackupEnabled;
         int newBackupPeriod;
+        int newBackupCount;
         try {
             String s = get(properties, "msgHandlerPoolSize");
             newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
@@ -265,10 +273,13 @@
             s = get(properties, "backupPeriod");
             newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());
 
+            s = get(properties, "backupCount");
+            newBackupCount = isNullOrEmpty(s) ? backupCount : Integer.parseInt(s.trim());
         } catch (NumberFormatException | ClassCastException e) {
             newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
             newBackupEnabled = DEFAULT_BACKUP_ENABLED;
             newBackupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
+            newBackupCount = DEFAULT_MAX_BACKUP_COUNT;
         }
 
         boolean restartBackupTask = false;
@@ -310,6 +321,9 @@
             registerMessageHandlers(messageHandlingExecutor);
             oldMsgHandler.shutdown();
         }
+        if (backupCount != newBackupCount) {
+            backupCount = newBackupCount;
+        }
         logConfig("Reconfigured");
     }
 
@@ -340,8 +354,8 @@
     }
 
     private void logConfig(String prefix) {
-        log.info("{} with msgHandlerPoolSize = {}; backupEnabled = {}, backupPeriod = {}",
-                 prefix, msgHandlerPoolSize, backupEnabled, backupPeriod);
+        log.info("{} with msgHandlerPoolSize = {}; backupEnabled = {}, backupPeriod = {}, backupCount = {}",
+                 prefix, msgHandlerPoolSize, backupEnabled, backupPeriod, backupCount);
     }
 
     // This is not a efficient operation on a distributed sharded
@@ -652,15 +666,40 @@
         }
     }
 
+    private class BackupOperation {
+        private final NodeId nodeId;
+        private final DeviceId deviceId;
+
+        public BackupOperation(NodeId nodeId, DeviceId deviceId) {
+            this.nodeId = nodeId;
+            this.deviceId = deviceId;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(nodeId, deviceId);
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            if (other != null && other instanceof BackupOperation) {
+                BackupOperation that = (BackupOperation) other;
+                return this.nodeId.equals(that.nodeId) &&
+                        this.deviceId.equals(that.deviceId);
+            } else {
+                return false;
+            }
+        }
+    }
+
     private class InternalFlowTable implements ReplicaInfoEventListener {
 
         //TODO replace the Map<V,V> with ExtendedSet
         private final Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
                 flowEntries = Maps.newConcurrentMap();
 
-        private final Map<DeviceId, Long> lastBackupTimes = Maps.newConcurrentMap();
+        private final Map<BackupOperation, Long> lastBackupTimes = Maps.newConcurrentMap();
         private final Map<DeviceId, Long> lastUpdateTimes = Maps.newConcurrentMap();
-        private final Map<DeviceId, NodeId> lastBackupNodes = Maps.newConcurrentMap();
 
         @Override
         public void event(ReplicaInfoEvent event) {
@@ -668,41 +707,14 @@
         }
 
         private void handleEvent(ReplicaInfoEvent event) {
-            if (!backupEnabled) {
+            DeviceId deviceId = event.subject();
+            if (!backupEnabled || !mastershipService.isLocalMaster(deviceId)) {
                 return;
             }
-            if (event.type() == ReplicaInfoEvent.Type.BACKUPS_CHANGED) {
-                DeviceId deviceId = event.subject();
-                NodeId master = mastershipService.getMasterFor(deviceId);
-                if (!Objects.equals(local, master)) {
-                 // ignore since this event is for a device this node does not manage.
-                    return;
-                }
-                NodeId newBackupNode = getBackupNode(deviceId);
-                NodeId currentBackupNode = lastBackupNodes.get(deviceId);
-                if (Objects.equals(newBackupNode, currentBackupNode)) {
-                    // ignore since backup location hasn't changed.
-                    return;
-                }
-                if (currentBackupNode != null && newBackupNode == null) {
-                    // Current backup node is most likely down and no alternate backup node
-                    // has been chosen. Clear current backup location so that we can resume
-                    // backups when either current backup comes online or a different backup node
-                    // is chosen.
-                    log.warn("Lost backup location {} for deviceId {} and no alternate backup node exists. "
-                            + "Flows can be lost if the master goes down", currentBackupNode, deviceId);
-                    lastBackupNodes.remove(deviceId);
-                    lastBackupTimes.remove(deviceId);
-                    return;
-                    // TODO: Pick any available node as backup and ensure hand-off occurs when
-                    // a new master is elected.
-                }
-                log.debug("Backup location for {} has changed from {} to {}.",
-                        deviceId, currentBackupNode, newBackupNode);
-                backupSenderExecutor.schedule(() -> backupFlowEntries(newBackupNode, Sets.newHashSet(deviceId)),
-                        0,
-                        TimeUnit.SECONDS);
+            if (event.type() == MASTER_CHANGED) {
+                lastUpdateTimes.put(deviceId, System.currentTimeMillis());
             }
+            backupSenderExecutor.schedule(this::backup, 0, TimeUnit.SECONDS);
         }
 
         private void sendBackups(NodeId nodeId, Set<DeviceId> deviceIds) {
@@ -715,7 +727,7 @@
             if (deviceIds.isEmpty()) {
                 return;
             }
-            log.debug("Sending flowEntries for devices {} to {} as backup.", deviceIds, nodeId);
+            log.debug("Sending flowEntries for devices {} to {} for backup.", deviceIds, nodeId);
             Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
                     deviceFlowEntries = Maps.newConcurrentMap();
             deviceIds.forEach(id -> deviceFlowEntries.put(id, ImmutableMap.copyOf(getFlowTable(id))));
@@ -737,8 +749,7 @@
                         }
                         if (backedupDevices != null) {
                             backedupDevices.forEach(id -> {
-                                lastBackupTimes.put(id, System.currentTimeMillis());
-                                lastBackupNodes.put(id, nodeId);
+                                lastBackupTimes.put(new BackupOperation(nodeId, id), System.currentTimeMillis());
                             });
                         }
                     });
@@ -836,10 +847,11 @@
             flowEntries.remove(deviceId);
         }
 
-        private NodeId getBackupNode(DeviceId deviceId) {
-            List<NodeId> deviceStandbys = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
-            // pick the standby which is most likely to become next master
-            return deviceStandbys.isEmpty() ? null : deviceStandbys.get(0);
+        private List<NodeId> getBackupNodes(DeviceId deviceId) {
+            // The returned backup node list is in the order of preference i.e. next likely master first.
+            List<NodeId> allPossibleBackupNodes = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
+            return ImmutableList.copyOf(allPossibleBackupNodes)
+                                .subList(0, Math.min(allPossibleBackupNodes.size(), backupCount));
         }
 
         private void backup() {
@@ -847,29 +859,17 @@
                 return;
             }
             try {
-                // determine the set of devices that we need to backup during this run.
-                Set<DeviceId> devicesToBackup = flowEntries.keySet()
-                            .stream()
-                            .filter(mastershipService::isLocalMaster)
-                            .filter(deviceId -> {
-                                Long lastBackupTime = lastBackupTimes.get(deviceId);
-                                Long lastUpdateTime = lastUpdateTimes.get(deviceId);
-                                NodeId lastBackupNode = lastBackupNodes.get(deviceId);
-                                NodeId newBackupNode = getBackupNode(deviceId);
-                                return lastBackupTime == null
-                                        ||  !Objects.equals(lastBackupNode, newBackupNode)
-                                        || (lastUpdateTime != null && lastUpdateTime > lastBackupTime);
-                            })
-                            .collect(Collectors.toSet());
-
                 // compute a mapping from node to the set of devices whose flow entries it should backup
                 Map<NodeId, Set<DeviceId>> devicesToBackupByNode = Maps.newHashMap();
-                devicesToBackup.forEach(deviceId -> {
-                    NodeId backupLocation = getBackupNode(deviceId);
-                    if (backupLocation != null) {
-                        devicesToBackupByNode.computeIfAbsent(backupLocation, nodeId -> Sets.newHashSet())
-                                             .add(deviceId);
-                    }
+                flowEntries.keySet().stream().forEach(deviceId -> {
+                    List<NodeId> backupNodes = getBackupNodes(deviceId);
+                    backupNodes.forEach(backupNode -> {
+                            if (lastBackupTimes.getOrDefault(new BackupOperation(backupNode, deviceId), 0L)
+                                    < lastUpdateTimes.getOrDefault(deviceId, 0L)) {
+                                devicesToBackupByNode.computeIfAbsent(backupNode,
+                                                                      nodeId -> Sets.newHashSet()).add(deviceId);
+                            }
+                    });
                 });
                 // send the device flow entries to their respective backup nodes
                 devicesToBackupByNode.forEach(this::sendBackups);