[SDFAB-8] Policy stuck in PENDING_ADD if the netcfg is not provided

Change-Id: Id80048cdc26d85190d714955c48dffec6d1ca5a6
diff --git a/impl/src/main/java/org/onosproject/segmentrouting/policy/impl/PolicyManager.java b/impl/src/main/java/org/onosproject/segmentrouting/policy/impl/PolicyManager.java
index 21cb8e3..1d4d471 100644
--- a/impl/src/main/java/org/onosproject/segmentrouting/policy/impl/PolicyManager.java
+++ b/impl/src/main/java/org/onosproject/segmentrouting/policy/impl/PolicyManager.java
@@ -20,6 +20,7 @@
 import com.google.common.collect.Maps;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;
+
 import org.glassfish.jersey.internal.guava.Sets;
 import org.onlab.packet.MacAddress;
 import org.onlab.util.KryoNamespace;
@@ -30,6 +31,10 @@
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.config.NetworkConfigEvent;
+import org.onosproject.net.config.NetworkConfigListener;
+import org.onosproject.net.config.NetworkConfigRegistry;
+import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.Link;
 import org.onosproject.net.flow.DefaultTrafficSelector;
@@ -46,7 +51,6 @@
 import org.onosproject.net.flowobjective.ObjectiveContext;
 import org.onosproject.net.intent.WorkPartitionService;
 import org.onosproject.net.link.LinkService;
-import org.onosproject.segmentrouting.SegmentRoutingService;
 import org.onosproject.segmentrouting.policy.api.DropPolicy;
 import org.onosproject.segmentrouting.policy.api.Policy;
 import org.onosproject.segmentrouting.policy.api.PolicyData;
@@ -60,6 +64,7 @@
 import org.onosproject.segmentrouting.policy.api.TrafficMatchPriority;
 import org.onosproject.segmentrouting.policy.api.TrafficMatchState;
 import org.onosproject.segmentrouting.config.DeviceConfigNotFoundException;
+import org.onosproject.segmentrouting.config.SegmentRoutingDeviceConfig;
 import org.onosproject.segmentrouting.policy.api.Policy.PolicyType;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.ConsistentMap;
@@ -76,11 +81,14 @@
 import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.slf4j.Logger;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
 import static org.onlab.util.Tools.groupedThreads;
@@ -167,14 +175,22 @@
     private WorkPartitionService workPartitionService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    private SegmentRoutingService srService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
     private FlowObjectiveService flowObjectiveService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     private LinkService linkService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    private NetworkConfigRegistry cfgService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    private DeviceService deviceService;
+
+    // Netcfg listener
+    private final InternalConfigListener cfgListener = new InternalConfigListener();
+    // EventExecutor for netcfg event
+    private ExecutorService eventExecutor;
+
     @Activate
     public void activate() {
         appId = coreService.registerApplication(APP_NAME);
@@ -183,6 +199,8 @@
         codecService.registerCodec(RedirectPolicy.class, new RedirectPolicyCodec());
         codecService.registerCodec(TrafficMatch.class, new TrafficMatchCodec());
 
+        cfgService.addListener(cfgListener);
+
         policies = storageService.<PolicyId, PolicyRequest>consistentMapBuilder()
                 .withName(POLICY_STORE)
                 .withSerializer(serializer).build();
@@ -206,6 +224,8 @@
         workers = new PredictableExecutor(DEFAULT_THREADS,
                 groupedThreads("sr-policy", "worker-%d", log));
 
+        eventExecutor = Executors.newSingleThreadExecutor();
+
         log.info("Started");
     }
 
@@ -215,6 +235,7 @@
         codecService.unregisterCodec(DropPolicy.class);
         codecService.unregisterCodec(RedirectPolicy.class);
         codecService.unregisterCodec(TrafficMatch.class);
+        cfgService.removeListener(cfgListener);
         policies.removeListener(mapPolListener);
         policies.destroy();
         policiesMap.clear();
@@ -225,6 +246,7 @@
         operations.destroy();
         operations.clear();
         workers.shutdown();
+        eventExecutor.shutdown();
 
         log.info("Stopped");
     }
@@ -270,7 +292,7 @@
     @Override
     public Set<PolicyData> policies(Set<PolicyType> filter) {
         Set<PolicyData> policyData = Sets.newHashSet();
-        List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
+        List<DeviceId> edgeDeviceIds = getEdgeDeviceIds();
         Set<PolicyRequest> policyRequests;
         if (filter.isEmpty()) {
             policyRequests = ImmutableSet.copyOf(policiesMap.values());
@@ -330,7 +352,7 @@
     @Override
     public Set<TrafficMatchData> trafficMatches() {
         Set<TrafficMatchData> trafficMatchData = Sets.newHashSet();
-        List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
+        List<DeviceId> edgeDeviceIds = getEdgeDeviceIds();
         Set<TrafficMatchRequest> trafficMatchRequests = ImmutableSet.copyOf(trafficMatchesMap.values());
         TrafficMatchKey trafficMatchKey;
         List<String> ops;
@@ -359,7 +381,7 @@
         }
         // We know that we are the leader, offloads to the workers the remaining
         // part: issue fobj installation/removal and update the maps
-        List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
+        List<DeviceId> edgeDeviceIds = getEdgeDeviceIds();
         for (DeviceId deviceId : edgeDeviceIds) {
             workers.execute(() -> {
                 if (install) {
@@ -385,6 +407,17 @@
                     policy.policyType(), policy.policyId(), deviceId);
         }
         policyKey = new PolicyKey(deviceId, policy.policyId());
+        // Prevent duplicate installation of the policies. With the current
+        // implementation is not possible to update a policy since a change
+        // in the params will create a new policy. Thus, there is no need to
+        // check the equality like we do for the TM
+        Operation oldPolicyOp = Versioned.valueOrNull(operations.get(policyKey.toString()));
+        if (oldPolicyOp != null && oldPolicyOp.isInstall()) {
+            if (log.isDebugEnabled()) {
+                log.debug("There is already an install operation for policy {}", policy.policyId());
+            }
+            return;
+        }
         operation = Operation.builder()
                 .isInstall(true)
                 .policy(policy);
@@ -554,7 +587,7 @@
         }
         // We know that we are the leader, offloads to the workers the remaining
         // part: issue fobj installation/removal and update the maps
-        List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
+        List<DeviceId> edgeDeviceIds = getEdgeDeviceIds();
         for (DeviceId deviceId : edgeDeviceIds) {
             workers.execute(() -> {
                 if (install) {
@@ -821,7 +854,7 @@
             }
             return;
         }
-        List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
+        List<DeviceId> edgeDeviceIds = getEdgeDeviceIds();
         for (DeviceId deviceId : edgeDeviceIds) {
             workers.execute(() -> {
                 String key;
@@ -852,7 +885,7 @@
     private NextObjective.Builder redirectPolicyNextObjective(DeviceId srcDevice, RedirectPolicy redirectPolicy) {
         Set<Link> egressLinks = linkService.getDeviceEgressLinks(srcDevice);
         Map<ConnectPoint, DeviceId> egressPortsToEnforce = Maps.newHashMap();
-        List<DeviceId> edgeDevices = srService.getEdgeDeviceIds();
+        List<DeviceId> edgeDevices = getEdgeDeviceIds();
         egressLinks.stream()
                 .filter(link -> redirectPolicy.spinesToEnforce().contains(link.dst().deviceId()) &&
                                 !edgeDevices.contains(link.dst().deviceId()))
@@ -873,7 +906,7 @@
                 .fromApp(appId);
         MacAddress srcDeviceMac;
         try {
-            srcDeviceMac = srService.getDeviceMacAddress(srcDevice);
+            srcDeviceMac = getDeviceMacAddress(srcDevice);
         } catch (DeviceConfigNotFoundException e) {
             log.warn(e.getMessage() + " Aborting installation REDIRECT policy {}", redirectPolicy.policyId());
             return null;
@@ -882,7 +915,7 @@
         TrafficTreatment.Builder tBuilder;
         for (Map.Entry<ConnectPoint, DeviceId> entry : egressPortsToEnforce.entrySet()) {
             try {
-                neigborDeviceMac = srService.getDeviceMacAddress(entry.getValue());
+                neigborDeviceMac = getDeviceMacAddress(entry.getValue());
             } catch (DeviceConfigNotFoundException e) {
                 log.warn(e.getMessage() + " Aborting installation REDIRECT policy {}", redirectPolicy.policyId());
                 return null;
@@ -1018,4 +1051,59 @@
         public void run() {
         }
     }
+
+    // Netcfg event listener
+    private class InternalConfigListener implements NetworkConfigListener {
+        @Override
+        public void event(NetworkConfigEvent event) {
+            eventExecutor.execute(() -> {
+                switch (event.type()) {
+                    case CONFIG_ADDED:
+                    case CONFIG_UPDATED:
+                        if (event.configClass() == SegmentRoutingDeviceConfig.class) {
+                            // Reset all policy state to PENDING_ADD
+                            ImmutableSet<PolicyId> policyIds = ImmutableSet.copyOf(policies.keySet());
+                            policyIds.forEach((policyId) -> {
+                                policies.computeIfPresent(policyId, (k, v) -> {
+                                    v.policyState(PolicyState.PENDING_ADD);
+                                    return v;
+                                });
+                            });
+                            // Reset all TrafficMatch state to PENDING_ADD
+                            ImmutableSet<TrafficMatchId> trafficMatchIds = ImmutableSet.copyOf(trafficMatches.keySet());
+                            trafficMatchIds.forEach((trafficMatchId) -> {
+                                trafficMatches.computeIfPresent(trafficMatchId, (k, v) -> {
+                                    v.trafficMatchState(TrafficMatchState.PENDING_ADD);
+                                    return v;
+                                });
+                            });
+                        }
+                        break;
+                    default:
+                        break;
+                }
+            });
+        }
+    }
+
+    private List<DeviceId> getEdgeDeviceIds() {
+        List<DeviceId> edges = new ArrayList<>();
+        deviceService.getDevices().forEach((device) -> {
+            DeviceId deviceId = device.id();
+            SegmentRoutingDeviceConfig srDevCfg = cfgService.getConfig(deviceId, SegmentRoutingDeviceConfig.class);
+            if (srDevCfg != null && srDevCfg.isEdgeRouter()) {
+                edges.add(deviceId);
+            }
+        });
+        return edges;
+    }
+
+    private MacAddress getDeviceMacAddress(DeviceId deviceId) throws DeviceConfigNotFoundException {
+        SegmentRoutingDeviceConfig srDevCfg = cfgService.getConfig(deviceId, SegmentRoutingDeviceConfig.class);
+        if (srDevCfg != null) {
+            return srDevCfg.routerMac();
+        } else {
+            throw new DeviceConfigNotFoundException("Config for device: " + deviceId.toString() + " not found");
+        }
+    }
 }