| /* |
| * Copyright 2019-present Open Networking Foundation |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.onosproject.provider.general.device.impl; |
| |
| import com.google.common.cache.CacheBuilder; |
| import com.google.common.cache.CacheLoader; |
| import com.google.common.cache.LoadingCache; |
| import com.google.common.cache.RemovalListener; |
| import com.google.common.collect.Sets; |
| import com.google.common.util.concurrent.Striped; |
| import org.onosproject.net.DeviceId; |
| import org.slf4j.Logger; |
| |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.locks.Lock; |
| |
| import static com.google.common.base.Preconditions.checkNotNull; |
| import static java.lang.String.format; |
| import static java.lang.System.currentTimeMillis; |
| import static org.slf4j.LoggerFactory.getLogger; |
| |
| /** |
| * Allows submitting tasks related to a specific device. Takes care of executing |
| * pending tasks sequentially for each device in a FIFO order, while using a |
| * delegate executor. It also avoids executing duplicate tasks when arriving |
| * back-to-back. |
| * |
| * @param <T> enum describing the type of task |
| */ |
| |
| class DeviceTaskExecutor<T extends Enum> { |
| /** |
| * Minimum interval between duplicate back-to-back tasks. |
| */ |
| private static final int DUPLICATE_MIN_INTERVAL_MILLIS = 1000; |
| |
| private final Logger log = getLogger(getClass()); |
| |
| private final ExecutorService delegate; |
| private final AtomicBoolean canceled = new AtomicBoolean(false); |
| private final Set<DeviceId> busyDevices = Sets.newConcurrentHashSet(); |
| private final Set<DeviceId> pendingDevices = Sets.newConcurrentHashSet(); |
| private final Striped<Lock> deviceLocks = Striped.lock(30); |
| private final LoadingCache<DeviceId, TaskQueue> taskQueues = CacheBuilder.newBuilder() |
| .expireAfterAccess(1, TimeUnit.MINUTES) |
| .removalListener((RemovalListener<DeviceId, TaskQueue>) notification -> { |
| if (!notification.getValue().isEmpty()) { |
| log.warn("Cache evicted non-empty task queue for {} ({} pending tasks)", |
| notification.getKey(), notification.getValue().size()); |
| } |
| }) |
| .build(new CacheLoader<DeviceId, TaskQueue>() { |
| @SuppressWarnings("NullableProblems") |
| @Override |
| public TaskQueue load(DeviceId deviceId) { |
| return new TaskQueue(); |
| } |
| }); |
| |
| /** |
| * Creates a new executor with the given delegate executor service. |
| * |
| * @param delegate executor service |
| */ |
| DeviceTaskExecutor(ExecutorService delegate) { |
| checkNotNull(delegate); |
| this.delegate = delegate; |
| } |
| |
| /** |
| * Submit a tasks. |
| * |
| * @param deviceId device associated with the task |
| * @param type type of task (used to remove eventual back-to-back |
| * duplicates) |
| * @param runnable runnable to execute |
| */ |
| void submit(DeviceId deviceId, T type, Runnable runnable) { |
| checkNotNull(deviceId); |
| checkNotNull(type); |
| checkNotNull(runnable); |
| |
| if (canceled.get()) { |
| log.warn("Executor was cancelled, cannot submit task {} for {}", |
| type, deviceId); |
| return; |
| } |
| |
| final DeviceTask task = new DeviceTask(deviceId, type, runnable); |
| deviceLocks.get(deviceId).lock(); |
| try { |
| if (taskQueues.get(deviceId).isBackToBackDuplicate(type)) { |
| if (log.isDebugEnabled()) { |
| log.debug("Dropping back-to-back duplicate task {} for {}", |
| type, deviceId); |
| } |
| return; |
| } |
| if (taskQueues.get(deviceId).offer(task)) { |
| pendingDevices.add(deviceId); |
| if (!busyDevices.contains(deviceId)) { |
| // The task was submitted to the queue and we are not |
| // performing any other task for this device. There is at |
| // least one task that is ready to be executed. |
| delegate.execute(this::performTaskIfAny); |
| } |
| } else { |
| log.warn("Unable to submit task {} for {}", |
| task.type, task.deviceId); |
| } |
| } catch (ExecutionException e) { |
| log.warn("Exception while accessing task queue cache", e); |
| } finally { |
| deviceLocks.get(task.deviceId).unlock(); |
| } |
| } |
| |
| /** |
| * Prevents the executor from executing any more tasks. |
| */ |
| void cancel() { |
| canceled.set(true); |
| } |
| |
| private void performTaskIfAny() { |
| final DeviceTask task = pollTask(); |
| if (task == null) { |
| // No tasks. |
| return; |
| } |
| if (canceled.get()) { |
| log.warn("Executor was cancelled, dropping task {} for {}", |
| task.type, task.deviceId); |
| return; |
| } |
| if (log.isTraceEnabled()) { |
| log.trace("STARTING task {} for {}...", task.type.name(), task.deviceId); |
| } |
| try { |
| task.runnable.run(); |
| } catch (DeviceTaskException e) { |
| log.error("Unable to complete task {} for {}: {}", |
| task.type, task.deviceId, e.getMessage()); |
| } catch (Throwable t) { |
| log.error(format( |
| "Uncaught exception when executing task %s for %s", |
| task.type, task.deviceId), t); |
| } |
| if (log.isTraceEnabled()) { |
| log.trace("COMPLETED task {} for {}", task.type.name(), task.deviceId); |
| } |
| busyDevices.remove(task.deviceId); |
| delegate.execute(this::performTaskIfAny); |
| } |
| |
| private DeviceTask pollTask() { |
| for (DeviceId deviceId : pendingDevices) { |
| final DeviceTask task; |
| deviceLocks.get(deviceId).lock(); |
| try { |
| if (busyDevices.contains(deviceId)) { |
| // Next device. |
| continue; |
| } |
| task = taskQueues.get(deviceId).poll(); |
| if (task == null) { |
| // Next device. |
| continue; |
| } |
| if (taskQueues.get(deviceId).isEmpty()) { |
| pendingDevices.remove(deviceId); |
| } |
| busyDevices.add(deviceId); |
| return task; |
| } catch (ExecutionException e) { |
| log.warn("Exception while accessing task queue cache", e); |
| } finally { |
| deviceLocks.get(deviceId).unlock(); |
| } |
| } |
| return null; |
| } |
| |
| /** |
| * Device task as stored in the task queue. |
| */ |
| private class DeviceTask { |
| |
| private final DeviceId deviceId; |
| private final T type; |
| private final Runnable runnable; |
| |
| DeviceTask(DeviceId deviceId, T type, Runnable runnable) { |
| this.deviceId = deviceId; |
| this.type = type; |
| this.runnable = runnable; |
| } |
| } |
| |
| /** |
| * A queue that keeps track of the last task added to detects back-to-back |
| * duplicates. |
| */ |
| private class TaskQueue extends ConcurrentLinkedQueue<DeviceTask> { |
| |
| private T lastTaskAdded; |
| private long lastAddedMillis; |
| |
| @Override |
| public boolean offer(DeviceTask deviceTask) { |
| lastTaskAdded = deviceTask.type; |
| lastAddedMillis = currentTimeMillis(); |
| return super.offer(deviceTask); |
| } |
| |
| boolean isBackToBackDuplicate(T taskType) { |
| return lastTaskAdded != null |
| && lastTaskAdded.equals(taskType) |
| && (currentTimeMillis() - lastAddedMillis) <= DUPLICATE_MIN_INTERVAL_MILLIS; |
| } |
| } |
| |
| /** |
| * Signals an error that prevented normal execution of the task. |
| */ |
| static class DeviceTaskException extends RuntimeException { |
| |
| /** |
| * Creates a new exception. |
| * |
| * @param message explanation |
| */ |
| DeviceTaskException(String message) { |
| super(message); |
| } |
| } |
| } |