ONOS- 2946 Adding ability to view packet processor statistics.

Change-Id: Ic55ec670b197b1ee08f2d11f97658fd614da1614
diff --git a/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java b/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
index a0bc693..0337987 100644
--- a/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
+++ b/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
@@ -15,7 +15,8 @@
  */
 package org.onosproject.net.packet.impl;
 
-import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -43,6 +44,7 @@
 import org.onosproject.net.packet.PacketEvent;
 import org.onosproject.net.packet.PacketPriority;
 import org.onosproject.net.packet.PacketProcessor;
+import org.onosproject.net.packet.PacketProcessorEntry;
 import org.onosproject.net.packet.PacketProvider;
 import org.onosproject.net.packet.PacketProviderRegistry;
 import org.onosproject.net.packet.PacketProviderService;
@@ -55,8 +57,6 @@
 import org.slf4j.Logger;
 
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -102,18 +102,18 @@
 
     private final DeviceListener deviceListener = new InternalDeviceListener();
 
-    private final Map<Integer, PacketProcessor> processors = new ConcurrentHashMap<>();
+    private final List<ProcessorEntry> processors = Lists.newCopyOnWriteArrayList();
 
     private ApplicationId appId;
 
     @Activate
     public void activate() {
         eventHandlingExecutor = Executors.newSingleThreadExecutor(
-                        groupedThreads("onos/net/packet", "event-handler"));
+                groupedThreads("onos/net/packet", "event-handler"));
         appId = coreService.getAppId(CoreService.CORE_APP_NAME);
         store.setDelegate(delegate);
         deviceService.addListener(deviceListener);
-        // TODO: Should we request packets for all existing devices? I believe we should.
+        store.existingRequests().forEach(this::pushToAllDevices);
         log.info("Started");
     }
 
@@ -121,6 +121,7 @@
     public void deactivate() {
         store.unsetDelegate(delegate);
         deviceService.removeListener(deviceListener);
+        store.existingRequests().forEach(this::removeFromAllDevices);
         eventHandlingExecutor.shutdown();
         log.info("Stopped");
     }
@@ -129,19 +130,35 @@
     public void addProcessor(PacketProcessor processor, int priority) {
         checkPermission(PACKET_EVENT);
         checkNotNull(processor, "Processor cannot be null");
-        processors.put(priority, processor);
+        ProcessorEntry entry = new ProcessorEntry(processor, priority);
+
+        // Insert the new processor according to its priority.
+        int i = 0;
+        for (; i < processors.size(); i++) {
+            if (priority < processors.get(i).priority()) {
+                break;
+            }
+        }
+        processors.add(i, entry);
     }
 
     @Override
     public void removeProcessor(PacketProcessor processor) {
         checkPermission(PACKET_EVENT);
         checkNotNull(processor, "Processor cannot be null");
-        processors.values().remove(processor);
+
+        // Remove the processor entry.
+        for (int i = 0; i < processors.size(); i++) {
+            if (processors.get(i).processor() == processor) {
+                processors.remove(i);
+                break;
+            }
+        }
     }
 
     @Override
-    public Map<Integer, PacketProcessor> getProcessors() {
-        return ImmutableMap.copyOf(processors);
+    public List<PacketProcessorEntry> getProcessors() {
+        return ImmutableList.copyOf(processors);
     }
 
     @Override
@@ -176,6 +193,18 @@
     }
 
     /**
+     * Pushes all rules to the specified device.
+     *
+     * @param device device on which to install packet request flows
+     */
+    private void pushRulesToDevice(Device device) {
+        log.debug("Pushing packet requests to device {}", device.id());
+        for (PacketRequest request : store.existingRequests()) {
+            pushRule(device, request);
+        }
+    }
+
+    /**
      * Pushes a packet request flow rule to all devices.
      *
      * @param request the packet request
@@ -187,16 +216,13 @@
         }
     }
 
-
     /**
      * Removes packet request flow rule from all devices.
      *
      * @param request the packet request
      */
     private void removeFromAllDevices(PacketRequest request) {
-        for (Device device : deviceService.getDevices()) {
-            removeRule(device, request);
-        }
+        deviceService.getAvailableDevices().forEach(d -> removeRule(d, request));
     }
 
     /**
@@ -232,7 +258,6 @@
         if (!device.type().equals(Device.Type.SWITCH)) {
             return;
         }
-
         ForwardingObjective forwarding = createBuilder(request)
                 .remove(new ObjectiveContext() {
                     @Override
@@ -241,7 +266,6 @@
                                  request, device.id(), error);
                     }
                 });
-
         objectiveService.forward(device.id(), forwarding);
     }
 
@@ -263,12 +287,10 @@
     }
 
     private void localEmit(OutboundPacket packet) {
-        final Device device = deviceService.getDevice(packet.sendThrough());
-
+        Device device = deviceService.getDevice(packet.sendThrough());
         if (device == null) {
             return;
         }
-
         PacketProvider packetProvider = getProvider(device.providerId());
         if (packetProvider != null) {
             packetProvider.emit(packet);
@@ -280,7 +302,9 @@
         return new InternalPacketProviderService(provider);
     }
 
-    // Personalized packet provider service issued to the supplied provider.
+    /**
+     * Personalized packet provider service issued to the supplied provider.
+     */
     private class InternalPacketProviderService
             extends AbstractProviderService<PacketProvider>
             implements PacketProviderService {
@@ -292,8 +316,10 @@
         @Override
         public void processPacket(PacketContext context) {
             // TODO filter packets sent to processors based on registrations
-            for (PacketProcessor processor : processors.values()) {
-                processor.process(context);
+            for (ProcessorEntry entry : processors) {
+                long start = System.nanoTime();
+                entry.processor().process(context);
+                entry.addNanos(System.nanoTime() - start);
             }
         }
 
@@ -319,17 +345,14 @@
                 try {
                     Device device = event.subject();
                     switch (event.type()) {
-                    case DEVICE_ADDED:
-                    case DEVICE_AVAILABILITY_CHANGED:
-                        if (deviceService.isAvailable(event.subject().id())) {
-                            log.debug("Pushing packet requests to device {}", event.subject().id());
-                            for (PacketRequest request : store.existingRequests()) {
-                                pushRule(device, request);
+                        case DEVICE_ADDED:
+                        case DEVICE_AVAILABILITY_CHANGED:
+                            if (deviceService.isAvailable(event.subject().id())) {
+                                pushRulesToDevice(device);
                             }
-                        }
-                        break;
-                    default:
-                        break;
+                            break;
+                        default:
+                            break;
                     }
                 } catch (Exception e) {
                     log.warn("Failed to process {}", event, e);
@@ -338,4 +361,48 @@
         }
     }
 
+    /**
+     * Entity for tracking stats for a packet processor.
+     */
+    private class ProcessorEntry implements PacketProcessorEntry {
+        private final PacketProcessor processor;
+        private final int priority;
+        private long invocations = 0;
+        private long nanos = 0;
+
+        public ProcessorEntry(PacketProcessor processor, int priority) {
+            this.processor = processor;
+            this.priority = priority;
+        }
+
+        @Override
+        public PacketProcessor processor() {
+            return processor;
+        }
+
+        @Override
+        public int priority() {
+            return priority;
+        }
+
+        @Override
+        public long invocations() {
+            return invocations;
+        }
+
+        @Override
+        public long totalNanos() {
+            return nanos;
+        }
+
+        @Override
+        public long averageNanos() {
+            return invocations > 0 ? nanos / invocations : 0;
+        }
+
+        void addNanos(long nanos) {
+            this.nanos += nanos;
+            this.invocations++;
+        }
+    }
 }