fix removal of rules in a distributed setting
Change-Id: I44cb49990b8051f5f1542c11cbda6846049906e3
diff --git a/apps/olt/app/src/main/java/org/onosproject/olt/impl/Olt.java b/apps/olt/app/src/main/java/org/onosproject/olt/impl/Olt.java
index 301039f..96573f2 100644
--- a/apps/olt/app/src/main/java/org/onosproject/olt/impl/Olt.java
+++ b/apps/olt/app/src/main/java/org/onosproject/olt/impl/Olt.java
@@ -16,7 +16,6 @@
package org.onosproject.olt.impl;
import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -61,6 +60,9 @@
import org.onosproject.olt.AccessDeviceEvent;
import org.onosproject.olt.AccessDeviceListener;
import org.onosproject.olt.AccessDeviceService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
@@ -69,7 +71,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -90,6 +91,7 @@
implements AccessDeviceService {
private static final short DEFAULT_VLAN = 0;
+ private static final String SUBSCRIBERS = "existing-subscribers";
private final Logger log = getLogger(getClass());
@@ -108,6 +110,8 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ComponentConfigService componentConfigService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
@Property(name = "defaultVlan", intValue = DEFAULT_VLAN,
label = "Default VLAN RG<->ONU traffic")
@@ -123,10 +127,7 @@
private Map<DeviceId, AccessDeviceData> oltData = new ConcurrentHashMap<>();
- private Map<ConnectPoint, Set<ForwardingObjective.Builder>> objectives =
- Maps.newConcurrentMap();
-
- private Map<ConnectPoint, VlanId> subscribers = Maps.newConcurrentMap();
+ private Map<ConnectPoint, VlanId> subscribers;
private InternalNetworkConfigListener configListener =
new InternalNetworkConfigListener();
@@ -172,6 +173,11 @@
.forEach(p -> processFilteringObjectives((DeviceId) p.element().id(),
p.number(), true));
+ subscribers = storageService.<ConnectPoint, VlanId>consistentMapBuilder()
+ .withName(SUBSCRIBERS)
+ .withSerializer(Serializer.using(KryoNamespaces.API))
+ .build().asJavaMap();
+
deviceService.addListener(deviceListener);
log.info("Started with Application ID {}", appId.id());
@@ -220,7 +226,15 @@
return;
}
- unprovisionSubscriber(olt.deviceId(), olt.uplink(), port.port(), olt.vlan());
+ VlanId subscriberVlan = subscribers.remove(port);
+
+ if (subscriberVlan == null) {
+ log.warn("Unknown subscriber at location {}", port);
+ return;
+ }
+
+ unprovisionSubscriber(olt.deviceId(), olt.uplink(), port.port(), subscriberVlan,
+ olt.vlan(), olt.defaultVlan());
}
@@ -230,39 +244,43 @@
}
private void unprovisionSubscriber(DeviceId deviceId, PortNumber uplink,
- PortNumber subscriberPort, VlanId deviceVlan) {
-
- //FIXME: This method is slightly ugly but it'll do until we have a better
- // way to remove flows from the flow store.
+ PortNumber subscriberPort, VlanId subscriberVlan,
+ VlanId deviceVlan, Optional<VlanId> defaultVlan) {
CompletableFuture<ObjectiveError> downFuture = new CompletableFuture();
CompletableFuture<ObjectiveError> upFuture = new CompletableFuture();
- ConnectPoint cp = new ConnectPoint(deviceId, subscriberPort);
-
- VlanId subscriberVlan = subscribers.remove(cp);
-
- Set<ForwardingObjective.Builder> fwds = objectives.remove(cp);
-
- if (fwds == null || fwds.size() != 2) {
- log.warn("Unknown or incomplete subscriber at {}", cp);
- return;
- }
+ ForwardingObjective.Builder upFwd = upBuilder(uplink, subscriberPort,
+ subscriberVlan, deviceVlan,
+ defaultVlan);
+ ForwardingObjective.Builder downFwd = downBuilder(uplink, subscriberPort,
+ subscriberVlan, deviceVlan,
+ defaultVlan);
- fwds.stream().forEach(
- fwd -> flowObjectiveService.forward(deviceId,
- fwd.remove(new ObjectiveContext() {
- @Override
- public void onSuccess(Objective objective) {
- upFuture.complete(null);
- }
+ flowObjectiveService.forward(deviceId, upFwd.remove(new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ upFuture.complete(null);
+ }
- @Override
- public void onError(Objective objective, ObjectiveError error) {
- upFuture.complete(error);
- }
- })));
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ upFuture.complete(error);
+ }
+ }));
+
+ flowObjectiveService.forward(deviceId, downFwd.remove(new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ downFuture.complete(null);
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ downFuture.complete(error);
+ }
+ }));
upFuture.thenAcceptBothAsync(downFuture, (upStatus, downStatus) -> {
if (upStatus == null && downStatus == null) {
@@ -291,53 +309,17 @@
CompletableFuture<ObjectiveError> downFuture = new CompletableFuture();
CompletableFuture<ObjectiveError> upFuture = new CompletableFuture();
- TrafficSelector upstream = DefaultTrafficSelector.builder()
- .matchVlanId(defaultVlan.orElse(VlanId.vlanId((short) this.defaultVlan)))
- .matchInPort(subscriberPort)
- .build();
-
- TrafficSelector downstream = DefaultTrafficSelector.builder()
- .matchVlanId(deviceVlan)
- .matchInPort(uplinkPort)
- .matchInnerVlanId(subscriberVlan)
- .build();
-
- TrafficTreatment upstreamTreatment = DefaultTrafficTreatment.builder()
- .pushVlan()
- .setVlanId(subscriberVlan)
- .pushVlan()
- .setVlanId(deviceVlan)
- .setOutput(uplinkPort)
- .build();
-
- TrafficTreatment downstreamTreatment = DefaultTrafficTreatment.builder()
- .popVlan()
- .setVlanId(defaultVlan.orElse(VlanId.vlanId((short) this.defaultVlan)))
- .setOutput(subscriberPort)
- .build();
+ ForwardingObjective.Builder upFwd = upBuilder(uplinkPort, subscriberPort,
+ subscriberVlan, deviceVlan,
+ defaultVlan);
- ForwardingObjective.Builder upFwd = DefaultForwardingObjective.builder()
- .withFlag(ForwardingObjective.Flag.VERSATILE)
- .withPriority(1000)
- .makePermanent()
- .withSelector(upstream)
- .fromApp(appId)
- .withTreatment(upstreamTreatment);
-
-
- ForwardingObjective.Builder downFwd = DefaultForwardingObjective.builder()
- .withFlag(ForwardingObjective.Flag.VERSATILE)
- .withPriority(1000)
- .makePermanent()
- .withSelector(downstream)
- .fromApp(appId)
- .withTreatment(downstreamTreatment);
+ ForwardingObjective.Builder downFwd = downBuilder(uplinkPort, subscriberPort,
+ subscriberVlan, deviceVlan,
+ defaultVlan);
ConnectPoint cp = new ConnectPoint(deviceId, subscriberPort);
-
subscribers.put(cp, subscriberVlan);
- objectives.put(cp, Sets.newHashSet(upFwd, downFwd));
flowObjectiveService.forward(deviceId, upFwd.add(new ObjectiveContext() {
@Override
@@ -383,6 +365,60 @@
}
+ private ForwardingObjective.Builder downBuilder(PortNumber uplinkPort,
+ PortNumber subscriberPort,
+ VlanId subscriberVlan,
+ VlanId deviceVlan,
+ Optional<VlanId> defaultVlan) {
+ TrafficSelector downstream = DefaultTrafficSelector.builder()
+ .matchVlanId(deviceVlan)
+ .matchInPort(uplinkPort)
+ .matchInnerVlanId(subscriberVlan)
+ .build();
+
+ TrafficTreatment downstreamTreatment = DefaultTrafficTreatment.builder()
+ .popVlan()
+ .setVlanId(defaultVlan.orElse(VlanId.vlanId((short) this.defaultVlan)))
+ .setOutput(subscriberPort)
+ .build();
+
+ return DefaultForwardingObjective.builder()
+ .withFlag(ForwardingObjective.Flag.VERSATILE)
+ .withPriority(1000)
+ .makePermanent()
+ .withSelector(downstream)
+ .fromApp(appId)
+ .withTreatment(downstreamTreatment);
+ }
+
+ private ForwardingObjective.Builder upBuilder(PortNumber uplinkPort,
+ PortNumber subscriberPort,
+ VlanId subscriberVlan,
+ VlanId deviceVlan,
+ Optional<VlanId> defaultVlan) {
+ TrafficSelector upstream = DefaultTrafficSelector.builder()
+ .matchVlanId(defaultVlan.orElse(VlanId.vlanId((short) this.defaultVlan)))
+ .matchInPort(subscriberPort)
+ .build();
+
+
+ TrafficTreatment upstreamTreatment = DefaultTrafficTreatment.builder()
+ .pushVlan()
+ .setVlanId(subscriberVlan)
+ .pushVlan()
+ .setVlanId(deviceVlan)
+ .setOutput(uplinkPort)
+ .build();
+
+ return DefaultForwardingObjective.builder()
+ .withFlag(ForwardingObjective.Flag.VERSATILE)
+ .withPriority(1000)
+ .makePermanent()
+ .withSelector(upstream)
+ .fromApp(appId)
+ .withTreatment(upstreamTreatment);
+ }
+
private void processFilteringObjectives(DeviceId devId, PortNumber port, boolean install) {
DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
@@ -430,9 +466,11 @@
break;
case PORT_REMOVED:
AccessDeviceData olt = oltData.get(devId);
+ VlanId vlan = subscribers.get(new ConnectPoint(devId,
+ event.port().number()));
unprovisionSubscriber(devId, olt.uplink(),
event.port().number(),
- olt.vlan());
+ vlan, olt.vlan(), olt.defaultVlan());
if (!oltData.get(devId).uplink().equals(event.port().number()) &&
event.port().isEnabled()) {
processFilteringObjectives(devId, event.port().number(), false);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseState.java
index 58801e8..d9ab7e5 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DatabaseState.java
@@ -104,6 +104,9 @@
Long counterGet(String counterName);
@Command
+ void counterSet(String counterName, long value);
+
+ @Command
CommitResponse prepareAndCommit(Transaction transaction);
@Command
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDatabaseState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDatabaseState.java
index 027341e..27d953b 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDatabaseState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDatabaseState.java
@@ -207,6 +207,11 @@
}
@Override
+ public void counterSet(String counterName, long value) {
+ getCounter(counterName).set(value);
+ }
+
+ @Override
public Long queueSize(String queueName) {
return Long.valueOf(getQueue(queueName).size());
}
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java
index 1ce9782..767decf 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java
@@ -38,6 +38,7 @@
import org.onosproject.net.flow.DefaultFlowRule;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.FlowRuleOperations;
import org.onosproject.net.flow.FlowRuleOperationsContext;
@@ -592,7 +593,12 @@
builder.add(inner.build()).add(outer.build());
break;
case REMOVE:
- builder.remove(inner.build()).remove(outer.build());
+ Iterable<FlowEntry> flows = flowRuleService.getFlowEntries(deviceId);
+ for (FlowEntry fe : flows) {
+ if (fe.equals(inner.build()) || fe.equals(outer.build())) {
+ builder.remove(fe);
+ }
+ }
break;
case ADD_TO_EXISTING:
break;