Cherry picking multiple commits for FlowRuleStore optimaization

Change-Id: I8013fe89f987bb12a193d3f3dc793a6a89609eb1
diff --git a/apps/faultmanagement/fmmgr/src/test/java/org/onosproject/faultmanagement/impl/PollingAlarmProviderTest.java b/apps/faultmanagement/fmmgr/src/test/java/org/onosproject/faultmanagement/impl/PollingAlarmProviderTest.java
index 560203b..0ebd4cc 100644
--- a/apps/faultmanagement/fmmgr/src/test/java/org/onosproject/faultmanagement/impl/PollingAlarmProviderTest.java
+++ b/apps/faultmanagement/fmmgr/src/test/java/org/onosproject/faultmanagement/impl/PollingAlarmProviderTest.java
@@ -17,6 +17,7 @@
 package org.onosproject.faultmanagement.impl;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.junit.Before;
 import org.junit.Test;
@@ -25,7 +26,6 @@
 import org.onosproject.cfg.ComponentConfigAdapter;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.RoleInfo;
 import org.onosproject.incubator.net.faultmanagement.alarm.Alarm;
 import org.onosproject.incubator.net.faultmanagement.alarm.AlarmConsumer;
 import org.onosproject.incubator.net.faultmanagement.alarm.AlarmId;
@@ -35,6 +35,7 @@
 import org.onosproject.incubator.net.faultmanagement.alarm.AlarmProviderService;
 import org.onosproject.incubator.net.faultmanagement.alarm.DefaultAlarm;
 import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipInfo;
 import org.onosproject.mastership.MastershipListener;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.mastership.MastershipServiceAdapter;
@@ -64,6 +65,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 
 import static org.junit.Assert.*;
@@ -96,7 +98,7 @@
 
     private final MastershipEvent mastershipEvent =
             new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, DEVICE_ID,
-                                new RoleInfo(nodeId, ImmutableList.of()));
+                                new MastershipInfo(1, Optional.of(nodeId), ImmutableMap.of()));
 
     private final DeviceEvent deviceEvent =
             new DeviceEvent(DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED, device);
diff --git a/core/api/src/main/java/org/onosproject/mastership/MastershipEvent.java b/core/api/src/main/java/org/onosproject/mastership/MastershipEvent.java
index db5ba74..e7c0169 100644
--- a/core/api/src/main/java/org/onosproject/mastership/MastershipEvent.java
+++ b/core/api/src/main/java/org/onosproject/mastership/MastershipEvent.java
@@ -29,9 +29,6 @@
  */
 public class MastershipEvent extends AbstractEvent<MastershipEvent.Type, DeviceId> {
 
-    //Contains master and standby information.
-    RoleInfo roleInfo;
-
     /**
      * Type of mastership events.
      */
@@ -55,45 +52,58 @@
         SUSPENDED
     }
 
+    private final MastershipInfo mastershipInfo;
+
     /**
      * Creates an event of a given type and for the specified device,
      * role information, and the current time.
      *
-     * @param type   mastership event type
-     * @param device event device subject
-     * @param info   mastership role information
+     * @param type           mastership event type
+     * @param device         event device subject
+     * @param mastershipInfo mastership info
      */
-    public MastershipEvent(Type type, DeviceId device, RoleInfo info) {
+    public MastershipEvent(Type type, DeviceId device, MastershipInfo mastershipInfo) {
         super(type, device);
-        this.roleInfo = info;
+        this.mastershipInfo = mastershipInfo;
     }
 
     /**
      * Creates an event of a given type and for the specified device, master,
      * and time.
      *
-     * @param type   mastership event type
-     * @param device event device subject
-     * @param info   role information
-     * @param time   occurrence time
+     * @param type           mastership event type
+     * @param device         event device subject
+     * @param mastershipInfo mastership information
+     * @param time           occurrence time
      */
-    public MastershipEvent(Type type, DeviceId device, RoleInfo info, long time) {
+    public MastershipEvent(Type type, DeviceId device, MastershipInfo mastershipInfo, long time) {
         super(type, device, time);
-        this.roleInfo = info;
+        this.mastershipInfo = mastershipInfo;
+    }
+
+    /**
+     * Returns the mastership info.
+     *
+     * @return the mastership info
+     */
+    public MastershipInfo mastershipInfo() {
+        return mastershipInfo;
     }
 
     /**
      * Returns the current role state for the subject.
      *
      * @return RoleInfo associated with Device ID subject
+     * @deprecated since 1.14
      */
+    @Deprecated
     public RoleInfo roleInfo() {
-        return roleInfo;
+        return new RoleInfo(mastershipInfo.master().orElse(null), mastershipInfo.backups());
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(type(), subject(), roleInfo(), time());
+        return Objects.hash(type(), subject(), mastershipInfo(), time());
     }
 
     @Override
@@ -105,7 +115,7 @@
             final MastershipEvent other = (MastershipEvent) obj;
             return Objects.equals(this.type(), other.type()) &&
                     Objects.equals(this.subject(), other.subject()) &&
-                    Objects.equals(this.roleInfo(), other.roleInfo()) &&
+                    Objects.equals(this.mastershipInfo(), other.mastershipInfo()) &&
                     Objects.equals(this.time(), other.time());
         }
         return false;
@@ -117,7 +127,7 @@
                 .add("time", Tools.defaultOffsetDataTime(time()))
                 .add("type", type())
                 .add("subject", subject())
-                .add("roleInfo", roleInfo)
+                .add("mastershipInfo", mastershipInfo())
                 .toString();
     }
 }
diff --git a/core/api/src/main/java/org/onosproject/mastership/MastershipInfo.java b/core/api/src/main/java/org/onosproject/mastership/MastershipInfo.java
new file mode 100644
index 0000000..c9a4f0e
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/mastership/MastershipInfo.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.mastership;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.net.MastershipRole;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+/**
+ * Mastership info.
+ */
+public final class MastershipInfo {
+    private final long term;
+    private final Optional<NodeId> master;
+    private final ImmutableMap<NodeId, MastershipRole> roles;
+
+    public MastershipInfo() {
+        this(0, Optional.empty(), ImmutableMap.of());
+    }
+
+    public MastershipInfo(long term, Optional<NodeId> master, ImmutableMap<NodeId, MastershipRole> roles) {
+        this.term = term;
+        this.master = master;
+        this.roles = roles;
+    }
+
+    /**
+     * Returns the mastership term.
+     *
+     * @return the mastership term
+     */
+    public long term() {
+        return term;
+    }
+
+    /**
+     * Returns the current master.
+     *
+     * @return the current master
+     */
+    public Optional<NodeId> master() {
+        return master;
+    }
+
+    /**
+     * Returns a sorted list of standby nodes.
+     *
+     * @return a sorted list of standby nodes
+     */
+    public List<NodeId> backups() {
+        return getRoles(MastershipRole.STANDBY);
+    }
+
+    /**
+     * Returns the list of nodes with the given role.
+     *
+     * @param role the role by which to filter nodes
+     * @return an immutable list of nodes with the given role sorted in priority order
+     */
+    public List<NodeId> getRoles(MastershipRole role) {
+        return ImmutableList.copyOf(roles.entrySet()
+            .stream()
+            .filter(entry -> entry.getValue() == role)
+            .map(Map.Entry::getKey)
+            .collect(Collectors.toList()));
+    }
+
+    /**
+     * Returns the current role for the given node.
+     *
+     * @param nodeId the node for which to return the current role
+     * @return the current role for the given node
+     */
+    public MastershipRole getRole(NodeId nodeId) {
+        return roles.get(nodeId);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(term, master, roles);
+    }
+
+    @Override
+    public boolean equals(Object object) {
+        if (object instanceof MastershipInfo) {
+            MastershipInfo that = (MastershipInfo) object;
+            return this.term == that.term
+                && Objects.equals(this.master, that.master)
+                && Objects.equals(this.roles, that.roles);
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return toStringHelper(this)
+            .add("term", term)
+            .add("master", master)
+            .add("roles", roles)
+            .toString();
+    }
+}
diff --git a/core/api/src/main/java/org/onosproject/mastership/MastershipService.java b/core/api/src/main/java/org/onosproject/mastership/MastershipService.java
index 86042b0..a5a0c43 100644
--- a/core/api/src/main/java/org/onosproject/mastership/MastershipService.java
+++ b/core/api/src/main/java/org/onosproject/mastership/MastershipService.java
@@ -120,6 +120,14 @@
     RoleInfo getNodesFor(DeviceId deviceId);
 
     /**
+     * Returns the mastership info for the given device.
+     *
+     * @param deviceId the device for which to return the mastership info
+     * @return the mastership info for the given device
+     */
+    MastershipInfo getMastershipFor(DeviceId deviceId);
+
+    /**
      * Returns the devices for which a controller is master.
      * <p>
      * Returned Set may contain DeviceId which no longer exist in the system.
diff --git a/core/api/src/main/java/org/onosproject/mastership/MastershipStore.java b/core/api/src/main/java/org/onosproject/mastership/MastershipStore.java
index 3650388..7f43a15 100644
--- a/core/api/src/main/java/org/onosproject/mastership/MastershipStore.java
+++ b/core/api/src/main/java/org/onosproject/mastership/MastershipStore.java
@@ -73,7 +73,6 @@
      */
     Set<DeviceId> getDevices(NodeId nodeId);
 
-
     /**
      * Sets a device's role for a specified controller instance.
      *
@@ -93,6 +92,14 @@
     MastershipTerm getTermFor(DeviceId deviceId);
 
     /**
+     * Returns the mastership info for the given device.
+     *
+     * @param deviceId the device for which to return the mastership info
+     * @return the mastership info for the given device
+     */
+    MastershipInfo getMastership(DeviceId deviceId);
+
+    /**
      * Sets a controller instance's mastership role to STANDBY for a device.
      * If the role is MASTER, another controller instance will be selected
      * as a candidate master.
diff --git a/core/api/src/test/java/org/onosproject/mastership/MastershipEventTest.java b/core/api/src/test/java/org/onosproject/mastership/MastershipEventTest.java
index c5f6ba9..7b791f4 100644
--- a/core/api/src/test/java/org/onosproject/mastership/MastershipEventTest.java
+++ b/core/api/src/test/java/org/onosproject/mastership/MastershipEventTest.java
@@ -15,14 +15,15 @@
  */
 package org.onosproject.mastership;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.testing.EqualsTester;
 import org.junit.Test;
 
 import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.RoleInfo;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.MastershipRole;
 
-import java.util.Arrays;
+import java.util.Optional;
 
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
@@ -36,21 +37,33 @@
     private final DeviceId deviceId2 = DeviceId.deviceId("bar:baz");
     private final NodeId node1 = new NodeId("1");
     private final NodeId node2 = new NodeId("2");
-    private final RoleInfo roleInfo1 = new RoleInfo(node1, Arrays.asList(node1, node2));
-    private final RoleInfo roleInfo2 = new RoleInfo(node2, Arrays.asList(node2, node1));
+    private final MastershipInfo mastershipInfo1 = new MastershipInfo(
+        1,
+        Optional.of(node1),
+        ImmutableMap.<NodeId, MastershipRole>builder()
+            .put(node1, MastershipRole.MASTER)
+            .put(node2, MastershipRole.STANDBY)
+            .build());
+    private final MastershipInfo mastershipInfo2 = new MastershipInfo(
+        2,
+        Optional.of(node1),
+        ImmutableMap.<NodeId, MastershipRole>builder()
+            .put(node2, MastershipRole.MASTER)
+            .put(node1, MastershipRole.STANDBY)
+            .build());
 
     private final MastershipEvent event1 =
-            new MastershipEvent(MastershipEvent.Type.BACKUPS_CHANGED, deviceId1, roleInfo1);
+            new MastershipEvent(MastershipEvent.Type.BACKUPS_CHANGED, deviceId1, mastershipInfo1);
     private final MastershipEvent event2 =
-            new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId1, roleInfo1);
+            new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId1, mastershipInfo1);
     private final MastershipEvent event3 =
-            new MastershipEvent(MastershipEvent.Type.SUSPENDED, deviceId1, roleInfo1);
+            new MastershipEvent(MastershipEvent.Type.SUSPENDED, deviceId1, mastershipInfo1);
     private final MastershipEvent event4 =
-            new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId1, roleInfo2, time);
+            new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId1, mastershipInfo2, time);
     private final MastershipEvent sameAsEvent4 =
-            new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId1, roleInfo2, time);
+            new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId1, mastershipInfo2, time);
     private final MastershipEvent event5 =
-            new MastershipEvent(MastershipEvent.Type.BACKUPS_CHANGED, deviceId2, roleInfo1);
+            new MastershipEvent(MastershipEvent.Type.BACKUPS_CHANGED, deviceId2, mastershipInfo1);
 
     /**
      * Tests for proper operation of equals(), hashCode() and toString() methods.
@@ -73,12 +86,12 @@
     public void checkConstruction() {
         assertThat(event1.type(), is(MastershipEvent.Type.BACKUPS_CHANGED));
         assertThat(event1.subject(), is(deviceId1));
-        assertThat(event1.roleInfo(), is(roleInfo1));
+        assertThat(event1.mastershipInfo(), is(mastershipInfo1));
 
         assertThat(event4.time(), is(time));
         assertThat(event4.type(), is(MastershipEvent.Type.MASTER_CHANGED));
         assertThat(event4.subject(), is(deviceId1));
-        assertThat(event4.roleInfo(), is(roleInfo2));
+        assertThat(event4.mastershipInfo(), is(mastershipInfo2));
     }
 
 }
diff --git a/core/api/src/test/java/org/onosproject/mastership/MastershipInfoTest.java b/core/api/src/test/java/org/onosproject/mastership/MastershipInfoTest.java
new file mode 100644
index 0000000..89a3350
--- /dev/null
+++ b/core/api/src/test/java/org/onosproject/mastership/MastershipInfoTest.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.mastership;
+
+import java.util.Optional;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import org.junit.Test;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.net.MastershipRole;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Mastership info test.
+ */
+public class MastershipInfoTest {
+    private final NodeId node1 = new NodeId("1");
+    private final NodeId node2 = new NodeId("2");
+    private final NodeId node3 = new NodeId("3");
+    private final NodeId node4 = new NodeId("4");
+
+    private final MastershipInfo mastershipInfo = new MastershipInfo(
+        1,
+        Optional.of(node1),
+        ImmutableMap.<NodeId, MastershipRole>builder()
+            .put(node1, MastershipRole.MASTER)
+            .put(node2, MastershipRole.STANDBY)
+            .put(node3, MastershipRole.STANDBY)
+            .put(node4, MastershipRole.NONE)
+            .build());
+
+    @Test
+    public void testMastershipInfo() throws Exception {
+        assertEquals(1, mastershipInfo.term());
+        assertEquals(node1, mastershipInfo.master().get());
+        assertEquals(Lists.newArrayList(node1), mastershipInfo.getRoles(MastershipRole.MASTER));
+        assertEquals(Lists.newArrayList(node2, node3), mastershipInfo.backups());
+        assertEquals(Lists.newArrayList(node2, node3), mastershipInfo.getRoles(MastershipRole.STANDBY));
+        assertEquals(Lists.newArrayList(node4), mastershipInfo.getRoles(MastershipRole.NONE));
+    }
+
+    @Test
+    public void testEquals() throws Exception {
+        assertEquals(mastershipInfo, mastershipInfo);
+        assertNotEquals(mastershipInfo, new MastershipInfo(1, Optional.of(node1), ImmutableMap.of()));
+    }
+}
diff --git a/core/api/src/test/java/org/onosproject/mastership/MastershipServiceAdapter.java b/core/api/src/test/java/org/onosproject/mastership/MastershipServiceAdapter.java
index a0b203f..d9a2971 100644
--- a/core/api/src/test/java/org/onosproject/mastership/MastershipServiceAdapter.java
+++ b/core/api/src/test/java/org/onosproject/mastership/MastershipServiceAdapter.java
@@ -48,6 +48,11 @@
     }
 
     @Override
+    public MastershipInfo getMastershipFor(DeviceId deviceId) {
+        return null;
+    }
+
+    @Override
     public Set<DeviceId> getDevicesOf(NodeId nodeId) {
         return null;
     }
diff --git a/core/common/src/test/java/org/onosproject/store/trivial/SimpleMastershipStore.java b/core/common/src/test/java/org/onosproject/store/trivial/SimpleMastershipStore.java
index 31480e8..d4df583 100644
--- a/core/common/src/test/java/org/onosproject/store/trivial/SimpleMastershipStore.java
+++ b/core/common/src/test/java/org/onosproject/store/trivial/SimpleMastershipStore.java
@@ -27,10 +27,12 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -48,6 +50,7 @@
 import org.onosproject.core.Version;
 import org.onosproject.core.VersionService;
 import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipInfo;
 import org.onosproject.mastership.MastershipStore;
 import org.onosproject.mastership.MastershipStoreDelegate;
 import org.onosproject.mastership.MastershipTerm;
@@ -177,7 +180,7 @@
         }
 
         return CompletableFuture.completedFuture(
-                new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
+                new MastershipEvent(MASTER_CHANGED, deviceId, getMastership(deviceId)));
     }
 
     @Override
@@ -219,8 +222,7 @@
                     incrementTerm(deviceId);
                     // remove from backup list
                     removeFromBackups(deviceId, node);
-                    notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId,
-                                                       getNodes(deviceId)));
+                    notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getMastership(deviceId)));
                     return CompletableFuture.completedFuture(MastershipRole.MASTER);
                 }
                 return CompletableFuture.completedFuture(MastershipRole.STANDBY);
@@ -229,14 +231,12 @@
                     // no master => become master
                     masterMap.put(deviceId, node);
                     incrementTerm(deviceId);
-                    notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId,
-                                                       getNodes(deviceId)));
+                    notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getMastership(deviceId)));
                     return CompletableFuture.completedFuture(MastershipRole.MASTER);
                 }
                 // add to backup list
                 if (addToBackup(deviceId, node)) {
-                    notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId,
-                                                       getNodes(deviceId)));
+                    notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, getMastership(deviceId)));
                 }
                 return CompletableFuture.completedFuture(MastershipRole.STANDBY);
             default:
@@ -299,6 +299,21 @@
     }
 
     @Override
+    public MastershipInfo getMastership(DeviceId deviceId) {
+        ImmutableMap.Builder<NodeId, MastershipRole> roleBuilder = ImmutableMap.builder();
+        NodeId master = masterMap.get(deviceId);
+        if (master != null) {
+            roleBuilder.put(master, MastershipRole.MASTER);
+        }
+        backups.getOrDefault(deviceId, Collections.emptyList())
+            .forEach(nodeId -> roleBuilder.put(nodeId, MastershipRole.STANDBY));
+        return new MastershipInfo(
+            termMap.getOrDefault(deviceId, new AtomicInteger(NOTHING)).get(),
+            Optional.ofNullable(master),
+            roleBuilder.build());
+    }
+
+    @Override
     public synchronized CompletableFuture<MastershipEvent> setStandby(NodeId nodeId, DeviceId deviceId) {
         MastershipRole role = getRole(nodeId, deviceId);
         switch (role) {
@@ -309,13 +324,13 @@
                 masterMap.remove(deviceId);
                 // TODO: Should there be new event type for no MASTER?
                 return CompletableFuture.completedFuture(
-                        new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
+                        new MastershipEvent(MASTER_CHANGED, deviceId, getMastership(deviceId)));
             } else {
                 NodeId prevMaster = masterMap.put(deviceId, backup);
                 incrementTerm(deviceId);
                 addToBackup(deviceId, prevMaster);
                 return CompletableFuture.completedFuture(
-                        new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
+                        new MastershipEvent(MASTER_CHANGED, deviceId, getMastership(deviceId)));
             }
 
         case STANDBY:
@@ -323,7 +338,7 @@
             boolean modified = addToBackup(deviceId, nodeId);
             if (modified) {
                 return CompletableFuture.completedFuture(
-                        new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId)));
+                        new MastershipEvent(BACKUPS_CHANGED, deviceId, getMastership(deviceId)));
             }
             break;
 
@@ -357,12 +372,12 @@
             masterMap.put(deviceId, backup);
             incrementTerm(deviceId);
             return CompletableFuture.completedFuture(
-                    new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
+                    new MastershipEvent(MASTER_CHANGED, deviceId, getMastership(deviceId)));
 
         case STANDBY:
             if (removeFromBackups(deviceId, nodeId)) {
                 return CompletableFuture.completedFuture(
-                    new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId)));
+                    new MastershipEvent(BACKUPS_CHANGED, deviceId, getMastership(deviceId)));
             }
             break;
 
diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java b/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java
index c744d61..5dea660 100644
--- a/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java
+++ b/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java
@@ -39,6 +39,7 @@
 import org.onosproject.event.AbstractListenerManager;
 import org.onosproject.mastership.MastershipAdminService;
 import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipInfo;
 import org.onosproject.mastership.MastershipListener;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.mastership.MastershipStore;
@@ -239,6 +240,13 @@
     }
 
     @Override
+    public MastershipInfo getMastershipFor(DeviceId deviceId) {
+        checkPermission(CLUSTER_READ);
+        checkNotNull(deviceId, DEVICE_ID_NULL);
+        return store.getMastership(deviceId);
+    }
+
+    @Override
     public MastershipTerm getMastershipTerm(DeviceId deviceId) {
         checkPermission(CLUSTER_READ);
         return store.getTermFor(deviceId);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/ReplicaInfo.java b/core/store/dist/src/main/java/org/onosproject/store/flow/ReplicaInfo.java
index 634582a..feb517c 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/ReplicaInfo.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/ReplicaInfo.java
@@ -29,21 +29,33 @@
  */
 public final class ReplicaInfo {
 
+    private final long term;
     private final Optional<NodeId> master;
     private final List<NodeId> backups;
 
     /**
      * Creates a ReplicaInfo instance.
      *
+     * @param term monotonically increasing unique mastership term
      * @param master NodeId of the node where the master copy should be
      * @param backups list of NodeId, where backup copies should be placed
      */
-    public ReplicaInfo(NodeId master, List<NodeId> backups) {
+    public ReplicaInfo(long term, NodeId master, List<NodeId> backups) {
+        this.term = term;
         this.master = Optional.ofNullable(master);
         this.backups = checkNotNull(backups);
     }
 
     /**
+     * Returns the mastership term.
+     *
+     * @return the mastership term
+     */
+    public long term() {
+        return term;
+    }
+
+    /**
      * Returns the NodeId, if there is a Node where the master copy should be.
      *
      * @return NodeId, where the master copy should be placed
@@ -78,6 +90,7 @@
 
     // for Serializer
     private ReplicaInfo() {
+        this.term = 0;
         this.master = Optional.empty();
         this.backups = Collections.emptyList();
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/BackupOperation.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/BackupOperation.java
new file mode 100644
index 0000000..7034118
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/BackupOperation.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import java.util.Objects;
+
+import org.onosproject.cluster.NodeId;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+/**
+ * Identifier representing a backup of a distinct bucket to a specific node.
+ */
+public class BackupOperation {
+    private final NodeId nodeId;
+    private final int bucketId;
+
+    BackupOperation(NodeId nodeId, int bucketId) {
+        this.nodeId = nodeId;
+        this.bucketId = bucketId;
+    }
+
+    /**
+     * Returns the node identifier.
+     *
+     * @return the node identifier
+     */
+    public NodeId nodeId() {
+        return nodeId;
+    }
+
+    /**
+     * Returns the bucket identifier.
+     *
+     * @return the bucket identifier
+     */
+    public int bucket() {
+        return bucketId;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(nodeId, bucketId);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (other != null && other instanceof BackupOperation) {
+            BackupOperation that = (BackupOperation) other;
+            return this.nodeId.equals(that.nodeId)
+                && this.bucketId == that.bucketId;
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return toStringHelper(this)
+            .add("nodeId", nodeId())
+            .add("bucket", bucket())
+            .toString();
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/BucketId.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/BucketId.java
new file mode 100644
index 0000000..33cc304
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/BucketId.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import java.util.Objects;
+
+import org.onosproject.net.DeviceId;
+
+/**
+ * Represents a distinct device flow bucket.
+ */
+public class BucketId {
+    private final DeviceId deviceId;
+    private final int bucket;
+
+    BucketId(DeviceId deviceId, int bucket) {
+        this.deviceId = deviceId;
+        this.bucket = bucket;
+    }
+
+    /**
+     * Returns the bucket device identifier.
+     *
+     * @return the device identifier
+     */
+    public DeviceId deviceId() {
+        return deviceId;
+    }
+
+    /**
+     * Returns the bucket number.
+     *
+     * @return the bucket number
+     */
+    public int bucket() {
+        return bucket;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(deviceId, bucket);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (other instanceof BucketId) {
+            BucketId that = (BucketId) other;
+            return this.deviceId.equals(that.deviceId)
+                && this.bucket == that.bucket;
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("%s/%d", deviceId, bucket);
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
new file mode 100644
index 0000000..b49ff4e
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
@@ -0,0 +1,852 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.Tools;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.flow.FlowEntry;
+import org.onosproject.net.flow.FlowId;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.StoredFlowEntry;
+import org.onosproject.store.LogicalTimestamp;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Flow table for all flows associated with a specific device.
+ * <p>
+ * Flows in the table are stored in buckets. Each bucket is mutated and replicated as a single unit. The device flow
+ * table performs communication independent of other device flow tables for more parallelism.
+ * <p>
+ * This implementation uses several different replication protocols. Changes that occur on the device master are
+ * replicated to the backups provided in the {@link DeviceReplicaInfo} for the master's term. Additionally, a periodic
+ * anti-entropy protocol is used to detect missing flows on backups (e.g. due to a node restart). Finally, when a
+ * device mastership change occurs, the new master synchronizes flows with the prior master and/or backups for the
+ * device, allowing mastership to be reassigned to non-backup nodes.
+ */
+public class DeviceFlowTable {
+    private static final int NUM_BUCKETS = 1024;
+    private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
+        .register(KryoNamespaces.API)
+        .register(BucketId.class)
+        .register(FlowBucket.class)
+        .register(FlowBucketDigest.class)
+        .register(LogicalTimestamp.class)
+        .register(Timestamped.class)
+        .build());
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final MessageSubject getDigestsSubject;
+    private final MessageSubject getBucketSubject;
+    private final MessageSubject backupSubject;
+
+    private final DeviceId deviceId;
+    private final ClusterCommunicationService clusterCommunicator;
+    private final LifecycleManager lifecycleManager;
+    private final ScheduledExecutorService executorService;
+    private final NodeId localNodeId;
+
+    private final LogicalClock clock = new LogicalClock();
+
+    private volatile DeviceReplicaInfo replicaInfo;
+    private volatile long activeTerm;
+
+    private final LifecycleEventListener lifecycleEventListener = new LifecycleEventListener() {
+        @Override
+        public void event(LifecycleEvent event) {
+            executorService.execute(() -> onLifecycleEvent(event));
+        }
+    };
+
+    private ScheduledFuture<?> backupFuture;
+    private ScheduledFuture<?> antiEntropyFuture;
+
+    private final Map<Integer, Queue<Runnable>> flowTasks = Maps.newConcurrentMap();
+    private final Map<Integer, FlowBucket> flowBuckets = Maps.newConcurrentMap();
+
+    private final Map<BackupOperation, LogicalTimestamp> lastBackupTimes = Maps.newConcurrentMap();
+    private final Set<BackupOperation> inFlightUpdates = Sets.newConcurrentHashSet();
+
+    DeviceFlowTable(
+        DeviceId deviceId,
+        ClusterService clusterService,
+        ClusterCommunicationService clusterCommunicator,
+        LifecycleManager lifecycleManager,
+        ScheduledExecutorService executorService,
+        long backupPeriod,
+        long antiEntropyPeriod) {
+        this.deviceId = deviceId;
+        this.clusterCommunicator = clusterCommunicator;
+        this.lifecycleManager = lifecycleManager;
+        this.executorService = executorService;
+        this.localNodeId = clusterService.getLocalNode().id();
+
+        addListeners();
+
+        for (int i = 0; i < NUM_BUCKETS; i++) {
+            flowBuckets.put(i, new FlowBucket(new BucketId(deviceId, i)));
+        }
+
+        getDigestsSubject = new MessageSubject(String.format("flow-store-%s-digests", deviceId));
+        getBucketSubject = new MessageSubject(String.format("flow-store-%s-bucket", deviceId));
+        backupSubject = new MessageSubject(String.format("flow-store-%s-backup", deviceId));
+
+        setBackupPeriod(backupPeriod);
+        setAntiEntropyPeriod(antiEntropyPeriod);
+        registerSubscribers();
+
+        startTerm(lifecycleManager.getReplicaInfo());
+    }
+
+    /**
+     * Sets the flow table backup period.
+     *
+     * @param backupPeriod the flow table backup period in milliseconds
+     */
+    synchronized void setBackupPeriod(long backupPeriod) {
+        ScheduledFuture<?> backupFuture = this.backupFuture;
+        if (backupFuture != null) {
+            backupFuture.cancel(false);
+        }
+        this.backupFuture = executorService.scheduleAtFixedRate(
+            this::backup, backupPeriod, backupPeriod, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Sets the flow table anti-entropy period.
+     *
+     * @param antiEntropyPeriod the flow table anti-entropy period in milliseconds
+     */
+    synchronized void setAntiEntropyPeriod(long antiEntropyPeriod) {
+        ScheduledFuture<?> antiEntropyFuture = this.antiEntropyFuture;
+        if (antiEntropyFuture != null) {
+            antiEntropyFuture.cancel(false);
+        }
+        this.antiEntropyFuture = executorService.scheduleAtFixedRate(
+            this::runAntiEntropy, antiEntropyPeriod, antiEntropyPeriod, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Counts the flows in the table.
+     *
+     * @return the total number of flows in the table
+     */
+    public int count() {
+        return flowBuckets.values().stream()
+            .mapToInt(FlowBucket::count)
+            .sum();
+    }
+
+    /**
+     * Returns the flow entry for the given rule.
+     *
+     * @param rule the rule for which to lookup the flow entry
+     * @return the flow entry for the given rule
+     */
+    public StoredFlowEntry getFlowEntry(FlowRule rule) {
+        return getBucket(rule.id())
+            .getFlowEntries(rule.id())
+            .get(rule);
+    }
+
+    /**
+     * Returns the set of flow entries in the table.
+     *
+     * @return the set of flow entries in the table
+     */
+    public Set<FlowEntry> getFlowEntries() {
+        return flowBuckets.values().stream()
+            .flatMap(bucket -> bucket.getFlowBucket().values().stream())
+            .flatMap(entries -> entries.values().stream())
+            .collect(Collectors.toSet());
+    }
+
+    /**
+     * Returns the bucket for the given flow identifier.
+     *
+     * @param flowId the flow identifier
+     * @return the bucket for the given flow identifier
+     */
+    private FlowBucket getBucket(FlowId flowId) {
+        return getBucket(bucket(flowId));
+    }
+
+    /**
+     * Returns the bucket with the given identifier.
+     *
+     * @param bucketId the bucket identifier
+     * @return the bucket with the given identifier
+     */
+    private FlowBucket getBucket(int bucketId) {
+        return flowBuckets.get(bucketId);
+    }
+
+    /**
+     * Returns the bucket number for the given flow identifier.
+     *
+     * @param flowId the flow identifier
+     * @return the bucket number for the given flow identifier
+     */
+    private int bucket(FlowId flowId) {
+        return Math.abs((int) (flowId.id() % NUM_BUCKETS));
+    }
+
+    /**
+     * Returns the digests for all buckets in the flow table for the device.
+     *
+     * @return the set of digests for all buckets for the device
+     */
+    private Set<FlowBucketDigest> getDigests() {
+        return flowBuckets.values()
+            .stream()
+            .map(bucket -> bucket.getDigest())
+            .collect(Collectors.toSet());
+    }
+
+    /**
+     * Returns the digest for the given bucket.
+     *
+     * @param bucket the bucket for which to return the digest
+     * @return the digest for the given bucket
+     */
+    private FlowBucketDigest getDigest(int bucket) {
+        return flowBuckets.get(bucket).getDigest();
+    }
+
+    /**
+     * Adds an entry to the table.
+     *
+     * @param rule the rule to add
+     * @return a future to be completed once the rule has been added
+     */
+    public CompletableFuture<Void> add(FlowEntry rule) {
+        return runInTerm(rule.id(), (bucket, term) -> {
+            bucket.add(rule, term, clock);
+            return null;
+        });
+    }
+
+    /**
+     * Updates an entry in the table.
+     *
+     * @param rule the rule to update
+     * @return a future to be completed once the rule has been updated
+     */
+    public CompletableFuture<Void> update(FlowEntry rule) {
+        return runInTerm(rule.id(), (bucket, term) -> {
+            bucket.update(rule, term, clock);
+            return null;
+        });
+    }
+
+    /**
+     * Applies the given update function to the rule.
+     *
+     * @param rule     the rule to update
+     * @param function the update function to apply
+     * @param <T>      the result type
+     * @return a future to be completed with the update result or {@code null} if the rule was not updated
+     */
+    public <T> CompletableFuture<T> update(FlowRule rule, Function<StoredFlowEntry, T> function) {
+        return runInTerm(rule.id(), (bucket, term) -> bucket.update(rule, function, term, clock));
+    }
+
+    /**
+     * Removes an entry from the table.
+     *
+     * @param rule the rule to remove
+     * @return a future to be completed once the rule has been removed
+     */
+    public CompletableFuture<FlowEntry> remove(FlowEntry rule) {
+        return runInTerm(rule.id(), (bucket, term) -> bucket.remove(rule, term, clock));
+    }
+
+    /**
+     * Runs the given function in the current term.
+     *
+     * @param flowId   the flow identifier indicating the bucket in which to run the function
+     * @param function the function to execute in the current term
+     * @param <T>      the future result type
+     * @return a future to be completed with the function result once it has been run
+     */
+    private <T> CompletableFuture<T> runInTerm(FlowId flowId, BiFunction<FlowBucket, Long, T> function) {
+        DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
+        if (!replicaInfo.isMaster(localNodeId)) {
+            return Tools.exceptionalFuture(new IllegalStateException());
+        }
+
+        FlowBucket bucket = getBucket(flowId);
+
+        // If the master's term is not currently active (has not been synchronized with prior replicas), enqueue
+        // the change to be executed once the master has been synchronized.
+        final long term = replicaInfo.term();
+        if (activeTerm < term) {
+            log.debug("Enqueueing operation for device {}", deviceId);
+            synchronized (flowTasks) {
+                // Double checked lock on the active term.
+                if (activeTerm < term) {
+                    CompletableFuture<T> future = new CompletableFuture<>();
+                    flowTasks.computeIfAbsent(bucket.bucketId().bucket(), b -> new LinkedList<>())
+                        .add(() -> future.complete(function.apply(bucket, term)));
+                    return future;
+                }
+            }
+        }
+        return CompletableFuture.completedFuture(function.apply(bucket, term));
+    }
+
+    /**
+     * Backs up all buckets in the given device to the given node.
+     */
+    private void backup() {
+        DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
+
+        // If the local node is not currently the master, skip the backup.
+        if (!replicaInfo.isMaster(localNodeId)) {
+            return;
+        }
+
+        // Otherwise, iterate through backup nodes and backup the device.
+        for (NodeId nodeId : replicaInfo.backups()) {
+            try {
+                backup(nodeId, replicaInfo.term());
+            } catch (Exception e) {
+                log.error("Backup of " + deviceId + " to " + nodeId + " failed", e);
+            }
+        }
+    }
+
+    /**
+     * Backs up all buckets for the device to the given node.
+     *
+     * @param nodeId the node to which to back up the device
+     * @param term   the term for which to backup to the node
+     */
+    private void backup(NodeId nodeId, long term) {
+        for (FlowBucket bucket : flowBuckets.values()) {
+            // If the bucket is not in the current term, skip it. This forces synchronization of the bucket
+            // to occur prior to the new master replicating changes in the bucket to backups.
+            if (bucket.term() != term) {
+                continue;
+            }
+
+            // Record the logical timestamp from the bucket to keep track of the highest logical time replicated.
+            LogicalTimestamp timestamp = bucket.timestamp();
+
+            // If the backup can be run (no concurrent backup to the node in progress) then run it.
+            BackupOperation operation = new BackupOperation(nodeId, bucket.bucketId().bucket());
+            if (startBackup(operation, timestamp)) {
+                backup(bucket, nodeId).whenCompleteAsync((succeeded, error) -> {
+                    if (error != null) {
+                        log.debug("Backup operation {} failed", operation, error);
+                        failBackup(operation);
+                    } else if (succeeded) {
+                        succeedBackup(operation, timestamp);
+                        backup(nodeId, term);
+                    } else {
+                        log.debug("Backup operation {} failed: term mismatch", operation);
+                        failBackup(operation);
+                    }
+                }, executorService);
+            }
+        }
+    }
+
+    /**
+     * Returns a boolean indicating whether the given {@link BackupOperation} can be started.
+     * <p>
+     * The backup can be started if no backup for the same device/bucket/node is already in progress and changes
+     * are pending replication for the backup operation.
+     *
+     * @param operation the operation to start
+     * @param timestamp the timestamp for which to start the backup operation
+     * @return indicates whether the given backup operation should be started
+     */
+    private boolean startBackup(BackupOperation operation, LogicalTimestamp timestamp) {
+        LogicalTimestamp lastBackupTime = lastBackupTimes.get(operation);
+        return timestamp != null
+            && (lastBackupTime == null || lastBackupTime.isOlderThan(timestamp))
+            && inFlightUpdates.add(operation);
+    }
+
+    /**
+     * Fails the given backup operation.
+     *
+     * @param operation the backup operation to fail
+     */
+    private void failBackup(BackupOperation operation) {
+        inFlightUpdates.remove(operation);
+    }
+
+    /**
+     * Succeeds the given backup operation.
+     * <p>
+     * The last backup time for the operation will be updated and the operation will be removed from
+     * in-flight updates.
+     *
+     * @param operation the operation to succeed
+     * @param timestamp the timestamp at which the operation was <em>started</em>
+     */
+    private void succeedBackup(BackupOperation operation, LogicalTimestamp timestamp) {
+        lastBackupTimes.put(operation, timestamp);
+        inFlightUpdates.remove(operation);
+    }
+
+    /**
+     * Resets the last completion time for the given backup operation to ensure it's replicated again.
+     *
+     * @param operation the backup operation to reset
+     */
+    private void resetBackup(BackupOperation operation) {
+        lastBackupTimes.remove(operation);
+    }
+
+    /**
+     * Performs the given backup operation.
+     *
+     * @param bucket the bucket to backup
+     * @param nodeId the node to which to backup the bucket
+     * @return a future to be completed with a boolean indicating whether the backup operation was successful
+     */
+    private CompletableFuture<Boolean> backup(FlowBucket bucket, NodeId nodeId) {
+        if (log.isDebugEnabled()) {
+            log.debug("Backing up {} flow entries in bucket {} to {}", bucket.count(), bucket.bucketId(), nodeId);
+        }
+        return sendWithTimestamp(bucket, backupSubject, nodeId);
+    }
+
+    /**
+     * Handles a flow bucket backup from a remote peer.
+     *
+     * @param flowBucket the flow bucket to back up
+     * @return the set of flows that could not be backed up
+     */
+    private boolean onBackup(FlowBucket flowBucket) {
+        if (log.isDebugEnabled()) {
+            log.debug("{} - Received {} flow entries in bucket {} to backup",
+                deviceId, flowBucket.count(), flowBucket.bucketId());
+        }
+
+        try {
+            DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
+
+            // If the backup is for a different term, reject the request until we learn about the new term.
+            if (flowBucket.term() != replicaInfo.term()) {
+                log.debug("Term mismatch for device {}: {} != {}", deviceId, flowBucket.term(), replicaInfo);
+                return false;
+            }
+
+            flowBuckets.compute(flowBucket.bucketId().bucket(),
+                (id, bucket) -> flowBucket.getDigest().isNewerThan(bucket.getDigest()) ? flowBucket : bucket);
+            return true;
+        } catch (Exception e) {
+            log.warn("Failure processing backup request", e);
+            return false;
+        }
+    }
+
+    /**
+     * Runs the anti-entropy protocol.
+     */
+    private void runAntiEntropy() {
+        DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
+        if (!replicaInfo.isMaster(localNodeId)) {
+            return;
+        }
+
+        for (NodeId nodeId : replicaInfo.backups()) {
+            runAntiEntropy(nodeId);
+        }
+    }
+
+    /**
+     * Runs the anti-entropy protocol against the given peer.
+     *
+     * @param nodeId the node with which to execute the anti-entropy protocol
+     */
+    private void runAntiEntropy(NodeId nodeId) {
+        requestDigests(nodeId).thenAcceptAsync((digests) -> {
+            // Compute a set of missing BucketIds based on digest times and send them back to the master.
+            for (FlowBucketDigest remoteDigest : digests) {
+                FlowBucket localBucket = getBucket(remoteDigest.bucket());
+                if (localBucket.getDigest().isNewerThan(remoteDigest)) {
+                    log.debug("Detected missing flow entries on node {} in bucket {}/{}",
+                        nodeId, deviceId, remoteDigest.bucket());
+                    resetBackup(new BackupOperation(nodeId, remoteDigest.bucket()));
+                }
+            }
+        }, executorService);
+    }
+
+    /**
+     * Sends a digest request to the given node.
+     *
+     * @param nodeId the node to which to send the request
+     * @return future to be completed with the set of digests for the given device on the given node
+     */
+    private CompletableFuture<Set<FlowBucketDigest>> requestDigests(NodeId nodeId) {
+        return sendWithTimestamp(deviceId, getDigestsSubject, nodeId);
+    }
+
+    /**
+     * Synchronizes flows from the previous master or backups.
+     *
+     * @param prevReplicaInfo the previous replica info
+     * @param newReplicaInfo  the new replica info
+     */
+    private void syncFlows(DeviceReplicaInfo prevReplicaInfo, DeviceReplicaInfo newReplicaInfo) {
+        if (prevReplicaInfo == null) {
+            activateMaster(newReplicaInfo);
+        } else if (prevReplicaInfo.master() != null && !prevReplicaInfo.master().equals(localNodeId)) {
+            syncFlowsOnMaster(prevReplicaInfo, newReplicaInfo);
+        } else {
+            syncFlowsOnBackups(prevReplicaInfo, newReplicaInfo);
+        }
+    }
+
+    /**
+     * Synchronizes flows from the previous master, falling back to backups if the master fails.
+     *
+     * @param prevReplicaInfo the previous replica info
+     * @param newReplicaInfo  the new replica info
+     */
+    private void syncFlowsOnMaster(DeviceReplicaInfo prevReplicaInfo, DeviceReplicaInfo newReplicaInfo) {
+        syncFlowsOn(prevReplicaInfo.master())
+            .whenCompleteAsync((result, error) -> {
+                if (error != null) {
+                    log.debug("Failed to synchronize flows on previous master {}", prevReplicaInfo.master(), error);
+                    syncFlowsOnBackups(prevReplicaInfo, newReplicaInfo);
+                } else {
+                    activateMaster(newReplicaInfo);
+                }
+            }, executorService);
+    }
+
+    /**
+     * Synchronizes flows from the previous backups.
+     *
+     * @param prevReplicaInfo the previous replica info
+     * @param newReplicaInfo  the new replica info
+     */
+    private void syncFlowsOnBackups(DeviceReplicaInfo prevReplicaInfo, DeviceReplicaInfo newReplicaInfo) {
+        List<NodeId> backups = prevReplicaInfo.backups()
+            .stream()
+            .filter(nodeId -> !nodeId.equals(localNodeId))
+            .collect(Collectors.toList());
+        syncFlowsOn(backups)
+            .whenCompleteAsync((result, error) -> {
+                if (error != null) {
+                    log.debug("Failed to synchronize flows on previous backup nodes {}", backups, error);
+                }
+                activateMaster(newReplicaInfo);
+            }, executorService);
+    }
+
+    /**
+     * Synchronizes flows for the device on the given nodes.
+     *
+     * @param nodes the nodes via which to synchronize the flows
+     * @return a future to be completed once flows have been synchronizes
+     */
+    private CompletableFuture<Void> syncFlowsOn(Collection<NodeId> nodes) {
+        return nodes.isEmpty()
+            ? CompletableFuture.completedFuture(null)
+            : Tools.firstOf(nodes.stream()
+            .map(node -> syncFlowsOn(node))
+            .collect(Collectors.toList()))
+            .thenApply(v -> null);
+    }
+
+    /**
+     * Synchronizes flows for the device from the given node.
+     *
+     * @param nodeId the node from which to synchronize flows
+     * @return a future to be completed once the flows have been synchronizes
+     */
+    private CompletableFuture<Void> syncFlowsOn(NodeId nodeId) {
+        return requestDigests(nodeId)
+            .thenCompose(digests -> Tools.allOf(digests.stream()
+                .filter(digest -> digest.isNewerThan(getDigest(digest.bucket())))
+                .map(digest -> syncBucketOn(nodeId, digest.bucket()))
+                .collect(Collectors.toList())))
+            .thenApply(v -> null);
+    }
+
+    /**
+     * Synchronizes the given bucket on the given node.
+     *
+     * @param nodeId       the node on which to synchronize the bucket
+     * @param bucketNumber the bucket to synchronize
+     * @return a future to be completed once the bucket has been synchronizes
+     */
+    private CompletableFuture<Void> syncBucketOn(NodeId nodeId, int bucketNumber) {
+        return requestBucket(nodeId, bucketNumber)
+            .thenAcceptAsync(flowBucket -> {
+                flowBuckets.compute(flowBucket.bucketId().bucket(),
+                    (id, bucket) -> flowBucket.getDigest().isNewerThan(bucket.getDigest()) ? flowBucket : bucket);
+            }, executorService);
+    }
+
+    /**
+     * Requests the given bucket from the given node.
+     *
+     * @param nodeId the node from which to request the bucket
+     * @param bucket the bucket to request
+     * @return a future to be completed with the bucket
+     */
+    private CompletableFuture<FlowBucket> requestBucket(NodeId nodeId, int bucket) {
+        log.debug("Requesting flow bucket {} from {}", bucket, nodeId);
+        return sendWithTimestamp(bucket, getBucketSubject, nodeId);
+    }
+
+    /**
+     * Handles a flow bucket request.
+     *
+     * @param bucket the bucket number
+     * @return the flow bucket
+     */
+    private FlowBucket onGetBucket(int bucket) {
+        return flowBuckets.get(bucket);
+    }
+
+    /**
+     * Activates the new master term.
+     *
+     * @param replicaInfo the new replica info
+     */
+    private void activateMaster(DeviceReplicaInfo replicaInfo) {
+        log.debug("Activating term {} for device {}", replicaInfo.term(), deviceId);
+        for (int i = 0; i < NUM_BUCKETS; i++) {
+            activateBucket(i);
+        }
+        lifecycleManager.activate(replicaInfo.term());
+        activeTerm = replicaInfo.term();
+    }
+
+    /**
+     * Activates the given bucket number.
+     *
+     * @param bucket the bucket number to activate
+     */
+    private void activateBucket(int bucket) {
+        Queue<Runnable> tasks;
+        synchronized (flowTasks) {
+            tasks = flowTasks.remove(bucket);
+        }
+        if (tasks != null) {
+            log.debug("Completing enqueued operations for device {}", deviceId);
+            tasks.forEach(task -> task.run());
+        }
+    }
+
+    /**
+     * Handles a lifecycle event.
+     */
+    private void onLifecycleEvent(LifecycleEvent event) {
+        log.debug("Received lifecycle event for device {}: {}", deviceId, event);
+        switch (event.type()) {
+            case TERM_START:
+                startTerm(event.subject());
+                break;
+            case TERM_ACTIVE:
+                activateTerm(event.subject());
+                break;
+            case TERM_UPDATE:
+                updateTerm(event.subject());
+                break;
+            default:
+                break;
+        }
+    }
+
+    /**
+     * Handles a replica change at the start of a new term.
+     */
+    private void startTerm(DeviceReplicaInfo replicaInfo) {
+        DeviceReplicaInfo oldReplicaInfo = this.replicaInfo;
+        this.replicaInfo = replicaInfo;
+        if (replicaInfo.isMaster(localNodeId)) {
+            log.info("Synchronizing device {} flows for term {}", deviceId, replicaInfo.term());
+            syncFlows(oldReplicaInfo, replicaInfo);
+        }
+    }
+
+    /**
+     * Handles the activation of a term.
+     */
+    private void activateTerm(DeviceReplicaInfo replicaInfo) {
+        if (replicaInfo.term() < this.replicaInfo.term()) {
+            return;
+        }
+        if (replicaInfo.term() > this.replicaInfo.term()) {
+            this.replicaInfo = replicaInfo;
+        }
+
+        // If the local node is neither the master or a backup for the device, clear the flow table.
+        if (!replicaInfo.isMaster(localNodeId) && !replicaInfo.isBackup(localNodeId)) {
+            flowBuckets.values().forEach(bucket -> bucket.clear());
+        }
+        activeTerm = replicaInfo.term();
+    }
+
+    /**
+     * Handles an update to a term.
+     */
+    private void updateTerm(DeviceReplicaInfo replicaInfo) {
+        if (replicaInfo.term() == this.replicaInfo.term()) {
+            this.replicaInfo = replicaInfo;
+
+            // If the local node is neither the master or a backup for the device *and the term is active*,
+            // clear the flow table.
+            if (activeTerm == replicaInfo.term()
+                && !replicaInfo.isMaster(localNodeId)
+                && !replicaInfo.isBackup(localNodeId)) {
+                flowBuckets.values().forEach(bucket -> bucket.clear());
+            }
+        }
+    }
+
+    /**
+     * Sends a message to the given node wrapped in a Lamport timestamp.
+     * <p>
+     * Messages are sent in a {@link Timestamped} wrapper and are expected to be received in a {@link Timestamped}
+     * wrapper. The internal {@link LogicalClock} is automatically updated on both send and receive.
+     *
+     * @param message  the message to send
+     * @param subject  the message subject
+     * @param toNodeId the node to which to send the message
+     * @param <M>      the message type
+     * @param <R>      the response type
+     * @return a future to be completed with the response
+     */
+    private <M, R> CompletableFuture<R> sendWithTimestamp(M message, MessageSubject subject, NodeId toNodeId) {
+        return clusterCommunicator.<Timestamped<M>, Timestamped<R>>sendAndReceive(
+            clock.timestamp(message), subject, SERIALIZER::encode, SERIALIZER::decode, toNodeId)
+            .thenApply(response -> {
+                clock.tick(response.timestamp());
+                return response.value();
+            });
+    }
+
+    /**
+     * Receives messages to the given subject wrapped in Lamport timestamps.
+     * <p>
+     * Messages are expected to be received in a {@link Timestamped} wrapper and are sent back in a {@link Timestamped}
+     * wrapper. The internal {@link LogicalClock} is automatically updated on both receive and send.
+     *
+     * @param subject  the subject for which to register the subscriber
+     * @param function the raw message handler
+     * @param <M>      the raw message type
+     * @param <R>      the raw response type
+     */
+    private <M, R> void receiveWithTimestamp(MessageSubject subject, Function<M, R> function) {
+        clusterCommunicator.<Timestamped<M>, Timestamped<R>>addSubscriber(subject, SERIALIZER::decode, request -> {
+            clock.tick(request.timestamp());
+            return clock.timestamp(function.apply(request.value()));
+        }, SERIALIZER::encode, executorService);
+    }
+
+    /**
+     * Registers internal message subscribers.
+     */
+    private void registerSubscribers() {
+        receiveWithTimestamp(getDigestsSubject, v -> getDigests());
+        receiveWithTimestamp(getBucketSubject, this::onGetBucket);
+        receiveWithTimestamp(backupSubject, this::onBackup);
+    }
+
+    /**
+     * Unregisters internal message subscribers.
+     */
+    private void unregisterSubscribers() {
+        clusterCommunicator.removeSubscriber(getDigestsSubject);
+        clusterCommunicator.removeSubscriber(getBucketSubject);
+        clusterCommunicator.removeSubscriber(backupSubject);
+    }
+
+    /**
+     * Adds internal event listeners.
+     */
+    private void addListeners() {
+        lifecycleManager.addListener(lifecycleEventListener);
+    }
+
+    /**
+     * Removes internal event listeners.
+     */
+    private void removeListeners() {
+        lifecycleManager.removeListener(lifecycleEventListener);
+    }
+
+    /**
+     * Cancels recurrent scheduled futures.
+     */
+    private synchronized void cancelFutures() {
+        ScheduledFuture<?> backupFuture = this.backupFuture;
+        if (backupFuture != null) {
+            backupFuture.cancel(false);
+        }
+
+        ScheduledFuture<?> antiEntropyFuture = this.antiEntropyFuture;
+        if (antiEntropyFuture != null) {
+            antiEntropyFuture.cancel(false);
+        }
+    }
+
+    /**
+     * Purges the flow table.
+     */
+    public void purge() {
+        flowTasks.clear();
+        flowBuckets.values().forEach(bucket -> bucket.purge());
+        lastBackupTimes.clear();
+        inFlightUpdates.clear();
+    }
+
+    /**
+     * Closes the device flow table.
+     */
+    public void close() {
+        removeListeners();
+        unregisterSubscribers();
+        cancelFutures();
+        lifecycleManager.close();
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceReplicaInfo.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceReplicaInfo.java
new file mode 100644
index 0000000..30f9396
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceReplicaInfo.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import java.util.List;
+import java.util.Objects;
+
+import org.onosproject.cluster.NodeId;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+/**
+ * Device term context.
+ */
+public class DeviceReplicaInfo {
+    private final long term;
+    private final NodeId master;
+    private final List<NodeId> backups;
+
+    public DeviceReplicaInfo(long term, NodeId master, List<NodeId> backups) {
+        this.term = term;
+        this.master = master;
+        this.backups = backups;
+    }
+
+    /**
+     * Returns the mastership term.
+     *
+     * @return the mastership term
+     */
+    public long term() {
+        return term;
+    }
+
+    /**
+     * Returns the master for the {@link #term()}.
+     *
+     * @return the master {@link NodeId} for the current {@link #term()}
+     */
+    public NodeId master() {
+        return master;
+    }
+
+    /**
+     * Returns a boolean indicating whether the given {@link NodeId} is the current master.
+     *
+     * @param nodeId the node ID to check
+     * @return indicates whether the given node identifier is the identifier of the current master
+     */
+    public boolean isMaster(NodeId nodeId) {
+        return Objects.equals(master, nodeId);
+    }
+
+    /**
+     * Returns a list of all active backup nodes in priority order.
+     * <p>
+     * The returned backups are limited by the flow rule store's configured backup count.
+     *
+     * @return a list of backup nodes in priority order
+     */
+    public List<NodeId> backups() {
+        return backups;
+    }
+
+    /**
+     * Returns a boolean indicating whether the given node is a backup.
+     *
+     * @param nodeId the node identifier
+     * @return indicates whether the given node is a backup
+     */
+    public boolean isBackup(NodeId nodeId) {
+        return backups.contains(nodeId);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(term, master, backups);
+    }
+
+    @Override
+    public boolean equals(Object object) {
+        if (object instanceof DeviceReplicaInfo) {
+            DeviceReplicaInfo that = (DeviceReplicaInfo) object;
+            return this.term == that.term
+                && Objects.equals(this.master, that.master)
+                && Objects.equals(this.backups, that.backups);
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return toStringHelper(this)
+            .add("term", term())
+            .add("master", master())
+            .add("backups", backups())
+            .toString();
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index f998dad..4502016 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -1,23 +1,24 @@
- /*
- * Copyright 2014-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+/*
+* Copyright 2014-present Open Networking Foundation
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
 package org.onosproject.store.flow.impl;
 
 import java.util.Collections;
 import java.util.Dictionary;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -25,11 +26,12 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Streams;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -46,20 +48,17 @@
 import org.onosproject.cluster.NodeId;
 import org.onosproject.core.CoreService;
 import org.onosproject.core.IdGenerator;
+import org.onosproject.event.AbstractListenerManager;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.flow.CompletedBatchOperation;
 import org.onosproject.net.flow.DefaultFlowEntry;
 import org.onosproject.net.flow.FlowEntry;
 import org.onosproject.net.flow.FlowEntry.FlowEntryState;
-import org.onosproject.net.flow.FlowId;
 import org.onosproject.net.flow.FlowRule;
-import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
-import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry.FlowRuleOperation;
-import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
-import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
-import org.onosproject.net.flow.oldbatch.FlowRuleBatchRequest;
 import org.onosproject.net.flow.FlowRuleEvent;
 import org.onosproject.net.flow.FlowRuleEvent.Type;
 import org.onosproject.net.flow.FlowRuleService;
@@ -67,38 +66,41 @@
 import org.onosproject.net.flow.FlowRuleStoreDelegate;
 import org.onosproject.net.flow.StoredFlowEntry;
 import org.onosproject.net.flow.TableStatisticsEntry;
+import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
+import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry.FlowRuleOperation;
+import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
+import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
+import org.onosproject.net.flow.oldbatch.FlowRuleBatchRequest;
 import org.onosproject.persistence.PersistenceService;
 import org.onosproject.store.AbstractStore;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.cluster.messaging.ClusterMessage;
 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.flow.ReplicaInfo;
 import org.onosproject.store.flow.ReplicaInfoEvent;
 import org.onosproject.store.flow.ReplicaInfoEventListener;
 import org.onosproject.store.flow.ReplicaInfoService;
 import org.onosproject.store.impl.MastershipBasedTimestamp;
 import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.EventuallyConsistentMap;
 import org.onosproject.store.service.EventuallyConsistentMapEvent;
 import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
 import org.onosproject.store.service.WallClockTimestamp;
 import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Futures;
-
 import static com.google.common.base.Strings.isNullOrEmpty;
 import static org.onlab.util.Tools.get;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
-import static org.onosproject.store.flow.ReplicaInfoEvent.Type.MASTER_CHANGED;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.APPLY_BATCH_FLOWS;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP;
+import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_COUNT;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOTE_APPLY_COMPLETED;
@@ -111,8 +113,8 @@
 @Component(immediate = true)
 @Service
 public class ECFlowRuleStore
-        extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
-        implements FlowRuleStore {
+    extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
+    implements FlowRuleStore {
 
     private final Logger log = getLogger(getClass());
 
@@ -120,23 +122,27 @@
     private static final int DEFAULT_MAX_BACKUP_COUNT = 2;
     private static final boolean DEFAULT_PERSISTENCE_ENABLED = false;
     private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
+    private static final int DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS = 5000;
     private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
-    // number of devices whose flow entries will be backed up in one communication round
-    private static final int FLOW_TABLE_BACKUP_BATCH_SIZE = 1;
 
     @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
-            label = "Number of threads in the message handler pool")
+        label = "Number of threads in the message handler pool")
     private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
 
     @Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS,
-            label = "Delay in ms between successive backup runs")
+        label = "Delay in ms between successive backup runs")
     private int backupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
+
+    @Property(name = "antiEntropyPeriod", intValue = DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS,
+        label = "Delay in ms between anti-entropy runs")
+    private int antiEntropyPeriod = DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS;
+
     @Property(name = "persistenceEnabled", boolValue = false,
-            label = "Indicates whether or not changes in the flow table should be persisted to disk.")
+        label = "Indicates whether or not changes in the flow table should be persisted to disk.")
     private boolean persistenceEnabled = DEFAULT_PERSISTENCE_ENABLED;
 
     @Property(name = "backupCount", intValue = DEFAULT_MAX_BACKUP_COUNT,
-            label = "Max number of backup copies for each device")
+        label = "Max number of backup copies for each device")
     private volatile int backupCount = DEFAULT_MAX_BACKUP_COUNT;
 
     private InternalFlowTable flowTable = new InternalFlowTable();
@@ -169,24 +175,28 @@
     private ExecutorService messageHandlingExecutor;
     private ExecutorService eventHandler;
 
-    private ScheduledFuture<?> backupTask;
     private final ScheduledExecutorService backupSenderExecutor =
-            Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender", log));
+        Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender", log));
 
     private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
     private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
-            new InternalTableStatsListener();
+        new InternalTableStatsListener();
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected StorageService storageService;
 
-    protected final Serializer serializer = Serializer.using(KryoNamespaces.API);
+    protected final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
+        .register(KryoNamespaces.API)
+        .register(BucketId.class)
+        .register(FlowBucket.class)
+        .build());
 
     protected final KryoNamespace.Builder serializerBuilder = KryoNamespace.newBuilder()
-            .register(KryoNamespaces.API)
-            .register(MastershipBasedTimestamp.class);
+        .register(KryoNamespaces.API)
+        .register(BucketId.class)
+        .register(MastershipBasedTimestamp.class);
 
-    private EventuallyConsistentMap<DeviceId, Integer> flowCounts;
+    protected AsyncConsistentMap<DeviceId, Long> mastershipTermLifecycles;
 
     private IdGenerator idGenerator;
     private NodeId local;
@@ -200,45 +210,37 @@
         local = clusterService.getLocalNode().id();
 
         eventHandler = Executors.newSingleThreadExecutor(
-                groupedThreads("onos/flow", "event-handler", log));
+            groupedThreads("onos/flow", "event-handler", log));
         messageHandlingExecutor = Executors.newFixedThreadPool(
-                msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
+            msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
 
         registerMessageHandlers(messageHandlingExecutor);
 
-        replicaInfoManager.addListener(flowTable);
-        backupTask = backupSenderExecutor.scheduleWithFixedDelay(
-                flowTable::backup,
-                0,
-                backupPeriod,
-                TimeUnit.MILLISECONDS);
-
-        flowCounts = storageService.<DeviceId, Integer>eventuallyConsistentMapBuilder()
-                .withName("onos-flow-counts")
-                .withSerializer(serializerBuilder)
-                .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
-                .withTimestampProvider((k, v) -> new WallClockTimestamp())
-                .withTombstonesDisabled()
-                .build();
+        mastershipTermLifecycles = storageService.<DeviceId, Long>consistentMapBuilder()
+            .withName("onos-flow-store-terms")
+            .withSerializer(serializer)
+            .buildAsyncMap();
 
         deviceTableStats = storageService.<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder()
-                .withName("onos-flow-table-stats")
-                .withSerializer(serializerBuilder)
-                .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
-                .withTimestampProvider((k, v) -> new WallClockTimestamp())
-                .withTombstonesDisabled()
-                .build();
+            .withName("onos-flow-table-stats")
+            .withSerializer(serializerBuilder)
+            .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
+            .withTimestampProvider((k, v) -> new WallClockTimestamp())
+            .withTombstonesDisabled()
+            .build();
         deviceTableStats.addListener(tableStatsListener);
 
+        deviceService.addListener(flowTable);
+        deviceService.getDevices().forEach(device -> flowTable.addDevice(device.id()));
+
         logConfig("Started");
     }
 
     @Deactivate
     public void deactivate(ComponentContext context) {
-        replicaInfoManager.removeListener(flowTable);
-        backupTask.cancel(true);
         configService.unregisterProperties(getClass(), false);
         unregisterMessageHandlers();
+        deviceService.removeListener(flowTable);
         deviceTableStats.removeListener(tableStatsListener);
         deviceTableStats.destroy();
         eventHandler.shutdownNow();
@@ -259,6 +261,7 @@
         int newPoolSize;
         int newBackupPeriod;
         int newBackupCount;
+        int newAntiEntropyPeriod;
         try {
             String s = get(properties, "msgHandlerPoolSize");
             newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
@@ -268,39 +271,37 @@
 
             s = get(properties, "backupCount");
             newBackupCount = isNullOrEmpty(s) ? backupCount : Integer.parseInt(s.trim());
+
+            s = get(properties, "antiEntropyPeriod");
+            newAntiEntropyPeriod = isNullOrEmpty(s) ? antiEntropyPeriod : Integer.parseInt(s.trim());
         } catch (NumberFormatException | ClassCastException e) {
             newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
             newBackupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
             newBackupCount = DEFAULT_MAX_BACKUP_COUNT;
+            newAntiEntropyPeriod = DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS;
         }
 
-        boolean restartBackupTask = false;
-
         if (newBackupPeriod != backupPeriod) {
             backupPeriod = newBackupPeriod;
-            restartBackupTask = true;
+            flowTable.setBackupPeriod(newBackupPeriod);
         }
-        if (restartBackupTask) {
-            if (backupTask != null) {
-                // cancel previously running task
-                backupTask.cancel(false);
-            }
-            backupTask = backupSenderExecutor.scheduleWithFixedDelay(
-                    flowTable::backup,
-                    0,
-                    backupPeriod,
-                    TimeUnit.MILLISECONDS);
+
+        if (newAntiEntropyPeriod != antiEntropyPeriod) {
+            antiEntropyPeriod = newAntiEntropyPeriod;
+            flowTable.setAntiEntropyPeriod(newAntiEntropyPeriod);
         }
+
         if (newPoolSize != msgHandlerPoolSize) {
             msgHandlerPoolSize = newPoolSize;
             ExecutorService oldMsgHandler = messageHandlingExecutor;
             messageHandlingExecutor = Executors.newFixedThreadPool(
-                    msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
+                msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
 
             // replace previously registered handlers.
             registerMessageHandlers(messageHandlingExecutor);
             oldMsgHandler.shutdown();
         }
+
         if (backupCount != newBackupCount) {
             backupCount = newBackupCount;
         }
@@ -308,23 +309,23 @@
     }
 
     private void registerMessageHandlers(ExecutorService executor) {
-
         clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
         clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
-                REMOTE_APPLY_COMPLETED, serializer::decode, this::notifyDelegate, executor);
+            REMOTE_APPLY_COMPLETED, serializer::decode, this::notifyDelegate, executor);
         clusterCommunicator.addSubscriber(
-                GET_FLOW_ENTRY, serializer::decode, flowTable::getFlowEntry, serializer::encode, executor);
+            GET_FLOW_ENTRY, serializer::decode, flowTable::getFlowEntry, serializer::encode, executor);
         clusterCommunicator.addSubscriber(
-                GET_DEVICE_FLOW_ENTRIES, serializer::decode, flowTable::getFlowEntries, serializer::encode, executor);
+            GET_DEVICE_FLOW_ENTRIES, serializer::decode, flowTable::getFlowEntries, serializer::encode, executor);
         clusterCommunicator.addSubscriber(
-                REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
+            GET_DEVICE_FLOW_COUNT, serializer::decode, flowTable::getFlowRuleCount, serializer::encode, executor);
         clusterCommunicator.addSubscriber(
-                FLOW_TABLE_BACKUP, serializer::decode, flowTable::onBackupReceipt, serializer::encode, executor);
+            REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
     }
 
     private void unregisterMessageHandlers() {
         clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
         clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
+        clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_COUNT);
         clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
         clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
         clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
@@ -333,24 +334,38 @@
 
     private void logConfig(String prefix) {
         log.info("{} with msgHandlerPoolSize = {}; backupPeriod = {}, backupCount = {}",
-                 prefix, msgHandlerPoolSize, backupPeriod, backupCount);
+            prefix, msgHandlerPoolSize, backupPeriod, backupCount);
     }
 
-    // This is not a efficient operation on a distributed sharded
-    // flow store. We need to revisit the need for this operation or at least
-    // make it device specific.
     @Override
     public int getFlowRuleCount() {
         return Streams.stream(deviceService.getDevices()).parallel()
-                .mapToInt(device -> getFlowRuleCount(device.id()))
-                .sum();
+            .mapToInt(device -> getFlowRuleCount(device.id()))
+            .sum();
     }
 
     @Override
     public int getFlowRuleCount(DeviceId deviceId) {
-        Integer count = flowCounts.get(deviceId);
-        return count != null ? count : flowTable.flowEntries.get(deviceId) != null ?
-                flowTable.flowEntries.get(deviceId).keySet().size() : 0;
+        NodeId master = mastershipService.getMasterFor(deviceId);
+        if (master == null) {
+            log.debug("Failed to getFlowRuleCount: No master for {}", deviceId);
+            return 0;
+        }
+
+        if (Objects.equals(local, master)) {
+            return flowTable.getFlowRuleCount(deviceId);
+        }
+
+        log.trace("Forwarding getFlowRuleCount to master {} for device {}", master, deviceId);
+        return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
+            deviceId,
+            GET_DEVICE_FLOW_COUNT,
+            serializer::encode,
+            serializer::decode,
+            master),
+            FLOW_RULE_STORE_TIMEOUT_MILLIS,
+            TimeUnit.MILLISECONDS,
+            0);
     }
 
     @Override
@@ -367,16 +382,16 @@
         }
 
         log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
-                  master, rule.deviceId());
+            master, rule.deviceId());
 
         return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
-                                    ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY,
-                                    serializer::encode,
-                                    serializer::decode,
-                                    master),
-                               FLOW_RULE_STORE_TIMEOUT_MILLIS,
-                               TimeUnit.MILLISECONDS,
-                               null);
+            ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY,
+            serializer::encode,
+            serializer::decode,
+            master),
+            FLOW_RULE_STORE_TIMEOUT_MILLIS,
+            TimeUnit.MILLISECONDS,
+            null);
     }
 
     @Override
@@ -393,31 +408,31 @@
         }
 
         log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
-                  master, deviceId);
+            master, deviceId);
 
         return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
-                                    ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
-                                    serializer::encode,
-                                    serializer::decode,
-                                    master),
-                               FLOW_RULE_STORE_TIMEOUT_MILLIS,
-                               TimeUnit.MILLISECONDS,
-                               Collections.emptyList());
+            ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
+            serializer::encode,
+            serializer::decode,
+            master),
+            FLOW_RULE_STORE_TIMEOUT_MILLIS,
+            TimeUnit.MILLISECONDS,
+            Collections.emptyList());
     }
 
     @Override
     public void storeFlowRule(FlowRule rule) {
         storeBatch(new FlowRuleBatchOperation(
-                Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
-                rule.deviceId(), idGenerator.getNewId()));
+            Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
+            rule.deviceId(), idGenerator.getNewId()));
     }
 
     @Override
     public void storeBatch(FlowRuleBatchOperation operation) {
         if (operation.getOperations().isEmpty()) {
             notifyDelegate(FlowRuleBatchEvent.completed(
-                    new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
-                    new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
+                new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+                new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
             return;
         }
 
@@ -427,11 +442,13 @@
         if (master == null) {
             log.warn("No master for {} ", deviceId);
 
-            updateStoreInternal(operation);
-
+            Set<FlowRule> allFailures = operation.getOperations()
+                .stream()
+                .map(op -> op.target())
+                .collect(Collectors.toSet());
             notifyDelegate(FlowRuleBatchEvent.completed(
-                    new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
-                    new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
+                new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+                new CompletedBatchOperation(false, allFailures, deviceId)));
             return;
         }
 
@@ -441,26 +458,26 @@
         }
 
         log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
-                  master, deviceId);
+            master, deviceId);
 
         clusterCommunicator.unicast(operation,
-                                    APPLY_BATCH_FLOWS,
-                                    serializer::encode,
-                                    master)
-                           .whenComplete((result, error) -> {
-                               if (error != null) {
-                                   log.warn("Failed to storeBatch: {} to {}", operation, master, error);
+            APPLY_BATCH_FLOWS,
+            serializer::encode,
+            master)
+            .whenComplete((result, error) -> {
+                if (error != null) {
+                    log.warn("Failed to storeBatch: {} to {}", operation, master, error);
 
-                                   Set<FlowRule> allFailures = operation.getOperations()
-                                           .stream()
-                                           .map(op -> op.target())
-                                           .collect(Collectors.toSet());
+                    Set<FlowRule> allFailures = operation.getOperations()
+                        .stream()
+                        .map(op -> op.target())
+                        .collect(Collectors.toSet());
 
-                                   notifyDelegate(FlowRuleBatchEvent.completed(
-                                           new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
-                                           new CompletedBatchOperation(false, allFailures, deviceId)));
-                               }
-                           });
+                    notifyDelegate(FlowRuleBatchEvent.completed(
+                        new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+                        new CompletedBatchOperation(false, allFailures, deviceId)));
+                }
+            });
     }
 
     private void storeBatchInternal(FlowRuleBatchOperation operation) {
@@ -470,65 +487,63 @@
         Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
         if (currentOps.isEmpty()) {
             batchOperationComplete(FlowRuleBatchEvent.completed(
-                    new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
-                    new CompletedBatchOperation(true, Collections.emptySet(), did)));
+                new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+                new CompletedBatchOperation(true, Collections.emptySet(), did)));
             return;
         }
 
         notifyDelegate(FlowRuleBatchEvent.requested(new
-                           FlowRuleBatchRequest(operation.id(),
-                                                currentOps), operation.deviceId()));
+            FlowRuleBatchRequest(operation.id(),
+            currentOps), operation.deviceId()));
     }
 
     private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
         return operation.getOperations().stream().map(
-                op -> {
-                    StoredFlowEntry entry;
-                    switch (op.operator()) {
-                        case ADD:
-                            entry = new DefaultFlowEntry(op.target());
-                            flowTable.add(entry);
+            op -> {
+                StoredFlowEntry entry;
+                switch (op.operator()) {
+                    case ADD:
+                        entry = new DefaultFlowEntry(op.target());
+                        flowTable.add(entry);
+                        return op;
+                    case MODIFY:
+                        entry = new DefaultFlowEntry(op.target());
+                        flowTable.update(entry);
+                        return op;
+                    case REMOVE:
+                        return flowTable.update(op.target(), stored -> {
+                            stored.setState(FlowEntryState.PENDING_REMOVE);
+                            log.debug("Setting state of rule to pending remove: {}", stored);
                             return op;
-                        case MODIFY:
-                            entry = new DefaultFlowEntry(op.target());
-                            flowTable.update(entry);
-                            return op;
-                        case REMOVE:
-                            entry = flowTable.getFlowEntry(op.target());
-                            if (entry != null) {
-                                entry.setState(FlowEntryState.PENDING_REMOVE);
-                                flowTable.update(entry);
-                                log.debug("Setting state of rule to pending remove: {}", entry);
-                                return op;
-                            }
-                            break;
-                        default:
-                            log.warn("Unknown flow operation operator: {}", op.operator());
-                    }
-                    return null;
+                        });
+                    default:
+                        log.warn("Unknown flow operation operator: {}", op.operator());
                 }
+                return null;
+            }
         ).filter(Objects::nonNull).collect(Collectors.toSet());
     }
 
     @Override
     public void deleteFlowRule(FlowRule rule) {
         storeBatch(
-                new FlowRuleBatchOperation(
-                        Collections.singletonList(
-                                new FlowRuleBatchEntry(
-                                        FlowRuleOperation.REMOVE,
-                                        rule)), rule.deviceId(), idGenerator.getNewId()));
+            new FlowRuleBatchOperation(
+                Collections.singletonList(
+                    new FlowRuleBatchEntry(
+                        FlowRuleOperation.REMOVE,
+                        rule)), rule.deviceId(), idGenerator.getNewId()));
     }
 
     @Override
     public FlowRuleEvent pendingFlowRule(FlowEntry rule) {
         if (mastershipService.isLocalMaster(rule.deviceId())) {
-            StoredFlowEntry stored = flowTable.getFlowEntry(rule);
-            if (stored != null &&
-                    stored.state() != FlowEntryState.PENDING_ADD) {
-                stored.setState(FlowEntryState.PENDING_ADD);
-                return new FlowRuleEvent(Type.RULE_UPDATED, rule);
-            }
+            return flowTable.update(rule, stored -> {
+                if (stored.state() == FlowEntryState.PENDING_ADD) {
+                    stored.setState(FlowEntryState.PENDING_ADD);
+                    return new FlowRuleEvent(Type.RULE_UPDATED, rule);
+                }
+                return null;
+            });
         }
         return null;
     }
@@ -541,14 +556,12 @@
         }
 
         log.warn("Tried to update FlowRule {} state,"
-                         + " while the Node was not the master.", rule);
+            + " while the Node was not the master.", rule);
         return null;
     }
 
     private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
-        // check if this new rule is an update to an existing entry
-        StoredFlowEntry stored = flowTable.getFlowEntry(rule);
-        if (stored != null) {
+        FlowRuleEvent event = flowTable.update(rule, stored -> {
             stored.setBytes(rule.bytes());
             stored.setLife(rule.life(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
             stored.setLiveType(rule.liveType());
@@ -556,11 +569,12 @@
             stored.setLastSeen();
             if (stored.state() == FlowEntryState.PENDING_ADD) {
                 stored.setState(FlowEntryState.ADDED);
-                // Update the flow table to ensure the changes are replicated
-                flowTable.update(stored);
                 return new FlowRuleEvent(Type.RULE_ADDED, rule);
             }
             return new FlowRuleEvent(Type.RULE_UPDATED, rule);
+        });
+        if (event != null) {
+            return event;
         }
 
         // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
@@ -586,14 +600,17 @@
         }
 
         log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
-                  master, deviceId);
+            master, deviceId);
 
-        return Futures.getUnchecked(clusterCommunicator.sendAndReceive(
-                               rule,
-                               REMOVE_FLOW_ENTRY,
-                               serializer::encode,
-                               serializer::decode,
-                               master));
+        return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
+            rule,
+            REMOVE_FLOW_ENTRY,
+            serializer::encode,
+            serializer::decode,
+            master),
+            FLOW_RULE_STORE_TIMEOUT_MILLIS,
+            TimeUnit.MILLISECONDS,
+            null);
     }
 
     private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
@@ -654,307 +671,186 @@
         }
     }
 
-    private class BackupOperation {
-        private final NodeId nodeId;
-        private final DeviceId deviceId;
-
-        public BackupOperation(NodeId nodeId, DeviceId deviceId) {
-            this.nodeId = nodeId;
-            this.deviceId = deviceId;
-        }
+    private class InternalFlowTable implements DeviceListener {
+        private final Map<DeviceId, DeviceFlowTable> flowTables = Maps.newConcurrentMap();
 
         @Override
-        public int hashCode() {
-            return Objects.hash(nodeId, deviceId);
-        }
-
-        @Override
-        public boolean equals(Object other) {
-            if (other != null && other instanceof BackupOperation) {
-                BackupOperation that = (BackupOperation) other;
-                return this.nodeId.equals(that.nodeId) &&
-                        this.deviceId.equals(that.deviceId);
-            } else {
-                return false;
+        public void event(DeviceEvent event) {
+            if (event.type() == DeviceEvent.Type.DEVICE_ADDED) {
+                addDevice(event.subject().id());
             }
         }
-    }
-
-    private class InternalFlowTable implements ReplicaInfoEventListener {
-
-        //TODO replace the Map<V,V> with ExtendedSet
-        private final Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
-                flowEntries = Maps.newConcurrentMap();
-
-        private final Map<BackupOperation, Long> lastBackupTimes = Maps.newConcurrentMap();
-        private final Map<DeviceId, Long> lastUpdateTimes = Maps.newConcurrentMap();
-
-        @Override
-        public void event(ReplicaInfoEvent event) {
-            eventHandler.execute(() -> handleEvent(event));
-        }
-
-        private void handleEvent(ReplicaInfoEvent event) {
-            DeviceId deviceId = event.subject();
-            if (!mastershipService.isLocalMaster(deviceId)) {
-                return;
-            }
-            if (event.type() == MASTER_CHANGED) {
-                lastUpdateTimes.put(deviceId, System.currentTimeMillis());
-            }
-            backupSenderExecutor.schedule(this::backup, 0, TimeUnit.SECONDS);
-        }
-
-        private void sendBackups(NodeId nodeId, Set<DeviceId> deviceIds) {
-            // split up the devices into smaller batches and send them separately.
-            Iterables.partition(deviceIds, FLOW_TABLE_BACKUP_BATCH_SIZE)
-                     .forEach(ids -> backupFlowEntries(nodeId, Sets.newHashSet(ids)));
-        }
-
-        private void backupFlowEntries(NodeId nodeId, Set<DeviceId> deviceIds) {
-            if (deviceIds.isEmpty()) {
-                return;
-            }
-            log.debug("Sending flowEntries for devices {} to {} for backup.", deviceIds, nodeId);
-            Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
-                    deviceFlowEntries = Maps.newConcurrentMap();
-            deviceIds.forEach(id -> {
-                Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> copy = getFlowTableCopy(id);
-                int flowCount = copy.entrySet().stream()
-                        .mapToInt(e -> e.getValue().values().size()).sum();
-                flowCounts.put(id, flowCount);
-                deviceFlowEntries.put(id, copy);
-            });
-            clusterCommunicator.<Map<DeviceId,
-                                 Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>,
-                                 Set<DeviceId>>
-                    sendAndReceive(deviceFlowEntries,
-                                   FLOW_TABLE_BACKUP,
-                                   serializer::encode,
-                                   serializer::decode,
-                                   nodeId)
-                    .whenComplete((backedupDevices, error) -> {
-                        Set<DeviceId> devicesNotBackedup = error != null ?
-                            deviceFlowEntries.keySet() :
-                            Sets.difference(deviceFlowEntries.keySet(), backedupDevices);
-                        if (devicesNotBackedup.size() > 0) {
-                            log.warn("Failed to backup devices: {}. Reason: {}, Node: {}",
-                                     devicesNotBackedup, error != null ? error.getMessage() : "none",
-                                     nodeId);
-                        }
-                        if (backedupDevices != null) {
-                            backedupDevices.forEach(id -> {
-                                lastBackupTimes.put(new BackupOperation(nodeId, id), System.currentTimeMillis());
-                            });
-                        }
-                    });
-        }
 
         /**
-         * Returns the flow table for specified device.
+         * Adds the given device to the flow table.
          *
-         * @param deviceId identifier of the device
-         * @return Map representing Flow Table of given device.
+         * @param deviceId the device to add to the table
          */
-        private Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
-            if (persistenceEnabled) {
-                return flowEntries.computeIfAbsent(deviceId, id -> persistenceService
-                        .<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>persistentMapBuilder()
-                        .withName("FlowTable:" + deviceId.toString())
-                        .withSerializer(new Serializer() {
-                            @Override
-                            public <T> byte[] encode(T object) {
-                                return serializer.encode(object);
-                            }
-
-                            @Override
-                            public <T> T decode(byte[] bytes) {
-                                return serializer.decode(bytes);
-                            }
-
-                            @Override
-                            public <T> T copy(T object) {
-                                return serializer.copy(object);
-                            }
-                        })
-                        .build());
-            } else {
-                return flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap());
-            }
+        public void addDevice(DeviceId deviceId) {
+            flowTables.computeIfAbsent(deviceId, id -> new DeviceFlowTable(
+                id,
+                clusterService,
+                clusterCommunicator,
+                new InternalLifecycleManager(id),
+                backupSenderExecutor,
+                backupPeriod,
+                antiEntropyPeriod));
         }
 
-        private Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> getFlowTableCopy(DeviceId deviceId) {
-            Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> copy = Maps.newHashMap();
-            if (persistenceEnabled) {
-                return flowEntries.computeIfAbsent(deviceId, id -> persistenceService
-                        .<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>persistentMapBuilder()
-                        .withName("FlowTable:" + deviceId.toString())
-                        .withSerializer(new Serializer() {
-                            @Override
-                            public <T> byte[] encode(T object) {
-                                return serializer.encode(object);
-                            }
-
-                            @Override
-                            public <T> T decode(byte[] bytes) {
-                                return serializer.decode(bytes);
-                            }
-
-                            @Override
-                            public <T> T copy(T object) {
-                                return serializer.copy(object);
-                            }
-                        })
-                        .build());
-            } else {
-                flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap()).forEach((k, v) -> {
-                    copy.put(k, Maps.newHashMap(v));
-                });
-                return copy;
-            }
+        /**
+         * Sets the flow table backup period.
+         *
+         * @param backupPeriod the flow table backup period
+         */
+        void setBackupPeriod(int backupPeriod) {
+            flowTables.values().forEach(flowTable -> flowTable.setBackupPeriod(backupPeriod));
         }
 
-        private Map<StoredFlowEntry, StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
-            return getFlowTable(deviceId).computeIfAbsent(flowId, id -> Maps.newConcurrentMap());
+        /**
+         * Sets the flow table anti-entropy period.
+         *
+         * @param antiEntropyPeriod the flow table anti-entropy period
+         */
+        void setAntiEntropyPeriod(int antiEntropyPeriod) {
+            flowTables.values().forEach(flowTable -> flowTable.setAntiEntropyPeriod(antiEntropyPeriod));
         }
 
-        private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
-            return getFlowEntriesInternal(rule.deviceId(), rule.id()).get(rule);
+        /**
+         * Returns the flow table for a specific device.
+         *
+         * @param deviceId the device identifier
+         * @return the flow table for the given device
+         */
+        private DeviceFlowTable getFlowTable(DeviceId deviceId) {
+            DeviceFlowTable flowTable = flowTables.get(deviceId);
+            return flowTable != null ? flowTable : flowTables.computeIfAbsent(deviceId, id -> new DeviceFlowTable(
+                deviceId,
+                clusterService,
+                clusterCommunicator,
+                new InternalLifecycleManager(deviceId),
+                backupSenderExecutor,
+                backupPeriod,
+                antiEntropyPeriod));
         }
 
-        private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
-            return getFlowTable(deviceId).values().stream()
-                        .flatMap(m -> m.values().stream())
-                        .collect(Collectors.toSet());
+        /**
+         * Returns the flow rule count for the given device.
+         *
+         * @param deviceId the device for which to return the flow rule count
+         * @return the flow rule count for the given device
+         */
+        public int getFlowRuleCount(DeviceId deviceId) {
+            return getFlowTable(deviceId).count();
         }
 
+        /**
+         * Returns the flow entry for the given rule.
+         *
+         * @param rule the rule for which to return the flow entry
+         * @return the flow entry for the given rule
+         */
         public StoredFlowEntry getFlowEntry(FlowRule rule) {
-            return getFlowEntryInternal(rule);
+            return getFlowTable(rule.deviceId()).getFlowEntry(rule);
         }
 
+        /**
+         * Returns the set of flow entries for the given device.
+         *
+         * @param deviceId the device for which to lookup flow entries
+         * @return the set of flow entries for the given device
+         */
         public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
-            return getFlowEntriesInternal(deviceId);
+            return getFlowTable(deviceId).getFlowEntries();
         }
 
+        /**
+         * Adds the given flow rule.
+         *
+         * @param rule the rule to add
+         */
         public void add(FlowEntry rule) {
-            getFlowEntriesInternal(rule.deviceId(), rule.id())
-                    .compute((StoredFlowEntry) rule, (k, stored) -> {
-                        return (StoredFlowEntry) rule;
-                    });
-            lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
+            Tools.futureGetOrElse(
+                getFlowTable(rule.deviceId()).add(rule),
+                FLOW_RULE_STORE_TIMEOUT_MILLIS,
+                TimeUnit.MILLISECONDS,
+                null);
         }
 
+        /**
+         * Updates the given flow rule.
+         *
+         * @param rule the rule to update
+         */
         public void update(FlowEntry rule) {
-            getFlowEntriesInternal(rule.deviceId(), rule.id())
-                .computeIfPresent((StoredFlowEntry) rule, (k, stored) -> {
-                    if (rule instanceof DefaultFlowEntry) {
-                        DefaultFlowEntry updated = (DefaultFlowEntry) rule;
-                        if (stored instanceof DefaultFlowEntry) {
-                            DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
-                            if (updated.created() >= storedEntry.created()) {
-                                lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
-                                return updated;
-                            } else {
-                                log.debug("Trying to update more recent flow entry {} (stored: {})", updated, stored);
-                                return stored;
-                            }
-                        }
-                    }
-                    return stored;
-                });
+            Tools.futureGetOrElse(
+                getFlowTable(rule.deviceId()).update(rule),
+                FLOW_RULE_STORE_TIMEOUT_MILLIS,
+                TimeUnit.MILLISECONDS,
+                null);
         }
 
+        /**
+         * Applies the given update function to the rule.
+         *
+         * @param function the update function to apply
+         * @return a future to be completed with the update event or {@code null} if the rule was not updated
+         */
+        public <T> T update(FlowRule rule, Function<StoredFlowEntry, T> function) {
+            return Tools.futureGetOrElse(
+                getFlowTable(rule.deviceId()).update(rule, function),
+                FLOW_RULE_STORE_TIMEOUT_MILLIS,
+                TimeUnit.MILLISECONDS,
+                null);
+        }
+
+        /**
+         * Removes the given flow rule.
+         *
+         * @param rule the rule to remove
+         */
         public FlowEntry remove(FlowEntry rule) {
-            final AtomicReference<FlowEntry> removedRule = new AtomicReference<>();
-            final Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowTable = getFlowTable(rule.deviceId());
-            flowTable.computeIfPresent(rule.id(), (flowId, flowEntries) -> {
-                flowEntries.computeIfPresent((StoredFlowEntry) rule, (k, stored) -> {
-                    if (rule instanceof DefaultFlowEntry) {
-                        DefaultFlowEntry toRemove = (DefaultFlowEntry) rule;
-                        if (stored instanceof DefaultFlowEntry) {
-                            DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
-                            if (toRemove.created() < storedEntry.created()) {
-                                log.debug("Trying to remove more recent flow entry {} (stored: {})", toRemove, stored);
-                                // the key is not updated, removedRule remains null
-                                return stored;
-                            }
-                        }
-                    }
-                    removedRule.set(stored);
-                    return null;
-                });
-                return flowEntries.isEmpty() ? null : flowEntries;
-            });
-
-            if (removedRule.get() != null) {
-                lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
-                return removedRule.get();
-            } else {
-                return null;
-            }
+            return Tools.futureGetOrElse(
+                getFlowTable(rule.deviceId()).remove(rule),
+                FLOW_RULE_STORE_TIMEOUT_MILLIS,
+                TimeUnit.MILLISECONDS,
+                null);
         }
 
+        /**
+         * Purges flow rules for the given device.
+         *
+         * @param deviceId the device for which to purge flow rules
+         */
         public void purgeFlowRule(DeviceId deviceId) {
-            flowEntries.remove(deviceId);
+            // If the device is still present in the store, purge the underlying DeviceFlowTable.
+            // Otherwise, remove the DeviceFlowTable and unregister message handlers.
+            if (deviceService.getDevice(deviceId) != null) {
+                DeviceFlowTable flowTable = flowTables.get(deviceId);
+                if (flowTable != null) {
+                    flowTable.purge();
+                }
+            } else {
+                DeviceFlowTable flowTable = flowTables.remove(deviceId);
+                if (flowTable != null) {
+                    flowTable.close();
+                }
+            }
         }
 
+        /**
+         * Purges all flow rules from the table.
+         */
         public void purgeFlowRules() {
-            flowEntries.clear();
-        }
-
-        private List<NodeId> getBackupNodes(DeviceId deviceId) {
-            // The returned backup node list is in the order of preference i.e. next likely master first.
-            List<NodeId> allPossibleBackupNodes = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
-            return ImmutableList.copyOf(allPossibleBackupNodes)
-                                .subList(0, Math.min(allPossibleBackupNodes.size(), backupCount));
-        }
-
-        private void backup() {
-            try {
-                // compute a mapping from node to the set of devices whose flow entries it should backup
-                Map<NodeId, Set<DeviceId>> devicesToBackupByNode = Maps.newHashMap();
-                flowEntries.keySet().forEach(deviceId -> {
-                    List<NodeId> backupNodes = getBackupNodes(deviceId);
-                    backupNodes.forEach(backupNode -> {
-                            if (lastBackupTimes.getOrDefault(new BackupOperation(backupNode, deviceId), 0L)
-                                    < lastUpdateTimes.getOrDefault(deviceId, 0L)) {
-                                devicesToBackupByNode.computeIfAbsent(backupNode,
-                                                                      nodeId -> Sets.newHashSet()).add(deviceId);
-                            }
-                    });
-                });
-                // send the device flow entries to their respective backup nodes
-                devicesToBackupByNode.forEach(this::sendBackups);
-            } catch (Exception e) {
-                log.error("Backup failed.", e);
+            Iterator<DeviceFlowTable> iterator = flowTables.values().iterator();
+            while (iterator.hasNext()) {
+                iterator.next().close();
+                iterator.remove();
             }
         }
-
-        private Set<DeviceId> onBackupReceipt(Map<DeviceId,
-                Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>> flowTables) {
-            log.debug("Received flowEntries for {} to backup", flowTables.keySet());
-            Set<DeviceId> backedupDevices = Sets.newHashSet();
-            try {
-                flowTables.forEach((deviceId, deviceFlowTable) -> {
-                    // Only process those devices are that not managed by the local node.
-                    if (!Objects.equals(local, mastershipService.getMasterFor(deviceId))) {
-                        Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> backupFlowTable =
-                                getFlowTable(deviceId);
-                        backupFlowTable.clear();
-                        backupFlowTable.putAll(deviceFlowTable);
-                        backedupDevices.add(deviceId);
-                    }
-                });
-            } catch (Exception e) {
-                log.warn("Failure processing backup request", e);
-            }
-            return backedupDevices;
-        }
     }
 
     @Override
-    public FlowRuleEvent updateTableStatistics(DeviceId deviceId,
-                                               List<TableStatisticsEntry> tableStats) {
+    public FlowRuleEvent updateTableStatistics(DeviceId deviceId, List<TableStatisticsEntry> tableStats) {
         deviceTableStats.put(deviceId, tableStats);
         return null;
     }
@@ -978,16 +874,116 @@
     @Override
     public long getActiveFlowRuleCount(DeviceId deviceId) {
         return Streams.stream(getTableStatistics(deviceId))
-                .mapToLong(TableStatisticsEntry::activeFlowEntries)
-                .sum();
+            .mapToLong(TableStatisticsEntry::activeFlowEntries)
+            .sum();
     }
 
     private class InternalTableStatsListener
         implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> {
         @Override
         public void event(EventuallyConsistentMapEvent<DeviceId,
-                          List<TableStatisticsEntry>> event) {
+            List<TableStatisticsEntry>> event) {
             //TODO: Generate an event to listeners (do we need?)
         }
     }
-}
+
+    /**
+     * Device lifecycle manager implementation.
+     */
+    private final class InternalLifecycleManager
+        extends AbstractListenerManager<LifecycleEvent, LifecycleEventListener>
+        implements LifecycleManager, ReplicaInfoEventListener, MapEventListener<DeviceId, Long> {
+
+        private final DeviceId deviceId;
+
+        private volatile DeviceReplicaInfo replicaInfo;
+
+        InternalLifecycleManager(DeviceId deviceId) {
+            this.deviceId = deviceId;
+            replicaInfoManager.addListener(this);
+            mastershipTermLifecycles.addListener(this);
+            replicaInfo = toDeviceReplicaInfo(replicaInfoManager.getReplicaInfoFor(deviceId));
+        }
+
+        @Override
+        public DeviceReplicaInfo getReplicaInfo() {
+            return replicaInfo;
+        }
+
+        @Override
+        public void activate(long term) {
+            final ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
+            if (replicaInfo != null && replicaInfo.term() == term) {
+                mastershipTermLifecycles.put(deviceId, term);
+            }
+        }
+
+        @Override
+        public void event(ReplicaInfoEvent event) {
+            if (event.subject().equals(deviceId)) {
+                onReplicaInfoChange(event.replicaInfo());
+            }
+        }
+
+        @Override
+        public void event(MapEvent<DeviceId, Long> event) {
+            if (event.key().equals(deviceId) && event.newValue() != null) {
+                onActivate(event.newValue().value());
+            }
+        }
+
+        /**
+         * Handles a term activation event.
+         *
+         * @param term the term that was activated
+         */
+        private void onActivate(long term) {
+            final ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
+            if (replicaInfo != null && replicaInfo.term() == term) {
+                NodeId master = replicaInfo.master().orElse(null);
+                List<NodeId> backups = replicaInfo.backups()
+                    .subList(0, Math.min(replicaInfo.backups().size(), backupCount));
+                listenerRegistry.process(new LifecycleEvent(
+                    LifecycleEvent.Type.TERM_ACTIVE,
+                    new DeviceReplicaInfo(term, master, backups)));
+            }
+        }
+
+        /**
+         * Handles a replica info change event.
+         *
+         * @param replicaInfo the updated replica info
+         */
+        private synchronized void onReplicaInfoChange(ReplicaInfo replicaInfo) {
+            DeviceReplicaInfo oldReplicaInfo = this.replicaInfo;
+            this.replicaInfo = toDeviceReplicaInfo(replicaInfo);
+            if (oldReplicaInfo == null || oldReplicaInfo.term() < replicaInfo.term()) {
+                if (oldReplicaInfo != null) {
+                    listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_END, oldReplicaInfo));
+                }
+                listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_START, this.replicaInfo));
+            } else if (oldReplicaInfo.term() == replicaInfo.term()) {
+                listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_UPDATE, this.replicaInfo));
+            }
+        }
+
+        /**
+         * Converts the given replica info into a {@link DeviceReplicaInfo} instance.
+         *
+         * @param replicaInfo the replica info to convert
+         * @return the converted replica info
+         */
+        private DeviceReplicaInfo toDeviceReplicaInfo(ReplicaInfo replicaInfo) {
+            NodeId master = replicaInfo.master().orElse(null);
+            List<NodeId> backups = replicaInfo.backups()
+                .subList(0, Math.min(replicaInfo.backups().size(), backupCount));
+            return new DeviceReplicaInfo(replicaInfo.term(), master, backups);
+        }
+
+        @Override
+        public void close() {
+            replicaInfoManager.removeListener(this);
+            mastershipTermLifecycles.removeListener(this);
+        }
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
index fd4f6d7..c57e9eb 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
@@ -32,6 +32,9 @@
     public static final MessageSubject GET_DEVICE_FLOW_ENTRIES
         = new MessageSubject("peer-forward-get-device-flow-entries");
 
+    public static final MessageSubject GET_DEVICE_FLOW_COUNT
+        = new MessageSubject("peer-forward-get-flow-count");
+
     public static final MessageSubject REMOVE_FLOW_ENTRY
         = new MessageSubject("peer-forward-remove-flow-entry");
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
new file mode 100644
index 0000000..c1fea52
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
@@ -0,0 +1,253 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import com.google.common.collect.Maps;
+import org.onosproject.net.flow.DefaultFlowEntry;
+import org.onosproject.net.flow.FlowEntry;
+import org.onosproject.net.flow.FlowId;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.StoredFlowEntry;
+import org.onosproject.store.LogicalTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Container for a bucket of flows assigned to a specific device.
+ * <p>
+ * The bucket is mutable. When changes are made to the bucket, the term and timestamp in which the change
+ * occurred is recorded for ordering changes.
+ */
+public class FlowBucket {
+    private static final Logger LOGGER = LoggerFactory.getLogger(FlowBucket.class);
+    private final BucketId bucketId;
+    private volatile long term;
+    private volatile LogicalTimestamp timestamp = new LogicalTimestamp(0);
+    private final Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowBucket = Maps.newConcurrentMap();
+
+    FlowBucket(BucketId bucketId) {
+        this.bucketId = bucketId;
+    }
+
+    /**
+     * Returns the flow bucket identifier.
+     *
+     * @return the flow bucket identifier
+     */
+    public BucketId bucketId() {
+        return bucketId;
+    }
+
+    /**
+     * Returns the flow bucket term.
+     *
+     * @return the flow bucket term
+     */
+    public long term() {
+        return term;
+    }
+
+    /**
+     * Returns the flow bucket timestamp.
+     *
+     * @return the flow bucket timestamp
+     */
+    public LogicalTimestamp timestamp() {
+        return timestamp;
+    }
+
+    /**
+     * Returns the digest for the bucket.
+     *
+     * @return the digest for the bucket
+     */
+    public FlowBucketDigest getDigest() {
+        return new FlowBucketDigest(bucketId().bucket(), term(), timestamp());
+    }
+
+    /**
+     * Returns the flow entries in the bucket.
+     *
+     * @return the flow entries in the bucket
+     */
+    public Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> getFlowBucket() {
+        return flowBucket;
+    }
+
+    /**
+     * Returns the flow entries for the given flow.
+     *
+     * @param flowId the flow identifier
+     * @return the flows for the given flow ID
+     */
+    public Map<StoredFlowEntry, StoredFlowEntry> getFlowEntries(FlowId flowId) {
+        Map<StoredFlowEntry, StoredFlowEntry> flowEntries = flowBucket.get(flowId);
+        return flowEntries != null ? flowEntries : flowBucket.computeIfAbsent(flowId, id -> Maps.newConcurrentMap());
+    }
+
+    /**
+     * Counts the flows in the bucket.
+     *
+     * @return the number of flows in the bucket
+     */
+    public int count() {
+        return flowBucket.values()
+            .stream()
+            .mapToInt(entry -> entry.values().size())
+            .sum();
+    }
+
+    /**
+     * Records an update to the bucket.
+     */
+    private void recordUpdate(long term, LogicalTimestamp timestamp) {
+        this.term = term;
+        this.timestamp = timestamp;
+    }
+
+    /**
+     * Adds the given flow rule to the bucket.
+     *
+     * @param rule  the rule to add
+     * @param term  the term in which the change occurred
+     * @param clock the logical clock
+     */
+    public void add(FlowEntry rule, long term, LogicalClock clock) {
+        Map<StoredFlowEntry, StoredFlowEntry> flowEntries = flowBucket.get(rule.id());
+        if (flowEntries == null) {
+            flowEntries = flowBucket.computeIfAbsent(rule.id(), id -> Maps.newConcurrentMap());
+        }
+        flowEntries.put((StoredFlowEntry) rule, (StoredFlowEntry) rule);
+        recordUpdate(term, clock.getTimestamp());
+    }
+
+    /**
+     * Updates the given flow rule in the bucket.
+     *
+     * @param rule  the rule to update
+     * @param term  the term in which the change occurred
+     * @param clock the logical clock
+     */
+    public void update(FlowEntry rule, long term, LogicalClock clock) {
+        Map<StoredFlowEntry, StoredFlowEntry> flowEntries = flowBucket.get(rule.id());
+        if (flowEntries == null) {
+            flowEntries = flowBucket.computeIfAbsent(rule.id(), id -> Maps.newConcurrentMap());
+        }
+        flowEntries.computeIfPresent((StoredFlowEntry) rule, (k, stored) -> {
+            if (rule instanceof DefaultFlowEntry) {
+                DefaultFlowEntry updated = (DefaultFlowEntry) rule;
+                if (stored instanceof DefaultFlowEntry) {
+                    DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
+                    if (updated.created() >= storedEntry.created()) {
+                        recordUpdate(term, clock.getTimestamp());
+                        return updated;
+                    } else {
+                        LOGGER.debug("Trying to update more recent flow entry {} (stored: {})", updated, stored);
+                        return stored;
+                    }
+                }
+            }
+            return stored;
+        });
+    }
+
+    /**
+     * Applies the given update function to the rule.
+     *
+     * @param rule     the rule to update
+     * @param function the update function to apply
+     * @param term     the term in which the change occurred
+     * @param clock    the logical clock
+     * @param <T>      the result type
+     * @return the update result or {@code null} if the rule was not updated
+     */
+    public <T> T update(FlowRule rule, Function<StoredFlowEntry, T> function, long term, LogicalClock clock) {
+        Map<StoredFlowEntry, StoredFlowEntry> flowEntries = flowBucket.get(rule.id());
+        if (flowEntries == null) {
+            flowEntries = flowBucket.computeIfAbsent(rule.id(), id -> Maps.newConcurrentMap());
+        }
+
+        AtomicReference<T> resultRef = new AtomicReference<>();
+        flowEntries.computeIfPresent(new DefaultFlowEntry(rule), (k, stored) -> {
+            if (stored != null) {
+                T result = function.apply(stored);
+                if (result != null) {
+                    recordUpdate(term, clock.getTimestamp());
+                    resultRef.set(result);
+                }
+            }
+            return stored;
+        });
+        return resultRef.get();
+    }
+
+    /**
+     * Removes the given flow rule from the bucket.
+     *
+     * @param rule  the rule to remove
+     * @param term  the term in which the change occurred
+     * @param clock the logical clock
+     * @return the removed flow entry
+     */
+    public FlowEntry remove(FlowEntry rule, long term, LogicalClock clock) {
+        final AtomicReference<FlowEntry> removedRule = new AtomicReference<>();
+        flowBucket.computeIfPresent(rule.id(), (flowId, flowEntries) -> {
+            flowEntries.computeIfPresent((StoredFlowEntry) rule, (k, stored) -> {
+                if (rule instanceof DefaultFlowEntry) {
+                    DefaultFlowEntry toRemove = (DefaultFlowEntry) rule;
+                    if (stored instanceof DefaultFlowEntry) {
+                        DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
+                        if (toRemove.created() < storedEntry.created()) {
+                            LOGGER.debug("Trying to remove more recent flow entry {} (stored: {})", toRemove, stored);
+                            // the key is not updated, removedRule remains null
+                            return stored;
+                        }
+                    }
+                }
+                removedRule.set(stored);
+                return null;
+            });
+            return flowEntries.isEmpty() ? null : flowEntries;
+        });
+
+        if (removedRule.get() != null) {
+            recordUpdate(term, clock.getTimestamp());
+            return removedRule.get();
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Purges the bucket.
+     */
+    public void purge() {
+        flowBucket.clear();
+    }
+
+    /**
+     * Clears the bucket.
+     */
+    public void clear() {
+        term = 0;
+        timestamp = new LogicalTimestamp(0);
+        flowBucket.clear();
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucketDigest.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucketDigest.java
new file mode 100644
index 0000000..b5453e5
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucketDigest.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import java.util.Objects;
+
+import org.onosproject.store.LogicalTimestamp;
+
+/**
+ * Flow bucket digest.
+ */
+public class FlowBucketDigest {
+    private final int bucket;
+    private final long term;
+    private final LogicalTimestamp timestamp;
+
+    FlowBucketDigest(int bucket, long term, LogicalTimestamp timestamp) {
+        this.bucket = bucket;
+        this.term = term;
+        this.timestamp = timestamp;
+    }
+
+    /**
+     * Returns the bucket identifier.
+     *
+     * @return the bucket identifier
+     */
+    public int bucket() {
+        return bucket;
+    }
+
+    /**
+     * Returns the bucket term.
+     *
+     * @return the bucket term
+     */
+    public long term() {
+        return term;
+    }
+
+    /**
+     * Returns the bucket timestamp.
+     *
+     * @return the bucket timestamp
+     */
+    public LogicalTimestamp timestamp() {
+        return timestamp;
+    }
+
+    /**
+     * Returns a boolean indicating whether this digest is newer than the given digest.
+     *
+     * @param digest the digest to check
+     * @return indicates whether this digest is newer than the given digest
+     */
+    public boolean isNewerThan(FlowBucketDigest digest) {
+        return digest == null || term() > digest.term() || timestamp().isNewerThan(digest.timestamp());
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(bucket);
+    }
+
+    @Override
+    public boolean equals(Object object) {
+        return object instanceof FlowBucketDigest
+            && ((FlowBucketDigest) object).bucket == bucket;
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleEvent.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleEvent.java
new file mode 100644
index 0000000..ab6262f
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleEvent.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import org.onosproject.event.AbstractEvent;
+
+/**
+ * Flow table lifecycle event.
+ */
+public class LifecycleEvent extends AbstractEvent<LifecycleEvent.Type, DeviceReplicaInfo> {
+
+    /**
+     * Lifecycle event type.
+     */
+    public enum Type {
+        TERM_START,
+        TERM_ACTIVE,
+        TERM_UPDATE,
+        TERM_END,
+    }
+
+    public LifecycleEvent(Type type, DeviceReplicaInfo subject) {
+        super(type, subject);
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleEventListener.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleEventListener.java
new file mode 100644
index 0000000..a2f3b3d
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleEventListener.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import org.onosproject.event.EventListener;
+
+/**
+ * Flow table lifecycle event listener.
+ */
+public interface LifecycleEventListener extends EventListener<LifecycleEvent> {
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleManager.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleManager.java
new file mode 100644
index 0000000..213454c
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleManager.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import org.onosproject.event.ListenerService;
+
+/**
+ * Flow table lifecycle manager.
+ */
+public interface LifecycleManager extends ListenerService<LifecycleEvent, LifecycleEventListener> {
+
+    /**
+     * Returns the current term.
+     *
+     * @return the current term
+     */
+    DeviceReplicaInfo getReplicaInfo();
+
+    /**
+     * Activates the given term.
+     *
+     * @param term the term to activate
+     */
+    void activate(long term);
+
+    /**
+     * Closes the lifecycle manager.
+     */
+    void close();
+
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LogicalClock.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LogicalClock.java
new file mode 100644
index 0000000..92d39cf
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LogicalClock.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.onosproject.store.LogicalTimestamp;
+
+/**
+ * Logical clock.
+ */
+public final class LogicalClock {
+    private final AtomicLong timestamp = new AtomicLong();
+
+    /**
+     * Records an event.
+     *
+     * @param timestamp the event timestamp
+     */
+    public void tick(LogicalTimestamp timestamp) {
+        this.timestamp.accumulateAndGet(timestamp.value(), (x, y) -> Math.max(x, y) + 1);
+    }
+
+    /**
+     * Returns a timestamped value.
+     *
+     * @param value the value to timestamp
+     * @param <T>   the value type
+     * @return the timestamped value
+     */
+    public <T> Timestamped<T> timestamp(T value) {
+        return new Timestamped<>(value, getTimestamp());
+    }
+
+    /**
+     * Increments and returns the current timestamp.
+     *
+     * @return the current timestamp
+     */
+    public LogicalTimestamp getTimestamp() {
+        return new LogicalTimestamp(timestamp.incrementAndGet());
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java
index 0f47c08..2da23a4 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java
@@ -15,18 +15,16 @@
  */
 package org.onosproject.store.flow.impl;
 
-import com.google.common.collect.ImmutableList;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.RoleInfo;
 import org.onosproject.event.EventDeliveryService;
 import org.onosproject.event.ListenerRegistry;
 import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipInfo;
 import org.onosproject.mastership.MastershipListener;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.DeviceId;
@@ -36,8 +34,6 @@
 import org.onosproject.store.flow.ReplicaInfoService;
 import org.slf4j.Logger;
 
-import java.util.Collections;
-import java.util.List;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.onosproject.store.flow.ReplicaInfoEvent.Type.BACKUPS_CHANGED;
 import static org.onosproject.store.flow.ReplicaInfoEvent.Type.MASTER_CHANGED;
@@ -79,7 +75,7 @@
 
     @Override
     public ReplicaInfo getReplicaInfoFor(DeviceId deviceId) {
-        return buildFromRoleInfo(mastershipService.getNodesFor(deviceId));
+        return buildFromRoleInfo(mastershipService.getMastershipFor(deviceId));
     }
 
     @Override
@@ -92,17 +88,15 @@
         listenerRegistry.removeListener(checkNotNull(listener));
     }
 
-    private static ReplicaInfo buildFromRoleInfo(RoleInfo roles) {
-        List<NodeId> backups = roles.backups() == null ?
-                Collections.emptyList() : ImmutableList.copyOf(roles.backups());
-        return new ReplicaInfo(roles.master(), backups);
+    private static ReplicaInfo buildFromRoleInfo(MastershipInfo mastership) {
+        return new ReplicaInfo(mastership.term(), mastership.master().orElse(null), mastership.backups());
     }
 
     final class InternalMastershipListener implements MastershipListener {
 
         @Override
         public void event(MastershipEvent event) {
-            final ReplicaInfo replicaInfo = buildFromRoleInfo(event.roleInfo());
+            final ReplicaInfo replicaInfo = buildFromRoleInfo(event.mastershipInfo());
             switch (event.type()) {
             case MASTER_CHANGED:
                 eventDispatcher.post(new ReplicaInfoEvent(MASTER_CHANGED,
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/Timestamped.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/Timestamped.java
new file mode 100644
index 0000000..e7f566b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/Timestamped.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import org.onosproject.store.LogicalTimestamp;
+
+/**
+ * Timestamped value.
+ */
+public class Timestamped<T> {
+    private final T value;
+    private final LogicalTimestamp timestamp;
+
+    public Timestamped(T value, LogicalTimestamp timestamp) {
+        this.value = value;
+        this.timestamp = timestamp;
+    }
+
+    /**
+     * Returns the value.
+     *
+     * @return the value
+     */
+    public T value() {
+        return value;
+    }
+
+    /**
+     * Returns the timestamp.
+     *
+     * @return the timestamp
+     */
+    public LogicalTimestamp timestamp() {
+        return timestamp;
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 8521f52..30ece6e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -23,7 +23,8 @@
 import static com.google.common.base.Preconditions.checkArgument;
 
 import java.util.List;
-import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -34,6 +35,7 @@
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -50,6 +52,7 @@
 import org.onosproject.cluster.NodeId;
 import org.onosproject.cluster.RoleInfo;
 import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipInfo;
 import org.onosproject.mastership.MastershipStore;
 import org.onosproject.mastership.MastershipStoreDelegate;
 import org.onosproject.mastership.MastershipTerm;
@@ -62,10 +65,7 @@
 import org.onosproject.store.service.Serializer;
 import org.slf4j.Logger;
 
-import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 
 /**
  * Implementation of the MastershipStore on top of Leadership Service.
@@ -158,7 +158,7 @@
         NodeId leader = leadership == null ? null : leadership.leaderNodeId();
         List<NodeId> candidates = leadership == null ?
                 ImmutableList.of() : ImmutableList.copyOf(leadership.candidates());
-        MastershipRole role = Objects.equal(localNodeId, leader) ?
+        MastershipRole role = Objects.equals(localNodeId, leader) ?
                 MastershipRole.MASTER : candidates.contains(localNodeId) ? MastershipRole.STANDBY : MastershipRole.NONE;
         return CompletableFuture.completedFuture(role);
     }
@@ -173,7 +173,7 @@
         NodeId leader = leadership == null ? null : leadership.leaderNodeId();
         List<NodeId> candidates = leadership == null ?
                 ImmutableList.of() : ImmutableList.copyOf(leadership.candidates());
-        return Objects.equal(nodeId, leader) ?
+        return Objects.equals(nodeId, leader) ?
                 MastershipRole.MASTER : candidates.contains(nodeId) ? MastershipRole.STANDBY : MastershipRole.NONE;
     }
 
@@ -187,27 +187,15 @@
     @Override
     public RoleInfo getNodes(DeviceId deviceId) {
         checkArgument(deviceId != null, DEVICE_ID_NULL);
+        MastershipInfo mastership = getMastership(deviceId);
+        return new RoleInfo(mastership.master().orElse(null), mastership.backups());
+    }
 
-        Map<NodeId, MastershipRole> roles = Maps.newHashMap();
-        clusterService.getNodes()
-                      .forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId)));
-
-        NodeId master = null;
-        final List<NodeId> standbys = Lists.newLinkedList();
-
-        List<NodeId> candidates = leadershipService.getCandidates(createDeviceMastershipTopic(deviceId));
-
-        for (Map.Entry<NodeId, MastershipRole> entry : roles.entrySet()) {
-            if (entry.getValue() == MastershipRole.MASTER) {
-                master = entry.getKey();
-            } else if (entry.getValue() == MastershipRole.STANDBY) {
-                standbys.add(entry.getKey());
-            }
-        }
-
-        List<NodeId> sortedStandbyList = candidates.stream().filter(standbys::contains).collect(Collectors.toList());
-
-        return new RoleInfo(master, sortedStandbyList);
+    @Override
+    public MastershipInfo getMastership(DeviceId deviceId) {
+        checkArgument(deviceId != null, DEVICE_ID_NULL);
+        Leadership leadership = leadershipService.getLeadership(createDeviceMastershipTopic(deviceId));
+        return buildMastershipFromLeadership(leadership);
     }
 
     @Override
@@ -263,7 +251,7 @@
         List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
 
         NodeId newMaster = candidates.stream()
-                                     .filter(candidate -> !Objects.equal(nodeId, candidate))
+                                     .filter(candidate -> !Objects.equals(nodeId, candidate))
                                      .findFirst()
                                      .orElse(null);
         log.info("Transitioning to role {} for {}. Next master: {}",
@@ -304,7 +292,7 @@
         MastershipEvent.Type eventType = localNodeId.equals(leadershipService.getLeader(leadershipTopic)) ?
                 MastershipEvent.Type.MASTER_CHANGED : MastershipEvent.Type.BACKUPS_CHANGED;
         leadershipService.withdraw(leadershipTopic);
-        return CompletableFuture.completedFuture(new MastershipEvent(eventType, deviceId, getNodes(deviceId)));
+        return CompletableFuture.completedFuture(new MastershipEvent(eventType, deviceId, getMastership(deviceId)));
     }
 
     @Override
@@ -312,6 +300,27 @@
         // Noop. LeadershipService already takes care of detecting and purging stale locks.
     }
 
+    private MastershipInfo buildMastershipFromLeadership(Leadership leadership) {
+        ImmutableMap.Builder<NodeId, MastershipRole> builder = ImmutableMap.builder();
+        if (leadership.leaderNodeId() != null) {
+            builder.put(leadership.leaderNodeId(), MastershipRole.MASTER);
+        }
+        leadership.candidates().stream()
+            .filter(nodeId -> !Objects.equals(leadership.leaderNodeId(), nodeId))
+            .forEach(nodeId -> builder.put(nodeId, MastershipRole.STANDBY));
+        clusterService.getNodes().stream()
+            .filter(node -> !Objects.equals(leadership.leaderNodeId(), node.id()))
+            .filter(node -> !leadership.candidates().contains(node.id()))
+            .forEach(node -> builder.put(node.id(), MastershipRole.NONE));
+
+        return new MastershipInfo(
+            leadership.leader() != null ? leadership.leader().term() : 0,
+            leadership.leader() != null
+                ? Optional.of(leadership.leader().nodeId())
+                : Optional.empty(),
+            builder.build());
+    }
+
     private class InternalDeviceMastershipEventListener implements LeadershipEventListener {
 
         @Override
@@ -328,27 +337,29 @@
         private void handleEvent(LeadershipEvent event) {
             Leadership leadership = event.subject();
             DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
-            RoleInfo roleInfo = event.type() != LeadershipEvent.Type.SERVICE_DISRUPTED ?
-                    getNodes(deviceId) : new RoleInfo();
+            MastershipInfo mastershipInfo = event.type() != LeadershipEvent.Type.SERVICE_DISRUPTED
+                ? buildMastershipFromLeadership(event.subject())
+                : new MastershipInfo();
+
             switch (event.type()) {
-            case LEADER_AND_CANDIDATES_CHANGED:
-                notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, roleInfo));
-                notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, roleInfo));
-                break;
-            case LEADER_CHANGED:
-                notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, roleInfo));
-                break;
-            case CANDIDATES_CHANGED:
-                notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, roleInfo));
-                break;
-            case SERVICE_DISRUPTED:
-                notifyDelegate(new MastershipEvent(SUSPENDED, deviceId, roleInfo));
-                break;
-            case SERVICE_RESTORED:
-                // Do nothing, wait for updates from peers
-                break;
-            default:
-                return;
+                case LEADER_AND_CANDIDATES_CHANGED:
+                    notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, mastershipInfo));
+                    notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, mastershipInfo));
+                    break;
+                case LEADER_CHANGED:
+                    notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, mastershipInfo));
+                    break;
+                case CANDIDATES_CHANGED:
+                    notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, mastershipInfo));
+                    break;
+                case SERVICE_DISRUPTED:
+                    notifyDelegate(new MastershipEvent(SUSPENDED, deviceId, mastershipInfo));
+                    break;
+                case SERVICE_RESTORED:
+                    // Do nothing, wait for updates from peers
+                    break;
+                default:
+                    return;
             }
         }
     }
diff --git a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ECFlowRuleStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ECFlowRuleStoreTest.java
index 7fd0412..72562f9 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ECFlowRuleStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ECFlowRuleStoreTest.java
@@ -16,6 +16,7 @@
 package org.onosproject.store.flow.impl;
 
 import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Multimap;
 import org.junit.After;
 import org.junit.Before;
@@ -26,6 +27,7 @@
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.core.CoreServiceAdapter;
+import org.onosproject.mastership.MastershipInfo;
 import org.onosproject.mastership.MastershipServiceAdapter;
 import org.onosproject.net.device.DeviceServiceAdapter;
 import org.onosproject.net.DeviceId;
@@ -40,10 +42,16 @@
 import org.onosproject.net.intent.IntentTestsMocks;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
 import org.onosproject.store.persistence.PersistenceServiceAdapter;
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncConsistentMapAdapter;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.ConsistentMapBuilder;
 import org.onosproject.store.service.TestStorageService;
 
 import org.onlab.packet.Ip4Address;
 import java.util.Iterator;
+import java.util.Optional;
+
 import org.osgi.service.component.ComponentContext;
 
 import static org.easymock.EasyMock.createMock;
@@ -103,6 +111,16 @@
         public NodeId getMasterFor(DeviceId deviceId) {
             return new NodeId("1");
         }
+
+        @Override
+        public MastershipInfo getMastershipFor(DeviceId deviceId) {
+            return new MastershipInfo(
+                1,
+                Optional.of(NodeId.nodeId("1")),
+                ImmutableMap.<NodeId, MastershipRole>builder()
+                    .put(NodeId.nodeId("1"), MastershipRole.MASTER)
+                    .build());
+        }
     }
 
 
@@ -132,8 +150,27 @@
     @Before
     public void setUp() throws Exception {
         flowStoreImpl = new ECFlowRuleStore();
-        flowStoreImpl.storageService = new TestStorageService();
-        flowStoreImpl.replicaInfoManager = new ReplicaInfoManager();
+        flowStoreImpl.storageService = new TestStorageService() {
+            @Override
+            public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
+                return new ConsistentMapBuilder<K, V>() {
+                    @Override
+                    public AsyncConsistentMap<K, V> buildAsyncMap() {
+                        return new AsyncConsistentMapAdapter<K, V>();
+                    }
+
+                    @Override
+                    public ConsistentMap<K, V> build() {
+                        return null;
+                    }
+                };
+            }
+        };
+
+        ReplicaInfoManager replicaInfoManager = new ReplicaInfoManager();
+        replicaInfoManager.mastershipService = new MasterOfAll();
+
+        flowStoreImpl.replicaInfoManager = replicaInfoManager;
         mockClusterService = createMock(ClusterService.class);
         flowStoreImpl.clusterService = mockClusterService;
         nodeId = new NodeId("1");
diff --git a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java
index dd822df..40aa395 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java
@@ -15,32 +15,33 @@
  */
 package org.onosproject.store.flow.impl;
 
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.RoleInfo;
 import org.onosproject.common.event.impl.TestEventDispatcher;
 import org.onosproject.event.ListenerRegistry;
 import org.onosproject.mastership.MastershipEvent;
 import org.onosproject.mastership.MastershipEvent.Type;
+import org.onosproject.mastership.MastershipInfo;
 import org.onosproject.mastership.MastershipListener;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.mastership.MastershipServiceAdapter;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.MastershipRole;
 import org.onosproject.store.flow.ReplicaInfo;
 import org.onosproject.store.flow.ReplicaInfoEvent;
 import org.onosproject.store.flow.ReplicaInfoEventListener;
 import org.onosproject.store.flow.ReplicaInfoService;
 
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -101,7 +102,7 @@
 
         // fake MastershipEvent
         eventDispatcher.post(new MastershipEvent(Type.MASTER_CHANGED, DID1,
-                new RoleInfo(NID1, new LinkedList<>())));
+            new MastershipInfo(1, Optional.of(NID1), ImmutableMap.of(NID1, MastershipRole.MASTER))));
 
         assertTrue(latch.await(1, TimeUnit.SECONDS));
     }
@@ -149,8 +150,11 @@
         }
 
         @Override
-        public RoleInfo getNodesFor(DeviceId deviceId) {
-            return new RoleInfo(masters.get(deviceId), Collections.emptyList());
+        public MastershipInfo getMastershipFor(DeviceId deviceId) {
+            return new MastershipInfo(
+                1,
+                Optional.ofNullable(masters.get(deviceId)),
+                ImmutableMap.of(NID1, MastershipRole.MASTER));
         }
 
         @Override
diff --git a/drivers/netconf/src/test/java/org/onosproject/drivers/netconf/MockMastershipService.java b/drivers/netconf/src/test/java/org/onosproject/drivers/netconf/MockMastershipService.java
index eed7b58..81349a3 100644
--- a/drivers/netconf/src/test/java/org/onosproject/drivers/netconf/MockMastershipService.java
+++ b/drivers/netconf/src/test/java/org/onosproject/drivers/netconf/MockMastershipService.java
@@ -20,6 +20,7 @@
 
 import org.onosproject.cluster.NodeId;
 import org.onosproject.cluster.RoleInfo;
+import org.onosproject.mastership.MastershipInfo;
 import org.onosproject.mastership.MastershipListener;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.DeviceId;
@@ -81,6 +82,11 @@
     }
 
     @Override
+    public MastershipInfo getMastershipFor(DeviceId deviceId) {
+        return null;
+    }
+
+    @Override
     public Set<DeviceId> getDevicesOf(NodeId nodeId) {
         // TODO Auto-generated method stub
         return null;
diff --git a/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/VirtualNetworkMastershipStore.java b/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/VirtualNetworkMastershipStore.java
index b34b085..31fdfcd 100644
--- a/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/VirtualNetworkMastershipStore.java
+++ b/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/VirtualNetworkMastershipStore.java
@@ -19,6 +19,7 @@
 import org.onosproject.cluster.NodeId;
 import org.onosproject.cluster.RoleInfo;
 import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipInfo;
 import org.onosproject.mastership.MastershipStoreDelegate;
 import org.onosproject.mastership.MastershipTerm;
 import org.onosproject.net.DeviceId;
@@ -74,6 +75,15 @@
     RoleInfo getNodes(NetworkId networkId, DeviceId deviceId);
 
     /**
+     * Returns the mastership info for a device.
+     *
+     * @param networkId virtual network identifier
+     * @param deviceId  the device identifier
+     * @return the mastership info
+     */
+    MastershipInfo getMastership(NetworkId networkId, DeviceId deviceId);
+
+    /**
      * Returns the devices that a controller instance is master of.
      *
      * @param networkId virtual network identifier
diff --git a/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkMastershipManager.java b/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkMastershipManager.java
index 8e5d005..e5b276b 100644
--- a/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkMastershipManager.java
+++ b/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkMastershipManager.java
@@ -31,6 +31,7 @@
 import org.onosproject.incubator.net.virtual.event.AbstractVirtualListenerManager;
 import org.onosproject.mastership.MastershipAdminService;
 import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipInfo;
 import org.onosproject.mastership.MastershipListener;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.mastership.MastershipStoreDelegate;
@@ -155,6 +156,12 @@
     }
 
     @Override
+    public MastershipInfo getMastershipFor(DeviceId deviceId) {
+        checkNotNull(deviceId, DEVICE_ID_NULL);
+        return store.getMastership(networkId, deviceId);
+    }
+
+    @Override
     public Set<DeviceId> getDevicesOf(NodeId nodeId) {
         checkNotNull(nodeId, NODE_ID_NULL);
 
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/ConsistentVirtualDeviceMastershipStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/ConsistentVirtualDeviceMastershipStore.java
index 3d1c376..4176130 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/ConsistentVirtualDeviceMastershipStore.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/ConsistentVirtualDeviceMastershipStore.java
@@ -16,6 +16,7 @@
 
 package org.onosproject.incubator.store.virtual.impl;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -34,6 +35,7 @@
 import org.onosproject.incubator.net.virtual.NetworkId;
 import org.onosproject.incubator.net.virtual.VirtualNetworkMastershipStore;
 import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipInfo;
 import org.onosproject.mastership.MastershipStoreDelegate;
 import org.onosproject.mastership.MastershipTerm;
 import org.onosproject.net.DeviceId;
@@ -45,7 +47,7 @@
 import org.slf4j.Logger;
 
 import java.util.List;
-import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -64,8 +66,6 @@
 
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import static com.google.common.base.Preconditions.checkArgument;
 
 @Component(immediate = true, enabled = false)
@@ -188,30 +188,16 @@
     public RoleInfo getNodes(NetworkId networkId, DeviceId deviceId) {
         checkArgument(networkId != null, NETWORK_ID_NULL);
         checkArgument(deviceId != null, DEVICE_ID_NULL);
+        Leadership leadership = leadershipService.getLeadership(createDeviceMastershipTopic(networkId, deviceId));
+        return new RoleInfo(leadership.leaderNodeId(), leadership.candidates());
+    }
 
-        Map<NodeId, MastershipRole> roles = Maps.newHashMap();
-        clusterService.getNodes()
-                .forEach((node) -> roles.put(node.id(),
-                                             getRole(networkId, node.id(), deviceId)));
-
-        NodeId master = null;
-        final List<NodeId> standbys = Lists.newLinkedList();
-
-        List<NodeId> candidates = leadershipService
-                .getCandidates(createDeviceMastershipTopic(networkId, deviceId));
-
-        for (Map.Entry<NodeId, MastershipRole> entry : roles.entrySet()) {
-            if (entry.getValue() == MastershipRole.MASTER) {
-                master = entry.getKey();
-            } else if (entry.getValue() == MastershipRole.STANDBY) {
-                standbys.add(entry.getKey());
-            }
-        }
-
-        List<NodeId> sortedStandbyList = candidates.stream()
-                .filter(standbys::contains).collect(Collectors.toList());
-
-        return new RoleInfo(master, sortedStandbyList);
+    @Override
+    public MastershipInfo getMastership(NetworkId networkId, DeviceId deviceId) {
+        checkArgument(networkId != null, NETWORK_ID_NULL);
+        checkArgument(deviceId != null, DEVICE_ID_NULL);
+        Leadership leadership = leadershipService.getLeadership(createDeviceMastershipTopic(networkId, deviceId));
+        return buildMastershipFromLeadership(leadership);
     }
 
     @Override
@@ -322,9 +308,8 @@
         MastershipEvent.Type eventType = localNodeId.equals(leadershipService.getLeader(leadershipTopic)) ?
                 MastershipEvent.Type.MASTER_CHANGED : MastershipEvent.Type.BACKUPS_CHANGED;
         leadershipService.withdraw(leadershipTopic);
-        return CompletableFuture.completedFuture(new MastershipEvent(eventType,
-                                                                     deviceId,
-                                                                     getNodes(networkId, deviceId)));
+        return CompletableFuture.completedFuture(
+            new MastershipEvent(eventType, deviceId, getMastership(networkId, deviceId)));
     }
 
     private CompletableFuture<MastershipEvent>
@@ -338,6 +323,24 @@
         // Noop. LeadershipService already takes care of detecting and purging stale locks.
     }
 
+    private MastershipInfo buildMastershipFromLeadership(Leadership leadership) {
+        ImmutableMap.Builder<NodeId, MastershipRole> builder = ImmutableMap.builder();
+        if (leadership.leaderNodeId() != null) {
+            builder.put(leadership.leaderNodeId(), MastershipRole.MASTER);
+        }
+        leadership.candidates().forEach(nodeId -> builder.put(nodeId, MastershipRole.STANDBY));
+        clusterService.getNodes().stream()
+            .filter(node -> !leadership.candidates().contains(node.id()))
+            .forEach(node -> builder.put(node.id(), MastershipRole.NONE));
+
+        return new MastershipInfo(
+            leadership.leader() != null ? leadership.leader().term() : 0,
+            leadership.leader() != null
+                ? Optional.of(leadership.leader().nodeId())
+                : Optional.empty(),
+            builder.build());
+    }
+
     private class InternalDeviceMastershipEventListener
             implements LeadershipEventListener {
 
@@ -357,28 +360,23 @@
 
             NetworkId networkId = extractNetworkIdFromTopic(leadership.topic());
             DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
-
-            RoleInfo roleInfo = event.type() != LeadershipEvent.Type.SERVICE_DISRUPTED ?
-                    getNodes(networkId, deviceId) : new RoleInfo();
+            MastershipInfo mastershipInfo = event.type() != LeadershipEvent.Type.SERVICE_DISRUPTED
+                ? buildMastershipFromLeadership(event.subject())
+                : new MastershipInfo();
 
             switch (event.type()) {
                 case LEADER_AND_CANDIDATES_CHANGED:
-                    notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED,
-                                                                  deviceId, roleInfo));
-                    notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED,
-                                                                  deviceId, roleInfo));
+                    notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED, deviceId, mastershipInfo));
+                    notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED, deviceId, mastershipInfo));
                     break;
                 case LEADER_CHANGED:
-                    notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED,
-                                                                  deviceId, roleInfo));
+                    notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED, deviceId, mastershipInfo));
                     break;
                 case CANDIDATES_CHANGED:
-                    notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED,
-                                                                  deviceId, roleInfo));
+                    notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED, deviceId, mastershipInfo));
                     break;
                 case SERVICE_DISRUPTED:
-                    notifyDelegate(networkId, new MastershipEvent(SUSPENDED,
-                                                                  deviceId, roleInfo));
+                    notifyDelegate(networkId, new MastershipEvent(SUSPENDED, deviceId, mastershipInfo));
                     break;
                 case SERVICE_RESTORED:
                     // Do nothing, wait for updates from peers
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualMastershipStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualMastershipStore.java
index 730ebee..346999b 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualMastershipStore.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualMastershipStore.java
@@ -17,6 +17,7 @@
 package org.onosproject.incubator.store.virtual.impl;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -36,6 +37,7 @@
 import org.onosproject.incubator.net.virtual.NetworkId;
 import org.onosproject.incubator.net.virtual.VirtualNetworkMastershipStore;
 import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipInfo;
 import org.onosproject.mastership.MastershipStoreDelegate;
 import org.onosproject.mastership.MastershipTerm;
 import org.onosproject.net.DeviceId;
@@ -50,6 +52,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -122,7 +125,7 @@
                     // remove from backup list
                     removeFromBackups(networkId, deviceId, node);
                     notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED, deviceId,
-                                                       getNodes(networkId, deviceId)));
+                        getMastership(networkId, deviceId)));
                     return CompletableFuture.completedFuture(MastershipRole.MASTER);
                 }
                 return CompletableFuture.completedFuture(MastershipRole.STANDBY);
@@ -132,13 +135,13 @@
                     masterMap.put(deviceId, node);
                     incrementTerm(networkId, deviceId);
                     notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED, deviceId,
-                                                       getNodes(networkId, deviceId)));
+                        getMastership(networkId, deviceId)));
                     return CompletableFuture.completedFuture(MastershipRole.MASTER);
                 }
                 // add to backup list
                 if (addToBackup(networkId, deviceId, node)) {
                     notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED, deviceId,
-                                                       getNodes(networkId, deviceId)));
+                        getMastership(networkId, deviceId)));
                 }
                 return CompletableFuture.completedFuture(MastershipRole.STANDBY);
             default:
@@ -184,6 +187,27 @@
     }
 
     @Override
+    public MastershipInfo getMastership(NetworkId networkId, DeviceId deviceId) {
+        Map<DeviceId, NodeId> masterMap = getMasterMap(networkId);
+        Map<DeviceId, AtomicInteger> termMap = getTermMap(networkId);
+        Map<DeviceId, List<NodeId>> backups = getBackups(networkId);
+        ImmutableMap.Builder<NodeId, MastershipRole> roleBuilder = ImmutableMap.builder();
+        NodeId master = masterMap.get(deviceId);
+        if (master != null) {
+            roleBuilder.put(master, MastershipRole.MASTER);
+        }
+        backups.getOrDefault(master, Collections.emptyList())
+            .forEach(nodeId -> roleBuilder.put(nodeId, MastershipRole.STANDBY));
+        clusterService.getNodes().stream()
+            .filter(node -> !masterMap.containsValue(node.id()))
+            .forEach(node -> roleBuilder.put(node.id(), MastershipRole.NONE));
+        return new MastershipInfo(
+            termMap.getOrDefault(deviceId, new AtomicInteger(NOTHING)).get(),
+            Optional.ofNullable(master),
+            roleBuilder.build());
+    }
+
+    @Override
     public Set<DeviceId> getDevices(NetworkId networkId, NodeId nodeId) {
         Map<DeviceId, NodeId> masterMap = getMasterMap(networkId);
 
@@ -219,7 +243,7 @@
         }
 
         return CompletableFuture.completedFuture(
-                new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(networkId, deviceId)));
+                new MastershipEvent(MASTER_CHANGED, deviceId, getMastership(networkId, deviceId)));
     }
 
     @Override
@@ -249,14 +273,14 @@
                     // TODO: Should there be new event type for no MASTER?
                     return CompletableFuture.completedFuture(
                             new MastershipEvent(MASTER_CHANGED, deviceId,
-                                                getNodes(networkId, deviceId)));
+                                getMastership(networkId, deviceId)));
                 } else {
                     NodeId prevMaster = masterMap.put(deviceId, backup);
                     incrementTerm(networkId, deviceId);
                     addToBackup(networkId, deviceId, prevMaster);
                     return CompletableFuture.completedFuture(
                             new MastershipEvent(MASTER_CHANGED, deviceId,
-                                                getNodes(networkId, deviceId)));
+                                getMastership(networkId, deviceId)));
                 }
 
             case STANDBY:
@@ -265,7 +289,7 @@
                 if (modified) {
                     return CompletableFuture.completedFuture(
                             new MastershipEvent(BACKUPS_CHANGED, deviceId,
-                                                getNodes(networkId, deviceId)));
+                                getMastership(networkId, deviceId)));
                 }
                 break;
 
@@ -314,13 +338,13 @@
                 incrementTerm(networkId, deviceId);
                 return CompletableFuture.completedFuture(
                         new MastershipEvent(MASTER_CHANGED, deviceId,
-                                            getNodes(networkId, deviceId)));
+                            getMastership(networkId, deviceId)));
 
             case STANDBY:
                 if (removeFromBackups(networkId, deviceId, nodeId)) {
                     return CompletableFuture.completedFuture(
                             new MastershipEvent(BACKUPS_CHANGED, deviceId,
-                                                getNodes(networkId, deviceId)));
+                                getMastership(networkId, deviceId)));
                 }
                 break;
 
diff --git a/providers/lldp/src/test/java/org/onosproject/provider/lldp/impl/LldpLinkProviderTest.java b/providers/lldp/src/test/java/org/onosproject/provider/lldp/impl/LldpLinkProviderTest.java
index bad64f3..9989a77 100644
--- a/providers/lldp/src/test/java/org/onosproject/provider/lldp/impl/LldpLinkProviderTest.java
+++ b/providers/lldp/src/test/java/org/onosproject/provider/lldp/impl/LldpLinkProviderTest.java
@@ -38,6 +38,7 @@
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.core.DefaultApplicationId;
+import org.onosproject.mastership.MastershipInfo;
 import org.onosproject.mastership.MastershipListener;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.Annotations;
@@ -818,6 +819,11 @@
         }
 
         @Override
+        public MastershipInfo getMastershipFor(DeviceId deviceId) {
+            return null;
+        }
+
+        @Override
         public void addListener(MastershipListener listener) {
 
         }
diff --git a/utils/misc/src/main/java/org/onlab/util/Tools.java b/utils/misc/src/main/java/org/onlab/util/Tools.java
index 7725ace..366e346 100644
--- a/utils/misc/src/main/java/org/onlab/util/Tools.java
+++ b/utils/misc/src/main/java/org/onlab/util/Tools.java
@@ -750,6 +750,31 @@
     }
 
     /**
+     * Returns a new CompletableFuture completed with the first result from a list of futures. If no future
+     * is completed successfully, the returned future will be completed with the first exception.
+     *
+     * @param futures the input futures
+     * @param <T> future result type
+     * @return a new CompletableFuture
+     */
+    public static <T> CompletableFuture<T> firstOf(List<CompletableFuture<T>> futures) {
+        CompletableFuture<T> resultFuture = new CompletableFuture<>();
+        CompletableFuture.allOf(futures.stream()
+            .map(future -> future.thenAccept(r -> resultFuture.complete(r)))
+            .toArray(CompletableFuture[]::new))
+            .whenComplete((r, e) -> {
+                if (!resultFuture.isDone()) {
+                    if (e != null) {
+                        resultFuture.completeExceptionally(e);
+                    } else {
+                        resultFuture.complete(null);
+                    }
+                }
+            });
+        return resultFuture;
+    }
+
+    /**
      * Returns a new CompletableFuture completed by with the first positive result from a list of
      * input CompletableFutures.
      *