blob: bdc9d31daebe2872fb05ab125a471f25937eaf58 [file] [log] [blame]
Madan Jampani35708a92016-07-06 10:48:19 -07001package org.onosproject.store.primitives.impl;
2
3import java.util.ArrayList;
4import java.util.Collection;
5import java.util.concurrent.CompletableFuture;
6import java.util.concurrent.Executor;
7import java.util.function.Consumer;
8import java.util.stream.Collectors;
9
10import org.onosproject.store.service.WorkQueue;
11import org.onosproject.store.service.Serializer;
12import org.onosproject.store.service.Task;
13import org.onosproject.store.service.WorkQueueStats;
14
15import com.google.common.collect.Collections2;
16
17/**
18 * Default implementation of {@link WorkQueue}.
19 *
20 * @param <E> task payload type.
21 */
22public class DefaultDistributedWorkQueue<E> implements WorkQueue<E> {
23
24 private final WorkQueue<byte[]> backingQueue;
25 private final Serializer serializer;
26
27 public DefaultDistributedWorkQueue(WorkQueue<byte[]> backingQueue, Serializer serializer) {
28 this.backingQueue = backingQueue;
29 this.serializer = serializer;
30 }
31
32 @Override
Madan Jampani819d61d2016-07-25 20:29:43 -070033 public String name() {
34 return backingQueue.name();
35 }
36
37 @Override
Madan Jampani35708a92016-07-06 10:48:19 -070038 public CompletableFuture<Void> addMultiple(Collection<E> items) {
39 return backingQueue.addMultiple(items.stream()
40 .map(serializer::encode)
41 .collect(Collectors.toCollection(ArrayList::new)));
42 }
43
Sho SHIMIZU431cafd2016-08-12 15:00:24 -070044 private Collection<Task<E>> decodeCollection(Collection<Task<byte[]>> tasks) {
Yuta HIGUCHIaa2f2f62016-07-25 14:26:11 -070045 return Collections2.transform(tasks, task -> task.map(serializer::decode));
46 }
47
Madan Jampani35708a92016-07-06 10:48:19 -070048 @Override
49 public CompletableFuture<Collection<Task<E>>> take(int maxTasks) {
50 return backingQueue.take(maxTasks)
Yuta HIGUCHIaa2f2f62016-07-25 14:26:11 -070051 .thenApply(this::decodeCollection);
Madan Jampani35708a92016-07-06 10:48:19 -070052 }
53
54 @Override
55 public CompletableFuture<Void> complete(Collection<String> ids) {
56 return backingQueue.complete(ids);
57 }
58
59 @Override
60 public CompletableFuture<WorkQueueStats> stats() {
61 return backingQueue.stats();
62 }
63
64 @Override
65 public CompletableFuture<Void> registerTaskProcessor(Consumer<E> callback,
66 int parallelism,
67 Executor executor) {
68 Consumer<byte[]> backingQueueCallback = payload -> callback.accept(serializer.decode(payload));
69 return backingQueue.registerTaskProcessor(backingQueueCallback, parallelism, executor);
70 }
71
72 @Override
73 public CompletableFuture<Void> stopProcessing() {
74 return backingQueue.stopProcessing();
75 }
76}