Flow Objective implementation

Provides an abstraction which isolates the application from any pipeline
knowledge. By using the provided objectives, applications can express
their forwarding requirements in a pipeline-agnostic way. The objectives
are then consumed by a driver for the specific device, which converts
them into the appropriate pipeline-coherent flows.

Change-Id: I74a68b4971c367c0cd5b7de9d877abdd117afa98
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
index 4e78d04..17b0aa7 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
@@ -17,7 +17,7 @@
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.Futures;
+import com.google.common.collect.Sets;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -43,9 +43,12 @@
 import org.onosproject.net.flow.FlowRuleService;
 import org.onosproject.net.flowobjective.FilteringObjective;
 import org.onosproject.net.flowobjective.FlowObjectiveService;
+import org.onosproject.net.flowobjective.FlowObjectiveStore;
+import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
 import org.onosproject.net.flowobjective.ForwardingObjective;
 import org.onosproject.net.flowobjective.NextObjective;
 import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.ObjectiveEvent;
 import org.onosproject.net.group.GroupService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,14 +56,14 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
-import java.util.concurrent.Future;
+import java.util.Set;
 
 import static com.google.common.base.Preconditions.checkState;
 
 /**
  * Provides implementation of the flow objective programming service.
  */
-@Component(immediate = false)
+@Component(immediate = true)
 @Service
 public class FlowObjectiveManager implements FlowObjectiveService {
 
@@ -89,6 +92,10 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected GroupService groupService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected FlowObjectiveStore flowObjectiveStore;
+
+    private final FlowObjectiveStoreDelegate delegate = new InternalStoreDelegate();
 
     private final Map<DeviceId, DriverHandler> driverHandlers = Maps.newConcurrentMap();
     private final Map<DeviceId, Pipeliner> pipeliners = Maps.newConcurrentMap();
@@ -101,10 +108,16 @@
 
     private final Map<DeviceId, Collection<Objective>> pendingObjectives =
             Maps.newConcurrentMap();
+
     private NodeId localNode;
 
+    private Map<Integer, Set<PendingNext>> pendingForwards =
+            Maps.newConcurrentMap();
+
+
     @Activate
     protected void activate() {
+        flowObjectiveStore.setDelegate(delegate);
         localNode = clusterService.getLocalNode().id();
         mastershipService.addListener(mastershipListener);
         deviceService.addListener(deviceListener);
@@ -114,46 +127,64 @@
 
     @Deactivate
     protected void deactivate() {
+        flowObjectiveStore.unsetDelegate(delegate);
         mastershipService.removeListener(mastershipListener);
         deviceService.removeListener(deviceListener);
         log.info("Stopped");
     }
 
     @Override
-    public Future<Boolean> filter(DeviceId deviceId,
-                                  Collection<FilteringObjective> filteringObjectives) {
+    public void filter(DeviceId deviceId,
+                                  FilteringObjective filteringObjective) {
         if (deviceService.isAvailable(deviceId)) {
-            return getDevicePipeliner(deviceId).filter(filteringObjectives);
+            getDevicePipeliner(deviceId).filter(filteringObjective);
         } else {
-            filteringObjectives.forEach(obj -> updatePendingMap(deviceId, obj));
+            updatePendingMap(deviceId, filteringObjective); // recorded as pending for this device
         }
-        return Futures.immediateFuture(true);
-    }
+
-
-
-    @Override
-    public Future<Boolean> forward(DeviceId deviceId,
-                                   Collection<ForwardingObjective> forwardingObjectives) {
-        if (deviceService.isAvailable(deviceId)) {
-            return getDevicePipeliner(deviceId).forward(forwardingObjectives);
-        } else {
-            forwardingObjectives.forEach(obj -> updatePendingMap(deviceId, obj));
-        }
-        return Futures.immediateFuture(true);
     }
 
     @Override
-    public Future<Boolean> next(DeviceId deviceId,
-                                Collection<NextObjective> nextObjectives) {
-        if (deviceService.isAvailable(deviceId)) {
-            return getDevicePipeliner(deviceId).next(nextObjectives);
-        } else {
-            nextObjectives.forEach(obj -> updatePendingMap(deviceId, obj));
+    public void forward(DeviceId deviceId,
+                                   ForwardingObjective forwardingObjective) {
+        // Park the objective when the store has no next group for its next id.
+        if (queueObjective(deviceId, forwardingObjective)) {
+            return;
         }
-        return Futures.immediateFuture(true);
+        // Not parked: hand it to the device's pipeliner, or record it as pending.
+        if (deviceService.isAvailable(deviceId)) {
+            getDevicePipeliner(deviceId).forward(forwardingObjective);
+        } else {
+            updatePendingMap(deviceId, forwardingObjective);
+        }
+
     }
 
+    @Override
+    public void next(DeviceId deviceId, NextObjective nextObjective) {
+        if (!deviceService.isAvailable(deviceId)) {
+            // Unavailable device: record the objective as pending instead.
+            updatePendingMap(deviceId, nextObjective);
+            return;
+        }
+        getDevicePipeliner(deviceId).next(nextObjective);
+    }
+
+    // Parks fwd while the store has no next group for its next id; true means parked.
+    // compute() makes create-or-append atomic, so a concurrent remove() cannot race it.
+    private boolean queueObjective(DeviceId deviceId, ForwardingObjective fwd) {
+        if (fwd.nextId() == null || flowObjectiveStore.getNextGroup(fwd.nextId()) != null) {
+            return false;
+        }
+        log.warn("Queuing forwarding objective.");
+        pendingForwards.compute(fwd.nextId(), (nextId, queued) -> {
+            Set<PendingNext> pending = queued != null ? queued : Sets.newHashSet();
+            pending.add(new PendingNext(deviceId, fwd));
+            return pending;
+        });
+        return true;
+    }
+
     private void updatePendingMap(DeviceId deviceId, Objective pending) {
         if (pendingObjectives.putIfAbsent(deviceId, Lists.newArrayList(pending)) != null) {
             Collection<Objective> objectives = pendingObjectives.get(deviceId);
@@ -169,6 +200,33 @@
         return pipeliner;
     }
 
+    // Binds the device's pipeline driver on this node when this node is its master.
+    private void setupPipelineHandler(DeviceId deviceId) {
+        if (!localNode.equals(mastershipService.getMasterFor(deviceId))) {
+            return;
+        }
+        // Reuse the cached handler; create and cache one only on a miss.
+        DriverHandler driverHandler = driverHandlers.get(deviceId);
+        if (driverHandler == null) {
+            try {
+                driverHandler = driverService.createHandler(deviceId);
+                if (!driverHandler.driver().hasBehaviour(Pipeliner.class)) {
+                    log.warn("Pipeline behaviour not supported for device {}", deviceId);
+                    return;
+                }
+            } catch (ItemNotFoundException e) {
+                log.warn("No applicable driver for device {}", deviceId);
+                return;
+            }
+            driverHandlers.put(deviceId, driverHandler);
+        }
+
+        // Initialization runs on every call, whether or not the handler was cached.
+        Pipeliner behaviour = driverHandler.behaviour(Pipeliner.class);
+        behaviour.init(deviceId, context);
+        pipeliners.putIfAbsent(deviceId, behaviour);
+        log.info("Driver {} bound to device {}", driverHandler.driver().name(), deviceId);
+    }
 
     // Triggers driver setup when the local node becomes a device master.
     private class InnerMastershipListener implements MastershipListener {
@@ -221,52 +279,70 @@
             pendingObjectives.getOrDefault(deviceId,
                                            Collections.emptySet()).forEach(obj -> {
                 if (obj instanceof NextObjective) {
-                    getDevicePipeliner(deviceId)
-                            .next(Collections.singletonList((NextObjective) obj));
+                    next(deviceId, (NextObjective) obj);
                 } else if (obj instanceof ForwardingObjective) {
-                    getDevicePipeliner(deviceId)
-                            .forward(Collections.singletonList((ForwardingObjective) obj));
+                    forward(deviceId, (ForwardingObjective) obj);
                 } else {
                     getDevicePipeliner(deviceId)
-                            .filter(Collections.singletonList((FilteringObjective) obj));
+                            .filter((FilteringObjective) obj);
                 }
             });
         }
     }
 
-    private void setupPipelineHandler(DeviceId deviceId) {
-        if (localNode.equals(mastershipService.getMasterFor(deviceId))) {
-            // Attempt to lookup the handler in the cache
-            DriverHandler handler = driverHandlers.get(deviceId);
-            if (handler == null) {
-                try {
-                    // Otherwise create it and if it has pipeline behaviour, cache it
-                    handler = driverService.createHandler(deviceId);
-                    if (!handler.driver().hasBehaviour(Pipeliner.class)) {
-                        log.warn("Pipeline behaviour not supported for device {}",
-                                 deviceId);
-                        return;
-                    }
-                } catch (ItemNotFoundException e) {
-                    log.warn("No applicable driver for device {}", deviceId);
-                    return;
-                }
-                driverHandlers.put(deviceId, handler);
-            }
-
-            // Always (re)initialize the pipeline behaviour
-            Pipeliner pipeliner = handler.behaviour(Pipeliner.class);
-            pipeliner.init(deviceId, context);
-            pipeliners.putIfAbsent(deviceId, pipeliner);
-            log.info("Driver {} bound to device {}", handler.driver().name(), deviceId);
-        }
-    }
-
     // Processing context for initializing pipeline driver behaviours.
     private class InnerPipelineContext implements PipelinerContext {
         @Override
         public ServiceDirectory directory() {
             return serviceDirectory;
         }
+
+        // Returns the flow objective store referenced by the enclosing manager.
+        @Override
+        public FlowObjectiveStore store() {
+            return flowObjectiveStore;
+        }
+
+    }
+
+    // Store delegate: releases forwarding objectives queued under the event's subject.
+    private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
+        @Override
+        public void notify(ObjectiveEvent event) {
+            // remove() detaches the whole pending set for that key in one step.
+            Set<PendingNext> released = pendingForwards.remove(event.subject());
+            if (released == null) {
+                return;
+            }
+
+            log.info("Processing pending objectives {}", released.size());
+            for (PendingNext parked : released) {
+                getDevicePipeliner(parked.deviceId()).forward(parked.forwardingObjective());
+            }
+        }
+    }
+
+    /**
+     * Immutable holder for a forwarding objective that could not be processed
+     * because its next objective was not yet present, plus its target device.
+     * Static: it reads no manager state, so it need not pin the enclosing instance.
+     */
+    private static final class PendingNext {
+        private final DeviceId deviceId;
+        private final ForwardingObjective fwd;
+
+        public PendingNext(DeviceId deviceId, ForwardingObjective fwd) {
+            this.deviceId = deviceId;
+            this.fwd = fwd;
+        }
+
+        // Device the queued objective was submitted for.
+        public DeviceId deviceId() {
+            return deviceId;
+        }
+
+        public ForwardingObjective forwardingObjective() {
+            return fwd;
+        }
     }
 }