Refactor flow rule store to resolve missing flows with old masters following mastership reassignment

Change-Id: I7b7e639c84cbd23fe9ab1f12080f865cdfc7f7f9
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/BackupOperation.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/BackupOperation.java
new file mode 100644
index 0000000..7034118
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/BackupOperation.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import java.util.Objects;
+
+import org.onosproject.cluster.NodeId;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+/**
+ * Identifier representing a backup of a distinct bucket to a specific node.
+ */
+public class BackupOperation {
+    private final NodeId nodeId;
+    private final int bucketId;
+
+    BackupOperation(NodeId nodeId, int bucketId) {
+        this.nodeId = nodeId;
+        this.bucketId = bucketId;
+    }
+
+    /**
+     * Returns the node identifier.
+     *
+     * @return the node identifier
+     */
+    public NodeId nodeId() {
+        return nodeId;
+    }
+
+    /**
+     * Returns the bucket identifier.
+     *
+     * @return the bucket identifier
+     */
+    public int bucket() {
+        return bucketId;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(nodeId, bucketId);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (other != null && other instanceof BackupOperation) {
+            BackupOperation that = (BackupOperation) other;
+            return this.nodeId.equals(that.nodeId)
+                && this.bucketId == that.bucketId;
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return toStringHelper(this)
+            .add("nodeId", nodeId())
+            .add("bucket", bucket())
+            .toString();
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/BucketId.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/BucketId.java
new file mode 100644
index 0000000..33cc304
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/BucketId.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import java.util.Objects;
+
+import org.onosproject.net.DeviceId;
+
+/**
+ * Represents a distinct device flow bucket.
+ */
+public class BucketId {
+    private final DeviceId deviceId;
+    private final int bucket;
+
+    BucketId(DeviceId deviceId, int bucket) {
+        this.deviceId = deviceId;
+        this.bucket = bucket;
+    }
+
+    /**
+     * Returns the bucket device identifier.
+     *
+     * @return the device identifier
+     */
+    public DeviceId deviceId() {
+        return deviceId;
+    }
+
+    /**
+     * Returns the bucket number.
+     *
+     * @return the bucket number
+     */
+    public int bucket() {
+        return bucket;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(deviceId, bucket);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (other instanceof BucketId) {
+            BucketId that = (BucketId) other;
+            return this.deviceId.equals(that.deviceId)
+                && this.bucket == that.bucket;
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("%s/%d", deviceId, bucket);
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
new file mode 100644
index 0000000..6bc5b5f
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
@@ -0,0 +1,822 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.Tools;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.flow.FlowEntry;
+import org.onosproject.net.flow.FlowId;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.StoredFlowEntry;
+import org.onosproject.store.LogicalTimestamp;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Flow table for all flows associated with a specific device.
+ * <p>
+ * Flows in the table are stored in buckets. Each bucket is mutated and replicated as a single unit. The device flow
+ * table performs communication independent of other device flow tables for more parallelism.
+ * <p>
+ * This implementation uses several different replication protocols. Changes that occur on the device master are
+ * replicated to the backups provided in the {@link DeviceReplicaInfo} for the master's term. Additionally, a periodic
+ * anti-entropy protocol is used to detect missing flows on backups (e.g. due to a node restart). Finally, when a
+ * device mastership change occurs, the new master synchronizes flows with the prior master and/or backups for the
+ * device, allowing mastership to be reassigned to non-backup nodes.
+ */
+public class DeviceFlowTable {
+    private static final int NUM_BUCKETS = 1024;
+    private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
+        .register(KryoNamespaces.API)
+        .register(BucketId.class)
+        .register(FlowBucket.class)
+        .register(FlowBucketDigest.class)
+        .register(LogicalTimestamp.class)
+        .register(Timestamped.class)
+        .build());
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final MessageSubject getDigestsSubject;
+    private final MessageSubject getBucketSubject;
+    private final MessageSubject backupSubject;
+
+    private final DeviceId deviceId;
+    private final ClusterCommunicationService clusterCommunicator;
+    private final LifecycleManager lifecycleManager;
+    private final ScheduledExecutorService executorService;
+    private final NodeId localNodeId;
+
+    private final LogicalClock clock = new LogicalClock();
+
+    private volatile DeviceReplicaInfo replicaInfo;
+    private volatile long activeTerm;
+
+    private final LifecycleEventListener lifecycleEventListener = new LifecycleEventListener() {
+        @Override
+        public void event(LifecycleEvent event) {
+            executorService.execute(() -> onLifecycleEvent(event));
+        }
+    };
+
+    private ScheduledFuture<?> backupFuture;
+    private ScheduledFuture<?> antiEntropyFuture;
+
+    private final Map<Integer, Queue<Runnable>> flowTasks = Maps.newConcurrentMap();
+    private final Map<Integer, FlowBucket> flowBuckets = Maps.newConcurrentMap();
+
+    private final Map<BackupOperation, LogicalTimestamp> lastBackupTimes = Maps.newConcurrentMap();
+    private final Set<BackupOperation> inFlightUpdates = Sets.newConcurrentHashSet();
+
+    DeviceFlowTable(
+        DeviceId deviceId,
+        ClusterService clusterService,
+        ClusterCommunicationService clusterCommunicator,
+        LifecycleManager lifecycleManager,
+        ScheduledExecutorService executorService,
+        long backupPeriod,
+        long antiEntropyPeriod) {
+        this.deviceId = deviceId;
+        this.clusterCommunicator = clusterCommunicator;
+        this.lifecycleManager = lifecycleManager;
+        this.executorService = executorService;
+        this.localNodeId = clusterService.getLocalNode().id();
+
+        addListeners();
+
+        for (int i = 0; i < NUM_BUCKETS; i++) {
+            flowBuckets.put(i, new FlowBucket(new BucketId(deviceId, i)));
+        }
+
+        getDigestsSubject = new MessageSubject(String.format("flow-store-%s-digests", deviceId));
+        getBucketSubject = new MessageSubject(String.format("flow-store-%s-bucket", deviceId));
+        backupSubject = new MessageSubject(String.format("flow-store-%s-backup", deviceId));
+
+        setBackupPeriod(backupPeriod);
+        setAntiEntropyPeriod(antiEntropyPeriod);
+        registerSubscribers();
+
+        startTerm(lifecycleManager.getReplicaInfo());
+    }
+
+    /**
+     * Sets the flow table backup period.
+     *
+     * @param backupPeriod the flow table backup period in milliseconds
+     */
+    synchronized void setBackupPeriod(long backupPeriod) {
+        ScheduledFuture<?> backupFuture = this.backupFuture;
+        if (backupFuture != null) {
+            backupFuture.cancel(false);
+        }
+        this.backupFuture = executorService.scheduleAtFixedRate(
+            this::backup, backupPeriod, backupPeriod, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Sets the flow table anti-entropy period.
+     *
+     * @param antiEntropyPeriod the flow table anti-entropy period in milliseconds
+     */
+    synchronized void setAntiEntropyPeriod(long antiEntropyPeriod) {
+        ScheduledFuture<?> antiEntropyFuture = this.antiEntropyFuture;
+        if (antiEntropyFuture != null) {
+            antiEntropyFuture.cancel(false);
+        }
+        this.antiEntropyFuture = executorService.scheduleAtFixedRate(
+            this::runAntiEntropy, antiEntropyPeriod, antiEntropyPeriod, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Counts the flows in the table.
+     *
+     * @return the total number of flows in the table
+     */
+    public int count() {
+        return flowBuckets.values().stream()
+            .mapToInt(FlowBucket::count)
+            .sum();
+    }
+
+    /**
+     * Returns the flow entry for the given rule.
+     *
+     * @param rule the rule for which to lookup the flow entry
+     * @return the flow entry for the given rule
+     */
+    public StoredFlowEntry getFlowEntry(FlowRule rule) {
+        return getBucket(rule.id())
+            .getFlowEntries(rule.id())
+            .get(rule);
+    }
+
+    /**
+     * Returns the set of flow entries in the table.
+     *
+     * @return the set of flow entries in the table
+     */
+    public Set<FlowEntry> getFlowEntries() {
+        return flowBuckets.values().stream()
+            .flatMap(bucket -> bucket.getFlowBucket().values().stream())
+            .flatMap(entries -> entries.values().stream())
+            .collect(Collectors.toSet());
+    }
+
+    /**
+     * Returns the bucket for the given flow identifier.
+     *
+     * @param flowId the flow identifier
+     * @return the bucket for the given flow identifier
+     */
+    private FlowBucket getBucket(FlowId flowId) {
+        return getBucket(bucket(flowId));
+    }
+
+    /**
+     * Returns the bucket with the given identifier.
+     *
+     * @param bucketId the bucket identifier
+     * @return the bucket with the given identifier
+     */
+    private FlowBucket getBucket(int bucketId) {
+        return flowBuckets.get(bucketId);
+    }
+
+    /**
+     * Returns the bucket number for the given flow identifier.
+     *
+     * @param flowId the flow identifier
+     * @return the bucket number for the given flow identifier
+     */
+    private int bucket(FlowId flowId) {
+        return Math.abs((int) (flowId.id() % NUM_BUCKETS));
+    }
+
+    /**
+     * Returns the digests for all buckets in the flow table for the device.
+     *
+     * @return the set of digests for all buckets for the device
+     */
+    private Set<FlowBucketDigest> getDigests() {
+        return flowBuckets.values()
+            .stream()
+            .map(bucket -> bucket.getDigest())
+            .collect(Collectors.toSet());
+    }
+
+    /**
+     * Returns the digest for the given bucket.
+     *
+     * @param bucket the bucket for which to return the digest
+     * @return the digest for the given bucket
+     */
+    private FlowBucketDigest getDigest(int bucket) {
+        return flowBuckets.get(bucket).getDigest();
+    }
+
+    /**
+     * Adds an entry to the table.
+     *
+     * @param rule the rule to add
+     * @return a future to be completed once the rule has been added
+     */
+    public CompletableFuture<Void> add(FlowEntry rule) {
+        return runInTerm(rule.id(), (bucket, term) -> {
+            bucket.add(rule, term, clock);
+            return null;
+        });
+    }
+
+    /**
+     * Updates an entry in the table.
+     *
+     * @param rule the rule to update
+     * @return a future to be completed once the rule has been updated
+     */
+    public CompletableFuture<Void> update(FlowEntry rule) {
+        return runInTerm(rule.id(), (bucket, term) -> {
+            bucket.update(rule, term, clock);
+            return null;
+        });
+    }
+
+    /**
+     * Applies the given update function to the rule.
+     *
+     * @param rule     the rule to update
+     * @param function the update function to apply
+     * @param <T>      the result type
+     * @return a future to be completed with the update result or {@code null} if the rule was not updated
+     */
+    public <T> CompletableFuture<T> update(FlowRule rule, Function<StoredFlowEntry, T> function) {
+        return runInTerm(rule.id(), (bucket, term) -> bucket.update(rule, function, term, clock));
+    }
+
+    /**
+     * Removes an entry from the table.
+     *
+     * @param rule the rule to remove
+     * @return a future to be completed once the rule has been removed
+     */
+    public CompletableFuture<FlowEntry> remove(FlowEntry rule) {
+        return runInTerm(rule.id(), (bucket, term) -> bucket.remove(rule, term, clock));
+    }
+
+    /**
+     * Runs the given function in the current term.
+     *
+     * @param flowId   the flow identifier indicating the bucket in which to run the function
+     * @param function the function to execute in the current term
+     * @param <T>      the future result type
+     * @return a future to be completed with the function result once it has been run
+     */
+    private <T> CompletableFuture<T> runInTerm(FlowId flowId, BiFunction<FlowBucket, Long, T> function) {
+        DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
+        if (!replicaInfo.isMaster(localNodeId)) {
+            return Tools.exceptionalFuture(new IllegalStateException());
+        }
+
+        FlowBucket bucket = getBucket(flowId);
+
+        // If the master's term is not currently active (has not been synchronized with prior replicas), enqueue
+        // the change to be executed once the master has been synchronized.
+        final long term = replicaInfo.term();
+        if (activeTerm < term) {
+            log.debug("Enqueueing operation for device {}", deviceId);
+            synchronized (flowTasks) {
+                // Double checked lock on the active term.
+                if (activeTerm < term) {
+                    CompletableFuture<T> future = new CompletableFuture<>();
+                    flowTasks.computeIfAbsent(bucket.bucketId().bucket(), b -> new LinkedList<>())
+                        .add(() -> future.complete(function.apply(bucket, term)));
+                    return future;
+                }
+            }
+        }
+        return CompletableFuture.completedFuture(function.apply(bucket, term));
+    }
+
+    /**
+     * Backs up all buckets in the given device to the given node.
+     */
+    private void backup() {
+        DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
+
+        // If the local node is not currently the master, skip the backup.
+        if (!replicaInfo.isMaster(localNodeId)) {
+            return;
+        }
+
+        // Otherwise, iterate through backup nodes and backup the device.
+        for (NodeId nodeId : replicaInfo.backups()) {
+            try {
+                backup(nodeId, replicaInfo.term());
+            } catch (Exception e) {
+                log.error("Backup of " + deviceId + " to " + nodeId + " failed", e);
+            }
+        }
+    }
+
+    /**
+     * Backs up all buckets for the device to the given node.
+     *
+     * @param nodeId the node to which to back up the device
+     * @param term   the term for which to backup to the node
+     */
+    private void backup(NodeId nodeId, long term) {
+        for (FlowBucket bucket : flowBuckets.values()) {
+            // If the bucket is not in the current term, skip it. This forces synchronization of the bucket
+            // to occur prior to the new master replicating changes in the bucket to backups.
+            if (bucket.term() != term) {
+                continue;
+            }
+
+            // Record the logical timestamp from the bucket to keep track of the highest logical time replicated.
+            LogicalTimestamp timestamp = bucket.timestamp();
+
+            // If the backup can be run (no concurrent backup to the node in progress) then run it.
+            BackupOperation operation = new BackupOperation(nodeId, bucket.bucketId().bucket());
+            if (startBackup(operation, timestamp)) {
+                backup(bucket, nodeId).whenCompleteAsync((succeeded, error) -> {
+                    if (error != null) {
+                        log.debug("Backup operation {} failed", operation, error);
+                        failBackup(operation);
+                    } else if (succeeded) {
+                        succeedBackup(operation, timestamp);
+                        backup(nodeId, term);
+                    } else {
+                        log.debug("Backup operation {} failed: term mismatch", operation);
+                        failBackup(operation);
+                    }
+                }, executorService);
+            }
+        }
+    }
+
+    /**
+     * Returns a boolean indicating whether the given {@link BackupOperation} can be started.
+     * <p>
+     * The backup can be started if no backup for the same device/bucket/node is already in progress and changes
+     * are pending replication for the backup operation.
+     *
+     * @param operation the operation to start
+     * @param timestamp the timestamp for which to start the backup operation
+     * @return indicates whether the given backup operation should be started
+     */
+    private boolean startBackup(BackupOperation operation, LogicalTimestamp timestamp) {
+        LogicalTimestamp lastBackupTime = lastBackupTimes.get(operation);
+        return timestamp != null
+            && (lastBackupTime == null || lastBackupTime.isOlderThan(timestamp))
+            && inFlightUpdates.add(operation);
+    }
+
+    /**
+     * Fails the given backup operation.
+     *
+     * @param operation the backup operation to fail
+     */
+    private void failBackup(BackupOperation operation) {
+        inFlightUpdates.remove(operation);
+    }
+
+    /**
+     * Succeeds the given backup operation.
+     * <p>
+     * The last backup time for the operation will be updated and the operation will be removed from
+     * in-flight updates.
+     *
+     * @param operation the operation to succeed
+     * @param timestamp the timestamp at which the operation was <em>started</em>
+     */
+    private void succeedBackup(BackupOperation operation, LogicalTimestamp timestamp) {
+        lastBackupTimes.put(operation, timestamp);
+        inFlightUpdates.remove(operation);
+    }
+
+    /**
+     * Resets the last completion time for the given backup operation to ensure it's replicated again.
+     *
+     * @param operation the backup operation to reset
+     */
+    private void resetBackup(BackupOperation operation) {
+        lastBackupTimes.remove(operation);
+    }
+
+    /**
+     * Performs the given backup operation.
+     *
+     * @param bucket the bucket to backup
+     * @param nodeId the node to which to backup the bucket
+     * @return a future to be completed with a boolean indicating whether the backup operation was successful
+     */
+    private CompletableFuture<Boolean> backup(FlowBucket bucket, NodeId nodeId) {
+        if (log.isDebugEnabled()) {
+            log.debug("Backing up {} flow entries in bucket {} to {}", bucket.count(), bucket.bucketId(), nodeId);
+        }
+        return sendWithTimestamp(bucket, backupSubject, nodeId);
+    }
+
+    /**
+     * Handles a flow bucket backup from a remote peer.
+     *
+     * @param flowBucket the flow bucket to back up
+     * @return the set of flows that could not be backed up
+     */
+    private boolean onBackup(FlowBucket flowBucket) {
+        if (log.isDebugEnabled()) {
+            log.debug("{} - Received {} flow entries in bucket {} to backup",
+                deviceId, flowBucket.count(), flowBucket.bucketId());
+        }
+
+        try {
+            DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
+
+            // If the backup is for a different term, reject the request until we learn about the new term.
+            if (flowBucket.term() != replicaInfo.term()) {
+                log.debug("Term mismatch for device {}: {} != {}", deviceId, flowBucket.term(), replicaInfo);
+                return false;
+            }
+
+            flowBuckets.compute(flowBucket.bucketId().bucket(),
+                (id, bucket) -> flowBucket.getDigest().isNewerThan(bucket.getDigest()) ? flowBucket : bucket);
+            return true;
+        } catch (Exception e) {
+            log.warn("Failure processing backup request", e);
+            return false;
+        }
+    }
+
+    /**
+     * Runs the anti-entropy protocol.
+     */
+    private void runAntiEntropy() {
+        DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
+        if (!replicaInfo.isMaster(localNodeId)) {
+            return;
+        }
+
+        for (NodeId nodeId : replicaInfo.backups()) {
+            runAntiEntropy(nodeId);
+        }
+    }
+
+    /**
+     * Runs the anti-entropy protocol against the given peer.
+     *
+     * @param nodeId the node with which to execute the anti-entropy protocol
+     */
+    private void runAntiEntropy(NodeId nodeId) {
+        requestDigests(nodeId).thenAcceptAsync((digests) -> {
+            // Compute a set of missing BucketIds based on digest times and send them back to the master.
+            for (FlowBucketDigest remoteDigest : digests) {
+                FlowBucket localBucket = getBucket(remoteDigest.bucket());
+                if (localBucket.getDigest().isNewerThan(remoteDigest)) {
+                    log.debug("Detected missing flow entries on node {} in bucket {}/{}",
+                        nodeId, deviceId, remoteDigest.bucket());
+                    resetBackup(new BackupOperation(nodeId, remoteDigest.bucket()));
+                }
+            }
+        }, executorService);
+    }
+
+    /**
+     * Sends a digest request to the given node.
+     *
+     * @param nodeId the node to which to send the request
+     * @return future to be completed with the set of digests for the given device on the given node
+     */
+    private CompletableFuture<Set<FlowBucketDigest>> requestDigests(NodeId nodeId) {
+        return sendWithTimestamp(deviceId, getDigestsSubject, nodeId);
+    }
+
+    /**
+     * Synchronizes flows from the previous master or backups.
+     *
+     * @param prevReplicaInfo the previous replica info
+     * @param newReplicaInfo  the new replica info
+     */
+    private void syncFlows(DeviceReplicaInfo prevReplicaInfo, DeviceReplicaInfo newReplicaInfo) {
+        if (prevReplicaInfo == null) {
+            activateMaster(newReplicaInfo);
+        } else if (prevReplicaInfo.master() != null && !prevReplicaInfo.master().equals(localNodeId)) {
+            syncFlowsOnMaster(prevReplicaInfo, newReplicaInfo);
+        } else {
+            syncFlowsOnBackups(prevReplicaInfo, newReplicaInfo);
+        }
+    }
+
+    /**
+     * Synchronizes flows from the previous master, falling back to backups if the master fails.
+     *
+     * @param prevReplicaInfo the previous replica info
+     * @param newReplicaInfo  the new replica info
+     */
+    private void syncFlowsOnMaster(DeviceReplicaInfo prevReplicaInfo, DeviceReplicaInfo newReplicaInfo) {
+        syncFlowsOn(prevReplicaInfo.master())
+            .whenCompleteAsync((result, error) -> {
+                if (error != null) {
+                    log.debug("Failed to synchronize flows on previous master {}", prevReplicaInfo.master(), error);
+                    syncFlowsOnBackups(prevReplicaInfo, newReplicaInfo);
+                } else {
+                    activateMaster(newReplicaInfo);
+                }
+            }, executorService);
+    }
+
+    /**
+     * Synchronizes flows from the previous backups.
+     *
+     * @param prevReplicaInfo the previous replica info
+     * @param newReplicaInfo  the new replica info
+     */
+    private void syncFlowsOnBackups(DeviceReplicaInfo prevReplicaInfo, DeviceReplicaInfo newReplicaInfo) {
+        List<NodeId> backups = prevReplicaInfo.backups()
+            .stream()
+            .filter(nodeId -> !nodeId.equals(localNodeId))
+            .collect(Collectors.toList());
+        syncFlowsOn(backups)
+            .whenCompleteAsync((result, error) -> {
+                if (error != null) {
+                    log.debug("Failed to synchronize flows on previous backup nodes {}", backups, error);
+                }
+                activateMaster(newReplicaInfo);
+            }, executorService);
+    }
+
+    /**
+     * Synchronizes flows for the device on the given nodes.
+     *
+     * @param nodes the nodes via which to synchronize the flows
+     * @return a future to be completed once flows have been synchronizes
+     */
+    private CompletableFuture<Void> syncFlowsOn(Collection<NodeId> nodes) {
+        return nodes.isEmpty()
+            ? CompletableFuture.completedFuture(null)
+            : Tools.firstOf(nodes.stream()
+            .map(node -> syncFlowsOn(node))
+            .collect(Collectors.toList()))
+            .thenApply(v -> null);
+    }
+
+    /**
+     * Synchronizes flows for the device from the given node.
+     *
+     * @param nodeId the node from which to synchronize flows
+     * @return a future to be completed once the flows have been synchronizes
+     */
+    private CompletableFuture<Void> syncFlowsOn(NodeId nodeId) {
+        return requestDigests(nodeId)
+            .thenCompose(digests -> Tools.allOf(digests.stream()
+                .filter(digest -> digest.isNewerThan(getDigest(digest.bucket())))
+                .map(digest -> syncBucketOn(nodeId, digest.bucket()))
+                .collect(Collectors.toList())))
+            .thenApply(v -> null);
+    }
+
+    /**
+     * Synchronizes the given bucket on the given node.
+     *
+     * @param nodeId       the node on which to synchronize the bucket
+     * @param bucketNumber the bucket to synchronize
+     * @return a future to be completed once the bucket has been synchronizes
+     */
+    private CompletableFuture<Void> syncBucketOn(NodeId nodeId, int bucketNumber) {
+        return requestBucket(nodeId, bucketNumber)
+            .thenAcceptAsync(flowBucket -> {
+                flowBuckets.compute(flowBucket.bucketId().bucket(),
+                    (id, bucket) -> flowBucket.getDigest().isNewerThan(bucket.getDigest()) ? flowBucket : bucket);
+            }, executorService);
+    }
+
+    /**
+     * Requests the given bucket from the given node.
+     *
+     * @param nodeId the node from which to request the bucket
+     * @param bucket the bucket to request
+     * @return a future to be completed with the bucket
+     */
+    private CompletableFuture<FlowBucket> requestBucket(NodeId nodeId, int bucket) {
+        log.debug("Requesting flow bucket {} from {}", bucket, nodeId);
+        return sendWithTimestamp(bucket, getBucketSubject, nodeId);
+    }
+
+    /**
+     * Handles a flow bucket request.
+     *
+     * @param bucket the bucket number
+     * @return the flow bucket
+     */
+    private FlowBucket onGetBucket(int bucket) {
+        return flowBuckets.get(bucket);
+    }
+
+    /**
+     * Activates the new master term.
+     *
+     * @param replicaInfo the new replica info
+     */
+    private void activateMaster(DeviceReplicaInfo replicaInfo) {
+        log.debug("Activating term {} for device {}", replicaInfo.term(), deviceId);
+        for (int i = 0; i < NUM_BUCKETS; i++) {
+            activateBucket(i);
+        }
+        lifecycleManager.activate(replicaInfo.term());
+        activeTerm = replicaInfo.term();
+    }
+
+    /**
+     * Activates the given bucket number.
+     *
+     * @param bucket the bucket number to activate
+     */
+    private void activateBucket(int bucket) {
+        Queue<Runnable> tasks;
+        synchronized (flowTasks) {
+            tasks = flowTasks.remove(bucket);
+        }
+        if (tasks != null) {
+            log.debug("Completing enqueued operations for device {}", deviceId);
+            tasks.forEach(task -> task.run());
+        }
+    }
+
+    /**
+     * Handles a lifecycle event.
+     */
+    private void onLifecycleEvent(LifecycleEvent event) {
+        log.debug("Received lifecycle event for device {}: {}", deviceId, event);
+        switch (event.type()) {
+            case TERM_START:
+                startTerm(event.subject());
+                break;
+            case TERM_ACTIVE:
+                activateTerm(event.subject());
+                break;
+            default:
+                break;
+        }
+    }
+
+    /**
+     * Handles a replica change at the start of a new term.
+     */
+    private void startTerm(DeviceReplicaInfo replicaInfo) {
+        DeviceReplicaInfo oldReplicaInfo = this.replicaInfo;
+        this.replicaInfo = replicaInfo;
+        if (replicaInfo.isMaster(localNodeId)) {
+            log.info("Synchronizing device {} flows for term {}", deviceId, replicaInfo.term());
+            syncFlows(oldReplicaInfo, replicaInfo);
+        }
+    }
+
+    /**
+     * Handles the activation of a term.
+     */
+    private void activateTerm(DeviceReplicaInfo replicaInfo) {
+        if (replicaInfo.term() < this.replicaInfo.term()) {
+            return;
+        }
+        if (replicaInfo.term() > this.replicaInfo.term()) {
+            this.replicaInfo = replicaInfo;
+        }
+
+        // If the local node is neither the master or a backup for the device, clear the flow table.
+        if (!replicaInfo.isMaster(localNodeId) && !replicaInfo.isBackup(localNodeId)) {
+            flowBuckets.values().forEach(bucket -> bucket.clear());
+        }
+        activeTerm = replicaInfo.term();
+    }
+
+    /**
+     * Sends a message to the given node wrapped in a Lamport timestamp.
+     * <p>
+     * Messages are sent in a {@link Timestamped} wrapper and are expected to be received in a {@link Timestamped}
+     * wrapper. The internal {@link LogicalClock} is automatically updated on both send and receive.
+     *
+     * @param message  the message to send
+     * @param subject  the message subject
+     * @param toNodeId the node to which to send the message
+     * @param <M>      the message type
+     * @param <R>      the response type
+     * @return a future to be completed with the response
+     */
+    private <M, R> CompletableFuture<R> sendWithTimestamp(M message, MessageSubject subject, NodeId toNodeId) {
+        return clusterCommunicator.<Timestamped<M>, Timestamped<R>>sendAndReceive(
+            clock.timestamp(message), subject, SERIALIZER::encode, SERIALIZER::decode, toNodeId)
+            .thenApply(response -> {
+                clock.tick(response.timestamp());
+                return response.value();
+            });
+    }
+
+    /**
+     * Receives messages to the given subject wrapped in Lamport timestamps.
+     * <p>
+     * Messages are expected to be received in a {@link Timestamped} wrapper and are sent back in a {@link Timestamped}
+     * wrapper. The internal {@link LogicalClock} is automatically updated on both receive and send.
+     *
+     * @param subject  the subject for which to register the subscriber
+     * @param function the raw message handler
+     * @param <M>      the raw message type
+     * @param <R>      the raw response type
+     */
+    private <M, R> void receiveWithTimestamp(MessageSubject subject, Function<M, R> function) {
+        clusterCommunicator.<Timestamped<M>, Timestamped<R>>addSubscriber(subject, SERIALIZER::decode, request -> {
+            clock.tick(request.timestamp());
+            return clock.timestamp(function.apply(request.value()));
+        }, SERIALIZER::encode, executorService);
+    }
+
+    /**
+     * Registers internal message subscribers.
+     */
+    private void registerSubscribers() {
+        receiveWithTimestamp(getDigestsSubject, v -> getDigests());
+        receiveWithTimestamp(getBucketSubject, this::onGetBucket);
+        receiveWithTimestamp(backupSubject, this::onBackup);
+    }
+
+    /**
+     * Unregisters internal message subscribers.
+     */
+    private void unregisterSubscribers() {
+        clusterCommunicator.removeSubscriber(getDigestsSubject);
+        clusterCommunicator.removeSubscriber(getBucketSubject);
+        clusterCommunicator.removeSubscriber(backupSubject);
+    }
+
+    /**
+     * Adds internal event listeners.
+     */
+    private void addListeners() {
+        lifecycleManager.addListener(lifecycleEventListener);
+    }
+
+    /**
+     * Removes internal event listeners.
+     */
+    private void removeListeners() {
+        lifecycleManager.removeListener(lifecycleEventListener);
+    }
+
+    /**
+     * Cancels recurrent scheduled futures.
+     */
+    private synchronized void cancelFutures() {
+        ScheduledFuture<?> backupFuture = this.backupFuture;
+        if (backupFuture != null) {
+            backupFuture.cancel(false);
+        }
+
+        ScheduledFuture<?> antiEntropyFuture = this.antiEntropyFuture;
+        if (antiEntropyFuture != null) {
+            antiEntropyFuture.cancel(false);
+        }
+    }
+
+    /**
+     * Closes the device flow table.
+     */
+    public void close() {
+        removeListeners();
+        unregisterSubscribers();
+        cancelFutures();
+        lifecycleManager.close();
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceReplicaInfo.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceReplicaInfo.java
new file mode 100644
index 0000000..30f9396
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceReplicaInfo.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import java.util.List;
+import java.util.Objects;
+
+import org.onosproject.cluster.NodeId;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+/**
+ * Device term context.
+ */
+public class DeviceReplicaInfo {
+    private final long term;
+    private final NodeId master;
+    private final List<NodeId> backups;
+
+    public DeviceReplicaInfo(long term, NodeId master, List<NodeId> backups) {
+        this.term = term;
+        this.master = master;
+        this.backups = backups;
+    }
+
+    /**
+     * Returns the mastership term.
+     *
+     * @return the mastership term
+     */
+    public long term() {
+        return term;
+    }
+
+    /**
+     * Returns the master for the {@link #term()}.
+     *
+     * @return the master {@link NodeId} for the current {@link #term()}
+     */
+    public NodeId master() {
+        return master;
+    }
+
+    /**
+     * Returns a boolean indicating whether the given {@link NodeId} is the current master.
+     *
+     * @param nodeId the node ID to check
+     * @return indicates whether the given node identifier is the identifier of the current master
+     */
+    public boolean isMaster(NodeId nodeId) {
+        return Objects.equals(master, nodeId);
+    }
+
+    /**
+     * Returns a list of all active backup nodes in priority order.
+     * <p>
+     * The returned backups are limited by the flow rule store's configured backup count.
+     *
+     * @return a list of backup nodes in priority order
+     */
+    public List<NodeId> backups() {
+        return backups;
+    }
+
+    /**
+     * Returns a boolean indicating whether the given node is a backup.
+     *
+     * @param nodeId the node identifier
+     * @return indicates whether the given node is a backup
+     */
+    public boolean isBackup(NodeId nodeId) {
+        return backups.contains(nodeId);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(term, master, backups);
+    }
+
+    @Override
+    public boolean equals(Object object) {
+        if (object instanceof DeviceReplicaInfo) {
+            DeviceReplicaInfo that = (DeviceReplicaInfo) object;
+            return this.term == that.term
+                && Objects.equals(this.master, that.master)
+                && Objects.equals(this.backups, that.backups);
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return toStringHelper(this)
+            .add("term", term())
+            .add("master", master())
+            .add("backups", backups())
+            .toString();
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index 2919f08..19bfd48 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -1,44 +1,38 @@
- /*
- * Copyright 2014-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+/*
+* Copyright 2014-present Open Networking Foundation
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
 package org.onosproject.store.flow.impl;
 
 import java.util.Collections;
 import java.util.Dictionary;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import com.google.common.collect.Streams;
-import com.google.common.util.concurrent.Futures;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -54,14 +48,16 @@
 import org.onosproject.cluster.NodeId;
 import org.onosproject.core.CoreService;
 import org.onosproject.core.IdGenerator;
+import org.onosproject.event.AbstractListenerManager;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.flow.CompletedBatchOperation;
 import org.onosproject.net.flow.DefaultFlowEntry;
 import org.onosproject.net.flow.FlowEntry;
 import org.onosproject.net.flow.FlowEntry.FlowEntryState;
-import org.onosproject.net.flow.FlowId;
 import org.onosproject.net.flow.FlowRule;
 import org.onosproject.net.flow.FlowRuleEvent;
 import org.onosproject.net.flow.FlowRuleEvent.Type;
@@ -80,14 +76,18 @@
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.cluster.messaging.ClusterMessage;
 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.flow.ReplicaInfo;
 import org.onosproject.store.flow.ReplicaInfoEvent;
 import org.onosproject.store.flow.ReplicaInfoEventListener;
 import org.onosproject.store.flow.ReplicaInfoService;
 import org.onosproject.store.impl.MastershipBasedTimestamp;
 import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.EventuallyConsistentMap;
 import org.onosproject.store.service.EventuallyConsistentMapEvent;
 import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
 import org.onosproject.store.service.WallClockTimestamp;
@@ -99,8 +99,8 @@
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.APPLY_BATCH_FLOWS;
-import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_ANTI_ENTROPY;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP;
+import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_COUNT;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOTE_APPLY_COMPLETED;
@@ -113,8 +113,8 @@
 @Component(immediate = true)
 @Service
 public class ECFlowRuleStore
-        extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
-        implements FlowRuleStore {
+    extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
+    implements FlowRuleStore {
 
     private final Logger log = getLogger(getClass());
 
@@ -124,26 +124,25 @@
     private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
     private static final int DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS = 5000;
     private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
-    private static final int NUM_BUCKETS = 1024;
 
     @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
-            label = "Number of threads in the message handler pool")
+        label = "Number of threads in the message handler pool")
     private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
 
     @Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS,
-            label = "Delay in ms between successive backup runs")
+        label = "Delay in ms between successive backup runs")
     private int backupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
 
     @Property(name = "antiEntropyPeriod", intValue = DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS,
-            label = "Delay in ms between anti-entropy runs")
+        label = "Delay in ms between anti-entropy runs")
     private int antiEntropyPeriod = DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS;
 
     @Property(name = "persistenceEnabled", boolValue = false,
-            label = "Indicates whether or not changes in the flow table should be persisted to disk.")
+        label = "Indicates whether or not changes in the flow table should be persisted to disk.")
     private boolean persistenceEnabled = DEFAULT_PERSISTENCE_ENABLED;
 
     @Property(name = "backupCount", intValue = DEFAULT_MAX_BACKUP_COUNT,
-            label = "Max number of backup copies for each device")
+        label = "Max number of backup copies for each device")
     private volatile int backupCount = DEFAULT_MAX_BACKUP_COUNT;
 
     private InternalFlowTable flowTable = new InternalFlowTable();
@@ -176,14 +175,12 @@
     private ExecutorService messageHandlingExecutor;
     private ExecutorService eventHandler;
 
-    private ScheduledFuture<?> backupTask;
-    private ScheduledFuture<?> antiEntropyTask;
     private final ScheduledExecutorService backupSenderExecutor =
-            Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender", log));
+        Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender", log));
 
     private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
     private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
-            new InternalTableStatsListener();
+        new InternalTableStatsListener();
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected StorageService storageService;
@@ -199,7 +196,7 @@
         .register(BucketId.class)
         .register(MastershipBasedTimestamp.class);
 
-    private EventuallyConsistentMap<BucketId, Integer> flowCounts;
+    protected AsyncConsistentMap<DeviceId, Long> mastershipTermLifecycles;
 
     private IdGenerator idGenerator;
     private NodeId local;
@@ -213,50 +210,37 @@
         local = clusterService.getLocalNode().id();
 
         eventHandler = Executors.newSingleThreadExecutor(
-                groupedThreads("onos/flow", "event-handler", log));
+            groupedThreads("onos/flow", "event-handler", log));
         messageHandlingExecutor = Executors.newFixedThreadPool(
-                msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
+            msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
 
         registerMessageHandlers(messageHandlingExecutor);
 
-        replicaInfoManager.addListener(flowTable);
-        backupTask = backupSenderExecutor.scheduleWithFixedDelay(
-                flowTable::backup,
-                0,
-                backupPeriod,
-                TimeUnit.MILLISECONDS);
-        antiEntropyTask = backupSenderExecutor.scheduleWithFixedDelay(
-                flowTable::runAntiEntropy,
-                0,
-                antiEntropyPeriod,
-                TimeUnit.MILLISECONDS);
-
-        flowCounts = storageService.<BucketId, Integer>eventuallyConsistentMapBuilder()
-                .withName("onos-flow-counts")
-                .withSerializer(serializerBuilder)
-                .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
-                .withTimestampProvider((k, v) -> new WallClockTimestamp())
-                .withTombstonesDisabled()
-                .build();
+        mastershipTermLifecycles = storageService.<DeviceId, Long>consistentMapBuilder()
+            .withName("onos-flow-store-terms")
+            .withSerializer(serializer)
+            .buildAsyncMap();
 
         deviceTableStats = storageService.<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder()
-                .withName("onos-flow-table-stats")
-                .withSerializer(serializerBuilder)
-                .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
-                .withTimestampProvider((k, v) -> new WallClockTimestamp())
-                .withTombstonesDisabled()
-                .build();
+            .withName("onos-flow-table-stats")
+            .withSerializer(serializerBuilder)
+            .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
+            .withTimestampProvider((k, v) -> new WallClockTimestamp())
+            .withTombstonesDisabled()
+            .build();
         deviceTableStats.addListener(tableStatsListener);
 
+        deviceService.addListener(flowTable);
+        deviceService.getDevices().forEach(device -> flowTable.addDevice(device.id()));
+
         logConfig("Started");
     }
 
     @Deactivate
     public void deactivate(ComponentContext context) {
-        replicaInfoManager.removeListener(flowTable);
-        backupTask.cancel(true);
         configService.unregisterProperties(getClass(), false);
         unregisterMessageHandlers();
+        deviceService.removeListener(flowTable);
         deviceTableStats.removeListener(tableStatsListener);
         deviceTableStats.destroy();
         eventHandler.shutdownNow();
@@ -297,48 +281,21 @@
             newAntiEntropyPeriod = DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS;
         }
 
-        boolean restartBackupTask = false;
-        boolean restartAntiEntropyTask = false;
-
         if (newBackupPeriod != backupPeriod) {
             backupPeriod = newBackupPeriod;
-            restartBackupTask = true;
+            flowTable.setBackupPeriod(newBackupPeriod);
         }
 
         if (newAntiEntropyPeriod != antiEntropyPeriod) {
             antiEntropyPeriod = newAntiEntropyPeriod;
-            restartAntiEntropyTask = true;
-        }
-
-        if (restartBackupTask) {
-            if (backupTask != null) {
-                // cancel previously running task
-                backupTask.cancel(false);
-            }
-            backupTask = backupSenderExecutor.scheduleWithFixedDelay(
-                    flowTable::backup,
-                    0,
-                    backupPeriod,
-                    TimeUnit.MILLISECONDS);
-        }
-
-        if (restartAntiEntropyTask) {
-            if (antiEntropyTask != null) {
-                // cancel previously running task
-                antiEntropyTask.cancel(false);
-            }
-            antiEntropyTask = backupSenderExecutor.scheduleWithFixedDelay(
-                    flowTable::runAntiEntropy,
-                    0,
-                    antiEntropyPeriod,
-                    TimeUnit.MILLISECONDS);
+            flowTable.setAntiEntropyPeriod(newAntiEntropyPeriod);
         }
 
         if (newPoolSize != msgHandlerPoolSize) {
             msgHandlerPoolSize = newPoolSize;
             ExecutorService oldMsgHandler = messageHandlingExecutor;
             messageHandlingExecutor = Executors.newFixedThreadPool(
-                    msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
+                msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
 
             // replace previously registered handlers.
             registerMessageHandlers(messageHandlingExecutor);
@@ -352,50 +309,63 @@
     }
 
     private void registerMessageHandlers(ExecutorService executor) {
-
         clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
         clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
-                REMOTE_APPLY_COMPLETED, serializer::decode, this::notifyDelegate, executor);
+            REMOTE_APPLY_COMPLETED, serializer::decode, this::notifyDelegate, executor);
         clusterCommunicator.addSubscriber(
-                GET_FLOW_ENTRY, serializer::decode, flowTable::getFlowEntry, serializer::encode, executor);
+            GET_FLOW_ENTRY, serializer::decode, flowTable::getFlowEntry, serializer::encode, executor);
         clusterCommunicator.addSubscriber(
-                GET_DEVICE_FLOW_ENTRIES, serializer::decode, flowTable::getFlowEntries, serializer::encode, executor);
+            GET_DEVICE_FLOW_ENTRIES, serializer::decode, flowTable::getFlowEntries, serializer::encode, executor);
         clusterCommunicator.addSubscriber(
-                REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
+            GET_DEVICE_FLOW_COUNT, serializer::decode, flowTable::getFlowRuleCount, serializer::encode, executor);
         clusterCommunicator.addSubscriber(
-                FLOW_TABLE_BACKUP, serializer::decode, flowTable::onBackup, serializer::encode, executor);
-        clusterCommunicator.addSubscriber(
-                FLOW_TABLE_ANTI_ENTROPY, serializer::decode, flowTable::onAntiEntropy, serializer::encode, executor);
+            REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
     }
 
     private void unregisterMessageHandlers() {
         clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
         clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
+        clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_COUNT);
         clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
         clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
         clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
         clusterCommunicator.removeSubscriber(FLOW_TABLE_BACKUP);
-        clusterCommunicator.removeSubscriber(FLOW_TABLE_ANTI_ENTROPY);
     }
 
     private void logConfig(String prefix) {
         log.info("{} with msgHandlerPoolSize = {}; backupPeriod = {}, backupCount = {}",
-                 prefix, msgHandlerPoolSize, backupPeriod, backupCount);
+            prefix, msgHandlerPoolSize, backupPeriod, backupCount);
     }
 
     @Override
     public int getFlowRuleCount() {
         return Streams.stream(deviceService.getDevices()).parallel()
-                .mapToInt(device -> getFlowRuleCount(device.id()))
-                .sum();
+            .mapToInt(device -> getFlowRuleCount(device.id()))
+            .sum();
     }
 
     @Override
     public int getFlowRuleCount(DeviceId deviceId) {
-        return flowCounts.entrySet().stream()
-            .filter(entry -> entry.getKey().deviceId().equals(deviceId))
-            .mapToInt(entry -> entry.getValue())
-            .sum();
+        NodeId master = mastershipService.getMasterFor(deviceId);
+        if (master == null) {
+            log.debug("Failed to getFlowRuleCount: No master for {}", deviceId);
+            return 0;
+        }
+
+        if (Objects.equals(local, master)) {
+            return flowTable.getFlowRuleCount(deviceId);
+        }
+
+        log.trace("Forwarding getFlowRuleCount to master {} for device {}", master, deviceId);
+        return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
+            deviceId,
+            GET_DEVICE_FLOW_COUNT,
+            serializer::encode,
+            serializer::decode,
+            master),
+            FLOW_RULE_STORE_TIMEOUT_MILLIS,
+            TimeUnit.MILLISECONDS,
+            0);
     }
 
     @Override
@@ -412,16 +382,16 @@
         }
 
         log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
-                  master, rule.deviceId());
+            master, rule.deviceId());
 
         return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
-                                    ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY,
-                                    serializer::encode,
-                                    serializer::decode,
-                                    master),
-                               FLOW_RULE_STORE_TIMEOUT_MILLIS,
-                               TimeUnit.MILLISECONDS,
-                               null);
+            ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY,
+            serializer::encode,
+            serializer::decode,
+            master),
+            FLOW_RULE_STORE_TIMEOUT_MILLIS,
+            TimeUnit.MILLISECONDS,
+            null);
     }
 
     @Override
@@ -438,31 +408,31 @@
         }
 
         log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
-                  master, deviceId);
+            master, deviceId);
 
         return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
-                                    ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
-                                    serializer::encode,
-                                    serializer::decode,
-                                    master),
-                               FLOW_RULE_STORE_TIMEOUT_MILLIS,
-                               TimeUnit.MILLISECONDS,
-                               Collections.emptyList());
+            ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
+            serializer::encode,
+            serializer::decode,
+            master),
+            FLOW_RULE_STORE_TIMEOUT_MILLIS,
+            TimeUnit.MILLISECONDS,
+            Collections.emptyList());
     }
 
     @Override
     public void storeFlowRule(FlowRule rule) {
         storeBatch(new FlowRuleBatchOperation(
-                Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
-                rule.deviceId(), idGenerator.getNewId()));
+            Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
+            rule.deviceId(), idGenerator.getNewId()));
     }
 
     @Override
     public void storeBatch(FlowRuleBatchOperation operation) {
         if (operation.getOperations().isEmpty()) {
             notifyDelegate(FlowRuleBatchEvent.completed(
-                    new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
-                    new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
+                new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+                new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
             return;
         }
 
@@ -472,11 +442,13 @@
         if (master == null) {
             log.warn("No master for {} ", deviceId);
 
-            updateStoreInternal(operation);
-
+            Set<FlowRule> allFailures = operation.getOperations()
+                .stream()
+                .map(op -> op.target())
+                .collect(Collectors.toSet());
             notifyDelegate(FlowRuleBatchEvent.completed(
-                    new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
-                    new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
+                new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+                new CompletedBatchOperation(false, allFailures, deviceId)));
             return;
         }
 
@@ -486,26 +458,26 @@
         }
 
         log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
-                  master, deviceId);
+            master, deviceId);
 
         clusterCommunicator.unicast(operation,
-                                    APPLY_BATCH_FLOWS,
-                                    serializer::encode,
-                                    master)
-                           .whenComplete((result, error) -> {
-                               if (error != null) {
-                                   log.warn("Failed to storeBatch: {} to {}", operation, master, error);
+            APPLY_BATCH_FLOWS,
+            serializer::encode,
+            master)
+            .whenComplete((result, error) -> {
+                if (error != null) {
+                    log.warn("Failed to storeBatch: {} to {}", operation, master, error);
 
-                                   Set<FlowRule> allFailures = operation.getOperations()
-                                           .stream()
-                                           .map(op -> op.target())
-                                           .collect(Collectors.toSet());
+                    Set<FlowRule> allFailures = operation.getOperations()
+                        .stream()
+                        .map(op -> op.target())
+                        .collect(Collectors.toSet());
 
-                                   notifyDelegate(FlowRuleBatchEvent.completed(
-                                           new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
-                                           new CompletedBatchOperation(false, allFailures, deviceId)));
-                               }
-                           });
+                    notifyDelegate(FlowRuleBatchEvent.completed(
+                        new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+                        new CompletedBatchOperation(false, allFailures, deviceId)));
+                }
+            });
     }
 
     private void storeBatchInternal(FlowRuleBatchOperation operation) {
@@ -515,65 +487,63 @@
         Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
         if (currentOps.isEmpty()) {
             batchOperationComplete(FlowRuleBatchEvent.completed(
-                    new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
-                    new CompletedBatchOperation(true, Collections.emptySet(), did)));
+                new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+                new CompletedBatchOperation(true, Collections.emptySet(), did)));
             return;
         }
 
         notifyDelegate(FlowRuleBatchEvent.requested(new
-                           FlowRuleBatchRequest(operation.id(),
-                                                currentOps), operation.deviceId()));
+            FlowRuleBatchRequest(operation.id(),
+            currentOps), operation.deviceId()));
     }
 
     private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
         return operation.getOperations().stream().map(
-                op -> {
-                    StoredFlowEntry entry;
-                    switch (op.operator()) {
-                        case ADD:
-                            entry = new DefaultFlowEntry(op.target());
-                            flowTable.add(entry);
+            op -> {
+                StoredFlowEntry entry;
+                switch (op.operator()) {
+                    case ADD:
+                        entry = new DefaultFlowEntry(op.target());
+                        flowTable.add(entry);
+                        return op;
+                    case MODIFY:
+                        entry = new DefaultFlowEntry(op.target());
+                        flowTable.update(entry);
+                        return op;
+                    case REMOVE:
+                        return flowTable.update(op.target(), stored -> {
+                            stored.setState(FlowEntryState.PENDING_REMOVE);
+                            log.debug("Setting state of rule to pending remove: {}", stored);
                             return op;
-                        case MODIFY:
-                            entry = new DefaultFlowEntry(op.target());
-                            flowTable.update(entry);
-                            return op;
-                        case REMOVE:
-                            entry = flowTable.getFlowEntry(op.target());
-                            if (entry != null) {
-                                entry.setState(FlowEntryState.PENDING_REMOVE);
-                                flowTable.update(entry);
-                                log.debug("Setting state of rule to pending remove: {}", entry);
-                                return op;
-                            }
-                            break;
-                        default:
-                            log.warn("Unknown flow operation operator: {}", op.operator());
-                    }
-                    return null;
+                        });
+                    default:
+                        log.warn("Unknown flow operation operator: {}", op.operator());
                 }
+                return null;
+            }
         ).filter(Objects::nonNull).collect(Collectors.toSet());
     }
 
     @Override
     public void deleteFlowRule(FlowRule rule) {
         storeBatch(
-                new FlowRuleBatchOperation(
-                        Collections.singletonList(
-                                new FlowRuleBatchEntry(
-                                        FlowRuleOperation.REMOVE,
-                                        rule)), rule.deviceId(), idGenerator.getNewId()));
+            new FlowRuleBatchOperation(
+                Collections.singletonList(
+                    new FlowRuleBatchEntry(
+                        FlowRuleOperation.REMOVE,
+                        rule)), rule.deviceId(), idGenerator.getNewId()));
     }
 
     @Override
     public FlowRuleEvent pendingFlowRule(FlowEntry rule) {
         if (mastershipService.isLocalMaster(rule.deviceId())) {
-            StoredFlowEntry stored = flowTable.getFlowEntry(rule);
-            if (stored != null &&
-                    stored.state() != FlowEntryState.PENDING_ADD) {
-                stored.setState(FlowEntryState.PENDING_ADD);
-                return new FlowRuleEvent(Type.RULE_UPDATED, rule);
-            }
+            return flowTable.update(rule, stored -> {
+                if (stored.state() == FlowEntryState.PENDING_ADD) {
+                    stored.setState(FlowEntryState.PENDING_ADD);
+                    return new FlowRuleEvent(Type.RULE_UPDATED, rule);
+                }
+                return null;
+            });
         }
         return null;
     }
@@ -586,14 +556,12 @@
         }
 
         log.warn("Tried to update FlowRule {} state,"
-                         + " while the Node was not the master.", rule);
+            + " while the Node was not the master.", rule);
         return null;
     }
 
     private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
-        // check if this new rule is an update to an existing entry
-        StoredFlowEntry stored = flowTable.getFlowEntry(rule);
-        if (stored != null) {
+        FlowRuleEvent event = flowTable.update(rule, stored -> {
             stored.setBytes(rule.bytes());
             stored.setLife(rule.life(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
             stored.setLiveType(rule.liveType());
@@ -601,11 +569,12 @@
             stored.setLastSeen();
             if (stored.state() == FlowEntryState.PENDING_ADD) {
                 stored.setState(FlowEntryState.ADDED);
-                // Update the flow table to ensure the changes are replicated
-                flowTable.update(stored);
                 return new FlowRuleEvent(Type.RULE_ADDED, rule);
             }
             return new FlowRuleEvent(Type.RULE_UPDATED, rule);
+        });
+        if (event != null) {
+            return event;
         }
 
         // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
@@ -631,14 +600,17 @@
         }
 
         log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
-                  master, deviceId);
+            master, deviceId);
 
-        return Futures.getUnchecked(clusterCommunicator.sendAndReceive(
-                               rule,
-                               REMOVE_FLOW_ENTRY,
-                               serializer::encode,
-                               serializer::decode,
-                               master));
+        return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
+            rule,
+            REMOVE_FLOW_ENTRY,
+            serializer::encode,
+            serializer::decode,
+            master),
+            FLOW_RULE_STORE_TIMEOUT_MILLIS,
+            TimeUnit.MILLISECONDS,
+            null);
     }
 
     private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
@@ -699,675 +671,173 @@
         }
     }
 
-    /**
-     * Represents a backup to a of a distinct bucket to a distinct node.
-     */
-    private class BackupOperation {
-        private final NodeId nodeId;
-        private final BucketId bucketId;
-
-        BackupOperation(NodeId nodeId, BucketId bucketId) {
-            this.nodeId = nodeId;
-            this.bucketId = bucketId;
-        }
-
-        NodeId nodeId() {
-            return nodeId;
-        }
-
-        BucketId bucketId() {
-            return bucketId;
-        }
+    private class InternalFlowTable implements DeviceListener {
+        private final Map<DeviceId, DeviceFlowTable> flowTables = Maps.newConcurrentMap();
 
         @Override
-        public int hashCode() {
-            return Objects.hash(nodeId, bucketId);
-        }
-
-        @Override
-        public boolean equals(Object other) {
-            if (other != null && other instanceof BackupOperation) {
-                BackupOperation that = (BackupOperation) other;
-                return this.nodeId.equals(that.nodeId)
-                    && this.bucketId.equals(that.bucketId);
-            }
-            return false;
-        }
-    }
-
-    /**
-     * Represents a distinct device flow bucket.
-     */
-    private class BucketId {
-        private final DeviceId deviceId;
-        private final int bucket;
-
-        BucketId(DeviceId deviceId, int bucket) {
-            this.deviceId = deviceId;
-            this.bucket = bucket;
-        }
-
-        DeviceId deviceId() {
-            return deviceId;
-        }
-
-        int bucket() {
-            return bucket;
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(deviceId, bucket);
-        }
-
-        @Override
-        public boolean equals(Object other) {
-            if (other != null && other instanceof BucketId) {
-                BucketId that = (BucketId) other;
-                return this.deviceId.equals(that.deviceId)
-                    && this.bucket == that.bucket;
-            }
-            return false;
-        }
-    }
-
-    /**
-     * Container for flows in a specific bucket.
-     */
-    private class FlowBucket {
-        private final BucketId bucketId;
-        private final Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> table;
-        private final long timestamp;
-
-        BucketId bucketId() {
-            return bucketId;
-        }
-
-        Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> table() {
-            return table;
-        }
-
-        long timestamp() {
-            return timestamp;
-        }
-
-        FlowBucket(BucketId bucketId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> table, long timestamp) {
-            this.bucketId = bucketId;
-            this.table = table;
-            this.timestamp = timestamp;
-        }
-    }
-
-    /**
-     * Device digest.
-     */
-    private class DeviceDigest {
-        private final DeviceId deviceId;
-        private final Set<FlowBucketDigest> digests;
-
-        DeviceDigest(DeviceId deviceId, Set<FlowBucketDigest> digests) {
-            this.deviceId = deviceId;
-            this.digests = digests;
-        }
-
-        DeviceId deviceId() {
-            return deviceId;
-        }
-
-        Set<FlowBucketDigest> digests() {
-            return digests;
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(deviceId, digests);
-        }
-
-        @Override
-        public boolean equals(Object object) {
-            return object instanceof DeviceDigest
-                && ((DeviceDigest) object).deviceId.equals(deviceId);
-        }
-    }
-
-    /**
-     * Flow bucket digest.
-     */
-    private class FlowBucketDigest {
-        private final BucketId bucketId;
-        private final long timestamp;
-
-        FlowBucketDigest(BucketId bucketId, long timestamp) {
-            this.bucketId = bucketId;
-            this.timestamp = timestamp;
-        }
-
-        BucketId bucketId() {
-            return bucketId;
-        }
-
-        long timestamp() {
-            return timestamp;
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(bucketId);
-        }
-
-        @Override
-        public boolean equals(Object object) {
-            return object instanceof FlowBucketDigest
-                && ((FlowBucketDigest) object).bucketId.equals(bucketId);
-        }
-    }
-
-    private class InternalFlowTable implements ReplicaInfoEventListener {
-
-        //TODO replace the Map<V,V> with ExtendedSet
-        private final Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
-                flowEntries = Maps.newConcurrentMap();
-
-        private final Map<BackupOperation, Long> lastBackupTimes = Maps.newConcurrentMap();
-        private final Map<BucketId, Long> lastUpdateTimes = Maps.newConcurrentMap();
-        private final Set<BackupOperation> inFlightUpdates = Sets.newConcurrentHashSet();
-
-        private final AtomicLong currentTimestamp = new AtomicLong();
-
-        @Override
-        public void event(ReplicaInfoEvent event) {
-            eventHandler.execute(() -> handleEvent(event));
-        }
-
-        /**
-         * Handles a replica change event.
-         *
-         * @param event the replica change event to handle
-         */
-        private void handleEvent(ReplicaInfoEvent event) {
-            DeviceId deviceId = event.subject();
-
-            // If the local node is not the master, return.
-            if (!isMasterNode(deviceId)) {
-                // If the local node is neither the master or a backup, remove flow tables for the device.
-                if (!isBackupNode(deviceId)) {
-                    purgeFlowRule(deviceId);
-                }
-                return;
-            }
-            backupSenderExecutor.execute(this::runAntiEntropy);
-        }
-
-        /**
-         * Returns the set of devices in the flow table.
-         *
-         * @return the set of devices in the flow table
-         */
-        private Set<DeviceId> getDevices() {
-            return flowEntries.keySet();
-        }
-
-        /**
-         * Returns the digests for all buckets in the flow table for the given device.
-         *
-         * @param deviceId the device for which to return digests
-         * @return the set of digests for all buckets for the given device
-         */
-        private Set<FlowBucketDigest> getDigests(DeviceId deviceId) {
-            return IntStream.range(0, NUM_BUCKETS)
-                .mapToObj(bucket -> {
-                    BucketId bucketId = new BucketId(deviceId, bucket);
-                    long timestamp = lastUpdateTimes.getOrDefault(bucketId, 0L);
-                    return new FlowBucketDigest(bucketId, timestamp);
-                }).collect(Collectors.toSet());
-        }
-
-        /**
-         * Returns the flow table for specified device.
-         *
-         * @param deviceId identifier of the device
-         * @return Map representing Flow Table of given device.
-         */
-        private Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
-            // Use an external get/null check to avoid locks.
-            // https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8161372
-            if (persistenceEnabled) {
-                Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowTable = flowEntries.get(deviceId);
-                return flowTable != null ? flowTable
-                    : flowEntries.computeIfAbsent(deviceId, id ->
-                    persistenceService.<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>persistentMapBuilder()
-                        .withName("FlowTable:" + deviceId.toString())
-                        .withSerializer(serializer)
-                        .build());
-            } else {
-                Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowTable = flowEntries.get(deviceId);
-                return flowTable != null ? flowTable
-                    : flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap());
+        public void event(DeviceEvent event) {
+            if (event.type() == DeviceEvent.Type.DEVICE_ADDED) {
+                addDevice(event.subject().id());
             }
         }
 
-        private FlowBucket getFlowBucket(BucketId bucketId) {
-            long timestamp = lastUpdateTimes.getOrDefault(bucketId, 0L);
-            return new FlowBucket(bucketId, getFlowTable(bucketId.deviceId())
-                .entrySet()
-                .stream()
-                .filter(entry -> isInBucket(entry.getKey(), bucketId.bucket()))
-                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)),
-                timestamp);
+        /**
+         * Adds the given device to the flow table.
+         *
+         * @param deviceId the device to add to the table
+         */
+        public void addDevice(DeviceId deviceId) {
+            flowTables.computeIfAbsent(deviceId, id -> new DeviceFlowTable(
+                id,
+                clusterService,
+                clusterCommunicator,
+                new InternalLifecycleManager(id),
+                backupSenderExecutor,
+                backupPeriod,
+                antiEntropyPeriod));
         }
 
-        private Map<StoredFlowEntry, StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
-            // Use an external get/null check to avoid locks.
-            // https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8161372
-            Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowTable = getFlowTable(deviceId);
-            Map<StoredFlowEntry, StoredFlowEntry> flowEntries = flowTable.get(flowId);
-            return flowEntries != null ? flowEntries : flowTable.computeIfAbsent(flowId, id -> Maps.newConcurrentMap());
+        /**
+         * Sets the flow table backup period.
+         *
+         * @param backupPeriod the flow table backup period
+         */
+        void setBackupPeriod(int backupPeriod) {
+            flowTables.values().forEach(flowTable -> flowTable.setBackupPeriod(backupPeriod));
         }
 
-        private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
-            return getFlowEntriesInternal(rule.deviceId(), rule.id()).get(rule);
+        /**
+         * Sets the flow table anti-entropy period.
+         *
+         * @param antiEntropyPeriod the flow table anti-entropy period
+         */
+        void setAntiEntropyPeriod(int antiEntropyPeriod) {
+            flowTables.values().forEach(flowTable -> flowTable.setAntiEntropyPeriod(antiEntropyPeriod));
         }
 
-        private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
-            return getFlowTable(deviceId).values().stream()
-                        .flatMap(m -> m.values().stream())
-                        .collect(Collectors.toSet());
+        /**
+         * Returns the flow table for a specific device.
+         *
+         * @param deviceId the device identifier
+         * @return the flow table for the given device
+         */
+        private DeviceFlowTable getFlowTable(DeviceId deviceId) {
+            DeviceFlowTable flowTable = flowTables.get(deviceId);
+            return flowTable != null ? flowTable : flowTables.computeIfAbsent(deviceId, id -> new DeviceFlowTable(
+                deviceId,
+                clusterService,
+                clusterCommunicator,
+                new InternalLifecycleManager(deviceId),
+                backupSenderExecutor,
+                backupPeriod,
+                antiEntropyPeriod));
         }
 
+        /**
+         * Returns the flow rule count for the given device.
+         *
+         * @param deviceId the device for which to return the flow rule count
+         * @return the flow rule count for the given device
+         */
+        public int getFlowRuleCount(DeviceId deviceId) {
+            return getFlowTable(deviceId).count();
+        }
+
+        /**
+         * Returns the flow entry for the given rule.
+         *
+         * @param rule the rule for which to return the flow entry
+         * @return the flow entry for the given rule
+         */
         public StoredFlowEntry getFlowEntry(FlowRule rule) {
-            return getFlowEntryInternal(rule);
+            return getFlowTable(rule.deviceId()).getFlowEntry(rule);
         }
 
+        /**
+         * Returns the set of flow entries for the given device.
+         *
+         * @param deviceId the device for which to lookup flow entries
+         * @return the set of flow entries for the given device
+         */
         public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
-            return getFlowEntriesInternal(deviceId);
+            return getFlowTable(deviceId).getFlowEntries();
         }
 
-        private boolean isInBucket(FlowId flowId, int bucket) {
-            return bucket(flowId) == bucket;
-        }
-
-        private int bucket(FlowId flowId) {
-            return (int) (flowId.id() % NUM_BUCKETS);
-        }
-
-        private void recordUpdate(BucketId bucketId) {
-            recordUpdate(bucketId, currentTimestamp.accumulateAndGet(System.currentTimeMillis(), Math::max));
-        }
-
-        private void recordUpdate(BucketId bucketId, long timestamp) {
-            lastUpdateTimes.put(bucketId, timestamp);
-        }
-
+        /**
+         * Adds the given flow rule.
+         *
+         * @param rule the rule to add
+         */
         public void add(FlowEntry rule) {
-            getFlowEntriesInternal(rule.deviceId(), rule.id())
-                    .compute((StoredFlowEntry) rule, (k, stored) -> {
-                        return (StoredFlowEntry) rule;
-                    });
-            recordUpdate(new BucketId(rule.deviceId(), bucket(rule.id())));
+            Tools.futureGetOrElse(
+                getFlowTable(rule.deviceId()).add(rule),
+                FLOW_RULE_STORE_TIMEOUT_MILLIS,
+                TimeUnit.MILLISECONDS,
+                null);
         }
 
+        /**
+         * Updates the given flow rule.
+         *
+         * @param rule the rule to update
+         */
         public void update(FlowEntry rule) {
-            getFlowEntriesInternal(rule.deviceId(), rule.id())
-                .computeIfPresent((StoredFlowEntry) rule, (k, stored) -> {
-                    if (rule instanceof DefaultFlowEntry) {
-                        DefaultFlowEntry updated = (DefaultFlowEntry) rule;
-                        if (stored instanceof DefaultFlowEntry) {
-                            DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
-                            if (updated.created() >= storedEntry.created()) {
-                                recordUpdate(new BucketId(rule.deviceId(), bucket(rule.id())));
-                                return updated;
-                            } else {
-                                log.debug("Trying to update more recent flow entry {} (stored: {})", updated, stored);
-                                return stored;
-                            }
-                        }
-                    }
-                    return stored;
-                });
+            Tools.futureGetOrElse(
+                getFlowTable(rule.deviceId()).update(rule),
+                FLOW_RULE_STORE_TIMEOUT_MILLIS,
+                TimeUnit.MILLISECONDS,
+                null);
         }
 
+        /**
+         * Applies the given update function to the rule.
+         *
+         * @param function the update function to apply
+         * @return a future to be completed with the update event or {@code null} if the rule was not updated
+         */
+        public <T> T update(FlowRule rule, Function<StoredFlowEntry, T> function) {
+            return Tools.futureGetOrElse(
+                getFlowTable(rule.deviceId()).update(rule, function),
+                FLOW_RULE_STORE_TIMEOUT_MILLIS,
+                TimeUnit.MILLISECONDS,
+                null);
+        }
+
+        /**
+         * Removes the given flow rule.
+         *
+         * @param rule the rule to remove
+         */
         public FlowEntry remove(FlowEntry rule) {
-            final AtomicReference<FlowEntry> removedRule = new AtomicReference<>();
-            final Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowTable = getFlowTable(rule.deviceId());
-            flowTable.computeIfPresent(rule.id(), (flowId, flowEntries) -> {
-                flowEntries.computeIfPresent((StoredFlowEntry) rule, (k, stored) -> {
-                    if (rule instanceof DefaultFlowEntry) {
-                        DefaultFlowEntry toRemove = (DefaultFlowEntry) rule;
-                        if (stored instanceof DefaultFlowEntry) {
-                            DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
-                            if (toRemove.created() < storedEntry.created()) {
-                                log.debug("Trying to remove more recent flow entry {} (stored: {})", toRemove, stored);
-                                // the key is not updated, removedRule remains null
-                                return stored;
-                            }
-                        }
-                    }
-                    removedRule.set(stored);
-                    return null;
-                });
-                return flowEntries.isEmpty() ? null : flowEntries;
-            });
-
-            if (removedRule.get() != null) {
-                recordUpdate(new BucketId(rule.deviceId(), bucket(rule.id())));
-                return removedRule.get();
-            } else {
-                return null;
-            }
+            return Tools.futureGetOrElse(
+                getFlowTable(rule.deviceId()).remove(rule),
+                FLOW_RULE_STORE_TIMEOUT_MILLIS,
+                TimeUnit.MILLISECONDS,
+                null);
         }
 
+        /**
+         * Purges flow rules for the given device.
+         *
+         * @param deviceId the device for which to purge flow rules
+         */
         public void purgeFlowRule(DeviceId deviceId) {
-            flowEntries.remove(deviceId);
+            DeviceFlowTable flowTable = flowTables.remove(deviceId);
+            if (flowTable != null) {
+                flowTable.close();
+            }
         }
 
+        /**
+         * Purges all flow rules from the table.
+         */
         public void purgeFlowRules() {
-            flowEntries.clear();
-        }
-
-        /**
-         * Returns a boolean indicating whether the local node is the current master for the given device.
-         *
-         * @param deviceId the device for which to indicate whether the local node is the current master
-         * @return indicates whether the local node is the current master for the given device
-         */
-        private boolean isMasterNode(DeviceId deviceId) {
-            NodeId master = replicaInfoManager.getReplicaInfoFor(deviceId).master().orElse(null);
-            return Objects.equals(master, clusterService.getLocalNode().id());
-        }
-
-        /**
-         * Returns a boolean indicating whether the local node is a backup for the given device.
-         *
-         * @param deviceId the device for which to indicate whether the local node is a backup
-         * @return indicates whether the local node is a backup for the given device
-         */
-        private boolean isBackupNode(DeviceId deviceId) {
-            List<NodeId> backupNodes = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
-            int index = backupNodes.indexOf(local);
-            return index != -1 && index < backupCount;
-        }
-
-        /**
-         * Backs up all devices to all backup nodes.
-         */
-        private void backup() {
-            for (DeviceId deviceId : getDevices()) {
-                backup(deviceId);
+            Iterator<DeviceFlowTable> iterator = flowTables.values().iterator();
+            while (iterator.hasNext()) {
+                iterator.next().close();
+                iterator.remove();
             }
         }
-
-        /**
-         * Backs up all buckets in the given device to the given node.
-         *
-         * @param deviceId the device to back up
-         */
-        private void backup(DeviceId deviceId) {
-            if (!isMasterNode(deviceId)) {
-                return;
-            }
-
-            // Get a list of backup nodes for the device.
-            List<NodeId> backupNodes = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
-            int availableBackupCount = Math.min(backupCount, backupNodes.size());
-
-            // If the list of backup nodes is empty, update the flow count.
-            if (availableBackupCount == 0) {
-                updateDeviceFlowCounts(deviceId);
-            } else {
-                // Otherwise, iterate through backup nodes and backup the device.
-                for (int index = 0; index < availableBackupCount; index++) {
-                    NodeId backupNode = backupNodes.get(index);
-                    try {
-                        backup(deviceId, backupNode);
-                    } catch (Exception e) {
-                        log.error("Backup of " + deviceId + " to " + backupNode + " failed", e);
-                    }
-                }
-            }
-        }
-
-        /**
-         * Backs up all buckets for the given device to the given node.
-         *
-         * @param deviceId the device to back up
-         * @param nodeId the node to which to back up the device
-         */
-        private void backup(DeviceId deviceId, NodeId nodeId) {
-            final long timestamp = System.currentTimeMillis();
-            for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
-                BucketId bucketId = new BucketId(deviceId, bucket);
-                BackupOperation operation = new BackupOperation(nodeId, bucketId);
-                if (startBackup(operation)) {
-                    backup(operation).whenCompleteAsync((succeeded, error) -> {
-                        if (error == null && succeeded) {
-                            succeedBackup(operation, timestamp);
-                        } else {
-                            failBackup(operation);
-                        }
-                        backup(deviceId, nodeId);
-                    }, backupSenderExecutor);
-                }
-            }
-        }
-
-        /**
-         * Returns a boolean indicating whether the given {@link BackupOperation} can be started.
-         * <p>
-         * The backup can be started if no backup for the same device/bucket/node is already in progress and changes
-         * are pending replication for the backup operation.
-         *
-         * @param operation the operation to start
-         * @return indicates whether the given backup operation should be started
-         */
-        private boolean startBackup(BackupOperation operation) {
-            long lastBackupTime = lastBackupTimes.getOrDefault(operation, 0L);
-            long lastUpdateTime = lastUpdateTimes.getOrDefault(operation.bucketId(), 0L);
-            return lastUpdateTime > 0 && lastBackupTime <= lastUpdateTime && inFlightUpdates.add(operation);
-        }
-
-        /**
-         * Fails the given backup operation.
-         *
-         * @param operation the backup operation to fail
-         */
-        private void failBackup(BackupOperation operation) {
-            inFlightUpdates.remove(operation);
-        }
-
-        /**
-         * Succeeds the given backup operation.
-         * <p>
-         * The last backup time for the operation will be updated and the operation will be removed from
-         * in-flight updates.
-         *
-         * @param operation the operation to succeed
-         * @param timestamp the timestamp at which the operation was <em>started</em>
-         */
-        private void succeedBackup(BackupOperation operation, long timestamp) {
-            lastBackupTimes.put(operation, timestamp);
-            inFlightUpdates.remove(operation);
-        }
-
-        /**
-         * Performs the given backup operation.
-         *
-         * @param operation the operation to perform
-         * @return a future to be completed with a boolean indicating whether the backup operation was successful
-         */
-        private CompletableFuture<Boolean> backup(BackupOperation operation) {
-            log.debug("Sending flowEntries in bucket {} for device {} to {} for backup.",
-                operation.bucketId().bucket(), operation.bucketId().deviceId(), operation.nodeId());
-            FlowBucket flowBucket = getFlowBucket(operation.bucketId());
-
-            CompletableFuture<Boolean> future = new CompletableFuture<>();
-            clusterCommunicator.<FlowBucket, Set<FlowId>>sendAndReceive(
-                flowBucket,
-                FLOW_TABLE_BACKUP,
-                serializer::encode,
-                serializer::decode,
-                operation.nodeId())
-                .whenComplete((backedupFlows, error) -> {
-                    Set<FlowId> flowsNotBackedUp = error != null ?
-                        flowBucket.table().keySet() :
-                        Sets.difference(flowBucket.table().keySet(), backedupFlows);
-                    if (flowsNotBackedUp.size() > 0) {
-                        log.warn("Failed to backup flows: {}. Reason: {}, Node: {}",
-                            flowsNotBackedUp, error != null ? error.getMessage() : "none", operation.nodeId());
-                    }
-                    future.complete(backedupFlows != null);
-                });
-
-            updateFlowCounts(flowBucket);
-            return future;
-        }
-
-        /**
-         * Handles a flow bucket backup from a remote peer.
-         *
-         * @param flowBucket the flow bucket to back up
-         * @return the set of flows that could not be backed up
-         */
-        private Set<FlowId> onBackup(FlowBucket flowBucket) {
-            log.debug("Received flowEntries for {} bucket {} to backup",
-                flowBucket.bucketId().deviceId(), flowBucket.bucketId);
-            Set<FlowId> backedupFlows = Sets.newHashSet();
-            try {
-                // Only process those devices are that not managed by the local node.
-                NodeId master = replicaInfoManager.getReplicaInfoFor(flowBucket.bucketId().deviceId())
-                    .master()
-                    .orElse(null);
-                if (!Objects.equals(local, master)) {
-                    Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> backupFlowTable =
-                        getFlowTable(flowBucket.bucketId().deviceId());
-                    backupFlowTable.putAll(flowBucket.table());
-                    backupFlowTable.entrySet()
-                        .removeIf(entry -> isInBucket(entry.getKey(), flowBucket.bucketId().bucket())
-                            && !flowBucket.table().containsKey(entry.getKey()));
-                    backedupFlows.addAll(flowBucket.table().keySet());
-                    recordUpdate(flowBucket.bucketId(), flowBucket.timestamp());
-                }
-            } catch (Exception e) {
-                log.warn("Failure processing backup request", e);
-            }
-            return backedupFlows;
-        }
-
-        /**
-         * Runs the anti-entropy protocol.
-         */
-        private void runAntiEntropy() {
-            for (DeviceId deviceId : getDevices()) {
-                runAntiEntropy(deviceId);
-            }
-        }
-
-        /**
-         * Runs the anti-entropy protocol for the given device.
-         *
-         * @param deviceId the device for which to run the anti-entropy protocol
-         */
-        private void runAntiEntropy(DeviceId deviceId) {
-            if (!isMasterNode(deviceId)) {
-                return;
-            }
-
-            // Get the set of digests for the node.
-            Set<FlowBucketDigest> digests = getDigests(deviceId);
-
-            // Get a list of backup nodes for the device and compute the real backup count.
-            List<NodeId> backupNodes = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
-            int availableBackupCount = Math.min(backupCount, backupNodes.size());
-
-            // Iterate through backup nodes and run the anti-entropy protocol.
-            for (int index = 0; index < availableBackupCount; index++) {
-                NodeId backupNode = backupNodes.get(index);
-                try {
-                    runAntiEntropy(deviceId, backupNode, digests);
-                } catch (Exception e) {
-                    log.error("Anti-entropy for " + deviceId + " to " + backupNode + " failed", e);
-                }
-            }
-        }
-
-        /**
-         * Sends an anti-entropy advertisement to the given node.
-         *
-         * @param deviceId the device ID for which to send the advertisement
-         * @param nodeId the node to which to send the advertisement
-         * @param digests the digests to send to the given node
-         */
-        private void runAntiEntropy(DeviceId deviceId, NodeId nodeId, Set<FlowBucketDigest> digests) {
-            log.trace("Sending anti-entropy advertisement for device {} to {}", deviceId, nodeId);
-            clusterCommunicator.<Set<FlowBucketDigest>, Set<BucketId>>sendAndReceive(
-                digests,
-                FLOW_TABLE_ANTI_ENTROPY,
-                serializer::encode,
-                serializer::decode,
-                nodeId)
-                .whenComplete((missingBuckets, error) -> {
-                    if (error == null) {
-                        log.debug("Detected {} missing buckets on node {} for device {}",
-                            missingBuckets.size(), nodeId, deviceId);
-                    } else {
-                        log.trace("Anti-entropy advertisement for device {} to {} failed", deviceId, nodeId, error);
-                    }
-                });
-        }
-
-        /**
-         * Handles a device anti-entropy request from a remote peer.
-         *
-         * @param digest the device digest
-         * @return the set of flow buckets to update
-         */
-        private Set<BucketId> onAntiEntropy(DeviceDigest digest) {
-            // If the local node is the master, reject the anti-entropy request.
-            // TODO: We really should be using mastership terms in anti-entropy requests to determine whether
-            // this node is a newer master, but that would only reduce the time it takes to resolve missing flows
-            // as a later anti-entropy request will still succeed once this node recognizes it's no longer the master.
-            NodeId master = replicaInfoManager.getReplicaInfoFor(digest.deviceId())
-                .master()
-                .orElse(null);
-            if (Objects.equals(master, local)) {
-                return ImmutableSet.of();
-            }
-
-            // Compute a set of missing BucketIds based on digest times and send them back to the master.
-            Set<BucketId> missingBuckets = new HashSet<>();
-            for (FlowBucketDigest flowBucketDigest : digest.digests()) {
-                long lastUpdated = lastUpdateTimes.getOrDefault(flowBucketDigest.bucketId(), 0L);
-                if (lastUpdated < flowBucketDigest.timestamp()) {
-                    missingBuckets.add(flowBucketDigest.bucketId());
-                }
-            }
-            return missingBuckets;
-        }
-
-        /**
-         * Updates all flow counts for the given device.
-         *
-         * @param deviceId the device for which to update flow counts
-         */
-        private void updateDeviceFlowCounts(DeviceId deviceId) {
-            for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
-                BucketId bucketId = new BucketId(deviceId, bucket);
-                FlowBucket flowBucket = getFlowBucket(bucketId);
-                updateFlowCounts(flowBucket);
-            }
-        }
-
-        /**
-         * Updates the eventually consistent flow count for the given bucket.
-         *
-         * @param flowBucket the flow bucket for which to update flow counts
-         */
-        private void updateFlowCounts(FlowBucket flowBucket) {
-            int flowCount = flowBucket.table().entrySet()
-                .stream()
-                .mapToInt(e -> e.getValue().values().size())
-                .sum();
-            flowCounts.put(flowBucket.bucketId(), flowCount);
-        }
     }
 
     @Override
@@ -1395,16 +865,114 @@
     @Override
     public long getActiveFlowRuleCount(DeviceId deviceId) {
         return Streams.stream(getTableStatistics(deviceId))
-                .mapToLong(TableStatisticsEntry::activeFlowEntries)
-                .sum();
+            .mapToLong(TableStatisticsEntry::activeFlowEntries)
+            .sum();
     }
 
     private class InternalTableStatsListener
         implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> {
         @Override
         public void event(EventuallyConsistentMapEvent<DeviceId,
-                          List<TableStatisticsEntry>> event) {
+            List<TableStatisticsEntry>> event) {
             //TODO: Generate an event to listeners (do we need?)
         }
     }
+
+    /**
+     * Device lifecycle manager implementation.
+     */
+    private final class InternalLifecycleManager
+        extends AbstractListenerManager<LifecycleEvent, LifecycleEventListener>
+        implements LifecycleManager, ReplicaInfoEventListener, MapEventListener<DeviceId, Long> {
+
+        private final DeviceId deviceId;
+
+        private volatile DeviceReplicaInfo replicaInfo;
+
+        InternalLifecycleManager(DeviceId deviceId) {
+            this.deviceId = deviceId;
+            replicaInfoManager.addListener(this);
+            mastershipTermLifecycles.addListener(this);
+            replicaInfo = toDeviceReplicaInfo(replicaInfoManager.getReplicaInfoFor(deviceId));
+        }
+
+        @Override
+        public DeviceReplicaInfo getReplicaInfo() {
+            return replicaInfo;
+        }
+
+        @Override
+        public void activate(long term) {
+            final ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
+            if (replicaInfo != null && replicaInfo.term() == term) {
+                mastershipTermLifecycles.put(deviceId, term);
+            }
+        }
+
+        @Override
+        public void event(ReplicaInfoEvent event) {
+            if (event.subject().equals(deviceId) && event.type() == ReplicaInfoEvent.Type.MASTER_CHANGED) {
+                onReplicaInfoChange(event.replicaInfo());
+            }
+        }
+
+        @Override
+        public void event(MapEvent<DeviceId, Long> event) {
+            if (event.key().equals(deviceId) && event.newValue() != null) {
+                onActivate(event.newValue().value());
+            }
+        }
+
+        /**
+         * Handles a term activation event.
+         *
+         * @param term the term that was activated
+         */
+        private void onActivate(long term) {
+            final ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
+            if (replicaInfo != null && replicaInfo.term() == term) {
+                NodeId master = replicaInfo.master().orElse(null);
+                List<NodeId> backups = replicaInfo.backups()
+                    .subList(0, Math.min(replicaInfo.backups().size(), backupCount));
+                listenerRegistry.process(new LifecycleEvent(
+                    LifecycleEvent.Type.TERM_ACTIVE,
+                    new DeviceReplicaInfo(term, master, backups)));
+            }
+        }
+
+        /**
+         * Handles a replica info change event.
+         *
+         * @param replicaInfo the updated replica info
+         */
+        private synchronized void onReplicaInfoChange(ReplicaInfo replicaInfo) {
+            DeviceReplicaInfo oldReplicaInfo = this.replicaInfo;
+            this.replicaInfo = toDeviceReplicaInfo(replicaInfo);
+            if (oldReplicaInfo == null || oldReplicaInfo.term() < replicaInfo.term()) {
+                if (oldReplicaInfo != null) {
+                    listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_END, oldReplicaInfo));
+                }
+                listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_START, this.replicaInfo));
+            }
+        }
+
+        /**
+         * Converts the given replica info into a {@link DeviceReplicaInfo} instance.
+         *
+         * @param replicaInfo the replica info to convert
+         * @return the converted replica info
+         */
+        private DeviceReplicaInfo toDeviceReplicaInfo(ReplicaInfo replicaInfo) {
+            NodeId master = replicaInfo.master().orElse(null);
+            List<NodeId> backups = replicaInfo.backups()
+                .subList(0, Math.min(replicaInfo.backups().size(), backupCount));
+            return new DeviceReplicaInfo(replicaInfo.term(), master, backups);
+        }
+
+        @Override
+        public void close() {
+            replicaInfoManager.removeListener(this);
+            mastershipTermLifecycles.removeListener(this);
+        }
+    }
 }
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
index 254f5df..c57e9eb 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
@@ -32,6 +32,9 @@
     public static final MessageSubject GET_DEVICE_FLOW_ENTRIES
         = new MessageSubject("peer-forward-get-device-flow-entries");
 
+    public static final MessageSubject GET_DEVICE_FLOW_COUNT
+        = new MessageSubject("peer-forward-get-flow-count");
+
     public static final MessageSubject REMOVE_FLOW_ENTRY
         = new MessageSubject("peer-forward-remove-flow-entry");
 
@@ -40,7 +43,4 @@
 
     public static final MessageSubject FLOW_TABLE_BACKUP
         = new MessageSubject("peer-flow-table-backup");
-
-    public static final MessageSubject FLOW_TABLE_ANTI_ENTROPY
-        = new MessageSubject("peer-flow-table-anti-entropy");
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
new file mode 100644
index 0000000..2205b0b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
@@ -0,0 +1,246 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import com.google.common.collect.Maps;
+import org.onosproject.net.flow.DefaultFlowEntry;
+import org.onosproject.net.flow.FlowEntry;
+import org.onosproject.net.flow.FlowId;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.StoredFlowEntry;
+import org.onosproject.store.LogicalTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Container for a bucket of flows assigned to a specific device.
+ * <p>
+ * The bucket is mutable. When changes are made to the bucket, the term and timestamp in which the change
+ * occurred is recorded for ordering changes.
+ */
+public class FlowBucket {
+    private static final Logger LOGGER = LoggerFactory.getLogger(FlowBucket.class);
+    private final BucketId bucketId;
+    private volatile long term;
+    private volatile LogicalTimestamp timestamp = new LogicalTimestamp(0);
+    private final Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowBucket = Maps.newConcurrentMap();
+
+    FlowBucket(BucketId bucketId) {
+        this.bucketId = bucketId;
+    }
+
+    /**
+     * Returns the flow bucket identifier.
+     *
+     * @return the flow bucket identifier
+     */
+    public BucketId bucketId() {
+        return bucketId;
+    }
+
+    /**
+     * Returns the flow bucket term.
+     *
+     * @return the flow bucket term
+     */
+    public long term() {
+        return term;
+    }
+
+    /**
+     * Returns the flow bucket timestamp.
+     *
+     * @return the flow bucket timestamp
+     */
+    public LogicalTimestamp timestamp() {
+        return timestamp;
+    }
+
+    /**
+     * Returns the digest for the bucket.
+     *
+     * @return the digest for the bucket
+     */
+    public FlowBucketDigest getDigest() {
+        return new FlowBucketDigest(bucketId().bucket(), term(), timestamp());
+    }
+
+    /**
+     * Returns the flow entries in the bucket.
+     *
+     * @return the flow entries in the bucket
+     */
+    public Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> getFlowBucket() {
+        return flowBucket;
+    }
+
+    /**
+     * Returns the flow entries for the given flow.
+     *
+     * @param flowId the flow identifier
+     * @return the flows for the given flow ID
+     */
+    public Map<StoredFlowEntry, StoredFlowEntry> getFlowEntries(FlowId flowId) {
+        Map<StoredFlowEntry, StoredFlowEntry> flowEntries = flowBucket.get(flowId);
+        return flowEntries != null ? flowEntries : flowBucket.computeIfAbsent(flowId, id -> Maps.newConcurrentMap());
+    }
+
+    /**
+     * Counts the flows in the bucket.
+     *
+     * @return the number of flows in the bucket
+     */
+    public int count() {
+        return flowBucket.values()
+            .stream()
+            .mapToInt(entry -> entry.values().size())
+            .sum();
+    }
+
+    /**
+     * Records an update to the bucket.
+     */
+    private void recordUpdate(long term, LogicalTimestamp timestamp) {
+        this.term = term;
+        this.timestamp = timestamp;
+    }
+
+    /**
+     * Adds the given flow rule to the bucket.
+     *
+     * @param rule  the rule to add
+     * @param term  the term in which the change occurred
+     * @param clock the logical clock
+     */
+    public void add(FlowEntry rule, long term, LogicalClock clock) {
+        Map<StoredFlowEntry, StoredFlowEntry> flowEntries = flowBucket.get(rule.id());
+        if (flowEntries == null) {
+            flowEntries = flowBucket.computeIfAbsent(rule.id(), id -> Maps.newConcurrentMap());
+        }
+        flowEntries.put((StoredFlowEntry) rule, (StoredFlowEntry) rule);
+        recordUpdate(term, clock.getTimestamp());
+    }
+
+    /**
+     * Updates the given flow rule in the bucket.
+     *
+     * @param rule  the rule to update
+     * @param term  the term in which the change occurred
+     * @param clock the logical clock
+     */
+    public void update(FlowEntry rule, long term, LogicalClock clock) {
+        Map<StoredFlowEntry, StoredFlowEntry> flowEntries = flowBucket.get(rule.id());
+        if (flowEntries == null) {
+            flowEntries = flowBucket.computeIfAbsent(rule.id(), id -> Maps.newConcurrentMap());
+        }
+        flowEntries.computeIfPresent((StoredFlowEntry) rule, (k, stored) -> {
+            if (rule instanceof DefaultFlowEntry) {
+                DefaultFlowEntry updated = (DefaultFlowEntry) rule;
+                if (stored instanceof DefaultFlowEntry) {
+                    DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
+                    if (updated.created() >= storedEntry.created()) {
+                        recordUpdate(term, clock.getTimestamp());
+                        return updated;
+                    } else {
+                        LOGGER.debug("Trying to update more recent flow entry {} (stored: {})", updated, stored);
+                        return stored;
+                    }
+                }
+            }
+            return stored;
+        });
+    }
+
+    /**
+     * Applies the given update function to the rule.
+     *
+     * @param rule     the rule to update
+     * @param function the update function to apply
+     * @param term     the term in which the change occurred
+     * @param clock    the logical clock
+     * @param <T>      the result type
+     * @return the update result or {@code null} if the rule was not updated
+     */
+    public <T> T update(FlowRule rule, Function<StoredFlowEntry, T> function, long term, LogicalClock clock) {
+        Map<StoredFlowEntry, StoredFlowEntry> flowEntries = flowBucket.get(rule.id());
+        if (flowEntries == null) {
+            flowEntries = flowBucket.computeIfAbsent(rule.id(), id -> Maps.newConcurrentMap());
+        }
+
+        AtomicReference<T> resultRef = new AtomicReference<>();
+        flowEntries.computeIfPresent(new DefaultFlowEntry(rule), (k, stored) -> {
+            if (stored != null) {
+                T result = function.apply(stored);
+                if (result != null) {
+                    recordUpdate(term, clock.getTimestamp());
+                    resultRef.set(result);
+                }
+            }
+            return stored;
+        });
+        return resultRef.get();
+    }
+
+    /**
+     * Removes the given flow rule from the bucket.
+     *
+     * @param rule  the rule to remove
+     * @param term  the term in which the change occurred
+     * @param clock the logical clock
+     * @return the removed flow entry
+     */
+    public FlowEntry remove(FlowEntry rule, long term, LogicalClock clock) {
+        final AtomicReference<FlowEntry> removedRule = new AtomicReference<>();
+        flowBucket.computeIfPresent(rule.id(), (flowId, flowEntries) -> {
+            flowEntries.computeIfPresent((StoredFlowEntry) rule, (k, stored) -> {
+                if (rule instanceof DefaultFlowEntry) {
+                    DefaultFlowEntry toRemove = (DefaultFlowEntry) rule;
+                    if (stored instanceof DefaultFlowEntry) {
+                        DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
+                        if (toRemove.created() < storedEntry.created()) {
+                            LOGGER.debug("Trying to remove more recent flow entry {} (stored: {})", toRemove, stored);
+                            // the key is not updated, removedRule remains null
+                            return stored;
+                        }
+                    }
+                }
+                removedRule.set(stored);
+                return null;
+            });
+            return flowEntries.isEmpty() ? null : flowEntries;
+        });
+
+        if (removedRule.get() != null) {
+            recordUpdate(term, clock.getTimestamp());
+            return removedRule.get();
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Clears the bucket.
+     */
+    public void clear() {
+        term = 0;
+        timestamp = new LogicalTimestamp(0);
+        flowBucket.clear();
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucketDigest.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucketDigest.java
new file mode 100644
index 0000000..b5453e5
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucketDigest.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import java.util.Objects;
+
+import org.onosproject.store.LogicalTimestamp;
+
+/**
+ * Flow bucket digest.
+ */
+public class FlowBucketDigest {
+    private final int bucket;
+    private final long term;
+    private final LogicalTimestamp timestamp;
+
+    FlowBucketDigest(int bucket, long term, LogicalTimestamp timestamp) {
+        this.bucket = bucket;
+        this.term = term;
+        this.timestamp = timestamp;
+    }
+
+    /**
+     * Returns the bucket identifier.
+     *
+     * @return the bucket identifier
+     */
+    public int bucket() {
+        return bucket;
+    }
+
+    /**
+     * Returns the bucket term.
+     *
+     * @return the bucket term
+     */
+    public long term() {
+        return term;
+    }
+
+    /**
+     * Returns the bucket timestamp.
+     *
+     * @return the bucket timestamp
+     */
+    public LogicalTimestamp timestamp() {
+        return timestamp;
+    }
+
+    /**
+     * Returns a boolean indicating whether this digest is newer than the given digest.
+     *
+     * @param digest the digest to check
+     * @return indicates whether this digest is newer than the given digest
+     */
+    public boolean isNewerThan(FlowBucketDigest digest) {
+        return digest == null || term() > digest.term() || timestamp().isNewerThan(digest.timestamp());
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(bucket);
+    }
+
+    @Override
+    public boolean equals(Object object) {
+        return object instanceof FlowBucketDigest
+            && ((FlowBucketDigest) object).bucket == bucket;
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleEvent.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleEvent.java
new file mode 100644
index 0000000..29672d0
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleEvent.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import org.onosproject.event.AbstractEvent;
+
+/**
+ * Flow table lifecycle event.
+ */
+public class LifecycleEvent extends AbstractEvent<LifecycleEvent.Type, DeviceReplicaInfo> {
+
+    /**
+     * Lifecycle event type.
+     */
+    public enum Type {
+        TERM_START,
+        TERM_ACTIVE,
+        TERM_END,
+    }
+
+    public LifecycleEvent(Type type, DeviceReplicaInfo subject) {
+        super(type, subject);
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleEventListener.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleEventListener.java
new file mode 100644
index 0000000..a2f3b3d
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleEventListener.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import org.onosproject.event.EventListener;
+
+/**
+ * Flow table lifecycle event listener.
+ */
+public interface LifecycleEventListener extends EventListener<LifecycleEvent> {
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleManager.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleManager.java
new file mode 100644
index 0000000..213454c
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleManager.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import org.onosproject.event.ListenerService;
+
+/**
+ * Flow table lifecycle manager.
+ */
+public interface LifecycleManager extends ListenerService<LifecycleEvent, LifecycleEventListener> {
+
+    /**
+     * Returns the current term.
+     *
+     * @return the current term
+     */
+    DeviceReplicaInfo getReplicaInfo();
+
+    /**
+     * Activates the given term.
+     *
+     * @param term the term to activate
+     */
+    void activate(long term);
+
+    /**
+     * Closes the lifecycle manager.
+     */
+    void close();
+
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LogicalClock.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LogicalClock.java
new file mode 100644
index 0000000..92d39cf
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LogicalClock.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.onosproject.store.LogicalTimestamp;
+
+/**
+ * Logical clock.
+ */
+public final class LogicalClock {
+    private final AtomicLong timestamp = new AtomicLong();
+
+    /**
+     * Records an event.
+     *
+     * @param timestamp the event timestamp
+     */
+    public void tick(LogicalTimestamp timestamp) {
+        this.timestamp.accumulateAndGet(timestamp.value(), (x, y) -> Math.max(x, y) + 1);
+    }
+
+    /**
+     * Returns a timestamped value.
+     *
+     * @param value the value to timestamp
+     * @param <T>   the value type
+     * @return the timestamped value
+     */
+    public <T> Timestamped<T> timestamp(T value) {
+        return new Timestamped<>(value, getTimestamp());
+    }
+
+    /**
+     * Increments and returns the current timestamp.
+     *
+     * @return the current timestamp
+     */
+    public LogicalTimestamp getTimestamp() {
+        return new LogicalTimestamp(timestamp.incrementAndGet());
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/Timestamped.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/Timestamped.java
new file mode 100644
index 0000000..e7f566b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/Timestamped.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import org.onosproject.store.LogicalTimestamp;
+
+/**
+ * Timestamped value.
+ */
+public class Timestamped<T> {
+    private final T value;
+    private final LogicalTimestamp timestamp;
+
+    public Timestamped(T value, LogicalTimestamp timestamp) {
+        this.value = value;
+        this.timestamp = timestamp;
+    }
+
+    /**
+     * Returns the value.
+     *
+     * @return the value
+     */
+    public T value() {
+        return value;
+    }
+
+    /**
+     * Returns the timestamp.
+     *
+     * @return the timestamp
+     */
+    public LogicalTimestamp timestamp() {
+        return timestamp;
+    }
+}
diff --git a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ECFlowRuleStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ECFlowRuleStoreTest.java
index 7fd0412..72562f9 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ECFlowRuleStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ECFlowRuleStoreTest.java
@@ -16,6 +16,7 @@
 package org.onosproject.store.flow.impl;
 
 import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Multimap;
 import org.junit.After;
 import org.junit.Before;
@@ -26,6 +27,7 @@
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.core.CoreServiceAdapter;
+import org.onosproject.mastership.MastershipInfo;
 import org.onosproject.mastership.MastershipServiceAdapter;
 import org.onosproject.net.device.DeviceServiceAdapter;
 import org.onosproject.net.DeviceId;
@@ -40,10 +42,16 @@
 import org.onosproject.net.intent.IntentTestsMocks;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
 import org.onosproject.store.persistence.PersistenceServiceAdapter;
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncConsistentMapAdapter;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.ConsistentMapBuilder;
 import org.onosproject.store.service.TestStorageService;
 
 import org.onlab.packet.Ip4Address;
 import java.util.Iterator;
+import java.util.Optional;
+
 import org.osgi.service.component.ComponentContext;
 
 import static org.easymock.EasyMock.createMock;
@@ -103,6 +111,16 @@
         public NodeId getMasterFor(DeviceId deviceId) {
             return new NodeId("1");
         }
+
+        @Override
+        public MastershipInfo getMastershipFor(DeviceId deviceId) {
+            return new MastershipInfo(
+                1,
+                Optional.of(NodeId.nodeId("1")),
+                ImmutableMap.<NodeId, MastershipRole>builder()
+                    .put(NodeId.nodeId("1"), MastershipRole.MASTER)
+                    .build());
+        }
     }
 
 
@@ -132,8 +150,27 @@
     @Before
     public void setUp() throws Exception {
         flowStoreImpl = new ECFlowRuleStore();
-        flowStoreImpl.storageService = new TestStorageService();
-        flowStoreImpl.replicaInfoManager = new ReplicaInfoManager();
+        flowStoreImpl.storageService = new TestStorageService() {
+            @Override
+            public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
+                return new ConsistentMapBuilder<K, V>() {
+                    @Override
+                    public AsyncConsistentMap<K, V> buildAsyncMap() {
+                        return new AsyncConsistentMapAdapter<K, V>();
+                    }
+
+                    @Override
+                    public ConsistentMap<K, V> build() {
+                        return null;
+                    }
+                };
+            }
+        };
+
+        ReplicaInfoManager replicaInfoManager = new ReplicaInfoManager();
+        replicaInfoManager.mastershipService = new MasterOfAll();
+
+        flowStoreImpl.replicaInfoManager = replicaInfoManager;
         mockClusterService = createMock(ClusterService.class);
         flowStoreImpl.clusterService = mockClusterService;
         nodeId = new NodeId("1");
diff --git a/utils/misc/src/main/java/org/onlab/util/Tools.java b/utils/misc/src/main/java/org/onlab/util/Tools.java
index caac1c1..1246ec9 100644
--- a/utils/misc/src/main/java/org/onlab/util/Tools.java
+++ b/utils/misc/src/main/java/org/onlab/util/Tools.java
@@ -755,6 +755,31 @@
     }
 
     /**
+     * Returns a new CompletableFuture completed with the first result from a list of futures. If no future
+     * is completed successfully, the returned future will be completed with the first exception.
+     *
+     * @param futures the input futures
+     * @param <T> future result type
+     * @return a new CompletableFuture
+     */
+    public static <T> CompletableFuture<T> firstOf(List<CompletableFuture<T>> futures) {
+        CompletableFuture<T> resultFuture = new CompletableFuture<>();
+        CompletableFuture.allOf(futures.stream()
+            .map(future -> future.thenAccept(r -> resultFuture.complete(r)))
+            .toArray(CompletableFuture[]::new))
+            .whenComplete((r, e) -> {
+                if (!resultFuture.isDone()) {
+                    if (e != null) {
+                        resultFuture.completeExceptionally(e);
+                    } else {
+                        resultFuture.complete(null);
+                    }
+                }
+            });
+        return resultFuture;
+    }
+
+    /**
      * Returns a new CompletableFuture completed by with the first positive result from a list of
      * input CompletableFutures.
      *