Distributed group store using eventual consistent map abstraction
Change-Id: I618a0f6fa80e0e25285d7a2026032f09ba90aa70
diff --git a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultEdgeGroupHandler.java b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultEdgeGroupHandler.java
index 8da4acd..7523a64 100644
--- a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultEdgeGroupHandler.java
+++ b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultEdgeGroupHandler.java
@@ -124,16 +124,23 @@
tBuilder.setOutput(newNeighborLink.src().port())
.setEthDst(deviceConfig.getDeviceMac(
newNeighborLink.dst().deviceId()))
- .setEthSrc(nodeMacAddr)
- .pushMpls()
- .setMpls(MplsLabel.mplsLabel(ns.getEdgeLabel()));
+ .setEthSrc(nodeMacAddr);
+ if (ns.getEdgeLabel() != NeighborSet.NO_EDGE_LABEL) {
+ tBuilder.pushMpls()
+ .setMpls(MplsLabel.
+ mplsLabel(ns.getEdgeLabel()));
+ }
GroupBucket updatedBucket = DefaultGroupBucket.
createSelectGroupBucket(tBuilder.build());
GroupBuckets updatedBuckets = new GroupBuckets(
Arrays.asList(updatedBucket));
log.debug("newPortToExistingNeighborAtEdgeRouter: "
+ "groupService.addBucketsToGroup for neighborset{}", ns);
- groupService.addBucketsToGroup(deviceId, ns, updatedBuckets, ns, appId);
+ groupService.addBucketsToGroup(deviceId,
+ getGroupKey(ns),
+ updatedBuckets,
+ getGroupKey(ns),
+ appId);
}
}
diff --git a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultGroupHandler.java b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultGroupHandler.java
index 755fac2..ec9e42f 100644
--- a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultGroupHandler.java
+++ b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultGroupHandler.java
@@ -18,6 +18,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
+import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -27,6 +28,7 @@
import org.onlab.packet.MacAddress;
import org.onlab.packet.MplsLabel;
+import org.onlab.util.KryoNamespace;
import org.onosproject.core.ApplicationId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Link;
@@ -35,6 +37,7 @@
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.group.DefaultGroupBucket;
import org.onosproject.net.group.DefaultGroupDescription;
+import org.onosproject.net.group.DefaultGroupKey;
import org.onosproject.net.group.Group;
import org.onosproject.net.group.GroupBucket;
import org.onosproject.net.group.GroupBuckets;
@@ -71,6 +74,16 @@
new HashMap<PortNumber, DeviceId>();
private GroupListener listener = new InternalGroupListener();
+ protected KryoNamespace.Builder kryo = new KryoNamespace.Builder()
+ .register(URI.class)
+ .register(HashSet.class)
+ .register(DeviceId.class)
+ .register(PortNumber.class)
+ .register(NeighborSet.class)
+ .register(PolicyGroupIdentifier.class)
+ .register(PolicyGroupParams.class)
+ .register(GroupBucketIdentifier.class)
+ .register(GroupBucketIdentifier.BucketOutputType.class);
protected DefaultGroupHandler(DeviceId deviceId,
ApplicationId appId,
@@ -185,9 +198,11 @@
tBuilder.setOutput(port)
.setEthDst(deviceConfig.getDeviceMac(
portDeviceMap.get(port)))
- .setEthSrc(nodeMacAddr)
- .pushMpls()
+ .setEthSrc(nodeMacAddr);
+ if (ns.getEdgeLabel() != NeighborSet.NO_EDGE_LABEL) {
+ tBuilder.pushMpls()
.setMpls(MplsLabel.mplsLabel(ns.getEdgeLabel()));
+ }
GroupBucket removeBucket = DefaultGroupBucket.
createSelectGroupBucket(tBuilder.build());
GroupBuckets removeBuckets = new GroupBuckets(
@@ -196,9 +211,9 @@
+ "groupService.removeBucketsFromGroup "
+ "for neighborset{}", deviceId, ns);
groupService.removeBucketsFromGroup(deviceId,
- ns,
+ getGroupKey(ns),
removeBuckets,
- ns,
+ getGroupKey(ns),
appId);
}
@@ -331,9 +346,12 @@
DefaultTrafficTreatment.builder();
tBuilder.setOutput(sp)
.setEthDst(deviceConfig.getDeviceMac(d))
- .setEthSrc(nodeMacAddr)
- .pushMpls()
- .setMpls(MplsLabel.mplsLabel(ns.getEdgeLabel()));
+ .setEthSrc(nodeMacAddr);
+ if (ns.getEdgeLabel() != NeighborSet.NO_EDGE_LABEL) {
+ tBuilder.pushMpls()
+ .setMpls(MplsLabel.
+ mplsLabel(ns.getEdgeLabel()));
+ }
buckets.add(DefaultGroupBucket.createSelectGroupBucket(
tBuilder.build()));
}
@@ -343,7 +361,7 @@
deviceId,
Group.Type.SELECT,
groupBuckets,
- ns,
+ getGroupKey(ns),
appId);
log.debug("createGroupsFromNeighborsets: "
+ "groupService.addGroup for neighborset{}", ns);
@@ -386,4 +404,8 @@
handleGroupEvent(event);
}
}
+
+ protected GroupKey getGroupKey(Object obj) {
+ return new DefaultGroupKey(kryo.build().serialize(obj));
+ }
}
diff --git a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultGroupHandlerApp.java b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultGroupHandlerApp.java
index 2bd2b4e..bb1a035 100644
--- a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultGroupHandlerApp.java
+++ b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultGroupHandlerApp.java
@@ -17,8 +17,10 @@
import static org.slf4j.LoggerFactory.getLogger;
+import java.net.URI;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import org.apache.felix.scr.annotations.Activate;
@@ -27,10 +29,13 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.packet.MacAddress;
+import org.onlab.util.KryoNamespace;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
+import org.onosproject.mastership.MastershipService;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.MastershipRole;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
@@ -69,10 +74,18 @@
protected GroupService groupService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MastershipService mastershipService;
private DeviceListener deviceListener = new InternalDeviceListener();
private LinkListener linkListener = new InternalLinkListener();
+ protected KryoNamespace.Builder kryo = new KryoNamespace.Builder()
+ .register(URI.class)
+ .register(HashSet.class)
+ .register(DeviceId.class)
+ .register(NeighborSet.class);
+
@Activate
public void activate() {
appId = coreService.registerApplication("org.onosproject.defaultgrouphandler");
@@ -80,14 +93,23 @@
deviceService.addListener(deviceListener);
linkService.addListener(linkListener);
for (Device device: deviceService.getDevices()) {
- log.debug("Initiating default group handling for {}", device.id());
- DefaultGroupHandler dgh = DefaultGroupHandler.createGroupHandler(device.id(),
- appId,
- config,
- linkService,
- groupService);
- dgh.createGroups();
- dghMap.put(device.id(), dgh);
+ if (mastershipService.
+ getLocalRole(device.id()) == MastershipRole.MASTER) {
+ log.debug("Initiating default group handling for {}", device.id());
+ DefaultGroupHandler dgh = DefaultGroupHandler.createGroupHandler(device.id(),
+ appId,
+ config,
+ linkService,
+ groupService);
+ dgh.createGroups();
+ dghMap.put(device.id(), dgh);
+ } else {
+ log.debug("Activate: Local role {} "
+ + "is not MASTER for device {}",
+ mastershipService.
+ getLocalRole(device.id()),
+ device.id());
+ }
}
log.info("Activated");
}
@@ -165,7 +187,15 @@
@Override
public void event(DeviceEvent event) {
- switch (event.type()) {
+ if (mastershipService.
+ getLocalRole(event.subject().id()) != MastershipRole.MASTER) {
+ log.debug("Local role {} is not MASTER for device {}",
+ mastershipService.
+ getLocalRole(event.subject().id()),
+ event.subject().id());
+ return;
+ }
+ switch (event.type()) {
case DEVICE_ADDED:
log.debug("Initiating default group handling for {}", event.subject().id());
DefaultGroupHandler dgh = DefaultGroupHandler.createGroupHandler(
@@ -193,6 +223,16 @@
@Override
public void event(LinkEvent event) {
+ if (mastershipService.
+ getLocalRole(event.subject().src().deviceId()) !=
+ MastershipRole.MASTER) {
+ log.debug("InternalLinkListener: Local role {} "
+ + "is not MASTER for device {}",
+ mastershipService.
+ getLocalRole(event.subject().src().deviceId()),
+ event.subject().src().deviceId());
+ return;
+ }
switch (event.type()) {
case LINK_ADDED:
if (dghMap.get(event.subject().src().deviceId()) != null) {
diff --git a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultTransitGroupHandler.java b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultTransitGroupHandler.java
index a6129c2..1b5bb50 100644
--- a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultTransitGroupHandler.java
+++ b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultTransitGroupHandler.java
@@ -112,16 +112,23 @@
tBuilder.setOutput(newNeighborLink.src().port())
.setEthDst(deviceConfig.getDeviceMac(
newNeighborLink.dst().deviceId()))
- .setEthSrc(nodeMacAddr)
- .pushMpls()
- .setMpls(MplsLabel.mplsLabel(ns.getEdgeLabel()));
+ .setEthSrc(nodeMacAddr);
+ if (ns.getEdgeLabel() != NeighborSet.NO_EDGE_LABEL) {
+ tBuilder.pushMpls()
+ .setMpls(MplsLabel.
+ mplsLabel(ns.getEdgeLabel()));
+ }
GroupBucket updatedBucket = DefaultGroupBucket.
createSelectGroupBucket(tBuilder.build());
GroupBuckets updatedBuckets = new GroupBuckets(
Arrays.asList(updatedBucket));
log.debug("newPortToExistingNeighborAtEdgeRouter: "
+ "groupService.addBucketsToGroup for neighborset{}", ns);
- groupService.addBucketsToGroup(deviceId, ns, updatedBuckets, ns, appId);
+ groupService.addBucketsToGroup(deviceId,
+ getGroupKey(ns),
+ updatedBuckets,
+ getGroupKey(ns),
+ appId);
}
}
diff --git a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/GroupBucketIdentifier.java b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/GroupBucketIdentifier.java
index cf0ae5e..3a46ce6 100644
--- a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/GroupBucketIdentifier.java
+++ b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/GroupBucketIdentifier.java
@@ -18,7 +18,6 @@
import static com.google.common.base.Preconditions.checkNotNull;
import org.onosproject.net.PortNumber;
-import org.onosproject.net.group.GroupKey;
/**
* Representation of policy group bucket identifier. Not exposed to
@@ -28,7 +27,7 @@
private int label;
private BucketOutputType type;
private PortNumber outPort;
- private GroupKey outGroup;
+ private PolicyGroupIdentifier outGroup;
protected enum BucketOutputType {
PORT,
@@ -44,7 +43,7 @@
}
protected GroupBucketIdentifier(int label,
- GroupKey outGroup) {
+ PolicyGroupIdentifier outGroup) {
this.label = label;
this.type = BucketOutputType.GROUP;
this.outPort = null;
@@ -63,7 +62,7 @@
return this.outPort;
}
- protected GroupKey outGroup() {
+ protected PolicyGroupIdentifier outGroup() {
return this.outGroup;
}
}
diff --git a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/NeighborSet.java b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/NeighborSet.java
index 497b352..ce739ce 100644
--- a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/NeighborSet.java
+++ b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/NeighborSet.java
@@ -23,7 +23,6 @@
import java.util.Set;
import org.onosproject.net.DeviceId;
-import org.onosproject.net.group.GroupKey;
/**
* Representation of a set of neighbor switch dpids along with edge node
@@ -31,9 +30,10 @@
* ECMP-group that hashes packets to a set of ports connecting to the
* neighbors in this set.
*/
-public class NeighborSet implements GroupKey {
+public class NeighborSet {
private final Set<DeviceId> neighbors;
private final int edgeLabel;
+ public static final int NO_EDGE_LABEL = -1;
/**
* Constructor with set of neighbors. Edge label is
@@ -43,7 +43,7 @@
*/
public NeighborSet(Set<DeviceId> neighbors) {
checkNotNull(neighbors);
- this.edgeLabel = -1;
+ this.edgeLabel = NO_EDGE_LABEL;
this.neighbors = new HashSet<DeviceId>();
this.neighbors.addAll(neighbors);
}
@@ -65,7 +65,7 @@
* Default constructor for kryo serialization.
*/
public NeighborSet() {
- this.edgeLabel = -1;
+ this.edgeLabel = NO_EDGE_LABEL;
this.neighbors = new HashSet<DeviceId>();
}
diff --git a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/PolicyGroupHandler.java b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/PolicyGroupHandler.java
index d2b43b3..e0cfe52 100644
--- a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/PolicyGroupHandler.java
+++ b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/PolicyGroupHandler.java
@@ -37,7 +37,6 @@
import org.onosproject.net.group.GroupBuckets;
import org.onosproject.net.group.GroupDescription;
import org.onosproject.net.group.GroupEvent;
-import org.onosproject.net.group.GroupKey;
import org.onosproject.net.group.GroupService;
import org.onosproject.net.link.LinkService;
import org.slf4j.Logger;
@@ -49,18 +48,17 @@
public class PolicyGroupHandler extends DefaultGroupHandler {
private final Logger log = getLogger(getClass());
- private HashMap<GroupKey, GroupKey> dependentGroups =
- new HashMap<GroupKey, GroupKey>();
+ private HashMap<PolicyGroupIdentifier, PolicyGroupIdentifier> dependentGroups =
+ new HashMap<PolicyGroupIdentifier, PolicyGroupIdentifier>();
/**
- * Creates policy group handler object.
+ * Policy group handler constructor.
*
* @param deviceId device identifier
* @param appId application identifier
* @param config interface to retrieve the device properties
* @param linkService link service object
* @param groupService group service object
- * @return policy group handler type
*/
public PolicyGroupHandler(DeviceId deviceId,
ApplicationId appId,
@@ -175,9 +173,11 @@
tBuilder.setOutput(bucketId.outPort())
.setEthDst(deviceConfig.
getDeviceMac(neighbor))
- .setEthSrc(nodeMacAddr)
- .pushMpls()
+ .setEthSrc(nodeMacAddr);
+ if (bucketId.label() != NeighborSet.NO_EDGE_LABEL) {
+ tBuilder.pushMpls()
.setMpls(MplsLabel.mplsLabel(bucketId.label()));
+ }
//TODO: BoS
outBuckets.add(DefaultGroupBucket.
createSelectGroupBucket(tBuilder.build()));
@@ -196,8 +196,7 @@
protected void handleGroupEvent(GroupEvent event) {
if (event.type() == GroupEvent.Type.GROUP_ADDED) {
if (dependentGroups.get(event.subject().appCookie()) != null) {
- PolicyGroupIdentifier dependentGroupKey = (PolicyGroupIdentifier)
- dependentGroups.get(event.subject().appCookie());
+ PolicyGroupIdentifier dependentGroupKey = dependentGroups.get(event.subject().appCookie());
dependentGroups.remove(event.subject().appCookie());
boolean fullyResolved = true;
for (GroupBucketIdentifier bucketId:
@@ -217,8 +216,11 @@
dependentGroupKey.bucketIds()) {
TrafficTreatment.Builder tBuilder =
DefaultTrafficTreatment.builder();
- tBuilder.pushMpls()
- .setMpls(MplsLabel.mplsLabel(bucketId.label()));
+ if (bucketId.label() != NeighborSet.NO_EDGE_LABEL) {
+ tBuilder.pushMpls()
+ .setMpls(MplsLabel.
+ mplsLabel(bucketId.label()));
+ }
//TODO: BoS
if (bucketId.type() == BucketOutputType.PORT) {
DeviceId neighbor = portDeviceMap.
@@ -230,12 +232,14 @@
} else {
if (groupService.
getGroup(deviceId,
- bucketId.outGroup()) == null) {
+ getGroupKey(bucketId.
+ outGroup())) == null) {
throw new IllegalStateException();
}
GroupId indirectGroupId = groupService.
getGroup(deviceId,
- bucketId.outGroup()).id();
+ getGroupKey(bucketId.
+ outGroup())).id();
tBuilder.group(indirectGroupId);
}
outBuckets.add(DefaultGroupBucket.
@@ -251,7 +255,7 @@
}
}
- public GroupKey generatePolicyGroupKey(String id,
+ public PolicyGroupIdentifier generatePolicyGroupKey(String id,
List<PolicyGroupParams> params) {
List<GroupBucketIdentifier> bucketIds = new ArrayList<GroupBucketIdentifier>();
for (PolicyGroupParams param: params) {
@@ -320,25 +324,28 @@
return innermostGroupkey;
}
- public void removeGroupChain(GroupKey key) {
+ public void removeGroupChain(PolicyGroupIdentifier key) {
if (!(key instanceof PolicyGroupIdentifier)) {
throw new IllegalArgumentException();
}
- List<GroupKey> groupsToBeDeleted = new ArrayList<GroupKey>();
+ List<PolicyGroupIdentifier> groupsToBeDeleted =
+ new ArrayList<PolicyGroupIdentifier>();
groupsToBeDeleted.add(key);
- Iterator<GroupKey> it = groupsToBeDeleted.iterator();
+ Iterator<PolicyGroupIdentifier> it =
+ groupsToBeDeleted.iterator();
while (it.hasNext()) {
- PolicyGroupIdentifier innerMostGroupKey =
- (PolicyGroupIdentifier) it.next();
+ PolicyGroupIdentifier innerMostGroupKey = it.next();
for (GroupBucketIdentifier bucketId:
innerMostGroupKey.bucketIds()) {
if (bucketId.type() != BucketOutputType.GROUP) {
groupsToBeDeleted.add(bucketId.outGroup());
}
}
- groupService.removeGroup(deviceId, innerMostGroupKey, appId);
+ groupService.removeGroup(deviceId,
+ getGroupKey(innerMostGroupKey),
+ appId);
it.remove();
}
}
diff --git a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/PolicyGroupIdentifier.java b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/PolicyGroupIdentifier.java
index d213e0c..3c19b29 100644
--- a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/PolicyGroupIdentifier.java
+++ b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/PolicyGroupIdentifier.java
@@ -17,14 +17,12 @@
import java.util.List;
-import org.onosproject.net.group.GroupKey;
-
/**
* Representation of policy based group identifiers.
* Opaque to group handler applications and only the outermost
* policy group identifier in a chain is visible to the applications.
*/
-public class PolicyGroupIdentifier implements GroupKey {
+public class PolicyGroupIdentifier {
private String id;
private List<PolicyGroupParams> inputParams;
private List<GroupBucketIdentifier> bucketIds;