support addition and removal of mcast sinks
vlan mcast rules are now optional
Change-Id: Icb7022089a6e139970040d8cdea97df0cdc8dc7c
diff --git a/apps/cordmcast/src/main/java/org/onosproject/cordmcast/CordMcast.java b/apps/cordmcast/src/main/java/org/onosproject/cordmcast/CordMcast.java
index b0392f4..2b140a8 100644
--- a/apps/cordmcast/src/main/java/org/onosproject/cordmcast/CordMcast.java
+++ b/apps/cordmcast/src/main/java/org/onosproject/cordmcast/CordMcast.java
@@ -28,10 +28,8 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.packet.Ethernet;
-import org.onlab.packet.IPv4;
import org.onlab.packet.IpAddress;
import org.onlab.packet.VlanId;
-import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.codec.CodecService;
import org.onosproject.core.ApplicationId;
@@ -60,10 +58,13 @@
import java.util.Dictionary;
import java.util.Map;
+import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.net.MediaType.JSON_UTF_8;
+import static org.onlab.util.Tools.get;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -76,6 +77,10 @@
private static final int DEFAULT_PRIORITY = 1000;
private static final short DEFAULT_MCAST_VLAN = 4000;
+ private static final String DEFAULT_SYNC_HOST = "10.90.0.8:8181";
+ private static final String DEFAULT_USER = "karaf";
+ private static final String DEFAULT_PASSWORD = "karaf";
+
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -96,11 +101,8 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ComponentConfigService componentConfigService;
-
protected McastListener listener = new InternalMulticastListener();
-
-
//TODO: move this to a ec map
private Map<IpAddress, Integer> groups = Maps.newConcurrentMap();
@@ -109,18 +111,21 @@
private ApplicationId appId;
- //TODO: network config this
- private short mcastVlan = DEFAULT_MCAST_VLAN;
+ @Property(name = "mcastVlan", intValue = DEFAULT_MCAST_VLAN,
+ label = "VLAN for multicast traffic")
+ private int mcastVlan = DEFAULT_MCAST_VLAN;
- // TODO component config this
+ @Property(name = "vlanEnabled", boolValue = false,
+ label = "Use vlan for multicast traffic")
+ private boolean vlanEnabled = false;
+
+ @Property(name = "priority", intValue = DEFAULT_PRIORITY,
+ label = "Priority for multicast rules")
private int priority = DEFAULT_PRIORITY;
- private static final String DEFAULT_USER = "karaf";
- private static final String DEFAULT_PASSWORD = "karaf";
-
- @Property(name = "syncHost", value = "",
+ @Property(name = "syncHost", value = DEFAULT_SYNC_HOST,
label = "host:port to synchronize routes to")
- private String syncHost = "10.90.0.8:8181";
+ private String syncHost = DEFAULT_SYNC_HOST;
@Property(name = "username", value = DEFAULT_USER,
label = "Username for REST password authentication")
@@ -153,10 +158,37 @@
@Modified
public void modified(ComponentContext context) {
- Dictionary<?, ?> properties = context.getProperties();
- user = Tools.get(properties, "username");
- password = Tools.get(properties, "password");
- syncHost = Tools.get(properties, "syncHost");
+ Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
+
+
+ try {
+ String s = get(properties, "username");
+ user = isNullOrEmpty(s) ? DEFAULT_USER : s.trim();
+
+ s = get(properties, "password");
+ password = isNullOrEmpty(s) ? DEFAULT_PASSWORD : s.trim();
+
+ s = get(properties, "mcastVlan");
+ mcastVlan = isNullOrEmpty(s) ? DEFAULT_MCAST_VLAN : Short.parseShort(s.trim());
+
+ s = get(properties, "vlanEnabled");
+ vlanEnabled = isNullOrEmpty(s) || Boolean.parseBoolean(s.trim());
+
+ s = get(properties, "priority");
+ priority = isNullOrEmpty(s) ? DEFAULT_PRIORITY : Integer.parseInt(s.trim());
+
+ s = get(properties, syncHost);
+ syncHost = isNullOrEmpty(s) ? DEFAULT_SYNC_HOST : s.trim();
+ } catch (Exception e) {
+ user = DEFAULT_USER;
+ password = DEFAULT_PASSWORD;
+ syncHost = DEFAULT_SYNC_HOST;
+ mcastVlan = DEFAULT_MCAST_VLAN;
+ vlanEnabled = false;
+ priority = DEFAULT_PRIORITY;
+ }
+
+
}
private class InternalMulticastListener implements McastListener {
@@ -173,6 +205,7 @@
provisionGroup(event.subject());
break;
case SINK_REMOVED:
+ unprovisionGroup(event.subject());
break;
default:
log.warn("Unknown mcast event {}", event.type());
@@ -180,6 +213,39 @@
}
}
+ private void unprovisionGroup(McastRouteInfo info) {
+ if (!info.sink().isPresent()) {
+ log.warn("No sink given after sink removed event: {}", info);
+ return;
+ }
+ ConnectPoint loc = info.sink().get();
+
+ NextObjective next = DefaultNextObjective.builder()
+ .fromApp(appId)
+ .addTreatment(DefaultTrafficTreatment.builder().setOutput(loc.port()).build())
+ .withType(NextObjective.Type.BROADCAST)
+ .withId(groups.get(info.route().group()))
+ .removeFromExisting(new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ //TODO: change to debug
+ log.info("Next Objective {} installed", objective.id());
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ //TODO: change to debug
+ log.info("Next Objective {} failed, because {}",
+ objective.id(),
+ error);
+ }
+ });
+
+ flowObjectiveService.next(loc.deviceId(), next);
+
+
+ }
+
private void provisionGroup(McastRouteInfo info) {
if (!info.sink().isPresent()) {
log.warn("No sink given after sink added event: {}", info);
@@ -192,12 +258,37 @@
Integer nextId = groups.computeIfAbsent(info.route().group(), (g) -> {
Integer id = allocateId();
- TrafficSelector mcast = DefaultTrafficSelector.builder()
- .matchVlanId(VlanId.vlanId(mcastVlan))
+ NextObjective next = DefaultNextObjective.builder()
+ .fromApp(appId)
+ .addTreatment(DefaultTrafficTreatment.builder().setOutput(loc.port()).build())
+ .withType(NextObjective.Type.BROADCAST)
+ .withId(id)
+ .add(new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ //TODO: change to debug
+ log.info("Next Objective {} installed", objective.id());
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ //TODO: change to debug
+ log.info("Next Objective {} failed, because {}",
+ objective.id(),
+ error);
+ }
+ });
+
+ flowObjectiveService.next(loc.deviceId(), next);
+
+ TrafficSelector.Builder mcast = DefaultTrafficSelector.builder()
.matchEthType(Ethernet.TYPE_IPV4)
- .matchIPProtocol(IPv4.PROTOCOL_IGMP)
- .matchIPDst(g.toIpPrefix())
- .build();
+ .matchIPDst(g.toIpPrefix());
+
+
+ if (vlanEnabled) {
+ mcast.matchVlanId(VlanId.vlanId((short) mcastVlan));
+ }
ForwardingObjective fwd = DefaultForwardingObjective.builder()
@@ -206,7 +297,7 @@
.makePermanent()
.withFlag(ForwardingObjective.Flag.VERSATILE)
.withPriority(priority)
- .withSelector(mcast)
+ .withSelector(mcast.build())
.add(new ObjectiveContext() {
@Override
public void onSuccess(Objective objective) {
@@ -228,28 +319,30 @@
return id;
});
- NextObjective next = DefaultNextObjective.builder()
- .fromApp(appId)
- .addTreatment(DefaultTrafficTreatment.builder().setOutput(loc.port()).build())
- .withType(NextObjective.Type.BROADCAST)
- .withId(nextId)
- .addToExisting(new ObjectiveContext() {
- @Override
- public void onSuccess(Objective objective) {
- //TODO: change to debug
- log.info("Next Objective {} installed", objective.id());
- }
+ if (!sync.get()) {
+ NextObjective next = DefaultNextObjective.builder()
+ .fromApp(appId)
+ .addTreatment(DefaultTrafficTreatment.builder().setOutput(loc.port()).build())
+ .withType(NextObjective.Type.BROADCAST)
+ .withId(nextId)
+ .addToExisting(new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ //TODO: change to debug
+ log.info("Next Objective {} installed", objective.id());
+ }
- @Override
- public void onError(Objective objective, ObjectiveError error) {
- //TODO: change to debug
- log.info("Next Objective {} failed, because {}",
- objective.id(),
- error);
- }
- });
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ //TODO: change to debug
+ log.info("Next Objective {} failed, because {}",
+ objective.id(),
+ error);
+ }
+ });
- flowObjectiveService.next(loc.deviceId(), next);
+ flowObjectiveService.next(loc.deviceId(), next);
+ }
if (sync.get()) {
syncRoute(info);
diff --git a/apps/igmp/src/main/java/org/onosproject/igmp/IgmpSnoop.java b/apps/igmp/src/main/java/org/onosproject/igmp/IgmpSnoop.java
index 52760a6..2cec57b 100644
--- a/apps/igmp/src/main/java/org/onosproject/igmp/IgmpSnoop.java
+++ b/apps/igmp/src/main/java/org/onosproject/igmp/IgmpSnoop.java
@@ -246,7 +246,7 @@
.withMeta(DefaultTrafficTreatment.builder()
.setOutput(PortNumber.CONTROLLER).build())
.fromApp(appId)
- .withPriority(1000)
+ .withPriority(10000)
.add(new ObjectiveContext() {
@Override
public void onSuccess(Objective objective) {
@@ -417,6 +417,7 @@
private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
+ DeviceId devId = event.subject().id();
switch (event.type()) {
case DEVICE_ADDED:
@@ -427,11 +428,15 @@
case PORT_STATS_UPDATED:
break;
case PORT_ADDED:
- if (event.port().isEnabled()) {
+ if (!oltData.get(devId).uplink().equals(event.port().number()) &&
+ event.port().isEnabled()) {
processFilterObjective(event.subject().id(), event.port(), false);
}
break;
case PORT_UPDATED:
+ if (oltData.get(devId).uplink().equals(event.port().number())) {
+ break;
+ }
if (event.port().isEnabled()) {
processFilterObjective(event.subject().id(), event.port(), false);
} else {
diff --git a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
index 3163104..624bcd1 100644
--- a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
@@ -15,13 +15,10 @@
*/
package org.onosproject.store.serializers;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
import org.onlab.packet.ChassisId;
import org.onlab.packet.EthType;
import org.onlab.packet.Ip4Address;
@@ -90,8 +87,8 @@
import org.onosproject.net.device.DefaultPortStatistics;
import org.onosproject.net.device.OchPortDescription;
import org.onosproject.net.device.OduCltPortDescription;
-import org.onosproject.net.device.OtuPortDescription;
import org.onosproject.net.device.OmsPortDescription;
+import org.onosproject.net.device.OtuPortDescription;
import org.onosproject.net.device.PortStatistics;
import org.onosproject.net.flow.CompletedBatchOperation;
import org.onosproject.net.flow.DefaultFlowEntry;
@@ -257,12 +254,11 @@
.register(HashMap.class)
.register(ConcurrentHashMap.class)
.register(CopyOnWriteArraySet.class)
- .register(new JavaSerializer(), Sets.newConcurrentHashSet().getClass())
.register(ArrayList.class,
LinkedList.class,
HashSet.class,
LinkedHashSet.class
- )
+ )
.register(Maps.immutableEntry("a", "b").getClass())
.register(new ArraysAsListSerializer(), Arrays.asList().getClass())
.register(Collections.singletonList(1).getClass())
@@ -541,5 +537,6 @@
// not to be instantiated
- private KryoNamespaces() {}
+ private KryoNamespaces() {
+ }
}
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java
index f59c527..b781732 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java
@@ -25,6 +25,7 @@
import org.onlab.osgi.ServiceDirectory;
import org.onlab.packet.EthType;
import org.onlab.packet.IPv4;
+import org.onlab.packet.IpPrefix;
import org.onlab.packet.VlanId;
import org.onlab.util.KryoNamespace;
import org.onosproject.core.ApplicationId;
@@ -47,6 +48,7 @@
import org.onosproject.net.flow.criteria.Criteria;
import org.onosproject.net.flow.criteria.Criterion;
import org.onosproject.net.flow.criteria.EthTypeCriterion;
+import org.onosproject.net.flow.criteria.IPCriterion;
import org.onosproject.net.flow.criteria.IPProtocolCriterion;
import org.onosproject.net.flow.criteria.PortCriterion;
import org.onosproject.net.flow.criteria.VlanIdCriterion;
@@ -99,6 +101,7 @@
private DeviceId deviceId;
private ApplicationId appId;
+ private IpPrefix mcastPrefix = IpPrefix.valueOf("224.0.0.0/4");
protected FlowObjectiveStore flowObjectiveStore;
@@ -253,26 +256,33 @@
GroupBucket bucket = DefaultGroupBucket.createAllGroupBucket(treatment);
GroupKey key = new DefaultGroupKey(appKryo.serialize(nextObjective.id()));
- GroupDescription groupDesc =
- new DefaultGroupDescription(deviceId,
- GroupDescription.Type.ALL,
- new GroupBuckets(Collections.singletonList(bucket)),
- key,
- null,
- nextObjective.appId());
+
pendingGroups.put(key, nextObjective);
switch (nextObjective.op()) {
case ADD:
+ GroupDescription groupDesc =
+ new DefaultGroupDescription(deviceId,
+ GroupDescription.Type.ALL,
+ new GroupBuckets(Collections.singletonList(bucket)),
+ key,
+ null,
+ nextObjective.appId());
groupService.addGroup(groupDesc);
break;
case REMOVE:
groupService.removeGroup(deviceId, key, nextObjective.appId());
break;
case ADD_TO_EXISTING:
+ groupService.addBucketsToGroup(deviceId, key,
+ new GroupBuckets(Collections.singletonList(bucket)),
+ key, nextObjective.appId());
+ break;
case REMOVE_FROM_EXISTING:
- //TODO: handle addition to group when caller signals it.
+ groupService.removeBucketsFromGroup(deviceId, key,
+ new GroupBuckets(Collections.singletonList(bucket)),
+ key, nextObjective.appId());
break;
default:
log.warn("Unknown next objective operation: {}", nextObjective.op());
@@ -287,14 +297,14 @@
fail(fwd, ObjectiveError.BADPARAMS);
}
- OLTPipelineGroup next = getGroupForNextObjective(fwd.nextId());
+ GroupKey key = getGroupForNextObjective(fwd.nextId());
- if (next == null) {
+ if (key == null) {
log.error("Group for forwarding objective missing: {}", fwd);
fail(fwd, ObjectiveError.GROUPMISSING);
}
- Group group = groupService.getGroup(deviceId, next.key());
+ Group group = groupService.getGroup(deviceId, key);
TrafficTreatment treatment =
buildTreatment(Instructions.createGroup(group.id()));
@@ -330,16 +340,20 @@
private boolean checkForMulticast(ForwardingObjective fwd) {
- VlanIdCriterion vlan = (VlanIdCriterion) filterForCriterion(fwd.selector().criteria(),
- Criterion.Type.VLAN_VID);
+ IPCriterion ip = (IPCriterion) filterForCriterion(fwd.selector().criteria(),
+ Criterion.Type.IPV4_DST);
- return (vlan != null && vlan.vlanId().equals(VlanId.vlanId(MCAST_VLAN)));
+ if (ip == null) {
+ return false;
+ }
+
+ return mcastPrefix.contains(ip.ip());
}
- private OLTPipelineGroup getGroupForNextObjective(Integer nextId) {
+ private GroupKey getGroupForNextObjective(Integer nextId) {
NextGroup next = flowObjectiveStore.getNextGroup(nextId);
- return (OLTPipelineGroup) appKryo.deserialize(next.data());
+ return appKryo.deserialize(next.data());
}
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/mcast/impl/DistributedMcastStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/mcast/impl/DistributedMcastStore.java
index 331ecbb..06e908b 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/mcast/impl/DistributedMcastStore.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/mcast/impl/DistributedMcastStore.java
@@ -54,12 +54,12 @@
mcastRib = storageService.<McastRoute, MulticastData>consistentMapBuilder()
.withName(MCASTRIB)
.withSerializer(Serializer.using(KryoNamespace.newBuilder()
- .register(KryoNamespaces.API)
- .register(
- MulticastData.class,
- McastRoute.class,
- McastRoute.Type.class
- ).build()))
+ .register(KryoNamespaces.API)
+ .register(
+ MulticastData.class,
+ McastRoute.class,
+ McastRoute.Type.class
+ ).build()))
//.withRelaxedReadConsistency()
.build();
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/mcast/impl/MulticastData.java b/incubator/store/src/main/java/org/onosproject/incubator/store/mcast/impl/MulticastData.java
index 21674a8..7bf4a10 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/mcast/impl/MulticastData.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/mcast/impl/MulticastData.java
@@ -66,9 +66,9 @@
sinks.put(sink, true);
}
- public boolean removeSink(ConnectPoint sink) {
+ public void removeSink(ConnectPoint sink) {
checkNotNull(sink);
- return sinks.remove(sink);
+ sinks.remove(sink);
}
public boolean isEmpty() {