Upgrade packet requests to use flow objectives API.

Addressed a few issues found while using the flow objectives across a cluster:
 * Flow objectives should be installable from any node, not just the master.
   Therefore we need to ensure all nodes initialize a driver for each switch.
 * We no longer store a list of objectives that are waiting for the switch
   to arrive. If the we don't know about the switch yet we'll try a few times
   over a few seconds to find it, but after that we'll give up and report an
   error to the client.
 * Default drivers need to be available when the FlowObjectiveManager starts
   up, otherwise it is common to get flow objective requests before any
   drivers have been loaded.

Change-Id: I1c2ea6a223232402c31e8139729e4b6251ab8b0f
diff --git a/core/api/src/main/java/org/onosproject/net/driver/DefaultDriverProviderService.java b/core/api/src/main/java/org/onosproject/net/driver/DefaultDriverProviderService.java
new file mode 100644
index 0000000..2b7eeff
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/driver/DefaultDriverProviderService.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.driver;
+
+/**
+ * Service capable of providing a set of default drivers.
+ */
+public interface DefaultDriverProviderService extends DriverProvider {
+}
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/DefaultForwardingObjective.java b/core/api/src/main/java/org/onosproject/net/flowobjective/DefaultForwardingObjective.java
index 6489bea..9bbe991 100644
--- a/core/api/src/main/java/org/onosproject/net/flowobjective/DefaultForwardingObjective.java
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/DefaultForwardingObjective.java
@@ -36,7 +36,7 @@
     private final int timeout;
     private final ApplicationId appId;
     private final int priority;
-    private final int nextId;
+    private final Integer nextId;
     private final TrafficTreatment treatment;
     private final Operation op;
     private final Optional<ObjectiveContext> context;
@@ -46,7 +46,7 @@
     private DefaultForwardingObjective(TrafficSelector selector,
                                       Flag flag, boolean permanent,
                                       int timeout, ApplicationId appId,
-                                      int priority, int nextId,
+                                      int priority, Integer nextId,
                                       TrafficTreatment treatment, Operation op) {
         this.selector = selector;
         this.flag = flag;
@@ -67,7 +67,7 @@
     private DefaultForwardingObjective(TrafficSelector selector,
                                        Flag flag, boolean permanent,
                                        int timeout, ApplicationId appId,
-                                       int priority, int nextId,
+                                       int priority, Integer nextId,
                                        TrafficTreatment treatment,
                                        ObjectiveContext context, Operation op) {
         this.selector = selector;
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveError.java b/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveError.java
index 6e60ab6..192b206 100644
--- a/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveError.java
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveError.java
@@ -41,6 +41,11 @@
     GROUPMISSING,
 
     /**
+     * The device was not available to install objectives to.
+     */
+    DEVICEMISSING,
+
+    /**
      * An unknown error occurred.
      */
     UNKNOWN
diff --git a/core/net/src/main/java/org/onosproject/net/driver/impl/DriverManager.java b/core/net/src/main/java/org/onosproject/net/driver/impl/DriverManager.java
index 5f7e2e2..c3160be 100644
--- a/core/net/src/main/java/org/onosproject/net/driver/impl/DriverManager.java
+++ b/core/net/src/main/java/org/onosproject/net/driver/impl/DriverManager.java
@@ -31,6 +31,7 @@
 import org.onosproject.net.driver.DefaultDriverData;
 import org.onosproject.net.driver.DefaultDriverHandler;
 import org.onosproject.net.driver.DefaultDriverProvider;
+import org.onosproject.net.driver.DefaultDriverProviderService;
 import org.onosproject.net.driver.Driver;
 import org.onosproject.net.driver.DriverAdminService;
 import org.onosproject.net.driver.DriverHandler;
@@ -62,16 +63,21 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DeviceService deviceService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DefaultDriverProviderService defaultDriverService;
+
     private Set<DriverProvider> providers = Sets.newConcurrentHashSet();
     private Map<String, Driver> driverByKey = Maps.newConcurrentMap();
 
     @Activate
     protected void activate() {
+        registerProvider(defaultDriverService);
         log.info("Started");
     }
 
     @Deactivate
     protected void deactivate() {
+        unregisterProvider(defaultDriverService);
         log.info("Stopped");
     }
 
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
index 7f6b0ee..79d486e 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
@@ -15,7 +15,6 @@
  */
 package org.onosproject.net.flowobjective.impl;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.felix.scr.annotations.Activate;
@@ -48,17 +47,18 @@
 import org.onosproject.net.flowobjective.ForwardingObjective;
 import org.onosproject.net.flowobjective.NextObjective;
 import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.ObjectiveError;
 import org.onosproject.net.flowobjective.ObjectiveEvent;
 import org.onosproject.net.group.GroupService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collection;
-import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
-import static com.google.common.base.Preconditions.checkState;
+import static org.onlab.util.Tools.groupedThreads;
 
 /**
  * Provides implementation of the flow objective programming service.
@@ -67,9 +67,10 @@
 @Service
 public class FlowObjectiveManager implements FlowObjectiveService {
 
-    private final Logger log = LoggerFactory.getLogger(getClass());
+    public static final int INSTALL_RETRY_ATTEMPTS = 5;
+    public static final long INSTALL_RETRY_INTERVAL = 1000; // ms
 
-    public static final String NOT_INITIALIZED = "Driver not initialized";
+    private final Logger log = LoggerFactory.getLogger(getClass());
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DriverService driverService;
@@ -106,17 +107,18 @@
 
     protected ServiceDirectory serviceDirectory = new DefaultServiceDirectory();
 
-    private final Map<DeviceId, Collection<Objective>> pendingObjectives =
-            Maps.newConcurrentMap();
-
     private NodeId localNode;
 
     private Map<Integer, Set<PendingNext>> pendingForwards =
             Maps.newConcurrentMap();
 
+    private ExecutorService executorService;
 
     @Activate
     protected void activate() {
+        executorService = Executors.newFixedThreadPool(
+                4, groupedThreads("onos/objective-installer", "%d"));
+
         flowObjectiveStore.setDelegate(delegate);
         localNode = clusterService.getLocalNode().id();
         mastershipService.addListener(mastershipListener);
@@ -133,15 +135,55 @@
         log.info("Stopped");
     }
 
-    @Override
-    public void filter(DeviceId deviceId,
-                                  FilteringObjective filteringObjective) {
-        if (deviceService.isAvailable(deviceId)) {
-            getDevicePipeliner(deviceId).filter(filteringObjective);
-        } else {
-            updatePendingMap(deviceId, filteringObjective);
+    /**
+     * Task that passes the flow objective down to the driver. The task will
+     * make a few attempts to find the appropriate driver, then eventually give
+     * up and report an error if no suitable driver could be found.
+     */
+    private class ObjectiveInstaller implements Runnable {
+        private final DeviceId deviceId;
+        private final Objective objective;
+
+        private int numAttempts = 0;
+
+        public ObjectiveInstaller(DeviceId deviceId, Objective objective) {
+            this.deviceId = deviceId;
+            this.objective = objective;
         }
 
+        @Override
+        public void run() {
+            try {
+                numAttempts++;
+
+                Pipeliner pipeliner = getDevicePipeliner(deviceId);
+
+                if (pipeliner != null) {
+                    if (objective instanceof NextObjective) {
+                        pipeliner.next((NextObjective) objective);
+                    } else if (objective instanceof ForwardingObjective) {
+                        pipeliner.forward((ForwardingObjective) objective);
+                    } else {
+                        pipeliner.filter((FilteringObjective) objective);
+                    }
+                } else if (numAttempts < INSTALL_RETRY_ATTEMPTS) {
+                    Thread.currentThread().sleep(INSTALL_RETRY_INTERVAL);
+                    executorService.submit(this);
+                } else {
+                    // Otherwise we've tried a few times and failed, report an
+                    // error back to the user.
+                    objective.context().ifPresent(
+                            c -> c.onError(objective, ObjectiveError.DEVICEMISSING));
+                }
+            } catch (Exception e) {
+                log.warn("Exception while installing flow objective", e);
+            }
+        }
+    }
+
+    @Override
+    public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
+        executorService.submit(new ObjectiveInstaller(deviceId, filteringObjective));
     }
 
     @Override
@@ -152,22 +194,12 @@
             return;
         }
 
-        if (deviceService.isAvailable(deviceId)) {
-            getDevicePipeliner(deviceId).forward(forwardingObjective);
-        } else {
-            updatePendingMap(deviceId, forwardingObjective);
-        }
-
+        executorService.submit(new ObjectiveInstaller(deviceId, forwardingObjective));
     }
 
     @Override
-    public void next(DeviceId deviceId,
-                                NextObjective nextObjective) {
-        if (deviceService.isAvailable(deviceId)) {
-            getDevicePipeliner(deviceId).next(nextObjective);
-        } else {
-            updatePendingMap(deviceId, nextObjective);
-        }
+    public void next(DeviceId deviceId, NextObjective nextObjective) {
+        executorService.submit(new ObjectiveInstaller(deviceId, nextObjective));
     }
 
     @Override
@@ -189,49 +221,38 @@
         return false;
     }
 
-
-    private void updatePendingMap(DeviceId deviceId, Objective pending) {
-        if (pendingObjectives.putIfAbsent(deviceId, Lists.newArrayList(pending)) != null) {
-            Collection<Objective> objectives = pendingObjectives.get(deviceId);
-            objectives.add(pending);
-        }
-
-    }
-
     // Retrieves the device pipeline behaviour from the cache.
     private Pipeliner getDevicePipeliner(DeviceId deviceId) {
         Pipeliner pipeliner = pipeliners.get(deviceId);
-        checkState(pipeliner != null, NOT_INITIALIZED);
         return pipeliner;
     }
 
     private void setupPipelineHandler(DeviceId deviceId) {
-        if (localNode.equals(mastershipService.getMasterFor(deviceId))) {
-            // Attempt to lookup the handler in the cache
-            DriverHandler handler = driverHandlers.get(deviceId);
-            if (handler == null) {
-                try {
-                    // Otherwise create it and if it has pipeline behaviour, cache it
-                    handler = driverService.createHandler(deviceId);
-                    if (!handler.driver().hasBehaviour(Pipeliner.class)) {
-                        log.warn("Pipeline behaviour not supported for device {}",
-                                 deviceId);
-                        return;
-                    }
-                } catch (ItemNotFoundException e) {
-                    log.warn("No applicable driver for device {}", deviceId);
+        // Attempt to lookup the handler in the cache
+        DriverHandler handler = driverHandlers.get(deviceId);
+        if (handler == null) {
+            try {
+                // Otherwise create it and if it has pipeline behaviour, cache it
+                handler = driverService.createHandler(deviceId);
+                if (!handler.driver().hasBehaviour(Pipeliner.class)) {
+                    log.warn("Pipeline behaviour not supported for device {}",
+                             deviceId);
                     return;
                 }
-                driverHandlers.put(deviceId, handler);
+            } catch (ItemNotFoundException e) {
+                log.warn("No applicable driver for device {}", deviceId);
+                return;
             }
 
-            // Always (re)initialize the pipeline behaviour
-            log.info("Driver {} bound to device {} ... initializing driver",
-                     handler.driver().name(), deviceId);
-            Pipeliner pipeliner = handler.behaviour(Pipeliner.class);
-            pipeliner.init(deviceId, context);
-            pipeliners.putIfAbsent(deviceId, pipeliner);
+            driverHandlers.put(deviceId, handler);
         }
+
+        // Always (re)initialize the pipeline behaviour
+        log.info("Driver {} bound to device {} ... initializing driver",
+                 handler.driver().name(), deviceId);
+        Pipeliner pipeliner = handler.behaviour(Pipeliner.class);
+        pipeliner.init(deviceId, context);
+        pipeliners.putIfAbsent(deviceId, pipeliner);
     }
 
     // Triggers driver setup when the local node becomes a device master.
@@ -240,10 +261,8 @@
         public void event(MastershipEvent event) {
             switch (event.type()) {
                 case MASTER_CHANGED:
-                    if (event.roleInfo().master() != null) {
-                        setupPipelineHandler(event.subject());
-                        log.info("mastership changed on device {}", event.subject());
-                    }
+                    log.info("mastership changed on device {}", event.subject());
+                    setupPipelineHandler(event.subject());
                     break;
                 case BACKUPS_CHANGED:
                     break;
@@ -259,13 +278,14 @@
         public void event(DeviceEvent event) {
             switch (event.type()) {
                 case DEVICE_ADDED:
+                    setupPipelineHandler(event.subject().id());
+                    break;
                 case DEVICE_AVAILABILITY_CHANGED:
                     log.info("Device either added or availability changed {}",
                              event.subject().id());
                     if (deviceService.isAvailable(event.subject().id())) {
                         log.info("Device is now available {}", event.subject().id());
                         setupPipelineHandler(event.subject().id());
-                        processPendingObjectives(event.subject().id());
                     }
                     break;
                 case DEVICE_UPDATED:
@@ -284,22 +304,6 @@
                     break;
             }
         }
-
-        private void processPendingObjectives(DeviceId deviceId) {
-            log.debug("Processing pending objectives for device {}", deviceId);
-
-            pendingObjectives.getOrDefault(deviceId,
-                                           Collections.emptySet()).forEach(obj -> {
-                if (obj instanceof NextObjective) {
-                    next(deviceId, (NextObjective) obj);
-                } else if (obj instanceof ForwardingObjective) {
-                    forward(deviceId, (ForwardingObjective) obj);
-                } else {
-                    getDevicePipeliner(deviceId)
-                            .filter((FilteringObjective) obj);
-                }
-            });
-        }
     }
 
     // Processing context for initializing pipeline driver behaviours.
@@ -313,8 +317,6 @@
         public FlowObjectiveStore store() {
             return flowObjectiveStore;
         }
-
-
     }
 
     private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
@@ -356,7 +358,5 @@
         public ForwardingObjective forwardingObjective() {
             return fwd;
         }
-
-
     }
 }
diff --git a/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java b/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
index 9dd0634..a64b678 100644
--- a/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
+++ b/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
@@ -27,12 +27,17 @@
 import org.onosproject.net.device.DeviceEvent;
 import org.onosproject.net.device.DeviceListener;
 import org.onosproject.net.device.DeviceService;
-import org.onosproject.net.flow.DefaultFlowRule;
 import org.onosproject.net.flow.DefaultTrafficTreatment;
 import org.onosproject.net.flow.FlowRule;
 import org.onosproject.net.flow.FlowRuleService;
 import org.onosproject.net.flow.TrafficSelector;
 import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flowobjective.DefaultForwardingObjective;
+import org.onosproject.net.flowobjective.FlowObjectiveService;
+import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.ObjectiveContext;
+import org.onosproject.net.flowobjective.ObjectiveError;
 import org.onosproject.net.packet.DefaultPacketRequest;
 import org.onosproject.net.packet.OutboundPacket;
 import org.onosproject.net.packet.PacketContext;
@@ -73,6 +78,9 @@
     private CoreService coreService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private FlowObjectiveService objectiveService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private DeviceService deviceService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -169,12 +177,28 @@
             return;
         }
 
-        TrafficTreatment treatment = DefaultTrafficTreatment.builder().punt().build();
-        FlowRule flow = new DefaultFlowRule(device.id(), request.selector(), treatment,
-                                            request.priority().priorityValue(),
-                                            appId, 0, true, request.tableType());
+        TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                                                            .punt()
+                                                            .build();
 
-        flowService.applyFlowRules(flow);
+        ForwardingObjective forwarding = DefaultForwardingObjective.builder()
+                .withPriority(request.priority().priorityValue())
+                .withSelector(request.selector())
+                .fromApp(appId)
+                .withFlag(ForwardingObjective.Flag.VERSATILE)
+                .withTreatment(treatment)
+                .makePermanent()
+                .add(new ObjectiveContext() {
+                    @Override
+                    public void onSuccess(Objective objective) { }
+
+                    @Override
+                    public void onError(Objective objective, ObjectiveError error) {
+                        log.warn("Failed to install packet request flow: {}", error);
+                    }
+                });
+
+        objectiveService.forward(device.id(), forwarding);
     }
 
     @Override
diff --git a/drivers/src/main/java/org/onosproject/driver/pipeline/DefaultDrivers.java b/drivers/src/main/java/org/onosproject/driver/pipeline/DefaultDrivers.java
index 580691b..bfa9c32 100644
--- a/drivers/src/main/java/org/onosproject/driver/pipeline/DefaultDrivers.java
+++ b/drivers/src/main/java/org/onosproject/driver/pipeline/DefaultDrivers.java
@@ -16,32 +16,31 @@
 package org.onosproject.driver.pipeline;
 
 import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.onosproject.net.driver.DriverAdminService;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.net.driver.DefaultDriverProviderService;
+import org.onosproject.net.driver.Driver;
 import org.onosproject.net.driver.DriverProvider;
 import org.onosproject.net.driver.XmlDriverLoader;
-import org.apache.felix.scr.annotations.Component;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Set;
 
 /**
  * Bootstrap for built in drivers.
  */
-@Component(immediate = true)
-public class DefaultDrivers {
+@Service
+@Component(immediate = false)
+public class DefaultDrivers implements DefaultDriverProviderService {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     private static final String DRIVERS_XML = "/onos-drivers.xml";
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected DriverAdminService driverService;
-
     private DriverProvider provider;
 
     @Activate
@@ -50,7 +49,6 @@
         try {
             InputStream stream = classLoader.getResourceAsStream(DRIVERS_XML);
             provider = new XmlDriverLoader(classLoader).loadDrivers(stream);
-            driverService.registerProvider(provider);
         } catch (IOException e) {
             log.error("Unable to load default drivers", e);
         }
@@ -59,10 +57,11 @@
 
     @Deactivate
     protected void deactivate() {
-        if (provider != null) {
-            driverService.unregisterProvider(provider);
-        }
         log.info("Stopped");
     }
 
+    @Override
+    public Set<Driver> getDrivers() {
+        return provider.getDrivers();
+    }
 }