blob: f78722d625bafc16a557b9364a7f33a58e42599b [file] [log] [blame]
Charles Chanc91c8782016-03-30 17:54:24 -07001/*
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.segmentrouting;
18
19import com.google.common.collect.ImmutableSet;
20import com.google.common.collect.Lists;
21import com.google.common.collect.Sets;
22import org.onlab.packet.Ethernet;
Charles Chan779fd062016-05-11 20:39:35 -070023import org.onlab.packet.Ip4Prefix;
Charles Chanc91c8782016-03-30 17:54:24 -070024import org.onlab.packet.IpAddress;
25import org.onlab.packet.IpPrefix;
26import org.onlab.packet.MacAddress;
27import org.onlab.packet.VlanId;
28import org.onlab.util.KryoNamespace;
29import org.onosproject.core.ApplicationId;
30import org.onosproject.core.CoreService;
31import org.onosproject.incubator.net.config.basics.McastConfig;
32import org.onosproject.net.ConnectPoint;
33import org.onosproject.net.DeviceId;
34import org.onosproject.net.Link;
35import org.onosproject.net.Path;
36import org.onosproject.net.PortNumber;
37import org.onosproject.net.flow.DefaultTrafficSelector;
38import org.onosproject.net.flow.DefaultTrafficTreatment;
39import org.onosproject.net.flow.TrafficSelector;
40import org.onosproject.net.flow.TrafficTreatment;
41import org.onosproject.net.flow.criteria.Criteria;
42import org.onosproject.net.flow.instructions.Instructions.OutputInstruction;
43import org.onosproject.net.flowobjective.DefaultFilteringObjective;
44import org.onosproject.net.flowobjective.DefaultForwardingObjective;
45import org.onosproject.net.flowobjective.DefaultNextObjective;
Charles Chan72779502016-04-23 17:36:10 -070046import org.onosproject.net.flowobjective.DefaultObjectiveContext;
Charles Chanc91c8782016-03-30 17:54:24 -070047import org.onosproject.net.flowobjective.FilteringObjective;
48import org.onosproject.net.flowobjective.ForwardingObjective;
49import org.onosproject.net.flowobjective.NextObjective;
Charles Chan72779502016-04-23 17:36:10 -070050import org.onosproject.net.flowobjective.ObjectiveContext;
Charles Chanc91c8782016-03-30 17:54:24 -070051import org.onosproject.net.mcast.McastEvent;
52import org.onosproject.net.mcast.McastRouteInfo;
53import org.onosproject.net.topology.TopologyService;
Charles Chan370a65b2016-05-10 17:29:47 -070054import org.onosproject.segmentrouting.config.SegmentRoutingAppConfig;
Charles Chan72779502016-04-23 17:36:10 -070055import org.onosproject.segmentrouting.storekey.McastStoreKey;
Charles Chanc91c8782016-03-30 17:54:24 -070056import org.onosproject.store.serializers.KryoNamespaces;
57import org.onosproject.store.service.ConsistentMap;
58import org.onosproject.store.service.Serializer;
59import org.onosproject.store.service.StorageService;
Charles Chan72779502016-04-23 17:36:10 -070060import org.onosproject.store.service.Versioned;
Charles Chanc91c8782016-03-30 17:54:24 -070061import org.slf4j.Logger;
62import org.slf4j.LoggerFactory;
63
64import java.util.Collection;
65import java.util.Collections;
Charles Chan72779502016-04-23 17:36:10 -070066import java.util.Iterator;
Charles Chanc91c8782016-03-30 17:54:24 -070067import java.util.List;
Charles Chan72779502016-04-23 17:36:10 -070068import java.util.Map;
Charles Chanc91c8782016-03-30 17:54:24 -070069import java.util.Optional;
70import java.util.Set;
Charles Chan72779502016-04-23 17:36:10 -070071import java.util.stream.Collectors;
72
73import static com.google.common.base.Preconditions.checkState;
Charles Chanc91c8782016-03-30 17:54:24 -070074
75/**
Charles Chan1eaf4802016-04-18 13:44:03 -070076 * Handles multicast-related events.
Charles Chanc91c8782016-03-30 17:54:24 -070077 */
Charles Chan1eaf4802016-04-18 13:44:03 -070078public class McastHandler {
79 private static final Logger log = LoggerFactory.getLogger(McastHandler.class);
Charles Chanc91c8782016-03-30 17:54:24 -070080 private final SegmentRoutingManager srManager;
81 private final ApplicationId coreAppId;
82 private StorageService storageService;
83 private TopologyService topologyService;
Charles Chan72779502016-04-23 17:36:10 -070084 private final ConsistentMap<McastStoreKey, NextObjective> mcastNextObjStore;
85 private final KryoNamespace.Builder mcastKryo;
86 private final ConsistentMap<McastStoreKey, McastRole> mcastRoleStore;
87
88 /**
89 * Role in the multicast tree.
90 */
91 public enum McastRole {
92 /**
93 * The device is the ingress device of this group.
94 */
95 INGRESS,
96 /**
97 * The device is the transit device of this group.
98 */
99 TRANSIT,
100 /**
101 * The device is the egress device of this group.
102 */
103 EGRESS
104 }
Charles Chanc91c8782016-03-30 17:54:24 -0700105
106 /**
107 * Constructs the McastEventHandler.
108 *
109 * @param srManager Segment Routing manager
110 */
Charles Chan1eaf4802016-04-18 13:44:03 -0700111 public McastHandler(SegmentRoutingManager srManager) {
Charles Chanc91c8782016-03-30 17:54:24 -0700112 coreAppId = srManager.coreService.getAppId(CoreService.CORE_APP_NAME);
Charles Chanc91c8782016-03-30 17:54:24 -0700113 this.srManager = srManager;
114 this.storageService = srManager.storageService;
115 this.topologyService = srManager.topologyService;
Charles Chan72779502016-04-23 17:36:10 -0700116 mcastKryo = new KryoNamespace.Builder()
Charles Chanc91c8782016-03-30 17:54:24 -0700117 .register(KryoNamespaces.API)
Charles Chan72779502016-04-23 17:36:10 -0700118 .register(McastStoreKey.class)
119 .register(McastRole.class);
Charles Chanc91c8782016-03-30 17:54:24 -0700120 mcastNextObjStore = storageService
Charles Chan72779502016-04-23 17:36:10 -0700121 .<McastStoreKey, NextObjective>consistentMapBuilder()
Charles Chanc91c8782016-03-30 17:54:24 -0700122 .withName("onos-mcast-nextobj-store")
Charles Chan72779502016-04-23 17:36:10 -0700123 .withSerializer(Serializer.using(mcastKryo.build()))
Charles Chanc91c8782016-03-30 17:54:24 -0700124 .build();
Charles Chan72779502016-04-23 17:36:10 -0700125 mcastRoleStore = storageService
126 .<McastStoreKey, McastRole>consistentMapBuilder()
127 .withName("onos-mcast-role-store")
128 .withSerializer(Serializer.using(mcastKryo.build()))
129 .build();
130 }
131
132 /**
133 * Read initial multicast from mcast store.
134 */
135 public void init() {
136 srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
137 ConnectPoint source = srManager.multicastRouteService.fetchSource(mcastRoute);
138 Set<ConnectPoint> sinks = srManager.multicastRouteService.fetchSinks(mcastRoute);
139 sinks.forEach(sink -> {
140 processSinkAddedInternal(source, sink, mcastRoute.group());
141 });
142 });
Charles Chanc91c8782016-03-30 17:54:24 -0700143 }
144
145 /**
146 * Processes the SOURCE_ADDED event.
147 *
148 * @param event McastEvent with SOURCE_ADDED type
149 */
150 protected void processSourceAdded(McastEvent event) {
151 log.info("processSourceAdded {}", event);
152 McastRouteInfo mcastRouteInfo = event.subject();
153 if (!mcastRouteInfo.isComplete()) {
154 log.info("Incompleted McastRouteInfo. Abort.");
155 return;
156 }
157 ConnectPoint source = mcastRouteInfo.source().orElse(null);
158 Set<ConnectPoint> sinks = mcastRouteInfo.sinks();
159 IpAddress mcastIp = mcastRouteInfo.route().group();
160
161 sinks.forEach(sink -> {
162 processSinkAddedInternal(source, sink, mcastIp);
163 });
164 }
165
166 /**
167 * Processes the SINK_ADDED event.
168 *
169 * @param event McastEvent with SINK_ADDED type
170 */
171 protected void processSinkAdded(McastEvent event) {
172 log.info("processSinkAdded {}", event);
173 McastRouteInfo mcastRouteInfo = event.subject();
174 if (!mcastRouteInfo.isComplete()) {
175 log.info("Incompleted McastRouteInfo. Abort.");
176 return;
177 }
178 ConnectPoint source = mcastRouteInfo.source().orElse(null);
179 ConnectPoint sink = mcastRouteInfo.sink().orElse(null);
180 IpAddress mcastIp = mcastRouteInfo.route().group();
181
182 processSinkAddedInternal(source, sink, mcastIp);
183 }
184
185 /**
186 * Processes the SINK_REMOVED event.
187 *
188 * @param event McastEvent with SINK_REMOVED type
189 */
190 protected void processSinkRemoved(McastEvent event) {
191 log.info("processSinkRemoved {}", event);
192 McastRouteInfo mcastRouteInfo = event.subject();
193 if (!mcastRouteInfo.isComplete()) {
194 log.info("Incompleted McastRouteInfo. Abort.");
195 return;
196 }
197 ConnectPoint source = mcastRouteInfo.source().orElse(null);
198 ConnectPoint sink = mcastRouteInfo.sink().orElse(null);
199 IpAddress mcastIp = mcastRouteInfo.route().group();
Charles Chanc91c8782016-03-30 17:54:24 -0700200
201 // When source and sink are on the same device
202 if (source.deviceId().equals(sink.deviceId())) {
203 // Source and sink are on even the same port. There must be something wrong.
204 if (source.port().equals(sink.port())) {
205 log.warn("Sink is on the same port of source. Abort");
206 return;
207 }
Charles Chana8f9dee2016-05-16 18:44:13 -0700208 removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(source));
Charles Chanc91c8782016-03-30 17:54:24 -0700209 return;
210 }
211
212 // Process the egress device
Charles Chana8f9dee2016-05-16 18:44:13 -0700213 boolean isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(null));
Charles Chan72779502016-04-23 17:36:10 -0700214 if (isLast) {
215 mcastRoleStore.remove(new McastStoreKey(mcastIp, sink.deviceId()));
216 }
Charles Chanc91c8782016-03-30 17:54:24 -0700217
218 // If this is the last sink on the device, also update upstream
219 Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp);
220 if (mcastPath.isPresent()) {
221 List<Link> links = Lists.newArrayList(mcastPath.get().links());
222 Collections.reverse(links);
223 for (Link link : links) {
224 if (isLast) {
225 isLast = removePortFromDevice(link.src().deviceId(), link.src().port(),
Charles Chana8f9dee2016-05-16 18:44:13 -0700226 mcastIp,
227 assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
Charles Chan72779502016-04-23 17:36:10 -0700228 mcastRoleStore.remove(new McastStoreKey(mcastIp, link.src().deviceId()));
Charles Chanc91c8782016-03-30 17:54:24 -0700229 }
230 }
231 }
232 }
233
234 /**
235 * Establishes a path from source to sink for given multicast group.
236 *
237 * @param source connect point of the multicast source
238 * @param sink connection point of the multicast sink
239 * @param mcastIp multicast group IP address
240 */
241 private void processSinkAddedInternal(ConnectPoint source, ConnectPoint sink,
242 IpAddress mcastIp) {
Charles Chan72779502016-04-23 17:36:10 -0700243 // Process the ingress device
Charles Chana8f9dee2016-05-16 18:44:13 -0700244 addFilterToDevice(source.deviceId(), source.port(), assignedVlan(source));
Charles Chan72779502016-04-23 17:36:10 -0700245
Charles Chanc91c8782016-03-30 17:54:24 -0700246 // When source and sink are on the same device
247 if (source.deviceId().equals(sink.deviceId())) {
248 // Source and sink are on even the same port. There must be something wrong.
249 if (source.port().equals(sink.port())) {
250 log.warn("Sink is on the same port of source. Abort");
251 return;
252 }
Charles Chana8f9dee2016-05-16 18:44:13 -0700253 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(source));
Charles Chan72779502016-04-23 17:36:10 -0700254 mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()), McastRole.INGRESS);
Charles Chanc91c8782016-03-30 17:54:24 -0700255 return;
256 }
257
Charles Chanc91c8782016-03-30 17:54:24 -0700258 // Find a path. If present, create/update groups and flows for each hop
259 Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp);
260 if (mcastPath.isPresent()) {
Charles Chan72779502016-04-23 17:36:10 -0700261 List<Link> links = mcastPath.get().links();
262 checkState(links.size() == 2,
263 "Path in leaf-spine topology should always be two hops: ", links);
264
265 links.forEach(link -> {
Charles Chana8f9dee2016-05-16 18:44:13 -0700266 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
267 assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
268 addFilterToDevice(link.dst().deviceId(), link.dst().port(), assignedVlan(null));
Charles Chanc91c8782016-03-30 17:54:24 -0700269 });
Charles Chan72779502016-04-23 17:36:10 -0700270
Charles Chanc91c8782016-03-30 17:54:24 -0700271 // Process the egress device
Charles Chana8f9dee2016-05-16 18:44:13 -0700272 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(null));
Charles Chan72779502016-04-23 17:36:10 -0700273
274 // Setup mcast roles
275 mcastRoleStore.put(new McastStoreKey(mcastIp, source.deviceId()),
276 McastRole.INGRESS);
277 mcastRoleStore.put(new McastStoreKey(mcastIp, links.get(0).dst().deviceId()),
278 McastRole.TRANSIT);
279 mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()),
280 McastRole.EGRESS);
281 } else {
282 log.warn("Unable to find a path from {} to {}. Abort sinkAdded",
283 source.deviceId(), sink.deviceId());
Charles Chanc91c8782016-03-30 17:54:24 -0700284 }
285 }
286
287 /**
Charles Chan72779502016-04-23 17:36:10 -0700288 * Processes the LINK_DOWN event.
289 *
290 * @param affectedLink Link that is going down
291 */
292 protected void processLinkDown(Link affectedLink) {
Charles Chan72779502016-04-23 17:36:10 -0700293 getAffectedGroups(affectedLink).forEach(mcastIp -> {
294 // Find out the ingress, transit and egress device of affected group
295 DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
296 .stream().findAny().orElse(null);
297 DeviceId transitDevice = getDevice(mcastIp, McastRole.TRANSIT)
298 .stream().findAny().orElse(null);
299 Set<DeviceId> egressDevices = getDevice(mcastIp, McastRole.EGRESS);
Charles Chana8f9dee2016-05-16 18:44:13 -0700300 ConnectPoint source = getSource(mcastIp);
301
302 // Do not proceed if any of these info is missing
303 if (ingressDevice == null || transitDevice == null
304 || egressDevices == null || source == null) {
305 log.warn("Missing ingress {}, transit {}, egress {} devices or source {}",
306 ingressDevice, transitDevice, egressDevices, source);
Charles Chan72779502016-04-23 17:36:10 -0700307 return;
308 }
309
310 // Remove entire transit
Charles Chana8f9dee2016-05-16 18:44:13 -0700311 removeGroupFromDevice(transitDevice, mcastIp, assignedVlan(null));
Charles Chan72779502016-04-23 17:36:10 -0700312
313 // Remove transit-facing port on ingress device
314 PortNumber ingressTransitPort = ingressTransitPort(mcastIp);
315 if (ingressTransitPort != null) {
Charles Chana8f9dee2016-05-16 18:44:13 -0700316 removePortFromDevice(ingressDevice, ingressTransitPort, mcastIp, assignedVlan(source));
Charles Chan72779502016-04-23 17:36:10 -0700317 mcastRoleStore.remove(new McastStoreKey(mcastIp, transitDevice));
318 }
319
320 // Construct a new path for each egress device
321 egressDevices.forEach(egressDevice -> {
322 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice, mcastIp);
323 if (mcastPath.isPresent()) {
324 List<Link> links = mcastPath.get().links();
325 links.forEach(link -> {
Charles Chana8f9dee2016-05-16 18:44:13 -0700326 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
327 assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
328 addFilterToDevice(link.dst().deviceId(), link.dst().port(), assignedVlan(null));
Charles Chan72779502016-04-23 17:36:10 -0700329 });
330 // Setup new transit mcast role
331 mcastRoleStore.put(new McastStoreKey(mcastIp,
332 links.get(0).dst().deviceId()), McastRole.TRANSIT);
333 } else {
334 log.warn("Fail to recover egress device {} from link failure {}",
335 egressDevice, affectedLink);
Charles Chana8f9dee2016-05-16 18:44:13 -0700336 removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null));
Charles Chan72779502016-04-23 17:36:10 -0700337 }
338 });
339 });
340 }
341
342 /**
Charles Chanc91c8782016-03-30 17:54:24 -0700343 * Adds filtering objective for given device and port.
344 *
345 * @param deviceId device ID
346 * @param port ingress port number
347 * @param assignedVlan assigned VLAN ID
348 */
349 private void addFilterToDevice(DeviceId deviceId, PortNumber port, VlanId assignedVlan) {
350 // Do nothing if the port is configured as suppressed
Charles Chan370a65b2016-05-10 17:29:47 -0700351 ConnectPoint connectPoint = new ConnectPoint(deviceId, port);
352 SegmentRoutingAppConfig appConfig = srManager.cfgService
353 .getConfig(srManager.appId, SegmentRoutingAppConfig.class);
354 if (appConfig != null && appConfig.suppressSubnet().contains(connectPoint)) {
355 log.info("Ignore suppressed port {}", connectPoint);
Charles Chanc91c8782016-03-30 17:54:24 -0700356 return;
357 }
358
Charles Chan779fd062016-05-11 20:39:35 -0700359 // Reuse unicast VLAN if the port has subnet configured
360 Ip4Prefix portSubnet = srManager.deviceConfiguration.getPortSubnet(deviceId, port);
361 VlanId unicastVlan = srManager.getSubnetAssignedVlanId(deviceId, portSubnet);
362 final VlanId finalVlanId = (unicastVlan != null) ? unicastVlan : assignedVlan;
363
Charles Chanc91c8782016-03-30 17:54:24 -0700364 FilteringObjective.Builder filtObjBuilder =
Charles Chan779fd062016-05-11 20:39:35 -0700365 filterObjBuilder(deviceId, port, finalVlanId);
Charles Chan72779502016-04-23 17:36:10 -0700366 ObjectiveContext context = new DefaultObjectiveContext(
367 (objective) -> log.debug("Successfully add filter on {}/{}, vlan {}",
Charles Chan779fd062016-05-11 20:39:35 -0700368 deviceId, port.toLong(), finalVlanId),
Charles Chan72779502016-04-23 17:36:10 -0700369 (objective, error) ->
370 log.warn("Failed to add filter on {}/{}, vlan {}: {}",
Charles Chan779fd062016-05-11 20:39:35 -0700371 deviceId, port.toLong(), finalVlanId, error));
Charles Chan72779502016-04-23 17:36:10 -0700372 srManager.flowObjectiveService.filter(deviceId, filtObjBuilder.add(context));
Charles Chanc91c8782016-03-30 17:54:24 -0700373 }
374
375 /**
376 * Adds a port to given multicast group on given device. This involves the
377 * update of L3 multicast group and multicast routing table entry.
378 *
379 * @param deviceId device ID
380 * @param port port to be added
381 * @param mcastIp multicast group
382 * @param assignedVlan assigned VLAN ID
383 */
384 private void addPortToDevice(DeviceId deviceId, PortNumber port,
385 IpAddress mcastIp, VlanId assignedVlan) {
Charles Chan72779502016-04-23 17:36:10 -0700386 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId);
Charles Chanc91c8782016-03-30 17:54:24 -0700387 ImmutableSet.Builder<PortNumber> portBuilder = ImmutableSet.builder();
Charles Chan72779502016-04-23 17:36:10 -0700388 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Charles Chanc91c8782016-03-30 17:54:24 -0700389 // First time someone request this mcast group via this device
390 portBuilder.add(port);
391 } else {
392 // This device already serves some subscribers of this mcast group
Charles Chan72779502016-04-23 17:36:10 -0700393 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chanc91c8782016-03-30 17:54:24 -0700394 // Stop if the port is already in the nextobj
395 Set<PortNumber> existingPorts = getPorts(nextObj.next());
396 if (existingPorts.contains(port)) {
397 log.info("NextObj for {}/{} already exists. Abort", deviceId, port);
398 return;
399 }
400 portBuilder.addAll(existingPorts).add(port).build();
401 }
402 // Create, store and apply the new nextObj and fwdObj
Charles Chan72779502016-04-23 17:36:10 -0700403 ObjectiveContext context = new DefaultObjectiveContext(
404 (objective) -> log.debug("Successfully add {} on {}/{}, vlan {}",
405 mcastIp, deviceId, port.toLong(), assignedVlan),
406 (objective, error) ->
407 log.warn("Failed to add {} on {}/{}, vlan {}: {}",
408 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Charles Chanc91c8782016-03-30 17:54:24 -0700409 NextObjective newNextObj =
410 nextObjBuilder(mcastIp, assignedVlan, portBuilder.build()).add();
411 ForwardingObjective fwdObj =
Charles Chan72779502016-04-23 17:36:10 -0700412 fwdObjBuilder(mcastIp, assignedVlan, newNextObj.id()).add(context);
413 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Charles Chanc91c8782016-03-30 17:54:24 -0700414 srManager.flowObjectiveService.next(deviceId, newNextObj);
415 srManager.flowObjectiveService.forward(deviceId, fwdObj);
Charles Chanc91c8782016-03-30 17:54:24 -0700416 }
417
418 /**
419 * Removes a port from given multicast group on given device.
420 * This involves the update of L3 multicast group and multicast routing
421 * table entry.
422 *
423 * @param deviceId device ID
424 * @param port port to be added
425 * @param mcastIp multicast group
426 * @param assignedVlan assigned VLAN ID
427 * @return true if this is the last sink on this device
428 */
429 private boolean removePortFromDevice(DeviceId deviceId, PortNumber port,
430 IpAddress mcastIp, VlanId assignedVlan) {
Charles Chan72779502016-04-23 17:36:10 -0700431 McastStoreKey mcastStoreKey =
432 new McastStoreKey(mcastIp, deviceId);
Charles Chanc91c8782016-03-30 17:54:24 -0700433 // This device is not serving this multicast group
Charles Chan72779502016-04-23 17:36:10 -0700434 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Charles Chanc91c8782016-03-30 17:54:24 -0700435 log.warn("{} is not serving {} on port {}. Abort.", deviceId, mcastIp, port);
436 return false;
437 }
Charles Chan72779502016-04-23 17:36:10 -0700438 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chanc91c8782016-03-30 17:54:24 -0700439
440 Set<PortNumber> existingPorts = getPorts(nextObj.next());
Charles Chan72779502016-04-23 17:36:10 -0700441 // This port does not serve this multicast group
Charles Chanc91c8782016-03-30 17:54:24 -0700442 if (!existingPorts.contains(port)) {
443 log.warn("{} is not serving {} on port {}. Abort.", deviceId, mcastIp, port);
444 return false;
445 }
446 // Copy and modify the ImmutableSet
447 existingPorts = Sets.newHashSet(existingPorts);
448 existingPorts.remove(port);
449
450 NextObjective newNextObj;
451 ForwardingObjective fwdObj;
452 if (existingPorts.isEmpty()) {
453 // If this is the last sink, remove flows and groups
454 // NOTE: Rely on GroupStore garbage collection rather than explicitly
455 // remove L3MG since there might be other flows/groups refer to
456 // the same L2IG
Charles Chan72779502016-04-23 17:36:10 -0700457 ObjectiveContext context = new DefaultObjectiveContext(
458 (objective) -> log.debug("Successfully remove {} on {}/{}, vlan {}",
459 mcastIp, deviceId, port.toLong(), assignedVlan),
460 (objective, error) ->
461 log.warn("Failed to remove {} on {}/{}, vlan {}: {}",
462 mcastIp, deviceId, port.toLong(), assignedVlan, error));
463 fwdObj = fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
464 mcastNextObjStore.remove(mcastStoreKey);
Charles Chanc91c8782016-03-30 17:54:24 -0700465 srManager.flowObjectiveService.forward(deviceId, fwdObj);
466 } else {
467 // If this is not the last sink, update flows and groups
Charles Chan72779502016-04-23 17:36:10 -0700468 ObjectiveContext context = new DefaultObjectiveContext(
469 (objective) -> log.debug("Successfully update {} on {}/{}, vlan {}",
470 mcastIp, deviceId, port.toLong(), assignedVlan),
471 (objective, error) ->
472 log.warn("Failed to update {} on {}/{}, vlan {}: {}",
473 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Charles Chanc91c8782016-03-30 17:54:24 -0700474 newNextObj = nextObjBuilder(mcastIp, assignedVlan, existingPorts).add();
475 fwdObj = fwdObjBuilder(mcastIp, assignedVlan, newNextObj.id()).add();
Charles Chan72779502016-04-23 17:36:10 -0700476 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Charles Chanc91c8782016-03-30 17:54:24 -0700477 srManager.flowObjectiveService.next(deviceId, newNextObj);
478 srManager.flowObjectiveService.forward(deviceId, fwdObj);
479 }
Charles Chanc91c8782016-03-30 17:54:24 -0700480 return existingPorts.isEmpty();
481 }
482
Charles Chan72779502016-04-23 17:36:10 -0700483
484 /**
485 * Removes entire group on given device.
486 *
487 * @param deviceId device ID
488 * @param mcastIp multicast group to be removed
489 * @param assignedVlan assigned VLAN ID
490 */
491 private void removeGroupFromDevice(DeviceId deviceId, IpAddress mcastIp,
492 VlanId assignedVlan) {
493 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId);
494 // This device is not serving this multicast group
495 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
496 log.warn("{} is not serving {}. Abort.", deviceId, mcastIp);
497 return;
498 }
499 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
500 // NOTE: Rely on GroupStore garbage collection rather than explicitly
501 // remove L3MG since there might be other flows/groups refer to
502 // the same L2IG
503 ObjectiveContext context = new DefaultObjectiveContext(
504 (objective) -> log.debug("Successfully remove {} on {}, vlan {}",
505 mcastIp, deviceId, assignedVlan),
506 (objective, error) ->
507 log.warn("Failed to remove {} on {}, vlan {}: {}",
508 mcastIp, deviceId, assignedVlan, error));
509 ForwardingObjective fwdObj = fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
510 srManager.flowObjectiveService.forward(deviceId, fwdObj);
511 mcastNextObjStore.remove(mcastStoreKey);
512 mcastRoleStore.remove(mcastStoreKey);
513 }
514
515 /**
516 * Remove all groups on given device.
517 *
518 * @param deviceId device ID
519 */
520 public void removeDevice(DeviceId deviceId) {
521 Iterator<Map.Entry<McastStoreKey, Versioned<NextObjective>>> itNextObj =
522 mcastNextObjStore.entrySet().iterator();
523 while (itNextObj.hasNext()) {
524 Map.Entry<McastStoreKey, Versioned<NextObjective>> entry = itNextObj.next();
525 if (entry.getKey().deviceId().equals(deviceId)) {
Charles Chana8f9dee2016-05-16 18:44:13 -0700526 ConnectPoint source = getSource(entry.getKey().mcastIp());
527 removeGroupFromDevice(entry.getKey().deviceId(), entry.getKey().mcastIp(),
528 assignedVlan(deviceId.equals(source.deviceId()) ? source : null));
Charles Chan72779502016-04-23 17:36:10 -0700529 itNextObj.remove();
530 }
531 }
532
533 Iterator<Map.Entry<McastStoreKey, Versioned<McastRole>>> itRole =
534 mcastRoleStore.entrySet().iterator();
535 while (itRole.hasNext()) {
536 Map.Entry<McastStoreKey, Versioned<McastRole>> entry = itRole.next();
537 if (entry.getKey().deviceId().equals(deviceId)) {
538 itRole.remove();
539 }
540 }
541
542 }
543
Charles Chanc91c8782016-03-30 17:54:24 -0700544 /**
545 * Creates a next objective builder for multicast.
546 *
547 * @param mcastIp multicast group
548 * @param assignedVlan assigned VLAN ID
549 * @param outPorts set of output port numbers
550 * @return next objective builder
551 */
552 private NextObjective.Builder nextObjBuilder(IpAddress mcastIp,
553 VlanId assignedVlan, Set<PortNumber> outPorts) {
554 int nextId = srManager.flowObjectiveService.allocateNextId();
555
556 TrafficSelector metadata =
557 DefaultTrafficSelector.builder()
558 .matchVlanId(assignedVlan)
559 .matchIPDst(mcastIp.toIpPrefix())
560 .build();
561
562 NextObjective.Builder nextObjBuilder = DefaultNextObjective
563 .builder().withId(nextId)
564 .withType(NextObjective.Type.BROADCAST).fromApp(srManager.appId)
565 .withMeta(metadata);
566
567 outPorts.forEach(port -> {
568 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
569 if (egressVlan().equals(VlanId.NONE)) {
570 tBuilder.popVlan();
571 }
572 tBuilder.setOutput(port);
573 nextObjBuilder.addTreatment(tBuilder.build());
574 });
575
576 return nextObjBuilder;
577 }
578
579 /**
580 * Creates a forwarding objective builder for multicast.
581 *
582 * @param mcastIp multicast group
583 * @param assignedVlan assigned VLAN ID
584 * @param nextId next ID of the L3 multicast group
585 * @return forwarding objective builder
586 */
587 private ForwardingObjective.Builder fwdObjBuilder(IpAddress mcastIp,
588 VlanId assignedVlan, int nextId) {
589 TrafficSelector.Builder sbuilder = DefaultTrafficSelector.builder();
590 IpPrefix mcastPrefix = IpPrefix.valueOf(mcastIp, IpPrefix.MAX_INET_MASK_LENGTH);
591 sbuilder.matchEthType(Ethernet.TYPE_IPV4);
592 sbuilder.matchIPDst(mcastPrefix);
593 TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder();
594 metabuilder.matchVlanId(assignedVlan);
595
596 ForwardingObjective.Builder fwdBuilder = DefaultForwardingObjective.builder();
597 fwdBuilder.withSelector(sbuilder.build())
598 .withMeta(metabuilder.build())
599 .nextStep(nextId)
600 .withFlag(ForwardingObjective.Flag.SPECIFIC)
601 .fromApp(srManager.appId)
602 .withPriority(SegmentRoutingService.DEFAULT_PRIORITY);
603 return fwdBuilder;
604 }
605
606 /**
607 * Creates a filtering objective builder for multicast.
608 *
609 * @param deviceId Device ID
610 * @param ingressPort ingress port of the multicast stream
611 * @param assignedVlan assigned VLAN ID
612 * @return filtering objective builder
613 */
614 private FilteringObjective.Builder filterObjBuilder(DeviceId deviceId, PortNumber ingressPort,
615 VlanId assignedVlan) {
616 FilteringObjective.Builder filtBuilder = DefaultFilteringObjective.builder();
617 filtBuilder.withKey(Criteria.matchInPort(ingressPort))
618 .addCondition(Criteria.matchEthDstMasked(MacAddress.IPV4_MULTICAST,
619 MacAddress.IPV4_MULTICAST_MASK))
620 .addCondition(Criteria.matchVlanId(egressVlan()))
621 .withPriority(SegmentRoutingService.DEFAULT_PRIORITY);
622 // vlan assignment is valid only if this instance is master
623 if (srManager.mastershipService.isLocalMaster(deviceId)) {
624 TrafficTreatment tt = DefaultTrafficTreatment.builder()
625 .pushVlan().setVlanId(assignedVlan).build();
626 filtBuilder.withMeta(tt);
627 }
628 return filtBuilder.permit().fromApp(srManager.appId);
629 }
630
631 /**
632 * Gets output ports information from treatments.
633 *
634 * @param treatments collection of traffic treatments
635 * @return set of output port numbers
636 */
637 private Set<PortNumber> getPorts(Collection<TrafficTreatment> treatments) {
638 ImmutableSet.Builder<PortNumber> builder = ImmutableSet.builder();
639 treatments.forEach(treatment -> {
640 treatment.allInstructions().stream()
641 .filter(instr -> instr instanceof OutputInstruction)
642 .forEach(instr -> {
643 builder.add(((OutputInstruction) instr).port());
644 });
645 });
646 return builder.build();
647 }
648
649 /**
650 * Gets a path from src to dst.
651 * If a path was allocated before, returns the allocated path.
652 * Otherwise, randomly pick one from available paths.
653 *
654 * @param src source device ID
655 * @param dst destination device ID
656 * @param mcastIp multicast group
657 * @return an optional path from src to dst
658 */
659 private Optional<Path> getPath(DeviceId src, DeviceId dst, IpAddress mcastIp) {
660 List<Path> allPaths = Lists.newArrayList(
661 topologyService.getPaths(topologyService.currentTopology(), src, dst));
Charles Chan72779502016-04-23 17:36:10 -0700662 log.debug("{} path(s) found from {} to {}", allPaths.size(), src, dst);
Charles Chanc91c8782016-03-30 17:54:24 -0700663 if (allPaths.isEmpty()) {
Charles Chanc91c8782016-03-30 17:54:24 -0700664 return Optional.empty();
665 }
666
667 // If one of the available path is used before, use the same path
Charles Chan72779502016-04-23 17:36:10 -0700668 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, src);
669 if (mcastNextObjStore.containsKey(mcastStoreKey)) {
670 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chanc91c8782016-03-30 17:54:24 -0700671 Set<PortNumber> existingPorts = getPorts(nextObj.next());
672 for (Path path : allPaths) {
673 PortNumber srcPort = path.links().get(0).src().port();
674 if (existingPorts.contains(srcPort)) {
675 return Optional.of(path);
676 }
677 }
678 }
679 // Otherwise, randomly pick a path
680 Collections.shuffle(allPaths);
681 return allPaths.stream().findFirst();
682 }
683
684 /**
Charles Chan72779502016-04-23 17:36:10 -0700685 * Gets device(s) of given role in given multicast group.
686 *
687 * @param mcastIp multicast IP
688 * @param role multicast role
689 * @return set of device ID or empty set if not found
690 */
691 private Set<DeviceId> getDevice(IpAddress mcastIp, McastRole role) {
692 return mcastRoleStore.entrySet().stream()
693 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp) &&
694 entry.getValue().value() == role)
695 .map(Map.Entry::getKey).map(McastStoreKey::deviceId)
696 .collect(Collectors.toSet());
697 }
698
699 /**
Charles Chana8f9dee2016-05-16 18:44:13 -0700700 * Gets source connect point of given multicast group.
701 *
702 * @param mcastIp multicast IP
703 * @return source connect point or null if not found
704 */
705 private ConnectPoint getSource(IpAddress mcastIp) {
706 return srManager.multicastRouteService.getRoutes().stream()
707 .filter(mcastRoute -> mcastRoute.group().equals(mcastIp))
708 .map(mcastRoute -> srManager.multicastRouteService.fetchSource(mcastRoute))
709 .findAny().orElse(null);
710 }
711
712 /**
Charles Chan72779502016-04-23 17:36:10 -0700713 * Gets groups which is affected by the link down event.
714 *
715 * @param link link going down
716 * @return a set of multicast IpAddress
717 */
718 private Set<IpAddress> getAffectedGroups(Link link) {
719 DeviceId deviceId = link.src().deviceId();
720 PortNumber port = link.src().port();
721 return mcastNextObjStore.entrySet().stream()
722 .filter(entry -> entry.getKey().deviceId().equals(deviceId) &&
723 getPorts(entry.getValue().value().next()).contains(port))
724 .map(Map.Entry::getKey).map(McastStoreKey::mcastIp)
725 .collect(Collectors.toSet());
726 }
727
728 /**
Charles Chanc91c8782016-03-30 17:54:24 -0700729 * Gets egress VLAN from McastConfig.
730 *
731 * @return egress VLAN or VlanId.NONE if not configured
732 */
733 private VlanId egressVlan() {
734 McastConfig mcastConfig =
735 srManager.cfgService.getConfig(coreAppId, McastConfig.class);
736 return (mcastConfig != null) ? mcastConfig.egressVlan() : VlanId.NONE;
737 }
738
739 /**
740 * Gets assigned VLAN according to the value of egress VLAN.
Charles Chana8f9dee2016-05-16 18:44:13 -0700741 * If connect point is specified, try to reuse the assigned VLAN on the connect point.
Charles Chanc91c8782016-03-30 17:54:24 -0700742 *
Charles Chana8f9dee2016-05-16 18:44:13 -0700743 * @param cp connect point; Can be null if not specified
744 * @return assigned VLAN ID
Charles Chanc91c8782016-03-30 17:54:24 -0700745 */
Charles Chana8f9dee2016-05-16 18:44:13 -0700746 private VlanId assignedVlan(ConnectPoint cp) {
747 // Use the egressVlan if it is tagged
748 if (!egressVlan().equals(VlanId.NONE)) {
749 return egressVlan();
750 }
751 // Reuse unicast VLAN if the port has subnet configured
752 if (cp != null) {
753 Ip4Prefix portSubnet = srManager.deviceConfiguration
754 .getPortSubnet(cp.deviceId(), cp.port());
755 VlanId unicastVlan = srManager.getSubnetAssignedVlanId(cp.deviceId(), portSubnet);
756 if (unicastVlan != null) {
757 return unicastVlan;
758 }
759 }
760 // By default, use VLAN_NO_SUBNET
761 return VlanId.vlanId(SegmentRoutingManager.ASSIGNED_VLAN_NO_SUBNET);
Charles Chanc91c8782016-03-30 17:54:24 -0700762 }
Charles Chan72779502016-04-23 17:36:10 -0700763
764 /**
765 * Gets the spine-facing port on ingress device of given multicast group.
766 *
767 * @param mcastIp multicast IP
768 * @return spine-facing port on ingress device
769 */
770 private PortNumber ingressTransitPort(IpAddress mcastIp) {
771 DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
772 .stream().findAny().orElse(null);
773 if (ingressDevice != null) {
774 NextObjective nextObj = mcastNextObjStore
775 .get(new McastStoreKey(mcastIp, ingressDevice)).value();
776 Set<PortNumber> ports = getPorts(nextObj.next());
777
778 for (PortNumber port : ports) {
779 // Spine-facing port should have no subnet and no xconnect
780 if (srManager.deviceConfiguration != null &&
781 srManager.deviceConfiguration.getPortSubnet(ingressDevice, port) == null &&
782 srManager.deviceConfiguration.getXConnects().values().stream()
783 .allMatch(connectPoints ->
784 connectPoints.stream().noneMatch(connectPoint ->
785 connectPoint.port().equals(port))
786 )) {
787 return port;
788 }
789 }
790 }
791 return null;
792 }
Charles Chanc91c8782016-03-30 17:54:24 -0700793}