blob: 99eaa03ee70e91fbd855b72ad07227be199cd430 [file] [log] [blame]
Madan Jampani35708a92016-07-06 10:48:19 -07001/*
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.service;
17
18import java.util.Arrays;
19import java.util.Collection;
20import java.util.concurrent.CompletableFuture;
21import java.util.concurrent.Executor;
22import java.util.function.Consumer;
23
24import com.google.common.collect.ImmutableList;
25
/**
 * Distributed Work Queue primitive.
 * <p>
 * Work queue serves as a buffer allowing producers to {@link #addMultiple(Collection) add} tasks and consumers
 * to {@link #take() take} tasks to process.
 * <p>
 * In the system each task is tracked via its unique task identifier, which is returned when a task is taken.
 * Work queue guarantees that a task can be taken by only one consumer at a time. Once it finishes processing,
 * a consumer must invoke the {@link #complete(Collection) complete} method to mark the task(s) as completed.
 * Tasks thus completed are removed from the queue. If a consumer unexpectedly terminates before it can complete
 * all its tasks, those tasks are returned to the queue so that other consumers can pick them up. Since there is
 * a distinct possibility that tasks could be processed more than once (under failure conditions), care should be
 * taken to ensure task processing logic is idempotent.
 *
 * @param <E> task payload type.
 */
Madan Jampani819d61d2016-07-25 20:29:43 -070042public interface WorkQueue<E> extends DistributedPrimitive {
43
44 @Override
45 default DistributedPrimitive.Type primitiveType() {
46 return DistributedPrimitive.Type.WORK_QUEUE;
47 }
Madan Jampani35708a92016-07-06 10:48:19 -070048
49 /**
50 * Adds a collection of tasks to the work queue.
51 * @param items collection of task items
52 * @return future that is completed when the operation completes
53 */
54 CompletableFuture<Void> addMultiple(Collection<E> items);
55
56 /**
57 * Picks up multiple tasks from the work queue to work on.
58 * <p>
59 * Tasks that are taken remain invisible to other consumers as long as the consumer stays alive.
60 * If a consumer unexpectedly terminates before {@link #complete(String...) completing} the task,
61 * the task becomes visible again to other consumers to process.
62 * @param maxItems maximum number of items to take from the queue. The actual number of tasks returned
63 * can be at the max this number
64 * @return future for the tasks. The future can be completed with an empty collection if there are no
65 * unassigned tasks in the work queue
66 */
67 CompletableFuture<Collection<Task<E>>> take(int maxItems);
68
69 /**
70 * Completes a collection of tasks.
71 * @param taskIds ids of tasks to complete
72 * @return future that is completed when the operation completes
73 */
74 CompletableFuture<Void> complete(Collection<String> taskIds);
75
76 /**
77 * Registers a task processing callback to be automatically invoked when new tasks are
78 * added to the work queue.
79 * @param taskProcessor task processing callback
80 * @param parallelism max tasks that can be processed in parallel
81 * @param executor executor to use for processing the tasks
82 * @return future that is completed when the operation completes
83 */
84 CompletableFuture<Void> registerTaskProcessor(Consumer<E> taskProcessor,
85 int parallelism,
86 Executor executor);
87
88 /**
89 * Stops automatically processing tasks from work queue. This call nullifies the effect of a
90 * previous {@link #registerTaskProcessor registerTaskProcessor} call.
91 * @return future that is completed when the operation completes
92 */
93 CompletableFuture<Void> stopProcessing();
94
95 /**
96 * Returns work queue statistics.
97 * @return future that is completed with work queue stats when the operation completes
98 */
99 CompletableFuture<WorkQueueStats> stats();
100
101 /**
102 * Completes a collection of tasks.
103 * @param taskIds var arg list of task ids
104 * @return future that is completed when the operation completes
105 */
106 default CompletableFuture<Void> complete(String... taskIds) {
107 return complete(Arrays.asList(taskIds));
108 }
109
110 /**
111 * Adds a single task to the work queue.
112 * @param item task item
113 * @return future that is completed when the operation completes
114 */
115 default CompletableFuture<Void> addOne(E item) {
116 return addMultiple(ImmutableList.of(item));
117 }
118
119 /**
120 * Picks up a single task from the work queue to work on.
121 * <p>
122 * Tasks that are taken remain invisible to other consumers as long as the consumer stays alive.
123 * If a consumer unexpectedly terminates before {@link #complete(String...) completing} the task,
124 * the task becomes visible again to other consumers to process.
125 * @return future for the task. The future can be completed with null, if there are no
126 * unassigned tasks in the work queue
127 */
128 default CompletableFuture<Task<E>> take() {
129 return this.take(1).thenApply(tasks -> tasks.isEmpty() ? null : tasks.iterator().next());
130 }
131}