Implements [CORD-587] and [CORD-588]
Changes:
- Add termination in SR app;
- Add termination in the drivers
Change-Id: Ia9bb31c2c2e20acab8d6bfe27113f7421a8b83da
diff --git a/src/main/java/org/onosproject/segmentrouting/pwaas/L2TunnelHandler.java b/src/main/java/org/onosproject/segmentrouting/pwaas/L2TunnelHandler.java
index 5f0ed35..8206180 100644
--- a/src/main/java/org/onosproject/segmentrouting/pwaas/L2TunnelHandler.java
+++ b/src/main/java/org/onosproject/segmentrouting/pwaas/L2TunnelHandler.java
@@ -19,6 +19,7 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.RandomUtils;
+import org.onlab.packet.Ethernet;
import org.onlab.packet.MacAddress;
import org.onlab.packet.MplsLabel;
import org.onlab.packet.VlanId;
@@ -63,7 +64,10 @@
import static org.onosproject.net.flowobjective.ForwardingObjective.Flag.VERSATILE;
import static org.onosproject.segmentrouting.pwaas.L2Mode.TAGGED;
import static org.onosproject.segmentrouting.pwaas.L2TunnelHandler.Pipeline.INITIATION;
+import static org.onosproject.segmentrouting.pwaas.L2TunnelHandler.Pipeline.TERMINATION;
import static org.onosproject.segmentrouting.pwaas.L2TunnelHandler.Result.*;
+import static org.onosproject.segmentrouting.pwaas.L2TunnelHandler.Direction.FWD;
+import static org.onosproject.segmentrouting.pwaas.L2TunnelHandler.Direction.REV;
/**
* Handles pwaas related events.
@@ -71,20 +75,30 @@
public class L2TunnelHandler {
private static final Logger log = LoggerFactory.getLogger(L2TunnelHandler.class);
-
- private static final String FWD = "f";
- private static final String REV = "r";
-
- private static final String NOT_MASTER = "Not master controller";
+ /**
+ * Error message for invalid paths.
+ */
private static final String WRONG_TOPOLOGY = "Path in leaf-spine topology" +
" should always be two hops: ";
private final SegmentRoutingManager srManager;
-
- private final ConsistentMap<String, NextObjective> l2InitiationNextObjStore;
-
/**
- * TODO a proper store is necessary to handle the policies and collisions.
+ * To store the next objectives related to the initiation.
+ */
+ private final ConsistentMap<String, NextObjective> l2InitiationNextObjStore;
+ /**
+ * To store the next objectives related to the termination.
+ */
+ private final ConsistentMap<String, NextObjective> l2TerminationNextObjStore;
+ /**
+ * TODO a proper store is necessary to handle the policies, collisions and recovery.
+ * We should have a proper store for the policies and the tunnels. For several reasons:
+ * 1) We should avoid the overlapping of different policies;
+ * 2) We should avoid the overlapping of different tunnels;
+ * 3) We should have a proper mechanism for the protection;
+ * The most important one is 3). At least for 3.0 EA0 was not possible
+ * to remove the bucket, so we need a mapping between policies and tunnel
+ * in order to proper update the fwd objective for the recovery of a fault.
*/
private final KryoNamespace.Builder l2TunnelKryo;
@@ -104,6 +118,12 @@
.withName("onos-l2initiation-nextobj-store")
.withSerializer(Serializer.using(l2TunnelKryo.build()))
.build();
+
+ l2TerminationNextObjStore = srManager.storageService
+ .<String, NextObjective>consistentMapBuilder()
+ .withName("onos-l2termination-nextobj-store")
+ .withSerializer(Serializer.using(l2TunnelKryo.build()))
+ .build();
}
/**
@@ -122,6 +142,11 @@
deploy(pwToAdd);
}
+ /**
+ * To deploy a number of pseudo wires.
+ *
+ * @param pwToAdd the set of pseudo wires to add
+ */
private void deploy(Set<DefaultL2TunnelDescription> pwToAdd) {
Result result;
long l2TunnelId;
@@ -129,7 +154,7 @@
l2TunnelId = currentL2Tunnel.l2Tunnel().tunnelId();
// The tunnel id cannot be 0.
if (l2TunnelId == 0) {
- log.warn("Tunnel id cannot be 0");
+ log.warn("Tunnel id id must be > 0");
continue;
}
// We do a sanity check of the pseudo wire.
@@ -138,7 +163,7 @@
continue;
}
// We establish the tunnel.
- result = deployPseudoWire(
+ result = deployPseudoWireInit(
currentL2Tunnel.l2Tunnel(),
currentL2Tunnel.l2TunnelPolicy().cP1(),
currentL2Tunnel.l2TunnelPolicy().cP2(),
@@ -158,8 +183,18 @@
if (result != SUCCESS) {
continue;
}
+ // We terminate the tunnel
+ result = deployPseudoWireTerm(
+ currentL2Tunnel.l2Tunnel(),
+ currentL2Tunnel.l2TunnelPolicy().cP2(),
+ currentL2Tunnel.l2TunnelPolicy().cP2OuterTag(),
+ FWD
+ );
+ if (result != SUCCESS) {
+ continue;
+ }
// We establish the reverse tunnel.
- result = deployPseudoWire(
+ result = deployPseudoWireInit(
currentL2Tunnel.l2Tunnel(),
currentL2Tunnel.l2TunnelPolicy().cP2(),
currentL2Tunnel.l2TunnelPolicy().cP1(),
@@ -168,18 +203,27 @@
if (result != SUCCESS) {
continue;
}
- deployPolicy(
+ result = deployPolicy(
l2TunnelId,
currentL2Tunnel.l2TunnelPolicy().cP2(),
currentL2Tunnel.l2TunnelPolicy().cP2InnerTag(),
currentL2Tunnel.l2TunnelPolicy().cP2OuterTag(),
result.nextId
);
+ if (result != SUCCESS) {
+ continue;
+ }
+ deployPseudoWireTerm(
+ currentL2Tunnel.l2Tunnel(),
+ currentL2Tunnel.l2TunnelPolicy().cP1(),
+ currentL2Tunnel.l2TunnelPolicy().cP1OuterTag(),
+ REV
+ );
}
}
/**
- * Processes Pwaas Config updated event.
+ * Processes PWaaS Config updated event.
*
* @param event network config updated event
*/
@@ -209,12 +253,10 @@
.collect(Collectors.toSet());
deploy(pwToAdd);
// The pseudo wires to update.
- updPws.forEach(tunnelId -> {
- updatePw(
- prevConfig.getPwDescription(tunnelId),
- config.getPwDescription(tunnelId)
- );
- });
+ updPws.forEach(tunnelId -> updatePw(
+ prevConfig.getPwDescription(tunnelId),
+ config.getPwDescription(tunnelId))
+ );
}
/**
@@ -225,88 +267,91 @@
*/
private void updatePw(DefaultL2TunnelDescription oldPw,
DefaultL2TunnelDescription newPw) {
-
long tunnelId = oldPw.l2Tunnel().tunnelId();
- String fwdKey = generateKey(tunnelId, FWD);
- String revKey = generateKey(tunnelId, REV);
- Result result;
- NextObjective fwdNextObjective;
- NextObjective revNextObjective;
// The async tasks to orchestrate the next and
// forwarding update.
- CompletableFuture<ObjectiveError> revPolicyFuture = new CompletableFuture<>();
CompletableFuture<ObjectiveError> fwdInitNextFuture = new CompletableFuture<>();
CompletableFuture<ObjectiveError> revInitNextFuture = new CompletableFuture<>();
- CompletableFuture<ObjectiveError> newPwFuture = new CompletableFuture<>();
+ CompletableFuture<ObjectiveError> fwdTermNextFuture = new CompletableFuture<>();
+ CompletableFuture<ObjectiveError> revTermNextFuture = new CompletableFuture<>();
+ CompletableFuture<ObjectiveError> fwdPwFuture = new CompletableFuture<>();
+ CompletableFuture<ObjectiveError> revPwFuture = new CompletableFuture<>();
- result = verifyPseudoWire(newPw);
+
+ Result result = verifyPseudoWire(newPw);
if (result != SUCCESS) {
return;
}
- if (!l2InitiationNextObjStore.containsKey(fwdKey)) {
- log.warn("NextObj for {} does not exist in the store.", fwdKey);
- return;
- }
- fwdNextObjective = l2InitiationNextObjStore.get(fwdKey).value();
- if (!l2InitiationNextObjStore.containsKey(revKey)) {
- log.warn("NextObj for {} does not exist in the store.", revKey);
- return;
- }
// First we remove both policy.
- revNextObjective = l2InitiationNextObjStore.get(revKey).value();
log.debug("Start deleting fwd policy for {}", tunnelId);
deletePolicy(
tunnelId,
oldPw.l2TunnelPolicy().cP1(),
oldPw.l2TunnelPolicy().cP1InnerTag(),
oldPw.l2TunnelPolicy().cP1OuterTag(),
- fwdNextObjective.id(),
- revPolicyFuture
+ fwdInitNextFuture,
+ FWD
);
- revPolicyFuture.thenAcceptAsync(status -> {
- if (status == null) {
- log.debug("Fwd policy removed. Now remove rev policy for {}", tunnelId);
- deletePolicy(
- tunnelId,
- oldPw.l2TunnelPolicy().cP2(),
- oldPw.l2TunnelPolicy().cP2InnerTag(),
- oldPw.l2TunnelPolicy().cP2OuterTag(),
- revNextObjective.id(),
- fwdInitNextFuture
- );
- }
- });
+ log.debug("Start deleting rev policy for {}", tunnelId);
+ deletePolicy(
+ tunnelId,
+ oldPw.l2TunnelPolicy().cP2(),
+ oldPw.l2TunnelPolicy().cP2InnerTag(),
+ oldPw.l2TunnelPolicy().cP2OuterTag(),
+ revInitNextFuture,
+ REV
+ );
// Finally we remove both the tunnels.
fwdInitNextFuture.thenAcceptAsync(status -> {
if (status == null) {
- log.debug("Rev policy removed. Now remove fwd pw for {}", tunnelId);
- tearDownPseudoWire(
- fwdKey,
- fwdNextObjective,
+ log.debug("Fwd policy removed. Now remove fwd {} for {}", INITIATION, tunnelId);
+ tearDownPseudoWireInit(
+ tunnelId,
oldPw.l2TunnelPolicy().cP1(),
- oldPw.l2TunnelPolicy().cP2(),
- revInitNextFuture
+ fwdTermNextFuture,
+ FWD
);
}
});
revInitNextFuture.thenAcceptAsync(status -> {
if (status == null) {
- log.debug("Fwd tunnel removed. Now remove rev pw for {}", tunnelId);
- tearDownPseudoWire(
- revKey,
- revNextObjective,
+ log.debug("Rev policy removed. Now remove rev {} for {}", INITIATION, tunnelId);
+ tearDownPseudoWireInit(
+ tunnelId,
oldPw.l2TunnelPolicy().cP2(),
- oldPw.l2TunnelPolicy().cP1(),
- newPwFuture
+ revTermNextFuture,
+ REV
);
}
});
+ fwdTermNextFuture.thenAcceptAsync(status -> {
+ if (status == null) {
+ log.debug("Fwd {} removed. Now remove fwd {} for {}", INITIATION, TERMINATION, tunnelId);
+ tearDownPseudoWireTerm(
+ oldPw.l2Tunnel(),
+ oldPw.l2TunnelPolicy().cP2(),
+ fwdPwFuture,
+ FWD
+ );
+ }
+ });
+ revTermNextFuture.thenAcceptAsync(status -> {
+ if (status == null) {
+ log.debug("Rev {} removed. Now remove rev {} for {}", INITIATION, TERMINATION, tunnelId);
+ tearDownPseudoWireTerm(
+ oldPw.l2Tunnel(),
+ oldPw.l2TunnelPolicy().cP1(),
+ revPwFuture,
+ REV
+ );
+ }
+ });
// At the end we install the new pw.
- newPwFuture.thenAcceptAsync(status -> {
+ fwdPwFuture.thenAcceptAsync(status -> {
if (status == null) {
log.debug("Deploying new fwd pw for {}", tunnelId);
- Result lamdaResult = deployPseudoWire(
+ Result lamdaResult = deployPseudoWireInit(
newPw.l2Tunnel(),
newPw.l2TunnelPolicy().cP1(),
newPw.l2TunnelPolicy().cP2(),
@@ -322,8 +367,22 @@
newPw.l2TunnelPolicy().cP1OuterTag(),
lamdaResult.nextId
);
+ if (lamdaResult != SUCCESS) {
+ return;
+ }
+ deployPseudoWireTerm(
+ newPw.l2Tunnel(),
+ newPw.l2TunnelPolicy().cP2(),
+ newPw.l2TunnelPolicy().cP2OuterTag(),
+ FWD
+ );
+
+ }
+ });
+ revPwFuture.thenAcceptAsync(status -> {
+ if (status == null) {
log.debug("Deploying new rev pw for {}", tunnelId);
- lamdaResult = deployPseudoWire(
+ Result lamdaResult = deployPseudoWireInit(
newPw.l2Tunnel(),
newPw.l2TunnelPolicy().cP2(),
newPw.l2TunnelPolicy().cP1(),
@@ -339,6 +398,15 @@
newPw.l2TunnelPolicy().cP2OuterTag(),
lamdaResult.nextId
);
+ if (lamdaResult != SUCCESS) {
+ return;
+ }
+ deployPseudoWireTerm(
+ newPw.l2Tunnel(),
+ newPw.l2TunnelPolicy().cP1(),
+ newPw.l2TunnelPolicy().cP1OuterTag(),
+ REV
+ );
}
});
}
@@ -366,11 +434,8 @@
*/
private void tearDown(Set<DefaultL2TunnelDescription> pwToRemove) {
Result result;
- int nextId;
- NextObjective nextObjective;
long l2TunnelId;
- // We remove all the pw in the configuration
- // file.
+ // We remove all the pw in the configuration file.
for (DefaultL2TunnelDescription currentL2Tunnel : pwToRemove) {
l2TunnelId = currentL2Tunnel.l2Tunnel().tunnelId();
if (l2TunnelId == 0) {
@@ -381,52 +446,48 @@
if (result != SUCCESS) {
continue;
}
- String key = generateKey(l2TunnelId, FWD);
- if (!l2InitiationNextObjStore.containsKey(key)) {
- log.warn("NextObj for {} does not exist in the store.", key);
- continue;
- }
- nextObjective = l2InitiationNextObjStore.get(key).value();
- nextId = nextObjective.id();
// First all we have to delete the policy.
deletePolicy(
l2TunnelId,
currentL2Tunnel.l2TunnelPolicy().cP1(),
currentL2Tunnel.l2TunnelPolicy().cP1InnerTag(),
currentL2Tunnel.l2TunnelPolicy().cP1OuterTag(),
- nextId,
- null
+ null,
+ FWD
);
// Finally we will tear down the pseudo wire.
- tearDownPseudoWire(
- key,
- nextObjective,
+ tearDownPseudoWireInit(
+ l2TunnelId,
currentL2Tunnel.l2TunnelPolicy().cP1(),
+ null,
+ FWD
+ );
+ tearDownPseudoWireTerm(
+ currentL2Tunnel.l2Tunnel(),
currentL2Tunnel.l2TunnelPolicy().cP2(),
- null
+ null,
+ FWD
);
// We do the same operations on the reverse side.
- key = generateKey(l2TunnelId, REV);
- if (!l2InitiationNextObjStore.containsKey(key)) {
- log.warn("NextObj for {} does not exist in the store.", key);
- continue;
- }
- nextObjective = l2InitiationNextObjStore.get(key).value();
- nextId = nextObjective.id();
deletePolicy(
l2TunnelId,
currentL2Tunnel.l2TunnelPolicy().cP2(),
currentL2Tunnel.l2TunnelPolicy().cP2InnerTag(),
currentL2Tunnel.l2TunnelPolicy().cP2OuterTag(),
- nextId,
- null
+ null,
+ REV
);
- tearDownPseudoWire(
- key,
- nextObjective,
+ tearDownPseudoWireInit(
+ l2TunnelId,
currentL2Tunnel.l2TunnelPolicy().cP2(),
+ null,
+ REV
+ );
+ tearDownPseudoWireTerm(
+ currentL2Tunnel.l2Tunnel(),
currentL2Tunnel.l2TunnelPolicy().cP1(),
- null
+ null,
+ REV
);
}
@@ -444,7 +505,7 @@
DefaultL2TunnelPolicy l2TunnelPolicy = l2TunnelDescription.l2TunnelPolicy();
result = verifyTunnel(l2Tunnel);
if (result != SUCCESS) {
- log.warn("Tunnel {} did not pass the validation", l2Tunnel.tunnelId());
+ log.warn("Tunnel {}: did not pass the validation", l2Tunnel.tunnelId());
return result;
}
result = verifyPolicy(
@@ -455,7 +516,7 @@
l2TunnelPolicy.cP2OuterTag()
);
if (result != SUCCESS) {
- log.warn("Policy for tunnel {} did not pass the validation", l2Tunnel.tunnelId());
+ log.warn("Policy for tunnel {}: did not pass the validation", l2Tunnel.tunnelId());
return result;
}
@@ -463,8 +524,6 @@
}
/**
- * TODO Operation on the policies store.
- *
* Handles the policy establishment which consists in
* create the filtering and forwarding objectives related
* to the initiation and termination.
@@ -474,27 +533,21 @@
* @param ingressInner the ingress inner tag
* @param ingressOuter the ingress outer tag
* @param nextId the next objective id
- * @return SUCCESS if the policy has been deployed.
- * Otherwise an error according to the failure
- * scenario.
+ * @return the result of the operation
*/
private Result deployPolicy(long tunnelId,
ConnectPoint ingress,
VlanId ingressInner,
VlanId ingressOuter,
int nextId) {
-
- ForwardingObjective.Builder fwdBuilder;
- FilteringObjective.Builder filtBuilder;
- List<Objective> objectives = Lists.newArrayList();
if (!srManager.mastershipService.isLocalMaster(ingress.deviceId())) {
- log.info("Abort creation of policy for L2 tunnel {}: {}", tunnelId, NOT_MASTER);
+ log.info("Abort creation of policy for tunnel {}: I am not the master", tunnelId);
return SUCCESS;
}
+ List<Objective> objectives = Lists.newArrayList();
// We create the forwarding objective for supporting
// the l2 tunnel.
- fwdBuilder = createFwdObjective(
- INITIATION,
+ ForwardingObjective.Builder fwdBuilder = createInitFwdObjective(
tunnelId,
ingress.port(),
nextId
@@ -508,7 +561,7 @@
objectives.add(fwdBuilder.add(context));
// We create the filtering objective to define the
// permit traffic in the switch
- filtBuilder = createFiltObjective(
+ FilteringObjective.Builder filtBuilder = createFiltObjective(
ingress.port(),
ingressInner,
ingressOuter
@@ -528,7 +581,7 @@
for (Objective objective : objectives) {
if (objective instanceof ForwardingObjective) {
srManager.flowObjectiveService.forward(ingress.deviceId(), (ForwardingObjective) objective);
- log.debug("Creating new FwdObj for NextObj with id={} for tunnel {}", nextId, tunnelId);
+ log.debug("Creating new FwdObj for initiation NextObj with id={} for tunnel {}", nextId, tunnelId);
} else {
srManager.flowObjectiveService.filter(ingress.deviceId(), (FilteringObjective) objective);
log.debug("Creating new FiltObj for tunnel {}", tunnelId);
@@ -572,46 +625,36 @@
}
/**
- * TODO Operation on the policies store.
- *
* Handles the tunnel establishment which consists in
- * create the next objectives related to the initiation
- * and termination.
+ * create the next objectives related to the initiation.
*
* @param l2Tunnel the tunnel to deploy
* @param ingress the ingress connect point
* @param egress the egress connect point
* @param direction the direction of the pw
- * @return SUCCESS if the tunnel has been created.
- * Otherwise an error according to the failure
- * scenario
+ * @return the result of the operation
*/
- private Result deployPseudoWire(DefaultL2Tunnel l2Tunnel,
- ConnectPoint ingress,
- ConnectPoint egress,
- String direction) {
- Link nextHop;
- NextObjective.Builder nextObjectiveBuilder;
- NextObjective nextObjective;
- int nextId;
- Result result;
+ private Result deployPseudoWireInit(DefaultL2Tunnel l2Tunnel,
+ ConnectPoint ingress,
+ ConnectPoint egress,
+ Direction direction) {
if (!srManager.mastershipService.isLocalMaster(ingress.deviceId())) {
- log.info("Abort initiation creation of L2 tunnel {}: {}",
- l2Tunnel.tunnelId(), NOT_MASTER);
+ log.info("Abort initiation of tunnel {}: I am not the master", l2Tunnel.tunnelId());
return SUCCESS;
}
// We need at least a path between ingress and egress.
- nextHop = getNextHop(ingress, egress);
+ Link nextHop = getNextHop(ingress, egress);
if (nextHop == null) {
log.warn("No path between ingress and egress");
return WRONG_PARAMETERS;
}
// We create the next objective without the metadata
// context and id. We check if it already exists in the
- // store. If not we store as it is in the store ?
- nextObjectiveBuilder = createNextObjective(
+ // store. If not we store as it is in the store.
+ NextObjective.Builder nextObjectiveBuilder = createNextObjective(
INITIATION,
- nextHop,
+ nextHop.src(),
+ nextHop.dst(),
l2Tunnel,
egress.deviceId()
);
@@ -625,7 +668,7 @@
.matchTunnelId(l2Tunnel.tunnelId())
.build();
nextObjectiveBuilder.withMeta(metadata);
- nextId = srManager.flowObjectiveService.allocateNextId();
+ int nextId = srManager.flowObjectiveService.allocateNextId();
if (nextId < 0) {
log.warn("Not able to allocate a next id for initiation");
return INTERNAL_ERROR;
@@ -640,18 +683,95 @@
(objective, error)
-> log.warn("Failed to populate Initiation l2 tunnel rule for {}: {}",
l2Tunnel.tunnelId(), error));
- nextObjective = nextObjectiveBuilder.add(context);
+ NextObjective nextObjective = nextObjectiveBuilder.add(context);
srManager.flowObjectiveService.next(ingress.deviceId(), nextObjective);
log.debug("Initiation next objective for {} not found. Creating new NextObj with id={}",
l2Tunnel.tunnelId(),
nextObjective.id()
);
- result = SUCCESS;
+ Result result = SUCCESS;
result.nextId = nextObjective.id();
return result;
}
/**
+ * Handles the tunnel termination, which consists in the creation
+ * of a forwarding objective and a next objective.
+ *
+ * @param l2Tunnel the tunnel to terminate
+ * @param egress the egress point
+ * @param egressVlan the expected vlan at egress
+ * @param direction the direction
+ * @return the result of the operation
+ */
+ private Result deployPseudoWireTerm(DefaultL2Tunnel l2Tunnel,
+ ConnectPoint egress,
+ VlanId egressVlan,
+ Direction direction) {
+ // We create the group relative to the termination.
+ // It's fine to abort the termination if we are
+ // not the master.
+ if (!srManager.mastershipService.isLocalMaster(egress.deviceId())) {
+ log.info("Abort termination of tunnel {}: I am not the master", l2Tunnel.tunnelId());
+ return SUCCESS;
+ }
+ NextObjective.Builder nextObjectiveBuilder = createNextObjective(
+ TERMINATION,
+ egress,
+ null,
+ null,
+ egress.deviceId()
+ );
+ if (nextObjectiveBuilder == null) {
+ return INTERNAL_ERROR;
+ }
+ TrafficSelector metadata = DefaultTrafficSelector
+ .builder()
+ .matchVlanId(egressVlan)
+ .build();
+ nextObjectiveBuilder.withMeta(metadata);
+ int nextId = srManager.flowObjectiveService.allocateNextId();
+ if (nextId < 0) {
+ log.warn("Not able to allocate a next id for initiation");
+ return INTERNAL_ERROR;
+ }
+ nextObjectiveBuilder.withId(nextId);
+ String key = generateKey(l2Tunnel.tunnelId(), direction);
+ l2TerminationNextObjStore.put(key, nextObjectiveBuilder.add());
+ ObjectiveContext context = new DefaultObjectiveContext(
+ (objective)
+ -> log.debug("Termination l2 tunnel rule for {} populated",
+ l2Tunnel.tunnelId()),
+ (objective, error)
+ -> log.warn("Failed to populate termination l2 tunnel rule for {}: {}",
+ l2Tunnel.tunnelId(), error));
+ NextObjective nextObjective = nextObjectiveBuilder.add(context);
+ srManager.flowObjectiveService.next(egress.deviceId(), nextObjective);
+ log.debug("Termination next objective for {} not found. Creating new NextObj with id={}",
+ l2Tunnel.tunnelId(),
+ nextObjective.id()
+ );
+ // We create the flow relative to the termination.
+ ForwardingObjective.Builder fwdBuilder = createTermFwdObjective(
+ l2Tunnel.pwLabel(),
+ l2Tunnel.tunnelId(),
+ egress.port(),
+ nextObjective.id()
+ );
+ context = new DefaultObjectiveContext(
+ (objective)
+ -> log.debug("FwdObj for tunnel termination {} populated",
+ l2Tunnel.tunnelId()),
+ (objective, error)
+ -> log.warn("Failed to populate fwdrObj for tunnel termination {}",
+ l2Tunnel.tunnelId(), error));
+ srManager.flowObjectiveService.forward(egress.deviceId(), fwdBuilder.add(context));
+ log.debug("Creating new FwdObj for termination NextObj with id={} for tunnel {}", nextId, l2Tunnel.tunnelId());
+ return SUCCESS;
+
+ }
+
+ /**
* Helper method to verify if the tunnel is whether or not
* supported.
*
@@ -675,7 +795,7 @@
}
/**
- * Create the filtering objective according to a given policy.
+ * Creates the filtering objective according to a given policy.
*
* @param inPort the in port
* @param innerTag the inner vlan tag
@@ -695,34 +815,69 @@
}
/**
- * Create the forwarding objective according to a given pipeline.
+ * Creates the forwarding objective for the termination.
*
- * @param pipeline the pipeline
+ * @param pwLabel the pseudo wire label
* @param tunnelId the tunnel id
+ * @param egressPort the egress port
* @param nextId the next step
- * @return the forwarding objective to support the pipeline.
+ * @return the forwarding objective to support the termination
*/
- private ForwardingObjective.Builder createFwdObjective(Pipeline pipeline,
- long tunnelId,
- PortNumber inPort,
- int nextId) {
- ForwardingObjective.Builder fwdBuilder = null;
+ private ForwardingObjective.Builder createTermFwdObjective(MplsLabel pwLabel,
+ long tunnelId,
+ PortNumber egressPort,
+ int nextId) {
TrafficSelector.Builder trafficSelector = DefaultTrafficSelector
.builder();
- if (pipeline == INITIATION) {
- // The flow has to match on the mpls logical
- // port and the tunnel id.
- trafficSelector.matchTunnelId(tunnelId);
- trafficSelector.matchInPort(inPort);
- fwdBuilder = DefaultForwardingObjective.builder()
- .fromApp(srManager.appId)
- .makePermanent()
- .nextStep(nextId)
- .withPriority(SegmentRoutingService.DEFAULT_PRIORITY)
- .withSelector(trafficSelector.build())
- .withFlag(VERSATILE);
- }
- return fwdBuilder;
+ TrafficTreatment.Builder trafficTreatment = DefaultTrafficTreatment
+ .builder();
+ // The flow has to match on the pw label and bos
+ trafficSelector.matchEthType(Ethernet.MPLS_UNICAST);
+ trafficSelector.matchMplsLabel(pwLabel);
+ trafficSelector.matchMplsBos(true);
+ // The flow has to decrement ttl, restore ttl in
+ // pop mpls, set tunnel id and port.
+ trafficTreatment.decMplsTtl();
+ trafficTreatment.copyTtlIn();
+ trafficTreatment.popMpls();
+ trafficTreatment.setTunnelId(tunnelId);
+ trafficTreatment.setOutput(egressPort);
+
+ return DefaultForwardingObjective.builder()
+ .fromApp(srManager.appId)
+ .makePermanent()
+ .nextStep(nextId)
+ .withPriority(SegmentRoutingService.DEFAULT_PRIORITY)
+ .withSelector(trafficSelector.build())
+ .withTreatment(trafficTreatment.build())
+ .withFlag(VERSATILE);
+ }
+
+ /**
+ * Creates the forwarding objective for the initiation.
+ *
+ * @param tunnelId the tunnel id
+ * @param inPort the input port
+ * @param nextId the next step
+ * @return the forwarding objective to support the initiation.
+ */
+ private ForwardingObjective.Builder createInitFwdObjective(long tunnelId,
+ PortNumber inPort,
+ int nextId) {
+ TrafficSelector.Builder trafficSelector = DefaultTrafficSelector
+ .builder();
+ // The flow has to match on the mpls logical
+ // port and the tunnel id.
+ trafficSelector.matchTunnelId(tunnelId);
+ trafficSelector.matchInPort(inPort);
+
+ return DefaultForwardingObjective.builder()
+ .fromApp(srManager.appId)
+ .makePermanent()
+ .nextStep(nextId)
+ .withPriority(SegmentRoutingService.DEFAULT_PRIORITY)
+ .withSelector(trafficSelector.build())
+ .withFlag(VERSATILE);
}
@@ -733,13 +888,15 @@
* the same next objective for different tunnels.
*
* @param pipeline the pipeline to support
- * @param nextHop the next hop towards the destination
+ * @param srcCp the source port
+ * @param dstCp the destination port
* @param l2Tunnel the tunnel to support
* @param egressId the egress device id
* @return the next objective to support the pipeline
*/
private NextObjective.Builder createNextObjective(Pipeline pipeline,
- Link nextHop,
+ ConnectPoint srcCp,
+ ConnectPoint dstCp,
DefaultL2Tunnel l2Tunnel,
DeviceId egressId) {
NextObjective.Builder nextObjBuilder;
@@ -786,7 +943,7 @@
try {
ingressMac = srManager
.deviceConfiguration
- .getDeviceMac(nextHop.src().deviceId());
+ .getDeviceMac(srcCp.deviceId());
} catch (DeviceConfigNotFoundException e) {
log.warn("Was not able to find the ingress mac");
return null;
@@ -796,20 +953,21 @@
try {
neighborMac = srManager
.deviceConfiguration
- .getDeviceMac(nextHop.dst().deviceId());
+ .getDeviceMac(dstCp.deviceId());
} catch (DeviceConfigNotFoundException e) {
log.warn("Was not able to find the neighbor mac");
return null;
}
treatmentBuilder.setEthDst(neighborMac);
} else {
+ // We create the next objective which
+ // will be a simple l2 group.
nextObjBuilder = DefaultNextObjective
.builder()
.withType(NextObjective.Type.SIMPLE)
.fromApp(srManager.appId);
-
}
- treatmentBuilder.setOutput(nextHop.src().port());
+ treatmentBuilder.setOutput(srcCp.port());
nextObjBuilder.addTreatment(treatmentBuilder.build());
return nextObjBuilder;
}
@@ -841,32 +999,41 @@
}
/**
- * TODO Operation on the store.
* Deletes a given policy using the parameter supplied.
*
* @param tunnelId the tunnel id
* @param ingress the ingress point
* @param ingressInner the ingress inner vlan id
* @param ingressOuter the ingress outer vlan id
- * @param nextId the next objective id
+ * @param future to perform the async operation
+ * @param direction the direction: forward or reverse
*/
private void deletePolicy(long tunnelId,
ConnectPoint ingress,
VlanId ingressInner,
VlanId ingressOuter,
- int nextId,
- CompletableFuture<ObjectiveError> fwdFuture) {
-
- ForwardingObjective.Builder fwdBuilder;
- FilteringObjective.Builder filtBuilder;
- List<Objective> objectives = Lists.newArrayList();
+ CompletableFuture<ObjectiveError> future,
+ Direction direction) {
if (!srManager.mastershipService.isLocalMaster(ingress.deviceId())) {
- log.info("Abort delete of policy for L2 tunnel {}: {}", tunnelId, NOT_MASTER);
+ log.info("Abort delete of policy for tunnel {}: I am not the master", tunnelId);
+ if (future != null) {
+ future.complete(null);
+ }
return;
}
+ String key = generateKey(tunnelId, direction);
+ if (!l2InitiationNextObjStore.containsKey(key)) {
+ log.warn("Abort delete of policy for tunnel {}: next does not exist in the store", tunnelId);
+ if (future != null) {
+ future.complete(null);
+ }
+ return;
+ }
+ NextObjective nextObjective = l2InitiationNextObjStore.get(key).value();
+ int nextId = nextObjective.id();
+ List<Objective> objectives = Lists.newArrayList();
// We create the forwarding objective.
- fwdBuilder = createFwdObjective(
- INITIATION,
+ ForwardingObjective.Builder fwdBuilder = createInitFwdObjective(
tunnelId,
ingress.port(),
nextId
@@ -874,24 +1041,24 @@
ObjectiveContext context = new ObjectiveContext() {
@Override
public void onSuccess(Objective objective) {
- log.debug("Previous FwdObj for policy {} removed", tunnelId);
- if (fwdFuture != null) {
- fwdFuture.complete(null);
+ log.debug("Previous fwdObj for policy {} removed", tunnelId);
+ if (future != null) {
+ future.complete(null);
}
}
@Override
public void onError(Objective objective, ObjectiveError error) {
- log.warn("Failed to remove previous FwdObj for policy {}: {}", tunnelId, error);
- if (fwdFuture != null) {
- fwdFuture.complete(error);
+ log.warn("Failed to remove previous fwdObj for policy {}: {}", tunnelId, error);
+ if (future != null) {
+ future.complete(error);
}
}
};
objectives.add(fwdBuilder.remove(context));
// We create the filtering objective to define the
// permit traffic in the switch
- filtBuilder = createFiltObjective(
+ FilteringObjective.Builder filtBuilder = createFiltObjective(
ingress.port(),
ingressInner,
ingressOuter
@@ -916,55 +1083,130 @@
}
/**
- * TODO Operation on the store.
- * Deletes a given pseudo wire using the parameter supplied.
+ * Deletes the pseudo wire initiation.
*
- * @param key the key of the store
- * @param nextObjective the next objective representing the pw
+ * @param l2TunnelId the tunnel id
* @param ingress the ingress connect point
- * @param egress the egress connect point
+ * @param future to perform an async operation
+ * @param direction the direction: reverse of forward
*/
- private void tearDownPseudoWire(String key,
- NextObjective nextObjective,
- ConnectPoint ingress,
- ConnectPoint egress,
- CompletableFuture<ObjectiveError> nextFutureForInit) {
+ private void tearDownPseudoWireInit(long l2TunnelId,
+ ConnectPoint ingress,
+ CompletableFuture<ObjectiveError> future,
+ Direction direction) {
+ String key = generateKey(l2TunnelId, direction);
if (!srManager.mastershipService.isLocalMaster(ingress.deviceId())) {
- log.info("Abort delete of {} for {}: {}", INITIATION, key, NOT_MASTER);
+ log.info("Abort delete of {} for {}: I am not the master", INITIATION, key);
+ if (future != null) {
+ future.complete(null);
+ }
return;
}
+ if (!l2InitiationNextObjStore.containsKey(key)) {
+ log.info("Abort delete of {} for {}: next does not exist in the store", INITIATION, key);
+ if (future != null) {
+ future.complete(null);
+ }
+ return;
+ }
+ NextObjective nextObjective = l2InitiationNextObjStore.get(key).value();
ObjectiveContext context = new ObjectiveContext() {
@Override
public void onSuccess(Objective objective) {
- log.debug("Previous {} NextObj for {} removed", INITIATION, key);
- if (nextFutureForInit != null) {
- nextFutureForInit.complete(null);
+ log.debug("Previous {} next for {} removed", INITIATION, key);
+ if (future != null) {
+ future.complete(null);
}
}
@Override
public void onError(Objective objective, ObjectiveError error) {
- log.warn("Failed to remove previous {} NextObj for {}: {}", INITIATION, key, error);
- if (nextFutureForInit != null) {
- nextFutureForInit.complete(error);
+ log.warn("Failed to remove previous {} next for {}: {}", INITIATION, key, error);
+ if (future != null) {
+ future.complete(error);
}
}
};
- srManager.flowObjectiveService.next(
- ingress.deviceId(),
- (NextObjective) nextObjective.copy().remove(context)
- );
+ srManager.flowObjectiveService
+ .next(ingress.deviceId(), (NextObjective) nextObjective.copy().remove(context));
l2InitiationNextObjStore.remove(key);
}
/**
+ * Deletes the pseudo wire termination.
+ *
+ * @param l2Tunnel the tunnel
+ * @param egress the egress connect point
+ * @param future the async task
+ * @param direction the direction of the tunnel
+ */
+ private void tearDownPseudoWireTerm(DefaultL2Tunnel l2Tunnel,
+ ConnectPoint egress,
+ CompletableFuture<ObjectiveError> future,
+ Direction direction) {
+ /*
+ * We verify the mastership for the termination.
+ */
+ String key = generateKey(l2Tunnel.tunnelId(), direction);
+ if (!srManager.mastershipService.isLocalMaster(egress.deviceId())) {
+ log.info("Abort delete of {} for {}: I am not the master", TERMINATION, key);
+ if (future != null) {
+ future.complete(null);
+ }
+ return;
+ }
+ if (!l2TerminationNextObjStore.containsKey(key)) {
+ log.info("Abort delete of {} for {}: next does not exist in the store", TERMINATION, key);
+ if (future != null) {
+ future.complete(null);
+ }
+ return;
+ }
+ NextObjective nextObjective = l2TerminationNextObjStore.get(key).value();
+ ForwardingObjective.Builder fwdBuilder = createTermFwdObjective(
+ l2Tunnel.pwLabel(),
+ l2Tunnel.tunnelId(),
+ egress.port(),
+ nextObjective.id()
+ );
+ ObjectiveContext context = new DefaultObjectiveContext(
+ (objective)
+ -> log.debug("FwdObj for {} {} removed", TERMINATION, l2Tunnel.tunnelId()),
+ (objective, error)
+ -> log.warn("Failed to remove fwdObj for {} {}", TERMINATION, l2Tunnel.tunnelId(),
+ error));
+ srManager.flowObjectiveService.forward(egress.deviceId(), fwdBuilder.remove(context));
+
+ context = new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ log.debug("Previous {} next for {} removed", TERMINATION, key);
+ if (future != null) {
+ future.complete(null);
+ }
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ log.warn("Failed to remove previous {} next for {}: {}", TERMINATION, key, error);
+ if (future != null) {
+ future.complete(error);
+ }
+ }
+ };
+ srManager.flowObjectiveService
+ .next(egress.deviceId(), (NextObjective) nextObjective.copy().remove(context));
+ l2TerminationNextObjStore.remove(key);
+ }
+
+ /**
* Utilities to generate pw key.
*
* @param tunnelId the tunnel id
* @param direction the direction of the pw
* @return the key of the store
*/
- private String generateKey(long tunnelId, String direction) {
+ private String generateKey(long tunnelId, Direction direction) {
return String.format("%s-%s", tunnelId, direction);
}
@@ -979,7 +1221,7 @@
/**
* The termination pipeline.
*/
- TERMINATION;
+ TERMINATION
}
/**
@@ -1012,7 +1254,7 @@
private final String description;
private int nextId;
- private Result(int code, String description) {
+ Result(int code, String description) {
this.code = code;
this.description = description;
}
@@ -1021,14 +1263,24 @@
return description;
}
- public int getCode() {
- return code;
- }
-
@Override
public String toString() {
return code + ": " + description;
}
}
+ /**
+ * Enum helper for handling the direction of the pw.
+ */
+ public enum Direction {
+ /**
+ * The forward direction of the pseudo wire.
+ */
+ FWD,
+ /**
+ * The reverse direction of the pseudo wire.
+ */
+ REV;
+ }
+
}