Moving Source from connect point to HostId in MulticastHandling

Change-Id: Ie8f678e150b7ee388680b8d8f27df0bce60ec01f
diff --git a/apps/mcast/api/src/main/java/org/onosproject/mcast/api/McastRouteData.java b/apps/mcast/api/src/main/java/org/onosproject/mcast/api/McastRouteData.java
index e95c417..ce81ea9 100644
--- a/apps/mcast/api/src/main/java/org/onosproject/mcast/api/McastRouteData.java
+++ b/apps/mcast/api/src/main/java/org/onosproject/mcast/api/McastRouteData.java
@@ -17,7 +17,6 @@
 
 import com.google.common.annotations.Beta;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.HostId;
 
@@ -38,7 +37,7 @@
 @Beta
 public final class McastRouteData {
 
-    private final ConcurrentHashMap<ConnectPoint, Boolean> sources = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<HostId, Set<ConnectPoint>> sources = new ConcurrentHashMap<>();
     private final ConcurrentHashMap<HostId, Set<ConnectPoint>> sinks = new ConcurrentHashMap<>();
 
     private McastRouteData() {
@@ -49,13 +48,41 @@
      *
      * @return set of sources
      */
-    public Set<ConnectPoint> sources() {
-        return ImmutableSet.copyOf(sources.keySet());
+    public Map<HostId, Set<ConnectPoint>> sources() {
+        return ImmutableMap.copyOf(sources);
     }
 
     /**
      * Sources contained in the associated route.
      *
+     * @return map of hostIds and associated sources
+     */
+    public Set<ConnectPoint> allSources() {
+        return sources.values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
+    }
+
+    /**
+     * Sources contained in the associated route for the given host.
+     *
+     * @param hostId the host
+     * @return set of sources
+     */
+    public Set<ConnectPoint> sources(HostId hostId) {
+        return sources.get(hostId);
+    }
+
+    /**
+     * Sources contained in the associated route that are not bound to any host.
+     *
+     * @return set of sources
+     */
+    public Set<ConnectPoint> nonHostSources() {
+        return sources.get(HostId.NONE);
+    }
+
+    /**
+     * Sinks contained in the associated route.
+     *
      * @return map of hostIds and associated sinks
      */
     public Map<HostId, Set<ConnectPoint>> sinks() {
@@ -91,13 +118,34 @@
     }
 
     /**
-     * Add the sources for the associated route.
+     * Adds sources for a given host Id. If the Host Id is {@link HostId#NONE} the sources are intended to be
+     * used at all times independently of the attached host.
      *
-     * @param sources set of sources
+     * @param hostId the host
+     * @param sources  the sources
      */
-    public void addSources(Set<ConnectPoint> sources) {
+    public void addSources(HostId hostId, Set<ConnectPoint> sources) {
+        checkNotNull(hostId);
         checkArgument(!sources.contains(null));
-        sources.forEach(source -> this.sources.put(source, true));
+        //if existing we add to current set, otherwise we put them all
+        this.sources.compute(hostId, (host, existingSources) -> {
+            if (existingSources != null) {
+                existingSources.addAll(sources);
+                return existingSources;
+            } else {
+                return sources;
+            }
+        });
+    }
+
+    /**
+     * Adds sources for this route that are not associated directly with a given host.
+     *
+     * @param sources the sources
+     */
+    public void addNonHostSources(Set<ConnectPoint> sources) {
+        checkArgument(!sources.contains(null));
+        addSources(HostId.NONE, sources);
     }
 
     /**
@@ -108,13 +156,31 @@
     }
 
     /**
-     * Removes the given sources contained in the associated route.
+     * Removes the given source contained in the associated route.
      *
+     * @param source the source to remove
+     */
+    public void removeSource(HostId source) {
+        checkNotNull(source);
+        sources.remove(source);
+    }
+
+    /**
+     * Removes all the given sources for the given host for this route.
+     *
+     * @param hostId  the host
      * @param sources the sources to remove
      */
-    public void removeSources(Set<ConnectPoint> sources) {
+    public void removeSources(HostId hostId, Set<ConnectPoint> sources) {
+        checkNotNull(hostId);
         checkArgument(!sources.contains(null));
-        sources.forEach(this.sources::remove);
+        //if existing we remove from current set, otherwise just skip them
+        this.sources.compute(hostId, (host, existingSources) -> {
+            if (existingSources != null) {
+                existingSources.removeAll(sources);
+            }
+            return existingSources;
+        });
     }
 
     /**
@@ -128,11 +194,14 @@
         checkNotNull(hostId);
         checkArgument(!sinks.contains(null));
         //if existing we add to current set, otherwise we put them all
-        if (this.sinks.containsKey(hostId)) {
-            this.sinks.get(hostId).addAll(sinks);
-        } else {
-            this.sinks.put(hostId, sinks);
-        }
+        this.sinks.compute(hostId, (host, existingSinks) -> {
+            if (existingSinks != null) {
+                existingSinks.addAll(sinks);
+                return existingSinks;
+            } else {
+                return sinks;
+            }
+        });
     }
 
     /**
@@ -142,7 +211,7 @@
      */
     public void addNonHostSinks(Set<ConnectPoint> sinks) {
         checkArgument(!sinks.contains(null));
-        this.sinks.put(HostId.NONE, sinks);
+        this.addSinks(HostId.NONE, sinks);
     }
 
     /**
@@ -172,9 +241,12 @@
         checkNotNull(hostId);
         checkArgument(!sinks.contains(null));
         //if existing we remove from current set, otherwise just skip them
-        if (this.sinks.containsKey(hostId)) {
-            this.sinks.get(hostId).removeAll(sinks);
-        }
+        this.sinks.compute(hostId, (host, existingSinks) -> {
+            if (existingSinks != null) {
+                existingSinks.removeAll(sinks);
+            }
+            return existingSinks;
+        });
     }
 
     /**