Refactor channel and mastership handling in P4Runtime

This (big) change aims at solving the issue observed with mastership flapping
and device connection/disconnection with P4Runtime.

Channel handling is now based on the underlying gRPC channel state. Before,
channel events (open/close/error) were generated as a consequence of P4Runtime
StreamChannel events, making device availability dependent on mastership. Now
Stream Channel events only affect mastership (MASTER/STANDBY or NONE when the
SteamChannel RPC is not active).

Mastership handling has been refactored to generate P4Runtime election IDs that
are compatible with the mastership preference decided by the MastershipService.

GeneralDeviceProvider has been re-implemented to support in-order
device event processing and to reduce implementation complexity. Stats polling
has been moved to a separate component, and netcfg handling updated to only
depend on BasicDeviceConfig, augmented with a pipeconf field, and re-using the
managementAddress field to set the gRPC server endpoints (e.g.
grpc://myswitch.local:50051). Before it was depending on 3 different config
classes, making hard to detect changes.

Finally, this change affects some core interfaces:
- Adds a method to DeviceProvider and DeviceHandshaker to check for device
availability, making the meaning of availability device-specific. This is needed
in cases where the device manager needs to change the availability state of a
device (as in change #20842)
- Support device providers not capable of reconciling mastership role responses
with requests (like P4Runtime).
- Clarify the meaning of "connection" in the DeviceConnect behavior.
- Allows driver-based providers to check devices for reachability and
availability without probing the device via the network.

Change-Id: I7ff30d29f5d02ad938e3171536e54ae2916629a2
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/DeviceTaskExecutor.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/DeviceTaskExecutor.java
new file mode 100644
index 0000000..6f90256
--- /dev/null
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/DeviceTaskExecutor.java
@@ -0,0 +1,256 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.provider.general.device.impl;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Striped;
+import org.onosproject.net.DeviceId;
+import org.slf4j.Logger;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.lang.String.format;
+import static java.lang.System.currentTimeMillis;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Allows submitting tasks related to a specific device. Takes care of executing
+ * pending tasks sequentially for each device in a FIFO order, while using a
+ * delegate executor. It also avoids executing duplicate tasks when arriving
+ * back-to-back.
+ *
+ * @param <T> enum describing the type of task
+ */
+
+class DeviceTaskExecutor<T extends Enum> {
+    /**
+     * Minimum interval between duplicate back-to-back tasks.
+     */
+    private static final int DUPLICATE_MIN_INTERVAL_MILLIS = 1000;
+
+    private final Logger log = getLogger(getClass());
+
+    private final ExecutorService delegate;
+    private final AtomicBoolean canceled = new AtomicBoolean(false);
+    private final Set<DeviceId> busyDevices = Sets.newConcurrentHashSet();
+    private final Set<DeviceId> pendingDevices = Sets.newConcurrentHashSet();
+    private final Striped<Lock> deviceLocks = Striped.lock(30);
+    private final LoadingCache<DeviceId, TaskQueue> taskQueues = CacheBuilder.newBuilder()
+            .expireAfterAccess(1, TimeUnit.MINUTES)
+            .removalListener((RemovalListener<DeviceId, TaskQueue>) notification -> {
+                if (!notification.getValue().isEmpty()) {
+                    log.warn("Cache evicted non-empty task queue for {} ({} pending tasks)",
+                             notification.getKey(), notification.getValue().size());
+                }
+            })
+            .build(new CacheLoader<DeviceId, TaskQueue>() {
+                @SuppressWarnings("NullableProblems")
+                @Override
+                public TaskQueue load(DeviceId deviceId) {
+                    return new TaskQueue();
+                }
+            });
+
+    /**
+     * Creates a new executor with the given delegate executor service.
+     *
+     * @param delegate executor service
+     */
+    DeviceTaskExecutor(ExecutorService delegate) {
+        checkNotNull(delegate);
+        this.delegate = delegate;
+    }
+
+    /**
+     * Submit a tasks.
+     *
+     * @param deviceId device associated with the task
+     * @param type     type of task (used to remove eventual back-to-back
+     *                 duplicates)
+     * @param runnable runnable to execute
+     */
+    void submit(DeviceId deviceId, T type, Runnable runnable) {
+        checkNotNull(deviceId);
+        checkNotNull(type);
+        checkNotNull(runnable);
+
+        if (canceled.get()) {
+            log.warn("Executor was cancelled, cannot submit task {} for {}",
+                     type, deviceId);
+            return;
+        }
+
+        final DeviceTask task = new DeviceTask(deviceId, type, runnable);
+        deviceLocks.get(deviceId).lock();
+        try {
+            if (taskQueues.get(deviceId).isBackToBackDuplicate(type)) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Dropping back-to-back duplicate task {} for {}",
+                              type, deviceId);
+                }
+                return;
+            }
+            if (taskQueues.get(deviceId).offer(task)) {
+                pendingDevices.add(deviceId);
+                if (!busyDevices.contains(deviceId)) {
+                    // The task was submitted to the queue and we are not
+                    // performing any other task for this device. There is at
+                    // least one task that is ready to be executed.
+                    delegate.execute(this::performTaskIfAny);
+                }
+            } else {
+                log.warn("Unable to submit task {} for {}",
+                         task.type, task.deviceId);
+            }
+        } catch (ExecutionException e) {
+            log.warn("Exception while accessing task queue cache", e);
+        } finally {
+            deviceLocks.get(task.deviceId).unlock();
+        }
+    }
+
+    /**
+     * Prevents the executor from executing any more tasks.
+     */
+    void cancel() {
+        canceled.set(true);
+    }
+
+    private void performTaskIfAny() {
+        final DeviceTask task = pollTask();
+        if (task == null) {
+            // No tasks.
+            return;
+        }
+        if (canceled.get()) {
+            log.warn("Executor was cancelled, dropping task {} for {}",
+                     task.type, task.deviceId);
+            return;
+        }
+        if (log.isTraceEnabled()) {
+            log.trace("STARTING task {} for {}...", task.type.name(), task.deviceId);
+        }
+        try {
+            task.runnable.run();
+        } catch (DeviceTaskException e) {
+            log.error("Unable to complete task {} for {}: {}",
+                      task.type, task.deviceId, e.getMessage());
+        } catch (Throwable t) {
+            log.error(format(
+                    "Uncaught exception when executing task %s for %s",
+                    task.type, task.deviceId), t);
+        }
+        if (log.isTraceEnabled()) {
+            log.trace("COMPLETED task {} for {}", task.type.name(), task.deviceId);
+        }
+        busyDevices.remove(task.deviceId);
+        delegate.execute(this::performTaskIfAny);
+    }
+
+    private DeviceTask pollTask() {
+        for (DeviceId deviceId : pendingDevices) {
+            final DeviceTask task;
+            deviceLocks.get(deviceId).lock();
+            try {
+                if (busyDevices.contains(deviceId)) {
+                    // Next device.
+                    continue;
+                }
+                task = taskQueues.get(deviceId).poll();
+                if (task == null) {
+                    // Next device.
+                    continue;
+                }
+                if (taskQueues.get(deviceId).isEmpty()) {
+                    pendingDevices.remove(deviceId);
+                }
+                busyDevices.add(deviceId);
+                return task;
+            } catch (ExecutionException e) {
+                log.warn("Exception while accessing task queue cache", e);
+            } finally {
+                deviceLocks.get(deviceId).unlock();
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Device task as stored in the task queue.
+     */
+    private class DeviceTask {
+
+        private final DeviceId deviceId;
+        private final T type;
+        private final Runnable runnable;
+
+        DeviceTask(DeviceId deviceId, T type, Runnable runnable) {
+            this.deviceId = deviceId;
+            this.type = type;
+            this.runnable = runnable;
+        }
+    }
+
+    /**
+     * A queue that keeps track of the last task added to detects back-to-back
+     * duplicates.
+     */
+    private class TaskQueue extends ConcurrentLinkedQueue<DeviceTask> {
+
+        private T lastTaskAdded;
+        private long lastAddedMillis;
+
+        @Override
+        public boolean offer(DeviceTask deviceTask) {
+            lastTaskAdded = deviceTask.type;
+            lastAddedMillis = currentTimeMillis();
+            return super.offer(deviceTask);
+        }
+
+        boolean isBackToBackDuplicate(T taskType) {
+            return lastTaskAdded != null
+                    && lastTaskAdded.equals(taskType)
+                    && (currentTimeMillis() - lastAddedMillis) <= DUPLICATE_MIN_INTERVAL_MILLIS;
+        }
+    }
+
+    /**
+     * Signals an error that prevented normal execution of the task.
+     */
+    static class DeviceTaskException extends RuntimeException {
+
+        /**
+         * Creates a new exception.
+         *
+         * @param message explanation
+         */
+        DeviceTaskException(String message) {
+            super(message);
+        }
+    }
+}