blob: fff2c92fbc1f3169bb937bc317b545366c742dc8 [file] [log] [blame]
Charles Chanc91c8782016-03-30 17:54:24 -07001/*
Brian O'Connor0947d7e2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Charles Chanc91c8782016-03-30 17:54:24 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.segmentrouting;
18
19import com.google.common.collect.ImmutableSet;
20import com.google.common.collect.Lists;
Pier Luigi91573e12018-01-23 16:06:38 +010021import com.google.common.collect.Maps;
Charles Chanc91c8782016-03-30 17:54:24 -070022import com.google.common.collect.Sets;
23import org.onlab.packet.Ethernet;
24import org.onlab.packet.IpAddress;
25import org.onlab.packet.IpPrefix;
26import org.onlab.packet.MacAddress;
27import org.onlab.packet.VlanId;
28import org.onlab.util.KryoNamespace;
Pier Luigi580fd8a2018-01-16 10:47:50 +010029import org.onosproject.cluster.NodeId;
Charles Chanc91c8782016-03-30 17:54:24 -070030import org.onosproject.core.ApplicationId;
31import org.onosproject.core.CoreService;
Ray Milkeyae0068a2017-08-15 11:02:29 -070032import org.onosproject.net.config.basics.McastConfig;
Charles Chanc91c8782016-03-30 17:54:24 -070033import org.onosproject.net.ConnectPoint;
34import org.onosproject.net.DeviceId;
35import org.onosproject.net.Link;
36import org.onosproject.net.Path;
37import org.onosproject.net.PortNumber;
38import org.onosproject.net.flow.DefaultTrafficSelector;
39import org.onosproject.net.flow.DefaultTrafficTreatment;
40import org.onosproject.net.flow.TrafficSelector;
41import org.onosproject.net.flow.TrafficTreatment;
42import org.onosproject.net.flow.criteria.Criteria;
43import org.onosproject.net.flow.instructions.Instructions.OutputInstruction;
44import org.onosproject.net.flowobjective.DefaultFilteringObjective;
45import org.onosproject.net.flowobjective.DefaultForwardingObjective;
46import org.onosproject.net.flowobjective.DefaultNextObjective;
Charles Chan72779502016-04-23 17:36:10 -070047import org.onosproject.net.flowobjective.DefaultObjectiveContext;
Charles Chanc91c8782016-03-30 17:54:24 -070048import org.onosproject.net.flowobjective.FilteringObjective;
49import org.onosproject.net.flowobjective.ForwardingObjective;
50import org.onosproject.net.flowobjective.NextObjective;
Charles Chan72779502016-04-23 17:36:10 -070051import org.onosproject.net.flowobjective.ObjectiveContext;
Charles Chanc91c8782016-03-30 17:54:24 -070052import org.onosproject.net.mcast.McastEvent;
Pier Luigi35dab3f2018-01-25 16:16:02 +010053import org.onosproject.net.mcast.McastRoute;
Charles Chanc91c8782016-03-30 17:54:24 -070054import org.onosproject.net.mcast.McastRouteInfo;
55import org.onosproject.net.topology.TopologyService;
Charles Chan370a65b2016-05-10 17:29:47 -070056import org.onosproject.segmentrouting.config.SegmentRoutingAppConfig;
Charles Chan72779502016-04-23 17:36:10 -070057import org.onosproject.segmentrouting.storekey.McastStoreKey;
Charles Chanc91c8782016-03-30 17:54:24 -070058import org.onosproject.store.serializers.KryoNamespaces;
59import org.onosproject.store.service.ConsistentMap;
60import org.onosproject.store.service.Serializer;
61import org.onosproject.store.service.StorageService;
Pier Luigi580fd8a2018-01-16 10:47:50 +010062import org.onosproject.store.service.Versioned;
Charles Chanc91c8782016-03-30 17:54:24 -070063import org.slf4j.Logger;
64import org.slf4j.LoggerFactory;
65
Pier Luigi35dab3f2018-01-25 16:16:02 +010066import java.time.Instant;
Charles Chanc91c8782016-03-30 17:54:24 -070067import java.util.Collection;
68import java.util.Collections;
Pier Luigi91573e12018-01-23 16:06:38 +010069import java.util.Comparator;
Charles Chanc91c8782016-03-30 17:54:24 -070070import java.util.List;
Charles Chan72779502016-04-23 17:36:10 -070071import java.util.Map;
Charles Chanc91c8782016-03-30 17:54:24 -070072import java.util.Optional;
73import java.util.Set;
Pier Luigi35dab3f2018-01-25 16:16:02 +010074import java.util.concurrent.ScheduledExecutorService;
75import java.util.concurrent.TimeUnit;
76import java.util.concurrent.locks.Lock;
77import java.util.concurrent.locks.ReentrantLock;
Charles Chan72779502016-04-23 17:36:10 -070078import java.util.stream.Collectors;
79
80import static com.google.common.base.Preconditions.checkState;
Pier Luigi35dab3f2018-01-25 16:16:02 +010081import static java.util.concurrent.Executors.newScheduledThreadPool;
82import static org.onlab.util.Tools.groupedThreads;
Charles Chan10b0fb72017-02-02 16:20:42 -080083import static org.onosproject.segmentrouting.SegmentRoutingManager.INTERNAL_VLAN;
Charles Chanc91c8782016-03-30 17:54:24 -070084
85/**
Charles Chan1eaf4802016-04-18 13:44:03 -070086 * Handles multicast-related events.
Charles Chanc91c8782016-03-30 17:54:24 -070087 */
Charles Chan1eaf4802016-04-18 13:44:03 -070088public class McastHandler {
89 private static final Logger log = LoggerFactory.getLogger(McastHandler.class);
Charles Chanc91c8782016-03-30 17:54:24 -070090 private final SegmentRoutingManager srManager;
91 private final ApplicationId coreAppId;
Charles Chan82f19972016-05-17 13:13:55 -070092 private final StorageService storageService;
93 private final TopologyService topologyService;
Charles Chan72779502016-04-23 17:36:10 -070094 private final ConsistentMap<McastStoreKey, NextObjective> mcastNextObjStore;
95 private final KryoNamespace.Builder mcastKryo;
96 private final ConsistentMap<McastStoreKey, McastRole> mcastRoleStore;
97
Pier Luigi35dab3f2018-01-25 16:16:02 +010098 // Mcast lock to serialize local operations
99 private final Lock mcastLock = new ReentrantLock();
100
101 /**
102 * Acquires the lock used when making mcast changes.
103 */
104 private void mcastLock() {
105 mcastLock.lock();
106 }
107
108 /**
109 * Releases the lock used when making mcast changes.
110 */
111 private void mcastUnlock() {
112 mcastLock.unlock();
113 }
114
115 // Stability threshold for Mcast. Seconds
116 private static final long MCAST_STABLITY_THRESHOLD = 5;
117 // Last change done
118 private Instant lastMcastChange = Instant.now();
119
120 /**
121 * Determines if mcast in the network has been stable in the last
122 * MCAST_STABLITY_THRESHOLD seconds, by comparing the current time
123 * to the last mcast change timestamp.
124 *
125 * @return true if stable
126 */
127 private boolean isMcastStable() {
128 long last = (long) (lastMcastChange.toEpochMilli() / 1000.0);
129 long now = (long) (Instant.now().toEpochMilli() / 1000.0);
130 log.debug("Mcast stable since {}s", now - last);
131 return (now - last) > MCAST_STABLITY_THRESHOLD;
132 }
133
134 // Verify interval for Mcast
135 private static final long MCAST_VERIFY_INTERVAL = 30;
136
137 // Executor for mcast bucket corrector
138 private ScheduledExecutorService executorService
139 = newScheduledThreadPool(1, groupedThreads("mcastBktCorrector", "mcastbktC-%d", log));
140
Charles Chan72779502016-04-23 17:36:10 -0700141 /**
142 * Role in the multicast tree.
143 */
144 public enum McastRole {
145 /**
146 * The device is the ingress device of this group.
147 */
148 INGRESS,
149 /**
150 * The device is the transit device of this group.
151 */
152 TRANSIT,
153 /**
154 * The device is the egress device of this group.
155 */
156 EGRESS
157 }
Charles Chanc91c8782016-03-30 17:54:24 -0700158
159 /**
160 * Constructs the McastEventHandler.
161 *
162 * @param srManager Segment Routing manager
163 */
Charles Chan1eaf4802016-04-18 13:44:03 -0700164 public McastHandler(SegmentRoutingManager srManager) {
Charles Chanc91c8782016-03-30 17:54:24 -0700165 coreAppId = srManager.coreService.getAppId(CoreService.CORE_APP_NAME);
Charles Chanc91c8782016-03-30 17:54:24 -0700166 this.srManager = srManager;
167 this.storageService = srManager.storageService;
168 this.topologyService = srManager.topologyService;
Charles Chan72779502016-04-23 17:36:10 -0700169 mcastKryo = new KryoNamespace.Builder()
Charles Chanc91c8782016-03-30 17:54:24 -0700170 .register(KryoNamespaces.API)
Charles Chan72779502016-04-23 17:36:10 -0700171 .register(McastStoreKey.class)
172 .register(McastRole.class);
Charles Chanc91c8782016-03-30 17:54:24 -0700173 mcastNextObjStore = storageService
Charles Chan72779502016-04-23 17:36:10 -0700174 .<McastStoreKey, NextObjective>consistentMapBuilder()
Charles Chanc91c8782016-03-30 17:54:24 -0700175 .withName("onos-mcast-nextobj-store")
Charles Chan4922a172016-05-23 16:45:45 -0700176 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-NextObj")))
Charles Chanc91c8782016-03-30 17:54:24 -0700177 .build();
Charles Chan72779502016-04-23 17:36:10 -0700178 mcastRoleStore = storageService
179 .<McastStoreKey, McastRole>consistentMapBuilder()
180 .withName("onos-mcast-role-store")
Charles Chan4922a172016-05-23 16:45:45 -0700181 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-Role")))
Charles Chan72779502016-04-23 17:36:10 -0700182 .build();
Pier Luigi35dab3f2018-01-25 16:16:02 +0100183 // Init the executor service and the buckets corrector
184 executorService.scheduleWithFixedDelay(new McastBucketCorrector(), 10,
185 MCAST_VERIFY_INTERVAL,
186 TimeUnit.SECONDS);
Charles Chan72779502016-04-23 17:36:10 -0700187 }
188
189 /**
190 * Read initial multicast from mcast store.
191 */
Charles Chan82f19972016-05-17 13:13:55 -0700192 protected void init() {
Charles Chan72779502016-04-23 17:36:10 -0700193 srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
194 ConnectPoint source = srManager.multicastRouteService.fetchSource(mcastRoute);
195 Set<ConnectPoint> sinks = srManager.multicastRouteService.fetchSinks(mcastRoute);
196 sinks.forEach(sink -> {
197 processSinkAddedInternal(source, sink, mcastRoute.group());
198 });
199 });
Charles Chanc91c8782016-03-30 17:54:24 -0700200 }
201
202 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +0100203 * Clean up when deactivating the application.
204 */
205 protected void terminate() {
206 executorService.shutdown();
207 }
208
209 /**
Charles Chanc91c8782016-03-30 17:54:24 -0700210 * Processes the SOURCE_ADDED event.
211 *
212 * @param event McastEvent with SOURCE_ADDED type
213 */
214 protected void processSourceAdded(McastEvent event) {
215 log.info("processSourceAdded {}", event);
216 McastRouteInfo mcastRouteInfo = event.subject();
217 if (!mcastRouteInfo.isComplete()) {
218 log.info("Incompleted McastRouteInfo. Abort.");
219 return;
220 }
221 ConnectPoint source = mcastRouteInfo.source().orElse(null);
222 Set<ConnectPoint> sinks = mcastRouteInfo.sinks();
223 IpAddress mcastIp = mcastRouteInfo.route().group();
224
Pier Luigi35dab3f2018-01-25 16:16:02 +0100225 sinks.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp));
Charles Chanc91c8782016-03-30 17:54:24 -0700226 }
227
228 /**
229 * Processes the SINK_ADDED event.
230 *
231 * @param event McastEvent with SINK_ADDED type
232 */
233 protected void processSinkAdded(McastEvent event) {
234 log.info("processSinkAdded {}", event);
235 McastRouteInfo mcastRouteInfo = event.subject();
236 if (!mcastRouteInfo.isComplete()) {
237 log.info("Incompleted McastRouteInfo. Abort.");
238 return;
239 }
240 ConnectPoint source = mcastRouteInfo.source().orElse(null);
241 ConnectPoint sink = mcastRouteInfo.sink().orElse(null);
242 IpAddress mcastIp = mcastRouteInfo.route().group();
243
244 processSinkAddedInternal(source, sink, mcastIp);
245 }
246
247 /**
248 * Processes the SINK_REMOVED event.
249 *
250 * @param event McastEvent with SINK_REMOVED type
251 */
252 protected void processSinkRemoved(McastEvent event) {
253 log.info("processSinkRemoved {}", event);
254 McastRouteInfo mcastRouteInfo = event.subject();
255 if (!mcastRouteInfo.isComplete()) {
256 log.info("Incompleted McastRouteInfo. Abort.");
257 return;
258 }
259 ConnectPoint source = mcastRouteInfo.source().orElse(null);
260 ConnectPoint sink = mcastRouteInfo.sink().orElse(null);
261 IpAddress mcastIp = mcastRouteInfo.route().group();
Charles Chanc91c8782016-03-30 17:54:24 -0700262
Pier Luigi35dab3f2018-01-25 16:16:02 +0100263 processSinkRemovedInternal(source, sink, mcastIp);
264 }
Charles Chan0932eca2016-06-28 16:50:13 -0700265
Pier Luigi35dab3f2018-01-25 16:16:02 +0100266 /**
Pier Luigi6786b922018-02-02 16:19:11 +0100267 * Processes the ROUTE_REMOVED event.
268 *
269 * @param event McastEvent with ROUTE_REMOVED type
270 */
271 protected void processRouteRemoved(McastEvent event) {
272 log.info("processRouteRemoved {}", event);
273 McastRouteInfo mcastRouteInfo = event.subject();
274 if (!mcastRouteInfo.source().isPresent()) {
275 log.info("Incompleted McastRouteInfo. Abort.");
276 return;
277 }
278 // Get group ip and ingress connect point
279 IpAddress mcastIp = mcastRouteInfo.route().group();
280 ConnectPoint source = mcastRouteInfo.source().get();
281
282 processRouteRemovedInternal(source, mcastIp);
283 }
284
285 /**
286 * Removes the entire mcast tree related to this group.
287 *
288 * @param mcastIp multicast group IP address
289 */
290 private void processRouteRemovedInternal(ConnectPoint source, IpAddress mcastIp) {
291 lastMcastChange = Instant.now();
292 mcastLock();
293 try {
294 log.debug("Processing route down for group {}", mcastIp);
295
296 // Find out the ingress, transit and egress device of the affected group
297 DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
298 .stream().findAny().orElse(null);
299 DeviceId transitDevice = getDevice(mcastIp, McastRole.TRANSIT)
300 .stream().findAny().orElse(null);
301 Set<DeviceId> egressDevices = getDevice(mcastIp, McastRole.EGRESS);
302
303 // Verify leadership on the operation
304 if (!isLeader(source)) {
305 log.debug("Skip {} due to lack of leadership", mcastIp);
306 return;
307 }
308
309 // If there are egress devices, sinks could be only on the ingress
310 if (!egressDevices.isEmpty()) {
311 egressDevices.forEach(
312 deviceId -> removeGroupFromDevice(deviceId, mcastIp, assignedVlan(null))
313 );
314 }
315 // Transit could be null
316 if (transitDevice != null) {
317 removeGroupFromDevice(transitDevice, mcastIp, assignedVlan(null));
318 }
319 // Ingress device should be not null
320 if (ingressDevice != null) {
321 removeGroupFromDevice(ingressDevice, mcastIp, assignedVlan(source));
322 }
323
324 } finally {
325 mcastUnlock();
326 }
327 }
328
329 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +0100330 * Removes a path from source to sink for given multicast group.
331 *
332 * @param source connect point of the multicast source
333 * @param sink connection point of the multicast sink
334 * @param mcastIp multicast group IP address
335 */
336 private void processSinkRemovedInternal(ConnectPoint source, ConnectPoint sink,
337 IpAddress mcastIp) {
338 lastMcastChange = Instant.now();
339 mcastLock();
340 try {
Pier Luigi6786b922018-02-02 16:19:11 +0100341 // Verify leadership on the operation
342 if (!isLeader(source)) {
343 log.debug("Skip {} due to lack of leadership", mcastIp);
Charles Chanc91c8782016-03-30 17:54:24 -0700344 return;
345 }
Charles Chanc91c8782016-03-30 17:54:24 -0700346
Pier Luigi35dab3f2018-01-25 16:16:02 +0100347 // When source and sink are on the same device
348 if (source.deviceId().equals(sink.deviceId())) {
349 // Source and sink are on even the same port. There must be something wrong.
350 if (source.port().equals(sink.port())) {
351 log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
352 mcastIp, sink, source);
353 return;
354 }
355 removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(source));
356 return;
357 }
Charles Chanc91c8782016-03-30 17:54:24 -0700358
Pier Luigi35dab3f2018-01-25 16:16:02 +0100359 // Process the egress device
360 boolean isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(null));
361 if (isLast) {
362 mcastRoleStore.remove(new McastStoreKey(mcastIp, sink.deviceId()));
363 }
364
365 // If this is the last sink on the device, also update upstream
366 Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp);
367 if (mcastPath.isPresent()) {
368 List<Link> links = Lists.newArrayList(mcastPath.get().links());
369 Collections.reverse(links);
370 for (Link link : links) {
371 if (isLast) {
372 isLast = removePortFromDevice(
373 link.src().deviceId(),
374 link.src().port(),
375 mcastIp,
376 assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null)
377 );
378 mcastRoleStore.remove(new McastStoreKey(mcastIp, link.src().deviceId()));
379 }
Charles Chanc91c8782016-03-30 17:54:24 -0700380 }
381 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100382 } finally {
383 mcastUnlock();
Charles Chanc91c8782016-03-30 17:54:24 -0700384 }
385 }
386
387 /**
388 * Establishes a path from source to sink for given multicast group.
389 *
390 * @param source connect point of the multicast source
391 * @param sink connection point of the multicast sink
392 * @param mcastIp multicast group IP address
393 */
394 private void processSinkAddedInternal(ConnectPoint source, ConnectPoint sink,
395 IpAddress mcastIp) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100396 lastMcastChange = Instant.now();
397 mcastLock();
398 try {
399 // Continue only when this instance is the master of source device
400 if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
401 log.debug("Skip {} due to lack of mastership of the source device {}",
402 mcastIp, source.deviceId());
Charles Chanc91c8782016-03-30 17:54:24 -0700403 return;
404 }
Charles Chanc91c8782016-03-30 17:54:24 -0700405
Pier Luigi35dab3f2018-01-25 16:16:02 +0100406 // Process the ingress device
407 addFilterToDevice(source.deviceId(), source.port(), assignedVlan(source), mcastIp);
Charles Chan72779502016-04-23 17:36:10 -0700408
Pier Luigi35dab3f2018-01-25 16:16:02 +0100409 // When source and sink are on the same device
410 if (source.deviceId().equals(sink.deviceId())) {
411 // Source and sink are on even the same port. There must be something wrong.
412 if (source.port().equals(sink.port())) {
413 log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
414 mcastIp, sink, source);
415 return;
416 }
417 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(source));
418 mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()), McastRole.INGRESS);
419 return;
420 }
Charles Chan72779502016-04-23 17:36:10 -0700421
Pier Luigi35dab3f2018-01-25 16:16:02 +0100422 // Find a path. If present, create/update groups and flows for each hop
423 Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp);
424 if (mcastPath.isPresent()) {
425 List<Link> links = mcastPath.get().links();
426 checkState(links.size() == 2,
427 "Path in leaf-spine topology should always be two hops: ", links);
Charles Chan72779502016-04-23 17:36:10 -0700428
Pier Luigi35dab3f2018-01-25 16:16:02 +0100429 links.forEach(link -> {
430 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
431 assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
432 addFilterToDevice(link.dst().deviceId(), link.dst().port(), assignedVlan(null), mcastIp);
433 });
434
435 // Process the egress device
436 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(null));
437
438 // Setup mcast roles
439 mcastRoleStore.put(new McastStoreKey(mcastIp, source.deviceId()),
440 McastRole.INGRESS);
441 mcastRoleStore.put(new McastStoreKey(mcastIp, links.get(0).dst().deviceId()),
442 McastRole.TRANSIT);
443 mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()),
444 McastRole.EGRESS);
445 } else {
446 log.warn("Unable to find a path from {} to {}. Abort sinkAdded",
447 source.deviceId(), sink.deviceId());
448 }
449 } finally {
450 mcastUnlock();
Charles Chanc91c8782016-03-30 17:54:24 -0700451 }
452 }
453
454 /**
Charles Chan72779502016-04-23 17:36:10 -0700455 * Processes the LINK_DOWN event.
456 *
457 * @param affectedLink Link that is going down
458 */
459 protected void processLinkDown(Link affectedLink) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100460 lastMcastChange = Instant.now();
461 mcastLock();
462 try {
463 // Get groups affected by the link down event
464 getAffectedGroups(affectedLink).forEach(mcastIp -> {
465 // TODO Optimize when the group editing is in place
466 log.debug("Processing link down {} for group {}",
467 affectedLink, mcastIp);
Pier Luigi580fd8a2018-01-16 10:47:50 +0100468
Pier Luigi35dab3f2018-01-25 16:16:02 +0100469 // Find out the ingress, transit and egress device of affected group
470 DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
471 .stream().findAny().orElse(null);
472 DeviceId transitDevice = getDevice(mcastIp, McastRole.TRANSIT)
473 .stream().findAny().orElse(null);
474 Set<DeviceId> egressDevices = getDevice(mcastIp, McastRole.EGRESS);
475 ConnectPoint source = getSource(mcastIp);
Charles Chana8f9dee2016-05-16 18:44:13 -0700476
Pier Luigi35dab3f2018-01-25 16:16:02 +0100477 // Do not proceed if any of these info is missing
478 if (ingressDevice == null || transitDevice == null
479 || egressDevices == null || source == null) {
480 log.warn("Missing ingress {}, transit {}, egress {} devices or source {}",
481 ingressDevice, transitDevice, egressDevices, source);
482 return;
Charles Chan72779502016-04-23 17:36:10 -0700483 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100484
485 // Continue only when this instance is the master of source device
486 if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
487 log.debug("Skip {} due to lack of mastership of the source device {}",
488 source.deviceId());
489 return;
490 }
491
492 // Remove entire transit
493 removeGroupFromDevice(transitDevice, mcastIp, assignedVlan(null));
494
495 // Remove transit-facing port on ingress device
496 PortNumber ingressTransitPort = ingressTransitPort(mcastIp);
497 if (ingressTransitPort != null) {
498 removePortFromDevice(ingressDevice, ingressTransitPort, mcastIp, assignedVlan(source));
499 mcastRoleStore.remove(new McastStoreKey(mcastIp, transitDevice));
500 }
501
502 // Construct a new path for each egress device
503 egressDevices.forEach(egressDevice -> {
504 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice, mcastIp);
505 if (mcastPath.isPresent()) {
506 installPath(mcastIp, source, mcastPath.get());
507 } else {
508 log.warn("Fail to recover egress device {} from link failure {}",
509 egressDevice, affectedLink);
510 removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null));
511 }
512 });
Charles Chan72779502016-04-23 17:36:10 -0700513 });
Pier Luigi35dab3f2018-01-25 16:16:02 +0100514 } finally {
515 mcastUnlock();
516 }
Charles Chan72779502016-04-23 17:36:10 -0700517 }
518
519 /**
Pier Luigi580fd8a2018-01-16 10:47:50 +0100520 * Process the DEVICE_DOWN event.
521 *
522 * @param deviceDown device going down
523 */
524 protected void processDeviceDown(DeviceId deviceDown) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100525 lastMcastChange = Instant.now();
526 mcastLock();
527 try {
528 // Get the mcast groups affected by the device going down
529 getAffectedGroups(deviceDown).forEach(mcastIp -> {
530 // TODO Optimize when the group editing is in place
531 log.debug("Processing device down {} for group {}",
532 deviceDown, mcastIp);
Pier Luigi580fd8a2018-01-16 10:47:50 +0100533
Pier Luigi35dab3f2018-01-25 16:16:02 +0100534 // Find out the ingress, transit and egress device of affected group
535 DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
536 .stream().findAny().orElse(null);
537 DeviceId transitDevice = getDevice(mcastIp, McastRole.TRANSIT)
538 .stream().findAny().orElse(null);
539 Set<DeviceId> egressDevices = getDevice(mcastIp, McastRole.EGRESS);
540 ConnectPoint source = getSource(mcastIp);
Pier Luigi580fd8a2018-01-16 10:47:50 +0100541
Pier Luigi35dab3f2018-01-25 16:16:02 +0100542 // Do not proceed if ingress device or source of this group are missing
543 // If sinks are in other leafs, we have ingress, transit, egress, and source
544 // If sinks are in the same leaf, we have just ingress and source
545 if (ingressDevice == null || source == null) {
546 log.warn("Missing ingress {} or source {} for group {}",
547 ingressDevice, source, mcastIp);
Pier Luigi580fd8a2018-01-16 10:47:50 +0100548 return;
549 }
Pier Luigi580fd8a2018-01-16 10:47:50 +0100550
Pier Luigi6786b922018-02-02 16:19:11 +0100551 // Verify leadership on the operation
552 if (!isLeader(source)) {
553 log.debug("Skip {} due to lack of leadership", mcastIp);
554 return;
Pier Luigi580fd8a2018-01-16 10:47:50 +0100555 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100556
557 // If it exists, we have to remove it in any case
558 if (transitDevice != null) {
559 // Remove entire transit
560 removeGroupFromDevice(transitDevice, mcastIp, assignedVlan(null));
561 }
562 // If the ingress is down
563 if (ingressDevice.equals(deviceDown)) {
564 // Remove entire ingress
565 removeGroupFromDevice(ingressDevice, mcastIp, assignedVlan(source));
566 // If other sinks different from the ingress exist
567 if (!egressDevices.isEmpty()) {
568 // Remove all the remaining egress
569 egressDevices.forEach(
570 egressDevice -> removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null))
571 );
Pier Luigi580fd8a2018-01-16 10:47:50 +0100572 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100573 } else {
574 // Egress or transit could be down at this point
575 // Get the ingress-transit port if it exists
576 PortNumber ingressTransitPort = ingressTransitPort(mcastIp);
577 if (ingressTransitPort != null) {
578 // Remove transit-facing port on ingress device
579 removePortFromDevice(ingressDevice, ingressTransitPort, mcastIp, assignedVlan(source));
580 }
581 // One of the egress device is down
582 if (egressDevices.contains(deviceDown)) {
583 // Remove entire device down
584 removeGroupFromDevice(deviceDown, mcastIp, assignedVlan(null));
585 // Remove the device down from egress
586 egressDevices.remove(deviceDown);
587 // If there are no more egress and ingress does not have sinks
588 if (egressDevices.isEmpty() && !hasSinks(ingressDevice, mcastIp)) {
589 // Remove entire ingress
590 mcastRoleStore.remove(new McastStoreKey(mcastIp, ingressDevice));
591 // We have done
592 return;
593 }
594 }
595 // Construct a new path for each egress device
596 egressDevices.forEach(egressDevice -> {
597 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice, mcastIp);
598 // If there is a new path
599 if (mcastPath.isPresent()) {
600 // Let's install the new mcast path for this egress
601 installPath(mcastIp, source, mcastPath.get());
602 } else {
603 // We were not able to find an alternative path for this egress
604 log.warn("Fail to recover egress device {} from device down {}",
605 egressDevice, deviceDown);
606 removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null));
607 }
608 });
609 }
610 });
611 } finally {
612 mcastUnlock();
613 }
Pier Luigi580fd8a2018-01-16 10:47:50 +0100614 }
615
616 /**
Charles Chanc91c8782016-03-30 17:54:24 -0700617 * Adds filtering objective for given device and port.
618 *
619 * @param deviceId device ID
620 * @param port ingress port number
621 * @param assignedVlan assigned VLAN ID
622 */
Julia Fergusonf1d9c342017-08-10 18:15:24 +0000623 private void addFilterToDevice(DeviceId deviceId, PortNumber port, VlanId assignedVlan, IpAddress mcastIp) {
Charles Chanc91c8782016-03-30 17:54:24 -0700624 // Do nothing if the port is configured as suppressed
Charles Chan370a65b2016-05-10 17:29:47 -0700625 ConnectPoint connectPoint = new ConnectPoint(deviceId, port);
626 SegmentRoutingAppConfig appConfig = srManager.cfgService
627 .getConfig(srManager.appId, SegmentRoutingAppConfig.class);
628 if (appConfig != null && appConfig.suppressSubnet().contains(connectPoint)) {
629 log.info("Ignore suppressed port {}", connectPoint);
Charles Chanc91c8782016-03-30 17:54:24 -0700630 return;
631 }
632
633 FilteringObjective.Builder filtObjBuilder =
Julia Fergusonf1d9c342017-08-10 18:15:24 +0000634 filterObjBuilder(deviceId, port, assignedVlan, mcastIp);
Charles Chan72779502016-04-23 17:36:10 -0700635 ObjectiveContext context = new DefaultObjectiveContext(
636 (objective) -> log.debug("Successfully add filter on {}/{}, vlan {}",
Charles Chan10b0fb72017-02-02 16:20:42 -0800637 deviceId, port.toLong(), assignedVlan),
Charles Chan72779502016-04-23 17:36:10 -0700638 (objective, error) ->
639 log.warn("Failed to add filter on {}/{}, vlan {}: {}",
Charles Chan10b0fb72017-02-02 16:20:42 -0800640 deviceId, port.toLong(), assignedVlan, error));
Charles Chan72779502016-04-23 17:36:10 -0700641 srManager.flowObjectiveService.filter(deviceId, filtObjBuilder.add(context));
Charles Chanc91c8782016-03-30 17:54:24 -0700642 }
643
644 /**
645 * Adds a port to given multicast group on given device. This involves the
646 * update of L3 multicast group and multicast routing table entry.
647 *
648 * @param deviceId device ID
649 * @param port port to be added
650 * @param mcastIp multicast group
651 * @param assignedVlan assigned VLAN ID
652 */
653 private void addPortToDevice(DeviceId deviceId, PortNumber port,
654 IpAddress mcastIp, VlanId assignedVlan) {
Charles Chan72779502016-04-23 17:36:10 -0700655 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId);
Charles Chanc91c8782016-03-30 17:54:24 -0700656 ImmutableSet.Builder<PortNumber> portBuilder = ImmutableSet.builder();
Pier Luigi4f0dd212018-01-19 10:24:53 +0100657 NextObjective newNextObj;
Charles Chan72779502016-04-23 17:36:10 -0700658 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Charles Chanc91c8782016-03-30 17:54:24 -0700659 // First time someone request this mcast group via this device
660 portBuilder.add(port);
Pier Luigi4f0dd212018-01-19 10:24:53 +0100661 // New nextObj
662 newNextObj = nextObjBuilder(mcastIp, assignedVlan,
663 portBuilder.build(), null).add();
664 // Store the new port
665 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Charles Chanc91c8782016-03-30 17:54:24 -0700666 } else {
667 // This device already serves some subscribers of this mcast group
Charles Chan72779502016-04-23 17:36:10 -0700668 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chanc91c8782016-03-30 17:54:24 -0700669 // Stop if the port is already in the nextobj
670 Set<PortNumber> existingPorts = getPorts(nextObj.next());
671 if (existingPorts.contains(port)) {
672 log.info("NextObj for {}/{} already exists. Abort", deviceId, port);
673 return;
674 }
Pier Luigi4f0dd212018-01-19 10:24:53 +0100675 // Let's add the port and reuse the previous one
Yuta HIGUCHIbef07b52018-02-09 18:05:23 -0800676 portBuilder.addAll(existingPorts).add(port);
Pier Luigi4f0dd212018-01-19 10:24:53 +0100677 // Reuse previous nextObj
678 newNextObj = nextObjBuilder(mcastIp, assignedVlan,
679 portBuilder.build(), nextObj.id()).addToExisting();
680 // Store the final next objective and send only the difference to the driver
681 mcastNextObjStore.put(mcastStoreKey, newNextObj);
682 // Add just the new port
683 portBuilder = ImmutableSet.builder();
684 portBuilder.add(port);
685 newNextObj = nextObjBuilder(mcastIp, assignedVlan,
686 portBuilder.build(), nextObj.id()).addToExisting();
Charles Chanc91c8782016-03-30 17:54:24 -0700687 }
688 // Create, store and apply the new nextObj and fwdObj
Charles Chan72779502016-04-23 17:36:10 -0700689 ObjectiveContext context = new DefaultObjectiveContext(
690 (objective) -> log.debug("Successfully add {} on {}/{}, vlan {}",
691 mcastIp, deviceId, port.toLong(), assignedVlan),
692 (objective, error) ->
693 log.warn("Failed to add {} on {}/{}, vlan {}: {}",
694 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Charles Chanc91c8782016-03-30 17:54:24 -0700695 ForwardingObjective fwdObj =
Charles Chan72779502016-04-23 17:36:10 -0700696 fwdObjBuilder(mcastIp, assignedVlan, newNextObj.id()).add(context);
Charles Chanc91c8782016-03-30 17:54:24 -0700697 srManager.flowObjectiveService.next(deviceId, newNextObj);
698 srManager.flowObjectiveService.forward(deviceId, fwdObj);
Charles Chanc91c8782016-03-30 17:54:24 -0700699 }
700
701 /**
702 * Removes a port from given multicast group on given device.
703 * This involves the update of L3 multicast group and multicast routing
704 * table entry.
705 *
706 * @param deviceId device ID
707 * @param port port to be added
708 * @param mcastIp multicast group
709 * @param assignedVlan assigned VLAN ID
710 * @return true if this is the last sink on this device
711 */
712 private boolean removePortFromDevice(DeviceId deviceId, PortNumber port,
713 IpAddress mcastIp, VlanId assignedVlan) {
Charles Chan72779502016-04-23 17:36:10 -0700714 McastStoreKey mcastStoreKey =
715 new McastStoreKey(mcastIp, deviceId);
Charles Chanc91c8782016-03-30 17:54:24 -0700716 // This device is not serving this multicast group
Charles Chan72779502016-04-23 17:36:10 -0700717 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Charles Chanc91c8782016-03-30 17:54:24 -0700718 log.warn("{} is not serving {} on port {}. Abort.", deviceId, mcastIp, port);
719 return false;
720 }
Charles Chan72779502016-04-23 17:36:10 -0700721 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chanc91c8782016-03-30 17:54:24 -0700722
723 Set<PortNumber> existingPorts = getPorts(nextObj.next());
Charles Chan72779502016-04-23 17:36:10 -0700724 // This port does not serve this multicast group
Charles Chanc91c8782016-03-30 17:54:24 -0700725 if (!existingPorts.contains(port)) {
726 log.warn("{} is not serving {} on port {}. Abort.", deviceId, mcastIp, port);
727 return false;
728 }
729 // Copy and modify the ImmutableSet
730 existingPorts = Sets.newHashSet(existingPorts);
731 existingPorts.remove(port);
732
733 NextObjective newNextObj;
Pier Luigi8cd46de2018-01-19 10:24:53 +0100734 ObjectiveContext context;
Charles Chanc91c8782016-03-30 17:54:24 -0700735 ForwardingObjective fwdObj;
736 if (existingPorts.isEmpty()) {
Pier Luigi8cd46de2018-01-19 10:24:53 +0100737 // If this is the last sink, remove flows and last bucket
Charles Chanc91c8782016-03-30 17:54:24 -0700738 // NOTE: Rely on GroupStore garbage collection rather than explicitly
739 // remove L3MG since there might be other flows/groups refer to
740 // the same L2IG
Pier Luigi8cd46de2018-01-19 10:24:53 +0100741 context = new DefaultObjectiveContext(
Charles Chan72779502016-04-23 17:36:10 -0700742 (objective) -> log.debug("Successfully remove {} on {}/{}, vlan {}",
743 mcastIp, deviceId, port.toLong(), assignedVlan),
744 (objective, error) ->
745 log.warn("Failed to remove {} on {}/{}, vlan {}: {}",
746 mcastIp, deviceId, port.toLong(), assignedVlan, error));
747 fwdObj = fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
748 mcastNextObjStore.remove(mcastStoreKey);
Charles Chanc91c8782016-03-30 17:54:24 -0700749 } else {
750 // If this is not the last sink, update flows and groups
Pier Luigi8cd46de2018-01-19 10:24:53 +0100751 context = new DefaultObjectiveContext(
Charles Chan72779502016-04-23 17:36:10 -0700752 (objective) -> log.debug("Successfully update {} on {}/{}, vlan {}",
753 mcastIp, deviceId, port.toLong(), assignedVlan),
754 (objective, error) ->
755 log.warn("Failed to update {} on {}/{}, vlan {}: {}",
756 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Pier Luigi8cd46de2018-01-19 10:24:53 +0100757 // Here we store the next objective with the remaining port
758 newNextObj = nextObjBuilder(mcastIp, assignedVlan,
759 existingPorts, nextObj.id()).removeFromExisting();
Charles Chan82f19972016-05-17 13:13:55 -0700760 fwdObj = fwdObjBuilder(mcastIp, assignedVlan, newNextObj.id()).add(context);
Charles Chan72779502016-04-23 17:36:10 -0700761 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Charles Chanc91c8782016-03-30 17:54:24 -0700762 }
Pier Luigi8cd46de2018-01-19 10:24:53 +0100763 // Let's modify the next objective removing the bucket
764 newNextObj = nextObjBuilder(mcastIp, assignedVlan,
765 ImmutableSet.of(port), nextObj.id()).removeFromExisting();
766 srManager.flowObjectiveService.next(deviceId, newNextObj);
767 srManager.flowObjectiveService.forward(deviceId, fwdObj);
Charles Chanc91c8782016-03-30 17:54:24 -0700768 return existingPorts.isEmpty();
769 }
770
Charles Chan72779502016-04-23 17:36:10 -0700771 /**
772 * Removes entire group on given device.
773 *
774 * @param deviceId device ID
775 * @param mcastIp multicast group to be removed
776 * @param assignedVlan assigned VLAN ID
777 */
778 private void removeGroupFromDevice(DeviceId deviceId, IpAddress mcastIp,
779 VlanId assignedVlan) {
780 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId);
781 // This device is not serving this multicast group
782 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
783 log.warn("{} is not serving {}. Abort.", deviceId, mcastIp);
784 return;
785 }
786 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
787 // NOTE: Rely on GroupStore garbage collection rather than explicitly
788 // remove L3MG since there might be other flows/groups refer to
789 // the same L2IG
790 ObjectiveContext context = new DefaultObjectiveContext(
791 (objective) -> log.debug("Successfully remove {} on {}, vlan {}",
792 mcastIp, deviceId, assignedVlan),
793 (objective, error) ->
794 log.warn("Failed to remove {} on {}, vlan {}: {}",
795 mcastIp, deviceId, assignedVlan, error));
796 ForwardingObjective fwdObj = fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
797 srManager.flowObjectiveService.forward(deviceId, fwdObj);
798 mcastNextObjStore.remove(mcastStoreKey);
799 mcastRoleStore.remove(mcastStoreKey);
800 }
801
Pier Luigi580fd8a2018-01-16 10:47:50 +0100802 private void installPath(IpAddress mcastIp, ConnectPoint source, Path mcastPath) {
803 // Get Links
804 List<Link> links = mcastPath.links();
805 // For each link, modify the next on the source device adding the src port
806 // and a new filter objective on the destination port
807 links.forEach(link -> {
808 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
809 assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
810 addFilterToDevice(link.dst().deviceId(), link.dst().port(), assignedVlan(null),
811 mcastIp);
812 });
813 // Setup new transit mcast role
814 mcastRoleStore.put(new McastStoreKey(mcastIp, links.get(0).dst().deviceId()),
815 McastRole.TRANSIT);
Charles Chan72779502016-04-23 17:36:10 -0700816 }
817
Charles Chanc91c8782016-03-30 17:54:24 -0700818 /**
819 * Creates a next objective builder for multicast.
820 *
821 * @param mcastIp multicast group
822 * @param assignedVlan assigned VLAN ID
823 * @param outPorts set of output port numbers
824 * @return next objective builder
825 */
826 private NextObjective.Builder nextObjBuilder(IpAddress mcastIp,
Pier Luigi4f0dd212018-01-19 10:24:53 +0100827 VlanId assignedVlan, Set<PortNumber> outPorts, Integer nextId) {
828 // If nextId is null allocate a new one
829 if (nextId == null) {
830 nextId = srManager.flowObjectiveService.allocateNextId();
831 }
Charles Chanc91c8782016-03-30 17:54:24 -0700832
833 TrafficSelector metadata =
834 DefaultTrafficSelector.builder()
835 .matchVlanId(assignedVlan)
836 .matchIPDst(mcastIp.toIpPrefix())
837 .build();
838
839 NextObjective.Builder nextObjBuilder = DefaultNextObjective
840 .builder().withId(nextId)
841 .withType(NextObjective.Type.BROADCAST).fromApp(srManager.appId)
842 .withMeta(metadata);
843
844 outPorts.forEach(port -> {
845 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
846 if (egressVlan().equals(VlanId.NONE)) {
847 tBuilder.popVlan();
848 }
849 tBuilder.setOutput(port);
850 nextObjBuilder.addTreatment(tBuilder.build());
851 });
852
853 return nextObjBuilder;
854 }
855
856 /**
857 * Creates a forwarding objective builder for multicast.
858 *
859 * @param mcastIp multicast group
860 * @param assignedVlan assigned VLAN ID
861 * @param nextId next ID of the L3 multicast group
862 * @return forwarding objective builder
863 */
864 private ForwardingObjective.Builder fwdObjBuilder(IpAddress mcastIp,
865 VlanId assignedVlan, int nextId) {
866 TrafficSelector.Builder sbuilder = DefaultTrafficSelector.builder();
Julia Fergusonf1d9c342017-08-10 18:15:24 +0000867 IpPrefix mcastPrefix = mcastIp.toIpPrefix();
868
869 if (mcastIp.isIp4()) {
870 sbuilder.matchEthType(Ethernet.TYPE_IPV4);
871 sbuilder.matchIPDst(mcastPrefix);
872 } else {
873 sbuilder.matchEthType(Ethernet.TYPE_IPV6);
874 sbuilder.matchIPv6Dst(mcastPrefix);
875 }
876
877
Charles Chanc91c8782016-03-30 17:54:24 -0700878 TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder();
879 metabuilder.matchVlanId(assignedVlan);
880
881 ForwardingObjective.Builder fwdBuilder = DefaultForwardingObjective.builder();
882 fwdBuilder.withSelector(sbuilder.build())
883 .withMeta(metabuilder.build())
884 .nextStep(nextId)
885 .withFlag(ForwardingObjective.Flag.SPECIFIC)
886 .fromApp(srManager.appId)
887 .withPriority(SegmentRoutingService.DEFAULT_PRIORITY);
888 return fwdBuilder;
889 }
890
891 /**
892 * Creates a filtering objective builder for multicast.
893 *
894 * @param deviceId Device ID
895 * @param ingressPort ingress port of the multicast stream
896 * @param assignedVlan assigned VLAN ID
897 * @return filtering objective builder
898 */
899 private FilteringObjective.Builder filterObjBuilder(DeviceId deviceId, PortNumber ingressPort,
Julia Fergusonf1d9c342017-08-10 18:15:24 +0000900 VlanId assignedVlan, IpAddress mcastIp) {
Charles Chanc91c8782016-03-30 17:54:24 -0700901 FilteringObjective.Builder filtBuilder = DefaultFilteringObjective.builder();
Charles Chan0932eca2016-06-28 16:50:13 -0700902
Julia Fergusonf1d9c342017-08-10 18:15:24 +0000903 if (mcastIp.isIp4()) {
904 filtBuilder.withKey(Criteria.matchInPort(ingressPort))
905 .addCondition(Criteria.matchEthDstMasked(MacAddress.IPV4_MULTICAST,
906 MacAddress.IPV4_MULTICAST_MASK))
907 .addCondition(Criteria.matchVlanId(egressVlan()))
908 .withPriority(SegmentRoutingService.DEFAULT_PRIORITY);
909 } else {
910 filtBuilder.withKey(Criteria.matchInPort(ingressPort))
911 .addCondition(Criteria.matchEthDstMasked(MacAddress.IPV6_MULTICAST,
912 MacAddress.IPV6_MULTICAST_MASK))
913 .addCondition(Criteria.matchVlanId(egressVlan()))
914 .withPriority(SegmentRoutingService.DEFAULT_PRIORITY);
915 }
Charles Chan0932eca2016-06-28 16:50:13 -0700916 TrafficTreatment tt = DefaultTrafficTreatment.builder()
917 .pushVlan().setVlanId(assignedVlan).build();
918 filtBuilder.withMeta(tt);
919
Charles Chanc91c8782016-03-30 17:54:24 -0700920 return filtBuilder.permit().fromApp(srManager.appId);
921 }
922
923 /**
924 * Gets output ports information from treatments.
925 *
926 * @param treatments collection of traffic treatments
927 * @return set of output port numbers
928 */
929 private Set<PortNumber> getPorts(Collection<TrafficTreatment> treatments) {
930 ImmutableSet.Builder<PortNumber> builder = ImmutableSet.builder();
931 treatments.forEach(treatment -> {
932 treatment.allInstructions().stream()
933 .filter(instr -> instr instanceof OutputInstruction)
934 .forEach(instr -> {
935 builder.add(((OutputInstruction) instr).port());
936 });
937 });
938 return builder.build();
939 }
940
941 /**
942 * Gets a path from src to dst.
943 * If a path was allocated before, returns the allocated path.
944 * Otherwise, randomly pick one from available paths.
945 *
946 * @param src source device ID
947 * @param dst destination device ID
948 * @param mcastIp multicast group
949 * @return an optional path from src to dst
950 */
951 private Optional<Path> getPath(DeviceId src, DeviceId dst, IpAddress mcastIp) {
952 List<Path> allPaths = Lists.newArrayList(
953 topologyService.getPaths(topologyService.currentTopology(), src, dst));
Charles Chan72779502016-04-23 17:36:10 -0700954 log.debug("{} path(s) found from {} to {}", allPaths.size(), src, dst);
Charles Chanc91c8782016-03-30 17:54:24 -0700955 if (allPaths.isEmpty()) {
Charles Chanc91c8782016-03-30 17:54:24 -0700956 return Optional.empty();
957 }
958
Pier Luigi91573e12018-01-23 16:06:38 +0100959 // Create a map index of suitablity-to-list of paths. For example
960 // a path in the list associated to the index 1 shares only the
961 // first hop and it is less suitable of a path belonging to the index
962 // 2 that shares leaf-spine.
963 Map<Integer, List<Path>> eligiblePaths = Maps.newHashMap();
964 // Some init steps
965 int nhop;
966 McastStoreKey mcastStoreKey;
967 Link hop;
968 PortNumber srcPort;
969 Set<PortNumber> existingPorts;
970 NextObjective nextObj;
971 // Iterate over paths looking for eligible paths
972 for (Path path : allPaths) {
973 // Unlikely, it will happen...
974 if (!src.equals(path.links().get(0).src().deviceId())) {
975 continue;
976 }
977 nhop = 0;
978 // Iterate over the links
979 while (nhop < path.links().size()) {
980 // Get the link and verify if a next related
981 // to the src device exist in the store
982 hop = path.links().get(nhop);
983 mcastStoreKey = new McastStoreKey(mcastIp, hop.src().deviceId());
984 // It does not exist in the store, exit
985 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
986 break;
Charles Chanc91c8782016-03-30 17:54:24 -0700987 }
Pier Luigi91573e12018-01-23 16:06:38 +0100988 // Get the output ports on the next
989 nextObj = mcastNextObjStore.get(mcastStoreKey).value();
990 existingPorts = getPorts(nextObj.next());
991 // And the src port on the link
992 srcPort = hop.src().port();
993 // the src port is not used as output, exit
994 if (!existingPorts.contains(srcPort)) {
995 break;
996 }
997 nhop++;
998 }
999 // n_hop defines the index
1000 if (nhop > 0) {
1001 eligiblePaths.compute(nhop, (index, paths) -> {
1002 paths = paths == null ? Lists.newArrayList() : paths;
1003 paths.add(path);
1004 return paths;
1005 });
Charles Chanc91c8782016-03-30 17:54:24 -07001006 }
1007 }
Pier Luigi91573e12018-01-23 16:06:38 +01001008
1009 // No suitable paths
1010 if (eligiblePaths.isEmpty()) {
1011 log.debug("No eligiblePath(s) found from {} to {}", src, dst);
1012 // Otherwise, randomly pick a path
1013 Collections.shuffle(allPaths);
1014 return allPaths.stream().findFirst();
1015 }
1016
1017 // Let's take the best ones
1018 Integer bestIndex = eligiblePaths.keySet()
1019 .stream()
1020 .sorted(Comparator.reverseOrder())
1021 .findFirst().orElse(null);
1022 List<Path> bestPaths = eligiblePaths.get(bestIndex);
1023 log.debug("{} eligiblePath(s) found from {} to {}",
1024 bestPaths.size(), src, dst);
1025 // randomly pick a path on the highest index
1026 Collections.shuffle(bestPaths);
1027 return bestPaths.stream().findFirst();
Charles Chanc91c8782016-03-30 17:54:24 -07001028 }
1029
1030 /**
Charles Chan72779502016-04-23 17:36:10 -07001031 * Gets device(s) of given role in given multicast group.
1032 *
1033 * @param mcastIp multicast IP
1034 * @param role multicast role
1035 * @return set of device ID or empty set if not found
1036 */
1037 private Set<DeviceId> getDevice(IpAddress mcastIp, McastRole role) {
1038 return mcastRoleStore.entrySet().stream()
1039 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp) &&
1040 entry.getValue().value() == role)
1041 .map(Map.Entry::getKey).map(McastStoreKey::deviceId)
1042 .collect(Collectors.toSet());
1043 }
1044
1045 /**
Charles Chana8f9dee2016-05-16 18:44:13 -07001046 * Gets source connect point of given multicast group.
1047 *
1048 * @param mcastIp multicast IP
1049 * @return source connect point or null if not found
1050 */
1051 private ConnectPoint getSource(IpAddress mcastIp) {
1052 return srManager.multicastRouteService.getRoutes().stream()
1053 .filter(mcastRoute -> mcastRoute.group().equals(mcastIp))
1054 .map(mcastRoute -> srManager.multicastRouteService.fetchSource(mcastRoute))
1055 .findAny().orElse(null);
1056 }
1057
1058 /**
Charles Chan72779502016-04-23 17:36:10 -07001059 * Gets groups which is affected by the link down event.
1060 *
1061 * @param link link going down
1062 * @return a set of multicast IpAddress
1063 */
1064 private Set<IpAddress> getAffectedGroups(Link link) {
1065 DeviceId deviceId = link.src().deviceId();
1066 PortNumber port = link.src().port();
1067 return mcastNextObjStore.entrySet().stream()
1068 .filter(entry -> entry.getKey().deviceId().equals(deviceId) &&
1069 getPorts(entry.getValue().value().next()).contains(port))
1070 .map(Map.Entry::getKey).map(McastStoreKey::mcastIp)
1071 .collect(Collectors.toSet());
1072 }
1073
1074 /**
Pier Luigi580fd8a2018-01-16 10:47:50 +01001075 * Gets groups which are affected by the device down event.
1076 *
1077 * @param deviceId device going down
1078 * @return a set of multicast IpAddress
1079 */
1080 private Set<IpAddress> getAffectedGroups(DeviceId deviceId) {
1081 return mcastNextObjStore.entrySet().stream()
1082 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
1083 .map(Map.Entry::getKey).map(McastStoreKey::mcastIp)
1084 .collect(Collectors.toSet());
1085 }
1086
1087 /**
Charles Chanc91c8782016-03-30 17:54:24 -07001088 * Gets egress VLAN from McastConfig.
1089 *
1090 * @return egress VLAN or VlanId.NONE if not configured
1091 */
1092 private VlanId egressVlan() {
1093 McastConfig mcastConfig =
1094 srManager.cfgService.getConfig(coreAppId, McastConfig.class);
1095 return (mcastConfig != null) ? mcastConfig.egressVlan() : VlanId.NONE;
1096 }
1097
1098 /**
1099 * Gets assigned VLAN according to the value of egress VLAN.
Charles Chana8f9dee2016-05-16 18:44:13 -07001100 * If connect point is specified, try to reuse the assigned VLAN on the connect point.
Charles Chanc91c8782016-03-30 17:54:24 -07001101 *
Charles Chana8f9dee2016-05-16 18:44:13 -07001102 * @param cp connect point; Can be null if not specified
1103 * @return assigned VLAN ID
Charles Chanc91c8782016-03-30 17:54:24 -07001104 */
Charles Chana8f9dee2016-05-16 18:44:13 -07001105 private VlanId assignedVlan(ConnectPoint cp) {
1106 // Use the egressVlan if it is tagged
1107 if (!egressVlan().equals(VlanId.NONE)) {
1108 return egressVlan();
1109 }
1110 // Reuse unicast VLAN if the port has subnet configured
1111 if (cp != null) {
Charles Chanf9759582017-10-20 19:09:16 -07001112 VlanId untaggedVlan = srManager.getInternalVlanId(cp);
Charles Chan10b0fb72017-02-02 16:20:42 -08001113 return (untaggedVlan != null) ? untaggedVlan : INTERNAL_VLAN;
Charles Chana8f9dee2016-05-16 18:44:13 -07001114 }
Charles Chan10b0fb72017-02-02 16:20:42 -08001115 // Use DEFAULT_VLAN if none of the above matches
1116 return SegmentRoutingManager.INTERNAL_VLAN;
Charles Chanc91c8782016-03-30 17:54:24 -07001117 }
Charles Chan72779502016-04-23 17:36:10 -07001118
1119 /**
1120 * Gets the spine-facing port on ingress device of given multicast group.
1121 *
1122 * @param mcastIp multicast IP
1123 * @return spine-facing port on ingress device
1124 */
1125 private PortNumber ingressTransitPort(IpAddress mcastIp) {
1126 DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
1127 .stream().findAny().orElse(null);
1128 if (ingressDevice != null) {
1129 NextObjective nextObj = mcastNextObjStore
1130 .get(new McastStoreKey(mcastIp, ingressDevice)).value();
1131 Set<PortNumber> ports = getPorts(nextObj.next());
1132
1133 for (PortNumber port : ports) {
1134 // Spine-facing port should have no subnet and no xconnect
1135 if (srManager.deviceConfiguration != null &&
Pier Ventreb6a7f342016-11-26 21:05:22 -08001136 srManager.deviceConfiguration.getPortSubnets(ingressDevice, port).isEmpty() &&
Charles Chan82f19972016-05-17 13:13:55 -07001137 !srManager.xConnectHandler.hasXConnect(new ConnectPoint(ingressDevice, port))) {
Charles Chan72779502016-04-23 17:36:10 -07001138 return port;
1139 }
1140 }
1141 }
1142 return null;
1143 }
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001144
1145 /**
Pier Luigi580fd8a2018-01-16 10:47:50 +01001146 * Verify if the given device has sinks
1147 * for the multicast group.
1148 *
1149 * @param deviceId device Id
1150 * @param mcastIp multicast IP
1151 * @return true if the device has sink for the group.
1152 * False otherwise.
1153 */
1154 private boolean hasSinks(DeviceId deviceId, IpAddress mcastIp) {
1155 if (deviceId != null) {
1156 // Get the nextobjective
1157 Versioned<NextObjective> versionedNextObj = mcastNextObjStore.get(
1158 new McastStoreKey(mcastIp, deviceId)
1159 );
1160 // If it exists
1161 if (versionedNextObj != null) {
1162 NextObjective nextObj = versionedNextObj.value();
1163 // Retrieves all the output ports
1164 Set<PortNumber> ports = getPorts(nextObj.next());
1165 // Tries to find at least one port that is not spine-facing
1166 for (PortNumber port : ports) {
1167 // Spine-facing port should have no subnet and no xconnect
1168 if (srManager.deviceConfiguration != null &&
1169 (!srManager.deviceConfiguration.getPortSubnets(deviceId, port).isEmpty() ||
1170 srManager.xConnectHandler.hasXConnect(new ConnectPoint(deviceId, port)))) {
1171 return true;
1172 }
1173 }
1174 }
1175 }
1176 return false;
1177 }
1178
1179 /**
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001180 * Removes filtering objective for given device and port.
1181 *
1182 * @param deviceId device ID
1183 * @param port ingress port number
1184 * @param assignedVlan assigned VLAN ID
1185 * @param mcastIp multicast IP address
1186 */
1187 private void removeFilterToDevice(DeviceId deviceId, PortNumber port, VlanId assignedVlan, IpAddress mcastIp) {
1188 // Do nothing if the port is configured as suppressed
1189 ConnectPoint connectPoint = new ConnectPoint(deviceId, port);
1190 SegmentRoutingAppConfig appConfig = srManager.cfgService
1191 .getConfig(srManager.appId, SegmentRoutingAppConfig.class);
1192 if (appConfig != null && appConfig.suppressSubnet().contains(connectPoint)) {
1193 log.info("Ignore suppressed port {}", connectPoint);
1194 return;
1195 }
1196
1197 FilteringObjective.Builder filtObjBuilder =
1198 filterObjBuilder(deviceId, port, assignedVlan, mcastIp);
1199 ObjectiveContext context = new DefaultObjectiveContext(
1200 (objective) -> log.debug("Successfully removed filter on {}/{}, vlan {}",
1201 deviceId, port.toLong(), assignedVlan),
1202 (objective, error) ->
1203 log.warn("Failed to remove filter on {}/{}, vlan {}: {}",
1204 deviceId, port.toLong(), assignedVlan, error));
1205 srManager.flowObjectiveService.filter(deviceId, filtObjBuilder.remove(context));
1206 }
1207
1208 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +01001209 * Updates filtering objective for given device and port.
1210 * It is called in general when the mcast config has been
1211 * changed.
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001212 *
1213 * @param deviceId device ID
1214 * @param portNum ingress port number
1215 * @param vlanId assigned VLAN ID
1216 * @param install true to add, false to remove
1217 */
1218 protected void updateFilterToDevice(DeviceId deviceId, PortNumber portNum,
1219 VlanId vlanId, boolean install) {
Pier Luigi35dab3f2018-01-25 16:16:02 +01001220 lastMcastChange = Instant.now();
1221 mcastLock();
1222 try {
1223 // Iterates over the route and updates properly the filtering objective
1224 // on the source device.
1225 srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
1226 ConnectPoint source = srManager.multicastRouteService.fetchSource(mcastRoute);
1227 if (source.deviceId().equals(deviceId) && source.port().equals(portNum)) {
1228 if (install) {
1229 addFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group());
1230 } else {
1231 removeFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group());
1232 }
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001233 }
Pier Luigi35dab3f2018-01-25 16:16:02 +01001234 });
1235 } finally {
1236 mcastUnlock();
1237 }
1238 }
1239
Pier Luigi6786b922018-02-02 16:19:11 +01001240 private boolean isLeader(ConnectPoint source) {
1241 // Continue only when we have the mastership on the operation
1242 if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
1243 // When the source is available we just check the mastership
1244 if (srManager.deviceService.isAvailable(source.deviceId())) {
1245 return false;
1246 }
1247 // Fallback with Leadership service
1248 // source id is used a topic
1249 NodeId leader = srManager.leadershipService.runForLeadership(
1250 source.deviceId().toString()).leaderNodeId();
1251 // Verify if this node is the leader
1252 if (!srManager.clusterService.getLocalNode().id().equals(leader)) {
1253 return false;
1254 }
1255 }
1256 // Done
1257 return true;
1258 }
1259
Pier Luigi35dab3f2018-01-25 16:16:02 +01001260 /**
1261 * Performs bucket verification operation for all mcast groups in the devices.
1262 * Firstly, it verifies that mcast is stable before trying verification operation.
1263 * Verification consists in creating new nexts with VERIFY operation. Actually,
1264 * the operation is totally delegated to the driver.
1265 */
1266 private final class McastBucketCorrector implements Runnable {
1267
1268 @Override
1269 public void run() {
1270 // Verify if the Mcast has been stable for MCAST_STABLITY_THRESHOLD
1271 if (!isMcastStable()) {
1272 return;
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001273 }
Pier Luigi35dab3f2018-01-25 16:16:02 +01001274 // Acquires lock
1275 mcastLock();
1276 try {
1277 // Iterates over the routes and verify the related next objectives
1278 srManager.multicastRouteService.getRoutes()
1279 .stream()
1280 .map(McastRoute::group)
1281 .forEach(mcastIp -> {
1282 log.trace("Running mcast buckets corrector for mcast group: {}",
1283 mcastIp);
1284
1285 // For each group we get current information in the store
1286 // and issue a check of the next objectives in place
1287 DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
1288 .stream().findAny().orElse(null);
1289 DeviceId transitDevice = getDevice(mcastIp, McastRole.TRANSIT)
1290 .stream().findAny().orElse(null);
1291 Set<DeviceId> egressDevices = getDevice(mcastIp, McastRole.EGRESS);
1292 ConnectPoint source = getSource(mcastIp);
1293
1294 // Do not proceed if ingress device or source of this group are missing
1295 if (ingressDevice == null || source == null) {
1296 log.warn("Unable to run buckets corrector. " +
1297 "Missing ingress {} or source {} for group {}",
1298 ingressDevice, source, mcastIp);
1299 return;
1300 }
1301
1302 // Continue only when this instance is the master of source device
1303 if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
1304 log.trace("Unable to run buckets corrector. " +
1305 "Skip {} due to lack of mastership " +
1306 "of the source device {}",
1307 mcastIp, source.deviceId());
1308 return;
1309 }
1310
1311 // Create the set of the devices to be processed
1312 ImmutableSet.Builder<DeviceId> devicesBuilder = ImmutableSet.builder();
1313 devicesBuilder.add(ingressDevice);
1314 if (transitDevice != null) {
1315 devicesBuilder.add(transitDevice);
1316 }
1317 if (!egressDevices.isEmpty()) {
1318 devicesBuilder.addAll(egressDevices);
1319 }
1320 Set<DeviceId> devicesToProcess = devicesBuilder.build();
1321
1322 // Iterate over the devices
1323 devicesToProcess.forEach(deviceId -> {
1324 McastStoreKey currentKey = new McastStoreKey(mcastIp, deviceId);
1325 // If next exists in our store verify related next objective
1326 if (mcastNextObjStore.containsKey(currentKey)) {
1327 NextObjective currentNext = mcastNextObjStore.get(currentKey).value();
1328 // Get current ports
1329 Set<PortNumber> currentPorts = getPorts(currentNext.next());
1330 // Rebuild the next objective
1331 currentNext = nextObjBuilder(
1332 mcastIp,
1333 assignedVlan(deviceId.equals(source.deviceId()) ? source : null),
1334 currentPorts,
1335 currentNext.id()
1336 ).verify();
1337 // Send to the flowobjective service
1338 srManager.flowObjectiveService.next(deviceId, currentNext);
1339 } else {
1340 log.warn("Unable to run buckets corrector." +
1341 "Missing next for {} and group {}",
1342 deviceId, mcastIp);
1343 }
1344 });
1345
1346 });
1347 } finally {
1348 // Finally, it releases the lock
1349 mcastUnlock();
1350 }
1351
1352 }
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001353 }
Pier Luigi0f9635b2018-01-15 18:06:43 +01001354
1355 public Map<McastStoreKey, Integer> getMcastNextIds(IpAddress mcastIp) {
1356 // If mcast ip is present
1357 if (mcastIp != null) {
1358 return mcastNextObjStore.entrySet().stream()
1359 .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
1360 .collect(Collectors.toMap(Map.Entry::getKey,
1361 entry -> entry.getValue().value().id()));
1362 }
1363 // Otherwise take all the groups
1364 return mcastNextObjStore.entrySet().stream()
1365 .collect(Collectors.toMap(Map.Entry::getKey,
1366 entry -> entry.getValue().value().id()));
1367 }
1368
1369 public Map<McastStoreKey, McastHandler.McastRole> getMcastRoles(IpAddress mcastIp) {
1370 // If mcast ip is present
1371 if (mcastIp != null) {
1372 return mcastRoleStore.entrySet().stream()
1373 .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
1374 .collect(Collectors.toMap(Map.Entry::getKey,
1375 entry -> entry.getValue().value()));
1376 }
1377 // Otherwise take all the groups
1378 return mcastRoleStore.entrySet().stream()
1379 .collect(Collectors.toMap(Map.Entry::getKey,
1380 entry -> entry.getValue().value()));
1381 }
1382
1383 public Map<ConnectPoint, List<ConnectPoint>> getMcastPaths(IpAddress mcastIp) {
1384 Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
1385 // Get the source
1386 ConnectPoint source = getSource(mcastIp);
1387 // Source cannot be null, we don't know the starting point
1388 if (source != null) {
1389 // Init steps
1390 Set<DeviceId> visited = Sets.newHashSet();
1391 List<ConnectPoint> currentPath = Lists.newArrayList(
1392 source
1393 );
1394 // Build recursively the mcast paths
1395 buildMcastPaths(source.deviceId(), visited, mcastPaths, currentPath, mcastIp);
1396 }
1397 return mcastPaths;
1398 }
1399
1400 private void buildMcastPaths(DeviceId toVisit, Set<DeviceId> visited,
1401 Map<ConnectPoint, List<ConnectPoint>> mcastPaths,
1402 List<ConnectPoint> currentPath, IpAddress mcastIp) {
1403 // If we have visited the node to visit
1404 // there is a loop
1405 if (visited.contains(toVisit)) {
1406 return;
1407 }
1408 // Visit next-hop
1409 visited.add(toVisit);
1410 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, toVisit);
1411 // Looking for next-hops
1412 if (mcastNextObjStore.containsKey(mcastStoreKey)) {
1413 // Build egress connectpoints
1414 NextObjective nextObjective = mcastNextObjStore.get(mcastStoreKey).value();
1415 // Get Ports
1416 Set<PortNumber> outputPorts = getPorts(nextObjective.next());
1417 // Build relative cps
1418 ImmutableSet.Builder<ConnectPoint> cpBuilder = ImmutableSet.builder();
1419 outputPorts.forEach(portNumber -> cpBuilder.add(new ConnectPoint(toVisit, portNumber)));
1420 Set<ConnectPoint> egressPoints = cpBuilder.build();
1421 // Define other variables for the next steps
1422 Set<Link> egressLinks;
1423 List<ConnectPoint> newCurrentPath;
1424 Set<DeviceId> newVisited;
1425 DeviceId newToVisit;
1426 for (ConnectPoint egressPoint : egressPoints) {
1427 egressLinks = srManager.linkService.getEgressLinks(egressPoint);
1428 // If it does not have egress links, stop
1429 if (egressLinks.isEmpty()) {
1430 // Add the connect points to the path
1431 newCurrentPath = Lists.newArrayList(currentPath);
1432 newCurrentPath.add(0, egressPoint);
1433 // Save in the map
1434 mcastPaths.put(egressPoint, newCurrentPath);
1435 } else {
1436 newVisited = Sets.newHashSet(visited);
1437 // Iterate over the egress links for the next hops
1438 for (Link egressLink : egressLinks) {
1439 // Update to visit
1440 newToVisit = egressLink.dst().deviceId();
1441 // Add the connect points to the path
1442 newCurrentPath = Lists.newArrayList(currentPath);
1443 newCurrentPath.add(0, egressPoint);
1444 newCurrentPath.add(0, egressLink.dst());
1445 // Go to the next hop
1446 buildMcastPaths(newToVisit, newVisited, mcastPaths, newCurrentPath, mcastIp);
1447 }
1448 }
1449 }
1450 }
1451 }
1452
Charles Chanc91c8782016-03-30 17:54:24 -07001453}