Fixes a regression introduced by 23223.

Additionally adds a cleanUp method for the pipeliners
to reset the internal states between different executions.
This was another regression introduced by 23223.

Fixes also a memory leak caused by re-init of the grouphandler
without terminating its internal references

Change-Id: I06e9e005110c5237cb3bdf893cc71975fb94281e
diff --git a/core/api/src/main/java/org/onosproject/net/behaviour/Pipeliner.java b/core/api/src/main/java/org/onosproject/net/behaviour/Pipeliner.java
index c9ea8a9..22e89d8 100644
--- a/core/api/src/main/java/org/onosproject/net/behaviour/Pipeliner.java
+++ b/core/api/src/main/java/org/onosproject/net/behaviour/Pipeliner.java
@@ -90,4 +90,11 @@
         return true;
     }
 
+    /**
+     * Clean up internal state of the pipeliner.
+     * Implementation is pipeliner specific.
+     */
+    default void cleanUp() {
+    }
+
 }
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
index 84820cf..3c5c571 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
@@ -496,7 +496,10 @@
     private void invalidatePipeliner(DeviceId id) {
         log.info("Invalidating cached pipeline behaviour for {}", id);
         driverHandlers.remove(id);
-        pipeliners.remove(id);
+        Pipeliner pipeliner = pipeliners.remove(id);
+        if (pipeliner != null) {
+            pipeliner.cleanUp();
+        }
         if (deviceService.isAvailable(id)) {
             getAndInitDevicePipeliner(id);
         }
@@ -518,6 +521,7 @@
                         getAndInitDevicePipeliner(event.subject().id());
                       } else {
                         log.debug("Device is no longer available {}", event.subject().id());
+                        getDevicePipeliner(event.subject().id()).cleanUp();
                       }
                     });
                     break;
@@ -534,8 +538,11 @@
                     // before removing device, especially if they intend to
                     // replace driver/pipeliner assigned to the device.
                     devEventExecutor.execute(() -> {
-                      driverHandlers.remove(event.subject().id());
-                      pipeliners.remove(event.subject().id());
+                        driverHandlers.remove(event.subject().id());
+                        Pipeliner pipeliner = pipeliners.remove(event.subject().id());
+                        if (pipeliner != null) {
+                            pipeliner.cleanUp();
+                        }
                     });
                     break;
                 case DEVICE_SUSPENDED:
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/XpliantPipeline.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/XpliantPipeline.java
index 2dca42d..eced43e 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/XpliantPipeline.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/XpliantPipeline.java
@@ -34,6 +34,13 @@
 
     @Override
     protected void initGroupHander(PipelinerContext context) {
+        // Terminate internal references
+        // We are terminating the references here
+        // because when the device is offline the apps
+        // are still sending flowobjectives
+        if (groupHandler != null) {
+            groupHandler.terminate();
+        }
         groupHandler = new XpliantGroupHandler();
         groupHandler.init(deviceId, context);
     }
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java
index a9ebd24..851715d 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2GroupHandler.java
@@ -124,6 +124,7 @@
             new ConcurrentHashMap<>();
     private ScheduledExecutorService groupCheckerExecutor =
             Executors.newScheduledThreadPool(2, groupedThreads("onos/pipeliner", "ofdpa-%d", log));
+    private InnerGroupListener innerGroupListener = new InnerGroupListener();
     /**
      * Determines whether this pipeline support copy ttl instructions or not.
      *
@@ -202,7 +203,34 @@
         pendingUpdateNextObjectives = new ConcurrentHashMap<>();
         GroupChecker groupChecker = new GroupChecker(this);
         groupCheckerExecutor.scheduleAtFixedRate(groupChecker, 0, 500, TimeUnit.MILLISECONDS);
-        groupService.addListener(new InnerGroupListener());
+        groupService.addListener(innerGroupListener);
+    }
+
+    // Terminate internal references
+    public void terminate() {
+        if (nextIndex != null) {
+            nextIndex.destroy();
+        }
+        nextIndex = null;
+        if (pendingAddNextObjectives != null) {
+            pendingAddNextObjectives.cleanUp();
+        }
+        pendingAddNextObjectives = null;
+        if (pendingRemoveNextObjectives != null) {
+            pendingRemoveNextObjectives.cleanUp();
+        }
+        pendingRemoveNextObjectives = null;
+        if (pendingGroups != null) {
+            pendingGroups.cleanUp();
+        }
+        pendingGroups = null;
+        if (groupCheckerExecutor != null) {
+            groupCheckerExecutor.shutdown();
+        }
+        groupCheckerExecutor = null;
+        if (groupService != null) {
+            groupService.removeListener(innerGroupListener);
+        }
     }
 
     //////////////////////////////////////
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2Pipeline.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2Pipeline.java
index d8b1f06..a891e40 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2Pipeline.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2Pipeline.java
@@ -155,7 +155,11 @@
 
     @Override
     public void init(DeviceId deviceId, PipelinerContext context) {
-        if (!ready.getAndSet(true)) {
+        synchronized (this) {
+            if (isReady()) {
+                return;
+            }
+
             this.deviceId = deviceId;
 
             serviceDirectory = context.directory();
@@ -175,6 +179,7 @@
             initGroupHander(context);
 
             initializePipeline();
+            ready.set(true);
         }
     }
 
@@ -183,6 +188,17 @@
         return ready.get();
     }
 
+    @Override
+    public void cleanUp() {
+        synchronized (this) {
+            if (!isReady()) {
+                return;
+            }
+            ready.set(false);
+        }
+        log.info("Cleaning up...");
+    }
+
     void setupAccumulatorForTests(int maxFwd, int maxBatchMS, int maxIdleMS) {
         if (accumulator == null) {
             accumulator = new ForwardingObjectiveAccumulator(maxFwd,
@@ -197,6 +213,13 @@
     }
 
     protected void initGroupHander(PipelinerContext context) {
+        // Terminate internal references
+        // We are terminating the references here
+        // because when the device is offline the apps
+        // are still sending flowobjectives
+        if (groupHandler != null) {
+            groupHandler.terminate();
+        }
         groupHandler = new Ofdpa2GroupHandler();
         groupHandler.init(deviceId, context);
     }
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa3Pipeline.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa3Pipeline.java
index 4632a4f..9ba942d 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa3Pipeline.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa3Pipeline.java
@@ -91,6 +91,13 @@
 
     @Override
     protected void initGroupHander(PipelinerContext context) {
+        // Terminate internal references
+        // We are terminating the references here
+        // because when the device is offline the apps
+        // are still sending flowobjectives
+        if (groupHandler != null) {
+            groupHandler.terminate();
+        }
         groupHandler = new Ofdpa3GroupHandler();
         groupHandler.init(deviceId, context);
     }
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/OvsOfdpaPipeline.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/OvsOfdpaPipeline.java
index 07e4415..8ca774a 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/OvsOfdpaPipeline.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/OvsOfdpaPipeline.java
@@ -152,16 +152,37 @@
 
     @Override
     protected void initGroupHander(PipelinerContext context) {
+        // Terminate internal references
+        // We are terminating the references here
+        // because when the device is offline the apps
+        // are still sending flowobjectives
+        if (groupHandler != null) {
+            groupHandler.terminate();
+        }
         groupHandler = new OvsOfdpaGroupHandler();
         groupHandler.init(deviceId, context);
     }
 
     @Override
     public void init(DeviceId deviceId, PipelinerContext context) {
-        if (!ready.getAndSet(true)) {
+        synchronized (this) {
+            if (isReady()) {
+                return;
+            }
+
+            // Terminate internal references
+            // We are terminating the references here
+            // because when the device is offline the apps
+            // are still sending flowobjectives
+            if (groupChecker != null) {
+                groupChecker.shutdown();
+            }
             // create a new executor at each init and a new empty queue
             groupChecker = Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/driver",
                     "ovs-ofdpa-%d", log));
+            if (flowRuleQueue != null) {
+                flowRuleQueue.clear();
+            }
             flowRuleQueue = new ConcurrentLinkedQueue<>();
             groupCheckerLock = new ReentrantLock();
             groupChecker.scheduleAtFixedRate(new PopVlanPuntGroupChecker(), 20, 50, TimeUnit.MILLISECONDS);