Distributed group store using eventual consistent map abstraction

Change-Id: I618a0f6fa80e0e25285d7a2026032f09ba90aa70
diff --git a/apps/bgprouter/src/main/java/org/onosproject/bgprouter/BgpRouter.java b/apps/bgprouter/src/main/java/org/onosproject/bgprouter/BgpRouter.java
index a17046f..b50681e 100644
--- a/apps/bgprouter/src/main/java/org/onosproject/bgprouter/BgpRouter.java
+++ b/apps/bgprouter/src/main/java/org/onosproject/bgprouter/BgpRouter.java
@@ -15,11 +15,14 @@
  */
 package org.onosproject.bgprouter;
 
-import com.google.common.collect.ConcurrentHashMultiset;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multiset;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -30,6 +33,7 @@
 import org.onlab.packet.IpPrefix;
 import org.onlab.packet.MacAddress;
 import org.onlab.packet.VlanId;
+import org.onlab.util.KryoNamespace;
 import org.onosproject.config.NetworkConfigService;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
@@ -47,12 +51,12 @@
 import org.onosproject.net.flow.TrafficTreatment;
 import org.onosproject.net.group.DefaultGroupBucket;
 import org.onosproject.net.group.DefaultGroupDescription;
+import org.onosproject.net.group.DefaultGroupKey;
 import org.onosproject.net.group.Group;
 import org.onosproject.net.group.GroupBucket;
 import org.onosproject.net.group.GroupBuckets;
 import org.onosproject.net.group.GroupDescription;
 import org.onosproject.net.group.GroupEvent;
-import org.onosproject.net.group.GroupKey;
 import org.onosproject.net.group.GroupListener;
 import org.onosproject.net.group.GroupService;
 import org.onosproject.net.host.InterfaceIpAddress;
@@ -67,13 +71,11 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
+import com.google.common.collect.ConcurrentHashMultiset;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
 
 /**
  * BgpRouter component.
@@ -126,7 +128,7 @@
     private final Map<IpAddress, NextHop> nextHops = Maps.newHashMap();
 
     // Stores FIB updates that are waiting for groups to be set up
-    private final Multimap<GroupKey, FibEntry> pendingUpdates = HashMultimap.create();
+    private final Multimap<NextHopGroupKey, FibEntry> pendingUpdates = HashMultimap.create();
 
     // Device id of data-plane switch - should be learned from config
     private DeviceId deviceId;
@@ -143,6 +145,11 @@
 
     private InternalTableHandler provisionStaticTables = new InternalTableHandler();
 
+    private KryoNamespace.Builder appKryo = new KryoNamespace.Builder()
+                    .register(IpAddress.Version.class)
+                    .register(IpAddress.class)
+                    .register(NextHopGroupKey.class);
+
     @Activate
     protected void activate() {
         appId = coreService.registerApplication(BGP_ROUTER_APP);
@@ -210,7 +217,9 @@
             Group group;
             synchronized (pendingUpdates) {
                 NextHop nextHop = nextHops.get(entry.nextHopIp());
-                group = groupService.getGroup(deviceId, nextHop.group());
+                group = groupService.getGroup(deviceId,
+                                              new DefaultGroupKey(
+                                              appKryo.build().serialize(nextHop.group())));
 
                 if (group == null) {
                     log.debug("Adding pending flow {}", update.entry());
@@ -309,7 +318,7 @@
                                                   GroupDescription.Type.INDIRECT,
                                                   new GroupBuckets(Collections
                                                                            .singletonList(bucket)),
-                                                  groupKey,
+                                                  new DefaultGroupKey(appKryo.build().serialize(groupKey)),
                                                   appId);
 
             groupService.addGroup(groupDescription);
@@ -329,7 +338,10 @@
             return null;
         }
 
-        Group group = groupService.getGroup(deviceId, nextHop.group());
+        Group group = groupService.getGroup(deviceId,
+                                            new DefaultGroupKey(appKryo.
+                                                                build().
+                                                                serialize(nextHop.group())));
 
         // FIXME disabling group deletes for now until we verify the logic is OK
         /*if (nextHopsCount.remove(nextHopIp, 1) <= 1) {
@@ -339,7 +351,9 @@
 
             nextHops.remove(nextHopIp);
 
-            groupService.removeGroup(deviceId, nextHop.group(), appId);
+            groupService.removeGroup(deviceId,
+                                     new DefaultGroupKey(appKryo.build().serialize(nextHop.group())),
+                                     appId);
         }*/
 
         return group;
@@ -699,8 +713,10 @@
                     event.type() == GroupEvent.Type.GROUP_UPDATED) {
                 synchronized (pendingUpdates) {
 
+                    NextHopGroupKey nhGroupKey =
+                            appKryo.build().deserialize(group.appCookie().key());
                     Map<FibEntry, Group> entriesToInstall =
-                            pendingUpdates.removeAll(group.appCookie())
+                            pendingUpdates.removeAll(nhGroupKey)
                                     .stream()
                                     .collect(Collectors
                                                      .toMap(e -> e, e -> group));