[AETHER-1295][AETHER-1294] First stab at the TOST APIs framework
and at the DROP policies.
The patch also includes some CLI commands. REST APIs will be implemented
in a separate review. Another review will follow to implement the logic
of the REDIRECT policies.
Change-Id: I34aa3da700c5a16682196e4dd8db9c4757d609c4
diff --git a/impl/src/main/java/org/onosproject/segmentrouting/policy/impl/PolicyManager.java b/impl/src/main/java/org/onosproject/segmentrouting/policy/impl/PolicyManager.java
new file mode 100644
index 0000000..dbd5667
--- /dev/null
+++ b/impl/src/main/java/org/onosproject/segmentrouting/policy/impl/PolicyManager.java
@@ -0,0 +1,821 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.segmentrouting.policy.impl;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+import org.glassfish.jersey.internal.guava.Sets;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.PredictableExecutor;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flowobjective.DefaultForwardingObjective;
+import org.onosproject.net.flowobjective.DefaultObjectiveContext;
+import org.onosproject.net.flowobjective.FlowObjectiveService;
+import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.ObjectiveContext;
+import org.onosproject.net.intent.WorkPartitionService;
+import org.onosproject.segmentrouting.SegmentRoutingService;
+import org.onosproject.segmentrouting.policy.api.DropPolicy;
+import org.onosproject.segmentrouting.policy.api.Policy;
+import org.onosproject.segmentrouting.policy.api.Policy.PolicyType;
+import org.onosproject.segmentrouting.policy.api.PolicyData;
+import org.onosproject.segmentrouting.policy.api.PolicyId;
+import org.onosproject.segmentrouting.policy.api.PolicyService;
+import org.onosproject.segmentrouting.policy.api.PolicyState;
+import org.onosproject.segmentrouting.policy.api.TrafficMatch;
+import org.onosproject.segmentrouting.policy.api.TrafficMatchData;
+import org.onosproject.segmentrouting.policy.api.TrafficMatchId;
+import org.onosproject.segmentrouting.policy.api.TrafficMatchState;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageException;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Implementation of the policy service interface.
+ */
+@Component(immediate = true, service = PolicyService.class)
+public class PolicyManager implements PolicyService {
+
+ // App related things
+ private static final String APP_NAME = "org.onosproject.segmentrouting.policy";
+ // Assigned in activate() from CoreService.registerApplication(APP_NAME)
+ private ApplicationId appId;
+ private Logger log = getLogger(getClass());
+ // Not referenced in this class; presumably the separator used by the string form of
+ // PolicyKey/TrafficMatchKey - TODO confirm
+ static final String KEY_SEPARATOR = "|";
+
+ // Policy/TrafficMatch store related objects. We use these consistent maps to keep track of the
+ // lifecycle of a policy/traffic match. These are decomposed in multiple operations which have
+ // to be performed on multiple devices in order to have a policy/traffic match in ADDED state.
+ private static final String POLICY_STORE = "sr-policy-store";
+ private ConsistentMap<PolicyId, PolicyRequest> policies;
+ private MapEventListener<PolicyId, PolicyRequest> mapPolListener = new InternalPolMapEventListener();
+ // java.util.Map view of the policies store, obtained through asJavaMap() in activate()
+ private Map<PolicyId, PolicyRequest> policiesMap;
+
+ // Store of the per-device operations; keys are the string form of a PolicyKey or of a
+ // TrafficMatchKey
+ private static final String OPS_STORE = "sr-ops-store";
+ private ConsistentMap<String, Operation> operations;
+ private MapEventListener<String, Operation> mapOpsListener = new InternalOpsMapEventListener();
+ // java.util.Map view of the operations store, obtained through asJavaMap() in activate()
+ private Map<String, Operation> opsMap;
+
+ private static final String TRAFFIC_MATCH_STORE = "sr-tmatch-store";
+ private ConsistentMap<TrafficMatchId, TrafficMatchRequest> trafficMatches;
+ private MapEventListener<TrafficMatchId, TrafficMatchRequest> mapTMatchListener =
+ new InternalTMatchMapEventListener();
+ // java.util.Map view of the traffic match store, obtained through asJavaMap() in activate()
+ private Map<TrafficMatchId, TrafficMatchRequest> trafficMatchesMap;
+
+ // Leadership related objects - consistent hashing
+ // Hash function used by hasher() to turn a PolicyId into the Long given to the work partition service
+ private static final HashFunction HASH_FN = Hashing.md5();
+ // Cache of the last leader computed by isLeader() for each policy. Nothing in this
+ // class reads it back.
+ private Map<PolicyId, NodeId> policyLeaderCache;
+
+ // Worker threads for policy and traffic match related ops
+ private static final int DEFAULT_THREADS = 4;
+ protected PredictableExecutor workers;
+
+ // Serializers and ONOS services
+ // Every type stored in the three consistent maps is registered on top of the ONOS API namespace
+ private static final KryoNamespace.Builder APP_KRYO_BUILDER = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(PolicyId.class)
+ .register(PolicyType.class)
+ .register(DropPolicy.class)
+ .register(PolicyState.class)
+ .register(PolicyRequest.class)
+ .register(TrafficMatchId.class)
+ .register(TrafficMatchState.class)
+ .register(TrafficMatch.class)
+ .register(TrafficMatchRequest.class)
+ .register(Operation.class);
+ private Serializer serializer = Serializer.using(Lists.newArrayList(APP_KRYO_BUILDER.build()));
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ private CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ private StorageService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ private ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ public WorkPartitionService workPartitionService;
+
+ // OPTIONAL cardinality: this reference may be left unset when no SegmentRoutingService
+ // is available
+ @Reference(cardinality = ReferenceCardinality.OPTIONAL)
+ public SegmentRoutingService srService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ public FlowObjectiveService flowObjectiveService;
+
+ // Registers the application, creates the three distributed stores together with the
+ // local helpers, and only then starts listening for map events.
+ @Activate
+ public void activate() {
+     appId = coreService.registerApplication(APP_NAME);
+
+     policies = storageService.<PolicyId, PolicyRequest>consistentMapBuilder()
+             .withName(POLICY_STORE)
+             .withSerializer(serializer).build();
+     policiesMap = policies.asJavaMap();
+
+     trafficMatches = storageService.<TrafficMatchId, TrafficMatchRequest>consistentMapBuilder()
+             .withName(TRAFFIC_MATCH_STORE)
+             .withSerializer(serializer).build();
+     trafficMatchesMap = trafficMatches.asJavaMap();
+
+     operations = storageService.<String, Operation>consistentMapBuilder()
+             .withName(OPS_STORE)
+             .withSerializer(serializer).build();
+     opsMap = operations.asJavaMap();
+
+     policyLeaderCache = Maps.newConcurrentMap();
+
+     workers = new PredictableExecutor(DEFAULT_THREADS,
+                                       groupedThreads("sr-policy", "worker-%d", log));
+
+     // Listeners are registered last: their handlers dereference the worker pool, the
+     // leader cache and the other stores, so all of these must be in place before the
+     // first event can be delivered.
+     policies.addListener(mapPolListener);
+     trafficMatches.addListener(mapTMatchListener);
+     operations.addListener(mapOpsListener);
+
+     log.info("Started");
+ }
+
+ // Detaches the listeners, destroys and clears the three stores and stops the workers.
+ @Deactivate
+ public void deactivate() {
+     // Teardown everything
+     policies.removeListener(mapPolListener);
+     policies.destroy();
+     policiesMap.clear();
+     trafficMatches.removeListener(mapTMatchListener);
+     trafficMatches.destroy();
+     trafficMatchesMap.clear();
+     operations.removeListener(mapOpsListener);
+     operations.destroy();
+     // Cleared through the Java map view, like the policy and traffic match stores above
+     opsMap.clear();
+     workers.shutdown();
+
+     log.info("Stopped");
+ }
+
+ @Override
+ //FIXME update does not work well
+ public PolicyId addOrUpdatePolicy(Policy policy) {
+ PolicyId policyId = policy.policyId();
+ try {
+ policies.put(policyId, new PolicyRequest(policy));
+ } catch (StorageException e) {
+ log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
+ e.getMessage(), e);
+ policyId = null;
+ }
+ return policyId;
+ }
+
+ @Override
+ public boolean removePolicy(PolicyId policyId) {
+ boolean result;
+ try {
+ result = Versioned.valueOrNull(policies.computeIfPresent(policyId, (k, v) -> {
+ if (v.policyState() != PolicyState.PENDING_REMOVE) {
+ v.policyState(PolicyState.PENDING_REMOVE);
+ }
+ return v;
+ })) != null;
+ } catch (StorageException e) {
+ log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
+ e.getMessage(), e);
+ result = false;
+ }
+ return result;
+ }
+
+ // Returns the stored policies, optionally restricted to the given types, each one
+ // paired with its state and with a summary of the operations found for it on the
+ // edge devices. A null or empty filter selects every policy.
+ @Override
+ public Set<PolicyData> policies(Set<PolicyType> filter) {
+     Set<PolicyData> policyData = Sets.newHashSet();
+     // srService is an OPTIONAL reference and can be unset: in that case there are no
+     // edge devices to inspect and the policies are reported without operations.
+     List<DeviceId> edgeDeviceIds = List.of();
+     if (srService != null) {
+         edgeDeviceIds = srService.getEdgeDeviceIds();
+     }
+     Set<PolicyRequest> policyRequests;
+     if (filter == null || filter.isEmpty()) {
+         policyRequests = ImmutableSet.copyOf(policiesMap.values());
+     } else {
+         policyRequests = policiesMap.values().stream()
+                 .filter(policyRequest -> filter.contains(policyRequest.policyType()))
+                 .collect(Collectors.toSet());
+     }
+     for (PolicyRequest policyRequest : policyRequests) {
+         List<String> ops = Lists.newArrayList();
+         for (DeviceId deviceId : edgeDeviceIds) {
+             PolicyKey policyKey = new PolicyKey(deviceId, policyRequest.policyId());
+             Operation operation = Versioned.valueOrNull(operations.get(policyKey.toString()));
+             // Devices without an operation for this policy are simply left out
+             if (operation != null) {
+                 ops.add(deviceId + " -> " + operation.toStringMinimal());
+             }
+         }
+         policyData.add(new PolicyData(policyRequest.policyState(), policyRequest.policy(), ops));
+     }
+     return policyData;
+ }
+
+ @Override
+ //FIXME update does not work well
+ public TrafficMatchId addOrUpdateTrafficMatch(TrafficMatch trafficMatch) {
+ TrafficMatchId trafficMatchId = trafficMatch.trafficMatchId();
+ try {
+ trafficMatches.put(trafficMatchId, new TrafficMatchRequest(trafficMatch));
+ } catch (StorageException e) {
+ log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
+ e.getMessage(), e);
+ trafficMatchId = null;
+ }
+ return trafficMatchId;
+ }
+
+ @Override
+ public boolean removeTrafficMatch(TrafficMatchId trafficMatchId) {
+ boolean result;
+ try {
+ result = Versioned.valueOrNull(trafficMatches.computeIfPresent(trafficMatchId, (k, v) -> {
+ if (v.trafficMatchState() != TrafficMatchState.PENDING_REMOVE) {
+ v.trafficMatchState(TrafficMatchState.PENDING_REMOVE);
+ }
+ return v;
+ })) != null;
+ } catch (StorageException e) {
+ log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
+ e.getMessage(), e);
+ result = false;
+ }
+ return result;
+ }
+
+ // Returns the stored traffic matches, each one paired with its state and with a
+ // summary of the operations found for it on the edge devices.
+ @Override
+ public Set<TrafficMatchData> trafficMatches() {
+     Set<TrafficMatchData> trafficMatchData = Sets.newHashSet();
+     // srService is an OPTIONAL reference and can be unset: in that case there are no
+     // edge devices to inspect and the traffic matches are reported without operations.
+     List<DeviceId> edgeDeviceIds = List.of();
+     if (srService != null) {
+         edgeDeviceIds = srService.getEdgeDeviceIds();
+     }
+     Set<TrafficMatchRequest> trafficMatchRequests = ImmutableSet.copyOf(trafficMatchesMap.values());
+     for (TrafficMatchRequest trafficMatchRequest : trafficMatchRequests) {
+         List<String> ops = Lists.newArrayList();
+         for (DeviceId deviceId : edgeDeviceIds) {
+             TrafficMatchKey trafficMatchKey =
+                     new TrafficMatchKey(deviceId, trafficMatchRequest.trafficMatch().trafficMatchId());
+             Operation operation = Versioned.valueOrNull(operations.get(trafficMatchKey.toString()));
+             // Devices without an operation for this traffic match are simply left out
+             if (operation != null) {
+                 ops.add(deviceId + " -> " + operation.toStringMinimal());
+             }
+         }
+         trafficMatchData.add(new TrafficMatchData(trafficMatchRequest.trafficMatchState(),
+                                                   trafficMatchRequest.trafficMatch(), ops));
+     }
+     return trafficMatchData;
+ }
+
+ // Install/remove the policy on the edge devices. Only the instance that is leader for
+ // the policy acts; the per-device work is handed over to the workers.
+ private void sendPolicy(Policy policy, boolean install) {
+     if (!isLeader(policy.policyId())) {
+         if (log.isDebugEnabled()) {
+             log.debug("Instance is not leader for policy {}", policy.policyId());
+         }
+         return;
+     }
+     // srService is an OPTIONAL reference and can be unset: give up with a warning
+     // rather than failing with a NullPointerException while handling the map event
+     if (srService == null) {
+         log.warn("Segment routing service is not available, policy {} cannot be {}",
+                  policy.policyId(), install ? "installed" : "removed");
+         return;
+     }
+     // We know that we are the leader, offloads to the workers the remaining
+     // part: issue fobj installation/removal and update the maps
+     List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
+     for (DeviceId deviceId : edgeDeviceIds) {
+         // The device hash is passed as hint to the predictable executor
+         workers.execute(() -> {
+             if (install) {
+                 installPolicyInDevice(deviceId, policy);
+             } else {
+                 removePolicyInDevice(deviceId, policy);
+             }
+         }, deviceId.hashCode());
+     }
+ }
+
+ // Per-device policy installation, dispatched on the policy type
+ private void installPolicyInDevice(DeviceId deviceId, Policy policy) {
+     if (policy.policyType() == PolicyType.DROP) {
+         if (log.isDebugEnabled()) {
+             log.debug("Installing DROP policy {}", policy.policyId());
+         }
+         // A DROP policy has no next objective to install: the operation is written
+         // to the store directly as an already completed installation
+         Operation completed = Operation.builder()
+                 .isDone(true)
+                 .isInstall(true)
+                 .policy(policy)
+                 .build();
+         operations.put(new PolicyKey(deviceId, policy.policyId()).toString(), completed);
+         return;
+     }
+     if (policy.policyType() == PolicyType.REDIRECT) {
+         if (log.isDebugEnabled()) {
+             log.debug("Installing REDIRECT policy {}", policy.policyId());
+         }
+         // REDIRECT is meant to use the objective context to flag the operation as done
+         // once it returns successfully; otherwise the operation stays undone and the
+         // policy remains pending. Nothing is issued here yet.
+         return;
+     }
+     log.warn("Policy {} type {} not yet supported",
+              policy.policyId(), policy.policyType());
+ }
+
+ // Per-device policy removal, dispatched on the policy type
+ private void removePolicyInDevice(DeviceId deviceId, Policy policy) {
+     if (log.isDebugEnabled()) {
+         log.debug("Removing policy {}", policy.policyId());
+     }
+     PolicyKey key = new PolicyKey(deviceId, policy.policyId());
+     Operation stored = Versioned.valueOrNull(operations.get(key.toString()));
+     // The policy might still be pending or not be present anymore: in both cases the
+     // removal is recorded straight away as completed
+     if (stored == null || stored.objectiveOperation() == null) {
+         log.warn("There are no ops associated with {}", key);
+         Operation removed = Operation.builder()
+                 .isDone(true)
+                 .isInstall(false)
+                 .policy(policy)
+                 .build();
+         operations.put(key.toString(), removed);
+         return;
+     }
+     if (policy.policyType() == PolicyType.DROP) {
+         if (log.isDebugEnabled()) {
+             log.debug("Removing DROP policy {}", policy.policyId());
+         }
+         Operation removed = Operation.builder()
+                 .isDone(true)
+                 .isInstall(false)
+                 .policy(policy)
+                 .build();
+         operations.put(key.toString(), removed);
+         return;
+     }
+     if (policy.policyType() == PolicyType.REDIRECT) {
+         if (log.isDebugEnabled()) {
+             log.debug("Removing REDIRECT policy {}", policy.policyId());
+         }
+         // REDIRECT has to remove a next objective first; nothing is issued here yet
+         return;
+     }
+     log.warn("Policy {} type {} not yet supported",
+              policy.policyId(), policy.policyType());
+ }
+
+ // On the leader instance, schedules the check that moves the policy to its next
+ // state once all its pending operations are done
+ private void updatePolicy(PolicyId policyId, boolean install) {
+     if (isLeader(policyId)) {
+         workers.execute(() -> updatePolicyInternal(policyId, install), policyId.hashCode());
+     } else if (log.isDebugEnabled()) {
+         log.debug("Instance is not leader for policy {}", policyId);
+     }
+ }
+
+ // Moves the policy to ADDED, or drops it from the store, when none of its operations
+ // of the given kind (install/remove) is still pending
+ private void updatePolicyInternal(PolicyId policyId, boolean install) {
+     // Look for an operation of this policy that has the requested direction and is not done yet
+     boolean stillPending = operations.entrySet().stream().anyMatch(entry -> {
+         Operation op = entry.getValue().value();
+         return op.policy().isPresent()
+                 && PolicyKey.fromString(entry.getKey()).policyId().equals(policyId)
+                 && !op.isDone()
+                 && op.isInstall() == install;
+     });
+     if (stillPending) {
+         return;
+     }
+     // Policies are updated only if they are still present in the store
+     Versioned<PolicyRequest> updated = policies.computeIfPresent(policyId, (id, request) -> {
+         if (request.policyState() == PolicyState.PENDING_ADD && install) {
+             if (log.isDebugEnabled()) {
+                 log.debug("Policy {} is ready", policyId);
+             }
+             request.policyState(PolicyState.ADDED);
+             return request;
+         }
+         if (request.policyState() == PolicyState.PENDING_REMOVE && !install) {
+             if (log.isDebugEnabled()) {
+                 log.debug("Policy {} is removed", policyId);
+             }
+             // Returning null removes the entry from the store
+             return null;
+         }
+         return request;
+     });
+     PolicyRequest policyRequest = Versioned.valueOrNull(updated);
+     // Greedy check for traffic matches that were waiting for this policy
+     if (policyRequest != null && policyRequest.policyState() == PolicyState.ADDED) {
+         updatePendingTrafficMatches(policyRequest.policyId());
+     }
+ }
+
+ // Install/remove the traffic match on the edge devices. Only the instance that is
+ // leader for the associated policy acts; the per-device work is handed over to the workers.
+ private void sendTrafficMatch(TrafficMatch trafficMatch, boolean install) {
+     if (!isLeader(trafficMatch.policyId())) {
+         if (log.isDebugEnabled()) {
+             log.debug("Instance is not leader for policy {}", trafficMatch.policyId());
+         }
+         return;
+     }
+     // srService is an OPTIONAL reference and can be unset: give up with a warning
+     // rather than failing with a NullPointerException while handling the map event
+     if (srService == null) {
+         log.warn("Segment routing service is not available, traffic match {} cannot be {}",
+                  trafficMatch.trafficMatchId(), install ? "installed" : "removed");
+         return;
+     }
+     // We know that we are the leader, offloads to the workers the remaining
+     // part: issue fobj installation/removal and update the maps
+     List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
+     for (DeviceId deviceId : edgeDeviceIds) {
+         // The device hash is passed as hint to the predictable executor
+         workers.execute(() -> {
+             if (install) {
+                 installTrafficMatchToDevice(deviceId, trafficMatch);
+             } else {
+                 removeTrafficMatchInDevice(deviceId, trafficMatch);
+             }
+         }, deviceId.hashCode());
+     }
+ }
+
+ // Orchestrate traffic match installation on a device according to the type of the
+ // policy it refers to. The pending operation is always written to the store; the
+ // forwarding objective is sent only when the policy operation on the same device
+ // is a completed installation.
+ private void installTrafficMatchToDevice(DeviceId deviceId, TrafficMatch trafficMatch) {
+ if (log.isDebugEnabled()) {
+ log.debug("Installing traffic match {} associated to policy {}",
+ trafficMatch.trafficMatchId(), trafficMatch.policyId());
+ }
+ // Updates the store and then send the versatile fwd objective to the pipeliner
+ TrafficMatchKey trafficMatchKey = new TrafficMatchKey(deviceId, trafficMatch.trafficMatchId());
+ // isDone is not set here; presumably the builder defaults it to false - TODO confirm
+ Operation trafficOperation = Operation.builder()
+ .isInstall(true)
+ .trafficMatch(trafficMatch)
+ .build();
+ operations.put(trafficMatchKey.toString(), trafficOperation);
+ // For the DROP policy we need to set an ACL drop in the fwd objective. The other
+ // policies require to retrieve the next Id and sets the next step.
+ PolicyKey policyKey = new PolicyKey(deviceId, trafficMatch.policyId());
+ Operation policyOperation = Versioned.valueOrNull(operations.get(policyKey.toString()));
+ // Without a completed install operation for the policy on this device the traffic
+ // match operation stays in the store as written above and nothing is sent
+ if (policyOperation == null || !policyOperation.isDone() ||
+ !policyOperation.isInstall() || policyOperation.policy().isEmpty()) {
+ log.info("Deferring traffic match {} installation on device {}. Policy {} not yet installed",
+ trafficMatch.trafficMatchId(), deviceId, trafficMatch.policyId());
+ return;
+ }
+ Policy policy = policyOperation.policy().get();
+ ForwardingObjective.Builder builder = trafficMatchFwdObjective(trafficMatch);
+ // TODO we can try to reuse some code: context and completable future logic
+ if (policy.policyType() == PolicyType.DROP) {
+ // Firstly builds the fwd objective with the wipeDeferred action. Once, the fwd
+ // objective has completed its execution, we update the operations map
+ TrafficTreatment dropTreatment = DefaultTrafficTreatment.builder()
+ .wipeDeferred()
+ .build();
+ builder.withTreatment(dropTreatment);
+ CompletableFuture<Objective> future = new CompletableFuture<>();
+ if (log.isDebugEnabled()) {
+ log.debug("Installing ACL drop forwarding objectives for dev: {}", deviceId);
+ }
+ // Success completes the future with the objective, failure completes it with null
+ ObjectiveContext context = new DefaultObjectiveContext(
+ (objective) -> {
+ if (log.isDebugEnabled()) {
+ log.debug("ACL drop rule for policy {} installed", trafficMatch.policyId());
+ }
+ future.complete(objective);
+ },
+ (objective, error) -> {
+ log.warn("Failed to install ACL drop rule for policy {}: {}", trafficMatch.policyId(), error);
+ future.complete(null);
+ });
+ // Context is not serializable
+ // so the objective is built twice: without context for the store, with context for the pipeliner
+ ForwardingObjective serializableObjective = builder.add();
+ flowObjectiveService.forward(deviceId, builder.add(context));
+ future.whenComplete((objective, ex) -> {
+ if (ex != null) {
+ log.error("Exception installing ACL drop rule", ex);
+ } else if (objective != null) {
+ // Flags the operation as done only if it is still a pending installation
+ operations.computeIfPresent(trafficMatchKey.toString(), (k, v) -> {
+ if (!v.isDone() && v.isInstall()) {
+ v.isDone(true);
+ v.objectiveOperation(serializableObjective);
+ }
+ return v;
+ });
+ }
+ // A null objective (failed installation) leaves the operation untouched
+ });
+ } else {
+ log.warn("Policy {} type {} not yet supported", policy.policyId(), policy.policyType());
+ }
+ }
+
+ // On the leader instance of the associated policy, schedules the check that moves the
+ // traffic match to its next state once all its pending operations are done
+ private void updateTrafficMatch(TrafficMatch trafficMatch, boolean install) {
+     if (isLeader(trafficMatch.policyId())) {
+         workers.execute(() -> updateTrafficMatchInternal(trafficMatch.trafficMatchId(), install),
+                         trafficMatch.policyId().hashCode());
+     } else if (log.isDebugEnabled()) {
+         log.debug("Instance is not leader for policy {}", trafficMatch.policyId());
+     }
+ }
+
+ // Moves the traffic match to ADDED, or drops it from the store, when none of its
+ // operations of the given kind (install/remove) is still pending
+ private void updateTrafficMatchInternal(TrafficMatchId trafficMatchId, boolean install) {
+     // Look for an operation of this traffic match that has the requested direction and is not done yet
+     boolean stillPending = operations.entrySet().stream().anyMatch(entry -> {
+         Operation op = entry.getValue().value();
+         return op.trafficMatch().isPresent()
+                 && TrafficMatchKey.fromString(entry.getKey()).trafficMatchId().equals(trafficMatchId)
+                 && !op.isDone()
+                 && op.isInstall() == install;
+     });
+     if (stillPending) {
+         return;
+     }
+     // Traffic matches are updated only if they are still present in the store
+     trafficMatches.computeIfPresent(trafficMatchId, (id, request) -> {
+         if (request.trafficMatchState() == TrafficMatchState.PENDING_ADD && install) {
+             if (log.isDebugEnabled()) {
+                 log.debug("Traffic match {} is ready", trafficMatchId);
+             }
+             request.trafficMatchState(TrafficMatchState.ADDED);
+             return request;
+         }
+         if (request.trafficMatchState() == TrafficMatchState.PENDING_REMOVE && !install) {
+             if (log.isDebugEnabled()) {
+                 log.debug("Traffic match {} is removed", trafficMatchId);
+             }
+             // Returning null removes the entry from the store
+             return null;
+         }
+         return request;
+     });
+ }
+
+ // Re-sends for installation every traffic match of the given policy that is still
+ // in PENDING_ADD
+ private void updatePendingTrafficMatches(PolicyId policyId) {
+     trafficMatches.stream()
+             .map(entry -> entry.getValue().value())
+             .filter(request -> request.policyId().equals(policyId) &&
+                     request.trafficMatchState() == TrafficMatchState.PENDING_ADD)
+             // Collected first so that the store is not being streamed while sending
+             .collect(Collectors.toSet())
+             .forEach(request -> sendTrafficMatch(request.trafficMatch(), true));
+ }
+
+ // Traffic match removal in a device. When the stored operation carries no objective
+ // the removal is recorded directly as done; otherwise the stored forwarding objective
+ // is re-issued as a removal and the operation is flagged as done on success.
+ private void removeTrafficMatchInDevice(DeviceId deviceId, TrafficMatch trafficMatch) {
+ if (log.isDebugEnabled()) {
+ log.debug("Removing traffic match {} associated to policy {}",
+ trafficMatch.trafficMatchId(), trafficMatch.policyId());
+ }
+ TrafficMatchKey trafficMatchKey = new TrafficMatchKey(deviceId, trafficMatch.trafficMatchId());
+ Operation operation = Versioned.valueOrNull(operations.get(trafficMatchKey.toString()));
+ // No operation, or no objective recorded for it: there is nothing to send to the device
+ if (operation == null || operation.objectiveOperation() == null) {
+ log.warn("There are no ops associated with {}", trafficMatchKey);
+ operation = Operation.builder()
+ .isDone(true)
+ .isInstall(false)
+ .trafficMatch(trafficMatch)
+ .build();
+ operations.put(trafficMatchKey.toString(), operation);
+ } else {
+ // The objective recorded with the operation is the base of the removal
+ ForwardingObjective oldObj = (ForwardingObjective) operation.objectiveOperation();
+ // Rewrites the operation as a pending removal before issuing the objective
+ operation = Operation.builder(operation)
+ .isDone(false)
+ .isInstall(false)
+ .build();
+ operations.put(trafficMatchKey.toString(), operation);
+ ForwardingObjective.Builder builder = DefaultForwardingObjective.builder(oldObj);
+ CompletableFuture<Objective> future = new CompletableFuture<>();
+ if (log.isDebugEnabled()) {
+ log.debug("Removing ACL drop forwarding objectives for dev: {}", deviceId);
+ }
+ // Success completes the future with the objective, failure completes it with null
+ ObjectiveContext context = new DefaultObjectiveContext(
+ (objective) -> {
+ if (log.isDebugEnabled()) {
+ log.debug("ACL drop rule for policy {} removed", trafficMatch.policyId());
+ }
+ future.complete(objective);
+ },
+ (objective, error) -> {
+ log.warn("Failed to remove ACL drop rule for policy {}: {}", trafficMatch.policyId(), error);
+ future.complete(null);
+ });
+ // Built twice: without context for the store, with context for the pipeliner
+ ForwardingObjective serializableObjective = builder.remove();
+ flowObjectiveService.forward(deviceId, builder.remove(context));
+ future.whenComplete((objective, ex) -> {
+ if (ex != null) {
+ log.error("Exception removing ACL drop rule", ex);
+ } else if (objective != null) {
+ // Flags the operation as done only if it is still a pending removal
+ operations.computeIfPresent(trafficMatchKey.toString(), (k, v) -> {
+ if (!v.isDone() && !v.isInstall()) {
+ v.isDone(true);
+ v.objectiveOperation(serializableObjective);
+ }
+ return v;
+ });
+ }
+ // A null objective (failed removal) leaves the operation untouched
+ });
+ }
+ }
+
+ // Used when a policy has been removed but there are still traffic matches depending
+ // on it: on the leader instance, schedules the update of those traffic matches
+ private void updateDependingTrafficMatches(PolicyId policyId) {
+     if (isLeader(policyId)) {
+         workers.execute(() -> updateDependingTrafficMatchesInternal(policyId), policyId.hashCode());
+     } else if (log.isDebugEnabled()) {
+         log.debug("Instance is not leader for policy {}", policyId);
+     }
+ }
+
+ // Flags as PENDING_REMOVE every ADDED traffic match that refers to the given policy
+ private void updateDependingTrafficMatchesInternal(PolicyId policyId) {
+     trafficMatches.stream()
+             .map(entry -> entry.getValue().value())
+             .filter(request -> request.policyId().equals(policyId) &&
+                     request.trafficMatchState() == TrafficMatchState.ADDED)
+             // Collected first so that the store is not being streamed while it is updated
+             .collect(Collectors.toSet())
+             .forEach(request -> trafficMatches.computeIfPresent(request.trafficMatchId(), (id, current) -> {
+                 // The state is checked again on the stored value before changing it
+                 if (current.trafficMatchState() == TrafficMatchState.ADDED) {
+                     current.trafficMatchState(TrafficMatchState.PENDING_REMOVE);
+                 }
+                 return current;
+             }));
+ }
+
+ // Utility that removes, on every edge device, the operation related to a traffic match
+ // (when trafficMatchId is present) or to the policy itself (when it is empty). Only the
+ // leader instance of the policy acts.
+ private void removeOperations(PolicyId policyId, Optional<TrafficMatchId> trafficMatchId) {
+     if (!isLeader(policyId)) {
+         if (log.isDebugEnabled()) {
+             log.debug("Instance is not leader for policy {}", policyId);
+         }
+         return;
+     }
+     // srService is an OPTIONAL reference and can be unset: give up with a warning
+     // rather than failing with a NullPointerException while handling the map event
+     if (srService == null) {
+         log.warn("Segment routing service is not available, operations of policy {} cannot be removed",
+                  policyId);
+         return;
+     }
+     List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
+     for (DeviceId deviceId : edgeDeviceIds) {
+         // The device hash is passed as hint to the predictable executor
+         workers.execute(() -> {
+             String key;
+             if (trafficMatchId.isPresent()) {
+                 key = new TrafficMatchKey(deviceId, trafficMatchId.get()).toString();
+             } else {
+                 key = new PolicyKey(deviceId, policyId).toString();
+             }
+             operations.remove(key);
+         }, deviceId.hashCode());
+     }
+ }
+
+ // Builds the permanent VERSATILE forwarding objective matching the selector of the
+ // traffic match; the treatment is left to the caller
+ private ForwardingObjective.Builder trafficMatchFwdObjective(TrafficMatch trafficMatch) {
+     ForwardingObjective.Builder matchBuilder = DefaultForwardingObjective.builder()
+             .withPriority(PolicyService.TRAFFIC_MATCH_PRIORITY)
+             .withSelector(trafficMatch.trafficSelector());
+     return matchBuilder
+             .fromApp(appId)
+             .withFlag(ForwardingObjective.Flag.VERSATILE)
+             .makePermanent();
+ }
+
+ // Each map has an event listener enabling the events distribution across the cluster
+ private class InternalPolMapEventListener implements MapEventListener<PolicyId, PolicyRequest> {
+ @Override
+ public void event(MapEvent<PolicyId, PolicyRequest> event) {
+ Versioned<PolicyRequest> value = event.type() == MapEvent.Type.REMOVE ?
+ event.oldValue() : event.newValue();
+ PolicyRequest policyRequest = value.value();
+ Policy policy = policyRequest.policy();
+ switch (event.type()) {
+ case INSERT:
+ case UPDATE:
+ switch (policyRequest.policyState()) {
+ case PENDING_ADD:
+ sendPolicy(policy, true);
+ break;
+ case PENDING_REMOVE:
+ sendPolicy(policy, false);
+ break;
+ case ADDED:
+ break;
+ default:
+ log.warn("Unknown policy state type {}", policyRequest.policyState());
+ }
+ break;
+ case REMOVE:
+ removeOperations(policy.policyId(), Optional.empty());
+ updateDependingTrafficMatches(policy.policyId());
+ break;
+ default:
+ log.warn("Unknown event type {}", event.type());
+
+ }
+ }
+ }
+
+ private class InternalTMatchMapEventListener implements MapEventListener<TrafficMatchId, TrafficMatchRequest> {
+ @Override
+ public void event(MapEvent<TrafficMatchId, TrafficMatchRequest> event) {
+ Versioned<TrafficMatchRequest> value = event.type() == MapEvent.Type.REMOVE ?
+ event.oldValue() : event.newValue();
+ TrafficMatchRequest trafficMatchRequest = value.value();
+ TrafficMatch trafficMatch = trafficMatchRequest.trafficMatch();
+ switch (event.type()) {
+ case INSERT:
+ case UPDATE:
+ switch (trafficMatchRequest.trafficMatchState()) {
+ case PENDING_ADD:
+ sendTrafficMatch(trafficMatch, true);
+ break;
+ case PENDING_REMOVE:
+ sendTrafficMatch(trafficMatch, false);
+ break;
+ case ADDED:
+ break;
+ default:
+ log.warn("Unknown traffic match state type {}", trafficMatchRequest.trafficMatchState());
+ }
+ break;
+ case REMOVE:
+ removeOperations(trafficMatch.policyId(), Optional.of(trafficMatch.trafficMatchId()));
+ break;
+ default:
+ log.warn("Unknown event type {}", event.type());
+ }
+ }
+ }
+
+ private class InternalOpsMapEventListener implements MapEventListener<String, Operation> {
+ @Override
+ public void event(MapEvent<String, Operation> event) {
+ String key = event.key();
+ Versioned<Operation> value = event.type() == MapEvent.Type.REMOVE ?
+ event.oldValue() : event.newValue();
+ Operation operation = value.value();
+ switch (event.type()) {
+ case INSERT:
+ case UPDATE:
+ if (operation.isDone()) {
+ if (operation.policy().isPresent()) {
+ PolicyKey policyKey = PolicyKey.fromString(key);
+ updatePolicy(policyKey.policyId(), operation.isInstall());
+ } else if (operation.trafficMatch().isPresent()) {
+ updateTrafficMatch(operation.trafficMatch().get(), operation.isInstall());
+ } else {
+ log.warn("Unknown pending operation");
+ }
+ }
+ break;
+ case REMOVE:
+ break;
+ default:
+ log.warn("Unknown event type {}", event.type());
+ }
+ }
+ }
+
+ // Asks the work partition service who is in charge of the given policy, records the
+ // answer in the leader cache and tells whether it is the local node
+ private boolean isLeader(PolicyId policyId) {
+     final NodeId localNodeId = clusterService.getLocalNode().id();
+     final NodeId owner = workPartitionService.getLeader(policyId, this::hasher);
+     if (owner != null) {
+         policyLeaderCache.put(policyId, owner);
+         return localNodeId.equals(owner);
+     }
+     log.error("Fail to elect a leader for {}.", policyId);
+     return false;
+ }
+
+ // Hashes the string form of the policy id with HASH_FN; handed to the work partition
+ // service by isLeader()
+ private Long hasher(PolicyId policyId) {
+     String idText = policyId.toString();
+     return HASH_FN.hashUnencodedChars(idText).asLong();
+ }
+
+ // Check periodically for any issue and try to resolve automatically if possible
+ // NOTE(review): placeholder - run() is empty and nothing in this class creates or
+ // schedules a PolicyChecker yet.
+ private final class PolicyChecker implements Runnable {
+ @Override
+ public void run() {
+ // Intentionally empty for now
+ }
+ }
+}