Upgrade packet requests to use flow objectives API.

Addressed a few issues found while using the flow objectives across a cluster:
 * Flow objectives should be installable from any node, not just the master.
   Therefore we need to ensure all nodes initialize a driver for each switch.
 * We no longer store a list of objectives that are waiting for the switch
   to arrive. If we don't know about the switch yet, we'll try a few times
   over a few seconds to find it, but after that we'll give up and report an
   error to the client.
 * Default drivers need to be available when the FlowObjectiveManager starts
   up, otherwise it is common to get flow objective requests before any
   drivers have been loaded.

Change-Id: I1c2ea6a223232402c31e8139729e4b6251ab8b0f
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
index 7f6b0ee..79d486e 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
@@ -15,7 +15,6 @@
  */
 package org.onosproject.net.flowobjective.impl;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.felix.scr.annotations.Activate;
@@ -48,17 +47,18 @@
 import org.onosproject.net.flowobjective.ForwardingObjective;
 import org.onosproject.net.flowobjective.NextObjective;
 import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.ObjectiveError;
 import org.onosproject.net.flowobjective.ObjectiveEvent;
 import org.onosproject.net.group.GroupService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collection;
-import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
-import static com.google.common.base.Preconditions.checkState;
+import static org.onlab.util.Tools.groupedThreads;
 
 /**
  * Provides implementation of the flow objective programming service.
@@ -67,9 +67,10 @@
 @Service
 public class FlowObjectiveManager implements FlowObjectiveService {
 
-    private final Logger log = LoggerFactory.getLogger(getClass());
+    public static final int INSTALL_RETRY_ATTEMPTS = 5;
+    public static final long INSTALL_RETRY_INTERVAL = 1000; // ms
 
-    public static final String NOT_INITIALIZED = "Driver not initialized";
+    private final Logger log = LoggerFactory.getLogger(getClass());
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DriverService driverService;
@@ -106,17 +107,18 @@
 
     protected ServiceDirectory serviceDirectory = new DefaultServiceDirectory();
 
-    private final Map<DeviceId, Collection<Objective>> pendingObjectives =
-            Maps.newConcurrentMap();
-
     private NodeId localNode;
 
     private Map<Integer, Set<PendingNext>> pendingForwards =
             Maps.newConcurrentMap();
 
+    private ExecutorService executorService;
 
     @Activate
     protected void activate() {
+        executorService = Executors.newFixedThreadPool(
+                4, groupedThreads("onos/objective-installer", "%d"));
+
         flowObjectiveStore.setDelegate(delegate);
         localNode = clusterService.getLocalNode().id();
         mastershipService.addListener(mastershipListener);
@@ -133,15 +135,55 @@
         log.info("Stopped");
     }
 
-    @Override
-    public void filter(DeviceId deviceId,
-                                  FilteringObjective filteringObjective) {
-        if (deviceService.isAvailable(deviceId)) {
-            getDevicePipeliner(deviceId).filter(filteringObjective);
-        } else {
-            updatePendingMap(deviceId, filteringObjective);
+    /**
+     * Task that passes the flow objective down to the driver. The task will
+     * make a few attempts to find the appropriate driver, then eventually give
+     * up and report an error if no suitable driver could be found.
+     */
+    private class ObjectiveInstaller implements Runnable {
+        private final DeviceId deviceId;
+        private final Objective objective;
+
+        private int numAttempts = 0;
+
+        public ObjectiveInstaller(DeviceId deviceId, Objective objective) {
+            this.deviceId = deviceId;
+            this.objective = objective;
         }
 
+        @Override
+        public void run() {
+            try {
+                numAttempts++;
+
+                Pipeliner pipeliner = getDevicePipeliner(deviceId);
+
+                if (pipeliner != null) {
+                    if (objective instanceof NextObjective) {
+                        pipeliner.next((NextObjective) objective);
+                    } else if (objective instanceof ForwardingObjective) {
+                        pipeliner.forward((ForwardingObjective) objective);
+                    } else {
+                        pipeliner.filter((FilteringObjective) objective);
+                    }
+                } else if (numAttempts < INSTALL_RETRY_ATTEMPTS) {
+                    Thread.currentThread().sleep(INSTALL_RETRY_INTERVAL);
+                    executorService.submit(this);
+                } else {
+                    // Otherwise we've tried a few times and failed, report an
+                    // error back to the user.
+                    objective.context().ifPresent(
+                            c -> c.onError(objective, ObjectiveError.DEVICEMISSING));
+                }
+            } catch (Exception e) {
+                log.warn("Exception while installing flow objective", e);
+            }
+        }
+    }
+
+    @Override
+    public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
+        executorService.submit(new ObjectiveInstaller(deviceId, filteringObjective));
     }
 
     @Override
@@ -152,22 +194,12 @@
             return;
         }
 
-        if (deviceService.isAvailable(deviceId)) {
-            getDevicePipeliner(deviceId).forward(forwardingObjective);
-        } else {
-            updatePendingMap(deviceId, forwardingObjective);
-        }
-
+        executorService.submit(new ObjectiveInstaller(deviceId, forwardingObjective));
     }
 
     @Override
-    public void next(DeviceId deviceId,
-                                NextObjective nextObjective) {
-        if (deviceService.isAvailable(deviceId)) {
-            getDevicePipeliner(deviceId).next(nextObjective);
-        } else {
-            updatePendingMap(deviceId, nextObjective);
-        }
+    public void next(DeviceId deviceId, NextObjective nextObjective) {
+        executorService.submit(new ObjectiveInstaller(deviceId, nextObjective));
     }
 
     @Override
@@ -189,49 +221,38 @@
         return false;
     }
 
-
-    private void updatePendingMap(DeviceId deviceId, Objective pending) {
-        if (pendingObjectives.putIfAbsent(deviceId, Lists.newArrayList(pending)) != null) {
-            Collection<Objective> objectives = pendingObjectives.get(deviceId);
-            objectives.add(pending);
-        }
-
-    }
-
     // Retrieves the device pipeline behaviour from the cache.
     private Pipeliner getDevicePipeliner(DeviceId deviceId) {
         Pipeliner pipeliner = pipeliners.get(deviceId);
-        checkState(pipeliner != null, NOT_INITIALIZED);
         return pipeliner;
     }
 
     private void setupPipelineHandler(DeviceId deviceId) {
-        if (localNode.equals(mastershipService.getMasterFor(deviceId))) {
-            // Attempt to lookup the handler in the cache
-            DriverHandler handler = driverHandlers.get(deviceId);
-            if (handler == null) {
-                try {
-                    // Otherwise create it and if it has pipeline behaviour, cache it
-                    handler = driverService.createHandler(deviceId);
-                    if (!handler.driver().hasBehaviour(Pipeliner.class)) {
-                        log.warn("Pipeline behaviour not supported for device {}",
-                                 deviceId);
-                        return;
-                    }
-                } catch (ItemNotFoundException e) {
-                    log.warn("No applicable driver for device {}", deviceId);
+        // Attempt to lookup the handler in the cache
+        DriverHandler handler = driverHandlers.get(deviceId);
+        if (handler == null) {
+            try {
+                // Otherwise create it and if it has pipeline behaviour, cache it
+                handler = driverService.createHandler(deviceId);
+                if (!handler.driver().hasBehaviour(Pipeliner.class)) {
+                    log.warn("Pipeline behaviour not supported for device {}",
+                             deviceId);
                     return;
                 }
-                driverHandlers.put(deviceId, handler);
+            } catch (ItemNotFoundException e) {
+                log.warn("No applicable driver for device {}", deviceId);
+                return;
             }
 
-            // Always (re)initialize the pipeline behaviour
-            log.info("Driver {} bound to device {} ... initializing driver",
-                     handler.driver().name(), deviceId);
-            Pipeliner pipeliner = handler.behaviour(Pipeliner.class);
-            pipeliner.init(deviceId, context);
-            pipeliners.putIfAbsent(deviceId, pipeliner);
+            driverHandlers.put(deviceId, handler);
         }
+
+        // Always (re)initialize the pipeline behaviour
+        log.info("Driver {} bound to device {} ... initializing driver",
+                 handler.driver().name(), deviceId);
+        Pipeliner pipeliner = handler.behaviour(Pipeliner.class);
+        pipeliner.init(deviceId, context);
+        pipeliners.putIfAbsent(deviceId, pipeliner);
     }
 
     // Triggers driver setup when the local node becomes a device master.
@@ -240,10 +261,8 @@
         public void event(MastershipEvent event) {
             switch (event.type()) {
                 case MASTER_CHANGED:
-                    if (event.roleInfo().master() != null) {
-                        setupPipelineHandler(event.subject());
-                        log.info("mastership changed on device {}", event.subject());
-                    }
+                    log.info("mastership changed on device {}", event.subject());
+                    setupPipelineHandler(event.subject());
                     break;
                 case BACKUPS_CHANGED:
                     break;
@@ -259,13 +278,14 @@
         public void event(DeviceEvent event) {
             switch (event.type()) {
                 case DEVICE_ADDED:
+                    setupPipelineHandler(event.subject().id());
+                    break;
                 case DEVICE_AVAILABILITY_CHANGED:
                     log.info("Device either added or availability changed {}",
                              event.subject().id());
                     if (deviceService.isAvailable(event.subject().id())) {
                         log.info("Device is now available {}", event.subject().id());
                         setupPipelineHandler(event.subject().id());
-                        processPendingObjectives(event.subject().id());
                     }
                     break;
                 case DEVICE_UPDATED:
@@ -284,22 +304,6 @@
                     break;
             }
         }
-
-        private void processPendingObjectives(DeviceId deviceId) {
-            log.debug("Processing pending objectives for device {}", deviceId);
-
-            pendingObjectives.getOrDefault(deviceId,
-                                           Collections.emptySet()).forEach(obj -> {
-                if (obj instanceof NextObjective) {
-                    next(deviceId, (NextObjective) obj);
-                } else if (obj instanceof ForwardingObjective) {
-                    forward(deviceId, (ForwardingObjective) obj);
-                } else {
-                    getDevicePipeliner(deviceId)
-                            .filter((FilteringObjective) obj);
-                }
-            });
-        }
     }
 
     // Processing context for initializing pipeline driver behaviours.
@@ -313,8 +317,6 @@
         public FlowObjectiveStore store() {
             return flowObjectiveStore;
         }
-
-
     }
 
     private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
@@ -356,7 +358,5 @@
         public ForwardingObjective forwardingObjective() {
             return fwd;
         }
-
-
     }
 }