Packet throttle support

Change-Id: I6f2da5ed25f794561349013bfcbf9afa85d5e190
diff --git a/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java b/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
index b9ac1a4..a020da7 100644
--- a/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
+++ b/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
@@ -59,12 +59,17 @@
 import org.osgi.service.component.annotations.Deactivate;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.onosproject.net.packet.PacketInFilter;
+import org.onosproject.net.packet.PacketInFilter.FilterAction;
 import org.slf4j.Logger;
 
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.onlab.util.Tools.groupedThreads;
@@ -121,6 +126,8 @@
     private ApplicationId appId;
     private NodeId localNodeId;
 
+    private List<PacketInFilter> filters = new CopyOnWriteArrayList<>();
+
     @Activate
     public void activate() {
         eventHandlingExecutor = Executors.newSingleThreadExecutor(
@@ -189,6 +196,16 @@
     }
 
     @Override
+    public void addFilter(PacketInFilter filter) {
+        filters.add(filter);
+    }
+
+    @Override
+    public void removeFilter(PacketInFilter filter) {
+        filters.remove(filter);
+    }
+
+    @Override
     public List<PacketProcessorEntry> getProcessors() {
         checkPermission(PACKET_READ);
         return ImmutableList.copyOf(processors);
@@ -372,6 +389,19 @@
         store.emit(packet);
     }
 
+    @Override
+    public List<PacketInFilter> getFilters() {
+        return ImmutableList.copyOf(filters);
+    }
+
+    @Override
+    public void clearFilters() {
+        for (PacketInFilter filter: filters) {
+            filter.stop();
+        }
+        filters.clear();
+    }
+
     private void localEmit(OutboundPacket packet) {
         Device device = deviceService.getDevice(packet.sendThrough());
         if (device == null) {
@@ -399,8 +429,53 @@
             super(provider);
         }
 
+        /**
+         * Loops through all packet filters and checks if the filter is
+         * enabled and allowed to be processed.
+         * It increments the counter to track the pending packets to be
+         * processed based on the filter selected.
+         *
+         * @param context PackerContext holding the packet information
+         * @return FilterAction Action decided for the based on the filter applied
+         */
+        private FilterAction prePacketProcess(PacketContext context) {
+            FilterAction filterAction = FilterAction.FILTER_INVALID;
+            for (PacketInFilter filter: filters) {
+                filterAction = filter.preProcess(context);
+                if (filterAction == FilterAction.FILTER_DISABLED) {
+                    if (log.isTraceEnabled()) {
+                        log.trace("{}: filter is disabled during pre processing", filter.name());
+                    }
+                    continue;
+                }
+                if (filterAction == FilterAction.PACKET_DENY) {
+                    if (log.isTraceEnabled()) {
+                        log.trace("{}: overflow counter after dropping packet is: {}", filter.name(),
+                                filter.droppedPackets());
+                    }
+                    break;
+                }
+                if (filterAction == FilterAction.PACKET_ALLOW) {
+                    if (log.isTraceEnabled()) {
+                        log.trace("{}: counter after picked for processing is: {}", filter.name(),
+                                  filter.pendingPackets());
+                    }
+                    break;
+                }
+            }
+            return filterAction;
+        }
+
         @Override
         public void processPacket(PacketContext context) {
+            FilterAction filterAction = prePacketProcess(context);
+
+            if (filterAction == FilterAction.PACKET_DENY) {
+                if (log.isTraceEnabled()) {
+                    log.trace("The packet is dropped as crossed the maxcount");
+                }
+                return;
+            }
             // TODO filter packets sent to processors based on registrations
             for (ProcessorEntry entry : processors) {
                 try {