Distributed group store using eventual consistent map abstraction

Change-Id: I618a0f6fa80e0e25285d7a2026032f09ba90aa70
diff --git a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultEdgeGroupHandler.java b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultEdgeGroupHandler.java
index 8da4acd..7523a64 100644
--- a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultEdgeGroupHandler.java
+++ b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultEdgeGroupHandler.java
@@ -124,16 +124,23 @@
             tBuilder.setOutput(newNeighborLink.src().port())
                     .setEthDst(deviceConfig.getDeviceMac(
                           newNeighborLink.dst().deviceId()))
-                    .setEthSrc(nodeMacAddr)
-                    .pushMpls()
-                    .setMpls(MplsLabel.mplsLabel(ns.getEdgeLabel()));
+                    .setEthSrc(nodeMacAddr);
+            if (ns.getEdgeLabel() != NeighborSet.NO_EDGE_LABEL) {
+                tBuilder.pushMpls()
+                        .setMpls(MplsLabel.
+                                 mplsLabel(ns.getEdgeLabel()));
+            }
             GroupBucket updatedBucket = DefaultGroupBucket.
                     createSelectGroupBucket(tBuilder.build());
             GroupBuckets updatedBuckets = new GroupBuckets(
                                         Arrays.asList(updatedBucket));
             log.debug("newPortToExistingNeighborAtEdgeRouter: "
                     + "groupService.addBucketsToGroup for neighborset{}", ns);
-            groupService.addBucketsToGroup(deviceId, ns, updatedBuckets, ns, appId);
+            groupService.addBucketsToGroup(deviceId,
+                                           getGroupKey(ns),
+                                           updatedBuckets,
+                                           getGroupKey(ns),
+                                           appId);
         }
     }
 
diff --git a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultGroupHandler.java b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultGroupHandler.java
index 755fac2..ec9e42f 100644
--- a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultGroupHandler.java
+++ b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultGroupHandler.java
@@ -18,6 +18,7 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.slf4j.LoggerFactory.getLogger;
 
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -27,6 +28,7 @@
 
 import org.onlab.packet.MacAddress;
 import org.onlab.packet.MplsLabel;
+import org.onlab.util.KryoNamespace;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.Link;
@@ -35,6 +37,7 @@
 import org.onosproject.net.flow.TrafficTreatment;
 import org.onosproject.net.group.DefaultGroupBucket;
 import org.onosproject.net.group.DefaultGroupDescription;
+import org.onosproject.net.group.DefaultGroupKey;
 import org.onosproject.net.group.Group;
 import org.onosproject.net.group.GroupBucket;
 import org.onosproject.net.group.GroupBuckets;
@@ -71,6 +74,16 @@
             new HashMap<PortNumber, DeviceId>();
 
     private GroupListener listener = new InternalGroupListener();
+    protected KryoNamespace.Builder kryo = new KryoNamespace.Builder()
+    .register(URI.class)
+    .register(HashSet.class)
+    .register(DeviceId.class)
+    .register(PortNumber.class)
+    .register(NeighborSet.class)
+    .register(PolicyGroupIdentifier.class)
+    .register(PolicyGroupParams.class)
+    .register(GroupBucketIdentifier.class)
+    .register(GroupBucketIdentifier.BucketOutputType.class);
 
     protected DefaultGroupHandler(DeviceId deviceId,
                                ApplicationId appId,
@@ -185,9 +198,11 @@
             tBuilder.setOutput(port)
                     .setEthDst(deviceConfig.getDeviceMac(
                          portDeviceMap.get(port)))
-                    .setEthSrc(nodeMacAddr)
-                    .pushMpls()
+                    .setEthSrc(nodeMacAddr);
+            if (ns.getEdgeLabel() != NeighborSet.NO_EDGE_LABEL) {
+                tBuilder.pushMpls()
                     .setMpls(MplsLabel.mplsLabel(ns.getEdgeLabel()));
+            }
             GroupBucket removeBucket = DefaultGroupBucket.
                     createSelectGroupBucket(tBuilder.build());
             GroupBuckets removeBuckets = new GroupBuckets(
@@ -196,9 +211,9 @@
                     + "groupService.removeBucketsFromGroup "
                     + "for neighborset{}", deviceId, ns);
             groupService.removeBucketsFromGroup(deviceId,
-                                                ns,
+                                                getGroupKey(ns),
                                                 removeBuckets,
-                                                ns,
+                                                getGroupKey(ns),
                                                 appId);
         }
 
@@ -331,9 +346,12 @@
                             DefaultTrafficTreatment.builder();
                     tBuilder.setOutput(sp)
                             .setEthDst(deviceConfig.getDeviceMac(d))
-                            .setEthSrc(nodeMacAddr)
-                            .pushMpls()
-                            .setMpls(MplsLabel.mplsLabel(ns.getEdgeLabel()));
+                            .setEthSrc(nodeMacAddr);
+                    if (ns.getEdgeLabel() != NeighborSet.NO_EDGE_LABEL) {
+                        tBuilder.pushMpls()
+                                .setMpls(MplsLabel.
+                                         mplsLabel(ns.getEdgeLabel()));
+                    }
                     buckets.add(DefaultGroupBucket.createSelectGroupBucket(
                                                                 tBuilder.build()));
                 }
@@ -343,7 +361,7 @@
                                       deviceId,
                                       Group.Type.SELECT,
                                       groupBuckets,
-                                      ns,
+                                      getGroupKey(ns),
                                       appId);
             log.debug("createGroupsFromNeighborsets: "
                     + "groupService.addGroup for neighborset{}", ns);
@@ -386,4 +404,8 @@
             handleGroupEvent(event);
         }
     }
+
+    protected GroupKey getGroupKey(Object obj) {
+        return new DefaultGroupKey(kryo.build().serialize(obj));
+    }
 }
diff --git a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultGroupHandlerApp.java b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultGroupHandlerApp.java
index 2bd2b4e..bb1a035 100644
--- a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultGroupHandlerApp.java
+++ b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultGroupHandlerApp.java
@@ -17,8 +17,10 @@
 
 import static org.slf4j.LoggerFactory.getLogger;
 
+import java.net.URI;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 
 import org.apache.felix.scr.annotations.Activate;
@@ -27,10 +29,13 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.onlab.packet.MacAddress;
+import org.onlab.util.KryoNamespace;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
+import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.MastershipRole;
 import org.onosproject.net.device.DeviceEvent;
 import org.onosproject.net.device.DeviceListener;
 import org.onosproject.net.device.DeviceService;
@@ -69,10 +74,18 @@
     protected GroupService groupService;
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected CoreService coreService;
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected MastershipService mastershipService;
 
     private DeviceListener deviceListener = new InternalDeviceListener();
     private LinkListener linkListener = new InternalLinkListener();
 
+    protected KryoNamespace.Builder kryo = new KryoNamespace.Builder()
+                        .register(URI.class)
+                        .register(HashSet.class)
+                        .register(DeviceId.class)
+                        .register(NeighborSet.class);
+
     @Activate
     public void activate() {
         appId = coreService.registerApplication("org.onosproject.defaultgrouphandler");
@@ -80,14 +93,23 @@
         deviceService.addListener(deviceListener);
         linkService.addListener(linkListener);
         for (Device device: deviceService.getDevices()) {
-            log.debug("Initiating default group handling for {}", device.id());
-            DefaultGroupHandler dgh = DefaultGroupHandler.createGroupHandler(device.id(),
-                                                              appId,
-                                                              config,
-                                                              linkService,
-                                                              groupService);
-            dgh.createGroups();
-            dghMap.put(device.id(), dgh);
+            if (mastershipService.
+                    getLocalRole(device.id()) == MastershipRole.MASTER) {
+                log.debug("Initiating default group handling for {}", device.id());
+                DefaultGroupHandler dgh = DefaultGroupHandler.createGroupHandler(device.id(),
+                                                                  appId,
+                                                                  config,
+                                                                  linkService,
+                                                                  groupService);
+                dgh.createGroups();
+                dghMap.put(device.id(), dgh);
+            } else {
+                log.debug("Activate: Local role {} "
+                        + "is not MASTER for device {}",
+                          mastershipService.
+                          getLocalRole(device.id()),
+                          device.id());
+            }
         }
         log.info("Activated");
     }
@@ -165,7 +187,15 @@
 
         @Override
         public void event(DeviceEvent event) {
-            switch (event.type()) {
+           if (mastershipService.
+                    getLocalRole(event.subject().id()) != MastershipRole.MASTER) {
+               log.debug("Local role {} is not MASTER for device {}",
+                         mastershipService.
+                         getLocalRole(event.subject().id()),
+                         event.subject().id());
+               return;
+           }
+           switch (event.type()) {
             case DEVICE_ADDED:
                 log.debug("Initiating default group handling for {}", event.subject().id());
                 DefaultGroupHandler dgh = DefaultGroupHandler.createGroupHandler(
@@ -193,6 +223,16 @@
 
         @Override
         public void event(LinkEvent event) {
+            if (mastershipService.
+                    getLocalRole(event.subject().src().deviceId()) !=
+                    MastershipRole.MASTER) {
+                log.debug("InternalLinkListener: Local role {} "
+                        + "is not MASTER for device {}",
+                          mastershipService.
+                          getLocalRole(event.subject().src().deviceId()),
+                          event.subject().src().deviceId());
+                return;
+            }
             switch (event.type()) {
             case LINK_ADDED:
                 if (dghMap.get(event.subject().src().deviceId()) != null) {
diff --git a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultTransitGroupHandler.java b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultTransitGroupHandler.java
index a6129c2..1b5bb50 100644
--- a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultTransitGroupHandler.java
+++ b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/DefaultTransitGroupHandler.java
@@ -112,16 +112,23 @@
             tBuilder.setOutput(newNeighborLink.src().port())
                     .setEthDst(deviceConfig.getDeviceMac(
                           newNeighborLink.dst().deviceId()))
-                    .setEthSrc(nodeMacAddr)
-                    .pushMpls()
-                    .setMpls(MplsLabel.mplsLabel(ns.getEdgeLabel()));
+                    .setEthSrc(nodeMacAddr);
+            if (ns.getEdgeLabel() != NeighborSet.NO_EDGE_LABEL) {
+                tBuilder.pushMpls()
+                        .setMpls(MplsLabel.
+                                 mplsLabel(ns.getEdgeLabel()));
+            }
             GroupBucket updatedBucket = DefaultGroupBucket.
                     createSelectGroupBucket(tBuilder.build());
             GroupBuckets updatedBuckets = new GroupBuckets(
                                         Arrays.asList(updatedBucket));
             log.debug("newPortToExistingNeighborAtEdgeRouter: "
                     + "groupService.addBucketsToGroup for neighborset{}", ns);
-            groupService.addBucketsToGroup(deviceId, ns, updatedBuckets, ns, appId);
+            groupService.addBucketsToGroup(deviceId,
+                                           getGroupKey(ns),
+                                           updatedBuckets,
+                                           getGroupKey(ns),
+                                           appId);
         }
     }
 
diff --git a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/GroupBucketIdentifier.java b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/GroupBucketIdentifier.java
index cf0ae5e..3a46ce6 100644
--- a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/GroupBucketIdentifier.java
+++ b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/GroupBucketIdentifier.java
@@ -18,7 +18,6 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import org.onosproject.net.PortNumber;
-import org.onosproject.net.group.GroupKey;
 
 /**
  * Representation of policy group bucket identifier. Not exposed to
@@ -28,7 +27,7 @@
     private int label;
     private BucketOutputType type;
     private PortNumber outPort;
-    private GroupKey outGroup;
+    private PolicyGroupIdentifier outGroup;
 
     protected enum BucketOutputType {
         PORT,
@@ -44,7 +43,7 @@
     }
 
     protected GroupBucketIdentifier(int label,
-                                    GroupKey outGroup) {
+                                    PolicyGroupIdentifier outGroup) {
         this.label = label;
         this.type = BucketOutputType.GROUP;
         this.outPort = null;
@@ -63,7 +62,7 @@
         return this.outPort;
     }
 
-    protected GroupKey outGroup() {
+    protected PolicyGroupIdentifier outGroup() {
         return this.outGroup;
     }
 }
diff --git a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/NeighborSet.java b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/NeighborSet.java
index 497b352..ce739ce 100644
--- a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/NeighborSet.java
+++ b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/NeighborSet.java
@@ -23,7 +23,6 @@
 import java.util.Set;
 
 import org.onosproject.net.DeviceId;
-import org.onosproject.net.group.GroupKey;
 
 /**
  * Representation of a set of neighbor switch dpids along with edge node
@@ -31,9 +30,10 @@
  * ECMP-group that hashes packets to a set of ports connecting to the
  * neighbors in this set.
  */
-public class NeighborSet implements GroupKey {
+public class NeighborSet {
     private final Set<DeviceId> neighbors;
     private final int edgeLabel;
+    public static final int NO_EDGE_LABEL = -1;
 
     /**
      * Constructor with set of neighbors. Edge label is
@@ -43,7 +43,7 @@
      */
     public NeighborSet(Set<DeviceId> neighbors) {
         checkNotNull(neighbors);
-        this.edgeLabel = -1;
+        this.edgeLabel = NO_EDGE_LABEL;
         this.neighbors = new HashSet<DeviceId>();
         this.neighbors.addAll(neighbors);
     }
@@ -65,7 +65,7 @@
      * Default constructor for kryo serialization.
      */
     public NeighborSet() {
-        this.edgeLabel = -1;
+        this.edgeLabel = NO_EDGE_LABEL;
         this.neighbors = new HashSet<DeviceId>();
     }
 
diff --git a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/PolicyGroupHandler.java b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/PolicyGroupHandler.java
index d2b43b3..e0cfe52 100644
--- a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/PolicyGroupHandler.java
+++ b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/PolicyGroupHandler.java
@@ -37,7 +37,6 @@
 import org.onosproject.net.group.GroupBuckets;
 import org.onosproject.net.group.GroupDescription;
 import org.onosproject.net.group.GroupEvent;
-import org.onosproject.net.group.GroupKey;
 import org.onosproject.net.group.GroupService;
 import org.onosproject.net.link.LinkService;
 import org.slf4j.Logger;
@@ -49,18 +48,17 @@
 public class PolicyGroupHandler extends DefaultGroupHandler {
 
     private final Logger log = getLogger(getClass());
-    private HashMap<GroupKey, GroupKey> dependentGroups =
-            new HashMap<GroupKey, GroupKey>();
+    private HashMap<PolicyGroupIdentifier, PolicyGroupIdentifier> dependentGroups =
+            new HashMap<PolicyGroupIdentifier, PolicyGroupIdentifier>();
 
     /**
-     * Creates policy group handler object.
+     * Policy group handler constructor.
      *
      * @param deviceId device identifier
      * @param appId application identifier
      * @param config interface to retrieve the device properties
      * @param linkService link service object
      * @param groupService group service object
-     * @return policy group handler type
      */
     public PolicyGroupHandler(DeviceId deviceId,
                               ApplicationId appId,
@@ -175,9 +173,11 @@
                     tBuilder.setOutput(bucketId.outPort())
                             .setEthDst(deviceConfig.
                                        getDeviceMac(neighbor))
-                            .setEthSrc(nodeMacAddr)
-                            .pushMpls()
+                            .setEthSrc(nodeMacAddr);
+                    if (bucketId.label() != NeighborSet.NO_EDGE_LABEL) {
+                        tBuilder.pushMpls()
                             .setMpls(MplsLabel.mplsLabel(bucketId.label()));
+                    }
                     //TODO: BoS
                     outBuckets.add(DefaultGroupBucket.
                                    createSelectGroupBucket(tBuilder.build()));
@@ -196,8 +196,7 @@
     protected void handleGroupEvent(GroupEvent event) {
         if (event.type() == GroupEvent.Type.GROUP_ADDED) {
             if (dependentGroups.get(event.subject().appCookie()) != null) {
-                PolicyGroupIdentifier dependentGroupKey = (PolicyGroupIdentifier)
-                        dependentGroups.get(event.subject().appCookie());
+                PolicyGroupIdentifier dependentGroupKey = dependentGroups.get(event.subject().appCookie());
                 dependentGroups.remove(event.subject().appCookie());
                 boolean fullyResolved = true;
                 for (GroupBucketIdentifier bucketId:
@@ -217,8 +216,11 @@
                                 dependentGroupKey.bucketIds()) {
                         TrafficTreatment.Builder tBuilder =
                                 DefaultTrafficTreatment.builder();
-                        tBuilder.pushMpls()
-                                .setMpls(MplsLabel.mplsLabel(bucketId.label()));
+                        if (bucketId.label() != NeighborSet.NO_EDGE_LABEL) {
+                            tBuilder.pushMpls()
+                                    .setMpls(MplsLabel.
+                                             mplsLabel(bucketId.label()));
+                        }
                         //TODO: BoS
                         if (bucketId.type() == BucketOutputType.PORT) {
                             DeviceId neighbor = portDeviceMap.
@@ -230,12 +232,14 @@
                         } else {
                             if (groupService.
                                     getGroup(deviceId,
-                                             bucketId.outGroup()) == null) {
+                                             getGroupKey(bucketId.
+                                                       outGroup())) == null) {
                                 throw new IllegalStateException();
                             }
                             GroupId indirectGroupId = groupService.
                                     getGroup(deviceId,
-                                             bucketId.outGroup()).id();
+                                             getGroupKey(bucketId.
+                                                         outGroup())).id();
                             tBuilder.group(indirectGroupId);
                         }
                         outBuckets.add(DefaultGroupBucket.
@@ -251,7 +255,7 @@
         }
     }
 
-    public GroupKey generatePolicyGroupKey(String id,
+    public PolicyGroupIdentifier generatePolicyGroupKey(String id,
                                    List<PolicyGroupParams> params) {
         List<GroupBucketIdentifier> bucketIds = new ArrayList<GroupBucketIdentifier>();
         for (PolicyGroupParams param: params) {
@@ -320,25 +324,28 @@
         return innermostGroupkey;
     }
 
-    public void removeGroupChain(GroupKey key) {
+    public void removeGroupChain(PolicyGroupIdentifier key) {
         if (!(key instanceof PolicyGroupIdentifier)) {
             throw new IllegalArgumentException();
         }
-        List<GroupKey> groupsToBeDeleted = new ArrayList<GroupKey>();
+        List<PolicyGroupIdentifier> groupsToBeDeleted =
+                new ArrayList<PolicyGroupIdentifier>();
         groupsToBeDeleted.add(key);
 
-        Iterator<GroupKey> it = groupsToBeDeleted.iterator();
+        Iterator<PolicyGroupIdentifier> it =
+                groupsToBeDeleted.iterator();
 
         while (it.hasNext()) {
-            PolicyGroupIdentifier innerMostGroupKey =
-                    (PolicyGroupIdentifier) it.next();
+            PolicyGroupIdentifier innerMostGroupKey = it.next();
             for (GroupBucketIdentifier bucketId:
                         innerMostGroupKey.bucketIds()) {
                 if (bucketId.type() != BucketOutputType.GROUP) {
                     groupsToBeDeleted.add(bucketId.outGroup());
                 }
             }
-            groupService.removeGroup(deviceId, innerMostGroupKey, appId);
+            groupService.removeGroup(deviceId,
+                                     getGroupKey(innerMostGroupKey),
+                                     appId);
             it.remove();
         }
     }
diff --git a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/PolicyGroupIdentifier.java b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/PolicyGroupIdentifier.java
index d213e0c..3c19b29 100644
--- a/apps/grouphandler/src/main/java/org/onosproject/grouphandler/PolicyGroupIdentifier.java
+++ b/apps/grouphandler/src/main/java/org/onosproject/grouphandler/PolicyGroupIdentifier.java
@@ -17,14 +17,12 @@
 
 import java.util.List;
 
-import org.onosproject.net.group.GroupKey;
-
 /**
  * Representation of policy based group identifiers.
  * Opaque to group handler applications and only the outermost
  * policy group identifier in a chain is visible to the applications.
  */
-public class PolicyGroupIdentifier implements GroupKey {
+public class PolicyGroupIdentifier {
     private String id;
     private List<PolicyGroupParams> inputParams;
     private List<GroupBucketIdentifier> bucketIds;