Configuration options for disabling tombstones in ECMap + disabling backups in Dist flow rule store

Change-Id: I28b17f3d0bb7f5ba87a541b7f6337c3c1b587d36
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 8111db8..2987529 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -107,6 +107,7 @@
     private long initialDelaySec = 5;
     private long periodSec = 5;
     private boolean lightweightAntiEntropy = true;
+    private boolean tombstonesDisabled = false;
 
     private static final int WINDOW_SIZE = 5;
     private static final int HIGH_LOAD_THRESHOLD = 0;
@@ -223,6 +224,11 @@
                      .collect(Collectors.toList()));
     }
 
+    public EventuallyConsistentMapImpl<K, V> withTombstonesDisabled(boolean status) {
+        tombstonesDisabled = status;
+        return this;
+    }
+
     private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
         return new KryoSerializer() {
             @Override
@@ -379,14 +385,18 @@
             return false;
         }
 
-        Timestamp removedTimestamp = removedItems.get(key);
-        if (removedTimestamp == null) {
-            return removedItems.putIfAbsent(key, timestamp) == null;
-        } else if (timestamp.isNewerThan(removedTimestamp)) {
-            return removedItems.replace(key, removedTimestamp, timestamp);
-        } else {
-            return false;
+        if (!tombstonesDisabled) {
+            Timestamp removedTimestamp = removedItems.get(key);
+            if (removedTimestamp == null) {
+                return removedItems.putIfAbsent(key, timestamp) == null;
+            } else if (timestamp.isNewerThan(removedTimestamp)) {
+                return removedItems.replace(key, removedTimestamp, timestamp);
+            } else {
+                return false;
+            }
         }
+
+        return updated.booleanValue();
     }
 
     @Override
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index 8a80cc9..d972d2b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -152,7 +152,7 @@
     @Activate
     public void activate() {
 
-        flowTable = new InternalFlowTable();
+        flowTable = new InternalFlowTable(); // .withBackupsEnabled(false);
 
         idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
 
@@ -616,6 +616,18 @@
 
     private class InternalFlowTable {
 
+        private boolean backupsEnabled = true;
+
+        /**
+         * Turns backups on or off.
+         * @param backupsEnabled whether backups should be enabled or not
+         * @return this instance
+         */
+        public InternalFlowTable withBackupsEnabled(boolean backupsEnabled) {
+            this.backupsEnabled = backupsEnabled;
+            return this;
+        }
+
         private final Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>>
                 flowEntries = Maps.newConcurrentMap();
 
@@ -627,13 +639,14 @@
                 (flowId, flowEntry) ->
                         (flowEntry == null) ? null : deviceClockService.getTimestamp(flowEntry.deviceId());
 
-        private final EventuallyConsistentMap<FlowId, StoredFlowEntry> backupMap =
+        private final EventuallyConsistentMap<FlowId, StoredFlowEntry> backupMap = backupsEnabled ?
                 new EventuallyConsistentMapImpl<>("flow-backup",
                         clusterService,
                         clusterCommunicator,
                         flowSerializer,
                         clockService,
-                        (key, flowEntry) -> getPeerNodes());
+                        (key, flowEntry) -> getPeerNodes()).withTombstonesDisabled(true)
+                      : null;
 
         private Collection<NodeId> getPeerNodes() {
             List<NodeId> nodes = clusterService.getNodes()
@@ -651,6 +664,10 @@
         }
 
         public void loadFromBackup(DeviceId deviceId) {
+            if (!backupsEnabled) {
+                return;
+            }
+
             ConcurrentMap<FlowId, Set<StoredFlowEntry>> flowTable = new ConcurrentHashMap<>();
 
             backupMap.values()
@@ -699,18 +716,19 @@
 
         public void add(StoredFlowEntry rule) {
             getFlowEntriesInternal(rule.deviceId(), rule.id()).add(rule);
-
-            try {
-                backupMap.put(rule.id(), rule);
-            } catch (Exception e) {
-                log.warn("Failed to backup flow rule", e);
+            if (backupsEnabled) {
+                try {
+                    backupMap.put(rule.id(), rule);
+                } catch (Exception e) {
+                    log.warn("Failed to backup flow rule", e);
+                }
             }
         }
 
         public boolean remove(DeviceId deviceId, FlowEntry rule) {
             boolean status =
                     getFlowEntriesInternal(deviceId, rule.id()).remove(rule);
-            if (status) {
+            if (backupsEnabled && status) {
                 try {
                     backupMap.remove(rule.id(), (DefaultFlowEntry) rule);
                 } catch (Exception e) {
@@ -725,4 +743,4 @@
             // Flow entries should continue to remain in backup map.
         }
     }
-}
\ No newline at end of file
+}