Provides a separate executor to handle map events.

Includes also several code optimizations

Change-Id: I7c4916b0f746379213f239766e912ad696e2f56f
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/impl/XconnectManager.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/impl/XconnectManager.java
index be1dafc..b4440f1 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/impl/XconnectManager.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/impl/XconnectManager.java
@@ -15,13 +15,18 @@
  */
 package org.onosproject.segmentrouting.xconnect.impl;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.onlab.packet.Ethernet;
 import org.onlab.packet.MacAddress;
 import org.onlab.packet.VlanId;
 import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.codec.CodecService;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
@@ -77,10 +82,8 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -113,6 +116,12 @@
     public FlowObjectiveService flowObjectiveService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    private LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    private ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
     public MastershipService mastershipService;
 
     @Reference(cardinality = ReferenceCardinality.OPTIONAL)
@@ -125,7 +134,7 @@
     HostService hostService;
 
     private static final String APP_NAME = "org.onosproject.xconnect";
-    private static final String ERROR_NOT_MASTER = "Not master controller";
+    private static final String ERROR_NOT_LEADER = "Not leader controller";
 
     private static Logger log = LoggerFactory.getLogger(XconnectManager.class);
 
@@ -137,14 +146,14 @@
     private ConsistentMap<VlanNextObjectiveStoreKey, List<PortNumber>> xconnectMulticastPortsStore;
 
     private final MapEventListener<XconnectKey, Set<PortNumber>> xconnectListener = new XconnectMapListener();
-    private final DeviceListener deviceListener = new InternalDeviceListener();
+    private ExecutorService xConnectExecutor;
 
+    private final DeviceListener deviceListener = new InternalDeviceListener();
     private ExecutorService deviceEventExecutor;
 
     private final HostListener hostListener = new InternalHostListener();
     private ExecutorService hostEventExecutor;
 
-
     @Activate
     void activate() {
         appId = coreService.registerApplication(APP_NAME);
@@ -161,7 +170,9 @@
                 .withRelaxedReadConsistency()
                 .withSerializer(Serializer.using(serializer.build()))
                 .build();
-        xconnectStore.addListener(xconnectListener);
+        xConnectExecutor = Executors.newSingleThreadScheduledExecutor(
+                groupedThreads("sr-xconnect-event", "%d", log));
+        xconnectStore.addListener(xconnectListener, xConnectExecutor);
 
         xconnectNextObjStore = storageService.<XconnectKey, Integer>consistentMapBuilder()
                 .withName("onos-sr-xconnect-next")
@@ -180,12 +191,10 @@
 
         deviceEventExecutor = Executors.newSingleThreadScheduledExecutor(
                 groupedThreads("sr-xconnect-device-event", "%d", log));
-
         deviceService.addListener(deviceListener);
 
         hostEventExecutor = Executors.newSingleThreadExecutor(
                 groupedThreads("sr-xconnect-host-event", "%d", log));
-
         hostService.addListener(hostListener);
 
         log.info("Started");
@@ -200,6 +209,7 @@
 
         deviceEventExecutor.shutdown();
         hostEventExecutor.shutdown();
+        xConnectExecutor.shutdown();
 
         log.info("Stopped");
     }
@@ -235,9 +245,8 @@
 
     @Override
     public boolean hasXconnect(ConnectPoint cp) {
-        return getXconnects().stream().anyMatch(desc ->
-                                                        desc.key().deviceId().equals(cp.deviceId())
-                                                                && desc.ports().contains(cp.port())
+        return getXconnects().stream().anyMatch(desc -> desc.key().deviceId().equals(cp.deviceId())
+                && desc.ports().contains(cp.port())
         );
     }
 
@@ -245,15 +254,14 @@
     public List<VlanId> getXconnectVlans(DeviceId deviceId, PortNumber port) {
         return getXconnects().stream()
                 .filter(desc -> desc.key().deviceId().equals(deviceId) && desc.ports().contains(port))
-                .map(XconnectDesc::key)
-                .map(XconnectKey::vlanId)
+                .map(desc -> desc.key().vlanId())
                 .collect(Collectors.toList());
     }
 
     @Override
     public boolean isXconnectVlan(DeviceId deviceId, VlanId vlanId) {
-        return getXconnects().stream()
-                .anyMatch(desc -> desc.key().deviceId().equals(deviceId) && desc.key().vlanId().equals(vlanId));
+        XconnectKey key = new XconnectKey(deviceId, vlanId);
+        return Versioned.valueOrNull(xconnectStore.get(key)) != null;
     }
 
     @Override
@@ -266,12 +274,8 @@
     }
 
     @Override
-    public int getNextId(final DeviceId deviceId, final VlanId vlanId) {
-        Optional<Integer> nextObjective = getNext().entrySet().stream()
-                .filter(d -> d.getKey().deviceId().equals(deviceId) && d.getKey().vlanId().equals(vlanId))
-                .findFirst()
-                .map(Map.Entry::getValue);
-        return nextObjective.isPresent() ? nextObjective.get() : -1;
+    public int getNextId(DeviceId deviceId, VlanId vlanId) {
+        return Versioned.valueOrElse(xconnectNextObjStore.get(new XconnectKey(deviceId, vlanId)), -1);
     }
 
     @Override
@@ -307,29 +311,32 @@
     }
 
     private class InternalDeviceListener implements DeviceListener {
+        // Offload the execution to an executor and then process the event
+        // if this instance is the leader of the device
         @Override
         public void event(DeviceEvent event) {
             deviceEventExecutor.execute(() -> {
                 DeviceId deviceId = event.subject().id();
-                if (!mastershipService.isLocalMaster(deviceId)) {
+                // Just skip if we are not the leader
+                if (!isLocalLeader(deviceId)) {
+                    log.debug("Not the leader of {}. Skip event {}", deviceId, event);
                     return;
                 }
-
-                switch (event.type()) {
-                    case DEVICE_ADDED:
-                    case DEVICE_AVAILABILITY_CHANGED:
-                    case DEVICE_UPDATED:
-                        if (deviceService.isAvailable(deviceId)) {
-                            init(deviceId);
-                        } else {
-                            cleanup(deviceId);
-                        }
-                        break;
-                    default:
-                        break;
+                // Populate or revoke according to the device availability
+                if (deviceService.isAvailable(deviceId)) {
+                    init(deviceId);
+                } else {
+                    cleanup(deviceId);
                 }
             });
         }
+        // We want to manage only a subset of events and if we are the leader
+        @Override
+        public boolean isRelevant(DeviceEvent event) {
+            return event.type() == DeviceEvent.Type.DEVICE_ADDED ||
+                    event.type() == DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED ||
+                    event.type() == DeviceEvent.Type.DEVICE_UPDATED;
+        }
     }
 
     private class InternalHostListener implements HostListener {
@@ -369,7 +376,8 @@
                                                                                      xconnectVlan,
                                                                                      Collections.singletonList(
                                                                                              location.port())));
-                               // Ensure pair-port attached to xconnect vlan flooding group at dual home failed device.
+                                    // Ensure pair-port attached to xconnect vlan flooding group
+                                    // at dual home failed device.
                                     updateL2Flooding(prevLocation.deviceId(), pairLocalPort.get(), xconnectVlan, true);
                                 });
                             }
@@ -389,17 +397,16 @@
                                     // Remove recovered dual homed port from vlan L2 multicast group
                                     prevLocations.stream()
                                             .filter(prevLocation -> prevLocation.deviceId().equals(pairDeviceId.get()))
-                                            .forEach(prevLocation -> revokeL2Multicast(prevLocation.deviceId(),
-                                                                                 srService.getPairLocalPort(
-                                                                                       prevLocation.deviceId()).get(),
-                                                                                       xconnectVlan,
-                                                                      Collections.singletonList(newLocation.port()))
+                                            .forEach(prevLocation -> revokeL2Multicast(
+                                                    prevLocation.deviceId(),
+                                                    xconnectVlan,
+                                                    Collections.singletonList(newLocation.port()))
                                             );
 
-                                 // Remove pair-port from vlan's flooding group at dual home restored device,if needed.
-                                    if (!hasAccessPortInMulticastGroup(newLocation.deviceId(),
-                                                                       xconnectVlan,
-                                                                       pairLocalPort.get())) {
+                                    // Remove pair-port from vlan's flooding group at dual home
+                                    // restored device, if needed.
+                                    if (!hasAccessPortInMulticastGroup(new VlanNextObjectiveStoreKey(
+                                            newLocation.deviceId(), xconnectVlan), pairLocalPort.get())) {
                                         updateL2Flooding(newLocation.deviceId(),
                                                          pairLocalPort.get(),
                                                          xconnectVlan,
@@ -444,8 +451,8 @@
      * @param ports a set of ports to be cross-connected
      */
     private void populateXConnect(XconnectKey key, Set<PortNumber> ports) {
-        if (!mastershipService.isLocalMaster(key.deviceId())) {
-            log.info("Abort populating XConnect {}: {}", key, ERROR_NOT_MASTER);
+        if (!isLocalLeader(key.deviceId())) {
+            log.debug("Abort populating XConnect {}: {}", key, ERROR_NOT_LEADER);
             return;
         }
 
@@ -480,8 +487,8 @@
      * @param ports XConnect ports
      */
     private int populateNext(XconnectKey key, Set<PortNumber> ports) {
-        if (xconnectNextObjStore.containsKey(key)) {
-            int nextId = xconnectNextObjStore.get(key).value();
+        int nextId = Versioned.valueOrElse(xconnectNextObjStore.get(key), -1);
+        if (nextId != -1) {
             log.debug("NextObj for {} found, id={}", key, nextId);
             return nextId;
         } else {
@@ -538,14 +545,14 @@
      * @param ports XConnect ports
      */
     private void revokeXConnect(XconnectKey key, Set<PortNumber> ports) {
-        if (!mastershipService.isLocalMaster(key.deviceId())) {
-            log.info("Abort populating XConnect {}: {}", key, ERROR_NOT_MASTER);
+        if (!isLocalLeader(key.deviceId())) {
+            log.debug("Abort revoking XConnect {}: {}", key, ERROR_NOT_LEADER);
             return;
         }
 
         revokeFilter(key, ports);
-        if (xconnectNextObjStore.containsKey(key)) {
-            int nextId = xconnectNextObjStore.get(key).value();
+        int nextId = Versioned.valueOrElse(xconnectNextObjStore.get(key), -1);
+        if (nextId != -1) {
             revokeFwd(key, nextId, null);
             revokeNext(key, ports, nextId, null);
         } else {
@@ -657,6 +664,10 @@
      */
     private void updateXConnect(XconnectKey key, Set<PortNumber> prevPorts,
                                 Set<PortNumber> ports) {
+        if (!isLocalLeader(key.deviceId())) {
+            log.debug("Abort updating XConnect {}: {}", key, ERROR_NOT_LEADER);
+            return;
+        }
         // NOTE: ACL flow doesn't include port information. No need to update it.
         //       Pair port is built-in and thus not going to change. No need to update it.
 
@@ -672,8 +683,8 @@
         CompletableFuture<ObjectiveError> fwdFuture = new CompletableFuture<>();
         CompletableFuture<ObjectiveError> nextFuture = new CompletableFuture<>();
 
-        if (xconnectNextObjStore.containsKey(key)) {
-            int nextId = xconnectNextObjStore.get(key).value();
+        int nextId = Versioned.valueOrElse(xconnectNextObjStore.get(key), -1);
+        if (nextId != -1) {
             revokeFwd(key, nextId, fwdFuture);
 
             fwdFuture.thenAcceptAsync(fwdStatus -> {
@@ -802,15 +813,16 @@
      * @param vlanId   VLAN ID
      * @param install  Whether to add or revoke pair link addition to flooding group
      */
-    private void updateL2Flooding(DeviceId deviceId, final PortNumber port, VlanId vlanId, boolean install) {
-
-        // Ensure mastership on device
-        if (!mastershipService.isLocalMaster(deviceId)) {
+    private void updateL2Flooding(DeviceId deviceId, PortNumber port, VlanId vlanId, boolean install) {
+        XconnectKey key = new XconnectKey(deviceId, vlanId);
+        // Ensure leadership on device
+        if (!isLocalLeader(deviceId)) {
+            log.debug("Abort updating L2Flood {}: {}", key, ERROR_NOT_LEADER);
             return;
         }
 
         // Locate L2 flooding group details for given xconnect vlan
-        int nextId = getNextId(deviceId, vlanId);
+        int nextId = Versioned.valueOrElse(xconnectNextObjStore.get(key), -1);
         if (nextId == -1) {
             log.debug("XConnect vlan {} broadcast group for device {} doesn't exists. " +
                               "Aborting pair group linking.", vlanId, deviceId);
@@ -853,16 +865,17 @@
      * @param vlanId      VLAN ID
      * @param accessPorts List of access ports to be added into L2 multicast group
      */
-    private void populateL2Multicast(DeviceId deviceId, final PortNumber pairPort,
-                             VlanId vlanId, List<PortNumber> accessPorts) {
+    private void populateL2Multicast(DeviceId deviceId, PortNumber pairPort,
+                                     VlanId vlanId, List<PortNumber> accessPorts) {
+        // Ensure enough rights to program pair device
+        if (!srService.shouldProgram(deviceId)) {
+            log.debug("Abort populate L2Multicast {}-{}: {}", deviceId, vlanId, ERROR_NOT_LEADER);
+            return;
+        }
 
         boolean multicastGroupExists = true;
         int vlanMulticastNextId;
-
-        // Ensure enough rights to program pair device
-        if (!srService.shouldProgram(deviceId)) {
-            return;
-        }
+        VlanNextObjectiveStoreKey key = new VlanNextObjectiveStoreKey(deviceId, vlanId);
 
         // Step 1 : Populate single homed access ports into vlan's L2 multicast group
         NextObjective.Builder vlanMulticastNextObjBuilder = DefaultNextObjective
@@ -871,18 +884,18 @@
                 .fromApp(srService.appId())
             .withMeta(DefaultTrafficSelector.builder().matchVlanId(vlanId)
                           .matchEthDst(MacAddress.IPV4_MULTICAST).build());
-        vlanMulticastNextId = getMulticastGroupNextObjectiveId(deviceId, vlanId);
+        vlanMulticastNextId = getMulticastGroupNextObjectiveId(key);
         if (vlanMulticastNextId == -1) {
             // Vlan's L2 multicast group doesn't exist; create it, update store and add pair port as sub-group
             multicastGroupExists = false;
             vlanMulticastNextId = flowObjectiveService.allocateNextId();
-            addMulticastGroupNextObjectiveId(deviceId, vlanId, vlanMulticastNextId);
+            addMulticastGroupNextObjectiveId(key, vlanMulticastNextId);
             vlanMulticastNextObjBuilder.addTreatment(
                     DefaultTrafficTreatment.builder().popVlan().setOutput(pairPort).build()
             );
         }
         vlanMulticastNextObjBuilder.withId(vlanMulticastNextId);
-        final int nextId = vlanMulticastNextId;
+        int nextId = vlanMulticastNextId;
         accessPorts.forEach(p -> {
             TrafficTreatment.Builder egressAction = DefaultTrafficTreatment.builder();
             // Do vlan popup action based on interface configuration
@@ -892,7 +905,7 @@
             }
             egressAction.setOutput(p);
             vlanMulticastNextObjBuilder.addTreatment(egressAction.build());
-            addMulticastGroupPort(deviceId, vlanId, p);
+            addMulticastGroupPort(key, p);
         });
         ObjectiveContext context = new DefaultObjectiveContext(
                 (objective) ->
@@ -938,19 +951,19 @@
      * Removes access ports from VLAN L2 multicast group on given deviceId.
      *
      * @param deviceId    Device ID
-     * @param pairPort    Pair port number
      * @param vlanId      VLAN ID
      * @param accessPorts List of access ports to be added into L2 multicast group
      */
-    private void revokeL2Multicast(DeviceId deviceId, final PortNumber pairPort,
-                           VlanId vlanId, List<PortNumber> accessPorts) {
-
+    private void revokeL2Multicast(DeviceId deviceId, VlanId vlanId, List<PortNumber> accessPorts) {
         // Ensure enough rights to program pair device
         if (!srService.shouldProgram(deviceId)) {
+            log.debug("Abort revoke L2Multicast {}-{}: {}", deviceId, vlanId, ERROR_NOT_LEADER);
             return;
         }
 
-        int vlanMulticastNextId = getMulticastGroupNextObjectiveId(deviceId, vlanId);
+        VlanNextObjectiveStoreKey key = new VlanNextObjectiveStoreKey(deviceId, vlanId);
+
+        int vlanMulticastNextId = getMulticastGroupNextObjectiveId(key);
         if (vlanMulticastNextId == -1) {
             return;
         }
@@ -969,7 +982,7 @@
             }
             egressAction.setOutput(p);
             vlanMulticastNextObjBuilder.addTreatment(egressAction.build());
-            removeMulticastGroupPort(deviceId, vlanId, p);
+            removeMulticastGroupPort(key, p);
         });
         ObjectiveContext context = new DefaultObjectiveContext(
                 (objective) ->
@@ -998,16 +1011,19 @@
 
         // Ensure enough rights to program pair device
         if (!srService.shouldProgram(deviceId)) {
+            log.debug("Abort cleanup L2Multicast {}-{}: {}", deviceId, vlanId, ERROR_NOT_LEADER);
             return;
         }
 
+        VlanNextObjectiveStoreKey key = new VlanNextObjectiveStoreKey(deviceId, vlanId);
+
         // Ensure L2 multicast group doesn't contain access ports
-        if (hasAccessPortInMulticastGroup(deviceId, vlanId, pairPort) && !force) {
+        if (hasAccessPortInMulticastGroup(key, pairPort) && !force) {
             return;
         }
 
         // Load L2 multicast group details
-        int vlanMulticastNextId = getMulticastGroupNextObjectiveId(deviceId, vlanId);
+        int vlanMulticastNextId = getMulticastGroupNextObjectiveId(key);
         if (vlanMulticastNextId == -1) {
             return;
         }
@@ -1052,63 +1068,63 @@
         flowObjectiveService.next(deviceId, l2MulticastGroupBuilder.remove(context));
 
         // Finally clear store.
-        removeMulticastGroup(deviceId, vlanId);
+        removeMulticastGroup(key);
     }
 
-    private boolean isMulticastGroupExists(DeviceId deviceId, VlanId vlanId) {
-        return xconnectMulticastNextStore.asJavaMap().entrySet().stream()
-                .anyMatch(e -> e.getKey().deviceId().equals(deviceId) &&
-                        e.getKey().vlanId().equals(vlanId));
+    private int getMulticastGroupNextObjectiveId(VlanNextObjectiveStoreKey key) {
+        return Versioned.valueOrElse(xconnectMulticastNextStore.get(key), -1);
     }
 
-    private int getMulticastGroupNextObjectiveId(DeviceId deviceId, VlanId vlanId) {
-        Optional<Integer> nextId
-                = xconnectMulticastNextStore.asJavaMap().entrySet().stream()
-                .filter(e -> e.getKey().deviceId().equals(deviceId) &&
-                        e.getKey().vlanId().equals(vlanId))
-                .findFirst()
-                .map(Map.Entry::getValue);
-        return nextId.orElse(-1);
-    }
-
-    private void addMulticastGroupNextObjectiveId(DeviceId deviceId, VlanId vlanId, int nextId) {
+    private void addMulticastGroupNextObjectiveId(VlanNextObjectiveStoreKey key, int nextId) {
         if (nextId == -1) {
             return;
         }
-        VlanNextObjectiveStoreKey key = new VlanNextObjectiveStoreKey(deviceId, vlanId);
         xconnectMulticastNextStore.put(key, nextId);
-
-        // Update port store with empty entry.
-        xconnectMulticastPortsStore.put(key, new ArrayList<PortNumber>());
     }
 
-    private void addMulticastGroupPort(DeviceId deviceId, VlanId vlanId, PortNumber port) {
-        VlanNextObjectiveStoreKey key = new VlanNextObjectiveStoreKey(deviceId, vlanId);
-        List<PortNumber> ports = xconnectMulticastPortsStore.get(key).value();
-        ports.add(port);
-        xconnectMulticastPortsStore.put(key, ports);
+    private void addMulticastGroupPort(VlanNextObjectiveStoreKey groupKey, PortNumber port) {
+        xconnectMulticastPortsStore.compute(groupKey, (key, ports) -> {
+            if (ports == null) {
+                ports = Lists.newArrayList();
+            }
+            ports.add(port);
+            return ports;
+        });
     }
 
-    private void removeMulticastGroupPort(DeviceId deviceId, VlanId vlanId, PortNumber port) {
-        VlanNextObjectiveStoreKey key = new VlanNextObjectiveStoreKey(deviceId, vlanId);
-        List<PortNumber> ports = xconnectMulticastPortsStore.get(key).value();
-        ports.remove(port);
-        xconnectMulticastPortsStore.put(key, ports);
+    private void removeMulticastGroupPort(VlanNextObjectiveStoreKey groupKey, PortNumber port) {
+        xconnectMulticastPortsStore.compute(groupKey, (key, ports) -> {
+            if (ports != null && !ports.isEmpty()) {
+                ports.remove(port);
+            }
+            return ports;
+        });
     }
 
-    private void removeMulticastGroup(DeviceId deviceId, VlanId vlanId) {
-        VlanNextObjectiveStoreKey key = new VlanNextObjectiveStoreKey(deviceId, vlanId);
-        xconnectMulticastPortsStore.remove(key);
-        xconnectMulticastNextStore.remove(key);
+    private void removeMulticastGroup(VlanNextObjectiveStoreKey groupKey) {
+        xconnectMulticastPortsStore.remove(groupKey);
+        xconnectMulticastNextStore.remove(groupKey);
     }
 
-    private boolean hasAccessPortInMulticastGroup(DeviceId deviceId, VlanId vlanId, PortNumber pairPort) {
-        VlanNextObjectiveStoreKey key = new VlanNextObjectiveStoreKey(deviceId, vlanId);
-        if (!xconnectMulticastPortsStore.containsKey(key)) {
-            return false;
-        }
-        List<PortNumber> ports = xconnectMulticastPortsStore.get(key).value();
+    private boolean hasAccessPortInMulticastGroup(VlanNextObjectiveStoreKey groupKey, PortNumber pairPort) {
+        List<PortNumber> ports = Versioned.valueOrElse(xconnectMulticastPortsStore.get(groupKey), ImmutableList.of());
         return ports.stream().anyMatch(p -> !p.equals(pairPort));
     }
 
+    // Custom-built function, when the device is not available we need a fallback mechanism
+    private boolean isLocalLeader(DeviceId deviceId) {
+        if (!mastershipService.isLocalMaster(deviceId)) {
+            // When the device is available we just check the mastership
+            if (deviceService.isAvailable(deviceId)) {
+                return false;
+            }
+            // Fallback with Leadership service - device id is used as topic
+            NodeId leader = leadershipService.runForLeadership(
+                    deviceId.toString()).leaderNodeId();
+            // Verify if this node is the leader
+            return clusterService.getLocalNode().id().equals(leader);
+        }
+        return true;
+    }
+
 }