blob: f9b0173a6cf632babaf5e1593b13dabbbae57e9a [file] [log] [blame]
Madan Jampani35708a92016-07-06 10:48:19 -07001/*
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.service;
17
18import java.util.Arrays;
19import java.util.Collection;
20import java.util.concurrent.CompletableFuture;
21import java.util.concurrent.Executor;
22import java.util.function.Consumer;
23
24import com.google.common.collect.ImmutableList;
25
26/**
27 * Distributed Work Queue primitive.
28 * <p>
Yuta HIGUCHIe18a2ef2016-07-11 23:34:38 -070029 * Work queue serves as a buffer allowing producers to {@link #addMultiple(Collection) add} tasks and consumers
Madan Jampani35708a92016-07-06 10:48:19 -070030 * to {@link #take() take} tasks to process.
31 * <p>
32 * In the system each task is tracked via its unique task identifier which is returned when a task is taken.
33 * Work queue guarantees that a task can be taken by only one consumer at a time. Once it finishes processing a
34 * consumer must invoke the {@link #complete(Collection) complete} method to mark the task(s) as completed.
35 * Tasks thus completed are removed from the queue. If a consumer unexpectedly terminates before it can complete
36 * all its tasks are returned back to the queue so that other consumers can pick them up. Since there is a distinct
37 * possibility that tasks could be processed more than once (under failure conditions), care should be taken to ensure
38 * task processing logic is idempotent.
39 *
40 * @param <E> task payload type.
41 */
42public interface WorkQueue<E> {
43
44 /**
45 * Adds a collection of tasks to the work queue.
46 * @param items collection of task items
47 * @return future that is completed when the operation completes
48 */
49 CompletableFuture<Void> addMultiple(Collection<E> items);
50
51 /**
52 * Picks up multiple tasks from the work queue to work on.
53 * <p>
54 * Tasks that are taken remain invisible to other consumers as long as the consumer stays alive.
55 * If a consumer unexpectedly terminates before {@link #complete(String...) completing} the task,
56 * the task becomes visible again to other consumers to process.
57 * @param maxItems maximum number of items to take from the queue. The actual number of tasks returned
58 * can be at the max this number
59 * @return future for the tasks. The future can be completed with an empty collection if there are no
60 * unassigned tasks in the work queue
61 */
62 CompletableFuture<Collection<Task<E>>> take(int maxItems);
63
64 /**
65 * Completes a collection of tasks.
66 * @param taskIds ids of tasks to complete
67 * @return future that is completed when the operation completes
68 */
69 CompletableFuture<Void> complete(Collection<String> taskIds);
70
71 /**
72 * Registers a task processing callback to be automatically invoked when new tasks are
73 * added to the work queue.
74 * @param taskProcessor task processing callback
75 * @param parallelism max tasks that can be processed in parallel
76 * @param executor executor to use for processing the tasks
77 * @return future that is completed when the operation completes
78 */
79 CompletableFuture<Void> registerTaskProcessor(Consumer<E> taskProcessor,
80 int parallelism,
81 Executor executor);
82
83 /**
84 * Stops automatically processing tasks from work queue. This call nullifies the effect of a
85 * previous {@link #registerTaskProcessor registerTaskProcessor} call.
86 * @return future that is completed when the operation completes
87 */
88 CompletableFuture<Void> stopProcessing();
89
90 /**
91 * Returns work queue statistics.
92 * @return future that is completed with work queue stats when the operation completes
93 */
94 CompletableFuture<WorkQueueStats> stats();
95
96 /**
97 * Completes a collection of tasks.
98 * @param taskIds var arg list of task ids
99 * @return future that is completed when the operation completes
100 */
101 default CompletableFuture<Void> complete(String... taskIds) {
102 return complete(Arrays.asList(taskIds));
103 }
104
105 /**
106 * Adds a single task to the work queue.
107 * @param item task item
108 * @return future that is completed when the operation completes
109 */
110 default CompletableFuture<Void> addOne(E item) {
111 return addMultiple(ImmutableList.of(item));
112 }
113
114 /**
115 * Picks up a single task from the work queue to work on.
116 * <p>
117 * Tasks that are taken remain invisible to other consumers as long as the consumer stays alive.
118 * If a consumer unexpectedly terminates before {@link #complete(String...) completing} the task,
119 * the task becomes visible again to other consumers to process.
120 * @return future for the task. The future can be completed with null, if there are no
121 * unassigned tasks in the work queue
122 */
123 default CompletableFuture<Task<E>> take() {
124 return this.take(1).thenApply(tasks -> tasks.isEmpty() ? null : tasks.iterator().next());
125 }
126}