Adding an option for persistent flow storage.

Change-Id: I1dd70c9f2ea9cd99ef5a55eaa4b54968f7d3c55f
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
index 8cd63e7..1695e5f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
@@ -59,6 +59,7 @@
 import org.onosproject.net.flow.FlowRuleStoreDelegate;
 import org.onosproject.net.flow.StoredFlowEntry;
 import org.onosproject.net.flow.TableStatisticsEntry;
+import org.onosproject.persistence.PersistenceService;
 import org.onosproject.store.AbstractStore;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.cluster.messaging.ClusterMessage;
@@ -74,6 +75,7 @@
 import org.onosproject.store.service.EventuallyConsistentMap;
 import org.onosproject.store.service.EventuallyConsistentMapEvent;
 import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
 import org.onosproject.store.service.WallClockTimestamp;
 import org.osgi.service.component.ComponentContext;
@@ -113,6 +115,7 @@
 
     private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
     private static final boolean DEFAULT_BACKUP_ENABLED = true;
+    private static final boolean DEFAULT_PERSISTENCE_ENABLED = false;
     private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
     private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
     // number of devices whose flow entries will be backed up in one communication round
@@ -129,6 +132,9 @@
     @Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS,
             label = "Delay in ms between successive backup runs")
     private int backupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
+    @Property(name = "persistenceEnabled", boolValue = false,
+            label = "Indicates whether or not changes in the flow table should be persisted to disk.")
+    private boolean persistenceEnabled = DEFAULT_PERSISTENCE_ENABLED;
 
     private InternalFlowTable flowTable = new InternalFlowTable();
 
@@ -153,6 +159,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected MastershipService mastershipService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected PersistenceService persistenceService;
+
     private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
     private ExecutorService messageHandlingExecutor;
 
@@ -716,7 +725,25 @@
          * @return Map representing Flow Table of given device.
          */
         private Map<FlowId, Set<StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
-            return flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap());
+            if (persistenceEnabled) {
+                return flowEntries.computeIfAbsent(deviceId, id -> persistenceService
+                        .<FlowId, Set<StoredFlowEntry>>persistentMapBuilder()
+                        .withName("FlowTable:" + deviceId.toString())
+                        .withSerializer(new Serializer() {
+                            @Override
+                            public <T> byte[] encode(T object) {
+                                return SERIALIZER.encode(object);
+                            }
+
+                            @Override
+                            public <T> T decode(byte[] bytes) {
+                                return SERIALIZER.decode(bytes);
+                            }
+                        })
+                        .build());
+            } else {
+                return flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap());
+            }
         }
 
         private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {