[ONOS-7379] Use atomic operations for flow table updates to avoid losing flows during updates

Change-Id: If15361f454b57cf551236674f757b8b7b08ec238
(cherry picked from commit 2edfeefc3a1d0b8d453a9986650c787d7253f58e)
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index b50adf8..108f380 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -486,19 +486,18 @@
                     StoredFlowEntry entry;
                     switch (op.operator()) {
                         case ADD:
+                            entry = new DefaultFlowEntry(op.target());
+                            flowTable.add(entry);
+                            return op;
                         case MODIFY:
                             entry = new DefaultFlowEntry(op.target());
-                            // always add requested FlowRule
-                            // Note: 2 equal FlowEntry may have different treatment
-                            flowTable.remove(entry.deviceId(), entry);
-                            flowTable.add(entry);
-
+                            flowTable.update(entry);
                             return op;
                         case REMOVE:
                             entry = flowTable.getFlowEntry(op.target());
                             if (entry != null) {
-                                //FIXME modification of "stored" flow entry outside of flow table
                                 entry.setState(FlowEntryState.PENDING_REMOVE);
+                                flowTable.update(entry);
                                 log.debug("Setting state of rule to pending remove: {}", entry);
                                 return op;
                             }
@@ -550,7 +549,6 @@
         // check if this new rule is an update to an existing entry
         StoredFlowEntry stored = flowTable.getFlowEntry(rule);
         if (stored != null) {
-            //FIXME modification of "stored" flow entry outside of flow table
             stored.setBytes(rule.bytes());
             stored.setLife(rule.life(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
             stored.setLiveType(rule.liveType());
@@ -558,6 +556,8 @@
             stored.setLastSeen();
             if (stored.state() == FlowEntryState.PENDING_ADD) {
                 stored.setState(FlowEntryState.ADDED);
+                // Update the flow table to ensure the changes are replicated
+                flowTable.update(stored);
                 return new FlowRuleEvent(Type.RULE_ADDED, rule);
             }
             return new FlowRuleEvent(Type.RULE_UPDATED, rule);
@@ -597,9 +597,8 @@
     }
 
     private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
-        final DeviceId deviceId = rule.deviceId();
         // This is where one could mark a rule as removed and still keep it in the store.
-        final FlowEntry removed = flowTable.remove(deviceId, rule);
+        final FlowEntry removed = flowTable.remove(rule);
         // rule may be partial rule that is missing treatment, we should use rule from store instead
         return removed != null ? new FlowRuleEvent(RULE_REMOVED, removed) : null;
     }
@@ -838,18 +837,36 @@
         }
 
         public void add(FlowEntry rule) {
-            getFlowEntriesInternal(rule.deviceId(), rule.id())
-                    .compute((StoredFlowEntry) rule, (k, stored) -> {
-                        //TODO compare stored and rule timestamps
-                        //TODO the key is not updated
-                        return (StoredFlowEntry) rule;
-                    });
-            lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
+            StoredFlowEntry stored = getFlowEntriesInternal(rule.deviceId(), rule.id())
+                .putIfAbsent((StoredFlowEntry) rule, (StoredFlowEntry) rule);
+            if (stored == null) {
+                lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
+            }
         }
 
-        public FlowEntry remove(DeviceId deviceId, FlowEntry rule) {
+        public void update(FlowEntry rule) {
+            getFlowEntriesInternal(rule.deviceId(), rule.id())
+                .computeIfPresent((StoredFlowEntry) rule, (k, stored) -> {
+                    if (rule instanceof DefaultFlowEntry) {
+                        DefaultFlowEntry updated = (DefaultFlowEntry) rule;
+                        if (stored instanceof DefaultFlowEntry) {
+                            DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
+                            if (updated.created() >= storedEntry.created()) {
+                                lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
+                                return updated;
+                            } else {
+                                log.debug("Trying to update more recent flow entry {} (stored: {})", updated, stored);
+                                return stored;
+                            }
+                        }
+                    }
+                    return stored;
+                });
+        }
+
+        public FlowEntry remove(FlowEntry rule) {
             final AtomicReference<FlowEntry> removedRule = new AtomicReference<>();
-            final Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowTable = getFlowTable(deviceId);
+            final Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowTable = getFlowTable(rule.deviceId());
             flowTable.computeIfPresent(rule.id(), (flowId, flowEntries) -> {
                 flowEntries.computeIfPresent((StoredFlowEntry) rule, (k, stored) -> {
                     if (rule instanceof DefaultFlowEntry) {
@@ -857,8 +874,7 @@
                         if (stored instanceof DefaultFlowEntry) {
                             DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
                             if (toRemove.created() < storedEntry.created()) {
-                                log.debug("Trying to remove more recent flow entry {} (stored: {})",
-                                    toRemove, stored);
+                                log.debug("Trying to remove more recent flow entry {} (stored: {})", toRemove, stored);
                                 // the key is not updated, removedRule remains null
                                 return stored;
                             }
@@ -871,7 +887,7 @@
             });
 
             if (removedRule.get() != null) {
-                lastUpdateTimes.put(deviceId, System.currentTimeMillis());
+                lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
                 return removedRule.get();
             } else {
                 return null;