Flow Objective implementation

Provides an abstraction which isolates the application from any pipeline
knowledge. By using the provided objectives applications can express
their forwarding desires in a pipeline agnostic way. The objectives
are then consumed by a driver for the specific device who converts them
into the appropriate pipeline coherent flows.

Change-Id: I74a68b4971c367c0cd5b7de9d877abdd117afa98
diff --git a/apps/bgprouter/src/main/java/org/onosproject/bgprouter/BgpRouter.java b/apps/bgprouter/src/main/java/org/onosproject/bgprouter/BgpRouter.java
index c012999..069fc50 100644
--- a/apps/bgprouter/src/main/java/org/onosproject/bgprouter/BgpRouter.java
+++ b/apps/bgprouter/src/main/java/org/onosproject/bgprouter/BgpRouter.java
@@ -20,7 +20,6 @@
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Multiset;
-
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -28,35 +27,29 @@
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.onlab.packet.Ethernet;
 import org.onlab.packet.Ip4Address;
+import org.onlab.packet.Ip4Prefix;
 import org.onlab.packet.Ip6Address;
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.IpPrefix;
+import org.onlab.packet.MacAddress;
 import org.onlab.util.KryoNamespace;
 import org.onosproject.config.NetworkConfigService;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.net.DeviceId;
-import org.onosproject.net.flow.DefaultFlowRule;
 import org.onosproject.net.flow.DefaultTrafficSelector;
 import org.onosproject.net.flow.DefaultTrafficTreatment;
-import org.onosproject.net.flow.FlowRule;
-import org.onosproject.net.flow.FlowRuleOperations;
 import org.onosproject.net.flow.FlowRuleService;
 import org.onosproject.net.flow.TrafficSelector;
 import org.onosproject.net.flow.TrafficTreatment;
 import org.onosproject.net.flow.criteria.Criteria;
 import org.onosproject.net.flowobjective.DefaultFilteringObjective;
+import org.onosproject.net.flowobjective.DefaultForwardingObjective;
+import org.onosproject.net.flowobjective.DefaultNextObjective;
 import org.onosproject.net.flowobjective.FilteringObjective;
 import org.onosproject.net.flowobjective.FlowObjectiveService;
-import org.onosproject.net.group.DefaultGroupBucket;
-import org.onosproject.net.group.DefaultGroupDescription;
-import org.onosproject.net.group.DefaultGroupKey;
-import org.onosproject.net.group.Group;
-import org.onosproject.net.group.GroupBucket;
-import org.onosproject.net.group.GroupBuckets;
-import org.onosproject.net.group.GroupDescription;
-import org.onosproject.net.group.GroupEvent;
-import org.onosproject.net.group.GroupListener;
+import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.flowobjective.NextObjective;
 import org.onosproject.net.group.GroupService;
 import org.onosproject.net.packet.PacketService;
 import org.onosproject.routing.FibEntry;
@@ -74,7 +67,8 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
+
+import static org.onlab.util.Tools.delay;
 
 /**
  * BgpRouter component.
@@ -124,7 +118,7 @@
     private final Map<IpPrefix, IpAddress> prefixToNextHop = Maps.newHashMap();
 
     // Mapping from next hop IP to next hop object containing group info
-    private final Map<IpAddress, NextHop> nextHops = Maps.newHashMap();
+    private final Map<IpAddress, Integer> nextHops = Maps.newHashMap();
 
     // Stores FIB updates that are waiting for groups to be set up
     private final Multimap<NextHopGroupKey, FibEntry> pendingUpdates = HashMultimap.create();
@@ -136,7 +130,7 @@
     // learned from config
     private DeviceId ctrlDeviceId;
 
-    private final GroupListener groupListener = new InternalGroupListener();
+    //private final GroupListener groupListener = new InternalGroupListener();
 
     private TunnellingConnectivityManager connectivityManager;
 
@@ -160,7 +154,7 @@
         appId = coreService.registerApplication(BGP_ROUTER_APP);
         getDeviceConfiguration(configService.getBgpSpeakers());
 
-        groupService.addListener(groupListener);
+        //groupService.addListener(groupListener);
 
         processIntfFilters(true, configService.getInterfaces());
 
@@ -179,6 +173,14 @@
         icmpHandler.start();
 
         log.info("BgpRouter started");
+
+        delay(1000);
+
+        FibEntry fibEntry = new FibEntry(Ip4Prefix.valueOf("10.1.0.0/16"),
+                                         Ip4Address.valueOf("192.168.10.1"),
+                                         MacAddress.valueOf("DE:AD:BE:EF:FE:ED"));
+        FibUpdate fibUpdate = new FibUpdate(FibUpdate.Type.UPDATE, fibEntry);
+        updateFibEntry(Collections.singletonList(fibUpdate));
     }
 
     @Deactivate
@@ -188,7 +190,7 @@
         icmpHandler.stop();
         processIntfFilters(false, configService.getInterfaces());
 
-        groupService.removeListener(groupListener);
+        //groupService.removeListener(groupListener);
 
         log.info("BgpRouter stopped");
     }
@@ -213,16 +215,18 @@
     }
 
     private void updateFibEntry(Collection<FibUpdate> updates) {
-        Map<FibEntry, Group> toInstall = new HashMap<>(updates.size());
+        Map<FibEntry, Integer> toInstall = new HashMap<>(updates.size());
 
         for (FibUpdate update : updates) {
             FibEntry entry = update.entry();
 
             addNextHop(entry);
 
-            Group group;
+            Integer nextId;
             synchronized (pendingUpdates) {
-                NextHop nextHop = nextHops.get(entry.nextHopIp());
+                nextId = nextHops.get(entry.nextHopIp());
+
+                /*
                 group = groupService.getGroup(deviceId,
                                               new DefaultGroupKey(
                                               appKryo.serialize(nextHop.group())));
@@ -231,66 +235,70 @@
                     log.debug("Adding pending flow {}", update.entry());
                     pendingUpdates.put(nextHop.group(), update.entry());
                     continue;
-                }
+                }*/
             }
 
-            toInstall.put(update.entry(), group);
+            toInstall.put(update.entry(), nextId);
         }
 
         installFlows(toInstall);
     }
 
-    private void installFlows(Map<FibEntry, Group> entriesToInstall) {
-        FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
+    private void installFlows(Map<FibEntry, Integer> entriesToInstall) {
 
-        for (Map.Entry<FibEntry, Group> entry : entriesToInstall.entrySet()) {
+        for (Map.Entry<FibEntry, Integer> entry : entriesToInstall.entrySet()) {
             FibEntry fibEntry = entry.getKey();
-            Group group = entry.getValue();
+            Integer nextId = entry.getValue();
 
-            FlowRule flowRule = generateRibFlowRule(fibEntry.prefix(), group);
+            flowObjectiveService.forward(deviceId,
+                                         generateRibFlowRule(fibEntry.prefix(), nextId).add());
 
-            builder.add(flowRule);
+
         }
+        log.info("Sending flow forwarding objective");
 
-        flowService.apply(builder.build());
     }
 
     private synchronized void deleteFibEntry(Collection<FibUpdate> withdraws) {
-        FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
 
         for (FibUpdate update : withdraws) {
             FibEntry entry = update.entry();
+            Integer nextId = nextHops.get(entry.nextHopIp());
 
-            Group group = deleteNextHop(entry.prefix());
+            /*Group group = deleteNextHop(entry.prefix());
             if (group == null) {
                 log.warn("Group not found when deleting {}", entry);
                 return;
-            }
+            }*/
 
-            FlowRule flowRule = generateRibFlowRule(entry.prefix(), group);
+            flowObjectiveService.forward(deviceId,
+                                         generateRibFlowRule(entry.prefix(), nextId).remove());
 
-            builder.remove(flowRule);
         }
 
-        flowService.apply(builder.build());
     }
 
-    private FlowRule generateRibFlowRule(IpPrefix prefix, Group group) {
+    private ForwardingObjective.Builder generateRibFlowRule(IpPrefix prefix, Integer nextId) {
         TrafficSelector selector = DefaultTrafficSelector.builder()
                 .matchEthType(Ethernet.TYPE_IPV4)
                 .matchIPDst(prefix)
                 .build();
 
-        TrafficTreatment treatment = DefaultTrafficTreatment.builder()
-                .group(group.id())
-                .build();
 
 
         int priority = prefix.prefixLength() * PRIORITY_MULTIPLIER + PRIORITY_OFFSET;
 
-        return new DefaultFlowRule(deviceId, selector, treatment,
-                                   priority, appId, 0, true,
-                                   FlowRule.Type.IP);
+        ForwardingObjective.Builder fwdBuilder = DefaultForwardingObjective.builder()
+                .fromApp(appId)
+                .makePermanent()
+                .nextStep(nextId)
+                .withSelector(selector)
+                .withPriority(priority)
+                .withFlag(ForwardingObjective.Flag.SPECIFIC);
+
+        return fwdBuilder;
+
+
     }
 
     private synchronized void addNextHop(FibEntry entry) {
@@ -317,6 +325,16 @@
                     .setOutput(egressIntf.connectPoint().port())
                     .build();
 
+            NextObjective nextObjective = DefaultNextObjective.builder()
+                    .withId(entry.hashCode())
+                    .addTreatment(treatment)
+                    .withType(NextObjective.Type.SIMPLE)
+                    .fromApp(appId)
+                    .add();
+
+            flowObjectiveService.next(deviceId, nextObjective);
+
+            /*
             GroupBucket bucket = DefaultGroupBucket.createIndirectGroupBucket(treatment);
 
             GroupDescription groupDescription
@@ -328,15 +346,16 @@
                                                   appId);
 
             groupService.addGroup(groupDescription);
+            */
 
-            nextHops.put(nextHop.ip(), nextHop);
+            nextHops.put(nextHop.ip(), entry.hashCode());
 
         }
 
         nextHopsCount.add(entry.nextHopIp());
     }
 
-    private synchronized Group deleteNextHop(IpPrefix prefix) {
+    /*private synchronized Group deleteNextHop(IpPrefix prefix) {
         IpAddress nextHopIp = prefixToNextHop.remove(prefix);
         NextHop nextHop = nextHops.get(nextHopIp);
         if (nextHop == null) {
@@ -349,7 +368,7 @@
                                                                 serialize(nextHop.group())));
 
         // FIXME disabling group deletes for now until we verify the logic is OK
-        /*if (nextHopsCount.remove(nextHopIp, 1) <= 1) {
+        *//*if (nextHopsCount.remove(nextHopIp, 1) <= 1) {
             // There was one or less next hops, so there are now none
 
             log.debug("removing group for next hop {}", nextHop);
@@ -359,10 +378,10 @@
             groupService.removeGroup(deviceId,
                                      new DefaultGroupKey(appKryo.build().serialize(nextHop.group())),
                                      appId);
-        }*/
+        }*//*
 
         return group;
-    }
+    }*/
 
     private class InternalFibListener implements FibListener {
 
@@ -385,12 +404,11 @@
                 .forEach(ipaddr -> fob.addCondition(
                                    Criteria.matchIPDst(ipaddr.subnetAddress())));
             fob.permit().fromApp(appId);
-            flowObjectiveService.filter(deviceId,
-                                 Collections.singletonList(fob.add()));
+            flowObjectiveService.filter(deviceId, fob.add());
         }
     }
 
-    private class InternalGroupListener implements GroupListener {
+   /* private class InternalGroupListener implements GroupListener {
 
         @Override
         public void event(GroupEvent event) {
@@ -412,5 +430,5 @@
                 }
             }
         }
-    }
+    }*/
 }
diff --git a/core/api/src/main/java/org/onosproject/net/behaviour/DefaultNextGroup.java b/core/api/src/main/java/org/onosproject/net/behaviour/DefaultNextGroup.java
new file mode 100644
index 0000000..ef1f9de
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/behaviour/DefaultNextGroup.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.behaviour;
+
+/**
+ * Default implementation of a next group.
+ */
+public class DefaultNextGroup implements NextGroup {
+
+    private final byte[] data;
+
+    public DefaultNextGroup(byte[] data) {
+        this.data = data;
+    }
+
+    @Override
+    public byte[] data() {
+        return data;
+    }
+}
diff --git a/core/api/src/main/java/org/onosproject/net/behaviour/NextGroup.java b/core/api/src/main/java/org/onosproject/net/behaviour/NextGroup.java
new file mode 100644
index 0000000..b5a3891
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/behaviour/NextGroup.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.behaviour;
+
+/**
+ * Opaque data type for carrying group-like information.
+ * Only relevant to a pipeliner driver.
+ */
+public interface NextGroup {
+
+    /**
+     * Serialized form of the next group.
+     * @return a byte array.
+     */
+    byte[] data();
+
+}
diff --git a/core/api/src/main/java/org/onosproject/net/behaviour/Pipeliner.java b/core/api/src/main/java/org/onosproject/net/behaviour/Pipeliner.java
index eda131a..dcfc588 100644
--- a/core/api/src/main/java/org/onosproject/net/behaviour/Pipeliner.java
+++ b/core/api/src/main/java/org/onosproject/net/behaviour/Pipeliner.java
@@ -21,9 +21,6 @@
 import org.onosproject.net.flowobjective.ForwardingObjective;
 import org.onosproject.net.flowobjective.NextObjective;
 
-import java.util.Collection;
-import java.util.concurrent.Future;
-
 /**
  * Behaviour for handling various pipelines.
  */
@@ -40,24 +37,21 @@
     /**
      * Installs the filtering rules onto the device.
      *
-     * @param filterObjectives the collection of filters
-     * @return a future indicating the success of the operation
+     * @param filterObjective a filtering objective
      */
-    Future<Boolean> filter(Collection<FilteringObjective> filterObjectives);
+    void filter(FilteringObjective filterObjective);
 
     /**
      * Installs the forwarding rules onto the device.
      *
-     * @param forwardObjectives the collection of forwarding objectives
-     * @return a future indicating the success of the operation
+     * @param forwardObjective a forwarding objective
      */
-    Future<Boolean> forward(Collection<ForwardingObjective> forwardObjectives);
+    void forward(ForwardingObjective forwardObjective);
 
     /**
      * Installs the next hop elements into the device.
      *
-     * @param nextObjectives the collection of next objectives
-     * @return a future indicating the success of the operation
+     * @param nextObjective a next objectives
      */
-    Future<Boolean> next(Collection<NextObjective> nextObjectives);
+    void next(NextObjective nextObjective);
 }
diff --git a/core/api/src/main/java/org/onosproject/net/behaviour/PipelinerContext.java b/core/api/src/main/java/org/onosproject/net/behaviour/PipelinerContext.java
index c2c6dfd..d0ca42b 100644
--- a/core/api/src/main/java/org/onosproject/net/behaviour/PipelinerContext.java
+++ b/core/api/src/main/java/org/onosproject/net/behaviour/PipelinerContext.java
@@ -16,6 +16,7 @@
 package org.onosproject.net.behaviour;
 
 import org.onlab.osgi.ServiceDirectory;
+import org.onosproject.net.flowobjective.FlowObjectiveStore;
 
 /**
  * Processing context and supporting services for the pipeline behaviour.
@@ -30,5 +31,11 @@
      */
     ServiceDirectory directory();
 
+    /**
+     * Returns the Objective Store where data can be stored and retrieved.
+     * @return the flow objective store
+     */
+    FlowObjectiveStore store();
+
     // TODO: add means to store and access shared state
 }
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/DefaultFilteringObjective.java b/core/api/src/main/java/org/onosproject/net/flowobjective/DefaultFilteringObjective.java
index 33b8f5a..94519a9 100644
--- a/core/api/src/main/java/org/onosproject/net/flowobjective/DefaultFilteringObjective.java
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/DefaultFilteringObjective.java
@@ -23,6 +23,7 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -42,6 +43,7 @@
     private final List<Criterion> conditions;
     private final int id;
     private final Operation op;
+    private final Optional<ObjectiveContext> context;
 
     private DefaultFilteringObjective(Type type, boolean permanent, int timeout,
                                       ApplicationId appId, int priority, Criterion key,
@@ -54,6 +56,25 @@
         this.priority = priority;
         this.conditions = conditions;
         this.op = op;
+        this.context = Optional.empty();
+
+        this.id = Objects.hash(type, key, conditions, permanent,
+                               timeout, appId, priority);
+    }
+
+    public DefaultFilteringObjective(Type type, boolean permanent, int timeout,
+                                     ApplicationId appId, int priority, Criterion key,
+                                     List<Criterion> conditions,
+                                     ObjectiveContext context, Operation op) {
+        this.key = key;
+        this.type = type;
+        this.permanent = permanent;
+        this.timeout = timeout;
+        this.appId = appId;
+        this.priority = priority;
+        this.conditions = conditions;
+        this.op = op;
+        this.context = Optional.ofNullable(context);
 
         this.id = Objects.hash(type, key, conditions, permanent,
                                timeout, appId, priority);
@@ -104,6 +125,11 @@
         return op;
     }
 
+    @Override
+    public Optional<ObjectiveContext> context() {
+        return context;
+    }
+
     /**
      * Returns a new builder.
      *
@@ -201,6 +227,31 @@
 
         }
 
+        @Override
+        public FilteringObjective add(ObjectiveContext context) {
+            List<Criterion> conditions = listBuilder.build();
+            checkNotNull(type, "Must have a type.");
+            checkArgument(!conditions.isEmpty(), "Must have at least one condition.");
+            checkNotNull(appId, "Must supply an application id");
+
+            return new DefaultFilteringObjective(type, permanent, timeout,
+                                                 appId, priority, key, conditions,
+                                                 context, Operation.ADD);
+        }
+
+        @Override
+        public FilteringObjective remove(ObjectiveContext context) {
+            List<Criterion> conditions = listBuilder.build();
+            checkNotNull(type, "Must have a type.");
+            checkArgument(!conditions.isEmpty(), "Must have at least one condition.");
+            checkNotNull(appId, "Must supply an application id");
+
+
+            return new DefaultFilteringObjective(type, permanent, timeout,
+                                                 appId, priority, key, conditions,
+                                                 context, Operation.REMOVE);
+        }
+
 
     }
 
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/DefaultForwardingObjective.java b/core/api/src/main/java/org/onosproject/net/flowobjective/DefaultForwardingObjective.java
index d110e07..6489bea 100644
--- a/core/api/src/main/java/org/onosproject/net/flowobjective/DefaultForwardingObjective.java
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/DefaultForwardingObjective.java
@@ -20,6 +20,7 @@
 import org.onosproject.net.flow.TrafficTreatment;
 
 import java.util.Objects;
+import java.util.Optional;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -38,6 +39,7 @@
     private final int nextId;
     private final TrafficTreatment treatment;
     private final Operation op;
+    private final Optional<ObjectiveContext> context;
 
     private final int id;
 
@@ -55,6 +57,29 @@
         this.nextId = nextId;
         this.treatment = treatment;
         this.op = op;
+        this.context = Optional.empty();
+
+        this.id = Objects.hash(selector, flag, permanent,
+                               timeout, appId, priority, nextId,
+                               treatment, op);
+    }
+
+    private DefaultForwardingObjective(TrafficSelector selector,
+                                       Flag flag, boolean permanent,
+                                       int timeout, ApplicationId appId,
+                                       int priority, int nextId,
+                                       TrafficTreatment treatment,
+                                       ObjectiveContext context, Operation op) {
+        this.selector = selector;
+        this.flag = flag;
+        this.permanent = permanent;
+        this.timeout = timeout;
+        this.appId = appId;
+        this.priority = priority;
+        this.nextId = nextId;
+        this.treatment = treatment;
+        this.op = op;
+        this.context = Optional.ofNullable(context);
 
         this.id = Objects.hash(selector, flag, permanent,
                                timeout, appId, priority, nextId,
@@ -113,6 +138,11 @@
         return op;
     }
 
+    @Override
+    public Optional<ObjectiveContext> context() {
+        return context;
+    }
+
     /**
      * Returns a new builder.
      *
@@ -186,7 +216,7 @@
         public ForwardingObjective add() {
             checkNotNull(selector, "Must have a selector");
             checkNotNull(flag, "A flag must be set");
-            checkArgument(nextId != null && treatment != null, "Must supply at " +
+            checkArgument(nextId != null || treatment != null, "Must supply at " +
                     "least a treatment and/or a nextId");
             checkNotNull(appId, "Must supply an application id");
             return new DefaultForwardingObjective(selector, flag, permanent,
@@ -198,12 +228,38 @@
         public ForwardingObjective remove() {
             checkNotNull(selector, "Must have a selector");
             checkNotNull(flag, "A flag must be set");
-            checkArgument(nextId != null && treatment != null, "Must supply at " +
+            checkArgument(nextId != null || treatment != null, "Must supply at " +
                     "least a treatment and/or a nextId");
             checkNotNull(appId, "Must supply an application id");
             return new DefaultForwardingObjective(selector, flag, permanent,
                                                    timeout, appId, priority,
                                                    nextId, treatment, Operation.REMOVE);
         }
+
+        @Override
+        public ForwardingObjective add(ObjectiveContext context) {
+            checkNotNull(selector, "Must have a selector");
+            checkNotNull(flag, "A flag must be set");
+            checkArgument(nextId != null || treatment != null, "Must supply at " +
+                    "least a treatment and/or a nextId");
+            checkNotNull(appId, "Must supply an application id");
+            return new DefaultForwardingObjective(selector, flag, permanent,
+                                                  timeout, appId, priority,
+                                                  nextId, treatment,
+                                                  context, Operation.ADD);
+        }
+
+        @Override
+        public ForwardingObjective remove(ObjectiveContext context) {
+            checkNotNull(selector, "Must have a selector");
+            checkNotNull(flag, "A flag must be set");
+            checkArgument(nextId != null || treatment != null, "Must supply at " +
+                    "least a treatment and/or a nextId");
+            checkNotNull(appId, "Must supply an application id");
+            return new DefaultForwardingObjective(selector, flag, permanent,
+                                                  timeout, appId, priority,
+                                                  nextId, treatment,
+                                                  context, Operation.REMOVE);
+        }
     }
 }
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/DefaultNextObjective.java b/core/api/src/main/java/org/onosproject/net/flowobjective/DefaultNextObjective.java
index 4ab79ff..cc316fe 100644
--- a/core/api/src/main/java/org/onosproject/net/flowobjective/DefaultNextObjective.java
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/DefaultNextObjective.java
@@ -21,6 +21,7 @@
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Optional;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -34,13 +35,28 @@
     private final ApplicationId appId;
     private final Type type;
     private final Integer id;
+    private final Operation op;
+    private final Optional<ObjectiveContext> context;
 
     private DefaultNextObjective(Integer id, List<TrafficTreatment> treatments,
-                                ApplicationId appId, Type type) {
+                                ApplicationId appId, Type type, Operation op) {
         this.treatments = treatments;
         this.appId = appId;
         this.type = type;
         this.id = id;
+        this.op = op;
+        this.context = Optional.empty();
+    }
+
+    private DefaultNextObjective(Integer id, List<TrafficTreatment> treatments,
+                                 ApplicationId appId, ObjectiveContext context,
+                                 Type type, Operation op) {
+        this.treatments = treatments;
+        this.appId = appId;
+        this.type = type;
+        this.id = id;
+        this.op = op;
+        this.context = Optional.ofNullable(context);
     }
 
     @Override
@@ -80,7 +96,12 @@
 
     @Override
     public Operation op() {
-        throw new UnsupportedOperationException("Next Objective has no operation");
+        return op;
+    }
+
+    @Override
+    public Optional<ObjectiveContext> context() {
+        return context;
     }
 
     /**
@@ -101,8 +122,6 @@
         private final ImmutableList.Builder<TrafficTreatment> listBuilder
                 = ImmutableList.builder();
 
-
-
         @Override
         public NextObjective.Builder withId(int nextId) {
             this.id = nextId;
@@ -143,7 +162,7 @@
         }
 
         @Override
-        public Builder fromApp(ApplicationId appId) {
+        public NextObjective.Builder fromApp(ApplicationId appId) {
             this.appId = appId;
             return this;
         }
@@ -160,14 +179,49 @@
         }
 
         @Override
-        public NextObjective build() {
+        public NextObjective add() {
             List<TrafficTreatment> treatments = listBuilder.build();
             checkNotNull(appId, "Must supply an application id");
             checkNotNull(id, "id cannot be null");
             checkNotNull(type, "The type cannot be null");
             checkArgument(!treatments.isEmpty(), "Must have at least one treatment");
 
-            return new DefaultNextObjective(id, treatments, appId, type);
+            return new DefaultNextObjective(id, treatments, appId, type, Operation.ADD);
+        }
+
+        @Override
+        public NextObjective remove() {
+            List<TrafficTreatment> treatments = listBuilder.build();
+            checkNotNull(appId, "Must supply an application id");
+            checkNotNull(id, "id cannot be null");
+            checkNotNull(type, "The type cannot be null");
+            checkArgument(!treatments.isEmpty(), "Must have at least one treatment");
+
+            return new DefaultNextObjective(id, treatments, appId, type, Operation.REMOVE);
+        }
+
+        @Override
+        public NextObjective add(ObjectiveContext context) {
+            List<TrafficTreatment> treatments = listBuilder.build();
+            checkNotNull(appId, "Must supply an application id");
+            checkNotNull(id, "id cannot be null");
+            checkNotNull(type, "The type cannot be null");
+            checkArgument(!treatments.isEmpty(), "Must have at least one treatment");
+
+            return new DefaultNextObjective(id, treatments, appId,
+                                            context, type, Operation.ADD);
+        }
+
+        @Override
+        public NextObjective remove(ObjectiveContext context) {
+            List<TrafficTreatment> treatments = listBuilder.build();
+            checkNotNull(appId, "Must supply an application id");
+            checkNotNull(id, "id cannot be null");
+            checkNotNull(type, "The type cannot be null");
+            checkArgument(!treatments.isEmpty(), "Must have at least one treatment");
+
+            return new DefaultNextObjective(id, treatments, appId,
+                                            context, type, Operation.REMOVE);
         }
     }
 }
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/FilteringObjective.java b/core/api/src/main/java/org/onosproject/net/flowobjective/FilteringObjective.java
index d892a97..89b668d 100644
--- a/core/api/src/main/java/org/onosproject/net/flowobjective/FilteringObjective.java
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/FilteringObjective.java
@@ -114,6 +114,24 @@
          */
         public FilteringObjective remove();
 
+        /**
+         * Builds the filtering objective that will be added.
+         * The context will be used to notify the calling application.
+         *
+         * @param context an objective context
+         * @return a filtering objective
+         */
+        public FilteringObjective add(ObjectiveContext context);
+
+        /**
+         * Builds the filtering objective that will be removed.
+         * The context will be used to notify the calling application.
+         *
+         * @param context an objective context
+         * @return a filtering objective
+         */
+        public FilteringObjective remove(ObjectiveContext context);
+
 
     }
 
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/FlowObjectiveService.java b/core/api/src/main/java/org/onosproject/net/flowobjective/FlowObjectiveService.java
index 14af2b8..690fcc7 100644
--- a/core/api/src/main/java/org/onosproject/net/flowobjective/FlowObjectiveService.java
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/FlowObjectiveService.java
@@ -17,9 +17,6 @@
 
 import org.onosproject.net.DeviceId;
 
-import java.util.Collection;
-import java.util.concurrent.Future;
-
 /**
  * Service for programming data plane flow rules in manner independent of
  * specific device table pipeline configuration.
@@ -30,27 +27,24 @@
      * Installs the filtering rules onto the specified device.
      *
      * @param deviceId            device identifier
-     * @param filteringObjectives the collection of filters
-     * @return a future indicating the success of the operation
+     * @param filteringObjective the filtering objective
      */
-    Future<Boolean> filter(DeviceId deviceId, Collection<FilteringObjective> filteringObjectives);
+    void filter(DeviceId deviceId, FilteringObjective filteringObjective);
 
     /**
      * Installs the forwarding rules onto the specified device.
      *
      * @param deviceId             device identifier
-     * @param forwardingObjectives the collection of forwarding objectives
-     * @return a future indicating the success of the operation
+     * @param forwardingObjective the forwarding objective
      */
-    Future<Boolean> forward(DeviceId deviceId, Collection<ForwardingObjective> forwardingObjectives);
+    void forward(DeviceId deviceId, ForwardingObjective forwardingObjective);
 
     /**
      * Installs the next hop elements into the specified device.
      *
      * @param deviceId       device identifier
-     * @param nextObjectives the collection of next objectives
-     * @return a future indicating the success of the operation
+     * @param nextObjective a next objective
      */
-    Future<Boolean> next(DeviceId deviceId, Collection<NextObjective> nextObjectives);
+    void next(DeviceId deviceId, NextObjective nextObjective);
 
 }
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/FlowObjectiveStore.java b/core/api/src/main/java/org/onosproject/net/flowobjective/FlowObjectiveStore.java
new file mode 100644
index 0000000..e667618
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/FlowObjectiveStore.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.flowobjective;
+
+import org.onosproject.net.behaviour.NextGroup;
+import org.onosproject.store.Store;
+
+/**
+ * The flow objective store.
+ */
+public interface FlowObjectiveStore
+        extends Store<ObjectiveEvent, FlowObjectiveStoreDelegate> {
+
+    /**
+     * Adds a NextGroup to the store.
+     *
+     * @param nextId an integer
+     * @param group a next group opaque object
+     */
+    void putNextGroup(Integer nextId, NextGroup group);
+
+    /**
+     * Fetch a next group from the store.
+     * @param nextId an integer
+     * @return a next group
+     */
+    NextGroup getNextGroup(Integer nextId);
+}
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/FlowObjectiveStoreDelegate.java b/core/api/src/main/java/org/onosproject/net/flowobjective/FlowObjectiveStoreDelegate.java
new file mode 100644
index 0000000..5af7836
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/FlowObjectiveStoreDelegate.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.flowobjective;
+
+import org.onosproject.store.StoreDelegate;
+
+/**
+ * Flow Objective store delegate abstraction.
+ */
+public interface FlowObjectiveStoreDelegate extends StoreDelegate<ObjectiveEvent> {
+}
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/ForwardingObjective.java b/core/api/src/main/java/org/onosproject/net/flowobjective/ForwardingObjective.java
index 4fecc54..dcc377f 100644
--- a/core/api/src/main/java/org/onosproject/net/flowobjective/ForwardingObjective.java
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/ForwardingObjective.java
@@ -121,5 +121,23 @@
          * @return a forwarding objective.
          */
         public ForwardingObjective remove();
+
+        /**
+         * Builds the forwarding objective that will be added.
+         * The context will be used to notify the calling application.
+         *
+         * @param context an objective context
+         * @return a forwarding objective
+         */
+        public ForwardingObjective add(ObjectiveContext context);
+
+        /**
+         * Builds the forwarding objective that will be removed.
+         * The context will be used to notify the calling application.
+         *
+         * @param context an objective context
+         * @return a forwarding objective
+         */
+        public ForwardingObjective remove(ObjectiveContext context);
     }
 }
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/NextObjective.java b/core/api/src/main/java/org/onosproject/net/flowobjective/NextObjective.java
index 52e79ee..02c4d9e 100644
--- a/core/api/src/main/java/org/onosproject/net/flowobjective/NextObjective.java
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/NextObjective.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.net.flowobjective;
 
+import org.onosproject.core.ApplicationId;
 import org.onosproject.net.flow.TrafficTreatment;
 
 import java.util.Collection;
@@ -95,12 +96,40 @@
          */
         public Builder addTreatment(TrafficTreatment treatment);
 
+        @Override
+        public Builder fromApp(ApplicationId appId);
+
         /**
-         * Builds a next step.
+         * Builds the next objective that will be added.
          *
-         * @return a next step
+         * @return a next objective
          */
-        public NextObjective build();
+        public NextObjective add();
+
+        /**
+         * Builds the next objective that will be removed.
+         *
+         * @return a next objective.
+         */
+        public NextObjective remove();
+
+        /**
+         * Builds the next objective that will be added.
+         * The context will be used to notify the calling application.
+         *
+         * @param context an objective context
+         * @return a next objective
+         */
+        public NextObjective add(ObjectiveContext context);
+
+        /**
+         * Builds the next objective that will be removed.
+         * The context will be used to notify the calling application.
+         *
+         * @param context an objective context
+         * @return a next objective
+         */
+        public NextObjective remove(ObjectiveContext context);
 
     }
 
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/Objective.java b/core/api/src/main/java/org/onosproject/net/flowobjective/Objective.java
index 3971b03..fa98b6d 100644
--- a/core/api/src/main/java/org/onosproject/net/flowobjective/Objective.java
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/Objective.java
@@ -17,6 +17,8 @@
 
 import org.onosproject.core.ApplicationId;
 
+import java.util.Optional;
+
 /**
  * Base representation of an flow description.
  */
@@ -84,6 +86,14 @@
     Operation op();
 
     /**
+     * Obtains an optional context.
+     *
+     * @return optional; which will be empty if there is no context.
+     * Otherwise it will return the context.
+     */
+    Optional<ObjectiveContext> context();
+
+    /**
      * An objective builder.
      */
     public interface Builder {
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveContext.java b/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveContext.java
new file mode 100644
index 0000000..00e4ed8
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveContext.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.flowobjective;
+
+/**
+ * The context of a objective that will become the subject of
+ * the notification.
+ *
+ * Implementations of this class must be serializable.
+ */
+public interface ObjectiveContext {
+
+    default void onSuccess(Objective objective) {}
+
+    default void onError(Objective objective, ObjectiveError error) {}
+
+}
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveError.java b/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveError.java
new file mode 100644
index 0000000..6e60ab6
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveError.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.flowobjective;
+
+/**
+ * Represents the set of errors possible when processing an objective.
+ */
+public enum ObjectiveError {
+
+    /**
+     * The driver processing this objective does not know how to process it.
+     */
+    UNSUPPORTED,
+
+    /**
+     * The flow installation for this objective failed.
+     */
+    FLOWINSTALLATIONFAILED,
+
+    /**
+     * THe group installation for this objective failed.
+     */
+    GROUPINSTALLATIONFAILED,
+
+    /**
+     * The group was reported as installed but is not missing.
+     */
+    GROUPMISSING,
+
+    /**
+     * An unknown error occurred.
+     */
+    UNKNOWN
+}
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveEvent.java b/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveEvent.java
new file mode 100644
index 0000000..9f095cf
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveEvent.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.flowobjective;
+
+import org.onosproject.event.AbstractEvent;
+
+/**
+ * Describes a objective event.
+ */
+public class ObjectiveEvent extends AbstractEvent<ObjectiveEvent.Type, Integer> {
+
+    /**
+     * Type of objective events.
+     */
+    public enum Type {
+        /**
+         * Signifies that the objective has been added to the store.
+         */
+        ADD,
+
+        /**
+         * Signifies that the objective has been removed.
+         */
+        REMOVE
+    }
+
+    /**
+     * Creates an event of the given type for the specified objective id.
+     *
+     * @param type the type of the event
+     * @param objective the objective id the event is about
+     */
+    public ObjectiveEvent(Type type, Integer objective) {
+        super(type, objective);
+    }
+
+    /**
+     * Creates an event of the given type for the specified objective id at the given
+     * time.
+     *
+     * @param type the type of the event
+     * @param objective the objective id the event is about
+     * @param time the time of the event
+     */
+    public ObjectiveEvent(Type type, Integer objective, long time) {
+        super(type, objective, time);
+    }
+}
+
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
index 4e78d04..17b0aa7 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
@@ -17,7 +17,7 @@
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.Futures;
+import com.google.common.collect.Sets;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -43,9 +43,12 @@
 import org.onosproject.net.flow.FlowRuleService;
 import org.onosproject.net.flowobjective.FilteringObjective;
 import org.onosproject.net.flowobjective.FlowObjectiveService;
+import org.onosproject.net.flowobjective.FlowObjectiveStore;
+import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
 import org.onosproject.net.flowobjective.ForwardingObjective;
 import org.onosproject.net.flowobjective.NextObjective;
 import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.ObjectiveEvent;
 import org.onosproject.net.group.GroupService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,14 +56,14 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
-import java.util.concurrent.Future;
+import java.util.Set;
 
 import static com.google.common.base.Preconditions.checkState;
 
 /**
  * Provides implementation of the flow objective programming service.
  */
-@Component(immediate = false)
+@Component(immediate = true)
 @Service
 public class FlowObjectiveManager implements FlowObjectiveService {
 
@@ -89,6 +92,10 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected GroupService groupService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected FlowObjectiveStore flowObjectiveStore;
+
+    private final FlowObjectiveStoreDelegate delegate = new InternalStoreDelegate();
 
     private final Map<DeviceId, DriverHandler> driverHandlers = Maps.newConcurrentMap();
     private final Map<DeviceId, Pipeliner> pipeliners = Maps.newConcurrentMap();
@@ -101,10 +108,16 @@
 
     private final Map<DeviceId, Collection<Objective>> pendingObjectives =
             Maps.newConcurrentMap();
+
     private NodeId localNode;
 
+    private Map<Integer, Set<PendingNext>> pendingForwards =
+            Maps.newConcurrentMap();
+
+
     @Activate
     protected void activate() {
+        flowObjectiveStore.setDelegate(delegate);
         localNode = clusterService.getLocalNode().id();
         mastershipService.addListener(mastershipListener);
         deviceService.addListener(deviceListener);
@@ -114,46 +127,64 @@
 
     @Deactivate
     protected void deactivate() {
+        flowObjectiveStore.unsetDelegate(delegate);
         mastershipService.removeListener(mastershipListener);
         deviceService.removeListener(deviceListener);
         log.info("Stopped");
     }
 
     @Override
-    public Future<Boolean> filter(DeviceId deviceId,
-                                  Collection<FilteringObjective> filteringObjectives) {
+    public void filter(DeviceId deviceId,
+                                  FilteringObjective filteringObjective) {
         if (deviceService.isAvailable(deviceId)) {
-            return getDevicePipeliner(deviceId).filter(filteringObjectives);
+            getDevicePipeliner(deviceId).filter(filteringObjective);
         } else {
-            filteringObjectives.forEach(obj -> updatePendingMap(deviceId, obj));
+            updatePendingMap(deviceId, filteringObjective);
         }
-        return Futures.immediateFuture(true);
-    }
 
-
-
-    @Override
-    public Future<Boolean> forward(DeviceId deviceId,
-                                   Collection<ForwardingObjective> forwardingObjectives) {
-        if (deviceService.isAvailable(deviceId)) {
-            return getDevicePipeliner(deviceId).forward(forwardingObjectives);
-        } else {
-            forwardingObjectives.forEach(obj -> updatePendingMap(deviceId, obj));
-        }
-        return Futures.immediateFuture(true);
     }
 
     @Override
-    public Future<Boolean> next(DeviceId deviceId,
-                                Collection<NextObjective> nextObjectives) {
-        if (deviceService.isAvailable(deviceId)) {
-            return getDevicePipeliner(deviceId).next(nextObjectives);
-        } else {
-            nextObjectives.forEach(obj -> updatePendingMap(deviceId, obj));
+    public void forward(DeviceId deviceId,
+                                   ForwardingObjective forwardingObjective) {
+
+        if (queueObjective(deviceId, forwardingObjective)) {
+            return;
         }
-        return Futures.immediateFuture(true);
+
+        if (deviceService.isAvailable(deviceId)) {
+            getDevicePipeliner(deviceId).forward(forwardingObjective);
+        } else {
+            updatePendingMap(deviceId, forwardingObjective);
+        }
+
     }
 
+    @Override
+    public void next(DeviceId deviceId,
+                                NextObjective nextObjective) {
+        if (deviceService.isAvailable(deviceId)) {
+            getDevicePipeliner(deviceId).next(nextObjective);
+        } else {
+            updatePendingMap(deviceId, nextObjective);
+        }
+    }
+
+    private boolean queueObjective(DeviceId deviceId, ForwardingObjective fwd) {
+        if (fwd.nextId() != null &&
+                flowObjectiveStore.getNextGroup(fwd.nextId()) == null) {
+            log.warn("Queuing forwarding objective.");
+            if (pendingForwards.putIfAbsent(fwd.nextId(),
+                                Sets.newHashSet(new PendingNext(deviceId, fwd))) != null) {
+                Set<PendingNext> pending = pendingForwards.get(fwd.nextId());
+                pending.add(new PendingNext(deviceId, fwd));
+            }
+            return true;
+        }
+        return false;
+    }
+
+
     private void updatePendingMap(DeviceId deviceId, Objective pending) {
         if (pendingObjectives.putIfAbsent(deviceId, Lists.newArrayList(pending)) != null) {
             Collection<Objective> objectives = pendingObjectives.get(deviceId);
@@ -169,6 +200,33 @@
         return pipeliner;
     }
 
+    private void setupPipelineHandler(DeviceId deviceId) {
+        if (localNode.equals(mastershipService.getMasterFor(deviceId))) {
+            // Attempt to lookup the handler in the cache
+            DriverHandler handler = driverHandlers.get(deviceId);
+            if (handler == null) {
+                try {
+                    // Otherwise create it and if it has pipeline behaviour, cache it
+                    handler = driverService.createHandler(deviceId);
+                    if (!handler.driver().hasBehaviour(Pipeliner.class)) {
+                        log.warn("Pipeline behaviour not supported for device {}",
+                                 deviceId);
+                        return;
+                    }
+                } catch (ItemNotFoundException e) {
+                    log.warn("No applicable driver for device {}", deviceId);
+                    return;
+                }
+                driverHandlers.put(deviceId, handler);
+            }
+
+            // Always (re)initialize the pipeline behaviour
+            Pipeliner pipeliner = handler.behaviour(Pipeliner.class);
+            pipeliner.init(deviceId, context);
+            pipeliners.putIfAbsent(deviceId, pipeliner);
+            log.info("Driver {} bound to device {}", handler.driver().name(), deviceId);
+        }
+    }
 
     // Triggers driver setup when the local node becomes a device master.
     private class InnerMastershipListener implements MastershipListener {
@@ -221,52 +279,70 @@
             pendingObjectives.getOrDefault(deviceId,
                                            Collections.emptySet()).forEach(obj -> {
                 if (obj instanceof NextObjective) {
-                    getDevicePipeliner(deviceId)
-                            .next(Collections.singletonList((NextObjective) obj));
+                    next(deviceId, (NextObjective) obj);
                 } else if (obj instanceof ForwardingObjective) {
-                    getDevicePipeliner(deviceId)
-                            .forward(Collections.singletonList((ForwardingObjective) obj));
+                    forward(deviceId, (ForwardingObjective) obj);
                 } else {
                     getDevicePipeliner(deviceId)
-                            .filter(Collections.singletonList((FilteringObjective) obj));
+                            .filter((FilteringObjective) obj);
                 }
             });
         }
     }
 
-    private void setupPipelineHandler(DeviceId deviceId) {
-        if (localNode.equals(mastershipService.getMasterFor(deviceId))) {
-            // Attempt to lookup the handler in the cache
-            DriverHandler handler = driverHandlers.get(deviceId);
-            if (handler == null) {
-                try {
-                    // Otherwise create it and if it has pipeline behaviour, cache it
-                    handler = driverService.createHandler(deviceId);
-                    if (!handler.driver().hasBehaviour(Pipeliner.class)) {
-                        log.warn("Pipeline behaviour not supported for device {}",
-                                 deviceId);
-                        return;
-                    }
-                } catch (ItemNotFoundException e) {
-                    log.warn("No applicable driver for device {}", deviceId);
-                    return;
-                }
-                driverHandlers.put(deviceId, handler);
-            }
-
-            // Always (re)initialize the pipeline behaviour
-            Pipeliner pipeliner = handler.behaviour(Pipeliner.class);
-            pipeliner.init(deviceId, context);
-            pipeliners.putIfAbsent(deviceId, pipeliner);
-            log.info("Driver {} bound to device {}", handler.driver().name(), deviceId);
-        }
-    }
-
     // Processing context for initializing pipeline driver behaviours.
     private class InnerPipelineContext implements PipelinerContext {
         @Override
         public ServiceDirectory directory() {
             return serviceDirectory;
         }
+
+        @Override
+        public FlowObjectiveStore store() {
+            return flowObjectiveStore;
+        }
+
+
+    }
+
+    private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
+        @Override
+        public void notify(ObjectiveEvent event) {
+            Set<PendingNext> pending = pendingForwards.remove(event.subject());
+
+            if (pending == null) {
+                return;
+            }
+
+            log.info("Processing pending objectives {}", pending.size());
+
+            pending.forEach(p -> getDevicePipeliner(p.deviceId())
+                    .forward(p.forwardingObjective()));
+
+        }
+    }
+
+    /**
+     * Data class used to hold a pending forwarding objective that could not
+     * be processed because the associated next object was not present.
+     */
+    private class PendingNext {
+        private final DeviceId deviceId;
+        private final ForwardingObjective fwd;
+
+        public PendingNext(DeviceId deviceId, ForwardingObjective fwd) {
+            this.deviceId = deviceId;
+            this.fwd = fwd;
+        }
+
+        public DeviceId deviceId() {
+            return deviceId;
+        }
+
+        public ForwardingObjective forwardingObjective() {
+            return fwd;
+        }
+
+
     }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flowobjective/impl/DistributedFlowObjectiveStore.java b/core/store/dist/src/main/java/org/onosproject/store/flowobjective/impl/DistributedFlowObjectiveStore.java
new file mode 100644
index 0000000..94d72ec
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flowobjective/impl/DistributedFlowObjectiveStore.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flowobjective.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.net.behaviour.DefaultNextGroup;
+import org.onosproject.net.behaviour.NextGroup;
+import org.onosproject.net.flowobjective.FlowObjectiveStore;
+import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
+import org.onosproject.net.flowobjective.ObjectiveEvent;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
+import org.slf4j.Logger;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Manages the inventory of created next groups.
+ */
+@Component(immediate = true, enabled = true)
+@Service
+public class DistributedFlowObjectiveStore
+        extends AbstractStore<ObjectiveEvent, FlowObjectiveStoreDelegate>
+        implements FlowObjectiveStore {
+
+    private final Logger log = getLogger(getClass());
+
+    private ConsistentMap<Integer, byte[]> nextGroups;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
+    @Activate
+    public void activate() {
+        nextGroups = storageService.<Integer, byte[]>consistentMapBuilder()
+                .withName("flowobjective-groups")
+                .withSerializer(Serializer.using(
+                        new KryoNamespace.Builder()
+                                .register(byte[].class)
+                                .build()))
+                .build();
+
+        log.info("Started");
+    }
+
+
+    @Deactivate
+    public void deactivate() {
+        log.info("Stopped");
+    }
+
+
+    @Override
+    public void putNextGroup(Integer nextId, NextGroup group) {
+        nextGroups.putIfAbsent(nextId, group.data());
+        notifyDelegate(new ObjectiveEvent(ObjectiveEvent.Type.ADD, nextId));
+    }
+
+    @Override
+    public NextGroup getNextGroup(Integer nextId) {
+        Versioned<byte[]> versionGroup = nextGroups.get(nextId);
+        if (versionGroup != null) {
+            return new DefaultNextGroup(versionGroup.value());
+        }
+        return null;
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index 3081194..bc9d609 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -167,10 +167,13 @@
         public PacketRequestTracker() {
             requests = storageService.<PacketRequest, Boolean>consistentMapBuilder()
                     .withName("packet-requests")
+                    .withSerializer(Serializer.using(
+                            new KryoNamespace.Builder().register(KryoNamespaces.API).build()))
                     .withSerializer(new Serializer() {
                         KryoNamespace kryo = new KryoNamespace.Builder()
                                 .register(KryoNamespaces.API)
                                 .build();
+
                         @Override
                         public <T> byte[] encode(T object) {
                             return kryo.serialize(object);
diff --git a/drivers/pom.xml b/drivers/pom.xml
index bb07a78..334338a 100644
--- a/drivers/pom.xml
+++ b/drivers/pom.xml
@@ -44,6 +44,12 @@
         </dependency>
 
         <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-core-serializers</artifactId>
+            <version>1.2.0-SNAPSHOT</version>
+        </dependency>
+
+        <dependency>
             <groupId>org.easymock</groupId>
             <artifactId>easymock</artifactId>
             <scope>test</scope>
diff --git a/drivers/src/main/java/org/onosproject/driver/pipeline/DefaultSingleTablePipeline.java b/drivers/src/main/java/org/onosproject/driver/pipeline/DefaultSingleTablePipeline.java
index 1e2d277..7308898 100644
--- a/drivers/src/main/java/org/onosproject/driver/pipeline/DefaultSingleTablePipeline.java
+++ b/drivers/src/main/java/org/onosproject/driver/pipeline/DefaultSingleTablePipeline.java
@@ -31,11 +31,9 @@
 import org.onosproject.net.flowobjective.FilteringObjective;
 import org.onosproject.net.flowobjective.ForwardingObjective;
 import org.onosproject.net.flowobjective.NextObjective;
+import org.onosproject.net.flowobjective.ObjectiveError;
 import org.slf4j.Logger;
 
-import java.util.Collection;
-import java.util.concurrent.Future;
-
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -58,59 +56,62 @@
     }
 
     @Override
-    public Future<Boolean> filter(Collection<FilteringObjective> filters) {
+    public void filter(FilteringObjective filter) {
         throw new UnsupportedOperationException("Single table does not filter.");
     }
 
     @Override
-    public Future<Boolean> forward(Collection<ForwardingObjective> forwardings) {
+    public void forward(ForwardingObjective fwd) {
         FlowRuleOperations.Builder flowBuilder = FlowRuleOperations.builder();
-        forwardings.forEach(fwd -> {
-            if (fwd.flag() != ForwardingObjective.Flag.VERSATILE) {
-                throw new UnsupportedOperationException(
-                        "Only VERSATILE is supported.");
-            }
 
-            TrafficSelector selector = fwd.selector();
+        if (fwd.flag() != ForwardingObjective.Flag.VERSATILE) {
+            throw new UnsupportedOperationException(
+                    "Only VERSATILE is supported.");
+        }
 
-            FlowRule rule = new DefaultFlowRule(deviceId, selector,
-                                                fwd.treatment(),
-                                                fwd.priority(), fwd.appId(),
-                                                new DefaultGroupId(fwd.id()),
-                                                fwd.timeout(), fwd.permanent());
+        TrafficSelector selector = fwd.selector();
 
-            switch (fwd.op()) {
+        FlowRule rule = new DefaultFlowRule(deviceId, selector,
+                                            fwd.treatment(),
+                                            fwd.priority(), fwd.appId(),
+                                            new DefaultGroupId(fwd.id()),
+                                            fwd.timeout(), fwd.permanent());
 
-                case ADD:
-                    flowBuilder.add(rule);
-                    break;
-                case REMOVE:
-                    flowBuilder.remove(rule);
-                    break;
-                default:
-                    log.warn("Unknown operation {}", fwd.op());
-            }
+        switch (fwd.op()) {
 
-        });
+            case ADD:
+                flowBuilder.add(rule);
+                break;
+            case REMOVE:
+                flowBuilder.remove(rule);
+                break;
+            default:
+                log.warn("Unknown operation {}", fwd.op());
+        }
+
 
         SettableFuture<Boolean> future = SettableFuture.create();
 
         flowRuleService.apply(flowBuilder.build(new FlowRuleOperationsContext() {
             @Override
             public void onSuccess(FlowRuleOperations ops) {
-                future.set(true);
+                if (fwd.context().isPresent()) {
+                    fwd.context().get().onSuccess(fwd);
+                }
             }
 
             @Override
             public void onError(FlowRuleOperations ops) {
-                future.set(false);
+                if (fwd.context().isPresent()) {
+                    fwd.context().get().onError(fwd, ObjectiveError.FLOWINSTALLATIONFAILED);
+                }
             }
         }));
-        return future;
+
     }
 
     @Override
-    public Future<Boolean> next(Collection<NextObjective> nextObjectives) {
+    public void next(NextObjective nextObjective) {
         throw new UnsupportedOperationException("Single table does not next hop.");
     }
 
diff --git a/drivers/src/main/java/org/onosproject/driver/pipeline/OVSCorsaPipeline.java b/drivers/src/main/java/org/onosproject/driver/pipeline/OVSCorsaPipeline.java
index 997c207..4c7bdee 100644
--- a/drivers/src/main/java/org/onosproject/driver/pipeline/OVSCorsaPipeline.java
+++ b/drivers/src/main/java/org/onosproject/driver/pipeline/OVSCorsaPipeline.java
@@ -15,15 +15,19 @@
  */
 package org.onosproject.driver.pipeline;
 
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalNotification;
 import org.onlab.osgi.ServiceDirectory;
 import org.onlab.packet.Ethernet;
 import org.onlab.packet.MacAddress;
 import org.onlab.packet.VlanId;
+import org.onlab.util.KryoNamespace;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.behaviour.NextGroup;
 import org.onosproject.net.behaviour.Pipeliner;
 import org.onosproject.net.behaviour.PipelinerContext;
 import org.onosproject.net.driver.AbstractHandlerBehaviour;
@@ -39,18 +43,37 @@
 import org.onosproject.net.flow.criteria.Criteria;
 import org.onosproject.net.flow.criteria.Criterion;
 import org.onosproject.net.flowobjective.FilteringObjective;
+import org.onosproject.net.flowobjective.FlowObjectiveStore;
 import org.onosproject.net.flowobjective.ForwardingObjective;
 import org.onosproject.net.flowobjective.NextObjective;
 import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.net.group.DefaultGroupBucket;
+import org.onosproject.net.group.DefaultGroupDescription;
+import org.onosproject.net.group.DefaultGroupKey;
+import org.onosproject.net.group.Group;
+import org.onosproject.net.group.GroupBucket;
+import org.onosproject.net.group.GroupBuckets;
+import org.onosproject.net.group.GroupDescription;
+import org.onosproject.net.group.GroupEvent;
+import org.onosproject.net.group.GroupKey;
+import org.onosproject.net.group.GroupListener;
+import org.onosproject.net.group.GroupService;
 import org.slf4j.Logger;
 
 import java.util.Collection;
-import java.util.concurrent.Future;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
+import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
- * Corsa pipeline handler.
+ * OpenvSwitch emulation of the Corsa pipeline handler.
  */
 public class OVSCorsaPipeline extends AbstractHandlerBehaviour implements Pipeliner {
 
@@ -63,17 +86,45 @@
     private ServiceDirectory serviceDirectory;
     private FlowRuleService flowRuleService;
     private CoreService coreService;
+    private GroupService groupService;
+    private FlowObjectiveStore flowObjectiveStore;
     private DeviceId deviceId;
     private ApplicationId appId;
 
+    private KryoNamespace appKryo = new KryoNamespace.Builder()
+            .register(GroupKey.class)
+            .register(DefaultGroupKey.class)
+            .register(CorsaGroup.class)
+            .register(byte[].class)
+            .build();
+
+    private Cache<GroupKey, NextObjective> pendingGroups;
+
+    private ScheduledExecutorService groupChecker =
+            Executors.newScheduledThreadPool(2, groupedThreads("onos/pipeliner",
+                                                               "ovs-corsa-%d"));
+
     @Override
     public void init(DeviceId deviceId, PipelinerContext context) {
         this.serviceDirectory = context.directory();
         this.deviceId = deviceId;
 
+        pendingGroups = CacheBuilder.newBuilder()
+                .expireAfterWrite(20, TimeUnit.SECONDS)
+                .removalListener((RemovalNotification<GroupKey, NextObjective> notification) -> {
+                    if (notification.getCause() == RemovalCause.EXPIRED) {
+                        fail(notification.getValue(), ObjectiveError.GROUPINSTALLATIONFAILED);
+                    }
+                }).build();
+
+        groupChecker.scheduleAtFixedRate(new GroupChecker(), 0, 500, TimeUnit.MILLISECONDS);
 
         coreService = serviceDirectory.get(CoreService.class);
         flowRuleService = serviceDirectory.get(FlowRuleService.class);
+        groupService = serviceDirectory.get(GroupService.class);
+        flowObjectiveStore = context.store();
+
+        groupService.addListener(new InnerGroupListener());
 
         appId = coreService.registerApplication(
                 "org.onosproject.driver.OVSCorsaPipeline");
@@ -82,33 +133,159 @@
     }
 
     @Override
-    public Future<Boolean> filter(Collection<FilteringObjective> filteringObjectives) {
-        Collection<Future<Boolean>> results = Sets.newHashSet();
-        filteringObjectives.stream()
-                .filter(obj -> obj.type() == FilteringObjective.Type.PERMIT)
-                .forEach(filtobj -> results.add(processFilter(filtobj,
-                                        filtobj.op() == Objective.Operation.ADD,
-                                        filtobj.appId()
-                        )));
+    public void filter(FilteringObjective filteringObjective) {
+        if (filteringObjective.type() == FilteringObjective.Type.PERMIT) {
+            processFilter(filteringObjective,
+                          filteringObjective.op() == Objective.Operation.ADD,
+                          filteringObjective.appId());
+        } else {
+            fail(filteringObjective, ObjectiveError.UNSUPPORTED);
+        }
+    }
 
-        //TODO: return something more helpful/sensible in the future (no pun intended)
-        return results.iterator().next();
+    @Override
+    public void forward(ForwardingObjective fwd) {
+        Collection<FlowRule> rules;
+        FlowRuleOperations.Builder flowBuilder = FlowRuleOperations.builder();
+
+        rules = processForward(fwd);
+        switch (fwd.op()) {
+            case ADD:
+                rules.stream()
+                        .filter(rule -> rule != null)
+                        .forEach(flowBuilder::add);
+                break;
+            case REMOVE:
+                rules.stream()
+                        .filter(rule -> rule != null)
+                        .forEach(flowBuilder::remove);
+                break;
+            default:
+                fail(fwd, ObjectiveError.UNKNOWN);
+                log.warn("Unknown forwarding type {}", fwd.op());
+        }
+
+
+        flowRuleService.apply(flowBuilder.build(new FlowRuleOperationsContext() {
+            @Override
+            public void onSuccess(FlowRuleOperations ops) {
+                pass(fwd);
+            }
+
+            @Override
+            public void onError(FlowRuleOperations ops) {
+                fail(fwd, ObjectiveError.FLOWINSTALLATIONFAILED);
+            }
+        }));
 
     }
 
-    private Future<Boolean> processFilter(FilteringObjective filt, boolean install,
+    @Override
+    public void next(NextObjective nextObjective) {
+        switch (nextObjective.type()) {
+            case SIMPLE:
+                Collection<TrafficTreatment> treatments = nextObjective.next();
+                if (treatments.size() == 1) {
+                    TrafficTreatment treatment = treatments.iterator().next();
+                    GroupBucket bucket =
+                            DefaultGroupBucket.createIndirectGroupBucket(treatment);
+                    final GroupKey key = new DefaultGroupKey(appKryo.serialize(nextObjective.id()));
+                    GroupDescription groupDescription
+                            = new DefaultGroupDescription(deviceId,
+                                    GroupDescription.Type.INDIRECT,
+                                    new GroupBuckets(Collections
+                                                             .singletonList(bucket)),
+                                    key,
+                                    nextObjective.appId());
+                    groupService.addGroup(groupDescription);
+                    pendingGroups.put(key, nextObjective);
+                }
+                break;
+            case HASHED:
+            case BROADCAST:
+            case FAILOVER:
+                fail(nextObjective, ObjectiveError.UNSUPPORTED);
+                log.warn("Unsupported next objective type {}", nextObjective.type());
+                break;
+            default:
+                fail(nextObjective, ObjectiveError.UNKNOWN);
+                log.warn("Unknown next objective type {}", nextObjective.type());
+        }
+
+    }
+
+    private Collection<FlowRule> processForward(ForwardingObjective fwd) {
+        switch (fwd.flag()) {
+            case SPECIFIC:
+                return processSpecific(fwd);
+            case VERSATILE:
+                return processVersatile(fwd);
+            default:
+                fail(fwd, ObjectiveError.UNKNOWN);
+                log.warn("Unknown forwarding flag {}", fwd.flag());
+        }
+        return Collections.emptySet();
+    }
+
+    private Collection<FlowRule> processVersatile(ForwardingObjective fwd) {
+        fail(fwd, ObjectiveError.UNSUPPORTED);
+        return Collections.emptySet();
+    }
+
+    private Collection<FlowRule> processSpecific(ForwardingObjective fwd) {
+        log.warn("Processing specific");
+        TrafficSelector selector = fwd.selector();
+        Criteria.EthTypeCriterion ethType =
+                (Criteria.EthTypeCriterion) selector.getCriterion(Criterion.Type.ETH_TYPE);
+        if (ethType == null || ethType.ethType() != Ethernet.TYPE_IPV4) {
+            fail(fwd, ObjectiveError.UNSUPPORTED);
+            return Collections.emptySet();
+        }
+
+        TrafficSelector filteredSelector =
+                DefaultTrafficSelector.builder()
+                        .matchEthType(Ethernet.TYPE_IPV4)
+                        .matchIPDst(
+                                ((Criteria.IPCriterion)
+                                        selector.getCriterion(Criterion.Type.IPV4_DST)).ip())
+                        .build();
+
+        NextGroup next = flowObjectiveStore.getNextGroup(fwd.nextId());
+
+        GroupKey key = appKryo.deserialize(next.data());
+
+        Group group = groupService.getGroup(deviceId, key);
+
+        if (group == null) {
+            log.warn("The group left!");
+            fail(fwd, ObjectiveError.GROUPMISSING);
+            return Collections.emptySet();
+        }
+
+        TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                .group(group.id())
+                .build();
+
+        return Collections.singletonList(
+                new DefaultFlowRule(deviceId, filteredSelector, treatment,
+                                   fwd.priority(), fwd.appId(), 0, fwd.permanent(),
+                                   FlowRule.Type.IP));
+
+    }
+
+    private void processFilter(FilteringObjective filt, boolean install,
                                              ApplicationId applicationId) {
-        SettableFuture<Boolean> result = SettableFuture.create();
         // This driver only processes filtering criteria defined with switch
         // ports as the key
-        Criteria.PortCriterion p = null;
+        Criteria.PortCriterion p;
         if (!filt.key().equals(Criteria.dummy()) &&
                 filt.key().type() == Criterion.Type.IN_PORT) {
             p = (Criteria.PortCriterion) filt.key();
         } else {
             log.warn("No key defined in filtering objective from app: {}. Not"
                     + "processing filtering objective", applicationId);
-            return null;
+            fail(filt, ObjectiveError.UNKNOWN);
+            return;
         }
         // convert filtering conditions for switch-intfs into flowrules
         FlowRuleOperations.Builder ops = FlowRuleOperations.builder();
@@ -154,45 +331,45 @@
             } else {
                 log.warn("Driver does not currently process filtering condition"
                         + " of type: {}", c.type());
+                fail(filt, ObjectiveError.UNSUPPORTED);
             }
         }
         // apply filtering flow rules
         flowRuleService.apply(ops.build(new FlowRuleOperationsContext() {
             @Override
             public void onSuccess(FlowRuleOperations ops) {
-                result.set(true);
+                pass(filt);
                 log.info("Provisioned default table for bgp router");
             }
 
             @Override
             public void onError(FlowRuleOperations ops) {
-                result.set(false);
+                fail(filt, ObjectiveError.FLOWINSTALLATIONFAILED);
                 log.info("Failed to provision default table for bgp router");
             }
         }));
-
-        return result;
     }
 
-    @Override
-    public Future<Boolean> forward(Collection<ForwardingObjective> forwardObjectives) {
-        return null;
+    private void pass(Objective obj) {
+        if (obj.context().isPresent()) {
+            obj.context().get().onSuccess(obj);
+        }
     }
 
-    @Override
-    public Future<Boolean> next(Collection<NextObjective> nextObjectives) {
-        return null;
+    private void fail(Objective obj, ObjectiveError error) {
+        if (obj.context().isPresent()) {
+            obj.context().get().onError(obj, error);
+        }
     }
 
     private void pushDefaultRules() {
-        boolean install = true;
-        processTableZero(install);
-        processTableOne(install);
-        processTableTwo(install);
-        processTableFour(install);
-        processTableFive(install);
-        processTableSix(install);
-        processTableNine(install);
+        processTableZero(true);
+        processTableOne(true);
+        processTableTwo(true);
+        processTableFour(true);
+        processTableFive(true);
+        processTableSix(true);
+        processTableNine(true);
     }
 
     private void processTableZero(boolean install) {
@@ -447,4 +624,59 @@
         }));
     }
 
+    private class InnerGroupListener implements GroupListener {
+        @Override
+        public void event(GroupEvent event) {
+            if (event.type() == GroupEvent.Type.GROUP_ADDED) {
+                GroupKey key = event.subject().appCookie();
+
+                NextObjective obj = pendingGroups.getIfPresent(key);
+                if (obj != null) {
+                    flowObjectiveStore.putNextGroup(obj.id(), new CorsaGroup(key));
+                    pass(obj);
+                    pendingGroups.invalidate(key);
+                }
+            }
+        }
+    }
+
+
+    private class GroupChecker implements Runnable {
+
+        @Override
+        public void run() {
+            Set<GroupKey> keys = pendingGroups.asMap().keySet().stream()
+                    .filter(key -> groupService.getGroup(deviceId, key) != null)
+                    .collect(Collectors.toSet());
+
+            keys.stream().forEach(key -> {
+                NextObjective obj = pendingGroups.getIfPresent(key);
+                if (obj == null) {
+                    return;
+                }
+                pass(obj);
+                pendingGroups.invalidate(key);
+                flowObjectiveStore.putNextGroup(obj.id(), new CorsaGroup(key));
+            });
+        }
+    }
+
+    private class CorsaGroup implements NextGroup {
+
+        private final GroupKey key;
+
+        public CorsaGroup(GroupKey key) {
+            this.key = key;
+        }
+
+        public GroupKey key() {
+            return key;
+        }
+
+        @Override
+        public byte[] data() {
+            return appKryo.serialize(key);
+        }
+
+    }
 }
diff --git a/drivers/src/main/resources/onos-drivers.xml b/drivers/src/main/resources/onos-drivers.xml
index f1a1017..2efc607 100644
--- a/drivers/src/main/resources/onos-drivers.xml
+++ b/drivers/src/main/resources/onos-drivers.xml
@@ -19,7 +19,7 @@
         <behaviour api="org.onosproject.net.behaviour.Pipeliner"
                    impl="org.onosproject.driver.pipeline.DefaultSingleTablePipeline"/>
     </driver>
-    <driver name="ovs-corsa" manufacturer="Nicira, Inc." hwVersion="Open vSwitch" swVersion="2.3.0">
+    <driver name="ovs-corsa" manufacturer="Corsa" hwVersion="emulation" swVersion="0.0.0">
         <behaviour api="org.onosproject.net.behaviour.Pipeliner"
                    impl="org.onosproject.driver.pipeline.OVSCorsaPipeline"/>
     </driver>
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowModBuilderVer13.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowModBuilderVer13.java
index 25e2568..f0ce1f4 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowModBuilderVer13.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowModBuilderVer13.java
@@ -195,7 +195,6 @@
         for (Instruction i : treatments) {
             switch (i.type()) {
                 case DROP:
-                    log.warn("Saw drop action; assigning drop action");
                     return new LinkedList<>();
                 case L0MODIFICATION:
                     actions.add(buildL0Modification(i));