ONOS-4604 Fixed flow objective installation

Removed context from objective toString methods.
Removed duplicate flow objective delegate notifications in the store for next objectives.
Synchronized queueing of forwarding objectives for pending next objectives to avoid notifications race.
Changed logging for better readability.

Change-Id: Ic2bd411a891ea035a2c5513b24dea5fbd48f187d
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
index d8a0dec..97c40d2 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
@@ -16,6 +16,7 @@
 package org.onosproject.net.flowobjective.impl;
 
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -55,21 +56,17 @@
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.util.concurrent.Executors.newFixedThreadPool;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.security.AppGuard.checkPermission;
-import static org.onosproject.security.AppPermission.Type.*;
-
-
+import static org.onosproject.security.AppPermission.Type.FLOWRULE_WRITE;
 
 /**
  * Provides implementation of the flow objective programming service.
@@ -124,7 +121,7 @@
 
     protected ServiceDirectory serviceDirectory = new DefaultServiceDirectory();
 
-    private Map<Integer, Set<PendingNext>> pendingForwards = Maps.newConcurrentMap();
+    private final Map<Integer, Set<PendingNext>> pendingForwards = Maps.newConcurrentMap();
 
     // local store to track which nextObjectives were sent to which device
     // for debugging purposes
@@ -237,21 +234,33 @@
     public void initPolicy(String policy) {}
 
     private boolean queueObjective(DeviceId deviceId, ForwardingObjective fwd) {
-        if (fwd.nextId() != null &&
-                flowObjectiveStore.getNextGroup(fwd.nextId()) == null) {
-            log.debug("Queuing forwarding objective {} for nextId {} meant for device {}",
-                      fwd.id(), fwd.nextId(), deviceId);
-            // TODO: change to computeIfAbsent
-            Set<PendingNext> newset = Collections.newSetFromMap(
-                                          new ConcurrentHashMap<PendingNext, Boolean>());
-            newset.add(new PendingNext(deviceId, fwd));
-            Set<PendingNext> pnext = pendingForwards.putIfAbsent(fwd.nextId(), newset);
-            if (pnext != null) {
-                pnext.add(new PendingNext(deviceId, fwd));
-            }
-            return true;
+        if (fwd.nextId() == null ||
+                flowObjectiveStore.getNextGroup(fwd.nextId()) != null) {
+            // fast path
+            return false;
         }
-        return false;
+        boolean queued = false;
+        synchronized (pendingForwards) {
+            // double check the flow objective store, because this block could run
+            // after a notification arrives
+            if (flowObjectiveStore.getNextGroup(fwd.nextId()) == null) {
+                pendingForwards.compute(fwd.nextId(), (id, pending) -> {
+                    PendingNext next = new PendingNext(deviceId, fwd);
+                    if (pending == null) {
+                        return Sets.newHashSet(next);
+                    } else {
+                        pending.add(next);
+                        return pending;
+                    }
+                });
+                queued = true;
+            }
+        }
+        if (queued) {
+            log.debug("Queued forwarding objective {} for nextId {} meant for device {}",
+                      fwd.id(), fwd.nextId(), deviceId);
+        }
+        return queued;
     }
 
     // Retrieves the device pipeline behaviour from the cache.
@@ -396,7 +405,11 @@
         public void notify(ObjectiveEvent event) {
             if (event.type() == Type.ADD) {
                 log.debug("Received notification of obj event {}", event);
-                Set<PendingNext> pending = pendingForwards.remove(event.subject());
+                Set<PendingNext> pending;
+                synchronized (pendingForwards) {
+                    // needs to be synchronized for queueObjective lookup
+                    pending = pendingForwards.remove(event.subject());
+                }
 
                 if (pending == null) {
                     log.debug("Nothing pending for this obj event {}", event);