[SDFAB-8] Policy stuck in PENDING_ADD if the netcfg is not provided
Change-Id: Id80048cdc26d85190d714955c48dffec6d1ca5a6
diff --git a/impl/src/main/java/org/onosproject/segmentrouting/policy/impl/PolicyManager.java b/impl/src/main/java/org/onosproject/segmentrouting/policy/impl/PolicyManager.java
index 21cb8e3..1d4d471 100644
--- a/impl/src/main/java/org/onosproject/segmentrouting/policy/impl/PolicyManager.java
+++ b/impl/src/main/java/org/onosproject/segmentrouting/policy/impl/PolicyManager.java
@@ -20,6 +20,7 @@
import com.google.common.collect.Maps;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
+
import org.glassfish.jersey.internal.guava.Sets;
import org.onlab.packet.MacAddress;
import org.onlab.util.KryoNamespace;
@@ -30,6 +31,10 @@
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.config.NetworkConfigEvent;
+import org.onosproject.net.config.NetworkConfigListener;
+import org.onosproject.net.config.NetworkConfigRegistry;
+import org.onosproject.net.device.DeviceService;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Link;
import org.onosproject.net.flow.DefaultTrafficSelector;
@@ -46,7 +51,6 @@
import org.onosproject.net.flowobjective.ObjectiveContext;
import org.onosproject.net.intent.WorkPartitionService;
import org.onosproject.net.link.LinkService;
-import org.onosproject.segmentrouting.SegmentRoutingService;
import org.onosproject.segmentrouting.policy.api.DropPolicy;
import org.onosproject.segmentrouting.policy.api.Policy;
import org.onosproject.segmentrouting.policy.api.PolicyData;
@@ -60,6 +64,7 @@
import org.onosproject.segmentrouting.policy.api.TrafficMatchPriority;
import org.onosproject.segmentrouting.policy.api.TrafficMatchState;
import org.onosproject.segmentrouting.config.DeviceConfigNotFoundException;
+import org.onosproject.segmentrouting.config.SegmentRoutingDeviceConfig;
import org.onosproject.segmentrouting.policy.api.Policy.PolicyType;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
@@ -76,11 +81,14 @@
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import static org.onlab.util.Tools.groupedThreads;
@@ -167,14 +175,22 @@
private WorkPartitionService workPartitionService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
- private SegmentRoutingService srService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
private FlowObjectiveService flowObjectiveService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
private LinkService linkService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ private NetworkConfigRegistry cfgService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ private DeviceService deviceService;
+
+ // Netcfg listener
+ private final InternalConfigListener cfgListener = new InternalConfigListener();
+ // EventExecutor for netcfg event
+ private ExecutorService eventExecutor;
+
@Activate
public void activate() {
appId = coreService.registerApplication(APP_NAME);
@@ -183,6 +199,8 @@
codecService.registerCodec(RedirectPolicy.class, new RedirectPolicyCodec());
codecService.registerCodec(TrafficMatch.class, new TrafficMatchCodec());
+ cfgService.addListener(cfgListener);
+
policies = storageService.<PolicyId, PolicyRequest>consistentMapBuilder()
.withName(POLICY_STORE)
.withSerializer(serializer).build();
@@ -206,6 +224,8 @@
workers = new PredictableExecutor(DEFAULT_THREADS,
groupedThreads("sr-policy", "worker-%d", log));
+ eventExecutor = Executors.newSingleThreadExecutor();
+
log.info("Started");
}
@@ -215,6 +235,7 @@
codecService.unregisterCodec(DropPolicy.class);
codecService.unregisterCodec(RedirectPolicy.class);
codecService.unregisterCodec(TrafficMatch.class);
+ cfgService.removeListener(cfgListener);
policies.removeListener(mapPolListener);
policies.destroy();
policiesMap.clear();
@@ -225,6 +246,7 @@
operations.destroy();
operations.clear();
workers.shutdown();
+ eventExecutor.shutdown();
log.info("Stopped");
}
@@ -270,7 +292,7 @@
@Override
public Set<PolicyData> policies(Set<PolicyType> filter) {
Set<PolicyData> policyData = Sets.newHashSet();
- List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
+ List<DeviceId> edgeDeviceIds = getEdgeDeviceIds();
Set<PolicyRequest> policyRequests;
if (filter.isEmpty()) {
policyRequests = ImmutableSet.copyOf(policiesMap.values());
@@ -330,7 +352,7 @@
@Override
public Set<TrafficMatchData> trafficMatches() {
Set<TrafficMatchData> trafficMatchData = Sets.newHashSet();
- List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
+ List<DeviceId> edgeDeviceIds = getEdgeDeviceIds();
Set<TrafficMatchRequest> trafficMatchRequests = ImmutableSet.copyOf(trafficMatchesMap.values());
TrafficMatchKey trafficMatchKey;
List<String> ops;
@@ -359,7 +381,7 @@
}
// We know that we are the leader, offloads to the workers the remaining
// part: issue fobj installation/removal and update the maps
- List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
+ List<DeviceId> edgeDeviceIds = getEdgeDeviceIds();
for (DeviceId deviceId : edgeDeviceIds) {
workers.execute(() -> {
if (install) {
@@ -385,6 +407,17 @@
policy.policyType(), policy.policyId(), deviceId);
}
policyKey = new PolicyKey(deviceId, policy.policyId());
+ // Prevent duplicate installation of the policies. With the current
+ // implementation is not possible to update a policy since a change
+ // in the params will create a new policy. Thus, there is no need to
+ // check the equality like we do for the TM
+ Operation oldPolicyOp = Versioned.valueOrNull(operations.get(policyKey.toString()));
+ if (oldPolicyOp != null && oldPolicyOp.isInstall()) {
+ if (log.isDebugEnabled()) {
+ log.debug("There is already an install operation for policy {}", policy.policyId());
+ }
+ return;
+ }
operation = Operation.builder()
.isInstall(true)
.policy(policy);
@@ -554,7 +587,7 @@
}
// We know that we are the leader, offloads to the workers the remaining
// part: issue fobj installation/removal and update the maps
- List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
+ List<DeviceId> edgeDeviceIds = getEdgeDeviceIds();
for (DeviceId deviceId : edgeDeviceIds) {
workers.execute(() -> {
if (install) {
@@ -821,7 +854,7 @@
}
return;
}
- List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
+ List<DeviceId> edgeDeviceIds = getEdgeDeviceIds();
for (DeviceId deviceId : edgeDeviceIds) {
workers.execute(() -> {
String key;
@@ -852,7 +885,7 @@
private NextObjective.Builder redirectPolicyNextObjective(DeviceId srcDevice, RedirectPolicy redirectPolicy) {
Set<Link> egressLinks = linkService.getDeviceEgressLinks(srcDevice);
Map<ConnectPoint, DeviceId> egressPortsToEnforce = Maps.newHashMap();
- List<DeviceId> edgeDevices = srService.getEdgeDeviceIds();
+ List<DeviceId> edgeDevices = getEdgeDeviceIds();
egressLinks.stream()
.filter(link -> redirectPolicy.spinesToEnforce().contains(link.dst().deviceId()) &&
!edgeDevices.contains(link.dst().deviceId()))
@@ -873,7 +906,7 @@
.fromApp(appId);
MacAddress srcDeviceMac;
try {
- srcDeviceMac = srService.getDeviceMacAddress(srcDevice);
+ srcDeviceMac = getDeviceMacAddress(srcDevice);
} catch (DeviceConfigNotFoundException e) {
log.warn(e.getMessage() + " Aborting installation REDIRECT policy {}", redirectPolicy.policyId());
return null;
@@ -882,7 +915,7 @@
TrafficTreatment.Builder tBuilder;
for (Map.Entry<ConnectPoint, DeviceId> entry : egressPortsToEnforce.entrySet()) {
try {
- neigborDeviceMac = srService.getDeviceMacAddress(entry.getValue());
+ neigborDeviceMac = getDeviceMacAddress(entry.getValue());
} catch (DeviceConfigNotFoundException e) {
log.warn(e.getMessage() + " Aborting installation REDIRECT policy {}", redirectPolicy.policyId());
return null;
@@ -1018,4 +1051,59 @@
public void run() {
}
}
+
+ // Netcfg event listener
+ private class InternalConfigListener implements NetworkConfigListener {
+ @Override
+ public void event(NetworkConfigEvent event) {
+ eventExecutor.execute(() -> {
+ switch (event.type()) {
+ case CONFIG_ADDED:
+ case CONFIG_UPDATED:
+ if (event.configClass() == SegmentRoutingDeviceConfig.class) {
+ // Reset all policy state to PENDING_ADD
+ ImmutableSet<PolicyId> policyIds = ImmutableSet.copyOf(policies.keySet());
+ policyIds.forEach((policyId) -> {
+ policies.computeIfPresent(policyId, (k, v) -> {
+ v.policyState(PolicyState.PENDING_ADD);
+ return v;
+ });
+ });
+ // Reset all TrafficMatch state to PENDING_ADD
+ ImmutableSet<TrafficMatchId> trafficMatchIds = ImmutableSet.copyOf(trafficMatches.keySet());
+ trafficMatchIds.forEach((trafficMatchId) -> {
+ trafficMatches.computeIfPresent(trafficMatchId, (k, v) -> {
+ v.trafficMatchState(TrafficMatchState.PENDING_ADD);
+ return v;
+ });
+ });
+ }
+ break;
+ default:
+ break;
+ }
+ });
+ }
+ }
+
+ private List<DeviceId> getEdgeDeviceIds() {
+ List<DeviceId> edges = new ArrayList<>();
+ deviceService.getDevices().forEach((device) -> {
+ DeviceId deviceId = device.id();
+ SegmentRoutingDeviceConfig srDevCfg = cfgService.getConfig(deviceId, SegmentRoutingDeviceConfig.class);
+ if (srDevCfg != null && srDevCfg.isEdgeRouter()) {
+ edges.add(deviceId);
+ }
+ });
+ return edges;
+ }
+
+ private MacAddress getDeviceMacAddress(DeviceId deviceId) throws DeviceConfigNotFoundException {
+ SegmentRoutingDeviceConfig srDevCfg = cfgService.getConfig(deviceId, SegmentRoutingDeviceConfig.class);
+ if (srDevCfg != null) {
+ return srDevCfg.routerMac();
+ } else {
+ throw new DeviceConfigNotFoundException("Config for device: " + deviceId.toString() + " not found");
+ }
+ }
}