[ONOS-6870] Refactor DistributedFlowRuleStore to use DocumentTree for persistence

Change-Id: I5f0eccfeb0050ccf1959f3ca95bbd0a90406e4ba
(cherry picked from commit f7554093ada8e5dea81e0bfd7092699cf6dadb44)
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchRequest.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchRequest.java
index 6dd962c..d1e74cd 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchRequest.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchRequest.java
@@ -15,13 +15,13 @@
  */
 package org.onosproject.net.flow;
 
-import com.google.common.collect.Lists;
-import org.onosproject.net.DeviceId;
-
-import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import org.onosproject.net.DeviceId;
+
 @Deprecated
 /**
  * @deprecated in Drake release - no longer a public API
@@ -37,10 +37,9 @@
 
     private final Set<FlowRuleBatchEntry> ops;
 
-
     public FlowRuleBatchRequest(long batchId, Set<FlowRuleBatchEntry> ops) {
         this.batchId = batchId;
-        this.ops = Collections.unmodifiableSet(ops);
+        this.ops = ImmutableSet.copyOf(ops);
     }
 
     public Set<FlowRuleBatchEntry> ops() {
diff --git a/core/api/src/test/java/org/onosproject/store/service/AsyncDocumentTreeAdapter.java b/core/api/src/test/java/org/onosproject/store/service/AsyncDocumentTreeAdapter.java
new file mode 100644
index 0000000..6ff99ee
--- /dev/null
+++ b/core/api/src/test/java/org/onosproject/store/service/AsyncDocumentTreeAdapter.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import org.onosproject.store.primitives.NodeUpdate;
+import org.onosproject.store.primitives.TransactionId;
+
+/**
+ * Async document tree adapter.
+ */
+public class AsyncDocumentTreeAdapter<V> implements AsyncDocumentTree<V> {
+    @Override
+    public String name() {
+        return null;
+    }
+
+    @Override
+    public DocumentPath root() {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Map<String, Versioned<V>>> getChildren(DocumentPath path) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> get(DocumentPath path) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> set(DocumentPath path, V value) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Boolean> create(DocumentPath path, V value) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Boolean> createRecursive(DocumentPath path, V value) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Boolean> replace(DocumentPath path, V newValue, long version) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Boolean> replace(DocumentPath path, V newValue, V currentValue) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> removeNode(DocumentPath path) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Void> addListener(DocumentPath path, DocumentTreeListener<V> listener) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Void> removeListener(DocumentTreeListener<V> listener) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Version> begin(TransactionId transactionId) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepare(TransactionLog<NodeUpdate<V>> transactionLog) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<NodeUpdate<V>> transactionLog) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Void> commit(TransactionId transactionId) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Void> rollback(TransactionId transactionId) {
+        return null;
+    }
+}
diff --git a/core/api/src/test/java/org/onosproject/store/service/TestDocumentTree.java b/core/api/src/test/java/org/onosproject/store/service/TestDocumentTree.java
new file mode 100644
index 0000000..c9100f5
--- /dev/null
+++ b/core/api/src/test/java/org/onosproject/store/service/TestDocumentTree.java
@@ -0,0 +1,191 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.service;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+/**
+ * Simple implementation of a {@link DocumentTree}.
+ *
+ * @param <V> tree node value type
+ */
+public class TestDocumentTree<V> implements DocumentTree<V> {
+
+    private static final DocumentPath ROOT_PATH = DocumentPath.from("root");
+    private final String name;
+    final TestDocumentTreeNode<V> root;
+    private final AtomicLong version = new AtomicLong();
+
+    public TestDocumentTree(String name) {
+        this(name, null);
+    }
+
+    public TestDocumentTree(String name, V value) {
+        this.name = name;
+        this.root = new TestDocumentTreeNode<>(ROOT_PATH, value, version.incrementAndGet(), null);
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public DocumentPath root() {
+        return ROOT_PATH;
+    }
+
+    @Override
+    public Map<String, Versioned<V>> getChildren(DocumentPath path) {
+        DocumentTreeNode<V> node = getNode(path);
+        if (node != null) {
+            Map<String, Versioned<V>> childrenValues = Maps.newHashMap();
+            node.children().forEachRemaining(n -> childrenValues.put(simpleName(n.path()), n.value()));
+            return childrenValues;
+        }
+        throw new NoSuchDocumentPathException();
+    }
+
+    @Override
+    public Versioned<V> get(DocumentPath path) {
+        DocumentTreeNode<V> currentNode = getNode(path);
+        return currentNode != null ? currentNode.value() : null;
+    }
+
+    @Override
+    public Versioned<V> set(DocumentPath path, V value) {
+        checkRootModification(path);
+        TestDocumentTreeNode<V> node = getNode(path);
+        if (node != null) {
+            return node.update(value, version.incrementAndGet());
+        } else {
+            create(path, value);
+            return null;
+        }
+    }
+
+    @Override
+    public boolean create(DocumentPath path, V value) {
+        checkRootModification(path);
+        DocumentTreeNode<V> node = getNode(path);
+        if (node != null) {
+            return false;
+        }
+        DocumentPath parentPath = path.parent();
+        TestDocumentTreeNode<V> parentNode =  getNode(parentPath);
+        if (parentNode == null) {
+            throw new IllegalDocumentModificationException();
+        }
+        parentNode.addChild(simpleName(path), value, version.incrementAndGet());
+        return true;
+    }
+
+    @Override
+    public boolean createRecursive(DocumentPath path, V value) {
+        checkRootModification(path);
+        DocumentTreeNode<V> node = getNode(path);
+        if (node != null) {
+            return false;
+        }
+        DocumentPath parentPath = path.parent();
+        if (getNode(parentPath) == null) {
+            createRecursive(parentPath, null);
+        }
+        TestDocumentTreeNode<V> parentNode =  getNode(parentPath);
+        if (parentNode == null) {
+            throw new IllegalDocumentModificationException();
+        }
+        parentNode.addChild(simpleName(path), value, version.incrementAndGet());
+        return true;
+    }
+
+    @Override
+    public boolean replace(DocumentPath path, V newValue, long version) {
+        checkRootModification(path);
+        DocumentTreeNode<V> node = getNode(path);
+        if (node != null && node.value() != null && node.value().version() == version) {
+            set(path, newValue);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public boolean replace(DocumentPath path, V newValue, V currentValue) {
+        checkRootModification(path);
+        if (Objects.equals(newValue, currentValue)) {
+            return false;
+        }
+        DocumentTreeNode<V> node = getNode(path);
+        if (node != null && Objects.equals(Versioned.valueOrNull(node.value()), currentValue)) {
+            set(path, newValue);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public Versioned<V> removeNode(DocumentPath path) {
+        checkRootModification(path);
+        TestDocumentTreeNode<V> nodeToRemove = getNode(path);
+        if (nodeToRemove == null) {
+            throw new NoSuchDocumentPathException();
+        }
+        if (nodeToRemove.hasChildren()) {
+            throw new IllegalDocumentModificationException();
+        }
+        TestDocumentTreeNode<V> parent = (TestDocumentTreeNode<V>) nodeToRemove.parent();
+        parent.removeChild(simpleName(path));
+        return nodeToRemove.value();
+    }
+
+    @Override
+    public void addListener(DocumentPath path, DocumentTreeListener<V> listener) {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    public void removeListener(DocumentTreeListener<V> listener) {
+        // TODO Auto-generated method stub
+    }
+
+    private TestDocumentTreeNode<V> getNode(DocumentPath path) {
+        Iterator<String> pathElements = path.pathElements().iterator();
+        TestDocumentTreeNode<V> currentNode = root;
+        Preconditions.checkState("root".equals(pathElements.next()), "Path should start with root");
+        while (pathElements.hasNext() &&  currentNode != null) {
+            currentNode = (TestDocumentTreeNode<V>) currentNode.child(pathElements.next());
+        }
+        return currentNode;
+    }
+
+    private String simpleName(DocumentPath path) {
+        return path.pathElements().get(path.pathElements().size() - 1);
+    }
+
+    private void checkRootModification(DocumentPath path) {
+        if (ROOT_PATH.equals(path)) {
+            throw new IllegalDocumentModificationException();
+        }
+    }
+}
\ No newline at end of file
diff --git a/core/api/src/test/java/org/onosproject/store/service/TestDocumentTreeNode.java b/core/api/src/test/java/org/onosproject/store/service/TestDocumentTreeNode.java
new file mode 100644
index 0000000..508fad2
--- /dev/null
+++ b/core/api/src/test/java/org/onosproject/store/service/TestDocumentTreeNode.java
@@ -0,0 +1,146 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.service;
+
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.TreeMap;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A {@code DocumentTree} node.
+ */
+public class TestDocumentTreeNode<V> implements DocumentTreeNode<V> {
+    private final DocumentPath key;
+    private Versioned<V> value;
+    private final TreeMap<String, DocumentTreeNode<V>> children = Maps.newTreeMap();
+    private final DocumentTreeNode<V> parent;
+
+    public TestDocumentTreeNode(DocumentPath key,
+            V value,
+            long version,
+            DocumentTreeNode<V> parent) {
+        this.key = checkNotNull(key);
+        this.value = new Versioned<>(value, version);
+        this.parent = parent;
+    }
+
+    @Override
+    public DocumentPath path() {
+        return key;
+    }
+
+    @Override
+    public Versioned<V> value() {
+        return value;
+    }
+
+    @Override
+    public Iterator<DocumentTreeNode<V>> children() {
+        return ImmutableList.copyOf(children.values()).iterator();
+    }
+
+    @Override
+    public DocumentTreeNode<V> child(String name) {
+        return children.get(name);
+    }
+
+
+    public DocumentTreeNode<V> parent() {
+        return parent;
+    }
+
+    /**
+     * Adds a new child only if one does not exist with the name.
+     * @param name relative path name of the child node
+     * @param newValue new value to set
+     * @param newVersion new version to set
+     * @return previous value; can be {@code null} if no child currently exists with that relative path name.
+     * a non null return value indicates child already exists and no modification occured.
+     */
+    public Versioned<V> addChild(String name, V newValue, long newVersion) {
+        TestDocumentTreeNode<V> child = (TestDocumentTreeNode<V>) children.get(name);
+        if (child != null) {
+            return child.value();
+        }
+        children.put(name, new TestDocumentTreeNode<>(new DocumentPath(name, path()), newValue, newVersion, this));
+        return null;
+    }
+
+    /**
+     * Updates the node value.
+     *
+     * @param newValue new value to set
+     * @param newVersion new version to set
+     * @return previous value
+     */
+    public Versioned<V> update(V newValue, long newVersion) {
+        Versioned<V> previousValue = value;
+        value = new Versioned<>(newValue, newVersion);
+        return previousValue;
+    }
+
+
+    /**
+     * Removes a child node.
+     *
+     * @param name the name of child node to be removed
+     * @return {@code true} if the child set was modified as a result of this call, {@code false} otherwise
+     */
+    public boolean removeChild(String name) {
+        return children.remove(name) != null;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(this.key);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof TestDocumentTreeNode) {
+            TestDocumentTreeNode<V> that = (TestDocumentTreeNode<V>) obj;
+            if (this.parent.equals(that.parent)) {
+                if (this.children.size() == that.children.size()) {
+                    return Sets.symmetricDifference(this.children.keySet(), that.children.keySet()).isEmpty();
+                }
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        MoreObjects.ToStringHelper helper =
+                MoreObjects.toStringHelper(getClass())
+                        .add("parent", this.parent)
+                        .add("key", this.key)
+                        .add("value", this.value);
+        for (DocumentTreeNode<V> child : children.values()) {
+            helper = helper.add("child", "\n" + child.path().pathElements()
+                    .get(child.path().pathElements().size() - 1) +
+                    " : " + child.value());
+        }
+        return helper.toString();
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index 0e3c8ea..7b6ae55 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -1,4 +1,4 @@
- /*
+/*
  * Copyright 2014-present Open Networking Foundation
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -16,32 +16,29 @@
 package org.onosproject.store.flow.impl;
 
 import java.util.Collections;
-import java.util.Dictionary;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import com.google.common.collect.Streams;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Modified;
-import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.util.KryoNamespace;
 import org.onlab.util.Tools;
-import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.core.CoreService;
@@ -67,42 +64,27 @@
 import org.onosproject.net.flow.FlowRuleStoreDelegate;
 import org.onosproject.net.flow.StoredFlowEntry;
 import org.onosproject.net.flow.TableStatisticsEntry;
-import org.onosproject.persistence.PersistenceService;
 import org.onosproject.store.AbstractStore;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
-import org.onosproject.store.flow.ReplicaInfoEvent;
-import org.onosproject.store.flow.ReplicaInfoEventListener;
-import org.onosproject.store.flow.ReplicaInfoService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.impl.MastershipBasedTimestamp;
 import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.DocumentPath;
+import org.onosproject.store.service.DocumentTree;
 import org.onosproject.store.service.EventuallyConsistentMap;
 import org.onosproject.store.service.EventuallyConsistentMapEvent;
 import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.NoSuchDocumentPathException;
 import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageException;
 import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
 import org.onosproject.store.service.WallClockTimestamp;
-import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Futures;
-
-import static com.google.common.base.Strings.isNullOrEmpty;
-import static org.onlab.util.Tools.get;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
-import static org.onosproject.store.flow.ReplicaInfoEvent.Type.MASTER_CHANGED;
-import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.APPLY_BATCH_FLOWS;
-import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.FLOW_TABLE_BACKUP;
-import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES;
-import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.GET_FLOW_ENTRY;
-import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.REMOTE_APPLY_COMPLETED;
-import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.REMOVE_FLOW_ENTRY;
+import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_UPDATED;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -116,39 +98,23 @@
 
     private final Logger log = getLogger(getClass());
 
+    // Constant exception used to indicate an atomic read-modify-write operation needs to be retried.
+    // We don't want to populate a stack trace every time an optimistic lock is retried.
+    private static final StorageException.ConcurrentModification RETRY;
+
+    // Initialize retry exception with an empty stack trace.
+    static {
+        RETRY = new StorageException.ConcurrentModification();
+        RETRY.setStackTrace(new StackTraceElement[0]);
+    }
+
     private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
-    private static final int DEFAULT_MAX_BACKUP_COUNT = 2;
-    private static final boolean DEFAULT_PERSISTENCE_ENABLED = false;
-    private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
-    private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
-    // number of devices whose flow entries will be backed up in one communication round
-    private static final int FLOW_TABLE_BACKUP_BATCH_SIZE = 1;
+    private static final int MAX_RETRY_DELAY_MILLIS = 50;
 
-    @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
-            label = "Number of threads in the message handler pool")
-    private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
+    private static final String FLOW_TABLE = "onos-flow-table";
 
-    @Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS,
-            label = "Delay in ms between successive backup runs")
-    private int backupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
-    @Property(name = "persistenceEnabled", boolValue = false,
-            label = "Indicates whether or not changes in the flow table should be persisted to disk.")
-    private boolean persistenceEnabled = DEFAULT_PERSISTENCE_ENABLED;
-
-    @Property(name = "backupCount", intValue = DEFAULT_MAX_BACKUP_COUNT,
-            label = "Max number of backup copies for each device")
-    private volatile int backupCount = DEFAULT_MAX_BACKUP_COUNT;
-
-    private InternalFlowTable flowTable = new InternalFlowTable();
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected ReplicaInfoService replicaInfoManager;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected ClusterCommunicationService clusterCommunicator;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected ClusterService clusterService;
+    private static final MessageSubject APPLY_FLOWS = new MessageSubject("onos-flow-apply");
+    private static final MessageSubject COMPLETE_BATCH = new MessageSubject("onos-flow-batch-complete");
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DeviceService deviceService;
@@ -157,25 +123,13 @@
     protected CoreService coreService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected ComponentConfigService configService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected MastershipService mastershipService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected PersistenceService persistenceService;
+    protected ClusterCommunicationService clusterCommunicator;
 
-    private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
-    private ExecutorService messageHandlingExecutor;
-    private ExecutorService eventHandler;
-
-    private ScheduledFuture<?> backupTask;
-    private final ScheduledExecutorService backupSenderExecutor =
-            Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender", log));
-
-    private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
-    private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
-            new InternalTableStatsListener();
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected StorageService storageService;
@@ -186,31 +140,26 @@
             .register(KryoNamespaces.API)
             .register(MastershipBasedTimestamp.class);
 
+    private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
+    private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
+            new InternalTableStatsListener();
 
+    private Set<Long> pendingBatches = Sets.newConcurrentHashSet();
+    private ExecutorService messageHandlingExecutor;
+
+    private DocumentTree<Map<StoredFlowEntry, StoredFlowEntry>> flows;
     private IdGenerator idGenerator;
     private NodeId local;
 
     @Activate
-    public void activate(ComponentContext context) {
-        configService.registerProperties(getClass());
-
+    public void activate() {
         idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
 
         local = clusterService.getLocalNode().id();
 
-        eventHandler = Executors.newSingleThreadExecutor(
-                groupedThreads("onos/flow", "event-handler", log));
         messageHandlingExecutor = Executors.newFixedThreadPool(
-                msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
-
-        registerMessageHandlers(messageHandlingExecutor);
-
-        replicaInfoManager.addListener(flowTable);
-        backupTask = backupSenderExecutor.scheduleWithFixedDelay(
-                flowTable::backup,
-                0,
-                backupPeriod,
-                TimeUnit.MILLISECONDS);
+                MESSAGE_HANDLER_THREAD_POOL_SIZE,
+                groupedThreads("onos/store/flow", "message-handlers", log));
 
         deviceTableStats = storageService.<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder()
                 .withName("onos-flow-table-stats")
@@ -221,110 +170,91 @@
                 .build();
         deviceTableStats.addListener(tableStatsListener);
 
-        logConfig("Started");
+        flows = storageService.<Map<StoredFlowEntry, StoredFlowEntry>>documentTreeBuilder()
+                .withName(FLOW_TABLE)
+                .withSerializer(serializer)
+                .buildDocumentTree()
+                .asDocumentTree();
+
+        clusterCommunicator.addSubscriber(
+                APPLY_FLOWS,
+                serializer::decode,
+                this::applyFlows,
+                messageHandlingExecutor);
+        clusterCommunicator.addSubscriber(
+                COMPLETE_BATCH,
+                serializer::decode,
+                this::completeBatch,
+                messageHandlingExecutor);
+
+        log.info("Started");
     }
 
     @Deactivate
-    public void deactivate(ComponentContext context) {
-        replicaInfoManager.removeListener(flowTable);
-        backupTask.cancel(true);
-        configService.unregisterProperties(getClass(), false);
-        unregisterMessageHandlers();
+    public void deactivate() {
         deviceTableStats.removeListener(tableStatsListener);
         deviceTableStats.destroy();
-        eventHandler.shutdownNow();
+        clusterCommunicator.removeSubscriber(APPLY_FLOWS);
+        clusterCommunicator.removeSubscriber(COMPLETE_BATCH);
         messageHandlingExecutor.shutdownNow();
-        backupSenderExecutor.shutdownNow();
         log.info("Stopped");
     }
 
-    @SuppressWarnings("rawtypes")
-    @Modified
-    public void modified(ComponentContext context) {
-        if (context == null) {
-            logConfig("Default config");
-            return;
-        }
-
-        Dictionary properties = context.getProperties();
-        int newPoolSize;
-        int newBackupPeriod;
-        int newBackupCount;
-        try {
-            String s = get(properties, "msgHandlerPoolSize");
-            newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
-
-            s = get(properties, "backupPeriod");
-            newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());
-
-            s = get(properties, "backupCount");
-            newBackupCount = isNullOrEmpty(s) ? backupCount : Integer.parseInt(s.trim());
-        } catch (NumberFormatException | ClassCastException e) {
-            newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
-            newBackupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
-            newBackupCount = DEFAULT_MAX_BACKUP_COUNT;
-        }
-
-        boolean restartBackupTask = false;
-
-        if (newBackupPeriod != backupPeriod) {
-            backupPeriod = newBackupPeriod;
-            restartBackupTask = true;
-        }
-        if (restartBackupTask) {
-            if (backupTask != null) {
-                // cancel previously running task
-                backupTask.cancel(false);
-            }
-            backupTask = backupSenderExecutor.scheduleWithFixedDelay(
-                    flowTable::backup,
-                    0,
-                    backupPeriod,
-                    TimeUnit.MILLISECONDS);
-        }
-        if (newPoolSize != msgHandlerPoolSize) {
-            msgHandlerPoolSize = newPoolSize;
-            ExecutorService oldMsgHandler = messageHandlingExecutor;
-            messageHandlingExecutor = Executors.newFixedThreadPool(
-                    msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
-
-            // replace previously registered handlers.
-            registerMessageHandlers(messageHandlingExecutor);
-            oldMsgHandler.shutdown();
-        }
-        if (backupCount != newBackupCount) {
-            backupCount = newBackupCount;
-        }
-        logConfig("Reconfigured");
+    /**
+     * Retries the given supplier until successful.
+     * <p>
+     * This method retries the given supplier until no {@code ConcurrentModification} exceptions are thrown. In
+     * between retries, it waits a semi-random interval to attempt to avoid transaction conflicts with other processes.
+     *
+     * @param supplier the supplier to retry
+     * @param <T> the return type
+     * @return the return value of the given supplier once it runs successfully
+     */
+    private <T> T retryUntilSuccess(Supplier<T> supplier) {
+        return Tools.retryable(
+                supplier,
+                StorageException.ConcurrentModification.class,
+                Integer.MAX_VALUE,
+                MAX_RETRY_DELAY_MILLIS)
+                .get();
     }
 
-    private void registerMessageHandlers(ExecutorService executor) {
-
-        clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
-        clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
-                REMOTE_APPLY_COMPLETED, serializer::decode, this::notifyDelegate, executor);
-        clusterCommunicator.addSubscriber(
-                GET_FLOW_ENTRY, serializer::decode, flowTable::getFlowEntry, serializer::encode, executor);
-        clusterCommunicator.addSubscriber(
-                GET_DEVICE_FLOW_ENTRIES, serializer::decode, flowTable::getFlowEntries, serializer::encode, executor);
-        clusterCommunicator.addSubscriber(
-                REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
-        clusterCommunicator.addSubscriber(
-                FLOW_TABLE_BACKUP, serializer::decode, flowTable::onBackupReceipt, serializer::encode, executor);
+    /**
+     * Return method for {@link #retryUntilSuccess(Supplier)} callbacks to indicate that the callback needs to be
+     * retried after a randomized delay.
+     *
+     * @param <T> the return type
+     * @return nothing
+     * @throws StorageException.ConcurrentModification to force a retry of the callback
+     */
+    private <T> T retry() {
+        throw RETRY;
     }
 
-    private void unregisterMessageHandlers() {
-        clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
-        clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
-        clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
-        clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
-        clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
-        clusterCommunicator.removeSubscriber(FLOW_TABLE_BACKUP);
+    /**
+     * Handles a flow rule batch event forwarded to the master node.
+     * <p>
+     * If this node is the master for the associated device, notifies event listeners to install flow rules.
+     *
+     * @param event the event to handle
+     */
+    private void applyFlows(FlowRuleBatchEvent event) {
+        if (mastershipService.isLocalMaster(event.deviceId())) {
+            notifyDelegate(event);
+        }
     }
 
-    private void logConfig(String prefix) {
-        log.info("{} with msgHandlerPoolSize = {}; backupPeriod = {}, backupCount = {}",
-                 prefix, msgHandlerPoolSize, backupPeriod, backupCount);
+    /**
+     * Handles a completed batch event received from the master node.
+     * <p>
+     * If this node is the source of the batch, notifies event listeners to complete the operations.
+     *
+     * @param event the event to handle
+     */
+    private void completeBatch(FlowRuleBatchEvent event) {
+        if (pendingBatches.remove(event.subject().batchId())) {
+            notifyDelegate(event);
+        }
     }
 
     // This is not a efficient operation on a distributed sharded
@@ -333,63 +263,60 @@
     @Override
     public int getFlowRuleCount() {
         return Streams.stream(deviceService.getDevices()).parallel()
-                        .mapToInt(device -> Iterables.size(getFlowEntries(device.id())))
-                        .sum();
+                .mapToInt(device -> Iterables.size(getFlowEntries(device.id())))
+                .sum();
+    }
+
+    /**
+     * Returns the {@link DocumentPath} for the given {@link DeviceId}.
+     *
+     * @param deviceId the device identifier for which to return a path
+     * @return the path for the given device
+     */
+    private DocumentPath getPathFor(DeviceId deviceId) {
+        return DocumentPath.from("root", deviceId.toString());
+    }
+
+    /**
+     * Returns the {@link DocumentPath} for the given {@link DeviceId} and {@link FlowId}.
+     *
+     * @param deviceId the device identifier for which to return the path
+     * @param flowId the flow identifier for which to return the path
+     * @return the path for the given device/flow
+     */
+    private DocumentPath getPathFor(DeviceId deviceId, FlowId flowId) {
+        return DocumentPath.from("root", deviceId.toString(), flowId.toString());
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public FlowEntry getFlowEntry(FlowRule rule) {
-        NodeId master = mastershipService.getMasterFor(rule.deviceId());
-
-        if (master == null) {
-            log.debug("Failed to getFlowEntry: No master for {}", rule.deviceId());
-            return null;
-        }
-
-        if (Objects.equals(local, master)) {
-            return flowTable.getFlowEntry(rule);
-        }
-
-        log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
-                  master, rule.deviceId());
-
-        return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
-                                    FlowStoreMessageSubjects.GET_FLOW_ENTRY,
-                                    serializer::encode,
-                                    serializer::decode,
-                                    master),
-                               FLOW_RULE_STORE_TIMEOUT_MILLIS,
-                               TimeUnit.MILLISECONDS,
-                               null);
+        DocumentPath path = getPathFor(rule.deviceId(), rule.id());
+        Versioned<Map<StoredFlowEntry, StoredFlowEntry>> flowEntries = flows.get(path);
+        return flowEntries != null ? flowEntries.value().get(rule) : null;
     }
 
     @Override
     public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
-        NodeId master = mastershipService.getMasterFor(deviceId);
-
-        if (master == null) {
-            log.debug("Failed to getFlowEntries: No master for {}", deviceId);
+        DocumentPath path = getPathFor(deviceId);
+        try {
+            return getFlowEntries(path);
+        } catch (NoSuchDocumentPathException e) {
             return Collections.emptyList();
         }
+    }
 
-        if (Objects.equals(local, master)) {
-            return flowTable.getFlowEntries(deviceId);
-        }
-
-        log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
-                  master, deviceId);
-
-        return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
-                                    FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
-                                    serializer::encode,
-                                    serializer::decode,
-                                    master),
-                               FLOW_RULE_STORE_TIMEOUT_MILLIS,
-                               TimeUnit.MILLISECONDS,
-                               Collections.emptyList());
+    @SuppressWarnings("unchecked")
+    private Iterable<FlowEntry> getFlowEntries(DocumentPath path) {
+        return flows.getChildren(path)
+                .values()
+                .stream()
+                .flatMap(v -> v.value().values().stream())
+                .collect(Collectors.toList());
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public void storeFlowRule(FlowRule rule) {
         storeBatch(new FlowRuleBatchOperation(
                 Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
@@ -419,73 +346,39 @@
             return;
         }
 
-        if (Objects.equals(local, master)) {
-            storeBatchInternal(operation);
-            return;
-        }
+        pendingBatches.add(operation.id());
 
-        log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
-                  master, deviceId);
-
-        clusterCommunicator.unicast(operation,
-                                    APPLY_BATCH_FLOWS,
-                                    serializer::encode,
-                                    master)
-                           .whenComplete((result, error) -> {
-                               if (error != null) {
-                                   log.warn("Failed to storeBatch: {} to {}", operation, master, error);
-
-                                   Set<FlowRule> allFailures = operation.getOperations()
-                                           .stream()
-                                           .map(op -> op.target())
-                                           .collect(Collectors.toSet());
-
-                                   notifyDelegate(FlowRuleBatchEvent.completed(
-                                           new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
-                                           new CompletedBatchOperation(false, allFailures, deviceId)));
-                               }
-                           });
-    }
-
-    private void storeBatchInternal(FlowRuleBatchOperation operation) {
-
-        final DeviceId did = operation.deviceId();
-        //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
         Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
         if (currentOps.isEmpty()) {
             batchOperationComplete(FlowRuleBatchEvent.completed(
                     new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
-                    new CompletedBatchOperation(true, Collections.emptySet(), did)));
-            return;
+                    new CompletedBatchOperation(true, Collections.emptySet(), deviceId)));
+        } else if (Objects.equals(local, master)) {
+            notifyDelegate(FlowRuleBatchEvent.requested(
+                    new FlowRuleBatchRequest(operation.id(), currentOps),
+                    operation.deviceId()));
+        } else {
+            clusterCommunicator.unicast(FlowRuleBatchEvent.requested(
+                    new FlowRuleBatchRequest(operation.id(), currentOps),
+                    operation.deviceId()),
+                    APPLY_FLOWS,
+                    serializer::encode,
+                    master);
         }
-
-        notifyDelegate(FlowRuleBatchEvent.requested(new
-                           FlowRuleBatchRequest(operation.id(),
-                                                currentOps), operation.deviceId()));
     }
 
     private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
         return operation.getOperations().stream().map(
                 op -> {
-                    StoredFlowEntry entry;
                     switch (op.operator()) {
                         case ADD:
-                            entry = new DefaultFlowEntry(op.target());
-                            // always add requested FlowRule
-                            // Note: 2 equal FlowEntry may have different treatment
-                            flowTable.remove(entry.deviceId(), entry);
-                            flowTable.add(entry);
-
+                            addBatchEntry(op);
                             return op;
                         case REMOVE:
-                            entry = flowTable.getFlowEntry(op.target());
-                            if (entry != null) {
-                                //FIXME modification of "stored" flow entry outside of flow table
-                                entry.setState(FlowEntryState.PENDING_REMOVE);
-                                log.debug("Setting state of rule to pending remove: {}", entry);
+                            if (removeBatchEntry(op)) {
                                 return op;
                             }
-                            break;
+                            return null;
                         case MODIFY:
                             //TODO: figure this out at some point
                             break;
@@ -497,6 +390,66 @@
         ).filter(Objects::nonNull).collect(Collectors.toSet());
     }
 
+    @SuppressWarnings("unchecked")
+    private void addBatchEntry(FlowRuleBatchEntry batchEntry) {
+        StoredFlowEntry entry = new DefaultFlowEntry(batchEntry.target());
+        DocumentPath path = getPathFor(entry.deviceId(), entry.id());
+        retryUntilSuccess(() -> {
+            Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
+            if (value != null) {
+                Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
+                entries.put(entry, entry);
+                if (flows.replace(path, entries, value.version())) {
+                    log.trace("Stored new flow rule: {}", entry);
+                    return null;
+                } else {
+                    log.trace("Failed to store new flow rule: {}", entry);
+                    return retry();
+                }
+            } else {
+                // If there are no entries stored for the device, initialize the device's flows.
+                flows.createRecursive(path, Maps.newHashMap());
+                return retry();
+            }
+        });
+    }
+
+    @SuppressWarnings("unchecked")
+    private boolean removeBatchEntry(FlowRuleBatchEntry batchEntry) {
+        FlowRule rule = batchEntry.target();
+        DocumentPath path = getPathFor(rule.deviceId(), rule.id());
+        return retryUntilSuccess(() -> {
+            Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
+            if (value != null) {
+                Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
+                StoredFlowEntry entry = entries.get(rule);
+                if (entry != null) {
+                    entry.setState(FlowEntryState.PENDING_REMOVE);
+                    if (flows.replace(path, entries, value.version())) {
+                        log.trace("Updated flow rule state to PENDING_REMOVE: {}", entry);
+                        return true;
+                    } else {
+                        log.trace("Failed to update flow rule state to PENDING_REMOVE: {}", entry);
+                        return retry();
+                    }
+                } else {
+                    return false;
+                }
+            } else {
+                return false;
+            }
+        });
+    }
+
+    @Override
+    public void batchOperationComplete(FlowRuleBatchEvent event) {
+        if (pendingBatches.remove(event.subject().batchId())) {
+            notifyDelegate(event);
+        } else {
+            clusterCommunicator.broadcast(event, COMPLETE_BATCH, serializer::encode);
+        }
+    }
+
     @Override
     public void deleteFlowRule(FlowRule rule) {
         storeBatch(
@@ -509,426 +462,142 @@
 
     @Override
     public FlowRuleEvent pendingFlowRule(FlowEntry rule) {
-        if (mastershipService.isLocalMaster(rule.deviceId())) {
-            StoredFlowEntry stored = flowTable.getFlowEntry(rule);
-            if (stored != null &&
-                    stored.state() != FlowEntryState.PENDING_ADD) {
-                stored.setState(FlowEntryState.PENDING_ADD);
-                return new FlowRuleEvent(Type.RULE_UPDATED, rule);
+        DocumentPath path = getPathFor(rule.deviceId(), rule.id());
+        return retryUntilSuccess(() -> {
+            Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
+            if (value != null) {
+                Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
+                StoredFlowEntry entry = entries.get(rule);
+                if (entry != null && entry.state() != FlowEntryState.PENDING_ADD) {
+                    entry.setState(FlowEntryState.PENDING_ADD);
+                    if (flows.replace(path, entries, value.version())) {
+                        log.trace("Updated flow rule state to PENDING_ADD: {}", entry);
+                        return new FlowRuleEvent(RULE_UPDATED, rule);
+                    } else {
+                        log.trace("Failed to update flow rule state to PENDING_ADD: {}", entry);
+                        return retry();
+                    }
+                } else {
+                    return null;
+                }
+            } else {
+                return null;
             }
-        }
-        return null;
+        });
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
-        NodeId master = mastershipService.getMasterFor(rule.deviceId());
-        if (Objects.equals(local, master)) {
-            return addOrUpdateFlowRuleInternal(rule);
-        }
+        DocumentPath path = getPathFor(rule.deviceId(), rule.id());
+        return retryUntilSuccess(() -> {
+            Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
+            if (value != null) {
+                Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
+                StoredFlowEntry entry = entries.get(rule);
+                if (entry != null) {
+                    FlowRuleEvent event;
+                    String message;
 
-        log.warn("Tried to update FlowRule {} state,"
-                         + " while the Node was not the master.", rule);
-        return null;
-    }
+                    entry.setBytes(rule.bytes());
+                    entry.setLife(rule.life(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
+                    entry.setLiveType(rule.liveType());
+                    entry.setPackets(rule.packets());
+                    entry.setLastSeen();
 
-    private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
-        // check if this new rule is an update to an existing entry
-        StoredFlowEntry stored = flowTable.getFlowEntry(rule);
-        if (stored != null) {
-            //FIXME modification of "stored" flow entry outside of flow table
-            stored.setBytes(rule.bytes());
-            stored.setLife(rule.life(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
-            stored.setLiveType(rule.liveType());
-            stored.setPackets(rule.packets());
-            stored.setLastSeen();
-            if (stored.state() == FlowEntryState.PENDING_ADD) {
-                stored.setState(FlowEntryState.ADDED);
-                return new FlowRuleEvent(Type.RULE_ADDED, rule);
+                    // If the entry state is PENDING_ADD, set it to ADDED. Otherwise, just update the rule.
+                    if (entry.state() == FlowEntryState.PENDING_ADD) {
+                        entry.setState(FlowEntryState.ADDED);
+                        event = new FlowRuleEvent(Type.RULE_ADDED, rule);
+                        message = "Updated flow rule state to ADDED: {}";
+                    } else {
+                        event = new FlowRuleEvent(Type.RULE_UPDATED, rule);
+                        message = "Updated flow rule: {}";
+                    }
+
+                    if (flows.replace(path, entries, value.version())) {
+                        log.trace(message, entry);
+                        return event;
+                    } else {
+                        log.trace("Failed to update flow rule: {}", entry);
+                        return retry();
+                    }
+                } else {
+                    // If the rule does not exist, return null. Inserting the rule risks race conditions
+                    // that can result in removed rules being retained.
+                    return null;
+                }
+            } else {
+                // If there are no entries stored for the device, initialize the device's flows.
+                flows.createRecursive(path, Maps.newHashMap());
+                return retry();
             }
-            return new FlowRuleEvent(Type.RULE_UPDATED, rule);
-        }
-
-        // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
-        // TODO: also update backup if the behavior is correct.
-        flowTable.add(rule);
-        return null;
+        });
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public FlowRuleEvent removeFlowRule(FlowEntry rule) {
-        final DeviceId deviceId = rule.deviceId();
-        NodeId master = mastershipService.getMasterFor(deviceId);
-
-        if (Objects.equals(local, master)) {
-            // bypass and handle it locally
-            return removeFlowRuleInternal(rule);
-        }
-
-        if (master == null) {
-            log.warn("Failed to removeFlowRule: No master for {}", deviceId);
-            // TODO: revisit if this should be null (="no-op") or Exception
-            return null;
-        }
-
-        log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
-                  master, deviceId);
-
-        return Futures.getUnchecked(clusterCommunicator.sendAndReceive(
-                               rule,
-                               REMOVE_FLOW_ENTRY,
-                               serializer::encode,
-                               serializer::decode,
-                               master));
-    }
-
-    private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
-        final DeviceId deviceId = rule.deviceId();
-        // This is where one could mark a rule as removed and still keep it in the store.
-        final FlowEntry removed = flowTable.remove(deviceId, rule);
-        // rule may be partial rule that is missing treatment, we should use rule from store instead
-        return removed != null ? new FlowRuleEvent(RULE_REMOVED, removed) : null;
+        DocumentPath path = getPathFor(rule.deviceId(), rule.id());
+        return retryUntilSuccess(() -> {
+            Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
+            if (value != null) {
+                Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
+                StoredFlowEntry entry = entries.remove(rule);
+                if (entry != null) {
+                    if (flows.replace(path, entries, value.version())) {
+                        log.trace("Removed flow rule: {}", entry);
+                        return new FlowRuleEvent(RULE_REMOVED, entry);
+                    } else {
+                        log.trace("Failed to remove flow rule: {}", entry);
+                        return retry();
+                    }
+                } else {
+                    return null;
+                }
+            } else {
+                return null;
+            }
+        });
     }
 
     @Override
     public void purgeFlowRule(DeviceId deviceId) {
-        flowTable.purgeFlowRule(deviceId);
+        DocumentPath path = getPathFor(deviceId);
+        try {
+            for (String flowId : flows.getChildren(path).keySet()) {
+                flows.removeNode(DocumentPath.from("root", deviceId.toString(), flowId));
+            }
+        } catch (NoSuchDocumentPathException e) {
+            // Do nothing
+        }
+        try {
+            flows.removeNode(path);
+        } catch (NoSuchDocumentPathException e) {
+            // Do nothing
+        }
     }
 
     @Override
     public void purgeFlowRules() {
-        flowTable.purgeFlowRules();
-    }
-
-    @Override
-    public void batchOperationComplete(FlowRuleBatchEvent event) {
-        //FIXME: need a per device pending response
-        NodeId nodeId = pendingResponses.remove(event.subject().batchId());
-        if (nodeId == null) {
-            notifyDelegate(event);
-        } else {
-            // TODO check unicast return value
-            clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, serializer::encode, nodeId);
-            //error log: log.warn("Failed to respond to peer for batch operation result");
-        }
-    }
-
-    private final class OnStoreBatch implements ClusterMessageHandler {
-
-        @Override
-        public void handle(final ClusterMessage message) {
-            FlowRuleBatchOperation operation = serializer.decode(message.payload());
-            log.debug("received batch request {}", operation);
-
-            final DeviceId deviceId = operation.deviceId();
-            NodeId master = mastershipService.getMasterFor(deviceId);
-            if (!Objects.equals(local, master)) {
-                Set<FlowRule> failures = new HashSet<>(operation.size());
-                for (FlowRuleBatchEntry op : operation.getOperations()) {
-                    failures.add(op.target());
-                }
-                CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
-                // This node is no longer the master, respond as all failed.
-                // TODO: we might want to wrap response in envelope
-                // to distinguish sw programming failure and hand over
-                // it make sense in the latter case to retry immediately.
-                message.respond(serializer.encode(allFailed));
-                return;
+        try {
+            for (String deviceId : flows.getChildren(flows.root()).keySet()) {
+                purgeFlowRule(DeviceId.deviceId(deviceId));
             }
-
-            pendingResponses.put(operation.id(), message.sender());
-            storeBatchInternal(operation);
-        }
-    }
-
-    private class BackupOperation {
-        private final NodeId nodeId;
-        private final DeviceId deviceId;
-
-        public BackupOperation(NodeId nodeId, DeviceId deviceId) {
-            this.nodeId = nodeId;
-            this.deviceId = deviceId;
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(nodeId, deviceId);
-        }
-
-        @Override
-        public boolean equals(Object other) {
-            if (other != null && other instanceof BackupOperation) {
-                BackupOperation that = (BackupOperation) other;
-                return this.nodeId.equals(that.nodeId) &&
-                        this.deviceId.equals(that.deviceId);
-            } else {
-                return false;
-            }
-        }
-    }
-
-    private class InternalFlowTable implements ReplicaInfoEventListener {
-
-        //TODO replace the Map<V,V> with ExtendedSet
-        private final Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
-                flowEntries = Maps.newConcurrentMap();
-
-        private final Map<BackupOperation, Long> lastBackupTimes = Maps.newConcurrentMap();
-        private final Map<DeviceId, Long> lastUpdateTimes = Maps.newConcurrentMap();
-
-        @Override
-        public void event(ReplicaInfoEvent event) {
-            eventHandler.execute(() -> handleEvent(event));
-        }
-
-        private void handleEvent(ReplicaInfoEvent event) {
-            DeviceId deviceId = event.subject();
-            if (!mastershipService.isLocalMaster(deviceId)) {
-                return;
-            }
-            if (event.type() == MASTER_CHANGED) {
-                lastUpdateTimes.put(deviceId, System.currentTimeMillis());
-            }
-            backupSenderExecutor.schedule(this::backup, 0, TimeUnit.SECONDS);
-        }
-
-        private void sendBackups(NodeId nodeId, Set<DeviceId> deviceIds) {
-            // split up the devices into smaller batches and send them separately.
-            Iterables.partition(deviceIds, FLOW_TABLE_BACKUP_BATCH_SIZE)
-                     .forEach(ids -> backupFlowEntries(nodeId, Sets.newHashSet(ids)));
-        }
-
-        private void backupFlowEntries(NodeId nodeId, Set<DeviceId> deviceIds) {
-            if (deviceIds.isEmpty()) {
-                return;
-            }
-            log.debug("Sending flowEntries for devices {} to {} for backup.", deviceIds, nodeId);
-            Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
-                    deviceFlowEntries = Maps.newConcurrentMap();
-            deviceIds.forEach(id -> deviceFlowEntries.put(id, getFlowTableCopy(id)));
-            clusterCommunicator.<Map<DeviceId,
-                                 Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>,
-                                 Set<DeviceId>>
-                    sendAndReceive(deviceFlowEntries,
-                                   FLOW_TABLE_BACKUP,
-                                   serializer::encode,
-                                   serializer::decode,
-                                   nodeId)
-                    .whenComplete((backedupDevices, error) -> {
-                        Set<DeviceId> devicesNotBackedup = error != null ?
-                            deviceFlowEntries.keySet() :
-                            Sets.difference(deviceFlowEntries.keySet(), backedupDevices);
-                        if (devicesNotBackedup.size() > 0) {
-                            log.warn("Failed to backup devices: {}. Reason: {}, Node: {}",
-                                     devicesNotBackedup, error != null ? error.getMessage() : "none",
-                                     nodeId);
-                        }
-                        if (backedupDevices != null) {
-                            backedupDevices.forEach(id -> {
-                                lastBackupTimes.put(new BackupOperation(nodeId, id), System.currentTimeMillis());
-                            });
-                        }
-                    });
-        }
-
-        /**
-         * Returns the flow table for specified device.
-         *
-         * @param deviceId identifier of the device
-         * @return Map representing Flow Table of given device.
-         */
-        private Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
-            if (persistenceEnabled) {
-                return flowEntries.computeIfAbsent(deviceId, id -> persistenceService
-                        .<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>persistentMapBuilder()
-                        .withName("FlowTable:" + deviceId.toString())
-                        .withSerializer(new Serializer() {
-                            @Override
-                            public <T> byte[] encode(T object) {
-                                return serializer.encode(object);
-                            }
-
-                            @Override
-                            public <T> T decode(byte[] bytes) {
-                                return serializer.decode(bytes);
-                            }
-
-                            @Override
-                            public <T> T copy(T object) {
-                                return serializer.copy(object);
-                            }
-                        })
-                        .build());
-            } else {
-                return flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap());
-            }
-        }
-
-        private Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> getFlowTableCopy(DeviceId deviceId) {
-            Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> copy = Maps.newHashMap();
-            if (persistenceEnabled) {
-                return flowEntries.computeIfAbsent(deviceId, id -> persistenceService
-                        .<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>persistentMapBuilder()
-                        .withName("FlowTable:" + deviceId.toString())
-                        .withSerializer(new Serializer() {
-                            @Override
-                            public <T> byte[] encode(T object) {
-                                return serializer.encode(object);
-                            }
-
-                            @Override
-                            public <T> T decode(byte[] bytes) {
-                                return serializer.decode(bytes);
-                            }
-
-                            @Override
-                            public <T> T copy(T object) {
-                                return serializer.copy(object);
-                            }
-                        })
-                        .build());
-            } else {
-                flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap()).forEach((k, v) -> {
-                    copy.put(k, Maps.newHashMap(v));
-                });
-                return copy;
-            }
-        }
-
-        private Map<StoredFlowEntry, StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
-            return getFlowTable(deviceId).computeIfAbsent(flowId, id -> Maps.newConcurrentMap());
-        }
-
-        private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
-            return getFlowEntriesInternal(rule.deviceId(), rule.id()).get(rule);
-        }
-
-        private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
-            return getFlowTable(deviceId).values().stream()
-                        .flatMap(m -> m.values().stream())
-                        .collect(Collectors.toSet());
-        }
-
-        public StoredFlowEntry getFlowEntry(FlowRule rule) {
-            return getFlowEntryInternal(rule);
-        }
-
-        public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
-            return getFlowEntriesInternal(deviceId);
-        }
-
-        public void add(FlowEntry rule) {
-            getFlowEntriesInternal(rule.deviceId(), rule.id())
-                    .compute((StoredFlowEntry) rule, (k, stored) -> {
-                        //TODO compare stored and rule timestamps
-                        //TODO the key is not updated
-                        return (StoredFlowEntry) rule;
-                    });
-            lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
-        }
-
-        public FlowEntry remove(DeviceId deviceId, FlowEntry rule) {
-            final AtomicReference<FlowEntry> removedRule = new AtomicReference<>();
-            getFlowEntriesInternal(rule.deviceId(), rule.id())
-                .computeIfPresent((StoredFlowEntry) rule, (k, stored) -> {
-                    if (rule instanceof DefaultFlowEntry) {
-                        DefaultFlowEntry toRemove = (DefaultFlowEntry) rule;
-                        if (stored instanceof DefaultFlowEntry) {
-                            DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
-                            if (toRemove.created() < storedEntry.created()) {
-                                log.debug("Trying to remove more recent flow entry {} (stored: {})",
-                                          toRemove, stored);
-                                // the key is not updated, removedRule remains null
-                                return stored;
-                            }
-                        }
-                    }
-                    removedRule.set(stored);
-                    return null;
-                });
-
-            if (removedRule.get() != null) {
-                lastUpdateTimes.put(deviceId, System.currentTimeMillis());
-                return removedRule.get();
-            } else {
-                return null;
-            }
-        }
-
-        public void purgeFlowRule(DeviceId deviceId) {
-            flowEntries.remove(deviceId);
-        }
-
-        public void purgeFlowRules() {
-            flowEntries.clear();
-        }
-
-        private List<NodeId> getBackupNodes(DeviceId deviceId) {
-            // The returned backup node list is in the order of preference i.e. next likely master first.
-            List<NodeId> allPossibleBackupNodes = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
-            return ImmutableList.copyOf(allPossibleBackupNodes)
-                                .subList(0, Math.min(allPossibleBackupNodes.size(), backupCount));
-        }
-
-        private void backup() {
-            try {
-                // compute a mapping from node to the set of devices whose flow entries it should backup
-                Map<NodeId, Set<DeviceId>> devicesToBackupByNode = Maps.newHashMap();
-                flowEntries.keySet().forEach(deviceId -> {
-                    List<NodeId> backupNodes = getBackupNodes(deviceId);
-                    backupNodes.forEach(backupNode -> {
-                            if (lastBackupTimes.getOrDefault(new BackupOperation(backupNode, deviceId), 0L)
-                                    < lastUpdateTimes.getOrDefault(deviceId, 0L)) {
-                                devicesToBackupByNode.computeIfAbsent(backupNode,
-                                                                      nodeId -> Sets.newHashSet()).add(deviceId);
-                            }
-                    });
-                });
-                // send the device flow entries to their respective backup nodes
-                devicesToBackupByNode.forEach(this::sendBackups);
-            } catch (Exception e) {
-                log.error("Backup failed.", e);
-            }
-        }
-
-        private Set<DeviceId> onBackupReceipt(Map<DeviceId,
-                Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>> flowTables) {
-            log.debug("Received flowEntries for {} to backup", flowTables.keySet());
-            Set<DeviceId> backedupDevices = Sets.newHashSet();
-            try {
-                flowTables.forEach((deviceId, deviceFlowTable) -> {
-                    // Only process those devices are that not managed by the local node.
-                    if (!Objects.equals(local, mastershipService.getMasterFor(deviceId))) {
-                        Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> backupFlowTable =
-                                getFlowTable(deviceId);
-                        backupFlowTable.clear();
-                        backupFlowTable.putAll(deviceFlowTable);
-                        backedupDevices.add(deviceId);
-                    }
-                });
-            } catch (Exception e) {
-                log.warn("Failure processing backup request", e);
-            }
-            return backedupDevices;
+        } catch (NoSuchDocumentPathException e) {
+            // Do nothing
         }
     }
 
     @Override
     public FlowRuleEvent updateTableStatistics(DeviceId deviceId,
-                                               List<TableStatisticsEntry> tableStats) {
+            List<TableStatisticsEntry> tableStats) {
         deviceTableStats.put(deviceId, tableStats);
         return null;
     }
 
     @Override
     public Iterable<TableStatisticsEntry> getTableStatistics(DeviceId deviceId) {
-        NodeId master = mastershipService.getMasterFor(deviceId);
-
-        if (master == null) {
-            log.debug("Failed to getTableStats: No master for {}", deviceId);
-            return Collections.emptyList();
-        }
-
         List<TableStatisticsEntry> tableStats = deviceTableStats.get(deviceId);
         if (tableStats == null) {
             return Collections.emptyList();
@@ -944,11 +613,11 @@
     }
 
     private class InternalTableStatsListener
-        implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> {
+            implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> {
         @Override
         public void event(EventuallyConsistentMapEvent<DeviceId,
-                          List<TableStatisticsEntry>> event) {
+                List<TableStatisticsEntry>> event) {
             //TODO: Generate an event to listeners (do we need?)
         }
     }
-}
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowStoreMessageSubjects.java
deleted file mode 100644
index 91ab0aa..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowStoreMessageSubjects.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright 2014-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.flow.impl;
-
-import org.onosproject.store.cluster.messaging.MessageSubject;
-
-/**
- * MessageSubjects used by DistributedFlowRuleStore peer-peer communication.
- */
-public final class FlowStoreMessageSubjects {
-    private FlowStoreMessageSubjects() {}
-
-    public static final  MessageSubject APPLY_BATCH_FLOWS
-        = new MessageSubject("peer-forward-apply-batch");
-
-    public static final MessageSubject GET_FLOW_ENTRY
-        = new MessageSubject("peer-forward-get-flow-entry");
-
-    public static final MessageSubject GET_DEVICE_FLOW_ENTRIES
-        = new MessageSubject("peer-forward-get-device-flow-entries");
-
-    public static final MessageSubject REMOVE_FLOW_ENTRY
-        = new MessageSubject("peer-forward-remove-flow-entry");
-
-    public static final MessageSubject REMOTE_APPLY_COMPLETED
-        = new MessageSubject("peer-apply-completed");
-
-    public static final MessageSubject FLOW_TABLE_BACKUP
-        = new MessageSubject("peer-flow-table-backup");
-}
diff --git a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/DistributedFlowRuleStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/DistributedFlowRuleStoreTest.java
index af4b2f0..d49f1cc 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/DistributedFlowRuleStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/DistributedFlowRuleStoreTest.java
@@ -15,36 +15,41 @@
  */
 package org.onosproject.store.flow.impl;
 
+import java.util.Collections;
+import java.util.Iterator;
+
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-
-import org.onosproject.cfg.ComponentConfigAdapter;
-import org.onosproject.cluster.NodeId;
+import org.onlab.packet.Ip4Address;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.core.CoreServiceAdapter;
 import org.onosproject.mastership.MastershipServiceAdapter;
-import org.onosproject.net.device.DeviceServiceAdapter;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.MastershipRole;
-import org.onosproject.net.flow.FlowRuleOperation;
+import org.onosproject.net.device.DeviceServiceAdapter;
+import org.onosproject.net.flow.DefaultFlowEntry;
+import org.onosproject.net.flow.DefaultFlowRule;
+import org.onosproject.net.flow.FlowEntry;
 import org.onosproject.net.flow.FlowRule;
 import org.onosproject.net.flow.FlowRuleBatchEntry;
-import org.onosproject.net.flow.DefaultFlowEntry;
-import org.onosproject.net.flow.FlowEntry;
-import org.onosproject.net.flow.DefaultFlowRule;
 import org.onosproject.net.flow.FlowRuleBatchOperation;
+import org.onosproject.net.flow.FlowRuleOperation;
 import org.onosproject.net.intent.IntentTestsMocks;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
-import org.onosproject.store.persistence.PersistenceServiceAdapter;
+import org.onosproject.store.service.AsyncDocumentTree;
+import org.onosproject.store.service.AsyncDocumentTreeAdapter;
+import org.onosproject.store.service.DocumentTree;
+import org.onosproject.store.service.DocumentTreeBuilder;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.TestDocumentTree;
 import org.onosproject.store.service.TestStorageService;
-
-import org.onlab.packet.Ip4Address;
-import java.util.Iterator;
-import org.osgi.service.component.ComponentContext;
+import org.onosproject.store.service.TestTopic;
+import org.onosproject.store.service.Topic;
 
 import static org.easymock.EasyMock.createMock;
 import static org.easymock.EasyMock.expect;
@@ -63,7 +68,6 @@
 public class DistributedFlowRuleStoreTest {
 
     DistributedFlowRuleStore flowStoreImpl;
-    ComponentContext context = null;
     private ClusterService mockClusterService;
     private ControllerNode mockControllerNode;
 
@@ -129,11 +133,39 @@
         }
     }
 
+    private static class MockStorageService extends TestStorageService {
+        @Override
+        public <V> DocumentTreeBuilder<V> documentTreeBuilder() {
+            return new DocumentTreeBuilder<V>() {
+                @Override
+                public AsyncDocumentTree<V> buildDocumentTree() {
+                    return build();
+                }
+
+                @Override
+                @SuppressWarnings("unchecked")
+                public AsyncDocumentTree<V> build() {
+                    String name = name();
+                    return new AsyncDocumentTreeAdapter() {
+                        @Override
+                        public DocumentTree asDocumentTree() {
+                            return new TestDocumentTree(name);
+                        }
+                    };
+                }
+            };
+        }
+
+        @Override
+        public <T> Topic<T> getTopic(String name, Serializer serializer) {
+            return new TestTopic<>(name);
+        }
+    }
+
     @Before
     public void setUp() throws Exception {
         flowStoreImpl = new DistributedFlowRuleStore();
-        flowStoreImpl.storageService = new TestStorageService();
-        flowStoreImpl.replicaInfoManager = new ReplicaInfoManager();
+        flowStoreImpl.storageService = new MockStorageService();
         mockClusterService = createMock(ClusterService.class);
         flowStoreImpl.clusterService = mockClusterService;
         nodeId = new NodeId("1");
@@ -147,14 +179,12 @@
         flowStoreImpl.mastershipService = new MasterOfAll();
         flowStoreImpl.deviceService = new DeviceServiceAdapter();
         flowStoreImpl.coreService = new CoreServiceAdapter();
-        flowStoreImpl.configService = new ComponentConfigAdapter();
-        flowStoreImpl.persistenceService = new PersistenceServiceAdapter();
-        flowStoreImpl.activate(context);
+        flowStoreImpl.activate();
     }
 
     @After
     public void tearDown() throws Exception {
-        flowStoreImpl.deactivate(context);
+        flowStoreImpl.deactivate();
     }
 
     /**
@@ -239,10 +269,15 @@
     @Test
     public void testPurgeFlow() {
         FlowEntry flowEntry = new DefaultFlowEntry(flowRule);
-        flowStoreImpl.addOrUpdateFlowRule(flowEntry);
+        flowStoreImpl.storeBatch(new FlowRuleBatchOperation(
+                Collections.singletonList(new FlowRuleBatchEntry(FlowRuleBatchEntry.FlowRuleOperation.ADD, flowEntry)),
+                flowEntry.deviceId(), 1));
 
         FlowEntry flowEntry1 = new DefaultFlowEntry(flowRule1);
-        flowStoreImpl.addOrUpdateFlowRule(flowEntry1);
+        flowStoreImpl.storeBatch(new FlowRuleBatchOperation(
+                Collections.singletonList(new FlowRuleBatchEntry(FlowRuleBatchEntry.FlowRuleOperation.ADD, flowEntry1)),
+                flowEntry1.deviceId(), 2));
+
         Iterable<FlowEntry> flows1 = flowStoreImpl.getFlowEntries(deviceId);
         int sum2 = 0;
         Iterator it1 = flows1.iterator();
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index 8f1ffa3..ed46a96 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -247,8 +247,8 @@
         AtomixDocumentTree atomixDocumentTree = new AtomixDocumentTree(client.newProxyBuilder()
                 .withName(name)
                 .withServiceType(String.format("%s-%s", DistributedPrimitive.Type.DOCUMENT_TREE.name(), ordering))
-                .withReadConsistency(ReadConsistency.SEQUENTIAL)
-                .withCommunicationStrategy(CommunicationStrategy.ANY)
+                .withReadConsistency(ReadConsistency.LINEARIZABLE)
+                .withCommunicationStrategy(CommunicationStrategy.LEADER)
                 .withTimeout(Duration.ofSeconds(30))
                 .withMaxRetries(5)
                 .build()