blob: 549cfc60196cee25d48674661a63ff8d6729c925 [file] [log] [blame]
Madan Jampani35708a92016-07-06 10:48:19 -07001package org.onosproject.store.primitives.impl;
2
3import java.util.ArrayList;
4import java.util.Collection;
5import java.util.concurrent.CompletableFuture;
6import java.util.concurrent.Executor;
7import java.util.function.Consumer;
8import java.util.stream.Collectors;
9
10import org.onosproject.store.service.WorkQueue;
11import org.onosproject.store.service.Serializer;
12import org.onosproject.store.service.Task;
13import org.onosproject.store.service.WorkQueueStats;
14
15import com.google.common.collect.Collections2;
16
17/**
18 * Default implementation of {@link WorkQueue}.
19 *
20 * @param <E> task payload type.
21 */
22public class DefaultDistributedWorkQueue<E> implements WorkQueue<E> {
23
24 private final WorkQueue<byte[]> backingQueue;
25 private final Serializer serializer;
26
27 public DefaultDistributedWorkQueue(WorkQueue<byte[]> backingQueue, Serializer serializer) {
28 this.backingQueue = backingQueue;
29 this.serializer = serializer;
30 }
31
32 @Override
33 public CompletableFuture<Void> addMultiple(Collection<E> items) {
34 return backingQueue.addMultiple(items.stream()
35 .map(serializer::encode)
36 .collect(Collectors.toCollection(ArrayList::new)));
37 }
38
39 @Override
40 public CompletableFuture<Collection<Task<E>>> take(int maxTasks) {
41 return backingQueue.take(maxTasks)
42 .thenApply(tasks -> Collections2.transform(tasks, task -> task.<E>map(serializer::decode)));
43 }
44
45 @Override
46 public CompletableFuture<Void> complete(Collection<String> ids) {
47 return backingQueue.complete(ids);
48 }
49
50 @Override
51 public CompletableFuture<WorkQueueStats> stats() {
52 return backingQueue.stats();
53 }
54
55 @Override
56 public CompletableFuture<Void> registerTaskProcessor(Consumer<E> callback,
57 int parallelism,
58 Executor executor) {
59 Consumer<byte[]> backingQueueCallback = payload -> callback.accept(serializer.decode(payload));
60 return backingQueue.registerTaskProcessor(backingQueueCallback, parallelism, executor);
61 }
62
63 @Override
64 public CompletableFuture<Void> stopProcessing() {
65 return backingQueue.stopProcessing();
66 }
67}