Removing flow modification during sink/source addition/removal.

Changes:
- fwd objectives are pushed only the first time for a given device
- fwd objectives are removed only if the dev is no more serving the group
- mac/vlan filt objectives are pushed only the first time for a given device
- vlan filt objectives are pushed only the first time for a given cpoint
- filt objectives are purged upon device, link and edge port failures
- adds sr-filt-mcast to dump the installed filt obj
- updates onos-diags

Change-Id: I802ebec7e160486fea9ad4bf53aca36a2b1c7146
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
index d08f111..281a3c1 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
@@ -99,6 +99,7 @@
 import org.onosproject.segmentrouting.grouphandler.DefaultGroupHandler;
 import org.onosproject.segmentrouting.grouphandler.DestinationSet;
 import org.onosproject.segmentrouting.grouphandler.NextNeighbors;
+import org.onosproject.segmentrouting.mcast.McastFilteringObjStoreKey;
 import org.onosproject.segmentrouting.mcast.McastHandler;
 import org.onosproject.segmentrouting.mcast.McastRole;
 import org.onosproject.segmentrouting.mcast.McastRoleStoreKey;
@@ -1030,6 +1031,11 @@
     }
 
     @Override
+    public Map<DeviceId, List<McastFilteringObjStoreKey>> getMcastFilters() {
+        return mcastHandler.getMcastFilters();
+    }
+
+    @Override
     public Map<Set<DeviceId>, NodeId> getShouldProgram() {
         return defaultRoutingHandler == null ? ImmutableMap.of() :
                 ImmutableMap.copyOf(defaultRoutingHandler.shouldProgram);
@@ -1412,6 +1418,8 @@
                              event.type());
                     processPortUpdatedInternal(((Device) event.subject()),
                                        ((DeviceEvent) event).port());
+                    mcastHandler.processPortUpdate(((Device) event.subject()),
+                                                   ((DeviceEvent) event).port());
                 } else if (event.type() == TopologyEvent.Type.TOPOLOGY_CHANGED) {
                     // Process topology event, needed for all modules relying on
                     // topology service for path computation
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingService.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingService.java
index dd84eeb..7721036 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingService.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingService.java
@@ -29,6 +29,7 @@
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.flowobjective.NextObjective;
 import org.onosproject.segmentrouting.grouphandler.NextNeighbors;
+import org.onosproject.segmentrouting.mcast.McastFilteringObjStoreKey;
 import org.onosproject.segmentrouting.mcast.McastRole;
 import org.onosproject.segmentrouting.mcast.McastRoleStoreKey;
 import org.onosproject.segmentrouting.pwaas.DefaultL2TunnelDescription;
@@ -346,6 +347,13 @@
     boolean shouldProgram(DeviceId deviceId);
 
     /**
+     * Returns the mcast filtering obj.
+     *
+     * @return the mapping group-node
+     */
+    Map<DeviceId, List<McastFilteringObjStoreKey>> getMcastFilters();
+
+    /**
      * Gets application id.
      *
      * @return application id
@@ -391,4 +399,5 @@
     default Optional<PortNumber> getPairLocalPort(DeviceId deviceId) {
         throw new NotImplementedException("getPairLocalPort not implemented");
     }
+
 }
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/cli/McastFilterListCommand.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/cli/McastFilterListCommand.java
new file mode 100644
index 0000000..bc1eb42
--- /dev/null
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/cli/McastFilterListCommand.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.segmentrouting.cli;
+
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.net.DeviceId;
+import org.onosproject.segmentrouting.SegmentRoutingService;
+import org.onosproject.segmentrouting.mcast.McastFilteringObjStoreKey;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Command to show the list of mcast filtering obj.
+ */
+@Service
+@Command(scope = "onos", name = "sr-filt-mcast",
+        description = "Lists all mcast filtering objs")
+public class McastFilterListCommand extends AbstractShellCommand {
+
+    private static final String FORMAT_HEADER = "device=%s";
+    private static final String FILTER_HEADER = "\t%s,%s,%s";
+
+    @Override
+    protected void doExecute() {
+        // Get SR service
+        SegmentRoutingService srService = get(SegmentRoutingService.class);
+        // Get the filt objs
+        Map<DeviceId, List<McastFilteringObjStoreKey>> filteringObjKeys = srService.getMcastFilters();
+        filteringObjKeys.forEach(this::printMcastFilter);
+    }
+
+    private void printMcastFilter(DeviceId deviceId, List<McastFilteringObjStoreKey> filteringObjs) {
+        print(FORMAT_HEADER, deviceId);
+        filteringObjs.forEach(filteringObj -> print(FILTER_HEADER, filteringObj.ingressCP(),
+                                                    filteringObj.isIpv4() ? "IPv4" : "IPv6",
+                                                    filteringObj.vlanId()));
+    }
+}
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastFilteringObjStoreKey.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastFilteringObjStoreKey.java
new file mode 100644
index 0000000..6d69984
--- /dev/null
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastFilteringObjStoreKey.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.segmentrouting.mcast;
+
+import org.onlab.packet.VlanId;
+import org.onosproject.net.ConnectPoint;
+
+import java.util.Objects;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Key of multicast filtering objective store.
+ */
+public class McastFilteringObjStoreKey {
+
+    private final ConnectPoint ingressCP;
+    private final VlanId vlanId;
+    private final boolean isIpv4;
+
+    /**
+     * Constructs the key of multicast filtering objective store.
+     *
+     * @param ingressCP ingress ConnectPoint
+     * @param vlanId vlan id
+     * @param isIpv4 is Ipv4
+     */
+    public McastFilteringObjStoreKey(ConnectPoint ingressCP, VlanId vlanId, boolean isIpv4) {
+        checkNotNull(ingressCP, "connectpoint cannot be null");
+        checkNotNull(vlanId, "vlanid cannot be null");
+        this.ingressCP = ingressCP;
+        this.vlanId = vlanId;
+        this.isIpv4 = isIpv4;
+    }
+
+    // Constructor for serialization
+    private McastFilteringObjStoreKey() {
+        this.ingressCP = null;
+        this.vlanId = null;
+        this.isIpv4 = false;
+    }
+
+
+    /**
+     * Returns the connect point.
+     *
+     * @return ingress connectpoint
+     */
+    public ConnectPoint ingressCP() {
+        return ingressCP;
+    }
+
+    /**
+     * Returns whether the filtering is for ipv4 mcast.
+     *
+     * @return isIpv4
+     */
+    public boolean isIpv4() {
+        return isIpv4;
+    }
+
+    /**
+     * Returns the vlan ID of this key.
+     *
+     * @return vlan ID
+     */
+    public VlanId vlanId() {
+        return vlanId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof McastFilteringObjStoreKey)) {
+            return false;
+        }
+        McastFilteringObjStoreKey that =
+                (McastFilteringObjStoreKey) o;
+        return (Objects.equals(this.ingressCP, that.ingressCP) &&
+                Objects.equals(this.isIpv4, that.isIpv4) &&
+                Objects.equals(this.vlanId, that.vlanId));
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(ingressCP, vlanId, isIpv4);
+    }
+
+    @Override
+    public String toString() {
+        return toStringHelper(getClass())
+                .add("ingressCP", ingressCP)
+                .add("isIpv4", isIpv4)
+                .add("vlanId", vlanId)
+                .toString();
+    }
+}
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastFilteringObjStoreSerializer.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastFilteringObjStoreSerializer.java
new file mode 100644
index 0000000..a23fa69
--- /dev/null
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastFilteringObjStoreSerializer.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.segmentrouting.mcast;
+
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.onlab.packet.VlanId;
+import org.onosproject.net.ConnectPoint;
+
+/**
+ * Custom serializer for {@link McastFilteringObjStoreKey}.
+ */
+class McastFilteringObjStoreSerializer extends Serializer<McastFilteringObjStoreKey> {
+
+    /**
+     * Creates {@link McastFilteringObjStoreSerializer} serializer instance.
+     */
+    McastFilteringObjStoreSerializer() {
+        // non-null, immutable
+        super(false, true);
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, McastFilteringObjStoreKey object) {
+        kryo.writeClassAndObject(output, object.ingressCP());
+        kryo.writeClassAndObject(output, object.vlanId());
+        kryo.writeClassAndObject(output, object.isIpv4());
+    }
+
+    @Override
+    public McastFilteringObjStoreKey read(Kryo kryo, Input input, Class<McastFilteringObjStoreKey> type) {
+        ConnectPoint ingressCP = (ConnectPoint) kryo.readClassAndObject(input);
+        VlanId vlanId = (VlanId) kryo.readClassAndObject(input);
+        boolean isIpv4 = (boolean) kryo.readClassAndObject(input);
+        return new McastFilteringObjStoreKey(ingressCP, vlanId, isIpv4);
+    }
+}
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
index ae6902d..52618f4 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
@@ -38,11 +38,13 @@
 import org.onosproject.mcast.api.McastRoute;
 import org.onosproject.mcast.api.McastRouteData;
 import org.onosproject.mcast.api.McastRouteUpdate;
+import org.onosproject.net.Device;
 import org.onosproject.net.HostId;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.Link;
 import org.onosproject.net.Path;
+import org.onosproject.net.Port;
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.flowobjective.DefaultObjectiveContext;
 import org.onosproject.net.flowobjective.ForwardingObjective;
@@ -55,6 +57,7 @@
 import org.onosproject.segmentrouting.SegmentRoutingManager;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.DistributedSet;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.Versioned;
 import org.slf4j.Logger;
@@ -64,6 +67,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -100,6 +104,7 @@
     private final McastUtils mcastUtils;
     private final ConsistentMap<McastStoreKey, NextObjective> mcastNextObjStore;
     private final ConsistentMap<McastRoleStoreKey, McastRole> mcastRoleStore;
+    private final DistributedSet<McastFilteringObjStoreKey> mcastFilteringObjStore;
 
     // Wait time for the cache
     private static final int WAIT_TIME_MS = 1000;
@@ -273,6 +278,15 @@
                 .withName("onos-mcast-role-store")
                 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-Role")))
                 .build();
+        mcastKryo = new KryoNamespace.Builder()
+                .register(KryoNamespaces.API)
+                .register(new McastFilteringObjStoreSerializer(), McastFilteringObjStoreKey.class);
+        mcastFilteringObjStore = srManager.storageService
+                .<McastFilteringObjStoreKey>setBuilder()
+                .withName("onos-mcast-filtering-store")
+                .withSerializer(Serializer.using(mcastKryo.build("McastHandler-FilteringObj")))
+                .build()
+                .asDistributedSet();
         mcastUtils = new McastUtils(srManager, coreAppId, log);
         // Init the executor service, the buckets corrector and schedule the clean up
         executorService.scheduleWithFixedDelay(new McastBucketCorrector(), 10,
@@ -334,6 +348,7 @@
         executorService.shutdown();
         mcastNextObjStore.destroy();
         mcastRoleStore.destroy();
+        mcastFilteringObjStore.destroy();
         mcastUtils.terminate();
         log.info("Terminated");
     }
@@ -414,6 +429,7 @@
                 log.debug("Skip {} due to empty sources to be removed", mcastIp);
                 return;
             }
+            // Let's heal the trees
             Set<Link> remainingLinks = Sets.newHashSet();
             Map<ConnectPoint, Set<Link>> candidateLinks = Maps.newHashMap();
             Map<ConnectPoint, Set<ConnectPoint>> candidateSinks = Maps.newHashMap();
@@ -713,8 +729,9 @@
         try {
             log.info("Processing sink added {} for group {} and for source {}", sink, mcastIp, source);
             // Process the ingress device
-            mcastUtils.addFilterToDevice(source.deviceId(), source.port(),
-                                         mcastUtils.assignedVlan(source), mcastIp, INGRESS);
+            McastFilteringObjStoreKey mcastFilterObjStoreKey = new McastFilteringObjStoreKey(source,
+                                                               mcastUtils.assignedVlan(source), mcastIp.isIp4());
+            addFilterToDevice(mcastFilterObjStoreKey, mcastIp, INGRESS);
             if (source.deviceId().equals(sink.deviceId())) {
                 if (source.port().equals(sink.port())) {
                     log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
@@ -736,14 +753,18 @@
                     addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
                                     mcastUtils.assignedVlan(link.src().deviceId()
                                                                     .equals(source.deviceId()) ? source : null));
-                    mcastUtils.addFilterToDevice(link.dst().deviceId(), link.dst().port(),
-                                      mcastUtils.assignedVlan(null), mcastIp, null);
+                    McastFilteringObjStoreKey filteringKey = new McastFilteringObjStoreKey(link.dst(),
+                                                                       mcastUtils.assignedVlan(null), mcastIp.isIp4());
+                    addFilterToDevice(filteringKey, mcastIp, null);
                 });
                 // Setup mcast role for the transit
                 links.stream()
                         .filter(link -> !link.dst().deviceId().equals(sink.deviceId()))
-                        .forEach(link -> mcastRoleStore.put(new McastRoleStoreKey(mcastIp, link.dst().deviceId(),
-                                                                                  source), TRANSIT));
+                        .forEach(link -> {
+                            log.trace("Transit links {}", link);
+                            mcastRoleStore.put(new McastRoleStoreKey(mcastIp, link.dst().deviceId(),
+                                    source), TRANSIT);
+                        });
                 // Process the egress device
                 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
                 // Setup mcast role for egress
@@ -757,6 +778,20 @@
     }
 
     /**
+     * Processes PORT_UPDATED event.
+     *
+     * @param affectedDevice Affected device
+     * @param affectedPort Affected port
+     */
+    public void processPortUpdate(Device affectedDevice, Port affectedPort) {
+        // Clean the filtering obj store. Edge port case.
+        ConnectPoint portDown = new ConnectPoint(affectedDevice.id(), affectedPort.number());
+        if (!affectedPort.isEnabled()) {
+            updateFilterObjStoreByPort(portDown);
+        }
+    }
+
+    /**
      * Processes the LINK_DOWN event.
      *
      * @param linkDown Link that is going down
@@ -765,7 +800,7 @@
         lastMcastChange = Instant.now();
         mcastLock();
         try {
-            // Get groups affected by the link down event
+            // Get mcast groups affected by the link going down
             Set<IpAddress> affectedGroups = getAffectedGroups(linkDown);
             log.info("Processing link down {} for groups {}", linkDown, affectedGroups);
             affectedGroups.forEach(mcastIp -> {
@@ -789,6 +824,7 @@
             // Get the mcast groups affected by the device going down
             Set<IpAddress> affectedGroups = getAffectedGroups(deviceDown);
             log.info("Processing device down {} for groups {}", deviceDown, affectedGroups);
+            updateFilterObjStoreByDevice(deviceDown);
             affectedGroups.forEach(mcastIp -> {
                 log.debug("Processing device down {} for group {}", deviceDown, mcastIp);
                 recoverFailure(mcastIp, deviceDown);
@@ -1193,6 +1229,23 @@
             }
             // Store the new port
             mcastNextObjStore.put(mcastStoreKey, newNextObj);
+            // Create, store and apply the new nextObj and fwdObj
+            ObjectiveContext context = new DefaultObjectiveContext(
+                (objective) -> log.debug("Successfully add {} on {}/{}, vlan {}",
+                        mcastIp, deviceId, port.toLong(), assignedVlan),
+                (objective, error) -> {
+                    log.warn("Failed to add {} on {}/{}, vlan {}: {}",
+                            mcastIp, deviceId, port.toLong(), assignedVlan, error);
+                    srManager.invalidateNextObj(objective.id());
+                });
+            ForwardingObjective fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan,
+                                                          newNextObj.id()).add(context);
+            if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
+                log.debug("skip next and forward flowobjective addition for device: {}", deviceId);
+            } else {
+                srManager.flowObjectiveService.next(deviceId, newNextObj);
+                srManager.flowObjectiveService.forward(deviceId, fwdObj);
+            }
         } else {
             // This device already serves some subscribers of this mcast group
             NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
@@ -1214,23 +1267,13 @@
             portBuilder.add(port);
             newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
                                         portBuilder.build(), nextObj.id()).addToExisting();
-        }
-        // Create, store and apply the new nextObj and fwdObj
-        ObjectiveContext context = new DefaultObjectiveContext(
-                (objective) -> log.debug("Successfully add {} on {}/{}, vlan {}",
-                        mcastIp, deviceId, port.toLong(), assignedVlan),
-                (objective, error) -> {
-                    log.warn("Failed to add {} on {}/{}, vlan {}: {}",
-                            mcastIp, deviceId, port.toLong(), assignedVlan, error);
-                    srManager.invalidateNextObj(objective.id());
-                });
-        ForwardingObjective fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan,
-                                                              newNextObj.id()).add(context);
-        if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
-            log.debug("skip next and forward flowobjective addition for device: {}", deviceId);
-        } else {
-            srManager.flowObjectiveService.next(deviceId, newNextObj);
-            srManager.flowObjectiveService.forward(deviceId, fwdObj);
+            if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
+                log.debug("skip next flowobjective update for device: {}", deviceId);
+            } else {
+                // no need to update the flow here since we have updated the nextobjective/group
+                // the existing flow will keep pointing to the updated nextobj
+                srManager.flowObjectiveService.next(deviceId, newNextObj);
+            }
         }
     }
 
@@ -1276,31 +1319,27 @@
                     (objective, error) -> log.warn("Failed to remove {} on {}/{}, vlan {}: {}",
                                     mcastIp, deviceId, port.toLong(), assignedVlan, error));
             fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
+            if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
+                log.debug("skip forward flowobjective removal for device: {}", deviceId);
+            } else {
+                srManager.flowObjectiveService.forward(deviceId, fwdObj);
+            }
             mcastNextObjStore.remove(mcastStoreKey);
         } else {
-            // If this is not the last sink, update flows and groups
-            context = new DefaultObjectiveContext(
-                    (objective) -> log.debug("Successfully update {} on {}/{}, vlan {}",
-                            mcastIp, deviceId, port.toLong(), assignedVlan),
-                    (objective, error) -> {
-                        log.warn("Failed to update {} on {}/{}, vlan {}: {}",
-                                mcastIp, deviceId, port.toLong(), assignedVlan, error);
-                        srManager.invalidateNextObj(objective.id());
-                    });
             // Here we store the next objective with the remaining port
             newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
                                         existingPorts, nextObj.id()).removeFromExisting();
-            fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, newNextObj.id()).add(context);
             mcastNextObjStore.put(mcastStoreKey, newNextObj);
-        }
-        // Let's modify the next objective removing the bucket
-        newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
+             // Let's modify the next objective removing the bucket
+            newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
                                     ImmutableSet.of(port), nextObj.id()).removeFromExisting();
-        if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
-            log.debug("skip forward and next flow objectives from adding flows on device: {}", deviceId);
-        } else {
-            srManager.flowObjectiveService.next(deviceId, newNextObj);
-            srManager.flowObjectiveService.forward(deviceId, fwdObj);
+            if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
+                log.debug("skip next flowobjective update for device: {}", deviceId);
+            } else {
+                // no need to update the flow here since we have updated the next objective + group
+                // the existing flow will keep pointing to the updated nextobj
+                srManager.flowObjectiveService.next(deviceId, newNextObj);
+            }
         }
         return existingPorts.isEmpty();
     }
@@ -1349,8 +1388,9 @@
         links.forEach(link -> {
             addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
                             mcastUtils.assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
-            mcastUtils.addFilterToDevice(link.dst().deviceId(), link.dst().port(),
-                              mcastUtils.assignedVlan(null), mcastIp, null);
+            McastFilteringObjStoreKey mcastFilterObjStoreKey = new McastFilteringObjStoreKey(link.dst(),
+                    mcastUtils.assignedVlan(null), mcastIp.isIp4());
+            addFilterToDevice(mcastFilterObjStoreKey, mcastIp, null);
         });
         // Setup mcast role for the transit
         links.stream()
@@ -1779,7 +1819,9 @@
                 sources.forEach(source -> {
                     if (source.deviceId().equals(deviceId) && source.port().equals(portNum)) {
                         if (install) {
-                            mcastUtils.addFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), INGRESS);
+                            McastFilteringObjStoreKey mcastFilterObjStoreKey = new McastFilteringObjStoreKey(source,
+                                    vlanId, mcastRoute.group().isIp4());
+                            addFilterToDevice(mcastFilterObjStoreKey, mcastRoute.group(), INGRESS);
                         } else {
                             mcastUtils.removeFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), null);
                         }
@@ -1792,13 +1834,104 @@
     }
 
     /**
+     * Add filtering to the device if needed.
+     *
+     * @param filterObjStoreKey the filtering obj key
+     * @param mcastIp the multicast group
+     * @param mcastRole the multicast role
+     */
+    private void addFilterToDevice(McastFilteringObjStoreKey filterObjStoreKey,
+                                   IpAddress mcastIp,
+                                   McastRole mcastRole) {
+        if (!containsFilterInTheDevice(filterObjStoreKey)) {
+            // if this is the first sink for this group/device
+            // match additionally on mac
+            log.debug("Filtering not available for device {}, vlan {} and {}",
+                      filterObjStoreKey.ingressCP().deviceId(), filterObjStoreKey.vlanId(),
+                      filterObjStoreKey.isIpv4() ? "IPv4" : "IPv6");
+            mcastUtils.addFilterToDevice(filterObjStoreKey.ingressCP().deviceId(),
+                                         filterObjStoreKey.ingressCP().port(),
+                                         filterObjStoreKey.vlanId(), mcastIp,
+                                         mcastRole, true);
+            mcastFilteringObjStore.add(filterObjStoreKey);
+        } else if (!mcastFilteringObjStore.contains(filterObjStoreKey)) {
+            // match only vlan
+            log.debug("Filtering not available for connect point {}, vlan {} and {}",
+                      filterObjStoreKey.ingressCP(), filterObjStoreKey.vlanId(),
+                      filterObjStoreKey.isIpv4() ? "IPv4" : "IPv6");
+            mcastUtils.addFilterToDevice(filterObjStoreKey.ingressCP().deviceId(),
+                                         filterObjStoreKey.ingressCP().port(),
+                                         filterObjStoreKey.vlanId(), mcastIp,
+                                         mcastRole, false);
+            mcastFilteringObjStore.add(filterObjStoreKey);
+        } else {
+            // do nothing
+            log.debug("Filtering already present. Abort");
+        }
+    }
+
+    /**
+     * Verify if there are related filtering obj in the device.
+     *
+     * @param filteringKey the filtering obj key
+     * @return true if related filtering obj are found
+     */
+    private boolean containsFilterInTheDevice(McastFilteringObjStoreKey filteringKey) {
+        // check if filters are already added on the device
+        McastFilteringObjStoreKey key = mcastFilteringObjStore.stream()
+                .filter(mcastFilteringKey ->
+                                mcastFilteringKey.ingressCP().deviceId().equals(filteringKey.ingressCP().deviceId())
+                                        && mcastFilteringKey.isIpv4() == filteringKey.isIpv4()
+                                        && mcastFilteringKey.vlanId().equals(filteringKey.vlanId())
+                ).findFirst().orElse(null);
+        // we are interested to filt obj on the same device, same vlan and same ip type
+        return key != null;
+    }
+
+    /**
+     * Update the filtering objective store upon device failure.
+     *
+     * @param affectedDevice the affected device
+     */
+    private void updateFilterObjStoreByDevice(DeviceId affectedDevice) {
+        // purge the related filter objective key
+        Set<McastFilteringObjStoreKey> filterObjs = Sets.newHashSet(mcastFilteringObjStore);
+        Iterator<McastFilteringObjStoreKey> filterIterator = filterObjs.iterator();
+        McastFilteringObjStoreKey filterKey;
+        while (filterIterator.hasNext()) {
+            filterKey = filterIterator.next();
+            if (filterKey.ingressCP().deviceId().equals(affectedDevice)) {
+                mcastFilteringObjStore.remove(filterKey);
+            }
+        }
+    }
+
+    /**
+     * Update the filtering objective store upon port failure.
+     *
+     * @param affectedPort the affected port
+     */
+    private void updateFilterObjStoreByPort(ConnectPoint affectedPort) {
+        // purge the related filter objective key
+        Set<McastFilteringObjStoreKey> filterObjs = Sets.newHashSet(mcastFilteringObjStore);
+        Iterator<McastFilteringObjStoreKey> filterIterator = filterObjs.iterator();
+        McastFilteringObjStoreKey filterKey;
+        while (filterIterator.hasNext()) {
+            filterKey = filterIterator.next();
+            if (filterKey.ingressCP().equals(affectedPort)) {
+                mcastFilteringObjStore.remove(filterKey);
+            }
+        }
+    }
+
+    /**
      * Performs bucket verification operation for all mcast groups in the devices.
      * Firstly, it verifies that mcast is stable before trying verification operation.
      * Verification consists in creating new nexts with VERIFY operation. Actually,
      * the operation is totally delegated to the driver.
      */
     private final class McastBucketCorrector implements Runnable {
-
+        // Internal params
         private static final int MAX_VERIFY_ON_FLIGHT = 10;
         private final AtomicInteger verifyOnFlight = new AtomicInteger(0);
         // Define the context used for the back pressure mechanism
@@ -1875,7 +2008,7 @@
                         for (DeviceId deviceId : devicesToProcess) {
                             if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
                                 log.trace("Skipping Bucket corrector for unconfigured device {}", deviceId);
-                                return;
+                                continue;
                             }
                             synchronized (verifyOnFlight) {
                                 while (verifyOnFlight.get() == MAX_VERIFY_ON_FLIGHT) {
@@ -2011,4 +2144,25 @@
     public Map<IpAddress, NodeId> getMcastLeaders(IpAddress mcastIp) {
         return mcastUtils.getMcastLeaders(mcastIp);
     }
+
+    /**
+     * Returns the mcast filtering obj.
+     *
+     * @return the mapping group-node
+     */
+    public Map<DeviceId, List<McastFilteringObjStoreKey>> getMcastFilters() {
+        Map<DeviceId, List<McastFilteringObjStoreKey>> mapping = Maps.newHashMap();
+        Set<McastFilteringObjStoreKey> currentKeys = Sets.newHashSet(mcastFilteringObjStore);
+        currentKeys.forEach(filteringObjStoreKey ->
+            mapping.compute(filteringObjStoreKey.ingressCP().deviceId(), (k, v) -> {
+                List<McastFilteringObjStoreKey> values = v;
+                if (values == null) {
+                    values = Lists.newArrayList();
+                }
+                values.add(filteringObjStoreKey);
+                return values;
+            })
+        );
+        return mapping;
+    }
 }
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java
index ed63eff..c4e1ad4 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java
@@ -134,10 +134,10 @@
      * @param assignedVlan assigned VLAN ID
      * @param mcastIp the group address
      * @param mcastRole the role of the device
+     * @param matchOnMac match or not on macaddress
      */
     void addFilterToDevice(DeviceId deviceId, PortNumber port, VlanId assignedVlan,
-                           IpAddress mcastIp, McastRole mcastRole) {
-
+                           IpAddress mcastIp, McastRole mcastRole, boolean matchOnMac) {
         if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
             log.debug("skip update of fitering objective for unconfigured device: {}", deviceId);
             return;
@@ -147,9 +147,8 @@
         if (MacAddress.NONE.equals(routerMac)) {
             return;
         }
-
         FilteringObjective.Builder filtObjBuilder = filterObjBuilder(port, assignedVlan, mcastIp,
-                                                                     routerMac, mcastRole);
+                                                                     routerMac, mcastRole, matchOnMac);
         ObjectiveContext context = new DefaultObjectiveContext(
                 (objective) -> log.debug("Successfully add filter on {}/{}, vlan {}",
                                          deviceId, port.toLong(), assignedVlan),
@@ -170,7 +169,6 @@
      */
     void removeFilterToDevice(DeviceId deviceId, PortNumber port, VlanId assignedVlan,
                               IpAddress mcastIp, McastRole mcastRole) {
-
         if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
             log.debug("skip update of fitering objective for unconfigured device: {}", deviceId);
             return;
@@ -180,9 +178,8 @@
         if (MacAddress.NONE.equals(routerMac)) {
             return;
         }
-
         FilteringObjective.Builder filtObjBuilder =
-                filterObjBuilder(port, assignedVlan, mcastIp, routerMac, mcastRole);
+                filterObjBuilder(port, assignedVlan, mcastIp, routerMac, mcastRole, false);
         ObjectiveContext context = new DefaultObjectiveContext(
                 (objective) -> log.debug("Successfully removed filter on {}/{}, vlan {}",
                                          deviceId, port.toLong(), assignedVlan),
@@ -367,10 +364,12 @@
      * @param routerMac router MAC. This is carried in metadata and used from some switches that
      *                  need to put unicast entry before multicast entry in TMAC table.
      * @param mcastRole the Multicast role
+     * @param matchOnMac match or not on macaddress
      * @return filtering objective builder
      */
     private FilteringObjective.Builder filterObjBuilder(PortNumber ingressPort, VlanId assignedVlan,
-                                                IpAddress mcastIp, MacAddress routerMac, McastRole mcastRole) {
+                                                        IpAddress mcastIp, MacAddress routerMac, McastRole mcastRole,
+                                                        boolean matchOnMac) {
         FilteringObjective.Builder filtBuilder = DefaultFilteringObjective.builder();
         // Let's add the in port matching and the priority
         filtBuilder.withKey(Criteria.matchInPort(ingressPort))
@@ -382,23 +381,27 @@
         } else {
             filtBuilder.addCondition(Criteria.matchVlanId(ingressVlan()));
         }
-        // According to the IP type we set the proper match on the mac address
-        if (mcastIp.isIp4()) {
-            filtBuilder.addCondition(Criteria.matchEthDstMasked(MacAddress.IPV4_MULTICAST,
-                                                                MacAddress.IPV4_MULTICAST_MASK));
-        } else {
-            filtBuilder.addCondition(Criteria.matchEthDstMasked(MacAddress.IPV6_MULTICAST,
-                                                                MacAddress.IPV6_MULTICAST_MASK));
+        // Add vlan info to the treatment builder
+        TrafficTreatment.Builder ttb = DefaultTrafficTreatment.builder()
+                .pushVlan().setVlanId(assignedVlan);
+        // Additionally match on mac address and augment the treatment
+        if (matchOnMac) {
+            // According to the IP type we set the proper match on the mac address
+            if (mcastIp.isIp4()) {
+                filtBuilder.addCondition(Criteria.matchEthDstMasked(MacAddress.IPV4_MULTICAST,
+                        MacAddress.IPV4_MULTICAST_MASK));
+            } else {
+                filtBuilder.addCondition(Criteria.matchEthDstMasked(MacAddress.IPV6_MULTICAST,
+                        MacAddress.IPV6_MULTICAST_MASK));
+            }
+            // We set mac address to the treatment
+            if (routerMac != null && !routerMac.equals(MacAddress.NONE)) {
+                ttb.setEthDst(routerMac);
+            }
         }
         // We finally build the meta treatment
-        TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
-        tBuilder.pushVlan().setVlanId(assignedVlan);
-
-        if (routerMac != null && !routerMac.equals(MacAddress.NONE)) {
-            tBuilder.setEthDst(routerMac);
-        }
-
-        filtBuilder.withMeta(tBuilder.build());
+        TrafficTreatment tt = ttb.build();
+        filtBuilder.withMeta(tt);
         // Done, we return a permit filtering objective
         return filtBuilder.permit().fromApp(srManager.appId());
     }
diff --git a/tools/package/runtime/bin/onos-diagnostics b/tools/package/runtime/bin/onos-diagnostics
index c2e9f77..2ac2310 100755
--- a/tools/package/runtime/bin/onos-diagnostics
+++ b/tools/package/runtime/bin/onos-diagnostics
@@ -105,6 +105,7 @@
     "sr-mcast-role"
     "sr-pw-list"
     "sr-next-mcast"
+    "sr-filt-mcast"
     "sr-next-dst"
     "sr-next-port"
     "sr-next-vlan"