[CORD-1623] Fix DistributedMcastStore

Change-Id: I5db88a65aeed7ee46e69701d189c1c27c9eb4df0
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mcast/impl/DistributedMcastStore.java b/core/store/dist/src/main/java/org/onosproject/store/mcast/impl/DistributedMcastStore.java
index 8a3db91..6b1eff5 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mcast/impl/DistributedMcastStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mcast/impl/DistributedMcastStore.java
@@ -15,6 +15,9 @@
  */
 package org.onosproject.store.mcast.impl;
 
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Sets;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -22,6 +25,7 @@
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.util.KryoNamespace;
+
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.mcast.McastEvent;
 import org.onosproject.net.mcast.McastRoute;
@@ -30,14 +34,20 @@
 import org.onosproject.store.AbstractStore;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
 import org.slf4j.Logger;
 
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static org.onosproject.net.mcast.McastRouteInfo.mcastRouteInfo;
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -47,12 +57,9 @@
  */
 @Component(immediate = true)
 @Service
-public class DistributedMcastStore extends AbstractStore<McastEvent, McastStoreDelegate>
-        implements McastStore {
-    //FIXME the number of events that will potentially be generated here is
-    // not sustainable, consider changing this to an eventually consistent
-    // map and not emitting events but rather use a provider-like mechanism
-    // to program the dataplane.
+public class DistributedMcastStore
+    extends AbstractStore<McastEvent, McastStoreDelegate>
+    implements McastStore {
 
     private static final String MCASTRIB = "onos-mcast-rib-table";
     private Logger log = getLogger(getClass());
@@ -60,13 +67,16 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected StorageService storageService;
 
-    protected ConsistentMap<McastRoute, MulticastData> mcastRib;
-    protected Map<McastRoute, MulticastData> mcastRoutes;
+    private Map<McastRoute, MulticastData> mcastRoutes;
+    private ConsistentMap<McastRoute, MulticastData> mcastRib;
+    private MapEventListener<McastRoute, MulticastData> mcastRouteListener =
+        new McastRouteListener();
+
+    private ScheduledExecutorService executor;
 
 
     @Activate
     public void activate() {
-
         mcastRib = storageService.<McastRoute, MulticastData>consistentMapBuilder()
                 .withName(MCASTRIB)
                 .withSerializer(Serializer.using(KryoNamespace.newBuilder()
@@ -77,34 +87,93 @@
                                                                  McastRoute.class,
                                                                  McastRoute.Type.class
                                                          ).build()))
-                //.withRelaxedReadConsistency()
-                .build();
+        .build();
 
         mcastRoutes = mcastRib.asJavaMap();
-
+        mcastRib.addListener(mcastRouteListener);
 
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
+        mcastRib.removeListener(mcastRouteListener);
         log.info("Stopped");
     }
 
+
+    private class McastRouteListener implements MapEventListener<McastRoute, MulticastData> {
+        @Override
+        public void event(MapEvent<McastRoute, MulticastData> event) {
+            final McastRoute route = event.key();
+            final MulticastData newData = Optional.ofNullable(event.newValue()).map(Versioned::value).orElse(null);
+            final MulticastData oldData = Optional.ofNullable(event.oldValue()).map(Versioned::value).orElse(null);
+
+            switch (event.type()) {
+                case INSERT:
+                    checkNotNull(newData);
+
+                    if (newData.source() != null) {
+                        notifyDelegate(new McastEvent(McastEvent.Type.SOURCE_ADDED,
+                                mcastRouteInfo(route,
+                                        newData.sinks(),
+                                        newData.source())));
+                    } else if (!newData.sinks().isEmpty()) {
+                        newData.sinks().forEach(sink ->
+                            notifyDelegate(new McastEvent(McastEvent.Type.SINK_ADDED,
+                                    mcastRouteInfo(route,
+                                            sink,
+                                            newData.source())))
+                        );
+                    } else {
+                        notifyDelegate(new McastEvent(McastEvent.Type.ROUTE_ADDED,
+                                mcastRouteInfo(route)));
+                    }
+                    break;
+                case UPDATE:
+                    checkNotNull(newData);
+                    checkNotNull(oldData);
+
+                    if (!Objects.equal(oldData.source(), newData.source())) {
+                        notifyDelegate(new McastEvent(McastEvent.Type.SOURCE_ADDED,
+                                                      mcastRouteInfo(route,
+                                                                     newData.sinks(),
+                                                                     newData.source())));
+                    } else {
+                        Sets.difference(newData.sinks(), oldData.sinks()).forEach(sink ->
+                            notifyDelegate(new McastEvent(McastEvent.Type.SINK_ADDED,
+                                                          mcastRouteInfo(route,
+                                                                         sink,
+                                                                         newData.source())))
+                        );
+
+                        Sets.difference(oldData.sinks(), newData.sinks()).forEach(sink ->
+                            notifyDelegate(new McastEvent(McastEvent.Type.SINK_REMOVED,
+                                                          mcastRouteInfo(route,
+                                                                         sink,
+                                                                         newData.source())))
+                        );
+                    }
+                    break;
+                case REMOVE:
+                    notifyDelegate(new McastEvent(McastEvent.Type.ROUTE_REMOVED,
+                                                      mcastRouteInfo(route)));
+                    break;
+                default:
+                    log.warn("Unknown mcast operation type: {}", event.type());
+            }
+        }
+    }
+
     @Override
     public void storeRoute(McastRoute route, Type operation) {
         switch (operation) {
             case ADD:
-                if (mcastRoutes.putIfAbsent(route, MulticastData.empty()) == null) {
-                    notifyDelegate(new McastEvent(McastEvent.Type.ROUTE_ADDED,
-                                                  mcastRouteInfo(route)));
-                }
+                mcastRoutes.putIfAbsent(route, MulticastData.empty());
                 break;
             case REMOVE:
-                if (mcastRoutes.remove(route) != null) {
-                    notifyDelegate(new McastEvent(McastEvent.Type.ROUTE_REMOVED,
-                                                  mcastRouteInfo(route)));
-                }
+                // before remove the route should check that source and sinks are removed?
+                mcastRoutes.remove(route);
                 break;
             default:
                 log.warn("Unknown mcast operation type: {}", operation);
@@ -121,15 +190,6 @@
             }
             return v;
         });
-
-
-        if (data != null) {
-            notifyDelegate(new McastEvent(McastEvent.Type.SOURCE_ADDED,
-                                          mcastRouteInfo(route,
-                                                         data.sinks(),
-                                                         source)));
-        }
-
     }
 
     @Override
@@ -152,28 +212,6 @@
             }
             return v;
         });
-
-
-        if (data != null) {
-            switch (operation) {
-                case ADD:
-                    notifyDelegate(new McastEvent(McastEvent.Type.SINK_ADDED,
-                                                  mcastRouteInfo(route, sink,
-                                                                 data.source())));
-                    break;
-                case REMOVE:
-                    if (data != null) {
-                        notifyDelegate(new McastEvent(McastEvent.Type.SINK_REMOVED,
-                                                      mcastRouteInfo(route,
-                                                                     sink,
-                                                                     data.source())));
-                    }
-                    break;
-                default:
-                    log.warn("Unknown mcast operation type: {}", operation);
-            }
-        }
-
     }
 
     @Override
@@ -190,5 +228,4 @@
     public Set<McastRoute> getRoutes() {
         return mcastRoutes.keySet();
     }
-
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mcast/impl/MulticastData.java b/core/store/dist/src/main/java/org/onosproject/store/mcast/impl/MulticastData.java
index 3dd8297..b1214830 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mcast/impl/MulticastData.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mcast/impl/MulticastData.java
@@ -20,6 +20,7 @@
 import org.onosproject.net.ConnectPoint;
 
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -80,4 +81,24 @@
         return new MulticastData();
     }
 
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), source, sinks, isEmpty);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof MulticastData)) {
+            return false;
+        }
+        final MulticastData other = (MulticastData) obj;
+
+        return super.equals(obj) &&
+                Objects.equals(source(), other.source()) &&
+                Objects.equals(sinks(), other.sinks()) &&
+                Objects.equals(isEmpty, other.isEmpty);
+    }
 }