Removes the Multicast cache and repurposes the Multicast workers
Additionally reduce the locking inside MulticastHandler
Change-Id: Id8dd9aff5322388e3aeec8ebc67465719719514f
diff --git a/app/src/main/java/org/onosproject/segmentrouting/mcast/McastCacheKey.java b/app/src/main/java/org/onosproject/segmentrouting/mcast/McastCacheKey.java
deleted file mode 100644
index 55d2131..0000000
--- a/app/src/main/java/org/onosproject/segmentrouting/mcast/McastCacheKey.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.segmentrouting.mcast;
-
-import org.onlab.packet.IpAddress;
-import org.onosproject.net.ConnectPoint;
-import org.onosproject.net.HostId;
-
-import java.util.Objects;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Key of the multicast event cache.
- */
-class McastCacheKey {
- // The group ip
- private final IpAddress mcastIp;
- // The sink id
- private final HostId sinkHost;
- // The sink connect point
- private final ConnectPoint sink;
-
- /**
- * Constructs a key for multicast event cache.
- *
- * @param mcastIp multicast group IP address
- * @param sink connect point of the sink
- *
- * @deprecated in 1.12 ("Magpie") release.
- */
- @Deprecated
- public McastCacheKey(IpAddress mcastIp, ConnectPoint sink) {
- checkNotNull(mcastIp, "mcastIp cannot be null");
- checkNotNull(sink, "sink cannot be null");
- checkArgument(mcastIp.isMulticast(), "mcastIp must be a multicast address");
- this.mcastIp = mcastIp;
- this.sink = sink;
- this.sinkHost = null;
- }
-
- /**
- * Constructs a key for multicast event cache.
- *
- * @param mcastIp multicast group IP address
- * @param hostId id of the sink
- */
- public McastCacheKey(IpAddress mcastIp, HostId hostId) {
- checkNotNull(mcastIp, "mcastIp cannot be null");
- checkNotNull(hostId, "sink cannot be null");
- checkArgument(mcastIp.isMulticast(), "mcastIp must be a multicast address");
- this.mcastIp = mcastIp;
- this.sinkHost = hostId;
- this.sink = null;
- }
-
- /**
- * Returns the multicast IP address of this key.
- *
- * @return multicast IP
- */
- public IpAddress mcastIp() {
- return mcastIp;
- }
-
- /**
- * Returns the sink of this key.
- *
- * @return connect point of the sink
- *
- * @deprecated in 1.12 ("Magpie") release.
- */
- @Deprecated
- public ConnectPoint sink() {
- return sink;
- }
-
- /**
- * Returns the sink of this key.
- *
- * @return host id of the sink
- */
- public HostId sinkHost() {
- return sinkHost;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof McastCacheKey)) {
- return false;
- }
- McastCacheKey that =
- (McastCacheKey) o;
- return (Objects.equals(this.mcastIp, that.mcastIp) &&
- Objects.equals(this.sink, that.sink) &&
- Objects.equals(this.sinkHost, that.sinkHost));
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(mcastIp, sink);
- }
-
- @Override
- public String toString() {
- return toStringHelper(getClass())
- .add("mcastIp", mcastIp)
- .add("sink", sink)
- .add("sinkHost", sinkHost)
- .toString();
- }
-}
diff --git a/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java b/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
index 52618f4..5533413 100644
--- a/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
+++ b/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
@@ -17,10 +17,6 @@
package org.onosproject.segmentrouting.mcast;
import com.google.common.base.Objects;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalCause;
-import com.google.common.cache.RemovalNotification;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
@@ -76,8 +72,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static java.util.concurrent.Executors.newScheduledThreadPool;
@@ -98,6 +93,7 @@
* Handles Multicast related events.
*/
public class McastHandler {
+ // Internal elements
private static final Logger log = LoggerFactory.getLogger(McastHandler.class);
private final SegmentRoutingManager srManager;
private final TopologyService topologyService;
@@ -105,152 +101,21 @@
private final ConsistentMap<McastStoreKey, NextObjective> mcastNextObjStore;
private final ConsistentMap<McastRoleStoreKey, McastRole> mcastRoleStore;
private final DistributedSet<McastFilteringObjStoreKey> mcastFilteringObjStore;
-
- // Wait time for the cache
- private static final int WAIT_TIME_MS = 1000;
-
- //The mcastEventCache is implemented to avoid race condition by giving more time
- // to the underlying subsystems to process previous calls.
- private Cache<McastCacheKey, McastEvent> mcastEventCache = CacheBuilder.newBuilder()
- .expireAfterWrite(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
- .removalListener((RemovalNotification<McastCacheKey, McastEvent> notification) -> {
- IpAddress mcastIp = notification.getKey().mcastIp();
- HostId sink = notification.getKey().sinkHost();
- McastEvent mcastEvent = notification.getValue();
- RemovalCause cause = notification.getCause();
- // If it expires or it has been replaced, we deque the event - no when evicted
- switch (notification.getCause()) {
- case REPLACED:
- case EXPIRED:
- dequeueMcastEvent(mcastEvent);
- break;
- default:
- break;
- }
- }).build();
-
- private void enqueueMcastEvent(McastEvent mcastEvent) {
- final McastRouteUpdate mcastRouteUpdate = mcastEvent.subject();
- final McastRouteUpdate mcastRoutePrevUpdate = mcastEvent.prevSubject();
- final IpAddress group = mcastRoutePrevUpdate.route().group();
- ImmutableSet.Builder<HostId> sinksBuilder = ImmutableSet.builder();
- if (mcastEvent.type() == SOURCES_ADDED ||
- mcastEvent.type() == SOURCES_REMOVED) {
- // Current subject and prev just differ for the source connect points
- sinksBuilder.addAll(mcastRouteUpdate.sinks().keySet());
- } else if (mcastEvent.type() == SINKS_ADDED) {
- mcastRouteUpdate.sinks().forEach(((hostId, connectPoints) -> {
- // Get the previous locations and verify if there are changes
- Set<ConnectPoint> prevConnectPoints = mcastRoutePrevUpdate.sinks().get(hostId);
- Set<ConnectPoint> changes = Sets.difference(connectPoints, prevConnectPoints != null ?
- prevConnectPoints : Collections.emptySet());
- if (!changes.isEmpty()) {
- sinksBuilder.add(hostId);
- }
- }));
- } else if (mcastEvent.type() == SINKS_REMOVED) {
- mcastRoutePrevUpdate.sinks().forEach(((hostId, connectPoints) -> {
- // Get the current locations and verify if there are changes
- Set<ConnectPoint> currentConnectPoints = mcastRouteUpdate.sinks().get(hostId);
- Set<ConnectPoint> changes = Sets.difference(connectPoints, currentConnectPoints != null ?
- currentConnectPoints : Collections.emptySet());
- if (!changes.isEmpty()) {
- sinksBuilder.add(hostId);
- }
- }));
- } else if (mcastEvent.type() == ROUTE_REMOVED) {
- // Current subject is null, just take the previous host ids
- sinksBuilder.addAll(mcastRoutePrevUpdate.sinks().keySet());
- }
- sinksBuilder.build().forEach(sink -> {
- McastCacheKey cacheKey = new McastCacheKey(group, sink);
- mcastEventCache.put(cacheKey, mcastEvent);
- });
- }
-
- private void dequeueMcastEvent(McastEvent mcastEvent) {
- final McastRouteUpdate mcastUpdate = mcastEvent.subject();
- final McastRouteUpdate mcastPrevUpdate = mcastEvent.prevSubject();
- IpAddress mcastIp = mcastPrevUpdate.route().group();
- Set<ConnectPoint> prevSinks = mcastPrevUpdate.sinks()
- .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
- Set<ConnectPoint> prevSources = mcastPrevUpdate.sources()
- .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
- Set<ConnectPoint> sources;
- switch (mcastEvent.type()) {
- case SOURCES_ADDED:
- sources = mcastUpdate.sources()
- .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
- Set<ConnectPoint> sourcesToBeAdded = Sets.difference(sources, prevSources);
- processSourcesAddedInternal(sourcesToBeAdded, mcastIp, mcastUpdate.sinks());
- break;
- case SOURCES_REMOVED:
- sources = mcastUpdate.sources()
- .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
- Set<ConnectPoint> sourcesToBeRemoved = Sets.difference(prevSources, sources);
- processSourcesRemovedInternal(sourcesToBeRemoved, sources, mcastIp, mcastUpdate.sinks());
- break;
- case ROUTE_REMOVED:
- processRouteRemovedInternal(prevSources, mcastIp);
- break;
- case SINKS_ADDED:
- processSinksAddedInternal(prevSources, mcastIp, mcastUpdate.sinks(), prevSinks);
- break;
- case SINKS_REMOVED:
- processSinksRemovedInternal(prevSources, mcastIp, mcastUpdate.sinks(), mcastPrevUpdate.sinks());
- break;
- default:
- break;
- }
- }
-
- // Mcast lock to serialize local operations
- private final Lock mcastLock = new ReentrantLock();
- private void mcastLock() {
- mcastLock.lock();
- }
- private void mcastUnlock() {
- mcastLock.unlock();
- }
// Stability threshold for Mcast. Seconds
private static final long MCAST_STABLITY_THRESHOLD = 5;
- // Last change done
- private Instant lastMcastChange = Instant.now();
- // Last bucker corrector execution
- private Instant lastBktCorrExecution = Instant.now();
-
- /**
- * Determines if mcast in the network has been stable in the last
- * MCAST_STABLITY_THRESHOLD seconds, by comparing the current time
- * to the last mcast change timestamp.
- *
- * @return true if stable
- */
- private boolean isMcastStable() {
- long last = (long) (lastMcastChange.toEpochMilli() / 1000.0);
- long now = (long) (Instant.now().toEpochMilli() / 1000.0);
- log.trace("Multicast stable since {}s", now - last);
- return (now - last) > MCAST_STABLITY_THRESHOLD;
- }
-
- /**
- * Assures there are always MCAST_VERIFY_INTERVAL seconds between each execution,
- * by comparing the current time with the last corrector execution.
- *
- * @return true if stable
- */
- private boolean wasBktCorrRunning() {
- long last = (long) (lastBktCorrExecution.toEpochMilli() / 1000.0);
- long now = (long) (Instant.now().toEpochMilli() / 1000.0);
- log.trace("McastBucketCorrector executed {}s ago", now - last);
- return (now - last) < MCAST_VERIFY_INTERVAL;
- }
-
// Verify interval for Mcast bucket corrector
private static final long MCAST_VERIFY_INTERVAL = 30;
- // Executor for mcast bucket corrector and for cache
- private ScheduledExecutorService executorService
- = newScheduledThreadPool(1, groupedThreads("mcastWorker", "mcastWorker-%d", log));
+ // Max verify that can be processed at the same time
+ private static final int MAX_VERIFY_ON_FLIGHT = 10;
+ // Last change done
+ private AtomicReference<Instant> lastMcastChange = new AtomicReference<>(Instant.now());
+ // Last bucker corrector execution
+ private AtomicReference<Instant> lastBktCorrExecution = new AtomicReference<>(Instant.now());
+ // Executors for mcast bucket corrector and for the events
+ private ScheduledExecutorService mcastCorrector
+ = newScheduledThreadPool(1, groupedThreads("onos", "m-corrector", log));
+ private ScheduledExecutorService mcastWorker
+ = newScheduledThreadPool(1, groupedThreads("onos", "m-worker-%d", log));
/**
* Constructs the McastEventHandler.
@@ -288,64 +153,88 @@
.build()
.asDistributedSet();
mcastUtils = new McastUtils(srManager, coreAppId, log);
- // Init the executor service, the buckets corrector and schedule the clean up
- executorService.scheduleWithFixedDelay(new McastBucketCorrector(), 10,
- MCAST_VERIFY_INTERVAL, TimeUnit.SECONDS);
- executorService.scheduleAtFixedRate(mcastEventCache::cleanUp, 0,
- WAIT_TIME_MS, TimeUnit.MILLISECONDS);
+ // Init the executor for the buckets corrector
+ mcastCorrector.scheduleWithFixedDelay(new McastBucketCorrector(), 10,
+ MCAST_VERIFY_INTERVAL, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Determines if mcast in the network has been stable in the last
+ * MCAST_STABLITY_THRESHOLD seconds, by comparing the current time
+ * to the last mcast change timestamp.
+ *
+ * @return true if stable
+ */
+ private boolean isMcastStable() {
+ long last = (long) (lastMcastChange.get().toEpochMilli() / 1000.0);
+ long now = (long) (Instant.now().toEpochMilli() / 1000.0);
+ log.trace("Multicast stable since {}s", now - last);
+ return (now - last) > MCAST_STABLITY_THRESHOLD;
+ }
+
+ /**
+ * Assures there are always MCAST_VERIFY_INTERVAL seconds between each execution,
+ * by comparing the current time with the last corrector execution.
+ *
+ * @return true if stable
+ */
+ private boolean wasBktCorrRunning() {
+ long last = (long) (lastBktCorrExecution.get().toEpochMilli() / 1000.0);
+ long now = (long) (Instant.now().toEpochMilli() / 1000.0);
+ log.trace("McastBucketCorrector executed {}s ago", now - last);
+ return (now - last) < MCAST_VERIFY_INTERVAL;
}
/**
* Read initial multicast configuration from mcast store.
*/
public void init() {
- lastMcastChange = Instant.now();
- mcastLock();
- try {
- srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
- log.debug("Init group {}", mcastRoute.group());
- if (!mcastUtils.isLeader(mcastRoute.group())) {
- log.debug("Skip {} due to lack of leadership", mcastRoute.group());
+ mcastWorker.execute(this::initInternal);
+ }
+
+ private void initInternal() {
+ lastMcastChange.set(Instant.now());
+ srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
+ log.debug("Init group {}", mcastRoute.group());
+ if (!mcastUtils.isLeader(mcastRoute.group())) {
+ log.debug("Skip {} due to lack of leadership", mcastRoute.group());
+ return;
+ }
+ McastRouteData mcastRouteData = srManager.multicastRouteService.routeData(mcastRoute);
+ // For each source process the mcast tree
+ srManager.multicastRouteService.sources(mcastRoute).forEach(source -> {
+ Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
+ Set<DeviceId> visited = Sets.newHashSet();
+ List<ConnectPoint> currentPath = Lists.newArrayList(source);
+ mcastUtils.buildMcastPaths(mcastNextObjStore.asJavaMap(), source.deviceId(), visited, mcastPaths,
+ currentPath, mcastRoute.group(), source);
+ // Get all the sinks and process them
+ Set<ConnectPoint> sinks = processSinksToBeAdded(source, mcastRoute.group(),
+ mcastRouteData.sinks());
+ // Filter out all the working sinks, we do not want to move them
+ // TODO we need a better way to distinguish flows coming from different sources
+ sinks = sinks.stream()
+ .filter(sink -> !mcastPaths.containsKey(sink) ||
+ !isSinkForSource(mcastRoute.group(), sink, source))
+ .collect(Collectors.toSet());
+ if (sinks.isEmpty()) {
+ log.debug("Skip {} for source {} nothing to do", mcastRoute.group(), source);
return;
}
- McastRouteData mcastRouteData = srManager.multicastRouteService.routeData(mcastRoute);
- // For each source process the mcast tree
- srManager.multicastRouteService.sources(mcastRoute).forEach(source -> {
- Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
- Set<DeviceId> visited = Sets.newHashSet();
- List<ConnectPoint> currentPath = Lists.newArrayList(source);
- mcastUtils.buildMcastPaths(mcastNextObjStore.asJavaMap(), source.deviceId(), visited, mcastPaths,
- currentPath, mcastRoute.group(), source);
- // Get all the sinks and process them
- Set<ConnectPoint> sinks = processSinksToBeAdded(source, mcastRoute.group(),
- mcastRouteData.sinks());
- // Filter out all the working sinks, we do not want to move them
- // TODO we need a better way to distinguish flows coming from different sources
- sinks = sinks.stream()
- .filter(sink -> !mcastPaths.containsKey(sink) ||
- !isSinkForSource(mcastRoute.group(), sink, source))
- .collect(Collectors.toSet());
- if (sinks.isEmpty()) {
- log.debug("Skip {} for source {} nothing to do", mcastRoute.group(), source);
- return;
- }
- Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(mcastRoute.group(),
- source.deviceId(), sinks);
- mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink,
- mcastRoute.group(), paths));
- });
+ Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(mcastRoute.group(),
+ source.deviceId(), sinks);
+ mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink,
+ mcastRoute.group(), paths));
});
- } finally {
- mcastUnlock();
- }
+ });
}
/**
* Clean up when deactivating the application.
*/
public void terminate() {
- mcastEventCache.invalidateAll();
- executorService.shutdown();
+ mcastCorrector.shutdown();
+ mcastWorker.shutdown();
mcastNextObjStore.destroy();
mcastRoleStore.destroy();
mcastFilteringObjStore.destroy();
@@ -357,14 +246,46 @@
* Processes the SOURCE_ADDED, SOURCE_UPDATED, SINK_ADDED,
* SINK_REMOVED, ROUTE_ADDED and ROUTE_REMOVED events.
*
- * @param event McastEvent with SOURCE_ADDED type
+ * @param event the multicast event to be processed
*/
public void processMcastEvent(McastEvent event) {
- // If it is a route added, we do not enqueue
+ mcastWorker.execute(() -> processMcastEventInternal(event));
+ }
+
+ private void processMcastEventInternal(McastEvent event) {
+ lastMcastChange.set(Instant.now());
+ // Current subject is null, for ROUTE_REMOVED events
+ final McastRouteUpdate mcastUpdate = event.subject();
+ final McastRouteUpdate mcastPrevUpdate = event.prevSubject();
+ IpAddress mcastIp = mcastPrevUpdate.route().group();
+ Set<ConnectPoint> prevSinks = mcastPrevUpdate.sinks()
+ .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
+ Set<ConnectPoint> prevSources = mcastPrevUpdate.sources()
+ .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
+ Set<ConnectPoint> sources;
+ // Events handling
if (event.type() == ROUTE_ADDED) {
- processRouteAddedInternal(event.subject().route().group());
+ processRouteAddedInternal(mcastUpdate.route().group());
+ } else if (event.type() == ROUTE_REMOVED) {
+ processRouteRemovedInternal(prevSources, mcastIp);
+ } else if (event.type() == SOURCES_ADDED) {
+ // Current subject and prev just differ for the source connect points
+ sources = mcastUpdate.sources()
+ .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
+ Set<ConnectPoint> sourcesToBeAdded = Sets.difference(sources, prevSources);
+ processSourcesAddedInternal(sourcesToBeAdded, mcastIp, mcastUpdate.sinks());
+ } else if (event.type() == SOURCES_REMOVED) {
+ // Current subject and prev just differ for the source connect points
+ sources = mcastUpdate.sources()
+ .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
+ Set<ConnectPoint> sourcesToBeRemoved = Sets.difference(prevSources, sources);
+ processSourcesRemovedInternal(sourcesToBeRemoved, sources, mcastIp, mcastUpdate.sinks());
+ } else if (event.type() == SINKS_ADDED) {
+ processSinksAddedInternal(prevSources, mcastIp, mcastUpdate.sinks(), prevSinks);
+ } else if (event.type() == SINKS_REMOVED) {
+ processSinksRemovedInternal(prevSources, mcastIp, mcastUpdate.sinks(), mcastPrevUpdate.sinks());
} else {
- enqueueMcastEvent(event);
+ log.warn("Event {} not handled", event);
}
}
@@ -377,23 +298,22 @@
*/
private void processSourcesAddedInternal(Set<ConnectPoint> sources, IpAddress mcastIp,
Map<HostId, Set<ConnectPoint>> sinks) {
- lastMcastChange = Instant.now();
- mcastLock();
- try {
- log.info("Processing sources added {} for group {}", sources, mcastIp);
- if (!mcastUtils.isLeader(mcastIp)) {
- log.debug("Skip {} due to lack of leadership", mcastIp);
- return;
- }
- sources.forEach(source -> {
- Set<ConnectPoint> sinksToBeAdded = processSinksToBeAdded(source, mcastIp, sinks);
- Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(mcastIp, source.deviceId(),
- sinksToBeAdded);
- mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink, mcastIp, paths));
- });
- } finally {
- mcastUnlock();
+ lastMcastChange.set(Instant.now());
+ log.info("Processing sources added {} for group {}", sources, mcastIp);
+ if (!mcastUtils.isLeader(mcastIp)) {
+ log.debug("Skip {} due to lack of leadership", mcastIp);
+ return;
}
+ if (sources.isEmpty()) {
+ log.debug("Skip {} due to empty sources to be added", mcastIp);
+ return;
+ }
+ sources.forEach(source -> {
+ Set<ConnectPoint> sinksToBeAdded = processSinksToBeAdded(source, mcastIp, sinks);
+ Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(mcastIp, source.deviceId(),
+ sinksToBeAdded);
+ mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink, mcastIp, paths));
+ });
}
/**
@@ -408,119 +328,114 @@
Set<ConnectPoint> remainingSources,
IpAddress mcastIp,
Map<HostId, Set<ConnectPoint>> sinks) {
- lastMcastChange = Instant.now();
- mcastLock();
- try {
- log.info("Processing sources removed {} for group {}", sourcesToBeRemoved, mcastIp);
- if (!mcastUtils.isLeader(mcastIp)) {
- log.debug("Skip {} due to lack of leadership", mcastIp);
- return;
- }
- if (remainingSources.isEmpty()) {
- log.debug("There are no more sources for {}", mcastIp);
- processRouteRemovedInternal(sourcesToBeRemoved, mcastIp);
- return;
- }
- // Skip offline devices
- Set<ConnectPoint> candidateSources = sourcesToBeRemoved.stream()
- .filter(source -> srManager.deviceService.isAvailable(source.deviceId()))
+ lastMcastChange.set(Instant.now());
+ log.info("Processing sources removed {} for group {}", sourcesToBeRemoved, mcastIp);
+ if (!mcastUtils.isLeader(mcastIp)) {
+ log.debug("Skip {} due to lack of leadership", mcastIp);
+ return;
+ }
+ if (remainingSources.isEmpty()) {
+ log.debug("There are no more sources for {}", mcastIp);
+ processRouteRemovedInternal(sourcesToBeRemoved, mcastIp);
+ return;
+ }
+ // Skip offline devices
+ Set<ConnectPoint> candidateSources = sourcesToBeRemoved.stream()
+ .filter(source -> srManager.deviceService.isAvailable(source.deviceId()))
+ .collect(Collectors.toSet());
+ if (candidateSources.isEmpty()) {
+ log.debug("Skip {} due to empty sources to be removed", mcastIp);
+ return;
+ }
+ // Let's heal the trees
+ Set<Link> remainingLinks = Sets.newHashSet();
+ Map<ConnectPoint, Set<Link>> candidateLinks = Maps.newHashMap();
+ Map<ConnectPoint, Set<ConnectPoint>> candidateSinks = Maps.newHashMap();
+ Set<ConnectPoint> totalSources = Sets.newHashSet(candidateSources);
+ totalSources.addAll(remainingSources);
+ // Calculate all the links used by the sources
+ totalSources.forEach(source -> {
+ Set<ConnectPoint> currentSinks = sinks.values()
+ .stream().flatMap(Collection::stream)
+ .filter(sink -> isSinkForSource(mcastIp, sink, source))
.collect(Collectors.toSet());
- if (candidateSources.isEmpty()) {
- log.debug("Skip {} due to empty sources to be removed", mcastIp);
- return;
- }
- // Let's heal the trees
- Set<Link> remainingLinks = Sets.newHashSet();
- Map<ConnectPoint, Set<Link>> candidateLinks = Maps.newHashMap();
- Map<ConnectPoint, Set<ConnectPoint>> candidateSinks = Maps.newHashMap();
- Set<ConnectPoint> totalSources = Sets.newHashSet(candidateSources);
- totalSources.addAll(remainingSources);
- // Calculate all the links used by the sources
- totalSources.forEach(source -> {
- Set<ConnectPoint> currentSinks = sinks.values()
- .stream().flatMap(Collection::stream)
- .filter(sink -> isSinkForSource(mcastIp, sink, source))
- .collect(Collectors.toSet());
- candidateSinks.put(source, currentSinks);
- currentSinks.forEach(currentSink -> {
- Optional<Path> currentPath = getPath(source.deviceId(), currentSink.deviceId(),
- mcastIp, null, source);
- if (currentPath.isPresent()) {
- if (!candidateSources.contains(source)) {
- remainingLinks.addAll(currentPath.get().links());
- } else {
- candidateLinks.put(source, Sets.newHashSet(currentPath.get().links()));
- }
+ candidateSinks.put(source, currentSinks);
+ currentSinks.forEach(currentSink -> {
+ Optional<Path> currentPath = getPath(source.deviceId(), currentSink.deviceId(),
+ mcastIp, null, source);
+ if (currentPath.isPresent()) {
+ if (!candidateSources.contains(source)) {
+ remainingLinks.addAll(currentPath.get().links());
+ } else {
+ candidateLinks.put(source, Sets.newHashSet(currentPath.get().links()));
}
- });
- });
- // Clean transit links
- candidateLinks.forEach((source, currentCandidateLinks) -> {
- Set<Link> linksToBeRemoved = Sets.difference(currentCandidateLinks, remainingLinks)
- .immutableCopy();
- if (!linksToBeRemoved.isEmpty()) {
- currentCandidateLinks.forEach(link -> {
- DeviceId srcLink = link.src().deviceId();
- // Remove ports only on links to be removed
- if (linksToBeRemoved.contains(link)) {
- removePortFromDevice(link.src().deviceId(), link.src().port(), mcastIp,
- mcastUtils.assignedVlan(srcLink.equals(source.deviceId()) ?
- source : null));
- }
- // Remove role on the candidate links
- mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, srcLink, source));
- });
}
});
- // Clean ingress and egress
- candidateSources.forEach(source -> {
- Set<ConnectPoint> currentSinks = candidateSinks.get(source);
- currentSinks.forEach(currentSink -> {
- VlanId assignedVlan = mcastUtils.assignedVlan(source.deviceId().equals(currentSink.deviceId()) ?
- source : null);
- // Sinks co-located with the source
- if (source.deviceId().equals(currentSink.deviceId())) {
- if (source.port().equals(currentSink.port())) {
- log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
- mcastIp, currentSink, source);
- return;
- }
- // We need to check against the other sources and if it is
- // necessary remove the port from the device - no overlap
- Set<VlanId> otherVlans = remainingSources.stream()
- // Only sources co-located and having this sink
- .filter(remainingSource -> remainingSource.deviceId()
- .equals(source.deviceId()) && candidateSinks.get(remainingSource)
- .contains(currentSink))
- .map(remainingSource -> mcastUtils.assignedVlan(
- remainingSource.deviceId().equals(currentSink.deviceId()) ?
- remainingSource : null)).collect(Collectors.toSet());
- if (!otherVlans.contains(assignedVlan)) {
- removePortFromDevice(currentSink.deviceId(), currentSink.port(),
- mcastIp, assignedVlan);
- }
- mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, currentSink.deviceId(),
- source));
+ });
+ // Clean transit links
+ candidateLinks.forEach((source, currentCandidateLinks) -> {
+ Set<Link> linksToBeRemoved = Sets.difference(currentCandidateLinks, remainingLinks)
+ .immutableCopy();
+ if (!linksToBeRemoved.isEmpty()) {
+ currentCandidateLinks.forEach(link -> {
+ DeviceId srcLink = link.src().deviceId();
+ // Remove ports only on links to be removed
+ if (linksToBeRemoved.contains(link)) {
+ removePortFromDevice(link.src().deviceId(), link.src().port(), mcastIp,
+ mcastUtils.assignedVlan(srcLink.equals(source.deviceId()) ?
+ source : null));
+ }
+ // Remove role on the candidate links
+ mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, srcLink, source));
+ });
+ }
+ });
+ // Clean ingress and egress
+ candidateSources.forEach(source -> {
+ Set<ConnectPoint> currentSinks = candidateSinks.get(source);
+ currentSinks.forEach(currentSink -> {
+ VlanId assignedVlan = mcastUtils.assignedVlan(source.deviceId().equals(currentSink.deviceId()) ?
+ source : null);
+ // Sinks co-located with the source
+ if (source.deviceId().equals(currentSink.deviceId())) {
+ if (source.port().equals(currentSink.port())) {
+ log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
+ mcastIp, currentSink, source);
return;
}
+ // We need to check against the other sources and if it is
+ // necessary remove the port from the device - no overlap
Set<VlanId> otherVlans = remainingSources.stream()
- .filter(remainingSource -> candidateSinks.get(remainingSource)
+ // Only sources co-located and having this sink
+ .filter(remainingSource -> remainingSource.deviceId()
+ .equals(source.deviceId()) && candidateSinks.get(remainingSource)
.contains(currentSink))
.map(remainingSource -> mcastUtils.assignedVlan(
remainingSource.deviceId().equals(currentSink.deviceId()) ?
remainingSource : null)).collect(Collectors.toSet());
- // Sinks on other leaves
if (!otherVlans.contains(assignedVlan)) {
removePortFromDevice(currentSink.deviceId(), currentSink.port(),
mcastIp, assignedVlan);
}
mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, currentSink.deviceId(),
source));
- });
+ return;
+ }
+ Set<VlanId> otherVlans = remainingSources.stream()
+ .filter(remainingSource -> candidateSinks.get(remainingSource)
+ .contains(currentSink))
+ .map(remainingSource -> mcastUtils.assignedVlan(
+ remainingSource.deviceId().equals(currentSink.deviceId()) ?
+ remainingSource : null)).collect(Collectors.toSet());
+ // Sinks on other leaves
+ if (!otherVlans.contains(assignedVlan)) {
+ removePortFromDevice(currentSink.deviceId(), currentSink.port(),
+ mcastIp, assignedVlan);
+ }
+ mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, currentSink.deviceId(),
+ source));
});
- } finally {
- mcastUnlock();
- }
+ });
}
/**
@@ -529,15 +444,10 @@
* @param mcastIp the group address
*/
private void processRouteAddedInternal(IpAddress mcastIp) {
- lastMcastChange = Instant.now();
- mcastLock();
- try {
- log.info("Processing route added for Multicast group {}", mcastIp);
- // Just elect a new leader
- mcastUtils.isLeader(mcastIp);
- } finally {
- mcastUnlock();
- }
+ lastMcastChange.set(Instant.now());
+ log.info("Processing route added for Multicast group {}", mcastIp);
+ // Just elect a new leader
+ mcastUtils.isLeader(mcastIp);
}
/**
@@ -546,44 +456,39 @@
* @param mcastIp multicast group IP address
*/
private void processRouteRemovedInternal(Set<ConnectPoint> sources, IpAddress mcastIp) {
- lastMcastChange = Instant.now();
- mcastLock();
- try {
- log.info("Processing route removed for group {}", mcastIp);
- if (!mcastUtils.isLeader(mcastIp)) {
- log.debug("Skip {} due to lack of leadership", mcastIp);
- mcastUtils.withdrawLeader(mcastIp);
- return;
- }
- sources.forEach(source -> {
- // Find out the ingress, transit and egress device of the affected group
- DeviceId ingressDevice = getDevice(mcastIp, INGRESS, source)
- .stream().findFirst().orElse(null);
- Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT, source);
- Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS, source);
- // If there are no egress and transit devices, sinks could be only on the ingress
- if (!egressDevices.isEmpty()) {
- egressDevices.forEach(deviceId -> {
- removeGroupFromDevice(deviceId, mcastIp, mcastUtils.assignedVlan(null));
- mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, deviceId, source));
- });
- }
- if (!transitDevices.isEmpty()) {
- transitDevices.forEach(deviceId -> {
- removeGroupFromDevice(deviceId, mcastIp, mcastUtils.assignedVlan(null));
- mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, deviceId, source));
- });
- }
- if (ingressDevice != null) {
- removeGroupFromDevice(ingressDevice, mcastIp, mcastUtils.assignedVlan(source));
- mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, ingressDevice, source));
- }
- });
- // Finally, withdraw the leadership
+ lastMcastChange.set(Instant.now());
+ log.info("Processing route removed for group {}", mcastIp);
+ if (!mcastUtils.isLeader(mcastIp)) {
+ log.debug("Skip {} due to lack of leadership", mcastIp);
mcastUtils.withdrawLeader(mcastIp);
- } finally {
- mcastUnlock();
+ return;
}
+ sources.forEach(source -> {
+ // Find out the ingress, transit and egress device of the affected group
+ DeviceId ingressDevice = getDevice(mcastIp, INGRESS, source)
+ .stream().findFirst().orElse(null);
+ Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT, source);
+ Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS, source);
+ // If there are no egress and transit devices, sinks could be only on the ingress
+ if (!egressDevices.isEmpty()) {
+ egressDevices.forEach(deviceId -> {
+ removeGroupFromDevice(deviceId, mcastIp, mcastUtils.assignedVlan(null));
+ mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, deviceId, source));
+ });
+ }
+ if (!transitDevices.isEmpty()) {
+ transitDevices.forEach(deviceId -> {
+ removeGroupFromDevice(deviceId, mcastIp, mcastUtils.assignedVlan(null));
+ mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, deviceId, source));
+ });
+ }
+ if (ingressDevice != null) {
+ removeGroupFromDevice(ingressDevice, mcastIp, mcastUtils.assignedVlan(source));
+ mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, ingressDevice, source));
+ }
+ });
+ // Finally, withdraw the leadership
+ mcastUtils.withdrawLeader(mcastIp);
}
/**
@@ -597,40 +502,39 @@
private void processSinksRemovedInternal(Set<ConnectPoint> sources, IpAddress mcastIp,
Map<HostId, Set<ConnectPoint>> newSinks,
Map<HostId, Set<ConnectPoint>> prevSinks) {
- lastMcastChange = Instant.now();
- mcastLock();
- try {
- log.info("Processing sinks removed for group {} and for sources {}",
- mcastIp, sources);
- if (!mcastUtils.isLeader(mcastIp)) {
- log.debug("Skip {} due to lack of leadership", mcastIp);
- return;
- }
- Map<ConnectPoint, Map<ConnectPoint, Optional<Path>>> treesToBeRemoved = Maps.newHashMap();
- Map<ConnectPoint, Set<ConnectPoint>> treesToBeAdded = Maps.newHashMap();
- sources.forEach(source -> {
- // Save the path associated to the sinks to be removed
- Set<ConnectPoint> sinksToBeRemoved = processSinksToBeRemoved(mcastIp, prevSinks,
- newSinks, source);
- Map<ConnectPoint, Optional<Path>> treeToBeRemoved = Maps.newHashMap();
- sinksToBeRemoved.forEach(sink -> treeToBeRemoved.put(sink, getPath(source.deviceId(),
- sink.deviceId(), mcastIp,
- null, source)));
- treesToBeRemoved.put(source, treeToBeRemoved);
- // Recover the dual-homed sinks
- Set<ConnectPoint> sinksToBeRecovered = processSinksToBeRecovered(mcastIp, newSinks,
- prevSinks, source);
- treesToBeAdded.put(source, sinksToBeRecovered);
- });
- // Remove the sinks taking into account the multiple sources and the original paths
- treesToBeRemoved.forEach((source, tree) ->
- tree.forEach((sink, path) -> processSinkRemovedInternal(source, sink, mcastIp, path)));
- // Add new sinks according to the recovery procedure
- treesToBeAdded.forEach((source, sinks) ->
- sinks.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null)));
- } finally {
- mcastUnlock();
+ lastMcastChange.set(Instant.now());
+ log.info("Processing sinks removed for group {} and for sources {}",
+ mcastIp, sources);
+ if (!mcastUtils.isLeader(mcastIp)) {
+ log.debug("Skip {} due to lack of leadership", mcastIp);
+ return;
}
+ Map<ConnectPoint, Map<ConnectPoint, Optional<Path>>> treesToBeRemoved = Maps.newHashMap();
+ Map<ConnectPoint, Set<ConnectPoint>> treesToBeAdded = Maps.newHashMap();
+ sources.forEach(source -> {
+ // Save the path associated to the sinks to be removed
+ Set<ConnectPoint> candidateSinks = processSinksToBeRemoved(mcastIp, prevSinks,
+ newSinks, source);
+ // Skip offline devices
+ Set<ConnectPoint> sinksToBeRemoved = candidateSinks.stream()
+ .filter(sink -> srManager.deviceService.isAvailable(sink.deviceId()))
+ .collect(Collectors.toSet());
+ Map<ConnectPoint, Optional<Path>> treeToBeRemoved = Maps.newHashMap();
+ sinksToBeRemoved.forEach(sink -> treeToBeRemoved.put(sink, getPath(source.deviceId(),
+ sink.deviceId(), mcastIp,
+ null, source)));
+ treesToBeRemoved.put(source, treeToBeRemoved);
+ // Recover the dual-homed sinks
+ Set<ConnectPoint> sinksToBeRecovered = processSinksToBeRecovered(mcastIp, newSinks,
+ prevSinks, source);
+ treesToBeAdded.put(source, sinksToBeRecovered);
+ });
+ // Remove the sinks taking into account the multiple sources and the original paths
+ treesToBeRemoved.forEach((source, tree) ->
+ tree.forEach((sink, path) -> processSinkRemovedInternal(source, sink, mcastIp, path)));
+ // Add new sinks according to the recovery procedure
+ treesToBeAdded.forEach((source, sinks) ->
+ sinks.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null)));
}
/**
@@ -643,49 +547,45 @@
*/
private void processSinkRemovedInternal(ConnectPoint source, ConnectPoint sink,
IpAddress mcastIp, Optional<Path> mcastPath) {
- lastMcastChange = Instant.now();
- mcastLock();
- try {
- log.info("Processing sink removed {} for group {} and for source {}", sink, mcastIp, source);
- boolean isLast;
- // When source and sink are on the same device
- if (source.deviceId().equals(sink.deviceId())) {
- // Source and sink are on even the same port. There must be something wrong.
- if (source.port().equals(sink.port())) {
- log.warn("Skip {} since sink {} is on the same port of source {}. Abort", mcastIp, sink, source);
- return;
- }
- isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(source));
- if (isLast) {
- mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, sink.deviceId(), source));
- }
+ lastMcastChange.set(Instant.now());
+ log.info("Processing sink removed {} for group {} and for source {}", sink, mcastIp, source);
+ boolean isLast;
+ // When source and sink are on the same device
+ if (source.deviceId().equals(sink.deviceId())) {
+ // Source and sink are on even the same port. There must be something wrong.
+ if (source.port().equals(sink.port())) {
+ log.warn("Skip {} since sink {} is on the same port of source {}. Abort", mcastIp, sink, source);
return;
}
- // Process the egress device
- isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
+ isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(source));
if (isLast) {
mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, sink.deviceId(), source));
}
- // If this is the last sink on the device, also update upstream
- if (mcastPath.isPresent()) {
- List<Link> links = Lists.newArrayList(mcastPath.get().links());
- Collections.reverse(links);
- for (Link link : links) {
+ return;
+ }
+ // Process the egress device
+ isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
+ if (isLast) {
+ mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, sink.deviceId(), source));
+ }
+ // If this is the last sink on the device, also update upstream
+ if (mcastPath.isPresent()) {
+ List<Link> links = Lists.newArrayList(mcastPath.get().links());
+ Collections.reverse(links);
+ for (Link link : links) {
+ if (isLast) {
+ isLast = removePortFromDevice(link.src().deviceId(), link.src().port(), mcastIp,
+ mcastUtils.assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
if (isLast) {
- isLast = removePortFromDevice(link.src().deviceId(), link.src().port(), mcastIp,
- mcastUtils.assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
- if (isLast) {
- mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, link.src().deviceId(), source));
- }
+ mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, link.src().deviceId(), source));
}
}
}
- } finally {
- mcastUnlock();
+ } else {
+ log.warn("Unable to find a path from {} to {}. Abort sinkRemoved", source.deviceId(), sink.deviceId());
}
}
-
/**
* Process sinks to be added.
*
@@ -697,22 +597,17 @@
private void processSinksAddedInternal(Set<ConnectPoint> sources, IpAddress mcastIp,
Map<HostId, Set<ConnectPoint>> newSinks,
Set<ConnectPoint> allPrevSinks) {
- lastMcastChange = Instant.now();
- mcastLock();
- try {
- log.info("Processing sinks added for group {} and for sources {}", mcastIp, sources);
- if (!mcastUtils.isLeader(mcastIp)) {
- log.debug("Skip {} due to lack of leadership", mcastIp);
- return;
- }
- sources.forEach(source -> {
- Set<ConnectPoint> sinksToBeAdded = processSinksToBeAdded(source, mcastIp, newSinks);
- sinksToBeAdded = Sets.difference(sinksToBeAdded, allPrevSinks);
- sinksToBeAdded.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null));
- });
- } finally {
- mcastUnlock();
+ lastMcastChange.set(Instant.now());
+ log.info("Processing sinks added for group {} and for sources {}", mcastIp, sources);
+ if (!mcastUtils.isLeader(mcastIp)) {
+ log.debug("Skip {} due to lack of leadership", mcastIp);
+ return;
}
+ sources.forEach(source -> {
+ Set<ConnectPoint> sinksToBeAdded = processSinksToBeAdded(source, mcastIp, newSinks);
+ sinksToBeAdded = Sets.difference(sinksToBeAdded, allPrevSinks);
+ sinksToBeAdded.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null));
+ });
}
/**
@@ -724,69 +619,70 @@
*/
private void processSinkAddedInternal(ConnectPoint source, ConnectPoint sink,
IpAddress mcastIp, List<Path> allPaths) {
- lastMcastChange = Instant.now();
- mcastLock();
- try {
- log.info("Processing sink added {} for group {} and for source {}", sink, mcastIp, source);
- // Process the ingress device
- McastFilteringObjStoreKey mcastFilterObjStoreKey = new McastFilteringObjStoreKey(source,
- mcastUtils.assignedVlan(source), mcastIp.isIp4());
- addFilterToDevice(mcastFilterObjStoreKey, mcastIp, INGRESS);
- if (source.deviceId().equals(sink.deviceId())) {
- if (source.port().equals(sink.port())) {
- log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
- mcastIp, sink, source);
- return;
- }
- addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(source));
- mcastRoleStore.put(new McastRoleStoreKey(mcastIp, sink.deviceId(), source), INGRESS);
+ lastMcastChange.set(Instant.now());
+ log.info("Processing sink added {} for group {} and for source {}", sink, mcastIp, source);
+ // Process the ingress device
+ McastFilteringObjStoreKey mcastFilterObjStoreKey = new McastFilteringObjStoreKey(source,
+ mcastUtils.assignedVlan(source), mcastIp.isIp4());
+ addFilterToDevice(mcastFilterObjStoreKey, mcastIp, INGRESS);
+ if (source.deviceId().equals(sink.deviceId())) {
+ if (source.port().equals(sink.port())) {
+ log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
+ mcastIp, sink, source);
return;
}
- // Find a path. If present, create/update groups and flows for each hop
- Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp, allPaths, source);
- if (mcastPath.isPresent()) {
- List<Link> links = mcastPath.get().links();
- // Setup mcast role for ingress
- mcastRoleStore.put(new McastRoleStoreKey(mcastIp, source.deviceId(), source), INGRESS);
- // Setup properly the transit forwarding
- links.forEach(link -> {
- addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
- mcastUtils.assignedVlan(link.src().deviceId()
- .equals(source.deviceId()) ? source : null));
- McastFilteringObjStoreKey filteringKey = new McastFilteringObjStoreKey(link.dst(),
- mcastUtils.assignedVlan(null), mcastIp.isIp4());
- addFilterToDevice(filteringKey, mcastIp, null);
- });
- // Setup mcast role for the transit
- links.stream()
- .filter(link -> !link.dst().deviceId().equals(sink.deviceId()))
- .forEach(link -> {
- log.trace("Transit links {}", link);
- mcastRoleStore.put(new McastRoleStoreKey(mcastIp, link.dst().deviceId(),
- source), TRANSIT);
- });
- // Process the egress device
- addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
- // Setup mcast role for egress
- mcastRoleStore.put(new McastRoleStoreKey(mcastIp, sink.deviceId(), source), EGRESS);
- } else {
- log.warn("Unable to find a path from {} to {}. Abort sinkAdded", source.deviceId(), sink.deviceId());
- }
- } finally {
- mcastUnlock();
+ addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(source));
+ mcastRoleStore.put(new McastRoleStoreKey(mcastIp, sink.deviceId(), source), INGRESS);
+ return;
+ }
+ // Find a path. If present, create/update groups and flows for each hop
+ Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp, allPaths, source);
+ if (mcastPath.isPresent()) {
+ List<Link> links = mcastPath.get().links();
+ // Setup mcast role for ingress
+ mcastRoleStore.put(new McastRoleStoreKey(mcastIp, source.deviceId(), source), INGRESS);
+ // Setup properly the transit forwarding
+ links.forEach(link -> {
+ addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
+ mcastUtils.assignedVlan(link.src().deviceId()
+ .equals(source.deviceId()) ? source : null));
+ McastFilteringObjStoreKey filteringKey = new McastFilteringObjStoreKey(link.dst(),
+ mcastUtils.assignedVlan(null), mcastIp.isIp4());
+ addFilterToDevice(filteringKey, mcastIp, null);
+ });
+ // Setup mcast role for the transit
+ links.stream()
+ .filter(link -> !link.dst().deviceId().equals(sink.deviceId()))
+ .forEach(link -> {
+ log.trace("Transit links {}", link);
+ mcastRoleStore.put(new McastRoleStoreKey(mcastIp, link.dst().deviceId(),
+ source), TRANSIT);
+ });
+ // Process the egress device
+ addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
+ // Setup mcast role for egress
+ mcastRoleStore.put(new McastRoleStoreKey(mcastIp, sink.deviceId(), source), EGRESS);
+ } else {
+ log.warn("Unable to find a path from {} to {}. Abort sinkAdded", source.deviceId(), sink.deviceId());
}
}
/**
- * Processes PORT_UPDATED event.
+ * Processes the PORT_UPDATED event.
*
* @param affectedDevice Affected device
* @param affectedPort Affected port
*/
public void processPortUpdate(Device affectedDevice, Port affectedPort) {
+ mcastWorker.execute(() -> processPortUpdateInternal(affectedDevice, affectedPort));
+ }
+
+ private void processPortUpdateInternal(Device affectedDevice, Port affectedPort) {
// Clean the filtering obj store. Edge port case.
+ lastMcastChange.set(Instant.now());
ConnectPoint portDown = new ConnectPoint(affectedDevice.id(), affectedPort.number());
if (!affectedPort.isEnabled()) {
+ log.info("Processing port down {}", portDown);
updateFilterObjStoreByPort(portDown);
}
}
@@ -797,19 +693,18 @@
* @param linkDown Link that is going down
*/
public void processLinkDown(Link linkDown) {
- lastMcastChange = Instant.now();
- mcastLock();
- try {
- // Get mcast groups affected by the link going down
- Set<IpAddress> affectedGroups = getAffectedGroups(linkDown);
- log.info("Processing link down {} for groups {}", linkDown, affectedGroups);
- affectedGroups.forEach(mcastIp -> {
- log.debug("Processing link down {} for group {}", linkDown, mcastIp);
- recoverFailure(mcastIp, linkDown);
- });
- } finally {
- mcastUnlock();
- }
+ mcastWorker.execute(() -> processLinkDownInternal(linkDown));
+ }
+
+ private void processLinkDownInternal(Link linkDown) {
+ lastMcastChange.set(Instant.now());
+ // Get mcast groups affected by the link going down
+ Set<IpAddress> affectedGroups = getAffectedGroups(linkDown);
+ log.info("Processing link down {} for groups {}", linkDown, affectedGroups);
+ affectedGroups.forEach(mcastIp -> {
+ log.debug("Processing link down {} for group {}", linkDown, mcastIp);
+ recoverFailure(mcastIp, linkDown);
+ });
}
/**
@@ -818,20 +713,19 @@
* @param deviceDown device going down
*/
public void processDeviceDown(DeviceId deviceDown) {
- lastMcastChange = Instant.now();
- mcastLock();
- try {
- // Get the mcast groups affected by the device going down
- Set<IpAddress> affectedGroups = getAffectedGroups(deviceDown);
- log.info("Processing device down {} for groups {}", deviceDown, affectedGroups);
- updateFilterObjStoreByDevice(deviceDown);
- affectedGroups.forEach(mcastIp -> {
- log.debug("Processing device down {} for group {}", deviceDown, mcastIp);
- recoverFailure(mcastIp, deviceDown);
- });
- } finally {
- mcastUnlock();
- }
+ mcastWorker.execute(() -> processDeviceDownInternal(deviceDown));
+ }
+
+ private void processDeviceDownInternal(DeviceId deviceDown) {
+ lastMcastChange.set(Instant.now());
+ // Get the mcast groups affected by the device going down
+ Set<IpAddress> affectedGroups = getAffectedGroups(deviceDown);
+ log.info("Processing device down {} for groups {}", deviceDown, affectedGroups);
+ updateFilterObjStoreByDevice(deviceDown);
+ affectedGroups.forEach(mcastIp -> {
+ log.debug("Processing device down {} for group {}", deviceDown, mcastIp);
+ recoverFailure(mcastIp, deviceDown);
+ });
}
/**
@@ -1804,33 +1698,33 @@
*/
public void updateFilterToDevice(DeviceId deviceId, PortNumber portNum,
VlanId vlanId, boolean install) {
- lastMcastChange = Instant.now();
- mcastLock();
- try {
- // Iterates over the route and updates properly the filtering objective on the source device.
- srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
- log.debug("Update filter for {}", mcastRoute.group());
- if (!mcastUtils.isLeader(mcastRoute.group())) {
- log.debug("Skip {} due to lack of leadership", mcastRoute.group());
- return;
- }
- // Get the sources and for each one update properly the filtering objectives
- Set<ConnectPoint> sources = srManager.multicastRouteService.sources(mcastRoute);
- sources.forEach(source -> {
- if (source.deviceId().equals(deviceId) && source.port().equals(portNum)) {
- if (install) {
- McastFilteringObjStoreKey mcastFilterObjStoreKey = new McastFilteringObjStoreKey(source,
- vlanId, mcastRoute.group().isIp4());
- addFilterToDevice(mcastFilterObjStoreKey, mcastRoute.group(), INGRESS);
- } else {
- mcastUtils.removeFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), null);
- }
+ mcastWorker.execute(() -> updateFilterToDeviceInternal(deviceId, portNum, vlanId, install));
+ }
+
+ private void updateFilterToDeviceInternal(DeviceId deviceId, PortNumber portNum,
+ VlanId vlanId, boolean install) {
+ lastMcastChange.set(Instant.now());
+ // Iterates over the route and updates properly the filtering objective on the source device.
+ srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
+ log.debug("Update filter for {}", mcastRoute.group());
+ if (!mcastUtils.isLeader(mcastRoute.group())) {
+ log.debug("Skip {} due to lack of leadership", mcastRoute.group());
+ return;
+ }
+ // Get the sources and for each one update properly the filtering objectives
+ Set<ConnectPoint> sources = srManager.multicastRouteService.sources(mcastRoute);
+ sources.forEach(source -> {
+ if (source.deviceId().equals(deviceId) && source.port().equals(portNum)) {
+ if (install) {
+ McastFilteringObjStoreKey mcastFilterObjStoreKey = new McastFilteringObjStoreKey(source,
+ vlanId, mcastRoute.group().isIp4());
+ addFilterToDevice(mcastFilterObjStoreKey, mcastRoute.group(), INGRESS);
+ } else {
+ mcastUtils.removeFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), null);
}
- });
+ }
});
- } finally {
- mcastUnlock();
- }
+ });
}
/**
@@ -1931,33 +1825,32 @@
* the operation is totally delegated to the driver.
*/
private final class McastBucketCorrector implements Runnable {
- // Internal params
- private static final int MAX_VERIFY_ON_FLIGHT = 10;
private final AtomicInteger verifyOnFlight = new AtomicInteger(0);
// Define the context used for the back pressure mechanism
private final ObjectiveContext context = new DefaultObjectiveContext(
(objective) -> {
synchronized (verifyOnFlight) {
- verifyOnFlight.decrementAndGet();
+ log.trace("Verify {} done", objective.id());
+ verifyOnFlight.updateAndGet(i -> i > 0 ? i - 1 : i);
verifyOnFlight.notify();
}
},
(objective, error) -> {
synchronized (verifyOnFlight) {
- verifyOnFlight.decrementAndGet();
+ log.trace("Verify {} error {}", objective.id(), error);
+ verifyOnFlight.updateAndGet(i -> i > 0 ? i - 1 : i);
verifyOnFlight.notify();
}
});
@Override
public void run() {
- if (!isMcastStable() || wasBktCorrRunning()) {
- return;
- }
- mcastLock();
try {
// Iterates over the routes and verify the related next objectives
for (McastRoute mcastRoute : srManager.multicastRouteService.getRoutes()) {
+ if (!isMcastStable() || wasBktCorrRunning()) {
+ return;
+ }
IpAddress mcastIp = mcastRoute.group();
log.trace("Running mcast buckets corrector for mcast group: {}", mcastIp);
// Verify leadership on the operation
@@ -1995,9 +1888,7 @@
}
// Create the set of the devices to be processed
ImmutableSet.Builder<DeviceId> devicesBuilder = ImmutableSet.builder();
- if (!ingressDevices.isEmpty()) {
- devicesBuilder.addAll(ingressDevices);
- }
+ devicesBuilder.addAll(ingressDevices);
if (!transitDevices.isEmpty()) {
devicesBuilder.addAll(transitDevices);
}
@@ -2040,12 +1931,17 @@
}
}
}
+ // Let's wait the group before start the next one
+ synchronized (verifyOnFlight) {
+ while (verifyOnFlight.get() > 0) {
+ verifyOnFlight.wait();
+ }
+ }
}
} catch (InterruptedException e) {
log.warn("BktCorr has been interrupted");
} finally {
- lastBktCorrExecution = Instant.now();
- mcastUnlock();
+ lastBktCorrExecution.set(Instant.now());
}
}
}