Moving Source from connect point to HostId in MulticastHandling

Change-Id: Ie8f678e150b7ee388680b8d8f27df0bce60ec01f
diff --git a/apps/mcast/impl/src/main/java/org/onosproject/mcast/impl/DistributedMcastRoutesStore.java b/apps/mcast/impl/src/main/java/org/onosproject/mcast/impl/DistributedMcastRoutesStore.java
index 84e2131..c9baac3 100644
--- a/apps/mcast/impl/src/main/java/org/onosproject/mcast/impl/DistributedMcastRoutesStore.java
+++ b/apps/mcast/impl/src/main/java/org/onosproject/mcast/impl/DistributedMcastRoutesStore.java
@@ -17,7 +17,6 @@
 
 
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -111,15 +110,25 @@
         mcastRoutes.remove(route);
     }
 
+
     @Override
-    public void storeSources(McastRoute route, Set<ConnectPoint> sources) {
+    public void storeSource(McastRoute route, HostId hostId, Set<ConnectPoint> connectPoints) {
         mcastRoutes.compute(route, (k, v) -> {
-            v.addSources(sources);
+            v.addSources(hostId, connectPoints);
             return v;
         });
     }
 
     @Override
+    public void storeSources(McastRoute route, Set<ConnectPoint> sources) {
+        mcastRoutes.compute(route, (k, v) -> {
+            v.addSources(HostId.NONE, sources);
+            return v;
+        });
+    }
+
+
+    @Override
     public void removeSources(McastRoute route) {
         mcastRoutes.compute(route, (k, v) -> {
             v.removeSources();
@@ -129,15 +138,23 @@
     }
 
     @Override
-    public void removeSources(McastRoute route, Set<ConnectPoint> sources) {
+    public void removeSource(McastRoute route, HostId source) {
         mcastRoutes.compute(route, (k, v) -> {
-            v.removeSources(sources);
+            v.removeSource(source);
             // Since there are no sources, we should remove the route
             return v.sources().isEmpty() ? null : v;
         });
     }
 
     @Override
+    public void removeSources(McastRoute route, HostId hostId, Set<ConnectPoint> sources) {
+        mcastRoutes.compute(route, (k, v) -> {
+            v.removeSources(hostId, sources);
+            return v;
+        });
+    }
+
+    @Override
     public void addSink(McastRoute route, HostId hostId, Set<ConnectPoint> sinks) {
         mcastRoutes.compute(route, (k, v) -> {
             v.addSinks(hostId, sinks);
@@ -189,7 +206,14 @@
     @Override
     public Set<ConnectPoint> sourcesFor(McastRoute route) {
         McastRouteData data = mcastRoutes.getOrDefault(route, null);
-        return data == null ? ImmutableSet.of() : ImmutableSet.copyOf(data.sources());
+        return data == null ? ImmutableSet.of() : ImmutableSet.copyOf(data.sources().values().stream()
+                .flatMap(Collection::stream).collect(Collectors.toSet()));
+    }
+
+    @Override
+    public Set<ConnectPoint> sourcesFor(McastRoute route, HostId hostId) {
+        McastRouteData data = mcastRoutes.getOrDefault(route, null);
+        return data == null ? ImmutableSet.of() : ImmutableSet.copyOf(data.sources(hostId));
     }
 
     @Override
@@ -227,27 +251,18 @@
             switch (event.type()) {
                 case INSERT:
                     checkNotNull(newData);
-                    McastEvent.Type type;
-                    if (!newData.sources().isEmpty() || !newData.sinks().isEmpty()) {
-                        type = McastEvent.Type.SOURCES_ADDED;
-                    } else if (!newData.sinks().isEmpty()) {
-                        type = McastEvent.Type.SINKS_ADDED;
-                    } else {
-                        type = McastEvent.Type.ROUTE_ADDED;
-                    }
-                    notifyDelegate(new McastEvent(type, null,
+                    notifyDelegate(new McastEvent(McastEvent.Type.ROUTE_ADDED, null,
                             mcastRouteUpdate(route, newData.sources(), newData.sinks())));
                     break;
                 case UPDATE:
                     checkNotNull(newData);
                     checkNotNull(oldData);
 
-                    if (!Sets.difference(newData.sources(), oldData.sources()).isEmpty()) {
+                    if (newData.allSources().size() > oldData.allSources().size()) {
                         notifyDelegate(new McastEvent(McastEvent.Type.SOURCES_ADDED,
                                 mcastRouteUpdate(route, oldData.sources(), oldData.sinks()),
                                 mcastRouteUpdate(route, newData.sources(), newData.sinks())));
-                    }
-                    if (!Sets.difference(oldData.sources(), newData.sources()).isEmpty()) {
+                    } else if (newData.allSources().size() < oldData.allSources().size()) {
                         notifyDelegate(new McastEvent(McastEvent.Type.SOURCES_REMOVED,
                                 mcastRouteUpdate(route, oldData.sources(), oldData.sinks()),
                                 mcastRouteUpdate(route, newData.sources(), newData.sinks())));