blob: 289778e2229b74a9b77a4aef12ac45bafa791973 [file] [log] [blame]
Charles Chand55e84d2016-03-30 17:54:24 -07001/*
Pier Luigi96fe0772018-02-28 12:10:50 +01002 * Copyright 2018-present Open Networking Foundation
Charles Chand55e84d2016-03-30 17:54:24 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Pier Luigi96fe0772018-02-28 12:10:50 +010017package org.onosproject.segmentrouting.mcast;
Charles Chand55e84d2016-03-30 17:54:24 -070018
Pier Luigi05514fd2018-02-28 17:24:03 +010019import com.google.common.cache.Cache;
20import com.google.common.cache.CacheBuilder;
21import com.google.common.cache.RemovalCause;
22import com.google.common.cache.RemovalNotification;
Pierb1fe7382018-04-17 17:25:22 +020023import com.google.common.collect.HashMultimap;
Pierb0328e42018-03-27 11:29:42 -070024import com.google.common.collect.ImmutableList;
Charles Chand55e84d2016-03-30 17:54:24 -070025import com.google.common.collect.ImmutableSet;
26import com.google.common.collect.Lists;
Pier Luigibad6d6c2018-01-23 16:06:38 +010027import com.google.common.collect.Maps;
Pierb1fe7382018-04-17 17:25:22 +020028import com.google.common.collect.Multimap;
Charles Chand55e84d2016-03-30 17:54:24 -070029import com.google.common.collect.Sets;
Charles Chand55e84d2016-03-30 17:54:24 -070030import org.onlab.packet.IpAddress;
Charles Chand55e84d2016-03-30 17:54:24 -070031import org.onlab.packet.VlanId;
32import org.onlab.util.KryoNamespace;
Pier96f63cb2018-04-17 16:29:56 +020033import org.onosproject.cluster.NodeId;
Charles Chand55e84d2016-03-30 17:54:24 -070034import org.onosproject.core.ApplicationId;
35import org.onosproject.core.CoreService;
Pier3ee24552018-03-14 16:47:32 -070036import org.onosproject.mcast.api.McastEvent;
37import org.onosproject.mcast.api.McastRoute;
Pierb0328e42018-03-27 11:29:42 -070038import org.onosproject.mcast.api.McastRouteData;
Pier3ee24552018-03-14 16:47:32 -070039import org.onosproject.mcast.api.McastRouteUpdate;
Pierb0328e42018-03-27 11:29:42 -070040import org.onosproject.net.HostId;
Charles Chand55e84d2016-03-30 17:54:24 -070041import org.onosproject.net.ConnectPoint;
42import org.onosproject.net.DeviceId;
43import org.onosproject.net.Link;
44import org.onosproject.net.Path;
45import org.onosproject.net.PortNumber;
Charles Chan2199c302016-04-23 17:36:10 -070046import org.onosproject.net.flowobjective.DefaultObjectiveContext;
Charles Chand55e84d2016-03-30 17:54:24 -070047import org.onosproject.net.flowobjective.ForwardingObjective;
48import org.onosproject.net.flowobjective.NextObjective;
Charles Chan2199c302016-04-23 17:36:10 -070049import org.onosproject.net.flowobjective.ObjectiveContext;
Pier3ee24552018-03-14 16:47:32 -070050import org.onosproject.net.topology.LinkWeigher;
Pier Luigi83f919b2018-02-15 16:33:08 +010051import org.onosproject.net.topology.Topology;
Charles Chand55e84d2016-03-30 17:54:24 -070052import org.onosproject.net.topology.TopologyService;
Pier3ee24552018-03-14 16:47:32 -070053import org.onosproject.segmentrouting.SRLinkWeigher;
Pier Luigi96fe0772018-02-28 12:10:50 +010054import org.onosproject.segmentrouting.SegmentRoutingManager;
Charles Chan2199c302016-04-23 17:36:10 -070055import org.onosproject.segmentrouting.storekey.McastStoreKey;
Charles Chand55e84d2016-03-30 17:54:24 -070056import org.onosproject.store.serializers.KryoNamespaces;
57import org.onosproject.store.service.ConsistentMap;
58import org.onosproject.store.service.Serializer;
Pier Luigieba73a02018-01-16 10:47:50 +010059import org.onosproject.store.service.Versioned;
Charles Chand55e84d2016-03-30 17:54:24 -070060import org.slf4j.Logger;
61import org.slf4j.LoggerFactory;
62
Pier Luigib72201b2018-01-25 16:16:02 +010063import java.time.Instant;
Charles Chand55e84d2016-03-30 17:54:24 -070064import java.util.Collection;
65import java.util.Collections;
Pier Luigibad6d6c2018-01-23 16:06:38 +010066import java.util.Comparator;
Charles Chand55e84d2016-03-30 17:54:24 -070067import java.util.List;
Charles Chan2199c302016-04-23 17:36:10 -070068import java.util.Map;
Pier3ee24552018-03-14 16:47:32 -070069import java.util.Map.Entry;
Charles Chand55e84d2016-03-30 17:54:24 -070070import java.util.Optional;
71import java.util.Set;
Pier Luigib72201b2018-01-25 16:16:02 +010072import java.util.concurrent.ScheduledExecutorService;
73import java.util.concurrent.TimeUnit;
74import java.util.concurrent.locks.Lock;
75import java.util.concurrent.locks.ReentrantLock;
Charles Chan2199c302016-04-23 17:36:10 -070076import java.util.stream.Collectors;
77
Pier Luigib72201b2018-01-25 16:16:02 +010078import static java.util.concurrent.Executors.newScheduledThreadPool;
79import static org.onlab.util.Tools.groupedThreads;
Pier3ee24552018-03-14 16:47:32 -070080
Pier96f63cb2018-04-17 16:29:56 +020081import static org.onosproject.mcast.api.McastEvent.Type.ROUTE_ADDED;
Pierb0328e42018-03-27 11:29:42 -070082import static org.onosproject.mcast.api.McastEvent.Type.ROUTE_REMOVED;
Pier96f63cb2018-04-17 16:29:56 +020083import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_ADDED;
84import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_REMOVED;
85import static org.onosproject.mcast.api.McastEvent.Type.SINKS_ADDED;
86import static org.onosproject.mcast.api.McastEvent.Type.SINKS_REMOVED;
87
Piere5bff482018-03-07 11:42:50 +010088import static org.onosproject.segmentrouting.mcast.McastRole.EGRESS;
89import static org.onosproject.segmentrouting.mcast.McastRole.INGRESS;
90import static org.onosproject.segmentrouting.mcast.McastRole.TRANSIT;
Charles Chand55e84d2016-03-30 17:54:24 -070091
92/**
Pier Luigi96fe0772018-02-28 12:10:50 +010093 * Handles Multicast related events.
Charles Chand55e84d2016-03-30 17:54:24 -070094 */
Charles Chand2990362016-04-18 13:44:03 -070095public class McastHandler {
Pierb0328e42018-03-27 11:29:42 -070096 // Logger instance
Charles Chand2990362016-04-18 13:44:03 -070097 private static final Logger log = LoggerFactory.getLogger(McastHandler.class);
Pierb0328e42018-03-27 11:29:42 -070098 // Reference to srManager and most used internal objects
Charles Chand55e84d2016-03-30 17:54:24 -070099 private final SegmentRoutingManager srManager;
Charles Chanfc5c7802016-05-17 13:13:55 -0700100 private final TopologyService topologyService;
Pier96f63cb2018-04-17 16:29:56 +0200101 private final McastUtils mcastUtils;
Pierb0328e42018-03-27 11:29:42 -0700102 // Internal store of the Mcast nextobjectives
Charles Chan2199c302016-04-23 17:36:10 -0700103 private final ConsistentMap<McastStoreKey, NextObjective> mcastNextObjStore;
Pierb0328e42018-03-27 11:29:42 -0700104 // Internal store of the Mcast roles
Charles Chan2199c302016-04-23 17:36:10 -0700105 private final ConsistentMap<McastStoreKey, McastRole> mcastRoleStore;
106
Pier Luigi05514fd2018-02-28 17:24:03 +0100107 // Wait time for the cache
108 private static final int WAIT_TIME_MS = 1000;
Pierb0328e42018-03-27 11:29:42 -0700109
Pier Luigi05514fd2018-02-28 17:24:03 +0100110 /**
111 * The mcastEventCache is implemented to avoid race condition by giving more time to the
112 * underlying subsystems to process previous calls.
113 */
114 private Cache<McastCacheKey, McastEvent> mcastEventCache = CacheBuilder.newBuilder()
115 .expireAfterWrite(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
116 .removalListener((RemovalNotification<McastCacheKey, McastEvent> notification) -> {
117 // Get group ip, sink and related event
118 IpAddress mcastIp = notification.getKey().mcastIp();
Pierb0328e42018-03-27 11:29:42 -0700119 HostId sink = notification.getKey().sinkHost();
Pier Luigi05514fd2018-02-28 17:24:03 +0100120 McastEvent mcastEvent = notification.getValue();
121 RemovalCause cause = notification.getCause();
122 log.debug("mcastEventCache removal event. group={}, sink={}, mcastEvent={}, cause={}",
123 mcastIp, sink, mcastEvent, cause);
124 // If it expires or it has been replaced, we deque the event
125 switch (notification.getCause()) {
126 case REPLACED:
127 case EXPIRED:
128 dequeueMcastEvent(mcastEvent);
129 break;
130 default:
131 break;
132 }
133 }).build();
134
135 private void enqueueMcastEvent(McastEvent mcastEvent) {
Pierb0328e42018-03-27 11:29:42 -0700136 // Retrieve, currentData, prevData and the group
Pier3ee24552018-03-14 16:47:32 -0700137 final McastRouteUpdate mcastRouteUpdate = mcastEvent.subject();
Pierb0328e42018-03-27 11:29:42 -0700138 final McastRouteUpdate mcastRoutePrevUpdate = mcastEvent.prevSubject();
139 final IpAddress group = mcastRoutePrevUpdate.route().group();
Pier Luigi05514fd2018-02-28 17:24:03 +0100140 // Let's create the keys of the cache
Pierb0328e42018-03-27 11:29:42 -0700141 ImmutableSet.Builder<HostId> sinksBuilder = ImmutableSet.builder();
Pier3ee24552018-03-14 16:47:32 -0700142 if (mcastEvent.type() == SOURCES_ADDED ||
143 mcastEvent.type() == SOURCES_REMOVED) {
144 // FIXME To be addressed with multiple sources support
Pierb0328e42018-03-27 11:29:42 -0700145 sinksBuilder.addAll(Collections.emptySet());
146 } else if (mcastEvent.type() == SINKS_ADDED) {
147 // We need to process the host id one by one
148 mcastRouteUpdate.sinks().forEach(((hostId, connectPoints) -> {
149 // Get the previous locations and verify if there are changes
150 Set<ConnectPoint> prevConnectPoints = mcastRoutePrevUpdate.sinks().get(hostId);
151 Set<ConnectPoint> changes = Sets.difference(connectPoints, prevConnectPoints != null ?
152 prevConnectPoints : Collections.emptySet());
153 if (!changes.isEmpty()) {
154 sinksBuilder.add(hostId);
Pier3ee24552018-03-14 16:47:32 -0700155 }
Pierb0328e42018-03-27 11:29:42 -0700156 }));
157 } else if (mcastEvent.type() == SINKS_REMOVED) {
158 // We need to process the host id one by one
159 mcastRoutePrevUpdate.sinks().forEach(((hostId, connectPoints) -> {
160 // Get the current locations and verify if there are changes
161 Set<ConnectPoint> currentConnectPoints = mcastRouteUpdate.sinks().get(hostId);
162 Set<ConnectPoint> changes = Sets.difference(connectPoints, currentConnectPoints != null ?
163 currentConnectPoints : Collections.emptySet());
164 if (!changes.isEmpty()) {
165 sinksBuilder.add(hostId);
166 }
167 }));
168 } else if (mcastEvent.type() == ROUTE_REMOVED) {
169 // Current subject is null, just take the previous host ids
170 sinksBuilder.addAll(mcastRoutePrevUpdate.sinks().keySet());
Pier Luigi05514fd2018-02-28 17:24:03 +0100171 }
172 // Push the elements in the cache
173 sinksBuilder.build().forEach(sink -> {
Pier3ee24552018-03-14 16:47:32 -0700174 McastCacheKey cacheKey = new McastCacheKey(group, sink);
Pier Luigi05514fd2018-02-28 17:24:03 +0100175 mcastEventCache.put(cacheKey, mcastEvent);
176 });
177 }
178
179 private void dequeueMcastEvent(McastEvent mcastEvent) {
Pierb0328e42018-03-27 11:29:42 -0700180 // Get new and old data
181 final McastRouteUpdate mcastUpdate = mcastEvent.subject();
182 final McastRouteUpdate mcastPrevUpdate = mcastEvent.prevSubject();
Pier Luigi05514fd2018-02-28 17:24:03 +0100183 // Get source, mcast group
Pier3ee24552018-03-14 16:47:32 -0700184 // FIXME To be addressed with multiple sources support
Pierb0328e42018-03-27 11:29:42 -0700185 final ConnectPoint source = mcastPrevUpdate.sources()
Andrea Campanella0ddf9b82018-04-27 15:54:42 +0200186 .values()
Pier3ee24552018-03-14 16:47:32 -0700187 .stream()
Andrea Campanella0ddf9b82018-04-27 15:54:42 +0200188 .flatMap(Collection::stream)
Pier3ee24552018-03-14 16:47:32 -0700189 .findFirst()
190 .orElse(null);
Pierb0328e42018-03-27 11:29:42 -0700191 IpAddress mcastIp = mcastPrevUpdate.route().group();
192 // Get all the previous sinks
193 Set<ConnectPoint> prevSinks = mcastPrevUpdate.sinks()
Pier3ee24552018-03-14 16:47:32 -0700194 .values()
195 .stream()
196 .flatMap(Collection::stream)
197 .collect(Collectors.toSet());
Pier Luigi05514fd2018-02-28 17:24:03 +0100198 // According to the event type let's call the proper method
199 switch (mcastEvent.type()) {
Pier3ee24552018-03-14 16:47:32 -0700200 case SOURCES_ADDED:
201 // FIXME To be addressed with multiple sources support
202 // Get all the sinks
203 //Set<ConnectPoint> sinks = mcastRouteInfo.sinks();
204 // Compute the Mcast tree
205 //Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(source.deviceId(), sinks);
206 // Process the given sinks using the pre-computed paths
207 //mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink, mcastIp, paths));
Pier Luigi05514fd2018-02-28 17:24:03 +0100208 break;
Pier3ee24552018-03-14 16:47:32 -0700209 case SOURCES_REMOVED:
210 // FIXME To be addressed with multiple sources support
Pier Luigi05514fd2018-02-28 17:24:03 +0100211 // Get old source
Pier3ee24552018-03-14 16:47:32 -0700212 //ConnectPoint oldSource = mcastEvent.prevSubject().source().orElse(null);
Pier Luigi05514fd2018-02-28 17:24:03 +0100213 // Just the first cached element will be processed
Pier3ee24552018-03-14 16:47:32 -0700214 //processSourceUpdatedInternal(mcastIp, source, oldSource);
Pier Luigi05514fd2018-02-28 17:24:03 +0100215 break;
216 case ROUTE_REMOVED:
217 // Process the route removed, just the first cached element will be processed
Pierb0328e42018-03-27 11:29:42 -0700218 processRouteRemovedInternal(source, mcastIp);
Pier Luigi05514fd2018-02-28 17:24:03 +0100219 break;
Pier3ee24552018-03-14 16:47:32 -0700220 case SINKS_ADDED:
Pierb0328e42018-03-27 11:29:42 -0700221 // FIXME To be addressed with multiple sources support
222 processSinksAddedInternal(source, mcastIp,
223 mcastUpdate.sinks(), prevSinks);
Pier Luigi05514fd2018-02-28 17:24:03 +0100224 break;
Pier3ee24552018-03-14 16:47:32 -0700225 case SINKS_REMOVED:
Pierb0328e42018-03-27 11:29:42 -0700226 // FIXME To be addressed with multiple sources support
227 processSinksRemovedInternal(source, mcastIp,
Pier3bb1f3f2018-04-17 15:50:43 +0200228 mcastUpdate.sinks(), mcastPrevUpdate.sinks());
Pier Luigi05514fd2018-02-28 17:24:03 +0100229 break;
230 default:
231 break;
232 }
233 }
234
Pier Luigib72201b2018-01-25 16:16:02 +0100235 // Mcast lock to serialize local operations
236 private final Lock mcastLock = new ReentrantLock();
237
238 /**
239 * Acquires the lock used when making mcast changes.
240 */
241 private void mcastLock() {
242 mcastLock.lock();
243 }
244
245 /**
246 * Releases the lock used when making mcast changes.
247 */
248 private void mcastUnlock() {
249 mcastLock.unlock();
250 }
251
252 // Stability threshold for Mcast. Seconds
253 private static final long MCAST_STABLITY_THRESHOLD = 5;
254 // Last change done
255 private Instant lastMcastChange = Instant.now();
256
257 /**
258 * Determines if mcast in the network has been stable in the last
259 * MCAST_STABLITY_THRESHOLD seconds, by comparing the current time
260 * to the last mcast change timestamp.
261 *
262 * @return true if stable
263 */
264 private boolean isMcastStable() {
265 long last = (long) (lastMcastChange.toEpochMilli() / 1000.0);
266 long now = (long) (Instant.now().toEpochMilli() / 1000.0);
Saurav Dasa4020382018-02-14 14:14:54 -0800267 log.trace("Mcast stable since {}s", now - last);
Pier Luigib72201b2018-01-25 16:16:02 +0100268 return (now - last) > MCAST_STABLITY_THRESHOLD;
269 }
270
271 // Verify interval for Mcast
272 private static final long MCAST_VERIFY_INTERVAL = 30;
273
274 // Executor for mcast bucket corrector
275 private ScheduledExecutorService executorService
Pier Luigi05514fd2018-02-28 17:24:03 +0100276 = newScheduledThreadPool(1, groupedThreads("mcastWorker", "mcastWorker-%d", log));
Pier Luigib72201b2018-01-25 16:16:02 +0100277
Charles Chan2199c302016-04-23 17:36:10 -0700278 /**
Charles Chand55e84d2016-03-30 17:54:24 -0700279 * Constructs the McastEventHandler.
280 *
281 * @param srManager Segment Routing manager
282 */
Charles Chand2990362016-04-18 13:44:03 -0700283 public McastHandler(SegmentRoutingManager srManager) {
Pierb0328e42018-03-27 11:29:42 -0700284 ApplicationId coreAppId = srManager.coreService.getAppId(CoreService.CORE_APP_NAME);
Charles Chand55e84d2016-03-30 17:54:24 -0700285 this.srManager = srManager;
Charles Chand55e84d2016-03-30 17:54:24 -0700286 this.topologyService = srManager.topologyService;
Pierb0328e42018-03-27 11:29:42 -0700287 KryoNamespace.Builder mcastKryo = new KryoNamespace.Builder()
Charles Chand55e84d2016-03-30 17:54:24 -0700288 .register(KryoNamespaces.API)
Charles Chan2199c302016-04-23 17:36:10 -0700289 .register(McastStoreKey.class)
290 .register(McastRole.class);
Pierb0328e42018-03-27 11:29:42 -0700291 mcastNextObjStore = srManager.storageService
Charles Chan2199c302016-04-23 17:36:10 -0700292 .<McastStoreKey, NextObjective>consistentMapBuilder()
Charles Chand55e84d2016-03-30 17:54:24 -0700293 .withName("onos-mcast-nextobj-store")
Charles Chaneefdedf2016-05-23 16:45:45 -0700294 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-NextObj")))
Charles Chand55e84d2016-03-30 17:54:24 -0700295 .build();
Pierb0328e42018-03-27 11:29:42 -0700296 mcastRoleStore = srManager.storageService
Charles Chan2199c302016-04-23 17:36:10 -0700297 .<McastStoreKey, McastRole>consistentMapBuilder()
298 .withName("onos-mcast-role-store")
Charles Chaneefdedf2016-05-23 16:45:45 -0700299 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-Role")))
Charles Chan2199c302016-04-23 17:36:10 -0700300 .build();
Pierb0328e42018-03-27 11:29:42 -0700301 // Let's create McastUtils object
302 mcastUtils = new McastUtils(srManager, coreAppId, log);
Pier Luigib72201b2018-01-25 16:16:02 +0100303 // Init the executor service and the buckets corrector
304 executorService.scheduleWithFixedDelay(new McastBucketCorrector(), 10,
Pierb0328e42018-03-27 11:29:42 -0700305 MCAST_VERIFY_INTERVAL, TimeUnit.SECONDS);
Pier Luigi05514fd2018-02-28 17:24:03 +0100306 // Schedule the clean up, this will allow the processing of the expired events
307 executorService.scheduleAtFixedRate(mcastEventCache::cleanUp, 0,
308 WAIT_TIME_MS, TimeUnit.MILLISECONDS);
Charles Chan2199c302016-04-23 17:36:10 -0700309 }
310
311 /**
312 * Read initial multicast from mcast store.
313 */
Pier Luigi96fe0772018-02-28 12:10:50 +0100314 public void init() {
Pierb0328e42018-03-27 11:29:42 -0700315 lastMcastChange = Instant.now();
316 mcastLock();
317 try {
318 srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
Pier96f63cb2018-04-17 16:29:56 +0200319 // Verify leadership on the operation
320 if (!mcastUtils.isLeader(mcastRoute.group())) {
321 log.debug("Skip {} due to lack of leadership", mcastRoute.group());
322 return;
323 }
Pierb0328e42018-03-27 11:29:42 -0700324 // FIXME To be addressed with multiple sources support
325 ConnectPoint source = srManager.multicastRouteService.sources(mcastRoute)
326 .stream()
327 .findFirst()
328 .orElse(null);
329 // Get all the sinks and process them
330 McastRouteData mcastRouteData = srManager.multicastRouteService.routeData(mcastRoute);
331 Set<ConnectPoint> sinks = processSinksToBeAdded(source, mcastRoute.group(), mcastRouteData.sinks());
332 // Filter out all the working sinks, we do not want to move them
333 sinks = sinks.stream()
334 .filter(sink -> {
335 McastStoreKey mcastKey = new McastStoreKey(mcastRoute.group(), sink.deviceId());
336 Versioned<NextObjective> verMcastNext = mcastNextObjStore.get(mcastKey);
337 return verMcastNext == null ||
338 !mcastUtils.getPorts(verMcastNext.value().next()).contains(sink.port());
339 })
340 .collect(Collectors.toSet());
341 // Compute the Mcast tree
342 Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(source.deviceId(), sinks);
343 // Process the given sinks using the pre-computed paths
344 mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink,
345 mcastRoute.group(), paths));
346 });
347 } finally {
348 mcastUnlock();
349 }
Charles Chand55e84d2016-03-30 17:54:24 -0700350 }
351
352 /**
Pier Luigib72201b2018-01-25 16:16:02 +0100353 * Clean up when deactivating the application.
354 */
Pier Luigi96fe0772018-02-28 12:10:50 +0100355 public void terminate() {
Pier477e0062018-04-20 14:14:34 +0200356 mcastEventCache.invalidateAll();
Pier Luigib72201b2018-01-25 16:16:02 +0100357 executorService.shutdown();
Pier477e0062018-04-20 14:14:34 +0200358 mcastNextObjStore.destroy();
359 mcastRoleStore.destroy();
360 mcastUtils.terminate();
361 log.info("Terminated");
Pier Luigib72201b2018-01-25 16:16:02 +0100362 }
363
364 /**
Pier Luigi05514fd2018-02-28 17:24:03 +0100365 * Processes the SOURCE_ADDED, SOURCE_UPDATED, SINK_ADDED,
366 * SINK_REMOVED and ROUTE_REMOVED events.
Charles Chand55e84d2016-03-30 17:54:24 -0700367 *
368 * @param event McastEvent with SOURCE_ADDED type
369 */
Pier Luigi05514fd2018-02-28 17:24:03 +0100370 public void processMcastEvent(McastEvent event) {
371 log.info("process {}", event);
Pier96f63cb2018-04-17 16:29:56 +0200372 // If it is a route added, we do not enqueue
373 if (event.type() == ROUTE_ADDED) {
374 // We need just to elect a leader
375 processRouteAddedInternal(event.subject().route().group());
376 } else {
377 // Just enqueue for now
378 enqueueMcastEvent(event);
379 }
Pier Luigi9930da52018-02-02 16:19:11 +0100380 }
381
382 /**
Pier96f63cb2018-04-17 16:29:56 +0200383 * Process the ROUTE_ADDED event.
Pier Luigi57d41792018-02-26 12:31:38 +0100384 *
Pier96f63cb2018-04-17 16:29:56 +0200385 * @param mcastIp the group address
Pier Luigi57d41792018-02-26 12:31:38 +0100386 */
Pier96f63cb2018-04-17 16:29:56 +0200387 private void processRouteAddedInternal(IpAddress mcastIp) {
Pier Luigi57d41792018-02-26 12:31:38 +0100388 lastMcastChange = Instant.now();
389 mcastLock();
390 try {
Pier96f63cb2018-04-17 16:29:56 +0200391 log.debug("Processing route added for group {}", mcastIp);
392 // Just elect a new leader
393 mcastUtils.isLeader(mcastIp);
Pier Luigi57d41792018-02-26 12:31:38 +0100394 } finally {
395 mcastUnlock();
396 }
397 }
398
399 /**
Pier Luigi9930da52018-02-02 16:19:11 +0100400 * Removes the entire mcast tree related to this group.
401 *
402 * @param mcastIp multicast group IP address
403 */
404 private void processRouteRemovedInternal(ConnectPoint source, IpAddress mcastIp) {
405 lastMcastChange = Instant.now();
406 mcastLock();
407 try {
Pier Luigi57d41792018-02-26 12:31:38 +0100408 log.debug("Processing route removed for group {}", mcastIp);
Pier96f63cb2018-04-17 16:29:56 +0200409 // Verify leadership on the operation
410 if (!mcastUtils.isLeader(mcastIp)) {
411 log.debug("Skip {} due to lack of leadership", mcastIp);
412 mcastUtils.withdrawLeader(mcastIp);
413 return;
414 }
Pier Luigi9930da52018-02-02 16:19:11 +0100415
416 // Find out the ingress, transit and egress device of the affected group
Piere5bff482018-03-07 11:42:50 +0100417 DeviceId ingressDevice = getDevice(mcastIp, INGRESS)
Pier Luigi9930da52018-02-02 16:19:11 +0100418 .stream().findAny().orElse(null);
Pier37db3692018-03-12 15:00:54 -0700419 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT);
Piere5bff482018-03-07 11:42:50 +0100420 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS);
Pier Luigi9930da52018-02-02 16:19:11 +0100421
Pier37db3692018-03-12 15:00:54 -0700422 // If there are no egress devices, sinks could be only on the ingress
Pier Luigi9930da52018-02-02 16:19:11 +0100423 if (!egressDevices.isEmpty()) {
424 egressDevices.forEach(
Pierb0328e42018-03-27 11:29:42 -0700425 deviceId -> removeGroupFromDevice(deviceId, mcastIp, mcastUtils.assignedVlan(null))
Pier Luigi9930da52018-02-02 16:19:11 +0100426 );
427 }
Pier37db3692018-03-12 15:00:54 -0700428 // Transit could be empty if sinks are on the ingress
429 if (!transitDevices.isEmpty()) {
430 transitDevices.forEach(
Pierb0328e42018-03-27 11:29:42 -0700431 deviceId -> removeGroupFromDevice(deviceId, mcastIp, mcastUtils.assignedVlan(null))
Pier37db3692018-03-12 15:00:54 -0700432 );
Pier Luigi9930da52018-02-02 16:19:11 +0100433 }
434 // Ingress device should be not null
435 if (ingressDevice != null) {
Pierb0328e42018-03-27 11:29:42 -0700436 removeGroupFromDevice(ingressDevice, mcastIp, mcastUtils.assignedVlan(source));
Pier Luigi9930da52018-02-02 16:19:11 +0100437 }
Pier Luigi9930da52018-02-02 16:19:11 +0100438 } finally {
439 mcastUnlock();
440 }
441 }
442
Pierb0328e42018-03-27 11:29:42 -0700443
444 /**
445 * Process sinks to be removed.
446 *
447 * @param source the source connect point
448 * @param mcastIp the ip address of the group
449 * @param newSinks the new sinks to be processed
Pier3bb1f3f2018-04-17 15:50:43 +0200450 * @param prevSinks the previous sinks
Pierb0328e42018-03-27 11:29:42 -0700451 */
452 private void processSinksRemovedInternal(ConnectPoint source, IpAddress mcastIp,
453 Map<HostId, Set<ConnectPoint>> newSinks,
Pier3bb1f3f2018-04-17 15:50:43 +0200454 Map<HostId, Set<ConnectPoint>> prevSinks) {
Pierb0328e42018-03-27 11:29:42 -0700455 lastMcastChange = Instant.now();
456 mcastLock();
Pierb0328e42018-03-27 11:29:42 -0700457 try {
Pier96f63cb2018-04-17 16:29:56 +0200458 // Verify leadership on the operation
459 if (!mcastUtils.isLeader(mcastIp)) {
460 log.debug("Skip {} due to lack of leadership", mcastIp);
461 return;
462 }
Pier3bb1f3f2018-04-17 15:50:43 +0200463 // Remove the previous ones
464 Set<ConnectPoint> sinksToBeRemoved = processSinksToBeRemoved(mcastIp, prevSinks,
465 newSinks);
466 sinksToBeRemoved.forEach(sink -> processSinkRemovedInternal(source, sink, mcastIp));
Pierb0328e42018-03-27 11:29:42 -0700467 // Recover the dual-homed sinks
Pier3bb1f3f2018-04-17 15:50:43 +0200468 Set<ConnectPoint> sinksToBeRecovered = processSinksToBeRecovered(mcastIp, newSinks,
469 prevSinks);
Pierb0328e42018-03-27 11:29:42 -0700470 sinksToBeRecovered.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null));
Pierb0328e42018-03-27 11:29:42 -0700471 } finally {
472 mcastUnlock();
Pierb0328e42018-03-27 11:29:42 -0700473 }
474 }
475
Pier Luigi9930da52018-02-02 16:19:11 +0100476 /**
Pier Luigib72201b2018-01-25 16:16:02 +0100477 * Removes a path from source to sink for given multicast group.
478 *
479 * @param source connect point of the multicast source
480 * @param sink connection point of the multicast sink
481 * @param mcastIp multicast group IP address
482 */
483 private void processSinkRemovedInternal(ConnectPoint source, ConnectPoint sink,
Pierb0328e42018-03-27 11:29:42 -0700484 IpAddress mcastIp) {
Pier Luigib72201b2018-01-25 16:16:02 +0100485 lastMcastChange = Instant.now();
486 mcastLock();
487 try {
Pierb0328e42018-03-27 11:29:42 -0700488 boolean isLast;
Pier Luigib72201b2018-01-25 16:16:02 +0100489 // When source and sink are on the same device
490 if (source.deviceId().equals(sink.deviceId())) {
491 // Source and sink are on even the same port. There must be something wrong.
492 if (source.port().equals(sink.port())) {
493 log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
494 mcastIp, sink, source);
495 return;
496 }
Pierb0328e42018-03-27 11:29:42 -0700497 isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(source));
Pier Luigib87b8ab2018-03-02 12:53:37 +0100498 if (isLast) {
499 mcastRoleStore.remove(new McastStoreKey(mcastIp, sink.deviceId()));
500 }
Pier Luigib72201b2018-01-25 16:16:02 +0100501 return;
502 }
Charles Chand55e84d2016-03-30 17:54:24 -0700503
Pier Luigib72201b2018-01-25 16:16:02 +0100504 // Process the egress device
Pierb0328e42018-03-27 11:29:42 -0700505 isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
Pier Luigib72201b2018-01-25 16:16:02 +0100506 if (isLast) {
507 mcastRoleStore.remove(new McastStoreKey(mcastIp, sink.deviceId()));
508 }
509
510 // If this is the last sink on the device, also update upstream
Pier3ee24552018-03-14 16:47:32 -0700511 Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(),
512 mcastIp, null);
Pier Luigib72201b2018-01-25 16:16:02 +0100513 if (mcastPath.isPresent()) {
514 List<Link> links = Lists.newArrayList(mcastPath.get().links());
515 Collections.reverse(links);
516 for (Link link : links) {
517 if (isLast) {
518 isLast = removePortFromDevice(
519 link.src().deviceId(),
520 link.src().port(),
521 mcastIp,
Pierb0328e42018-03-27 11:29:42 -0700522 mcastUtils.assignedVlan(link.src().deviceId().equals(source.deviceId()) ?
523 source : null)
Pier Luigib72201b2018-01-25 16:16:02 +0100524 );
Pier Luigib87b8ab2018-03-02 12:53:37 +0100525 if (isLast) {
526 mcastRoleStore.remove(new McastStoreKey(mcastIp, link.src().deviceId()));
527 }
Pier Luigib72201b2018-01-25 16:16:02 +0100528 }
Charles Chand55e84d2016-03-30 17:54:24 -0700529 }
530 }
Pier Luigib72201b2018-01-25 16:16:02 +0100531 } finally {
532 mcastUnlock();
Charles Chand55e84d2016-03-30 17:54:24 -0700533 }
534 }
535
Pierb0328e42018-03-27 11:29:42 -0700536
537 /**
538 * Process sinks to be added.
539 *
540 * @param source the source connect point
541 * @param mcastIp the group IP
542 * @param newSinks the new sinks to be processed
543 * @param allPrevSinks all previous sinks
544 */
545 private void processSinksAddedInternal(ConnectPoint source, IpAddress mcastIp,
546 Map<HostId, Set<ConnectPoint>> newSinks,
547 Set<ConnectPoint> allPrevSinks) {
548 lastMcastChange = Instant.now();
549 mcastLock();
550 try {
Pier96f63cb2018-04-17 16:29:56 +0200551 // Verify leadership on the operation
552 if (!mcastUtils.isLeader(mcastIp)) {
553 log.debug("Skip {} due to lack of leadership", mcastIp);
554 return;
555 }
Pierb0328e42018-03-27 11:29:42 -0700556 // Get the only sinks to be processed (new ones)
557 Set<ConnectPoint> sinksToBeAdded = processSinksToBeAdded(source, mcastIp, newSinks);
558 // Install new sinks
559 sinksToBeAdded = Sets.difference(sinksToBeAdded, allPrevSinks);
560 sinksToBeAdded.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null));
561 } finally {
562 mcastUnlock();
563 }
564 }
565
Charles Chand55e84d2016-03-30 17:54:24 -0700566 /**
567 * Establishes a path from source to sink for given multicast group.
568 *
569 * @param source connect point of the multicast source
570 * @param sink connection point of the multicast sink
571 * @param mcastIp multicast group IP address
572 */
573 private void processSinkAddedInternal(ConnectPoint source, ConnectPoint sink,
Pierb0328e42018-03-27 11:29:42 -0700574 IpAddress mcastIp, List<Path> allPaths) {
Pier Luigib72201b2018-01-25 16:16:02 +0100575 lastMcastChange = Instant.now();
576 mcastLock();
577 try {
Pier Luigib72201b2018-01-25 16:16:02 +0100578 // Process the ingress device
Pierb0328e42018-03-27 11:29:42 -0700579 mcastUtils.addFilterToDevice(source.deviceId(), source.port(),
580 mcastUtils.assignedVlan(source), mcastIp, INGRESS);
Charles Chan2199c302016-04-23 17:36:10 -0700581
Pier Luigib72201b2018-01-25 16:16:02 +0100582 // When source and sink are on the same device
583 if (source.deviceId().equals(sink.deviceId())) {
584 // Source and sink are on even the same port. There must be something wrong.
585 if (source.port().equals(sink.port())) {
586 log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
587 mcastIp, sink, source);
588 return;
589 }
Pierb0328e42018-03-27 11:29:42 -0700590 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(source));
Piere5bff482018-03-07 11:42:50 +0100591 mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()), INGRESS);
Pier Luigib72201b2018-01-25 16:16:02 +0100592 return;
593 }
Charles Chan2199c302016-04-23 17:36:10 -0700594
Pier Luigib72201b2018-01-25 16:16:02 +0100595 // Find a path. If present, create/update groups and flows for each hop
Pier3ee24552018-03-14 16:47:32 -0700596 Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(),
597 mcastIp, allPaths);
Pier Luigib72201b2018-01-25 16:16:02 +0100598 if (mcastPath.isPresent()) {
599 List<Link> links = mcastPath.get().links();
Charles Chan2199c302016-04-23 17:36:10 -0700600
Pier37db3692018-03-12 15:00:54 -0700601 // Setup mcast role for ingress
602 mcastRoleStore.put(new McastStoreKey(mcastIp, source.deviceId()),
603 INGRESS);
604
605 // Setup properly the transit
Pier Luigib72201b2018-01-25 16:16:02 +0100606 links.forEach(link -> {
607 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
Pierb0328e42018-03-27 11:29:42 -0700608 mcastUtils.assignedVlan(link.src().deviceId()
609 .equals(source.deviceId()) ? source : null));
610 mcastUtils.addFilterToDevice(link.dst().deviceId(), link.dst().port(),
611 mcastUtils.assignedVlan(null), mcastIp, null);
Pier Luigib72201b2018-01-25 16:16:02 +0100612 });
613
Pier37db3692018-03-12 15:00:54 -0700614 // Setup mcast role for the transit
615 links.stream()
616 .filter(link -> !link.dst().deviceId().equals(sink.deviceId()))
617 .forEach(link -> mcastRoleStore.put(new McastStoreKey(mcastIp, link.dst().deviceId()),
618 TRANSIT));
619
Pier Luigib72201b2018-01-25 16:16:02 +0100620 // Process the egress device
Pierb0328e42018-03-27 11:29:42 -0700621 addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
Pier37db3692018-03-12 15:00:54 -0700622 // Setup mcast role for egress
Pier Luigib72201b2018-01-25 16:16:02 +0100623 mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()),
Piere5bff482018-03-07 11:42:50 +0100624 EGRESS);
Pier Luigib72201b2018-01-25 16:16:02 +0100625 } else {
626 log.warn("Unable to find a path from {} to {}. Abort sinkAdded",
627 source.deviceId(), sink.deviceId());
628 }
629 } finally {
630 mcastUnlock();
Charles Chand55e84d2016-03-30 17:54:24 -0700631 }
632 }
633
634 /**
Charles Chan2199c302016-04-23 17:36:10 -0700635 * Processes the LINK_DOWN event.
636 *
637 * @param affectedLink Link that is going down
638 */
Pier Luigi96fe0772018-02-28 12:10:50 +0100639 public void processLinkDown(Link affectedLink) {
Pier Luigib72201b2018-01-25 16:16:02 +0100640 lastMcastChange = Instant.now();
641 mcastLock();
642 try {
643 // Get groups affected by the link down event
644 getAffectedGroups(affectedLink).forEach(mcastIp -> {
645 // TODO Optimize when the group editing is in place
646 log.debug("Processing link down {} for group {}",
647 affectedLink, mcastIp);
Pier96f63cb2018-04-17 16:29:56 +0200648 // Verify leadership on the operation
649 if (!mcastUtils.isLeader(mcastIp)) {
650 log.debug("Skip {} due to lack of leadership", mcastIp);
651 return;
652 }
Pier Luigieba73a02018-01-16 10:47:50 +0100653
Pier Luigib72201b2018-01-25 16:16:02 +0100654 // Find out the ingress, transit and egress device of affected group
Piere5bff482018-03-07 11:42:50 +0100655 DeviceId ingressDevice = getDevice(mcastIp, INGRESS)
Pier Luigib72201b2018-01-25 16:16:02 +0100656 .stream().findAny().orElse(null);
Pier37db3692018-03-12 15:00:54 -0700657 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT);
Piere5bff482018-03-07 11:42:50 +0100658 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS);
Pierb0328e42018-03-27 11:29:42 -0700659 ConnectPoint source = mcastUtils.getSource(mcastIp);
Charles Chan8d449862016-05-16 18:44:13 -0700660
Pier37db3692018-03-12 15:00:54 -0700661 // Do not proceed if ingress device or source of this group are missing
662 // If sinks are in other leafs, we have ingress, transit, egress, and source
663 // If sinks are in the same leaf, we have just ingress and source
664 if (ingressDevice == null || source == null) {
665 log.warn("Missing ingress {} or source {} for group {}",
666 ingressDevice, source, mcastIp);
Pier Luigib72201b2018-01-25 16:16:02 +0100667 return;
Charles Chan2199c302016-04-23 17:36:10 -0700668 }
Pier Luigib72201b2018-01-25 16:16:02 +0100669
Pier Luigib72201b2018-01-25 16:16:02 +0100670 // Remove entire transit
Pier37db3692018-03-12 15:00:54 -0700671 transitDevices.forEach(transitDevice ->
Pierb0328e42018-03-27 11:29:42 -0700672 removeGroupFromDevice(transitDevice, mcastIp,
673 mcastUtils.assignedVlan(null)));
Pier Luigib72201b2018-01-25 16:16:02 +0100674
Pier37db3692018-03-12 15:00:54 -0700675 // Remove transit-facing ports on the ingress device
676 removeIngressTransitPorts(mcastIp, ingressDevice, source);
Pier Luigib72201b2018-01-25 16:16:02 +0100677
Pierb0328e42018-03-27 11:29:42 -0700678 // TODO create a shared procedure with DEVICE_DOWN
Pier3ee24552018-03-14 16:47:32 -0700679 // Compute mcast tree for the the egress devices
680 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(ingressDevice, egressDevices);
681
Pierb0328e42018-03-27 11:29:42 -0700682 // We have to verify, if there are egresses without paths
683 Set<DeviceId> notRecovered = Sets.newHashSet();
Pier3ee24552018-03-14 16:47:32 -0700684 mcastTree.forEach((egressDevice, paths) -> {
Pierb0328e42018-03-27 11:29:42 -0700685 // Let's check if there is at least a path
Pier3ee24552018-03-14 16:47:32 -0700686 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
687 mcastIp, paths);
Pierb0328e42018-03-27 11:29:42 -0700688 // No paths, we have to try with alternative location
689 if (!mcastPath.isPresent()) {
690 notRecovered.add(egressDevice);
691 // We were not able to find an alternative path for this egress
Pier Luigib72201b2018-01-25 16:16:02 +0100692 log.warn("Fail to recover egress device {} from link failure {}",
693 egressDevice, affectedLink);
Pierb0328e42018-03-27 11:29:42 -0700694 removeGroupFromDevice(egressDevice, mcastIp,
695 mcastUtils.assignedVlan(null));
Pier Luigib72201b2018-01-25 16:16:02 +0100696 }
697 });
Pierb0328e42018-03-27 11:29:42 -0700698
699 // Fast path, we can recover all the locations
700 if (notRecovered.isEmpty()) {
701 // Construct a new path for each egress device
702 mcastTree.forEach((egressDevice, paths) -> {
703 // We try to enforce the sinks path on the mcast tree
704 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
705 mcastIp, paths);
706 // If a path is present, let's install it
707 if (mcastPath.isPresent()) {
708 installPath(mcastIp, source, mcastPath.get());
709 }
710 });
711 } else {
712 // Let's try to recover using alternate
713 recoverSinks(egressDevices, notRecovered, mcastIp,
714 ingressDevice, source, true);
715 }
Charles Chan2199c302016-04-23 17:36:10 -0700716 });
Pier Luigib72201b2018-01-25 16:16:02 +0100717 } finally {
718 mcastUnlock();
719 }
Charles Chan2199c302016-04-23 17:36:10 -0700720 }
721
722 /**
Pier Luigieba73a02018-01-16 10:47:50 +0100723 * Process the DEVICE_DOWN event.
724 *
725 * @param deviceDown device going down
726 */
Pier Luigi96fe0772018-02-28 12:10:50 +0100727 public void processDeviceDown(DeviceId deviceDown) {
Pier Luigib72201b2018-01-25 16:16:02 +0100728 lastMcastChange = Instant.now();
729 mcastLock();
730 try {
731 // Get the mcast groups affected by the device going down
732 getAffectedGroups(deviceDown).forEach(mcastIp -> {
733 // TODO Optimize when the group editing is in place
734 log.debug("Processing device down {} for group {}",
735 deviceDown, mcastIp);
Pier96f63cb2018-04-17 16:29:56 +0200736 // Verify leadership on the operation
737 if (!mcastUtils.isLeader(mcastIp)) {
738 log.debug("Skip {} due to lack of leadership", mcastIp);
739 return;
740 }
Pier Luigieba73a02018-01-16 10:47:50 +0100741
Pier Luigib72201b2018-01-25 16:16:02 +0100742 // Find out the ingress, transit and egress device of affected group
Piere5bff482018-03-07 11:42:50 +0100743 DeviceId ingressDevice = getDevice(mcastIp, INGRESS)
Pier Luigib72201b2018-01-25 16:16:02 +0100744 .stream().findAny().orElse(null);
Pier37db3692018-03-12 15:00:54 -0700745 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT);
Piere5bff482018-03-07 11:42:50 +0100746 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS);
Pierb0328e42018-03-27 11:29:42 -0700747 ConnectPoint source = mcastUtils.getSource(mcastIp);
Pier Luigieba73a02018-01-16 10:47:50 +0100748
Pier Luigib72201b2018-01-25 16:16:02 +0100749 // Do not proceed if ingress device or source of this group are missing
750 // If sinks are in other leafs, we have ingress, transit, egress, and source
751 // If sinks are in the same leaf, we have just ingress and source
752 if (ingressDevice == null || source == null) {
753 log.warn("Missing ingress {} or source {} for group {}",
754 ingressDevice, source, mcastIp);
Pier Luigieba73a02018-01-16 10:47:50 +0100755 return;
756 }
Pier Luigieba73a02018-01-16 10:47:50 +0100757
Pier Luigib72201b2018-01-25 16:16:02 +0100758 // If it exists, we have to remove it in any case
Pier37db3692018-03-12 15:00:54 -0700759 if (!transitDevices.isEmpty()) {
Pier Luigib72201b2018-01-25 16:16:02 +0100760 // Remove entire transit
Pier37db3692018-03-12 15:00:54 -0700761 transitDevices.forEach(transitDevice ->
Pierb0328e42018-03-27 11:29:42 -0700762 removeGroupFromDevice(transitDevice, mcastIp,
763 mcastUtils.assignedVlan(null)));
Pier Luigib72201b2018-01-25 16:16:02 +0100764 }
765 // If the ingress is down
766 if (ingressDevice.equals(deviceDown)) {
767 // Remove entire ingress
Pierb0328e42018-03-27 11:29:42 -0700768 removeGroupFromDevice(ingressDevice, mcastIp, mcastUtils.assignedVlan(source));
Pier Luigib72201b2018-01-25 16:16:02 +0100769 // If other sinks different from the ingress exist
770 if (!egressDevices.isEmpty()) {
771 // Remove all the remaining egress
772 egressDevices.forEach(
Pierb0328e42018-03-27 11:29:42 -0700773 egressDevice -> removeGroupFromDevice(egressDevice, mcastIp,
774 mcastUtils.assignedVlan(null))
Pier Luigib72201b2018-01-25 16:16:02 +0100775 );
Pier Luigieba73a02018-01-16 10:47:50 +0100776 }
Pier Luigib72201b2018-01-25 16:16:02 +0100777 } else {
778 // Egress or transit could be down at this point
Pier37db3692018-03-12 15:00:54 -0700779 // Get the ingress-transit ports if they exist
780 removeIngressTransitPorts(mcastIp, ingressDevice, source);
781
Pier Luigib72201b2018-01-25 16:16:02 +0100782 // One of the egress device is down
783 if (egressDevices.contains(deviceDown)) {
784 // Remove entire device down
Pierb0328e42018-03-27 11:29:42 -0700785 removeGroupFromDevice(deviceDown, mcastIp, mcastUtils.assignedVlan(null));
Pier Luigib72201b2018-01-25 16:16:02 +0100786 // Remove the device down from egress
787 egressDevices.remove(deviceDown);
788 // If there are no more egress and ingress does not have sinks
789 if (egressDevices.isEmpty() && !hasSinks(ingressDevice, mcastIp)) {
Pier Luigib72201b2018-01-25 16:16:02 +0100790 // We have done
791 return;
792 }
793 }
Pier3ee24552018-03-14 16:47:32 -0700794
795 // Compute mcast tree for the the egress devices
796 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(ingressDevice, egressDevices);
797
Pierb0328e42018-03-27 11:29:42 -0700798 // We have to verify, if there are egresses without paths
799 Set<DeviceId> notRecovered = Sets.newHashSet();
Pier3ee24552018-03-14 16:47:32 -0700800 mcastTree.forEach((egressDevice, paths) -> {
Pierb0328e42018-03-27 11:29:42 -0700801 // Let's check if there is at least a path
Pier3ee24552018-03-14 16:47:32 -0700802 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
Pierb0328e42018-03-27 11:29:42 -0700803 mcastIp, paths);
804 // No paths, we have to try with alternative location
805 if (!mcastPath.isPresent()) {
806 notRecovered.add(egressDevice);
Pier Luigib72201b2018-01-25 16:16:02 +0100807 // We were not able to find an alternative path for this egress
808 log.warn("Fail to recover egress device {} from device down {}",
809 egressDevice, deviceDown);
Pierb0328e42018-03-27 11:29:42 -0700810 removeGroupFromDevice(egressDevice, mcastIp, mcastUtils.assignedVlan(null));
Pier Luigib72201b2018-01-25 16:16:02 +0100811 }
812 });
Pierb0328e42018-03-27 11:29:42 -0700813
814 // Fast path, we can recover all the locations
815 if (notRecovered.isEmpty()) {
816 // Construct a new path for each egress device
817 mcastTree.forEach((egressDevice, paths) -> {
818 // We try to enforce the sinks path on the mcast tree
819 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
820 mcastIp, paths);
821 // If a path is present, let's install it
822 if (mcastPath.isPresent()) {
823 installPath(mcastIp, source, mcastPath.get());
824 }
825 });
826 } else {
827 // Let's try to recover using alternate
828 recoverSinks(egressDevices, notRecovered, mcastIp,
829 ingressDevice, source, false);
830 }
Pier Luigib72201b2018-01-25 16:16:02 +0100831 }
832 });
833 } finally {
834 mcastUnlock();
835 }
Pier Luigieba73a02018-01-16 10:47:50 +0100836 }
837
838 /**
Pierb0328e42018-03-27 11:29:42 -0700839 * Try to recover sinks using alternate locations.
840 *
841 * @param egressDevices the original egress devices
842 * @param notRecovered the devices not recovered
843 * @param mcastIp the group address
844 * @param ingressDevice the ingress device
845 * @param source the source connect point
846 * @param isLinkFailure true if it is a link failure, otherwise false
847 */
848 private void recoverSinks(Set<DeviceId> egressDevices, Set<DeviceId> notRecovered,
849 IpAddress mcastIp, DeviceId ingressDevice, ConnectPoint source,
850 boolean isLinkFailure) {
851 // Recovered devices
852 Set<DeviceId> recovered = Sets.difference(egressDevices, notRecovered);
853 // Total affected sinks
854 Set<ConnectPoint> totalAffectedSinks = Sets.newHashSet();
855 // Total sinks
856 Set<ConnectPoint> totalSinks = Sets.newHashSet();
857 // Let's compute all the affected sinks and all the sinks
858 notRecovered.forEach(deviceId -> {
859 totalAffectedSinks.addAll(
860 mcastUtils.getAffectedSinks(deviceId, mcastIp)
861 .values()
862 .stream()
863 .flatMap(Collection::stream)
864 .filter(connectPoint -> connectPoint.deviceId().equals(deviceId))
865 .collect(Collectors.toSet())
866 );
867 totalSinks.addAll(
868 mcastUtils.getAffectedSinks(deviceId, mcastIp)
869 .values()
870 .stream()
871 .flatMap(Collection::stream)
872 .collect(Collectors.toSet())
873 );
874 });
875
876 // Sinks to be added
877 Set<ConnectPoint> sinksToBeAdded = Sets.difference(totalSinks, totalAffectedSinks);
878 // New egress devices, filtering out the source
879 Set<DeviceId> newEgressDevice = sinksToBeAdded.stream()
880 .map(ConnectPoint::deviceId)
881 .collect(Collectors.toSet());
882 // Let's add the devices recovered from the previous round
883 newEgressDevice.addAll(recovered);
884 // Let's do a copy of the new egresses and filter out the source
885 Set<DeviceId> copyNewEgressDevice = ImmutableSet.copyOf(newEgressDevice);
886 newEgressDevice = newEgressDevice.stream()
887 .filter(deviceId -> !deviceId.equals(ingressDevice))
888 .collect(Collectors.toSet());
889
890 // Re-compute mcast tree for the the egress devices
891 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(ingressDevice, newEgressDevice);
892 // if the source was originally in the new locations, add new sinks
893 if (copyNewEgressDevice.contains(ingressDevice)) {
894 sinksToBeAdded.stream()
895 .filter(connectPoint -> connectPoint.deviceId().equals(ingressDevice))
896 .forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, ImmutableList.of()));
897 }
898
899 // Construct a new path for each egress device
900 mcastTree.forEach((egressDevice, paths) -> {
901 // We try to enforce the sinks path on the mcast tree
902 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
903 mcastIp, paths);
904 // If a path is present, let's install it
905 if (mcastPath.isPresent()) {
906 // Using recovery procedure
907 if (recovered.contains(egressDevice)) {
908 installPath(mcastIp, source, mcastPath.get());
909 } else {
910 // otherwise we need to threat as new sink
911 sinksToBeAdded.stream()
912 .filter(connectPoint -> connectPoint.deviceId().equals(egressDevice))
913 .forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, paths));
914 }
915 } else {
916 // We were not able to find an alternative path for this egress
917 log.warn("Fail to recover egress device {} from {} failure",
918 egressDevice, isLinkFailure ? "Link" : "Device");
919 removeGroupFromDevice(egressDevice, mcastIp, mcastUtils.assignedVlan(null));
920 }
921 });
922
923 }
924
925 /**
Pier3bb1f3f2018-04-17 15:50:43 +0200926 * Process all the sinks related to a mcast group and return
927 * the ones to be removed.
928 *
929 * @param mcastIp the group address
930 * @param prevsinks the previous sinks to be evaluated
931 * @param newSinks the new sinks to be evaluted
932 * @return the set of the sinks to be removed
933 */
934 private Set<ConnectPoint> processSinksToBeRemoved(IpAddress mcastIp,
935 Map<HostId, Set<ConnectPoint>> prevsinks,
936 Map<HostId, Set<ConnectPoint>> newSinks) {
937 // Iterate over the sinks in order to build the set
938 // of the connect points to be removed from this group
939 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
940 prevsinks.forEach(((hostId, connectPoints) -> {
941 // We have to check with the existing flows
942 ConnectPoint sinkToBeProcessed = connectPoints.stream()
943 .filter(connectPoint -> isSink(mcastIp, connectPoint))
944 .findFirst().orElse(null);
945 if (sinkToBeProcessed != null) {
946 // If the host has been removed or location has been removed
947 if (!newSinks.containsKey(hostId) ||
948 !newSinks.get(hostId).contains(sinkToBeProcessed)) {
949 sinksToBeProcessed.add(sinkToBeProcessed);
950 }
951 }
952 }));
953 // We have done, return the set
954 return sinksToBeProcessed;
955 }
956
957 /**
Pierb0328e42018-03-27 11:29:42 -0700958 * Process new locations and return the set of sinks to be added
959 * in the context of the recovery.
960 *
Pier3bb1f3f2018-04-17 15:50:43 +0200961 * @param newSinks the remaining sinks
962 * @param prevSinks the previous sinks
Pierb0328e42018-03-27 11:29:42 -0700963 * @return the set of the sinks to be processed
964 */
965 private Set<ConnectPoint> processSinksToBeRecovered(IpAddress mcastIp,
Pier3bb1f3f2018-04-17 15:50:43 +0200966 Map<HostId, Set<ConnectPoint>> newSinks,
967 Map<HostId, Set<ConnectPoint>> prevSinks) {
Pierb0328e42018-03-27 11:29:42 -0700968 // Iterate over the sinks in order to build the set
969 // of the connect points to be served by this group
970 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
Pier3bb1f3f2018-04-17 15:50:43 +0200971 newSinks.forEach((hostId, connectPoints) -> {
Pierb0328e42018-03-27 11:29:42 -0700972 // If it has more than 1 locations
973 if (connectPoints.size() > 1 || connectPoints.size() == 0) {
974 log.debug("Skip {} since sink {} has {} locations",
975 mcastIp, hostId, connectPoints.size());
976 return;
977 }
Pier3bb1f3f2018-04-17 15:50:43 +0200978 // If previously it had two locations, we need to recover it
979 // Filter out if the remaining location is already served
980 if (prevSinks.containsKey(hostId) && prevSinks.get(hostId).size() == 2) {
Pier06d0a772018-04-19 15:53:20 +0200981 ConnectPoint sinkToBeProcessed = connectPoints.stream()
982 .filter(connectPoint -> !isSink(mcastIp, connectPoint))
983 .findFirst().orElse(null);
984 if (sinkToBeProcessed != null) {
985 sinksToBeProcessed.add(sinkToBeProcessed);
986 }
Pier3bb1f3f2018-04-17 15:50:43 +0200987 }
Pierb0328e42018-03-27 11:29:42 -0700988 });
989 return sinksToBeProcessed;
990 }
991
992 /**
993 * Process all the sinks related to a mcast group and return
994 * the ones to be processed.
995 *
996 * @param source the source connect point
997 * @param mcastIp the group address
998 * @param sinks the sinks to be evaluated
999 * @return the set of the sinks to be processed
1000 */
1001 private Set<ConnectPoint> processSinksToBeAdded(ConnectPoint source, IpAddress mcastIp,
1002 Map<HostId, Set<ConnectPoint>> sinks) {
1003 // Iterate over the sinks in order to build the set
1004 // of the connect points to be served by this group
1005 final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
1006 sinks.forEach(((hostId, connectPoints) -> {
1007 // If it has more than 2 locations
1008 if (connectPoints.size() > 2 || connectPoints.size() == 0) {
1009 log.debug("Skip {} since sink {} has {} locations",
1010 mcastIp, hostId, connectPoints.size());
1011 return;
1012 }
1013 // If it has one location, just use it
1014 if (connectPoints.size() == 1) {
1015 sinksToBeProcessed.add(connectPoints.stream()
Pier06d0a772018-04-19 15:53:20 +02001016 .findFirst().orElse(null));
Pierb0328e42018-03-27 11:29:42 -07001017 return;
1018 }
1019 // We prefer to reuse existing flows
1020 ConnectPoint sinkToBeProcessed = connectPoints.stream()
Pier3bb1f3f2018-04-17 15:50:43 +02001021 .filter(connectPoint -> isSink(mcastIp, connectPoint))
Pierb0328e42018-03-27 11:29:42 -07001022 .findFirst().orElse(null);
1023 if (sinkToBeProcessed != null) {
1024 sinksToBeProcessed.add(sinkToBeProcessed);
1025 return;
1026 }
1027 // Otherwise we prefer to reuse existing egresses
1028 Set<DeviceId> egresses = getDevice(mcastIp, EGRESS);
1029 sinkToBeProcessed = connectPoints.stream()
Pier3bb1f3f2018-04-17 15:50:43 +02001030 .filter(connectPoint -> egresses.contains(connectPoint.deviceId()))
Pierb0328e42018-03-27 11:29:42 -07001031 .findFirst().orElse(null);
1032 if (sinkToBeProcessed != null) {
1033 sinksToBeProcessed.add(sinkToBeProcessed);
1034 return;
1035 }
1036 // Otherwise we prefer a location co-located with the source (if it exists)
1037 sinkToBeProcessed = connectPoints.stream()
1038 .filter(connectPoint -> connectPoint.deviceId().equals(source.deviceId()))
1039 .findFirst().orElse(null);
1040 if (sinkToBeProcessed != null) {
1041 sinksToBeProcessed.add(sinkToBeProcessed);
1042 return;
1043 }
1044 // Finally, we randomly pick a new location
1045 sinksToBeProcessed.add(connectPoints.stream()
Pier06d0a772018-04-19 15:53:20 +02001046 .findFirst().orElse(null));
Pierb0328e42018-03-27 11:29:42 -07001047 }));
1048 // We have done, return the set
1049 return sinksToBeProcessed;
1050 }
1051
1052 /**
Pier37db3692018-03-12 15:00:54 -07001053 * Utility method to remove all the ingress transit ports.
1054 *
1055 * @param mcastIp the group ip
1056 * @param ingressDevice the ingress device for this group
1057 * @param source the source connect point
1058 */
1059 private void removeIngressTransitPorts(IpAddress mcastIp, DeviceId ingressDevice,
1060 ConnectPoint source) {
1061 Set<PortNumber> ingressTransitPorts = ingressTransitPort(mcastIp);
1062 ingressTransitPorts.forEach(ingressTransitPort -> {
1063 if (ingressTransitPort != null) {
1064 boolean isLast = removePortFromDevice(ingressDevice, ingressTransitPort,
Pierb0328e42018-03-27 11:29:42 -07001065 mcastIp, mcastUtils.assignedVlan(source));
Pier37db3692018-03-12 15:00:54 -07001066 if (isLast) {
1067 mcastRoleStore.remove(new McastStoreKey(mcastIp, ingressDevice));
1068 }
1069 }
1070 });
1071 }
1072
1073 /**
Charles Chand55e84d2016-03-30 17:54:24 -07001074 * Adds a port to given multicast group on given device. This involves the
1075 * update of L3 multicast group and multicast routing table entry.
1076 *
1077 * @param deviceId device ID
1078 * @param port port to be added
1079 * @param mcastIp multicast group
1080 * @param assignedVlan assigned VLAN ID
1081 */
1082 private void addPortToDevice(DeviceId deviceId, PortNumber port,
Pierb0328e42018-03-27 11:29:42 -07001083 IpAddress mcastIp, VlanId assignedVlan) {
Charles Chan2199c302016-04-23 17:36:10 -07001084 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId);
Charles Chand55e84d2016-03-30 17:54:24 -07001085 ImmutableSet.Builder<PortNumber> portBuilder = ImmutableSet.builder();
Pier Luigi21fffd22018-01-19 10:24:53 +01001086 NextObjective newNextObj;
Charles Chan2199c302016-04-23 17:36:10 -07001087 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Charles Chand55e84d2016-03-30 17:54:24 -07001088 // First time someone request this mcast group via this device
1089 portBuilder.add(port);
Pier Luigi21fffd22018-01-19 10:24:53 +01001090 // New nextObj
Pierb0328e42018-03-27 11:29:42 -07001091 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi21fffd22018-01-19 10:24:53 +01001092 portBuilder.build(), null).add();
1093 // Store the new port
1094 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Charles Chand55e84d2016-03-30 17:54:24 -07001095 } else {
1096 // This device already serves some subscribers of this mcast group
Charles Chan2199c302016-04-23 17:36:10 -07001097 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chand55e84d2016-03-30 17:54:24 -07001098 // Stop if the port is already in the nextobj
Pierb0328e42018-03-27 11:29:42 -07001099 Set<PortNumber> existingPorts = mcastUtils.getPorts(nextObj.next());
Charles Chand55e84d2016-03-30 17:54:24 -07001100 if (existingPorts.contains(port)) {
1101 log.info("NextObj for {}/{} already exists. Abort", deviceId, port);
1102 return;
1103 }
Pier Luigi21fffd22018-01-19 10:24:53 +01001104 // Let's add the port and reuse the previous one
Yuta HIGUCHI0eb68e12018-02-09 18:05:23 -08001105 portBuilder.addAll(existingPorts).add(port);
Pier Luigi21fffd22018-01-19 10:24:53 +01001106 // Reuse previous nextObj
Pierb0328e42018-03-27 11:29:42 -07001107 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi21fffd22018-01-19 10:24:53 +01001108 portBuilder.build(), nextObj.id()).addToExisting();
1109 // Store the final next objective and send only the difference to the driver
1110 mcastNextObjStore.put(mcastStoreKey, newNextObj);
1111 // Add just the new port
1112 portBuilder = ImmutableSet.builder();
1113 portBuilder.add(port);
Pierb0328e42018-03-27 11:29:42 -07001114 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigi21fffd22018-01-19 10:24:53 +01001115 portBuilder.build(), nextObj.id()).addToExisting();
Charles Chand55e84d2016-03-30 17:54:24 -07001116 }
1117 // Create, store and apply the new nextObj and fwdObj
Charles Chan2199c302016-04-23 17:36:10 -07001118 ObjectiveContext context = new DefaultObjectiveContext(
1119 (objective) -> log.debug("Successfully add {} on {}/{}, vlan {}",
1120 mcastIp, deviceId, port.toLong(), assignedVlan),
1121 (objective, error) ->
1122 log.warn("Failed to add {} on {}/{}, vlan {}: {}",
1123 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Pierb0328e42018-03-27 11:29:42 -07001124 ForwardingObjective fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan,
1125 newNextObj.id()).add(context);
Charles Chand55e84d2016-03-30 17:54:24 -07001126 srManager.flowObjectiveService.next(deviceId, newNextObj);
1127 srManager.flowObjectiveService.forward(deviceId, fwdObj);
Charles Chand55e84d2016-03-30 17:54:24 -07001128 }
1129
1130 /**
1131 * Removes a port from given multicast group on given device.
1132 * This involves the update of L3 multicast group and multicast routing
1133 * table entry.
1134 *
1135 * @param deviceId device ID
1136 * @param port port to be added
1137 * @param mcastIp multicast group
1138 * @param assignedVlan assigned VLAN ID
1139 * @return true if this is the last sink on this device
1140 */
1141 private boolean removePortFromDevice(DeviceId deviceId, PortNumber port,
Pierb0328e42018-03-27 11:29:42 -07001142 IpAddress mcastIp, VlanId assignedVlan) {
Charles Chan2199c302016-04-23 17:36:10 -07001143 McastStoreKey mcastStoreKey =
1144 new McastStoreKey(mcastIp, deviceId);
Charles Chand55e84d2016-03-30 17:54:24 -07001145 // This device is not serving this multicast group
Charles Chan2199c302016-04-23 17:36:10 -07001146 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
Charles Chand55e84d2016-03-30 17:54:24 -07001147 log.warn("{} is not serving {} on port {}. Abort.", deviceId, mcastIp, port);
1148 return false;
1149 }
Charles Chan2199c302016-04-23 17:36:10 -07001150 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Charles Chand55e84d2016-03-30 17:54:24 -07001151
Pierb0328e42018-03-27 11:29:42 -07001152 Set<PortNumber> existingPorts = mcastUtils.getPorts(nextObj.next());
Charles Chan2199c302016-04-23 17:36:10 -07001153 // This port does not serve this multicast group
Charles Chand55e84d2016-03-30 17:54:24 -07001154 if (!existingPorts.contains(port)) {
1155 log.warn("{} is not serving {} on port {}. Abort.", deviceId, mcastIp, port);
1156 return false;
1157 }
1158 // Copy and modify the ImmutableSet
1159 existingPorts = Sets.newHashSet(existingPorts);
1160 existingPorts.remove(port);
1161
1162 NextObjective newNextObj;
Pier Luigid1be7b12018-01-19 10:24:53 +01001163 ObjectiveContext context;
Charles Chand55e84d2016-03-30 17:54:24 -07001164 ForwardingObjective fwdObj;
1165 if (existingPorts.isEmpty()) {
Pier Luigid1be7b12018-01-19 10:24:53 +01001166 // If this is the last sink, remove flows and last bucket
Charles Chand55e84d2016-03-30 17:54:24 -07001167 // NOTE: Rely on GroupStore garbage collection rather than explicitly
1168 // remove L3MG since there might be other flows/groups refer to
1169 // the same L2IG
Pier Luigid1be7b12018-01-19 10:24:53 +01001170 context = new DefaultObjectiveContext(
Charles Chan2199c302016-04-23 17:36:10 -07001171 (objective) -> log.debug("Successfully remove {} on {}/{}, vlan {}",
1172 mcastIp, deviceId, port.toLong(), assignedVlan),
1173 (objective, error) ->
1174 log.warn("Failed to remove {} on {}/{}, vlan {}: {}",
1175 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Pierb0328e42018-03-27 11:29:42 -07001176 fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
Charles Chan2199c302016-04-23 17:36:10 -07001177 mcastNextObjStore.remove(mcastStoreKey);
Charles Chand55e84d2016-03-30 17:54:24 -07001178 } else {
1179 // If this is not the last sink, update flows and groups
Pier Luigid1be7b12018-01-19 10:24:53 +01001180 context = new DefaultObjectiveContext(
Charles Chan2199c302016-04-23 17:36:10 -07001181 (objective) -> log.debug("Successfully update {} on {}/{}, vlan {}",
1182 mcastIp, deviceId, port.toLong(), assignedVlan),
1183 (objective, error) ->
1184 log.warn("Failed to update {} on {}/{}, vlan {}: {}",
1185 mcastIp, deviceId, port.toLong(), assignedVlan, error));
Pier Luigid1be7b12018-01-19 10:24:53 +01001186 // Here we store the next objective with the remaining port
Pierb0328e42018-03-27 11:29:42 -07001187 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigid1be7b12018-01-19 10:24:53 +01001188 existingPorts, nextObj.id()).removeFromExisting();
Pierb0328e42018-03-27 11:29:42 -07001189 fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, newNextObj.id()).add(context);
Charles Chan2199c302016-04-23 17:36:10 -07001190 mcastNextObjStore.put(mcastStoreKey, newNextObj);
Charles Chand55e84d2016-03-30 17:54:24 -07001191 }
Pier Luigid1be7b12018-01-19 10:24:53 +01001192 // Let's modify the next objective removing the bucket
Pierb0328e42018-03-27 11:29:42 -07001193 newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
Pier Luigid1be7b12018-01-19 10:24:53 +01001194 ImmutableSet.of(port), nextObj.id()).removeFromExisting();
1195 srManager.flowObjectiveService.next(deviceId, newNextObj);
1196 srManager.flowObjectiveService.forward(deviceId, fwdObj);
Charles Chand55e84d2016-03-30 17:54:24 -07001197 return existingPorts.isEmpty();
1198 }
1199
Charles Chan2199c302016-04-23 17:36:10 -07001200 /**
1201 * Removes entire group on given device.
1202 *
1203 * @param deviceId device ID
1204 * @param mcastIp multicast group to be removed
1205 * @param assignedVlan assigned VLAN ID
1206 */
1207 private void removeGroupFromDevice(DeviceId deviceId, IpAddress mcastIp,
Pierb0328e42018-03-27 11:29:42 -07001208 VlanId assignedVlan) {
Charles Chan2199c302016-04-23 17:36:10 -07001209 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId);
1210 // This device is not serving this multicast group
1211 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
1212 log.warn("{} is not serving {}. Abort.", deviceId, mcastIp);
1213 return;
1214 }
1215 NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
1216 // NOTE: Rely on GroupStore garbage collection rather than explicitly
1217 // remove L3MG since there might be other flows/groups refer to
1218 // the same L2IG
1219 ObjectiveContext context = new DefaultObjectiveContext(
1220 (objective) -> log.debug("Successfully remove {} on {}, vlan {}",
1221 mcastIp, deviceId, assignedVlan),
1222 (objective, error) ->
1223 log.warn("Failed to remove {} on {}, vlan {}: {}",
1224 mcastIp, deviceId, assignedVlan, error));
Pierb0328e42018-03-27 11:29:42 -07001225 ForwardingObjective fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
Charles Chan2199c302016-04-23 17:36:10 -07001226 srManager.flowObjectiveService.forward(deviceId, fwdObj);
1227 mcastNextObjStore.remove(mcastStoreKey);
1228 mcastRoleStore.remove(mcastStoreKey);
1229 }
1230
Pier Luigieba73a02018-01-16 10:47:50 +01001231 private void installPath(IpAddress mcastIp, ConnectPoint source, Path mcastPath) {
1232 // Get Links
1233 List<Link> links = mcastPath.links();
Pier37db3692018-03-12 15:00:54 -07001234
1235 // Setup new ingress mcast role
1236 mcastRoleStore.put(new McastStoreKey(mcastIp, links.get(0).src().deviceId()),
1237 INGRESS);
1238
Pier Luigieba73a02018-01-16 10:47:50 +01001239 // For each link, modify the next on the source device adding the src port
1240 // and a new filter objective on the destination port
1241 links.forEach(link -> {
1242 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
Pierb0328e42018-03-27 11:29:42 -07001243 mcastUtils.assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
1244 mcastUtils.addFilterToDevice(link.dst().deviceId(), link.dst().port(),
1245 mcastUtils.assignedVlan(null), mcastIp, null);
Pier Luigieba73a02018-01-16 10:47:50 +01001246 });
Pier37db3692018-03-12 15:00:54 -07001247
1248 // Setup mcast role for the transit
1249 links.stream()
1250 .filter(link -> !link.src().deviceId().equals(source.deviceId()))
1251 .forEach(link -> mcastRoleStore.put(new McastStoreKey(mcastIp, link.src().deviceId()),
1252 TRANSIT));
Charles Chan2199c302016-04-23 17:36:10 -07001253 }
1254
Charles Chand55e84d2016-03-30 17:54:24 -07001255 /**
Pier3ee24552018-03-14 16:47:32 -07001256 * Go through all the paths, looking for shared links to be used
1257 * in the final path computation.
1258 *
1259 * @param egresses egress devices
1260 * @param availablePaths all the available paths towards the egress
1261 * @return shared links between egress devices
1262 */
1263 private Set<Link> exploreMcastTree(Set<DeviceId> egresses,
1264 Map<DeviceId, List<Path>> availablePaths) {
1265 // Length of the shortest path
1266 int minLength = Integer.MAX_VALUE;
1267 int length;
1268 // Current paths
1269 List<Path> currentPaths;
1270 // Verify the source can still reach all the egresses
1271 for (DeviceId egress : egresses) {
1272 // From the source we cannot reach all the sinks
Pierb0328e42018-03-27 11:29:42 -07001273 // just continue and let's figure out after
Pier3ee24552018-03-14 16:47:32 -07001274 currentPaths = availablePaths.get(egress);
1275 if (currentPaths.isEmpty()) {
1276 continue;
1277 }
1278 // Get the length of the first one available,
Pierb0328e42018-03-27 11:29:42 -07001279 // update the min length
Pier3ee24552018-03-14 16:47:32 -07001280 length = currentPaths.get(0).links().size();
1281 if (length < minLength) {
1282 minLength = length;
1283 }
Pier Luigif7049c52018-02-23 19:57:40 +01001284 }
Pier3ee24552018-03-14 16:47:32 -07001285 // If there are no paths
1286 if (minLength == Integer.MAX_VALUE) {
1287 return Collections.emptySet();
1288 }
1289 // Iterate looking for shared links
1290 int index = 0;
1291 // Define the sets for the intersection
1292 Set<Link> sharedLinks = Sets.newHashSet();
1293 Set<Link> currentSharedLinks;
1294 Set<Link> currentLinks;
Pierb0328e42018-03-27 11:29:42 -07001295 DeviceId egressToRemove = null;
Pier3ee24552018-03-14 16:47:32 -07001296 // Let's find out the shared links
1297 while (index < minLength) {
1298 // Initialize the intersection with the paths related to the first egress
1299 currentPaths = availablePaths.get(
1300 egresses.stream()
1301 .findFirst()
1302 .orElse(null)
1303 );
1304 currentSharedLinks = Sets.newHashSet();
1305 // Iterate over the paths and take the "index" links
1306 for (Path path : currentPaths) {
1307 currentSharedLinks.add(path.links().get(index));
1308 }
1309 // Iterate over the remaining egress
1310 for (DeviceId egress : egresses) {
1311 // Iterate over the paths and take the "index" links
1312 currentLinks = Sets.newHashSet();
1313 for (Path path : availablePaths.get(egress)) {
1314 currentLinks.add(path.links().get(index));
1315 }
1316 // Do intersection
1317 currentSharedLinks = Sets.intersection(currentSharedLinks, currentLinks);
1318 // If there are no shared paths exit and record the device to remove
1319 // we have to retry with a subset of sinks
1320 if (currentSharedLinks.isEmpty()) {
Pierb0328e42018-03-27 11:29:42 -07001321 egressToRemove = egress;
Pier3ee24552018-03-14 16:47:32 -07001322 index = minLength;
1323 break;
1324 }
1325 }
1326 sharedLinks.addAll(currentSharedLinks);
1327 index++;
1328 }
1329 // If the shared links is empty and there are egress
1330 // let's retry another time with less sinks, we can
1331 // still build optimal subtrees
Pierb0328e42018-03-27 11:29:42 -07001332 if (sharedLinks.isEmpty() && egresses.size() > 1 && egressToRemove != null) {
1333 egresses.remove(egressToRemove);
Pier3ee24552018-03-14 16:47:32 -07001334 sharedLinks = exploreMcastTree(egresses, availablePaths);
1335 }
1336 return sharedLinks;
1337 }
1338
1339 /**
1340 * Build Mcast tree having as root the given source and as leaves the given egress points.
1341 *
1342 * @param source source of the tree
1343 * @param sinks leaves of the tree
1344 * @return the computed Mcast tree
1345 */
1346 private Map<ConnectPoint, List<Path>> computeSinkMcastTree(DeviceId source,
Pierb0328e42018-03-27 11:29:42 -07001347 Set<ConnectPoint> sinks) {
Pier3ee24552018-03-14 16:47:32 -07001348 // Get the egress devices, remove source from the egress if present
1349 Set<DeviceId> egresses = sinks.stream()
1350 .map(ConnectPoint::deviceId)
1351 .filter(deviceId -> !deviceId.equals(source))
1352 .collect(Collectors.toSet());
1353 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(source, egresses);
Pierb0328e42018-03-27 11:29:42 -07001354 // Build final tree and return it as it is
Pier3ee24552018-03-14 16:47:32 -07001355 final Map<ConnectPoint, List<Path>> finalTree = Maps.newHashMap();
Pierb0328e42018-03-27 11:29:42 -07001356 // We need to put back the source if it was originally present
1357 sinks.forEach(sink -> {
1358 List<Path> sinkPaths = mcastTree.get(sink.deviceId());
1359 finalTree.put(sink, sinkPaths != null ? sinkPaths : ImmutableList.of());
1360 });
Pier3ee24552018-03-14 16:47:32 -07001361 return finalTree;
1362 }
1363
1364 /**
1365 * Build Mcast tree having as root the given source and as leaves the given egress.
1366 *
1367 * @param source source of the tree
1368 * @param egresses leaves of the tree
1369 * @return the computed Mcast tree
1370 */
1371 private Map<DeviceId, List<Path>> computeMcastTree(DeviceId source,
1372 Set<DeviceId> egresses) {
1373 // Pre-compute all the paths
1374 Map<DeviceId, List<Path>> availablePaths = Maps.newHashMap();
1375 // No links to enforce
1376 egresses.forEach(egress -> availablePaths.put(egress, getPaths(source, egress,
1377 Collections.emptySet())));
1378 // Explore the topology looking for shared links amongst the egresses
1379 Set<Link> linksToEnforce = exploreMcastTree(Sets.newHashSet(egresses), availablePaths);
1380 // Remove all the paths from the previous computation
1381 availablePaths.clear();
1382 // Build the final paths enforcing the shared links between egress devices
1383 egresses.forEach(egress -> availablePaths.put(egress, getPaths(source, egress,
1384 linksToEnforce)));
1385 return availablePaths;
1386 }
1387
1388 /**
1389 * Gets path from src to dst computed using the custom link weigher.
1390 *
1391 * @param src source device ID
1392 * @param dst destination device ID
1393 * @return list of paths from src to dst
1394 */
1395 private List<Path> getPaths(DeviceId src, DeviceId dst, Set<Link> linksToEnforce) {
1396 // Takes a snapshot of the topology
1397 final Topology currentTopology = topologyService.currentTopology();
1398 // Build a specific link weigher for this path computation
1399 final LinkWeigher linkWeigher = new SRLinkWeigher(srManager, src, linksToEnforce);
1400 // We will use our custom link weigher for our path
1401 // computations and build the list of valid paths
1402 List<Path> allPaths = Lists.newArrayList(
1403 topologyService.getPaths(currentTopology, src, dst, linkWeigher)
1404 );
1405 // If there are no valid paths, just exit
1406 log.debug("{} path(s) found from {} to {}", allPaths.size(), src, dst);
1407 return allPaths;
Pier Luigif7049c52018-02-23 19:57:40 +01001408 }
1409
Charles Chand55e84d2016-03-30 17:54:24 -07001410 /**
1411 * Gets a path from src to dst.
1412 * If a path was allocated before, returns the allocated path.
1413 * Otherwise, randomly pick one from available paths.
1414 *
1415 * @param src source device ID
1416 * @param dst destination device ID
1417 * @param mcastIp multicast group
Pier3ee24552018-03-14 16:47:32 -07001418 * @param allPaths paths list
Charles Chand55e84d2016-03-30 17:54:24 -07001419 * @return an optional path from src to dst
1420 */
Pier3ee24552018-03-14 16:47:32 -07001421 private Optional<Path> getPath(DeviceId src, DeviceId dst,
1422 IpAddress mcastIp, List<Path> allPaths) {
1423 // Firstly we get all the valid paths, if the supplied are null
1424 if (allPaths == null) {
1425 allPaths = getPaths(src, dst, Collections.emptySet());
1426 }
1427
1428 // If there are no paths just exit
Charles Chand55e84d2016-03-30 17:54:24 -07001429 if (allPaths.isEmpty()) {
Charles Chand55e84d2016-03-30 17:54:24 -07001430 return Optional.empty();
1431 }
1432
Pier Luigibad6d6c2018-01-23 16:06:38 +01001433 // Create a map index of suitablity-to-list of paths. For example
1434 // a path in the list associated to the index 1 shares only the
1435 // first hop and it is less suitable of a path belonging to the index
1436 // 2 that shares leaf-spine.
1437 Map<Integer, List<Path>> eligiblePaths = Maps.newHashMap();
1438 // Some init steps
1439 int nhop;
1440 McastStoreKey mcastStoreKey;
1441 Link hop;
1442 PortNumber srcPort;
1443 Set<PortNumber> existingPorts;
1444 NextObjective nextObj;
1445 // Iterate over paths looking for eligible paths
1446 for (Path path : allPaths) {
1447 // Unlikely, it will happen...
1448 if (!src.equals(path.links().get(0).src().deviceId())) {
1449 continue;
1450 }
1451 nhop = 0;
1452 // Iterate over the links
1453 while (nhop < path.links().size()) {
1454 // Get the link and verify if a next related
1455 // to the src device exist in the store
1456 hop = path.links().get(nhop);
1457 mcastStoreKey = new McastStoreKey(mcastIp, hop.src().deviceId());
1458 // It does not exist in the store, exit
1459 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
1460 break;
Charles Chand55e84d2016-03-30 17:54:24 -07001461 }
Pier Luigibad6d6c2018-01-23 16:06:38 +01001462 // Get the output ports on the next
1463 nextObj = mcastNextObjStore.get(mcastStoreKey).value();
Pierb0328e42018-03-27 11:29:42 -07001464 existingPorts = mcastUtils.getPorts(nextObj.next());
Pier Luigibad6d6c2018-01-23 16:06:38 +01001465 // And the src port on the link
1466 srcPort = hop.src().port();
1467 // the src port is not used as output, exit
1468 if (!existingPorts.contains(srcPort)) {
1469 break;
1470 }
1471 nhop++;
1472 }
1473 // n_hop defines the index
1474 if (nhop > 0) {
1475 eligiblePaths.compute(nhop, (index, paths) -> {
1476 paths = paths == null ? Lists.newArrayList() : paths;
1477 paths.add(path);
1478 return paths;
1479 });
Charles Chand55e84d2016-03-30 17:54:24 -07001480 }
1481 }
Pier Luigibad6d6c2018-01-23 16:06:38 +01001482
1483 // No suitable paths
1484 if (eligiblePaths.isEmpty()) {
1485 log.debug("No eligiblePath(s) found from {} to {}", src, dst);
1486 // Otherwise, randomly pick a path
1487 Collections.shuffle(allPaths);
1488 return allPaths.stream().findFirst();
1489 }
1490
1491 // Let's take the best ones
1492 Integer bestIndex = eligiblePaths.keySet()
1493 .stream()
1494 .sorted(Comparator.reverseOrder())
1495 .findFirst().orElse(null);
1496 List<Path> bestPaths = eligiblePaths.get(bestIndex);
1497 log.debug("{} eligiblePath(s) found from {} to {}",
1498 bestPaths.size(), src, dst);
1499 // randomly pick a path on the highest index
1500 Collections.shuffle(bestPaths);
1501 return bestPaths.stream().findFirst();
Charles Chand55e84d2016-03-30 17:54:24 -07001502 }
1503
1504 /**
Charles Chan2199c302016-04-23 17:36:10 -07001505 * Gets device(s) of given role in given multicast group.
1506 *
1507 * @param mcastIp multicast IP
1508 * @param role multicast role
1509 * @return set of device ID or empty set if not found
1510 */
1511 private Set<DeviceId> getDevice(IpAddress mcastIp, McastRole role) {
1512 return mcastRoleStore.entrySet().stream()
1513 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp) &&
1514 entry.getValue().value() == role)
Pier3ee24552018-03-14 16:47:32 -07001515 .map(Entry::getKey).map(McastStoreKey::deviceId)
Charles Chan2199c302016-04-23 17:36:10 -07001516 .collect(Collectors.toSet());
1517 }
1518
1519 /**
1520 * Gets groups which is affected by the link down event.
1521 *
1522 * @param link link going down
1523 * @return a set of multicast IpAddress
1524 */
1525 private Set<IpAddress> getAffectedGroups(Link link) {
1526 DeviceId deviceId = link.src().deviceId();
1527 PortNumber port = link.src().port();
1528 return mcastNextObjStore.entrySet().stream()
1529 .filter(entry -> entry.getKey().deviceId().equals(deviceId) &&
Pierb0328e42018-03-27 11:29:42 -07001530 mcastUtils.getPorts(entry.getValue().value().next()).contains(port))
Pier3ee24552018-03-14 16:47:32 -07001531 .map(Entry::getKey).map(McastStoreKey::mcastIp)
Charles Chan2199c302016-04-23 17:36:10 -07001532 .collect(Collectors.toSet());
1533 }
1534
1535 /**
Pier Luigieba73a02018-01-16 10:47:50 +01001536 * Gets groups which are affected by the device down event.
1537 *
1538 * @param deviceId device going down
1539 * @return a set of multicast IpAddress
1540 */
1541 private Set<IpAddress> getAffectedGroups(DeviceId deviceId) {
1542 return mcastNextObjStore.entrySet().stream()
1543 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
Pier3ee24552018-03-14 16:47:32 -07001544 .map(Entry::getKey).map(McastStoreKey::mcastIp)
Pier Luigieba73a02018-01-16 10:47:50 +01001545 .collect(Collectors.toSet());
1546 }
1547
1548 /**
Charles Chan2199c302016-04-23 17:36:10 -07001549 * Gets the spine-facing port on ingress device of given multicast group.
1550 *
1551 * @param mcastIp multicast IP
1552 * @return spine-facing port on ingress device
1553 */
Pier37db3692018-03-12 15:00:54 -07001554 private Set<PortNumber> ingressTransitPort(IpAddress mcastIp) {
Piere5bff482018-03-07 11:42:50 +01001555 DeviceId ingressDevice = getDevice(mcastIp, INGRESS)
Charles Chan2199c302016-04-23 17:36:10 -07001556 .stream().findAny().orElse(null);
Pier37db3692018-03-12 15:00:54 -07001557 ImmutableSet.Builder<PortNumber> portBuilder = ImmutableSet.builder();
Charles Chan2199c302016-04-23 17:36:10 -07001558 if (ingressDevice != null) {
1559 NextObjective nextObj = mcastNextObjStore
1560 .get(new McastStoreKey(mcastIp, ingressDevice)).value();
Pierb0328e42018-03-27 11:29:42 -07001561 Set<PortNumber> ports = mcastUtils.getPorts(nextObj.next());
Pier37db3692018-03-12 15:00:54 -07001562 // Let's find out all the ingress-transit ports
Charles Chan2199c302016-04-23 17:36:10 -07001563 for (PortNumber port : ports) {
1564 // Spine-facing port should have no subnet and no xconnect
Pier Luigi96fe0772018-02-28 12:10:50 +01001565 if (srManager.deviceConfiguration() != null &&
1566 srManager.deviceConfiguration().getPortSubnets(ingressDevice, port).isEmpty() &&
Charles Chanfc5c7802016-05-17 13:13:55 -07001567 !srManager.xConnectHandler.hasXConnect(new ConnectPoint(ingressDevice, port))) {
Pier37db3692018-03-12 15:00:54 -07001568 portBuilder.add(port);
Charles Chan2199c302016-04-23 17:36:10 -07001569 }
1570 }
1571 }
Pier37db3692018-03-12 15:00:54 -07001572 return portBuilder.build();
Charles Chan2199c302016-04-23 17:36:10 -07001573 }
Jonghwan Hyun42fe1052017-08-25 17:48:36 -07001574
1575 /**
Pier Luigieba73a02018-01-16 10:47:50 +01001576 * Verify if the given device has sinks
1577 * for the multicast group.
1578 *
1579 * @param deviceId device Id
1580 * @param mcastIp multicast IP
1581 * @return true if the device has sink for the group.
1582 * False otherwise.
1583 */
1584 private boolean hasSinks(DeviceId deviceId, IpAddress mcastIp) {
1585 if (deviceId != null) {
1586 // Get the nextobjective
1587 Versioned<NextObjective> versionedNextObj = mcastNextObjStore.get(
1588 new McastStoreKey(mcastIp, deviceId)
1589 );
1590 // If it exists
1591 if (versionedNextObj != null) {
1592 NextObjective nextObj = versionedNextObj.value();
1593 // Retrieves all the output ports
Pierb0328e42018-03-27 11:29:42 -07001594 Set<PortNumber> ports = mcastUtils.getPorts(nextObj.next());
Pier Luigieba73a02018-01-16 10:47:50 +01001595 // Tries to find at least one port that is not spine-facing
1596 for (PortNumber port : ports) {
1597 // Spine-facing port should have no subnet and no xconnect
Pier Luigi96fe0772018-02-28 12:10:50 +01001598 if (srManager.deviceConfiguration() != null &&
1599 (!srManager.deviceConfiguration().getPortSubnets(deviceId, port).isEmpty() ||
Pier Luigieba73a02018-01-16 10:47:50 +01001600 srManager.xConnectHandler.hasXConnect(new ConnectPoint(deviceId, port)))) {
1601 return true;
1602 }
1603 }
1604 }
1605 }
1606 return false;
1607 }
1608
1609 /**
Pier3bb1f3f2018-04-17 15:50:43 +02001610 * Verify if a given connect point is sink for this group.
1611 *
1612 * @param mcastIp group address
1613 * @param connectPoint connect point to be verified
1614 * @return true if the connect point is sink of the group
1615 */
1616 private boolean isSink(IpAddress mcastIp, ConnectPoint connectPoint) {
1617 // Let's check if we are already serving that location
1618 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, connectPoint.deviceId());
1619 if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
1620 return false;
1621 }
1622 // Get next and check with the port
1623 NextObjective mcastNext = mcastNextObjStore.get(mcastStoreKey).value();
1624 return mcastUtils.getPorts(mcastNext.next()).contains(connectPoint.port());
1625 }
1626
1627 /**
Pier Luigib72201b2018-01-25 16:16:02 +01001628 * Updates filtering objective for given device and port.
1629 * It is called in general when the mcast config has been
1630 * changed.
Jonghwan Hyun42fe1052017-08-25 17:48:36 -07001631 *
1632 * @param deviceId device ID
1633 * @param portNum ingress port number
1634 * @param vlanId assigned VLAN ID
1635 * @param install true to add, false to remove
1636 */
Pier Luigi96fe0772018-02-28 12:10:50 +01001637 public void updateFilterToDevice(DeviceId deviceId, PortNumber portNum,
Jonghwan Hyun42fe1052017-08-25 17:48:36 -07001638 VlanId vlanId, boolean install) {
Pier Luigib72201b2018-01-25 16:16:02 +01001639 lastMcastChange = Instant.now();
1640 mcastLock();
1641 try {
1642 // Iterates over the route and updates properly the filtering objective
1643 // on the source device.
1644 srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
Pier96f63cb2018-04-17 16:29:56 +02001645 log.debug("Update filter for {}", mcastRoute.group());
1646 // Verify leadership on the operation
1647 if (!mcastUtils.isLeader(mcastRoute.group())) {
1648 log.debug("Skip {} due to lack of leadership", mcastRoute.group());
1649 return;
1650 }
Pier3ee24552018-03-14 16:47:32 -07001651 // FIXME To be addressed with multiple sources support
1652 ConnectPoint source = srManager.multicastRouteService.sources(mcastRoute)
1653 .stream()
1654 .findFirst().orElse(null);
Pier Luigib72201b2018-01-25 16:16:02 +01001655 if (source.deviceId().equals(deviceId) && source.port().equals(portNum)) {
1656 if (install) {
Pierb0328e42018-03-27 11:29:42 -07001657 mcastUtils.addFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), INGRESS);
Pier Luigib72201b2018-01-25 16:16:02 +01001658 } else {
Pierb0328e42018-03-27 11:29:42 -07001659 mcastUtils.removeFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), null);
Pier Luigib72201b2018-01-25 16:16:02 +01001660 }
Jonghwan Hyun42fe1052017-08-25 17:48:36 -07001661 }
Pier Luigib72201b2018-01-25 16:16:02 +01001662 });
1663 } finally {
1664 mcastUnlock();
1665 }
1666 }
1667
1668 /**
1669 * Performs bucket verification operation for all mcast groups in the devices.
1670 * Firstly, it verifies that mcast is stable before trying verification operation.
1671 * Verification consists in creating new nexts with VERIFY operation. Actually,
1672 * the operation is totally delegated to the driver.
1673 */
1674 private final class McastBucketCorrector implements Runnable {
1675
1676 @Override
1677 public void run() {
1678 // Verify if the Mcast has been stable for MCAST_STABLITY_THRESHOLD
1679 if (!isMcastStable()) {
1680 return;
Jonghwan Hyun42fe1052017-08-25 17:48:36 -07001681 }
Pier Luigib72201b2018-01-25 16:16:02 +01001682 // Acquires lock
1683 mcastLock();
1684 try {
1685 // Iterates over the routes and verify the related next objectives
1686 srManager.multicastRouteService.getRoutes()
1687 .stream()
1688 .map(McastRoute::group)
1689 .forEach(mcastIp -> {
1690 log.trace("Running mcast buckets corrector for mcast group: {}",
1691 mcastIp);
1692
1693 // For each group we get current information in the store
1694 // and issue a check of the next objectives in place
Piere5bff482018-03-07 11:42:50 +01001695 DeviceId ingressDevice = getDevice(mcastIp, INGRESS)
Pier Luigib72201b2018-01-25 16:16:02 +01001696 .stream().findAny().orElse(null);
Pier37db3692018-03-12 15:00:54 -07001697 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT);
Piere5bff482018-03-07 11:42:50 +01001698 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS);
Pier Luigib87b8ab2018-03-02 12:53:37 +01001699 // Get source and sinks from Mcast Route Service and warn about errors
Pierb0328e42018-03-27 11:29:42 -07001700 ConnectPoint source = mcastUtils.getSource(mcastIp);
1701 Set<ConnectPoint> sinks = mcastUtils.getSinks(mcastIp).values().stream()
1702 .flatMap(Collection::stream)
1703 .collect(Collectors.toSet());
Pier Luigib72201b2018-01-25 16:16:02 +01001704
1705 // Do not proceed if ingress device or source of this group are missing
1706 if (ingressDevice == null || source == null) {
Pier Luigib87b8ab2018-03-02 12:53:37 +01001707 if (!sinks.isEmpty()) {
1708 log.warn("Unable to run buckets corrector. " +
1709 "Missing ingress {} or source {} for group {}",
1710 ingressDevice, source, mcastIp);
1711 }
Pier Luigib72201b2018-01-25 16:16:02 +01001712 return;
1713 }
1714
Pier96f63cb2018-04-17 16:29:56 +02001715 // Continue only when this instance is the leader of the group
1716 if (!mcastUtils.isLeader(mcastIp)) {
Pier Luigib72201b2018-01-25 16:16:02 +01001717 log.trace("Unable to run buckets corrector. " +
Pier96f63cb2018-04-17 16:29:56 +02001718 "Skip {} due to lack of leadership", mcastIp);
Pier Luigib72201b2018-01-25 16:16:02 +01001719 return;
1720 }
1721
1722 // Create the set of the devices to be processed
1723 ImmutableSet.Builder<DeviceId> devicesBuilder = ImmutableSet.builder();
1724 devicesBuilder.add(ingressDevice);
Pier37db3692018-03-12 15:00:54 -07001725 if (!transitDevices.isEmpty()) {
1726 devicesBuilder.addAll(transitDevices);
Pier Luigib72201b2018-01-25 16:16:02 +01001727 }
1728 if (!egressDevices.isEmpty()) {
1729 devicesBuilder.addAll(egressDevices);
1730 }
1731 Set<DeviceId> devicesToProcess = devicesBuilder.build();
1732
1733 // Iterate over the devices
1734 devicesToProcess.forEach(deviceId -> {
1735 McastStoreKey currentKey = new McastStoreKey(mcastIp, deviceId);
1736 // If next exists in our store verify related next objective
1737 if (mcastNextObjStore.containsKey(currentKey)) {
1738 NextObjective currentNext = mcastNextObjStore.get(currentKey).value();
1739 // Get current ports
Pierb0328e42018-03-27 11:29:42 -07001740 Set<PortNumber> currentPorts = mcastUtils.getPorts(currentNext.next());
Pier Luigib72201b2018-01-25 16:16:02 +01001741 // Rebuild the next objective
Pierb0328e42018-03-27 11:29:42 -07001742 currentNext = mcastUtils.nextObjBuilder(
Pier Luigib72201b2018-01-25 16:16:02 +01001743 mcastIp,
Pierb0328e42018-03-27 11:29:42 -07001744 mcastUtils.assignedVlan(deviceId.equals(source.deviceId()) ?
1745 source : null),
Pier Luigib72201b2018-01-25 16:16:02 +01001746 currentPorts,
1747 currentNext.id()
1748 ).verify();
1749 // Send to the flowobjective service
1750 srManager.flowObjectiveService.next(deviceId, currentNext);
1751 } else {
Pier Luigi83f919b2018-02-15 16:33:08 +01001752 log.warn("Unable to run buckets corrector. " +
Pier Luigib72201b2018-01-25 16:16:02 +01001753 "Missing next for {} and group {}",
1754 deviceId, mcastIp);
1755 }
1756 });
1757
1758 });
1759 } finally {
1760 // Finally, it releases the lock
1761 mcastUnlock();
1762 }
1763
1764 }
Jonghwan Hyun42fe1052017-08-25 17:48:36 -07001765 }
Pier Luigib29144d2018-01-15 18:06:43 +01001766
1767 public Map<McastStoreKey, Integer> getMcastNextIds(IpAddress mcastIp) {
1768 // If mcast ip is present
1769 if (mcastIp != null) {
1770 return mcastNextObjStore.entrySet().stream()
1771 .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
Pier3ee24552018-03-14 16:47:32 -07001772 .collect(Collectors.toMap(Entry::getKey,
Pier Luigib29144d2018-01-15 18:06:43 +01001773 entry -> entry.getValue().value().id()));
1774 }
1775 // Otherwise take all the groups
1776 return mcastNextObjStore.entrySet().stream()
Pier3ee24552018-03-14 16:47:32 -07001777 .collect(Collectors.toMap(Entry::getKey,
Pier Luigib29144d2018-01-15 18:06:43 +01001778 entry -> entry.getValue().value().id()));
1779 }
1780
Pierb1fe7382018-04-17 17:25:22 +02001781 /**
1782 * Returns the associated roles to the mcast groups or to the single
1783 * group if mcastIp is present.
1784 *
1785 * @param mcastIp the group ip
1786 * @return the mapping mcastIp-device to mcast role
1787 *
1788 * @deprecated in 1.12 ("Magpie") release.
1789 */
1790 @Deprecated
Pier Luigi96fe0772018-02-28 12:10:50 +01001791 public Map<McastStoreKey, McastRole> getMcastRoles(IpAddress mcastIp) {
Pier Luigib29144d2018-01-15 18:06:43 +01001792 // If mcast ip is present
1793 if (mcastIp != null) {
1794 return mcastRoleStore.entrySet().stream()
1795 .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
Pier3ee24552018-03-14 16:47:32 -07001796 .collect(Collectors.toMap(Entry::getKey,
Pier Luigib29144d2018-01-15 18:06:43 +01001797 entry -> entry.getValue().value()));
1798 }
1799 // Otherwise take all the groups
1800 return mcastRoleStore.entrySet().stream()
Pier3ee24552018-03-14 16:47:32 -07001801 .collect(Collectors.toMap(Entry::getKey,
Pier Luigib29144d2018-01-15 18:06:43 +01001802 entry -> entry.getValue().value()));
1803 }
1804
Pierb1fe7382018-04-17 17:25:22 +02001805 /**
1806 * Returns the associated paths to the mcast group.
1807 *
1808 * @param mcastIp the group ip
1809 * @return the mapping egress point to mcast path
1810 *
1811 * @deprecated in 1.12 ("Magpie") release.
1812 */
1813 @Deprecated
Pier Luigib29144d2018-01-15 18:06:43 +01001814 public Map<ConnectPoint, List<ConnectPoint>> getMcastPaths(IpAddress mcastIp) {
1815 Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
1816 // Get the source
Pierb0328e42018-03-27 11:29:42 -07001817 ConnectPoint source = mcastUtils.getSource(mcastIp);
Pier Luigib29144d2018-01-15 18:06:43 +01001818 // Source cannot be null, we don't know the starting point
1819 if (source != null) {
1820 // Init steps
1821 Set<DeviceId> visited = Sets.newHashSet();
1822 List<ConnectPoint> currentPath = Lists.newArrayList(
1823 source
1824 );
1825 // Build recursively the mcast paths
1826 buildMcastPaths(source.deviceId(), visited, mcastPaths, currentPath, mcastIp);
1827 }
1828 return mcastPaths;
1829 }
1830
Pierb1fe7382018-04-17 17:25:22 +02001831 /**
1832 * Returns the associated trees to the mcast group.
1833 *
1834 * @param mcastIp the group ip
1835 * @param sourcecp the source connect point
1836 * @return the mapping egress point to mcast path
1837 */
1838 public Multimap<ConnectPoint, List<ConnectPoint>> getMcastTrees(IpAddress mcastIp,
1839 ConnectPoint sourcecp) {
1840 Multimap<ConnectPoint, List<ConnectPoint>> mcastTrees = HashMultimap.create();
1841 // Get the sources
1842 Set<ConnectPoint> sources = mcastUtils.getSources(mcastIp);
1843
1844 // If we are providing the source, let's filter out
1845 if (sourcecp != null) {
1846 sources = sources.stream()
1847 .filter(source -> source.equals(sourcecp))
1848 .collect(Collectors.toSet());
1849 }
1850
1851 // Source cannot be null, we don't know the starting point
1852 if (!sources.isEmpty()) {
1853 sources.forEach(source -> {
1854 // Init steps
1855 Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
1856 Set<DeviceId> visited = Sets.newHashSet();
1857 List<ConnectPoint> currentPath = Lists.newArrayList(source);
1858 // Build recursively the mcast paths
1859 buildMcastPaths(source.deviceId(), visited, mcastPaths, currentPath, mcastIp);
1860 mcastPaths.forEach(mcastTrees::put);
1861 });
1862 }
1863 return mcastTrees;
1864 }
1865
1866 /**
1867 * Build recursively the mcast paths.
1868 *
1869 * @param toVisit the node to visit
1870 * @param visited the visited nodes
1871 * @param mcastPaths the current mcast paths
1872 * @param currentPath the current path
1873 * @param mcastIp the group ip
1874 */
Pier Luigib29144d2018-01-15 18:06:43 +01001875 private void buildMcastPaths(DeviceId toVisit, Set<DeviceId> visited,
1876 Map<ConnectPoint, List<ConnectPoint>> mcastPaths,
1877 List<ConnectPoint> currentPath, IpAddress mcastIp) {
1878 // If we have visited the node to visit
1879 // there is a loop
1880 if (visited.contains(toVisit)) {
1881 return;
1882 }
1883 // Visit next-hop
1884 visited.add(toVisit);
1885 McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, toVisit);
1886 // Looking for next-hops
1887 if (mcastNextObjStore.containsKey(mcastStoreKey)) {
1888 // Build egress connectpoints
1889 NextObjective nextObjective = mcastNextObjStore.get(mcastStoreKey).value();
1890 // Get Ports
Pierb0328e42018-03-27 11:29:42 -07001891 Set<PortNumber> outputPorts = mcastUtils.getPorts(nextObjective.next());
Pier Luigib29144d2018-01-15 18:06:43 +01001892 // Build relative cps
1893 ImmutableSet.Builder<ConnectPoint> cpBuilder = ImmutableSet.builder();
1894 outputPorts.forEach(portNumber -> cpBuilder.add(new ConnectPoint(toVisit, portNumber)));
1895 Set<ConnectPoint> egressPoints = cpBuilder.build();
1896 // Define other variables for the next steps
1897 Set<Link> egressLinks;
1898 List<ConnectPoint> newCurrentPath;
1899 Set<DeviceId> newVisited;
1900 DeviceId newToVisit;
1901 for (ConnectPoint egressPoint : egressPoints) {
1902 egressLinks = srManager.linkService.getEgressLinks(egressPoint);
1903 // If it does not have egress links, stop
1904 if (egressLinks.isEmpty()) {
1905 // Add the connect points to the path
1906 newCurrentPath = Lists.newArrayList(currentPath);
1907 newCurrentPath.add(0, egressPoint);
1908 // Save in the map
1909 mcastPaths.put(egressPoint, newCurrentPath);
1910 } else {
1911 newVisited = Sets.newHashSet(visited);
1912 // Iterate over the egress links for the next hops
1913 for (Link egressLink : egressLinks) {
1914 // Update to visit
1915 newToVisit = egressLink.dst().deviceId();
1916 // Add the connect points to the path
1917 newCurrentPath = Lists.newArrayList(currentPath);
1918 newCurrentPath.add(0, egressPoint);
1919 newCurrentPath.add(0, egressLink.dst());
1920 // Go to the next hop
1921 buildMcastPaths(newToVisit, newVisited, mcastPaths, newCurrentPath, mcastIp);
1922 }
1923 }
1924 }
1925 }
1926 }
1927
Pier96f63cb2018-04-17 16:29:56 +02001928 /**
1929 * Return the leaders of the mcast groups.
1930 *
1931 * @param mcastIp the group ip
1932 * @return the mapping group-node
1933 */
1934 public Map<IpAddress, NodeId> getMcastLeaders(IpAddress mcastIp) {
1935 return mcastUtils.getMcastLeaders(mcastIp);
1936 }
Charles Chand55e84d2016-03-30 17:54:24 -07001937}