Packet throttle support
Change-Id: I6f2da5ed25f794561349013bfcbf9afa85d5e190
diff --git a/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java b/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
index b9ac1a4..a020da7 100644
--- a/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
+++ b/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
@@ -59,12 +59,17 @@
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.onosproject.net.packet.PacketInFilter;
+import org.onosproject.net.packet.PacketInFilter.FilterAction;
import org.slf4j.Logger;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onlab.util.Tools.groupedThreads;
@@ -121,6 +126,8 @@
private ApplicationId appId;
private NodeId localNodeId;
+ private List<PacketInFilter> filters = new CopyOnWriteArrayList<>();
+
@Activate
public void activate() {
eventHandlingExecutor = Executors.newSingleThreadExecutor(
@@ -189,6 +196,16 @@
}
@Override
+ public void addFilter(PacketInFilter filter) {
+ filters.add(filter);
+ }
+
+ @Override
+ public void removeFilter(PacketInFilter filter) {
+ filters.remove(filter);
+ }
+
+ @Override
public List<PacketProcessorEntry> getProcessors() {
checkPermission(PACKET_READ);
return ImmutableList.copyOf(processors);
@@ -372,6 +389,19 @@
store.emit(packet);
}
+ @Override
+ public List<PacketInFilter> getFilters() {
+ return ImmutableList.copyOf(filters);
+ }
+
+ @Override
+ public void clearFilters() {
+ for (PacketInFilter filter: filters) {
+ filter.stop();
+ }
+ filters.clear();
+ }
+
private void localEmit(OutboundPacket packet) {
Device device = deviceService.getDevice(packet.sendThrough());
if (device == null) {
@@ -399,8 +429,53 @@
super(provider);
}
+ /**
+ * Loops through all packet filters and checks if the filter is
+ * enabled and allowed to be processed.
+ * It increments the counter to track the pending packets to be
+ * processed based on the filter selected.
+ *
+ * @param context PackerContext holding the packet information
+ * @return FilterAction Action decided for the based on the filter applied
+ */
+ private FilterAction prePacketProcess(PacketContext context) {
+ FilterAction filterAction = FilterAction.FILTER_INVALID;
+ for (PacketInFilter filter: filters) {
+ filterAction = filter.preProcess(context);
+ if (filterAction == FilterAction.FILTER_DISABLED) {
+ if (log.isTraceEnabled()) {
+ log.trace("{}: filter is disabled during pre processing", filter.name());
+ }
+ continue;
+ }
+ if (filterAction == FilterAction.PACKET_DENY) {
+ if (log.isTraceEnabled()) {
+ log.trace("{}: overflow counter after dropping packet is: {}", filter.name(),
+ filter.droppedPackets());
+ }
+ break;
+ }
+ if (filterAction == FilterAction.PACKET_ALLOW) {
+ if (log.isTraceEnabled()) {
+ log.trace("{}: counter after picked for processing is: {}", filter.name(),
+ filter.pendingPackets());
+ }
+ break;
+ }
+ }
+ return filterAction;
+ }
+
@Override
public void processPacket(PacketContext context) {
+ FilterAction filterAction = prePacketProcess(context);
+
+ if (filterAction == FilterAction.PACKET_DENY) {
+ if (log.isTraceEnabled()) {
+ log.trace("The packet is dropped as crossed the maxcount");
+ }
+ return;
+ }
// TODO filter packets sent to processors based on registrations
for (ProcessorEntry entry : processors) {
try {