Distributed group store using eventual consistent map abstraction
Change-Id: I618a0f6fa80e0e25285d7a2026032f09ba90aa70
diff --git a/apps/bgprouter/src/main/java/org/onosproject/bgprouter/BgpRouter.java b/apps/bgprouter/src/main/java/org/onosproject/bgprouter/BgpRouter.java
index a17046f..b50681e 100644
--- a/apps/bgprouter/src/main/java/org/onosproject/bgprouter/BgpRouter.java
+++ b/apps/bgprouter/src/main/java/org/onosproject/bgprouter/BgpRouter.java
@@ -15,11 +15,14 @@
*/
package org.onosproject.bgprouter;
-import com.google.common.collect.ConcurrentHashMultiset;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multiset;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -30,6 +33,7 @@
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
+import org.onlab.util.KryoNamespace;
import org.onosproject.config.NetworkConfigService;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
@@ -47,12 +51,12 @@
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.group.DefaultGroupBucket;
import org.onosproject.net.group.DefaultGroupDescription;
+import org.onosproject.net.group.DefaultGroupKey;
import org.onosproject.net.group.Group;
import org.onosproject.net.group.GroupBucket;
import org.onosproject.net.group.GroupBuckets;
import org.onosproject.net.group.GroupDescription;
import org.onosproject.net.group.GroupEvent;
-import org.onosproject.net.group.GroupKey;
import org.onosproject.net.group.GroupListener;
import org.onosproject.net.group.GroupService;
import org.onosproject.net.host.InterfaceIpAddress;
@@ -67,13 +71,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
+import com.google.common.collect.ConcurrentHashMultiset;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
/**
* BgpRouter component.
@@ -126,7 +128,7 @@
private final Map<IpAddress, NextHop> nextHops = Maps.newHashMap();
// Stores FIB updates that are waiting for groups to be set up
- private final Multimap<GroupKey, FibEntry> pendingUpdates = HashMultimap.create();
+ private final Multimap<NextHopGroupKey, FibEntry> pendingUpdates = HashMultimap.create();
// Device id of data-plane switch - should be learned from config
private DeviceId deviceId;
@@ -143,6 +145,11 @@
private InternalTableHandler provisionStaticTables = new InternalTableHandler();
+ private KryoNamespace.Builder appKryo = new KryoNamespace.Builder()
+ .register(IpAddress.Version.class)
+ .register(IpAddress.class)
+ .register(NextHopGroupKey.class);
+
@Activate
protected void activate() {
appId = coreService.registerApplication(BGP_ROUTER_APP);
@@ -210,7 +217,9 @@
Group group;
synchronized (pendingUpdates) {
NextHop nextHop = nextHops.get(entry.nextHopIp());
- group = groupService.getGroup(deviceId, nextHop.group());
+ group = groupService.getGroup(deviceId,
+ new DefaultGroupKey(
+ appKryo.build().serialize(nextHop.group())));
if (group == null) {
log.debug("Adding pending flow {}", update.entry());
@@ -309,7 +318,7 @@
GroupDescription.Type.INDIRECT,
new GroupBuckets(Collections
.singletonList(bucket)),
- groupKey,
+ new DefaultGroupKey(appKryo.build().serialize(groupKey)),
appId);
groupService.addGroup(groupDescription);
@@ -329,7 +338,10 @@
return null;
}
- Group group = groupService.getGroup(deviceId, nextHop.group());
+ Group group = groupService.getGroup(deviceId,
+ new DefaultGroupKey(appKryo.
+ build().
+ serialize(nextHop.group())));
// FIXME disabling group deletes for now until we verify the logic is OK
/*if (nextHopsCount.remove(nextHopIp, 1) <= 1) {
@@ -339,7 +351,9 @@
nextHops.remove(nextHopIp);
- groupService.removeGroup(deviceId, nextHop.group(), appId);
+ groupService.removeGroup(deviceId,
+ new DefaultGroupKey(appKryo.build().serialize(nextHop.group())),
+ appId);
}*/
return group;
@@ -699,8 +713,10 @@
event.type() == GroupEvent.Type.GROUP_UPDATED) {
synchronized (pendingUpdates) {
+ NextHopGroupKey nhGroupKey =
+ appKryo.build().deserialize(group.appCookie().key());
Map<FibEntry, Group> entriesToInstall =
- pendingUpdates.removeAll(group.appCookie())
+ pendingUpdates.removeAll(nhGroupKey)
.stream()
.collect(Collectors
.toMap(e -> e, e -> group));
diff --git a/apps/bgprouter/src/main/java/org/onosproject/bgprouter/NextHop.java b/apps/bgprouter/src/main/java/org/onosproject/bgprouter/NextHop.java
index cc045bc..88ce1a3 100644
--- a/apps/bgprouter/src/main/java/org/onosproject/bgprouter/NextHop.java
+++ b/apps/bgprouter/src/main/java/org/onosproject/bgprouter/NextHop.java
@@ -15,12 +15,12 @@
*/
package org.onosproject.bgprouter;
-import com.google.common.base.MoreObjects;
+import java.util.Objects;
+
import org.onlab.packet.IpAddress;
import org.onlab.packet.MacAddress;
-import org.onosproject.net.group.GroupKey;
-import java.util.Objects;
+import com.google.common.base.MoreObjects;
/**
* Represents a next hop for routing, whose MAC address has already been resolved.
@@ -29,7 +29,7 @@
private final IpAddress ip;
private final MacAddress mac;
- private final GroupKey group;
+ private final NextHopGroupKey group;
/**
* Creates a new next hop.
@@ -38,7 +38,7 @@
* @param mac next hop's MAC address
* @param group next hop's group
*/
- public NextHop(IpAddress ip, MacAddress mac, GroupKey group) {
+ public NextHop(IpAddress ip, MacAddress mac, NextHopGroupKey group) {
this.ip = ip;
this.mac = mac;
this.group = group;
@@ -67,7 +67,7 @@
*
* @return group
*/
- public GroupKey group() {
+ public NextHopGroupKey group() {
return group;
}
diff --git a/apps/bgprouter/src/main/java/org/onosproject/bgprouter/NextHopGroupKey.java b/apps/bgprouter/src/main/java/org/onosproject/bgprouter/NextHopGroupKey.java
index ae281e3..82a45a0 100644
--- a/apps/bgprouter/src/main/java/org/onosproject/bgprouter/NextHopGroupKey.java
+++ b/apps/bgprouter/src/main/java/org/onosproject/bgprouter/NextHopGroupKey.java
@@ -15,18 +15,18 @@
*/
package org.onosproject.bgprouter;
-import com.google.common.base.MoreObjects;
-import org.onlab.packet.IpAddress;
-import org.onosproject.net.group.GroupKey;
+import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Objects;
-import static com.google.common.base.Preconditions.checkNotNull;
+import org.onlab.packet.IpAddress;
+
+import com.google.common.base.MoreObjects;
/**
* Identifier for a next hop group.
*/
-public class NextHopGroupKey implements GroupKey {
+public class NextHopGroupKey {
private final IpAddress address;
diff --git a/apps/grouphandler/pom.xml b/apps/grouphandler/pom.xml
index a245b9f..17aa23c 100644
--- a/apps/grouphandler/pom.xml
+++ b/apps/grouphandler/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.onosproject</groupId>
<artifactId>onos-apps</artifactId>
- <version>1.1.0-SNAPSHOT</version>
+ <version>1.2.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
diff --git a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultEdgeGroupHandler.java b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultEdgeGroupHandler.java
index 8da4acd..7523a64 100644
--- a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultEdgeGroupHandler.java
+++ b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultEdgeGroupHandler.java
@@ -124,16 +124,23 @@
tBuilder.setOutput(newNeighborLink.src().port())
.setEthDst(deviceConfig.getDeviceMac(
newNeighborLink.dst().deviceId()))
- .setEthSrc(nodeMacAddr)
- .pushMpls()
- .setMpls(MplsLabel.mplsLabel(ns.getEdgeLabel()));
+ .setEthSrc(nodeMacAddr);
+ if (ns.getEdgeLabel() != NeighborSet.NO_EDGE_LABEL) {
+ tBuilder.pushMpls()
+ .setMpls(MplsLabel.
+ mplsLabel(ns.getEdgeLabel()));
+ }
GroupBucket updatedBucket = DefaultGroupBucket.
createSelectGroupBucket(tBuilder.build());
GroupBuckets updatedBuckets = new GroupBuckets(
Arrays.asList(updatedBucket));
log.debug("newPortToExistingNeighborAtEdgeRouter: "
+ "groupService.addBucketsToGroup for neighborset{}", ns);
- groupService.addBucketsToGroup(deviceId, ns, updatedBuckets, ns, appId);
+ groupService.addBucketsToGroup(deviceId,
+ getGroupKey(ns),
+ updatedBuckets,
+ getGroupKey(ns),
+ appId);
}
}
diff --git a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultGroupHandler.java b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultGroupHandler.java
index 755fac2..ec9e42f 100644
--- a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultGroupHandler.java
+++ b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultGroupHandler.java
@@ -18,6 +18,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
+import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -27,6 +28,7 @@
import org.onlab.packet.MacAddress;
import org.onlab.packet.MplsLabel;
+import org.onlab.util.KryoNamespace;
import org.onosproject.core.ApplicationId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Link;
@@ -35,6 +37,7 @@
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.group.DefaultGroupBucket;
import org.onosproject.net.group.DefaultGroupDescription;
+import org.onosproject.net.group.DefaultGroupKey;
import org.onosproject.net.group.Group;
import org.onosproject.net.group.GroupBucket;
import org.onosproject.net.group.GroupBuckets;
@@ -71,6 +74,16 @@
new HashMap<PortNumber, DeviceId>();
private GroupListener listener = new InternalGroupListener();
+ protected KryoNamespace.Builder kryo = new KryoNamespace.Builder()
+ .register(URI.class)
+ .register(HashSet.class)
+ .register(DeviceId.class)
+ .register(PortNumber.class)
+ .register(NeighborSet.class)
+ .register(PolicyGroupIdentifier.class)
+ .register(PolicyGroupParams.class)
+ .register(GroupBucketIdentifier.class)
+ .register(GroupBucketIdentifier.BucketOutputType.class);
protected DefaultGroupHandler(DeviceId deviceId,
ApplicationId appId,
@@ -185,9 +198,11 @@
tBuilder.setOutput(port)
.setEthDst(deviceConfig.getDeviceMac(
portDeviceMap.get(port)))
- .setEthSrc(nodeMacAddr)
- .pushMpls()
+ .setEthSrc(nodeMacAddr);
+ if (ns.getEdgeLabel() != NeighborSet.NO_EDGE_LABEL) {
+ tBuilder.pushMpls()
.setMpls(MplsLabel.mplsLabel(ns.getEdgeLabel()));
+ }
GroupBucket removeBucket = DefaultGroupBucket.
createSelectGroupBucket(tBuilder.build());
GroupBuckets removeBuckets = new GroupBuckets(
@@ -196,9 +211,9 @@
+ "groupService.removeBucketsFromGroup "
+ "for neighborset{}", deviceId, ns);
groupService.removeBucketsFromGroup(deviceId,
- ns,
+ getGroupKey(ns),
removeBuckets,
- ns,
+ getGroupKey(ns),
appId);
}
@@ -331,9 +346,12 @@
DefaultTrafficTreatment.builder();
tBuilder.setOutput(sp)
.setEthDst(deviceConfig.getDeviceMac(d))
- .setEthSrc(nodeMacAddr)
- .pushMpls()
- .setMpls(MplsLabel.mplsLabel(ns.getEdgeLabel()));
+ .setEthSrc(nodeMacAddr);
+ if (ns.getEdgeLabel() != NeighborSet.NO_EDGE_LABEL) {
+ tBuilder.pushMpls()
+ .setMpls(MplsLabel.
+ mplsLabel(ns.getEdgeLabel()));
+ }
buckets.add(DefaultGroupBucket.createSelectGroupBucket(
tBuilder.build()));
}
@@ -343,7 +361,7 @@
deviceId,
Group.Type.SELECT,
groupBuckets,
- ns,
+ getGroupKey(ns),
appId);
log.debug("createGroupsFromNeighborsets: "
+ "groupService.addGroup for neighborset{}", ns);
@@ -386,4 +404,8 @@
handleGroupEvent(event);
}
}
+
+ protected GroupKey getGroupKey(Object obj) {
+ return new DefaultGroupKey(kryo.build().serialize(obj));
+ }
}
diff --git a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultGroupHandlerApp.java b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultGroupHandlerApp.java
index 2bd2b4e..bb1a035 100644
--- a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultGroupHandlerApp.java
+++ b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultGroupHandlerApp.java
@@ -17,8 +17,10 @@
import static org.slf4j.LoggerFactory.getLogger;
+import java.net.URI;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import org.apache.felix.scr.annotations.Activate;
@@ -27,10 +29,13 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.packet.MacAddress;
+import org.onlab.util.KryoNamespace;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
+import org.onosproject.mastership.MastershipService;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.MastershipRole;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
@@ -69,10 +74,18 @@
protected GroupService groupService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MastershipService mastershipService;
private DeviceListener deviceListener = new InternalDeviceListener();
private LinkListener linkListener = new InternalLinkListener();
+ protected KryoNamespace.Builder kryo = new KryoNamespace.Builder()
+ .register(URI.class)
+ .register(HashSet.class)
+ .register(DeviceId.class)
+ .register(NeighborSet.class);
+
@Activate
public void activate() {
appId = coreService.registerApplication("org.onosproject.defaultgrouphandler");
@@ -80,14 +93,23 @@
deviceService.addListener(deviceListener);
linkService.addListener(linkListener);
for (Device device: deviceService.getDevices()) {
- log.debug("Initiating default group handling for {}", device.id());
- DefaultGroupHandler dgh = DefaultGroupHandler.createGroupHandler(device.id(),
- appId,
- config,
- linkService,
- groupService);
- dgh.createGroups();
- dghMap.put(device.id(), dgh);
+ if (mastershipService.
+ getLocalRole(device.id()) == MastershipRole.MASTER) {
+ log.debug("Initiating default group handling for {}", device.id());
+ DefaultGroupHandler dgh = DefaultGroupHandler.createGroupHandler(device.id(),
+ appId,
+ config,
+ linkService,
+ groupService);
+ dgh.createGroups();
+ dghMap.put(device.id(), dgh);
+ } else {
+ log.debug("Activate: Local role {} "
+ + "is not MASTER for device {}",
+ mastershipService.
+ getLocalRole(device.id()),
+ device.id());
+ }
}
log.info("Activated");
}
@@ -165,7 +187,15 @@
@Override
public void event(DeviceEvent event) {
- switch (event.type()) {
+ if (mastershipService.
+ getLocalRole(event.subject().id()) != MastershipRole.MASTER) {
+ log.debug("Local role {} is not MASTER for device {}",
+ mastershipService.
+ getLocalRole(event.subject().id()),
+ event.subject().id());
+ return;
+ }
+ switch (event.type()) {
case DEVICE_ADDED:
log.debug("Initiating default group handling for {}", event.subject().id());
DefaultGroupHandler dgh = DefaultGroupHandler.createGroupHandler(
@@ -193,6 +223,16 @@
@Override
public void event(LinkEvent event) {
+ if (mastershipService.
+ getLocalRole(event.subject().src().deviceId()) !=
+ MastershipRole.MASTER) {
+ log.debug("InternalLinkListener: Local role {} "
+ + "is not MASTER for device {}",
+ mastershipService.
+ getLocalRole(event.subject().src().deviceId()),
+ event.subject().src().deviceId());
+ return;
+ }
switch (event.type()) {
case LINK_ADDED:
if (dghMap.get(event.subject().src().deviceId()) != null) {
diff --git a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultTransitGroupHandler.java b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultTransitGroupHandler.java
index a6129c2..1b5bb50 100644
--- a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultTransitGroupHandler.java
+++ b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultTransitGroupHandler.java
@@ -112,16 +112,23 @@
tBuilder.setOutput(newNeighborLink.src().port())
.setEthDst(deviceConfig.getDeviceMac(
newNeighborLink.dst().deviceId()))
- .setEthSrc(nodeMacAddr)
- .pushMpls()
- .setMpls(MplsLabel.mplsLabel(ns.getEdgeLabel()));
+ .setEthSrc(nodeMacAddr);
+ if (ns.getEdgeLabel() != NeighborSet.NO_EDGE_LABEL) {
+ tBuilder.pushMpls()
+ .setMpls(MplsLabel.
+ mplsLabel(ns.getEdgeLabel()));
+ }
GroupBucket updatedBucket = DefaultGroupBucket.
createSelectGroupBucket(tBuilder.build());
GroupBuckets updatedBuckets = new GroupBuckets(
Arrays.asList(updatedBucket));
log.debug("newPortToExistingNeighborAtEdgeRouter: "
+ "groupService.addBucketsToGroup for neighborset{}", ns);
- groupService.addBucketsToGroup(deviceId, ns, updatedBuckets, ns, appId);
+ groupService.addBucketsToGroup(deviceId,
+ getGroupKey(ns),
+ updatedBuckets,
+ getGroupKey(ns),
+ appId);
}
}
diff --git a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/GroupBucketIdentifier.java b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/GroupBucketIdentifier.java
index cf0ae5e..3a46ce6 100644
--- a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/GroupBucketIdentifier.java
+++ b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/GroupBucketIdentifier.java
@@ -18,7 +18,6 @@
import static com.google.common.base.Preconditions.checkNotNull;
import org.onosproject.net.PortNumber;
-import org.onosproject.net.group.GroupKey;
/**
* Representation of policy group bucket identifier. Not exposed to
@@ -28,7 +27,7 @@
private int label;
private BucketOutputType type;
private PortNumber outPort;
- private GroupKey outGroup;
+ private PolicyGroupIdentifier outGroup;
protected enum BucketOutputType {
PORT,
@@ -44,7 +43,7 @@
}
protected GroupBucketIdentifier(int label,
- GroupKey outGroup) {
+ PolicyGroupIdentifier outGroup) {
this.label = label;
this.type = BucketOutputType.GROUP;
this.outPort = null;
@@ -63,7 +62,7 @@
return this.outPort;
}
- protected GroupKey outGroup() {
+ protected PolicyGroupIdentifier outGroup() {
return this.outGroup;
}
}
diff --git a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/NeighborSet.java b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/NeighborSet.java
index 497b352..ce739ce 100644
--- a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/NeighborSet.java
+++ b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/NeighborSet.java
@@ -23,7 +23,6 @@
import java.util.Set;
import org.onosproject.net.DeviceId;
-import org.onosproject.net.group.GroupKey;
/**
* Representation of a set of neighbor switch dpids along with edge node
@@ -31,9 +30,10 @@
* ECMP-group that hashes packets to a set of ports connecting to the
* neighbors in this set.
*/
-public class NeighborSet implements GroupKey {
+public class NeighborSet {
private final Set<DeviceId> neighbors;
private final int edgeLabel;
+ public static final int NO_EDGE_LABEL = -1;
/**
* Constructor with set of neighbors. Edge label is
@@ -43,7 +43,7 @@
*/
public NeighborSet(Set<DeviceId> neighbors) {
checkNotNull(neighbors);
- this.edgeLabel = -1;
+ this.edgeLabel = NO_EDGE_LABEL;
this.neighbors = new HashSet<DeviceId>();
this.neighbors.addAll(neighbors);
}
@@ -65,7 +65,7 @@
* Default constructor for kryo serialization.
*/
public NeighborSet() {
- this.edgeLabel = -1;
+ this.edgeLabel = NO_EDGE_LABEL;
this.neighbors = new HashSet<DeviceId>();
}
diff --git a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/PolicyGroupHandler.java b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/PolicyGroupHandler.java
index d2b43b3..e0cfe52 100644
--- a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/PolicyGroupHandler.java
+++ b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/PolicyGroupHandler.java
@@ -37,7 +37,6 @@
import org.onosproject.net.group.GroupBuckets;
import org.onosproject.net.group.GroupDescription;
import org.onosproject.net.group.GroupEvent;
-import org.onosproject.net.group.GroupKey;
import org.onosproject.net.group.GroupService;
import org.onosproject.net.link.LinkService;
import org.slf4j.Logger;
@@ -49,18 +48,17 @@
public class PolicyGroupHandler extends DefaultGroupHandler {
private final Logger log = getLogger(getClass());
- private HashMap<GroupKey, GroupKey> dependentGroups =
- new HashMap<GroupKey, GroupKey>();
+ private HashMap<PolicyGroupIdentifier, PolicyGroupIdentifier> dependentGroups =
+ new HashMap<PolicyGroupIdentifier, PolicyGroupIdentifier>();
/**
- * Creates policy group handler object.
+ * Policy group handler constructor.
*
* @param deviceId device identifier
* @param appId application identifier
* @param config interface to retrieve the device properties
* @param linkService link service object
* @param groupService group service object
- * @return policy group handler type
*/
public PolicyGroupHandler(DeviceId deviceId,
ApplicationId appId,
@@ -175,9 +173,11 @@
tBuilder.setOutput(bucketId.outPort())
.setEthDst(deviceConfig.
getDeviceMac(neighbor))
- .setEthSrc(nodeMacAddr)
- .pushMpls()
+ .setEthSrc(nodeMacAddr);
+ if (bucketId.label() != NeighborSet.NO_EDGE_LABEL) {
+ tBuilder.pushMpls()
.setMpls(MplsLabel.mplsLabel(bucketId.label()));
+ }
//TODO: BoS
outBuckets.add(DefaultGroupBucket.
createSelectGroupBucket(tBuilder.build()));
@@ -196,8 +196,7 @@
protected void handleGroupEvent(GroupEvent event) {
if (event.type() == GroupEvent.Type.GROUP_ADDED) {
if (dependentGroups.get(event.subject().appCookie()) != null) {
- PolicyGroupIdentifier dependentGroupKey = (PolicyGroupIdentifier)
- dependentGroups.get(event.subject().appCookie());
+ PolicyGroupIdentifier dependentGroupKey = dependentGroups.get(event.subject().appCookie());
dependentGroups.remove(event.subject().appCookie());
boolean fullyResolved = true;
for (GroupBucketIdentifier bucketId:
@@ -217,8 +216,11 @@
dependentGroupKey.bucketIds()) {
TrafficTreatment.Builder tBuilder =
DefaultTrafficTreatment.builder();
- tBuilder.pushMpls()
- .setMpls(MplsLabel.mplsLabel(bucketId.label()));
+ if (bucketId.label() != NeighborSet.NO_EDGE_LABEL) {
+ tBuilder.pushMpls()
+ .setMpls(MplsLabel.
+ mplsLabel(bucketId.label()));
+ }
//TODO: BoS
if (bucketId.type() == BucketOutputType.PORT) {
DeviceId neighbor = portDeviceMap.
@@ -230,12 +232,14 @@
} else {
if (groupService.
getGroup(deviceId,
- bucketId.outGroup()) == null) {
+ getGroupKey(bucketId.
+ outGroup())) == null) {
throw new IllegalStateException();
}
GroupId indirectGroupId = groupService.
getGroup(deviceId,
- bucketId.outGroup()).id();
+ getGroupKey(bucketId.
+ outGroup())).id();
tBuilder.group(indirectGroupId);
}
outBuckets.add(DefaultGroupBucket.
@@ -251,7 +255,7 @@
}
}
- public GroupKey generatePolicyGroupKey(String id,
+ public PolicyGroupIdentifier generatePolicyGroupKey(String id,
List<PolicyGroupParams> params) {
List<GroupBucketIdentifier> bucketIds = new ArrayList<GroupBucketIdentifier>();
for (PolicyGroupParams param: params) {
@@ -320,25 +324,28 @@
return innermostGroupkey;
}
- public void removeGroupChain(GroupKey key) {
+ public void removeGroupChain(PolicyGroupIdentifier key) {
if (!(key instanceof PolicyGroupIdentifier)) {
throw new IllegalArgumentException();
}
- List<GroupKey> groupsToBeDeleted = new ArrayList<GroupKey>();
+ List<PolicyGroupIdentifier> groupsToBeDeleted =
+ new ArrayList<PolicyGroupIdentifier>();
groupsToBeDeleted.add(key);
- Iterator<GroupKey> it = groupsToBeDeleted.iterator();
+ Iterator<PolicyGroupIdentifier> it =
+ groupsToBeDeleted.iterator();
while (it.hasNext()) {
- PolicyGroupIdentifier innerMostGroupKey =
- (PolicyGroupIdentifier) it.next();
+ PolicyGroupIdentifier innerMostGroupKey = it.next();
for (GroupBucketIdentifier bucketId:
innerMostGroupKey.bucketIds()) {
if (bucketId.type() != BucketOutputType.GROUP) {
groupsToBeDeleted.add(bucketId.outGroup());
}
}
- groupService.removeGroup(deviceId, innerMostGroupKey, appId);
+ groupService.removeGroup(deviceId,
+ getGroupKey(innerMostGroupKey),
+ appId);
it.remove();
}
}
diff --git a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/PolicyGroupIdentifier.java b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/PolicyGroupIdentifier.java
index d213e0c..3c19b29 100644
--- a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/PolicyGroupIdentifier.java
+++ b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/PolicyGroupIdentifier.java
@@ -17,14 +17,12 @@
import java.util.List;
-import org.onosproject.net.group.GroupKey;
-
/**
* Representation of policy based group identifiers.
* Opaque to group handler applications and only the outermost
* policy group identifier in a chain is visible to the applications.
*/
-public class PolicyGroupIdentifier implements GroupKey {
+public class PolicyGroupIdentifier {
private String id;
private List<PolicyGroupParams> inputParams;
private List<GroupBucketIdentifier> bucketIds;
diff --git a/core/api/src/main/java/org/onosproject/net/group/DefaultGroup.java b/core/api/src/main/java/org/onosproject/net/group/DefaultGroup.java
index 9248625..c52f771 100644
--- a/core/api/src/main/java/org/onosproject/net/group/DefaultGroup.java
+++ b/core/api/src/main/java/org/onosproject/net/group/DefaultGroup.java
@@ -16,13 +16,11 @@
package org.onosproject.net.group;
import static com.google.common.base.MoreObjects.toStringHelper;
-import static org.slf4j.LoggerFactory.getLogger;
import java.util.Objects;
import org.onosproject.core.GroupId;
import org.onosproject.net.DeviceId;
-import org.slf4j.Logger;
/**
* ONOS implementation of default group that is stored in the system.
@@ -30,9 +28,8 @@
public class DefaultGroup extends DefaultGroupDescription
implements Group, StoredGroupEntry {
- private final Logger log = getLogger(getClass());
-
private GroupState state;
+ private boolean isGroupStateAddedFirstTime;
private long life;
private long packets;
private long bytes;
@@ -215,4 +212,14 @@
.add("state", state)
.toString();
}
+
+ @Override
+ public void setIsGroupStateAddedFirstTime(boolean isGroupStateAddedFirstTime) {
+ this.isGroupStateAddedFirstTime = isGroupStateAddedFirstTime;
+ }
+
+ @Override
+ public boolean isGroupStateAddedFirstTime() {
+ return isGroupStateAddedFirstTime;
+ }
}
diff --git a/core/api/src/main/java/org/onosproject/net/group/DefaultGroupDescription.java b/core/api/src/main/java/org/onosproject/net/group/DefaultGroupDescription.java
index 385a5d6..3ffb2c2 100644
--- a/core/api/src/main/java/org/onosproject/net/group/DefaultGroupDescription.java
+++ b/core/api/src/main/java/org/onosproject/net/group/DefaultGroupDescription.java
@@ -41,7 +41,8 @@
* @param deviceId device identifier
* @param type type of the group
* @param buckets immutable list of group bucket
- * @param appCookie immutable application cookie to be associated with the group
+ * @param appCookie immutable application cookie of type DefaultGroupKey
+ * to be associated with the group
* @param appId application id
*/
public DefaultGroupDescription(DeviceId deviceId,
diff --git a/core/api/src/main/java/org/onosproject/net/group/DefaultGroupKey.java b/core/api/src/main/java/org/onosproject/net/group/DefaultGroupKey.java
new file mode 100644
index 0000000..7f00ae7
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/group/DefaultGroupKey.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.group;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Arrays;
+
+/**
+ * Default implementation of group key interface.
+ */
+public class DefaultGroupKey implements GroupKey {
+
+ private final byte[] key;
+
+ public DefaultGroupKey(byte[] key) {
+ this.key = checkNotNull(key);
+ }
+
+ @Override
+ public byte[] key() {
+ return key;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof DefaultGroupKey)) {
+ return false;
+ }
+ DefaultGroupKey that = (DefaultGroupKey) o;
+ return (Arrays.equals(this.key, that.key));
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(this.key);
+ }
+
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onosproject/net/group/GroupKey.java b/core/api/src/main/java/org/onosproject/net/group/GroupKey.java
index da8d449..9c87b83 100644
--- a/core/api/src/main/java/org/onosproject/net/group/GroupKey.java
+++ b/core/api/src/main/java/org/onosproject/net/group/GroupKey.java
@@ -17,8 +17,15 @@
/**
* Representation of generalized Key that would be used to store
- * groups in < Key, Value > store. Implementation of this interface
- * MUST override "equals()" and "hashcode()" methods.
+ * groups in < Key, Value > store. This key uses a generic
+ * byte array so that applications can associate their groups with
+ * any of their data by translating it into a byte array.
*/
public interface GroupKey {
+ /**
+ * Returns the byte representation of key.
+ *
+ * @return byte array
+ */
+ public byte[] key();
}
diff --git a/core/api/src/main/java/org/onosproject/net/group/GroupStore.java b/core/api/src/main/java/org/onosproject/net/group/GroupStore.java
index e7bf4f8..7ccab8a 100644
--- a/core/api/src/main/java/org/onosproject/net/group/GroupStore.java
+++ b/core/api/src/main/java/org/onosproject/net/group/GroupStore.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.net.group;
+import org.onosproject.core.GroupId;
import org.onosproject.net.DeviceId;
import org.onosproject.store.Store;
@@ -60,6 +61,15 @@
Group getGroup(DeviceId deviceId, GroupKey appCookie);
/**
+ * Returns the stored group entry for an id.
+ *
+ * @param deviceId the device ID
+ * @param groupId the group identifier
+ * @return a group associated with the key
+ */
+ Group getGroup(DeviceId deviceId, GroupId groupId);
+
+ /**
* Stores a new group entry using the information from group description.
*
* @param groupDesc group description to be used to store group entry
diff --git a/core/api/src/main/java/org/onosproject/net/group/StoredGroupEntry.java b/core/api/src/main/java/org/onosproject/net/group/StoredGroupEntry.java
index 297663f..47d3612 100644
--- a/core/api/src/main/java/org/onosproject/net/group/StoredGroupEntry.java
+++ b/core/api/src/main/java/org/onosproject/net/group/StoredGroupEntry.java
@@ -29,6 +29,23 @@
void setState(Group.GroupState newState);
/**
+ * Sets if group has transitioned to ADDED state for the first time.
+ * This is to differentiate state transitions "from PENDING_ADD to ADDED"
+ * and "from PENDING_UPDATE to ADDED". For internal use only.
+ *
+ * @param isGroupAddedFirstTime true if group moves to ADDED state
+ * for the first time.
+ */
+ void setIsGroupStateAddedFirstTime(boolean isGroupAddedFirstTime);
+
+ /**
+ * Returns the isGroupStateAddedFirstTime value. For internal use only.
+ *
+ * @return isGroupStateAddedFirstTime value
+ */
+ boolean isGroupStateAddedFirstTime();
+
+ /**
* Sets how long this entry has been entered in the system.
*
* @param life epoch time
diff --git a/core/net/src/main/java/org/onosproject/net/group/impl/GroupManager.java b/core/net/src/main/java/org/onosproject/net/group/impl/GroupManager.java
index 2d8f81c..6601cbf 100644
--- a/core/net/src/main/java/org/onosproject/net/group/impl/GroupManager.java
+++ b/core/net/src/main/java/org/onosproject/net/group/impl/GroupManager.java
@@ -15,7 +15,13 @@
*/
package org.onosproject.net.group.impl;
-import com.google.common.collect.Sets;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Set;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -48,12 +54,7 @@
import org.onosproject.net.provider.AbstractProviderService;
import org.slf4j.Logger;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Set;
-
-import static org.slf4j.LoggerFactory.getLogger;
+import com.google.common.collect.Sets;
/**
* Provides implementation of the group service APIs.
@@ -103,6 +104,7 @@
*/
@Override
public void addGroup(GroupDescription groupDesc) {
+ log.trace("In addGroup API");
store.storeGroupDescription(groupDesc);
}
@@ -121,6 +123,7 @@
*/
@Override
public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
+ log.trace("In getGroup API");
return store.getGroup(deviceId, appCookie);
}
@@ -142,6 +145,7 @@
GroupBuckets buckets,
GroupKey newCookie,
ApplicationId appId) {
+ log.trace("In addBucketsToGroup API");
store.updateGroupDescription(deviceId,
oldCookie,
UpdateType.ADD,
@@ -167,6 +171,7 @@
GroupBuckets buckets,
GroupKey newCookie,
ApplicationId appId) {
+ log.trace("In removeBucketsFromGroup API");
store.updateGroupDescription(deviceId,
oldCookie,
UpdateType.REMOVE,
@@ -188,6 +193,7 @@
public void removeGroup(DeviceId deviceId,
GroupKey appCookie,
ApplicationId appId) {
+ log.trace("In removeGroup API");
store.deleteGroupDescription(deviceId, appCookie);
}
@@ -202,11 +208,13 @@
@Override
public Iterable<Group> getGroups(DeviceId deviceId,
ApplicationId appId) {
+ log.trace("In getGroups API");
return store.getGroups(deviceId);
}
@Override
public Iterable<Group> getGroups(DeviceId deviceId) {
+ log.trace("In getGroups API");
return store.getGroups(deviceId);
}
@@ -217,6 +225,7 @@
*/
@Override
public void addListener(GroupListener listener) {
+ log.trace("In addListener API");
listenerRegistry.addListener(listener);
}
@@ -227,6 +236,7 @@
*/
@Override
public void removeListener(GroupListener listener) {
+ log.trace("In removeListener API");
listenerRegistry.removeListener(listener);
}
@@ -364,36 +374,52 @@
Set<Group> extraneousStoredEntries =
Sets.newHashSet(store.getExtraneousGroups(deviceId));
- log.trace("Displaying all southboundGroupEntries for device {}", deviceId);
+ log.trace("Displaying all ({}) southboundGroupEntries for device {}",
+ southboundGroupEntries.size(),
+ deviceId);
for (Iterator<Group> it = southboundGroupEntries.iterator(); it.hasNext();) {
Group group = it.next();
log.trace("Group {} in device {}", group, deviceId);
}
- log.trace("Displaying all stored group entries for device {}", deviceId);
- for (Iterator<Group> it = storedGroupEntries.iterator(); it.hasNext();) {
- Group group = it.next();
+ log.trace("Displaying all ({}) stored group entries for device {}",
+ storedGroupEntries.size(),
+ deviceId);
+ for (Iterator<Group> it1 = storedGroupEntries.iterator(); it1.hasNext();) {
+ Group group = it1.next();
log.trace("Stored Group {} for device {}", group, deviceId);
}
- for (Iterator<Group> it = southboundGroupEntries.iterator(); it.hasNext();) {
- Group group = it.next();
+ for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
+ Group group = it2.next();
if (storedGroupEntries.remove(group)) {
// we both have the group, let's update some info then.
log.trace("Group AUDIT: group {} exists "
+ "in both planes for device {}",
group.id(), deviceId);
groupAdded(group);
- it.remove();
+ it2.remove();
}
}
for (Group group : southboundGroupEntries) {
- // there are groups in the switch that aren't in the store
- log.trace("Group AUDIT: extraneous group {} exists "
- + "in data plane for device {}",
- group.id(), deviceId);
- extraneousStoredEntries.remove(group);
- extraneousGroup(group);
+ if (store.getGroup(group.deviceId(), group.id()) != null) {
+ // There is a group existing with the same id
+ // It is possible that group update is
+ // in progress while we got a stale info from switch
+ if (!storedGroupEntries.remove(store.getGroup(
+ group.deviceId(), group.id()))) {
+ log.warn("Group AUDIT: Inconsistent state:"
+ + "Group exists in ID based table while "
+ + "not present in key based table");
+ }
+ } else {
+ // there are groups in the switch that aren't in the store
+ log.trace("Group AUDIT: extraneous group {} exists "
+ + "in data plane for device {}",
+ group.id(), deviceId);
+ extraneousStoredEntries.remove(group);
+ extraneousGroup(group);
+ }
}
for (Group group : storedGroupEntries) {
// there are groups in the store that aren't in the switch
diff --git a/core/net/src/test/java/org/onosproject/net/group/impl/GroupManagerTest.java b/core/net/src/test/java/org/onosproject/net/group/impl/GroupManagerTest.java
index 2fe5e4b..fa3e987 100644
--- a/core/net/src/test/java/org/onosproject/net/group/impl/GroupManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/group/impl/GroupManagerTest.java
@@ -15,6 +15,12 @@
*/
package org.onosproject.net.group.impl;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -38,6 +44,7 @@
import org.onosproject.net.group.DefaultGroup;
import org.onosproject.net.group.DefaultGroupBucket;
import org.onosproject.net.group.DefaultGroupDescription;
+import org.onosproject.net.group.DefaultGroupKey;
import org.onosproject.net.group.Group;
import org.onosproject.net.group.GroupBucket;
import org.onosproject.net.group.GroupBuckets;
@@ -58,8 +65,6 @@
import com.google.common.collect.Iterables;
-import static org.junit.Assert.*;
-
/**
* Test codifying the group service & group provider service contracts.
*/
@@ -108,31 +113,6 @@
mgr.eventDispatcher = null;
}
- private class TestGroupKey implements GroupKey {
- private String groupId;
-
- public TestGroupKey(String id) {
- this.groupId = id;
- }
-
- public String id() {
- return this.groupId;
- }
-
- @Override
- public int hashCode() {
- return groupId.hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof TestGroupKey) {
- return this.groupId.equals(((TestGroupKey) obj).id());
- }
- return false;
- }
- }
-
/**
* Tests group service north bound and south bound interfaces.
* The following operations are tested:
@@ -177,7 +157,7 @@
PortNumber.portNumber(32)};
PortNumber[] ports2 = {PortNumber.portNumber(41),
PortNumber.portNumber(42)};
- TestGroupKey key = new TestGroupKey("group1BeforeAudit");
+ GroupKey key = new DefaultGroupKey("group1BeforeAudit".getBytes());
List<GroupBucket> buckets = new ArrayList<GroupBucket>();
List<PortNumber> outPorts = new ArrayList<PortNumber>();
outPorts.addAll(Arrays.asList(ports1));
@@ -224,7 +204,7 @@
providerService.pushGroupMetrics(DID, groupEntries);
// First group metrics would trigger the device audit completion
// post which all pending group requests are also executed.
- TestGroupKey key = new TestGroupKey("group1BeforeAudit");
+ GroupKey key = new DefaultGroupKey("group1BeforeAudit".getBytes());
Group createdGroup = groupService.getGroup(DID, key);
int createdGroupId = createdGroup.id().id();
assertNotEquals(gId1.id(), createdGroupId);
@@ -256,7 +236,7 @@
0);
List<Group> groupEntries = Arrays.asList(group1, group2);
providerService.pushGroupMetrics(DID, groupEntries);
- TestGroupKey key = new TestGroupKey("group1BeforeAudit");
+ GroupKey key = new DefaultGroupKey("group1BeforeAudit".getBytes());
Group createdGroup = groupService.getGroup(DID, key);
List<GroupOperation> expectedGroupOps = Arrays.asList(
GroupOperation.createDeleteGroupOperation(gId1,
@@ -271,7 +251,7 @@
// Test AUDIT with confirmed groups
private void testAuditWithConfirmedGroups() {
- TestGroupKey key = new TestGroupKey("group1BeforeAudit");
+ GroupKey key = new DefaultGroupKey("group1BeforeAudit".getBytes());
Group createdGroup = groupService.getGroup(DID, key);
createdGroup = new DefaultGroup(createdGroup.id(),
DID,
@@ -284,9 +264,9 @@
// Test group add bucket operations
private void testAddBuckets() {
- TestGroupKey addKey = new TestGroupKey("group1AddBuckets");
+ GroupKey addKey = new DefaultGroupKey("group1AddBuckets".getBytes());
- TestGroupKey prevKey = new TestGroupKey("group1BeforeAudit");
+ GroupKey prevKey = new DefaultGroupKey("group1BeforeAudit".getBytes());
Group createdGroup = groupService.getGroup(DID, prevKey);
List<GroupBucket> buckets = new ArrayList<GroupBucket>();
buckets.addAll(createdGroup.buckets().buckets());
@@ -328,9 +308,9 @@
// Test group remove bucket operations
private void testRemoveBuckets() {
- TestGroupKey removeKey = new TestGroupKey("group1RemoveBuckets");
+ GroupKey removeKey = new DefaultGroupKey("group1RemoveBuckets".getBytes());
- TestGroupKey prevKey = new TestGroupKey("group1AddBuckets");
+ GroupKey prevKey = new DefaultGroupKey("group1AddBuckets".getBytes());
Group createdGroup = groupService.getGroup(DID, prevKey);
List<GroupBucket> buckets = new ArrayList<GroupBucket>();
buckets.addAll(createdGroup.buckets().buckets());
@@ -372,7 +352,7 @@
// Test group remove operations
private void testRemoveGroup() {
- TestGroupKey currKey = new TestGroupKey("group1RemoveBuckets");
+ GroupKey currKey = new DefaultGroupKey("group1RemoveBuckets".getBytes());
Group existingGroup = groupService.getGroup(DID, currKey);
groupService.removeGroup(DID, currKey, appId);
List<GroupOperation> expectedGroupOps = Arrays.asList(
@@ -397,7 +377,7 @@
PortNumber[] ports2 = {PortNumber.portNumber(41),
PortNumber.portNumber(42)};
// Test Group creation before AUDIT process
- TestGroupKey key = new TestGroupKey("group1BeforeAudit");
+ GroupKey key = new DefaultGroupKey("group1BeforeAudit".getBytes());
List<GroupBucket> buckets = new ArrayList<GroupBucket>();
List<PortNumber> outPorts = new ArrayList<PortNumber>();
outPorts.addAll(Arrays.asList(ports1));
diff --git a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
index c914784..a571b4e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
@@ -16,25 +16,48 @@
package org.onosproject.store.group.impl;
import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
+import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
+import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
import org.onlab.util.NewConcurrentHashMap;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.core.DefaultApplicationId;
import org.onosproject.core.DefaultGroupId;
import org.onosproject.core.GroupId;
+import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.MastershipRole;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.instructions.Instructions;
+import org.onosproject.net.flow.instructions.L0ModificationInstruction;
+import org.onosproject.net.flow.instructions.L2ModificationInstruction;
+import org.onosproject.net.flow.instructions.L3ModificationInstruction;
import org.onosproject.net.group.DefaultGroup;
+import org.onosproject.net.group.DefaultGroupBucket;
import org.onosproject.net.group.DefaultGroupDescription;
+import org.onosproject.net.group.DefaultGroupKey;
import org.onosproject.net.group.Group;
import org.onosproject.net.group.Group.GroupState;
import org.onosproject.net.group.GroupBucket;
@@ -48,10 +71,21 @@
import org.onosproject.net.group.GroupStoreDelegate;
import org.onosproject.net.group.StoredGroupEntry;
import org.onosproject.store.AbstractStore;
+import org.onosproject.store.Timestamp;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.ecmap.EventuallyConsistentMap;
+import org.onosproject.store.ecmap.EventuallyConsistentMapEvent;
+import org.onosproject.store.ecmap.EventuallyConsistentMapImpl;
+import org.onosproject.store.ecmap.EventuallyConsistentMapListener;
+import org.onosproject.store.impl.ClockService;
+import org.onosproject.store.impl.MultiValuedTimestamp;
+import org.onosproject.store.serializers.KryoNamespaces;
import org.slf4j.Logger;
-import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
/**
* Manages inventory of group entries using trivial in-memory implementation.
@@ -67,85 +101,165 @@
private final int dummyId = 0xffffffff;
private final GroupId dummyGroupId = new DefaultGroupId(dummyId);
- // inner Map is per device group table
- private final ConcurrentMap<DeviceId, ConcurrentMap<GroupKey, StoredGroupEntry>>
- groupEntriesByKey = new ConcurrentHashMap<>();
- private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
- groupEntriesById = new ConcurrentHashMap<>();
- private final ConcurrentMap<DeviceId, ConcurrentMap<GroupKey, StoredGroupEntry>>
- pendingGroupEntriesByKey = new ConcurrentHashMap<>();
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterCommunicationService clusterCommunicator;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MastershipService mastershipService;
+
+ // Per device group table with (device id + app cookie) as key
+ private EventuallyConsistentMap<GroupStoreKeyMapKey,
+ StoredGroupEntry> groupStoreEntriesByKey = null;
+ // Per device group table with (device id + group id) as key
+ private EventuallyConsistentMap<GroupStoreIdMapKey,
+ StoredGroupEntry> groupStoreEntriesById = null;
+ private EventuallyConsistentMap<GroupStoreKeyMapKey,
+ StoredGroupEntry> auditPendingReqQueue = null;
private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
extraneousGroupEntriesById = new ConcurrentHashMap<>();
+ private ExecutorService messageHandlingExecutor;
+ private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;
private final HashMap<DeviceId, Boolean> deviceAuditStatus =
new HashMap<DeviceId, Boolean>();
private final AtomicInteger groupIdGen = new AtomicInteger();
+ private KryoNamespace.Builder kryoBuilder = null;
+
@Activate
public void activate() {
+ kryoBuilder = new KryoNamespace.Builder()
+ .register(DefaultGroup.class,
+ DefaultGroupBucket.class,
+ DefaultGroupDescription.class,
+ DefaultGroupKey.class,
+ GroupDescription.Type.class,
+ Group.GroupState.class,
+ GroupBuckets.class,
+ DefaultGroupId.class,
+ GroupStoreMessage.class,
+ GroupStoreMessage.Type.class,
+ UpdateType.class,
+ GroupStoreMessageSubjects.class,
+ MultiValuedTimestamp.class,
+ GroupStoreKeyMapKey.class,
+ GroupStoreIdMapKey.class,
+ GroupStoreMapKey.class
+ )
+ .register(URI.class)
+ .register(DeviceId.class)
+ .register(PortNumber.class)
+ .register(DefaultApplicationId.class)
+ .register(DefaultTrafficTreatment.class,
+ Instructions.DropInstruction.class,
+ Instructions.OutputInstruction.class,
+ Instructions.GroupInstruction.class,
+ Instructions.TableTypeTransition.class,
+ FlowRule.Type.class,
+ L0ModificationInstruction.class,
+ L0ModificationInstruction.L0SubType.class,
+ L0ModificationInstruction.ModLambdaInstruction.class,
+ L2ModificationInstruction.class,
+ L2ModificationInstruction.L2SubType.class,
+ L2ModificationInstruction.ModEtherInstruction.class,
+ L2ModificationInstruction.PushHeaderInstructions.class,
+ L2ModificationInstruction.ModVlanIdInstruction.class,
+ L2ModificationInstruction.ModVlanPcpInstruction.class,
+ L2ModificationInstruction.ModMplsLabelInstruction.class,
+ L2ModificationInstruction.ModMplsTtlInstruction.class,
+ L3ModificationInstruction.class,
+ L3ModificationInstruction.L3SubType.class,
+ L3ModificationInstruction.ModIPInstruction.class,
+ L3ModificationInstruction.ModIPv6FlowLabelInstruction.class,
+ L3ModificationInstruction.ModTtlInstruction.class,
+ org.onlab.packet.MplsLabel.class
+ )
+ .register(org.onosproject.cluster.NodeId.class)
+ .register(KryoNamespaces.BASIC)
+ .register(KryoNamespaces.MISC);
+
+ messageHandlingExecutor = Executors.
+ newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
+ groupedThreads("onos/store/group",
+ "message-handlers"));
+ clusterCommunicator.
+ addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
+ new ClusterGroupMsgHandler(),
+ messageHandlingExecutor);
+
+ log.debug("Creating EC map groupstorekeymap");
+ groupStoreEntriesByKey =
+ new EventuallyConsistentMapImpl<>("groupstorekeymap",
+ clusterService,
+ clusterCommunicator,
+ kryoBuilder,
+ new GroupStoreLogicalClockManager<>());
+ log.trace("Current size {}", groupStoreEntriesByKey.size());
+
+ log.debug("Creating EC map groupstoreidmap");
+ groupStoreEntriesById =
+ new EventuallyConsistentMapImpl<>("groupstoreidmap",
+ clusterService,
+ clusterCommunicator,
+ kryoBuilder,
+ new GroupStoreLogicalClockManager<>());
+ groupStoreEntriesById.addListener(new GroupStoreIdMapListener());
+ log.trace("Current size {}", groupStoreEntriesById.size());
+
+ log.debug("Creating EC map pendinggroupkeymap");
+ auditPendingReqQueue =
+ new EventuallyConsistentMapImpl<>("pendinggroupkeymap",
+ clusterService,
+ clusterCommunicator,
+ kryoBuilder,
+ new GroupStoreLogicalClockManager<>());
+ log.trace("Current size {}", auditPendingReqQueue.size());
+
log.info("Started");
}
@Deactivate
public void deactivate() {
- groupEntriesByKey.clear();
- groupEntriesById.clear();
log.info("Stopped");
}
- private static NewConcurrentHashMap<GroupKey, StoredGroupEntry>
- lazyEmptyGroupKeyTable() {
- return NewConcurrentHashMap.<GroupKey, StoredGroupEntry>ifNeeded();
- }
-
- private static NewConcurrentHashMap<GroupId, StoredGroupEntry>
- lazyEmptyGroupIdTable() {
- return NewConcurrentHashMap.<GroupId, StoredGroupEntry>ifNeeded();
- }
-
- private static NewConcurrentHashMap<GroupKey, StoredGroupEntry>
- lazyEmptyPendingGroupKeyTable() {
- return NewConcurrentHashMap.<GroupKey, StoredGroupEntry>ifNeeded();
- }
-
private static NewConcurrentHashMap<GroupId, Group>
- lazyEmptyExtraneousGroupIdTable() {
+ lazyEmptyExtraneousGroupIdTable() {
return NewConcurrentHashMap.<GroupId, Group>ifNeeded();
}
/**
- * Returns the group key table for specified device.
+ * Returns the group store eventual consistent key map.
*
- * @param deviceId identifier of the device
- * @return Map representing group key table of given device.
+ * @return Map representing group key table.
*/
- private ConcurrentMap<GroupKey, StoredGroupEntry> getGroupKeyTable(DeviceId deviceId) {
- return createIfAbsentUnchecked(groupEntriesByKey,
- deviceId, lazyEmptyGroupKeyTable());
+ private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
+ getGroupStoreKeyMap() {
+ return groupStoreEntriesByKey;
}
/**
- * Returns the group id table for specified device.
+ * Returns the group store eventual consistent id map.
*
- * @param deviceId identifier of the device
- * @return Map representing group key table of given device.
+ * @return Map representing group id table.
*/
- private ConcurrentMap<GroupId, StoredGroupEntry> getGroupIdTable(DeviceId deviceId) {
- return createIfAbsentUnchecked(groupEntriesById,
- deviceId, lazyEmptyGroupIdTable());
+ private EventuallyConsistentMap<GroupStoreIdMapKey, StoredGroupEntry>
+ getGroupStoreIdMap() {
+ return groupStoreEntriesById;
}
/**
- * Returns the pending group key table for specified device.
+ * Returns the pending group request table.
*
- * @param deviceId identifier of the device
- * @return Map representing group key table of given device.
+ * @return Map representing group key table.
*/
- private ConcurrentMap<GroupKey, StoredGroupEntry>
- getPendingGroupKeyTable(DeviceId deviceId) {
- return createIfAbsentUnchecked(pendingGroupEntriesByKey,
- deviceId, lazyEmptyPendingGroupKeyTable());
+ private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
+ getPendingGroupKeyTable() {
+ return auditPendingReqQueue;
}
/**
@@ -168,8 +282,8 @@
*/
@Override
public int getGroupCount(DeviceId deviceId) {
- return (groupEntriesByKey.get(deviceId) != null) ?
- groupEntriesByKey.get(deviceId).size() : 0;
+ return (getGroups(deviceId) != null) ?
+ Iterables.size(getGroups(deviceId)) : 0;
}
/**
@@ -182,16 +296,11 @@
@Override
public Iterable<Group> getGroups(DeviceId deviceId) {
// flatten and make iterator unmodifiable
- return FluentIterable.from(getGroupKeyTable(deviceId).values())
- .transform(
- new Function<StoredGroupEntry, Group>() {
-
- @Override
- public Group apply(
- StoredGroupEntry input) {
- return input;
- }
- });
+ log.trace("getGroups: for device {} total number of groups {}",
+ deviceId, getGroupStoreKeyMap().values().size());
+ return FluentIterable.from(getGroupStoreKeyMap().values())
+ .filter(input -> input.deviceId().equals(deviceId))
+ .transform(input -> input);
}
/**
@@ -204,19 +313,31 @@
*/
@Override
public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
- return (groupEntriesByKey.get(deviceId) != null) ?
- groupEntriesByKey.get(deviceId).get(appCookie) :
- null;
+ return getStoredGroupEntry(deviceId, appCookie);
+ }
+
+ private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
+ GroupKey appCookie) {
+ return getGroupStoreKeyMap().get(new GroupStoreKeyMapKey(deviceId,
+ appCookie));
+ }
+
+ @Override
+ public Group getGroup(DeviceId deviceId, GroupId groupId) {
+ return getStoredGroupEntry(deviceId, groupId);
+ }
+
+ private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
+ GroupId groupId) {
+ return getGroupStoreIdMap().get(new GroupStoreIdMapKey(deviceId,
+ groupId));
}
private int getFreeGroupIdValue(DeviceId deviceId) {
int freeId = groupIdGen.incrementAndGet();
while (true) {
- Group existing = (
- groupEntriesById.get(deviceId) != null) ?
- groupEntriesById.get(deviceId).get(new DefaultGroupId(freeId)) :
- null;
+ Group existing = getGroup(deviceId, new DefaultGroupId(freeId));
if (existing == null) {
existing = (
extraneousGroupEntriesById.get(deviceId) != null) ?
@@ -240,23 +361,45 @@
*/
@Override
public void storeGroupDescription(GroupDescription groupDesc) {
+ log.trace("In storeGroupDescription");
// Check if a group is existing with the same key
if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
+ log.warn("Group already exists with the same key {}",
+ groupDesc.appCookie());
return;
}
- if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
- // Device group audit has not completed yet
- // Add this group description to pending group key table
- // Create a group entry object with Dummy Group ID
- StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
- group.setState(GroupState.WAITING_AUDIT_COMPLETE);
- ConcurrentMap<GroupKey, StoredGroupEntry> pendingKeyTable =
- getPendingGroupKeyTable(groupDesc.deviceId());
- pendingKeyTable.put(groupDesc.appCookie(), group);
+ // Check if group to be created by a remote instance
+ if (mastershipService.getLocalRole(
+ groupDesc.deviceId()) != MastershipRole.MASTER) {
+ log.debug("Device {} local role is not MASTER",
+ groupDesc.deviceId());
+ GroupStoreMessage groupOp = GroupStoreMessage.
+ createGroupAddRequestMsg(groupDesc.deviceId(),
+ groupDesc);
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ GroupStoreMessageSubjects.
+ REMOTE_GROUP_OP_REQUEST,
+ kryoBuilder.build().serialize(groupOp));
+ if (!clusterCommunicator.unicast(message,
+ mastershipService.
+ getMasterFor(
+ groupDesc.deviceId()))) {
+ log.warn("Failed to send request to master: {} to {}",
+ message,
+ mastershipService.getMasterFor(groupDesc.deviceId()));
+ //TODO: Send Group operation failure event
+ }
+ log.debug("Sent Group operation request for device {} "
+ + "to remote MASTER {}",
+ groupDesc.deviceId(),
+ mastershipService.getMasterFor(groupDesc.deviceId()));
return;
}
+ log.debug("Store group for device {} is getting handled locally",
+ groupDesc.deviceId());
storeGroupDescriptionInternal(groupDesc);
}
@@ -266,17 +409,34 @@
return;
}
+ if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
+ // Device group audit has not completed yet
+ // Add this group description to pending group key table
+ // Create a group entry object with Dummy Group ID
+ log.debug("storeGroupDescriptionInternal: Device {} AUDIT "
+ + "pending...Queuing Group ADD request",
+ groupDesc.deviceId());
+ StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
+ group.setState(GroupState.WAITING_AUDIT_COMPLETE);
+ EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
+ getPendingGroupKeyTable();
+ pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
+ groupDesc.appCookie()),
+ group);
+ return;
+ }
+
// Get a new group identifier
GroupId id = new DefaultGroupId(getFreeGroupIdValue(groupDesc.deviceId()));
// Create a group entry object
StoredGroupEntry group = new DefaultGroup(id, groupDesc);
- // Insert the newly created group entry into concurrent key and id maps
- ConcurrentMap<GroupKey, StoredGroupEntry> keyTable =
- getGroupKeyTable(groupDesc.deviceId());
- keyTable.put(groupDesc.appCookie(), group);
- ConcurrentMap<GroupId, StoredGroupEntry> idTable =
- getGroupIdTable(groupDesc.deviceId());
- idTable.put(id, group);
+ // Insert the newly created group entry into key and id maps
+ getGroupStoreKeyMap().
+ put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
+ groupDesc.appCookie()), group);
+ getGroupStoreIdMap().
+ put(new GroupStoreIdMapKey(groupDesc.deviceId(),
+ id), group);
notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
group));
}
@@ -297,6 +457,42 @@
UpdateType type,
GroupBuckets newBuckets,
GroupKey newAppCookie) {
+ // Check if group update to be done by a remote instance
+ if (mastershipService.
+ getLocalRole(deviceId) != MastershipRole.MASTER) {
+ GroupStoreMessage groupOp = GroupStoreMessage.
+ createGroupUpdateRequestMsg(deviceId,
+ oldAppCookie,
+ type,
+ newBuckets,
+ newAppCookie);
+ ClusterMessage message =
+ new ClusterMessage(clusterService.getLocalNode().id(),
+ GroupStoreMessageSubjects.
+ REMOTE_GROUP_OP_REQUEST,
+ kryoBuilder.build().serialize(groupOp));
+ if (!clusterCommunicator.unicast(message,
+ mastershipService.
+ getMasterFor(deviceId))) {
+ log.warn("Failed to send request to master: {} to {}",
+ message,
+ mastershipService.getMasterFor(deviceId));
+ //TODO: Send Group operation failure event
+ }
+ return;
+ }
+ updateGroupDescriptionInternal(deviceId,
+ oldAppCookie,
+ type,
+ newBuckets,
+ newAppCookie);
+ }
+
+ private void updateGroupDescriptionInternal(DeviceId deviceId,
+ GroupKey oldAppCookie,
+ UpdateType type,
+ GroupBuckets newBuckets,
+ GroupKey newAppCookie) {
// Check if a group is existing with the provided key
Group oldGroup = getGroup(deviceId, oldAppCookie);
if (oldGroup == null) {
@@ -323,14 +519,17 @@
newGroup.setPackets(oldGroup.packets());
newGroup.setBytes(oldGroup.bytes());
// Remove the old entry from maps and add new entry using new key
- ConcurrentMap<GroupKey, StoredGroupEntry> keyTable =
- getGroupKeyTable(oldGroup.deviceId());
- ConcurrentMap<GroupId, StoredGroupEntry> idTable =
- getGroupIdTable(oldGroup.deviceId());
- keyTable.remove(oldGroup.appCookie());
- idTable.remove(oldGroup.id());
- keyTable.put(newGroup.appCookie(), newGroup);
- idTable.put(newGroup.id(), newGroup);
+ getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(oldGroup.deviceId(),
+ oldGroup.appCookie()));
+ getGroupStoreIdMap().remove(new GroupStoreIdMapKey(oldGroup.deviceId(),
+ oldGroup.id()));
+ getGroupStoreKeyMap().
+ put(new GroupStoreKeyMapKey(newGroup.deviceId(),
+ newGroup.appCookie()), newGroup);
+ getGroupStoreIdMap().
+ put(new GroupStoreIdMapKey(newGroup.deviceId(),
+ newGroup.id()), newGroup);
+
notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
}
}
@@ -379,10 +578,34 @@
@Override
public void deleteGroupDescription(DeviceId deviceId,
GroupKey appCookie) {
+ // Check if group to be deleted by a remote instance
+ if (mastershipService.
+ getLocalRole(deviceId) != MastershipRole.MASTER) {
+ GroupStoreMessage groupOp = GroupStoreMessage.
+ createGroupDeleteRequestMsg(deviceId,
+ appCookie);
+ ClusterMessage message =
+ new ClusterMessage(clusterService.getLocalNode().id(),
+ GroupStoreMessageSubjects.
+ REMOTE_GROUP_OP_REQUEST,
+ kryoBuilder.build().serialize(groupOp));
+ if (!clusterCommunicator.unicast(message,
+ mastershipService.
+ getMasterFor(deviceId))) {
+ log.warn("Failed to send request to master: {} to {}",
+ message,
+ mastershipService.getMasterFor(deviceId));
+ //TODO: Send Group operation failure event
+ }
+ return;
+ }
+ deleteGroupDescriptionInternal(deviceId, appCookie);
+ }
+
+ private void deleteGroupDescriptionInternal(DeviceId deviceId,
+ GroupKey appCookie) {
// Check if a group is existing with the provided key
- StoredGroupEntry existing = (groupEntriesByKey.get(deviceId) != null) ?
- groupEntriesByKey.get(deviceId).get(appCookie) :
- null;
+ StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie);
if (existing == null) {
return;
}
@@ -401,26 +624,35 @@
@Override
public void addOrUpdateGroupEntry(Group group) {
// check if this new entry is an update to an existing entry
- StoredGroupEntry existing = (groupEntriesById.get(
- group.deviceId()) != null) ?
- groupEntriesById.get(group.deviceId()).get(group.id()) :
- null;
+ StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
+ group.id());
GroupEvent event = null;
if (existing != null) {
+ log.trace("addOrUpdateGroupEntry: updating group "
+ + "entry {} in device {}",
+ group.id(),
+ group.deviceId());
synchronized (existing) {
existing.setLife(group.life());
existing.setPackets(group.packets());
existing.setBytes(group.bytes());
if (existing.state() == GroupState.PENDING_ADD) {
existing.setState(GroupState.ADDED);
+ existing.setIsGroupStateAddedFirstTime(true);
event = new GroupEvent(Type.GROUP_ADDED, existing);
} else {
- if (existing.state() == GroupState.PENDING_UPDATE) {
- existing.setState(GroupState.PENDING_UPDATE);
- }
+ existing.setState(GroupState.ADDED);
+ existing.setIsGroupStateAddedFirstTime(false);
event = new GroupEvent(Type.GROUP_UPDATED, existing);
}
+ //Re-PUT map entries to trigger map update events
+ getGroupStoreKeyMap().
+ put(new GroupStoreKeyMapKey(existing.deviceId(),
+ existing.appCookie()), existing);
+ getGroupStoreIdMap().
+ put(new GroupStoreIdMapKey(existing.deviceId(),
+ existing.id()), existing);
}
}
@@ -436,18 +668,18 @@
*/
@Override
public void removeGroupEntry(Group group) {
- StoredGroupEntry existing = (groupEntriesById.get(
- group.deviceId()) != null) ?
- groupEntriesById.get(group.deviceId()).get(group.id()) :
- null;
+ StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
+ group.id());
if (existing != null) {
- ConcurrentMap<GroupKey, StoredGroupEntry> keyTable =
- getGroupKeyTable(existing.deviceId());
- ConcurrentMap<GroupId, StoredGroupEntry> idTable =
- getGroupIdTable(existing.deviceId());
- idTable.remove(existing.id());
- keyTable.remove(existing.appCookie());
+ log.trace("removeGroupEntry: removing group "
+ + "entry {} in device {}",
+ group.id(),
+ group.deviceId());
+ getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
+ existing.appCookie()));
+ getGroupStoreIdMap().remove(new GroupStoreIdMapKey(existing.deviceId(),
+ existing.id()));
notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
}
}
@@ -461,9 +693,17 @@
+ "completed for device {}", deviceId);
deviceAuditStatus.put(deviceId, true);
// Execute all pending group requests
- ConcurrentMap<GroupKey, StoredGroupEntry> pendingGroupRequests =
- getPendingGroupKeyTable(deviceId);
- for (Group group:pendingGroupRequests.values()) {
+ List<StoredGroupEntry> pendingGroupRequests =
+ getPendingGroupKeyTable().values()
+ .stream()
+ .filter(g-> g.deviceId().equals(deviceId))
+ .collect(Collectors.toList());
+ log.trace("deviceInitialAuditCompleted: processing "
+ + "pending group add requests for device {} and "
+ + "number of pending requests {}",
+ deviceId,
+ pendingGroupRequests.size());
+ for (Group group:pendingGroupRequests) {
GroupDescription tmp = new DefaultGroupDescription(
group.deviceId(),
group.type(),
@@ -471,8 +711,9 @@
group.appCookie(),
group.appId());
storeGroupDescriptionInternal(tmp);
+ getPendingGroupKeyTable().
+ remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
}
- getPendingGroupKeyTable(deviceId).clear();
} else {
if (deviceAuditStatus.get(deviceId)) {
log.debug("deviceInitialAuditCompleted: Clearing AUDIT "
@@ -494,10 +735,8 @@
@Override
public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
- StoredGroupEntry existing = (groupEntriesById.get(
- deviceId) != null) ?
- groupEntriesById.get(deviceId).get(operation.groupId()) :
- null;
+ StoredGroupEntry existing = getStoredGroupEntry(deviceId,
+ operation.groupId());
if (existing == null) {
log.warn("No group entry with ID {} found ", operation.groupId());
@@ -518,27 +757,37 @@
log.warn("Unknown group operation type {}", operation.opType());
}
- ConcurrentMap<GroupKey, StoredGroupEntry> keyTable =
- getGroupKeyTable(existing.deviceId());
- ConcurrentMap<GroupId, StoredGroupEntry> idTable =
- getGroupIdTable(existing.deviceId());
- idTable.remove(existing.id());
- keyTable.remove(existing.appCookie());
+ getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
+ existing.appCookie()));
+ getGroupStoreIdMap().remove(new GroupStoreIdMapKey(existing.deviceId(),
+ existing.id()));
}
@Override
public void addOrUpdateExtraneousGroupEntry(Group group) {
+ log.trace("addOrUpdateExtraneousGroupEntry: add/update extraneous "
+ + "group entry {} in device {}",
+ group.id(),
+ group.deviceId());
ConcurrentMap<GroupId, Group> extraneousIdTable =
getExtraneousGroupIdTable(group.deviceId());
extraneousIdTable.put(group.id(), group);
// Check the reference counter
if (group.referenceCount() == 0) {
+ log.trace("addOrUpdateExtraneousGroupEntry: Flow reference "
+ + "counter is zero and triggering remove",
+ group.id(),
+ group.deviceId());
notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, group));
}
}
@Override
public void removeExtraneousGroupEntry(Group group) {
+ log.trace("removeExtraneousGroupEntry: remove extraneous "
+ + "group entry {} of device {} from store",
+ group.id(),
+ group.deviceId());
ConcurrentMap<GroupId, Group> extraneousIdTable =
getExtraneousGroupIdTable(group.deviceId());
extraneousIdTable.remove(group.id());
@@ -551,5 +800,192 @@
getExtraneousGroupIdTable(deviceId).values());
}
+ /**
+ * ClockService that generates wallclock based timestamps.
+ */
+ private class GroupStoreLogicalClockManager<T, U>
+ implements ClockService<T, U> {
+ private final AtomicLong sequenceNumber = new AtomicLong(0);
+
+ @Override
+ public Timestamp getTimestamp(T t1, U u1) {
+ return new MultiValuedTimestamp<>(System.currentTimeMillis(),
+ sequenceNumber.getAndIncrement());
+ }
+ }
+
+ /**
+ * Map handler to receive any events when the group map is updated.
+ */
+ private class GroupStoreIdMapListener implements
+ EventuallyConsistentMapListener<GroupStoreIdMapKey, StoredGroupEntry> {
+
+ @Override
+ public void event(EventuallyConsistentMapEvent<GroupStoreIdMapKey,
+ StoredGroupEntry> mapEvent) {
+ GroupEvent groupEvent = null;
+ log.trace("GroupStoreIdMapListener: received groupid map event {}",
+ mapEvent.type());
+ if (mapEvent.type() == EventuallyConsistentMapEvent.Type.PUT) {
+ log.trace("GroupIdMapListener: Received PUT event");
+ if (mapEvent.value().state() == Group.GroupState.ADDED) {
+ if (mapEvent.value().isGroupStateAddedFirstTime()) {
+ groupEvent = new GroupEvent(Type.GROUP_ADDED,
+ mapEvent.value());
+ log.trace("GroupIdMapListener: Received first time "
+ + "GROUP_ADDED state update");
+ } else {
+ groupEvent = new GroupEvent(Type.GROUP_UPDATED,
+ mapEvent.value());
+ log.trace("GroupIdMapListener: Received following "
+ + "GROUP_ADDED state update");
+ }
+ }
+ } else if (mapEvent.type() == EventuallyConsistentMapEvent.Type.REMOVE) {
+ log.trace("GroupIdMapListener: Received REMOVE event");
+ groupEvent = new GroupEvent(Type.GROUP_REMOVED, mapEvent.value());
+ }
+
+ if (groupEvent != null) {
+ notifyDelegate(groupEvent);
+ }
+ }
+ }
+ /**
+ * Message handler to receive messages from group subsystems of
+ * other cluster members.
+ */
+ private final class ClusterGroupMsgHandler
+ implements ClusterMessageHandler {
+ @Override
+ public void handle(ClusterMessage message) {
+ log.trace("ClusterGroupMsgHandler: received remote group message");
+ if (message.subject() ==
+ GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST) {
+ GroupStoreMessage groupOp = kryoBuilder.
+ build().deserialize(message.payload());
+ log.trace("received remote group operation request");
+ if (!(mastershipService.
+ getLocalRole(groupOp.deviceId()) !=
+ MastershipRole.MASTER)) {
+ log.warn("ClusterGroupMsgHandler: This node is not "
+ + "MASTER for device {}", groupOp.deviceId());
+ return;
+ }
+ if (groupOp.type() == GroupStoreMessage.Type.ADD) {
+ log.trace("processing remote group "
+ + "add operation request");
+ storeGroupDescriptionInternal(groupOp.groupDesc());
+ } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
+ log.trace("processing remote group "
+ + "update operation request");
+ updateGroupDescriptionInternal(groupOp.deviceId(),
+ groupOp.appCookie(),
+ groupOp.updateType(),
+ groupOp.updateBuckets(),
+ groupOp.newAppCookie());
+ } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
+ log.trace("processing remote group "
+ + "delete operation request");
+ deleteGroupDescriptionInternal(groupOp.deviceId(),
+ groupOp.appCookie());
+ }
+ }
+ }
+ }
+
+ /**
+ * Flattened map key to be used to store group entries.
+ */
+ private class GroupStoreMapKey {
+ private final DeviceId deviceId;
+
+ public GroupStoreMapKey(DeviceId deviceId) {
+ this.deviceId = deviceId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof GroupStoreMapKey)) {
+ return false;
+ }
+ GroupStoreMapKey that = (GroupStoreMapKey) o;
+ return this.deviceId.equals(that.deviceId);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = 17;
+
+ result = 31 * result + Objects.hash(this.deviceId);
+
+ return result;
+ }
+ }
+
+ private class GroupStoreKeyMapKey extends GroupStoreMapKey {
+ private final GroupKey appCookie;
+ public GroupStoreKeyMapKey(DeviceId deviceId,
+ GroupKey appCookie) {
+ super(deviceId);
+ this.appCookie = appCookie;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof GroupStoreKeyMapKey)) {
+ return false;
+ }
+ GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o;
+ return (super.equals(that) &&
+ this.appCookie.equals(that.appCookie));
+ }
+
+ @Override
+ public int hashCode() {
+ int result = 17;
+
+ result = 31 * result + super.hashCode() + Objects.hash(this.appCookie);
+
+ return result;
+ }
+ }
+
+ private class GroupStoreIdMapKey extends GroupStoreMapKey {
+ private final GroupId groupId;
+ public GroupStoreIdMapKey(DeviceId deviceId,
+ GroupId groupId) {
+ super(deviceId);
+ this.groupId = groupId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof GroupStoreIdMapKey)) {
+ return false;
+ }
+ GroupStoreIdMapKey that = (GroupStoreIdMapKey) o;
+ return (super.equals(that) &&
+ this.groupId.equals(that.groupId));
+ }
+
+ @Override
+ public int hashCode() {
+ int result = 17;
+
+ result = 31 * result + super.hashCode() + Objects.hash(this.groupId);
+
+ return result;
+ }
+ }
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/group/impl/GroupStoreMessage.java b/core/store/dist/src/main/java/org/onosproject/store/group/impl/GroupStoreMessage.java
new file mode 100644
index 0000000..b82754b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/group/impl/GroupStoreMessage.java
@@ -0,0 +1,184 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.group.impl;
+
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.group.GroupBuckets;
+import org.onosproject.net.group.GroupDescription;
+import org.onosproject.net.group.GroupKey;
+import org.onosproject.net.group.GroupStore.UpdateType;
+
+/**
+ * Format of the Group store message that is used to
+ * communicate with the peer nodes in the cluster.
+ */
+public final class GroupStoreMessage {
+ private final DeviceId deviceId;
+ private final GroupKey appCookie;
+ private final GroupDescription groupDesc;
+ private final UpdateType updateType;
+ private final GroupBuckets updateBuckets;
+ private final GroupKey newAppCookie;
+ private final Type type;
+
+ /**
+ * Type of group store request.
+ */
+ public enum Type {
+ ADD,
+ UPDATE,
+ DELETE
+ }
+
+ private GroupStoreMessage(Type type,
+ DeviceId deviceId,
+ GroupKey appCookie,
+ GroupDescription groupDesc,
+ UpdateType updateType,
+ GroupBuckets updateBuckets,
+ GroupKey newAppCookie) {
+ this.type = type;
+ this.deviceId = deviceId;
+ this.appCookie = appCookie;
+ this.groupDesc = groupDesc;
+ this.updateType = updateType;
+ this.updateBuckets = updateBuckets;
+ this.newAppCookie = newAppCookie;
+ }
+
+ /**
+ * Creates a group store message for group ADD request.
+ *
+ * @param deviceId device identifier in which group to be added
+ * @param desc group creation parameters
+ * @return constructed group store message
+ */
+ public static GroupStoreMessage createGroupAddRequestMsg(DeviceId deviceId,
+ GroupDescription desc) {
+ return new GroupStoreMessage(Type.ADD,
+ deviceId,
+ null,
+ desc,
+ null,
+ null,
+ null);
+ }
+
+ /**
+ * Creates a group store message for group UPDATE request.
+ *
+ * @param deviceId the device ID
+ * @param appCookie the current group key
+ * @param updateType update (add or delete) type
+ * @param updateBuckets group buckets for updates
+ * @param newAppCookie optional new group key
+ * @return constructed group store message
+ */
+ public static GroupStoreMessage createGroupUpdateRequestMsg(DeviceId deviceId,
+ GroupKey appCookie,
+ UpdateType updateType,
+ GroupBuckets updateBuckets,
+ GroupKey newAppCookie) {
+ return new GroupStoreMessage(Type.UPDATE,
+ deviceId,
+ appCookie,
+ null,
+ updateType,
+ updateBuckets,
+ newAppCookie);
+ }
+
+ /**
+ * Creates a group store message for group DELETE request.
+ *
+ * @param deviceId the device ID
+ * @param appCookie the group key
+ * @return constructed group store message
+ */
+ public static GroupStoreMessage createGroupDeleteRequestMsg(DeviceId deviceId,
+ GroupKey appCookie) {
+ return new GroupStoreMessage(Type.DELETE,
+ deviceId,
+ appCookie,
+ null,
+ null,
+ null,
+ null);
+ }
+
+ /**
+ * Returns the device identifier of this group request.
+ *
+ * @return device identifier
+ */
+ public DeviceId deviceId() {
+ return deviceId;
+ }
+
+ /**
+ * Returns the application cookie associated with this group request.
+ *
+ * @return application cookie
+ */
+ public GroupKey appCookie() {
+ return appCookie;
+ }
+
+ /**
+ * Returns the group create parameters associated with this group request.
+ *
+ * @return group create parameters
+ */
+ public GroupDescription groupDesc() {
+ return groupDesc;
+ }
+
+ /**
+ * Returns the group buckets to be updated as part of this group request.
+ *
+ * @return group buckets to be updated
+ */
+ public GroupBuckets updateBuckets() {
+ return updateBuckets;
+ }
+
+ /**
+ * Returns the update group operation type.
+ *
+ * @return update operation type
+ */
+ public UpdateType updateType() {
+ return updateType;
+ }
+
+ /**
+ * Returns the new application cookie associated with this group operation.
+ *
+ * @return new application cookie
+ */
+ public GroupKey newAppCookie() {
+ return newAppCookie;
+ }
+
+ /**
+ * Returns the type of this group operation.
+ *
+ * @return group message type
+ */
+ public Type type() {
+ return type;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/group/impl/GroupStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onosproject/store/group/impl/GroupStoreMessageSubjects.java
new file mode 100644
index 0000000..dbee22c
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/group/impl/GroupStoreMessageSubjects.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.group.impl;
+
+import org.onosproject.store.cluster.messaging.MessageSubject;
+
+/**
+ * MessageSubjects used by DistributedGroupRuleStore peer-peer communication.
+ */
+public final class GroupStoreMessageSubjects {
+ private GroupStoreMessageSubjects() {}
+
+ public static final MessageSubject REMOTE_GROUP_OP_REQUEST
+ = new MessageSubject("peer-forward-group-op-req");
+}
diff --git a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleGroupStore.java b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleGroupStore.java
index e2adf8c..d35a5c0 100644
--- a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleGroupStore.java
+++ b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleGroupStore.java
@@ -209,6 +209,13 @@
null;
}
+ @Override
+ public Group getGroup(DeviceId deviceId, GroupId groupId) {
+ return (groupEntriesById.get(deviceId) != null) ?
+ groupEntriesById.get(deviceId).get(groupId) :
+ null;
+ }
+
private int getFreeGroupIdValue(DeviceId deviceId) {
int freeId = groupIdGen.incrementAndGet();
@@ -551,5 +558,4 @@
getExtraneousGroupIdTable(deviceId).values());
}
-
}
diff --git a/core/store/trivial/src/test/java/org/onosproject/store/trivial/impl/SimpleGroupStoreTest.java b/core/store/trivial/src/test/java/org/onosproject/store/trivial/impl/SimpleGroupStoreTest.java
index 712adcb..92ae5d7 100644
--- a/core/store/trivial/src/test/java/org/onosproject/store/trivial/impl/SimpleGroupStoreTest.java
+++ b/core/store/trivial/src/test/java/org/onosproject/store/trivial/impl/SimpleGroupStoreTest.java
@@ -36,6 +36,7 @@
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.group.DefaultGroupBucket;
import org.onosproject.net.group.DefaultGroupDescription;
+import org.onosproject.net.group.DefaultGroupKey;
import org.onosproject.net.group.Group;
import org.onosproject.net.group.GroupBucket;
import org.onosproject.net.group.GroupBuckets;
@@ -70,31 +71,6 @@
simpleGroupStore.deactivate();
}
- public class TestGroupKey implements GroupKey {
- private String groupId;
-
- public TestGroupKey(String id) {
- this.groupId = id;
- }
-
- public String id() {
- return this.groupId;
- }
-
- @Override
- public int hashCode() {
- return groupId.hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof TestGroupKey) {
- return this.groupId.equals(((TestGroupKey) obj).id());
- }
- return false;
- }
- }
-
private class InternalGroupStoreDelegate
implements GroupStoreDelegate {
private GroupId createdGroupId = null;
@@ -173,20 +149,20 @@
simpleGroupStore.deviceInitialAuditCompleted(D1, true);
// Testing storeGroup operation
- TestGroupKey newKey = new TestGroupKey("group1");
+ GroupKey newKey = new DefaultGroupKey("group1".getBytes());
testStoreAndGetGroup(newKey);
// Testing addOrUpdateGroupEntry operation from southbound
- TestGroupKey currKey = newKey;
+ GroupKey currKey = newKey;
testAddGroupEntryFromSB(currKey);
// Testing updateGroupDescription for ADD operation from northbound
- newKey = new TestGroupKey("group1AddBuckets");
+ newKey = new DefaultGroupKey("group1AddBuckets".getBytes());
testAddBuckets(currKey, newKey);
// Testing updateGroupDescription for REMOVE operation from northbound
currKey = newKey;
- newKey = new TestGroupKey("group1RemoveBuckets");
+ newKey = new DefaultGroupKey("group1RemoveBuckets".getBytes());
testRemoveBuckets(currKey, newKey);
// Testing addOrUpdateGroupEntry operation from southbound
@@ -201,7 +177,7 @@
}
// Testing storeGroup operation
- private void testStoreAndGetGroup(TestGroupKey key) {
+ private void testStoreAndGetGroup(GroupKey key) {
PortNumber[] ports = {PortNumber.portNumber(31),
PortNumber.portNumber(32)};
List<PortNumber> outPorts = new ArrayList<PortNumber>();
@@ -252,7 +228,7 @@
}
// Testing addOrUpdateGroupEntry operation from southbound
- private void testAddGroupEntryFromSB(TestGroupKey currKey) {
+ private void testAddGroupEntryFromSB(GroupKey currKey) {
Group existingGroup = simpleGroupStore.getGroup(D1, currKey);
InternalGroupStoreDelegate addGroupEntryDelegate =
@@ -265,7 +241,7 @@
}
// Testing addOrUpdateGroupEntry operation from southbound
- private void testUpdateGroupEntryFromSB(TestGroupKey currKey) {
+ private void testUpdateGroupEntryFromSB(GroupKey currKey) {
Group existingGroup = simpleGroupStore.getGroup(D1, currKey);
InternalGroupStoreDelegate updateGroupEntryDelegate =
@@ -278,7 +254,7 @@
}
// Testing updateGroupDescription for ADD operation from northbound
- private void testAddBuckets(TestGroupKey currKey, TestGroupKey addKey) {
+ private void testAddBuckets(GroupKey currKey, GroupKey addKey) {
Group existingGroup = simpleGroupStore.getGroup(D1, currKey);
List<GroupBucket> buckets = new ArrayList<GroupBucket>();
buckets.addAll(existingGroup.buckets().buckets());
@@ -316,7 +292,7 @@
}
// Testing updateGroupDescription for REMOVE operation from northbound
- private void testRemoveBuckets(TestGroupKey currKey, TestGroupKey removeKey) {
+ private void testRemoveBuckets(GroupKey currKey, GroupKey removeKey) {
Group existingGroup = simpleGroupStore.getGroup(D1, currKey);
List<GroupBucket> buckets = new ArrayList<GroupBucket>();
buckets.addAll(existingGroup.buckets().buckets());
@@ -343,7 +319,7 @@
}
// Testing deleteGroupDescription operation from northbound
- private void testDeleteGroup(TestGroupKey currKey) {
+ private void testDeleteGroup(GroupKey currKey) {
Group existingGroup = simpleGroupStore.getGroup(D1, currKey);
InternalGroupStoreDelegate deleteGroupDescDelegate =
new InternalGroupStoreDelegate(currKey,
@@ -355,7 +331,7 @@
}
// Testing removeGroupEntry operation from southbound
- private void testRemoveGroupFromSB(TestGroupKey currKey) {
+ private void testRemoveGroupFromSB(GroupKey currKey) {
Group existingGroup = simpleGroupStore.getGroup(D1, currKey);
InternalGroupStoreDelegate removeGroupEntryDelegate =
new InternalGroupStoreDelegate(currKey,
@@ -380,7 +356,7 @@
ApplicationId appId =
new DefaultApplicationId(2, "org.groupstore.test");
- TestGroupKey key = new TestGroupKey("group1");
+ GroupKey key = new DefaultGroupKey("group1".getBytes());
PortNumber[] ports = {PortNumber.portNumber(31),
PortNumber.portNumber(32)};
List<PortNumber> outPorts = new ArrayList<PortNumber>();