support addition and removal of mcast sinks
vlan mcast rules are now optional

Change-Id: Icb7022089a6e139970040d8cdea97df0cdc8dc7c
diff --git a/apps/cordmcast/src/main/java/org/onosproject/cordmcast/CordMcast.java b/apps/cordmcast/src/main/java/org/onosproject/cordmcast/CordMcast.java
index b0392f4..2b140a8 100644
--- a/apps/cordmcast/src/main/java/org/onosproject/cordmcast/CordMcast.java
+++ b/apps/cordmcast/src/main/java/org/onosproject/cordmcast/CordMcast.java
@@ -28,10 +28,8 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.onlab.packet.Ethernet;
-import org.onlab.packet.IPv4;
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.VlanId;
-import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.codec.CodecService;
 import org.onosproject.core.ApplicationId;
@@ -60,10 +58,13 @@
 
 import java.util.Dictionary;
 import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static com.google.common.base.Strings.isNullOrEmpty;
 import static com.google.common.net.MediaType.JSON_UTF_8;
+import static org.onlab.util.Tools.get;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -76,6 +77,10 @@
 
     private static final int DEFAULT_PRIORITY = 1000;
     private static final short DEFAULT_MCAST_VLAN = 4000;
+    private static final String DEFAULT_SYNC_HOST = "10.90.0.8:8181";
+    private static final String DEFAULT_USER = "karaf";
+    private static final String DEFAULT_PASSWORD = "karaf";
+
     private final Logger log = getLogger(getClass());
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -96,11 +101,8 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ComponentConfigService componentConfigService;
 
-
     protected McastListener listener = new InternalMulticastListener();
 
-
-
     //TODO: move this to a ec map
     private Map<IpAddress, Integer> groups = Maps.newConcurrentMap();
 
@@ -109,18 +111,21 @@
 
     private ApplicationId appId;
 
-    //TODO: network config this
-    private short mcastVlan = DEFAULT_MCAST_VLAN;
+    @Property(name = "mcastVlan", intValue = DEFAULT_MCAST_VLAN,
+            label = "VLAN for multicast traffic")
+    private int mcastVlan = DEFAULT_MCAST_VLAN;
 
-    // TODO component config this
+    @Property(name = "vlanEnabled", boolValue = false,
+            label = "Use vlan for multicast traffic")
+    private boolean vlanEnabled = false;
+
+    @Property(name = "priority", intValue = DEFAULT_PRIORITY,
+            label = "Priority for multicast rules")
     private int priority = DEFAULT_PRIORITY;
 
-    private static final String DEFAULT_USER = "karaf";
-    private static final String DEFAULT_PASSWORD = "karaf";
-
-    @Property(name = "syncHost", value = "",
+    @Property(name = "syncHost", value = DEFAULT_SYNC_HOST,
             label = "host:port to synchronize routes to")
-    private String syncHost = "10.90.0.8:8181";
+    private String syncHost = DEFAULT_SYNC_HOST;
 
     @Property(name = "username", value = DEFAULT_USER,
             label = "Username for REST password authentication")
@@ -153,10 +158,37 @@
 
     @Modified
     public void modified(ComponentContext context) {
-        Dictionary<?, ?> properties = context.getProperties();
-        user = Tools.get(properties, "username");
-        password = Tools.get(properties, "password");
-        syncHost = Tools.get(properties, "syncHost");
+        Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
+
+
+        try {
+            String s = get(properties, "username");
+            user = isNullOrEmpty(s) ? DEFAULT_USER : s.trim();
+
+            s = get(properties, "password");
+            password = isNullOrEmpty(s) ? DEFAULT_PASSWORD : s.trim();
+
+            s = get(properties, "mcastVlan");
+            mcastVlan = isNullOrEmpty(s) ? DEFAULT_MCAST_VLAN : Short.parseShort(s.trim());
+
+            s = get(properties, "vlanEnabled");
+            vlanEnabled = isNullOrEmpty(s) || Boolean.parseBoolean(s.trim());
+
+            s = get(properties, "priority");
+            priority = isNullOrEmpty(s) ? DEFAULT_PRIORITY : Integer.parseInt(s.trim());
+
+            s = get(properties, syncHost);
+            syncHost = isNullOrEmpty(s) ? DEFAULT_SYNC_HOST : s.trim();
+        } catch (Exception e) {
+            user = DEFAULT_USER;
+            password = DEFAULT_PASSWORD;
+            syncHost = DEFAULT_SYNC_HOST;
+            mcastVlan = DEFAULT_MCAST_VLAN;
+            vlanEnabled = false;
+            priority = DEFAULT_PRIORITY;
+        }
+
+
     }
 
     private class InternalMulticastListener implements McastListener {
@@ -173,6 +205,7 @@
                     provisionGroup(event.subject());
                     break;
                 case SINK_REMOVED:
+                    unprovisionGroup(event.subject());
                     break;
                 default:
                     log.warn("Unknown mcast event {}", event.type());
@@ -180,6 +213,39 @@
         }
     }
 
+    private void unprovisionGroup(McastRouteInfo info) {
+        if (!info.sink().isPresent()) {
+            log.warn("No sink given after sink removed event: {}", info);
+            return;
+        }
+        ConnectPoint loc = info.sink().get();
+
+        NextObjective next = DefaultNextObjective.builder()
+                .fromApp(appId)
+                .addTreatment(DefaultTrafficTreatment.builder().setOutput(loc.port()).build())
+                .withType(NextObjective.Type.BROADCAST)
+                .withId(groups.get(info.route().group()))
+                .removeFromExisting(new ObjectiveContext() {
+                    @Override
+                    public void onSuccess(Objective objective) {
+                        //TODO: change to debug
+                        log.info("Next Objective {} installed", objective.id());
+                    }
+
+                    @Override
+                    public void onError(Objective objective, ObjectiveError error) {
+                        //TODO: change to debug
+                        log.info("Next Objective {} failed, because {}",
+                                 objective.id(),
+                                 error);
+                    }
+                });
+
+        flowObjectiveService.next(loc.deviceId(), next);
+
+
+    }
+
     private void provisionGroup(McastRouteInfo info) {
         if (!info.sink().isPresent()) {
             log.warn("No sink given after sink added event: {}", info);
@@ -192,12 +258,37 @@
         Integer nextId = groups.computeIfAbsent(info.route().group(), (g) -> {
             Integer id = allocateId();
 
-            TrafficSelector mcast = DefaultTrafficSelector.builder()
-                    .matchVlanId(VlanId.vlanId(mcastVlan))
+            NextObjective next = DefaultNextObjective.builder()
+                    .fromApp(appId)
+                    .addTreatment(DefaultTrafficTreatment.builder().setOutput(loc.port()).build())
+                    .withType(NextObjective.Type.BROADCAST)
+                    .withId(id)
+                    .add(new ObjectiveContext() {
+                        @Override
+                        public void onSuccess(Objective objective) {
+                            //TODO: change to debug
+                            log.info("Next Objective {} installed", objective.id());
+                        }
+
+                        @Override
+                        public void onError(Objective objective, ObjectiveError error) {
+                            //TODO: change to debug
+                            log.info("Next Objective {} failed, because {}",
+                                     objective.id(),
+                                     error);
+                        }
+                    });
+
+            flowObjectiveService.next(loc.deviceId(), next);
+
+            TrafficSelector.Builder mcast = DefaultTrafficSelector.builder()
                     .matchEthType(Ethernet.TYPE_IPV4)
-                    .matchIPProtocol(IPv4.PROTOCOL_IGMP)
-                    .matchIPDst(g.toIpPrefix())
-                    .build();
+                    .matchIPDst(g.toIpPrefix());
+
+
+            if (vlanEnabled) {
+                mcast.matchVlanId(VlanId.vlanId((short) mcastVlan));
+            }
 
 
             ForwardingObjective fwd = DefaultForwardingObjective.builder()
@@ -206,7 +297,7 @@
                     .makePermanent()
                     .withFlag(ForwardingObjective.Flag.VERSATILE)
                     .withPriority(priority)
-                    .withSelector(mcast)
+                    .withSelector(mcast.build())
                     .add(new ObjectiveContext() {
                         @Override
                         public void onSuccess(Objective objective) {
@@ -228,28 +319,30 @@
            return id;
         });
 
-        NextObjective next = DefaultNextObjective.builder()
-                .fromApp(appId)
-                .addTreatment(DefaultTrafficTreatment.builder().setOutput(loc.port()).build())
-                .withType(NextObjective.Type.BROADCAST)
-                .withId(nextId)
-                .addToExisting(new ObjectiveContext() {
-                    @Override
-                    public void onSuccess(Objective objective) {
-                        //TODO: change to debug
-                        log.info("Next Objective {} installed", objective.id());
-                    }
+        if (!sync.get()) {
+            NextObjective next = DefaultNextObjective.builder()
+                    .fromApp(appId)
+                    .addTreatment(DefaultTrafficTreatment.builder().setOutput(loc.port()).build())
+                    .withType(NextObjective.Type.BROADCAST)
+                    .withId(nextId)
+                    .addToExisting(new ObjectiveContext() {
+                        @Override
+                        public void onSuccess(Objective objective) {
+                            //TODO: change to debug
+                            log.info("Next Objective {} installed", objective.id());
+                        }
 
-                    @Override
-                    public void onError(Objective objective, ObjectiveError error) {
-                        //TODO: change to debug
-                        log.info("Next Objective {} failed, because {}",
-                                 objective.id(),
-                                 error);
-                    }
-                });
+                        @Override
+                        public void onError(Objective objective, ObjectiveError error) {
+                            //TODO: change to debug
+                            log.info("Next Objective {} failed, because {}",
+                                     objective.id(),
+                                     error);
+                        }
+                    });
 
-        flowObjectiveService.next(loc.deviceId(), next);
+            flowObjectiveService.next(loc.deviceId(), next);
+        }
 
         if (sync.get()) {
             syncRoute(info);
diff --git a/apps/igmp/src/main/java/org/onosproject/igmp/IgmpSnoop.java b/apps/igmp/src/main/java/org/onosproject/igmp/IgmpSnoop.java
index 52760a6..2cec57b 100644
--- a/apps/igmp/src/main/java/org/onosproject/igmp/IgmpSnoop.java
+++ b/apps/igmp/src/main/java/org/onosproject/igmp/IgmpSnoop.java
@@ -246,7 +246,7 @@
                 .withMeta(DefaultTrafficTreatment.builder()
                                   .setOutput(PortNumber.CONTROLLER).build())
                 .fromApp(appId)
-                .withPriority(1000)
+                .withPriority(10000)
                 .add(new ObjectiveContext() {
                     @Override
                     public void onSuccess(Objective objective) {
@@ -417,6 +417,7 @@
     private class InternalDeviceListener implements DeviceListener {
         @Override
         public void event(DeviceEvent event) {
+            DeviceId devId = event.subject().id();
             switch (event.type()) {
 
                 case DEVICE_ADDED:
@@ -427,11 +428,15 @@
                 case PORT_STATS_UPDATED:
                     break;
                 case PORT_ADDED:
-                    if (event.port().isEnabled()) {
+                    if (!oltData.get(devId).uplink().equals(event.port().number()) &&
+                            event.port().isEnabled()) {
                         processFilterObjective(event.subject().id(), event.port(), false);
                     }
                     break;
                 case PORT_UPDATED:
+                    if (oltData.get(devId).uplink().equals(event.port().number())) {
+                        break;
+                    }
                     if (event.port().isEnabled()) {
                         processFilterObjective(event.subject().id(), event.port(), false);
                     } else {
diff --git a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
index 3163104..624bcd1 100644
--- a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
@@ -15,13 +15,10 @@
  */
 package org.onosproject.store.serializers;
 
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
 import org.onlab.packet.ChassisId;
 import org.onlab.packet.EthType;
 import org.onlab.packet.Ip4Address;
@@ -90,8 +87,8 @@
 import org.onosproject.net.device.DefaultPortStatistics;
 import org.onosproject.net.device.OchPortDescription;
 import org.onosproject.net.device.OduCltPortDescription;
-import org.onosproject.net.device.OtuPortDescription;
 import org.onosproject.net.device.OmsPortDescription;
+import org.onosproject.net.device.OtuPortDescription;
 import org.onosproject.net.device.PortStatistics;
 import org.onosproject.net.flow.CompletedBatchOperation;
 import org.onosproject.net.flow.DefaultFlowEntry;
@@ -257,12 +254,11 @@
             .register(HashMap.class)
             .register(ConcurrentHashMap.class)
             .register(CopyOnWriteArraySet.class)
-            .register(new JavaSerializer(), Sets.newConcurrentHashSet().getClass())
             .register(ArrayList.class,
                       LinkedList.class,
                       HashSet.class,
                       LinkedHashSet.class
-                      )
+            )
             .register(Maps.immutableEntry("a", "b").getClass())
             .register(new ArraysAsListSerializer(), Arrays.asList().getClass())
             .register(Collections.singletonList(1).getClass())
@@ -541,5 +537,6 @@
 
 
     // not to be instantiated
-    private KryoNamespaces() {}
+    private KryoNamespaces() {
+    }
 }
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java
index f59c527..b781732 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java
@@ -25,6 +25,7 @@
 import org.onlab.osgi.ServiceDirectory;
 import org.onlab.packet.EthType;
 import org.onlab.packet.IPv4;
+import org.onlab.packet.IpPrefix;
 import org.onlab.packet.VlanId;
 import org.onlab.util.KryoNamespace;
 import org.onosproject.core.ApplicationId;
@@ -47,6 +48,7 @@
 import org.onosproject.net.flow.criteria.Criteria;
 import org.onosproject.net.flow.criteria.Criterion;
 import org.onosproject.net.flow.criteria.EthTypeCriterion;
+import org.onosproject.net.flow.criteria.IPCriterion;
 import org.onosproject.net.flow.criteria.IPProtocolCriterion;
 import org.onosproject.net.flow.criteria.PortCriterion;
 import org.onosproject.net.flow.criteria.VlanIdCriterion;
@@ -99,6 +101,7 @@
 
     private DeviceId deviceId;
     private ApplicationId appId;
+    private IpPrefix mcastPrefix = IpPrefix.valueOf("224.0.0.0/4");
 
     protected FlowObjectiveStore flowObjectiveStore;
 
@@ -253,26 +256,33 @@
         GroupBucket bucket = DefaultGroupBucket.createAllGroupBucket(treatment);
         GroupKey key = new DefaultGroupKey(appKryo.serialize(nextObjective.id()));
 
-        GroupDescription groupDesc =
-                new DefaultGroupDescription(deviceId,
-                                            GroupDescription.Type.ALL,
-                                            new GroupBuckets(Collections.singletonList(bucket)),
-                                            key,
-                                            null,
-                                            nextObjective.appId());
+
 
         pendingGroups.put(key, nextObjective);
 
         switch (nextObjective.op()) {
             case ADD:
+                GroupDescription groupDesc =
+                        new DefaultGroupDescription(deviceId,
+                                                    GroupDescription.Type.ALL,
+                                                    new GroupBuckets(Collections.singletonList(bucket)),
+                                                    key,
+                                                    null,
+                                                    nextObjective.appId());
                 groupService.addGroup(groupDesc);
                 break;
             case REMOVE:
                 groupService.removeGroup(deviceId, key, nextObjective.appId());
                 break;
             case ADD_TO_EXISTING:
+                groupService.addBucketsToGroup(deviceId, key,
+                                               new GroupBuckets(Collections.singletonList(bucket)),
+                                               key, nextObjective.appId());
+                break;
             case REMOVE_FROM_EXISTING:
-                //TODO: handle addition to group when caller signals it.
+                groupService.removeBucketsFromGroup(deviceId, key,
+                                               new GroupBuckets(Collections.singletonList(bucket)),
+                                               key, nextObjective.appId());
                 break;
             default:
                 log.warn("Unknown next objective operation: {}", nextObjective.op());
@@ -287,14 +297,14 @@
             fail(fwd, ObjectiveError.BADPARAMS);
         }
 
-        OLTPipelineGroup next = getGroupForNextObjective(fwd.nextId());
+        GroupKey key = getGroupForNextObjective(fwd.nextId());
 
-        if (next == null) {
+        if (key == null) {
             log.error("Group for forwarding objective missing: {}", fwd);
             fail(fwd, ObjectiveError.GROUPMISSING);
         }
 
-        Group group = groupService.getGroup(deviceId, next.key());
+        Group group = groupService.getGroup(deviceId, key);
         TrafficTreatment treatment =
                 buildTreatment(Instructions.createGroup(group.id()));
 
@@ -330,16 +340,20 @@
 
     private boolean checkForMulticast(ForwardingObjective fwd) {
 
-        VlanIdCriterion vlan = (VlanIdCriterion) filterForCriterion(fwd.selector().criteria(),
-                                                                    Criterion.Type.VLAN_VID);
+        IPCriterion ip = (IPCriterion) filterForCriterion(fwd.selector().criteria(),
+                                                                                Criterion.Type.IPV4_DST);
 
-        return (vlan != null && vlan.vlanId().equals(VlanId.vlanId(MCAST_VLAN)));
+        if (ip == null) {
+            return false;
+        }
+
+        return mcastPrefix.contains(ip.ip());
 
     }
 
-    private OLTPipelineGroup getGroupForNextObjective(Integer nextId) {
+    private GroupKey getGroupForNextObjective(Integer nextId) {
         NextGroup next = flowObjectiveStore.getNextGroup(nextId);
-        return (OLTPipelineGroup) appKryo.deserialize(next.data());
+        return appKryo.deserialize(next.data());
 
     }
 
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/mcast/impl/DistributedMcastStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/mcast/impl/DistributedMcastStore.java
index 331ecbb..06e908b 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/mcast/impl/DistributedMcastStore.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/mcast/impl/DistributedMcastStore.java
@@ -54,12 +54,12 @@
         mcastRib = storageService.<McastRoute, MulticastData>consistentMapBuilder()
                 .withName(MCASTRIB)
                 .withSerializer(Serializer.using(KryoNamespace.newBuilder()
-                        .register(KryoNamespaces.API)
-                        .register(
-                        MulticastData.class,
-                        McastRoute.class,
-                        McastRoute.Type.class
-                ).build()))
+                                                         .register(KryoNamespaces.API)
+                                                         .register(
+                                                                 MulticastData.class,
+                                                                 McastRoute.class,
+                                                                 McastRoute.Type.class
+                                                         ).build()))
                 //.withRelaxedReadConsistency()
                 .build();
 
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/mcast/impl/MulticastData.java b/incubator/store/src/main/java/org/onosproject/incubator/store/mcast/impl/MulticastData.java
index 21674a8..7bf4a10 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/mcast/impl/MulticastData.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/mcast/impl/MulticastData.java
@@ -66,9 +66,9 @@
         sinks.put(sink, true);
     }
 
-    public boolean removeSink(ConnectPoint sink) {
+    public void removeSink(ConnectPoint sink) {
         checkNotNull(sink);
-        return sinks.remove(sink);
+        sinks.remove(sink);
     }
 
     public boolean isEmpty() {