blob: 3b659bd1dde54bfe0d33905016b0faa7a606e503 [file] [log] [blame]
Charles Chanc91c8782016-03-30 17:54:24 -07001/*
Pier Luigi69f774d2018-02-28 12:10:50 +01002 * Copyright 2018-present Open Networking Foundation
Charles Chanc91c8782016-03-30 17:54:24 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Pier Luigi69f774d2018-02-28 12:10:50 +010017package org.onosproject.segmentrouting.mcast;
Charles Chanc91c8782016-03-30 17:54:24 -070018
Pier Luigid29ca7c2018-02-28 17:24:03 +010019import com.google.common.cache.Cache;
20import com.google.common.cache.CacheBuilder;
21import com.google.common.cache.RemovalCause;
22import com.google.common.cache.RemovalNotification;
Pier71c55772018-04-17 17:25:22 +020023import com.google.common.collect.HashMultimap;
Pier7b657162018-03-27 11:29:42 -070024import com.google.common.collect.ImmutableList;
Charles Chanc91c8782016-03-30 17:54:24 -070025import com.google.common.collect.ImmutableSet;
26import com.google.common.collect.Lists;
Pier Luigi91573e12018-01-23 16:06:38 +010027import com.google.common.collect.Maps;
Pier71c55772018-04-17 17:25:22 +020028import com.google.common.collect.Multimap;
Charles Chanc91c8782016-03-30 17:54:24 -070029import com.google.common.collect.Sets;
Charles Chanc91c8782016-03-30 17:54:24 -070030import org.onlab.packet.IpAddress;
Charles Chanc91c8782016-03-30 17:54:24 -070031import org.onlab.packet.VlanId;
32import org.onlab.util.KryoNamespace;
Pierdb27b8d2018-04-17 16:29:56 +020033import org.onosproject.cluster.NodeId;
Charles Chanc91c8782016-03-30 17:54:24 -070034import org.onosproject.core.ApplicationId;
35import org.onosproject.core.CoreService;
Pier1f87aca2018-03-14 16:47:32 -070036import org.onosproject.mcast.api.McastEvent;
37import org.onosproject.mcast.api.McastRoute;
Pier7b657162018-03-27 11:29:42 -070038import org.onosproject.mcast.api.McastRouteData;
Pier1f87aca2018-03-14 16:47:32 -070039import org.onosproject.mcast.api.McastRouteUpdate;
Pier7b657162018-03-27 11:29:42 -070040import org.onosproject.net.HostId;
Charles Chanc91c8782016-03-30 17:54:24 -070041import org.onosproject.net.ConnectPoint;
42import org.onosproject.net.DeviceId;
43import org.onosproject.net.Link;
44import org.onosproject.net.Path;
45import org.onosproject.net.PortNumber;
Charles Chan72779502016-04-23 17:36:10 -070046import org.onosproject.net.flowobjective.DefaultObjectiveContext;
Charles Chanc91c8782016-03-30 17:54:24 -070047import org.onosproject.net.flowobjective.ForwardingObjective;
48import org.onosproject.net.flowobjective.NextObjective;
Charles Chan72779502016-04-23 17:36:10 -070049import org.onosproject.net.flowobjective.ObjectiveContext;
Pier1f87aca2018-03-14 16:47:32 -070050import org.onosproject.net.topology.LinkWeigher;
Pier Luigid8a15162018-02-15 16:33:08 +010051import org.onosproject.net.topology.Topology;
Charles Chanc91c8782016-03-30 17:54:24 -070052import org.onosproject.net.topology.TopologyService;
Pier1f87aca2018-03-14 16:47:32 -070053import org.onosproject.segmentrouting.SRLinkWeigher;
Pier Luigi69f774d2018-02-28 12:10:50 +010054import org.onosproject.segmentrouting.SegmentRoutingManager;
Charles Chan72779502016-04-23 17:36:10 -070055import org.onosproject.segmentrouting.storekey.McastStoreKey;
Charles Chanc91c8782016-03-30 17:54:24 -070056import org.onosproject.store.serializers.KryoNamespaces;
57import org.onosproject.store.service.ConsistentMap;
58import org.onosproject.store.service.Serializer;
Pier Luigi580fd8a2018-01-16 10:47:50 +010059import org.onosproject.store.service.Versioned;
Charles Chanc91c8782016-03-30 17:54:24 -070060import org.slf4j.Logger;
61import org.slf4j.LoggerFactory;
62
Pier Luigi35dab3f2018-01-25 16:16:02 +010063import java.time.Instant;
Charles Chanc91c8782016-03-30 17:54:24 -070064import java.util.Collection;
65import java.util.Collections;
Pier Luigi91573e12018-01-23 16:06:38 +010066import java.util.Comparator;
Charles Chanc91c8782016-03-30 17:54:24 -070067import java.util.List;
Charles Chan72779502016-04-23 17:36:10 -070068import java.util.Map;
Pier1f87aca2018-03-14 16:47:32 -070069import java.util.Map.Entry;
Charles Chanc91c8782016-03-30 17:54:24 -070070import java.util.Optional;
71import java.util.Set;
Pier Luigi35dab3f2018-01-25 16:16:02 +010072import java.util.concurrent.ScheduledExecutorService;
73import java.util.concurrent.TimeUnit;
74import java.util.concurrent.locks.Lock;
75import java.util.concurrent.locks.ReentrantLock;
Charles Chan72779502016-04-23 17:36:10 -070076import java.util.stream.Collectors;
77
Pier Luigi35dab3f2018-01-25 16:16:02 +010078import static java.util.concurrent.Executors.newScheduledThreadPool;
79import static org.onlab.util.Tools.groupedThreads;
Pier1f87aca2018-03-14 16:47:32 -070080
Pierdb27b8d2018-04-17 16:29:56 +020081import static org.onosproject.mcast.api.McastEvent.Type.ROUTE_ADDED;
Pier7b657162018-03-27 11:29:42 -070082import static org.onosproject.mcast.api.McastEvent.Type.ROUTE_REMOVED;
Pierdb27b8d2018-04-17 16:29:56 +020083import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_ADDED;
84import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_REMOVED;
85import static org.onosproject.mcast.api.McastEvent.Type.SINKS_ADDED;
86import static org.onosproject.mcast.api.McastEvent.Type.SINKS_REMOVED;
87
Pier979e61a2018-03-07 11:42:50 +010088import static org.onosproject.segmentrouting.mcast.McastRole.EGRESS;
89import static org.onosproject.segmentrouting.mcast.McastRole.INGRESS;
90import static org.onosproject.segmentrouting.mcast.McastRole.TRANSIT;
Charles Chanc91c8782016-03-30 17:54:24 -070091
92/**
Pier Luigi69f774d2018-02-28 12:10:50 +010093 * Handles Multicast related events.
Charles Chanc91c8782016-03-30 17:54:24 -070094 */
Charles Chan1eaf4802016-04-18 13:44:03 -070095public class McastHandler {
Pier7b657162018-03-27 11:29:42 -070096 // Logger instance
Charles Chan1eaf4802016-04-18 13:44:03 -070097 private static final Logger log = LoggerFactory.getLogger(McastHandler.class);
Pier7b657162018-03-27 11:29:42 -070098 // Reference to srManager and most used internal objects
Charles Chanc91c8782016-03-30 17:54:24 -070099 private final SegmentRoutingManager srManager;
Charles Chan82f19972016-05-17 13:13:55 -0700100 private final TopologyService topologyService;
Pierdb27b8d2018-04-17 16:29:56 +0200101 private final McastUtils mcastUtils;
Pier7b657162018-03-27 11:29:42 -0700102 // Internal store of the Mcast nextobjectives
Charles Chan72779502016-04-23 17:36:10 -0700103 private final ConsistentMap<McastStoreKey, NextObjective> mcastNextObjStore;
Pier7b657162018-03-27 11:29:42 -0700104 // Internal store of the Mcast roles
Charles Chan72779502016-04-23 17:36:10 -0700105 private final ConsistentMap<McastStoreKey, McastRole> mcastRoleStore;
106
Pier Luigid29ca7c2018-02-28 17:24:03 +0100107 // Wait time for the cache
108 private static final int WAIT_TIME_MS = 1000;
Pier7b657162018-03-27 11:29:42 -0700109
Pier Luigid29ca7c2018-02-28 17:24:03 +0100110 /**
111 * The mcastEventCache is implemented to avoid race condition by giving more time to the
112 * underlying subsystems to process previous calls.
113 */
114 private Cache<McastCacheKey, McastEvent> mcastEventCache = CacheBuilder.newBuilder()
115 .expireAfterWrite(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
116 .removalListener((RemovalNotification<McastCacheKey, McastEvent> notification) -> {
117 // Get group ip, sink and related event
118 IpAddress mcastIp = notification.getKey().mcastIp();
Pier7b657162018-03-27 11:29:42 -0700119 HostId sink = notification.getKey().sinkHost();
Pier Luigid29ca7c2018-02-28 17:24:03 +0100120 McastEvent mcastEvent = notification.getValue();
121 RemovalCause cause = notification.getCause();
122 log.debug("mcastEventCache removal event. group={}, sink={}, mcastEvent={}, cause={}",
123 mcastIp, sink, mcastEvent, cause);
124 // If it expires or it has been replaced, we deque the event
125 switch (notification.getCause()) {
126 case REPLACED:
127 case EXPIRED:
128 dequeueMcastEvent(mcastEvent);
129 break;
130 default:
131 break;
132 }
133 }).build();
134
135 private void enqueueMcastEvent(McastEvent mcastEvent) {
Pier7b657162018-03-27 11:29:42 -0700136 // Retrieve, currentData, prevData and the group
Pier1f87aca2018-03-14 16:47:32 -0700137 final McastRouteUpdate mcastRouteUpdate = mcastEvent.subject();
Pier7b657162018-03-27 11:29:42 -0700138 final McastRouteUpdate mcastRoutePrevUpdate = mcastEvent.prevSubject();
139 final IpAddress group = mcastRoutePrevUpdate.route().group();
Pier Luigid29ca7c2018-02-28 17:24:03 +0100140 // Let's create the keys of the cache
Pier7b657162018-03-27 11:29:42 -0700141 ImmutableSet.Builder<HostId> sinksBuilder = ImmutableSet.builder();
Pier1f87aca2018-03-14 16:47:32 -0700142 if (mcastEvent.type() == SOURCES_ADDED ||
143 mcastEvent.type() == SOURCES_REMOVED) {
144 // FIXME To be addressed with multiple sources support
Pier7b657162018-03-27 11:29:42 -0700145 sinksBuilder.addAll(Collections.emptySet());
146 } else if (mcastEvent.type() == SINKS_ADDED) {
147 // We need to process the host id one by one
148 mcastRouteUpdate.sinks().forEach(((hostId, connectPoints) -> {
149 // Get the previous locations and verify if there are changes
150 Set<ConnectPoint> prevConnectPoints = mcastRoutePrevUpdate.sinks().get(hostId);
151 Set<ConnectPoint> changes = Sets.difference(connectPoints, prevConnectPoints != null ?
152 prevConnectPoints : Collections.emptySet());
153 if (!changes.isEmpty()) {
154 sinksBuilder.add(hostId);
Pier1f87aca2018-03-14 16:47:32 -0700155 }
Pier7b657162018-03-27 11:29:42 -0700156 }));
157 } else if (mcastEvent.type() == SINKS_REMOVED) {
158 // We need to process the host id one by one
159 mcastRoutePrevUpdate.sinks().forEach(((hostId, connectPoints) -> {
160 // Get the current locations and verify if there are changes
161 Set<ConnectPoint> currentConnectPoints = mcastRouteUpdate.sinks().get(hostId);
162 Set<ConnectPoint> changes = Sets.difference(connectPoints, currentConnectPoints != null ?
163 currentConnectPoints : Collections.emptySet());
164 if (!changes.isEmpty()) {
165 sinksBuilder.add(hostId);
166 }
167 }));
168 } else if (mcastEvent.type() == ROUTE_REMOVED) {
169 // Current subject is null, just take the previous host ids
170 sinksBuilder.addAll(mcastRoutePrevUpdate.sinks().keySet());
Pier Luigid29ca7c2018-02-28 17:24:03 +0100171 }
172 // Push the elements in the cache
173 sinksBuilder.build().forEach(sink -> {
Pier1f87aca2018-03-14 16:47:32 -0700174 McastCacheKey cacheKey = new McastCacheKey(group, sink);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100175 mcastEventCache.put(cacheKey, mcastEvent);
176 });
177 }
178
179 private void dequeueMcastEvent(McastEvent mcastEvent) {
Pier7b657162018-03-27 11:29:42 -0700180 // Get new and old data
181 final McastRouteUpdate mcastUpdate = mcastEvent.subject();
182 final McastRouteUpdate mcastPrevUpdate = mcastEvent.prevSubject();
Pier Luigid29ca7c2018-02-28 17:24:03 +0100183 // Get source, mcast group
Pier1f87aca2018-03-14 16:47:32 -0700184 // FIXME To be addressed with multiple sources support
Pier7b657162018-03-27 11:29:42 -0700185 final ConnectPoint source = mcastPrevUpdate.sources()
Pier1f87aca2018-03-14 16:47:32 -0700186 .stream()
187 .findFirst()
188 .orElse(null);
Pier7b657162018-03-27 11:29:42 -0700189 IpAddress mcastIp = mcastPrevUpdate.route().group();
190 // Get all the previous sinks
191 Set<ConnectPoint> prevSinks = mcastPrevUpdate.sinks()
Pier1f87aca2018-03-14 16:47:32 -0700192 .values()
193 .stream()
194 .flatMap(Collection::stream)
195 .collect(Collectors.toSet());
Pier Luigid29ca7c2018-02-28 17:24:03 +0100196 // According to the event type let's call the proper method
197 switch (mcastEvent.type()) {
Pier1f87aca2018-03-14 16:47:32 -0700198 case SOURCES_ADDED:
199 // FIXME To be addressed with multiple sources support
200 // Get all the sinks
201 //Set<ConnectPoint> sinks = mcastRouteInfo.sinks();
202 // Compute the Mcast tree
203 //Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(source.deviceId(), sinks);
204 // Process the given sinks using the pre-computed paths
205 //mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink, mcastIp, paths));
Pier Luigid29ca7c2018-02-28 17:24:03 +0100206 break;
Pier1f87aca2018-03-14 16:47:32 -0700207 case SOURCES_REMOVED:
208 // FIXME To be addressed with multiple sources support
Pier Luigid29ca7c2018-02-28 17:24:03 +0100209 // Get old source
Pier1f87aca2018-03-14 16:47:32 -0700210 //ConnectPoint oldSource = mcastEvent.prevSubject().source().orElse(null);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100211 // Just the first cached element will be processed
Pier1f87aca2018-03-14 16:47:32 -0700212 //processSourceUpdatedInternal(mcastIp, source, oldSource);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100213 break;
214 case ROUTE_REMOVED:
215 // Process the route removed, just the first cached element will be processed
Pier7b657162018-03-27 11:29:42 -0700216 processRouteRemovedInternal(source, mcastIp);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100217 break;
Pier1f87aca2018-03-14 16:47:32 -0700218 case SINKS_ADDED:
Pier7b657162018-03-27 11:29:42 -0700219 // FIXME To be addressed with multiple sources support
220 processSinksAddedInternal(source, mcastIp,
221 mcastUpdate.sinks(), prevSinks);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100222 break;
Pier1f87aca2018-03-14 16:47:32 -0700223 case SINKS_REMOVED:
Pier7b657162018-03-27 11:29:42 -0700224 // FIXME To be addressed with multiple sources support
225 processSinksRemovedInternal(source, mcastIp,
Pier28164682018-04-17 15:50:43 +0200226 mcastUpdate.sinks(), mcastPrevUpdate.sinks());
Pier Luigid29ca7c2018-02-28 17:24:03 +0100227 break;
228 default:
229 break;
230 }
231 }
232
Pier Luigi35dab3f2018-01-25 16:16:02 +0100233 // Mcast lock to serialize local operations
234 private final Lock mcastLock = new ReentrantLock();
235
236 /**
237 * Acquires the lock used when making mcast changes.
238 */
239 private void mcastLock() {
240 mcastLock.lock();
241 }
242
243 /**
244 * Releases the lock used when making mcast changes.
245 */
246 private void mcastUnlock() {
247 mcastLock.unlock();
248 }
249
250 // Stability threshold for Mcast. Seconds
251 private static final long MCAST_STABLITY_THRESHOLD = 5;
252 // Last change done
253 private Instant lastMcastChange = Instant.now();
254
255 /**
256 * Determines if mcast in the network has been stable in the last
257 * MCAST_STABLITY_THRESHOLD seconds, by comparing the current time
258 * to the last mcast change timestamp.
259 *
260 * @return true if stable
261 */
262 private boolean isMcastStable() {
263 long last = (long) (lastMcastChange.toEpochMilli() / 1000.0);
264 long now = (long) (Instant.now().toEpochMilli() / 1000.0);
Saurav Das97241862018-02-14 14:14:54 -0800265 log.trace("Mcast stable since {}s", now - last);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100266 return (now - last) > MCAST_STABLITY_THRESHOLD;
267 }
268
269 // Verify interval for Mcast
270 private static final long MCAST_VERIFY_INTERVAL = 30;
271
272 // Executor for mcast bucket corrector
273 private ScheduledExecutorService executorService
Pier Luigid29ca7c2018-02-28 17:24:03 +0100274 = newScheduledThreadPool(1, groupedThreads("mcastWorker", "mcastWorker-%d", log));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100275
Charles Chan72779502016-04-23 17:36:10 -0700276 /**
Charles Chanc91c8782016-03-30 17:54:24 -0700277 * Constructs the McastEventHandler.
278 *
279 * @param srManager Segment Routing manager
280 */
Charles Chan1eaf4802016-04-18 13:44:03 -0700281 public McastHandler(SegmentRoutingManager srManager) {
Pier7b657162018-03-27 11:29:42 -0700282 ApplicationId coreAppId = srManager.coreService.getAppId(CoreService.CORE_APP_NAME);
Charles Chanc91c8782016-03-30 17:54:24 -0700283 this.srManager = srManager;
Charles Chanc91c8782016-03-30 17:54:24 -0700284 this.topologyService = srManager.topologyService;
Pier7b657162018-03-27 11:29:42 -0700285 KryoNamespace.Builder mcastKryo = new KryoNamespace.Builder()
Charles Chanc91c8782016-03-30 17:54:24 -0700286 .register(KryoNamespaces.API)
Charles Chan72779502016-04-23 17:36:10 -0700287 .register(McastStoreKey.class)
288 .register(McastRole.class);
Pier7b657162018-03-27 11:29:42 -0700289 mcastNextObjStore = srManager.storageService
Charles Chan72779502016-04-23 17:36:10 -0700290 .<McastStoreKey, NextObjective>consistentMapBuilder()
Charles Chanc91c8782016-03-30 17:54:24 -0700291 .withName("onos-mcast-nextobj-store")
Charles Chan4922a172016-05-23 16:45:45 -0700292 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-NextObj")))
Charles Chanc91c8782016-03-30 17:54:24 -0700293 .build();
Pier7b657162018-03-27 11:29:42 -0700294 mcastRoleStore = srManager.storageService
Charles Chan72779502016-04-23 17:36:10 -0700295 .<McastStoreKey, McastRole>consistentMapBuilder()
296 .withName("onos-mcast-role-store")
Charles Chan4922a172016-05-23 16:45:45 -0700297 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-Role")))
Charles Chan72779502016-04-23 17:36:10 -0700298 .build();
Pier7b657162018-03-27 11:29:42 -0700299 // Let's create McastUtils object
300 mcastUtils = new McastUtils(srManager, coreAppId, log);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100301 // Init the executor service and the buckets corrector
302 executorService.scheduleWithFixedDelay(new McastBucketCorrector(), 10,
Pier7b657162018-03-27 11:29:42 -0700303 MCAST_VERIFY_INTERVAL, TimeUnit.SECONDS);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100304 // Schedule the clean up, this will allow the processing of the expired events
305 executorService.scheduleAtFixedRate(mcastEventCache::cleanUp, 0,
306 WAIT_TIME_MS, TimeUnit.MILLISECONDS);
Charles Chan72779502016-04-23 17:36:10 -0700307 }
308
309 /**
310 * Read initial multicast from mcast store.
311 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100312 public void init() {
Pier7b657162018-03-27 11:29:42 -0700313 lastMcastChange = Instant.now();
314 mcastLock();
315 try {
316 srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
Pierdb27b8d2018-04-17 16:29:56 +0200317 // Verify leadership on the operation
318 if (!mcastUtils.isLeader(mcastRoute.group())) {
319 log.debug("Skip {} due to lack of leadership", mcastRoute.group());
320 return;
321 }
Pier7b657162018-03-27 11:29:42 -0700322 // FIXME To be addressed with multiple sources support
323 ConnectPoint source = srManager.multicastRouteService.sources(mcastRoute)
324 .stream()
325 .findFirst()
326 .orElse(null);
327 // Get all the sinks and process them
328 McastRouteData mcastRouteData = srManager.multicastRouteService.routeData(mcastRoute);
329 Set<ConnectPoint> sinks = processSinksToBeAdded(source, mcastRoute.group(), mcastRouteData.sinks());
330 // Filter out all the working sinks, we do not want to move them
331 sinks = sinks.stream()
332 .filter(sink -> {
333 McastStoreKey mcastKey = new McastStoreKey(mcastRoute.group(), sink.deviceId());
334 Versioned<NextObjective> verMcastNext = mcastNextObjStore.get(mcastKey);
335 return verMcastNext == null ||
336 !mcastUtils.getPorts(verMcastNext.value().next()).contains(sink.port());
337 })
338 .collect(Collectors.toSet());
339 // Compute the Mcast tree
340 Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(source.deviceId(), sinks);
341 // Process the given sinks using the pre-computed paths
342 mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink,
343 mcastRoute.group(), paths));
344 });
345 } finally {
346 mcastUnlock();
347 }
Charles Chanc91c8782016-03-30 17:54:24 -0700348 }
349
350 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +0100351 * Clean up when deactivating the application.
352 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100353 public void terminate() {
Pier72d0e582018-04-20 14:14:34 +0200354 mcastEventCache.invalidateAll();
Pier Luigi35dab3f2018-01-25 16:16:02 +0100355 executorService.shutdown();
Pier72d0e582018-04-20 14:14:34 +0200356 mcastNextObjStore.destroy();
357 mcastRoleStore.destroy();
358 mcastUtils.terminate();
359 log.info("Terminated");
Pier Luigi35dab3f2018-01-25 16:16:02 +0100360 }
361
362 /**
Pier Luigid29ca7c2018-02-28 17:24:03 +0100363 * Processes the SOURCE_ADDED, SOURCE_UPDATED, SINK_ADDED,
364 * SINK_REMOVED and ROUTE_REMOVED events.
Charles Chanc91c8782016-03-30 17:54:24 -0700365 *
366 * @param event McastEvent with SOURCE_ADDED type
367 */
Pier Luigid29ca7c2018-02-28 17:24:03 +0100368 public void processMcastEvent(McastEvent event) {
369 log.info("process {}", event);
Pierdb27b8d2018-04-17 16:29:56 +0200370 // If it is a route added, we do not enqueue
371 if (event.type() == ROUTE_ADDED) {
372 // We need just to elect a leader
373 processRouteAddedInternal(event.subject().route().group());
374 } else {
375 // Just enqueue for now
376 enqueueMcastEvent(event);
377 }
Pier Luigi6786b922018-02-02 16:19:11 +0100378 }
379
380 /**
Pierdb27b8d2018-04-17 16:29:56 +0200381 * Process the ROUTE_ADDED event.
Pier Luigie80d6b42018-02-26 12:31:38 +0100382 *
Pierdb27b8d2018-04-17 16:29:56 +0200383 * @param mcastIp the group address
Pier Luigie80d6b42018-02-26 12:31:38 +0100384 */
Pierdb27b8d2018-04-17 16:29:56 +0200385 private void processRouteAddedInternal(IpAddress mcastIp) {
Pier Luigie80d6b42018-02-26 12:31:38 +0100386 lastMcastChange = Instant.now();
387 mcastLock();
388 try {
Pierdb27b8d2018-04-17 16:29:56 +0200389 log.debug("Processing route added for group {}", mcastIp);
390 // Just elect a new leader
391 mcastUtils.isLeader(mcastIp);
Pier Luigie80d6b42018-02-26 12:31:38 +0100392 } finally {
393 mcastUnlock();
394 }
395 }
396
397 /**
Pier Luigi6786b922018-02-02 16:19:11 +0100398 * Removes the entire mcast tree related to this group.
399 *
400 * @param mcastIp multicast group IP address
401 */
402 private void processRouteRemovedInternal(ConnectPoint source, IpAddress mcastIp) {
403 lastMcastChange = Instant.now();
404 mcastLock();
405 try {
Pier Luigie80d6b42018-02-26 12:31:38 +0100406 log.debug("Processing route removed for group {}", mcastIp);
Pierdb27b8d2018-04-17 16:29:56 +0200407 // Verify leadership on the operation
408 if (!mcastUtils.isLeader(mcastIp)) {
409 log.debug("Skip {} due to lack of leadership", mcastIp);
410 mcastUtils.withdrawLeader(mcastIp);
411 return;
412 }
Pier Luigi6786b922018-02-02 16:19:11 +0100413
414 // Find out the ingress, transit and egress device of the affected group
Pier979e61a2018-03-07 11:42:50 +0100415 DeviceId ingressDevice = getDevice(mcastIp, INGRESS)
Pier Luigi6786b922018-02-02 16:19:11 +0100416 .stream().findAny().orElse(null);
Pier1a7e0c02018-03-12 15:00:54 -0700417 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT);
Pier979e61a2018-03-07 11:42:50 +0100418 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS);
Pier Luigi6786b922018-02-02 16:19:11 +0100419
Pier1a7e0c02018-03-12 15:00:54 -0700420 // If there are no egress devices, sinks could be only on the ingress
Pier Luigi6786b922018-02-02 16:19:11 +0100421 if (!egressDevices.isEmpty()) {
422 egressDevices.forEach(
Pier7b657162018-03-27 11:29:42 -0700423 deviceId -> removeGroupFromDevice(deviceId, mcastIp, mcastUtils.assignedVlan(null))
Pier Luigi6786b922018-02-02 16:19:11 +0100424 );
425 }
Pier1a7e0c02018-03-12 15:00:54 -0700426 // Transit could be empty if sinks are on the ingress
427 if (!transitDevices.isEmpty()) {
428 transitDevices.forEach(
Pier7b657162018-03-27 11:29:42 -0700429 deviceId -> removeGroupFromDevice(deviceId, mcastIp, mcastUtils.assignedVlan(null))
Pier1a7e0c02018-03-12 15:00:54 -0700430 );
Pier Luigi6786b922018-02-02 16:19:11 +0100431 }
432 // Ingress device should be not null
433 if (ingressDevice != null) {
Pier7b657162018-03-27 11:29:42 -0700434 removeGroupFromDevice(ingressDevice, mcastIp, mcastUtils.assignedVlan(source));
Pier Luigi6786b922018-02-02 16:19:11 +0100435 }
Pier Luigi6786b922018-02-02 16:19:11 +0100436 } finally {
437 mcastUnlock();
438 }
439 }
440
Pier7b657162018-03-27 11:29:42 -0700441
442 /**
443 * Process sinks to be removed.
444 *
445 * @param source the source connect point
446 * @param mcastIp the ip address of the group
447 * @param newSinks the new sinks to be processed
Pier28164682018-04-17 15:50:43 +0200448 * @param prevSinks the previous sinks
Pier7b657162018-03-27 11:29:42 -0700449 */
450 private void processSinksRemovedInternal(ConnectPoint source, IpAddress mcastIp,
451 Map<HostId, Set<ConnectPoint>> newSinks,
Pier28164682018-04-17 15:50:43 +0200452 Map<HostId, Set<ConnectPoint>> prevSinks) {
Pier7b657162018-03-27 11:29:42 -0700453 lastMcastChange = Instant.now();
454 mcastLock();
Pier7b657162018-03-27 11:29:42 -0700455 try {
Pierdb27b8d2018-04-17 16:29:56 +0200456 // Verify leadership on the operation
457 if (!mcastUtils.isLeader(mcastIp)) {
458 log.debug("Skip {} due to lack of leadership", mcastIp);
459 return;
460 }
Pier28164682018-04-17 15:50:43 +0200461 // Remove the previous ones
462 Set<ConnectPoint> sinksToBeRemoved = processSinksToBeRemoved(mcastIp, prevSinks,
463 newSinks);
464 sinksToBeRemoved.forEach(sink -> processSinkRemovedInternal(source, sink, mcastIp));
Pier7b657162018-03-27 11:29:42 -0700465 // Recover the dual-homed sinks
Pier28164682018-04-17 15:50:43 +0200466 Set<ConnectPoint> sinksToBeRecovered = processSinksToBeRecovered(mcastIp, newSinks,
467 prevSinks);
Pier7b657162018-03-27 11:29:42 -0700468 sinksToBeRecovered.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null));
Pier7b657162018-03-27 11:29:42 -0700469 } finally {
470 mcastUnlock();
Pier7b657162018-03-27 11:29:42 -0700471 }
472 }
473
Pier Luigi6786b922018-02-02 16:19:11 +0100474 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +0100475 * Removes a path from source to sink for given multicast group.
476 *
477 * @param source connect point of the multicast source
478 * @param sink connection point of the multicast sink
479 * @param mcastIp multicast group IP address
480 */
481 private void processSinkRemovedInternal(ConnectPoint source, ConnectPoint sink,
Pier7b657162018-03-27 11:29:42 -0700482 IpAddress mcastIp) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100483 lastMcastChange = Instant.now();
484 mcastLock();
485 try {
Pier7b657162018-03-27 11:29:42 -0700486 boolean isLast;
Pier Luigi35dab3f2018-01-25 16:16:02 +0100487 // When source and sink are on the same device
488 if (source.deviceId().equals(sink.deviceId())) {
489 // Source and sink are on even the same port. There must be something wrong.
490 if (source.port().equals(sink.port())) {
491 log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
492 mcastIp, sink, source);
493 return;
494 }
Pier7b657162018-03-27 11:29:42 -0700495 isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(source));
Pier Luigi92e69be2018-03-02 12:53:37 +0100496 if (isLast) {
497 mcastRoleStore.remove(new McastStoreKey(mcastIp, sink.deviceId()));
498 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100499 return;
500 }
Charles Chanc91c8782016-03-30 17:54:24 -0700501
Pier Luigi35dab3f2018-01-25 16:16:02 +0100502 // Process the egress device
Pier7b657162018-03-27 11:29:42 -0700503 isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100504 if (isLast) {
505 mcastRoleStore.remove(new McastStoreKey(mcastIp, sink.deviceId()));
506 }
507
508 // If this is the last sink on the device, also update upstream
Pier1f87aca2018-03-14 16:47:32 -0700509 Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(),
510 mcastIp, null);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100511 if (mcastPath.isPresent()) {
512 List<Link> links = Lists.newArrayList(mcastPath.get().links());
513 Collections.reverse(links);
514 for (Link link : links) {
515 if (isLast) {
516 isLast = removePortFromDevice(
517 link.src().deviceId(),
518 link.src().port(),
519 mcastIp,
Pier7b657162018-03-27 11:29:42 -0700520 mcastUtils.assignedVlan(link.src().deviceId().equals(source.deviceId()) ?
521 source : null)
Pier Luigi35dab3f2018-01-25 16:16:02 +0100522 );
Pier Luigi92e69be2018-03-02 12:53:37 +0100523 if (isLast) {
524 mcastRoleStore.remove(new McastStoreKey(mcastIp, link.src().deviceId()));
525 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100526 }
Charles Chanc91c8782016-03-30 17:54:24 -0700527 }
528 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100529 } finally {
530 mcastUnlock();
Charles Chanc91c8782016-03-30 17:54:24 -0700531 }
532 }
533
Pier7b657162018-03-27 11:29:42 -0700534
535 /**
536 * Process sinks to be added.
537 *
538 * @param source the source connect point
539 * @param mcastIp the group IP
540 * @param newSinks the new sinks to be processed
541 * @param allPrevSinks all previous sinks
542 */
543 private void processSinksAddedInternal(ConnectPoint source, IpAddress mcastIp,
544 Map<HostId, Set<ConnectPoint>> newSinks,
545 Set<ConnectPoint> allPrevSinks) {
546 lastMcastChange = Instant.now();
547 mcastLock();
548 try {
Pierdb27b8d2018-04-17 16:29:56 +0200549 // Verify leadership on the operation
550 if (!mcastUtils.isLeader(mcastIp)) {
551 log.debug("Skip {} due to lack of leadership", mcastIp);
552 return;
553 }
Pier7b657162018-03-27 11:29:42 -0700554 // Get the only sinks to be processed (new ones)
555 Set<ConnectPoint> sinksToBeAdded = processSinksToBeAdded(source, mcastIp, newSinks);
556 // Install new sinks
557 sinksToBeAdded = Sets.difference(sinksToBeAdded, allPrevSinks);
558 sinksToBeAdded.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null));
559 } finally {
560 mcastUnlock();
561 }
562 }
563
Charles Chanc91c8782016-03-30 17:54:24 -0700564 /**
565 * Establishes a path from source to sink for given multicast group.
566 *
567 * @param source connect point of the multicast source
568 * @param sink connection point of the multicast sink
569 * @param mcastIp multicast group IP address
570 */
571 private void processSinkAddedInternal(ConnectPoint source, ConnectPoint sink,
Pier7b657162018-03-27 11:29:42 -0700572 IpAddress mcastIp, List<Path> allPaths) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100573 lastMcastChange = Instant.now();
574 mcastLock();
575 try {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100576 // Process the ingress device
Pier7b657162018-03-27 11:29:42 -0700577 mcastUtils.addFilterToDevice(source.deviceId(), source.port(),
578 mcastUtils.assignedVlan(source), mcastIp, INGRESS);
Charles Chan72779502016-04-23 17:36:10 -0700579
Pier Luigi35dab3f2018-01-25 16:16:02 +0100580 // When source and sink are on the same device
581 if (source.deviceId().equals(sink.deviceId())) {
582 // Source and sink are on even the same port. There must be something wrong.
583 if (source.port().equals(sink.port())) {
584 log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
585 mcastIp, sink, source);
586 return;
587 }
Pier7b657162018-03-27 11:29:42 -0700588 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(source));
Pier979e61a2018-03-07 11:42:50 +0100589 mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()), INGRESS);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100590 return;
591 }
Charles Chan72779502016-04-23 17:36:10 -0700592
Pier Luigi35dab3f2018-01-25 16:16:02 +0100593 // Find a path. If present, create/update groups and flows for each hop
Pier1f87aca2018-03-14 16:47:32 -0700594 Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(),
595 mcastIp, allPaths);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100596 if (mcastPath.isPresent()) {
597 List<Link> links = mcastPath.get().links();
Charles Chan72779502016-04-23 17:36:10 -0700598
Pier1a7e0c02018-03-12 15:00:54 -0700599 // Setup mcast role for ingress
600 mcastRoleStore.put(new McastStoreKey(mcastIp, source.deviceId()),
601 INGRESS);
602
603 // Setup properly the transit
Pier Luigi35dab3f2018-01-25 16:16:02 +0100604 links.forEach(link -> {
605 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
Pier7b657162018-03-27 11:29:42 -0700606 mcastUtils.assignedVlan(link.src().deviceId()
607 .equals(source.deviceId()) ? source : null));
608 mcastUtils.addFilterToDevice(link.dst().deviceId(), link.dst().port(),
609 mcastUtils.assignedVlan(null), mcastIp, null);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100610 });
611
Pier1a7e0c02018-03-12 15:00:54 -0700612 // Setup mcast role for the transit
613 links.stream()
614 .filter(link -> !link.dst().deviceId().equals(sink.deviceId()))
615 .forEach(link -> mcastRoleStore.put(new McastStoreKey(mcastIp, link.dst().deviceId()),
616 TRANSIT));
617
Pier Luigi35dab3f2018-01-25 16:16:02 +0100618 // Process the egress device
Pier7b657162018-03-27 11:29:42 -0700619 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
Pier1a7e0c02018-03-12 15:00:54 -0700620 // Setup mcast role for egress
Pier Luigi35dab3f2018-01-25 16:16:02 +0100621 mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()),
Pier979e61a2018-03-07 11:42:50 +0100622 EGRESS);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100623 } else {
624 log.warn("Unable to find a path from {} to {}. Abort sinkAdded",
625 source.deviceId(), sink.deviceId());
626 }
627 } finally {
628 mcastUnlock();
Charles Chanc91c8782016-03-30 17:54:24 -0700629 }
630 }
631
632 /**
Charles Chan72779502016-04-23 17:36:10 -0700633 * Processes the LINK_DOWN event.
634 *
635 * @param affectedLink Link that is going down
636 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100637 public void processLinkDown(Link affectedLink) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100638 lastMcastChange = Instant.now();
639 mcastLock();
640 try {
641 // Get groups affected by the link down event
642 getAffectedGroups(affectedLink).forEach(mcastIp -> {
643 // TODO Optimize when the group editing is in place
644 log.debug("Processing link down {} for group {}",
645 affectedLink, mcastIp);
Pierdb27b8d2018-04-17 16:29:56 +0200646 // Verify leadership on the operation
647 if (!mcastUtils.isLeader(mcastIp)) {
648 log.debug("Skip {} due to lack of leadership", mcastIp);
649 return;
650 }
Pier Luigi580fd8a2018-01-16 10:47:50 +0100651
Pier Luigi35dab3f2018-01-25 16:16:02 +0100652 // Find out the ingress, transit and egress device of affected group
Pier979e61a2018-03-07 11:42:50 +0100653 DeviceId ingressDevice = getDevice(mcastIp, INGRESS)
Pier Luigi35dab3f2018-01-25 16:16:02 +0100654 .stream().findAny().orElse(null);
Pier1a7e0c02018-03-12 15:00:54 -0700655 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT);
Pier979e61a2018-03-07 11:42:50 +0100656 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS);
Pier7b657162018-03-27 11:29:42 -0700657 ConnectPoint source = mcastUtils.getSource(mcastIp);
Charles Chana8f9dee2016-05-16 18:44:13 -0700658
Pier1a7e0c02018-03-12 15:00:54 -0700659 // Do not proceed if ingress device or source of this group are missing
660 // If sinks are in other leafs, we have ingress, transit, egress, and source
661 // If sinks are in the same leaf, we have just ingress and source
662 if (ingressDevice == null || source == null) {
663 log.warn("Missing ingress {} or source {} for group {}",
664 ingressDevice, source, mcastIp);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100665 return;
Charles Chan72779502016-04-23 17:36:10 -0700666 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100667
Pier Luigi35dab3f2018-01-25 16:16:02 +0100668 // Remove entire transit
Pier1a7e0c02018-03-12 15:00:54 -0700669 transitDevices.forEach(transitDevice ->
Pier7b657162018-03-27 11:29:42 -0700670 removeGroupFromDevice(transitDevice, mcastIp,
671 mcastUtils.assignedVlan(null)));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100672
Pier1a7e0c02018-03-12 15:00:54 -0700673 // Remove transit-facing ports on the ingress device
674 removeIngressTransitPorts(mcastIp, ingressDevice, source);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100675
Pier7b657162018-03-27 11:29:42 -0700676 // TODO create a shared procedure with DEVICE_DOWN
Pier1f87aca2018-03-14 16:47:32 -0700677 // Compute mcast tree for the the egress devices
678 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(ingressDevice, egressDevices);
679
Pier7b657162018-03-27 11:29:42 -0700680 // We have to verify, if there are egresses without paths
681 Set<DeviceId> notRecovered = Sets.newHashSet();
Pier1f87aca2018-03-14 16:47:32 -0700682 mcastTree.forEach((egressDevice, paths) -> {
Pier7b657162018-03-27 11:29:42 -0700683 // Let's check if there is at least a path
Pier1f87aca2018-03-14 16:47:32 -0700684 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
685 mcastIp, paths);
Pier7b657162018-03-27 11:29:42 -0700686 // No paths, we have to try with alternative location
687 if (!mcastPath.isPresent()) {
688 notRecovered.add(egressDevice);
689 // We were not able to find an alternative path for this egress
Pier Luigi35dab3f2018-01-25 16:16:02 +0100690 log.warn("Fail to recover egress device {} from link failure {}",
691 egressDevice, affectedLink);
Pier7b657162018-03-27 11:29:42 -0700692 removeGroupFromDevice(egressDevice, mcastIp,
693 mcastUtils.assignedVlan(null));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100694 }
695 });
Pier7b657162018-03-27 11:29:42 -0700696
697 // Fast path, we can recover all the locations
698 if (notRecovered.isEmpty()) {
699 // Construct a new path for each egress device
700 mcastTree.forEach((egressDevice, paths) -> {
701 // We try to enforce the sinks path on the mcast tree
702 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
703 mcastIp, paths);
704 // If a path is present, let's install it
705 if (mcastPath.isPresent()) {
706 installPath(mcastIp, source, mcastPath.get());
707 }
708 });
709 } else {
710 // Let's try to recover using alternate
711 recoverSinks(egressDevices, notRecovered, mcastIp,
712 ingressDevice, source, true);
713 }
Charles Chan72779502016-04-23 17:36:10 -0700714 });
Pier Luigi35dab3f2018-01-25 16:16:02 +0100715 } finally {
716 mcastUnlock();
717 }
Charles Chan72779502016-04-23 17:36:10 -0700718 }
719
720 /**
Pier Luigi580fd8a2018-01-16 10:47:50 +0100721 * Process the DEVICE_DOWN event.
722 *
723 * @param deviceDown device going down
724 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100725 public void processDeviceDown(DeviceId deviceDown) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100726 lastMcastChange = Instant.now();
727 mcastLock();
728 try {
729 // Get the mcast groups affected by the device going down
730 getAffectedGroups(deviceDown).forEach(mcastIp -> {
731 // TODO Optimize when the group editing is in place
732 log.debug("Processing device down {} for group {}",
733 deviceDown, mcastIp);
Pierdb27b8d2018-04-17 16:29:56 +0200734 // Verify leadership on the operation
735 if (!mcastUtils.isLeader(mcastIp)) {
736 log.debug("Skip {} due to lack of leadership", mcastIp);
737 return;
738 }
Pier Luigi580fd8a2018-01-16 10:47:50 +0100739
Pier Luigi35dab3f2018-01-25 16:16:02 +0100740 // Find out the ingress, transit and egress device of affected group
Pier979e61a2018-03-07 11:42:50 +0100741 DeviceId ingressDevice = getDevice(mcastIp, INGRESS)
Pier Luigi35dab3f2018-01-25 16:16:02 +0100742 .stream().findAny().orElse(null);
Pier1a7e0c02018-03-12 15:00:54 -0700743 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT);
Pier979e61a2018-03-07 11:42:50 +0100744 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS);
Pier7b657162018-03-27 11:29:42 -0700745 ConnectPoint source = mcastUtils.getSource(mcastIp);
Pier Luigi580fd8a2018-01-16 10:47:50 +0100746
Pier Luigi35dab3f2018-01-25 16:16:02 +0100747 // Do not proceed if ingress device or source of this group are missing
748 // If sinks are in other leafs, we have ingress, transit, egress, and source
749 // If sinks are in the same leaf, we have just ingress and source
750 if (ingressDevice == null || source == null) {
751 log.warn("Missing ingress {} or source {} for group {}",
752 ingressDevice, source, mcastIp);
Pier Luigi580fd8a2018-01-16 10:47:50 +0100753 return;
754 }
Pier Luigi580fd8a2018-01-16 10:47:50 +0100755
Pier Luigi35dab3f2018-01-25 16:16:02 +0100756 // If it exists, we have to remove it in any case
Pier1a7e0c02018-03-12 15:00:54 -0700757 if (!transitDevices.isEmpty()) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100758 // Remove entire transit
Pier1a7e0c02018-03-12 15:00:54 -0700759 transitDevices.forEach(transitDevice ->
Pier7b657162018-03-27 11:29:42 -0700760 removeGroupFromDevice(transitDevice, mcastIp,
761 mcastUtils.assignedVlan(null)));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100762 }
763 // If the ingress is down
764 if (ingressDevice.equals(deviceDown)) {
765 // Remove entire ingress
Pier7b657162018-03-27 11:29:42 -0700766 removeGroupFromDevice(ingressDevice, mcastIp, mcastUtils.assignedVlan(source));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100767 // If other sinks different from the ingress exist
768 if (!egressDevices.isEmpty()) {
769 // Remove all the remaining egress
770 egressDevices.forEach(
Pier7b657162018-03-27 11:29:42 -0700771 egressDevice -> removeGroupFromDevice(egressDevice, mcastIp,
772 mcastUtils.assignedVlan(null))
Pier Luigi35dab3f2018-01-25 16:16:02 +0100773 );
Pier Luigi580fd8a2018-01-16 10:47:50 +0100774 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100775 } else {
776 // Egress or transit could be down at this point
Pier1a7e0c02018-03-12 15:00:54 -0700777 // Get the ingress-transit ports if they exist
778 removeIngressTransitPorts(mcastIp, ingressDevice, source);
779
Pier Luigi35dab3f2018-01-25 16:16:02 +0100780 // One of the egress device is down
781 if (egressDevices.contains(deviceDown)) {
782 // Remove entire device down
Pier7b657162018-03-27 11:29:42 -0700783 removeGroupFromDevice(deviceDown, mcastIp, mcastUtils.assignedVlan(null));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100784 // Remove the device down from egress
785 egressDevices.remove(deviceDown);
786 // If there are no more egress and ingress does not have sinks
787 if (egressDevices.isEmpty() && !hasSinks(ingressDevice, mcastIp)) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100788 // We have done
789 return;
790 }
791 }
Pier1f87aca2018-03-14 16:47:32 -0700792
793 // Compute mcast tree for the the egress devices
794 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(ingressDevice, egressDevices);
795
Pier7b657162018-03-27 11:29:42 -0700796 // We have to verify, if there are egresses without paths
797 Set<DeviceId> notRecovered = Sets.newHashSet();
Pier1f87aca2018-03-14 16:47:32 -0700798 mcastTree.forEach((egressDevice, paths) -> {
Pier7b657162018-03-27 11:29:42 -0700799 // Let's check if there is at least a path
Pier1f87aca2018-03-14 16:47:32 -0700800 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
Pier7b657162018-03-27 11:29:42 -0700801 mcastIp, paths);
802 // No paths, we have to try with alternative location
803 if (!mcastPath.isPresent()) {
804 notRecovered.add(egressDevice);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100805 // We were not able to find an alternative path for this egress
806 log.warn("Fail to recover egress device {} from device down {}",
807 egressDevice, deviceDown);
Pier7b657162018-03-27 11:29:42 -0700808 removeGroupFromDevice(egressDevice, mcastIp, mcastUtils.assignedVlan(null));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100809 }
810 });
Pier7b657162018-03-27 11:29:42 -0700811
812 // Fast path, we can recover all the locations
813 if (notRecovered.isEmpty()) {
814 // Construct a new path for each egress device
815 mcastTree.forEach((egressDevice, paths) -> {
816 // We try to enforce the sinks path on the mcast tree
817 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
818 mcastIp, paths);
819 // If a path is present, let's install it
820 if (mcastPath.isPresent()) {
821 installPath(mcastIp, source, mcastPath.get());
822 }
823 });
824 } else {
825 // Let's try to recover using alternate
826 recoverSinks(egressDevices, notRecovered, mcastIp,
827 ingressDevice, source, false);
828 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100829 }
830 });
831 } finally {
832 mcastUnlock();
833 }
Pier Luigi580fd8a2018-01-16 10:47:50 +0100834 }
835
836 /**
Pier7b657162018-03-27 11:29:42 -0700837 * Try to recover sinks using alternate locations.
838 *
839 * @param egressDevices the original egress devices
840 * @param notRecovered the devices not recovered
841 * @param mcastIp the group address
842 * @param ingressDevice the ingress device
843 * @param source the source connect point
844 * @param isLinkFailure true if it is a link failure, otherwise false
845 */
846 private void recoverSinks(Set<DeviceId> egressDevices, Set<DeviceId> notRecovered,
847 IpAddress mcastIp, DeviceId ingressDevice, ConnectPoint source,
848 boolean isLinkFailure) {
849 // Recovered devices
850 Set<DeviceId> recovered = Sets.difference(egressDevices, notRecovered);
851 // Total affected sinks
852 Set<ConnectPoint> totalAffectedSinks = Sets.newHashSet();
853 // Total sinks
854 Set<ConnectPoint> totalSinks = Sets.newHashSet();
855 // Let's compute all the affected sinks and all the sinks
856 notRecovered.forEach(deviceId -> {
857 totalAffectedSinks.addAll(
858 mcastUtils.getAffectedSinks(deviceId, mcastIp)
859 .values()
860 .stream()
861 .flatMap(Collection::stream)
862 .filter(connectPoint -> connectPoint.deviceId().equals(deviceId))
863 .collect(Collectors.toSet())
864 );
865 totalSinks.addAll(
866 mcastUtils.getAffectedSinks(deviceId, mcastIp)
867 .values()
868 .stream()
869 .flatMap(Collection::stream)
870 .collect(Collectors.toSet())
871 );
872 });
873
874 // Sinks to be added
875 Set<ConnectPoint> sinksToBeAdded = Sets.difference(totalSinks, totalAffectedSinks);
876 // New egress devices, filtering out the source
877 Set<DeviceId> newEgressDevice = sinksToBeAdded.stream()
878 .map(ConnectPoint::deviceId)
879 .collect(Collectors.toSet());
880 // Let's add the devices recovered from the previous round
881 newEgressDevice.addAll(recovered);
882 // Let's do a copy of the new egresses and filter out the source
883 Set<DeviceId> copyNewEgressDevice = ImmutableSet.copyOf(newEgressDevice);
884 newEgressDevice = newEgressDevice.stream()
885 .filter(deviceId -> !deviceId.equals(ingressDevice))
886 .collect(Collectors.toSet());
887
888 // Re-compute mcast tree for the the egress devices
889 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(ingressDevice, newEgressDevice);
890 // if the source was originally in the new locations, add new sinks
891 if (copyNewEgressDevice.contains(ingressDevice)) {
892 sinksToBeAdded.stream()
893 .filter(connectPoint -> connectPoint.deviceId().equals(ingressDevice))
894 .forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, ImmutableList.of()));
895 }
896
897 // Construct a new path for each egress device
898 mcastTree.forEach((egressDevice, paths) -> {
899 // We try to enforce the sinks path on the mcast tree
900 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
901 mcastIp, paths);
902 // If a path is present, let's install it
903 if (mcastPath.isPresent()) {
904 // Using recovery procedure
905 if (recovered.contains(egressDevice)) {
906 installPath(mcastIp, source, mcastPath.get());
907 } else {
908 // otherwise we need to threat as new sink
909 sinksToBeAdded.stream()
910 .filter(connectPoint -> connectPoint.deviceId().equals(egressDevice))
911 .forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, paths));
912 }
913 } else {
914 // We were not able to find an alternative path for this egress
915 log.warn("Fail to recover egress device {} from {} failure",
916 egressDevice, isLinkFailure ? "Link" : "Device");
917 removeGroupFromDevice(egressDevice, mcastIp, mcastUtils.assignedVlan(null));
918 }
919 });
920
921 }
922
923 /**
Pier28164682018-04-17 15:50:43 +0200924 * Process all the sinks related to a mcast group and return
925 * the ones to be removed.
926 *
927 * @param mcastIp the group address
928 * @param prevsinks the previous sinks to be evaluated
929 * @param newSinks the new sinks to be evaluted
930 * @return the set of the sinks to be removed
931 */
932 private Set<ConnectPoint> processSinksToBeRemoved(IpAddress mcastIp,
933 Map<HostId, Set<ConnectPoint>> prevsinks,
934 Map<HostId, Set<ConnectPoint>> newSinks) {
935 // Iterate over the sinks in order to build the set
936 // of the connect points to be removed from this group
937 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
938 prevsinks.forEach(((hostId, connectPoints) -> {
939 // We have to check with the existing flows
940 ConnectPoint sinkToBeProcessed = connectPoints.stream()
941 .filter(connectPoint -> isSink(mcastIp, connectPoint))
942 .findFirst().orElse(null);
943 if (sinkToBeProcessed != null) {
944 // If the host has been removed or location has been removed
945 if (!newSinks.containsKey(hostId) ||
946 !newSinks.get(hostId).contains(sinkToBeProcessed)) {
947 sinksToBeProcessed.add(sinkToBeProcessed);
948 }
949 }
950 }));
951 // We have done, return the set
952 return sinksToBeProcessed;
953 }
954
955 /**
Pier7b657162018-03-27 11:29:42 -0700956 * Process new locations and return the set of sinks to be added
957 * in the context of the recovery.
958 *
Pier28164682018-04-17 15:50:43 +0200959 * @param newSinks the remaining sinks
960 * @param prevSinks the previous sinks
Pier7b657162018-03-27 11:29:42 -0700961 * @return the set of the sinks to be processed
962 */
963 private Set<ConnectPoint> processSinksToBeRecovered(IpAddress mcastIp,
Pier28164682018-04-17 15:50:43 +0200964 Map<HostId, Set<ConnectPoint>> newSinks,
965 Map<HostId, Set<ConnectPoint>> prevSinks) {
Pier7b657162018-03-27 11:29:42 -0700966 // Iterate over the sinks in order to build the set
967 // of the connect points to be served by this group
968 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
Pier28164682018-04-17 15:50:43 +0200969 newSinks.forEach((hostId, connectPoints) -> {
Pier7b657162018-03-27 11:29:42 -0700970 // If it has more than 1 locations
971 if (connectPoints.size() > 1 || connectPoints.size() == 0) {
972 log.debug("Skip {} since sink {} has {} locations",
973 mcastIp, hostId, connectPoints.size());
974 return;
975 }
Pier28164682018-04-17 15:50:43 +0200976 // If previously it had two locations, we need to recover it
977 // Filter out if the remaining location is already served
978 if (prevSinks.containsKey(hostId) && prevSinks.get(hostId).size() == 2) {
Pier665b0fc2018-04-19 15:53:20 +0200979 ConnectPoint sinkToBeProcessed = connectPoints.stream()
980 .filter(connectPoint -> !isSink(mcastIp, connectPoint))
981 .findFirst().orElse(null);
982 if (sinkToBeProcessed != null) {
983 sinksToBeProcessed.add(sinkToBeProcessed);
984 }
Pier28164682018-04-17 15:50:43 +0200985 }
Pier7b657162018-03-27 11:29:42 -0700986 });
987 return sinksToBeProcessed;
988 }
989
990 /**
991 * Process all the sinks related to a mcast group and return
992 * the ones to be processed.
993 *
994 * @param source the source connect point
995 * @param mcastIp the group address
996 * @param sinks the sinks to be evaluated
997 * @return the set of the sinks to be processed
998 */
999 private Set<ConnectPoint> processSinksToBeAdded(ConnectPoint source, IpAddress mcastIp,
1000 Map<HostId, Set<ConnectPoint>> sinks) {
1001 // Iterate over the sinks in order to build the set
1002 // of the connect points to be served by this group
1003 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
1004 sinks.forEach(((hostId, connectPoints) -> {
1005 // If it has more than 2 locations
1006 if (connectPoints.size() > 2 || connectPoints.size() == 0) {
1007 log.debug("Skip {} since sink {} has {} locations",
1008 mcastIp, hostId, connectPoints.size());
1009 return;
1010 }
1011 // If it has one location, just use it
1012 if (connectPoints.size() == 1) {
1013 sinksToBeProcessed.add(connectPoints.stream()
Pier665b0fc2018-04-19 15:53:20 +02001014 .findFirst().orElse(null));
Pier7b657162018-03-27 11:29:42 -07001015 return;
1016 }
1017 // We prefer to reuse existing flows
1018 ConnectPoint sinkToBeProcessed = connectPoints.stream()
Pier28164682018-04-17 15:50:43 +02001019 .filter(connectPoint -> isSink(mcastIp, connectPoint))
Pier7b657162018-03-27 11:29:42 -07001020 .findFirst().orElse(null);
1021 if (sinkToBeProcessed != null) {
1022 sinksToBeProcessed.add(sinkToBeProcessed);
1023 return;
1024 }
1025 // Otherwise we prefer to reuse existing egresses
1026 Set<DeviceId> egresses = getDevice(mcastIp, EGRESS);
1027 sinkToBeProcessed = connectPoints.stream()
Pier28164682018-04-17 15:50:43 +02001028 .filter(connectPoint -> egresses.contains(connectPoint.deviceId()))
Pier7b657162018-03-27 11:29:42 -07001029 .findFirst().orElse(null);
1030 if (sinkToBeProcessed != null) {
1031 sinksToBeProcessed.add(sinkToBeProcessed);
1032 return;
1033 }
1034 // Otherwise we prefer a location co-located with the source (if it exists)
1035 sinkToBeProcessed = connectPoints.stream()
1036 .filter(connectPoint -> connectPoint.deviceId().equals(source.deviceId()))
1037 .findFirst().orElse(null);
1038 if (sinkToBeProcessed != null) {
1039 sinksToBeProcessed.add(sinkToBeProcessed);
1040 return;
1041 }
1042 // Finally, we randomly pick a new location
1043 sinksToBeProcessed.add(connectPoints.stream()
Pier665b0fc2018-04-19 15:53:20 +02001044 .findFirst().orElse(null));
Pier7b657162018-03-27 11:29:42 -07001045 }));
1046 // We have done, return the set
1047 return sinksToBeProcessed;
1048 }
1049
1050 /**
Pier1a7e0c02018-03-12 15:00:54 -07001051 * Utility method to remove all the ingress transit ports.
1052 *
1053 * @param mcastIp the group ip
1054 * @param ingressDevice the ingress device for this group
1055 * @param source the source connect point
1056 */
1057 private void removeIngressTransitPorts(IpAddress mcastIp, DeviceId ingressDevice,
1058 ConnectPoint source) {
1059 Set<PortNumber> ingressTransitPorts = ingressTransitPort(mcastIp);
1060 ingressTransitPorts.forEach(ingressTransitPort -> {
1061 if (ingressTransitPort != null) {
1062 boolean isLast = removePortFromDevice(ingressDevice, ingressTransitPort,
Pier7b657162018-03-27 11:29:42 -07001063 mcastIp, mcastUtils.assignedVlan(source));
Pier1a7e0c02018-03-12 15:00:54 -07001064 if (isLast) {
1065 mcastRoleStore.remove(new McastStoreKey(mcastIp, ingressDevice));
1066 }
1067 }
1068 });
1069 }
1070
1071 /**
Charles Chanc91c8782016-03-30 17:54:24 -07001072 * Adds a port to given multicast group on given device. This involves the
1073 * update of L3 multicast group and multicast routing table entry.
1074 *
1075 * @param deviceId device ID
1076 * @param port port to be added
1077 * @param mcastIp multicast group
1078 * @param assignedVlan assigned VLAN ID
1079 */
1080 private void addPortToDevice(DeviceId deviceId, PortNumber port,
Pier7b657162018-03-27 11:29:42 -07001081 IpAddress mcastIp, VlanId assignedVlan) {
Charles Chan72779502016-04-23 17:36:10 -07001082 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId);
Charles Chanc91c8782016-03-30 17:54:24 -07001083 ImmutableSet.Builder<PortNumber> portBuilder = ImmutableSet.builder();
Pier Luigi4f0dd212018-01-19 10:24:53 +01001084 NextObjective newNextObj;
Charles Chan72779502016-04-23 17:36:10 -07001085 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Charles Chanc91c8782016-03-30 17:54:24 -07001086 // First time someone request this mcast group via this device
1087 portBuilder.add(port);
Pier Luigi4f0dd212018-01-19 10:24:53 +01001088 // New nextObj
Pier7b657162018-03-27 11:29:42 -07001089 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi4f0dd212018-01-19 10:24:53 +01001090 portBuilder.build(), null).add();
1091 // Store the new port
1092 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Charles Chanc91c8782016-03-30 17:54:24 -07001093 } else {
1094 // This device already serves some subscribers of this mcast group
Charles Chan72779502016-04-23 17:36:10 -07001095 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chanc91c8782016-03-30 17:54:24 -07001096 // Stop if the port is already in the nextobj
Pier7b657162018-03-27 11:29:42 -07001097 Set<PortNumber> existingPorts = mcastUtils.getPorts(nextObj.next());
Charles Chanc91c8782016-03-30 17:54:24 -07001098 if (existingPorts.contains(port)) {
1099 log.info("NextObj for {}/{} already exists. Abort", deviceId, port);
1100 return;
1101 }
Pier Luigi4f0dd212018-01-19 10:24:53 +01001102 // Let's add the port and reuse the previous one
Yuta HIGUCHIbef07b52018-02-09 18:05:23 -08001103 portBuilder.addAll(existingPorts).add(port);
Pier Luigi4f0dd212018-01-19 10:24:53 +01001104 // Reuse previous nextObj
Pier7b657162018-03-27 11:29:42 -07001105 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi4f0dd212018-01-19 10:24:53 +01001106 portBuilder.build(), nextObj.id()).addToExisting();
1107 // Store the final next objective and send only the difference to the driver
1108 mcastNextObjStore.put(mcastStoreKey, newNextObj);
1109 // Add just the new port
1110 portBuilder = ImmutableSet.builder();
1111 portBuilder.add(port);
Pier7b657162018-03-27 11:29:42 -07001112 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi4f0dd212018-01-19 10:24:53 +01001113 portBuilder.build(), nextObj.id()).addToExisting();
Charles Chanc91c8782016-03-30 17:54:24 -07001114 }
1115 // Create, store and apply the new nextObj and fwdObj
Charles Chan72779502016-04-23 17:36:10 -07001116 ObjectiveContext context = new DefaultObjectiveContext(
1117 (objective) -> log.debug("Successfully add {} on {}/{}, vlan {}",
1118 mcastIp, deviceId, port.toLong(), assignedVlan),
1119 (objective, error) ->
1120 log.warn("Failed to add {} on {}/{}, vlan {}: {}",
1121 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Pier7b657162018-03-27 11:29:42 -07001122 ForwardingObjective fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan,
1123 newNextObj.id()).add(context);
Charles Chanc91c8782016-03-30 17:54:24 -07001124 srManager.flowObjectiveService.next(deviceId, newNextObj);
1125 srManager.flowObjectiveService.forward(deviceId, fwdObj);
Charles Chanc91c8782016-03-30 17:54:24 -07001126 }
1127
1128 /**
1129 * Removes a port from given multicast group on given device.
1130 * This involves the update of L3 multicast group and multicast routing
1131 * table entry.
1132 *
1133 * @param deviceId device ID
1134 * @param port port to be added
1135 * @param mcastIp multicast group
1136 * @param assignedVlan assigned VLAN ID
1137 * @return true if this is the last sink on this device
1138 */
1139 private boolean removePortFromDevice(DeviceId deviceId, PortNumber port,
Pier7b657162018-03-27 11:29:42 -07001140 IpAddress mcastIp, VlanId assignedVlan) {
Charles Chan72779502016-04-23 17:36:10 -07001141 McastStoreKey mcastStoreKey =
1142 new McastStoreKey(mcastIp, deviceId);
Charles Chanc91c8782016-03-30 17:54:24 -07001143 // This device is not serving this multicast group
Charles Chan72779502016-04-23 17:36:10 -07001144 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Charles Chanc91c8782016-03-30 17:54:24 -07001145 log.warn("{} is not serving {} on port {}. Abort.", deviceId, mcastIp, port);
1146 return false;
1147 }
Charles Chan72779502016-04-23 17:36:10 -07001148 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chanc91c8782016-03-30 17:54:24 -07001149
Pier7b657162018-03-27 11:29:42 -07001150 Set<PortNumber> existingPorts = mcastUtils.getPorts(nextObj.next());
Charles Chan72779502016-04-23 17:36:10 -07001151 // This port does not serve this multicast group
Charles Chanc91c8782016-03-30 17:54:24 -07001152 if (!existingPorts.contains(port)) {
1153 log.warn("{} is not serving {} on port {}. Abort.", deviceId, mcastIp, port);
1154 return false;
1155 }
1156 // Copy and modify the ImmutableSet
1157 existingPorts = Sets.newHashSet(existingPorts);
1158 existingPorts.remove(port);
1159
1160 NextObjective newNextObj;
Pier Luigi8cd46de2018-01-19 10:24:53 +01001161 ObjectiveContext context;
Charles Chanc91c8782016-03-30 17:54:24 -07001162 ForwardingObjective fwdObj;
1163 if (existingPorts.isEmpty()) {
Pier Luigi8cd46de2018-01-19 10:24:53 +01001164 // If this is the last sink, remove flows and last bucket
Charles Chanc91c8782016-03-30 17:54:24 -07001165 // NOTE: Rely on GroupStore garbage collection rather than explicitly
1166 // remove L3MG since there might be other flows/groups refer to
1167 // the same L2IG
Pier Luigi8cd46de2018-01-19 10:24:53 +01001168 context = new DefaultObjectiveContext(
Charles Chan72779502016-04-23 17:36:10 -07001169 (objective) -> log.debug("Successfully remove {} on {}/{}, vlan {}",
1170 mcastIp, deviceId, port.toLong(), assignedVlan),
1171 (objective, error) ->
1172 log.warn("Failed to remove {} on {}/{}, vlan {}: {}",
1173 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Pier7b657162018-03-27 11:29:42 -07001174 fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
Charles Chan72779502016-04-23 17:36:10 -07001175 mcastNextObjStore.remove(mcastStoreKey);
Charles Chanc91c8782016-03-30 17:54:24 -07001176 } else {
1177 // If this is not the last sink, update flows and groups
Pier Luigi8cd46de2018-01-19 10:24:53 +01001178 context = new DefaultObjectiveContext(
Charles Chan72779502016-04-23 17:36:10 -07001179 (objective) -> log.debug("Successfully update {} on {}/{}, vlan {}",
1180 mcastIp, deviceId, port.toLong(), assignedVlan),
1181 (objective, error) ->
1182 log.warn("Failed to update {} on {}/{}, vlan {}: {}",
1183 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Pier Luigi8cd46de2018-01-19 10:24:53 +01001184 // Here we store the next objective with the remaining port
Pier7b657162018-03-27 11:29:42 -07001185 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi8cd46de2018-01-19 10:24:53 +01001186 existingPorts, nextObj.id()).removeFromExisting();
Pier7b657162018-03-27 11:29:42 -07001187 fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, newNextObj.id()).add(context);
Charles Chan72779502016-04-23 17:36:10 -07001188 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Charles Chanc91c8782016-03-30 17:54:24 -07001189 }
Pier Luigi8cd46de2018-01-19 10:24:53 +01001190 // Let's modify the next objective removing the bucket
Pier7b657162018-03-27 11:29:42 -07001191 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi8cd46de2018-01-19 10:24:53 +01001192 ImmutableSet.of(port), nextObj.id()).removeFromExisting();
1193 srManager.flowObjectiveService.next(deviceId, newNextObj);
1194 srManager.flowObjectiveService.forward(deviceId, fwdObj);
Charles Chanc91c8782016-03-30 17:54:24 -07001195 return existingPorts.isEmpty();
1196 }
1197
Charles Chan72779502016-04-23 17:36:10 -07001198 /**
1199 * Removes entire group on given device.
1200 *
1201 * @param deviceId device ID
1202 * @param mcastIp multicast group to be removed
1203 * @param assignedVlan assigned VLAN ID
1204 */
1205 private void removeGroupFromDevice(DeviceId deviceId, IpAddress mcastIp,
Pier7b657162018-03-27 11:29:42 -07001206 VlanId assignedVlan) {
Charles Chan72779502016-04-23 17:36:10 -07001207 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId);
1208 // This device is not serving this multicast group
1209 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
1210 log.warn("{} is not serving {}. Abort.", deviceId, mcastIp);
1211 return;
1212 }
1213 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
1214 // NOTE: Rely on GroupStore garbage collection rather than explicitly
1215 // remove L3MG since there might be other flows/groups refer to
1216 // the same L2IG
1217 ObjectiveContext context = new DefaultObjectiveContext(
1218 (objective) -> log.debug("Successfully remove {} on {}, vlan {}",
1219 mcastIp, deviceId, assignedVlan),
1220 (objective, error) ->
1221 log.warn("Failed to remove {} on {}, vlan {}: {}",
1222 mcastIp, deviceId, assignedVlan, error));
Pier7b657162018-03-27 11:29:42 -07001223 ForwardingObjective fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
Charles Chan72779502016-04-23 17:36:10 -07001224 srManager.flowObjectiveService.forward(deviceId, fwdObj);
1225 mcastNextObjStore.remove(mcastStoreKey);
1226 mcastRoleStore.remove(mcastStoreKey);
1227 }
1228
Pier Luigi580fd8a2018-01-16 10:47:50 +01001229 private void installPath(IpAddress mcastIp, ConnectPoint source, Path mcastPath) {
1230 // Get Links
1231 List<Link> links = mcastPath.links();
Pier1a7e0c02018-03-12 15:00:54 -07001232
1233 // Setup new ingress mcast role
1234 mcastRoleStore.put(new McastStoreKey(mcastIp, links.get(0).src().deviceId()),
1235 INGRESS);
1236
Pier Luigi580fd8a2018-01-16 10:47:50 +01001237 // For each link, modify the next on the source device adding the src port
1238 // and a new filter objective on the destination port
1239 links.forEach(link -> {
1240 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
Pier7b657162018-03-27 11:29:42 -07001241 mcastUtils.assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
1242 mcastUtils.addFilterToDevice(link.dst().deviceId(), link.dst().port(),
1243 mcastUtils.assignedVlan(null), mcastIp, null);
Pier Luigi580fd8a2018-01-16 10:47:50 +01001244 });
Pier1a7e0c02018-03-12 15:00:54 -07001245
1246 // Setup mcast role for the transit
1247 links.stream()
1248 .filter(link -> !link.src().deviceId().equals(source.deviceId()))
1249 .forEach(link -> mcastRoleStore.put(new McastStoreKey(mcastIp, link.src().deviceId()),
1250 TRANSIT));
Charles Chan72779502016-04-23 17:36:10 -07001251 }
1252
Charles Chanc91c8782016-03-30 17:54:24 -07001253 /**
Pier1f87aca2018-03-14 16:47:32 -07001254 * Go through all the paths, looking for shared links to be used
1255 * in the final path computation.
1256 *
1257 * @param egresses egress devices
1258 * @param availablePaths all the available paths towards the egress
1259 * @return shared links between egress devices
1260 */
1261 private Set<Link> exploreMcastTree(Set<DeviceId> egresses,
1262 Map<DeviceId, List<Path>> availablePaths) {
1263 // Length of the shortest path
1264 int minLength = Integer.MAX_VALUE;
1265 int length;
1266 // Current paths
1267 List<Path> currentPaths;
1268 // Verify the source can still reach all the egresses
1269 for (DeviceId egress : egresses) {
1270 // From the source we cannot reach all the sinks
Pier7b657162018-03-27 11:29:42 -07001271 // just continue and let's figure out after
Pier1f87aca2018-03-14 16:47:32 -07001272 currentPaths = availablePaths.get(egress);
1273 if (currentPaths.isEmpty()) {
1274 continue;
1275 }
1276 // Get the length of the first one available,
Pier7b657162018-03-27 11:29:42 -07001277 // update the min length
Pier1f87aca2018-03-14 16:47:32 -07001278 length = currentPaths.get(0).links().size();
1279 if (length < minLength) {
1280 minLength = length;
1281 }
Pier Luigi51ee7c02018-02-23 19:57:40 +01001282 }
Pier1f87aca2018-03-14 16:47:32 -07001283 // If there are no paths
1284 if (minLength == Integer.MAX_VALUE) {
1285 return Collections.emptySet();
1286 }
1287 // Iterate looking for shared links
1288 int index = 0;
1289 // Define the sets for the intersection
1290 Set<Link> sharedLinks = Sets.newHashSet();
1291 Set<Link> currentSharedLinks;
1292 Set<Link> currentLinks;
Pier7b657162018-03-27 11:29:42 -07001293 DeviceId egressToRemove = null;
Pier1f87aca2018-03-14 16:47:32 -07001294 // Let's find out the shared links
1295 while (index < minLength) {
1296 // Initialize the intersection with the paths related to the first egress
1297 currentPaths = availablePaths.get(
1298 egresses.stream()
1299 .findFirst()
1300 .orElse(null)
1301 );
1302 currentSharedLinks = Sets.newHashSet();
1303 // Iterate over the paths and take the "index" links
1304 for (Path path : currentPaths) {
1305 currentSharedLinks.add(path.links().get(index));
1306 }
1307 // Iterate over the remaining egress
1308 for (DeviceId egress : egresses) {
1309 // Iterate over the paths and take the "index" links
1310 currentLinks = Sets.newHashSet();
1311 for (Path path : availablePaths.get(egress)) {
1312 currentLinks.add(path.links().get(index));
1313 }
1314 // Do intersection
1315 currentSharedLinks = Sets.intersection(currentSharedLinks, currentLinks);
1316 // If there are no shared paths exit and record the device to remove
1317 // we have to retry with a subset of sinks
1318 if (currentSharedLinks.isEmpty()) {
Pier7b657162018-03-27 11:29:42 -07001319 egressToRemove = egress;
Pier1f87aca2018-03-14 16:47:32 -07001320 index = minLength;
1321 break;
1322 }
1323 }
1324 sharedLinks.addAll(currentSharedLinks);
1325 index++;
1326 }
1327 // If the shared links is empty and there are egress
1328 // let's retry another time with less sinks, we can
1329 // still build optimal subtrees
Pier7b657162018-03-27 11:29:42 -07001330 if (sharedLinks.isEmpty() && egresses.size() > 1 && egressToRemove != null) {
1331 egresses.remove(egressToRemove);
Pier1f87aca2018-03-14 16:47:32 -07001332 sharedLinks = exploreMcastTree(egresses, availablePaths);
1333 }
1334 return sharedLinks;
1335 }
1336
1337 /**
1338 * Build Mcast tree having as root the given source and as leaves the given egress points.
1339 *
1340 * @param source source of the tree
1341 * @param sinks leaves of the tree
1342 * @return the computed Mcast tree
1343 */
1344 private Map<ConnectPoint, List<Path>> computeSinkMcastTree(DeviceId source,
Pier7b657162018-03-27 11:29:42 -07001345 Set<ConnectPoint> sinks) {
Pier1f87aca2018-03-14 16:47:32 -07001346 // Get the egress devices, remove source from the egress if present
1347 Set<DeviceId> egresses = sinks.stream()
1348 .map(ConnectPoint::deviceId)
1349 .filter(deviceId -> !deviceId.equals(source))
1350 .collect(Collectors.toSet());
1351 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(source, egresses);
Pier7b657162018-03-27 11:29:42 -07001352 // Build final tree and return it as it is
Pier1f87aca2018-03-14 16:47:32 -07001353 final Map<ConnectPoint, List<Path>> finalTree = Maps.newHashMap();
Pier7b657162018-03-27 11:29:42 -07001354 // We need to put back the source if it was originally present
1355 sinks.forEach(sink -> {
1356 List<Path> sinkPaths = mcastTree.get(sink.deviceId());
1357 finalTree.put(sink, sinkPaths != null ? sinkPaths : ImmutableList.of());
1358 });
Pier1f87aca2018-03-14 16:47:32 -07001359 return finalTree;
1360 }
1361
1362 /**
1363 * Build Mcast tree having as root the given source and as leaves the given egress.
1364 *
1365 * @param source source of the tree
1366 * @param egresses leaves of the tree
1367 * @return the computed Mcast tree
1368 */
1369 private Map<DeviceId, List<Path>> computeMcastTree(DeviceId source,
1370 Set<DeviceId> egresses) {
1371 // Pre-compute all the paths
1372 Map<DeviceId, List<Path>> availablePaths = Maps.newHashMap();
1373 // No links to enforce
1374 egresses.forEach(egress -> availablePaths.put(egress, getPaths(source, egress,
1375 Collections.emptySet())));
1376 // Explore the topology looking for shared links amongst the egresses
1377 Set<Link> linksToEnforce = exploreMcastTree(Sets.newHashSet(egresses), availablePaths);
1378 // Remove all the paths from the previous computation
1379 availablePaths.clear();
1380 // Build the final paths enforcing the shared links between egress devices
1381 egresses.forEach(egress -> availablePaths.put(egress, getPaths(source, egress,
1382 linksToEnforce)));
1383 return availablePaths;
1384 }
1385
1386 /**
1387 * Gets path from src to dst computed using the custom link weigher.
1388 *
1389 * @param src source device ID
1390 * @param dst destination device ID
1391 * @return list of paths from src to dst
1392 */
1393 private List<Path> getPaths(DeviceId src, DeviceId dst, Set<Link> linksToEnforce) {
1394 // Takes a snapshot of the topology
1395 final Topology currentTopology = topologyService.currentTopology();
1396 // Build a specific link weigher for this path computation
1397 final LinkWeigher linkWeigher = new SRLinkWeigher(srManager, src, linksToEnforce);
1398 // We will use our custom link weigher for our path
1399 // computations and build the list of valid paths
1400 List<Path> allPaths = Lists.newArrayList(
1401 topologyService.getPaths(currentTopology, src, dst, linkWeigher)
1402 );
1403 // If there are no valid paths, just exit
1404 log.debug("{} path(s) found from {} to {}", allPaths.size(), src, dst);
1405 return allPaths;
Pier Luigi51ee7c02018-02-23 19:57:40 +01001406 }
1407
Charles Chanc91c8782016-03-30 17:54:24 -07001408 /**
1409 * Gets a path from src to dst.
1410 * If a path was allocated before, returns the allocated path.
1411 * Otherwise, randomly pick one from available paths.
1412 *
1413 * @param src source device ID
1414 * @param dst destination device ID
1415 * @param mcastIp multicast group
Pier1f87aca2018-03-14 16:47:32 -07001416 * @param allPaths paths list
Charles Chanc91c8782016-03-30 17:54:24 -07001417 * @return an optional path from src to dst
1418 */
Pier1f87aca2018-03-14 16:47:32 -07001419 private Optional<Path> getPath(DeviceId src, DeviceId dst,
1420 IpAddress mcastIp, List<Path> allPaths) {
1421 // Firstly we get all the valid paths, if the supplied are null
1422 if (allPaths == null) {
1423 allPaths = getPaths(src, dst, Collections.emptySet());
1424 }
1425
1426 // If there are no paths just exit
Charles Chanc91c8782016-03-30 17:54:24 -07001427 if (allPaths.isEmpty()) {
Charles Chanc91c8782016-03-30 17:54:24 -07001428 return Optional.empty();
1429 }
1430
Pier Luigi91573e12018-01-23 16:06:38 +01001431 // Create a map index of suitablity-to-list of paths. For example
1432 // a path in the list associated to the index 1 shares only the
1433 // first hop and it is less suitable of a path belonging to the index
1434 // 2 that shares leaf-spine.
1435 Map<Integer, List<Path>> eligiblePaths = Maps.newHashMap();
1436 // Some init steps
1437 int nhop;
1438 McastStoreKey mcastStoreKey;
1439 Link hop;
1440 PortNumber srcPort;
1441 Set<PortNumber> existingPorts;
1442 NextObjective nextObj;
1443 // Iterate over paths looking for eligible paths
1444 for (Path path : allPaths) {
1445 // Unlikely, it will happen...
1446 if (!src.equals(path.links().get(0).src().deviceId())) {
1447 continue;
1448 }
1449 nhop = 0;
1450 // Iterate over the links
1451 while (nhop < path.links().size()) {
1452 // Get the link and verify if a next related
1453 // to the src device exist in the store
1454 hop = path.links().get(nhop);
1455 mcastStoreKey = new McastStoreKey(mcastIp, hop.src().deviceId());
1456 // It does not exist in the store, exit
1457 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
1458 break;
Charles Chanc91c8782016-03-30 17:54:24 -07001459 }
Pier Luigi91573e12018-01-23 16:06:38 +01001460 // Get the output ports on the next
1461 nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Pier7b657162018-03-27 11:29:42 -07001462 existingPorts = mcastUtils.getPorts(nextObj.next());
Pier Luigi91573e12018-01-23 16:06:38 +01001463 // And the src port on the link
1464 srcPort = hop.src().port();
1465 // the src port is not used as output, exit
1466 if (!existingPorts.contains(srcPort)) {
1467 break;
1468 }
1469 nhop++;
1470 }
1471 // n_hop defines the index
1472 if (nhop > 0) {
1473 eligiblePaths.compute(nhop, (index, paths) -> {
1474 paths = paths == null ? Lists.newArrayList() : paths;
1475 paths.add(path);
1476 return paths;
1477 });
Charles Chanc91c8782016-03-30 17:54:24 -07001478 }
1479 }
Pier Luigi91573e12018-01-23 16:06:38 +01001480
1481 // No suitable paths
1482 if (eligiblePaths.isEmpty()) {
1483 log.debug("No eligiblePath(s) found from {} to {}", src, dst);
1484 // Otherwise, randomly pick a path
1485 Collections.shuffle(allPaths);
1486 return allPaths.stream().findFirst();
1487 }
1488
1489 // Let's take the best ones
1490 Integer bestIndex = eligiblePaths.keySet()
1491 .stream()
1492 .sorted(Comparator.reverseOrder())
1493 .findFirst().orElse(null);
1494 List<Path> bestPaths = eligiblePaths.get(bestIndex);
1495 log.debug("{} eligiblePath(s) found from {} to {}",
1496 bestPaths.size(), src, dst);
1497 // randomly pick a path on the highest index
1498 Collections.shuffle(bestPaths);
1499 return bestPaths.stream().findFirst();
Charles Chanc91c8782016-03-30 17:54:24 -07001500 }
1501
1502 /**
Charles Chan72779502016-04-23 17:36:10 -07001503 * Gets device(s) of given role in given multicast group.
1504 *
1505 * @param mcastIp multicast IP
1506 * @param role multicast role
1507 * @return set of device ID or empty set if not found
1508 */
1509 private Set<DeviceId> getDevice(IpAddress mcastIp, McastRole role) {
1510 return mcastRoleStore.entrySet().stream()
1511 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp) &&
1512 entry.getValue().value() == role)
Pier1f87aca2018-03-14 16:47:32 -07001513 .map(Entry::getKey).map(McastStoreKey::deviceId)
Charles Chan72779502016-04-23 17:36:10 -07001514 .collect(Collectors.toSet());
1515 }
1516
1517 /**
1518 * Gets groups which is affected by the link down event.
1519 *
1520 * @param link link going down
1521 * @return a set of multicast IpAddress
1522 */
1523 private Set<IpAddress> getAffectedGroups(Link link) {
1524 DeviceId deviceId = link.src().deviceId();
1525 PortNumber port = link.src().port();
1526 return mcastNextObjStore.entrySet().stream()
1527 .filter(entry -> entry.getKey().deviceId().equals(deviceId) &&
Pier7b657162018-03-27 11:29:42 -07001528 mcastUtils.getPorts(entry.getValue().value().next()).contains(port))
Pier1f87aca2018-03-14 16:47:32 -07001529 .map(Entry::getKey).map(McastStoreKey::mcastIp)
Charles Chan72779502016-04-23 17:36:10 -07001530 .collect(Collectors.toSet());
1531 }
1532
1533 /**
Pier Luigi580fd8a2018-01-16 10:47:50 +01001534 * Gets groups which are affected by the device down event.
1535 *
1536 * @param deviceId device going down
1537 * @return a set of multicast IpAddress
1538 */
1539 private Set<IpAddress> getAffectedGroups(DeviceId deviceId) {
1540 return mcastNextObjStore.entrySet().stream()
1541 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
Pier1f87aca2018-03-14 16:47:32 -07001542 .map(Entry::getKey).map(McastStoreKey::mcastIp)
Pier Luigi580fd8a2018-01-16 10:47:50 +01001543 .collect(Collectors.toSet());
1544 }
1545
1546 /**
Charles Chan72779502016-04-23 17:36:10 -07001547 * Gets the spine-facing port on ingress device of given multicast group.
1548 *
1549 * @param mcastIp multicast IP
1550 * @return spine-facing port on ingress device
1551 */
Pier1a7e0c02018-03-12 15:00:54 -07001552 private Set<PortNumber> ingressTransitPort(IpAddress mcastIp) {
Pier979e61a2018-03-07 11:42:50 +01001553 DeviceId ingressDevice = getDevice(mcastIp, INGRESS)
Charles Chan72779502016-04-23 17:36:10 -07001554 .stream().findAny().orElse(null);
Pier1a7e0c02018-03-12 15:00:54 -07001555 ImmutableSet.Builder<PortNumber> portBuilder = ImmutableSet.builder();
Charles Chan72779502016-04-23 17:36:10 -07001556 if (ingressDevice != null) {
1557 NextObjective nextObj = mcastNextObjStore
1558 .get(new McastStoreKey(mcastIp, ingressDevice)).value();
Pier7b657162018-03-27 11:29:42 -07001559 Set<PortNumber> ports = mcastUtils.getPorts(nextObj.next());
Pier1a7e0c02018-03-12 15:00:54 -07001560 // Let's find out all the ingress-transit ports
Charles Chan72779502016-04-23 17:36:10 -07001561 for (PortNumber port : ports) {
1562 // Spine-facing port should have no subnet and no xconnect
Pier Luigi69f774d2018-02-28 12:10:50 +01001563 if (srManager.deviceConfiguration() != null &&
1564 srManager.deviceConfiguration().getPortSubnets(ingressDevice, port).isEmpty() &&
Charles Chan82f19972016-05-17 13:13:55 -07001565 !srManager.xConnectHandler.hasXConnect(new ConnectPoint(ingressDevice, port))) {
Pier1a7e0c02018-03-12 15:00:54 -07001566 portBuilder.add(port);
Charles Chan72779502016-04-23 17:36:10 -07001567 }
1568 }
1569 }
Pier1a7e0c02018-03-12 15:00:54 -07001570 return portBuilder.build();
Charles Chan72779502016-04-23 17:36:10 -07001571 }
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001572
1573 /**
Pier Luigi580fd8a2018-01-16 10:47:50 +01001574 * Verify if the given device has sinks
1575 * for the multicast group.
1576 *
1577 * @param deviceId device Id
1578 * @param mcastIp multicast IP
1579 * @return true if the device has sink for the group.
1580 * False otherwise.
1581 */
1582 private boolean hasSinks(DeviceId deviceId, IpAddress mcastIp) {
1583 if (deviceId != null) {
1584 // Get the nextobjective
1585 Versioned<NextObjective> versionedNextObj = mcastNextObjStore.get(
1586 new McastStoreKey(mcastIp, deviceId)
1587 );
1588 // If it exists
1589 if (versionedNextObj != null) {
1590 NextObjective nextObj = versionedNextObj.value();
1591 // Retrieves all the output ports
Pier7b657162018-03-27 11:29:42 -07001592 Set<PortNumber> ports = mcastUtils.getPorts(nextObj.next());
Pier Luigi580fd8a2018-01-16 10:47:50 +01001593 // Tries to find at least one port that is not spine-facing
1594 for (PortNumber port : ports) {
1595 // Spine-facing port should have no subnet and no xconnect
Pier Luigi69f774d2018-02-28 12:10:50 +01001596 if (srManager.deviceConfiguration() != null &&
1597 (!srManager.deviceConfiguration().getPortSubnets(deviceId, port).isEmpty() ||
Pier Luigi580fd8a2018-01-16 10:47:50 +01001598 srManager.xConnectHandler.hasXConnect(new ConnectPoint(deviceId, port)))) {
1599 return true;
1600 }
1601 }
1602 }
1603 }
1604 return false;
1605 }
1606
1607 /**
Pier28164682018-04-17 15:50:43 +02001608 * Verify if a given connect point is sink for this group.
1609 *
1610 * @param mcastIp group address
1611 * @param connectPoint connect point to be verified
1612 * @return true if the connect point is sink of the group
1613 */
1614 private boolean isSink(IpAddress mcastIp, ConnectPoint connectPoint) {
1615 // Let's check if we are already serving that location
1616 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, connectPoint.deviceId());
1617 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
1618 return false;
1619 }
1620 // Get next and check with the port
1621 NextObjective mcastNext = mcastNextObjStore.get(mcastStoreKey).value();
1622 return mcastUtils.getPorts(mcastNext.next()).contains(connectPoint.port());
1623 }
1624
1625 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +01001626 * Updates filtering objective for given device and port.
1627 * It is called in general when the mcast config has been
1628 * changed.
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001629 *
1630 * @param deviceId device ID
1631 * @param portNum ingress port number
1632 * @param vlanId assigned VLAN ID
1633 * @param install true to add, false to remove
1634 */
Pier Luigi69f774d2018-02-28 12:10:50 +01001635 public void updateFilterToDevice(DeviceId deviceId, PortNumber portNum,
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001636 VlanId vlanId, boolean install) {
Pier Luigi35dab3f2018-01-25 16:16:02 +01001637 lastMcastChange = Instant.now();
1638 mcastLock();
1639 try {
1640 // Iterates over the route and updates properly the filtering objective
1641 // on the source device.
1642 srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
Pierdb27b8d2018-04-17 16:29:56 +02001643 log.debug("Update filter for {}", mcastRoute.group());
1644 // Verify leadership on the operation
1645 if (!mcastUtils.isLeader(mcastRoute.group())) {
1646 log.debug("Skip {} due to lack of leadership", mcastRoute.group());
1647 return;
1648 }
Pier1f87aca2018-03-14 16:47:32 -07001649 // FIXME To be addressed with multiple sources support
1650 ConnectPoint source = srManager.multicastRouteService.sources(mcastRoute)
1651 .stream()
1652 .findFirst().orElse(null);
Pier Luigi35dab3f2018-01-25 16:16:02 +01001653 if (source.deviceId().equals(deviceId) && source.port().equals(portNum)) {
1654 if (install) {
Pier7b657162018-03-27 11:29:42 -07001655 mcastUtils.addFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), INGRESS);
Pier Luigi35dab3f2018-01-25 16:16:02 +01001656 } else {
Pier7b657162018-03-27 11:29:42 -07001657 mcastUtils.removeFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), null);
Pier Luigi35dab3f2018-01-25 16:16:02 +01001658 }
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001659 }
Pier Luigi35dab3f2018-01-25 16:16:02 +01001660 });
1661 } finally {
1662 mcastUnlock();
1663 }
1664 }
1665
1666 /**
1667 * Performs bucket verification operation for all mcast groups in the devices.
1668 * Firstly, it verifies that mcast is stable before trying verification operation.
1669 * Verification consists in creating new nexts with VERIFY operation. Actually,
1670 * the operation is totally delegated to the driver.
1671 */
1672 private final class McastBucketCorrector implements Runnable {
1673
1674 @Override
1675 public void run() {
1676 // Verify if the Mcast has been stable for MCAST_STABLITY_THRESHOLD
1677 if (!isMcastStable()) {
1678 return;
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001679 }
Pier Luigi35dab3f2018-01-25 16:16:02 +01001680 // Acquires lock
1681 mcastLock();
1682 try {
1683 // Iterates over the routes and verify the related next objectives
1684 srManager.multicastRouteService.getRoutes()
1685 .stream()
1686 .map(McastRoute::group)
1687 .forEach(mcastIp -> {
1688 log.trace("Running mcast buckets corrector for mcast group: {}",
1689 mcastIp);
1690
1691 // For each group we get current information in the store
1692 // and issue a check of the next objectives in place
Pier979e61a2018-03-07 11:42:50 +01001693 DeviceId ingressDevice = getDevice(mcastIp, INGRESS)
Pier Luigi35dab3f2018-01-25 16:16:02 +01001694 .stream().findAny().orElse(null);
Pier1a7e0c02018-03-12 15:00:54 -07001695 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT);
Pier979e61a2018-03-07 11:42:50 +01001696 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS);
Pier Luigi92e69be2018-03-02 12:53:37 +01001697 // Get source and sinks from Mcast Route Service and warn about errors
Pier7b657162018-03-27 11:29:42 -07001698 ConnectPoint source = mcastUtils.getSource(mcastIp);
1699 Set<ConnectPoint> sinks = mcastUtils.getSinks(mcastIp).values().stream()
1700 .flatMap(Collection::stream)
1701 .collect(Collectors.toSet());
Pier Luigi35dab3f2018-01-25 16:16:02 +01001702
1703 // Do not proceed if ingress device or source of this group are missing
1704 if (ingressDevice == null || source == null) {
Pier Luigi92e69be2018-03-02 12:53:37 +01001705 if (!sinks.isEmpty()) {
1706 log.warn("Unable to run buckets corrector. " +
1707 "Missing ingress {} or source {} for group {}",
1708 ingressDevice, source, mcastIp);
1709 }
Pier Luigi35dab3f2018-01-25 16:16:02 +01001710 return;
1711 }
1712
Pierdb27b8d2018-04-17 16:29:56 +02001713 // Continue only when this instance is the leader of the group
1714 if (!mcastUtils.isLeader(mcastIp)) {
Pier Luigi35dab3f2018-01-25 16:16:02 +01001715 log.trace("Unable to run buckets corrector. " +
Pierdb27b8d2018-04-17 16:29:56 +02001716 "Skip {} due to lack of leadership", mcastIp);
Pier Luigi35dab3f2018-01-25 16:16:02 +01001717 return;
1718 }
1719
1720 // Create the set of the devices to be processed
1721 ImmutableSet.Builder<DeviceId> devicesBuilder = ImmutableSet.builder();
1722 devicesBuilder.add(ingressDevice);
Pier1a7e0c02018-03-12 15:00:54 -07001723 if (!transitDevices.isEmpty()) {
1724 devicesBuilder.addAll(transitDevices);
Pier Luigi35dab3f2018-01-25 16:16:02 +01001725 }
1726 if (!egressDevices.isEmpty()) {
1727 devicesBuilder.addAll(egressDevices);
1728 }
1729 Set<DeviceId> devicesToProcess = devicesBuilder.build();
1730
1731 // Iterate over the devices
1732 devicesToProcess.forEach(deviceId -> {
1733 McastStoreKey currentKey = new McastStoreKey(mcastIp, deviceId);
1734 // If next exists in our store verify related next objective
1735 if (mcastNextObjStore.containsKey(currentKey)) {
1736 NextObjective currentNext = mcastNextObjStore.get(currentKey).value();
1737 // Get current ports
Pier7b657162018-03-27 11:29:42 -07001738 Set<PortNumber> currentPorts = mcastUtils.getPorts(currentNext.next());
Pier Luigi35dab3f2018-01-25 16:16:02 +01001739 // Rebuild the next objective
Pier7b657162018-03-27 11:29:42 -07001740 currentNext = mcastUtils.nextObjBuilder(
Pier Luigi35dab3f2018-01-25 16:16:02 +01001741 mcastIp,
Pier7b657162018-03-27 11:29:42 -07001742 mcastUtils.assignedVlan(deviceId.equals(source.deviceId()) ?
1743 source : null),
Pier Luigi35dab3f2018-01-25 16:16:02 +01001744 currentPorts,
1745 currentNext.id()
1746 ).verify();
1747 // Send to the flowobjective service
1748 srManager.flowObjectiveService.next(deviceId, currentNext);
1749 } else {
Pier Luigid8a15162018-02-15 16:33:08 +01001750 log.warn("Unable to run buckets corrector. " +
Pier Luigi35dab3f2018-01-25 16:16:02 +01001751 "Missing next for {} and group {}",
1752 deviceId, mcastIp);
1753 }
1754 });
1755
1756 });
1757 } finally {
1758 // Finally, it releases the lock
1759 mcastUnlock();
1760 }
1761
1762 }
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001763 }
Pier Luigi0f9635b2018-01-15 18:06:43 +01001764
1765 public Map<McastStoreKey, Integer> getMcastNextIds(IpAddress mcastIp) {
1766 // If mcast ip is present
1767 if (mcastIp != null) {
1768 return mcastNextObjStore.entrySet().stream()
1769 .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
Pier1f87aca2018-03-14 16:47:32 -07001770 .collect(Collectors.toMap(Entry::getKey,
Pier Luigi0f9635b2018-01-15 18:06:43 +01001771 entry -> entry.getValue().value().id()));
1772 }
1773 // Otherwise take all the groups
1774 return mcastNextObjStore.entrySet().stream()
Pier1f87aca2018-03-14 16:47:32 -07001775 .collect(Collectors.toMap(Entry::getKey,
Pier Luigi0f9635b2018-01-15 18:06:43 +01001776 entry -> entry.getValue().value().id()));
1777 }
1778
Pier71c55772018-04-17 17:25:22 +02001779 /**
1780 * Returns the associated roles to the mcast groups or to the single
1781 * group if mcastIp is present.
1782 *
1783 * @param mcastIp the group ip
1784 * @return the mapping mcastIp-device to mcast role
1785 *
1786 * @deprecated in 1.12 ("Magpie") release.
1787 */
1788 @Deprecated
Pier Luigi69f774d2018-02-28 12:10:50 +01001789 public Map<McastStoreKey, McastRole> getMcastRoles(IpAddress mcastIp) {
Pier Luigi0f9635b2018-01-15 18:06:43 +01001790 // If mcast ip is present
1791 if (mcastIp != null) {
1792 return mcastRoleStore.entrySet().stream()
1793 .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
Pier1f87aca2018-03-14 16:47:32 -07001794 .collect(Collectors.toMap(Entry::getKey,
Pier Luigi0f9635b2018-01-15 18:06:43 +01001795 entry -> entry.getValue().value()));
1796 }
1797 // Otherwise take all the groups
1798 return mcastRoleStore.entrySet().stream()
Pier1f87aca2018-03-14 16:47:32 -07001799 .collect(Collectors.toMap(Entry::getKey,
Pier Luigi0f9635b2018-01-15 18:06:43 +01001800 entry -> entry.getValue().value()));
1801 }
1802
Pier71c55772018-04-17 17:25:22 +02001803 /**
1804 * Returns the associated paths to the mcast group.
1805 *
1806 * @param mcastIp the group ip
1807 * @return the mapping egress point to mcast path
1808 *
1809 * @deprecated in 1.12 ("Magpie") release.
1810 */
1811 @Deprecated
Pier Luigi0f9635b2018-01-15 18:06:43 +01001812 public Map<ConnectPoint, List<ConnectPoint>> getMcastPaths(IpAddress mcastIp) {
1813 Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
1814 // Get the source
Pier7b657162018-03-27 11:29:42 -07001815 ConnectPoint source = mcastUtils.getSource(mcastIp);
Pier Luigi0f9635b2018-01-15 18:06:43 +01001816 // Source cannot be null, we don't know the starting point
1817 if (source != null) {
1818 // Init steps
1819 Set<DeviceId> visited = Sets.newHashSet();
1820 List<ConnectPoint> currentPath = Lists.newArrayList(
1821 source
1822 );
1823 // Build recursively the mcast paths
1824 buildMcastPaths(source.deviceId(), visited, mcastPaths, currentPath, mcastIp);
1825 }
1826 return mcastPaths;
1827 }
1828
Pier71c55772018-04-17 17:25:22 +02001829 /**
1830 * Returns the associated trees to the mcast group.
1831 *
1832 * @param mcastIp the group ip
1833 * @param sourcecp the source connect point
1834 * @return the mapping egress point to mcast path
1835 */
1836 public Multimap<ConnectPoint, List<ConnectPoint>> getMcastTrees(IpAddress mcastIp,
1837 ConnectPoint sourcecp) {
1838 Multimap<ConnectPoint, List<ConnectPoint>> mcastTrees = HashMultimap.create();
1839 // Get the sources
1840 Set<ConnectPoint> sources = mcastUtils.getSources(mcastIp);
1841
1842 // If we are providing the source, let's filter out
1843 if (sourcecp != null) {
1844 sources = sources.stream()
1845 .filter(source -> source.equals(sourcecp))
1846 .collect(Collectors.toSet());
1847 }
1848
1849 // Source cannot be null, we don't know the starting point
1850 if (!sources.isEmpty()) {
1851 sources.forEach(source -> {
1852 // Init steps
1853 Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
1854 Set<DeviceId> visited = Sets.newHashSet();
1855 List<ConnectPoint> currentPath = Lists.newArrayList(source);
1856 // Build recursively the mcast paths
1857 buildMcastPaths(source.deviceId(), visited, mcastPaths, currentPath, mcastIp);
1858 mcastPaths.forEach(mcastTrees::put);
1859 });
1860 }
1861 return mcastTrees;
1862 }
1863
1864 /**
1865 * Build recursively the mcast paths.
1866 *
1867 * @param toVisit the node to visit
1868 * @param visited the visited nodes
1869 * @param mcastPaths the current mcast paths
1870 * @param currentPath the current path
1871 * @param mcastIp the group ip
1872 */
Pier Luigi0f9635b2018-01-15 18:06:43 +01001873 private void buildMcastPaths(DeviceId toVisit, Set<DeviceId> visited,
1874 Map<ConnectPoint, List<ConnectPoint>> mcastPaths,
1875 List<ConnectPoint> currentPath, IpAddress mcastIp) {
1876 // If we have visited the node to visit
1877 // there is a loop
1878 if (visited.contains(toVisit)) {
1879 return;
1880 }
1881 // Visit next-hop
1882 visited.add(toVisit);
1883 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, toVisit);
1884 // Looking for next-hops
1885 if (mcastNextObjStore.containsKey(mcastStoreKey)) {
1886 // Build egress connectpoints
1887 NextObjective nextObjective = mcastNextObjStore.get(mcastStoreKey).value();
1888 // Get Ports
Pier7b657162018-03-27 11:29:42 -07001889 Set<PortNumber> outputPorts = mcastUtils.getPorts(nextObjective.next());
Pier Luigi0f9635b2018-01-15 18:06:43 +01001890 // Build relative cps
1891 ImmutableSet.Builder<ConnectPoint> cpBuilder = ImmutableSet.builder();
1892 outputPorts.forEach(portNumber -> cpBuilder.add(new ConnectPoint(toVisit, portNumber)));
1893 Set<ConnectPoint> egressPoints = cpBuilder.build();
1894 // Define other variables for the next steps
1895 Set<Link> egressLinks;
1896 List<ConnectPoint> newCurrentPath;
1897 Set<DeviceId> newVisited;
1898 DeviceId newToVisit;
1899 for (ConnectPoint egressPoint : egressPoints) {
1900 egressLinks = srManager.linkService.getEgressLinks(egressPoint);
1901 // If it does not have egress links, stop
1902 if (egressLinks.isEmpty()) {
1903 // Add the connect points to the path
1904 newCurrentPath = Lists.newArrayList(currentPath);
1905 newCurrentPath.add(0, egressPoint);
1906 // Save in the map
1907 mcastPaths.put(egressPoint, newCurrentPath);
1908 } else {
1909 newVisited = Sets.newHashSet(visited);
1910 // Iterate over the egress links for the next hops
1911 for (Link egressLink : egressLinks) {
1912 // Update to visit
1913 newToVisit = egressLink.dst().deviceId();
1914 // Add the connect points to the path
1915 newCurrentPath = Lists.newArrayList(currentPath);
1916 newCurrentPath.add(0, egressPoint);
1917 newCurrentPath.add(0, egressLink.dst());
1918 // Go to the next hop
1919 buildMcastPaths(newToVisit, newVisited, mcastPaths, newCurrentPath, mcastIp);
1920 }
1921 }
1922 }
1923 }
1924 }
1925
Pierdb27b8d2018-04-17 16:29:56 +02001926 /**
1927 * Return the leaders of the mcast groups.
1928 *
1929 * @param mcastIp the group ip
1930 * @return the mapping group-node
1931 */
1932 public Map<IpAddress, NodeId> getMcastLeaders(IpAddress mcastIp) {
1933 return mcastUtils.getMcastLeaders(mcastIp);
1934 }
Charles Chanc91c8782016-03-30 17:54:24 -07001935}