blob: 2f604d395c7ca5a644641b5a85e272d960f749e6 [file] [log] [blame]
Madan Jampani35708a92016-07-06 10:48:19 -07001package org.onosproject.store.primitives.impl;
2
3import java.util.ArrayList;
4import java.util.Collection;
5import java.util.concurrent.CompletableFuture;
6import java.util.concurrent.Executor;
7import java.util.function.Consumer;
8import java.util.stream.Collectors;
9
10import org.onosproject.store.service.WorkQueue;
11import org.onosproject.store.service.Serializer;
12import org.onosproject.store.service.Task;
13import org.onosproject.store.service.WorkQueueStats;
14
15import com.google.common.collect.Collections2;
16
17/**
18 * Default implementation of {@link WorkQueue}.
19 *
20 * @param <E> task payload type.
21 */
22public class DefaultDistributedWorkQueue<E> implements WorkQueue<E> {
23
24 private final WorkQueue<byte[]> backingQueue;
25 private final Serializer serializer;
26
27 public DefaultDistributedWorkQueue(WorkQueue<byte[]> backingQueue, Serializer serializer) {
28 this.backingQueue = backingQueue;
29 this.serializer = serializer;
30 }
31
32 @Override
33 public CompletableFuture<Void> addMultiple(Collection<E> items) {
34 return backingQueue.addMultiple(items.stream()
35 .map(serializer::encode)
36 .collect(Collectors.toCollection(ArrayList::new)));
37 }
38
Yuta HIGUCHIaa2f2f62016-07-25 14:26:11 -070039 private final Collection<Task<E>> decodeCollection(Collection<Task<byte[]>> tasks) {
40 return Collections2.transform(tasks, task -> task.map(serializer::decode));
41 }
42
Madan Jampani35708a92016-07-06 10:48:19 -070043 @Override
44 public CompletableFuture<Collection<Task<E>>> take(int maxTasks) {
45 return backingQueue.take(maxTasks)
Yuta HIGUCHIaa2f2f62016-07-25 14:26:11 -070046 .thenApply(this::decodeCollection);
Madan Jampani35708a92016-07-06 10:48:19 -070047 }
48
49 @Override
50 public CompletableFuture<Void> complete(Collection<String> ids) {
51 return backingQueue.complete(ids);
52 }
53
54 @Override
55 public CompletableFuture<WorkQueueStats> stats() {
56 return backingQueue.stats();
57 }
58
59 @Override
60 public CompletableFuture<Void> registerTaskProcessor(Consumer<E> callback,
61 int parallelism,
62 Executor executor) {
63 Consumer<byte[]> backingQueueCallback = payload -> callback.accept(serializer.decode(payload));
64 return backingQueue.registerTaskProcessor(backingQueueCallback, parallelism, executor);
65 }
66
67 @Override
68 public CompletableFuture<Void> stopProcessing() {
69 return backingQueue.stopProcessing();
70 }
71}