In this commit:
   Bug fix where filtering objectives are not installed due to available ports becoming enabled later.
   Bug fix where flow objective store had no listener for notifications from drivers across multiple instances of the controller.
   NPE fix in ofdpa driver for non-existing groups.
   Preventing ofdpa driver from sending spurious pass notification to app.
   Incrementing retry filter timer from 1 to 5 secs in default routing handler.
   Made several debug messages clearer.

Change-Id: I828671ee4c8bcfe03c946d051e1d1aac9d8f68dd
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flowobjective/impl/DistributedFlowObjectiveStore.java b/core/store/dist/src/main/java/org/onosproject/store/flowobjective/impl/DistributedFlowObjectiveStore.java
index 957523c..b5a57cd 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flowobjective/impl/DistributedFlowObjectiveStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flowobjective/impl/DistributedFlowObjectiveStore.java
@@ -30,15 +30,22 @@
 import org.onosproject.store.AbstractStore;
 import org.onosproject.store.service.AtomicCounter;
 import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
 import org.onosproject.store.service.Versioned;
 import org.slf4j.Logger;
 
+import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * Manages the inventory of created next groups.
@@ -57,9 +64,16 @@
     protected StorageService storageService;
 
     private AtomicCounter nextIds;
+    private MapEventListener<Integer, byte[]> mapListener = new NextGroupListener();
+    // event queue to separate map-listener threads from event-handler threads (tpool)
+    private BlockingQueue<ObjectiveEvent> eventQ;
+    private ExecutorService tpool;
 
     @Activate
     public void activate() {
+        tpool = Executors.newFixedThreadPool(4, groupedThreads("onos/flobj-notifier", "%d", log));
+        eventQ = new LinkedBlockingQueue<ObjectiveEvent>();
+        tpool.execute(new FlowObjectiveNotifier());
         nextGroups = storageService.<Integer, byte[]>consistentMapBuilder()
                 .withName("flowobjective-groups")
                 .withSerializer(Serializer.using(
@@ -68,7 +82,7 @@
                                 .register(Versioned.class)
                                 .build("DistributedFlowObjectiveStore")))
                 .build();
-
+        nextGroups.addListener(mapListener);
         nextIds = storageService.getAtomicCounter("next-objective-counter");
         log.info("Started");
     }
@@ -76,6 +90,7 @@
 
     @Deactivate
     public void deactivate() {
+        tpool.shutdown();
         log.info("Stopped");
     }
 
@@ -120,4 +135,37 @@
     public int allocateNextId() {
         return (int) nextIds.incrementAndGet();
     }
+
+    private class FlowObjectiveNotifier implements Runnable {
+        @Override
+        public void run() {
+            try {
+                while (!Thread.currentThread().isInterrupted()) {
+                    notifyDelegate(eventQ.take());
+                }
+            } catch (InterruptedException ex) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    private class NextGroupListener implements MapEventListener<Integer, byte[]> {
+        @Override
+        public void event(MapEvent<Integer, byte[]> event) {
+            switch (event.type()) {
+            case INSERT:
+                eventQ.add(new ObjectiveEvent(ObjectiveEvent.Type.ADD, event.key()));
+                break;
+            case REMOVE:
+                eventQ.add(new ObjectiveEvent(ObjectiveEvent.Type.REMOVE, event.key()));
+                break;
+            case UPDATE:
+                // TODO Introduce UPDATE ObjectiveEvent when the map is being updated
+                break;
+            default:
+                break;
+            }
+        }
+    }
+
 }