ONOS- 2946 Adding ability to view packet processor statistics.

Change-Id: Ic55ec670b197b1ee08f2d11f97658fd614da1614
diff --git a/core/api/src/main/java/org/onosproject/net/packet/PacketProcessorEntry.java b/core/api/src/main/java/org/onosproject/net/packet/PacketProcessorEntry.java
new file mode 100644
index 0000000..40386fb
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/packet/PacketProcessorEntry.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.packet;
+
+/**
+ * Packet processor entry tracking the processor, its priority and
+ * time consumption.
+ */
+public interface PacketProcessorEntry {
+
+    /**
+     * Returns the packet processor.
+     *
+     * @return packet processor
+     */
+    PacketProcessor processor();
+
+    /**
+     * Returns the packet processor priority.
+     *
+     * @return processor priority
+     */
+    int priority();
+
+    /**
+     * Returns the number of invocations.
+     *
+     * @return number of invocations
+     */
+    long invocations();
+
+    /**
+     * Returns the total time, in nanoseconds, spent processing packets.
+     *
+     * @return total time in nanos
+     */
+    long totalNanos();
+
+    /**
+     * Returns the average time, in nanoseconds, spent processing packets.
+     *
+     * @return average time in nanos
+     */
+    long averageNanos();
+}
diff --git a/core/api/src/main/java/org/onosproject/net/packet/PacketService.java b/core/api/src/main/java/org/onosproject/net/packet/PacketService.java
index 98f4d8e..2e7a1b9 100644
--- a/core/api/src/main/java/org/onosproject/net/packet/PacketService.java
+++ b/core/api/src/main/java/org/onosproject/net/packet/PacketService.java
@@ -20,7 +20,6 @@
 import org.onosproject.net.flow.TrafficSelector;
 
 import java.util.List;
-import java.util.Map;
 
 /**
  * Service for intercepting data plane packets and for emitting synthetic
@@ -52,13 +51,12 @@
     void removeProcessor(PacketProcessor processor);
 
     /**
-     * Returns priority bindings of all registered packet processors.
+     * Returns priority bindings of all registered packet processor entries.
      *
-     * @return list of existing packet processors
+     * @return list of existing packet processor entries
      */
     @Beta
-    // TODO: Consider returning list of PacketProcessorEntry with processor, priority and stats
-    Map<Integer, PacketProcessor> getProcessors();
+    List<PacketProcessorEntry> getProcessors();
 
     /**
      * Requests that packets matching the given selector are punted from the
diff --git a/core/api/src/test/java/org/onosproject/net/packet/PacketServiceAdapter.java b/core/api/src/test/java/org/onosproject/net/packet/PacketServiceAdapter.java
index c438659..2993ce6 100644
--- a/core/api/src/test/java/org/onosproject/net/packet/PacketServiceAdapter.java
+++ b/core/api/src/test/java/org/onosproject/net/packet/PacketServiceAdapter.java
@@ -19,7 +19,6 @@
 import org.onosproject.net.flow.TrafficSelector;
 
 import java.util.List;
-import java.util.Map;
 
 /**
  * Test adapter for packet service.
@@ -34,7 +33,7 @@
     }
 
     @Override
-    public Map<Integer, PacketProcessor> getProcessors() {
+    public List<PacketProcessorEntry> getProcessors() {
         return null;
     }
 
diff --git a/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java b/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
index a0bc693..0337987 100644
--- a/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
+++ b/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
@@ -15,7 +15,8 @@
  */
 package org.onosproject.net.packet.impl;
 
-import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -43,6 +44,7 @@
 import org.onosproject.net.packet.PacketEvent;
 import org.onosproject.net.packet.PacketPriority;
 import org.onosproject.net.packet.PacketProcessor;
+import org.onosproject.net.packet.PacketProcessorEntry;
 import org.onosproject.net.packet.PacketProvider;
 import org.onosproject.net.packet.PacketProviderRegistry;
 import org.onosproject.net.packet.PacketProviderService;
@@ -55,8 +57,6 @@
 import org.slf4j.Logger;
 
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -102,18 +102,18 @@
 
     private final DeviceListener deviceListener = new InternalDeviceListener();
 
-    private final Map<Integer, PacketProcessor> processors = new ConcurrentHashMap<>();
+    private final List<ProcessorEntry> processors = Lists.newCopyOnWriteArrayList();
 
     private ApplicationId appId;
 
     @Activate
     public void activate() {
         eventHandlingExecutor = Executors.newSingleThreadExecutor(
-                        groupedThreads("onos/net/packet", "event-handler"));
+                groupedThreads("onos/net/packet", "event-handler"));
         appId = coreService.getAppId(CoreService.CORE_APP_NAME);
         store.setDelegate(delegate);
         deviceService.addListener(deviceListener);
-        // TODO: Should we request packets for all existing devices? I believe we should.
+        store.existingRequests().forEach(this::pushToAllDevices);
         log.info("Started");
     }
 
@@ -121,6 +121,7 @@
     public void deactivate() {
         store.unsetDelegate(delegate);
         deviceService.removeListener(deviceListener);
+        store.existingRequests().forEach(this::removeFromAllDevices);
         eventHandlingExecutor.shutdown();
         log.info("Stopped");
     }
@@ -129,19 +130,35 @@
     public void addProcessor(PacketProcessor processor, int priority) {
         checkPermission(PACKET_EVENT);
         checkNotNull(processor, "Processor cannot be null");
-        processors.put(priority, processor);
+        ProcessorEntry entry = new ProcessorEntry(processor, priority);
+
+        // Insert the new processor according to its priority.
+        int i = 0;
+        for (; i < processors.size(); i++) {
+            if (priority < processors.get(i).priority()) {
+                break;
+            }
+        }
+        processors.add(i, entry);
     }
 
     @Override
     public void removeProcessor(PacketProcessor processor) {
         checkPermission(PACKET_EVENT);
         checkNotNull(processor, "Processor cannot be null");
-        processors.values().remove(processor);
+
+        // Remove the processor entry.
+        for (int i = 0; i < processors.size(); i++) {
+            if (processors.get(i).processor() == processor) {
+                processors.remove(i);
+                break;
+            }
+        }
     }
 
     @Override
-    public Map<Integer, PacketProcessor> getProcessors() {
-        return ImmutableMap.copyOf(processors);
+    public List<PacketProcessorEntry> getProcessors() {
+        return ImmutableList.copyOf(processors);
     }
 
     @Override
@@ -176,6 +193,18 @@
     }
 
     /**
+     * Pushes all rules to the specified device.
+     *
+     * @param device device on which to install packet request flows
+     */
+    private void pushRulesToDevice(Device device) {
+        log.debug("Pushing packet requests to device {}", device.id());
+        for (PacketRequest request : store.existingRequests()) {
+            pushRule(device, request);
+        }
+    }
+
+    /**
      * Pushes a packet request flow rule to all devices.
      *
      * @param request the packet request
@@ -187,16 +216,13 @@
         }
     }
 
-
     /**
      * Removes packet request flow rule from all devices.
      *
      * @param request the packet request
      */
     private void removeFromAllDevices(PacketRequest request) {
-        for (Device device : deviceService.getDevices()) {
-            removeRule(device, request);
-        }
+        deviceService.getAvailableDevices().forEach(d -> removeRule(d, request));
     }
 
     /**
@@ -232,7 +258,6 @@
         if (!device.type().equals(Device.Type.SWITCH)) {
             return;
         }
-
         ForwardingObjective forwarding = createBuilder(request)
                 .remove(new ObjectiveContext() {
                     @Override
@@ -241,7 +266,6 @@
                                  request, device.id(), error);
                     }
                 });
-
         objectiveService.forward(device.id(), forwarding);
     }
 
@@ -263,12 +287,10 @@
     }
 
     private void localEmit(OutboundPacket packet) {
-        final Device device = deviceService.getDevice(packet.sendThrough());
-
+        Device device = deviceService.getDevice(packet.sendThrough());
         if (device == null) {
             return;
         }
-
         PacketProvider packetProvider = getProvider(device.providerId());
         if (packetProvider != null) {
             packetProvider.emit(packet);
@@ -280,7 +302,9 @@
         return new InternalPacketProviderService(provider);
     }
 
-    // Personalized packet provider service issued to the supplied provider.
+    /**
+     * Personalized packet provider service issued to the supplied provider.
+     */
     private class InternalPacketProviderService
             extends AbstractProviderService<PacketProvider>
             implements PacketProviderService {
@@ -292,8 +316,10 @@
         @Override
         public void processPacket(PacketContext context) {
             // TODO filter packets sent to processors based on registrations
-            for (PacketProcessor processor : processors.values()) {
-                processor.process(context);
+            for (ProcessorEntry entry : processors) {
+                long start = System.nanoTime();
+                entry.processor().process(context);
+                entry.addNanos(System.nanoTime() - start);
             }
         }
 
@@ -319,17 +345,14 @@
                 try {
                     Device device = event.subject();
                     switch (event.type()) {
-                    case DEVICE_ADDED:
-                    case DEVICE_AVAILABILITY_CHANGED:
-                        if (deviceService.isAvailable(event.subject().id())) {
-                            log.debug("Pushing packet requests to device {}", event.subject().id());
-                            for (PacketRequest request : store.existingRequests()) {
-                                pushRule(device, request);
+                        case DEVICE_ADDED:
+                        case DEVICE_AVAILABILITY_CHANGED:
+                            if (deviceService.isAvailable(event.subject().id())) {
+                                pushRulesToDevice(device);
                             }
-                        }
-                        break;
-                    default:
-                        break;
+                            break;
+                        default:
+                            break;
                     }
                 } catch (Exception e) {
                     log.warn("Failed to process {}", event, e);
@@ -338,4 +361,48 @@
         }
     }
 
+    /**
+     * Entity for tracking stats for a packet processor.
+     */
+    private class ProcessorEntry implements PacketProcessorEntry {
+        private final PacketProcessor processor;
+        private final int priority;
+        private long invocations = 0;
+        private long nanos = 0;
+
+        public ProcessorEntry(PacketProcessor processor, int priority) {
+            this.processor = processor;
+            this.priority = priority;
+        }
+
+        @Override
+        public PacketProcessor processor() {
+            return processor;
+        }
+
+        @Override
+        public int priority() {
+            return priority;
+        }
+
+        @Override
+        public long invocations() {
+            return invocations;
+        }
+
+        @Override
+        public long totalNanos() {
+            return nanos;
+        }
+
+        @Override
+        public long averageNanos() {
+            return invocations > 0 ? nanos / invocations : 0;
+        }
+
+        void addNanos(long nanos) {
+            this.nanos += nanos;
+            this.invocations++;
+        }
+    }
 }