In this commit:
   Bug fix where filtering objectives were not installed because available ports became enabled only later.
   Bug fix where flow objective store had no listener for notifications from drivers across multiple instances of the controller.
   NPE fix in ofdpa driver for non-existing groups.
   Preventing ofdpa driver from sending spurious pass notification to app.
   Incrementing retry filter timer from 1 to 5 secs in default routing handler.
   Made several debug messages clearer.

Change-Id: I828671ee4c8bcfe03c946d051e1d1aac9d8f68dd
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
index b6c748a..6933c07 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
@@ -239,7 +239,8 @@
     private boolean queueObjective(DeviceId deviceId, ForwardingObjective fwd) {
         if (fwd.nextId() != null &&
                 flowObjectiveStore.getNextGroup(fwd.nextId()) == null) {
-            log.trace("Queuing forwarding objective for nextId {}", fwd.nextId());
+            log.debug("Queuing forwarding objective {} for nextId {} meant for device {}",
+                      fwd.id(), fwd.nextId(), deviceId);
             // TODO: change to computeIfAbsent
             Set<PendingNext> newset = Collections.newSetFromMap(
                                           new ConcurrentHashMap<PendingNext, Boolean>());
@@ -398,7 +399,7 @@
                 Set<PendingNext> pending = pendingForwards.remove(event.subject());
 
                 if (pending == null) {
-                    log.warn("Nothing pending for this obj event {}", event);
+                    log.debug("Nothing pending for this obj event {}", event);
                     return;
                 }
 
@@ -457,7 +458,7 @@
     public List<String> getNextMappings() {
         List<String> mappings = new ArrayList<>();
         Map<Integer, NextGroup> allnexts = flowObjectiveStore.getAllGroups();
-        // XXX if the NextGroup upon decoding stored info of the deviceId
+        // XXX if the NextGroup after de-serialization actually stored info of the deviceId
         // then info on any nextObj could be retrieved from one controller instance.
         // Right now the drivers on one instance can only fetch for next-ids that came
         // to them.
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flowobjective/impl/DistributedFlowObjectiveStore.java b/core/store/dist/src/main/java/org/onosproject/store/flowobjective/impl/DistributedFlowObjectiveStore.java
index 957523c..b5a57cd 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flowobjective/impl/DistributedFlowObjectiveStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flowobjective/impl/DistributedFlowObjectiveStore.java
@@ -30,15 +30,22 @@
 import org.onosproject.store.AbstractStore;
 import org.onosproject.store.service.AtomicCounter;
 import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
 import org.onosproject.store.service.Versioned;
 import org.slf4j.Logger;
 
+import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * Manages the inventory of created next groups.
@@ -57,9 +64,16 @@
     protected StorageService storageService;
 
     private AtomicCounter nextIds;
+    private MapEventListener<Integer, byte[]> mapListener = new NextGroupListener();
+    // event queue to separate map-listener threads from event-handler threads (tpool)
+    private BlockingQueue<ObjectiveEvent> eventQ;
+    private ExecutorService tpool;
 
     @Activate
     public void activate() {
+        tpool = Executors.newFixedThreadPool(4, groupedThreads("onos/flobj-notifier", "%d", log));
+        eventQ = new LinkedBlockingQueue<ObjectiveEvent>();
+        tpool.execute(new FlowObjectiveNotifier());
         nextGroups = storageService.<Integer, byte[]>consistentMapBuilder()
                 .withName("flowobjective-groups")
                 .withSerializer(Serializer.using(
@@ -68,7 +82,7 @@
                                 .register(Versioned.class)
                                 .build("DistributedFlowObjectiveStore")))
                 .build();
-
+        nextGroups.addListener(mapListener);
         nextIds = storageService.getAtomicCounter("next-objective-counter");
         log.info("Started");
     }
@@ -76,6 +90,7 @@
 
     @Deactivate
     public void deactivate() {
+        tpool.shutdownNow(); // shutdown() alone never interrupts the notifier blocked in eventQ.take()
         log.info("Stopped");
     }
 
@@ -120,4 +135,37 @@
     public int allocateNextId() {
         return (int) nextIds.incrementAndGet();
     }
+
+    private class FlowObjectiveNotifier implements Runnable { // drains eventQ into notifyDelegate
+        @Override
+        public void run() {
+            try { // NOTE(review): only InterruptedException is caught; any other throwable ends this task
+                while (!Thread.currentThread().isInterrupted()) {
+                    notifyDelegate(eventQ.take()); // take() blocks until an event has been queued
+                }
+            } catch (InterruptedException ex) {
+                Thread.currentThread().interrupt(); // restore the interrupt flag before run() returns
+            }
+        }
+    }
+
+    private class NextGroupListener implements MapEventListener<Integer, byte[]> { // map events -> eventQ
+        @Override
+        public void event(MapEvent<Integer, byte[]> event) {
+            switch (event.type()) {
+            case INSERT: // queued as an ADD event carrying the map key
+                eventQ.add(new ObjectiveEvent(ObjectiveEvent.Type.ADD, event.key()));
+                break;
+            case REMOVE: // queued as a REMOVE event carrying the map key
+                eventQ.add(new ObjectiveEvent(ObjectiveEvent.Type.REMOVE, event.key()));
+                break;
+            case UPDATE: // currently ignored: nothing is queued for updates
+                // TODO Introduce UPDATE ObjectiveEvent when the map is being updated
+                break;
+            default: // any other event type is ignored as well
+                break;
+            }
+        }
+    }
+
 }