Cherry picking multiple commits for FlowRuleStore optimization
Change-Id: I8013fe89f987bb12a193d3f3dc793a6a89609eb1
diff --git a/apps/faultmanagement/fmmgr/src/test/java/org/onosproject/faultmanagement/impl/PollingAlarmProviderTest.java b/apps/faultmanagement/fmmgr/src/test/java/org/onosproject/faultmanagement/impl/PollingAlarmProviderTest.java
index 560203b..0ebd4cc 100644
--- a/apps/faultmanagement/fmmgr/src/test/java/org/onosproject/faultmanagement/impl/PollingAlarmProviderTest.java
+++ b/apps/faultmanagement/fmmgr/src/test/java/org/onosproject/faultmanagement/impl/PollingAlarmProviderTest.java
@@ -17,6 +17,7 @@
package org.onosproject.faultmanagement.impl;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.junit.Before;
import org.junit.Test;
@@ -25,7 +26,6 @@
import org.onosproject.cfg.ComponentConfigAdapter;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.RoleInfo;
import org.onosproject.incubator.net.faultmanagement.alarm.Alarm;
import org.onosproject.incubator.net.faultmanagement.alarm.AlarmConsumer;
import org.onosproject.incubator.net.faultmanagement.alarm.AlarmId;
@@ -35,6 +35,7 @@
import org.onosproject.incubator.net.faultmanagement.alarm.AlarmProviderService;
import org.onosproject.incubator.net.faultmanagement.alarm.DefaultAlarm;
import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipInfo;
import org.onosproject.mastership.MastershipListener;
import org.onosproject.mastership.MastershipService;
import org.onosproject.mastership.MastershipServiceAdapter;
@@ -64,6 +65,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import static org.junit.Assert.*;
@@ -96,7 +98,7 @@
private final MastershipEvent mastershipEvent =
new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, DEVICE_ID,
- new RoleInfo(nodeId, ImmutableList.of()));
+ new MastershipInfo(1, Optional.of(nodeId), ImmutableMap.of()));
private final DeviceEvent deviceEvent =
new DeviceEvent(DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED, device);
diff --git a/core/api/src/main/java/org/onosproject/mastership/MastershipEvent.java b/core/api/src/main/java/org/onosproject/mastership/MastershipEvent.java
index db5ba74..e7c0169 100644
--- a/core/api/src/main/java/org/onosproject/mastership/MastershipEvent.java
+++ b/core/api/src/main/java/org/onosproject/mastership/MastershipEvent.java
@@ -29,9 +29,6 @@
*/
public class MastershipEvent extends AbstractEvent<MastershipEvent.Type, DeviceId> {
- //Contains master and standby information.
- RoleInfo roleInfo;
-
/**
* Type of mastership events.
*/
@@ -55,45 +52,58 @@
SUSPENDED
}
+ private final MastershipInfo mastershipInfo;
+
/**
* Creates an event of a given type and for the specified device,
* role information, and the current time.
*
- * @param type mastership event type
- * @param device event device subject
- * @param info mastership role information
+ * @param type mastership event type
+ * @param device event device subject
+ * @param mastershipInfo mastership info
*/
- public MastershipEvent(Type type, DeviceId device, RoleInfo info) {
+ public MastershipEvent(Type type, DeviceId device, MastershipInfo mastershipInfo) {
super(type, device);
- this.roleInfo = info;
+ this.mastershipInfo = mastershipInfo;
}
/**
* Creates an event of a given type and for the specified device, master,
* and time.
*
- * @param type mastership event type
- * @param device event device subject
- * @param info role information
- * @param time occurrence time
+ * @param type mastership event type
+ * @param device event device subject
+ * @param mastershipInfo mastership information
+ * @param time occurrence time
*/
- public MastershipEvent(Type type, DeviceId device, RoleInfo info, long time) {
+ public MastershipEvent(Type type, DeviceId device, MastershipInfo mastershipInfo, long time) {
super(type, device, time);
- this.roleInfo = info;
+ this.mastershipInfo = mastershipInfo;
+ }
+
+ /**
+ * Returns the mastership info.
+ *
+ * @return the mastership info
+ */
+ public MastershipInfo mastershipInfo() {
+ return mastershipInfo;
}
/**
* Returns the current role state for the subject.
*
* @return RoleInfo associated with Device ID subject
+ * @deprecated since 1.14
*/
+ @Deprecated
public RoleInfo roleInfo() {
- return roleInfo;
+ return new RoleInfo(mastershipInfo.master().orElse(null), mastershipInfo.backups());
}
@Override
public int hashCode() {
- return Objects.hash(type(), subject(), roleInfo(), time());
+ return Objects.hash(type(), subject(), mastershipInfo(), time());
}
@Override
@@ -105,7 +115,7 @@
final MastershipEvent other = (MastershipEvent) obj;
return Objects.equals(this.type(), other.type()) &&
Objects.equals(this.subject(), other.subject()) &&
- Objects.equals(this.roleInfo(), other.roleInfo()) &&
+ Objects.equals(this.mastershipInfo(), other.mastershipInfo()) &&
Objects.equals(this.time(), other.time());
}
return false;
@@ -117,7 +127,7 @@
.add("time", Tools.defaultOffsetDataTime(time()))
.add("type", type())
.add("subject", subject())
- .add("roleInfo", roleInfo)
+ .add("mastershipInfo", mastershipInfo())
.toString();
}
}
diff --git a/core/api/src/main/java/org/onosproject/mastership/MastershipInfo.java b/core/api/src/main/java/org/onosproject/mastership/MastershipInfo.java
new file mode 100644
index 0000000..c9a4f0e
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/mastership/MastershipInfo.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.mastership;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.net.MastershipRole;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+/**
+ * Mastership info.
+ */
+public final class MastershipInfo {
+ private final long term;
+ private final Optional<NodeId> master;
+ private final ImmutableMap<NodeId, MastershipRole> roles;
+
+ public MastershipInfo() {
+ this(0, Optional.empty(), ImmutableMap.of());
+ }
+
+ public MastershipInfo(long term, Optional<NodeId> master, ImmutableMap<NodeId, MastershipRole> roles) {
+ this.term = term;
+ this.master = master;
+ this.roles = roles;
+ }
+
+ /**
+ * Returns the mastership term.
+ *
+ * @return the mastership term
+ */
+ public long term() {
+ return term;
+ }
+
+ /**
+ * Returns the current master.
+ *
+ * @return the current master
+ */
+ public Optional<NodeId> master() {
+ return master;
+ }
+
+ /**
+ * Returns a sorted list of standby nodes.
+ *
+ * @return a sorted list of standby nodes
+ */
+ public List<NodeId> backups() {
+ return getRoles(MastershipRole.STANDBY);
+ }
+
+ /**
+ * Returns the list of nodes with the given role.
+ *
+ * @param role the role by which to filter nodes
+ * @return an immutable list of nodes with the given role sorted in priority order
+ */
+ public List<NodeId> getRoles(MastershipRole role) {
+ return ImmutableList.copyOf(roles.entrySet()
+ .stream()
+ .filter(entry -> entry.getValue() == role)
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toList()));
+ }
+
+ /**
+ * Returns the current role for the given node.
+ *
+ * @param nodeId the node for which to return the current role
+ * @return the current role for the given node
+ */
+ public MastershipRole getRole(NodeId nodeId) {
+ return roles.get(nodeId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(term, master, roles);
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (object instanceof MastershipInfo) {
+ MastershipInfo that = (MastershipInfo) object;
+ return this.term == that.term
+ && Objects.equals(this.master, that.master)
+ && Objects.equals(this.roles, that.roles);
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(this)
+ .add("term", term)
+ .add("master", master)
+ .add("roles", roles)
+ .toString();
+ }
+}
diff --git a/core/api/src/main/java/org/onosproject/mastership/MastershipService.java b/core/api/src/main/java/org/onosproject/mastership/MastershipService.java
index 86042b0..a5a0c43 100644
--- a/core/api/src/main/java/org/onosproject/mastership/MastershipService.java
+++ b/core/api/src/main/java/org/onosproject/mastership/MastershipService.java
@@ -120,6 +120,14 @@
RoleInfo getNodesFor(DeviceId deviceId);
/**
+ * Returns the mastership info for the given device.
+ *
+ * @param deviceId the device for which to return the mastership info
+ * @return the mastership info for the given device
+ */
+ MastershipInfo getMastershipFor(DeviceId deviceId);
+
+ /**
* Returns the devices for which a controller is master.
* <p>
* Returned Set may contain DeviceId which no longer exist in the system.
diff --git a/core/api/src/main/java/org/onosproject/mastership/MastershipStore.java b/core/api/src/main/java/org/onosproject/mastership/MastershipStore.java
index 3650388..7f43a15 100644
--- a/core/api/src/main/java/org/onosproject/mastership/MastershipStore.java
+++ b/core/api/src/main/java/org/onosproject/mastership/MastershipStore.java
@@ -73,7 +73,6 @@
*/
Set<DeviceId> getDevices(NodeId nodeId);
-
/**
* Sets a device's role for a specified controller instance.
*
@@ -93,6 +92,14 @@
MastershipTerm getTermFor(DeviceId deviceId);
/**
+ * Returns the mastership info for the given device.
+ *
+ * @param deviceId the device for which to return the mastership info
+ * @return the mastership info for the given device
+ */
+ MastershipInfo getMastership(DeviceId deviceId);
+
+ /**
* Sets a controller instance's mastership role to STANDBY for a device.
* If the role is MASTER, another controller instance will be selected
* as a candidate master.
diff --git a/core/api/src/test/java/org/onosproject/mastership/MastershipEventTest.java b/core/api/src/test/java/org/onosproject/mastership/MastershipEventTest.java
index c5f6ba9..7b791f4 100644
--- a/core/api/src/test/java/org/onosproject/mastership/MastershipEventTest.java
+++ b/core/api/src/test/java/org/onosproject/mastership/MastershipEventTest.java
@@ -15,14 +15,15 @@
*/
package org.onosproject.mastership;
+import com.google.common.collect.ImmutableMap;
import com.google.common.testing.EqualsTester;
import org.junit.Test;
import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.RoleInfo;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.MastershipRole;
-import java.util.Arrays;
+import java.util.Optional;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
@@ -36,21 +37,33 @@
private final DeviceId deviceId2 = DeviceId.deviceId("bar:baz");
private final NodeId node1 = new NodeId("1");
private final NodeId node2 = new NodeId("2");
- private final RoleInfo roleInfo1 = new RoleInfo(node1, Arrays.asList(node1, node2));
- private final RoleInfo roleInfo2 = new RoleInfo(node2, Arrays.asList(node2, node1));
+ private final MastershipInfo mastershipInfo1 = new MastershipInfo(
+ 1,
+ Optional.of(node1),
+ ImmutableMap.<NodeId, MastershipRole>builder()
+ .put(node1, MastershipRole.MASTER)
+ .put(node2, MastershipRole.STANDBY)
+ .build());
+ private final MastershipInfo mastershipInfo2 = new MastershipInfo(
+ 2,
+ Optional.of(node1),
+ ImmutableMap.<NodeId, MastershipRole>builder()
+ .put(node2, MastershipRole.MASTER)
+ .put(node1, MastershipRole.STANDBY)
+ .build());
private final MastershipEvent event1 =
- new MastershipEvent(MastershipEvent.Type.BACKUPS_CHANGED, deviceId1, roleInfo1);
+ new MastershipEvent(MastershipEvent.Type.BACKUPS_CHANGED, deviceId1, mastershipInfo1);
private final MastershipEvent event2 =
- new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId1, roleInfo1);
+ new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId1, mastershipInfo1);
private final MastershipEvent event3 =
- new MastershipEvent(MastershipEvent.Type.SUSPENDED, deviceId1, roleInfo1);
+ new MastershipEvent(MastershipEvent.Type.SUSPENDED, deviceId1, mastershipInfo1);
private final MastershipEvent event4 =
- new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId1, roleInfo2, time);
+ new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId1, mastershipInfo2, time);
private final MastershipEvent sameAsEvent4 =
- new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId1, roleInfo2, time);
+ new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId1, mastershipInfo2, time);
private final MastershipEvent event5 =
- new MastershipEvent(MastershipEvent.Type.BACKUPS_CHANGED, deviceId2, roleInfo1);
+ new MastershipEvent(MastershipEvent.Type.BACKUPS_CHANGED, deviceId2, mastershipInfo1);
/**
* Tests for proper operation of equals(), hashCode() and toString() methods.
@@ -73,12 +86,12 @@
public void checkConstruction() {
assertThat(event1.type(), is(MastershipEvent.Type.BACKUPS_CHANGED));
assertThat(event1.subject(), is(deviceId1));
- assertThat(event1.roleInfo(), is(roleInfo1));
+ assertThat(event1.mastershipInfo(), is(mastershipInfo1));
assertThat(event4.time(), is(time));
assertThat(event4.type(), is(MastershipEvent.Type.MASTER_CHANGED));
assertThat(event4.subject(), is(deviceId1));
- assertThat(event4.roleInfo(), is(roleInfo2));
+ assertThat(event4.mastershipInfo(), is(mastershipInfo2));
}
}
diff --git a/core/api/src/test/java/org/onosproject/mastership/MastershipInfoTest.java b/core/api/src/test/java/org/onosproject/mastership/MastershipInfoTest.java
new file mode 100644
index 0000000..89a3350
--- /dev/null
+++ b/core/api/src/test/java/org/onosproject/mastership/MastershipInfoTest.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.mastership;
+
+import java.util.Optional;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import org.junit.Test;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.net.MastershipRole;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Mastership info test.
+ */
+public class MastershipInfoTest {
+ private final NodeId node1 = new NodeId("1");
+ private final NodeId node2 = new NodeId("2");
+ private final NodeId node3 = new NodeId("3");
+ private final NodeId node4 = new NodeId("4");
+
+ private final MastershipInfo mastershipInfo = new MastershipInfo(
+ 1,
+ Optional.of(node1),
+ ImmutableMap.<NodeId, MastershipRole>builder()
+ .put(node1, MastershipRole.MASTER)
+ .put(node2, MastershipRole.STANDBY)
+ .put(node3, MastershipRole.STANDBY)
+ .put(node4, MastershipRole.NONE)
+ .build());
+
+ @Test
+ public void testMastershipInfo() throws Exception {
+ assertEquals(1, mastershipInfo.term());
+ assertEquals(node1, mastershipInfo.master().get());
+ assertEquals(Lists.newArrayList(node1), mastershipInfo.getRoles(MastershipRole.MASTER));
+ assertEquals(Lists.newArrayList(node2, node3), mastershipInfo.backups());
+ assertEquals(Lists.newArrayList(node2, node3), mastershipInfo.getRoles(MastershipRole.STANDBY));
+ assertEquals(Lists.newArrayList(node4), mastershipInfo.getRoles(MastershipRole.NONE));
+ }
+
+ @Test
+ public void testEquals() throws Exception {
+ assertEquals(mastershipInfo, mastershipInfo);
+ assertNotEquals(mastershipInfo, new MastershipInfo(1, Optional.of(node1), ImmutableMap.of()));
+ }
+}
diff --git a/core/api/src/test/java/org/onosproject/mastership/MastershipServiceAdapter.java b/core/api/src/test/java/org/onosproject/mastership/MastershipServiceAdapter.java
index a0b203f..d9a2971 100644
--- a/core/api/src/test/java/org/onosproject/mastership/MastershipServiceAdapter.java
+++ b/core/api/src/test/java/org/onosproject/mastership/MastershipServiceAdapter.java
@@ -48,6 +48,11 @@
}
@Override
+ public MastershipInfo getMastershipFor(DeviceId deviceId) {
+ return null;
+ }
+
+ @Override
public Set<DeviceId> getDevicesOf(NodeId nodeId) {
return null;
}
diff --git a/core/common/src/test/java/org/onosproject/store/trivial/SimpleMastershipStore.java b/core/common/src/test/java/org/onosproject/store/trivial/SimpleMastershipStore.java
index 31480e8..d4df583 100644
--- a/core/common/src/test/java/org/onosproject/store/trivial/SimpleMastershipStore.java
+++ b/core/common/src/test/java/org/onosproject/store/trivial/SimpleMastershipStore.java
@@ -27,10 +27,12 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.collect.ImmutableMap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -48,6 +50,7 @@
import org.onosproject.core.Version;
import org.onosproject.core.VersionService;
import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipInfo;
import org.onosproject.mastership.MastershipStore;
import org.onosproject.mastership.MastershipStoreDelegate;
import org.onosproject.mastership.MastershipTerm;
@@ -177,7 +180,7 @@
}
return CompletableFuture.completedFuture(
- new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
+ new MastershipEvent(MASTER_CHANGED, deviceId, getMastership(deviceId)));
}
@Override
@@ -219,8 +222,7 @@
incrementTerm(deviceId);
// remove from backup list
removeFromBackups(deviceId, node);
- notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId,
- getNodes(deviceId)));
+ notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getMastership(deviceId)));
return CompletableFuture.completedFuture(MastershipRole.MASTER);
}
return CompletableFuture.completedFuture(MastershipRole.STANDBY);
@@ -229,14 +231,12 @@
// no master => become master
masterMap.put(deviceId, node);
incrementTerm(deviceId);
- notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId,
- getNodes(deviceId)));
+ notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getMastership(deviceId)));
return CompletableFuture.completedFuture(MastershipRole.MASTER);
}
// add to backup list
if (addToBackup(deviceId, node)) {
- notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId,
- getNodes(deviceId)));
+ notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, getMastership(deviceId)));
}
return CompletableFuture.completedFuture(MastershipRole.STANDBY);
default:
@@ -299,6 +299,21 @@
}
@Override
+ public MastershipInfo getMastership(DeviceId deviceId) {
+ ImmutableMap.Builder<NodeId, MastershipRole> roleBuilder = ImmutableMap.builder();
+ NodeId master = masterMap.get(deviceId);
+ if (master != null) {
+ roleBuilder.put(master, MastershipRole.MASTER);
+ }
+ backups.getOrDefault(deviceId, Collections.emptyList())
+ .forEach(nodeId -> roleBuilder.put(nodeId, MastershipRole.STANDBY));
+ return new MastershipInfo(
+ termMap.getOrDefault(deviceId, new AtomicInteger(NOTHING)).get(),
+ Optional.ofNullable(master),
+ roleBuilder.build());
+ }
+
+ @Override
public synchronized CompletableFuture<MastershipEvent> setStandby(NodeId nodeId, DeviceId deviceId) {
MastershipRole role = getRole(nodeId, deviceId);
switch (role) {
@@ -309,13 +324,13 @@
masterMap.remove(deviceId);
// TODO: Should there be new event type for no MASTER?
return CompletableFuture.completedFuture(
- new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
+ new MastershipEvent(MASTER_CHANGED, deviceId, getMastership(deviceId)));
} else {
NodeId prevMaster = masterMap.put(deviceId, backup);
incrementTerm(deviceId);
addToBackup(deviceId, prevMaster);
return CompletableFuture.completedFuture(
- new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
+ new MastershipEvent(MASTER_CHANGED, deviceId, getMastership(deviceId)));
}
case STANDBY:
@@ -323,7 +338,7 @@
boolean modified = addToBackup(deviceId, nodeId);
if (modified) {
return CompletableFuture.completedFuture(
- new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId)));
+ new MastershipEvent(BACKUPS_CHANGED, deviceId, getMastership(deviceId)));
}
break;
@@ -357,12 +372,12 @@
masterMap.put(deviceId, backup);
incrementTerm(deviceId);
return CompletableFuture.completedFuture(
- new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
+ new MastershipEvent(MASTER_CHANGED, deviceId, getMastership(deviceId)));
case STANDBY:
if (removeFromBackups(deviceId, nodeId)) {
return CompletableFuture.completedFuture(
- new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId)));
+ new MastershipEvent(BACKUPS_CHANGED, deviceId, getMastership(deviceId)));
}
break;
diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java b/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java
index c744d61..5dea660 100644
--- a/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java
+++ b/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java
@@ -39,6 +39,7 @@
import org.onosproject.event.AbstractListenerManager;
import org.onosproject.mastership.MastershipAdminService;
import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipInfo;
import org.onosproject.mastership.MastershipListener;
import org.onosproject.mastership.MastershipService;
import org.onosproject.mastership.MastershipStore;
@@ -239,6 +240,13 @@
}
@Override
+ public MastershipInfo getMastershipFor(DeviceId deviceId) {
+ checkPermission(CLUSTER_READ);
+ checkNotNull(deviceId, DEVICE_ID_NULL);
+ return store.getMastership(deviceId);
+ }
+
+ @Override
public MastershipTerm getMastershipTerm(DeviceId deviceId) {
checkPermission(CLUSTER_READ);
return store.getTermFor(deviceId);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/ReplicaInfo.java b/core/store/dist/src/main/java/org/onosproject/store/flow/ReplicaInfo.java
index 634582a..feb517c 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/ReplicaInfo.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/ReplicaInfo.java
@@ -29,21 +29,33 @@
*/
public final class ReplicaInfo {
+ private final long term;
private final Optional<NodeId> master;
private final List<NodeId> backups;
/**
* Creates a ReplicaInfo instance.
*
+ * @param term monotonically increasing unique mastership term
* @param master NodeId of the node where the master copy should be
* @param backups list of NodeId, where backup copies should be placed
*/
- public ReplicaInfo(NodeId master, List<NodeId> backups) {
+ public ReplicaInfo(long term, NodeId master, List<NodeId> backups) {
+ this.term = term;
this.master = Optional.ofNullable(master);
this.backups = checkNotNull(backups);
}
/**
+ * Returns the mastership term.
+ *
+ * @return the mastership term
+ */
+ public long term() {
+ return term;
+ }
+
+ /**
* Returns the NodeId, if there is a Node where the master copy should be.
*
* @return NodeId, where the master copy should be placed
@@ -78,6 +90,7 @@
// for Serializer
private ReplicaInfo() {
+ this.term = 0;
this.master = Optional.empty();
this.backups = Collections.emptyList();
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/BackupOperation.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/BackupOperation.java
new file mode 100644
index 0000000..7034118
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/BackupOperation.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import java.util.Objects;
+
+import org.onosproject.cluster.NodeId;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+/**
+ * Identifier representing a backup of a distinct bucket to a specific node.
+ */
+public class BackupOperation {
+ private final NodeId nodeId;
+ private final int bucketId;
+
+ BackupOperation(NodeId nodeId, int bucketId) {
+ this.nodeId = nodeId;
+ this.bucketId = bucketId;
+ }
+
+ /**
+ * Returns the node identifier.
+ *
+ * @return the node identifier
+ */
+ public NodeId nodeId() {
+ return nodeId;
+ }
+
+ /**
+ * Returns the bucket identifier.
+ *
+ * @return the bucket identifier
+ */
+ public int bucket() {
+ return bucketId;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(nodeId, bucketId);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other != null && other instanceof BackupOperation) {
+ BackupOperation that = (BackupOperation) other;
+ return this.nodeId.equals(that.nodeId)
+ && this.bucketId == that.bucketId;
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(this)
+ .add("nodeId", nodeId())
+ .add("bucket", bucket())
+ .toString();
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/BucketId.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/BucketId.java
new file mode 100644
index 0000000..33cc304
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/BucketId.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import java.util.Objects;
+
+import org.onosproject.net.DeviceId;
+
+/**
+ * Represents a distinct device flow bucket.
+ */
+public class BucketId {
+ private final DeviceId deviceId;
+ private final int bucket;
+
+ BucketId(DeviceId deviceId, int bucket) {
+ this.deviceId = deviceId;
+ this.bucket = bucket;
+ }
+
+ /**
+ * Returns the bucket device identifier.
+ *
+ * @return the device identifier
+ */
+ public DeviceId deviceId() {
+ return deviceId;
+ }
+
+ /**
+ * Returns the bucket number.
+ *
+ * @return the bucket number
+ */
+ public int bucket() {
+ return bucket;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(deviceId, bucket);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other instanceof BucketId) {
+ BucketId that = (BucketId) other;
+ return this.deviceId.equals(that.deviceId)
+ && this.bucket == that.bucket;
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s/%d", deviceId, bucket);
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
new file mode 100644
index 0000000..b49ff4e
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
@@ -0,0 +1,852 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.Tools;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.flow.FlowEntry;
+import org.onosproject.net.flow.FlowId;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.StoredFlowEntry;
+import org.onosproject.store.LogicalTimestamp;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Flow table for all flows associated with a specific device.
+ * <p>
+ * Flows in the table are stored in buckets. Each bucket is mutated and replicated as a single unit. The device flow
+ * table performs communication independent of other device flow tables for more parallelism.
+ * <p>
+ * This implementation uses several different replication protocols. Changes that occur on the device master are
+ * replicated to the backups provided in the {@link DeviceReplicaInfo} for the master's term. Additionally, a periodic
+ * anti-entropy protocol is used to detect missing flows on backups (e.g. due to a node restart). Finally, when a
+ * device mastership change occurs, the new master synchronizes flows with the prior master and/or backups for the
+ * device, allowing mastership to be reassigned to non-backup nodes.
+ */
+public class DeviceFlowTable {
+    // Number of hash buckets per device; flows are sharded across buckets by flow ID.
+    private static final int NUM_BUCKETS = 1024;
+    // Serializer for all inter-node flow table messages (digests, buckets, backups).
+    private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
+        .register(KryoNamespaces.API)
+        .register(BucketId.class)
+        .register(FlowBucket.class)
+        .register(FlowBucketDigest.class)
+        .register(LogicalTimestamp.class)
+        .register(Timestamped.class)
+        .build());
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    // Per-device message subjects so each device's table communicates independently.
+    private final MessageSubject getDigestsSubject;
+    private final MessageSubject getBucketSubject;
+    private final MessageSubject backupSubject;
+
+    private final DeviceId deviceId;
+    private final ClusterCommunicationService clusterCommunicator;
+    private final LifecycleManager lifecycleManager;
+    private final ScheduledExecutorService executorService;
+    private final NodeId localNodeId;
+
+    // Lamport clock used to order flow mutations and replication messages.
+    private final LogicalClock clock = new LogicalClock();
+
+    // Replica info for the most recent term seen by this node.
+    private volatile DeviceReplicaInfo replicaInfo;
+    // Highest term that has been fully synchronized; writes for newer terms are queued.
+    private volatile long activeTerm;
+
+    private final LifecycleEventListener lifecycleEventListener = new LifecycleEventListener() {
+        @Override
+        public void event(LifecycleEvent event) {
+            // Serialize lifecycle handling onto the table's executor.
+            executorService.execute(() -> onLifecycleEvent(event));
+        }
+    };
+
+    private ScheduledFuture<?> backupFuture;
+    private ScheduledFuture<?> antiEntropyFuture;
+
+    // Operations queued per bucket until the master's term becomes active; guarded by
+    // synchronized (flowTasks) together with the activeTerm double-check in runInTerm.
+    private final Map<Integer, Queue<Runnable>> flowTasks = Maps.newConcurrentMap();
+    private final Map<Integer, FlowBucket> flowBuckets = Maps.newConcurrentMap();
+
+    // Last replicated timestamp and in-flight backup operations per node/bucket pair.
+    private final Map<BackupOperation, LogicalTimestamp> lastBackupTimes = Maps.newConcurrentMap();
+    private final Set<BackupOperation> inFlightUpdates = Sets.newConcurrentHashSet();
+
+    /**
+     * Creates a new per-device flow table.
+     *
+     * @param deviceId the device whose flows this table holds
+     * @param clusterService the cluster service, used to resolve the local node ID
+     * @param clusterCommunicator the cluster communication service for replication messages
+     * @param lifecycleManager the device mastership lifecycle manager
+     * @param executorService the executor on which backups, anti-entropy and events run
+     * @param backupPeriod the backup period in milliseconds
+     * @param antiEntropyPeriod the anti-entropy period in milliseconds
+     */
+    DeviceFlowTable(
+        DeviceId deviceId,
+        ClusterService clusterService,
+        ClusterCommunicationService clusterCommunicator,
+        LifecycleManager lifecycleManager,
+        ScheduledExecutorService executorService,
+        long backupPeriod,
+        long antiEntropyPeriod) {
+        this.deviceId = deviceId;
+        this.clusterCommunicator = clusterCommunicator;
+        this.lifecycleManager = lifecycleManager;
+        this.executorService = executorService;
+        this.localNodeId = clusterService.getLocalNode().id();
+
+        // NOTE(review): listeners, subscribers and scheduled tasks are registered from
+        // the constructor, publishing a partially constructed instance to other threads.
+        // Confirm the lifecycle manager and executor cannot invoke callbacks before
+        // construction completes.
+        addListeners();
+
+        // Pre-populate all buckets so lookups never observe a missing bucket.
+        for (int i = 0; i < NUM_BUCKETS; i++) {
+            flowBuckets.put(i, new FlowBucket(new BucketId(deviceId, i)));
+        }
+
+        getDigestsSubject = new MessageSubject(String.format("flow-store-%s-digests", deviceId));
+        getBucketSubject = new MessageSubject(String.format("flow-store-%s-bucket", deviceId));
+        backupSubject = new MessageSubject(String.format("flow-store-%s-backup", deviceId));
+
+        setBackupPeriod(backupPeriod);
+        setAntiEntropyPeriod(antiEntropyPeriod);
+        registerSubscribers();
+
+        // Bootstrap with the current replica info; if this node is master it will begin
+        // synchronizing flows for the term.
+        startTerm(lifecycleManager.getReplicaInfo());
+    }
+
+    /**
+     * Sets the flow table backup period.
+     *
+     * @param backupPeriod the flow table backup period in milliseconds
+     */
+    synchronized void setBackupPeriod(long backupPeriod) {
+        ScheduledFuture<?> backupFuture = this.backupFuture;
+        if (backupFuture != null) {
+            // cancel(false): let any in-flight backup run finish; only stop future runs.
+            backupFuture.cancel(false);
+        }
+        this.backupFuture = executorService.scheduleAtFixedRate(
+            this::backup, backupPeriod, backupPeriod, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Sets the flow table anti-entropy period.
+     *
+     * @param antiEntropyPeriod the flow table anti-entropy period in milliseconds
+     */
+    synchronized void setAntiEntropyPeriod(long antiEntropyPeriod) {
+        ScheduledFuture<?> antiEntropyFuture = this.antiEntropyFuture;
+        if (antiEntropyFuture != null) {
+            // cancel(false): do not interrupt a running anti-entropy pass.
+            antiEntropyFuture.cancel(false);
+        }
+        this.antiEntropyFuture = executorService.scheduleAtFixedRate(
+            this::runAntiEntropy, antiEntropyPeriod, antiEntropyPeriod, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Counts the flows in the table.
+     *
+     * @return the total number of flows in the table
+     */
+    public int count() {
+        return flowBuckets.values().stream()
+            .mapToInt(FlowBucket::count)
+            .sum();
+    }
+
+    /**
+     * Returns the flow entry for the given rule.
+     *
+     * @param rule the rule for which to lookup the flow entry
+     * @return the flow entry for the given rule
+     */
+    public StoredFlowEntry getFlowEntry(FlowRule rule) {
+        return getBucket(rule.id())
+            .getFlowEntries(rule.id())
+            .get(rule);
+    }
+
+    /**
+     * Returns the set of flow entries in the table.
+     *
+     * @return the set of flow entries in the table
+     */
+    public Set<FlowEntry> getFlowEntries() {
+        // Flatten bucket -> (flowId -> (rule -> entry)) maps into a single set.
+        return flowBuckets.values().stream()
+            .flatMap(bucket -> bucket.getFlowBucket().values().stream())
+            .flatMap(entries -> entries.values().stream())
+            .collect(Collectors.toSet());
+    }
+
+    /**
+     * Returns the bucket for the given flow identifier.
+     *
+     * @param flowId the flow identifier
+     * @return the bucket for the given flow identifier
+     */
+    private FlowBucket getBucket(FlowId flowId) {
+        return getBucket(bucket(flowId));
+    }
+
+    /**
+     * Returns the bucket with the given identifier.
+     *
+     * @param bucketId the bucket identifier
+     * @return the bucket with the given identifier
+     */
+    private FlowBucket getBucket(int bucketId) {
+        return flowBuckets.get(bucketId);
+    }
+
+    /**
+     * Returns the bucket number for the given flow identifier.
+     *
+     * @param flowId the flow identifier
+     * @return the bucket number for the given flow identifier
+     */
+    private int bucket(FlowId flowId) {
+        // id % NUM_BUCKETS lies in (-NUM_BUCKETS, NUM_BUCKETS), so the int cast and
+        // Math.abs are safe here (the value can never be Integer.MIN_VALUE).
+        return Math.abs((int) (flowId.id() % NUM_BUCKETS));
+    }
+
+    /**
+     * Returns the digests for all buckets in the flow table for the device.
+     *
+     * @return the set of digests for all buckets for the device
+     */
+    private Set<FlowBucketDigest> getDigests() {
+        return flowBuckets.values()
+            .stream()
+            .map(bucket -> bucket.getDigest())
+            .collect(Collectors.toSet());
+    }
+
+    /**
+     * Returns the digest for the given bucket.
+     *
+     * @param bucket the bucket for which to return the digest
+     * @return the digest for the given bucket
+     */
+    private FlowBucketDigest getDigest(int bucket) {
+        return flowBuckets.get(bucket).getDigest();
+    }
+
+    /**
+     * Adds an entry to the table.
+     *
+     * @param rule the rule to add
+     * @return a future to be completed once the rule has been added
+     */
+    public CompletableFuture<Void> add(FlowEntry rule) {
+        // All mutations go through runInTerm so they execute (or are queued) under
+        // the master's current term.
+        return runInTerm(rule.id(), (bucket, term) -> {
+            bucket.add(rule, term, clock);
+            return null;
+        });
+    }
+
+    /**
+     * Updates an entry in the table.
+     *
+     * @param rule the rule to update
+     * @return a future to be completed once the rule has been updated
+     */
+    public CompletableFuture<Void> update(FlowEntry rule) {
+        return runInTerm(rule.id(), (bucket, term) -> {
+            bucket.update(rule, term, clock);
+            return null;
+        });
+    }
+
+    /**
+     * Applies the given update function to the rule.
+     *
+     * @param rule the rule to update
+     * @param function the update function to apply
+     * @param <T> the result type
+     * @return a future to be completed with the update result or {@code null} if the rule was not updated
+     */
+    public <T> CompletableFuture<T> update(FlowRule rule, Function<StoredFlowEntry, T> function) {
+        return runInTerm(rule.id(), (bucket, term) -> bucket.update(rule, function, term, clock));
+    }
+
+    /**
+     * Removes an entry from the table.
+     *
+     * @param rule the rule to remove
+     * @return a future to be completed once the rule has been removed
+     */
+    public CompletableFuture<FlowEntry> remove(FlowEntry rule) {
+        return runInTerm(rule.id(), (bucket, term) -> bucket.remove(rule, term, clock));
+    }
+
+    /**
+     * Runs the given function in the current term.
+     * <p>
+     * If the local node is not the master for the device, the returned future fails with
+     * {@link IllegalStateException}. If the master's term has not yet been activated
+     * (synchronized with prior replicas), the operation is queued and executed once the
+     * term becomes active; otherwise it runs immediately on the calling thread.
+     *
+     * @param flowId the flow identifier indicating the bucket in which to run the function
+     * @param function the function to execute in the current term
+     * @param <T> the future result type
+     * @return a future to be completed with the function result once it has been run
+     */
+    private <T> CompletableFuture<T> runInTerm(FlowId flowId, BiFunction<FlowBucket, Long, T> function) {
+        DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
+        if (!replicaInfo.isMaster(localNodeId)) {
+            return Tools.exceptionalFuture(new IllegalStateException());
+        }
+
+        FlowBucket bucket = getBucket(flowId);
+
+        // If the master's term is not currently active (has not been synchronized with prior replicas), enqueue
+        // the change to be executed once the master has been synchronized.
+        final long term = replicaInfo.term();
+        if (activeTerm < term) {
+            log.debug("Enqueueing operation for device {}", deviceId);
+            synchronized (flowTasks) {
+                // Double checked lock on the active term.
+                if (activeTerm < term) {
+                    CompletableFuture<T> future = new CompletableFuture<>();
+                    flowTasks.computeIfAbsent(bucket.bucketId().bucket(), b -> new LinkedList<>())
+                        .add(() -> {
+                            // Complete the future exceptionally if the deferred operation
+                            // fails. Without this, an exception thrown by the function would
+                            // propagate out of the Runnable when the queue is drained in
+                            // activateBucket(), aborting the drain and leaving this future
+                            // forever incomplete for the caller.
+                            try {
+                                future.complete(function.apply(bucket, term));
+                            } catch (Exception e) {
+                                future.completeExceptionally(e);
+                            }
+                        });
+                    return future;
+                }
+            }
+        }
+        return CompletableFuture.completedFuture(function.apply(bucket, term));
+    }
+
+    /**
+     * Backs up all buckets in the given device to the given node.
+     */
+    private void backup() {
+        DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
+
+        // If the local node is not currently the master, skip the backup.
+        if (!replicaInfo.isMaster(localNodeId)) {
+            return;
+        }
+
+        // Otherwise, iterate through backup nodes and backup the device.
+        for (NodeId nodeId : replicaInfo.backups()) {
+            try {
+                backup(nodeId, replicaInfo.term());
+            } catch (Exception e) {
+                log.error("Backup of " + deviceId + " to " + nodeId + " failed", e);
+            }
+        }
+    }
+
+    /**
+     * Backs up all buckets for the device to the given node.
+     *
+     * @param nodeId the node to which to back up the device
+     * @param term the term for which to backup to the node
+     */
+    private void backup(NodeId nodeId, long term) {
+        for (FlowBucket bucket : flowBuckets.values()) {
+            // If the bucket is not in the current term, skip it. This forces synchronization of the bucket
+            // to occur prior to the new master replicating changes in the bucket to backups.
+            if (bucket.term() != term) {
+                continue;
+            }
+
+            // Record the logical timestamp from the bucket to keep track of the highest logical time replicated.
+            LogicalTimestamp timestamp = bucket.timestamp();
+
+            // If the backup can be run (no concurrent backup to the node in progress) then run it.
+            BackupOperation operation = new BackupOperation(nodeId, bucket.bucketId().bucket());
+            if (startBackup(operation, timestamp)) {
+                backup(bucket, nodeId).whenCompleteAsync((succeeded, error) -> {
+                    if (error != null) {
+                        log.debug("Backup operation {} failed", operation, error);
+                        failBackup(operation);
+                    } else if (succeeded) {
+                        succeedBackup(operation, timestamp);
+                        // Re-run the backup for this node/term to replicate any buckets
+                        // that changed while this backup was in flight.
+                        backup(nodeId, term);
+                    } else {
+                        log.debug("Backup operation {} failed: term mismatch", operation);
+                        failBackup(operation);
+                    }
+                }, executorService);
+            }
+        }
+    }
+
+    /**
+     * Returns a boolean indicating whether the given {@link BackupOperation} can be started.
+     * <p>
+     * The backup can be started if no backup for the same device/bucket/node is already in progress and changes
+     * are pending replication for the backup operation.
+     *
+     * @param operation the operation to start
+     * @param timestamp the timestamp for which to start the backup operation
+     * @return indicates whether the given backup operation should be started
+     */
+    private boolean startBackup(BackupOperation operation, LogicalTimestamp timestamp) {
+        LogicalTimestamp lastBackupTime = lastBackupTimes.get(operation);
+        // Only start when the bucket has a timestamp, it is newer than the last
+        // replicated time, and no backup for this node/bucket is already in flight.
+        return timestamp != null
+            && (lastBackupTime == null || lastBackupTime.isOlderThan(timestamp))
+            && inFlightUpdates.add(operation);
+    }
+
+    /**
+     * Fails the given backup operation.
+     *
+     * @param operation the backup operation to fail
+     */
+    private void failBackup(BackupOperation operation) {
+        inFlightUpdates.remove(operation);
+    }
+
+    /**
+     * Succeeds the given backup operation.
+     * <p>
+     * The last backup time for the operation will be updated and the operation will be removed from
+     * in-flight updates.
+     *
+     * @param operation the operation to succeed
+     * @param timestamp the timestamp at which the operation was <em>started</em>
+     */
+    private void succeedBackup(BackupOperation operation, LogicalTimestamp timestamp) {
+        lastBackupTimes.put(operation, timestamp);
+        inFlightUpdates.remove(operation);
+    }
+
+    /**
+     * Resets the last completion time for the given backup operation to ensure it's replicated again.
+     *
+     * @param operation the backup operation to reset
+     */
+    private void resetBackup(BackupOperation operation) {
+        lastBackupTimes.remove(operation);
+    }
+
+    /**
+     * Performs the given backup operation.
+     *
+     * @param bucket the bucket to backup
+     * @param nodeId the node to which to backup the bucket
+     * @return a future to be completed with a boolean indicating whether the backup operation was successful
+     */
+    private CompletableFuture<Boolean> backup(FlowBucket bucket, NodeId nodeId) {
+        if (log.isDebugEnabled()) {
+            log.debug("Backing up {} flow entries in bucket {} to {}", bucket.count(), bucket.bucketId(), nodeId);
+        }
+        return sendWithTimestamp(bucket, backupSubject, nodeId);
+    }
+
+    /**
+     * Handles a flow bucket backup from a remote peer.
+     *
+     * @param flowBucket the flow bucket to back up
+     * @return {@code true} if the backup was accepted and applied, otherwise {@code false}
+     */
+    private boolean onBackup(FlowBucket flowBucket) {
+        if (log.isDebugEnabled()) {
+            log.debug("{} - Received {} flow entries in bucket {} to backup",
+                deviceId, flowBucket.count(), flowBucket.bucketId());
+        }
+
+        try {
+            DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
+
+            // If the backup is for a different term, reject the request until we learn about the new term.
+            if (flowBucket.term() != replicaInfo.term()) {
+                log.debug("Term mismatch for device {}: {} != {}", deviceId, flowBucket.term(), replicaInfo);
+                return false;
+            }
+
+            // Keep whichever copy of the bucket has the newer digest.
+            flowBuckets.compute(flowBucket.bucketId().bucket(),
+                (id, bucket) -> flowBucket.getDigest().isNewerThan(bucket.getDigest()) ? flowBucket : bucket);
+            return true;
+        } catch (Exception e) {
+            log.warn("Failure processing backup request", e);
+            return false;
+        }
+    }
+
+    /**
+     * Runs the anti-entropy protocol.
+     */
+    private void runAntiEntropy() {
+        DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
+        // Anti-entropy is driven by the master only.
+        if (!replicaInfo.isMaster(localNodeId)) {
+            return;
+        }
+
+        for (NodeId nodeId : replicaInfo.backups()) {
+            runAntiEntropy(nodeId);
+        }
+    }
+
+    /**
+     * Runs the anti-entropy protocol against the given peer.
+     *
+     * @param nodeId the node with which to execute the anti-entropy protocol
+     */
+    private void runAntiEntropy(NodeId nodeId) {
+        requestDigests(nodeId).thenAcceptAsync((digests) -> {
+            // Compute a set of missing BucketIds based on digest times and send them back to the master.
+            for (FlowBucketDigest remoteDigest : digests) {
+                FlowBucket localBucket = getBucket(remoteDigest.bucket());
+                if (localBucket.getDigest().isNewerThan(remoteDigest)) {
+                    log.debug("Detected missing flow entries on node {} in bucket {}/{}",
+                        nodeId, deviceId, remoteDigest.bucket());
+                    // Clear the last backup time so the periodic backup re-replicates the bucket.
+                    resetBackup(new BackupOperation(nodeId, remoteDigest.bucket()));
+                }
+            }
+        }, executorService);
+    }
+
+    /**
+     * Sends a digest request to the given node.
+     *
+     * @param nodeId the node to which to send the request
+     * @return future to be completed with the set of digests for the given device on the given node
+     */
+    private CompletableFuture<Set<FlowBucketDigest>> requestDigests(NodeId nodeId) {
+        return sendWithTimestamp(deviceId, getDigestsSubject, nodeId);
+    }
+
+    /**
+     * Synchronizes flows from the previous master or backups.
+     *
+     * @param prevReplicaInfo the previous replica info
+     * @param newReplicaInfo the new replica info
+     */
+    private void syncFlows(DeviceReplicaInfo prevReplicaInfo, DeviceReplicaInfo newReplicaInfo) {
+        if (prevReplicaInfo == null) {
+            // No previous term: nothing to synchronize, activate immediately.
+            activateMaster(newReplicaInfo);
+        } else if (prevReplicaInfo.master() != null && !prevReplicaInfo.master().equals(localNodeId)) {
+            syncFlowsOnMaster(prevReplicaInfo, newReplicaInfo);
+        } else {
+            // The local node was the previous master (or there was none): sync from backups.
+            syncFlowsOnBackups(prevReplicaInfo, newReplicaInfo);
+        }
+    }
+
+    /**
+     * Synchronizes flows from the previous master, falling back to backups if the master fails.
+     *
+     * @param prevReplicaInfo the previous replica info
+     * @param newReplicaInfo the new replica info
+     */
+    private void syncFlowsOnMaster(DeviceReplicaInfo prevReplicaInfo, DeviceReplicaInfo newReplicaInfo) {
+        syncFlowsOn(prevReplicaInfo.master())
+            .whenCompleteAsync((result, error) -> {
+                if (error != null) {
+                    log.debug("Failed to synchronize flows on previous master {}", prevReplicaInfo.master(), error);
+                    syncFlowsOnBackups(prevReplicaInfo, newReplicaInfo);
+                } else {
+                    activateMaster(newReplicaInfo);
+                }
+            }, executorService);
+    }
+
+    /**
+     * Synchronizes flows from the previous backups.
+     *
+     * @param prevReplicaInfo the previous replica info
+     * @param newReplicaInfo the new replica info
+     */
+    private void syncFlowsOnBackups(DeviceReplicaInfo prevReplicaInfo, DeviceReplicaInfo newReplicaInfo) {
+        List<NodeId> backups = prevReplicaInfo.backups()
+            .stream()
+            .filter(nodeId -> !nodeId.equals(localNodeId))
+            .collect(Collectors.toList());
+        syncFlowsOn(backups)
+            .whenCompleteAsync((result, error) -> {
+                if (error != null) {
+                    log.debug("Failed to synchronize flows on previous backup nodes {}", backups, error);
+                }
+                // Activate regardless of errors; anti-entropy will repair any gaps.
+                activateMaster(newReplicaInfo);
+            }, executorService);
+    }
+
+    /**
+     * Synchronizes flows for the device on the given nodes.
+     *
+     * @param nodes the nodes via which to synchronize the flows
+     * @return a future to be completed once flows have been synchronized
+     */
+    private CompletableFuture<Void> syncFlowsOn(Collection<NodeId> nodes) {
+        // Tools.firstOf: succeed as soon as any one node completes the sync.
+        return nodes.isEmpty()
+            ? CompletableFuture.completedFuture(null)
+            : Tools.firstOf(nodes.stream()
+            .map(node -> syncFlowsOn(node))
+            .collect(Collectors.toList()))
+            .thenApply(v -> null);
+    }
+
+    /**
+     * Synchronizes flows for the device from the given node.
+     *
+     * @param nodeId the node from which to synchronize flows
+     * @return a future to be completed once the flows have been synchronized
+     */
+    private CompletableFuture<Void> syncFlowsOn(NodeId nodeId) {
+        // Only fetch buckets whose remote digest is newer than the local one.
+        return requestDigests(nodeId)
+            .thenCompose(digests -> Tools.allOf(digests.stream()
+                .filter(digest -> digest.isNewerThan(getDigest(digest.bucket())))
+                .map(digest -> syncBucketOn(nodeId, digest.bucket()))
+                .collect(Collectors.toList())))
+            .thenApply(v -> null);
+    }
+
+    /**
+     * Synchronizes the given bucket on the given node.
+     *
+     * @param nodeId the node on which to synchronize the bucket
+     * @param bucketNumber the bucket to synchronize
+     * @return a future to be completed once the bucket has been synchronized
+     */
+    private CompletableFuture<Void> syncBucketOn(NodeId nodeId, int bucketNumber) {
+        return requestBucket(nodeId, bucketNumber)
+            .thenAcceptAsync(flowBucket -> {
+                // Keep whichever copy of the bucket has the newer digest.
+                flowBuckets.compute(flowBucket.bucketId().bucket(),
+                    (id, bucket) -> flowBucket.getDigest().isNewerThan(bucket.getDigest()) ? flowBucket : bucket);
+            }, executorService);
+    }
+
+    /**
+     * Requests the given bucket from the given node.
+     *
+     * @param nodeId the node from which to request the bucket
+     * @param bucket the bucket to request
+     * @return a future to be completed with the bucket
+     */
+    private CompletableFuture<FlowBucket> requestBucket(NodeId nodeId, int bucket) {
+        log.debug("Requesting flow bucket {} from {}", bucket, nodeId);
+        return sendWithTimestamp(bucket, getBucketSubject, nodeId);
+    }
+
+    /**
+     * Handles a flow bucket request.
+     *
+     * @param bucket the bucket number
+     * @return the flow bucket
+     */
+    private FlowBucket onGetBucket(int bucket) {
+        return flowBuckets.get(bucket);
+    }
+
+    /**
+     * Activates the new master term.
+     *
+     * @param replicaInfo the new replica info
+     */
+    private void activateMaster(DeviceReplicaInfo replicaInfo) {
+        log.debug("Activating term {} for device {}", replicaInfo.term(), deviceId);
+        // Drain all operations that were queued while the term was synchronizing.
+        for (int i = 0; i < NUM_BUCKETS; i++) {
+            activateBucket(i);
+        }
+        lifecycleManager.activate(replicaInfo.term());
+        activeTerm = replicaInfo.term();
+    }
+
+    /**
+     * Activates the given bucket number.
+     *
+     * @param bucket the bucket number to activate
+     */
+    private void activateBucket(int bucket) {
+        Queue<Runnable> tasks;
+        synchronized (flowTasks) {
+            tasks = flowTasks.remove(bucket);
+        }
+        if (tasks != null) {
+            log.debug("Completing enqueued operations for device {}", deviceId);
+            // NOTE(review): tasks run synchronously; if a task throws, the remaining
+            // tasks in this bucket's queue are skipped — confirm queued Runnables
+            // cannot propagate exceptions.
+            tasks.forEach(task -> task.run());
+        }
+    }
+
+    /**
+     * Handles a lifecycle event.
+     */
+    private void onLifecycleEvent(LifecycleEvent event) {
+        log.debug("Received lifecycle event for device {}: {}", deviceId, event);
+        switch (event.type()) {
+            case TERM_START:
+                startTerm(event.subject());
+                break;
+            case TERM_ACTIVE:
+                activateTerm(event.subject());
+                break;
+            case TERM_UPDATE:
+                updateTerm(event.subject());
+                break;
+            default:
+                break;
+        }
+    }
+
+    /**
+     * Handles a replica change at the start of a new term.
+     */
+    private void startTerm(DeviceReplicaInfo replicaInfo) {
+        DeviceReplicaInfo oldReplicaInfo = this.replicaInfo;
+        this.replicaInfo = replicaInfo;
+        // Only the new master synchronizes flows; backups wait for replication.
+        if (replicaInfo.isMaster(localNodeId)) {
+            log.info("Synchronizing device {} flows for term {}", deviceId, replicaInfo.term());
+            syncFlows(oldReplicaInfo, replicaInfo);
+        }
+    }
+
+    /**
+     * Handles the activation of a term.
+     */
+    private void activateTerm(DeviceReplicaInfo replicaInfo) {
+        // Ignore stale activations from older terms.
+        if (replicaInfo.term() < this.replicaInfo.term()) {
+            return;
+        }
+        if (replicaInfo.term() > this.replicaInfo.term()) {
+            this.replicaInfo = replicaInfo;
+        }
+
+        // If the local node is neither the master or a backup for the device, clear the flow table.
+        if (!replicaInfo.isMaster(localNodeId) && !replicaInfo.isBackup(localNodeId)) {
+            flowBuckets.values().forEach(bucket -> bucket.clear());
+        }
+        activeTerm = replicaInfo.term();
+    }
+
+    /**
+     * Handles an update to a term.
+     */
+    private void updateTerm(DeviceReplicaInfo replicaInfo) {
+        // Updates only apply within the same term; term transitions go through startTerm.
+        if (replicaInfo.term() == this.replicaInfo.term()) {
+            this.replicaInfo = replicaInfo;
+
+            // If the local node is neither the master or a backup for the device *and the term is active*,
+            // clear the flow table.
+            if (activeTerm == replicaInfo.term()
+                && !replicaInfo.isMaster(localNodeId)
+                && !replicaInfo.isBackup(localNodeId)) {
+                flowBuckets.values().forEach(bucket -> bucket.clear());
+            }
+        }
+    }
+
+    /**
+     * Sends a message to the given node wrapped in a Lamport timestamp.
+     * <p>
+     * Messages are sent in a {@link Timestamped} wrapper and are expected to be received in a {@link Timestamped}
+     * wrapper. The internal {@link LogicalClock} is automatically updated on both send and receive.
+     *
+     * @param message the message to send
+     * @param subject the message subject
+     * @param toNodeId the node to which to send the message
+     * @param <M> the message type
+     * @param <R> the response type
+     * @return a future to be completed with the response
+     */
+    private <M, R> CompletableFuture<R> sendWithTimestamp(M message, MessageSubject subject, NodeId toNodeId) {
+        return clusterCommunicator.<Timestamped<M>, Timestamped<R>>sendAndReceive(
+            clock.timestamp(message), subject, SERIALIZER::encode, SERIALIZER::decode, toNodeId)
+            .thenApply(response -> {
+                // Advance the local clock past the responder's timestamp before unwrapping.
+                clock.tick(response.timestamp());
+                return response.value();
+            });
+    }
+
+    /**
+     * Receives messages to the given subject wrapped in Lamport timestamps.
+     * <p>
+     * Messages are expected to be received in a {@link Timestamped} wrapper and are sent back in a {@link Timestamped}
+     * wrapper. The internal {@link LogicalClock} is automatically updated on both receive and send.
+     *
+     * @param subject the subject for which to register the subscriber
+     * @param function the raw message handler
+     * @param <M> the raw message type
+     * @param <R> the raw response type
+     */
+    private <M, R> void receiveWithTimestamp(MessageSubject subject, Function<M, R> function) {
+        clusterCommunicator.<Timestamped<M>, Timestamped<R>>addSubscriber(subject, SERIALIZER::decode, request -> {
+            clock.tick(request.timestamp());
+            return clock.timestamp(function.apply(request.value()));
+        }, SERIALIZER::encode, executorService);
+    }
+
+    /**
+     * Registers internal message subscribers.
+     */
+    private void registerSubscribers() {
+        receiveWithTimestamp(getDigestsSubject, v -> getDigests());
+        receiveWithTimestamp(getBucketSubject, this::onGetBucket);
+        receiveWithTimestamp(backupSubject, this::onBackup);
+    }
+
+    /**
+     * Unregisters internal message subscribers.
+     */
+    private void unregisterSubscribers() {
+        clusterCommunicator.removeSubscriber(getDigestsSubject);
+        clusterCommunicator.removeSubscriber(getBucketSubject);
+        clusterCommunicator.removeSubscriber(backupSubject);
+    }
+
+    /**
+     * Adds internal event listeners.
+     */
+    private void addListeners() {
+        lifecycleManager.addListener(lifecycleEventListener);
+    }
+
+    /**
+     * Removes internal event listeners.
+     */
+    private void removeListeners() {
+        lifecycleManager.removeListener(lifecycleEventListener);
+    }
+
+    /**
+     * Cancels recurrent scheduled futures.
+     */
+    private synchronized void cancelFutures() {
+        ScheduledFuture<?> backupFuture = this.backupFuture;
+        if (backupFuture != null) {
+            backupFuture.cancel(false);
+        }
+
+        ScheduledFuture<?> antiEntropyFuture = this.antiEntropyFuture;
+        if (antiEntropyFuture != null) {
+            antiEntropyFuture.cancel(false);
+        }
+    }
+
+    /**
+     * Purges the flow table.
+     */
+    public void purge() {
+        // Drop queued operations and all flow/backup state for the device.
+        flowTasks.clear();
+        flowBuckets.values().forEach(bucket -> bucket.purge());
+        lastBackupTimes.clear();
+        inFlightUpdates.clear();
+    }
+
+    /**
+     * Closes the device flow table.
+     */
+    public void close() {
+        removeListeners();
+        unregisterSubscribers();
+        cancelFutures();
+        lifecycleManager.close();
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceReplicaInfo.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceReplicaInfo.java
new file mode 100644
index 0000000..30f9396
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceReplicaInfo.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import java.util.List;
+import java.util.Objects;
+
+import org.onosproject.cluster.NodeId;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+/**
+ * Device term context.
+ */
+public class DeviceReplicaInfo {
+    private final long term;
+    private final NodeId master;
+    private final List<NodeId> backups;
+
+    /**
+     * Creates a new replica info descriptor for a mastership term.
+     *
+     * @param term the mastership term
+     * @param master the master node for the term, possibly {@code null}
+     * @param backups the backup nodes for the term in priority order
+     */
+    public DeviceReplicaInfo(long term, NodeId master, List<NodeId> backups) {
+        this.term = term;
+        this.master = master;
+        // NOTE(review): the list is stored and returned as-is; callers are assumed
+        // not to mutate it — consider a defensive copy.
+        this.backups = backups;
+    }
+
+    /**
+     * Returns the mastership term.
+     *
+     * @return the mastership term
+     */
+    public long term() {
+        return term;
+    }
+
+    /**
+     * Returns the master for the {@link #term()}.
+     *
+     * @return the master {@link NodeId} for the current {@link #term()}
+     */
+    public NodeId master() {
+        return master;
+    }
+
+    /**
+     * Returns a boolean indicating whether the given {@link NodeId} is the current master.
+     *
+     * @param nodeId the node ID to check
+     * @return indicates whether the given node identifier is the identifier of the current master
+     */
+    public boolean isMaster(NodeId nodeId) {
+        // Objects.equals handles a null master (no master elected for the term).
+        return Objects.equals(master, nodeId);
+    }
+
+    /**
+     * Returns a list of all active backup nodes in priority order.
+     * <p>
+     * The returned backups are limited by the flow rule store's configured backup count.
+     *
+     * @return a list of backup nodes in priority order
+     */
+    public List<NodeId> backups() {
+        return backups;
+    }
+
+    /**
+     * Returns a boolean indicating whether the given node is a backup.
+     *
+     * @param nodeId the node identifier
+     * @return indicates whether the given node is a backup
+     */
+    public boolean isBackup(NodeId nodeId) {
+        return backups.contains(nodeId);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(term, master, backups);
+    }
+
+    @Override
+    public boolean equals(Object object) {
+        if (object instanceof DeviceReplicaInfo) {
+            DeviceReplicaInfo that = (DeviceReplicaInfo) object;
+            return this.term == that.term
+                && Objects.equals(this.master, that.master)
+                && Objects.equals(this.backups, that.backups);
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return toStringHelper(this)
+            .add("term", term())
+            .add("master", master())
+            .add("backups", backups())
+            .toString();
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index f998dad..4502016 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -1,23 +1,24 @@
- /*
- * Copyright 2014-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+/*
+ * Copyright 2014-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.onosproject.store.flow.impl;
import java.util.Collections;
import java.util.Dictionary;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -25,11 +26,12 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
import java.util.stream.Collectors;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
import com.google.common.collect.Streams;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -46,20 +48,17 @@
import org.onosproject.cluster.NodeId;
import org.onosproject.core.CoreService;
import org.onosproject.core.IdGenerator;
+import org.onosproject.event.AbstractListenerManager;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.CompletedBatchOperation;
import org.onosproject.net.flow.DefaultFlowEntry;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowEntry.FlowEntryState;
-import org.onosproject.net.flow.FlowId;
import org.onosproject.net.flow.FlowRule;
-import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
-import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry.FlowRuleOperation;
-import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
-import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
-import org.onosproject.net.flow.oldbatch.FlowRuleBatchRequest;
import org.onosproject.net.flow.FlowRuleEvent;
import org.onosproject.net.flow.FlowRuleEvent.Type;
import org.onosproject.net.flow.FlowRuleService;
@@ -67,38 +66,41 @@
import org.onosproject.net.flow.FlowRuleStoreDelegate;
import org.onosproject.net.flow.StoredFlowEntry;
import org.onosproject.net.flow.TableStatisticsEntry;
+import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
+import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry.FlowRuleOperation;
+import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
+import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
+import org.onosproject.net.flow.oldbatch.FlowRuleBatchRequest;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.flow.ReplicaInfo;
import org.onosproject.store.flow.ReplicaInfoEvent;
import org.onosproject.store.flow.ReplicaInfoEventListener;
import org.onosproject.store.flow.ReplicaInfoService;
import org.onosproject.store.impl.MastershipBasedTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.WallClockTimestamp;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Futures;
-
import static com.google.common.base.Strings.isNullOrEmpty;
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
-import static org.onosproject.store.flow.ReplicaInfoEvent.Type.MASTER_CHANGED;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.APPLY_BATCH_FLOWS;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP;
+import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_COUNT;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOTE_APPLY_COMPLETED;
@@ -111,8 +113,8 @@
@Component(immediate = true)
@Service
public class ECFlowRuleStore
- extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
- implements FlowRuleStore {
+ extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
+ implements FlowRuleStore {
private final Logger log = getLogger(getClass());
@@ -120,23 +122,27 @@
private static final int DEFAULT_MAX_BACKUP_COUNT = 2;
private static final boolean DEFAULT_PERSISTENCE_ENABLED = false;
private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
+ private static final int DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS = 5000;
private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
- // number of devices whose flow entries will be backed up in one communication round
- private static final int FLOW_TABLE_BACKUP_BATCH_SIZE = 1;
@Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
- label = "Number of threads in the message handler pool")
+ label = "Number of threads in the message handler pool")
private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
@Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS,
- label = "Delay in ms between successive backup runs")
+ label = "Delay in ms between successive backup runs")
private int backupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
+
+ @Property(name = "antiEntropyPeriod", intValue = DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS,
+ label = "Delay in ms between anti-entropy runs")
+ private int antiEntropyPeriod = DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS;
+
@Property(name = "persistenceEnabled", boolValue = false,
- label = "Indicates whether or not changes in the flow table should be persisted to disk.")
+ label = "Indicates whether or not changes in the flow table should be persisted to disk.")
private boolean persistenceEnabled = DEFAULT_PERSISTENCE_ENABLED;
@Property(name = "backupCount", intValue = DEFAULT_MAX_BACKUP_COUNT,
- label = "Max number of backup copies for each device")
+ label = "Max number of backup copies for each device")
private volatile int backupCount = DEFAULT_MAX_BACKUP_COUNT;
private InternalFlowTable flowTable = new InternalFlowTable();
@@ -169,24 +175,28 @@
private ExecutorService messageHandlingExecutor;
private ExecutorService eventHandler;
- private ScheduledFuture<?> backupTask;
private final ScheduledExecutorService backupSenderExecutor =
- Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender", log));
+ Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender", log));
private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
- new InternalTableStatsListener();
+ new InternalTableStatsListener();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
- protected final Serializer serializer = Serializer.using(KryoNamespaces.API);
+ protected final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(BucketId.class)
+ .register(FlowBucket.class)
+ .build());
protected final KryoNamespace.Builder serializerBuilder = KryoNamespace.newBuilder()
- .register(KryoNamespaces.API)
- .register(MastershipBasedTimestamp.class);
+ .register(KryoNamespaces.API)
+ .register(BucketId.class)
+ .register(MastershipBasedTimestamp.class);
- private EventuallyConsistentMap<DeviceId, Integer> flowCounts;
+ protected AsyncConsistentMap<DeviceId, Long> mastershipTermLifecycles;
private IdGenerator idGenerator;
private NodeId local;
@@ -200,45 +210,37 @@
local = clusterService.getLocalNode().id();
eventHandler = Executors.newSingleThreadExecutor(
- groupedThreads("onos/flow", "event-handler", log));
+ groupedThreads("onos/flow", "event-handler", log));
messageHandlingExecutor = Executors.newFixedThreadPool(
- msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
+ msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
registerMessageHandlers(messageHandlingExecutor);
- replicaInfoManager.addListener(flowTable);
- backupTask = backupSenderExecutor.scheduleWithFixedDelay(
- flowTable::backup,
- 0,
- backupPeriod,
- TimeUnit.MILLISECONDS);
-
- flowCounts = storageService.<DeviceId, Integer>eventuallyConsistentMapBuilder()
- .withName("onos-flow-counts")
- .withSerializer(serializerBuilder)
- .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
- .withTimestampProvider((k, v) -> new WallClockTimestamp())
- .withTombstonesDisabled()
- .build();
+ mastershipTermLifecycles = storageService.<DeviceId, Long>consistentMapBuilder()
+ .withName("onos-flow-store-terms")
+ .withSerializer(serializer)
+ .buildAsyncMap();
deviceTableStats = storageService.<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder()
- .withName("onos-flow-table-stats")
- .withSerializer(serializerBuilder)
- .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
- .withTimestampProvider((k, v) -> new WallClockTimestamp())
- .withTombstonesDisabled()
- .build();
+ .withName("onos-flow-table-stats")
+ .withSerializer(serializerBuilder)
+ .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
+ .withTimestampProvider((k, v) -> new WallClockTimestamp())
+ .withTombstonesDisabled()
+ .build();
deviceTableStats.addListener(tableStatsListener);
+ deviceService.addListener(flowTable);
+ deviceService.getDevices().forEach(device -> flowTable.addDevice(device.id()));
+
logConfig("Started");
}
@Deactivate
public void deactivate(ComponentContext context) {
- replicaInfoManager.removeListener(flowTable);
- backupTask.cancel(true);
configService.unregisterProperties(getClass(), false);
unregisterMessageHandlers();
+ deviceService.removeListener(flowTable);
deviceTableStats.removeListener(tableStatsListener);
deviceTableStats.destroy();
eventHandler.shutdownNow();
@@ -259,6 +261,7 @@
int newPoolSize;
int newBackupPeriod;
int newBackupCount;
+ int newAntiEntropyPeriod;
try {
String s = get(properties, "msgHandlerPoolSize");
newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
@@ -268,39 +271,37 @@
s = get(properties, "backupCount");
newBackupCount = isNullOrEmpty(s) ? backupCount : Integer.parseInt(s.trim());
+
+ s = get(properties, "antiEntropyPeriod");
+ newAntiEntropyPeriod = isNullOrEmpty(s) ? antiEntropyPeriod : Integer.parseInt(s.trim());
} catch (NumberFormatException | ClassCastException e) {
newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
newBackupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
newBackupCount = DEFAULT_MAX_BACKUP_COUNT;
+ newAntiEntropyPeriod = DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS;
}
- boolean restartBackupTask = false;
-
if (newBackupPeriod != backupPeriod) {
backupPeriod = newBackupPeriod;
- restartBackupTask = true;
+ flowTable.setBackupPeriod(newBackupPeriod);
}
- if (restartBackupTask) {
- if (backupTask != null) {
- // cancel previously running task
- backupTask.cancel(false);
- }
- backupTask = backupSenderExecutor.scheduleWithFixedDelay(
- flowTable::backup,
- 0,
- backupPeriod,
- TimeUnit.MILLISECONDS);
+
+ if (newAntiEntropyPeriod != antiEntropyPeriod) {
+ antiEntropyPeriod = newAntiEntropyPeriod;
+ flowTable.setAntiEntropyPeriod(newAntiEntropyPeriod);
}
+
if (newPoolSize != msgHandlerPoolSize) {
msgHandlerPoolSize = newPoolSize;
ExecutorService oldMsgHandler = messageHandlingExecutor;
messageHandlingExecutor = Executors.newFixedThreadPool(
- msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
+ msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
// replace previously registered handlers.
registerMessageHandlers(messageHandlingExecutor);
oldMsgHandler.shutdown();
}
+
if (backupCount != newBackupCount) {
backupCount = newBackupCount;
}
@@ -308,23 +309,23 @@
}
private void registerMessageHandlers(ExecutorService executor) {
-
clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
- REMOTE_APPLY_COMPLETED, serializer::decode, this::notifyDelegate, executor);
+ REMOTE_APPLY_COMPLETED, serializer::decode, this::notifyDelegate, executor);
clusterCommunicator.addSubscriber(
- GET_FLOW_ENTRY, serializer::decode, flowTable::getFlowEntry, serializer::encode, executor);
+ GET_FLOW_ENTRY, serializer::decode, flowTable::getFlowEntry, serializer::encode, executor);
clusterCommunicator.addSubscriber(
- GET_DEVICE_FLOW_ENTRIES, serializer::decode, flowTable::getFlowEntries, serializer::encode, executor);
+ GET_DEVICE_FLOW_ENTRIES, serializer::decode, flowTable::getFlowEntries, serializer::encode, executor);
clusterCommunicator.addSubscriber(
- REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
+ GET_DEVICE_FLOW_COUNT, serializer::decode, flowTable::getFlowRuleCount, serializer::encode, executor);
clusterCommunicator.addSubscriber(
- FLOW_TABLE_BACKUP, serializer::decode, flowTable::onBackupReceipt, serializer::encode, executor);
+ REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
}
private void unregisterMessageHandlers() {
clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
+ clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_COUNT);
clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
@@ -333,24 +334,38 @@
private void logConfig(String prefix) {
log.info("{} with msgHandlerPoolSize = {}; backupPeriod = {}, backupCount = {}",
- prefix, msgHandlerPoolSize, backupPeriod, backupCount);
+ prefix, msgHandlerPoolSize, backupPeriod, backupCount);
}
- // This is not a efficient operation on a distributed sharded
- // flow store. We need to revisit the need for this operation or at least
- // make it device specific.
@Override
public int getFlowRuleCount() {
return Streams.stream(deviceService.getDevices()).parallel()
- .mapToInt(device -> getFlowRuleCount(device.id()))
- .sum();
+ .mapToInt(device -> getFlowRuleCount(device.id()))
+ .sum();
}
@Override
public int getFlowRuleCount(DeviceId deviceId) {
- Integer count = flowCounts.get(deviceId);
- return count != null ? count : flowTable.flowEntries.get(deviceId) != null ?
- flowTable.flowEntries.get(deviceId).keySet().size() : 0;
+ NodeId master = mastershipService.getMasterFor(deviceId);
+ if (master == null) {
+ log.debug("Failed to getFlowRuleCount: No master for {}", deviceId);
+ return 0;
+ }
+
+ if (Objects.equals(local, master)) {
+ return flowTable.getFlowRuleCount(deviceId);
+ }
+
+ log.trace("Forwarding getFlowRuleCount to master {} for device {}", master, deviceId);
+ return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
+ deviceId,
+ GET_DEVICE_FLOW_COUNT,
+ serializer::encode,
+ serializer::decode,
+ master),
+ FLOW_RULE_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ 0);
}
@Override
@@ -367,16 +382,16 @@
}
log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
- master, rule.deviceId());
+ master, rule.deviceId());
return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
- ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY,
- serializer::encode,
- serializer::decode,
- master),
- FLOW_RULE_STORE_TIMEOUT_MILLIS,
- TimeUnit.MILLISECONDS,
- null);
+ ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY,
+ serializer::encode,
+ serializer::decode,
+ master),
+ FLOW_RULE_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ null);
}
@Override
@@ -393,31 +408,31 @@
}
log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
- master, deviceId);
+ master, deviceId);
return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
- ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
- serializer::encode,
- serializer::decode,
- master),
- FLOW_RULE_STORE_TIMEOUT_MILLIS,
- TimeUnit.MILLISECONDS,
- Collections.emptyList());
+ ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
+ serializer::encode,
+ serializer::decode,
+ master),
+ FLOW_RULE_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ Collections.emptyList());
}
@Override
public void storeFlowRule(FlowRule rule) {
storeBatch(new FlowRuleBatchOperation(
- Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
- rule.deviceId(), idGenerator.getNewId()));
+ Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
+ rule.deviceId(), idGenerator.getNewId()));
}
@Override
public void storeBatch(FlowRuleBatchOperation operation) {
if (operation.getOperations().isEmpty()) {
notifyDelegate(FlowRuleBatchEvent.completed(
- new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
- new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
+ new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+ new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
return;
}
@@ -427,11 +442,13 @@
if (master == null) {
log.warn("No master for {} ", deviceId);
- updateStoreInternal(operation);
-
+ Set<FlowRule> allFailures = operation.getOperations()
+ .stream()
+ .map(op -> op.target())
+ .collect(Collectors.toSet());
notifyDelegate(FlowRuleBatchEvent.completed(
- new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
- new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
+ new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+ new CompletedBatchOperation(false, allFailures, deviceId)));
return;
}
@@ -441,26 +458,26 @@
}
log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
- master, deviceId);
+ master, deviceId);
clusterCommunicator.unicast(operation,
- APPLY_BATCH_FLOWS,
- serializer::encode,
- master)
- .whenComplete((result, error) -> {
- if (error != null) {
- log.warn("Failed to storeBatch: {} to {}", operation, master, error);
+ APPLY_BATCH_FLOWS,
+ serializer::encode,
+ master)
+ .whenComplete((result, error) -> {
+ if (error != null) {
+ log.warn("Failed to storeBatch: {} to {}", operation, master, error);
- Set<FlowRule> allFailures = operation.getOperations()
- .stream()
- .map(op -> op.target())
- .collect(Collectors.toSet());
+ Set<FlowRule> allFailures = operation.getOperations()
+ .stream()
+ .map(op -> op.target())
+ .collect(Collectors.toSet());
- notifyDelegate(FlowRuleBatchEvent.completed(
- new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
- new CompletedBatchOperation(false, allFailures, deviceId)));
- }
- });
+ notifyDelegate(FlowRuleBatchEvent.completed(
+ new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+ new CompletedBatchOperation(false, allFailures, deviceId)));
+ }
+ });
}
private void storeBatchInternal(FlowRuleBatchOperation operation) {
@@ -470,65 +487,63 @@
Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
if (currentOps.isEmpty()) {
batchOperationComplete(FlowRuleBatchEvent.completed(
- new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
- new CompletedBatchOperation(true, Collections.emptySet(), did)));
+ new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+ new CompletedBatchOperation(true, Collections.emptySet(), did)));
return;
}
notifyDelegate(FlowRuleBatchEvent.requested(new
- FlowRuleBatchRequest(operation.id(),
- currentOps), operation.deviceId()));
+ FlowRuleBatchRequest(operation.id(),
+ currentOps), operation.deviceId()));
}
private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
return operation.getOperations().stream().map(
- op -> {
- StoredFlowEntry entry;
- switch (op.operator()) {
- case ADD:
- entry = new DefaultFlowEntry(op.target());
- flowTable.add(entry);
+ op -> {
+ StoredFlowEntry entry;
+ switch (op.operator()) {
+ case ADD:
+ entry = new DefaultFlowEntry(op.target());
+ flowTable.add(entry);
+ return op;
+ case MODIFY:
+ entry = new DefaultFlowEntry(op.target());
+ flowTable.update(entry);
+ return op;
+ case REMOVE:
+ return flowTable.update(op.target(), stored -> {
+ stored.setState(FlowEntryState.PENDING_REMOVE);
+ log.debug("Setting state of rule to pending remove: {}", stored);
return op;
- case MODIFY:
- entry = new DefaultFlowEntry(op.target());
- flowTable.update(entry);
- return op;
- case REMOVE:
- entry = flowTable.getFlowEntry(op.target());
- if (entry != null) {
- entry.setState(FlowEntryState.PENDING_REMOVE);
- flowTable.update(entry);
- log.debug("Setting state of rule to pending remove: {}", entry);
- return op;
- }
- break;
- default:
- log.warn("Unknown flow operation operator: {}", op.operator());
- }
- return null;
+ });
+ default:
+ log.warn("Unknown flow operation operator: {}", op.operator());
}
+ return null;
+ }
).filter(Objects::nonNull).collect(Collectors.toSet());
}
@Override
public void deleteFlowRule(FlowRule rule) {
storeBatch(
- new FlowRuleBatchOperation(
- Collections.singletonList(
- new FlowRuleBatchEntry(
- FlowRuleOperation.REMOVE,
- rule)), rule.deviceId(), idGenerator.getNewId()));
+ new FlowRuleBatchOperation(
+ Collections.singletonList(
+ new FlowRuleBatchEntry(
+ FlowRuleOperation.REMOVE,
+ rule)), rule.deviceId(), idGenerator.getNewId()));
}
@Override
public FlowRuleEvent pendingFlowRule(FlowEntry rule) {
if (mastershipService.isLocalMaster(rule.deviceId())) {
- StoredFlowEntry stored = flowTable.getFlowEntry(rule);
- if (stored != null &&
- stored.state() != FlowEntryState.PENDING_ADD) {
- stored.setState(FlowEntryState.PENDING_ADD);
- return new FlowRuleEvent(Type.RULE_UPDATED, rule);
- }
+ return flowTable.update(rule, stored -> {
+ if (stored.state() != FlowEntryState.PENDING_ADD) {
+ stored.setState(FlowEntryState.PENDING_ADD);
+ return new FlowRuleEvent(Type.RULE_UPDATED, rule);
+ }
+ return null;
+ });
}
return null;
}
@@ -541,14 +556,12 @@
}
log.warn("Tried to update FlowRule {} state,"
- + " while the Node was not the master.", rule);
+ + " while the Node was not the master.", rule);
return null;
}
private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
- // check if this new rule is an update to an existing entry
- StoredFlowEntry stored = flowTable.getFlowEntry(rule);
- if (stored != null) {
+ FlowRuleEvent event = flowTable.update(rule, stored -> {
stored.setBytes(rule.bytes());
stored.setLife(rule.life(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
stored.setLiveType(rule.liveType());
@@ -556,11 +569,12 @@
stored.setLastSeen();
if (stored.state() == FlowEntryState.PENDING_ADD) {
stored.setState(FlowEntryState.ADDED);
- // Update the flow table to ensure the changes are replicated
- flowTable.update(stored);
return new FlowRuleEvent(Type.RULE_ADDED, rule);
}
return new FlowRuleEvent(Type.RULE_UPDATED, rule);
+ });
+ if (event != null) {
+ return event;
}
// TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
@@ -586,14 +600,17 @@
}
log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
- master, deviceId);
+ master, deviceId);
- return Futures.getUnchecked(clusterCommunicator.sendAndReceive(
- rule,
- REMOVE_FLOW_ENTRY,
- serializer::encode,
- serializer::decode,
- master));
+ return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
+ rule,
+ REMOVE_FLOW_ENTRY,
+ serializer::encode,
+ serializer::decode,
+ master),
+ FLOW_RULE_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ null);
}
private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
@@ -654,307 +671,186 @@
}
}
- private class BackupOperation {
- private final NodeId nodeId;
- private final DeviceId deviceId;
-
- public BackupOperation(NodeId nodeId, DeviceId deviceId) {
- this.nodeId = nodeId;
- this.deviceId = deviceId;
- }
+ private class InternalFlowTable implements DeviceListener {
+ private final Map<DeviceId, DeviceFlowTable> flowTables = Maps.newConcurrentMap();
@Override
- public int hashCode() {
- return Objects.hash(nodeId, deviceId);
- }
-
- @Override
- public boolean equals(Object other) {
- if (other != null && other instanceof BackupOperation) {
- BackupOperation that = (BackupOperation) other;
- return this.nodeId.equals(that.nodeId) &&
- this.deviceId.equals(that.deviceId);
- } else {
- return false;
+ public void event(DeviceEvent event) {
+ if (event.type() == DeviceEvent.Type.DEVICE_ADDED) {
+ addDevice(event.subject().id());
}
}
- }
-
- private class InternalFlowTable implements ReplicaInfoEventListener {
-
- //TODO replace the Map<V,V> with ExtendedSet
- private final Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
- flowEntries = Maps.newConcurrentMap();
-
- private final Map<BackupOperation, Long> lastBackupTimes = Maps.newConcurrentMap();
- private final Map<DeviceId, Long> lastUpdateTimes = Maps.newConcurrentMap();
-
- @Override
- public void event(ReplicaInfoEvent event) {
- eventHandler.execute(() -> handleEvent(event));
- }
-
- private void handleEvent(ReplicaInfoEvent event) {
- DeviceId deviceId = event.subject();
- if (!mastershipService.isLocalMaster(deviceId)) {
- return;
- }
- if (event.type() == MASTER_CHANGED) {
- lastUpdateTimes.put(deviceId, System.currentTimeMillis());
- }
- backupSenderExecutor.schedule(this::backup, 0, TimeUnit.SECONDS);
- }
-
- private void sendBackups(NodeId nodeId, Set<DeviceId> deviceIds) {
- // split up the devices into smaller batches and send them separately.
- Iterables.partition(deviceIds, FLOW_TABLE_BACKUP_BATCH_SIZE)
- .forEach(ids -> backupFlowEntries(nodeId, Sets.newHashSet(ids)));
- }
-
- private void backupFlowEntries(NodeId nodeId, Set<DeviceId> deviceIds) {
- if (deviceIds.isEmpty()) {
- return;
- }
- log.debug("Sending flowEntries for devices {} to {} for backup.", deviceIds, nodeId);
- Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
- deviceFlowEntries = Maps.newConcurrentMap();
- deviceIds.forEach(id -> {
- Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> copy = getFlowTableCopy(id);
- int flowCount = copy.entrySet().stream()
- .mapToInt(e -> e.getValue().values().size()).sum();
- flowCounts.put(id, flowCount);
- deviceFlowEntries.put(id, copy);
- });
- clusterCommunicator.<Map<DeviceId,
- Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>,
- Set<DeviceId>>
- sendAndReceive(deviceFlowEntries,
- FLOW_TABLE_BACKUP,
- serializer::encode,
- serializer::decode,
- nodeId)
- .whenComplete((backedupDevices, error) -> {
- Set<DeviceId> devicesNotBackedup = error != null ?
- deviceFlowEntries.keySet() :
- Sets.difference(deviceFlowEntries.keySet(), backedupDevices);
- if (devicesNotBackedup.size() > 0) {
- log.warn("Failed to backup devices: {}. Reason: {}, Node: {}",
- devicesNotBackedup, error != null ? error.getMessage() : "none",
- nodeId);
- }
- if (backedupDevices != null) {
- backedupDevices.forEach(id -> {
- lastBackupTimes.put(new BackupOperation(nodeId, id), System.currentTimeMillis());
- });
- }
- });
- }
/**
- * Returns the flow table for specified device.
+ * Adds the given device to the flow table.
*
- * @param deviceId identifier of the device
- * @return Map representing Flow Table of given device.
+ * @param deviceId the device to add to the table
*/
- private Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
- if (persistenceEnabled) {
- return flowEntries.computeIfAbsent(deviceId, id -> persistenceService
- .<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>persistentMapBuilder()
- .withName("FlowTable:" + deviceId.toString())
- .withSerializer(new Serializer() {
- @Override
- public <T> byte[] encode(T object) {
- return serializer.encode(object);
- }
-
- @Override
- public <T> T decode(byte[] bytes) {
- return serializer.decode(bytes);
- }
-
- @Override
- public <T> T copy(T object) {
- return serializer.copy(object);
- }
- })
- .build());
- } else {
- return flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap());
- }
+ public void addDevice(DeviceId deviceId) {
+ flowTables.computeIfAbsent(deviceId, id -> new DeviceFlowTable(
+ id,
+ clusterService,
+ clusterCommunicator,
+ new InternalLifecycleManager(id),
+ backupSenderExecutor,
+ backupPeriod,
+ antiEntropyPeriod));
}
- private Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> getFlowTableCopy(DeviceId deviceId) {
- Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> copy = Maps.newHashMap();
- if (persistenceEnabled) {
- return flowEntries.computeIfAbsent(deviceId, id -> persistenceService
- .<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>persistentMapBuilder()
- .withName("FlowTable:" + deviceId.toString())
- .withSerializer(new Serializer() {
- @Override
- public <T> byte[] encode(T object) {
- return serializer.encode(object);
- }
-
- @Override
- public <T> T decode(byte[] bytes) {
- return serializer.decode(bytes);
- }
-
- @Override
- public <T> T copy(T object) {
- return serializer.copy(object);
- }
- })
- .build());
- } else {
- flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap()).forEach((k, v) -> {
- copy.put(k, Maps.newHashMap(v));
- });
- return copy;
- }
+ /**
+ * Sets the flow table backup period.
+ *
+ * @param backupPeriod the flow table backup period
+ */
+ void setBackupPeriod(int backupPeriod) {
+ flowTables.values().forEach(flowTable -> flowTable.setBackupPeriod(backupPeriod));
}
- private Map<StoredFlowEntry, StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
- return getFlowTable(deviceId).computeIfAbsent(flowId, id -> Maps.newConcurrentMap());
+ /**
+ * Sets the flow table anti-entropy period.
+ *
+ * @param antiEntropyPeriod the flow table anti-entropy period
+ */
+ void setAntiEntropyPeriod(int antiEntropyPeriod) {
+ flowTables.values().forEach(flowTable -> flowTable.setAntiEntropyPeriod(antiEntropyPeriod));
}
- private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
- return getFlowEntriesInternal(rule.deviceId(), rule.id()).get(rule);
+ /**
+ * Returns the flow table for a specific device.
+ *
+ * @param deviceId the device identifier
+ * @return the flow table for the given device
+ */
+ private DeviceFlowTable getFlowTable(DeviceId deviceId) {
+ DeviceFlowTable flowTable = flowTables.get(deviceId);
+ return flowTable != null ? flowTable : flowTables.computeIfAbsent(deviceId, id -> new DeviceFlowTable(
+ deviceId,
+ clusterService,
+ clusterCommunicator,
+ new InternalLifecycleManager(deviceId),
+ backupSenderExecutor,
+ backupPeriod,
+ antiEntropyPeriod));
}
- private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
- return getFlowTable(deviceId).values().stream()
- .flatMap(m -> m.values().stream())
- .collect(Collectors.toSet());
+ /**
+ * Returns the flow rule count for the given device.
+ *
+ * @param deviceId the device for which to return the flow rule count
+ * @return the flow rule count for the given device
+ */
+ public int getFlowRuleCount(DeviceId deviceId) {
+ return getFlowTable(deviceId).count();
}
+ /**
+ * Returns the flow entry for the given rule.
+ *
+ * @param rule the rule for which to return the flow entry
+ * @return the flow entry for the given rule
+ */
public StoredFlowEntry getFlowEntry(FlowRule rule) {
- return getFlowEntryInternal(rule);
+ return getFlowTable(rule.deviceId()).getFlowEntry(rule);
}
+ /**
+ * Returns the set of flow entries for the given device.
+ *
+ * @param deviceId the device for which to lookup flow entries
+ * @return the set of flow entries for the given device
+ */
public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
- return getFlowEntriesInternal(deviceId);
+ return getFlowTable(deviceId).getFlowEntries();
}
+ /**
+ * Adds the given flow rule.
+ *
+ * @param rule the rule to add
+ */
public void add(FlowEntry rule) {
- getFlowEntriesInternal(rule.deviceId(), rule.id())
- .compute((StoredFlowEntry) rule, (k, stored) -> {
- return (StoredFlowEntry) rule;
- });
- lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
+ Tools.futureGetOrElse(
+ getFlowTable(rule.deviceId()).add(rule),
+ FLOW_RULE_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ null);
}
+ /**
+ * Updates the given flow rule.
+ *
+ * @param rule the rule to update
+ */
public void update(FlowEntry rule) {
- getFlowEntriesInternal(rule.deviceId(), rule.id())
- .computeIfPresent((StoredFlowEntry) rule, (k, stored) -> {
- if (rule instanceof DefaultFlowEntry) {
- DefaultFlowEntry updated = (DefaultFlowEntry) rule;
- if (stored instanceof DefaultFlowEntry) {
- DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
- if (updated.created() >= storedEntry.created()) {
- lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
- return updated;
- } else {
- log.debug("Trying to update more recent flow entry {} (stored: {})", updated, stored);
- return stored;
- }
- }
- }
- return stored;
- });
+ Tools.futureGetOrElse(
+ getFlowTable(rule.deviceId()).update(rule),
+ FLOW_RULE_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ null);
}
+    /**
+     * Applies the given update function to the stored entry for the given rule.
+     *
+     * @param function the update function to apply to the stored flow entry
+     * @return the result of the update function, or {@code null} if the rule was not updated
+     */
+ public <T> T update(FlowRule rule, Function<StoredFlowEntry, T> function) {
+ return Tools.futureGetOrElse(
+ getFlowTable(rule.deviceId()).update(rule, function),
+ FLOW_RULE_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ null);
+ }
+
+ /**
+ * Removes the given flow rule.
+ *
+ * @param rule the rule to remove
+ */
public FlowEntry remove(FlowEntry rule) {
- final AtomicReference<FlowEntry> removedRule = new AtomicReference<>();
- final Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowTable = getFlowTable(rule.deviceId());
- flowTable.computeIfPresent(rule.id(), (flowId, flowEntries) -> {
- flowEntries.computeIfPresent((StoredFlowEntry) rule, (k, stored) -> {
- if (rule instanceof DefaultFlowEntry) {
- DefaultFlowEntry toRemove = (DefaultFlowEntry) rule;
- if (stored instanceof DefaultFlowEntry) {
- DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
- if (toRemove.created() < storedEntry.created()) {
- log.debug("Trying to remove more recent flow entry {} (stored: {})", toRemove, stored);
- // the key is not updated, removedRule remains null
- return stored;
- }
- }
- }
- removedRule.set(stored);
- return null;
- });
- return flowEntries.isEmpty() ? null : flowEntries;
- });
-
- if (removedRule.get() != null) {
- lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
- return removedRule.get();
- } else {
- return null;
- }
+ return Tools.futureGetOrElse(
+ getFlowTable(rule.deviceId()).remove(rule),
+ FLOW_RULE_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ null);
}
+ /**
+ * Purges flow rules for the given device.
+ *
+ * @param deviceId the device for which to purge flow rules
+ */
public void purgeFlowRule(DeviceId deviceId) {
- flowEntries.remove(deviceId);
+ // If the device is still present in the store, purge the underlying DeviceFlowTable.
+ // Otherwise, remove the DeviceFlowTable and unregister message handlers.
+ if (deviceService.getDevice(deviceId) != null) {
+ DeviceFlowTable flowTable = flowTables.get(deviceId);
+ if (flowTable != null) {
+ flowTable.purge();
+ }
+ } else {
+ DeviceFlowTable flowTable = flowTables.remove(deviceId);
+ if (flowTable != null) {
+ flowTable.close();
+ }
+ }
}
+ /**
+ * Purges all flow rules from the table.
+ */
public void purgeFlowRules() {
- flowEntries.clear();
- }
-
- private List<NodeId> getBackupNodes(DeviceId deviceId) {
- // The returned backup node list is in the order of preference i.e. next likely master first.
- List<NodeId> allPossibleBackupNodes = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
- return ImmutableList.copyOf(allPossibleBackupNodes)
- .subList(0, Math.min(allPossibleBackupNodes.size(), backupCount));
- }
-
- private void backup() {
- try {
- // compute a mapping from node to the set of devices whose flow entries it should backup
- Map<NodeId, Set<DeviceId>> devicesToBackupByNode = Maps.newHashMap();
- flowEntries.keySet().forEach(deviceId -> {
- List<NodeId> backupNodes = getBackupNodes(deviceId);
- backupNodes.forEach(backupNode -> {
- if (lastBackupTimes.getOrDefault(new BackupOperation(backupNode, deviceId), 0L)
- < lastUpdateTimes.getOrDefault(deviceId, 0L)) {
- devicesToBackupByNode.computeIfAbsent(backupNode,
- nodeId -> Sets.newHashSet()).add(deviceId);
- }
- });
- });
- // send the device flow entries to their respective backup nodes
- devicesToBackupByNode.forEach(this::sendBackups);
- } catch (Exception e) {
- log.error("Backup failed.", e);
+ Iterator<DeviceFlowTable> iterator = flowTables.values().iterator();
+ while (iterator.hasNext()) {
+ iterator.next().close();
+ iterator.remove();
}
}
-
- private Set<DeviceId> onBackupReceipt(Map<DeviceId,
- Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>> flowTables) {
- log.debug("Received flowEntries for {} to backup", flowTables.keySet());
- Set<DeviceId> backedupDevices = Sets.newHashSet();
- try {
- flowTables.forEach((deviceId, deviceFlowTable) -> {
- // Only process those devices are that not managed by the local node.
- if (!Objects.equals(local, mastershipService.getMasterFor(deviceId))) {
- Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> backupFlowTable =
- getFlowTable(deviceId);
- backupFlowTable.clear();
- backupFlowTable.putAll(deviceFlowTable);
- backedupDevices.add(deviceId);
- }
- });
- } catch (Exception e) {
- log.warn("Failure processing backup request", e);
- }
- return backedupDevices;
- }
}
@Override
- public FlowRuleEvent updateTableStatistics(DeviceId deviceId,
- List<TableStatisticsEntry> tableStats) {
+ public FlowRuleEvent updateTableStatistics(DeviceId deviceId, List<TableStatisticsEntry> tableStats) {
deviceTableStats.put(deviceId, tableStats);
return null;
}
@@ -978,16 +874,116 @@
@Override
public long getActiveFlowRuleCount(DeviceId deviceId) {
return Streams.stream(getTableStatistics(deviceId))
- .mapToLong(TableStatisticsEntry::activeFlowEntries)
- .sum();
+ .mapToLong(TableStatisticsEntry::activeFlowEntries)
+ .sum();
}
private class InternalTableStatsListener
implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> {
@Override
public void event(EventuallyConsistentMapEvent<DeviceId,
- List<TableStatisticsEntry>> event) {
+ List<TableStatisticsEntry>> event) {
//TODO: Generate an event to listeners (do we need?)
}
}
-}
+
+ /**
+ * Device lifecycle manager implementation.
+ */
+ private final class InternalLifecycleManager
+ extends AbstractListenerManager<LifecycleEvent, LifecycleEventListener>
+ implements LifecycleManager, ReplicaInfoEventListener, MapEventListener<DeviceId, Long> {
+
+ private final DeviceId deviceId;
+
+ private volatile DeviceReplicaInfo replicaInfo;
+
+ InternalLifecycleManager(DeviceId deviceId) {
+ this.deviceId = deviceId;
+ replicaInfoManager.addListener(this);
+ mastershipTermLifecycles.addListener(this);
+ replicaInfo = toDeviceReplicaInfo(replicaInfoManager.getReplicaInfoFor(deviceId));
+ }
+
+ @Override
+ public DeviceReplicaInfo getReplicaInfo() {
+ return replicaInfo;
+ }
+
+ @Override
+ public void activate(long term) {
+ final ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
+ if (replicaInfo != null && replicaInfo.term() == term) {
+ mastershipTermLifecycles.put(deviceId, term);
+ }
+ }
+
+ @Override
+ public void event(ReplicaInfoEvent event) {
+ if (event.subject().equals(deviceId)) {
+ onReplicaInfoChange(event.replicaInfo());
+ }
+ }
+
+ @Override
+ public void event(MapEvent<DeviceId, Long> event) {
+ if (event.key().equals(deviceId) && event.newValue() != null) {
+ onActivate(event.newValue().value());
+ }
+ }
+
+ /**
+ * Handles a term activation event.
+ *
+ * @param term the term that was activated
+ */
+ private void onActivate(long term) {
+ final ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
+ if (replicaInfo != null && replicaInfo.term() == term) {
+ NodeId master = replicaInfo.master().orElse(null);
+ List<NodeId> backups = replicaInfo.backups()
+ .subList(0, Math.min(replicaInfo.backups().size(), backupCount));
+ listenerRegistry.process(new LifecycleEvent(
+ LifecycleEvent.Type.TERM_ACTIVE,
+ new DeviceReplicaInfo(term, master, backups)));
+ }
+ }
+
+ /**
+ * Handles a replica info change event.
+ *
+ * @param replicaInfo the updated replica info
+ */
+ private synchronized void onReplicaInfoChange(ReplicaInfo replicaInfo) {
+ DeviceReplicaInfo oldReplicaInfo = this.replicaInfo;
+ this.replicaInfo = toDeviceReplicaInfo(replicaInfo);
+ if (oldReplicaInfo == null || oldReplicaInfo.term() < replicaInfo.term()) {
+ if (oldReplicaInfo != null) {
+ listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_END, oldReplicaInfo));
+ }
+ listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_START, this.replicaInfo));
+ } else if (oldReplicaInfo.term() == replicaInfo.term()) {
+ listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_UPDATE, this.replicaInfo));
+ }
+ }
+
+ /**
+ * Converts the given replica info into a {@link DeviceReplicaInfo} instance.
+ *
+ * @param replicaInfo the replica info to convert
+ * @return the converted replica info
+ */
+ private DeviceReplicaInfo toDeviceReplicaInfo(ReplicaInfo replicaInfo) {
+ NodeId master = replicaInfo.master().orElse(null);
+ List<NodeId> backups = replicaInfo.backups()
+ .subList(0, Math.min(replicaInfo.backups().size(), backupCount));
+ return new DeviceReplicaInfo(replicaInfo.term(), master, backups);
+ }
+
+ @Override
+ public void close() {
+ replicaInfoManager.removeListener(this);
+ mastershipTermLifecycles.removeListener(this);
+ }
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
index fd4f6d7..c57e9eb 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
@@ -32,6 +32,9 @@
public static final MessageSubject GET_DEVICE_FLOW_ENTRIES
= new MessageSubject("peer-forward-get-device-flow-entries");
+ public static final MessageSubject GET_DEVICE_FLOW_COUNT
+ = new MessageSubject("peer-forward-get-flow-count");
+
public static final MessageSubject REMOVE_FLOW_ENTRY
= new MessageSubject("peer-forward-remove-flow-entry");
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
new file mode 100644
index 0000000..c1fea52
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
@@ -0,0 +1,253 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import com.google.common.collect.Maps;
+import org.onosproject.net.flow.DefaultFlowEntry;
+import org.onosproject.net.flow.FlowEntry;
+import org.onosproject.net.flow.FlowId;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.StoredFlowEntry;
+import org.onosproject.store.LogicalTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Container for a bucket of flows assigned to a specific device.
+ * <p>
+ * The bucket is mutable. When changes are made to the bucket, the term and timestamp in which the change
+ * occurred are recorded for ordering changes.
+ */
+public class FlowBucket {
+ private static final Logger LOGGER = LoggerFactory.getLogger(FlowBucket.class);
+ private final BucketId bucketId;
+ private volatile long term;
+ private volatile LogicalTimestamp timestamp = new LogicalTimestamp(0);
+ private final Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowBucket = Maps.newConcurrentMap();
+
+ FlowBucket(BucketId bucketId) {
+ this.bucketId = bucketId;
+ }
+
+ /**
+ * Returns the flow bucket identifier.
+ *
+ * @return the flow bucket identifier
+ */
+ public BucketId bucketId() {
+ return bucketId;
+ }
+
+ /**
+ * Returns the flow bucket term.
+ *
+ * @return the flow bucket term
+ */
+ public long term() {
+ return term;
+ }
+
+ /**
+ * Returns the flow bucket timestamp.
+ *
+ * @return the flow bucket timestamp
+ */
+ public LogicalTimestamp timestamp() {
+ return timestamp;
+ }
+
+ /**
+ * Returns the digest for the bucket.
+ *
+ * @return the digest for the bucket
+ */
+ public FlowBucketDigest getDigest() {
+ return new FlowBucketDigest(bucketId().bucket(), term(), timestamp());
+ }
+
+ /**
+ * Returns the flow entries in the bucket.
+ *
+ * @return the flow entries in the bucket
+ */
+ public Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> getFlowBucket() {
+ return flowBucket;
+ }
+
+ /**
+ * Returns the flow entries for the given flow.
+ *
+ * @param flowId the flow identifier
+     * @return the flow entries for the given flow identifier
+ */
+ public Map<StoredFlowEntry, StoredFlowEntry> getFlowEntries(FlowId flowId) {
+ Map<StoredFlowEntry, StoredFlowEntry> flowEntries = flowBucket.get(flowId);
+ return flowEntries != null ? flowEntries : flowBucket.computeIfAbsent(flowId, id -> Maps.newConcurrentMap());
+ }
+
+ /**
+ * Counts the flows in the bucket.
+ *
+ * @return the number of flows in the bucket
+ */
+ public int count() {
+ return flowBucket.values()
+ .stream()
+ .mapToInt(entry -> entry.values().size())
+ .sum();
+ }
+
+ /**
+ * Records an update to the bucket.
+ */
+ private void recordUpdate(long term, LogicalTimestamp timestamp) {
+ this.term = term;
+ this.timestamp = timestamp;
+ }
+
+ /**
+ * Adds the given flow rule to the bucket.
+ *
+ * @param rule the rule to add
+ * @param term the term in which the change occurred
+ * @param clock the logical clock
+ */
+ public void add(FlowEntry rule, long term, LogicalClock clock) {
+ Map<StoredFlowEntry, StoredFlowEntry> flowEntries = flowBucket.get(rule.id());
+ if (flowEntries == null) {
+ flowEntries = flowBucket.computeIfAbsent(rule.id(), id -> Maps.newConcurrentMap());
+ }
+ flowEntries.put((StoredFlowEntry) rule, (StoredFlowEntry) rule);
+ recordUpdate(term, clock.getTimestamp());
+ }
+
+ /**
+ * Updates the given flow rule in the bucket.
+ *
+ * @param rule the rule to update
+ * @param term the term in which the change occurred
+ * @param clock the logical clock
+ */
+ public void update(FlowEntry rule, long term, LogicalClock clock) {
+ Map<StoredFlowEntry, StoredFlowEntry> flowEntries = flowBucket.get(rule.id());
+ if (flowEntries == null) {
+ flowEntries = flowBucket.computeIfAbsent(rule.id(), id -> Maps.newConcurrentMap());
+ }
+ flowEntries.computeIfPresent((StoredFlowEntry) rule, (k, stored) -> {
+ if (rule instanceof DefaultFlowEntry) {
+ DefaultFlowEntry updated = (DefaultFlowEntry) rule;
+ if (stored instanceof DefaultFlowEntry) {
+ DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
+ if (updated.created() >= storedEntry.created()) {
+ recordUpdate(term, clock.getTimestamp());
+ return updated;
+ } else {
+ LOGGER.debug("Trying to update more recent flow entry {} (stored: {})", updated, stored);
+ return stored;
+ }
+ }
+ }
+ return stored;
+ });
+ }
+
+ /**
+ * Applies the given update function to the rule.
+ *
+ * @param rule the rule to update
+ * @param function the update function to apply
+ * @param term the term in which the change occurred
+ * @param clock the logical clock
+ * @param <T> the result type
+ * @return the update result or {@code null} if the rule was not updated
+ */
+ public <T> T update(FlowRule rule, Function<StoredFlowEntry, T> function, long term, LogicalClock clock) {
+ Map<StoredFlowEntry, StoredFlowEntry> flowEntries = flowBucket.get(rule.id());
+ if (flowEntries == null) {
+ flowEntries = flowBucket.computeIfAbsent(rule.id(), id -> Maps.newConcurrentMap());
+ }
+
+ AtomicReference<T> resultRef = new AtomicReference<>();
+ flowEntries.computeIfPresent(new DefaultFlowEntry(rule), (k, stored) -> {
+ if (stored != null) {
+ T result = function.apply(stored);
+ if (result != null) {
+ recordUpdate(term, clock.getTimestamp());
+ resultRef.set(result);
+ }
+ }
+ return stored;
+ });
+ return resultRef.get();
+ }
+
+ /**
+ * Removes the given flow rule from the bucket.
+ *
+ * @param rule the rule to remove
+ * @param term the term in which the change occurred
+ * @param clock the logical clock
+ * @return the removed flow entry
+ */
+ public FlowEntry remove(FlowEntry rule, long term, LogicalClock clock) {
+ final AtomicReference<FlowEntry> removedRule = new AtomicReference<>();
+ flowBucket.computeIfPresent(rule.id(), (flowId, flowEntries) -> {
+ flowEntries.computeIfPresent((StoredFlowEntry) rule, (k, stored) -> {
+ if (rule instanceof DefaultFlowEntry) {
+ DefaultFlowEntry toRemove = (DefaultFlowEntry) rule;
+ if (stored instanceof DefaultFlowEntry) {
+ DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
+ if (toRemove.created() < storedEntry.created()) {
+ LOGGER.debug("Trying to remove more recent flow entry {} (stored: {})", toRemove, stored);
+ // the key is not updated, removedRule remains null
+ return stored;
+ }
+ }
+ }
+ removedRule.set(stored);
+ return null;
+ });
+ return flowEntries.isEmpty() ? null : flowEntries;
+ });
+
+ if (removedRule.get() != null) {
+ recordUpdate(term, clock.getTimestamp());
+ return removedRule.get();
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Purges the bucket.
+ */
+ public void purge() {
+ flowBucket.clear();
+ }
+
+ /**
+ * Clears the bucket.
+ */
+ public void clear() {
+ term = 0;
+ timestamp = new LogicalTimestamp(0);
+ flowBucket.clear();
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucketDigest.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucketDigest.java
new file mode 100644
index 0000000..b5453e5
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucketDigest.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import java.util.Objects;
+
+import org.onosproject.store.LogicalTimestamp;
+
+/**
+ * Flow bucket digest.
+ */
+public class FlowBucketDigest {
+ private final int bucket;
+ private final long term;
+ private final LogicalTimestamp timestamp;
+
+ FlowBucketDigest(int bucket, long term, LogicalTimestamp timestamp) {
+ this.bucket = bucket;
+ this.term = term;
+ this.timestamp = timestamp;
+ }
+
+ /**
+ * Returns the bucket identifier.
+ *
+ * @return the bucket identifier
+ */
+ public int bucket() {
+ return bucket;
+ }
+
+ /**
+ * Returns the bucket term.
+ *
+ * @return the bucket term
+ */
+ public long term() {
+ return term;
+ }
+
+ /**
+ * Returns the bucket timestamp.
+ *
+ * @return the bucket timestamp
+ */
+ public LogicalTimestamp timestamp() {
+ return timestamp;
+ }
+
+ /**
+ * Returns a boolean indicating whether this digest is newer than the given digest.
+ *
+ * @param digest the digest to check
+ * @return indicates whether this digest is newer than the given digest
+ */
+ public boolean isNewerThan(FlowBucketDigest digest) {
+ return digest == null || term() > digest.term() || timestamp().isNewerThan(digest.timestamp());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(bucket);
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ return object instanceof FlowBucketDigest
+ && ((FlowBucketDigest) object).bucket == bucket;
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleEvent.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleEvent.java
new file mode 100644
index 0000000..ab6262f
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleEvent.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import org.onosproject.event.AbstractEvent;
+
+/**
+ * Flow table lifecycle event.
+ */
+public class LifecycleEvent extends AbstractEvent<LifecycleEvent.Type, DeviceReplicaInfo> {
+
+ /**
+ * Lifecycle event type.
+ */
+ public enum Type {
+ TERM_START,
+ TERM_ACTIVE,
+ TERM_UPDATE,
+ TERM_END,
+ }
+
+ public LifecycleEvent(Type type, DeviceReplicaInfo subject) {
+ super(type, subject);
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleEventListener.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleEventListener.java
new file mode 100644
index 0000000..a2f3b3d
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleEventListener.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import org.onosproject.event.EventListener;
+
+/**
+ * Flow table lifecycle event listener.
+ */
+public interface LifecycleEventListener extends EventListener<LifecycleEvent> {
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleManager.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleManager.java
new file mode 100644
index 0000000..213454c
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleManager.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import org.onosproject.event.ListenerService;
+
+/**
+ * Flow table lifecycle manager.
+ */
+public interface LifecycleManager extends ListenerService<LifecycleEvent, LifecycleEventListener> {
+
+ /**
+     * Returns the current replica info for the device.
+     *
+     * @return the current replica info
+ */
+ DeviceReplicaInfo getReplicaInfo();
+
+ /**
+ * Activates the given term.
+ *
+ * @param term the term to activate
+ */
+ void activate(long term);
+
+ /**
+ * Closes the lifecycle manager.
+ */
+ void close();
+
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LogicalClock.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LogicalClock.java
new file mode 100644
index 0000000..92d39cf
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LogicalClock.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.onosproject.store.LogicalTimestamp;
+
+/**
+ * Logical clock.
+ */
+public final class LogicalClock {
+ private final AtomicLong timestamp = new AtomicLong();
+
+ /**
+ * Records an event.
+ *
+ * @param timestamp the event timestamp
+ */
+ public void tick(LogicalTimestamp timestamp) {
+ this.timestamp.accumulateAndGet(timestamp.value(), (x, y) -> Math.max(x, y) + 1);
+ }
+
+ /**
+ * Returns a timestamped value.
+ *
+ * @param value the value to timestamp
+ * @param <T> the value type
+ * @return the timestamped value
+ */
+ public <T> Timestamped<T> timestamp(T value) {
+ return new Timestamped<>(value, getTimestamp());
+ }
+
+ /**
+ * Increments and returns the current timestamp.
+ *
+ * @return the current timestamp
+ */
+ public LogicalTimestamp getTimestamp() {
+ return new LogicalTimestamp(timestamp.incrementAndGet());
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java
index 0f47c08..2da23a4 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java
@@ -15,18 +15,16 @@
*/
package org.onosproject.store.flow.impl;
-import com.google.common.collect.ImmutableList;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.RoleInfo;
import org.onosproject.event.EventDeliveryService;
import org.onosproject.event.ListenerRegistry;
import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipInfo;
import org.onosproject.mastership.MastershipListener;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
@@ -36,8 +34,6 @@
import org.onosproject.store.flow.ReplicaInfoService;
import org.slf4j.Logger;
-import java.util.Collections;
-import java.util.List;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.store.flow.ReplicaInfoEvent.Type.BACKUPS_CHANGED;
import static org.onosproject.store.flow.ReplicaInfoEvent.Type.MASTER_CHANGED;
@@ -79,7 +75,7 @@
@Override
public ReplicaInfo getReplicaInfoFor(DeviceId deviceId) {
- return buildFromRoleInfo(mastershipService.getNodesFor(deviceId));
+ return buildFromRoleInfo(mastershipService.getMastershipFor(deviceId));
}
@Override
@@ -92,17 +88,15 @@
listenerRegistry.removeListener(checkNotNull(listener));
}
- private static ReplicaInfo buildFromRoleInfo(RoleInfo roles) {
- List<NodeId> backups = roles.backups() == null ?
- Collections.emptyList() : ImmutableList.copyOf(roles.backups());
- return new ReplicaInfo(roles.master(), backups);
+ private static ReplicaInfo buildFromRoleInfo(MastershipInfo mastership) {
+ return new ReplicaInfo(mastership.term(), mastership.master().orElse(null), mastership.backups());
}
final class InternalMastershipListener implements MastershipListener {
@Override
public void event(MastershipEvent event) {
- final ReplicaInfo replicaInfo = buildFromRoleInfo(event.roleInfo());
+ final ReplicaInfo replicaInfo = buildFromRoleInfo(event.mastershipInfo());
switch (event.type()) {
case MASTER_CHANGED:
eventDispatcher.post(new ReplicaInfoEvent(MASTER_CHANGED,
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/Timestamped.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/Timestamped.java
new file mode 100644
index 0000000..e7f566b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/Timestamped.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import org.onosproject.store.LogicalTimestamp;
+
+/**
+ * Timestamped value.
+ */
+public class Timestamped<T> {
+ private final T value;
+ private final LogicalTimestamp timestamp;
+
+ public Timestamped(T value, LogicalTimestamp timestamp) {
+ this.value = value;
+ this.timestamp = timestamp;
+ }
+
+ /**
+ * Returns the value.
+ *
+ * @return the value
+ */
+ public T value() {
+ return value;
+ }
+
+ /**
+ * Returns the timestamp.
+ *
+ * @return the timestamp
+ */
+ public LogicalTimestamp timestamp() {
+ return timestamp;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 8521f52..30ece6e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -23,7 +23,8 @@
import static com.google.common.base.Preconditions.checkArgument;
import java.util.List;
-import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -34,6 +35,7 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import com.google.common.collect.ImmutableMap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -50,6 +52,7 @@
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.RoleInfo;
import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipInfo;
import org.onosproject.mastership.MastershipStore;
import org.onosproject.mastership.MastershipStoreDelegate;
import org.onosproject.mastership.MastershipTerm;
@@ -62,10 +65,7 @@
import org.onosproject.store.service.Serializer;
import org.slf4j.Logger;
-import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
/**
* Implementation of the MastershipStore on top of Leadership Service.
@@ -158,7 +158,7 @@
NodeId leader = leadership == null ? null : leadership.leaderNodeId();
List<NodeId> candidates = leadership == null ?
ImmutableList.of() : ImmutableList.copyOf(leadership.candidates());
- MastershipRole role = Objects.equal(localNodeId, leader) ?
+ MastershipRole role = Objects.equals(localNodeId, leader) ?
MastershipRole.MASTER : candidates.contains(localNodeId) ? MastershipRole.STANDBY : MastershipRole.NONE;
return CompletableFuture.completedFuture(role);
}
@@ -173,7 +173,7 @@
NodeId leader = leadership == null ? null : leadership.leaderNodeId();
List<NodeId> candidates = leadership == null ?
ImmutableList.of() : ImmutableList.copyOf(leadership.candidates());
- return Objects.equal(nodeId, leader) ?
+ return Objects.equals(nodeId, leader) ?
MastershipRole.MASTER : candidates.contains(nodeId) ? MastershipRole.STANDBY : MastershipRole.NONE;
}
@@ -187,27 +187,15 @@
@Override
public RoleInfo getNodes(DeviceId deviceId) {
checkArgument(deviceId != null, DEVICE_ID_NULL);
+ MastershipInfo mastership = getMastership(deviceId);
+ return new RoleInfo(mastership.master().orElse(null), mastership.backups());
+ }
- Map<NodeId, MastershipRole> roles = Maps.newHashMap();
- clusterService.getNodes()
- .forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId)));
-
- NodeId master = null;
- final List<NodeId> standbys = Lists.newLinkedList();
-
- List<NodeId> candidates = leadershipService.getCandidates(createDeviceMastershipTopic(deviceId));
-
- for (Map.Entry<NodeId, MastershipRole> entry : roles.entrySet()) {
- if (entry.getValue() == MastershipRole.MASTER) {
- master = entry.getKey();
- } else if (entry.getValue() == MastershipRole.STANDBY) {
- standbys.add(entry.getKey());
- }
- }
-
- List<NodeId> sortedStandbyList = candidates.stream().filter(standbys::contains).collect(Collectors.toList());
-
- return new RoleInfo(master, sortedStandbyList);
+ @Override
+ public MastershipInfo getMastership(DeviceId deviceId) {
+ checkArgument(deviceId != null, DEVICE_ID_NULL);
+ Leadership leadership = leadershipService.getLeadership(createDeviceMastershipTopic(deviceId));
+ return buildMastershipFromLeadership(leadership);
}
@Override
@@ -263,7 +251,7 @@
List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
NodeId newMaster = candidates.stream()
- .filter(candidate -> !Objects.equal(nodeId, candidate))
+ .filter(candidate -> !Objects.equals(nodeId, candidate))
.findFirst()
.orElse(null);
log.info("Transitioning to role {} for {}. Next master: {}",
@@ -304,7 +292,7 @@
MastershipEvent.Type eventType = localNodeId.equals(leadershipService.getLeader(leadershipTopic)) ?
MastershipEvent.Type.MASTER_CHANGED : MastershipEvent.Type.BACKUPS_CHANGED;
leadershipService.withdraw(leadershipTopic);
- return CompletableFuture.completedFuture(new MastershipEvent(eventType, deviceId, getNodes(deviceId)));
+ return CompletableFuture.completedFuture(new MastershipEvent(eventType, deviceId, getMastership(deviceId)));
}
@Override
@@ -312,6 +300,27 @@
// Noop. LeadershipService already takes care of detecting and purging stale locks.
}
+ private MastershipInfo buildMastershipFromLeadership(Leadership leadership) {
+ ImmutableMap.Builder<NodeId, MastershipRole> builder = ImmutableMap.builder();
+ if (leadership.leaderNodeId() != null) {
+ builder.put(leadership.leaderNodeId(), MastershipRole.MASTER);
+ }
+ leadership.candidates().stream()
+ .filter(nodeId -> !Objects.equals(leadership.leaderNodeId(), nodeId))
+ .forEach(nodeId -> builder.put(nodeId, MastershipRole.STANDBY));
+ clusterService.getNodes().stream()
+ .filter(node -> !Objects.equals(leadership.leaderNodeId(), node.id()))
+ .filter(node -> !leadership.candidates().contains(node.id()))
+ .forEach(node -> builder.put(node.id(), MastershipRole.NONE));
+
+ return new MastershipInfo(
+ leadership.leader() != null ? leadership.leader().term() : 0,
+ leadership.leader() != null
+ ? Optional.of(leadership.leader().nodeId())
+ : Optional.empty(),
+ builder.build());
+ }
+
private class InternalDeviceMastershipEventListener implements LeadershipEventListener {
@Override
@@ -328,27 +337,29 @@
private void handleEvent(LeadershipEvent event) {
Leadership leadership = event.subject();
DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
- RoleInfo roleInfo = event.type() != LeadershipEvent.Type.SERVICE_DISRUPTED ?
- getNodes(deviceId) : new RoleInfo();
+ MastershipInfo mastershipInfo = event.type() != LeadershipEvent.Type.SERVICE_DISRUPTED
+ ? buildMastershipFromLeadership(event.subject())
+ : new MastershipInfo();
+
switch (event.type()) {
- case LEADER_AND_CANDIDATES_CHANGED:
- notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, roleInfo));
- notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, roleInfo));
- break;
- case LEADER_CHANGED:
- notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, roleInfo));
- break;
- case CANDIDATES_CHANGED:
- notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, roleInfo));
- break;
- case SERVICE_DISRUPTED:
- notifyDelegate(new MastershipEvent(SUSPENDED, deviceId, roleInfo));
- break;
- case SERVICE_RESTORED:
- // Do nothing, wait for updates from peers
- break;
- default:
- return;
+ case LEADER_AND_CANDIDATES_CHANGED:
+ notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, mastershipInfo));
+ notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, mastershipInfo));
+ break;
+ case LEADER_CHANGED:
+ notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, mastershipInfo));
+ break;
+ case CANDIDATES_CHANGED:
+ notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, mastershipInfo));
+ break;
+ case SERVICE_DISRUPTED:
+ notifyDelegate(new MastershipEvent(SUSPENDED, deviceId, mastershipInfo));
+ break;
+ case SERVICE_RESTORED:
+ // Do nothing, wait for updates from peers
+ break;
+ default:
+ return;
}
}
}
diff --git a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ECFlowRuleStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ECFlowRuleStoreTest.java
index 7fd0412..72562f9 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ECFlowRuleStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ECFlowRuleStoreTest.java
@@ -16,6 +16,7 @@
package org.onosproject.store.flow.impl;
import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
import org.junit.After;
import org.junit.Before;
@@ -26,6 +27,7 @@
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.core.CoreServiceAdapter;
+import org.onosproject.mastership.MastershipInfo;
import org.onosproject.mastership.MastershipServiceAdapter;
import org.onosproject.net.device.DeviceServiceAdapter;
import org.onosproject.net.DeviceId;
@@ -40,10 +42,16 @@
import org.onosproject.net.intent.IntentTestsMocks;
import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
import org.onosproject.store.persistence.PersistenceServiceAdapter;
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncConsistentMapAdapter;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.TestStorageService;
import org.onlab.packet.Ip4Address;
import java.util.Iterator;
+import java.util.Optional;
+
import org.osgi.service.component.ComponentContext;
import static org.easymock.EasyMock.createMock;
@@ -103,6 +111,16 @@
public NodeId getMasterFor(DeviceId deviceId) {
return new NodeId("1");
}
+
+ @Override
+ public MastershipInfo getMastershipFor(DeviceId deviceId) {
+ return new MastershipInfo(
+ 1,
+ Optional.of(NodeId.nodeId("1")),
+ ImmutableMap.<NodeId, MastershipRole>builder()
+ .put(NodeId.nodeId("1"), MastershipRole.MASTER)
+ .build());
+ }
}
@@ -132,8 +150,27 @@
@Before
public void setUp() throws Exception {
flowStoreImpl = new ECFlowRuleStore();
- flowStoreImpl.storageService = new TestStorageService();
- flowStoreImpl.replicaInfoManager = new ReplicaInfoManager();
+ flowStoreImpl.storageService = new TestStorageService() {
+ @Override
+ public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
+ return new ConsistentMapBuilder<K, V>() {
+ @Override
+ public AsyncConsistentMap<K, V> buildAsyncMap() {
+ return new AsyncConsistentMapAdapter<K, V>();
+ }
+
+ @Override
+ public ConsistentMap<K, V> build() {
+ return null;
+ }
+ };
+ }
+ };
+
+ ReplicaInfoManager replicaInfoManager = new ReplicaInfoManager();
+ replicaInfoManager.mastershipService = new MasterOfAll();
+
+ flowStoreImpl.replicaInfoManager = replicaInfoManager;
mockClusterService = createMock(ClusterService.class);
flowStoreImpl.clusterService = mockClusterService;
nodeId = new NodeId("1");
diff --git a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java
index dd822df..40aa395 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java
@@ -15,32 +15,33 @@
*/
package org.onosproject.store.flow.impl;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.RoleInfo;
import org.onosproject.common.event.impl.TestEventDispatcher;
import org.onosproject.event.ListenerRegistry;
import org.onosproject.mastership.MastershipEvent;
import org.onosproject.mastership.MastershipEvent.Type;
+import org.onosproject.mastership.MastershipInfo;
import org.onosproject.mastership.MastershipListener;
import org.onosproject.mastership.MastershipService;
import org.onosproject.mastership.MastershipServiceAdapter;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.MastershipRole;
import org.onosproject.store.flow.ReplicaInfo;
import org.onosproject.store.flow.ReplicaInfoEvent;
import org.onosproject.store.flow.ReplicaInfoEventListener;
import org.onosproject.store.flow.ReplicaInfoService;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -101,7 +102,7 @@
// fake MastershipEvent
eventDispatcher.post(new MastershipEvent(Type.MASTER_CHANGED, DID1,
- new RoleInfo(NID1, new LinkedList<>())));
+ new MastershipInfo(1, Optional.of(NID1), ImmutableMap.of(NID1, MastershipRole.MASTER))));
assertTrue(latch.await(1, TimeUnit.SECONDS));
}
@@ -149,8 +150,11 @@
}
@Override
- public RoleInfo getNodesFor(DeviceId deviceId) {
- return new RoleInfo(masters.get(deviceId), Collections.emptyList());
+ public MastershipInfo getMastershipFor(DeviceId deviceId) {
+ return new MastershipInfo(
+ 1,
+ Optional.ofNullable(masters.get(deviceId)),
+ ImmutableMap.of(NID1, MastershipRole.MASTER));
}
@Override
diff --git a/drivers/netconf/src/test/java/org/onosproject/drivers/netconf/MockMastershipService.java b/drivers/netconf/src/test/java/org/onosproject/drivers/netconf/MockMastershipService.java
index eed7b58..81349a3 100644
--- a/drivers/netconf/src/test/java/org/onosproject/drivers/netconf/MockMastershipService.java
+++ b/drivers/netconf/src/test/java/org/onosproject/drivers/netconf/MockMastershipService.java
@@ -20,6 +20,7 @@
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.RoleInfo;
+import org.onosproject.mastership.MastershipInfo;
import org.onosproject.mastership.MastershipListener;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
@@ -81,6 +82,11 @@
}
@Override
+ public MastershipInfo getMastershipFor(DeviceId deviceId) {
+ return null;
+ }
+
+ @Override
public Set<DeviceId> getDevicesOf(NodeId nodeId) {
// TODO Auto-generated method stub
return null;
diff --git a/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/VirtualNetworkMastershipStore.java b/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/VirtualNetworkMastershipStore.java
index b34b085..31fdfcd 100644
--- a/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/VirtualNetworkMastershipStore.java
+++ b/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/VirtualNetworkMastershipStore.java
@@ -19,6 +19,7 @@
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.RoleInfo;
import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipInfo;
import org.onosproject.mastership.MastershipStoreDelegate;
import org.onosproject.mastership.MastershipTerm;
import org.onosproject.net.DeviceId;
@@ -74,6 +75,15 @@
RoleInfo getNodes(NetworkId networkId, DeviceId deviceId);
/**
+ * Returns the mastership info for a device.
+ *
+ * @param networkId virtual network identifier
+ * @param deviceId the device identifier
+ * @return the mastership info
+ */
+ MastershipInfo getMastership(NetworkId networkId, DeviceId deviceId);
+
+ /**
* Returns the devices that a controller instance is master of.
*
* @param networkId virtual network identifier
diff --git a/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkMastershipManager.java b/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkMastershipManager.java
index 8e5d005..e5b276b 100644
--- a/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkMastershipManager.java
+++ b/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkMastershipManager.java
@@ -31,6 +31,7 @@
import org.onosproject.incubator.net.virtual.event.AbstractVirtualListenerManager;
import org.onosproject.mastership.MastershipAdminService;
import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipInfo;
import org.onosproject.mastership.MastershipListener;
import org.onosproject.mastership.MastershipService;
import org.onosproject.mastership.MastershipStoreDelegate;
@@ -155,6 +156,12 @@
}
@Override
+ public MastershipInfo getMastershipFor(DeviceId deviceId) {
+ checkNotNull(deviceId, DEVICE_ID_NULL);
+ return store.getMastership(networkId, deviceId);
+ }
+
+ @Override
public Set<DeviceId> getDevicesOf(NodeId nodeId) {
checkNotNull(nodeId, NODE_ID_NULL);
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/ConsistentVirtualDeviceMastershipStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/ConsistentVirtualDeviceMastershipStore.java
index 3d1c376..4176130 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/ConsistentVirtualDeviceMastershipStore.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/ConsistentVirtualDeviceMastershipStore.java
@@ -16,6 +16,7 @@
package org.onosproject.incubator.store.virtual.impl;
+import com.google.common.collect.ImmutableMap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -34,6 +35,7 @@
import org.onosproject.incubator.net.virtual.NetworkId;
import org.onosproject.incubator.net.virtual.VirtualNetworkMastershipStore;
import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipInfo;
import org.onosproject.mastership.MastershipStoreDelegate;
import org.onosproject.mastership.MastershipTerm;
import org.onosproject.net.DeviceId;
@@ -45,7 +47,7 @@
import org.slf4j.Logger;
import java.util.List;
-import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -64,8 +66,6 @@
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import static com.google.common.base.Preconditions.checkArgument;
@Component(immediate = true, enabled = false)
@@ -188,30 +188,16 @@
public RoleInfo getNodes(NetworkId networkId, DeviceId deviceId) {
checkArgument(networkId != null, NETWORK_ID_NULL);
checkArgument(deviceId != null, DEVICE_ID_NULL);
+ Leadership leadership = leadershipService.getLeadership(createDeviceMastershipTopic(networkId, deviceId));
+ return new RoleInfo(leadership.leaderNodeId(), leadership.candidates());
+ }
- Map<NodeId, MastershipRole> roles = Maps.newHashMap();
- clusterService.getNodes()
- .forEach((node) -> roles.put(node.id(),
- getRole(networkId, node.id(), deviceId)));
-
- NodeId master = null;
- final List<NodeId> standbys = Lists.newLinkedList();
-
- List<NodeId> candidates = leadershipService
- .getCandidates(createDeviceMastershipTopic(networkId, deviceId));
-
- for (Map.Entry<NodeId, MastershipRole> entry : roles.entrySet()) {
- if (entry.getValue() == MastershipRole.MASTER) {
- master = entry.getKey();
- } else if (entry.getValue() == MastershipRole.STANDBY) {
- standbys.add(entry.getKey());
- }
- }
-
- List<NodeId> sortedStandbyList = candidates.stream()
- .filter(standbys::contains).collect(Collectors.toList());
-
- return new RoleInfo(master, sortedStandbyList);
+ @Override
+ public MastershipInfo getMastership(NetworkId networkId, DeviceId deviceId) {
+ checkArgument(networkId != null, NETWORK_ID_NULL);
+ checkArgument(deviceId != null, DEVICE_ID_NULL);
+ Leadership leadership = leadershipService.getLeadership(createDeviceMastershipTopic(networkId, deviceId));
+ return buildMastershipFromLeadership(leadership);
}
@Override
@@ -322,9 +308,8 @@
MastershipEvent.Type eventType = localNodeId.equals(leadershipService.getLeader(leadershipTopic)) ?
MastershipEvent.Type.MASTER_CHANGED : MastershipEvent.Type.BACKUPS_CHANGED;
leadershipService.withdraw(leadershipTopic);
- return CompletableFuture.completedFuture(new MastershipEvent(eventType,
- deviceId,
- getNodes(networkId, deviceId)));
+ return CompletableFuture.completedFuture(
+ new MastershipEvent(eventType, deviceId, getMastership(networkId, deviceId)));
}
private CompletableFuture<MastershipEvent>
@@ -338,6 +323,24 @@
// Noop. LeadershipService already takes care of detecting and purging stale locks.
}
+ private MastershipInfo buildMastershipFromLeadership(Leadership leadership) {
+ ImmutableMap.Builder<NodeId, MastershipRole> builder = ImmutableMap.builder();
+ if (leadership.leaderNodeId() != null) {
+ builder.put(leadership.leaderNodeId(), MastershipRole.MASTER);
+ }
+ leadership.candidates().forEach(nodeId -> builder.put(nodeId, MastershipRole.STANDBY));
+ clusterService.getNodes().stream()
+ .filter(node -> !leadership.candidates().contains(node.id()))
+ .forEach(node -> builder.put(node.id(), MastershipRole.NONE));
+
+ return new MastershipInfo(
+ leadership.leader() != null ? leadership.leader().term() : 0,
+ leadership.leader() != null
+ ? Optional.of(leadership.leader().nodeId())
+ : Optional.empty(),
+ builder.build());
+ }
+
private class InternalDeviceMastershipEventListener
implements LeadershipEventListener {
@@ -357,28 +360,23 @@
NetworkId networkId = extractNetworkIdFromTopic(leadership.topic());
DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
-
- RoleInfo roleInfo = event.type() != LeadershipEvent.Type.SERVICE_DISRUPTED ?
- getNodes(networkId, deviceId) : new RoleInfo();
+ MastershipInfo mastershipInfo = event.type() != LeadershipEvent.Type.SERVICE_DISRUPTED
+ ? buildMastershipFromLeadership(event.subject())
+ : new MastershipInfo();
switch (event.type()) {
case LEADER_AND_CANDIDATES_CHANGED:
- notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED,
- deviceId, roleInfo));
- notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED,
- deviceId, roleInfo));
+ notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED, deviceId, mastershipInfo));
+ notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED, deviceId, mastershipInfo));
break;
case LEADER_CHANGED:
- notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED,
- deviceId, roleInfo));
+ notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED, deviceId, mastershipInfo));
break;
case CANDIDATES_CHANGED:
- notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED,
- deviceId, roleInfo));
+ notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED, deviceId, mastershipInfo));
break;
case SERVICE_DISRUPTED:
- notifyDelegate(networkId, new MastershipEvent(SUSPENDED,
- deviceId, roleInfo));
+ notifyDelegate(networkId, new MastershipEvent(SUSPENDED, deviceId, mastershipInfo));
break;
case SERVICE_RESTORED:
// Do nothing, wait for updates from peers
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualMastershipStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualMastershipStore.java
index 730ebee..346999b 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualMastershipStore.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualMastershipStore.java
@@ -17,6 +17,7 @@
package org.onosproject.incubator.store.virtual.impl;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -36,6 +37,7 @@
import org.onosproject.incubator.net.virtual.NetworkId;
import org.onosproject.incubator.net.virtual.VirtualNetworkMastershipStore;
import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipInfo;
import org.onosproject.mastership.MastershipStoreDelegate;
import org.onosproject.mastership.MastershipTerm;
import org.onosproject.net.DeviceId;
@@ -50,6 +52,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
@@ -122,7 +125,7 @@
// remove from backup list
removeFromBackups(networkId, deviceId, node);
notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED, deviceId,
- getNodes(networkId, deviceId)));
+ getMastership(networkId, deviceId)));
return CompletableFuture.completedFuture(MastershipRole.MASTER);
}
return CompletableFuture.completedFuture(MastershipRole.STANDBY);
@@ -132,13 +135,13 @@
masterMap.put(deviceId, node);
incrementTerm(networkId, deviceId);
notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED, deviceId,
- getNodes(networkId, deviceId)));
+ getMastership(networkId, deviceId)));
return CompletableFuture.completedFuture(MastershipRole.MASTER);
}
// add to backup list
if (addToBackup(networkId, deviceId, node)) {
notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED, deviceId,
- getNodes(networkId, deviceId)));
+ getMastership(networkId, deviceId)));
}
return CompletableFuture.completedFuture(MastershipRole.STANDBY);
default:
@@ -184,6 +187,27 @@
}
@Override
+ public MastershipInfo getMastership(NetworkId networkId, DeviceId deviceId) {
+ Map<DeviceId, NodeId> masterMap = getMasterMap(networkId);
+ Map<DeviceId, AtomicInteger> termMap = getTermMap(networkId);
+ Map<DeviceId, List<NodeId>> backups = getBackups(networkId);
+ ImmutableMap.Builder<NodeId, MastershipRole> roleBuilder = ImmutableMap.builder();
+ NodeId master = masterMap.get(deviceId);
+ if (master != null) {
+ roleBuilder.put(master, MastershipRole.MASTER);
+ }
+ backups.getOrDefault(master, Collections.emptyList())
+ .forEach(nodeId -> roleBuilder.put(nodeId, MastershipRole.STANDBY));
+ clusterService.getNodes().stream()
+ .filter(node -> !masterMap.containsValue(node.id()))
+ .forEach(node -> roleBuilder.put(node.id(), MastershipRole.NONE));
+ return new MastershipInfo(
+ termMap.getOrDefault(deviceId, new AtomicInteger(NOTHING)).get(),
+ Optional.ofNullable(master),
+ roleBuilder.build());
+ }
+
+ @Override
public Set<DeviceId> getDevices(NetworkId networkId, NodeId nodeId) {
Map<DeviceId, NodeId> masterMap = getMasterMap(networkId);
@@ -219,7 +243,7 @@
}
return CompletableFuture.completedFuture(
- new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(networkId, deviceId)));
+ new MastershipEvent(MASTER_CHANGED, deviceId, getMastership(networkId, deviceId)));
}
@Override
@@ -249,14 +273,14 @@
// TODO: Should there be new event type for no MASTER?
return CompletableFuture.completedFuture(
new MastershipEvent(MASTER_CHANGED, deviceId,
- getNodes(networkId, deviceId)));
+ getMastership(networkId, deviceId)));
} else {
NodeId prevMaster = masterMap.put(deviceId, backup);
incrementTerm(networkId, deviceId);
addToBackup(networkId, deviceId, prevMaster);
return CompletableFuture.completedFuture(
new MastershipEvent(MASTER_CHANGED, deviceId,
- getNodes(networkId, deviceId)));
+ getMastership(networkId, deviceId)));
}
case STANDBY:
@@ -265,7 +289,7 @@
if (modified) {
return CompletableFuture.completedFuture(
new MastershipEvent(BACKUPS_CHANGED, deviceId,
- getNodes(networkId, deviceId)));
+ getMastership(networkId, deviceId)));
}
break;
@@ -314,13 +338,13 @@
incrementTerm(networkId, deviceId);
return CompletableFuture.completedFuture(
new MastershipEvent(MASTER_CHANGED, deviceId,
- getNodes(networkId, deviceId)));
+ getMastership(networkId, deviceId)));
case STANDBY:
if (removeFromBackups(networkId, deviceId, nodeId)) {
return CompletableFuture.completedFuture(
new MastershipEvent(BACKUPS_CHANGED, deviceId,
- getNodes(networkId, deviceId)));
+ getMastership(networkId, deviceId)));
}
break;
diff --git a/providers/lldp/src/test/java/org/onosproject/provider/lldp/impl/LldpLinkProviderTest.java b/providers/lldp/src/test/java/org/onosproject/provider/lldp/impl/LldpLinkProviderTest.java
index bad64f3..9989a77 100644
--- a/providers/lldp/src/test/java/org/onosproject/provider/lldp/impl/LldpLinkProviderTest.java
+++ b/providers/lldp/src/test/java/org/onosproject/provider/lldp/impl/LldpLinkProviderTest.java
@@ -38,6 +38,7 @@
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.core.DefaultApplicationId;
+import org.onosproject.mastership.MastershipInfo;
import org.onosproject.mastership.MastershipListener;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.Annotations;
@@ -818,6 +819,11 @@
}
@Override
+ public MastershipInfo getMastershipFor(DeviceId deviceId) {
+ return null;
+ }
+
+ @Override
public void addListener(MastershipListener listener) {
}
diff --git a/utils/misc/src/main/java/org/onlab/util/Tools.java b/utils/misc/src/main/java/org/onlab/util/Tools.java
index 7725ace..366e346 100644
--- a/utils/misc/src/main/java/org/onlab/util/Tools.java
+++ b/utils/misc/src/main/java/org/onlab/util/Tools.java
@@ -750,6 +750,31 @@
}
/**
+ * Returns a new CompletableFuture completed with the first successful result from a list of futures. If no
+ * future completes successfully, the returned future is completed exceptionally with one of the failures.
+ *
+ * @param futures the input futures
+ * @param <T> future result type
+ * @return a new CompletableFuture
+ */
+ public static <T> CompletableFuture<T> firstOf(List<CompletableFuture<T>> futures) {
+ CompletableFuture<T> resultFuture = new CompletableFuture<>();
+ CompletableFuture.allOf(futures.stream()
+ .map(future -> future.thenAccept(r -> resultFuture.complete(r)))
+ .toArray(CompletableFuture[]::new))
+ .whenComplete((r, e) -> {
+ if (!resultFuture.isDone()) {
+ if (e != null) {
+ resultFuture.completeExceptionally(e);
+ } else {
+ resultFuture.complete(null);
+ }
+ }
+ });
+ return resultFuture;
+ }
+
+ /**
* Returns a new CompletableFuture completed by with the first positive result from a list of
* input CompletableFutures.
*