[AETHER-1230] Prevents the ejection of the P4RuntimePacketProvider listener

Offloads to a predictable executor the handling of the packet-ins.
Without this the listener can be easily ejected as its execution includes
the PacketProviderService and potentially all the processors installed by the apps
and everything is executed in the context of the core dispatcher thread

Change-Id: I2e5f57fdf0a0a21a8f4f3c8326e4268467328833
(cherry picked from commit 283e217508b46d002f9e4640ea62cd7a63334a8c)
diff --git a/providers/p4runtime/packet/src/main/java/org/onosproject/provider/p4runtime/packet/impl/OsgiPropertyConstants.java b/providers/p4runtime/packet/src/main/java/org/onosproject/provider/p4runtime/packet/impl/OsgiPropertyConstants.java
new file mode 100644
index 0000000..9dadb40
--- /dev/null
+++ b/providers/p4runtime/packet/src/main/java/org/onosproject/provider/p4runtime/packet/impl/OsgiPropertyConstants.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.provider.p4runtime.packet.impl;
+
+/**
+ * Constants for default values of configurable properties.
+ */
+public final class OsgiPropertyConstants {
+
+    private OsgiPropertyConstants() {}
+
+    public static final String P4RUNTIME_PACKET_PROVIDER_WORKERS = "workers";
+
+    public static final int P4RUNTIME_PACKET_PROVIDER_WORKERS_DEFAULT = 4;
+
+}
diff --git a/providers/p4runtime/packet/src/main/java/org/onosproject/provider/p4runtime/packet/impl/P4RuntimePacketProvider.java b/providers/p4runtime/packet/src/main/java/org/onosproject/provider/p4runtime/packet/impl/P4RuntimePacketProvider.java
index 2fa7493..edf95de 100644
--- a/providers/p4runtime/packet/src/main/java/org/onosproject/provider/p4runtime/packet/impl/P4RuntimePacketProvider.java
+++ b/providers/p4runtime/packet/src/main/java/org/onosproject/provider/p4runtime/packet/impl/P4RuntimePacketProvider.java
@@ -16,8 +16,9 @@
 
 package org.onosproject.provider.p4runtime.packet.impl;
 
-
 import org.onlab.packet.EthType;
+import org.onlab.util.PredictableExecutor;
+import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
@@ -40,22 +41,34 @@
 import org.onosproject.p4runtime.api.P4RuntimeEvent;
 import org.onosproject.p4runtime.api.P4RuntimeEventListener;
 import org.onosproject.p4runtime.api.P4RuntimePacketIn;
+import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.slf4j.Logger;
 
 import java.nio.ByteBuffer;
+import java.util.Dictionary;
+import java.util.concurrent.ExecutorService;
 
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.onlab.util.Tools.get;
+import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.net.flow.DefaultTrafficTreatment.emptyTreatment;
+import static org.onosproject.provider.p4runtime.packet.impl.OsgiPropertyConstants.P4RUNTIME_PACKET_PROVIDER_WORKERS;
+import static org.onosproject.provider.p4runtime.packet.impl.OsgiPropertyConstants.P4RUNTIME_PACKET_PROVIDER_WORKERS_DEFAULT;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
  * Implementation of a packet provider for P4Runtime device.
  */
-@Component(immediate = true)
+@Component(immediate = true,
+        property = {
+                P4RUNTIME_PACKET_PROVIDER_WORKERS + ":Integer=" + P4RUNTIME_PACKET_PROVIDER_WORKERS_DEFAULT,
+        })
 public class P4RuntimePacketProvider extends AbstractProvider implements PacketProvider {
 
     private final Logger log = getLogger(getClass());
@@ -72,10 +85,18 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected MastershipService mastershipService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ComponentConfigService cfgService;
+
     private PacketProviderService providerService;
 
     private InternalPacketListener packetListener = new InternalPacketListener();
 
+    /** Number of P4Runtime packet provider workers. */
+    private int workers = P4RUNTIME_PACKET_PROVIDER_WORKERS_DEFAULT;
+    // Predictable executor to stitch the packet processing always to the same thread
+    private PredictableExecutor packetWorkers;
+
     /**
      * Creates a new P4Runtime packet provider.
      */
@@ -84,20 +105,48 @@
     }
 
     @Activate
-    protected void activate() {
+    protected void activate(ComponentContext context) {
+        cfgService.registerProperties(getClass());
         providerService = providerRegistry.register(this);
+        modified(context);
         controller.addListener(packetListener);
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
+        cfgService.unregisterProperties(getClass(), false);
         controller.removeListener(packetListener);
         providerRegistry.unregister(this);
         providerService = null;
+        stopWorkersIfNeeded();
         log.info("Stopped");
     }
 
+    @Modified
+    protected void modified(ComponentContext context) {
+        if (context != null) {
+            Dictionary<?, ?> properties = context.getProperties();
+            int newWorkers;
+            try {
+                String s = get(properties, P4RUNTIME_PACKET_PROVIDER_WORKERS);
+                newWorkers = isNullOrEmpty(s) ? workers : Integer.parseInt(s.trim());
+            } catch (NumberFormatException | ClassCastException e) {
+                newWorkers = workers;
+            }
+
+            // Stop previous executor and start a new one when there are changes in the config
+            // OR during the start up of the service
+            if (newWorkers != workers || packetWorkers == null) {
+                workers = newWorkers;
+                stopWorkersIfNeeded();
+                packetWorkers = new PredictableExecutor(workers, groupedThreads("onos/p4rt",
+                        "packet-worker-%d", log));
+                log.info("Settings: p4RuntimePacketProviderWorkers={}", workers);
+            }
+        }
+    }
+
     @Override
     public void emit(OutboundPacket packet) {
         if (packet != null) {
@@ -112,6 +161,14 @@
         }
     }
 
+    private void stopWorkersIfNeeded() {
+        if (packetWorkers != null) {
+            ExecutorService oldWorkerExecutor = packetWorkers;
+            oldWorkerExecutor.shutdown();
+            packetWorkers = null;
+        }
+    }
+
     private EthType.EtherType getEtherType(ByteBuffer data) {
         final short shortEthType = data.getShort(12);
         data.rewind();
@@ -151,6 +208,58 @@
         }
     }
 
+    private void handleP4RuntimeEvent(P4RuntimeEvent event) {
+        //Mastership message is sent to everybody but picked up only by master.
+        //FIXME we need the device ID into p4RuntimeEvnetSubject to check for mastsership
+        if (!(event.subject() instanceof P4RuntimePacketIn) || event.type() != P4RuntimeEvent.Type.PACKET_IN) {
+            log.debug("Unrecognized event type {}, discarding", event.type());
+            // Not a packet-in event, ignore it.
+            return;
+        }
+        P4RuntimePacketIn eventSubject = (P4RuntimePacketIn) event.subject();
+        DeviceId deviceId = eventSubject.deviceId();
+
+        Device device = deviceService.getDevice(eventSubject.deviceId());
+        if (device == null) {
+            log.warn("Unable to process packet-in from {}, device is null in the core", deviceId);
+            return;
+        }
+
+        if (!device.is(PiPipelineInterpreter.class)) {
+            log.warn("Unable to process packet-in from {}, device has no PiPipelineInterpreter behaviour",
+                    deviceId);
+            return;
+        }
+
+        PiPacketOperation operation = eventSubject.packetOperation();
+        InboundPacket inPkt;
+        try {
+            inPkt = device.as(PiPipelineInterpreter.class).mapInboundPacket(operation, deviceId);
+        } catch (PiPipelineInterpreter.PiInterpreterException e) {
+            log.warn("Unable to interpret inbound packet from {}: {}", deviceId, e.getMessage());
+            return;
+        }
+
+        if (log.isTraceEnabled()) {
+            final EthType.EtherType etherType = getEtherType(inPkt.unparsed());
+            log.trace("Received PACKET-IN <<< device={} ingress_port={} eth_type={}",
+                    inPkt.receivedFrom().deviceId(), inPkt.receivedFrom().port(),
+                    etherType.ethType().toString());
+        }
+
+        if (inPkt == null) {
+            log.debug("Received null inbound packet. Ignoring.");
+            return;
+        }
+
+        OutboundPacket outPkt = new DefaultOutboundPacket(eventSubject.deviceId(), null,
+                operation.data().asReadOnlyBuffer());
+        PacketContext pktCtx = new P4RuntimePacketContext(System.currentTimeMillis(), inPkt, outPkt, false);
+
+        // Pushing the packet context up for processing.
+        providerService.processPacket(pktCtx);
+    }
+
     /**
      * Internal packet listener to handle packet-in events received from the P4Runtime controller.
      */
@@ -158,55 +267,9 @@
 
         @Override
         public void event(P4RuntimeEvent event) {
-            //Masterhip message is sent to everybody but picked up only by master.
-            //FIXME we need the device ID into p4RuntimeEvnetSubject to check for mastsership
-            if (!(event.subject() instanceof P4RuntimePacketIn) || event.type() != P4RuntimeEvent.Type.PACKET_IN) {
-                log.debug("Unrecognized event type {}, discarding", event.type());
-                // Not a packet-in event, ignore it.
-                return;
-            }
-            P4RuntimePacketIn eventSubject = (P4RuntimePacketIn) event.subject();
-            DeviceId deviceId = eventSubject.deviceId();
-
-            Device device = deviceService.getDevice(eventSubject.deviceId());
-            if (device == null) {
-                log.warn("Unable to process packet-in from {}, device is null in the core", deviceId);
-                return;
-            }
-
-            if (!device.is(PiPipelineInterpreter.class)) {
-                log.warn("Unable to process packet-in from {}, device has no PiPipelineInterpreter behaviour",
-                        deviceId);
-                return;
-            }
-
-            PiPacketOperation operation = eventSubject.packetOperation();
-            InboundPacket inPkt;
-            try {
-                inPkt = device.as(PiPipelineInterpreter.class).mapInboundPacket(operation, deviceId);
-            } catch (PiPipelineInterpreter.PiInterpreterException e) {
-                log.warn("Unable to interpret inbound packet from {}: {}", deviceId, e.getMessage());
-                return;
-            }
-
-            if (log.isTraceEnabled()) {
-                final EthType.EtherType etherType = getEtherType(inPkt.unparsed());
-                log.trace("Received PACKET-IN <<< device={} ingress_port={} eth_type={}",
-                          inPkt.receivedFrom().deviceId(), inPkt.receivedFrom().port(),
-                          etherType.ethType().toString());
-            }
-
-            if (inPkt == null) {
-                log.debug("Received null inbound packet. Ignoring.");
-                return;
-            }
-
-            OutboundPacket outPkt = new DefaultOutboundPacket(eventSubject.deviceId(), null,
-                    operation.data().asReadOnlyBuffer());
-            PacketContext pktCtx = new P4RuntimePacketContext(System.currentTimeMillis(), inPkt, outPkt, false);
-
-            // Pushing the packet context up for processing.
-            providerService.processPacket(pktCtx);
+            // Offload to another executor to prevent the ejection of the listener - it uses
+            // the device id to stitch the packets coming from a device always to the same worker
+            packetWorkers.execute(() -> handleP4RuntimeEvent(event), event.subject().deviceId().hashCode());
         }
     }
 }