blob: 74db45711c60171786bda3f0e8711f25636967ca [file] [log] [blame]
Charles Chanc91c8782016-03-30 17:54:24 -07001/*
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.segmentrouting;
18
19import com.google.common.collect.ImmutableSet;
20import com.google.common.collect.Lists;
21import com.google.common.collect.Sets;
22import org.onlab.packet.Ethernet;
23import org.onlab.packet.IpAddress;
24import org.onlab.packet.IpPrefix;
25import org.onlab.packet.MacAddress;
26import org.onlab.packet.VlanId;
27import org.onlab.util.KryoNamespace;
28import org.onosproject.core.ApplicationId;
29import org.onosproject.core.CoreService;
30import org.onosproject.incubator.net.config.basics.McastConfig;
31import org.onosproject.net.ConnectPoint;
32import org.onosproject.net.DeviceId;
33import org.onosproject.net.Link;
34import org.onosproject.net.Path;
35import org.onosproject.net.PortNumber;
36import org.onosproject.net.flow.DefaultTrafficSelector;
37import org.onosproject.net.flow.DefaultTrafficTreatment;
38import org.onosproject.net.flow.TrafficSelector;
39import org.onosproject.net.flow.TrafficTreatment;
40import org.onosproject.net.flow.criteria.Criteria;
41import org.onosproject.net.flow.instructions.Instructions.OutputInstruction;
42import org.onosproject.net.flowobjective.DefaultFilteringObjective;
43import org.onosproject.net.flowobjective.DefaultForwardingObjective;
44import org.onosproject.net.flowobjective.DefaultNextObjective;
45import org.onosproject.net.flowobjective.FilteringObjective;
46import org.onosproject.net.flowobjective.ForwardingObjective;
47import org.onosproject.net.flowobjective.NextObjective;
48import org.onosproject.net.mcast.McastEvent;
49import org.onosproject.net.mcast.McastRouteInfo;
50import org.onosproject.net.topology.TopologyService;
Charles Chan1eaf4802016-04-18 13:44:03 -070051import org.onosproject.segmentrouting.storekey.McastNextObjectiveStoreKey;
Charles Chanc91c8782016-03-30 17:54:24 -070052import org.onosproject.store.serializers.KryoNamespaces;
53import org.onosproject.store.service.ConsistentMap;
54import org.onosproject.store.service.Serializer;
55import org.onosproject.store.service.StorageService;
56import org.slf4j.Logger;
57import org.slf4j.LoggerFactory;
58
59import java.util.Collection;
60import java.util.Collections;
61import java.util.List;
62import java.util.Optional;
63import java.util.Set;
64
65/**
Charles Chan1eaf4802016-04-18 13:44:03 -070066 * Handles multicast-related events.
Charles Chanc91c8782016-03-30 17:54:24 -070067 */
Charles Chan1eaf4802016-04-18 13:44:03 -070068public class McastHandler {
69 private static final Logger log = LoggerFactory.getLogger(McastHandler.class);
Charles Chanc91c8782016-03-30 17:54:24 -070070 private final SegmentRoutingManager srManager;
71 private final ApplicationId coreAppId;
72 private StorageService storageService;
73 private TopologyService topologyService;
74 private final KryoNamespace.Builder kryoBuilder;
75 private final ConsistentMap<McastNextObjectiveStoreKey, NextObjective> mcastNextObjStore;
76
77 /**
78 * Constructs the McastEventHandler.
79 *
80 * @param srManager Segment Routing manager
81 */
Charles Chan1eaf4802016-04-18 13:44:03 -070082 public McastHandler(SegmentRoutingManager srManager) {
Charles Chanc91c8782016-03-30 17:54:24 -070083 coreAppId = srManager.coreService.getAppId(CoreService.CORE_APP_NAME);
84
85 this.srManager = srManager;
86 this.storageService = srManager.storageService;
87 this.topologyService = srManager.topologyService;
88
89 kryoBuilder = new KryoNamespace.Builder()
90 .register(KryoNamespaces.API)
91 .register(McastNextObjectiveStoreKey.class);
92 mcastNextObjStore = storageService
93 .<McastNextObjectiveStoreKey, NextObjective>consistentMapBuilder()
94 .withName("onos-mcast-nextobj-store")
95 .withSerializer(Serializer.using(kryoBuilder.build()))
96 .build();
97 }
98
99 /**
100 * Processes the SOURCE_ADDED event.
101 *
102 * @param event McastEvent with SOURCE_ADDED type
103 */
104 protected void processSourceAdded(McastEvent event) {
105 log.info("processSourceAdded {}", event);
106 McastRouteInfo mcastRouteInfo = event.subject();
107 if (!mcastRouteInfo.isComplete()) {
108 log.info("Incompleted McastRouteInfo. Abort.");
109 return;
110 }
111 ConnectPoint source = mcastRouteInfo.source().orElse(null);
112 Set<ConnectPoint> sinks = mcastRouteInfo.sinks();
113 IpAddress mcastIp = mcastRouteInfo.route().group();
114
115 sinks.forEach(sink -> {
116 processSinkAddedInternal(source, sink, mcastIp);
117 });
118 }
119
120 /**
121 * Processes the SINK_ADDED event.
122 *
123 * @param event McastEvent with SINK_ADDED type
124 */
125 protected void processSinkAdded(McastEvent event) {
126 log.info("processSinkAdded {}", event);
127 McastRouteInfo mcastRouteInfo = event.subject();
128 if (!mcastRouteInfo.isComplete()) {
129 log.info("Incompleted McastRouteInfo. Abort.");
130 return;
131 }
132 ConnectPoint source = mcastRouteInfo.source().orElse(null);
133 ConnectPoint sink = mcastRouteInfo.sink().orElse(null);
134 IpAddress mcastIp = mcastRouteInfo.route().group();
135
136 processSinkAddedInternal(source, sink, mcastIp);
137 }
138
139 /**
140 * Processes the SINK_REMOVED event.
141 *
142 * @param event McastEvent with SINK_REMOVED type
143 */
144 protected void processSinkRemoved(McastEvent event) {
145 log.info("processSinkRemoved {}", event);
146 McastRouteInfo mcastRouteInfo = event.subject();
147 if (!mcastRouteInfo.isComplete()) {
148 log.info("Incompleted McastRouteInfo. Abort.");
149 return;
150 }
151 ConnectPoint source = mcastRouteInfo.source().orElse(null);
152 ConnectPoint sink = mcastRouteInfo.sink().orElse(null);
153 IpAddress mcastIp = mcastRouteInfo.route().group();
154 VlanId assignedVlan = assignedVlan();
155
156 // When source and sink are on the same device
157 if (source.deviceId().equals(sink.deviceId())) {
158 // Source and sink are on even the same port. There must be something wrong.
159 if (source.port().equals(sink.port())) {
160 log.warn("Sink is on the same port of source. Abort");
161 return;
162 }
163 removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan);
164 return;
165 }
166
167 // Process the egress device
168 boolean isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan);
169
170 // If this is the last sink on the device, also update upstream
171 Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp);
172 if (mcastPath.isPresent()) {
173 List<Link> links = Lists.newArrayList(mcastPath.get().links());
174 Collections.reverse(links);
175 for (Link link : links) {
176 if (isLast) {
177 isLast = removePortFromDevice(link.src().deviceId(), link.src().port(),
178 mcastIp, assignedVlan);
179 }
180 }
181 }
182 }
183
184 /**
185 * Establishes a path from source to sink for given multicast group.
186 *
187 * @param source connect point of the multicast source
188 * @param sink connection point of the multicast sink
189 * @param mcastIp multicast group IP address
190 */
191 private void processSinkAddedInternal(ConnectPoint source, ConnectPoint sink,
192 IpAddress mcastIp) {
193 VlanId assignedVlan = assignedVlan();
194
195 // When source and sink are on the same device
196 if (source.deviceId().equals(sink.deviceId())) {
197 // Source and sink are on even the same port. There must be something wrong.
198 if (source.port().equals(sink.port())) {
199 log.warn("Sink is on the same port of source. Abort");
200 return;
201 }
202 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan);
203 return;
204 }
205
206 // Process the ingress device
207 addFilterToDevice(source.deviceId(), source.port(), assignedVlan);
208
209 // Find a path. If present, create/update groups and flows for each hop
210 Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp);
211 if (mcastPath.isPresent()) {
212 mcastPath.get().links().forEach(link -> {
213 addFilterToDevice(link.dst().deviceId(), link.dst().port(), assignedVlan);
214 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp, assignedVlan);
215 });
216 // Process the egress device
217 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan);
218 }
219 }
220
221 /**
222 * Adds filtering objective for given device and port.
223 *
224 * @param deviceId device ID
225 * @param port ingress port number
226 * @param assignedVlan assigned VLAN ID
227 */
228 private void addFilterToDevice(DeviceId deviceId, PortNumber port, VlanId assignedVlan) {
229 // Do nothing if the port is configured as suppressed
230 ConnectPoint connectPt = new ConnectPoint(deviceId, port);
231 if (srManager.deviceConfiguration.suppressSubnet().contains(connectPt) ||
232 srManager.deviceConfiguration.suppressHost().contains(connectPt)) {
233 log.info("Ignore suppressed port {}", connectPt);
234 return;
235 }
236
237 FilteringObjective.Builder filtObjBuilder =
238 filterObjBuilder(deviceId, port, assignedVlan);
239 srManager.flowObjectiveService.filter(deviceId, filtObjBuilder.add());
240 // TODO add objective context
241 }
242
243 /**
244 * Adds a port to given multicast group on given device. This involves the
245 * update of L3 multicast group and multicast routing table entry.
246 *
247 * @param deviceId device ID
248 * @param port port to be added
249 * @param mcastIp multicast group
250 * @param assignedVlan assigned VLAN ID
251 */
252 private void addPortToDevice(DeviceId deviceId, PortNumber port,
253 IpAddress mcastIp, VlanId assignedVlan) {
254 log.info("Add port {} to {}. mcastIp={}, assignedVlan={}",
255 port, deviceId, mcastIp, assignedVlan);
256 McastNextObjectiveStoreKey mcastNextObjectiveStoreKey =
257 new McastNextObjectiveStoreKey(mcastIp, deviceId);
258 ImmutableSet.Builder<PortNumber> portBuilder = ImmutableSet.builder();
259 if (!mcastNextObjStore.containsKey(mcastNextObjectiveStoreKey)) {
260 // First time someone request this mcast group via this device
261 portBuilder.add(port);
262 } else {
263 // This device already serves some subscribers of this mcast group
264 NextObjective nextObj = mcastNextObjStore.get(mcastNextObjectiveStoreKey).value();
265 // Stop if the port is already in the nextobj
266 Set<PortNumber> existingPorts = getPorts(nextObj.next());
267 if (existingPorts.contains(port)) {
268 log.info("NextObj for {}/{} already exists. Abort", deviceId, port);
269 return;
270 }
271 portBuilder.addAll(existingPorts).add(port).build();
272 }
273 // Create, store and apply the new nextObj and fwdObj
274 NextObjective newNextObj =
275 nextObjBuilder(mcastIp, assignedVlan, portBuilder.build()).add();
276 ForwardingObjective fwdObj =
277 fwdObjBuilder(mcastIp, assignedVlan, newNextObj.id()).add();
278 mcastNextObjStore.put(mcastNextObjectiveStoreKey, newNextObj);
279 srManager.flowObjectiveService.next(deviceId, newNextObj);
280 srManager.flowObjectiveService.forward(deviceId, fwdObj);
281 // TODO add objective callback
282 }
283
284 /**
285 * Removes a port from given multicast group on given device.
286 * This involves the update of L3 multicast group and multicast routing
287 * table entry.
288 *
289 * @param deviceId device ID
290 * @param port port to be added
291 * @param mcastIp multicast group
292 * @param assignedVlan assigned VLAN ID
293 * @return true if this is the last sink on this device
294 */
295 private boolean removePortFromDevice(DeviceId deviceId, PortNumber port,
296 IpAddress mcastIp, VlanId assignedVlan) {
297 log.info("Remove port {} from {}. mcastIp={}, assignedVlan={}",
298 port, deviceId, mcastIp, assignedVlan);
299 McastNextObjectiveStoreKey mcastNextObjectiveStoreKey =
300 new McastNextObjectiveStoreKey(mcastIp, deviceId);
301 // This device is not serving this multicast group
302 if (!mcastNextObjStore.containsKey(mcastNextObjectiveStoreKey)) {
303 log.warn("{} is not serving {} on port {}. Abort.", deviceId, mcastIp, port);
304 return false;
305 }
306 NextObjective nextObj = mcastNextObjStore.get(mcastNextObjectiveStoreKey).value();
307
308 Set<PortNumber> existingPorts = getPorts(nextObj.next());
309 // This device does not serve this multicast group
310 if (!existingPorts.contains(port)) {
311 log.warn("{} is not serving {} on port {}. Abort.", deviceId, mcastIp, port);
312 return false;
313 }
314 // Copy and modify the ImmutableSet
315 existingPorts = Sets.newHashSet(existingPorts);
316 existingPorts.remove(port);
317
318 NextObjective newNextObj;
319 ForwardingObjective fwdObj;
320 if (existingPorts.isEmpty()) {
321 // If this is the last sink, remove flows and groups
322 // NOTE: Rely on GroupStore garbage collection rather than explicitly
323 // remove L3MG since there might be other flows/groups refer to
324 // the same L2IG
325 fwdObj = fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove();
326 mcastNextObjStore.remove(mcastNextObjectiveStoreKey);
327 srManager.flowObjectiveService.forward(deviceId, fwdObj);
328 } else {
329 // If this is not the last sink, update flows and groups
330 newNextObj = nextObjBuilder(mcastIp, assignedVlan, existingPorts).add();
331 fwdObj = fwdObjBuilder(mcastIp, assignedVlan, newNextObj.id()).add();
332 mcastNextObjStore.put(mcastNextObjectiveStoreKey, newNextObj);
333 srManager.flowObjectiveService.next(deviceId, newNextObj);
334 srManager.flowObjectiveService.forward(deviceId, fwdObj);
335 }
336 // TODO add objective callback
337
338 return existingPorts.isEmpty();
339 }
340
341 /**
342 * Creates a next objective builder for multicast.
343 *
344 * @param mcastIp multicast group
345 * @param assignedVlan assigned VLAN ID
346 * @param outPorts set of output port numbers
347 * @return next objective builder
348 */
349 private NextObjective.Builder nextObjBuilder(IpAddress mcastIp,
350 VlanId assignedVlan, Set<PortNumber> outPorts) {
351 int nextId = srManager.flowObjectiveService.allocateNextId();
352
353 TrafficSelector metadata =
354 DefaultTrafficSelector.builder()
355 .matchVlanId(assignedVlan)
356 .matchIPDst(mcastIp.toIpPrefix())
357 .build();
358
359 NextObjective.Builder nextObjBuilder = DefaultNextObjective
360 .builder().withId(nextId)
361 .withType(NextObjective.Type.BROADCAST).fromApp(srManager.appId)
362 .withMeta(metadata);
363
364 outPorts.forEach(port -> {
365 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
366 if (egressVlan().equals(VlanId.NONE)) {
367 tBuilder.popVlan();
368 }
369 tBuilder.setOutput(port);
370 nextObjBuilder.addTreatment(tBuilder.build());
371 });
372
373 return nextObjBuilder;
374 }
375
376 /**
377 * Creates a forwarding objective builder for multicast.
378 *
379 * @param mcastIp multicast group
380 * @param assignedVlan assigned VLAN ID
381 * @param nextId next ID of the L3 multicast group
382 * @return forwarding objective builder
383 */
384 private ForwardingObjective.Builder fwdObjBuilder(IpAddress mcastIp,
385 VlanId assignedVlan, int nextId) {
386 TrafficSelector.Builder sbuilder = DefaultTrafficSelector.builder();
387 IpPrefix mcastPrefix = IpPrefix.valueOf(mcastIp, IpPrefix.MAX_INET_MASK_LENGTH);
388 sbuilder.matchEthType(Ethernet.TYPE_IPV4);
389 sbuilder.matchIPDst(mcastPrefix);
390 TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder();
391 metabuilder.matchVlanId(assignedVlan);
392
393 ForwardingObjective.Builder fwdBuilder = DefaultForwardingObjective.builder();
394 fwdBuilder.withSelector(sbuilder.build())
395 .withMeta(metabuilder.build())
396 .nextStep(nextId)
397 .withFlag(ForwardingObjective.Flag.SPECIFIC)
398 .fromApp(srManager.appId)
399 .withPriority(SegmentRoutingService.DEFAULT_PRIORITY);
400 return fwdBuilder;
401 }
402
403 /**
404 * Creates a filtering objective builder for multicast.
405 *
406 * @param deviceId Device ID
407 * @param ingressPort ingress port of the multicast stream
408 * @param assignedVlan assigned VLAN ID
409 * @return filtering objective builder
410 */
411 private FilteringObjective.Builder filterObjBuilder(DeviceId deviceId, PortNumber ingressPort,
412 VlanId assignedVlan) {
413 FilteringObjective.Builder filtBuilder = DefaultFilteringObjective.builder();
414 filtBuilder.withKey(Criteria.matchInPort(ingressPort))
415 .addCondition(Criteria.matchEthDstMasked(MacAddress.IPV4_MULTICAST,
416 MacAddress.IPV4_MULTICAST_MASK))
417 .addCondition(Criteria.matchVlanId(egressVlan()))
418 .withPriority(SegmentRoutingService.DEFAULT_PRIORITY);
419 // vlan assignment is valid only if this instance is master
420 if (srManager.mastershipService.isLocalMaster(deviceId)) {
421 TrafficTreatment tt = DefaultTrafficTreatment.builder()
422 .pushVlan().setVlanId(assignedVlan).build();
423 filtBuilder.withMeta(tt);
424 }
425 return filtBuilder.permit().fromApp(srManager.appId);
426 }
427
428 /**
429 * Gets output ports information from treatments.
430 *
431 * @param treatments collection of traffic treatments
432 * @return set of output port numbers
433 */
434 private Set<PortNumber> getPorts(Collection<TrafficTreatment> treatments) {
435 ImmutableSet.Builder<PortNumber> builder = ImmutableSet.builder();
436 treatments.forEach(treatment -> {
437 treatment.allInstructions().stream()
438 .filter(instr -> instr instanceof OutputInstruction)
439 .forEach(instr -> {
440 builder.add(((OutputInstruction) instr).port());
441 });
442 });
443 return builder.build();
444 }
445
446 /**
447 * Gets a path from src to dst.
448 * If a path was allocated before, returns the allocated path.
449 * Otherwise, randomly pick one from available paths.
450 *
451 * @param src source device ID
452 * @param dst destination device ID
453 * @param mcastIp multicast group
454 * @return an optional path from src to dst
455 */
456 private Optional<Path> getPath(DeviceId src, DeviceId dst, IpAddress mcastIp) {
457 List<Path> allPaths = Lists.newArrayList(
458 topologyService.getPaths(topologyService.currentTopology(), src, dst));
459 if (allPaths.isEmpty()) {
460 log.warn("Fail to find a path from {} to {}. Abort.", src, dst);
461 return Optional.empty();
462 }
463
464 // If one of the available path is used before, use the same path
465 McastNextObjectiveStoreKey mcastNextObjectiveStoreKey =
466 new McastNextObjectiveStoreKey(mcastIp, src);
467 if (mcastNextObjStore.containsKey(mcastNextObjectiveStoreKey)) {
468 NextObjective nextObj = mcastNextObjStore.get(mcastNextObjectiveStoreKey).value();
469 Set<PortNumber> existingPorts = getPorts(nextObj.next());
470 for (Path path : allPaths) {
471 PortNumber srcPort = path.links().get(0).src().port();
472 if (existingPorts.contains(srcPort)) {
473 return Optional.of(path);
474 }
475 }
476 }
477 // Otherwise, randomly pick a path
478 Collections.shuffle(allPaths);
479 return allPaths.stream().findFirst();
480 }
481
482 /**
483 * Gets egress VLAN from McastConfig.
484 *
485 * @return egress VLAN or VlanId.NONE if not configured
486 */
487 private VlanId egressVlan() {
488 McastConfig mcastConfig =
489 srManager.cfgService.getConfig(coreAppId, McastConfig.class);
490 return (mcastConfig != null) ? mcastConfig.egressVlan() : VlanId.NONE;
491 }
492
493 /**
494 * Gets assigned VLAN according to the value of egress VLAN.
495 *
496 * @return assigned VLAN
497 */
498 private VlanId assignedVlan() {
499 return (egressVlan().equals(VlanId.NONE)) ?
500 VlanId.vlanId(SegmentRoutingManager.ASSIGNED_VLAN_NO_SUBNET) :
501 egressVlan();
502 }
503}