Move distributed flow rule store backup mechanism to use EC Map
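
Per-device flow table backups are no longer kept in Hazelcast SMaps.
InternalFlowTable now writes every add and remove through to a single
EventuallyConsistentMap ("flow-backup") whose updates are pushed to one
randomly chosen peer, and a node that becomes master for a device
rebuilds its local table from that map. The dedicated backup executor,
the syncBackup flag and the UpdateBackup task are removed.

The essential calls are sketched below (illustrative only: the names are
the ones introduced in InternalFlowTable, and the injected services are
assumed to be wired up as in DistributedFlowRuleStore):

    EventuallyConsistentMap<FlowId, StoredFlowEntry> backupMap =
            new EventuallyConsistentMapImpl<>("flow-backup",
                    clusterService,
                    clusterCommunicator,
                    flowSerializer,
                    clockService,
                    (key, flowEntry) -> getPeerNodes());

    // Write-through on add()/remove() of the master's local table.
    backupMap.put(rule.id(), rule);
    backupMap.remove(rule.id(), (DefaultFlowEntry) rule);

    // loadFromBackup(deviceId) reads a device's entries back on MASTER_CHANGED.
    Set<StoredFlowEntry> restored = backupMap.values().stream()
            .filter(entry -> entry.deviceId().equals(deviceId))
            .collect(Collectors.toSet());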

Change-Id: I465cc2424004721bf09505ac9cde068884f04940
diff --git a/core/api/src/main/java/org/onosproject/net/flow/DefaultFlowEntry.java b/core/api/src/main/java/org/onosproject/net/flow/DefaultFlowEntry.java
index e46e3df..a904d0c 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/DefaultFlowEntry.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/DefaultFlowEntry.java
@@ -22,7 +22,7 @@
 import org.slf4j.Logger;
 
 public class DefaultFlowEntry extends DefaultFlowRule
-    implements FlowEntry, StoredFlowEntry {
+    implements StoredFlowEntry {
 
     private static final Logger log = getLogger(DefaultFlowEntry.class);
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index c2c46fc..8111db8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -357,6 +357,10 @@
     }
 
     private boolean removeInternal(K key, Timestamp timestamp) {
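+        // A timestamp may be unavailable (e.g. the clock service cannot produce one
+        // without a value); such a removal cannot be ordered, so report no change.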
+        if (timestamp == null) {
+            return false;
+        }
+
         counter.incrementCount();
         final MutableBoolean updated = new MutableBoolean(false);
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index 450bb3d..ccfe5c2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -15,29 +15,25 @@
  */
 package org.onosproject.store.flow.impl;
 
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.hazelcast.core.IMap;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
-import org.onlab.util.BoundedThreadPool;
 import org.onlab.util.KryoNamespace;
-import org.onlab.util.NewConcurrentHashMap;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.core.CoreService;
 import org.onosproject.core.IdGenerator;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceClockService;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.flow.CompletedBatchOperation;
 import org.onosproject.net.flow.DefaultFlowEntry;
@@ -56,28 +52,31 @@
 import org.onosproject.net.flow.FlowRuleStore;
 import org.onosproject.net.flow.FlowRuleStoreDelegate;
 import org.onosproject.net.flow.StoredFlowEntry;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.Timestamp;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.cluster.messaging.ClusterMessage;
 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.ecmap.EventuallyConsistentMap;
+import org.onosproject.store.ecmap.EventuallyConsistentMapImpl;
 import org.onosproject.store.flow.ReplicaInfo;
 import org.onosproject.store.flow.ReplicaInfoEvent;
 import org.onosproject.store.flow.ReplicaInfoEventListener;
 import org.onosproject.store.flow.ReplicaInfoService;
-import org.onosproject.store.hz.AbstractHazelcastStore;
-import org.onosproject.store.hz.SMap;
+import org.onosproject.store.impl.ClockService;
+import org.onosproject.store.impl.MastershipBasedTimestamp;
+import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.serializers.KryoSerializer;
 import org.onosproject.store.serializers.StoreSerializer;
 import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
 import org.slf4j.Logger;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -90,8 +89,6 @@
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
 import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
@@ -103,7 +100,7 @@
 @Component(immediate = true)
 @Service
 public class DistributedFlowRuleStore
-        extends AbstractHazelcastStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
+        extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
         implements FlowRuleStore {
 
     private final Logger log = getLogger(getClass());
@@ -111,10 +108,7 @@
     // TODO: Make configurable.
     private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
 
-    private InternalFlowTable flowTable = new InternalFlowTable();
-
-    /*private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
-            flowEntries = new ConcurrentHashMap<>();*/
+    private InternalFlowTable flowTable;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ReplicaInfoService replicaInfoManager;
@@ -129,21 +123,15 @@
     protected DeviceService deviceService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DeviceClockService deviceClockService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected CoreService coreService;
 
     private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
 
-    // Cache of SMaps used for backup data.  each SMap contain device flow table
-    private LoadingCache<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> smaps;
-
     private ExecutorService messageHandlingExecutor;
 
-    private final ExecutorService backupExecutors =
-            BoundedThreadPool.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
-            //Executors.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
-
-    private boolean syncBackup = false;
-
     protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
         @Override
         protected void setupKryoPool() {
@@ -162,20 +150,13 @@
 
     private IdGenerator idGenerator;
 
-    @Override
     @Activate
     public void activate() {
 
-        super.serializer = SERIALIZER;
-        super.theInstance = storeService.getHazelcastInstance();
+        flowTable = new InternalFlowTable();
 
         idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
 
-        // Cache to create SMap on demand
-        smaps = CacheBuilder.newBuilder()
-                .softValues()
-                .build(new SMapLoader());
-
         final NodeId local = clusterService.getLocalNode().id();
 
         messageHandlingExecutor = Executors.newFixedThreadPool(
@@ -214,7 +195,7 @@
             public void handle(ClusterMessage message) {
                 DeviceId deviceId = SERIALIZER.decode(message.payload());
                 log.trace("Received get flow entries request for {} from {}", deviceId, message.sender());
-                Set<FlowEntry> flowEntries = flowTable.getFlowEntries(deviceId);
+                Set<StoredFlowEntry> flowEntries = flowTable.getFlowEntries(deviceId);
                 try {
                     message.respond(SERIALIZER.encode(flowEntries));
                 } catch (IOException e) {
@@ -315,7 +296,7 @@
         }
 
         if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
-            return flowTable.getFlowEntries(deviceId);
+            return flowTable.getFlowEntries(deviceId).stream().collect(Collectors.toSet());
         }
 
         log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
@@ -412,7 +393,6 @@
                     new CompletedBatchOperation(true, Collections.emptySet(), did)));
             return;
         }
-        updateBackup(did, currentOps);
 
         notifyDelegate(FlowRuleBatchEvent.requested(new
                            FlowRuleBatchRequest(operation.id(),
@@ -451,19 +431,6 @@
         ).filter(op -> op != null).collect(Collectors.toSet());
     }
 
-    private void updateBackup(DeviceId deviceId, final Set<FlowRuleBatchEntry> entries) {
-        Future<?> backup = backupExecutors.submit(new UpdateBackup(deviceId, entries));
-
-        if (syncBackup) {
-            // wait for backup to complete
-            try {
-                backup.get();
-            } catch (InterruptedException | ExecutionException e) {
-                log.error("Failed to create backups", e);
-            }
-        }
-    }
-
     @Override
     public void deleteFlowRule(FlowRule rule) {
         storeBatch(
@@ -479,7 +446,7 @@
         ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
         final NodeId localId = clusterService.getLocalNode().id();
         if (localId.equals(replicaInfo.master().orNull())) {
-            return addOrUpdateFlowRuleInternal(rule);
+            return addOrUpdateFlowRuleInternal((StoredFlowEntry) rule);
         }
 
         log.warn("Tried to update FlowRule {} state,"
@@ -487,10 +454,7 @@
         return null;
     }
 
-    private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
-        final DeviceId did = rule.deviceId();
-
-
+    private FlowRuleEvent addOrUpdateFlowRuleInternal(StoredFlowEntry rule) {
         // check if this new rule is an update to an existing entry
         StoredFlowEntry stored = flowTable.getFlowEntry(rule);
         if (stored != null) {
@@ -499,21 +463,15 @@
             stored.setPackets(rule.packets());
             if (stored.state() == FlowEntryState.PENDING_ADD) {
                 stored.setState(FlowEntryState.ADDED);
-                FlowRuleBatchEntry entry =
-                        new FlowRuleBatchEntry(FlowRuleOperation.ADD, stored);
-                updateBackup(did, Sets.newHashSet(entry));
                 return new FlowRuleEvent(Type.RULE_ADDED, rule);
             }
             return new FlowRuleEvent(Type.RULE_UPDATED, rule);
         }
 
         // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
-        // TODO: also update backup if the behavior is correct.
         flowTable.add(rule);
 
-
         return null;
-
     }
 
     @Override
@@ -554,9 +512,6 @@
         final DeviceId deviceId = rule.deviceId();
         // This is where one could mark a rule as removed and still keep it in the store.
         final boolean removed = flowTable.remove(deviceId, rule); //flowEntries.remove(deviceId, rule);
-        FlowRuleBatchEntry entry =
-                new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule);
-        updateBackup(deviceId, Sets.newHashSet(entry));
         if (removed) {
             return new FlowRuleEvent(RULE_REMOVED, rule);
         } else {
@@ -583,33 +538,10 @@
         }
     }
 
-    private void loadFromBackup(final DeviceId did) {
-
-
-        try {
-            log.debug("Loading FlowRules for {} from backups", did);
-            SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(did);
-            for (Entry<FlowId, ImmutableList<StoredFlowEntry>> e
-                    : backupFlowTable.entrySet()) {
-
-                log.trace("loading {}", e.getValue());
-                for (StoredFlowEntry entry : e.getValue()) {
-                    flowTable.getFlowEntriesById(entry).remove(entry);
-                    flowTable.getFlowEntriesById(entry).add(entry);
-
-
-                }
-            }
-        } catch (ExecutionException e) {
-            log.error("Failed to load backup flowtable for {}", did, e);
-        }
-    }
-
     private void removeFromPrimary(final DeviceId did) {
         flowTable.clearDevice(did);
     }
 
-
     private final class OnStoreBatch implements ClusterMessageHandler {
         private final NodeId local;
 
@@ -626,10 +558,11 @@
             ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
             if (!local.equals(replicaInfo.master().orNull())) {
 
-                Set<FlowRule> failures = new HashSet<>(operation.size());
-                for (FlowRuleBatchEntry op : operation.getOperations()) {
-                    failures.add(op.target());
-                }
+                Set<FlowRule> failures = operation.getOperations()
+                            .stream()
+                            .map(FlowRuleBatchEntry::target)
+                            .collect(Collectors.toSet());
+
                 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
                 // This node is no longer the master, respond as all failed.
                 // TODO: we might want to wrap response in envelope
@@ -650,17 +583,6 @@
         }
     }
 
-    private final class SMapLoader
-            extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> {
-
-        @Override
-        public SMap<FlowId, ImmutableList<StoredFlowEntry>> load(DeviceId id)
-                throws Exception {
-            IMap<byte[], byte[]> map = theInstance.getMap("flowtable_" + id.toString());
-            return new SMap<FlowId, ImmutableList<StoredFlowEntry>>(map, SERIALIZER);
-        }
-    }
-
     private final class InternalReplicaInfoEventListener
             implements ReplicaInfoEventListener {
 
@@ -673,9 +595,10 @@
             switch (event.type()) {
                 case MASTER_CHANGED:
                     if (local.equals(rInfo.master().orNull())) {
+                        log.info("{} is now the master for {}. Will load flow rules from backup", local, did);
                         // This node is the new master, populate local structure
                         // from backup
-                        loadFromBackup(did);
+                        flowTable.loadFromBackup(did);
                     }
                     //else {
                         // This node is no longer the master holder,
@@ -692,146 +615,122 @@
         }
     }
 
-    // Task to update FlowEntries in backup HZ store
-    private final class UpdateBackup implements Runnable {
-
-        private final DeviceId deviceId;
-        private final Set<FlowRuleBatchEntry> ops;
-
-
-        public UpdateBackup(DeviceId deviceId,
-                            Set<FlowRuleBatchEntry> ops) {
-            this.deviceId = checkNotNull(deviceId);
-            this.ops = checkNotNull(ops);
-
-        }
-
-        @Override
-        public void run() {
-            try {
-                log.trace("update backup {} {}", deviceId, ops
-                );
-                final SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(deviceId);
-
-
-                ops.stream().forEach(
-                        op -> {
-                            final FlowRule entry = op.target();
-                            final FlowId id = entry.id();
-                            ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
-                            List<StoredFlowEntry> list = new ArrayList<>();
-                            if (original != null) {
-                                list.addAll(original);
-                            }
-                            list.remove(op.target());
-                            if (op.operator() == FlowRuleOperation.ADD) {
-                                list.add((StoredFlowEntry) entry);
-                            }
-
-                            ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
-                            boolean success;
-                            if (original == null) {
-                                success = (backupFlowTable.putIfAbsent(id, newValue) == null);
-                            } else {
-                                success = backupFlowTable.replace(id, original, newValue);
-                            }
-                            if (!success) {
-                                log.error("Updating backup failed.");
-                            }
-
-                        }
-                );
-            } catch (ExecutionException e) {
-                log.error("Failed to write to backups", e);
-            }
-
-        }
-    }
-
     private class InternalFlowTable {
 
-        /*
-            TODO: This needs to be cleaned up. Perhaps using the eventually consistent
-            map when it supports distributed to a sequence of instances.
-         */
+        private final Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>>
+                flowEntries = Maps.newConcurrentMap();
 
+        private final KryoNamespace.Builder flowSerializer = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .register(MastershipBasedTimestamp.class);
 
-        private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
-                flowEntries = new ConcurrentHashMap<>();
+        private final ClockService<FlowId, StoredFlowEntry> clockService =
+            new ClockService<FlowId, StoredFlowEntry>() {
+                @Override
+                public Timestamp getTimestamp(FlowId flowId, StoredFlowEntry flowEntry) {
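+                    // The timestamp comes from the entry's device (mastership-based)
+                    // clock; with no entry no timestamp can be produced, which the
+                    // EC map's null-timestamp guard in removeInternal() turns into a no-op.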
+                    if (flowEntry == null) {
+                        return null;
+                    }
+                    return deviceClockService.getTimestamp(flowEntry.deviceId());
+                }
+            };
 
+        private final EventuallyConsistentMap<FlowId, StoredFlowEntry> backupMap =
+                new EventuallyConsistentMapImpl<>("flow-backup",
+                        clusterService,
+                        clusterCommunicator,
+                        flowSerializer,
+                        clockService,
+                        (key, flowEntry) -> getPeerNodes());
 
-        private NewConcurrentHashMap<FlowId, Set<StoredFlowEntry>> lazyEmptyFlowTable() {
-            return NewConcurrentHashMap.<FlowId, Set<StoredFlowEntry>>ifNeeded();
+        private Collection<NodeId> getPeerNodes() {
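+            // Invoked by the backup map on every put/remove to choose where the update
+            // is pushed: a single randomly chosen peer, or no peer if this node is alone.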
+            List<NodeId> nodes = clusterService.getNodes()
+                                    .stream()
+                                    .map(node -> node.id())
+                                    .filter(id -> !id.equals(clusterService.getLocalNode().id()))
+                                    .collect(Collectors.toList());
+
+            if (nodes.isEmpty()) {
+                return ImmutableList.of();
+            } else {
+                Collections.shuffle(nodes);
+                return ImmutableList.of(nodes.get(0));
+            }
         }
 
-        /**
-         * Returns the flow table for specified device.
-         *
-         * @param deviceId identifier of the device
-         * @return Map representing Flow Table of given device.
-         */
-        private ConcurrentMap<FlowId, Set<StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
-            return createIfAbsentUnchecked(flowEntries,
-                                           deviceId, lazyEmptyFlowTable());
+        public void loadFromBackup(DeviceId deviceId) {
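+            // Invoked when this node becomes master for the device: rebuild the local
+            // per-device flow table from the entries held in the backup map.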
+            ConcurrentMap<FlowId, Set<StoredFlowEntry>> flowTable = new ConcurrentHashMap<>();
+
+            backupMap.values()
+                .stream()
+                .filter(entry -> entry.deviceId().equals(deviceId))
+                .forEach(entry -> flowTable.compute(entry.id(), (k, v) -> {
+                    if (v == null) {
+                        return Sets.newHashSet(entry);
+                    } else {
+                        v.add(entry);
+                    }
+                    return v;
+                }));
+            flowEntries.putIfAbsent(deviceId, flowTable);
         }
 
         private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
-            final ConcurrentMap<FlowId, Set<StoredFlowEntry>> flowTable = getFlowTable(deviceId);
-            Set<StoredFlowEntry> r = flowTable.get(flowId);
-            if (r == null) {
-                final Set<StoredFlowEntry> concurrentlyAdded;
-                r = new CopyOnWriteArraySet<>();
-                concurrentlyAdded = flowTable.putIfAbsent(flowId, r);
-                if (concurrentlyAdded != null) {
-                    return concurrentlyAdded;
-                }
-            }
-            return r;
+            return flowEntries
+                        .computeIfAbsent(deviceId, key -> Maps.newConcurrentMap())
+                        .computeIfAbsent(flowId, k -> new CopyOnWriteArraySet<>());
         }
 
         private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
-            for (StoredFlowEntry f : getFlowEntriesInternal(rule.deviceId(), rule.id())) {
-                if (f.equals(rule)) {
-                    return f;
-                }
-            }
-            return null;
+            return getFlowEntriesInternal(rule.deviceId(), rule.id())
+                .stream()
+                .filter(element -> element.equals(rule))
+                .findFirst()
+                .orElse(null);
         }
 
-        private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
-            return getFlowTable(deviceId).values().stream()
-                    .flatMap((list -> list.stream())).collect(Collectors.toSet());
-
+        private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
+            Set<StoredFlowEntry> entries = Sets.newHashSet();
+            flowEntries.computeIfAbsent(deviceId, key -> Maps.newConcurrentMap())
+                        .values()
+                        .forEach(entries::addAll);
+            return entries;
         }
 
-
         public StoredFlowEntry getFlowEntry(FlowRule rule) {
             return getFlowEntryInternal(rule);
         }
 
-        public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
+        public Set<StoredFlowEntry> getFlowEntries(DeviceId deviceId) {
             return getFlowEntriesInternal(deviceId);
         }
 
-        public Set<StoredFlowEntry> getFlowEntriesById(FlowEntry entry) {
-            return getFlowEntriesInternal(entry.deviceId(), entry.id());
-        }
+        public void add(StoredFlowEntry rule) {
+            getFlowEntriesInternal(rule.deviceId(), rule.id()).add(rule);
 
-        public void add(FlowEntry rule) {
-            ((CopyOnWriteArraySet)
-                    getFlowEntriesInternal(rule.deviceId(), rule.id())).add(rule);
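+            // Write the entry through to the backup map; failures are logged and do not
+            // affect the already-updated local table.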
+            try {
+                backupMap.put(rule.id(), rule);
+            } catch (Exception e) {
+                log.warn("Failed to backup flow rule", e);
+            }
         }
 
         public boolean remove(DeviceId deviceId, FlowEntry rule) {
-            return ((CopyOnWriteArraySet)
-                    getFlowEntriesInternal(deviceId, rule.id())).remove(rule);
-            //return flowEntries.remove(deviceId, rule);
+            boolean status =
+                    getFlowEntriesInternal(deviceId, rule.id()).remove(rule);
+            if (status) {
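+                // Mirror the removal in the backup map; failures only produce a warning.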
+                try {
+                    backupMap.remove(rule.id(), (DefaultFlowEntry) rule);
+                } catch (Exception e) {
+                    log.warn("Failed to remove backup of flow rule", e);
+                }
+            }
+            return status;
         }
 
         public void clearDevice(DeviceId did) {
             flowEntries.remove(did);
+            // Flow entries intentionally remain in the backup map so a future master can restore them.
         }
     }
-
-
-}
+}
\ No newline at end of file