Distributed group store using eventually consistent map abstraction
Change-Id: I618a0f6fa80e0e25285d7a2026032f09ba90aa70
diff --git a/apps/bgprouter/src/main/java/org/onosproject/bgprouter/BgpRouter.java b/apps/bgprouter/src/main/java/org/onosproject/bgprouter/BgpRouter.java
index a17046f..b50681e 100644
--- a/apps/bgprouter/src/main/java/org/onosproject/bgprouter/BgpRouter.java
+++ b/apps/bgprouter/src/main/java/org/onosproject/bgprouter/BgpRouter.java
@@ -15,11 +15,14 @@
*/
package org.onosproject.bgprouter;
-import com.google.common.collect.ConcurrentHashMultiset;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multiset;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -30,6 +33,7 @@
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
+import org.onlab.util.KryoNamespace;
import org.onosproject.config.NetworkConfigService;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
@@ -47,12 +51,12 @@
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.group.DefaultGroupBucket;
import org.onosproject.net.group.DefaultGroupDescription;
+import org.onosproject.net.group.DefaultGroupKey;
import org.onosproject.net.group.Group;
import org.onosproject.net.group.GroupBucket;
import org.onosproject.net.group.GroupBuckets;
import org.onosproject.net.group.GroupDescription;
import org.onosproject.net.group.GroupEvent;
-import org.onosproject.net.group.GroupKey;
import org.onosproject.net.group.GroupListener;
import org.onosproject.net.group.GroupService;
import org.onosproject.net.host.InterfaceIpAddress;
@@ -67,13 +71,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
+import com.google.common.collect.ConcurrentHashMultiset;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
/**
* BgpRouter component.
@@ -126,7 +128,7 @@
private final Map<IpAddress, NextHop> nextHops = Maps.newHashMap();
// Stores FIB updates that are waiting for groups to be set up
- private final Multimap<GroupKey, FibEntry> pendingUpdates = HashMultimap.create();
+ private final Multimap<NextHopGroupKey, FibEntry> pendingUpdates = HashMultimap.create();
// Device id of data-plane switch - should be learned from config
private DeviceId deviceId;
@@ -143,6 +145,11 @@
private InternalTableHandler provisionStaticTables = new InternalTableHandler();
+ private KryoNamespace.Builder appKryo = new KryoNamespace.Builder()
+ .register(IpAddress.Version.class)
+ .register(IpAddress.class)
+ .register(NextHopGroupKey.class);
+
@Activate
protected void activate() {
appId = coreService.registerApplication(BGP_ROUTER_APP);
@@ -210,7 +217,9 @@
Group group;
synchronized (pendingUpdates) {
NextHop nextHop = nextHops.get(entry.nextHopIp());
- group = groupService.getGroup(deviceId, nextHop.group());
+ group = groupService.getGroup(deviceId,
+ new DefaultGroupKey(
+ appKryo.build().serialize(nextHop.group())));
if (group == null) {
log.debug("Adding pending flow {}", update.entry());
@@ -309,7 +318,7 @@
GroupDescription.Type.INDIRECT,
new GroupBuckets(Collections
.singletonList(bucket)),
- groupKey,
+ new DefaultGroupKey(appKryo.build().serialize(groupKey)),
appId);
groupService.addGroup(groupDescription);
@@ -329,7 +338,10 @@
return null;
}
- Group group = groupService.getGroup(deviceId, nextHop.group());
+ Group group = groupService.getGroup(deviceId,
+ new DefaultGroupKey(appKryo.
+ build().
+ serialize(nextHop.group())));
// FIXME disabling group deletes for now until we verify the logic is OK
/*if (nextHopsCount.remove(nextHopIp, 1) <= 1) {
@@ -339,7 +351,9 @@
nextHops.remove(nextHopIp);
- groupService.removeGroup(deviceId, nextHop.group(), appId);
+ groupService.removeGroup(deviceId,
+ new DefaultGroupKey(appKryo.build().serialize(nextHop.group())),
+ appId);
}*/
return group;
@@ -699,8 +713,10 @@
event.type() == GroupEvent.Type.GROUP_UPDATED) {
synchronized (pendingUpdates) {
+ NextHopGroupKey nhGroupKey =
+ appKryo.build().deserialize(group.appCookie().key());
Map<FibEntry, Group> entriesToInstall =
- pendingUpdates.removeAll(group.appCookie())
+ pendingUpdates.removeAll(nhGroupKey)
.stream()
.collect(Collectors
.toMap(e -> e, e -> group));