blob: 2c82cf5c046d0faab0e161b6fe10b276ad8a269b [file] [log] [blame]
Charles Chanc91c8782016-03-30 17:54:24 -07001/*
Pier Luigi69f774d2018-02-28 12:10:50 +01002 * Copyright 2018-present Open Networking Foundation
Charles Chanc91c8782016-03-30 17:54:24 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Pier Luigi69f774d2018-02-28 12:10:50 +010017package org.onosproject.segmentrouting.mcast;
Charles Chanc91c8782016-03-30 17:54:24 -070018
Pier Luigid29ca7c2018-02-28 17:24:03 +010019import com.google.common.cache.Cache;
20import com.google.common.cache.CacheBuilder;
21import com.google.common.cache.RemovalCause;
22import com.google.common.cache.RemovalNotification;
Pier7b657162018-03-27 11:29:42 -070023import com.google.common.collect.ImmutableList;
Charles Chanc91c8782016-03-30 17:54:24 -070024import com.google.common.collect.ImmutableSet;
25import com.google.common.collect.Lists;
Pier Luigi91573e12018-01-23 16:06:38 +010026import com.google.common.collect.Maps;
Charles Chanc91c8782016-03-30 17:54:24 -070027import com.google.common.collect.Sets;
Charles Chanc91c8782016-03-30 17:54:24 -070028import org.onlab.packet.IpAddress;
Charles Chanc91c8782016-03-30 17:54:24 -070029import org.onlab.packet.VlanId;
30import org.onlab.util.KryoNamespace;
31import org.onosproject.core.ApplicationId;
32import org.onosproject.core.CoreService;
Pier1f87aca2018-03-14 16:47:32 -070033import org.onosproject.mcast.api.McastEvent;
34import org.onosproject.mcast.api.McastRoute;
Pier7b657162018-03-27 11:29:42 -070035import org.onosproject.mcast.api.McastRouteData;
Pier1f87aca2018-03-14 16:47:32 -070036import org.onosproject.mcast.api.McastRouteUpdate;
Pier7b657162018-03-27 11:29:42 -070037import org.onosproject.net.HostId;
Charles Chanc91c8782016-03-30 17:54:24 -070038import org.onosproject.net.ConnectPoint;
39import org.onosproject.net.DeviceId;
40import org.onosproject.net.Link;
41import org.onosproject.net.Path;
42import org.onosproject.net.PortNumber;
Charles Chan72779502016-04-23 17:36:10 -070043import org.onosproject.net.flowobjective.DefaultObjectiveContext;
Charles Chanc91c8782016-03-30 17:54:24 -070044import org.onosproject.net.flowobjective.ForwardingObjective;
45import org.onosproject.net.flowobjective.NextObjective;
Charles Chan72779502016-04-23 17:36:10 -070046import org.onosproject.net.flowobjective.ObjectiveContext;
Pier1f87aca2018-03-14 16:47:32 -070047import org.onosproject.net.topology.LinkWeigher;
Pier Luigid8a15162018-02-15 16:33:08 +010048import org.onosproject.net.topology.Topology;
Charles Chanc91c8782016-03-30 17:54:24 -070049import org.onosproject.net.topology.TopologyService;
Pier1f87aca2018-03-14 16:47:32 -070050import org.onosproject.segmentrouting.SRLinkWeigher;
Pier Luigi69f774d2018-02-28 12:10:50 +010051import org.onosproject.segmentrouting.SegmentRoutingManager;
Charles Chan72779502016-04-23 17:36:10 -070052import org.onosproject.segmentrouting.storekey.McastStoreKey;
Charles Chanc91c8782016-03-30 17:54:24 -070053import org.onosproject.store.serializers.KryoNamespaces;
54import org.onosproject.store.service.ConsistentMap;
55import org.onosproject.store.service.Serializer;
Pier Luigi580fd8a2018-01-16 10:47:50 +010056import org.onosproject.store.service.Versioned;
Charles Chanc91c8782016-03-30 17:54:24 -070057import org.slf4j.Logger;
58import org.slf4j.LoggerFactory;
59
Pier Luigi35dab3f2018-01-25 16:16:02 +010060import java.time.Instant;
Charles Chanc91c8782016-03-30 17:54:24 -070061import java.util.Collection;
62import java.util.Collections;
Pier Luigi91573e12018-01-23 16:06:38 +010063import java.util.Comparator;
Charles Chanc91c8782016-03-30 17:54:24 -070064import java.util.List;
Charles Chan72779502016-04-23 17:36:10 -070065import java.util.Map;
Pier1f87aca2018-03-14 16:47:32 -070066import java.util.Map.Entry;
Charles Chanc91c8782016-03-30 17:54:24 -070067import java.util.Optional;
68import java.util.Set;
Pier Luigi35dab3f2018-01-25 16:16:02 +010069import java.util.concurrent.ScheduledExecutorService;
70import java.util.concurrent.TimeUnit;
71import java.util.concurrent.locks.Lock;
72import java.util.concurrent.locks.ReentrantLock;
Charles Chan72779502016-04-23 17:36:10 -070073import java.util.stream.Collectors;
74
Pier Luigi35dab3f2018-01-25 16:16:02 +010075import static java.util.concurrent.Executors.newScheduledThreadPool;
76import static org.onlab.util.Tools.groupedThreads;
Pier1f87aca2018-03-14 16:47:32 -070077
Pier7b657162018-03-27 11:29:42 -070078import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_REMOVED;
79import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_ADDED;
80import static org.onosproject.mcast.api.McastEvent.Type.SINKS_REMOVED;
81import static org.onosproject.mcast.api.McastEvent.Type.SINKS_ADDED;
82import static org.onosproject.mcast.api.McastEvent.Type.ROUTE_REMOVED;
Pier979e61a2018-03-07 11:42:50 +010083import static org.onosproject.segmentrouting.mcast.McastRole.EGRESS;
84import static org.onosproject.segmentrouting.mcast.McastRole.INGRESS;
85import static org.onosproject.segmentrouting.mcast.McastRole.TRANSIT;
Charles Chanc91c8782016-03-30 17:54:24 -070086
87/**
Pier Luigi69f774d2018-02-28 12:10:50 +010088 * Handles Multicast related events.
Charles Chanc91c8782016-03-30 17:54:24 -070089 */
Charles Chan1eaf4802016-04-18 13:44:03 -070090public class McastHandler {
Pier7b657162018-03-27 11:29:42 -070091 // Logger instance
Charles Chan1eaf4802016-04-18 13:44:03 -070092 private static final Logger log = LoggerFactory.getLogger(McastHandler.class);
Pier7b657162018-03-27 11:29:42 -070093 // Reference to srManager and most used internal objects
Charles Chanc91c8782016-03-30 17:54:24 -070094 private final SegmentRoutingManager srManager;
Charles Chan82f19972016-05-17 13:13:55 -070095 private final TopologyService topologyService;
Pier7b657162018-03-27 11:29:42 -070096 // Internal store of the Mcast nextobjectives
Charles Chan72779502016-04-23 17:36:10 -070097 private final ConsistentMap<McastStoreKey, NextObjective> mcastNextObjStore;
Pier7b657162018-03-27 11:29:42 -070098 // Internal store of the Mcast roles
Charles Chan72779502016-04-23 17:36:10 -070099 private final ConsistentMap<McastStoreKey, McastRole> mcastRoleStore;
Pier7b657162018-03-27 11:29:42 -0700100 // McastUtils
101 private final McastUtils mcastUtils;
Charles Chan72779502016-04-23 17:36:10 -0700102
Pier Luigid29ca7c2018-02-28 17:24:03 +0100103 // Wait time for the cache
104 private static final int WAIT_TIME_MS = 1000;
Pier7b657162018-03-27 11:29:42 -0700105
106 // Wait time for the removal of the old location
107 private static final int HOST_MOVED_DELAY_MS = 1000;
108
Pier Luigid29ca7c2018-02-28 17:24:03 +0100109 /**
110 * The mcastEventCache is implemented to avoid race condition by giving more time to the
111 * underlying subsystems to process previous calls.
112 */
113 private Cache<McastCacheKey, McastEvent> mcastEventCache = CacheBuilder.newBuilder()
114 .expireAfterWrite(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
115 .removalListener((RemovalNotification<McastCacheKey, McastEvent> notification) -> {
116 // Get group ip, sink and related event
117 IpAddress mcastIp = notification.getKey().mcastIp();
Pier7b657162018-03-27 11:29:42 -0700118 HostId sink = notification.getKey().sinkHost();
Pier Luigid29ca7c2018-02-28 17:24:03 +0100119 McastEvent mcastEvent = notification.getValue();
120 RemovalCause cause = notification.getCause();
121 log.debug("mcastEventCache removal event. group={}, sink={}, mcastEvent={}, cause={}",
122 mcastIp, sink, mcastEvent, cause);
123 // If it expires or it has been replaced, we deque the event
124 switch (notification.getCause()) {
125 case REPLACED:
126 case EXPIRED:
127 dequeueMcastEvent(mcastEvent);
128 break;
129 default:
130 break;
131 }
132 }).build();
133
134 private void enqueueMcastEvent(McastEvent mcastEvent) {
Pier7b657162018-03-27 11:29:42 -0700135 // Retrieve, currentData, prevData and the group
Pier1f87aca2018-03-14 16:47:32 -0700136 final McastRouteUpdate mcastRouteUpdate = mcastEvent.subject();
Pier7b657162018-03-27 11:29:42 -0700137 final McastRouteUpdate mcastRoutePrevUpdate = mcastEvent.prevSubject();
138 final IpAddress group = mcastRoutePrevUpdate.route().group();
Pier Luigid29ca7c2018-02-28 17:24:03 +0100139 // Let's create the keys of the cache
Pier7b657162018-03-27 11:29:42 -0700140 ImmutableSet.Builder<HostId> sinksBuilder = ImmutableSet.builder();
Pier1f87aca2018-03-14 16:47:32 -0700141 if (mcastEvent.type() == SOURCES_ADDED ||
142 mcastEvent.type() == SOURCES_REMOVED) {
143 // FIXME To be addressed with multiple sources support
Pier7b657162018-03-27 11:29:42 -0700144 sinksBuilder.addAll(Collections.emptySet());
145 } else if (mcastEvent.type() == SINKS_ADDED) {
146 // We need to process the host id one by one
147 mcastRouteUpdate.sinks().forEach(((hostId, connectPoints) -> {
148 // Get the previous locations and verify if there are changes
149 Set<ConnectPoint> prevConnectPoints = mcastRoutePrevUpdate.sinks().get(hostId);
150 Set<ConnectPoint> changes = Sets.difference(connectPoints, prevConnectPoints != null ?
151 prevConnectPoints : Collections.emptySet());
152 if (!changes.isEmpty()) {
153 sinksBuilder.add(hostId);
Pier1f87aca2018-03-14 16:47:32 -0700154 }
Pier7b657162018-03-27 11:29:42 -0700155 }));
156 } else if (mcastEvent.type() == SINKS_REMOVED) {
157 // We need to process the host id one by one
158 mcastRoutePrevUpdate.sinks().forEach(((hostId, connectPoints) -> {
159 // Get the current locations and verify if there are changes
160 Set<ConnectPoint> currentConnectPoints = mcastRouteUpdate.sinks().get(hostId);
161 Set<ConnectPoint> changes = Sets.difference(connectPoints, currentConnectPoints != null ?
162 currentConnectPoints : Collections.emptySet());
163 if (!changes.isEmpty()) {
164 sinksBuilder.add(hostId);
165 }
166 }));
167 } else if (mcastEvent.type() == ROUTE_REMOVED) {
168 // Current subject is null, just take the previous host ids
169 sinksBuilder.addAll(mcastRoutePrevUpdate.sinks().keySet());
Pier Luigid29ca7c2018-02-28 17:24:03 +0100170 }
171 // Push the elements in the cache
172 sinksBuilder.build().forEach(sink -> {
Pier1f87aca2018-03-14 16:47:32 -0700173 McastCacheKey cacheKey = new McastCacheKey(group, sink);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100174 mcastEventCache.put(cacheKey, mcastEvent);
175 });
176 }
177
178 private void dequeueMcastEvent(McastEvent mcastEvent) {
Pier7b657162018-03-27 11:29:42 -0700179 // Get new and old data
180 final McastRouteUpdate mcastUpdate = mcastEvent.subject();
181 final McastRouteUpdate mcastPrevUpdate = mcastEvent.prevSubject();
Pier Luigid29ca7c2018-02-28 17:24:03 +0100182 // Get source, mcast group
Pier1f87aca2018-03-14 16:47:32 -0700183 // FIXME To be addressed with multiple sources support
Pier7b657162018-03-27 11:29:42 -0700184 final ConnectPoint source = mcastPrevUpdate.sources()
Pier1f87aca2018-03-14 16:47:32 -0700185 .stream()
186 .findFirst()
187 .orElse(null);
Pier7b657162018-03-27 11:29:42 -0700188 IpAddress mcastIp = mcastPrevUpdate.route().group();
189 // Get all the previous sinks
190 Set<ConnectPoint> prevSinks = mcastPrevUpdate.sinks()
Pier1f87aca2018-03-14 16:47:32 -0700191 .values()
192 .stream()
193 .flatMap(Collection::stream)
194 .collect(Collectors.toSet());
Pier Luigid29ca7c2018-02-28 17:24:03 +0100195 // According to the event type let's call the proper method
196 switch (mcastEvent.type()) {
Pier1f87aca2018-03-14 16:47:32 -0700197 case SOURCES_ADDED:
198 // FIXME To be addressed with multiple sources support
199 // Get all the sinks
200 //Set<ConnectPoint> sinks = mcastRouteInfo.sinks();
201 // Compute the Mcast tree
202 //Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(source.deviceId(), sinks);
203 // Process the given sinks using the pre-computed paths
204 //mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink, mcastIp, paths));
Pier Luigid29ca7c2018-02-28 17:24:03 +0100205 break;
Pier1f87aca2018-03-14 16:47:32 -0700206 case SOURCES_REMOVED:
207 // FIXME To be addressed with multiple sources support
Pier Luigid29ca7c2018-02-28 17:24:03 +0100208 // Get old source
Pier1f87aca2018-03-14 16:47:32 -0700209 //ConnectPoint oldSource = mcastEvent.prevSubject().source().orElse(null);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100210 // Just the first cached element will be processed
Pier1f87aca2018-03-14 16:47:32 -0700211 //processSourceUpdatedInternal(mcastIp, source, oldSource);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100212 break;
213 case ROUTE_REMOVED:
214 // Process the route removed, just the first cached element will be processed
Pier7b657162018-03-27 11:29:42 -0700215 processRouteRemovedInternal(source, mcastIp);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100216 break;
Pier1f87aca2018-03-14 16:47:32 -0700217 case SINKS_ADDED:
Pier7b657162018-03-27 11:29:42 -0700218 // FIXME To be addressed with multiple sources support
219 processSinksAddedInternal(source, mcastIp,
220 mcastUpdate.sinks(), prevSinks);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100221 break;
Pier1f87aca2018-03-14 16:47:32 -0700222 case SINKS_REMOVED:
Pier7b657162018-03-27 11:29:42 -0700223 // FIXME To be addressed with multiple sources support
224 processSinksRemovedInternal(source, mcastIp,
Pier28164682018-04-17 15:50:43 +0200225 mcastUpdate.sinks(), mcastPrevUpdate.sinks());
Pier Luigid29ca7c2018-02-28 17:24:03 +0100226 break;
227 default:
228 break;
229 }
230 }
231
Pier Luigi35dab3f2018-01-25 16:16:02 +0100232 // Mcast lock to serialize local operations
233 private final Lock mcastLock = new ReentrantLock();
234
235 /**
236 * Acquires the lock used when making mcast changes.
237 */
238 private void mcastLock() {
239 mcastLock.lock();
240 }
241
242 /**
243 * Releases the lock used when making mcast changes.
244 */
245 private void mcastUnlock() {
246 mcastLock.unlock();
247 }
248
249 // Stability threshold for Mcast. Seconds
250 private static final long MCAST_STABLITY_THRESHOLD = 5;
251 // Last change done
252 private Instant lastMcastChange = Instant.now();
253
254 /**
255 * Determines if mcast in the network has been stable in the last
256 * MCAST_STABLITY_THRESHOLD seconds, by comparing the current time
257 * to the last mcast change timestamp.
258 *
259 * @return true if stable
260 */
261 private boolean isMcastStable() {
262 long last = (long) (lastMcastChange.toEpochMilli() / 1000.0);
263 long now = (long) (Instant.now().toEpochMilli() / 1000.0);
Saurav Das97241862018-02-14 14:14:54 -0800264 log.trace("Mcast stable since {}s", now - last);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100265 return (now - last) > MCAST_STABLITY_THRESHOLD;
266 }
267
268 // Verify interval for Mcast
269 private static final long MCAST_VERIFY_INTERVAL = 30;
270
271 // Executor for mcast bucket corrector
272 private ScheduledExecutorService executorService
Pier Luigid29ca7c2018-02-28 17:24:03 +0100273 = newScheduledThreadPool(1, groupedThreads("mcastWorker", "mcastWorker-%d", log));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100274
Charles Chan72779502016-04-23 17:36:10 -0700275 /**
Charles Chanc91c8782016-03-30 17:54:24 -0700276 * Constructs the McastEventHandler.
277 *
278 * @param srManager Segment Routing manager
279 */
Charles Chan1eaf4802016-04-18 13:44:03 -0700280 public McastHandler(SegmentRoutingManager srManager) {
Pier7b657162018-03-27 11:29:42 -0700281 ApplicationId coreAppId = srManager.coreService.getAppId(CoreService.CORE_APP_NAME);
Charles Chanc91c8782016-03-30 17:54:24 -0700282 this.srManager = srManager;
Charles Chanc91c8782016-03-30 17:54:24 -0700283 this.topologyService = srManager.topologyService;
Pier7b657162018-03-27 11:29:42 -0700284 KryoNamespace.Builder mcastKryo = new KryoNamespace.Builder()
Charles Chanc91c8782016-03-30 17:54:24 -0700285 .register(KryoNamespaces.API)
Charles Chan72779502016-04-23 17:36:10 -0700286 .register(McastStoreKey.class)
287 .register(McastRole.class);
Pier7b657162018-03-27 11:29:42 -0700288 mcastNextObjStore = srManager.storageService
Charles Chan72779502016-04-23 17:36:10 -0700289 .<McastStoreKey, NextObjective>consistentMapBuilder()
Charles Chanc91c8782016-03-30 17:54:24 -0700290 .withName("onos-mcast-nextobj-store")
Charles Chan4922a172016-05-23 16:45:45 -0700291 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-NextObj")))
Charles Chanc91c8782016-03-30 17:54:24 -0700292 .build();
Pier7b657162018-03-27 11:29:42 -0700293 mcastRoleStore = srManager.storageService
Charles Chan72779502016-04-23 17:36:10 -0700294 .<McastStoreKey, McastRole>consistentMapBuilder()
295 .withName("onos-mcast-role-store")
Charles Chan4922a172016-05-23 16:45:45 -0700296 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-Role")))
Charles Chan72779502016-04-23 17:36:10 -0700297 .build();
Pier7b657162018-03-27 11:29:42 -0700298 // Let's create McastUtils object
299 mcastUtils = new McastUtils(srManager, coreAppId, log);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100300 // Init the executor service and the buckets corrector
301 executorService.scheduleWithFixedDelay(new McastBucketCorrector(), 10,
Pier7b657162018-03-27 11:29:42 -0700302 MCAST_VERIFY_INTERVAL, TimeUnit.SECONDS);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100303 // Schedule the clean up, this will allow the processing of the expired events
304 executorService.scheduleAtFixedRate(mcastEventCache::cleanUp, 0,
305 WAIT_TIME_MS, TimeUnit.MILLISECONDS);
Charles Chan72779502016-04-23 17:36:10 -0700306 }
307
308 /**
309 * Read initial multicast from mcast store.
310 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100311 public void init() {
Pier7b657162018-03-27 11:29:42 -0700312 lastMcastChange = Instant.now();
313 mcastLock();
314 try {
315 srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
316 // FIXME To be addressed with multiple sources support
317 ConnectPoint source = srManager.multicastRouteService.sources(mcastRoute)
318 .stream()
319 .findFirst()
320 .orElse(null);
321 // Get all the sinks and process them
322 McastRouteData mcastRouteData = srManager.multicastRouteService.routeData(mcastRoute);
323 Set<ConnectPoint> sinks = processSinksToBeAdded(source, mcastRoute.group(), mcastRouteData.sinks());
324 // Filter out all the working sinks, we do not want to move them
325 sinks = sinks.stream()
326 .filter(sink -> {
327 McastStoreKey mcastKey = new McastStoreKey(mcastRoute.group(), sink.deviceId());
328 Versioned<NextObjective> verMcastNext = mcastNextObjStore.get(mcastKey);
329 return verMcastNext == null ||
330 !mcastUtils.getPorts(verMcastNext.value().next()).contains(sink.port());
331 })
332 .collect(Collectors.toSet());
333 // Compute the Mcast tree
334 Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(source.deviceId(), sinks);
335 // Process the given sinks using the pre-computed paths
336 mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink,
337 mcastRoute.group(), paths));
338 });
339 } finally {
340 mcastUnlock();
341 }
Charles Chanc91c8782016-03-30 17:54:24 -0700342 }
343
344 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +0100345 * Clean up when deactivating the application.
346 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100347 public void terminate() {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100348 executorService.shutdown();
349 }
350
351 /**
Pier Luigid29ca7c2018-02-28 17:24:03 +0100352 * Processes the SOURCE_ADDED, SOURCE_UPDATED, SINK_ADDED,
353 * SINK_REMOVED and ROUTE_REMOVED events.
Charles Chanc91c8782016-03-30 17:54:24 -0700354 *
355 * @param event McastEvent with SOURCE_ADDED type
356 */
Pier Luigid29ca7c2018-02-28 17:24:03 +0100357 public void processMcastEvent(McastEvent event) {
358 log.info("process {}", event);
Pier Luigid29ca7c2018-02-28 17:24:03 +0100359 // Just enqueue for now
360 enqueueMcastEvent(event);
Pier Luigi6786b922018-02-02 16:19:11 +0100361 }
362
363 /**
Pier Luigie80d6b42018-02-26 12:31:38 +0100364 * Process the SOURCE_UPDATED event.
365 *
366 * @param newSource the updated srouce info
367 * @param oldSource the outdated source info
368 */
369 private void processSourceUpdatedInternal(IpAddress mcastIp,
370 ConnectPoint newSource,
371 ConnectPoint oldSource) {
372 lastMcastChange = Instant.now();
373 mcastLock();
374 try {
375 log.debug("Processing source updated for group {}", mcastIp);
376
377 // Build key for the store and retrieve old data
378 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, oldSource.deviceId());
379
380 // Verify leadership on the operation
Pier7b657162018-03-27 11:29:42 -0700381 if (!mcastUtils.isLeader(oldSource)) {
Pier Luigie80d6b42018-02-26 12:31:38 +0100382 log.debug("Skip {} due to lack of leadership", mcastIp);
383 return;
384 }
385
386 // This device is not serving this multicast group
387 if (!mcastRoleStore.containsKey(mcastStoreKey) ||
388 !mcastNextObjStore.containsKey(mcastStoreKey)) {
389 log.warn("{} is not serving {}. Abort.", oldSource.deviceId(), mcastIp);
390 return;
391 }
392 NextObjective nextObjective = mcastNextObjStore.get(mcastStoreKey).value();
Pier7b657162018-03-27 11:29:42 -0700393 Set<PortNumber> outputPorts = mcastUtils.getPorts(nextObjective.next());
Pier Luigie80d6b42018-02-26 12:31:38 +0100394
Pier Luigid29ca7c2018-02-28 17:24:03 +0100395 // This an optimization to avoid unnecessary removal and add
Pier7b657162018-03-27 11:29:42 -0700396 if (!mcastUtils.assignedVlanFromNext(nextObjective)
397 .equals(mcastUtils.assignedVlan(newSource))) {
Pier Luigid29ca7c2018-02-28 17:24:03 +0100398 // Let's remove old flows and groups
Pier7b657162018-03-27 11:29:42 -0700399 removeGroupFromDevice(oldSource.deviceId(), mcastIp, mcastUtils.assignedVlan(oldSource));
Pier Luigid29ca7c2018-02-28 17:24:03 +0100400 // Push new flows and group
401 outputPorts.forEach(portNumber -> addPortToDevice(newSource.deviceId(), portNumber,
Pier7b657162018-03-27 11:29:42 -0700402 mcastIp, mcastUtils.assignedVlan(newSource)));
Pier Luigid29ca7c2018-02-28 17:24:03 +0100403 }
Pier7b657162018-03-27 11:29:42 -0700404 mcastUtils.addFilterToDevice(newSource.deviceId(), newSource.port(),
405 mcastUtils.assignedVlan(newSource), mcastIp, INGRESS);
Pier Luigie80d6b42018-02-26 12:31:38 +0100406 // Setup mcast roles
407 mcastRoleStore.put(new McastStoreKey(mcastIp, newSource.deviceId()),
Pier979e61a2018-03-07 11:42:50 +0100408 INGRESS);
Pier Luigie80d6b42018-02-26 12:31:38 +0100409 } finally {
410 mcastUnlock();
411 }
412 }
413
414 /**
Pier Luigi6786b922018-02-02 16:19:11 +0100415 * Removes the entire mcast tree related to this group.
416 *
417 * @param mcastIp multicast group IP address
418 */
419 private void processRouteRemovedInternal(ConnectPoint source, IpAddress mcastIp) {
420 lastMcastChange = Instant.now();
421 mcastLock();
422 try {
Pier Luigie80d6b42018-02-26 12:31:38 +0100423 log.debug("Processing route removed for group {}", mcastIp);
Pier Luigi6786b922018-02-02 16:19:11 +0100424
425 // Find out the ingress, transit and egress device of the affected group
Pier979e61a2018-03-07 11:42:50 +0100426 DeviceId ingressDevice = getDevice(mcastIp, INGRESS)
Pier Luigi6786b922018-02-02 16:19:11 +0100427 .stream().findAny().orElse(null);
Pier1a7e0c02018-03-12 15:00:54 -0700428 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT);
Pier979e61a2018-03-07 11:42:50 +0100429 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS);
Pier Luigi6786b922018-02-02 16:19:11 +0100430
431 // Verify leadership on the operation
Pier7b657162018-03-27 11:29:42 -0700432 if (!mcastUtils.isLeader(source)) {
Pier Luigi6786b922018-02-02 16:19:11 +0100433 log.debug("Skip {} due to lack of leadership", mcastIp);
434 return;
435 }
436
Pier1a7e0c02018-03-12 15:00:54 -0700437 // If there are no egress devices, sinks could be only on the ingress
Pier Luigi6786b922018-02-02 16:19:11 +0100438 if (!egressDevices.isEmpty()) {
439 egressDevices.forEach(
Pier7b657162018-03-27 11:29:42 -0700440 deviceId -> removeGroupFromDevice(deviceId, mcastIp, mcastUtils.assignedVlan(null))
Pier Luigi6786b922018-02-02 16:19:11 +0100441 );
442 }
Pier1a7e0c02018-03-12 15:00:54 -0700443 // Transit could be empty if sinks are on the ingress
444 if (!transitDevices.isEmpty()) {
445 transitDevices.forEach(
Pier7b657162018-03-27 11:29:42 -0700446 deviceId -> removeGroupFromDevice(deviceId, mcastIp, mcastUtils.assignedVlan(null))
Pier1a7e0c02018-03-12 15:00:54 -0700447 );
Pier Luigi6786b922018-02-02 16:19:11 +0100448 }
449 // Ingress device should be not null
450 if (ingressDevice != null) {
Pier7b657162018-03-27 11:29:42 -0700451 removeGroupFromDevice(ingressDevice, mcastIp, mcastUtils.assignedVlan(source));
Pier Luigi6786b922018-02-02 16:19:11 +0100452 }
Pier Luigi6786b922018-02-02 16:19:11 +0100453 } finally {
454 mcastUnlock();
455 }
456 }
457
Pier7b657162018-03-27 11:29:42 -0700458
459 /**
460 * Process sinks to be removed.
461 *
462 * @param source the source connect point
463 * @param mcastIp the ip address of the group
464 * @param newSinks the new sinks to be processed
Pier28164682018-04-17 15:50:43 +0200465 * @param prevSinks the previous sinks
Pier7b657162018-03-27 11:29:42 -0700466 */
467 private void processSinksRemovedInternal(ConnectPoint source, IpAddress mcastIp,
468 Map<HostId, Set<ConnectPoint>> newSinks,
Pier28164682018-04-17 15:50:43 +0200469 Map<HostId, Set<ConnectPoint>> prevSinks) {
Pier7b657162018-03-27 11:29:42 -0700470 lastMcastChange = Instant.now();
471 mcastLock();
Pier7b657162018-03-27 11:29:42 -0700472 try {
Pier28164682018-04-17 15:50:43 +0200473 // Remove the previous ones
474 Set<ConnectPoint> sinksToBeRemoved = processSinksToBeRemoved(mcastIp, prevSinks,
475 newSinks);
476 sinksToBeRemoved.forEach(sink -> processSinkRemovedInternal(source, sink, mcastIp));
Pier7b657162018-03-27 11:29:42 -0700477 // Recover the dual-homed sinks
Pier28164682018-04-17 15:50:43 +0200478 Set<ConnectPoint> sinksToBeRecovered = processSinksToBeRecovered(mcastIp, newSinks,
479 prevSinks);
Pier7b657162018-03-27 11:29:42 -0700480 sinksToBeRecovered.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null));
Pier7b657162018-03-27 11:29:42 -0700481 } finally {
482 mcastUnlock();
Pier7b657162018-03-27 11:29:42 -0700483 }
484 }
485
Pier Luigi6786b922018-02-02 16:19:11 +0100486 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +0100487 * Removes a path from source to sink for given multicast group.
488 *
489 * @param source connect point of the multicast source
490 * @param sink connection point of the multicast sink
491 * @param mcastIp multicast group IP address
492 */
493 private void processSinkRemovedInternal(ConnectPoint source, ConnectPoint sink,
Pier7b657162018-03-27 11:29:42 -0700494 IpAddress mcastIp) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100495 lastMcastChange = Instant.now();
496 mcastLock();
497 try {
Pier Luigi6786b922018-02-02 16:19:11 +0100498 // Verify leadership on the operation
Pier7b657162018-03-27 11:29:42 -0700499 if (!mcastUtils.isLeader(source)) {
Pier Luigi6786b922018-02-02 16:19:11 +0100500 log.debug("Skip {} due to lack of leadership", mcastIp);
Charles Chanc91c8782016-03-30 17:54:24 -0700501 return;
502 }
Charles Chanc91c8782016-03-30 17:54:24 -0700503
Pier7b657162018-03-27 11:29:42 -0700504 boolean isLast;
Pier Luigi35dab3f2018-01-25 16:16:02 +0100505 // When source and sink are on the same device
506 if (source.deviceId().equals(sink.deviceId())) {
507 // Source and sink are on even the same port. There must be something wrong.
508 if (source.port().equals(sink.port())) {
509 log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
510 mcastIp, sink, source);
511 return;
512 }
Pier7b657162018-03-27 11:29:42 -0700513 isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(source));
Pier Luigi92e69be2018-03-02 12:53:37 +0100514 if (isLast) {
515 mcastRoleStore.remove(new McastStoreKey(mcastIp, sink.deviceId()));
516 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100517 return;
518 }
Charles Chanc91c8782016-03-30 17:54:24 -0700519
Pier Luigi35dab3f2018-01-25 16:16:02 +0100520 // Process the egress device
Pier7b657162018-03-27 11:29:42 -0700521 isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100522 if (isLast) {
523 mcastRoleStore.remove(new McastStoreKey(mcastIp, sink.deviceId()));
524 }
525
526 // If this is the last sink on the device, also update upstream
Pier1f87aca2018-03-14 16:47:32 -0700527 Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(),
528 mcastIp, null);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100529 if (mcastPath.isPresent()) {
530 List<Link> links = Lists.newArrayList(mcastPath.get().links());
531 Collections.reverse(links);
532 for (Link link : links) {
533 if (isLast) {
534 isLast = removePortFromDevice(
535 link.src().deviceId(),
536 link.src().port(),
537 mcastIp,
Pier7b657162018-03-27 11:29:42 -0700538 mcastUtils.assignedVlan(link.src().deviceId().equals(source.deviceId()) ?
539 source : null)
Pier Luigi35dab3f2018-01-25 16:16:02 +0100540 );
Pier Luigi92e69be2018-03-02 12:53:37 +0100541 if (isLast) {
542 mcastRoleStore.remove(new McastStoreKey(mcastIp, link.src().deviceId()));
543 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100544 }
Charles Chanc91c8782016-03-30 17:54:24 -0700545 }
546 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100547 } finally {
548 mcastUnlock();
Charles Chanc91c8782016-03-30 17:54:24 -0700549 }
550 }
551
Pier7b657162018-03-27 11:29:42 -0700552
553 /**
554 * Process sinks to be added.
555 *
556 * @param source the source connect point
557 * @param mcastIp the group IP
558 * @param newSinks the new sinks to be processed
559 * @param allPrevSinks all previous sinks
560 */
561 private void processSinksAddedInternal(ConnectPoint source, IpAddress mcastIp,
562 Map<HostId, Set<ConnectPoint>> newSinks,
563 Set<ConnectPoint> allPrevSinks) {
564 lastMcastChange = Instant.now();
565 mcastLock();
566 try {
567 // Get the only sinks to be processed (new ones)
568 Set<ConnectPoint> sinksToBeAdded = processSinksToBeAdded(source, mcastIp, newSinks);
569 // Install new sinks
570 sinksToBeAdded = Sets.difference(sinksToBeAdded, allPrevSinks);
571 sinksToBeAdded.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null));
572 } finally {
573 mcastUnlock();
574 }
575 }
576
Charles Chanc91c8782016-03-30 17:54:24 -0700577 /**
578 * Establishes a path from source to sink for given multicast group.
579 *
580 * @param source connect point of the multicast source
581 * @param sink connection point of the multicast sink
582 * @param mcastIp multicast group IP address
583 */
584 private void processSinkAddedInternal(ConnectPoint source, ConnectPoint sink,
Pier7b657162018-03-27 11:29:42 -0700585 IpAddress mcastIp, List<Path> allPaths) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100586 lastMcastChange = Instant.now();
587 mcastLock();
588 try {
589 // Continue only when this instance is the master of source device
590 if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
591 log.debug("Skip {} due to lack of mastership of the source device {}",
592 mcastIp, source.deviceId());
Charles Chanc91c8782016-03-30 17:54:24 -0700593 return;
594 }
Charles Chanc91c8782016-03-30 17:54:24 -0700595
Pier Luigi35dab3f2018-01-25 16:16:02 +0100596 // Process the ingress device
Pier7b657162018-03-27 11:29:42 -0700597 mcastUtils.addFilterToDevice(source.deviceId(), source.port(),
598 mcastUtils.assignedVlan(source), mcastIp, INGRESS);
Charles Chan72779502016-04-23 17:36:10 -0700599
Pier Luigi35dab3f2018-01-25 16:16:02 +0100600 // When source and sink are on the same device
601 if (source.deviceId().equals(sink.deviceId())) {
602 // Source and sink are on even the same port. There must be something wrong.
603 if (source.port().equals(sink.port())) {
604 log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
605 mcastIp, sink, source);
606 return;
607 }
Pier7b657162018-03-27 11:29:42 -0700608 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(source));
Pier979e61a2018-03-07 11:42:50 +0100609 mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()), INGRESS);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100610 return;
611 }
Charles Chan72779502016-04-23 17:36:10 -0700612
Pier Luigi35dab3f2018-01-25 16:16:02 +0100613 // Find a path. If present, create/update groups and flows for each hop
Pier1f87aca2018-03-14 16:47:32 -0700614 Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(),
615 mcastIp, allPaths);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100616 if (mcastPath.isPresent()) {
617 List<Link> links = mcastPath.get().links();
Charles Chan72779502016-04-23 17:36:10 -0700618
Pier1a7e0c02018-03-12 15:00:54 -0700619 // Setup mcast role for ingress
620 mcastRoleStore.put(new McastStoreKey(mcastIp, source.deviceId()),
621 INGRESS);
622
623 // Setup properly the transit
Pier Luigi35dab3f2018-01-25 16:16:02 +0100624 links.forEach(link -> {
625 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
Pier7b657162018-03-27 11:29:42 -0700626 mcastUtils.assignedVlan(link.src().deviceId()
627 .equals(source.deviceId()) ? source : null));
628 mcastUtils.addFilterToDevice(link.dst().deviceId(), link.dst().port(),
629 mcastUtils.assignedVlan(null), mcastIp, null);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100630 });
631
Pier1a7e0c02018-03-12 15:00:54 -0700632 // Setup mcast role for the transit
633 links.stream()
634 .filter(link -> !link.dst().deviceId().equals(sink.deviceId()))
635 .forEach(link -> mcastRoleStore.put(new McastStoreKey(mcastIp, link.dst().deviceId()),
636 TRANSIT));
637
Pier Luigi35dab3f2018-01-25 16:16:02 +0100638 // Process the egress device
Pier7b657162018-03-27 11:29:42 -0700639 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
Pier1a7e0c02018-03-12 15:00:54 -0700640 // Setup mcast role for egress
Pier Luigi35dab3f2018-01-25 16:16:02 +0100641 mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()),
Pier979e61a2018-03-07 11:42:50 +0100642 EGRESS);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100643 } else {
644 log.warn("Unable to find a path from {} to {}. Abort sinkAdded",
645 source.deviceId(), sink.deviceId());
646 }
647 } finally {
648 mcastUnlock();
Charles Chanc91c8782016-03-30 17:54:24 -0700649 }
650 }
651
652 /**
Charles Chan72779502016-04-23 17:36:10 -0700653 * Processes the LINK_DOWN event.
654 *
655 * @param affectedLink Link that is going down
656 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100657 public void processLinkDown(Link affectedLink) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100658 lastMcastChange = Instant.now();
659 mcastLock();
660 try {
661 // Get groups affected by the link down event
662 getAffectedGroups(affectedLink).forEach(mcastIp -> {
663 // TODO Optimize when the group editing is in place
664 log.debug("Processing link down {} for group {}",
665 affectedLink, mcastIp);
Pier Luigi580fd8a2018-01-16 10:47:50 +0100666
Pier Luigi35dab3f2018-01-25 16:16:02 +0100667 // Find out the ingress, transit and egress device of affected group
Pier979e61a2018-03-07 11:42:50 +0100668 DeviceId ingressDevice = getDevice(mcastIp, INGRESS)
Pier Luigi35dab3f2018-01-25 16:16:02 +0100669 .stream().findAny().orElse(null);
Pier1a7e0c02018-03-12 15:00:54 -0700670 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT);
Pier979e61a2018-03-07 11:42:50 +0100671 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS);
Pier7b657162018-03-27 11:29:42 -0700672 ConnectPoint source = mcastUtils.getSource(mcastIp);
Charles Chana8f9dee2016-05-16 18:44:13 -0700673
Pier1a7e0c02018-03-12 15:00:54 -0700674 // Do not proceed if ingress device or source of this group are missing
675 // If sinks are in other leafs, we have ingress, transit, egress, and source
676 // If sinks are in the same leaf, we have just ingress and source
677 if (ingressDevice == null || source == null) {
678 log.warn("Missing ingress {} or source {} for group {}",
679 ingressDevice, source, mcastIp);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100680 return;
Charles Chan72779502016-04-23 17:36:10 -0700681 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100682
683 // Continue only when this instance is the master of source device
684 if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
685 log.debug("Skip {} due to lack of mastership of the source device {}",
Pier1f87aca2018-03-14 16:47:32 -0700686 mcastIp, source.deviceId());
Pier Luigi35dab3f2018-01-25 16:16:02 +0100687 return;
688 }
689
690 // Remove entire transit
Pier1a7e0c02018-03-12 15:00:54 -0700691 transitDevices.forEach(transitDevice ->
Pier7b657162018-03-27 11:29:42 -0700692 removeGroupFromDevice(transitDevice, mcastIp,
693 mcastUtils.assignedVlan(null)));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100694
Pier1a7e0c02018-03-12 15:00:54 -0700695 // Remove transit-facing ports on the ingress device
696 removeIngressTransitPorts(mcastIp, ingressDevice, source);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100697
Pier7b657162018-03-27 11:29:42 -0700698 // TODO create a shared procedure with DEVICE_DOWN
Pier1f87aca2018-03-14 16:47:32 -0700699 // Compute mcast tree for the the egress devices
700 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(ingressDevice, egressDevices);
701
Pier7b657162018-03-27 11:29:42 -0700702 // We have to verify, if there are egresses without paths
703 Set<DeviceId> notRecovered = Sets.newHashSet();
Pier1f87aca2018-03-14 16:47:32 -0700704 mcastTree.forEach((egressDevice, paths) -> {
Pier7b657162018-03-27 11:29:42 -0700705 // Let's check if there is at least a path
Pier1f87aca2018-03-14 16:47:32 -0700706 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
707 mcastIp, paths);
Pier7b657162018-03-27 11:29:42 -0700708 // No paths, we have to try with alternative location
709 if (!mcastPath.isPresent()) {
710 notRecovered.add(egressDevice);
711 // We were not able to find an alternative path for this egress
Pier Luigi35dab3f2018-01-25 16:16:02 +0100712 log.warn("Fail to recover egress device {} from link failure {}",
713 egressDevice, affectedLink);
Pier7b657162018-03-27 11:29:42 -0700714 removeGroupFromDevice(egressDevice, mcastIp,
715 mcastUtils.assignedVlan(null));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100716 }
717 });
Pier7b657162018-03-27 11:29:42 -0700718
719 // Fast path, we can recover all the locations
720 if (notRecovered.isEmpty()) {
721 // Construct a new path for each egress device
722 mcastTree.forEach((egressDevice, paths) -> {
723 // We try to enforce the sinks path on the mcast tree
724 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
725 mcastIp, paths);
726 // If a path is present, let's install it
727 if (mcastPath.isPresent()) {
728 installPath(mcastIp, source, mcastPath.get());
729 }
730 });
731 } else {
732 // Let's try to recover using alternate
733 recoverSinks(egressDevices, notRecovered, mcastIp,
734 ingressDevice, source, true);
735 }
Charles Chan72779502016-04-23 17:36:10 -0700736 });
Pier Luigi35dab3f2018-01-25 16:16:02 +0100737 } finally {
738 mcastUnlock();
739 }
Charles Chan72779502016-04-23 17:36:10 -0700740 }
741
742 /**
Pier Luigi580fd8a2018-01-16 10:47:50 +0100743 * Process the DEVICE_DOWN event.
744 *
745 * @param deviceDown device going down
746 */
Pier Luigi69f774d2018-02-28 12:10:50 +0100747 public void processDeviceDown(DeviceId deviceDown) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100748 lastMcastChange = Instant.now();
749 mcastLock();
750 try {
751 // Get the mcast groups affected by the device going down
752 getAffectedGroups(deviceDown).forEach(mcastIp -> {
753 // TODO Optimize when the group editing is in place
754 log.debug("Processing device down {} for group {}",
755 deviceDown, mcastIp);
Pier Luigi580fd8a2018-01-16 10:47:50 +0100756
Pier Luigi35dab3f2018-01-25 16:16:02 +0100757 // Find out the ingress, transit and egress device of affected group
Pier979e61a2018-03-07 11:42:50 +0100758 DeviceId ingressDevice = getDevice(mcastIp, INGRESS)
Pier Luigi35dab3f2018-01-25 16:16:02 +0100759 .stream().findAny().orElse(null);
Pier1a7e0c02018-03-12 15:00:54 -0700760 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT);
Pier979e61a2018-03-07 11:42:50 +0100761 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS);
Pier7b657162018-03-27 11:29:42 -0700762 ConnectPoint source = mcastUtils.getSource(mcastIp);
Pier Luigi580fd8a2018-01-16 10:47:50 +0100763
Pier Luigi35dab3f2018-01-25 16:16:02 +0100764 // Do not proceed if ingress device or source of this group are missing
765 // If sinks are in other leafs, we have ingress, transit, egress, and source
766 // If sinks are in the same leaf, we have just ingress and source
767 if (ingressDevice == null || source == null) {
768 log.warn("Missing ingress {} or source {} for group {}",
769 ingressDevice, source, mcastIp);
Pier Luigi580fd8a2018-01-16 10:47:50 +0100770 return;
771 }
Pier Luigi580fd8a2018-01-16 10:47:50 +0100772
Pier Luigi6786b922018-02-02 16:19:11 +0100773 // Verify leadership on the operation
Pier7b657162018-03-27 11:29:42 -0700774 if (!mcastUtils.isLeader(source)) {
Pier Luigi6786b922018-02-02 16:19:11 +0100775 log.debug("Skip {} due to lack of leadership", mcastIp);
776 return;
Pier Luigi580fd8a2018-01-16 10:47:50 +0100777 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100778
779 // If it exists, we have to remove it in any case
Pier1a7e0c02018-03-12 15:00:54 -0700780 if (!transitDevices.isEmpty()) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100781 // Remove entire transit
Pier1a7e0c02018-03-12 15:00:54 -0700782 transitDevices.forEach(transitDevice ->
Pier7b657162018-03-27 11:29:42 -0700783 removeGroupFromDevice(transitDevice, mcastIp,
784 mcastUtils.assignedVlan(null)));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100785 }
786 // If the ingress is down
787 if (ingressDevice.equals(deviceDown)) {
788 // Remove entire ingress
Pier7b657162018-03-27 11:29:42 -0700789 removeGroupFromDevice(ingressDevice, mcastIp, mcastUtils.assignedVlan(source));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100790 // If other sinks different from the ingress exist
791 if (!egressDevices.isEmpty()) {
792 // Remove all the remaining egress
793 egressDevices.forEach(
Pier7b657162018-03-27 11:29:42 -0700794 egressDevice -> removeGroupFromDevice(egressDevice, mcastIp,
795 mcastUtils.assignedVlan(null))
Pier Luigi35dab3f2018-01-25 16:16:02 +0100796 );
Pier Luigi580fd8a2018-01-16 10:47:50 +0100797 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100798 } else {
799 // Egress or transit could be down at this point
Pier1a7e0c02018-03-12 15:00:54 -0700800 // Get the ingress-transit ports if they exist
801 removeIngressTransitPorts(mcastIp, ingressDevice, source);
802
Pier Luigi35dab3f2018-01-25 16:16:02 +0100803 // One of the egress device is down
804 if (egressDevices.contains(deviceDown)) {
805 // Remove entire device down
Pier7b657162018-03-27 11:29:42 -0700806 removeGroupFromDevice(deviceDown, mcastIp, mcastUtils.assignedVlan(null));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100807 // Remove the device down from egress
808 egressDevices.remove(deviceDown);
809 // If there are no more egress and ingress does not have sinks
810 if (egressDevices.isEmpty() && !hasSinks(ingressDevice, mcastIp)) {
Pier Luigi35dab3f2018-01-25 16:16:02 +0100811 // We have done
812 return;
813 }
814 }
Pier1f87aca2018-03-14 16:47:32 -0700815
816 // Compute mcast tree for the the egress devices
817 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(ingressDevice, egressDevices);
818
Pier7b657162018-03-27 11:29:42 -0700819 // We have to verify, if there are egresses without paths
820 Set<DeviceId> notRecovered = Sets.newHashSet();
Pier1f87aca2018-03-14 16:47:32 -0700821 mcastTree.forEach((egressDevice, paths) -> {
Pier7b657162018-03-27 11:29:42 -0700822 // Let's check if there is at least a path
Pier1f87aca2018-03-14 16:47:32 -0700823 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
Pier7b657162018-03-27 11:29:42 -0700824 mcastIp, paths);
825 // No paths, we have to try with alternative location
826 if (!mcastPath.isPresent()) {
827 notRecovered.add(egressDevice);
Pier Luigi35dab3f2018-01-25 16:16:02 +0100828 // We were not able to find an alternative path for this egress
829 log.warn("Fail to recover egress device {} from device down {}",
830 egressDevice, deviceDown);
Pier7b657162018-03-27 11:29:42 -0700831 removeGroupFromDevice(egressDevice, mcastIp, mcastUtils.assignedVlan(null));
Pier Luigi35dab3f2018-01-25 16:16:02 +0100832 }
833 });
Pier7b657162018-03-27 11:29:42 -0700834
835 // Fast path, we can recover all the locations
836 if (notRecovered.isEmpty()) {
837 // Construct a new path for each egress device
838 mcastTree.forEach((egressDevice, paths) -> {
839 // We try to enforce the sinks path on the mcast tree
840 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
841 mcastIp, paths);
842 // If a path is present, let's install it
843 if (mcastPath.isPresent()) {
844 installPath(mcastIp, source, mcastPath.get());
845 }
846 });
847 } else {
848 // Let's try to recover using alternate
849 recoverSinks(egressDevices, notRecovered, mcastIp,
850 ingressDevice, source, false);
851 }
Pier Luigi35dab3f2018-01-25 16:16:02 +0100852 }
853 });
854 } finally {
855 mcastUnlock();
856 }
Pier Luigi580fd8a2018-01-16 10:47:50 +0100857 }
858
859 /**
Pier7b657162018-03-27 11:29:42 -0700860 * Try to recover sinks using alternate locations.
861 *
862 * @param egressDevices the original egress devices
863 * @param notRecovered the devices not recovered
864 * @param mcastIp the group address
865 * @param ingressDevice the ingress device
866 * @param source the source connect point
867 * @param isLinkFailure true if it is a link failure, otherwise false
868 */
869 private void recoverSinks(Set<DeviceId> egressDevices, Set<DeviceId> notRecovered,
870 IpAddress mcastIp, DeviceId ingressDevice, ConnectPoint source,
871 boolean isLinkFailure) {
872 // Recovered devices
873 Set<DeviceId> recovered = Sets.difference(egressDevices, notRecovered);
874 // Total affected sinks
875 Set<ConnectPoint> totalAffectedSinks = Sets.newHashSet();
876 // Total sinks
877 Set<ConnectPoint> totalSinks = Sets.newHashSet();
878 // Let's compute all the affected sinks and all the sinks
879 notRecovered.forEach(deviceId -> {
880 totalAffectedSinks.addAll(
881 mcastUtils.getAffectedSinks(deviceId, mcastIp)
882 .values()
883 .stream()
884 .flatMap(Collection::stream)
885 .filter(connectPoint -> connectPoint.deviceId().equals(deviceId))
886 .collect(Collectors.toSet())
887 );
888 totalSinks.addAll(
889 mcastUtils.getAffectedSinks(deviceId, mcastIp)
890 .values()
891 .stream()
892 .flatMap(Collection::stream)
893 .collect(Collectors.toSet())
894 );
895 });
896
897 // Sinks to be added
898 Set<ConnectPoint> sinksToBeAdded = Sets.difference(totalSinks, totalAffectedSinks);
899 // New egress devices, filtering out the source
900 Set<DeviceId> newEgressDevice = sinksToBeAdded.stream()
901 .map(ConnectPoint::deviceId)
902 .collect(Collectors.toSet());
903 // Let's add the devices recovered from the previous round
904 newEgressDevice.addAll(recovered);
905 // Let's do a copy of the new egresses and filter out the source
906 Set<DeviceId> copyNewEgressDevice = ImmutableSet.copyOf(newEgressDevice);
907 newEgressDevice = newEgressDevice.stream()
908 .filter(deviceId -> !deviceId.equals(ingressDevice))
909 .collect(Collectors.toSet());
910
911 // Re-compute mcast tree for the the egress devices
912 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(ingressDevice, newEgressDevice);
913 // if the source was originally in the new locations, add new sinks
914 if (copyNewEgressDevice.contains(ingressDevice)) {
915 sinksToBeAdded.stream()
916 .filter(connectPoint -> connectPoint.deviceId().equals(ingressDevice))
917 .forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, ImmutableList.of()));
918 }
919
920 // Construct a new path for each egress device
921 mcastTree.forEach((egressDevice, paths) -> {
922 // We try to enforce the sinks path on the mcast tree
923 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
924 mcastIp, paths);
925 // If a path is present, let's install it
926 if (mcastPath.isPresent()) {
927 // Using recovery procedure
928 if (recovered.contains(egressDevice)) {
929 installPath(mcastIp, source, mcastPath.get());
930 } else {
931 // otherwise we need to threat as new sink
932 sinksToBeAdded.stream()
933 .filter(connectPoint -> connectPoint.deviceId().equals(egressDevice))
934 .forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, paths));
935 }
936 } else {
937 // We were not able to find an alternative path for this egress
938 log.warn("Fail to recover egress device {} from {} failure",
939 egressDevice, isLinkFailure ? "Link" : "Device");
940 removeGroupFromDevice(egressDevice, mcastIp, mcastUtils.assignedVlan(null));
941 }
942 });
943
944 }
945
946 /**
Pier28164682018-04-17 15:50:43 +0200947 * Process all the sinks related to a mcast group and return
948 * the ones to be removed.
949 *
950 * @param mcastIp the group address
951 * @param prevsinks the previous sinks to be evaluated
952 * @param newSinks the new sinks to be evaluted
953 * @return the set of the sinks to be removed
954 */
955 private Set<ConnectPoint> processSinksToBeRemoved(IpAddress mcastIp,
956 Map<HostId, Set<ConnectPoint>> prevsinks,
957 Map<HostId, Set<ConnectPoint>> newSinks) {
958 // Iterate over the sinks in order to build the set
959 // of the connect points to be removed from this group
960 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
961 prevsinks.forEach(((hostId, connectPoints) -> {
962 // We have to check with the existing flows
963 ConnectPoint sinkToBeProcessed = connectPoints.stream()
964 .filter(connectPoint -> isSink(mcastIp, connectPoint))
965 .findFirst().orElse(null);
966 if (sinkToBeProcessed != null) {
967 // If the host has been removed or location has been removed
968 if (!newSinks.containsKey(hostId) ||
969 !newSinks.get(hostId).contains(sinkToBeProcessed)) {
970 sinksToBeProcessed.add(sinkToBeProcessed);
971 }
972 }
973 }));
974 // We have done, return the set
975 return sinksToBeProcessed;
976 }
977
978 /**
Pier7b657162018-03-27 11:29:42 -0700979 * Process new locations and return the set of sinks to be added
980 * in the context of the recovery.
981 *
Pier28164682018-04-17 15:50:43 +0200982 * @param newSinks the remaining sinks
983 * @param prevSinks the previous sinks
Pier7b657162018-03-27 11:29:42 -0700984 * @return the set of the sinks to be processed
985 */
986 private Set<ConnectPoint> processSinksToBeRecovered(IpAddress mcastIp,
Pier28164682018-04-17 15:50:43 +0200987 Map<HostId, Set<ConnectPoint>> newSinks,
988 Map<HostId, Set<ConnectPoint>> prevSinks) {
Pier7b657162018-03-27 11:29:42 -0700989 // Iterate over the sinks in order to build the set
990 // of the connect points to be served by this group
991 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
Pier28164682018-04-17 15:50:43 +0200992 newSinks.forEach((hostId, connectPoints) -> {
Pier7b657162018-03-27 11:29:42 -0700993 // If it has more than 1 locations
994 if (connectPoints.size() > 1 || connectPoints.size() == 0) {
995 log.debug("Skip {} since sink {} has {} locations",
996 mcastIp, hostId, connectPoints.size());
997 return;
998 }
Pier28164682018-04-17 15:50:43 +0200999 // If previously it had two locations, we need to recover it
1000 // Filter out if the remaining location is already served
1001 if (prevSinks.containsKey(hostId) && prevSinks.get(hostId).size() == 2) {
1002 sinksToBeProcessed.add(connectPoints.stream()
1003 .filter(connectPoint -> !isSink(mcastIp, connectPoint))
1004 .findFirst().orElseGet(null));
1005 }
Pier7b657162018-03-27 11:29:42 -07001006 });
1007 return sinksToBeProcessed;
1008 }
1009
1010 /**
1011 * Process all the sinks related to a mcast group and return
1012 * the ones to be processed.
1013 *
1014 * @param source the source connect point
1015 * @param mcastIp the group address
1016 * @param sinks the sinks to be evaluated
1017 * @return the set of the sinks to be processed
1018 */
1019 private Set<ConnectPoint> processSinksToBeAdded(ConnectPoint source, IpAddress mcastIp,
1020 Map<HostId, Set<ConnectPoint>> sinks) {
1021 // Iterate over the sinks in order to build the set
1022 // of the connect points to be served by this group
1023 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
1024 sinks.forEach(((hostId, connectPoints) -> {
1025 // If it has more than 2 locations
1026 if (connectPoints.size() > 2 || connectPoints.size() == 0) {
1027 log.debug("Skip {} since sink {} has {} locations",
1028 mcastIp, hostId, connectPoints.size());
1029 return;
1030 }
1031 // If it has one location, just use it
1032 if (connectPoints.size() == 1) {
1033 sinksToBeProcessed.add(connectPoints.stream()
1034 .findFirst().orElseGet(null));
1035 return;
1036 }
1037 // We prefer to reuse existing flows
1038 ConnectPoint sinkToBeProcessed = connectPoints.stream()
Pier28164682018-04-17 15:50:43 +02001039 .filter(connectPoint -> isSink(mcastIp, connectPoint))
Pier7b657162018-03-27 11:29:42 -07001040 .findFirst().orElse(null);
1041 if (sinkToBeProcessed != null) {
1042 sinksToBeProcessed.add(sinkToBeProcessed);
1043 return;
1044 }
1045 // Otherwise we prefer to reuse existing egresses
1046 Set<DeviceId> egresses = getDevice(mcastIp, EGRESS);
1047 sinkToBeProcessed = connectPoints.stream()
Pier28164682018-04-17 15:50:43 +02001048 .filter(connectPoint -> egresses.contains(connectPoint.deviceId()))
Pier7b657162018-03-27 11:29:42 -07001049 .findFirst().orElse(null);
1050 if (sinkToBeProcessed != null) {
1051 sinksToBeProcessed.add(sinkToBeProcessed);
1052 return;
1053 }
1054 // Otherwise we prefer a location co-located with the source (if it exists)
1055 sinkToBeProcessed = connectPoints.stream()
1056 .filter(connectPoint -> connectPoint.deviceId().equals(source.deviceId()))
1057 .findFirst().orElse(null);
1058 if (sinkToBeProcessed != null) {
1059 sinksToBeProcessed.add(sinkToBeProcessed);
1060 return;
1061 }
1062 // Finally, we randomly pick a new location
1063 sinksToBeProcessed.add(connectPoints.stream()
1064 .findFirst().orElseGet(null));
1065 }));
1066 // We have done, return the set
1067 return sinksToBeProcessed;
1068 }
1069
1070 /**
Pier1a7e0c02018-03-12 15:00:54 -07001071 * Utility method to remove all the ingress transit ports.
1072 *
1073 * @param mcastIp the group ip
1074 * @param ingressDevice the ingress device for this group
1075 * @param source the source connect point
1076 */
1077 private void removeIngressTransitPorts(IpAddress mcastIp, DeviceId ingressDevice,
1078 ConnectPoint source) {
1079 Set<PortNumber> ingressTransitPorts = ingressTransitPort(mcastIp);
1080 ingressTransitPorts.forEach(ingressTransitPort -> {
1081 if (ingressTransitPort != null) {
1082 boolean isLast = removePortFromDevice(ingressDevice, ingressTransitPort,
Pier7b657162018-03-27 11:29:42 -07001083 mcastIp, mcastUtils.assignedVlan(source));
Pier1a7e0c02018-03-12 15:00:54 -07001084 if (isLast) {
1085 mcastRoleStore.remove(new McastStoreKey(mcastIp, ingressDevice));
1086 }
1087 }
1088 });
1089 }
1090
1091 /**
Charles Chanc91c8782016-03-30 17:54:24 -07001092 * Adds a port to given multicast group on given device. This involves the
1093 * update of L3 multicast group and multicast routing table entry.
1094 *
1095 * @param deviceId device ID
1096 * @param port port to be added
1097 * @param mcastIp multicast group
1098 * @param assignedVlan assigned VLAN ID
1099 */
1100 private void addPortToDevice(DeviceId deviceId, PortNumber port,
Pier7b657162018-03-27 11:29:42 -07001101 IpAddress mcastIp, VlanId assignedVlan) {
Charles Chan72779502016-04-23 17:36:10 -07001102 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId);
Charles Chanc91c8782016-03-30 17:54:24 -07001103 ImmutableSet.Builder<PortNumber> portBuilder = ImmutableSet.builder();
Pier Luigi4f0dd212018-01-19 10:24:53 +01001104 NextObjective newNextObj;
Charles Chan72779502016-04-23 17:36:10 -07001105 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Charles Chanc91c8782016-03-30 17:54:24 -07001106 // First time someone request this mcast group via this device
1107 portBuilder.add(port);
Pier Luigi4f0dd212018-01-19 10:24:53 +01001108 // New nextObj
Pier7b657162018-03-27 11:29:42 -07001109 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi4f0dd212018-01-19 10:24:53 +01001110 portBuilder.build(), null).add();
1111 // Store the new port
1112 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Charles Chanc91c8782016-03-30 17:54:24 -07001113 } else {
1114 // This device already serves some subscribers of this mcast group
Charles Chan72779502016-04-23 17:36:10 -07001115 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chanc91c8782016-03-30 17:54:24 -07001116 // Stop if the port is already in the nextobj
Pier7b657162018-03-27 11:29:42 -07001117 Set<PortNumber> existingPorts = mcastUtils.getPorts(nextObj.next());
Charles Chanc91c8782016-03-30 17:54:24 -07001118 if (existingPorts.contains(port)) {
1119 log.info("NextObj for {}/{} already exists. Abort", deviceId, port);
1120 return;
1121 }
Pier Luigi4f0dd212018-01-19 10:24:53 +01001122 // Let's add the port and reuse the previous one
Yuta HIGUCHIbef07b52018-02-09 18:05:23 -08001123 portBuilder.addAll(existingPorts).add(port);
Pier Luigi4f0dd212018-01-19 10:24:53 +01001124 // Reuse previous nextObj
Pier7b657162018-03-27 11:29:42 -07001125 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi4f0dd212018-01-19 10:24:53 +01001126 portBuilder.build(), nextObj.id()).addToExisting();
1127 // Store the final next objective and send only the difference to the driver
1128 mcastNextObjStore.put(mcastStoreKey, newNextObj);
1129 // Add just the new port
1130 portBuilder = ImmutableSet.builder();
1131 portBuilder.add(port);
Pier7b657162018-03-27 11:29:42 -07001132 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi4f0dd212018-01-19 10:24:53 +01001133 portBuilder.build(), nextObj.id()).addToExisting();
Charles Chanc91c8782016-03-30 17:54:24 -07001134 }
1135 // Create, store and apply the new nextObj and fwdObj
Charles Chan72779502016-04-23 17:36:10 -07001136 ObjectiveContext context = new DefaultObjectiveContext(
1137 (objective) -> log.debug("Successfully add {} on {}/{}, vlan {}",
1138 mcastIp, deviceId, port.toLong(), assignedVlan),
1139 (objective, error) ->
1140 log.warn("Failed to add {} on {}/{}, vlan {}: {}",
1141 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Pier7b657162018-03-27 11:29:42 -07001142 ForwardingObjective fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan,
1143 newNextObj.id()).add(context);
Charles Chanc91c8782016-03-30 17:54:24 -07001144 srManager.flowObjectiveService.next(deviceId, newNextObj);
1145 srManager.flowObjectiveService.forward(deviceId, fwdObj);
Charles Chanc91c8782016-03-30 17:54:24 -07001146 }
1147
1148 /**
1149 * Removes a port from given multicast group on given device.
1150 * This involves the update of L3 multicast group and multicast routing
1151 * table entry.
1152 *
1153 * @param deviceId device ID
1154 * @param port port to be added
1155 * @param mcastIp multicast group
1156 * @param assignedVlan assigned VLAN ID
1157 * @return true if this is the last sink on this device
1158 */
1159 private boolean removePortFromDevice(DeviceId deviceId, PortNumber port,
Pier7b657162018-03-27 11:29:42 -07001160 IpAddress mcastIp, VlanId assignedVlan) {
Charles Chan72779502016-04-23 17:36:10 -07001161 McastStoreKey mcastStoreKey =
1162 new McastStoreKey(mcastIp, deviceId);
Charles Chanc91c8782016-03-30 17:54:24 -07001163 // This device is not serving this multicast group
Charles Chan72779502016-04-23 17:36:10 -07001164 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Charles Chanc91c8782016-03-30 17:54:24 -07001165 log.warn("{} is not serving {} on port {}. Abort.", deviceId, mcastIp, port);
1166 return false;
1167 }
Charles Chan72779502016-04-23 17:36:10 -07001168 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chanc91c8782016-03-30 17:54:24 -07001169
Pier7b657162018-03-27 11:29:42 -07001170 Set<PortNumber> existingPorts = mcastUtils.getPorts(nextObj.next());
Charles Chan72779502016-04-23 17:36:10 -07001171 // This port does not serve this multicast group
Charles Chanc91c8782016-03-30 17:54:24 -07001172 if (!existingPorts.contains(port)) {
1173 log.warn("{} is not serving {} on port {}. Abort.", deviceId, mcastIp, port);
1174 return false;
1175 }
1176 // Copy and modify the ImmutableSet
1177 existingPorts = Sets.newHashSet(existingPorts);
1178 existingPorts.remove(port);
1179
1180 NextObjective newNextObj;
Pier Luigi8cd46de2018-01-19 10:24:53 +01001181 ObjectiveContext context;
Charles Chanc91c8782016-03-30 17:54:24 -07001182 ForwardingObjective fwdObj;
1183 if (existingPorts.isEmpty()) {
Pier Luigi8cd46de2018-01-19 10:24:53 +01001184 // If this is the last sink, remove flows and last bucket
Charles Chanc91c8782016-03-30 17:54:24 -07001185 // NOTE: Rely on GroupStore garbage collection rather than explicitly
1186 // remove L3MG since there might be other flows/groups refer to
1187 // the same L2IG
Pier Luigi8cd46de2018-01-19 10:24:53 +01001188 context = new DefaultObjectiveContext(
Charles Chan72779502016-04-23 17:36:10 -07001189 (objective) -> log.debug("Successfully remove {} on {}/{}, vlan {}",
1190 mcastIp, deviceId, port.toLong(), assignedVlan),
1191 (objective, error) ->
1192 log.warn("Failed to remove {} on {}/{}, vlan {}: {}",
1193 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Pier7b657162018-03-27 11:29:42 -07001194 fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
Charles Chan72779502016-04-23 17:36:10 -07001195 mcastNextObjStore.remove(mcastStoreKey);
Charles Chanc91c8782016-03-30 17:54:24 -07001196 } else {
1197 // If this is not the last sink, update flows and groups
Pier Luigi8cd46de2018-01-19 10:24:53 +01001198 context = new DefaultObjectiveContext(
Charles Chan72779502016-04-23 17:36:10 -07001199 (objective) -> log.debug("Successfully update {} on {}/{}, vlan {}",
1200 mcastIp, deviceId, port.toLong(), assignedVlan),
1201 (objective, error) ->
1202 log.warn("Failed to update {} on {}/{}, vlan {}: {}",
1203 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Pier Luigi8cd46de2018-01-19 10:24:53 +01001204 // Here we store the next objective with the remaining port
Pier7b657162018-03-27 11:29:42 -07001205 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi8cd46de2018-01-19 10:24:53 +01001206 existingPorts, nextObj.id()).removeFromExisting();
Pier7b657162018-03-27 11:29:42 -07001207 fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, newNextObj.id()).add(context);
Charles Chan72779502016-04-23 17:36:10 -07001208 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Charles Chanc91c8782016-03-30 17:54:24 -07001209 }
Pier Luigi8cd46de2018-01-19 10:24:53 +01001210 // Let's modify the next objective removing the bucket
Pier7b657162018-03-27 11:29:42 -07001211 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi8cd46de2018-01-19 10:24:53 +01001212 ImmutableSet.of(port), nextObj.id()).removeFromExisting();
1213 srManager.flowObjectiveService.next(deviceId, newNextObj);
1214 srManager.flowObjectiveService.forward(deviceId, fwdObj);
Charles Chanc91c8782016-03-30 17:54:24 -07001215 return existingPorts.isEmpty();
1216 }
1217
Charles Chan72779502016-04-23 17:36:10 -07001218 /**
1219 * Removes entire group on given device.
1220 *
1221 * @param deviceId device ID
1222 * @param mcastIp multicast group to be removed
1223 * @param assignedVlan assigned VLAN ID
1224 */
1225 private void removeGroupFromDevice(DeviceId deviceId, IpAddress mcastIp,
Pier7b657162018-03-27 11:29:42 -07001226 VlanId assignedVlan) {
Charles Chan72779502016-04-23 17:36:10 -07001227 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId);
1228 // This device is not serving this multicast group
1229 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
1230 log.warn("{} is not serving {}. Abort.", deviceId, mcastIp);
1231 return;
1232 }
1233 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
1234 // NOTE: Rely on GroupStore garbage collection rather than explicitly
1235 // remove L3MG since there might be other flows/groups refer to
1236 // the same L2IG
1237 ObjectiveContext context = new DefaultObjectiveContext(
1238 (objective) -> log.debug("Successfully remove {} on {}, vlan {}",
1239 mcastIp, deviceId, assignedVlan),
1240 (objective, error) ->
1241 log.warn("Failed to remove {} on {}, vlan {}: {}",
1242 mcastIp, deviceId, assignedVlan, error));
Pier7b657162018-03-27 11:29:42 -07001243 ForwardingObjective fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
Charles Chan72779502016-04-23 17:36:10 -07001244 srManager.flowObjectiveService.forward(deviceId, fwdObj);
1245 mcastNextObjStore.remove(mcastStoreKey);
1246 mcastRoleStore.remove(mcastStoreKey);
1247 }
1248
Pier Luigi580fd8a2018-01-16 10:47:50 +01001249 private void installPath(IpAddress mcastIp, ConnectPoint source, Path mcastPath) {
1250 // Get Links
1251 List<Link> links = mcastPath.links();
Pier1a7e0c02018-03-12 15:00:54 -07001252
1253 // Setup new ingress mcast role
1254 mcastRoleStore.put(new McastStoreKey(mcastIp, links.get(0).src().deviceId()),
1255 INGRESS);
1256
Pier Luigi580fd8a2018-01-16 10:47:50 +01001257 // For each link, modify the next on the source device adding the src port
1258 // and a new filter objective on the destination port
1259 links.forEach(link -> {
1260 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
Pier7b657162018-03-27 11:29:42 -07001261 mcastUtils.assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
1262 mcastUtils.addFilterToDevice(link.dst().deviceId(), link.dst().port(),
1263 mcastUtils.assignedVlan(null), mcastIp, null);
Pier Luigi580fd8a2018-01-16 10:47:50 +01001264 });
Pier1a7e0c02018-03-12 15:00:54 -07001265
1266 // Setup mcast role for the transit
1267 links.stream()
1268 .filter(link -> !link.src().deviceId().equals(source.deviceId()))
1269 .forEach(link -> mcastRoleStore.put(new McastStoreKey(mcastIp, link.src().deviceId()),
1270 TRANSIT));
Charles Chan72779502016-04-23 17:36:10 -07001271 }
1272
Charles Chanc91c8782016-03-30 17:54:24 -07001273 /**
Pier1f87aca2018-03-14 16:47:32 -07001274 * Go through all the paths, looking for shared links to be used
1275 * in the final path computation.
1276 *
1277 * @param egresses egress devices
1278 * @param availablePaths all the available paths towards the egress
1279 * @return shared links between egress devices
1280 */
1281 private Set<Link> exploreMcastTree(Set<DeviceId> egresses,
1282 Map<DeviceId, List<Path>> availablePaths) {
1283 // Length of the shortest path
1284 int minLength = Integer.MAX_VALUE;
1285 int length;
1286 // Current paths
1287 List<Path> currentPaths;
1288 // Verify the source can still reach all the egresses
1289 for (DeviceId egress : egresses) {
1290 // From the source we cannot reach all the sinks
Pier7b657162018-03-27 11:29:42 -07001291 // just continue and let's figure out after
Pier1f87aca2018-03-14 16:47:32 -07001292 currentPaths = availablePaths.get(egress);
1293 if (currentPaths.isEmpty()) {
1294 continue;
1295 }
1296 // Get the length of the first one available,
Pier7b657162018-03-27 11:29:42 -07001297 // update the min length
Pier1f87aca2018-03-14 16:47:32 -07001298 length = currentPaths.get(0).links().size();
1299 if (length < minLength) {
1300 minLength = length;
1301 }
Pier Luigi51ee7c02018-02-23 19:57:40 +01001302 }
Pier1f87aca2018-03-14 16:47:32 -07001303 // If there are no paths
1304 if (minLength == Integer.MAX_VALUE) {
1305 return Collections.emptySet();
1306 }
1307 // Iterate looking for shared links
1308 int index = 0;
1309 // Define the sets for the intersection
1310 Set<Link> sharedLinks = Sets.newHashSet();
1311 Set<Link> currentSharedLinks;
1312 Set<Link> currentLinks;
Pier7b657162018-03-27 11:29:42 -07001313 DeviceId egressToRemove = null;
Pier1f87aca2018-03-14 16:47:32 -07001314 // Let's find out the shared links
1315 while (index < minLength) {
1316 // Initialize the intersection with the paths related to the first egress
1317 currentPaths = availablePaths.get(
1318 egresses.stream()
1319 .findFirst()
1320 .orElse(null)
1321 );
1322 currentSharedLinks = Sets.newHashSet();
1323 // Iterate over the paths and take the "index" links
1324 for (Path path : currentPaths) {
1325 currentSharedLinks.add(path.links().get(index));
1326 }
1327 // Iterate over the remaining egress
1328 for (DeviceId egress : egresses) {
1329 // Iterate over the paths and take the "index" links
1330 currentLinks = Sets.newHashSet();
1331 for (Path path : availablePaths.get(egress)) {
1332 currentLinks.add(path.links().get(index));
1333 }
1334 // Do intersection
1335 currentSharedLinks = Sets.intersection(currentSharedLinks, currentLinks);
1336 // If there are no shared paths exit and record the device to remove
1337 // we have to retry with a subset of sinks
1338 if (currentSharedLinks.isEmpty()) {
Pier7b657162018-03-27 11:29:42 -07001339 egressToRemove = egress;
Pier1f87aca2018-03-14 16:47:32 -07001340 index = minLength;
1341 break;
1342 }
1343 }
1344 sharedLinks.addAll(currentSharedLinks);
1345 index++;
1346 }
1347 // If the shared links is empty and there are egress
1348 // let's retry another time with less sinks, we can
1349 // still build optimal subtrees
Pier7b657162018-03-27 11:29:42 -07001350 if (sharedLinks.isEmpty() && egresses.size() > 1 && egressToRemove != null) {
1351 egresses.remove(egressToRemove);
Pier1f87aca2018-03-14 16:47:32 -07001352 sharedLinks = exploreMcastTree(egresses, availablePaths);
1353 }
1354 return sharedLinks;
1355 }
1356
1357 /**
1358 * Build Mcast tree having as root the given source and as leaves the given egress points.
1359 *
1360 * @param source source of the tree
1361 * @param sinks leaves of the tree
1362 * @return the computed Mcast tree
1363 */
1364 private Map<ConnectPoint, List<Path>> computeSinkMcastTree(DeviceId source,
Pier7b657162018-03-27 11:29:42 -07001365 Set<ConnectPoint> sinks) {
Pier1f87aca2018-03-14 16:47:32 -07001366 // Get the egress devices, remove source from the egress if present
1367 Set<DeviceId> egresses = sinks.stream()
1368 .map(ConnectPoint::deviceId)
1369 .filter(deviceId -> !deviceId.equals(source))
1370 .collect(Collectors.toSet());
1371 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(source, egresses);
Pier7b657162018-03-27 11:29:42 -07001372 // Build final tree and return it as it is
Pier1f87aca2018-03-14 16:47:32 -07001373 final Map<ConnectPoint, List<Path>> finalTree = Maps.newHashMap();
Pier7b657162018-03-27 11:29:42 -07001374 // We need to put back the source if it was originally present
1375 sinks.forEach(sink -> {
1376 List<Path> sinkPaths = mcastTree.get(sink.deviceId());
1377 finalTree.put(sink, sinkPaths != null ? sinkPaths : ImmutableList.of());
1378 });
Pier1f87aca2018-03-14 16:47:32 -07001379 return finalTree;
1380 }
1381
1382 /**
1383 * Build Mcast tree having as root the given source and as leaves the given egress.
1384 *
1385 * @param source source of the tree
1386 * @param egresses leaves of the tree
1387 * @return the computed Mcast tree
1388 */
1389 private Map<DeviceId, List<Path>> computeMcastTree(DeviceId source,
1390 Set<DeviceId> egresses) {
1391 // Pre-compute all the paths
1392 Map<DeviceId, List<Path>> availablePaths = Maps.newHashMap();
1393 // No links to enforce
1394 egresses.forEach(egress -> availablePaths.put(egress, getPaths(source, egress,
1395 Collections.emptySet())));
1396 // Explore the topology looking for shared links amongst the egresses
1397 Set<Link> linksToEnforce = exploreMcastTree(Sets.newHashSet(egresses), availablePaths);
1398 // Remove all the paths from the previous computation
1399 availablePaths.clear();
1400 // Build the final paths enforcing the shared links between egress devices
1401 egresses.forEach(egress -> availablePaths.put(egress, getPaths(source, egress,
1402 linksToEnforce)));
1403 return availablePaths;
1404 }
1405
1406 /**
1407 * Gets path from src to dst computed using the custom link weigher.
1408 *
1409 * @param src source device ID
1410 * @param dst destination device ID
1411 * @return list of paths from src to dst
1412 */
1413 private List<Path> getPaths(DeviceId src, DeviceId dst, Set<Link> linksToEnforce) {
1414 // Takes a snapshot of the topology
1415 final Topology currentTopology = topologyService.currentTopology();
1416 // Build a specific link weigher for this path computation
1417 final LinkWeigher linkWeigher = new SRLinkWeigher(srManager, src, linksToEnforce);
1418 // We will use our custom link weigher for our path
1419 // computations and build the list of valid paths
1420 List<Path> allPaths = Lists.newArrayList(
1421 topologyService.getPaths(currentTopology, src, dst, linkWeigher)
1422 );
1423 // If there are no valid paths, just exit
1424 log.debug("{} path(s) found from {} to {}", allPaths.size(), src, dst);
1425 return allPaths;
Pier Luigi51ee7c02018-02-23 19:57:40 +01001426 }
1427
Charles Chanc91c8782016-03-30 17:54:24 -07001428 /**
1429 * Gets a path from src to dst.
1430 * If a path was allocated before, returns the allocated path.
1431 * Otherwise, randomly pick one from available paths.
1432 *
1433 * @param src source device ID
1434 * @param dst destination device ID
1435 * @param mcastIp multicast group
Pier1f87aca2018-03-14 16:47:32 -07001436 * @param allPaths paths list
Charles Chanc91c8782016-03-30 17:54:24 -07001437 * @return an optional path from src to dst
1438 */
Pier1f87aca2018-03-14 16:47:32 -07001439 private Optional<Path> getPath(DeviceId src, DeviceId dst,
1440 IpAddress mcastIp, List<Path> allPaths) {
1441 // Firstly we get all the valid paths, if the supplied are null
1442 if (allPaths == null) {
1443 allPaths = getPaths(src, dst, Collections.emptySet());
1444 }
1445
1446 // If there are no paths just exit
Charles Chanc91c8782016-03-30 17:54:24 -07001447 if (allPaths.isEmpty()) {
Charles Chanc91c8782016-03-30 17:54:24 -07001448 return Optional.empty();
1449 }
1450
Pier Luigi91573e12018-01-23 16:06:38 +01001451 // Create a map index of suitablity-to-list of paths. For example
1452 // a path in the list associated to the index 1 shares only the
1453 // first hop and it is less suitable of a path belonging to the index
1454 // 2 that shares leaf-spine.
1455 Map<Integer, List<Path>> eligiblePaths = Maps.newHashMap();
1456 // Some init steps
1457 int nhop;
1458 McastStoreKey mcastStoreKey;
1459 Link hop;
1460 PortNumber srcPort;
1461 Set<PortNumber> existingPorts;
1462 NextObjective nextObj;
1463 // Iterate over paths looking for eligible paths
1464 for (Path path : allPaths) {
1465 // Unlikely, it will happen...
1466 if (!src.equals(path.links().get(0).src().deviceId())) {
1467 continue;
1468 }
1469 nhop = 0;
1470 // Iterate over the links
1471 while (nhop < path.links().size()) {
1472 // Get the link and verify if a next related
1473 // to the src device exist in the store
1474 hop = path.links().get(nhop);
1475 mcastStoreKey = new McastStoreKey(mcastIp, hop.src().deviceId());
1476 // It does not exist in the store, exit
1477 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
1478 break;
Charles Chanc91c8782016-03-30 17:54:24 -07001479 }
Pier Luigi91573e12018-01-23 16:06:38 +01001480 // Get the output ports on the next
1481 nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Pier7b657162018-03-27 11:29:42 -07001482 existingPorts = mcastUtils.getPorts(nextObj.next());
Pier Luigi91573e12018-01-23 16:06:38 +01001483 // And the src port on the link
1484 srcPort = hop.src().port();
1485 // the src port is not used as output, exit
1486 if (!existingPorts.contains(srcPort)) {
1487 break;
1488 }
1489 nhop++;
1490 }
1491 // n_hop defines the index
1492 if (nhop > 0) {
1493 eligiblePaths.compute(nhop, (index, paths) -> {
1494 paths = paths == null ? Lists.newArrayList() : paths;
1495 paths.add(path);
1496 return paths;
1497 });
Charles Chanc91c8782016-03-30 17:54:24 -07001498 }
1499 }
Pier Luigi91573e12018-01-23 16:06:38 +01001500
1501 // No suitable paths
1502 if (eligiblePaths.isEmpty()) {
1503 log.debug("No eligiblePath(s) found from {} to {}", src, dst);
1504 // Otherwise, randomly pick a path
1505 Collections.shuffle(allPaths);
1506 return allPaths.stream().findFirst();
1507 }
1508
1509 // Let's take the best ones
1510 Integer bestIndex = eligiblePaths.keySet()
1511 .stream()
1512 .sorted(Comparator.reverseOrder())
1513 .findFirst().orElse(null);
1514 List<Path> bestPaths = eligiblePaths.get(bestIndex);
1515 log.debug("{} eligiblePath(s) found from {} to {}",
1516 bestPaths.size(), src, dst);
1517 // randomly pick a path on the highest index
1518 Collections.shuffle(bestPaths);
1519 return bestPaths.stream().findFirst();
Charles Chanc91c8782016-03-30 17:54:24 -07001520 }
1521
1522 /**
Charles Chan72779502016-04-23 17:36:10 -07001523 * Gets device(s) of given role in given multicast group.
1524 *
1525 * @param mcastIp multicast IP
1526 * @param role multicast role
1527 * @return set of device ID or empty set if not found
1528 */
1529 private Set<DeviceId> getDevice(IpAddress mcastIp, McastRole role) {
1530 return mcastRoleStore.entrySet().stream()
1531 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp) &&
1532 entry.getValue().value() == role)
Pier1f87aca2018-03-14 16:47:32 -07001533 .map(Entry::getKey).map(McastStoreKey::deviceId)
Charles Chan72779502016-04-23 17:36:10 -07001534 .collect(Collectors.toSet());
1535 }
1536
1537 /**
1538 * Gets groups which is affected by the link down event.
1539 *
1540 * @param link link going down
1541 * @return a set of multicast IpAddress
1542 */
1543 private Set<IpAddress> getAffectedGroups(Link link) {
1544 DeviceId deviceId = link.src().deviceId();
1545 PortNumber port = link.src().port();
1546 return mcastNextObjStore.entrySet().stream()
1547 .filter(entry -> entry.getKey().deviceId().equals(deviceId) &&
Pier7b657162018-03-27 11:29:42 -07001548 mcastUtils.getPorts(entry.getValue().value().next()).contains(port))
Pier1f87aca2018-03-14 16:47:32 -07001549 .map(Entry::getKey).map(McastStoreKey::mcastIp)
Charles Chan72779502016-04-23 17:36:10 -07001550 .collect(Collectors.toSet());
1551 }
1552
1553 /**
Pier Luigi580fd8a2018-01-16 10:47:50 +01001554 * Gets groups which are affected by the device down event.
1555 *
1556 * @param deviceId device going down
1557 * @return a set of multicast IpAddress
1558 */
1559 private Set<IpAddress> getAffectedGroups(DeviceId deviceId) {
1560 return mcastNextObjStore.entrySet().stream()
1561 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
Pier1f87aca2018-03-14 16:47:32 -07001562 .map(Entry::getKey).map(McastStoreKey::mcastIp)
Pier Luigi580fd8a2018-01-16 10:47:50 +01001563 .collect(Collectors.toSet());
1564 }
1565
1566 /**
Charles Chan72779502016-04-23 17:36:10 -07001567 * Gets the spine-facing port on ingress device of given multicast group.
1568 *
1569 * @param mcastIp multicast IP
1570 * @return spine-facing port on ingress device
1571 */
Pier1a7e0c02018-03-12 15:00:54 -07001572 private Set<PortNumber> ingressTransitPort(IpAddress mcastIp) {
Pier979e61a2018-03-07 11:42:50 +01001573 DeviceId ingressDevice = getDevice(mcastIp, INGRESS)
Charles Chan72779502016-04-23 17:36:10 -07001574 .stream().findAny().orElse(null);
Pier1a7e0c02018-03-12 15:00:54 -07001575 ImmutableSet.Builder<PortNumber> portBuilder = ImmutableSet.builder();
Charles Chan72779502016-04-23 17:36:10 -07001576 if (ingressDevice != null) {
1577 NextObjective nextObj = mcastNextObjStore
1578 .get(new McastStoreKey(mcastIp, ingressDevice)).value();
Pier7b657162018-03-27 11:29:42 -07001579 Set<PortNumber> ports = mcastUtils.getPorts(nextObj.next());
Pier1a7e0c02018-03-12 15:00:54 -07001580 // Let's find out all the ingress-transit ports
Charles Chan72779502016-04-23 17:36:10 -07001581 for (PortNumber port : ports) {
1582 // Spine-facing port should have no subnet and no xconnect
Pier Luigi69f774d2018-02-28 12:10:50 +01001583 if (srManager.deviceConfiguration() != null &&
1584 srManager.deviceConfiguration().getPortSubnets(ingressDevice, port).isEmpty() &&
Charles Chan82f19972016-05-17 13:13:55 -07001585 !srManager.xConnectHandler.hasXConnect(new ConnectPoint(ingressDevice, port))) {
Pier1a7e0c02018-03-12 15:00:54 -07001586 portBuilder.add(port);
Charles Chan72779502016-04-23 17:36:10 -07001587 }
1588 }
1589 }
Pier1a7e0c02018-03-12 15:00:54 -07001590 return portBuilder.build();
Charles Chan72779502016-04-23 17:36:10 -07001591 }
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001592
1593 /**
Pier Luigi580fd8a2018-01-16 10:47:50 +01001594 * Verify if the given device has sinks
1595 * for the multicast group.
1596 *
1597 * @param deviceId device Id
1598 * @param mcastIp multicast IP
1599 * @return true if the device has sink for the group.
1600 * False otherwise.
1601 */
1602 private boolean hasSinks(DeviceId deviceId, IpAddress mcastIp) {
1603 if (deviceId != null) {
1604 // Get the nextobjective
1605 Versioned<NextObjective> versionedNextObj = mcastNextObjStore.get(
1606 new McastStoreKey(mcastIp, deviceId)
1607 );
1608 // If it exists
1609 if (versionedNextObj != null) {
1610 NextObjective nextObj = versionedNextObj.value();
1611 // Retrieves all the output ports
Pier7b657162018-03-27 11:29:42 -07001612 Set<PortNumber> ports = mcastUtils.getPorts(nextObj.next());
Pier Luigi580fd8a2018-01-16 10:47:50 +01001613 // Tries to find at least one port that is not spine-facing
1614 for (PortNumber port : ports) {
1615 // Spine-facing port should have no subnet and no xconnect
Pier Luigi69f774d2018-02-28 12:10:50 +01001616 if (srManager.deviceConfiguration() != null &&
1617 (!srManager.deviceConfiguration().getPortSubnets(deviceId, port).isEmpty() ||
Pier Luigi580fd8a2018-01-16 10:47:50 +01001618 srManager.xConnectHandler.hasXConnect(new ConnectPoint(deviceId, port)))) {
1619 return true;
1620 }
1621 }
1622 }
1623 }
1624 return false;
1625 }
1626
1627 /**
Pier28164682018-04-17 15:50:43 +02001628 * Verify if a given connect point is sink for this group.
1629 *
1630 * @param mcastIp group address
1631 * @param connectPoint connect point to be verified
1632 * @return true if the connect point is sink of the group
1633 */
1634 private boolean isSink(IpAddress mcastIp, ConnectPoint connectPoint) {
1635 // Let's check if we are already serving that location
1636 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, connectPoint.deviceId());
1637 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
1638 return false;
1639 }
1640 // Get next and check with the port
1641 NextObjective mcastNext = mcastNextObjStore.get(mcastStoreKey).value();
1642 return mcastUtils.getPorts(mcastNext.next()).contains(connectPoint.port());
1643 }
1644
1645 /**
Pier Luigi35dab3f2018-01-25 16:16:02 +01001646 * Updates filtering objective for given device and port.
1647 * It is called in general when the mcast config has been
1648 * changed.
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001649 *
1650 * @param deviceId device ID
1651 * @param portNum ingress port number
1652 * @param vlanId assigned VLAN ID
1653 * @param install true to add, false to remove
1654 */
Pier Luigi69f774d2018-02-28 12:10:50 +01001655 public void updateFilterToDevice(DeviceId deviceId, PortNumber portNum,
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001656 VlanId vlanId, boolean install) {
Pier Luigi35dab3f2018-01-25 16:16:02 +01001657 lastMcastChange = Instant.now();
1658 mcastLock();
1659 try {
1660 // Iterates over the route and updates properly the filtering objective
1661 // on the source device.
1662 srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
Pier1f87aca2018-03-14 16:47:32 -07001663 // FIXME To be addressed with multiple sources support
1664 ConnectPoint source = srManager.multicastRouteService.sources(mcastRoute)
1665 .stream()
1666 .findFirst().orElse(null);
Pier Luigi35dab3f2018-01-25 16:16:02 +01001667 if (source.deviceId().equals(deviceId) && source.port().equals(portNum)) {
1668 if (install) {
Pier7b657162018-03-27 11:29:42 -07001669 mcastUtils.addFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), INGRESS);
Pier Luigi35dab3f2018-01-25 16:16:02 +01001670 } else {
Pier7b657162018-03-27 11:29:42 -07001671 mcastUtils.removeFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), null);
Pier Luigi35dab3f2018-01-25 16:16:02 +01001672 }
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001673 }
Pier Luigi35dab3f2018-01-25 16:16:02 +01001674 });
1675 } finally {
1676 mcastUnlock();
1677 }
1678 }
1679
1680 /**
1681 * Performs bucket verification operation for all mcast groups in the devices.
1682 * Firstly, it verifies that mcast is stable before trying verification operation.
1683 * Verification consists in creating new nexts with VERIFY operation. Actually,
1684 * the operation is totally delegated to the driver.
1685 */
1686 private final class McastBucketCorrector implements Runnable {
1687
1688 @Override
1689 public void run() {
1690 // Verify if the Mcast has been stable for MCAST_STABLITY_THRESHOLD
1691 if (!isMcastStable()) {
1692 return;
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001693 }
Pier Luigi35dab3f2018-01-25 16:16:02 +01001694 // Acquires lock
1695 mcastLock();
1696 try {
1697 // Iterates over the routes and verify the related next objectives
1698 srManager.multicastRouteService.getRoutes()
1699 .stream()
1700 .map(McastRoute::group)
1701 .forEach(mcastIp -> {
1702 log.trace("Running mcast buckets corrector for mcast group: {}",
1703 mcastIp);
1704
1705 // For each group we get current information in the store
1706 // and issue a check of the next objectives in place
Pier979e61a2018-03-07 11:42:50 +01001707 DeviceId ingressDevice = getDevice(mcastIp, INGRESS)
Pier Luigi35dab3f2018-01-25 16:16:02 +01001708 .stream().findAny().orElse(null);
Pier1a7e0c02018-03-12 15:00:54 -07001709 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT);
Pier979e61a2018-03-07 11:42:50 +01001710 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS);
Pier Luigi92e69be2018-03-02 12:53:37 +01001711 // Get source and sinks from Mcast Route Service and warn about errors
Pier7b657162018-03-27 11:29:42 -07001712 ConnectPoint source = mcastUtils.getSource(mcastIp);
1713 Set<ConnectPoint> sinks = mcastUtils.getSinks(mcastIp).values().stream()
1714 .flatMap(Collection::stream)
1715 .collect(Collectors.toSet());
Pier Luigi35dab3f2018-01-25 16:16:02 +01001716
1717 // Do not proceed if ingress device or source of this group are missing
1718 if (ingressDevice == null || source == null) {
Pier Luigi92e69be2018-03-02 12:53:37 +01001719 if (!sinks.isEmpty()) {
1720 log.warn("Unable to run buckets corrector. " +
1721 "Missing ingress {} or source {} for group {}",
1722 ingressDevice, source, mcastIp);
1723 }
Pier Luigi35dab3f2018-01-25 16:16:02 +01001724 return;
1725 }
1726
1727 // Continue only when this instance is the master of source device
1728 if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
1729 log.trace("Unable to run buckets corrector. " +
1730 "Skip {} due to lack of mastership " +
1731 "of the source device {}",
1732 mcastIp, source.deviceId());
1733 return;
1734 }
1735
1736 // Create the set of the devices to be processed
1737 ImmutableSet.Builder<DeviceId> devicesBuilder = ImmutableSet.builder();
1738 devicesBuilder.add(ingressDevice);
Pier1a7e0c02018-03-12 15:00:54 -07001739 if (!transitDevices.isEmpty()) {
1740 devicesBuilder.addAll(transitDevices);
Pier Luigi35dab3f2018-01-25 16:16:02 +01001741 }
1742 if (!egressDevices.isEmpty()) {
1743 devicesBuilder.addAll(egressDevices);
1744 }
1745 Set<DeviceId> devicesToProcess = devicesBuilder.build();
1746
1747 // Iterate over the devices
1748 devicesToProcess.forEach(deviceId -> {
1749 McastStoreKey currentKey = new McastStoreKey(mcastIp, deviceId);
1750 // If next exists in our store verify related next objective
1751 if (mcastNextObjStore.containsKey(currentKey)) {
1752 NextObjective currentNext = mcastNextObjStore.get(currentKey).value();
1753 // Get current ports
Pier7b657162018-03-27 11:29:42 -07001754 Set<PortNumber> currentPorts = mcastUtils.getPorts(currentNext.next());
Pier Luigi35dab3f2018-01-25 16:16:02 +01001755 // Rebuild the next objective
Pier7b657162018-03-27 11:29:42 -07001756 currentNext = mcastUtils.nextObjBuilder(
Pier Luigi35dab3f2018-01-25 16:16:02 +01001757 mcastIp,
Pier7b657162018-03-27 11:29:42 -07001758 mcastUtils.assignedVlan(deviceId.equals(source.deviceId()) ?
1759 source : null),
Pier Luigi35dab3f2018-01-25 16:16:02 +01001760 currentPorts,
1761 currentNext.id()
1762 ).verify();
1763 // Send to the flowobjective service
1764 srManager.flowObjectiveService.next(deviceId, currentNext);
1765 } else {
Pier Luigid8a15162018-02-15 16:33:08 +01001766 log.warn("Unable to run buckets corrector. " +
Pier Luigi35dab3f2018-01-25 16:16:02 +01001767 "Missing next for {} and group {}",
1768 deviceId, mcastIp);
1769 }
1770 });
1771
1772 });
1773 } finally {
1774 // Finally, it releases the lock
1775 mcastUnlock();
1776 }
1777
1778 }
Jonghwan Hyune5ef7622017-08-25 17:48:36 -07001779 }
Pier Luigi0f9635b2018-01-15 18:06:43 +01001780
1781 public Map<McastStoreKey, Integer> getMcastNextIds(IpAddress mcastIp) {
1782 // If mcast ip is present
1783 if (mcastIp != null) {
1784 return mcastNextObjStore.entrySet().stream()
1785 .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
Pier1f87aca2018-03-14 16:47:32 -07001786 .collect(Collectors.toMap(Entry::getKey,
Pier Luigi0f9635b2018-01-15 18:06:43 +01001787 entry -> entry.getValue().value().id()));
1788 }
1789 // Otherwise take all the groups
1790 return mcastNextObjStore.entrySet().stream()
Pier1f87aca2018-03-14 16:47:32 -07001791 .collect(Collectors.toMap(Entry::getKey,
Pier Luigi0f9635b2018-01-15 18:06:43 +01001792 entry -> entry.getValue().value().id()));
1793 }
1794
Pier Luigi69f774d2018-02-28 12:10:50 +01001795 public Map<McastStoreKey, McastRole> getMcastRoles(IpAddress mcastIp) {
Pier Luigi0f9635b2018-01-15 18:06:43 +01001796 // If mcast ip is present
1797 if (mcastIp != null) {
1798 return mcastRoleStore.entrySet().stream()
1799 .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
Pier1f87aca2018-03-14 16:47:32 -07001800 .collect(Collectors.toMap(Entry::getKey,
Pier Luigi0f9635b2018-01-15 18:06:43 +01001801 entry -> entry.getValue().value()));
1802 }
1803 // Otherwise take all the groups
1804 return mcastRoleStore.entrySet().stream()
Pier1f87aca2018-03-14 16:47:32 -07001805 .collect(Collectors.toMap(Entry::getKey,
Pier Luigi0f9635b2018-01-15 18:06:43 +01001806 entry -> entry.getValue().value()));
1807 }
1808
1809 public Map<ConnectPoint, List<ConnectPoint>> getMcastPaths(IpAddress mcastIp) {
1810 Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
1811 // Get the source
Pier7b657162018-03-27 11:29:42 -07001812 ConnectPoint source = mcastUtils.getSource(mcastIp);
Pier Luigi0f9635b2018-01-15 18:06:43 +01001813 // Source cannot be null, we don't know the starting point
1814 if (source != null) {
1815 // Init steps
1816 Set<DeviceId> visited = Sets.newHashSet();
1817 List<ConnectPoint> currentPath = Lists.newArrayList(
1818 source
1819 );
1820 // Build recursively the mcast paths
1821 buildMcastPaths(source.deviceId(), visited, mcastPaths, currentPath, mcastIp);
1822 }
1823 return mcastPaths;
1824 }
1825
1826 private void buildMcastPaths(DeviceId toVisit, Set<DeviceId> visited,
1827 Map<ConnectPoint, List<ConnectPoint>> mcastPaths,
1828 List<ConnectPoint> currentPath, IpAddress mcastIp) {
1829 // If we have visited the node to visit
1830 // there is a loop
1831 if (visited.contains(toVisit)) {
1832 return;
1833 }
1834 // Visit next-hop
1835 visited.add(toVisit);
1836 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, toVisit);
1837 // Looking for next-hops
1838 if (mcastNextObjStore.containsKey(mcastStoreKey)) {
1839 // Build egress connectpoints
1840 NextObjective nextObjective = mcastNextObjStore.get(mcastStoreKey).value();
1841 // Get Ports
Pier7b657162018-03-27 11:29:42 -07001842 Set<PortNumber> outputPorts = mcastUtils.getPorts(nextObjective.next());
Pier Luigi0f9635b2018-01-15 18:06:43 +01001843 // Build relative cps
1844 ImmutableSet.Builder<ConnectPoint> cpBuilder = ImmutableSet.builder();
1845 outputPorts.forEach(portNumber -> cpBuilder.add(new ConnectPoint(toVisit, portNumber)));
1846 Set<ConnectPoint> egressPoints = cpBuilder.build();
1847 // Define other variables for the next steps
1848 Set<Link> egressLinks;
1849 List<ConnectPoint> newCurrentPath;
1850 Set<DeviceId> newVisited;
1851 DeviceId newToVisit;
1852 for (ConnectPoint egressPoint : egressPoints) {
1853 egressLinks = srManager.linkService.getEgressLinks(egressPoint);
1854 // If it does not have egress links, stop
1855 if (egressLinks.isEmpty()) {
1856 // Add the connect points to the path
1857 newCurrentPath = Lists.newArrayList(currentPath);
1858 newCurrentPath.add(0, egressPoint);
1859 // Save in the map
1860 mcastPaths.put(egressPoint, newCurrentPath);
1861 } else {
1862 newVisited = Sets.newHashSet(visited);
1863 // Iterate over the egress links for the next hops
1864 for (Link egressLink : egressLinks) {
1865 // Update to visit
1866 newToVisit = egressLink.dst().deviceId();
1867 // Add the connect points to the path
1868 newCurrentPath = Lists.newArrayList(currentPath);
1869 newCurrentPath.add(0, egressPoint);
1870 newCurrentPath.add(0, egressLink.dst());
1871 // Go to the next hop
1872 buildMcastPaths(newToVisit, newVisited, mcastPaths, newCurrentPath, mcastIp);
1873 }
1874 }
1875 }
1876 }
1877 }
1878
Charles Chanc91c8782016-03-30 17:54:24 -07001879}