Fixes an NPE in OFDPA pipeliners

- Introduced a new method to indicate whether the pipeliner is ready to receive objectives or not
- Ensure init() in OfDpa2Pipeline and OvsOfdpaPipeline can only be invoked once
  This is to avoid processing duplicated DEVICE_ADDED events introduced by gerrit 18899

Change-Id: Icb08935cb1f2761d7c98b5086fc27b6a0d8bc0cf
diff --git a/core/api/src/main/java/org/onosproject/net/behaviour/Pipeliner.java b/core/api/src/main/java/org/onosproject/net/behaviour/Pipeliner.java
index 6f497e7..c9ea8a9 100644
--- a/core/api/src/main/java/org/onosproject/net/behaviour/Pipeliner.java
+++ b/core/api/src/main/java/org/onosproject/net/behaviour/Pipeliner.java
@@ -81,4 +81,13 @@
      */
     List<String> getNextMappings(NextGroup nextGroup);
 
+    /**
+     * Returns pipeliner status.
+     *
+     * @return true if pipeliner is ready to accept objectives. False otherwise.
+     */
+    default boolean isReady() {
+        return true;
+    }
+
 }
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
index de7d785..84820cf 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
@@ -290,7 +290,7 @@
             try {
                 Pipeliner pipeliner = getDevicePipeliner(deviceId);
 
-                if (pipeliner != null) {
+                if (pipeliner != null && pipeliner.isReady()) {
                     if (objective instanceof NextObjective) {
                         nextToDevice.put(objective.id(), deviceId);
                         pipeliner.next((NextObjective) objective);
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2Pipeline.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2Pipeline.java
index 17a2d86..d8b1f06 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2Pipeline.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2Pipeline.java
@@ -151,27 +151,36 @@
     private ScheduledExecutorService accumulatorExecutorService
         = newSingleThreadScheduledExecutor(groupedThreads("OfdpaPipeliner", "acc-%d", log));
 
+    protected AtomicBoolean ready = new AtomicBoolean(false);
+
     @Override
     public void init(DeviceId deviceId, PipelinerContext context) {
-        this.deviceId = deviceId;
+        if (!ready.getAndSet(true)) {
+            this.deviceId = deviceId;
 
-        serviceDirectory = context.directory();
-        coreService = serviceDirectory.get(CoreService.class);
-        flowRuleService = serviceDirectory.get(FlowRuleService.class);
-        groupService = serviceDirectory.get(GroupService.class);
-        flowObjectiveStore = context.store();
-        deviceService = serviceDirectory.get(DeviceService.class);
-        // Init the accumulator, if enabled
-        if (isAccumulatorEnabled(this)) {
-            accumulator = new ForwardingObjectiveAccumulator(context.accumulatorMaxObjectives(),
-                                                             context.accumulatorMaxBatchMillis(),
-                                                             context.accumulatorMaxIdleMillis());
+            serviceDirectory = context.directory();
+            coreService = serviceDirectory.get(CoreService.class);
+            flowRuleService = serviceDirectory.get(FlowRuleService.class);
+            groupService = serviceDirectory.get(GroupService.class);
+            flowObjectiveStore = context.store();
+            deviceService = serviceDirectory.get(DeviceService.class);
+            // Init the accumulator, if enabled
+            if (isAccumulatorEnabled(this)) {
+                accumulator = new ForwardingObjectiveAccumulator(context.accumulatorMaxObjectives(),
+                        context.accumulatorMaxBatchMillis(),
+                        context.accumulatorMaxIdleMillis());
+            }
+
+            initDriverId();
+            initGroupHander(context);
+
+            initializePipeline();
         }
+    }
 
-        initDriverId();
-        initGroupHander(context);
-
-        initializePipeline();
+    @Override
+    public boolean isReady() {
+        return ready.get();
     }
 
     void setupAccumulatorForTests(int maxFwd, int maxBatchMS, int maxIdleMS) {
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/OvsOfdpaPipeline.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/OvsOfdpaPipeline.java
index c269412..07e4415 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/OvsOfdpaPipeline.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/OvsOfdpaPipeline.java
@@ -158,13 +158,15 @@
 
     @Override
     public void init(DeviceId deviceId, PipelinerContext context) {
-        // create a new executor at each init and a new empty queue
-        groupChecker = Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/driver",
-                                                                                 "ovs-ofdpa-%d", log));
-        flowRuleQueue = new ConcurrentLinkedQueue<>();
-        groupCheckerLock = new ReentrantLock();
-        groupChecker.scheduleAtFixedRate(new PopVlanPuntGroupChecker(), 20, 50, TimeUnit.MILLISECONDS);
-        super.init(deviceId, context);
+        if (!ready.getAndSet(true)) {
+            // create a new executor at each init and a new empty queue
+            groupChecker = Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/driver",
+                    "ovs-ofdpa-%d", log));
+            flowRuleQueue = new ConcurrentLinkedQueue<>();
+            groupCheckerLock = new ReentrantLock();
+            groupChecker.scheduleAtFixedRate(new PopVlanPuntGroupChecker(), 20, 50, TimeUnit.MILLISECONDS);
+            super.init(deviceId, context);
+        }
     }
 
     protected void processFilter(FilteringObjective filteringObjective,