[CORD-2739] Caching in McastHandler

Change-Id: I7cb2aa98f55ce96e5c1992bcada3d29a19b6526c
diff --git a/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
index c6bb015..37b27ad 100644
--- a/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
+++ b/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
@@ -1538,19 +1538,11 @@
         public void event(McastEvent event) {
             switch (event.type()) {
                 case SOURCE_ADDED:
-                    mcastHandler.processSourceAdded(event);
-                    break;
                 case SOURCE_UPDATED:
-                    mcastHandler.processSourceUpdated(event);
-                    break;
                 case SINK_ADDED:
-                    mcastHandler.processSinkAdded(event);
-                    break;
                 case SINK_REMOVED:
-                    mcastHandler.processSinkRemoved(event);
-                    break;
                 case ROUTE_REMOVED:
-                    mcastHandler.processRouteRemoved(event);
+                    mcastHandler.processMcastEvent(event);
                     break;
                 case ROUTE_ADDED:
                 default:
diff --git a/app/src/main/java/org/onosproject/segmentrouting/mcast/McastCacheKey.java b/app/src/main/java/org/onosproject/segmentrouting/mcast/McastCacheKey.java
new file mode 100644
index 0000000..b0875ca
--- /dev/null
+++ b/app/src/main/java/org/onosproject/segmentrouting/mcast/McastCacheKey.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.segmentrouting.mcast;
+
+import org.onlab.packet.IpAddress;
+import org.onosproject.net.ConnectPoint;
+
+import java.util.Objects;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Key of the multicast event cache.
+ */
+class McastCacheKey {
+    // The group ip
+    private final IpAddress mcastIp;
+    // The sink connect point
+    private final ConnectPoint sink;
+
+    /**
+     * Constructs a key for multicast event cache.
+     *
+     * @param mcastIp multicast group IP address
+     * @param sink connect point of the sink
+     */
+    public McastCacheKey(IpAddress mcastIp, ConnectPoint sink) {
+        checkNotNull(mcastIp, "mcastIp cannot be null");
+        checkNotNull(sink, "sink cannot be null");
+        checkArgument(mcastIp.isMulticast(), "mcastIp must be a multicast address");
+        this.mcastIp = mcastIp;
+        this.sink = sink;
+    }
+
+    /**
+     * Returns the multicast IP address of this key.
+     *
+     * @return multicast IP
+     */
+    public IpAddress mcastIp() {
+        return mcastIp;
+    }
+
+    /**
+     * Returns the sink of this key.
+     *
+     * @return connect point of the sink
+     */
+    public ConnectPoint sink() {
+        return sink;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof McastCacheKey)) {
+            return false;
+        }
+        McastCacheKey that =
+                (McastCacheKey) o;
+        return (Objects.equals(this.mcastIp, that.mcastIp) &&
+                Objects.equals(this.sink, that.sink));
+    }
+
+    @Override
+    public int hashCode() {
+         return Objects.hash(mcastIp, sink);
+    }
+
+    @Override
+    public String toString() {
+        return toStringHelper(getClass())
+                .add("mcastIp", mcastIp)
+                .add("sink", sink)
+                .toString();
+    }
+}
diff --git a/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java b/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
index dc8a36d..51a4524 100644
--- a/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
+++ b/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
@@ -16,6 +16,10 @@
 
 package org.onosproject.segmentrouting.mcast;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalNotification;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -40,6 +44,7 @@
 import org.onosproject.net.flow.TrafficSelector;
 import org.onosproject.net.flow.TrafficTreatment;
 import org.onosproject.net.flow.criteria.Criteria;
+import org.onosproject.net.flow.criteria.VlanIdCriterion;
 import org.onosproject.net.flow.instructions.Instructions.OutputInstruction;
 import org.onosproject.net.flowobjective.DefaultFilteringObjective;
 import org.onosproject.net.flowobjective.DefaultForwardingObjective;
@@ -84,6 +89,10 @@
 import static com.google.common.base.Preconditions.checkState;
 import static java.util.concurrent.Executors.newScheduledThreadPool;
 import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.net.flow.criteria.Criterion.Type.VLAN_VID;
+import static org.onosproject.net.mcast.McastEvent.Type.ROUTE_REMOVED;
+import static org.onosproject.net.mcast.McastEvent.Type.SOURCE_ADDED;
+import static org.onosproject.net.mcast.McastEvent.Type.SOURCE_UPDATED;
 import static org.onosproject.segmentrouting.SegmentRoutingManager.INTERNAL_VLAN;
 
 /**
@@ -99,6 +108,98 @@
     private final KryoNamespace.Builder mcastKryo;
     private final ConsistentMap<McastStoreKey, McastRole> mcastRoleStore;
 
+    // Wait time for the cache
+    private static final int WAIT_TIME_MS = 1000;
+    /**
+     * The mcastEventCache is implemented to avoid race condition by giving more time to the
+     * underlying subsystems to process previous calls.
+     */
+    private Cache<McastCacheKey, McastEvent> mcastEventCache = CacheBuilder.newBuilder()
+            .expireAfterWrite(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
+            .removalListener((RemovalNotification<McastCacheKey, McastEvent> notification) -> {
+                // Get group ip, sink and related event
+                IpAddress mcastIp = notification.getKey().mcastIp();
+                ConnectPoint sink = notification.getKey().sink();
+                McastEvent mcastEvent = notification.getValue();
+                RemovalCause cause = notification.getCause();
+                log.debug("mcastEventCache removal event. group={}, sink={}, mcastEvent={}, cause={}",
+                          mcastIp, sink, mcastEvent, cause);
+                // If it expires or it has been replaced, we deque the event
+                switch (notification.getCause()) {
+                    case REPLACED:
+                    case EXPIRED:
+                        dequeueMcastEvent(mcastEvent);
+                        break;
+                    default:
+                        break;
+                }
+            }).build();
+
+    private void enqueueMcastEvent(McastEvent mcastEvent) {
+        log.debug("Enqueue mcastEvent {}", mcastEvent);
+        final McastRouteInfo mcastRouteInfo = mcastEvent.subject();
+        // Let's create the keys of the cache
+        ImmutableSet.Builder<ConnectPoint> sinksBuilder = ImmutableSet.builder();
+        // For this event we will have a set of sinks
+        if (mcastEvent.type() == SOURCE_ADDED ||
+                mcastEvent.type() == SOURCE_UPDATED ||
+                mcastEvent.type() == ROUTE_REMOVED) {
+            // Add all the sinks
+            sinksBuilder.addAll(mcastRouteInfo.sinks());
+        } else {
+            // We have just one sink in this case
+            ConnectPoint sink = mcastRouteInfo.sink().orElse(null);
+            // It is always true, unless something of bad happened
+            // in the mcast route store
+            if (sink != null) {
+                sinksBuilder.add(sink);
+            }
+        }
+        // Push the elements in the cache
+        sinksBuilder.build().forEach(sink -> {
+            McastCacheKey cacheKey = new McastCacheKey(mcastRouteInfo.route().group(),
+                                                       sink);
+            mcastEventCache.put(cacheKey, mcastEvent);
+        });
+    }
+
+    private void dequeueMcastEvent(McastEvent mcastEvent) {
+        log.debug("Dequeue mcastEvent {}", mcastEvent);
+        final McastRouteInfo mcastRouteInfo = mcastEvent.subject();
+        // Get source, mcast group
+        ConnectPoint source = mcastRouteInfo.source().orElse(null);
+        IpAddress mcastIp = mcastRouteInfo.route().group();
+        // According to the event type let's call the proper method
+        switch (mcastEvent.type()) {
+            case SOURCE_ADDED:
+                // Get all the sinks and process
+                Set<ConnectPoint> sinks = mcastRouteInfo.sinks();
+                sinks.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp));
+                break;
+            case SOURCE_UPDATED:
+                // Get old source
+                ConnectPoint oldSource = mcastEvent.prevSubject().source().orElse(null);
+                // Just the first cached element will be processed
+                processSourceUpdatedInternal(mcastIp, source, oldSource);
+                break;
+            case ROUTE_REMOVED:
+                // Process the route removed, just the first cached element will be processed
+                processRouteRemovedInternal(source, mcastIp);
+                break;
+            case SINK_ADDED:
+                // Get the only sink and process
+                ConnectPoint sink = mcastRouteInfo.sink().orElse(null);
+                processSinkAddedInternal(source, sink, mcastIp);
+                break;
+            case SINK_REMOVED:
+                sink = mcastRouteInfo.sink().orElse(null);
+                processSinkRemovedInternal(source, sink, mcastIp);
+                break;
+            default:
+                break;
+        }
+    }
+
     // Mcast lock to serialize local operations
     private final Lock mcastLock = new ReentrantLock();
 
@@ -140,7 +241,7 @@
 
     // Executor for mcast bucket corrector
     private ScheduledExecutorService executorService
-            = newScheduledThreadPool(1, groupedThreads("mcastBktCorrector", "mcastbktC-%d", log));
+            = newScheduledThreadPool(1, groupedThreads("mcastWorker", "mcastWorker-%d", log));
 
     /**
      * Constructs the McastEventHandler.
@@ -170,6 +271,9 @@
         executorService.scheduleWithFixedDelay(new McastBucketCorrector(), 10,
                                                MCAST_VERIFY_INTERVAL,
                                                TimeUnit.SECONDS);
+        // Schedule the clean up, this will allow the processing of the expired events
+        executorService.scheduleAtFixedRate(mcastEventCache::cleanUp, 0,
+                                            WAIT_TIME_MS, TimeUnit.MILLISECONDS);
     }
 
     /**
@@ -193,97 +297,21 @@
     }
 
     /**
-     * Processes the SOURCE_ADDED event.
+     * Processes the SOURCE_ADDED, SOURCE_UPDATED, SINK_ADDED,
+     * SINK_REMOVED and ROUTE_REMOVED events.
      *
      * @param event McastEvent with SOURCE_ADDED type
      */
-    public void processSourceAdded(McastEvent event) {
-        log.info("processSourceAdded {}", event);
+    public void processMcastEvent(McastEvent event) {
+        log.info("process {}", event);
+        // Verify if it is a complete event
         McastRouteInfo mcastRouteInfo = event.subject();
         if (!mcastRouteInfo.isComplete()) {
-            log.info("Incompleted McastRouteInfo. Abort.");
+            log.info("Incompleted McastRouteInfo. Abort {}", event.type());
             return;
         }
-        ConnectPoint source = mcastRouteInfo.source().orElse(null);
-        Set<ConnectPoint> sinks = mcastRouteInfo.sinks();
-        IpAddress mcastIp = mcastRouteInfo.route().group();
-
-        sinks.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp));
-    }
-
-    /**
-     * Processes the SOURCE_UPDATED event.
-     *
-     * @param event McastEvent with SOURCE_UPDATED type
-     */
-    public void processSourceUpdated(McastEvent event) {
-        log.info("processSourceUpdated {}", event);
-        // Get old and new data
-        McastRouteInfo mcastRouteInfo = event.subject();
-        ConnectPoint newSource = mcastRouteInfo.source().orElse(null);
-        mcastRouteInfo = event.prevSubject();
-        ConnectPoint oldSource = mcastRouteInfo.source().orElse(null);
-        // and group ip
-        IpAddress mcastIp = mcastRouteInfo.route().group();
-        // Process the update event
-        processSourceUpdatedInternal(mcastIp, newSource, oldSource);
-    }
-
-    /**
-     * Processes the SINK_ADDED event.
-     *
-     * @param event McastEvent with SINK_ADDED type
-     */
-    public void processSinkAdded(McastEvent event) {
-        log.info("processSinkAdded {}", event);
-        McastRouteInfo mcastRouteInfo = event.subject();
-        if (!mcastRouteInfo.isComplete()) {
-            log.info("Incompleted McastRouteInfo. Abort.");
-            return;
-        }
-        ConnectPoint source = mcastRouteInfo.source().orElse(null);
-        ConnectPoint sink = mcastRouteInfo.sink().orElse(null);
-        IpAddress mcastIp = mcastRouteInfo.route().group();
-
-        processSinkAddedInternal(source, sink, mcastIp);
-    }
-
-    /**
-     * Processes the SINK_REMOVED event.
-     *
-     * @param event McastEvent with SINK_REMOVED type
-     */
-    public void processSinkRemoved(McastEvent event) {
-        log.info("processSinkRemoved {}", event);
-        McastRouteInfo mcastRouteInfo = event.subject();
-        if (!mcastRouteInfo.isComplete()) {
-            log.info("Incompleted McastRouteInfo. Abort.");
-            return;
-        }
-        ConnectPoint source = mcastRouteInfo.source().orElse(null);
-        ConnectPoint sink = mcastRouteInfo.sink().orElse(null);
-        IpAddress mcastIp = mcastRouteInfo.route().group();
-
-        processSinkRemovedInternal(source, sink, mcastIp);
-    }
-
-    /**
-     * Processes the ROUTE_REMOVED event.
-     *
-     * @param event McastEvent with ROUTE_REMOVED type
-     */
-    public void processRouteRemoved(McastEvent event) {
-        log.info("processRouteRemoved {}", event);
-        McastRouteInfo mcastRouteInfo = event.subject();
-        if (!mcastRouteInfo.source().isPresent()) {
-            log.info("Incompleted McastRouteInfo. Abort.");
-            return;
-        }
-        // Get group ip and ingress connect point
-        IpAddress mcastIp = mcastRouteInfo.route().group();
-        ConnectPoint source = mcastRouteInfo.source().orElse(null);
-
-        processRouteRemovedInternal(source, mcastIp);
+        // Just enqueue for now
+        enqueueMcastEvent(event);
     }
 
     /**
@@ -318,11 +346,14 @@
             NextObjective nextObjective = mcastNextObjStore.get(mcastStoreKey).value();
             Set<PortNumber> outputPorts = getPorts(nextObjective.next());
 
-            // Let's remove old flows and groups
-            removeGroupFromDevice(oldSource.deviceId(), mcastIp, assignedVlan(oldSource));
-            // Push new flows and group
-            outputPorts.forEach(portNumber -> addPortToDevice(newSource.deviceId(), portNumber,
-                                                              mcastIp, assignedVlan(newSource)));
+            // This an optimization to avoid unnecessary removal and add
+            if (!assignedVlanFromNext(nextObjective).equals(assignedVlan(newSource))) {
+                // Let's remove old flows and groups
+                removeGroupFromDevice(oldSource.deviceId(), mcastIp, assignedVlan(oldSource));
+                // Push new flows and group
+                outputPorts.forEach(portNumber -> addPortToDevice(newSource.deviceId(), portNumber,
+                                                                  mcastIp, assignedVlan(newSource)));
+            }
             addFilterToDevice(newSource.deviceId(), newSource.port(),
                               assignedVlan(newSource), mcastIp);
             // Setup mcast roles
@@ -1212,6 +1243,16 @@
     }
 
     /**
+     * Gets assigned VLAN according to the value in the meta.
+     *
+     * @param nextObjective nextObjective to analyze
+     * @return assigned VLAN ID
+     */
+    private VlanId assignedVlanFromNext(NextObjective nextObjective) {
+        return ((VlanIdCriterion) nextObjective.meta().getCriterion(VLAN_VID)).vlanId();
+    }
+
+    /**
      * Gets the spine-facing port on ingress device of given multicast group.
      *
      * @param mcastIp multicast IP