Provides a separate executor to handle map events.
Includes also several code optimizations
Change-Id: I7c4916b0f746379213f239766e912ad696e2f56f
(cherry picked from commit 8f35163338b78556fdc8680efa7125b53e75212e)
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/impl/XconnectManager.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/impl/XconnectManager.java
index b65faa9..9947bb8 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/impl/XconnectManager.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/impl/XconnectManager.java
@@ -15,8 +15,10 @@
*/
package org.onosproject.segmentrouting.xconnect.impl;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -28,6 +30,9 @@
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
import org.onosproject.codec.CodecService;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
@@ -78,10 +83,8 @@
import org.slf4j.LoggerFactory;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -115,6 +118,12 @@
public FlowObjectiveService flowObjectiveService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private LeadershipService leadershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
public MastershipService mastershipService;
@Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY)
@@ -127,7 +136,7 @@
HostService hostService;
private static final String APP_NAME = "org.onosproject.xconnect";
- private static final String ERROR_NOT_MASTER = "Not master controller";
+ private static final String ERROR_NOT_LEADER = "Not leader controller";
private static Logger log = LoggerFactory.getLogger(XconnectManager.class);
@@ -139,14 +148,14 @@
private ConsistentMap<VlanNextObjectiveStoreKey, List<PortNumber>> xconnectMulticastPortsStore;
private final MapEventListener<XconnectKey, Set<PortNumber>> xconnectListener = new XconnectMapListener();
- private final DeviceListener deviceListener = new InternalDeviceListener();
+ private ExecutorService xConnectExecutor;
+ private final DeviceListener deviceListener = new InternalDeviceListener();
private ExecutorService deviceEventExecutor;
private final HostListener hostListener = new InternalHostListener();
private ExecutorService hostEventExecutor;
-
@Activate
void activate() {
appId = coreService.registerApplication(APP_NAME);
@@ -163,7 +172,9 @@
.withRelaxedReadConsistency()
.withSerializer(Serializer.using(serializer.build()))
.build();
- xconnectStore.addListener(xconnectListener);
+ xConnectExecutor = Executors.newSingleThreadScheduledExecutor(
+ groupedThreads("sr-xconnect-event", "%d", log));
+ xconnectStore.addListener(xconnectListener, xConnectExecutor);
xconnectNextObjStore = storageService.<XconnectKey, Integer>consistentMapBuilder()
.withName("onos-sr-xconnect-next")
@@ -182,12 +193,10 @@
deviceEventExecutor = Executors.newSingleThreadScheduledExecutor(
groupedThreads("sr-xconnect-device-event", "%d", log));
-
deviceService.addListener(deviceListener);
hostEventExecutor = Executors.newSingleThreadExecutor(
groupedThreads("sr-xconnect-host-event", "%d", log));
-
hostService.addListener(hostListener);
log.info("Started");
@@ -202,6 +211,7 @@
deviceEventExecutor.shutdown();
hostEventExecutor.shutdown();
+ xConnectExecutor.shutdown();
log.info("Stopped");
}
@@ -237,9 +247,8 @@
@Override
public boolean hasXconnect(ConnectPoint cp) {
- return getXconnects().stream().anyMatch(desc ->
- desc.key().deviceId().equals(cp.deviceId())
- && desc.ports().contains(cp.port())
+ return getXconnects().stream().anyMatch(desc -> desc.key().deviceId().equals(cp.deviceId())
+ && desc.ports().contains(cp.port())
);
}
@@ -247,15 +256,14 @@
public List<VlanId> getXconnectVlans(DeviceId deviceId, PortNumber port) {
return getXconnects().stream()
.filter(desc -> desc.key().deviceId().equals(deviceId) && desc.ports().contains(port))
- .map(XconnectDesc::key)
- .map(XconnectKey::vlanId)
+ .map(desc -> desc.key().vlanId())
.collect(Collectors.toList());
}
@Override
public boolean isXconnectVlan(DeviceId deviceId, VlanId vlanId) {
- return getXconnects().stream()
- .anyMatch(desc -> desc.key().deviceId().equals(deviceId) && desc.key().vlanId().equals(vlanId));
+ XconnectKey key = new XconnectKey(deviceId, vlanId);
+ return Versioned.valueOrNull(xconnectStore.get(key)) != null;
}
@Override
@@ -268,12 +276,8 @@
}
@Override
- public int getNextId(final DeviceId deviceId, final VlanId vlanId) {
- Optional<Integer> nextObjective = getNext().entrySet().stream()
- .filter(d -> d.getKey().deviceId().equals(deviceId) && d.getKey().vlanId().equals(vlanId))
- .findFirst()
- .map(Map.Entry::getValue);
- return nextObjective.isPresent() ? nextObjective.get() : -1;
+ public int getNextId(DeviceId deviceId, VlanId vlanId) {
+ return Versioned.valueOrElse(xconnectNextObjStore.get(new XconnectKey(deviceId, vlanId)), -1);
}
@Override
@@ -309,29 +313,32 @@
}
private class InternalDeviceListener implements DeviceListener {
+ // Offload the execution to an executor and then process the event
+ // if this instance is the leader of the device
@Override
public void event(DeviceEvent event) {
deviceEventExecutor.execute(() -> {
DeviceId deviceId = event.subject().id();
- if (!mastershipService.isLocalMaster(deviceId)) {
+ // Just skip if we are not the leader
+ if (!isLocalLeader(deviceId)) {
+ log.debug("Not the leader of {}. Skip event {}", deviceId, event);
return;
}
-
- switch (event.type()) {
- case DEVICE_ADDED:
- case DEVICE_AVAILABILITY_CHANGED:
- case DEVICE_UPDATED:
- if (deviceService.isAvailable(deviceId)) {
- init(deviceId);
- } else {
- cleanup(deviceId);
- }
- break;
- default:
- break;
+ // Populate or revoke according to the device availability
+ if (deviceService.isAvailable(deviceId)) {
+ init(deviceId);
+ } else {
+ cleanup(deviceId);
}
});
}
+ // We want to manage only a subset of events and if we are the leader
+ @Override
+ public boolean isRelevant(DeviceEvent event) {
+ return event.type() == DeviceEvent.Type.DEVICE_ADDED ||
+ event.type() == DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED ||
+ event.type() == DeviceEvent.Type.DEVICE_UPDATED;
+ }
}
private class InternalHostListener implements HostListener {
@@ -371,7 +378,8 @@
xconnectVlan,
Collections.singletonList(
location.port())));
- // Ensure pair-port attached to xconnect vlan flooding group at dual home failed device.
+ // Ensure pair-port attached to xconnect vlan flooding group
+ // at dual home failed device.
updateL2Flooding(prevLocation.deviceId(), pairLocalPort.get(), xconnectVlan, true);
});
}
@@ -391,17 +399,16 @@
// Remove recovered dual homed port from vlan L2 multicast group
prevLocations.stream()
.filter(prevLocation -> prevLocation.deviceId().equals(pairDeviceId.get()))
- .forEach(prevLocation -> revokeL2Multicast(prevLocation.deviceId(),
- srService.getPairLocalPort(
- prevLocation.deviceId()).get(),
- xconnectVlan,
- Collections.singletonList(newLocation.port()))
+ .forEach(prevLocation -> revokeL2Multicast(
+ prevLocation.deviceId(),
+ xconnectVlan,
+ Collections.singletonList(newLocation.port()))
);
- // Remove pair-port from vlan's flooding group at dual home restored device,if needed.
- if (!hasAccessPortInMulticastGroup(newLocation.deviceId(),
- xconnectVlan,
- pairLocalPort.get())) {
+ // Remove pair-port from vlan's flooding group at dual home
+ // restored device, if needed.
+ if (!hasAccessPortInMulticastGroup(new VlanNextObjectiveStoreKey(
+ newLocation.deviceId(), xconnectVlan), pairLocalPort.get())) {
updateL2Flooding(newLocation.deviceId(),
pairLocalPort.get(),
xconnectVlan,
@@ -446,8 +453,8 @@
* @param ports a set of ports to be cross-connected
*/
private void populateXConnect(XconnectKey key, Set<PortNumber> ports) {
- if (!mastershipService.isLocalMaster(key.deviceId())) {
- log.info("Abort populating XConnect {}: {}", key, ERROR_NOT_MASTER);
+ if (!isLocalLeader(key.deviceId())) {
+ log.debug("Abort populating XConnect {}: {}", key, ERROR_NOT_LEADER);
return;
}
@@ -482,8 +489,8 @@
* @param ports XConnect ports
*/
private int populateNext(XconnectKey key, Set<PortNumber> ports) {
- if (xconnectNextObjStore.containsKey(key)) {
- int nextId = xconnectNextObjStore.get(key).value();
+ int nextId = Versioned.valueOrElse(xconnectNextObjStore.get(key), -1);
+ if (nextId != -1) {
log.debug("NextObj for {} found, id={}", key, nextId);
return nextId;
} else {
@@ -540,14 +547,14 @@
* @param ports XConnect ports
*/
private void revokeXConnect(XconnectKey key, Set<PortNumber> ports) {
- if (!mastershipService.isLocalMaster(key.deviceId())) {
- log.info("Abort populating XConnect {}: {}", key, ERROR_NOT_MASTER);
+ if (!isLocalLeader(key.deviceId())) {
+ log.debug("Abort revoking XConnect {}: {}", key, ERROR_NOT_LEADER);
return;
}
revokeFilter(key, ports);
- if (xconnectNextObjStore.containsKey(key)) {
- int nextId = xconnectNextObjStore.get(key).value();
+ int nextId = Versioned.valueOrElse(xconnectNextObjStore.get(key), -1);
+ if (nextId != -1) {
revokeFwd(key, nextId, null);
revokeNext(key, ports, nextId, null);
} else {
@@ -659,6 +666,10 @@
*/
private void updateXConnect(XconnectKey key, Set<PortNumber> prevPorts,
Set<PortNumber> ports) {
+ if (!isLocalLeader(key.deviceId())) {
+ log.debug("Abort updating XConnect {}: {}", key, ERROR_NOT_LEADER);
+ return;
+ }
// NOTE: ACL flow doesn't include port information. No need to update it.
// Pair port is built-in and thus not going to change. No need to update it.
@@ -674,8 +685,8 @@
CompletableFuture<ObjectiveError> fwdFuture = new CompletableFuture<>();
CompletableFuture<ObjectiveError> nextFuture = new CompletableFuture<>();
- if (xconnectNextObjStore.containsKey(key)) {
- int nextId = xconnectNextObjStore.get(key).value();
+ int nextId = Versioned.valueOrElse(xconnectNextObjStore.get(key), -1);
+ if (nextId != -1) {
revokeFwd(key, nextId, fwdFuture);
fwdFuture.thenAcceptAsync(fwdStatus -> {
@@ -804,15 +815,16 @@
* @param vlanId VLAN ID
* @param install Whether to add or revoke pair link addition to flooding group
*/
- private void updateL2Flooding(DeviceId deviceId, final PortNumber port, VlanId vlanId, boolean install) {
-
- // Ensure mastership on device
- if (!mastershipService.isLocalMaster(deviceId)) {
+ private void updateL2Flooding(DeviceId deviceId, PortNumber port, VlanId vlanId, boolean install) {
+ XconnectKey key = new XconnectKey(deviceId, vlanId);
+ // Ensure leadership on device
+ if (!isLocalLeader(deviceId)) {
+ log.debug("Abort updating L2Flood {}: {}", key, ERROR_NOT_LEADER);
return;
}
// Locate L2 flooding group details for given xconnect vlan
- int nextId = getNextId(deviceId, vlanId);
+ int nextId = Versioned.valueOrElse(xconnectNextObjStore.get(key), -1);
if (nextId == -1) {
log.debug("XConnect vlan {} broadcast group for device {} doesn't exists. " +
"Aborting pair group linking.", vlanId, deviceId);
@@ -855,16 +867,17 @@
* @param vlanId VLAN ID
* @param accessPorts List of access ports to be added into L2 multicast group
*/
- private void populateL2Multicast(DeviceId deviceId, final PortNumber pairPort,
- VlanId vlanId, List<PortNumber> accessPorts) {
+ private void populateL2Multicast(DeviceId deviceId, PortNumber pairPort,
+ VlanId vlanId, List<PortNumber> accessPorts) {
+ // Ensure enough rights to program pair device
+ if (!srService.shouldProgram(deviceId)) {
+ log.debug("Abort populate L2Multicast {}-{}: {}", deviceId, vlanId, ERROR_NOT_LEADER);
+ return;
+ }
boolean multicastGroupExists = true;
int vlanMulticastNextId;
-
- // Ensure enough rights to program pair device
- if (!srService.shouldProgram(deviceId)) {
- return;
- }
+ VlanNextObjectiveStoreKey key = new VlanNextObjectiveStoreKey(deviceId, vlanId);
// Step 1 : Populate single homed access ports into vlan's L2 multicast group
NextObjective.Builder vlanMulticastNextObjBuilder = DefaultNextObjective
@@ -873,18 +886,18 @@
.fromApp(srService.appId())
.withMeta(DefaultTrafficSelector.builder().matchVlanId(vlanId)
.matchEthDst(MacAddress.IPV4_MULTICAST).build());
- vlanMulticastNextId = getMulticastGroupNextObjectiveId(deviceId, vlanId);
+ vlanMulticastNextId = getMulticastGroupNextObjectiveId(key);
if (vlanMulticastNextId == -1) {
// Vlan's L2 multicast group doesn't exist; create it, update store and add pair port as sub-group
multicastGroupExists = false;
vlanMulticastNextId = flowObjectiveService.allocateNextId();
- addMulticastGroupNextObjectiveId(deviceId, vlanId, vlanMulticastNextId);
+ addMulticastGroupNextObjectiveId(key, vlanMulticastNextId);
vlanMulticastNextObjBuilder.addTreatment(
DefaultTrafficTreatment.builder().popVlan().setOutput(pairPort).build()
);
}
vlanMulticastNextObjBuilder.withId(vlanMulticastNextId);
- final int nextId = vlanMulticastNextId;
+ int nextId = vlanMulticastNextId;
accessPorts.forEach(p -> {
TrafficTreatment.Builder egressAction = DefaultTrafficTreatment.builder();
// Do vlan popup action based on interface configuration
@@ -894,7 +907,7 @@
}
egressAction.setOutput(p);
vlanMulticastNextObjBuilder.addTreatment(egressAction.build());
- addMulticastGroupPort(deviceId, vlanId, p);
+ addMulticastGroupPort(key, p);
});
ObjectiveContext context = new DefaultObjectiveContext(
(objective) ->
@@ -940,19 +953,19 @@
* Removes access ports from VLAN L2 multicast group on given deviceId.
*
* @param deviceId Device ID
- * @param pairPort Pair port number
* @param vlanId VLAN ID
* @param accessPorts List of access ports to be added into L2 multicast group
*/
- private void revokeL2Multicast(DeviceId deviceId, final PortNumber pairPort,
- VlanId vlanId, List<PortNumber> accessPorts) {
-
+ private void revokeL2Multicast(DeviceId deviceId, VlanId vlanId, List<PortNumber> accessPorts) {
// Ensure enough rights to program pair device
if (!srService.shouldProgram(deviceId)) {
+ log.debug("Abort revoke L2Multicast {}-{}: {}", deviceId, vlanId, ERROR_NOT_LEADER);
return;
}
- int vlanMulticastNextId = getMulticastGroupNextObjectiveId(deviceId, vlanId);
+ VlanNextObjectiveStoreKey key = new VlanNextObjectiveStoreKey(deviceId, vlanId);
+
+ int vlanMulticastNextId = getMulticastGroupNextObjectiveId(key);
if (vlanMulticastNextId == -1) {
return;
}
@@ -971,7 +984,7 @@
}
egressAction.setOutput(p);
vlanMulticastNextObjBuilder.addTreatment(egressAction.build());
- removeMulticastGroupPort(deviceId, vlanId, p);
+ removeMulticastGroupPort(key, p);
});
ObjectiveContext context = new DefaultObjectiveContext(
(objective) ->
@@ -1000,16 +1013,19 @@
// Ensure enough rights to program pair device
if (!srService.shouldProgram(deviceId)) {
+ log.debug("Abort cleanup L2Multicast {}-{}: {}", deviceId, vlanId, ERROR_NOT_LEADER);
return;
}
+ VlanNextObjectiveStoreKey key = new VlanNextObjectiveStoreKey(deviceId, vlanId);
+
// Ensure L2 multicast group doesn't contain access ports
- if (hasAccessPortInMulticastGroup(deviceId, vlanId, pairPort) && !force) {
+ if (hasAccessPortInMulticastGroup(key, pairPort) && !force) {
return;
}
// Load L2 multicast group details
- int vlanMulticastNextId = getMulticastGroupNextObjectiveId(deviceId, vlanId);
+ int vlanMulticastNextId = getMulticastGroupNextObjectiveId(key);
if (vlanMulticastNextId == -1) {
return;
}
@@ -1054,63 +1070,63 @@
flowObjectiveService.next(deviceId, l2MulticastGroupBuilder.remove(context));
// Finally clear store.
- removeMulticastGroup(deviceId, vlanId);
+ removeMulticastGroup(key);
}
- private boolean isMulticastGroupExists(DeviceId deviceId, VlanId vlanId) {
- return xconnectMulticastNextStore.asJavaMap().entrySet().stream()
- .anyMatch(e -> e.getKey().deviceId().equals(deviceId) &&
- e.getKey().vlanId().equals(vlanId));
+ private int getMulticastGroupNextObjectiveId(VlanNextObjectiveStoreKey key) {
+ return Versioned.valueOrElse(xconnectMulticastNextStore.get(key), -1);
}
- private int getMulticastGroupNextObjectiveId(DeviceId deviceId, VlanId vlanId) {
- Optional<Integer> nextId
- = xconnectMulticastNextStore.asJavaMap().entrySet().stream()
- .filter(e -> e.getKey().deviceId().equals(deviceId) &&
- e.getKey().vlanId().equals(vlanId))
- .findFirst()
- .map(Map.Entry::getValue);
- return nextId.orElse(-1);
- }
-
- private void addMulticastGroupNextObjectiveId(DeviceId deviceId, VlanId vlanId, int nextId) {
+ private void addMulticastGroupNextObjectiveId(VlanNextObjectiveStoreKey key, int nextId) {
if (nextId == -1) {
return;
}
- VlanNextObjectiveStoreKey key = new VlanNextObjectiveStoreKey(deviceId, vlanId);
xconnectMulticastNextStore.put(key, nextId);
-
- // Update port store with empty entry.
- xconnectMulticastPortsStore.put(key, new ArrayList<PortNumber>());
}
- private void addMulticastGroupPort(DeviceId deviceId, VlanId vlanId, PortNumber port) {
- VlanNextObjectiveStoreKey key = new VlanNextObjectiveStoreKey(deviceId, vlanId);
- List<PortNumber> ports = xconnectMulticastPortsStore.get(key).value();
- ports.add(port);
- xconnectMulticastPortsStore.put(key, ports);
+ private void addMulticastGroupPort(VlanNextObjectiveStoreKey groupKey, PortNumber port) {
+ xconnectMulticastPortsStore.compute(groupKey, (key, ports) -> {
+ if (ports == null) {
+ ports = Lists.newArrayList();
+ }
+ ports.add(port);
+ return ports;
+ });
}
- private void removeMulticastGroupPort(DeviceId deviceId, VlanId vlanId, PortNumber port) {
- VlanNextObjectiveStoreKey key = new VlanNextObjectiveStoreKey(deviceId, vlanId);
- List<PortNumber> ports = xconnectMulticastPortsStore.get(key).value();
- ports.remove(port);
- xconnectMulticastPortsStore.put(key, ports);
+ private void removeMulticastGroupPort(VlanNextObjectiveStoreKey groupKey, PortNumber port) {
+ xconnectMulticastPortsStore.compute(groupKey, (key, ports) -> {
+ if (ports != null && !ports.isEmpty()) {
+ ports.remove(port);
+ }
+ return ports;
+ });
}
- private void removeMulticastGroup(DeviceId deviceId, VlanId vlanId) {
- VlanNextObjectiveStoreKey key = new VlanNextObjectiveStoreKey(deviceId, vlanId);
- xconnectMulticastPortsStore.remove(key);
- xconnectMulticastNextStore.remove(key);
+ private void removeMulticastGroup(VlanNextObjectiveStoreKey groupKey) {
+ xconnectMulticastPortsStore.remove(groupKey);
+ xconnectMulticastNextStore.remove(groupKey);
}
- private boolean hasAccessPortInMulticastGroup(DeviceId deviceId, VlanId vlanId, PortNumber pairPort) {
- VlanNextObjectiveStoreKey key = new VlanNextObjectiveStoreKey(deviceId, vlanId);
- if (!xconnectMulticastPortsStore.containsKey(key)) {
- return false;
- }
- List<PortNumber> ports = xconnectMulticastPortsStore.get(key).value();
+ private boolean hasAccessPortInMulticastGroup(VlanNextObjectiveStoreKey groupKey, PortNumber pairPort) {
+ List<PortNumber> ports = Versioned.valueOrElse(xconnectMulticastPortsStore.get(groupKey), ImmutableList.of());
return ports.stream().anyMatch(p -> !p.equals(pairPort));
}
+ // Custom-built function, when the device is not available we need a fallback mechanism
+ private boolean isLocalLeader(DeviceId deviceId) {
+ if (!mastershipService.isLocalMaster(deviceId)) {
+ // When the device is available we just check the mastership
+ if (deviceService.isAvailable(deviceId)) {
+ return false;
+ }
+ // Fallback with Leadership service - device id is used as topic
+ NodeId leader = leadershipService.runForLeadership(
+ deviceId.toString()).leaderNodeId();
+ // Verify if this node is the leader
+ return clusterService.getLocalNode().id().equals(leader);
+ }
+ return true;
+ }
+
}