Distributed group store using eventual consistent map abstraction

Change-Id: I618a0f6fa80e0e25285d7a2026032f09ba90aa70
diff --git a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultGroupHandler.java b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultGroupHandler.java
index 755fac2..ec9e42f 100644
--- a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultGroupHandler.java
+++ b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultGroupHandler.java
@@ -18,6 +18,7 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.slf4j.LoggerFactory.getLogger;
 
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -27,6 +28,7 @@
 
 import org.onlab.packet.MacAddress;
 import org.onlab.packet.MplsLabel;
+import org.onlab.util.KryoNamespace;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.Link;
@@ -35,6 +37,7 @@
 import org.onosproject.net.flow.TrafficTreatment;
 import org.onosproject.net.group.DefaultGroupBucket;
 import org.onosproject.net.group.DefaultGroupDescription;
+import org.onosproject.net.group.DefaultGroupKey;
 import org.onosproject.net.group.Group;
 import org.onosproject.net.group.GroupBucket;
 import org.onosproject.net.group.GroupBuckets;
@@ -71,6 +74,16 @@
             new HashMap<PortNumber, DeviceId>();
 
     private GroupListener listener = new InternalGroupListener();
+    protected KryoNamespace.Builder kryo = new KryoNamespace.Builder()
+    .register(URI.class)
+    .register(HashSet.class)
+    .register(DeviceId.class)
+    .register(PortNumber.class)
+    .register(NeighborSet.class)
+    .register(PolicyGroupIdentifier.class)
+    .register(PolicyGroupParams.class)
+    .register(GroupBucketIdentifier.class)
+    .register(GroupBucketIdentifier.BucketOutputType.class);
 
     protected DefaultGroupHandler(DeviceId deviceId,
                                ApplicationId appId,
@@ -185,9 +198,11 @@
             tBuilder.setOutput(port)
                     .setEthDst(deviceConfig.getDeviceMac(
                          portDeviceMap.get(port)))
-                    .setEthSrc(nodeMacAddr)
-                    .pushMpls()
+                    .setEthSrc(nodeMacAddr);
+            if (ns.getEdgeLabel() != NeighborSet.NO_EDGE_LABEL) {
+                tBuilder.pushMpls()
                     .setMpls(MplsLabel.mplsLabel(ns.getEdgeLabel()));
+            }
             GroupBucket removeBucket = DefaultGroupBucket.
                     createSelectGroupBucket(tBuilder.build());
             GroupBuckets removeBuckets = new GroupBuckets(
@@ -196,9 +211,9 @@
                     + "groupService.removeBucketsFromGroup "
                     + "for neighborset{}", deviceId, ns);
             groupService.removeBucketsFromGroup(deviceId,
-                                                ns,
+                                                getGroupKey(ns),
                                                 removeBuckets,
-                                                ns,
+                                                getGroupKey(ns),
                                                 appId);
         }
 
@@ -331,9 +346,12 @@
                             DefaultTrafficTreatment.builder();
                     tBuilder.setOutput(sp)
                             .setEthDst(deviceConfig.getDeviceMac(d))
-                            .setEthSrc(nodeMacAddr)
-                            .pushMpls()
-                            .setMpls(MplsLabel.mplsLabel(ns.getEdgeLabel()));
+                            .setEthSrc(nodeMacAddr);
+                    if (ns.getEdgeLabel() != NeighborSet.NO_EDGE_LABEL) {
+                        tBuilder.pushMpls()
+                                .setMpls(MplsLabel.
+                                         mplsLabel(ns.getEdgeLabel()));
+                    }
                     buckets.add(DefaultGroupBucket.createSelectGroupBucket(
                                                                 tBuilder.build()));
                 }
@@ -343,7 +361,7 @@
                                       deviceId,
                                       Group.Type.SELECT,
                                       groupBuckets,
-                                      ns,
+                                      getGroupKey(ns),
                                       appId);
             log.debug("createGroupsFromNeighborsets: "
                     + "groupService.addGroup for neighborset{}", ns);
@@ -386,4 +404,8 @@
             handleGroupEvent(event);
         }
     }
+
+    protected GroupKey getGroupKey(Object obj) {
+        return new DefaultGroupKey(kryo.build().serialize(obj));
+    }
 }