blob: e917b300d2bebcdf4d87d7aa4503fe42663bc510 [file] [log] [blame]
Charles Chand55e84d2016-03-30 17:54:24 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Charles Chand55e84d2016-03-30 17:54:24 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.segmentrouting;
18
19import com.google.common.collect.ImmutableSet;
20import com.google.common.collect.Lists;
Pier Luigibad6d6c2018-01-23 16:06:38 +010021import com.google.common.collect.Maps;
Charles Chand55e84d2016-03-30 17:54:24 -070022import com.google.common.collect.Sets;
23import org.onlab.packet.Ethernet;
24import org.onlab.packet.IpAddress;
25import org.onlab.packet.IpPrefix;
26import org.onlab.packet.MacAddress;
27import org.onlab.packet.VlanId;
28import org.onlab.util.KryoNamespace;
Pier Luigieba73a02018-01-16 10:47:50 +010029import org.onosproject.cluster.NodeId;
Charles Chand55e84d2016-03-30 17:54:24 -070030import org.onosproject.core.ApplicationId;
31import org.onosproject.core.CoreService;
Ray Milkey6c1f0f02017-08-15 11:02:29 -070032import org.onosproject.net.config.basics.McastConfig;
Charles Chand55e84d2016-03-30 17:54:24 -070033import org.onosproject.net.ConnectPoint;
34import org.onosproject.net.DeviceId;
35import org.onosproject.net.Link;
36import org.onosproject.net.Path;
37import org.onosproject.net.PortNumber;
38import org.onosproject.net.flow.DefaultTrafficSelector;
39import org.onosproject.net.flow.DefaultTrafficTreatment;
40import org.onosproject.net.flow.TrafficSelector;
41import org.onosproject.net.flow.TrafficTreatment;
42import org.onosproject.net.flow.criteria.Criteria;
43import org.onosproject.net.flow.instructions.Instructions.OutputInstruction;
44import org.onosproject.net.flowobjective.DefaultFilteringObjective;
45import org.onosproject.net.flowobjective.DefaultForwardingObjective;
46import org.onosproject.net.flowobjective.DefaultNextObjective;
Charles Chan2199c302016-04-23 17:36:10 -070047import org.onosproject.net.flowobjective.DefaultObjectiveContext;
Charles Chand55e84d2016-03-30 17:54:24 -070048import org.onosproject.net.flowobjective.FilteringObjective;
49import org.onosproject.net.flowobjective.ForwardingObjective;
50import org.onosproject.net.flowobjective.NextObjective;
Charles Chan2199c302016-04-23 17:36:10 -070051import org.onosproject.net.flowobjective.ObjectiveContext;
Charles Chand55e84d2016-03-30 17:54:24 -070052import org.onosproject.net.mcast.McastEvent;
Pier Luigib72201b2018-01-25 16:16:02 +010053import org.onosproject.net.mcast.McastRoute;
Charles Chand55e84d2016-03-30 17:54:24 -070054import org.onosproject.net.mcast.McastRouteInfo;
55import org.onosproject.net.topology.TopologyService;
Charles Chan6ea94fc2016-05-10 17:29:47 -070056import org.onosproject.segmentrouting.config.SegmentRoutingAppConfig;
Charles Chan2199c302016-04-23 17:36:10 -070057import org.onosproject.segmentrouting.storekey.McastStoreKey;
Charles Chand55e84d2016-03-30 17:54:24 -070058import org.onosproject.store.serializers.KryoNamespaces;
59import org.onosproject.store.service.ConsistentMap;
60import org.onosproject.store.service.Serializer;
61import org.onosproject.store.service.StorageService;
Pier Luigieba73a02018-01-16 10:47:50 +010062import org.onosproject.store.service.Versioned;
Charles Chand55e84d2016-03-30 17:54:24 -070063import org.slf4j.Logger;
64import org.slf4j.LoggerFactory;
65
Pier Luigib72201b2018-01-25 16:16:02 +010066import java.time.Instant;
Charles Chand55e84d2016-03-30 17:54:24 -070067import java.util.Collection;
68import java.util.Collections;
Pier Luigibad6d6c2018-01-23 16:06:38 +010069import java.util.Comparator;
Charles Chand55e84d2016-03-30 17:54:24 -070070import java.util.List;
Charles Chan2199c302016-04-23 17:36:10 -070071import java.util.Map;
Charles Chand55e84d2016-03-30 17:54:24 -070072import java.util.Optional;
73import java.util.Set;
Pier Luigib72201b2018-01-25 16:16:02 +010074import java.util.concurrent.ScheduledExecutorService;
75import java.util.concurrent.TimeUnit;
76import java.util.concurrent.locks.Lock;
77import java.util.concurrent.locks.ReentrantLock;
Charles Chan2199c302016-04-23 17:36:10 -070078import java.util.stream.Collectors;
79
80import static com.google.common.base.Preconditions.checkState;
Pier Luigib72201b2018-01-25 16:16:02 +010081import static java.util.concurrent.Executors.newScheduledThreadPool;
82import static org.onlab.util.Tools.groupedThreads;
Charles Chan59cc16d2017-02-02 16:20:42 -080083import static org.onosproject.segmentrouting.SegmentRoutingManager.INTERNAL_VLAN;
Charles Chand55e84d2016-03-30 17:54:24 -070084
85/**
Charles Chand2990362016-04-18 13:44:03 -070086 * Handles multicast-related events.
Charles Chand55e84d2016-03-30 17:54:24 -070087 */
Charles Chand2990362016-04-18 13:44:03 -070088public class McastHandler {
89 private static final Logger log = LoggerFactory.getLogger(McastHandler.class);
Charles Chand55e84d2016-03-30 17:54:24 -070090 private final SegmentRoutingManager srManager;
91 private final ApplicationId coreAppId;
Charles Chanfc5c7802016-05-17 13:13:55 -070092 private final StorageService storageService;
93 private final TopologyService topologyService;
Charles Chan2199c302016-04-23 17:36:10 -070094 private final ConsistentMap<McastStoreKey, NextObjective> mcastNextObjStore;
95 private final KryoNamespace.Builder mcastKryo;
96 private final ConsistentMap<McastStoreKey, McastRole> mcastRoleStore;
97
Pier Luigib72201b2018-01-25 16:16:02 +010098 // Mcast lock to serialize local operations
99 private final Lock mcastLock = new ReentrantLock();
100
101 /**
102 * Acquires the lock used when making mcast changes.
103 */
104 private void mcastLock() {
105 mcastLock.lock();
106 }
107
108 /**
109 * Releases the lock used when making mcast changes.
110 */
111 private void mcastUnlock() {
112 mcastLock.unlock();
113 }
114
115 // Stability threshold for Mcast. Seconds
116 private static final long MCAST_STABLITY_THRESHOLD = 5;
117 // Last change done
118 private Instant lastMcastChange = Instant.now();
119
120 /**
121 * Determines if mcast in the network has been stable in the last
122 * MCAST_STABLITY_THRESHOLD seconds, by comparing the current time
123 * to the last mcast change timestamp.
124 *
125 * @return true if stable
126 */
127 private boolean isMcastStable() {
128 long last = (long) (lastMcastChange.toEpochMilli() / 1000.0);
129 long now = (long) (Instant.now().toEpochMilli() / 1000.0);
130 log.debug("Mcast stable since {}s", now - last);
131 return (now - last) > MCAST_STABLITY_THRESHOLD;
132 }
133
134 // Verify interval for Mcast
135 private static final long MCAST_VERIFY_INTERVAL = 30;
136
137 // Executor for mcast bucket corrector
138 private ScheduledExecutorService executorService
139 = newScheduledThreadPool(1, groupedThreads("mcastBktCorrector", "mcastbktC-%d", log));
140
Charles Chan2199c302016-04-23 17:36:10 -0700141 /**
142 * Role in the multicast tree.
143 */
144 public enum McastRole {
145 /**
146 * The device is the ingress device of this group.
147 */
148 INGRESS,
149 /**
150 * The device is the transit device of this group.
151 */
152 TRANSIT,
153 /**
154 * The device is the egress device of this group.
155 */
156 EGRESS
157 }
Charles Chand55e84d2016-03-30 17:54:24 -0700158
159 /**
160 * Constructs the McastEventHandler.
161 *
162 * @param srManager Segment Routing manager
163 */
Charles Chand2990362016-04-18 13:44:03 -0700164 public McastHandler(SegmentRoutingManager srManager) {
Charles Chand55e84d2016-03-30 17:54:24 -0700165 coreAppId = srManager.coreService.getAppId(CoreService.CORE_APP_NAME);
Charles Chand55e84d2016-03-30 17:54:24 -0700166 this.srManager = srManager;
167 this.storageService = srManager.storageService;
168 this.topologyService = srManager.topologyService;
Charles Chan2199c302016-04-23 17:36:10 -0700169 mcastKryo = new KryoNamespace.Builder()
Charles Chand55e84d2016-03-30 17:54:24 -0700170 .register(KryoNamespaces.API)
Charles Chan2199c302016-04-23 17:36:10 -0700171 .register(McastStoreKey.class)
172 .register(McastRole.class);
Charles Chand55e84d2016-03-30 17:54:24 -0700173 mcastNextObjStore = storageService
Charles Chan2199c302016-04-23 17:36:10 -0700174 .<McastStoreKey, NextObjective>consistentMapBuilder()
Charles Chand55e84d2016-03-30 17:54:24 -0700175 .withName("onos-mcast-nextobj-store")
Charles Chaneefdedf2016-05-23 16:45:45 -0700176 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-NextObj")))
Charles Chand55e84d2016-03-30 17:54:24 -0700177 .build();
Charles Chan2199c302016-04-23 17:36:10 -0700178 mcastRoleStore = storageService
179 .<McastStoreKey, McastRole>consistentMapBuilder()
180 .withName("onos-mcast-role-store")
Charles Chaneefdedf2016-05-23 16:45:45 -0700181 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-Role")))
Charles Chan2199c302016-04-23 17:36:10 -0700182 .build();
Pier Luigib72201b2018-01-25 16:16:02 +0100183 // Init the executor service and the buckets corrector
184 executorService.scheduleWithFixedDelay(new McastBucketCorrector(), 10,
185 MCAST_VERIFY_INTERVAL,
186 TimeUnit.SECONDS);
Charles Chan2199c302016-04-23 17:36:10 -0700187 }
188
189 /**
190 * Read initial multicast from mcast store.
191 */
Charles Chanfc5c7802016-05-17 13:13:55 -0700192 protected void init() {
Charles Chan2199c302016-04-23 17:36:10 -0700193 srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
194 ConnectPoint source = srManager.multicastRouteService.fetchSource(mcastRoute);
195 Set<ConnectPoint> sinks = srManager.multicastRouteService.fetchSinks(mcastRoute);
196 sinks.forEach(sink -> {
197 processSinkAddedInternal(source, sink, mcastRoute.group());
198 });
199 });
Charles Chand55e84d2016-03-30 17:54:24 -0700200 }
201
202 /**
Pier Luigib72201b2018-01-25 16:16:02 +0100203 * Clean up when deactivating the application.
204 */
205 protected void terminate() {
206 executorService.shutdown();
207 }
208
209 /**
Charles Chand55e84d2016-03-30 17:54:24 -0700210 * Processes the SOURCE_ADDED event.
211 *
212 * @param event McastEvent with SOURCE_ADDED type
213 */
214 protected void processSourceAdded(McastEvent event) {
215 log.info("processSourceAdded {}", event);
216 McastRouteInfo mcastRouteInfo = event.subject();
217 if (!mcastRouteInfo.isComplete()) {
218 log.info("Incompleted McastRouteInfo. Abort.");
219 return;
220 }
221 ConnectPoint source = mcastRouteInfo.source().orElse(null);
222 Set<ConnectPoint> sinks = mcastRouteInfo.sinks();
223 IpAddress mcastIp = mcastRouteInfo.route().group();
224
Pier Luigib72201b2018-01-25 16:16:02 +0100225 sinks.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp));
Charles Chand55e84d2016-03-30 17:54:24 -0700226 }
227
228 /**
229 * Processes the SINK_ADDED event.
230 *
231 * @param event McastEvent with SINK_ADDED type
232 */
233 protected void processSinkAdded(McastEvent event) {
234 log.info("processSinkAdded {}", event);
235 McastRouteInfo mcastRouteInfo = event.subject();
236 if (!mcastRouteInfo.isComplete()) {
237 log.info("Incompleted McastRouteInfo. Abort.");
238 return;
239 }
240 ConnectPoint source = mcastRouteInfo.source().orElse(null);
241 ConnectPoint sink = mcastRouteInfo.sink().orElse(null);
242 IpAddress mcastIp = mcastRouteInfo.route().group();
243
244 processSinkAddedInternal(source, sink, mcastIp);
245 }
246
247 /**
248 * Processes the SINK_REMOVED event.
249 *
250 * @param event McastEvent with SINK_REMOVED type
251 */
252 protected void processSinkRemoved(McastEvent event) {
253 log.info("processSinkRemoved {}", event);
254 McastRouteInfo mcastRouteInfo = event.subject();
255 if (!mcastRouteInfo.isComplete()) {
256 log.info("Incompleted McastRouteInfo. Abort.");
257 return;
258 }
259 ConnectPoint source = mcastRouteInfo.source().orElse(null);
260 ConnectPoint sink = mcastRouteInfo.sink().orElse(null);
261 IpAddress mcastIp = mcastRouteInfo.route().group();
Charles Chand55e84d2016-03-30 17:54:24 -0700262
Pier Luigib72201b2018-01-25 16:16:02 +0100263 processSinkRemovedInternal(source, sink, mcastIp);
264 }
Charles Chan1588e7b2016-06-28 16:50:13 -0700265
Pier Luigib72201b2018-01-25 16:16:02 +0100266 /**
267 * Removes a path from source to sink for given multicast group.
268 *
269 * @param source connect point of the multicast source
270 * @param sink connection point of the multicast sink
271 * @param mcastIp multicast group IP address
272 */
273 private void processSinkRemovedInternal(ConnectPoint source, ConnectPoint sink,
274 IpAddress mcastIp) {
275 lastMcastChange = Instant.now();
276 mcastLock();
277 try {
278 // Continue only when this instance is the master of source device
279 if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
280 log.debug("Skip {} due to lack of mastership of the source device {}",
281 mcastIp, source.deviceId());
Charles Chand55e84d2016-03-30 17:54:24 -0700282 return;
283 }
Charles Chand55e84d2016-03-30 17:54:24 -0700284
Pier Luigib72201b2018-01-25 16:16:02 +0100285 // When source and sink are on the same device
286 if (source.deviceId().equals(sink.deviceId())) {
287 // Source and sink are on even the same port. There must be something wrong.
288 if (source.port().equals(sink.port())) {
289 log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
290 mcastIp, sink, source);
291 return;
292 }
293 removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(source));
294 return;
295 }
Charles Chand55e84d2016-03-30 17:54:24 -0700296
Pier Luigib72201b2018-01-25 16:16:02 +0100297 // Process the egress device
298 boolean isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(null));
299 if (isLast) {
300 mcastRoleStore.remove(new McastStoreKey(mcastIp, sink.deviceId()));
301 }
302
303 // If this is the last sink on the device, also update upstream
304 Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp);
305 if (mcastPath.isPresent()) {
306 List<Link> links = Lists.newArrayList(mcastPath.get().links());
307 Collections.reverse(links);
308 for (Link link : links) {
309 if (isLast) {
310 isLast = removePortFromDevice(
311 link.src().deviceId(),
312 link.src().port(),
313 mcastIp,
314 assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null)
315 );
316 mcastRoleStore.remove(new McastStoreKey(mcastIp, link.src().deviceId()));
317 }
Charles Chand55e84d2016-03-30 17:54:24 -0700318 }
319 }
Pier Luigib72201b2018-01-25 16:16:02 +0100320 } finally {
321 mcastUnlock();
Charles Chand55e84d2016-03-30 17:54:24 -0700322 }
323 }
324
325 /**
326 * Establishes a path from source to sink for given multicast group.
327 *
328 * @param source connect point of the multicast source
329 * @param sink connection point of the multicast sink
330 * @param mcastIp multicast group IP address
331 */
332 private void processSinkAddedInternal(ConnectPoint source, ConnectPoint sink,
333 IpAddress mcastIp) {
Pier Luigib72201b2018-01-25 16:16:02 +0100334 lastMcastChange = Instant.now();
335 mcastLock();
336 try {
337 // Continue only when this instance is the master of source device
338 if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
339 log.debug("Skip {} due to lack of mastership of the source device {}",
340 mcastIp, source.deviceId());
Charles Chand55e84d2016-03-30 17:54:24 -0700341 return;
342 }
Charles Chand55e84d2016-03-30 17:54:24 -0700343
Pier Luigib72201b2018-01-25 16:16:02 +0100344 // Process the ingress device
345 addFilterToDevice(source.deviceId(), source.port(), assignedVlan(source), mcastIp);
Charles Chan2199c302016-04-23 17:36:10 -0700346
Pier Luigib72201b2018-01-25 16:16:02 +0100347 // When source and sink are on the same device
348 if (source.deviceId().equals(sink.deviceId())) {
349 // Source and sink are on even the same port. There must be something wrong.
350 if (source.port().equals(sink.port())) {
351 log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
352 mcastIp, sink, source);
353 return;
354 }
355 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(source));
356 mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()), McastRole.INGRESS);
357 return;
358 }
Charles Chan2199c302016-04-23 17:36:10 -0700359
Pier Luigib72201b2018-01-25 16:16:02 +0100360 // Find a path. If present, create/update groups and flows for each hop
361 Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp);
362 if (mcastPath.isPresent()) {
363 List<Link> links = mcastPath.get().links();
364 checkState(links.size() == 2,
365 "Path in leaf-spine topology should always be two hops: ", links);
Charles Chan2199c302016-04-23 17:36:10 -0700366
Pier Luigib72201b2018-01-25 16:16:02 +0100367 links.forEach(link -> {
368 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
369 assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
370 addFilterToDevice(link.dst().deviceId(), link.dst().port(), assignedVlan(null), mcastIp);
371 });
372
373 // Process the egress device
374 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(null));
375
376 // Setup mcast roles
377 mcastRoleStore.put(new McastStoreKey(mcastIp, source.deviceId()),
378 McastRole.INGRESS);
379 mcastRoleStore.put(new McastStoreKey(mcastIp, links.get(0).dst().deviceId()),
380 McastRole.TRANSIT);
381 mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()),
382 McastRole.EGRESS);
383 } else {
384 log.warn("Unable to find a path from {} to {}. Abort sinkAdded",
385 source.deviceId(), sink.deviceId());
386 }
387 } finally {
388 mcastUnlock();
Charles Chand55e84d2016-03-30 17:54:24 -0700389 }
390 }
391
392 /**
Charles Chan2199c302016-04-23 17:36:10 -0700393 * Processes the LINK_DOWN event.
394 *
395 * @param affectedLink Link that is going down
396 */
397 protected void processLinkDown(Link affectedLink) {
Pier Luigib72201b2018-01-25 16:16:02 +0100398 lastMcastChange = Instant.now();
399 mcastLock();
400 try {
401 // Get groups affected by the link down event
402 getAffectedGroups(affectedLink).forEach(mcastIp -> {
403 // TODO Optimize when the group editing is in place
404 log.debug("Processing link down {} for group {}",
405 affectedLink, mcastIp);
Pier Luigieba73a02018-01-16 10:47:50 +0100406
Pier Luigib72201b2018-01-25 16:16:02 +0100407 // Find out the ingress, transit and egress device of affected group
408 DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
409 .stream().findAny().orElse(null);
410 DeviceId transitDevice = getDevice(mcastIp, McastRole.TRANSIT)
411 .stream().findAny().orElse(null);
412 Set<DeviceId> egressDevices = getDevice(mcastIp, McastRole.EGRESS);
413 ConnectPoint source = getSource(mcastIp);
Charles Chan8d449862016-05-16 18:44:13 -0700414
Pier Luigib72201b2018-01-25 16:16:02 +0100415 // Do not proceed if any of these info is missing
416 if (ingressDevice == null || transitDevice == null
417 || egressDevices == null || source == null) {
418 log.warn("Missing ingress {}, transit {}, egress {} devices or source {}",
419 ingressDevice, transitDevice, egressDevices, source);
420 return;
Charles Chan2199c302016-04-23 17:36:10 -0700421 }
Pier Luigib72201b2018-01-25 16:16:02 +0100422
423 // Continue only when this instance is the master of source device
424 if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
425 log.debug("Skip {} due to lack of mastership of the source device {}",
426 source.deviceId());
427 return;
428 }
429
430 // Remove entire transit
431 removeGroupFromDevice(transitDevice, mcastIp, assignedVlan(null));
432
433 // Remove transit-facing port on ingress device
434 PortNumber ingressTransitPort = ingressTransitPort(mcastIp);
435 if (ingressTransitPort != null) {
436 removePortFromDevice(ingressDevice, ingressTransitPort, mcastIp, assignedVlan(source));
437 mcastRoleStore.remove(new McastStoreKey(mcastIp, transitDevice));
438 }
439
440 // Construct a new path for each egress device
441 egressDevices.forEach(egressDevice -> {
442 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice, mcastIp);
443 if (mcastPath.isPresent()) {
444 installPath(mcastIp, source, mcastPath.get());
445 } else {
446 log.warn("Fail to recover egress device {} from link failure {}",
447 egressDevice, affectedLink);
448 removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null));
449 }
450 });
Charles Chan2199c302016-04-23 17:36:10 -0700451 });
Pier Luigib72201b2018-01-25 16:16:02 +0100452 } finally {
453 mcastUnlock();
454 }
Charles Chan2199c302016-04-23 17:36:10 -0700455 }
456
457 /**
Pier Luigieba73a02018-01-16 10:47:50 +0100458 * Process the DEVICE_DOWN event.
459 *
460 * @param deviceDown device going down
461 */
462 protected void processDeviceDown(DeviceId deviceDown) {
Pier Luigib72201b2018-01-25 16:16:02 +0100463 lastMcastChange = Instant.now();
464 mcastLock();
465 try {
466 // Get the mcast groups affected by the device going down
467 getAffectedGroups(deviceDown).forEach(mcastIp -> {
468 // TODO Optimize when the group editing is in place
469 log.debug("Processing device down {} for group {}",
470 deviceDown, mcastIp);
Pier Luigieba73a02018-01-16 10:47:50 +0100471
Pier Luigib72201b2018-01-25 16:16:02 +0100472 // Find out the ingress, transit and egress device of affected group
473 DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
474 .stream().findAny().orElse(null);
475 DeviceId transitDevice = getDevice(mcastIp, McastRole.TRANSIT)
476 .stream().findAny().orElse(null);
477 Set<DeviceId> egressDevices = getDevice(mcastIp, McastRole.EGRESS);
478 ConnectPoint source = getSource(mcastIp);
Pier Luigieba73a02018-01-16 10:47:50 +0100479
Pier Luigib72201b2018-01-25 16:16:02 +0100480 // Do not proceed if ingress device or source of this group are missing
481 // If sinks are in other leafs, we have ingress, transit, egress, and source
482 // If sinks are in the same leaf, we have just ingress and source
483 if (ingressDevice == null || source == null) {
484 log.warn("Missing ingress {} or source {} for group {}",
485 ingressDevice, source, mcastIp);
Pier Luigieba73a02018-01-16 10:47:50 +0100486 return;
487 }
Pier Luigieba73a02018-01-16 10:47:50 +0100488
Pier Luigib72201b2018-01-25 16:16:02 +0100489 // Continue only when we have the mastership on the operation
490 if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
491 // When the source is available we just check the mastership
492 if (srManager.deviceService.isAvailable(source.deviceId())) {
493 log.debug("Skip {} due to lack of mastership of the source device {}",
494 mcastIp, source.deviceId());
495 return;
496 }
497 // Fallback with Leadership service
498 // source id is used a topic
499 NodeId leader = srManager.leadershipService.runForLeadership(
500 source.deviceId().toString()).leaderNodeId();
501 // Verify if this node is the leader
502 if (!srManager.clusterService.getLocalNode().id().equals(leader)) {
503 log.debug("Skip {} due to lack of leadership on the topic {}",
504 mcastIp, source.deviceId());
Pier Luigieba73a02018-01-16 10:47:50 +0100505 return;
506 }
507 }
Pier Luigib72201b2018-01-25 16:16:02 +0100508
509 // If it exists, we have to remove it in any case
510 if (transitDevice != null) {
511 // Remove entire transit
512 removeGroupFromDevice(transitDevice, mcastIp, assignedVlan(null));
513 }
514 // If the ingress is down
515 if (ingressDevice.equals(deviceDown)) {
516 // Remove entire ingress
517 removeGroupFromDevice(ingressDevice, mcastIp, assignedVlan(source));
518 // If other sinks different from the ingress exist
519 if (!egressDevices.isEmpty()) {
520 // Remove all the remaining egress
521 egressDevices.forEach(
522 egressDevice -> removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null))
523 );
Pier Luigieba73a02018-01-16 10:47:50 +0100524 }
Pier Luigib72201b2018-01-25 16:16:02 +0100525 } else {
526 // Egress or transit could be down at this point
527 // Get the ingress-transit port if it exists
528 PortNumber ingressTransitPort = ingressTransitPort(mcastIp);
529 if (ingressTransitPort != null) {
530 // Remove transit-facing port on ingress device
531 removePortFromDevice(ingressDevice, ingressTransitPort, mcastIp, assignedVlan(source));
532 }
533 // One of the egress device is down
534 if (egressDevices.contains(deviceDown)) {
535 // Remove entire device down
536 removeGroupFromDevice(deviceDown, mcastIp, assignedVlan(null));
537 // Remove the device down from egress
538 egressDevices.remove(deviceDown);
539 // If there are no more egress and ingress does not have sinks
540 if (egressDevices.isEmpty() && !hasSinks(ingressDevice, mcastIp)) {
541 // Remove entire ingress
542 mcastRoleStore.remove(new McastStoreKey(mcastIp, ingressDevice));
543 // We have done
544 return;
545 }
546 }
547 // Construct a new path for each egress device
548 egressDevices.forEach(egressDevice -> {
549 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice, mcastIp);
550 // If there is a new path
551 if (mcastPath.isPresent()) {
552 // Let's install the new mcast path for this egress
553 installPath(mcastIp, source, mcastPath.get());
554 } else {
555 // We were not able to find an alternative path for this egress
556 log.warn("Fail to recover egress device {} from device down {}",
557 egressDevice, deviceDown);
558 removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null));
559 }
560 });
561 }
562 });
563 } finally {
564 mcastUnlock();
565 }
Pier Luigieba73a02018-01-16 10:47:50 +0100566 }
567
568 /**
Charles Chand55e84d2016-03-30 17:54:24 -0700569 * Adds filtering objective for given device and port.
570 *
571 * @param deviceId device ID
572 * @param port ingress port number
573 * @param assignedVlan assigned VLAN ID
574 */
Julia Ferguson65428c32017-08-10 18:15:24 +0000575 private void addFilterToDevice(DeviceId deviceId, PortNumber port, VlanId assignedVlan, IpAddress mcastIp) {
Charles Chand55e84d2016-03-30 17:54:24 -0700576 // Do nothing if the port is configured as suppressed
Charles Chan6ea94fc2016-05-10 17:29:47 -0700577 ConnectPoint connectPoint = new ConnectPoint(deviceId, port);
578 SegmentRoutingAppConfig appConfig = srManager.cfgService
579 .getConfig(srManager.appId, SegmentRoutingAppConfig.class);
580 if (appConfig != null && appConfig.suppressSubnet().contains(connectPoint)) {
581 log.info("Ignore suppressed port {}", connectPoint);
Charles Chand55e84d2016-03-30 17:54:24 -0700582 return;
583 }
584
585 FilteringObjective.Builder filtObjBuilder =
Julia Ferguson65428c32017-08-10 18:15:24 +0000586 filterObjBuilder(deviceId, port, assignedVlan, mcastIp);
Charles Chan2199c302016-04-23 17:36:10 -0700587 ObjectiveContext context = new DefaultObjectiveContext(
588 (objective) -> log.debug("Successfully add filter on {}/{}, vlan {}",
Charles Chan59cc16d2017-02-02 16:20:42 -0800589 deviceId, port.toLong(), assignedVlan),
Charles Chan2199c302016-04-23 17:36:10 -0700590 (objective, error) ->
591 log.warn("Failed to add filter on {}/{}, vlan {}: {}",
Charles Chan59cc16d2017-02-02 16:20:42 -0800592 deviceId, port.toLong(), assignedVlan, error));
Charles Chan2199c302016-04-23 17:36:10 -0700593 srManager.flowObjectiveService.filter(deviceId, filtObjBuilder.add(context));
Charles Chand55e84d2016-03-30 17:54:24 -0700594 }
595
596 /**
597 * Adds a port to given multicast group on given device. This involves the
598 * update of L3 multicast group and multicast routing table entry.
599 *
600 * @param deviceId device ID
601 * @param port port to be added
602 * @param mcastIp multicast group
603 * @param assignedVlan assigned VLAN ID
604 */
605 private void addPortToDevice(DeviceId deviceId, PortNumber port,
606 IpAddress mcastIp, VlanId assignedVlan) {
Charles Chan2199c302016-04-23 17:36:10 -0700607 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId);
Charles Chand55e84d2016-03-30 17:54:24 -0700608 ImmutableSet.Builder<PortNumber> portBuilder = ImmutableSet.builder();
Pier Luigi21fffd22018-01-19 10:24:53 +0100609 NextObjective newNextObj;
Charles Chan2199c302016-04-23 17:36:10 -0700610 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Charles Chand55e84d2016-03-30 17:54:24 -0700611 // First time someone request this mcast group via this device
612 portBuilder.add(port);
Pier Luigi21fffd22018-01-19 10:24:53 +0100613 // New nextObj
614 newNextObj = nextObjBuilder(mcastIp, assignedVlan,
615 portBuilder.build(), null).add();
616 // Store the new port
617 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Charles Chand55e84d2016-03-30 17:54:24 -0700618 } else {
619 // This device already serves some subscribers of this mcast group
Charles Chan2199c302016-04-23 17:36:10 -0700620 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chand55e84d2016-03-30 17:54:24 -0700621 // Stop if the port is already in the nextobj
622 Set<PortNumber> existingPorts = getPorts(nextObj.next());
623 if (existingPorts.contains(port)) {
624 log.info("NextObj for {}/{} already exists. Abort", deviceId, port);
625 return;
626 }
Pier Luigi21fffd22018-01-19 10:24:53 +0100627 // Let's add the port and reuse the previous one
Yuta HIGUCHI0eb68e12018-02-09 18:05:23 -0800628 portBuilder.addAll(existingPorts).add(port);
Pier Luigi21fffd22018-01-19 10:24:53 +0100629 // Reuse previous nextObj
630 newNextObj = nextObjBuilder(mcastIp, assignedVlan,
631 portBuilder.build(), nextObj.id()).addToExisting();
632 // Store the final next objective and send only the difference to the driver
633 mcastNextObjStore.put(mcastStoreKey, newNextObj);
634 // Add just the new port
635 portBuilder = ImmutableSet.builder();
636 portBuilder.add(port);
637 newNextObj = nextObjBuilder(mcastIp, assignedVlan,
638 portBuilder.build(), nextObj.id()).addToExisting();
Charles Chand55e84d2016-03-30 17:54:24 -0700639 }
640 // Create, store and apply the new nextObj and fwdObj
Charles Chan2199c302016-04-23 17:36:10 -0700641 ObjectiveContext context = new DefaultObjectiveContext(
642 (objective) -> log.debug("Successfully add {} on {}/{}, vlan {}",
643 mcastIp, deviceId, port.toLong(), assignedVlan),
644 (objective, error) ->
645 log.warn("Failed to add {} on {}/{}, vlan {}: {}",
646 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Charles Chand55e84d2016-03-30 17:54:24 -0700647 ForwardingObjective fwdObj =
Charles Chan2199c302016-04-23 17:36:10 -0700648 fwdObjBuilder(mcastIp, assignedVlan, newNextObj.id()).add(context);
Charles Chand55e84d2016-03-30 17:54:24 -0700649 srManager.flowObjectiveService.next(deviceId, newNextObj);
650 srManager.flowObjectiveService.forward(deviceId, fwdObj);
Charles Chand55e84d2016-03-30 17:54:24 -0700651 }
652
653 /**
654 * Removes a port from given multicast group on given device.
655 * This involves the update of L3 multicast group and multicast routing
656 * table entry.
657 *
658 * @param deviceId device ID
659 * @param port port to be added
660 * @param mcastIp multicast group
661 * @param assignedVlan assigned VLAN ID
662 * @return true if this is the last sink on this device
663 */
664 private boolean removePortFromDevice(DeviceId deviceId, PortNumber port,
665 IpAddress mcastIp, VlanId assignedVlan) {
Charles Chan2199c302016-04-23 17:36:10 -0700666 McastStoreKey mcastStoreKey =
667 new McastStoreKey(mcastIp, deviceId);
Charles Chand55e84d2016-03-30 17:54:24 -0700668 // This device is not serving this multicast group
Charles Chan2199c302016-04-23 17:36:10 -0700669 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Charles Chand55e84d2016-03-30 17:54:24 -0700670 log.warn("{} is not serving {} on port {}. Abort.", deviceId, mcastIp, port);
671 return false;
672 }
Charles Chan2199c302016-04-23 17:36:10 -0700673 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chand55e84d2016-03-30 17:54:24 -0700674
675 Set<PortNumber> existingPorts = getPorts(nextObj.next());
Charles Chan2199c302016-04-23 17:36:10 -0700676 // This port does not serve this multicast group
Charles Chand55e84d2016-03-30 17:54:24 -0700677 if (!existingPorts.contains(port)) {
678 log.warn("{} is not serving {} on port {}. Abort.", deviceId, mcastIp, port);
679 return false;
680 }
681 // Copy and modify the ImmutableSet
682 existingPorts = Sets.newHashSet(existingPorts);
683 existingPorts.remove(port);
684
685 NextObjective newNextObj;
Pier Luigid1be7b12018-01-19 10:24:53 +0100686 ObjectiveContext context;
Charles Chand55e84d2016-03-30 17:54:24 -0700687 ForwardingObjective fwdObj;
688 if (existingPorts.isEmpty()) {
Pier Luigid1be7b12018-01-19 10:24:53 +0100689 // If this is the last sink, remove flows and last bucket
Charles Chand55e84d2016-03-30 17:54:24 -0700690 // NOTE: Rely on GroupStore garbage collection rather than explicitly
691 // remove L3MG since there might be other flows/groups refer to
692 // the same L2IG
Pier Luigid1be7b12018-01-19 10:24:53 +0100693 context = new DefaultObjectiveContext(
Charles Chan2199c302016-04-23 17:36:10 -0700694 (objective) -> log.debug("Successfully remove {} on {}/{}, vlan {}",
695 mcastIp, deviceId, port.toLong(), assignedVlan),
696 (objective, error) ->
697 log.warn("Failed to remove {} on {}/{}, vlan {}: {}",
698 mcastIp, deviceId, port.toLong(), assignedVlan, error));
699 fwdObj = fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
700 mcastNextObjStore.remove(mcastStoreKey);
Charles Chand55e84d2016-03-30 17:54:24 -0700701 } else {
702 // If this is not the last sink, update flows and groups
Pier Luigid1be7b12018-01-19 10:24:53 +0100703 context = new DefaultObjectiveContext(
Charles Chan2199c302016-04-23 17:36:10 -0700704 (objective) -> log.debug("Successfully update {} on {}/{}, vlan {}",
705 mcastIp, deviceId, port.toLong(), assignedVlan),
706 (objective, error) ->
707 log.warn("Failed to update {} on {}/{}, vlan {}: {}",
708 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Pier Luigid1be7b12018-01-19 10:24:53 +0100709 // Here we store the next objective with the remaining port
710 newNextObj = nextObjBuilder(mcastIp, assignedVlan,
711 existingPorts, nextObj.id()).removeFromExisting();
Charles Chanfc5c7802016-05-17 13:13:55 -0700712 fwdObj = fwdObjBuilder(mcastIp, assignedVlan, newNextObj.id()).add(context);
Charles Chan2199c302016-04-23 17:36:10 -0700713 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Charles Chand55e84d2016-03-30 17:54:24 -0700714 }
Pier Luigid1be7b12018-01-19 10:24:53 +0100715 // Let's modify the next objective removing the bucket
716 newNextObj = nextObjBuilder(mcastIp, assignedVlan,
717 ImmutableSet.of(port), nextObj.id()).removeFromExisting();
718 srManager.flowObjectiveService.next(deviceId, newNextObj);
719 srManager.flowObjectiveService.forward(deviceId, fwdObj);
Charles Chand55e84d2016-03-30 17:54:24 -0700720 return existingPorts.isEmpty();
721 }
722
Charles Chan2199c302016-04-23 17:36:10 -0700723 /**
724 * Removes entire group on given device.
725 *
726 * @param deviceId device ID
727 * @param mcastIp multicast group to be removed
728 * @param assignedVlan assigned VLAN ID
729 */
730 private void removeGroupFromDevice(DeviceId deviceId, IpAddress mcastIp,
731 VlanId assignedVlan) {
732 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId);
733 // This device is not serving this multicast group
734 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
735 log.warn("{} is not serving {}. Abort.", deviceId, mcastIp);
736 return;
737 }
738 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
739 // NOTE: Rely on GroupStore garbage collection rather than explicitly
740 // remove L3MG since there might be other flows/groups refer to
741 // the same L2IG
742 ObjectiveContext context = new DefaultObjectiveContext(
743 (objective) -> log.debug("Successfully remove {} on {}, vlan {}",
744 mcastIp, deviceId, assignedVlan),
745 (objective, error) ->
746 log.warn("Failed to remove {} on {}, vlan {}: {}",
747 mcastIp, deviceId, assignedVlan, error));
748 ForwardingObjective fwdObj = fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
749 srManager.flowObjectiveService.forward(deviceId, fwdObj);
750 mcastNextObjStore.remove(mcastStoreKey);
751 mcastRoleStore.remove(mcastStoreKey);
752 }
753
Pier Luigieba73a02018-01-16 10:47:50 +0100754 private void installPath(IpAddress mcastIp, ConnectPoint source, Path mcastPath) {
755 // Get Links
756 List<Link> links = mcastPath.links();
757 // For each link, modify the next on the source device adding the src port
758 // and a new filter objective on the destination port
759 links.forEach(link -> {
760 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
761 assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
762 addFilterToDevice(link.dst().deviceId(), link.dst().port(), assignedVlan(null),
763 mcastIp);
764 });
765 // Setup new transit mcast role
766 mcastRoleStore.put(new McastStoreKey(mcastIp, links.get(0).dst().deviceId()),
767 McastRole.TRANSIT);
Charles Chan2199c302016-04-23 17:36:10 -0700768 }
769
Charles Chand55e84d2016-03-30 17:54:24 -0700770 /**
771 * Creates a next objective builder for multicast.
772 *
773 * @param mcastIp multicast group
774 * @param assignedVlan assigned VLAN ID
775 * @param outPorts set of output port numbers
776 * @return next objective builder
777 */
778 private NextObjective.Builder nextObjBuilder(IpAddress mcastIp,
Pier Luigi21fffd22018-01-19 10:24:53 +0100779 VlanId assignedVlan, Set<PortNumber> outPorts, Integer nextId) {
780 // If nextId is null allocate a new one
781 if (nextId == null) {
782 nextId = srManager.flowObjectiveService.allocateNextId();
783 }
Charles Chand55e84d2016-03-30 17:54:24 -0700784
785 TrafficSelector metadata =
786 DefaultTrafficSelector.builder()
787 .matchVlanId(assignedVlan)
788 .matchIPDst(mcastIp.toIpPrefix())
789 .build();
790
791 NextObjective.Builder nextObjBuilder = DefaultNextObjective
792 .builder().withId(nextId)
793 .withType(NextObjective.Type.BROADCAST).fromApp(srManager.appId)
794 .withMeta(metadata);
795
796 outPorts.forEach(port -> {
797 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
798 if (egressVlan().equals(VlanId.NONE)) {
799 tBuilder.popVlan();
800 }
801 tBuilder.setOutput(port);
802 nextObjBuilder.addTreatment(tBuilder.build());
803 });
804
805 return nextObjBuilder;
806 }
807
808 /**
809 * Creates a forwarding objective builder for multicast.
810 *
811 * @param mcastIp multicast group
812 * @param assignedVlan assigned VLAN ID
813 * @param nextId next ID of the L3 multicast group
814 * @return forwarding objective builder
815 */
816 private ForwardingObjective.Builder fwdObjBuilder(IpAddress mcastIp,
817 VlanId assignedVlan, int nextId) {
818 TrafficSelector.Builder sbuilder = DefaultTrafficSelector.builder();
Julia Ferguson65428c32017-08-10 18:15:24 +0000819 IpPrefix mcastPrefix = mcastIp.toIpPrefix();
820
821 if (mcastIp.isIp4()) {
822 sbuilder.matchEthType(Ethernet.TYPE_IPV4);
823 sbuilder.matchIPDst(mcastPrefix);
824 } else {
825 sbuilder.matchEthType(Ethernet.TYPE_IPV6);
826 sbuilder.matchIPv6Dst(mcastPrefix);
827 }
828
829
Charles Chand55e84d2016-03-30 17:54:24 -0700830 TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder();
831 metabuilder.matchVlanId(assignedVlan);
832
833 ForwardingObjective.Builder fwdBuilder = DefaultForwardingObjective.builder();
834 fwdBuilder.withSelector(sbuilder.build())
835 .withMeta(metabuilder.build())
836 .nextStep(nextId)
837 .withFlag(ForwardingObjective.Flag.SPECIFIC)
838 .fromApp(srManager.appId)
839 .withPriority(SegmentRoutingService.DEFAULT_PRIORITY);
840 return fwdBuilder;
841 }
842
843 /**
844 * Creates a filtering objective builder for multicast.
845 *
846 * @param deviceId Device ID
847 * @param ingressPort ingress port of the multicast stream
848 * @param assignedVlan assigned VLAN ID
849 * @return filtering objective builder
850 */
851 private FilteringObjective.Builder filterObjBuilder(DeviceId deviceId, PortNumber ingressPort,
Julia Ferguson65428c32017-08-10 18:15:24 +0000852 VlanId assignedVlan, IpAddress mcastIp) {
Charles Chand55e84d2016-03-30 17:54:24 -0700853 FilteringObjective.Builder filtBuilder = DefaultFilteringObjective.builder();
Charles Chan1588e7b2016-06-28 16:50:13 -0700854
Julia Ferguson65428c32017-08-10 18:15:24 +0000855 if (mcastIp.isIp4()) {
856 filtBuilder.withKey(Criteria.matchInPort(ingressPort))
857 .addCondition(Criteria.matchEthDstMasked(MacAddress.IPV4_MULTICAST,
858 MacAddress.IPV4_MULTICAST_MASK))
859 .addCondition(Criteria.matchVlanId(egressVlan()))
860 .withPriority(SegmentRoutingService.DEFAULT_PRIORITY);
861 } else {
862 filtBuilder.withKey(Criteria.matchInPort(ingressPort))
863 .addCondition(Criteria.matchEthDstMasked(MacAddress.IPV6_MULTICAST,
864 MacAddress.IPV6_MULTICAST_MASK))
865 .addCondition(Criteria.matchVlanId(egressVlan()))
866 .withPriority(SegmentRoutingService.DEFAULT_PRIORITY);
867 }
Charles Chan1588e7b2016-06-28 16:50:13 -0700868 TrafficTreatment tt = DefaultTrafficTreatment.builder()
869 .pushVlan().setVlanId(assignedVlan).build();
870 filtBuilder.withMeta(tt);
871
Charles Chand55e84d2016-03-30 17:54:24 -0700872 return filtBuilder.permit().fromApp(srManager.appId);
873 }
874
875 /**
876 * Gets output ports information from treatments.
877 *
878 * @param treatments collection of traffic treatments
879 * @return set of output port numbers
880 */
881 private Set<PortNumber> getPorts(Collection<TrafficTreatment> treatments) {
882 ImmutableSet.Builder<PortNumber> builder = ImmutableSet.builder();
883 treatments.forEach(treatment -> {
884 treatment.allInstructions().stream()
885 .filter(instr -> instr instanceof OutputInstruction)
886 .forEach(instr -> {
887 builder.add(((OutputInstruction) instr).port());
888 });
889 });
890 return builder.build();
891 }
892
893 /**
894 * Gets a path from src to dst.
895 * If a path was allocated before, returns the allocated path.
896 * Otherwise, randomly pick one from available paths.
897 *
898 * @param src source device ID
899 * @param dst destination device ID
900 * @param mcastIp multicast group
901 * @return an optional path from src to dst
902 */
903 private Optional<Path> getPath(DeviceId src, DeviceId dst, IpAddress mcastIp) {
904 List<Path> allPaths = Lists.newArrayList(
905 topologyService.getPaths(topologyService.currentTopology(), src, dst));
Charles Chan2199c302016-04-23 17:36:10 -0700906 log.debug("{} path(s) found from {} to {}", allPaths.size(), src, dst);
Charles Chand55e84d2016-03-30 17:54:24 -0700907 if (allPaths.isEmpty()) {
Charles Chand55e84d2016-03-30 17:54:24 -0700908 return Optional.empty();
909 }
910
Pier Luigibad6d6c2018-01-23 16:06:38 +0100911 // Create a map index of suitablity-to-list of paths. For example
912 // a path in the list associated to the index 1 shares only the
913 // first hop and it is less suitable of a path belonging to the index
914 // 2 that shares leaf-spine.
915 Map<Integer, List<Path>> eligiblePaths = Maps.newHashMap();
916 // Some init steps
917 int nhop;
918 McastStoreKey mcastStoreKey;
919 Link hop;
920 PortNumber srcPort;
921 Set<PortNumber> existingPorts;
922 NextObjective nextObj;
923 // Iterate over paths looking for eligible paths
924 for (Path path : allPaths) {
925 // Unlikely, it will happen...
926 if (!src.equals(path.links().get(0).src().deviceId())) {
927 continue;
928 }
929 nhop = 0;
930 // Iterate over the links
931 while (nhop < path.links().size()) {
932 // Get the link and verify if a next related
933 // to the src device exist in the store
934 hop = path.links().get(nhop);
935 mcastStoreKey = new McastStoreKey(mcastIp, hop.src().deviceId());
936 // It does not exist in the store, exit
937 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
938 break;
Charles Chand55e84d2016-03-30 17:54:24 -0700939 }
Pier Luigibad6d6c2018-01-23 16:06:38 +0100940 // Get the output ports on the next
941 nextObj = mcastNextObjStore.get(mcastStoreKey).value();
942 existingPorts = getPorts(nextObj.next());
943 // And the src port on the link
944 srcPort = hop.src().port();
945 // the src port is not used as output, exit
946 if (!existingPorts.contains(srcPort)) {
947 break;
948 }
949 nhop++;
950 }
951 // n_hop defines the index
952 if (nhop > 0) {
953 eligiblePaths.compute(nhop, (index, paths) -> {
954 paths = paths == null ? Lists.newArrayList() : paths;
955 paths.add(path);
956 return paths;
957 });
Charles Chand55e84d2016-03-30 17:54:24 -0700958 }
959 }
Pier Luigibad6d6c2018-01-23 16:06:38 +0100960
961 // No suitable paths
962 if (eligiblePaths.isEmpty()) {
963 log.debug("No eligiblePath(s) found from {} to {}", src, dst);
964 // Otherwise, randomly pick a path
965 Collections.shuffle(allPaths);
966 return allPaths.stream().findFirst();
967 }
968
969 // Let's take the best ones
970 Integer bestIndex = eligiblePaths.keySet()
971 .stream()
972 .sorted(Comparator.reverseOrder())
973 .findFirst().orElse(null);
974 List<Path> bestPaths = eligiblePaths.get(bestIndex);
975 log.debug("{} eligiblePath(s) found from {} to {}",
976 bestPaths.size(), src, dst);
977 // randomly pick a path on the highest index
978 Collections.shuffle(bestPaths);
979 return bestPaths.stream().findFirst();
Charles Chand55e84d2016-03-30 17:54:24 -0700980 }
981
982 /**
Charles Chan2199c302016-04-23 17:36:10 -0700983 * Gets device(s) of given role in given multicast group.
984 *
985 * @param mcastIp multicast IP
986 * @param role multicast role
987 * @return set of device ID or empty set if not found
988 */
989 private Set<DeviceId> getDevice(IpAddress mcastIp, McastRole role) {
990 return mcastRoleStore.entrySet().stream()
991 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp) &&
992 entry.getValue().value() == role)
993 .map(Map.Entry::getKey).map(McastStoreKey::deviceId)
994 .collect(Collectors.toSet());
995 }
996
997 /**
Charles Chan8d449862016-05-16 18:44:13 -0700998 * Gets source connect point of given multicast group.
999 *
1000 * @param mcastIp multicast IP
1001 * @return source connect point or null if not found
1002 */
1003 private ConnectPoint getSource(IpAddress mcastIp) {
1004 return srManager.multicastRouteService.getRoutes().stream()
1005 .filter(mcastRoute -> mcastRoute.group().equals(mcastIp))
1006 .map(mcastRoute -> srManager.multicastRouteService.fetchSource(mcastRoute))
1007 .findAny().orElse(null);
1008 }
1009
1010 /**
Charles Chan2199c302016-04-23 17:36:10 -07001011 * Gets groups which is affected by the link down event.
1012 *
1013 * @param link link going down
1014 * @return a set of multicast IpAddress
1015 */
1016 private Set<IpAddress> getAffectedGroups(Link link) {
1017 DeviceId deviceId = link.src().deviceId();
1018 PortNumber port = link.src().port();
1019 return mcastNextObjStore.entrySet().stream()
1020 .filter(entry -> entry.getKey().deviceId().equals(deviceId) &&
1021 getPorts(entry.getValue().value().next()).contains(port))
1022 .map(Map.Entry::getKey).map(McastStoreKey::mcastIp)
1023 .collect(Collectors.toSet());
1024 }
1025
1026 /**
Pier Luigieba73a02018-01-16 10:47:50 +01001027 * Gets groups which are affected by the device down event.
1028 *
1029 * @param deviceId device going down
1030 * @return a set of multicast IpAddress
1031 */
1032 private Set<IpAddress> getAffectedGroups(DeviceId deviceId) {
1033 return mcastNextObjStore.entrySet().stream()
1034 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
1035 .map(Map.Entry::getKey).map(McastStoreKey::mcastIp)
1036 .collect(Collectors.toSet());
1037 }
1038
1039 /**
Charles Chand55e84d2016-03-30 17:54:24 -07001040 * Gets egress VLAN from McastConfig.
1041 *
1042 * @return egress VLAN or VlanId.NONE if not configured
1043 */
1044 private VlanId egressVlan() {
1045 McastConfig mcastConfig =
1046 srManager.cfgService.getConfig(coreAppId, McastConfig.class);
1047 return (mcastConfig != null) ? mcastConfig.egressVlan() : VlanId.NONE;
1048 }
1049
1050 /**
1051 * Gets assigned VLAN according to the value of egress VLAN.
Charles Chan8d449862016-05-16 18:44:13 -07001052 * If connect point is specified, try to reuse the assigned VLAN on the connect point.
Charles Chand55e84d2016-03-30 17:54:24 -07001053 *
Charles Chan8d449862016-05-16 18:44:13 -07001054 * @param cp connect point; Can be null if not specified
1055 * @return assigned VLAN ID
Charles Chand55e84d2016-03-30 17:54:24 -07001056 */
Charles Chan8d449862016-05-16 18:44:13 -07001057 private VlanId assignedVlan(ConnectPoint cp) {
1058 // Use the egressVlan if it is tagged
1059 if (!egressVlan().equals(VlanId.NONE)) {
1060 return egressVlan();
1061 }
1062 // Reuse unicast VLAN if the port has subnet configured
1063 if (cp != null) {
Charles Chanb4879a52017-10-20 19:09:16 -07001064 VlanId untaggedVlan = srManager.getInternalVlanId(cp);
Charles Chan59cc16d2017-02-02 16:20:42 -08001065 return (untaggedVlan != null) ? untaggedVlan : INTERNAL_VLAN;
Charles Chan8d449862016-05-16 18:44:13 -07001066 }
Charles Chan59cc16d2017-02-02 16:20:42 -08001067 // Use DEFAULT_VLAN if none of the above matches
1068 return SegmentRoutingManager.INTERNAL_VLAN;
Charles Chand55e84d2016-03-30 17:54:24 -07001069 }
Charles Chan2199c302016-04-23 17:36:10 -07001070
1071 /**
1072 * Gets the spine-facing port on ingress device of given multicast group.
1073 *
1074 * @param mcastIp multicast IP
1075 * @return spine-facing port on ingress device
1076 */
1077 private PortNumber ingressTransitPort(IpAddress mcastIp) {
1078 DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
1079 .stream().findAny().orElse(null);
1080 if (ingressDevice != null) {
1081 NextObjective nextObj = mcastNextObjStore
1082 .get(new McastStoreKey(mcastIp, ingressDevice)).value();
1083 Set<PortNumber> ports = getPorts(nextObj.next());
1084
1085 for (PortNumber port : ports) {
1086 // Spine-facing port should have no subnet and no xconnect
1087 if (srManager.deviceConfiguration != null &&
Pier Ventre10bd8d12016-11-26 21:05:22 -08001088 srManager.deviceConfiguration.getPortSubnets(ingressDevice, port).isEmpty() &&
Charles Chanfc5c7802016-05-17 13:13:55 -07001089 !srManager.xConnectHandler.hasXConnect(new ConnectPoint(ingressDevice, port))) {
Charles Chan2199c302016-04-23 17:36:10 -07001090 return port;
1091 }
1092 }
1093 }
1094 return null;
1095 }
Jonghwan Hyun42fe1052017-08-25 17:48:36 -07001096
1097 /**
Pier Luigieba73a02018-01-16 10:47:50 +01001098 * Verify if the given device has sinks
1099 * for the multicast group.
1100 *
1101 * @param deviceId device Id
1102 * @param mcastIp multicast IP
1103 * @return true if the device has sink for the group.
1104 * False otherwise.
1105 */
1106 private boolean hasSinks(DeviceId deviceId, IpAddress mcastIp) {
1107 if (deviceId != null) {
1108 // Get the nextobjective
1109 Versioned<NextObjective> versionedNextObj = mcastNextObjStore.get(
1110 new McastStoreKey(mcastIp, deviceId)
1111 );
1112 // If it exists
1113 if (versionedNextObj != null) {
1114 NextObjective nextObj = versionedNextObj.value();
1115 // Retrieves all the output ports
1116 Set<PortNumber> ports = getPorts(nextObj.next());
1117 // Tries to find at least one port that is not spine-facing
1118 for (PortNumber port : ports) {
1119 // Spine-facing port should have no subnet and no xconnect
1120 if (srManager.deviceConfiguration != null &&
1121 (!srManager.deviceConfiguration.getPortSubnets(deviceId, port).isEmpty() ||
1122 srManager.xConnectHandler.hasXConnect(new ConnectPoint(deviceId, port)))) {
1123 return true;
1124 }
1125 }
1126 }
1127 }
1128 return false;
1129 }
1130
1131 /**
Jonghwan Hyun42fe1052017-08-25 17:48:36 -07001132 * Removes filtering objective for given device and port.
1133 *
1134 * @param deviceId device ID
1135 * @param port ingress port number
1136 * @param assignedVlan assigned VLAN ID
1137 * @param mcastIp multicast IP address
1138 */
1139 private void removeFilterToDevice(DeviceId deviceId, PortNumber port, VlanId assignedVlan, IpAddress mcastIp) {
1140 // Do nothing if the port is configured as suppressed
1141 ConnectPoint connectPoint = new ConnectPoint(deviceId, port);
1142 SegmentRoutingAppConfig appConfig = srManager.cfgService
1143 .getConfig(srManager.appId, SegmentRoutingAppConfig.class);
1144 if (appConfig != null && appConfig.suppressSubnet().contains(connectPoint)) {
1145 log.info("Ignore suppressed port {}", connectPoint);
1146 return;
1147 }
1148
1149 FilteringObjective.Builder filtObjBuilder =
1150 filterObjBuilder(deviceId, port, assignedVlan, mcastIp);
1151 ObjectiveContext context = new DefaultObjectiveContext(
1152 (objective) -> log.debug("Successfully removed filter on {}/{}, vlan {}",
1153 deviceId, port.toLong(), assignedVlan),
1154 (objective, error) ->
1155 log.warn("Failed to remove filter on {}/{}, vlan {}: {}",
1156 deviceId, port.toLong(), assignedVlan, error));
1157 srManager.flowObjectiveService.filter(deviceId, filtObjBuilder.remove(context));
1158 }
1159
1160 /**
Pier Luigib72201b2018-01-25 16:16:02 +01001161 * Updates filtering objective for given device and port.
1162 * It is called in general when the mcast config has been
1163 * changed.
Jonghwan Hyun42fe1052017-08-25 17:48:36 -07001164 *
1165 * @param deviceId device ID
1166 * @param portNum ingress port number
1167 * @param vlanId assigned VLAN ID
1168 * @param install true to add, false to remove
1169 */
1170 protected void updateFilterToDevice(DeviceId deviceId, PortNumber portNum,
1171 VlanId vlanId, boolean install) {
Pier Luigib72201b2018-01-25 16:16:02 +01001172 lastMcastChange = Instant.now();
1173 mcastLock();
1174 try {
1175 // Iterates over the route and updates properly the filtering objective
1176 // on the source device.
1177 srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
1178 ConnectPoint source = srManager.multicastRouteService.fetchSource(mcastRoute);
1179 if (source.deviceId().equals(deviceId) && source.port().equals(portNum)) {
1180 if (install) {
1181 addFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group());
1182 } else {
1183 removeFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group());
1184 }
Jonghwan Hyun42fe1052017-08-25 17:48:36 -07001185 }
Pier Luigib72201b2018-01-25 16:16:02 +01001186 });
1187 } finally {
1188 mcastUnlock();
1189 }
1190 }
1191
1192 /**
1193 * Performs bucket verification operation for all mcast groups in the devices.
1194 * Firstly, it verifies that mcast is stable before trying verification operation.
1195 * Verification consists in creating new nexts with VERIFY operation. Actually,
1196 * the operation is totally delegated to the driver.
1197 */
1198 private final class McastBucketCorrector implements Runnable {
1199
1200 @Override
1201 public void run() {
1202 // Verify if the Mcast has been stable for MCAST_STABLITY_THRESHOLD
1203 if (!isMcastStable()) {
1204 return;
Jonghwan Hyun42fe1052017-08-25 17:48:36 -07001205 }
Pier Luigib72201b2018-01-25 16:16:02 +01001206 // Acquires lock
1207 mcastLock();
1208 try {
1209 // Iterates over the routes and verify the related next objectives
1210 srManager.multicastRouteService.getRoutes()
1211 .stream()
1212 .map(McastRoute::group)
1213 .forEach(mcastIp -> {
1214 log.trace("Running mcast buckets corrector for mcast group: {}",
1215 mcastIp);
1216
1217 // For each group we get current information in the store
1218 // and issue a check of the next objectives in place
1219 DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
1220 .stream().findAny().orElse(null);
1221 DeviceId transitDevice = getDevice(mcastIp, McastRole.TRANSIT)
1222 .stream().findAny().orElse(null);
1223 Set<DeviceId> egressDevices = getDevice(mcastIp, McastRole.EGRESS);
1224 ConnectPoint source = getSource(mcastIp);
1225
1226 // Do not proceed if ingress device or source of this group are missing
1227 if (ingressDevice == null || source == null) {
1228 log.warn("Unable to run buckets corrector. " +
1229 "Missing ingress {} or source {} for group {}",
1230 ingressDevice, source, mcastIp);
1231 return;
1232 }
1233
1234 // Continue only when this instance is the master of source device
1235 if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
1236 log.trace("Unable to run buckets corrector. " +
1237 "Skip {} due to lack of mastership " +
1238 "of the source device {}",
1239 mcastIp, source.deviceId());
1240 return;
1241 }
1242
1243 // Create the set of the devices to be processed
1244 ImmutableSet.Builder<DeviceId> devicesBuilder = ImmutableSet.builder();
1245 devicesBuilder.add(ingressDevice);
1246 if (transitDevice != null) {
1247 devicesBuilder.add(transitDevice);
1248 }
1249 if (!egressDevices.isEmpty()) {
1250 devicesBuilder.addAll(egressDevices);
1251 }
1252 Set<DeviceId> devicesToProcess = devicesBuilder.build();
1253
1254 // Iterate over the devices
1255 devicesToProcess.forEach(deviceId -> {
1256 McastStoreKey currentKey = new McastStoreKey(mcastIp, deviceId);
1257 // If next exists in our store verify related next objective
1258 if (mcastNextObjStore.containsKey(currentKey)) {
1259 NextObjective currentNext = mcastNextObjStore.get(currentKey).value();
1260 // Get current ports
1261 Set<PortNumber> currentPorts = getPorts(currentNext.next());
1262 // Rebuild the next objective
1263 currentNext = nextObjBuilder(
1264 mcastIp,
1265 assignedVlan(deviceId.equals(source.deviceId()) ? source : null),
1266 currentPorts,
1267 currentNext.id()
1268 ).verify();
1269 // Send to the flowobjective service
1270 srManager.flowObjectiveService.next(deviceId, currentNext);
1271 } else {
1272 log.warn("Unable to run buckets corrector." +
1273 "Missing next for {} and group {}",
1274 deviceId, mcastIp);
1275 }
1276 });
1277
1278 });
1279 } finally {
1280 // Finally, it releases the lock
1281 mcastUnlock();
1282 }
1283
1284 }
Jonghwan Hyun42fe1052017-08-25 17:48:36 -07001285 }
Charles Chand55e84d2016-03-30 17:54:24 -07001286}