[CORD-2739] Caching in McastHandler
Change-Id: I7cb2aa98f55ce96e5c1992bcada3d29a19b6526c
diff --git a/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
index c6bb015..37b27ad 100644
--- a/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
+++ b/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
@@ -1538,19 +1538,11 @@
public void event(McastEvent event) {
switch (event.type()) {
case SOURCE_ADDED:
- mcastHandler.processSourceAdded(event);
- break;
case SOURCE_UPDATED:
- mcastHandler.processSourceUpdated(event);
- break;
case SINK_ADDED:
- mcastHandler.processSinkAdded(event);
- break;
case SINK_REMOVED:
- mcastHandler.processSinkRemoved(event);
- break;
case ROUTE_REMOVED:
- mcastHandler.processRouteRemoved(event);
+ mcastHandler.processMcastEvent(event);
break;
case ROUTE_ADDED:
default:
diff --git a/app/src/main/java/org/onosproject/segmentrouting/mcast/McastCacheKey.java b/app/src/main/java/org/onosproject/segmentrouting/mcast/McastCacheKey.java
new file mode 100644
index 0000000..b0875ca
--- /dev/null
+++ b/app/src/main/java/org/onosproject/segmentrouting/mcast/McastCacheKey.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.segmentrouting.mcast;
+
+import org.onlab.packet.IpAddress;
+import org.onosproject.net.ConnectPoint;
+
+import java.util.Objects;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Key of the multicast event cache.
+ */
+class McastCacheKey {
+ // The group ip
+ private final IpAddress mcastIp;
+ // The sink connect point
+ private final ConnectPoint sink;
+
+ /**
+ * Constructs a key for multicast event cache.
+ *
+ * @param mcastIp multicast group IP address
+ * @param sink connect point of the sink
+ */
+ public McastCacheKey(IpAddress mcastIp, ConnectPoint sink) {
+ checkNotNull(mcastIp, "mcastIp cannot be null");
+ checkNotNull(sink, "sink cannot be null");
+ checkArgument(mcastIp.isMulticast(), "mcastIp must be a multicast address");
+ this.mcastIp = mcastIp;
+ this.sink = sink;
+ }
+
+ /**
+ * Returns the multicast IP address of this key.
+ *
+ * @return multicast IP
+ */
+ public IpAddress mcastIp() {
+ return mcastIp;
+ }
+
+ /**
+ * Returns the sink of this key.
+ *
+ * @return connect point of the sink
+ */
+ public ConnectPoint sink() {
+ return sink;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof McastCacheKey)) {
+ return false;
+ }
+ McastCacheKey that =
+ (McastCacheKey) o;
+ return (Objects.equals(this.mcastIp, that.mcastIp) &&
+ Objects.equals(this.sink, that.sink));
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(mcastIp, sink);
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(getClass())
+ .add("mcastIp", mcastIp)
+ .add("sink", sink)
+ .toString();
+ }
+}
diff --git a/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java b/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
index dc8a36d..51a4524 100644
--- a/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
+++ b/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
@@ -16,6 +16,10 @@
package org.onosproject.segmentrouting.mcast;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalNotification;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -40,6 +44,7 @@
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.flow.criteria.Criteria;
+import org.onosproject.net.flow.criteria.VlanIdCriterion;
import org.onosproject.net.flow.instructions.Instructions.OutputInstruction;
import org.onosproject.net.flowobjective.DefaultFilteringObjective;
import org.onosproject.net.flowobjective.DefaultForwardingObjective;
@@ -84,6 +89,10 @@
import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.net.flow.criteria.Criterion.Type.VLAN_VID;
+import static org.onosproject.net.mcast.McastEvent.Type.ROUTE_REMOVED;
+import static org.onosproject.net.mcast.McastEvent.Type.SOURCE_ADDED;
+import static org.onosproject.net.mcast.McastEvent.Type.SOURCE_UPDATED;
import static org.onosproject.segmentrouting.SegmentRoutingManager.INTERNAL_VLAN;
/**
@@ -99,6 +108,98 @@
private final KryoNamespace.Builder mcastKryo;
private final ConsistentMap<McastStoreKey, McastRole> mcastRoleStore;
+ // Wait time for the cache
+ private static final int WAIT_TIME_MS = 1000;
+ /**
+ * The mcastEventCache is implemented to avoid race condition by giving more time to the
+ * underlying subsystems to process previous calls.
+ */
+ private Cache<McastCacheKey, McastEvent> mcastEventCache = CacheBuilder.newBuilder()
+ .expireAfterWrite(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
+ .removalListener((RemovalNotification<McastCacheKey, McastEvent> notification) -> {
+ // Get group ip, sink and related event
+ IpAddress mcastIp = notification.getKey().mcastIp();
+ ConnectPoint sink = notification.getKey().sink();
+ McastEvent mcastEvent = notification.getValue();
+ RemovalCause cause = notification.getCause();
+ log.debug("mcastEventCache removal event. group={}, sink={}, mcastEvent={}, cause={}",
+ mcastIp, sink, mcastEvent, cause);
+ // If it expires or it has been replaced, we deque the event
+ switch (notification.getCause()) {
+ case REPLACED:
+ case EXPIRED:
+ dequeueMcastEvent(mcastEvent);
+ break;
+ default:
+ break;
+ }
+ }).build();
+
+ private void enqueueMcastEvent(McastEvent mcastEvent) {
+ log.debug("Enqueue mcastEvent {}", mcastEvent);
+ final McastRouteInfo mcastRouteInfo = mcastEvent.subject();
+ // Let's create the keys of the cache
+ ImmutableSet.Builder<ConnectPoint> sinksBuilder = ImmutableSet.builder();
+ // For this event we will have a set of sinks
+ if (mcastEvent.type() == SOURCE_ADDED ||
+ mcastEvent.type() == SOURCE_UPDATED ||
+ mcastEvent.type() == ROUTE_REMOVED) {
+ // Add all the sinks
+ sinksBuilder.addAll(mcastRouteInfo.sinks());
+ } else {
+ // We have just one sink in this case
+ ConnectPoint sink = mcastRouteInfo.sink().orElse(null);
+ // It is always true, unless something of bad happened
+ // in the mcast route store
+ if (sink != null) {
+ sinksBuilder.add(sink);
+ }
+ }
+ // Push the elements in the cache
+ sinksBuilder.build().forEach(sink -> {
+ McastCacheKey cacheKey = new McastCacheKey(mcastRouteInfo.route().group(),
+ sink);
+ mcastEventCache.put(cacheKey, mcastEvent);
+ });
+ }
+
+ private void dequeueMcastEvent(McastEvent mcastEvent) {
+ log.debug("Dequeue mcastEvent {}", mcastEvent);
+ final McastRouteInfo mcastRouteInfo = mcastEvent.subject();
+ // Get source, mcast group
+ ConnectPoint source = mcastRouteInfo.source().orElse(null);
+ IpAddress mcastIp = mcastRouteInfo.route().group();
+ // According to the event type let's call the proper method
+ switch (mcastEvent.type()) {
+ case SOURCE_ADDED:
+ // Get all the sinks and process
+ Set<ConnectPoint> sinks = mcastRouteInfo.sinks();
+ sinks.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp));
+ break;
+ case SOURCE_UPDATED:
+ // Get old source
+ ConnectPoint oldSource = mcastEvent.prevSubject().source().orElse(null);
+ // Just the first cached element will be processed
+ processSourceUpdatedInternal(mcastIp, source, oldSource);
+ break;
+ case ROUTE_REMOVED:
+ // Process the route removed, just the first cached element will be processed
+ processRouteRemovedInternal(source, mcastIp);
+ break;
+ case SINK_ADDED:
+ // Get the only sink and process
+ ConnectPoint sink = mcastRouteInfo.sink().orElse(null);
+ processSinkAddedInternal(source, sink, mcastIp);
+ break;
+ case SINK_REMOVED:
+ sink = mcastRouteInfo.sink().orElse(null);
+ processSinkRemovedInternal(source, sink, mcastIp);
+ break;
+ default:
+ break;
+ }
+ }
+
// Mcast lock to serialize local operations
private final Lock mcastLock = new ReentrantLock();
@@ -140,7 +241,7 @@
// Executor for mcast bucket corrector
private ScheduledExecutorService executorService
- = newScheduledThreadPool(1, groupedThreads("mcastBktCorrector", "mcastbktC-%d", log));
+ = newScheduledThreadPool(1, groupedThreads("mcastWorker", "mcastWorker-%d", log));
/**
* Constructs the McastEventHandler.
@@ -170,6 +271,9 @@
executorService.scheduleWithFixedDelay(new McastBucketCorrector(), 10,
MCAST_VERIFY_INTERVAL,
TimeUnit.SECONDS);
+ // Schedule the clean up, this will allow the processing of the expired events
+ executorService.scheduleAtFixedRate(mcastEventCache::cleanUp, 0,
+ WAIT_TIME_MS, TimeUnit.MILLISECONDS);
}
/**
@@ -193,97 +297,21 @@
}
/**
- * Processes the SOURCE_ADDED event.
+ * Processes the SOURCE_ADDED, SOURCE_UPDATED, SINK_ADDED,
+ * SINK_REMOVED and ROUTE_REMOVED events.
*
* @param event McastEvent with SOURCE_ADDED type
*/
- public void processSourceAdded(McastEvent event) {
- log.info("processSourceAdded {}", event);
+ public void processMcastEvent(McastEvent event) {
+ log.info("process {}", event);
+ // Verify if it is a complete event
McastRouteInfo mcastRouteInfo = event.subject();
if (!mcastRouteInfo.isComplete()) {
- log.info("Incompleted McastRouteInfo. Abort.");
+ log.info("Incompleted McastRouteInfo. Abort {}", event.type());
return;
}
- ConnectPoint source = mcastRouteInfo.source().orElse(null);
- Set<ConnectPoint> sinks = mcastRouteInfo.sinks();
- IpAddress mcastIp = mcastRouteInfo.route().group();
-
- sinks.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp));
- }
-
- /**
- * Processes the SOURCE_UPDATED event.
- *
- * @param event McastEvent with SOURCE_UPDATED type
- */
- public void processSourceUpdated(McastEvent event) {
- log.info("processSourceUpdated {}", event);
- // Get old and new data
- McastRouteInfo mcastRouteInfo = event.subject();
- ConnectPoint newSource = mcastRouteInfo.source().orElse(null);
- mcastRouteInfo = event.prevSubject();
- ConnectPoint oldSource = mcastRouteInfo.source().orElse(null);
- // and group ip
- IpAddress mcastIp = mcastRouteInfo.route().group();
- // Process the update event
- processSourceUpdatedInternal(mcastIp, newSource, oldSource);
- }
-
- /**
- * Processes the SINK_ADDED event.
- *
- * @param event McastEvent with SINK_ADDED type
- */
- public void processSinkAdded(McastEvent event) {
- log.info("processSinkAdded {}", event);
- McastRouteInfo mcastRouteInfo = event.subject();
- if (!mcastRouteInfo.isComplete()) {
- log.info("Incompleted McastRouteInfo. Abort.");
- return;
- }
- ConnectPoint source = mcastRouteInfo.source().orElse(null);
- ConnectPoint sink = mcastRouteInfo.sink().orElse(null);
- IpAddress mcastIp = mcastRouteInfo.route().group();
-
- processSinkAddedInternal(source, sink, mcastIp);
- }
-
- /**
- * Processes the SINK_REMOVED event.
- *
- * @param event McastEvent with SINK_REMOVED type
- */
- public void processSinkRemoved(McastEvent event) {
- log.info("processSinkRemoved {}", event);
- McastRouteInfo mcastRouteInfo = event.subject();
- if (!mcastRouteInfo.isComplete()) {
- log.info("Incompleted McastRouteInfo. Abort.");
- return;
- }
- ConnectPoint source = mcastRouteInfo.source().orElse(null);
- ConnectPoint sink = mcastRouteInfo.sink().orElse(null);
- IpAddress mcastIp = mcastRouteInfo.route().group();
-
- processSinkRemovedInternal(source, sink, mcastIp);
- }
-
- /**
- * Processes the ROUTE_REMOVED event.
- *
- * @param event McastEvent with ROUTE_REMOVED type
- */
- public void processRouteRemoved(McastEvent event) {
- log.info("processRouteRemoved {}", event);
- McastRouteInfo mcastRouteInfo = event.subject();
- if (!mcastRouteInfo.source().isPresent()) {
- log.info("Incompleted McastRouteInfo. Abort.");
- return;
- }
- // Get group ip and ingress connect point
- IpAddress mcastIp = mcastRouteInfo.route().group();
- ConnectPoint source = mcastRouteInfo.source().orElse(null);
-
- processRouteRemovedInternal(source, mcastIp);
+ // Just enqueue for now
+ enqueueMcastEvent(event);
}
/**
@@ -318,11 +346,14 @@
NextObjective nextObjective = mcastNextObjStore.get(mcastStoreKey).value();
Set<PortNumber> outputPorts = getPorts(nextObjective.next());
- // Let's remove old flows and groups
- removeGroupFromDevice(oldSource.deviceId(), mcastIp, assignedVlan(oldSource));
- // Push new flows and group
- outputPorts.forEach(portNumber -> addPortToDevice(newSource.deviceId(), portNumber,
- mcastIp, assignedVlan(newSource)));
+ // This an optimization to avoid unnecessary removal and add
+ if (!assignedVlanFromNext(nextObjective).equals(assignedVlan(newSource))) {
+ // Let's remove old flows and groups
+ removeGroupFromDevice(oldSource.deviceId(), mcastIp, assignedVlan(oldSource));
+ // Push new flows and group
+ outputPorts.forEach(portNumber -> addPortToDevice(newSource.deviceId(), portNumber,
+ mcastIp, assignedVlan(newSource)));
+ }
addFilterToDevice(newSource.deviceId(), newSource.port(),
assignedVlan(newSource), mcastIp);
// Setup mcast roles
@@ -1212,6 +1243,16 @@
}
/**
+ * Gets assigned VLAN according to the value in the meta.
+ *
+ * @param nextObjective nextObjective to analyze
+ * @return assigned VLAN ID
+ */
+ private VlanId assignedVlanFromNext(NextObjective nextObjective) {
+ return ((VlanIdCriterion) nextObjective.meta().getCriterion(VLAN_VID)).vlanId();
+ }
+
+ /**
* Gets the spine-facing port on ingress device of given multicast group.
*
* @param mcastIp multicast IP