Improves VERIFY operations
Changes:
- Avoids to sends duplicate next when there are multiple sources
- Introduces a backpressure mechanism to not flood the pipeliners
- Guarantees there are at least 30s between each mcast corrector
execution
- Introduce a pool of 4 verifiers in FlowObjectiveManager to
separate the thread used for the installation/removal of the
FlowObjectives
- Improves logging in verifyGroup
Change-Id: I45aac0f80c9eb6afd763f21977d62df4a98f686e
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
index 6d2b8cc..d08f111 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
@@ -1014,21 +1014,11 @@
}
@Override
- public Map<McastStoreKey, McastRole> getMcastRoles(IpAddress mcastIp) {
- return mcastHandler.getMcastRoles(mcastIp);
- }
-
- @Override
public Map<McastRoleStoreKey, McastRole> getMcastRoles(IpAddress mcastIp, ConnectPoint sourcecp) {
return mcastHandler.getMcastRoles(mcastIp, sourcecp);
}
@Override
- public Map<ConnectPoint, List<ConnectPoint>> getMcastPaths(IpAddress mcastIp) {
- return mcastHandler.getMcastPaths(mcastIp);
- }
-
- @Override
public Multimap<ConnectPoint, List<ConnectPoint>> getMcastTrees(IpAddress mcastIp,
ConnectPoint sourcecp) {
return mcastHandler.getMcastTrees(mcastIp, sourcecp);
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingService.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingService.java
index 74db140..dd84eeb 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingService.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingService.java
@@ -296,18 +296,6 @@
ImmutableMap<DeviceId, Set<PortNumber>> getDownedPortState();
/**
- * Returns the associated roles to the mcast groups or to the single
- * group if mcastIp is present.
- *
- * @param mcastIp the group ip
- * @return the mapping mcastIp-device to mcast role
- *
- * @deprecated in 1.12 ("Magpie") release.
- */
- @Deprecated
- Map<McastStoreKey, McastRole> getMcastRoles(IpAddress mcastIp);
-
- /**
* Returns the associated roles to the mcast groups.
*
* @param mcastIp the group ip
@@ -318,17 +306,6 @@
ConnectPoint sourcecp);
/**
- * Returns the associated paths to the mcast group.
- *
- * @param mcastIp the group ip
- * @return the mapping egress point to mcast path
- *
- * @deprecated in 1.12 ("Magpie") release.
- */
- @Deprecated
- Map<ConnectPoint, List<ConnectPoint>> getMcastPaths(IpAddress mcastIp);
-
- /**
* Returns the associated trees to the mcast group.
*
* @param mcastIp the group ip
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
index 9db890d..f7e9cb3 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
@@ -71,6 +71,7 @@
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@@ -212,6 +213,8 @@
private static final long MCAST_STABLITY_THRESHOLD = 5;
// Last change done
private Instant lastMcastChange = Instant.now();
+ // Last bucker corrector execution
+ private Instant lastBktCorrExecution = Instant.now();
/**
* Determines if mcast in the network has been stable in the last
@@ -223,10 +226,23 @@
private boolean isMcastStable() {
long last = (long) (lastMcastChange.toEpochMilli() / 1000.0);
long now = (long) (Instant.now().toEpochMilli() / 1000.0);
- log.trace("Mcast stable since {}s", now - last);
+ log.trace("Multicast stable since {}s", now - last);
return (now - last) > MCAST_STABLITY_THRESHOLD;
}
+ /**
+ * Assures there are always MCAST_VERIFY_INTERVAL seconds between each execution,
+ * by comparing the current time with the last corrector execution.
+ *
+ * @return true if stable
+ */
+ private boolean wasBktCorrRunning() {
+ long last = (long) (lastBktCorrExecution.toEpochMilli() / 1000.0);
+ long now = (long) (Instant.now().toEpochMilli() / 1000.0);
+ log.trace("McastBucketCorrector executed {}s ago", now - last);
+ return (now - last) < MCAST_VERIFY_INTERVAL;
+ }
+
// Verify interval for Mcast bucket corrector
private static final long MCAST_VERIFY_INTERVAL = 30;
// Executor for mcast bucket corrector and for cache
@@ -1767,89 +1783,121 @@
*/
private final class McastBucketCorrector implements Runnable {
+ private static final int MAX_VERIFY_ON_FLIGHT = 10;
+ private final AtomicInteger verifyOnFlight = new AtomicInteger(0);
+ // Define the context used for the back pressure mechanism
+ private final ObjectiveContext context = new DefaultObjectiveContext(
+ (objective) -> {
+ synchronized (verifyOnFlight) {
+ verifyOnFlight.decrementAndGet();
+ verifyOnFlight.notify();
+ }
+ },
+ (objective, error) -> {
+ synchronized (verifyOnFlight) {
+ verifyOnFlight.decrementAndGet();
+ verifyOnFlight.notify();
+ }
+ });
+
@Override
public void run() {
- if (!isMcastStable()) {
+ if (!isMcastStable() || wasBktCorrRunning()) {
return;
}
mcastLock();
try {
// Iterates over the routes and verify the related next objectives
- srManager.multicastRouteService.getRoutes()
- .stream().map(McastRoute::group)
- .forEach(mcastIp -> {
- log.trace("Running mcast buckets corrector for mcast group: {}", mcastIp);
- // Verify leadership on the operation
- if (!mcastUtils.isLeader(mcastIp)) {
- log.trace("Skip {} due to lack of leadership", mcastIp);
- return;
+ for (McastRoute mcastRoute : srManager.multicastRouteService.getRoutes()) {
+ IpAddress mcastIp = mcastRoute.group();
+ log.trace("Running mcast buckets corrector for mcast group: {}", mcastIp);
+ // Verify leadership on the operation
+ if (!mcastUtils.isLeader(mcastIp)) {
+ log.trace("Skip {} due to lack of leadership", mcastIp);
+ continue;
+ }
+ // Get sources and sinks from Mcast Route Service and warn about errors
+ Set<ConnectPoint> sources = mcastUtils.getSources(mcastIp);
+ Set<ConnectPoint> sinks = mcastUtils.getSinks(mcastIp).values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toSet());
+ // Do not proceed if sources of this group are missing
+ if (sources.isEmpty()) {
+ if (!sinks.isEmpty()) {
+ log.warn("Unable to run buckets corrector. " +
+ "Missing source {} for group {}", sources, mcastIp);
}
- // Get sources and sinks from Mcast Route Service and warn about errors
- Set<ConnectPoint> sources = mcastUtils.getSources(mcastIp);
- Set<ConnectPoint> sinks = mcastUtils.getSinks(mcastIp).values().stream()
- .flatMap(Collection::stream).collect(Collectors.toSet());
- // Do not proceed if sources of this group are missing
- if (sources.isEmpty()) {
+ continue;
+ }
+ // For each group we get current information in the store
+ // and issue a check of the next objectives in place
+ Set<McastStoreKey> processedKeys = Sets.newHashSet();
+ for (ConnectPoint source : sources) {
+ Set<DeviceId> ingressDevices = getDevice(mcastIp, INGRESS, source);
+ Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT, source);
+ Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS, source);
+ // Do not proceed if ingress devices are missing
+ if (ingressDevices.isEmpty()) {
if (!sinks.isEmpty()) {
log.warn("Unable to run buckets corrector. " +
- "Missing source {} for group {}", sources, mcastIp);
+ "Missing ingress {} for source {} and for group {}",
+ ingressDevices, source, mcastIp);
}
- return;
+ continue;
}
- sources.forEach(source -> {
- // For each group we get current information in the store
- // and issue a check of the next objectives in place
- Set<DeviceId> ingressDevices = getDevice(mcastIp, INGRESS, source);
- Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT, source);
- Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS, source);
- // Do not proceed if ingress devices are missing
- if (ingressDevices.isEmpty()) {
- if (!sinks.isEmpty()) {
- log.warn("Unable to run buckets corrector. " +
- "Missing ingress {} for source {} and for group {}",
- ingressDevices, source, mcastIp);
- }
+ // Create the set of the devices to be processed
+ ImmutableSet.Builder<DeviceId> devicesBuilder = ImmutableSet.builder();
+ if (!ingressDevices.isEmpty()) {
+ devicesBuilder.addAll(ingressDevices);
+ }
+ if (!transitDevices.isEmpty()) {
+ devicesBuilder.addAll(transitDevices);
+ }
+ if (!egressDevices.isEmpty()) {
+ devicesBuilder.addAll(egressDevices);
+ }
+ Set<DeviceId> devicesToProcess = devicesBuilder.build();
+ for (DeviceId deviceId : devicesToProcess) {
+ if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
+ log.trace("Skipping Bucket corrector for unconfigured device {}", deviceId);
return;
}
- // Create the set of the devices to be processed
- ImmutableSet.Builder<DeviceId> devicesBuilder = ImmutableSet.builder();
- if (!ingressDevices.isEmpty()) {
- devicesBuilder.addAll(ingressDevices);
- }
- if (!transitDevices.isEmpty()) {
- devicesBuilder.addAll(transitDevices);
- }
- if (!egressDevices.isEmpty()) {
- devicesBuilder.addAll(egressDevices);
- }
- Set<DeviceId> devicesToProcess = devicesBuilder.build();
- devicesToProcess.forEach(deviceId -> {
- if (!srManager.deviceConfiguration().isConfigured(deviceId)) {
- log.trace("Skipping Bucket corrector for unconfigured device {}", deviceId);
- return;
+ synchronized (verifyOnFlight) {
+ while (verifyOnFlight.get() == MAX_VERIFY_ON_FLIGHT) {
+ verifyOnFlight.wait();
}
- VlanId assignedVlan = mcastUtils.assignedVlan(deviceId.equals(source.deviceId()) ?
- source : null);
- McastStoreKey currentKey = new McastStoreKey(mcastIp, deviceId, assignedVlan);
- if (mcastNextObjStore.containsKey(currentKey)) {
- NextObjective currentNext = mcastNextObjStore.get(currentKey).value();
- // Rebuild the next objective using assigned vlan
- currentNext = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
- mcastUtils.getPorts(currentNext.next()), currentNext.id()).verify();
- // Send to the flowobjective service
- srManager.flowObjectiveService.next(deviceId, currentNext);
- } else {
- log.warn("Unable to run buckets corrector. " +
- "Missing next for {}, for source {} and for group {}",
- deviceId, source, mcastIp);
- }
- });
- });
- });
+ }
+ VlanId assignedVlan = mcastUtils.assignedVlan(deviceId.equals(source.deviceId()) ?
+ source : null);
+ McastStoreKey currentKey = new McastStoreKey(mcastIp, deviceId, assignedVlan);
+ // Check if we already processed this next - trees merge at some point
+ if (processedKeys.contains(currentKey)) {
+ continue;
+ }
+ // Verify the nextobjective or skip to next device
+ if (mcastNextObjStore.containsKey(currentKey)) {
+ NextObjective currentNext = mcastNextObjStore.get(currentKey).value();
+ // Rebuild the next objective using assigned vlan
+ currentNext = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
+ mcastUtils.getPorts(currentNext.next()), currentNext.id()).verify(context);
+ // Send to the flowobjective service
+ srManager.flowObjectiveService.next(deviceId, currentNext);
+ verifyOnFlight.incrementAndGet();
+ log.trace("Verify on flight {}", verifyOnFlight);
+ processedKeys.add(currentKey);
+ } else {
+ log.warn("Unable to run buckets corrector. " +
+ "Missing next for {}, for source {} and for group {}",
+ deviceId, source, mcastIp);
+ }
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ log.warn("BktCorr has been interrupted");
} finally {
+ lastBktCorrExecution = Instant.now();
mcastUnlock();
}
-
}
}
@@ -1884,28 +1932,6 @@
}
/**
- * Returns the associated roles to the mcast groups or to the single
- * group if mcastIp is present.
- *
- * @param mcastIp the group ip
- * @return the mapping mcastIp-device to mcast role
- *
- * @deprecated in 1.12 ("Magpie") release.
- */
- @Deprecated
- public Map<McastStoreKey, McastRole> getMcastRoles(IpAddress mcastIp) {
- if (mcastIp != null) {
- return mcastRoleStore.entrySet().stream()
- .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
- .collect(Collectors.toMap(entry -> new McastStoreKey(entry.getKey().mcastIp(),
- entry.getKey().deviceId(), null), entry -> entry.getValue().value()));
- }
- return mcastRoleStore.entrySet().stream()
- .collect(Collectors.toMap(entry -> new McastStoreKey(entry.getKey().mcastIp(),
- entry.getKey().deviceId(), null), entry -> entry.getValue().value()));
- }
-
- /**
* Returns the associated roles to the mcast groups.
*
* @param mcastIp the group ip
@@ -1932,28 +1958,6 @@
entry.getKey().deviceId(), entry.getKey().source()), entry -> entry.getValue().value()));
}
-
- /**
- * Returns the associated paths to the mcast group.
- *
- * @param mcastIp the group ip
- * @return the mapping egress point to mcast path
- *
- * @deprecated in 1.12 ("Magpie") release.
- */
- @Deprecated
- public Map<ConnectPoint, List<ConnectPoint>> getMcastPaths(IpAddress mcastIp) {
- Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
- ConnectPoint source = mcastUtils.getSource(mcastIp);
- if (source != null) {
- Set<DeviceId> visited = Sets.newHashSet();
- List<ConnectPoint> currentPath = Lists.newArrayList(source);
- mcastUtils.buildMcastPaths(mcastNextObjStore.asJavaMap(), source.deviceId(), visited, mcastPaths,
- currentPath, mcastIp, source);
- }
- return mcastPaths;
- }
-
/**
* Returns the associated trees to the mcast group.
*
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java
index 6d3f938..78f7cb3 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java
@@ -250,24 +250,6 @@
}
/**
- * Gets source connect point of given multicast group.
- *
- * @param mcastIp multicast IP
- * @return source connect point or null if not found
- *
- * @deprecated in 1.12 ("Magpie") release.
- */
- @Deprecated
- ConnectPoint getSource(IpAddress mcastIp) {
- McastRoute mcastRoute = srManager.multicastRouteService.getRoutes().stream()
- .filter(mcastRouteInternal -> mcastRouteInternal.group().equals(mcastIp))
- .findFirst().orElse(null);
- return mcastRoute == null ? null : srManager.multicastRouteService.sources(mcastRoute)
- .stream()
- .findFirst().orElse(null);
- }
-
- /**
* Gets sources connect points of given multicast group.
*
* @param mcastIp multicast IP
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
index 3c5c571..4ba0d6e 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
@@ -105,8 +105,9 @@
private static final int INSTALL_RETRY_ATTEMPTS = 5;
private static final long INSTALL_RETRY_INTERVAL = 1000; // ms
- private static final String WORKER_PATTERN = "objective-installer-%d";
- private static final String GROUP_THREAD_NAME = "onos/objective-installer";
+ private static final String INSTALLER_PATTERN = "installer-%d";
+ private static final String VERIFIER_PATTERN = "verifier-%d";
+ private static final String GROUP_THREAD_NAME = "onos/objective";
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -174,14 +175,18 @@
// for debugging purposes
private Map<Integer, DeviceId> nextToDevice = Maps.newConcurrentMap();
- ExecutorService executorService;
+ ExecutorService installerExecutor;
+ ExecutorService verifierExecutor;
protected ExecutorService devEventExecutor;
@Activate
protected void activate(ComponentContext context) {
cfgService.registerProperties(FlowObjectiveManager.class);
- executorService = newFixedThreadPool(numThreads,
- groupedThreads(GROUP_THREAD_NAME, WORKER_PATTERN, log));
+ installerExecutor = newFixedThreadPool(numThreads,
+ groupedThreads(GROUP_THREAD_NAME, INSTALLER_PATTERN, log));
+ verifierExecutor = newFixedThreadPool(numThreads,
+ groupedThreads(GROUP_THREAD_NAME, VERIFIER_PATTERN, log));
+
modified(context);
devEventExecutor = newSingleThreadScheduledExecutor(
groupedThreads("onos/flowobj-dev-events", "events-%d", log));
@@ -197,7 +202,8 @@
flowObjectiveStore.unsetDelegate(delegate);
deviceService.removeListener(deviceListener);
driverService.removeListener(driverListener);
- executorService.shutdown();
+ installerExecutor.shutdown();
+ verifierExecutor.shutdown();
devEventExecutor.shutdownNow();
devEventExecutor = null;
pipeliners.clear();
@@ -224,9 +230,15 @@
if (newNumThreads != numThreads && newNumThreads > 0) {
numThreads = newNumThreads;
- ExecutorService oldWorkerExecutor = executorService;
- executorService = newFixedThreadPool(numThreads,
- groupedThreads(GROUP_THREAD_NAME, WORKER_PATTERN, log));
+ ExecutorService oldWorkerExecutor = installerExecutor;
+ installerExecutor = newFixedThreadPool(numThreads,
+ groupedThreads(GROUP_THREAD_NAME, INSTALLER_PATTERN, log));
+ if (oldWorkerExecutor != null) {
+ oldWorkerExecutor.shutdown();
+ }
+ oldWorkerExecutor = verifierExecutor;
+ verifierExecutor = newFixedThreadPool(numThreads,
+ groupedThreads(GROUP_THREAD_NAME, VERIFIER_PATTERN, log));
if (oldWorkerExecutor != null) {
oldWorkerExecutor.shutdown();
}
@@ -269,20 +281,24 @@
* make a few attempts to find the appropriate driver, then eventually give
* up and report an error if no suitable driver could be found.
*/
- class ObjectiveInstaller implements Runnable {
+ class ObjectiveProcessor implements Runnable {
final DeviceId deviceId;
final Objective objective;
+ final ExecutorService executor;
private final int numAttempts;
- ObjectiveInstaller(DeviceId deviceId, Objective objective) {
- this(deviceId, objective, 1);
+ ObjectiveProcessor(DeviceId deviceId, Objective objective,
+ ExecutorService executorService) {
+ this(deviceId, objective, 1, executorService);
}
- ObjectiveInstaller(DeviceId deviceId, Objective objective, int attemps) {
+ ObjectiveProcessor(DeviceId deviceId, Objective objective, int attempts,
+ ExecutorService executorService) {
this.deviceId = checkNotNull(deviceId);
this.objective = checkNotNull(objective);
- this.numAttempts = attemps;
+ this.executor = checkNotNull(executorService);
+ this.numAttempts = attempts;
}
@Override
@@ -302,7 +318,8 @@
//Attempts to check if pipeliner is null for retry attempts
} else if (numAttempts < INSTALL_RETRY_ATTEMPTS) {
Thread.sleep(INSTALL_RETRY_INTERVAL);
- executorService.execute(new ObjectiveInstaller(deviceId, objective, numAttempts + 1));
+ executor.execute(new ObjectiveProcessor(deviceId, objective,
+ numAttempts + 1, executor));
} else {
// Otherwise we've tried a few times and failed, report an
// error back to the user.
@@ -311,7 +328,7 @@
}
//Exception thrown
} catch (Exception e) {
- log.warn("Exception while installing flow objective", e);
+ log.warn("Exception while processing flow objective", e);
}
}
}
@@ -319,7 +336,7 @@
@Override
public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
checkPermission(FLOWRULE_WRITE);
- executorService.execute(new ObjectiveInstaller(deviceId, filteringObjective));
+ installerExecutor.execute(new ObjectiveProcessor(deviceId, filteringObjective, installerExecutor));
}
@Override
@@ -329,19 +346,21 @@
flowObjectiveStore.getNextGroup(forwardingObjective.nextId()) != null ||
!queueFwdObjective(deviceId, forwardingObjective)) {
// fast path
- executorService.execute(new ObjectiveInstaller(deviceId, forwardingObjective));
+ installerExecutor.execute(new ObjectiveProcessor(deviceId, forwardingObjective, installerExecutor));
}
}
@Override
public void next(DeviceId deviceId, NextObjective nextObjective) {
checkPermission(FLOWRULE_WRITE);
- if (nextObjective.op() == Operation.ADD ||
- nextObjective.op() == Operation.VERIFY ||
+ if (nextObjective.op() == Operation.VERIFY) {
+ // Verify does not need to wait
+ verifierExecutor.execute(new ObjectiveProcessor(deviceId, nextObjective, verifierExecutor));
+ } else if (nextObjective.op() == Operation.ADD ||
flowObjectiveStore.getNextGroup(nextObjective.id()) != null ||
!queueNextObjective(deviceId, nextObjective)) {
// either group exists or we are trying to create it - let it through
- executorService.execute(new ObjectiveInstaller(deviceId, nextObjective));
+ installerExecutor.execute(new ObjectiveProcessor(deviceId, nextObjective, installerExecutor));
}
}
diff --git a/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
index 93b6337..8e88e77 100644
--- a/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
@@ -193,7 +193,7 @@
mgr = new InOrderFlowObjectiveManager();
mgr.objTimeoutMs = objTimeoutMs;
mgr.pipeliners.put(DEV1, pipeliner);
- mgr.executorService = newFixedThreadPool(4, groupedThreads("foo", "bar"));
+ mgr.installerExecutor = newFixedThreadPool(4, groupedThreads("foo", "bar"));
mgr.cfgService = createMock(ComponentConfigService.class);
mgr.deviceService = createMock(DeviceService.class);
mgr.driverService = createMock(DriverService.class);
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java
index 851715d..0c68a7a 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java
@@ -2102,9 +2102,8 @@
"nextId:{}, nextObjective-size:{} next-size:{} .. correcting",
deviceId, nextObjective.id(), nextObjective.next().size(),
allActiveKeys.size());
- List<Integer> otherIndices =
- indicesToRemoveFromNextGroup(allActiveKeys, nextObjective,
- groupService, deviceId);
+ List<Integer> otherIndices = indicesToRemoveFromNextGroup(allActiveKeys, nextObjective,
+ groupService, deviceId);
// Filter out the indices not present
otherIndices = otherIndices.stream()
.filter(index -> !indicesToRemove.contains(index))
@@ -2142,11 +2141,12 @@
log.info("removing {} buckets as part of nextId: {} verification",
indicesToRemove.size(), nextObjective.id());
List<Deque<GroupKey>> chainsToRemove = Lists.newArrayList();
- indicesToRemove.forEach(index -> chainsToRemove
- .add(allActiveKeys.get(index)));
+ indicesToRemove.forEach(index -> chainsToRemove.add(allActiveKeys.get(index)));
removeBucket(chainsToRemove, nextObjective);
}
+ log.trace("Checking mismatch with GroupStore device:{} nextId:{}",
+ deviceId, nextObjective.id());
if (bucketsToCreate.isEmpty() && indicesToRemove.isEmpty()) {
// flowObjective store record is in-sync with nextObjective passed-in
// Nevertheless groupStore may not be in sync due to bug in the store
@@ -2181,8 +2181,7 @@
if (validChain.size() < 2) {
continue;
}
- GroupKey pointedGroupKey = validChain.stream()
- .collect(Collectors.toList()).get(1);
+ GroupKey pointedGroupKey = validChain.stream().collect(Collectors.toList()).get(1);
Group pointedGroup = groupService.getGroup(deviceId, pointedGroupKey);
if (pointedGroup != null && gidToCheck.equals(pointedGroup.id())) {
matches = true;
@@ -2200,9 +2199,8 @@
+ "buckets to remove");
} else {
GroupBuckets removeBuckets = new GroupBuckets(bucketsToRemove);
- groupService.removeBucketsFromGroup(deviceId, topGroupKey,
- removeBuckets, topGroupKey,
- nextObjective.appId());
+ groupService.removeBucketsFromGroup(deviceId, topGroupKey, removeBuckets, topGroupKey,
+ nextObjective.appId());
}
} else if (actualGroupSize < objGroupSize) {
// Group in the device has less chains
@@ -2213,8 +2211,7 @@
if (validChain.size() < 2) {
continue;
}
- GroupKey pointedGroupKey = validChain.stream()
- .collect(Collectors.toList()).get(1);
+ GroupKey pointedGroupKey = validChain.stream().collect(Collectors.toList()).get(1);
Group pointedGroup = groupService.getGroup(deviceId, pointedGroupKey);
if (pointedGroup == null) {
// group should exist, otherwise cannot be added as bucket
@@ -2254,7 +2251,7 @@
}
}
}
-
+ log.trace("Verify done for device:{} nextId:{}", deviceId, nextObjective.id());
pass(nextObjective);
}