ONOS-2145  Added ability to withdraw packet intercepts via PacketService::cancelPackets.

Change-Id: Ie41271fa02740560bd67b0faf49f633ee749773c
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveContext.java b/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveContext.java
index 5bb2bdc..c6db17f 100644
--- a/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveContext.java
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveContext.java
@@ -20,14 +20,28 @@
 /**
  * The context of a objective that will become the subject of
  * the notification.
- *
+ * <p>
  * Implementations of this class must be serializable.
+ * </p>
  */
 @Beta
 public interface ObjectiveContext {
 
-    default void onSuccess(Objective objective) {}
+    /**
+     * Invoked on successful execution of the flow objective.
+     *
+     * @param objective objective that was executed
+     */
+    default void onSuccess(Objective objective) {
+    }
 
-    default void onError(Objective objective, ObjectiveError error) {}
+    /**
+     * Invoked when an error is encountered while executing the flow objective.
+     *
+     * @param objective objective that failed
+     * @param error     error that was encountered
+     */
+    default void onError(Objective objective, ObjectiveError error) {
+    }
 
 }
diff --git a/core/api/src/main/java/org/onosproject/net/packet/DefaultPacketRequest.java b/core/api/src/main/java/org/onosproject/net/packet/DefaultPacketRequest.java
index 0efcc7f..ce2eb11 100644
--- a/core/api/src/main/java/org/onosproject/net/packet/DefaultPacketRequest.java
+++ b/core/api/src/main/java/org/onosproject/net/packet/DefaultPacketRequest.java
@@ -17,9 +17,10 @@
 
 import com.google.common.base.MoreObjects;
 import org.onosproject.core.ApplicationId;
-import org.onosproject.net.flow.FlowRule;
 import org.onosproject.net.flow.TrafficSelector;
 
+import java.util.Objects;
+
 /**
  * Default implementation of a packet request.
  */
@@ -27,14 +28,19 @@
     private final TrafficSelector selector;
     private final PacketPriority priority;
     private final ApplicationId appId;
-    private final FlowRule.Type tableType;
 
+    /**
+     * Creates a new packet request.
+     *
+     * @param selector  traffic selector
+     * @param priority  intercept priority
+     * @param appId     application id
+     */
     public DefaultPacketRequest(TrafficSelector selector, PacketPriority priority,
-                                ApplicationId appId, FlowRule.Type tableType) {
+                                ApplicationId appId) {
         this.selector = selector;
         this.priority = priority;
         this.appId = appId;
-        this.tableType = tableType;
     }
 
     public TrafficSelector selector() {
@@ -49,39 +55,23 @@
         return appId;
     }
 
-    public FlowRule.Type tableType() {
-        return tableType;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-
-        DefaultPacketRequest that = (DefaultPacketRequest) o;
-
-        if (priority != that.priority) {
-            return false;
-        }
-        if (!selector.equals(that.selector)) {
-            return false;
-        }
-        if (!tableType.equals(that.tableType)) {
-            return false;
-        }
-
-        return true;
-    }
-
     @Override
     public int hashCode() {
-        int result = selector.hashCode();
-        result = 31 * result + priority.hashCode();
-        return result;
+        return Objects.hash(selector, priority, appId);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        final DefaultPacketRequest other = (DefaultPacketRequest) obj;
+        return Objects.equals(this.selector, other.selector)
+                && Objects.equals(this.priority, other.priority)
+                && Objects.equals(this.appId, other.appId);
     }
 
     @Override
@@ -89,7 +79,6 @@
         return MoreObjects.toStringHelper(this.getClass())
                 .add("selector", selector)
                 .add("priority", priority)
-                .add("appId", appId)
-                .add("table-type", tableType).toString();
+                .add("appId", appId).toString();
     }
 }
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onosproject/net/packet/PacketRequest.java b/core/api/src/main/java/org/onosproject/net/packet/PacketRequest.java
index a4e45ac..dc09219 100644
--- a/core/api/src/main/java/org/onosproject/net/packet/PacketRequest.java
+++ b/core/api/src/main/java/org/onosproject/net/packet/PacketRequest.java
@@ -16,7 +16,6 @@
 package org.onosproject.net.packet;
 
 import org.onosproject.core.ApplicationId;
-import org.onosproject.net.flow.FlowRule;
 import org.onosproject.net.flow.TrafficSelector;
 
 /**
@@ -26,26 +25,23 @@
 
     /**
      * Obtain the traffic selector.
+     *
      * @return a traffic selector
      */
     TrafficSelector selector();
 
     /**
      * Obtain the priority.
+     *
      * @return a PacketPriority
      */
     PacketPriority priority();
 
     /**
      * Obtain the application id.
+     *
      * @return an application id
      */
     ApplicationId appId();
 
-    /**
-     * Obtain the table type.
-     * @return a table type
-     */
-    FlowRule.Type tableType();
-
 }
diff --git a/core/api/src/main/java/org/onosproject/net/packet/PacketService.java b/core/api/src/main/java/org/onosproject/net/packet/PacketService.java
index be5a505..06c416e 100644
--- a/core/api/src/main/java/org/onosproject/net/packet/PacketService.java
+++ b/core/api/src/main/java/org/onosproject/net/packet/PacketService.java
@@ -16,7 +16,6 @@
 package org.onosproject.net.packet;
 
 import org.onosproject.core.ApplicationId;
-import org.onosproject.net.flow.FlowRule;
 import org.onosproject.net.flow.TrafficSelector;
 
 /**
@@ -54,28 +53,21 @@
      *
      * @param selector the traffic selector used to match packets
      * @param priority the priority of the rule
-     * @param appId the application ID of the requester
+     * @param appId    the application ID of the requester
      */
     void requestPackets(TrafficSelector selector, PacketPriority priority,
                         ApplicationId appId);
 
     /**
-     * Requests that packets matching the given selector are punted from the
-     * dataplane to the controller. Clients of the PacketService should use
-     * this call to hint at the tableType in the dataplane valid for the selector.
+     * Cancels previous packet requests for packets matching the given
+     * selector to be punted from the dataplane to the controller.
      *
      * @param selector the traffic selector used to match packets
      * @param priority the priority of the rule
-     * @param appId the application ID of the requester
-     * @param tableType the abstract table Type in the dataplane where flowrules
-     *                  should be inserted to punt the selector packets to the
-     *                  control plane
+     * @param appId    the application ID of the requester
      */
-    void requestPackets(TrafficSelector selector, PacketPriority priority,
-                        ApplicationId appId, FlowRule.Type tableType);
-
-
-    // TODO add API to allow applications to revoke requests when they deactivate
+    void cancelPackets(TrafficSelector selector, PacketPriority priority,
+                       ApplicationId appId);
 
     /**
      * Emits the specified outbound packet onto the network.
diff --git a/core/api/src/main/java/org/onosproject/net/packet/PacketStore.java b/core/api/src/main/java/org/onosproject/net/packet/PacketStore.java
index 450c23b..ff45cc0 100644
--- a/core/api/src/main/java/org/onosproject/net/packet/PacketStore.java
+++ b/core/api/src/main/java/org/onosproject/net/packet/PacketStore.java
@@ -34,16 +34,22 @@
     void emit(OutboundPacket packet);
 
     /**
-     * Register a request for packets. If the registration
-     * is successful the manager can proceed, otherwise it should
-     * consider these packet already available in the system.
+     * Requests intercept of packets that match the given selector.
      *
      * @param request a packet request
-     * @return a boolean indicating registration state.
+     * @return true if this is the first time the given selector was requested
      */
     boolean requestPackets(PacketRequest request);
 
     /**
+     * Cancels intercept of packets that match the given selector.
+     *
+     * @param request a packet request
+     * @return true if there is no other application requesting the given selector
+     */
+    boolean cancelPackets(PacketRequest request);
+
+    /**
      * Obtains all existing requests in the system.
      *
      * @return a set of packet requests
diff --git a/core/api/src/test/java/org/onosproject/net/packet/PacketServiceAdapter.java b/core/api/src/test/java/org/onosproject/net/packet/PacketServiceAdapter.java
new file mode 100644
index 0000000..afe936b
--- /dev/null
+++ b/core/api/src/test/java/org/onosproject/net/packet/PacketServiceAdapter.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.packet;
+
+import org.onosproject.core.ApplicationId;
+import org.onosproject.net.flow.TrafficSelector;
+
+/**
+ * Test adapter for packet service.
+ */
+public class PacketServiceAdapter implements PacketService {
+    @Override
+    public void addProcessor(PacketProcessor processor, int priority) {
+    }
+
+    @Override
+    public void removeProcessor(PacketProcessor processor) {
+    }
+
+    @Override
+    public void requestPackets(TrafficSelector selector, PacketPriority priority, ApplicationId appId) {
+    }
+
+    @Override
+    public void cancelPackets(TrafficSelector selector, PacketPriority priority, ApplicationId appId) {
+    }
+
+    @Override
+    public void emit(OutboundPacket packet) {
+    }
+}
diff --git a/core/common/src/test/java/org/onosproject/store/trivial/SimplePacketStore.java b/core/common/src/test/java/org/onosproject/store/trivial/SimplePacketStore.java
index 81cef49..4345aba 100644
--- a/core/common/src/test/java/org/onosproject/store/trivial/SimplePacketStore.java
+++ b/core/common/src/test/java/org/onosproject/store/trivial/SimplePacketStore.java
@@ -52,6 +52,11 @@
     }
 
     @Override
+    public boolean cancelPackets(PacketRequest request) {
+        return requests.remove(request);
+    }
+
+    @Override
     public Set<PacketRequest> existingRequests() {
         return Collections.unmodifiableSet(requests);
     }
diff --git a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
index d5b12b3..b92067f 100644
--- a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
@@ -498,10 +498,10 @@
                 FlowRuleBatchOperation batchOperation =
                         request.asBatchOperation(deviceId);
 
-                FlowRuleProvider flowRuleProvider =
-                        getProvider(deviceId);
-
-                flowRuleProvider.executeBatch(batchOperation);
+                FlowRuleProvider flowRuleProvider = getProvider(deviceId);
+                if (flowRuleProvider != null) {
+                    flowRuleProvider.executeBatch(batchOperation);
+                }
 
                 break;
 
diff --git a/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java b/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
index 28f1df0..d7ed927 100644
--- a/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
+++ b/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
@@ -29,10 +29,8 @@
 import org.onosproject.net.device.DeviceListener;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.flow.DefaultTrafficTreatment;
-import org.onosproject.net.flow.FlowRule;
 import org.onosproject.net.flow.FlowRuleService;
 import org.onosproject.net.flow.TrafficSelector;
-import org.onosproject.net.flow.TrafficTreatment;
 import org.onosproject.net.flowobjective.DefaultForwardingObjective;
 import org.onosproject.net.flowobjective.FlowObjectiveService;
 import org.onosproject.net.flowobjective.ForwardingObjective;
@@ -62,9 +60,9 @@
 import java.util.concurrent.Executors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static org.slf4j.LoggerFactory.getLogger;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.security.AppGuard.checkPermission;
+import static org.slf4j.LoggerFactory.getLogger;
 
 
 /**
@@ -78,6 +76,10 @@
 
     private final Logger log = getLogger(getClass());
 
+    private static final String TABLE_TYPE_MSG =
+            "Table Type cannot be null. For requesting packets without " +
+                    "table hints, use other methods in the packetService API";
+
     private final PacketStoreDelegate delegate = new InternalStoreDelegate();
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -125,7 +127,6 @@
     @Override
     public void addProcessor(PacketProcessor processor, int priority) {
         checkPermission(Permission.PACKET_EVENT);
-
         checkNotNull(processor, "Processor cannot be null");
         processors.put(priority, processor);
     }
@@ -133,7 +134,6 @@
     @Override
     public void removeProcessor(PacketProcessor processor) {
         checkPermission(Permission.PACKET_EVENT);
-
         checkNotNull(processor, "Processor cannot be null");
         processors.values().remove(processor);
     }
@@ -142,35 +142,26 @@
     public void requestPackets(TrafficSelector selector, PacketPriority priority,
                                ApplicationId appId) {
         checkPermission(Permission.PACKET_READ);
-
         checkNotNull(selector, "Selector cannot be null");
         checkNotNull(appId, "Application ID cannot be null");
 
-        PacketRequest request =
-                new DefaultPacketRequest(selector, priority, appId, FlowRule.Type.DEFAULT);
-
+        PacketRequest request = new DefaultPacketRequest(selector, priority, appId);
         if (store.requestPackets(request)) {
             pushToAllDevices(request);
         }
     }
 
     @Override
-    public void requestPackets(TrafficSelector selector, PacketPriority priority,
-                               ApplicationId appId, FlowRule.Type tableType) {
+    public void cancelPackets(TrafficSelector selector, PacketPriority priority,
+                              ApplicationId appId) {
         checkPermission(Permission.PACKET_READ);
-
         checkNotNull(selector, "Selector cannot be null");
         checkNotNull(appId, "Application ID cannot be null");
-        checkNotNull(tableType, "Table Type cannot be null. For requesting packets +"
-                + "without table hints, use other methods in the packetService API");
 
-        PacketRequest request =
-                new DefaultPacketRequest(selector, priority, appId, tableType);
-
-        if (store.requestPackets(request)) {
-            pushToAllDevices(request);
+        PacketRequest request = new DefaultPacketRequest(selector, priority, appId);
+        if (store.cancelPackets(request)) {
+            removeFromAllDevices(request);
         }
-
     }
 
     /**
@@ -184,9 +175,20 @@
         }
     }
 
+
     /**
-     * Pushes flow rules to the device to request packets be sent to the
-     * controller.
+     * Removes packet request flow rule from all devices.
+     *
+     * @param request the packet request
+     */
+    private void removeFromAllDevices(PacketRequest request) {
+        for (Device device : deviceService.getDevices()) {
+            removeRule(device, request);
+        }
+    }
+
+    /**
+     * Pushes packet intercept flow rules to the device.
      *
      * @param device  the device to push the rules to
      * @param request the packet request
@@ -197,37 +199,54 @@
             return;
         }
 
-        TrafficTreatment treatment = DefaultTrafficTreatment.builder()
-                                                            .punt()
-                                                            .build();
-
-        ForwardingObjective forwarding = DefaultForwardingObjective.builder()
-                .withPriority(request.priority().priorityValue())
-                .withSelector(request.selector())
-                .fromApp(appId)
-                .withFlag(ForwardingObjective.Flag.VERSATILE)
-                .withTreatment(treatment)
-                .makePermanent()
+        ForwardingObjective forwarding = createBuilder(request)
                 .add(new ObjectiveContext() {
                     @Override
-                    public void onSuccess(Objective objective) { }
-
-                    @Override
                     public void onError(Objective objective, ObjectiveError error) {
-                        log.warn("Failed to install packet request {}: {}",
-                                 request, error);
+                        log.warn("Failed to install packet request {}: {}", request, error);
                     }
                 });
 
         objectiveService.forward(device.id(), forwarding);
     }
 
+    /**
+     * Removes packet intercept flow rules from the device.
+     *
+     * @param device  the device to remove the rules from
+     * @param request the packet request
+     */
+    private void removeRule(Device device, PacketRequest request) {
+        // Everything is pre-provisioned on ROADMs
+        if (device.type().equals(Device.Type.ROADM)) {
+            return;
+        }
+
+        ForwardingObjective forwarding = createBuilder(request)
+                .remove(new ObjectiveContext() {
+                    @Override
+                    public void onError(Objective objective, ObjectiveError error) {
+                        log.warn("Failed to withdraw packet request {}: {}", request, error);
+                    }
+                });
+
+        objectiveService.forward(device.id(), forwarding);
+    }
+
+    private DefaultForwardingObjective.Builder createBuilder(PacketRequest request) {
+        return DefaultForwardingObjective.builder()
+                .withPriority(request.priority().priorityValue())
+                .withSelector(request.selector())
+                .fromApp(appId)
+                .withFlag(ForwardingObjective.Flag.VERSATILE)
+                .withTreatment(DefaultTrafficTreatment.builder().punt().build())
+                .makePermanent();
+    }
+
     @Override
     public void emit(OutboundPacket packet) {
         checkPermission(Permission.PACKET_WRITE);
-
         checkNotNull(packet, "Packet cannot be null");
-
         store.emit(packet);
     }
 
@@ -238,8 +257,7 @@
             return;
         }
 
-        final PacketProvider packetProvider = getProvider(device.providerId());
-
+        PacketProvider packetProvider = getProvider(device.providerId());
         if (packetProvider != null) {
             packetProvider.emit(packet);
         }
@@ -250,7 +268,7 @@
         return new InternalPacketProviderService(provider);
     }
 
-    // Personalized link provider service issued to the supplied provider.
+    // Personalized packet provider service issued to the supplied provider.
     private class InternalPacketProviderService
             extends AbstractProviderService<PacketProvider>
             implements PacketProviderService {
diff --git a/core/net/src/test/java/org/onosproject/net/host/impl/HostMonitorTest.java b/core/net/src/test/java/org/onosproject/net/host/impl/HostMonitorTest.java
index 1028ddc..679a888 100644
--- a/core/net/src/test/java/org/onosproject/net/host/impl/HostMonitorTest.java
+++ b/core/net/src/test/java/org/onosproject/net/host/impl/HostMonitorTest.java
@@ -15,20 +15,9 @@
  */
 package org.onosproject.net.host.impl;
 
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
 import org.junit.After;
 import org.junit.Test;
 import org.onlab.packet.ARP;
@@ -36,7 +25,6 @@
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.IpPrefix;
 import org.onlab.packet.MacAddress;
-import org.onosproject.core.ApplicationId;
 import org.onlab.packet.VlanId;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.Device;
@@ -47,31 +35,31 @@
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.device.DeviceListener;
 import org.onosproject.net.device.DeviceServiceAdapter;
-import org.onosproject.net.flow.FlowRule;
-import org.onosproject.net.flow.TrafficSelector;
 import org.onosproject.net.flow.instructions.Instruction;
 import org.onosproject.net.flow.instructions.Instructions.OutputInstruction;
 import org.onosproject.net.host.HostProvider;
 import org.onosproject.net.host.InterfaceIpAddress;
 import org.onosproject.net.host.PortAddresses;
 import org.onosproject.net.packet.OutboundPacket;
-import org.onosproject.net.packet.PacketPriority;
-import org.onosproject.net.packet.PacketProcessor;
-import org.onosproject.net.packet.PacketService;
+import org.onosproject.net.packet.PacketServiceAdapter;
 import org.onosproject.net.provider.ProviderId;
 
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import static org.easymock.EasyMock.*;
+import static org.junit.Assert.*;
 
 public class HostMonitorTest {
 
     private static final IpAddress TARGET_IP_ADDR =
-        IpAddress.valueOf("10.0.0.1");
+            IpAddress.valueOf("10.0.0.1");
     private static final IpAddress SOURCE_ADDR =
-        IpAddress.valueOf("10.0.0.99");
+            IpAddress.valueOf("10.0.0.99");
     private static final InterfaceIpAddress IA1 =
-        new InterfaceIpAddress(SOURCE_ADDR, IpPrefix.valueOf("10.0.0.0/24"));
+            new InterfaceIpAddress(SOURCE_ADDR, IpPrefix.valueOf("10.0.0.0/24"));
     private MacAddress sourceMac = MacAddress.valueOf(1L);
 
     private HostMonitor hostMonitor;
@@ -132,7 +120,7 @@
 
         ConnectPoint cp = new ConnectPoint(devId, portNum);
         PortAddresses pa =
-            new PortAddresses(cp, Collections.singleton(IA1), sourceMac, VlanId.NONE);
+                new PortAddresses(cp, Collections.singleton(IA1), sourceMac, VlanId.NONE);
 
         expect(hostManager.getHostsByIp(TARGET_IP_ADDR))
                 .andReturn(Collections.<Host>emptySet()).anyTimes();
@@ -200,8 +188,8 @@
 
         ConnectPoint cp = new ConnectPoint(devId, portNum);
         PortAddresses pa =
-            new PortAddresses(cp, Collections.singleton(IA1), sourceMac,
-                              VlanId.vlanId(vlan));
+                new PortAddresses(cp, Collections.singleton(IA1), sourceMac,
+                                  VlanId.vlanId(vlan));
 
         expect(hostManager.getHostsByIp(TARGET_IP_ADDR))
                 .andReturn(Collections.<Host>emptySet()).anyTimes();
@@ -246,33 +234,14 @@
                           arp.getTargetProtocolAddress());
     }
 
-    class TestPacketService implements PacketService {
+    class TestPacketService extends PacketServiceAdapter {
 
         List<OutboundPacket> packets = new ArrayList<>();
 
         @Override
-        public void addProcessor(PacketProcessor processor, int priority) {
-        }
-
-        @Override
-        public void removeProcessor(PacketProcessor processor) {
-        }
-
-        @Override
         public void emit(OutboundPacket packet) {
             packets.add(packet);
         }
-
-        @Override
-        public void requestPackets(TrafficSelector selector,
-                                   PacketPriority priority, ApplicationId appId) {
-        }
-
-        @Override
-        public void requestPackets(TrafficSelector selector,
-                                   PacketPriority priority, ApplicationId appId,
-                                   FlowRule.Type tableType) {
-        }
     }
 
     class TestDeviceService extends DeviceServiceAdapter {
diff --git a/core/net/src/test/java/org/onosproject/net/proxyarp/impl/ProxyArpManagerTest.java b/core/net/src/test/java/org/onosproject/net/proxyarp/impl/ProxyArpManagerTest.java
index 9e45a34..c79d44c 100644
--- a/core/net/src/test/java/org/onosproject/net/proxyarp/impl/ProxyArpManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/proxyarp/impl/ProxyArpManagerTest.java
@@ -15,21 +15,7 @@
  */
 package org.onosproject.net.proxyarp.impl;
 
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Set;
-
+import com.google.common.collect.Sets;
 import org.junit.Before;
 import org.junit.Test;
 import org.onlab.packet.ARP;
@@ -38,7 +24,6 @@
 import org.onlab.packet.Ip4Prefix;
 import org.onlab.packet.MacAddress;
 import org.onlab.packet.VlanId;
-import org.onosproject.core.ApplicationId;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DefaultHost;
 import org.onosproject.net.Device;
@@ -51,8 +36,6 @@
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.device.DeviceListener;
 import org.onosproject.net.device.DeviceService;
-import org.onosproject.net.flow.FlowRule;
-import org.onosproject.net.flow.TrafficSelector;
 import org.onosproject.net.flow.instructions.Instruction;
 import org.onosproject.net.flow.instructions.Instructions.OutputInstruction;
 import org.onosproject.net.host.HostService;
@@ -61,12 +44,17 @@
 import org.onosproject.net.link.LinkListener;
 import org.onosproject.net.link.LinkService;
 import org.onosproject.net.packet.OutboundPacket;
-import org.onosproject.net.packet.PacketPriority;
-import org.onosproject.net.packet.PacketProcessor;
-import org.onosproject.net.packet.PacketService;
+import org.onosproject.net.packet.PacketServiceAdapter;
 import org.onosproject.net.provider.ProviderId;
 
-import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+
+import static org.easymock.EasyMock.*;
+import static org.junit.Assert.*;
 
 /**
  * Tests for the {@link ProxyArpManager} class.
@@ -208,17 +196,17 @@
         for (int i = 1; i <= NUM_ADDRESS_PORTS; i++) {
             ConnectPoint cp = new ConnectPoint(getDeviceId(i), P1);
             Ip4Prefix prefix1 =
-                Ip4Prefix.valueOf("10.0." + (2 * i - 1) + ".0/24");
+                    Ip4Prefix.valueOf("10.0." + (2 * i - 1) + ".0/24");
             Ip4Address addr1 =
-                Ip4Address.valueOf("10.0." + (2 * i - 1) + ".1");
+                    Ip4Address.valueOf("10.0." + (2 * i - 1) + ".1");
             Ip4Prefix prefix2 = Ip4Prefix.valueOf("10.0." + (2 * i) + ".0/24");
             Ip4Address addr2 = Ip4Address.valueOf("10.0." + (2 * i) + ".1");
             InterfaceIpAddress ia1 = new InterfaceIpAddress(addr1, prefix1);
             InterfaceIpAddress ia2 = new InterfaceIpAddress(addr2, prefix2);
             PortAddresses pa1 =
-                new PortAddresses(cp, Sets.newHashSet(ia1),
-                                  MacAddress.valueOf(2 * i - 1),
-                                  VlanId.vlanId((short) 1));
+                    new PortAddresses(cp, Sets.newHashSet(ia1),
+                                      MacAddress.valueOf(2 * i - 1),
+                                      VlanId.vlanId((short) 1));
             PortAddresses pa2 =
                     new PortAddresses(cp, Sets.newHashSet(ia2),
                                       MacAddress.valueOf(2 * i),
@@ -235,7 +223,7 @@
 
         for (int i = 1; i <= NUM_FLOOD_PORTS; i++) {
             ConnectPoint cp = new ConnectPoint(getDeviceId(i + NUM_ADDRESS_PORTS),
-                    P1);
+                                               P1);
             expect(hostService.getAddressBindingsForPort(cp))
                     .andReturn(Collections.<PortAddresses>emptySet()).anyTimes();
         }
@@ -279,13 +267,13 @@
     @Test
     public void testReplyKnown() {
         Host replyer = new DefaultHost(PID, HID1, MAC1, VLAN1, getLocation(4),
-                Collections.singleton(IP1));
+                                       Collections.singleton(IP1));
 
         Host requestor = new DefaultHost(PID, HID2, MAC2, VLAN1, getLocation(5),
-                Collections.singleton(IP2));
+                                         Collections.singleton(IP2));
 
         expect(hostService.getHostsByIp(IP1))
-            .andReturn(Collections.singleton(replyer));
+                .andReturn(Collections.singleton(replyer));
         expect(hostService.getHost(HID2)).andReturn(requestor);
 
         replay(hostService);
@@ -307,7 +295,7 @@
     @Test
     public void testReplyUnknown() {
         Host requestor = new DefaultHost(PID, HID2, MAC2, VLAN1, getLocation(5),
-                Collections.singleton(IP2));
+                                         Collections.singleton(IP2));
 
         expect(hostService.getHostsByIp(IP1))
                 .andReturn(Collections.<Host>emptySet());
@@ -331,10 +319,10 @@
     @Test
     public void testReplyDifferentVlan() {
         Host replyer = new DefaultHost(PID, HID1, MAC1, VLAN2, getLocation(4),
-                Collections.singleton(IP1));
+                                       Collections.singleton(IP1));
 
         Host requestor = new DefaultHost(PID, HID2, MAC2, VLAN1, getLocation(5),
-                Collections.singleton(IP2));
+                                         Collections.singleton(IP2));
 
         expect(hostService.getHostsByIp(IP1))
                 .andReturn(Collections.singleton(replyer));
@@ -358,7 +346,7 @@
         MacAddress secondMac = MacAddress.valueOf(2L);
 
         Host requestor = new DefaultHost(PID, HID2, MAC2, VLAN1, LOC1,
-                Collections.singleton(theirIp));
+                                         Collections.singleton(theirIp));
 
         expect(hostService.getHost(HID2)).andReturn(requestor);
         replay(hostService);
@@ -390,7 +378,7 @@
 
         // Request for a valid external IP address but coming in the wrong port
         Ethernet arpRequest = buildArp(ARP.OP_REQUEST, MAC1, null, theirIp,
-                Ip4Address.valueOf("10.0.3.1"));
+                                       Ip4Address.valueOf("10.0.3.1"));
         proxyArp.reply(arpRequest, LOC1);
         assertEquals(0, packetService.packets.size());
 
@@ -433,7 +421,7 @@
     @Test
     public void testForwardToHost() {
         Host host1 = new DefaultHost(PID, HID1, MAC1, VLAN1, LOC1,
-                Collections.singleton(IP1));
+                                     Collections.singleton(IP1));
 
         expect(hostService.getHost(HID1)).andReturn(host1);
         replay(hostService);
@@ -476,17 +464,17 @@
         assertEquals(NUM_FLOOD_PORTS - 1, packetService.packets.size());
 
         Collections.sort(packetService.packets,
-            new Comparator<OutboundPacket>() {
-                @Override
-                public int compare(OutboundPacket o1, OutboundPacket o2) {
-                    return o1.sendThrough().uri().compareTo(o2.sendThrough().uri());
-                }
-            });
+                         new Comparator<OutboundPacket>() {
+                             @Override
+                             public int compare(OutboundPacket o1, OutboundPacket o2) {
+                                 return o1.sendThrough().uri().compareTo(o2.sendThrough().uri());
+                             }
+                         });
 
 
         for (int i = 0; i < NUM_FLOOD_PORTS - 1; i++) {
             ConnectPoint cp = new ConnectPoint(getDeviceId(NUM_ADDRESS_PORTS + i + 1),
-                    PortNumber.portNumber(1));
+                                               PortNumber.portNumber(1));
 
             OutboundPacket outboundPacket = packetService.packets.get(i);
             verifyPacketOut(packet, cp, outboundPacket);
@@ -497,11 +485,11 @@
      * Verifies the given packet was sent out the given port.
      *
      * @param expected the packet that was expected to be sent
-     * @param outPort the port the packet was expected to be sent out
-     * @param actual the actual OutboundPacket to verify
+     * @param outPort  the port the packet was expected to be sent out
+     * @param actual   the actual OutboundPacket to verify
      */
     private void verifyPacketOut(Ethernet expected, ConnectPoint outPort,
-            OutboundPacket actual) {
+                                 OutboundPacket actual) {
         assertArrayEquals(expected.serialize(), actual.data().array());
         assertEquals(1, actual.treatment().immediate().size());
         assertEquals(outPort.deviceId(), actual.sendThrough());
@@ -530,12 +518,12 @@
      * @param opcode opcode of the ARP packet
      * @param srcMac source MAC address
      * @param dstMac destination MAC address, or null if this is a request
-     * @param srcIp source IP address
-     * @param dstIp destination IP address
+     * @param srcIp  source IP address
+     * @param dstIp  destination IP address
      * @return the ARP packet
      */
     private Ethernet buildArp(short opcode, MacAddress srcMac, MacAddress dstMac,
-            Ip4Address srcIp, Ip4Address dstIp) {
+                              Ip4Address srcIp, Ip4Address dstIp) {
         Ethernet eth = new Ethernet();
 
         if (dstMac == null) {
@@ -574,32 +562,14 @@
      * Test PacketService implementation that simply stores OutboundPackets
      * passed to {@link #emit(OutboundPacket)} for later verification.
      */
-    class TestPacketService implements PacketService {
+    class TestPacketService extends PacketServiceAdapter {
 
         List<OutboundPacket> packets = new ArrayList<>();
 
         @Override
-        public void addProcessor(PacketProcessor processor, int priority) {
-        }
-
-        @Override
-        public void removeProcessor(PacketProcessor processor) {
-        }
-
-        @Override
         public void emit(OutboundPacket packet) {
             packets.add(packet);
         }
 
-        @Override
-        public void requestPackets(TrafficSelector selector,
-                                   PacketPriority priority, ApplicationId appId) {
-        }
-
-        @Override
-        public void requestPackets(TrafficSelector selector,
-                                   PacketPriority priority, ApplicationId appId,
-                                   FlowRule.Type tableType) {
-        }
     }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index 5357fa8..027378a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.store.packet.impl;
 
+import com.google.common.collect.ImmutableSet;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -25,6 +26,7 @@
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.flow.TrafficSelector;
 import org.onosproject.net.packet.OutboundPacket;
 import org.onosproject.net.packet.PacketEvent;
 import org.onosproject.net.packet.PacketEvent.Type;
@@ -41,8 +43,10 @@
 import org.onosproject.store.service.ConsistentMap;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
 import org.slf4j.Logger;
 
+import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -96,7 +100,7 @@
 
     @Activate
     public void activate() {
-        messageHandlingExecutor =  Executors.newFixedThreadPool(
+        messageHandlingExecutor = Executors.newFixedThreadPool(
                 MESSAGE_HANDLER_THREAD_POOL_SIZE,
                 groupedThreads("onos/store/packet", "message-handlers"));
 
@@ -104,7 +108,7 @@
                                            new InternalClusterMessageHandler(),
                                            messageHandlingExecutor);
 
-        tracker =  new PacketRequestTracker();
+        tracker = new PacketRequestTracker();
 
         log.info("Started");
     }
@@ -141,6 +145,11 @@
     }
 
     @Override
+    public boolean cancelPackets(PacketRequest request) {
+        return tracker.remove(request);
+    }
+
+    @Override
     public Set<PacketRequest> existingRequests() {
         return tracker.requests();
     }
@@ -162,47 +171,49 @@
 
     private class PacketRequestTracker {
 
-        private ConsistentMap<PacketRequest, Boolean> requests;
+        private ConsistentMap<TrafficSelector, Set<PacketRequest>> requests;
 
         public PacketRequestTracker() {
-            requests = storageService.<PacketRequest, Boolean>consistentMapBuilder()
-                    .withName("packet-requests")
+            requests = storageService.<TrafficSelector, Set<PacketRequest>>consistentMapBuilder()
+                    .withName("onos-packet-requests")
                     .withPartitionsDisabled()
-                    .withSerializer(Serializer.using(
-                            new KryoNamespace.Builder().register(KryoNamespaces.API).build()))
-                    .withSerializer(new Serializer() {
-                        KryoNamespace kryo = new KryoNamespace.Builder()
-                                .register(KryoNamespaces.API)
-                                .build();
-
-                        @Override
-                        public <T> byte[] encode(T object) {
-                            return kryo.serialize(object);
-                        }
-
-                        @Override
-                        public <T> T decode(byte[] bytes) {
-                            return kryo.deserialize(bytes);
-                        }
-                    }).build();
+                    .withSerializer(Serializer.using(KryoNamespaces.API))
+                    .build();
         }
 
         public boolean add(PacketRequest request) {
-            if (requests.putIfAbsent(request, true) == null) {
-                return true;
+            Versioned<Set<PacketRequest>> old = requests.get(request.selector());
+            if (old != null && old.value().contains(request)) {
+                return false;
             }
-            return false;
+            // FIXME: add retry logic using a random delay; until then a write that loses a race returns false
+            Set<PacketRequest> newSet = new HashSet<>();
+            newSet.add(request);
+            if (old == null) {
+                return requests.putIfAbsent(request.selector(), newSet) == null;
+            }
+            newSet.addAll(old.value());
+            return requests.replace(request.selector(), old.version(), newSet);
         }
 
         public boolean remove(PacketRequest request) {
-            if (requests.remove(request) == null) {
+            Versioned<Set<PacketRequest>> old = requests.get(request.selector());
+            if (old == null || !old.value().contains(request)) {
                 return false;
             }
-            return true;
+            // FIXME: add retry logic using a random delay; until then a write that loses a race returns false
+            Set<PacketRequest> newSet = new HashSet<>(old.value());
+            newSet.remove(request);
+            if (newSet.isEmpty()) {
+                return requests.remove(request.selector(), old.version());
+            }
+            return requests.replace(request.selector(), old.version(), newSet);
         }
 
         public Set<PacketRequest> requests() {
-            return requests.keySet();
+            ImmutableSet.Builder<PacketRequest> builder = ImmutableSet.builder();
+            requests.values().forEach(v -> builder.addAll(v.value()));
+            return builder.build();
         }
 
     }