add basic backup to DistributedFlowRuleStore

Change-Id: I8eedf0cf30a2555d45145889b5ef210e826b0ac0
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchRequest.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchRequest.java
index 34e3d31..f8a25cb 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchRequest.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchRequest.java
@@ -13,7 +13,7 @@
     private final List<FlowEntry> toAdd;
     private final List<FlowEntry> toRemove;
 
-    public FlowRuleBatchRequest(int batchId, List<FlowEntry> toAdd, List<FlowEntry> toRemove) {
+    public FlowRuleBatchRequest(int batchId, List<? extends FlowEntry> toAdd, List<? extends FlowEntry> toRemove) {
         this.batchId = batchId;
         this.toAdd = Collections.unmodifiableList(toAdd);
         this.toRemove = Collections.unmodifiableList(toRemove);
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index 30b9008..69b6743 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -1,5 +1,6 @@
 package org.onlab.onos.store.flow.impl;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
 import static org.slf4j.LoggerFactory.getLogger;
 import static org.onlab.onos.store.flow.impl.FlowStoreMessageSubjects.*;
@@ -10,6 +11,7 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -27,6 +29,7 @@
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.NodeId;
 import org.onlab.onos.net.Device;
 import org.onlab.onos.net.DeviceId;
 import org.onlab.onos.net.device.DeviceService;
@@ -34,6 +37,7 @@
 import org.onlab.onos.net.flow.DefaultFlowEntry;
 import org.onlab.onos.net.flow.FlowEntry;
 import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
+import org.onlab.onos.net.flow.FlowId;
 import org.onlab.onos.net.flow.FlowRule;
 import org.onlab.onos.net.flow.FlowRuleBatchEntry;
 import org.onlab.onos.net.flow.FlowRuleBatchEvent;
@@ -45,12 +49,15 @@
 import org.onlab.onos.net.flow.FlowRuleStore;
 import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
 import org.onlab.onos.net.flow.StoredFlowEntry;
-import org.onlab.onos.store.AbstractStore;
 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
 import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
 import org.onlab.onos.store.flow.ReplicaInfo;
+import org.onlab.onos.store.flow.ReplicaInfoEvent;
+import org.onlab.onos.store.flow.ReplicaInfoEventListener;
 import org.onlab.onos.store.flow.ReplicaInfoService;
+import org.onlab.onos.store.hz.AbstractHazelcastStore;
+import org.onlab.onos.store.hz.SMap;
 import org.onlab.onos.store.serializers.DistributedStoreSerializers;
 import org.onlab.onos.store.serializers.KryoSerializer;
 import org.onlab.util.KryoNamespace;
@@ -59,13 +66,17 @@
 import com.google.common.base.Function;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
+import com.hazelcast.core.IMap;
 
 /**
  * Manages inventory of flow rules using a distributed state management protocol.
@@ -73,7 +84,7 @@
 @Component(immediate = true)
 @Service
 public class DistributedFlowRuleStore
-        extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
+        extends AbstractHazelcastStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
         implements FlowRuleStore {
 
     private final Logger log = getLogger(getClass());
@@ -82,8 +93,6 @@
     private final Multimap<DeviceId, StoredFlowEntry> flowEntries =
             ArrayListMultimap.<DeviceId, StoredFlowEntry>create();
 
-    private final Multimap<Short, FlowRule> flowEntriesById =
-            ArrayListMultimap.<Short, FlowRule>create();
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ReplicaInfoService replicaInfoManager;
@@ -109,10 +118,17 @@
                 //.removalListener(listener)
                 .build();
 
+    private LoadingCache<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> smaps;
+
 
     private final ExecutorService futureListeners =
             Executors.newCachedThreadPool(namedThreads("flowstore-peer-responders"));
 
+    private final ExecutorService backupExecutors =
+            Executors.newSingleThreadExecutor(namedThreads("async-backups"));
+
+    // TODO make this configurable
+    private boolean syncBackup = false;
 
     protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
         @Override
@@ -127,8 +143,20 @@
     // TODO: make this configurable
     private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
 
+    private ReplicaInfoEventListener replicaInfoEventListener;
+
+    @Override
     @Activate
     public void activate() {
+
+        super.serializer = SERIALIZER;
+        super.theInstance = storeService.getHazelcastInstance();
+
+        // Cache to create SMap on demand
+        smaps = CacheBuilder.newBuilder()
+                    .softValues()
+                    .build(new SMapLoader());
+
         clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new ClusterMessageHandler() {
 
             @Override
@@ -182,11 +210,16 @@
             }
         });
 
+        replicaInfoEventListener = new InternalReplicaInfoEventListener();
+
+        replicaInfoManager.addListener(replicaInfoEventListener);
+
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
+        replicaInfoManager.removeListener(replicaInfoEventListener);
         log.info("Stopped");
     }
 
@@ -276,8 +309,10 @@
         storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule))));
     }
 
+    // FIXME document that all of the FlowEntries must be about same device
     @Override
     public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
+
         if (operation.getOperations().isEmpty()) {
             return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
         }
@@ -313,12 +348,17 @@
     }
 
     private ListenableFuture<CompletedBatchOperation> storeBatchInternal(FlowRuleBatchOperation operation) {
-        List<FlowEntry> toRemove = new ArrayList<>();
-        List<FlowEntry> toAdd = new ArrayList<>();
-        // TODO: backup changes to hazelcast map
+        final List<StoredFlowEntry> toRemove = new ArrayList<>();
+        final List<StoredFlowEntry> toAdd = new ArrayList<>();
+        DeviceId did = null;
+
+
         for (FlowRuleBatchEntry batchEntry : operation.getOperations()) {
             FlowRule flowRule = batchEntry.getTarget();
             FlowRuleOperation op = batchEntry.getOperator();
+            if (did == null) {
+                did = flowRule.deviceId();
+            }
             if (op.equals(FlowRuleOperation.REMOVE)) {
                 StoredFlowEntry entry = getFlowEntryInternal(flowRule);
                 if (entry != null) {
@@ -330,7 +370,6 @@
                 DeviceId deviceId = flowRule.deviceId();
                 if (!flowEntries.containsEntry(deviceId, flowEntry)) {
                     flowEntries.put(deviceId, flowEntry);
-                    flowEntriesById.put(flowRule.appId(), flowEntry);
                     toAdd.add(flowEntry);
                 }
             }
@@ -339,14 +378,39 @@
             return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
         }
 
+        // create remote backup copies
+        final DeviceId deviceId = did;
+        updateBackup(deviceId, toAdd, toRemove);
+
         SettableFuture<CompletedBatchOperation> r = SettableFuture.create();
         final int batchId = localBatchIdGen.incrementAndGet();
 
         pendingFutures.put(batchId, r);
         notifyDelegate(FlowRuleBatchEvent.requested(new FlowRuleBatchRequest(batchId, toAdd, toRemove)));
+
         return r;
     }
 
+    // Queues a backup update on backupExecutors; waits for it only when syncBackup is set.
+    private void updateBackup(final DeviceId deviceId, final List<StoredFlowEntry> toAdd,
+                              final List<? extends FlowRule> list) {
+        Future<?> submit = backupExecutors.submit(new UpdateBackup(deviceId, toAdd, list));
+        if (syncBackup) {
+            try {
+                submit.get();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt(); // keep the interruption visible to callers
+                log.error("Failed to create backups", e);
+            } catch (ExecutionException e) {
+                log.error("Failed to create backups", e);
+            }
+        }
+    }
+
+    private void updateBackup(DeviceId deviceId, List<StoredFlowEntry> toAdd) {
+        updateBackup(deviceId, toAdd, Collections.<FlowEntry>emptyList()); // add-only: nothing to remove
+    }
+
     @Override
     public void deleteFlowRule(FlowRule rule) {
         storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule))));
@@ -365,7 +429,7 @@
     }
 
     private synchronized FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
-        DeviceId did = rule.deviceId();
+        final DeviceId did = rule.deviceId();
 
         // check if this new rule is an update to an existing entry
         StoredFlowEntry stored = getFlowEntryInternal(rule);
@@ -375,16 +439,18 @@
             stored.setPackets(rule.packets());
             if (stored.state() == FlowEntryState.PENDING_ADD) {
                 stored.setState(FlowEntryState.ADDED);
+                // update backup.
+                updateBackup(did, Arrays.asList(stored));
                 return new FlowRuleEvent(Type.RULE_ADDED, rule);
             }
             return new FlowRuleEvent(Type.RULE_UPDATED, rule);
         }
 
         // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
+        // TODO: also update backup.
         flowEntries.put(did, new DefaultFlowEntry(rule));
         return null;
 
-        // TODO: also update backup.
     }
 
     @Override
@@ -401,13 +467,15 @@
     }
 
     private synchronized FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
+        final DeviceId deviceId = rule.deviceId();
         // This is where one could mark a rule as removed and still keep it in the store.
-        if (flowEntries.remove(rule.deviceId(), rule)) {
+        final boolean removed = flowEntries.remove(deviceId, rule);
+        updateBackup(deviceId, Collections.<StoredFlowEntry>emptyList(), Arrays.asList(rule));
+        if (removed) {
             return new FlowRuleEvent(RULE_REMOVED, rule);
         } else {
             return null;
         }
-        // TODO: also update backup.
     }
 
     @Override
@@ -421,4 +489,145 @@
         }
         notifyDelegate(event);
     }
+
+    // Copies every backed-up flow entry of the device into the local flowEntries table.
+    private synchronized void loadFromBackup(final DeviceId did) {
+        // should relax synchronized condition
+        try {
+            log.info("Loading FlowRules for {} from backups", did);
+            final SMap<FlowId, ImmutableList<StoredFlowEntry>> backupTable = smaps.get(did);
+            for (Entry<FlowId, ImmutableList<StoredFlowEntry>> backedUp : backupTable.entrySet()) {
+                // TODO: should we be directly updating internal structure or
+                // should we be triggering event?
+                final ImmutableList<StoredFlowEntry> restored = backedUp.getValue();
+                log.debug("loading {}", restored);
+                for (StoredFlowEntry flowEntry : restored) {
+                    // drop any equal entry first so the backed-up copy replaces it
+                    flowEntries.remove(did, flowEntry);
+                    flowEntries.put(did, flowEntry);
+                }
+            }
+        } catch (ExecutionException loadFailure) {
+            log.error("Failed to load backup flowtable for {}", did, loadFailure);
+        }
+    }
+
+    // Drops every locally stored flow entry of the device and logs what was dropped.
+    private synchronized void removeFromPrimary(final DeviceId did) {
+        log.debug("removedFromPrimary {}", flowEntries.removeAll(did));
+    }
+
+    // Cache loader wrapping the per-device Hazelcast map "flowtable_<deviceId>" in an SMap.
+    private final class SMapLoader
+        extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> {
+        @Override
+        public SMap<FlowId, ImmutableList<StoredFlowEntry>> load(DeviceId id) throws Exception {
+            final String mapName = "flowtable_" + id.toString();
+            final IMap<byte[], byte[]> rawMap = theInstance.getMap(mapName);
+            return new SMap<>(rawMap, SERIALIZER);
+        }
+    }
+
+    // Reacts to mastership changes: the new master reloads from backup, other nodes drop local state.
+    private final class InternalReplicaInfoEventListener implements ReplicaInfoEventListener {
+        @Override
+        public void event(ReplicaInfoEvent event) {
+            final NodeId localNodeId = clusterService.getLocalNode().id();
+            final DeviceId deviceId = event.subject();
+            final ReplicaInfo replicaInfo = event.replicaInfo();
+            switch (event.type()) {
+            case MASTER_CHANGED:
+                handleMasterChange(deviceId, localNodeId.equals(replicaInfo.master().orNull()));
+                break;
+            default:
+                break;
+            }
+        }
+
+        // Brings the local flow table of the device in line with this node's mastership.
+        private void handleMasterChange(DeviceId deviceId, boolean localIsMaster) {
+            if (localIsMaster) {
+                // This node is the new master, populate local structure from backup
+                loadFromBackup(deviceId);
+            } else {
+                // This node is no longer the master holder, clean local structure
+                removeFromPrimary(deviceId);
+                // FIXME: probably should stop pending backup activities in
+                // executors to avoid overwriting with old value
+            }
+        }
+    }
+
+    /**
+     * Task to update FlowEntries in backup HZ store: applies the additions, then the removals.
+     * Values are written with putIfAbsent/replace against the value read earlier, so an update
+     * that loses to a concurrent writer is logged, not retried.
+     */
+    private final class UpdateBackup implements Runnable {
+
+        private final DeviceId deviceId;
+        private final List<StoredFlowEntry> toAdd;
+        private final List<? extends FlowRule> toRemove;
+
+        /**
+         * Creates a task adding {@code toAdd} to and removing {@code list} from the device's backup.
+         */
+        public UpdateBackup(DeviceId deviceId,
+                             List<StoredFlowEntry> toAdd,
+                             List<? extends FlowRule> list) {
+            this.deviceId = checkNotNull(deviceId);
+            this.toAdd = checkNotNull(toAdd);
+            this.toRemove = checkNotNull(list);
+        }
+
+        @Override
+        public void run() {
+            try {
+                log.debug("update backup {} +{} -{}", deviceId, toAdd, toRemove);
+                final SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(deviceId);
+                // Following should be rewritten using async APIs
+                for (StoredFlowEntry entry : toAdd) {
+                    final FlowId id = entry.id();
+                    final ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
+                    final List<StoredFlowEntry> list = mutableCopy(original);
+                    // replace a stale copy of the entry, if any, with the current one
+                    list.remove(entry);
+                    list.add(entry);
+                    write(backupFlowTable, id, original, list);
+                }
+                for (FlowRule entry : toRemove) {
+                    final FlowId id = entry.id();
+                    final ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
+                    final List<StoredFlowEntry> list = mutableCopy(original);
+                    // write only when the backup really held the rule; avoids
+                    // creating empty placeholder values for unknown flows
+                    if (list.remove(entry)) {
+                        write(backupFlowTable, id, original, list);
+                    }
+                }
+            } catch (ExecutionException | RuntimeException e) {
+                // the Future of an async backup is never inspected, so failures
+                // must be reported here or they would be lost silently
+                log.error("Failed to write to backups", e);
+            }
+        }
+
+        // Returns a mutable copy of the stored value; empty when there is none.
+        private List<StoredFlowEntry> mutableCopy(ImmutableList<StoredFlowEntry> original) {
+            return (original == null) ? new ArrayList<StoredFlowEntry>() : new ArrayList<>(original);
+        }
+
+        // Stores the new value only if the map still holds the value read before.
+        private void write(SMap<FlowId, ImmutableList<StoredFlowEntry>> table, FlowId id,
+                           ImmutableList<StoredFlowEntry> original, List<StoredFlowEntry> list) {
+            final ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
+            final boolean success = (original == null)
+                    ? (table.putIfAbsent(id, newValue) == null)
+                    : table.replace(id, original, newValue);
+            // TODO retry?
+            if (!success) {
+                log.error("Updating backup failed.");
+            }
+        }
+    }
 }