Fixes an NPE in OFDPA pipeliners
- Introduced a new method to indicate whether the pipeliner is ready to receive objectives or not
- Ensure init() in OfDpa2Pipeline and OvsOfdpaPipeline can only be invoked once
This is to avoid processing duplicated DEVICE_ADDED events introduced by gerrit 18899
Change-Id: Icb08935cb1f2761d7c98b5086fc27b6a0d8bc0cf
diff --git a/core/api/src/main/java/org/onosproject/net/behaviour/Pipeliner.java b/core/api/src/main/java/org/onosproject/net/behaviour/Pipeliner.java
index 6f497e7..c9ea8a9 100644
--- a/core/api/src/main/java/org/onosproject/net/behaviour/Pipeliner.java
+++ b/core/api/src/main/java/org/onosproject/net/behaviour/Pipeliner.java
@@ -81,4 +81,13 @@
*/
List<String> getNextMappings(NextGroup nextGroup);
+ /**
+ * Returns pipeliner status.
+ *
+ * @return true if pipeliner is ready to accept objectives. False otherwise.
+ */
+ default boolean isReady() {
+ return true;
+ }
+
}
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
index d730ba7..144764a 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
@@ -295,7 +295,7 @@
try {
Pipeliner pipeliner = getDevicePipeliner(deviceId);
- if (pipeliner != null) {
+ if (pipeliner != null && pipeliner.isReady()) {
if (objective instanceof NextObjective) {
nextToDevice.put(objective.id(), deviceId);
pipeliner.next((NextObjective) objective);
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/CpqdOfdpa2Pipeline.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/CpqdOfdpa2Pipeline.java
index 4937a12..85ce51b 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/CpqdOfdpa2Pipeline.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/CpqdOfdpa2Pipeline.java
@@ -188,15 +188,16 @@
@Override
public void init(DeviceId deviceId, PipelinerContext context) {
-
- if (supportPuntGroup()) {
- // create a new executor at each init and a new empty queue
- groupChecker = Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/driver",
- "cpqd-ofdpa-%d", log));
- flowRuleQueue = new ConcurrentLinkedQueue<>();
- groupCheckerLock = new ReentrantLock();
- groupChecker.scheduleAtFixedRate(new PopVlanPuntGroupChecker(), 20, 50, TimeUnit.MILLISECONDS);
- super.init(deviceId, context);
+ if (!ready.getAndSet(true)) {
+ if (supportPuntGroup()) {
+ // create a new executor at each init and a new empty queue
+ groupChecker = Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/driver",
+ "cpqd-ofdpa-%d", log));
+ flowRuleQueue = new ConcurrentLinkedQueue<>();
+ groupCheckerLock = new ReentrantLock();
+ groupChecker.scheduleAtFixedRate(new PopVlanPuntGroupChecker(), 20, 50, TimeUnit.MILLISECONDS);
+ super.init(deviceId, context);
+ }
}
}
/*
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2Pipeline.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2Pipeline.java
index 083a14b..6731d72 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2Pipeline.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2Pipeline.java
@@ -144,27 +144,36 @@
private ScheduledExecutorService accumulatorExecutorService
= newSingleThreadScheduledExecutor(groupedThreads("OfdpaPipeliner", "acc-%d", log));
+ protected AtomicBoolean ready = new AtomicBoolean(false);
+
@Override
public void init(DeviceId deviceId, PipelinerContext context) {
- this.deviceId = deviceId;
+ if (!ready.getAndSet(true)) {
+ this.deviceId = deviceId;
- serviceDirectory = context.directory();
- coreService = serviceDirectory.get(CoreService.class);
- flowRuleService = serviceDirectory.get(FlowRuleService.class);
- groupService = serviceDirectory.get(GroupService.class);
- flowObjectiveStore = context.store();
- deviceService = serviceDirectory.get(DeviceService.class);
- // Init the accumulator, if enabled
- if (isAccumulatorEnabled(this)) {
- accumulator = new ForwardingObjectiveAccumulator(context.accumulatorMaxObjectives(),
- context.accumulatorMaxBatchMillis(),
- context.accumulatorMaxIdleMillis());
+ serviceDirectory = context.directory();
+ coreService = serviceDirectory.get(CoreService.class);
+ flowRuleService = serviceDirectory.get(FlowRuleService.class);
+ groupService = serviceDirectory.get(GroupService.class);
+ flowObjectiveStore = context.store();
+ deviceService = serviceDirectory.get(DeviceService.class);
+ // Init the accumulator, if enabled
+ if (isAccumulatorEnabled(this)) {
+ accumulator = new ForwardingObjectiveAccumulator(context.accumulatorMaxObjectives(),
+ context.accumulatorMaxBatchMillis(),
+ context.accumulatorMaxIdleMillis());
+ }
+
+ initDriverId();
+ initGroupHander(context);
+
+ initializePipeline();
}
+ }
- initDriverId();
- initGroupHander(context);
-
- initializePipeline();
+ @Override
+ public boolean isReady() {
+ return ready.get();
}
void setupAccumulatorForTests(int maxFwd, int maxBatchMS, int maxIdleMS) {