Distributed work queue primitive
Change-Id: Ia8e531e6611ec502399edec376ccc00522e47994
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
index 175e253..f07f838 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
@@ -33,6 +33,8 @@
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapFactory;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorFactory;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueFactory;
import org.onosproject.store.primitives.resources.impl.CommitResult;
import org.onosproject.store.primitives.resources.impl.MapEntryUpdateResult;
import org.onosproject.store.primitives.resources.impl.PrepareResult;
@@ -40,8 +42,11 @@
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.Task;
import org.onosproject.store.service.Versioned;
+import org.onosproject.store.service.WorkQueueStats;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
/**
@@ -81,15 +86,20 @@
serializer.register(MapTransaction.class, factory);
serializer.register(Versioned.class, factory);
serializer.register(MapEvent.class, factory);
+ serializer.register(Task.class, factory);
+ serializer.register(WorkQueueStats.class, factory);
serializer.register(Maps.immutableEntry("a", "b").getClass(), factory);
+ serializer.register(ImmutableList.of().getClass(), factory);
serializer.resolve(new LongCommands.TypeResolver());
serializer.resolve(new AtomixConsistentMapCommands.TypeResolver());
serializer.resolve(new AtomixLeaderElectorCommands.TypeResolver());
+ serializer.resolve(new AtomixWorkQueueCommands.TypeResolver());
serializer.resolve(new ResourceManagerTypeResolver());
serializer.registerClassLoader(AtomixConsistentMapFactory.class)
- .registerClassLoader(AtomixLeaderElectorFactory.class);
+ .registerClassLoader(AtomixLeaderElectorFactory.class)
+ .registerClassLoader(AtomixWorkQueueFactory.class);
return serializer;
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultCatalystTypeSerializerFactory.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultCatalystTypeSerializerFactory.java
index 88d65d5..b0d6841 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultCatalystTypeSerializerFactory.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultCatalystTypeSerializerFactory.java
@@ -54,20 +54,19 @@
}
@Override
- public void write(T object, BufferOutput buffer,
- io.atomix.catalyst.serializer.Serializer serializer) {
+ public void write(T object, BufferOutput buffer, io.atomix.catalyst.serializer.Serializer serializer) {
try {
byte[] payload = this.serializer.encode(object);
buffer.writeInt(payload.length);
buffer.write(payload);
} catch (Exception e) {
log.warn("Failed to serialize {}", object, e);
+ throw Throwables.propagate(e); // rethrow after logging so the failure is no longer swallowed
}
}
@Override
- public T read(Class<T> type, BufferInput buffer,
- io.atomix.catalyst.serializer.Serializer serializer) {
+ public T read(Class<T> type, BufferInput buffer, io.atomix.catalyst.serializer.Serializer serializer) {
int size = buffer.readInt();
try {
byte[] payload = new byte[size];
@@ -75,8 +74,7 @@
return this.serializer.decode(payload);
} catch (Exception e) {
log.warn("Failed to deserialize as type {}. Payload size: {}", type, size, e);
- Throwables.propagate(e);
- return null;
+ throw Throwables.propagate(e);
}
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedWorkQueue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedWorkQueue.java
new file mode 100644
index 0000000..549cfc6
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedWorkQueue.java
@@ -0,0 +1,82 @@
+package org.onosproject.store.primitives.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.onosproject.store.service.WorkQueue;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.Task;
+import org.onosproject.store.service.WorkQueueStats;
+
+import com.google.common.collect.Collections2;
+
+/**
+ * Default implementation of {@link WorkQueue}.
+ * <p>
+ * Wraps a {@code WorkQueue<byte[]>}: payloads are encoded with the supplied
+ * {@link Serializer} before being added and decoded again when tasks are handed out.
+ *
+ * @param <E> task payload type.
+ */
+public class DefaultDistributedWorkQueue<E> implements WorkQueue<E> {
+
+    private final WorkQueue<byte[]> backingQueue;
+    private final Serializer serializer;
+
+    /**
+     * Creates a typed work queue on top of a byte-array work queue.
+     *
+     * @param backingQueue queue holding the encoded payloads
+     * @param serializer serializer used to encode and decode payloads
+     */
+    public DefaultDistributedWorkQueue(WorkQueue<byte[]> backingQueue, Serializer serializer) {
+        this.backingQueue = backingQueue;
+        this.serializer = serializer;
+    }
+
+    @Override
+    public CompletableFuture<Void> addMultiple(Collection<E> items) {
+        // Encode every item up front; the backing queue only ever sees byte arrays.
+        return backingQueue.addMultiple(items.stream()
+                .map(serializer::encode)
+                .collect(Collectors.toCollection(ArrayList::new)));
+    }
+
+    @Override
+    public CompletableFuture<Collection<Task<E>>> take(int maxTasks) {
+        // Collections2.transform yields a lazy view that re-applies the mapping on every
+        // traversal. Copy it once so task.map(...) runs a single time per task and any
+        // failure completes the returned future exceptionally instead of surfacing later.
+        return backingQueue.take(maxTasks)
+                .thenApply(tasks -> new ArrayList<Task<E>>(
+                        Collections2.transform(tasks, task -> task.<E>map(serializer::decode))));
+    }
+
+    @Override
+    public CompletableFuture<Void> complete(Collection<String> ids) {
+        return backingQueue.complete(ids);
+    }
+
+    @Override
+    public CompletableFuture<WorkQueueStats> stats() {
+        return backingQueue.stats();
+    }
+
+    @Override
+    public CompletableFuture<Void> registerTaskProcessor(Consumer<E> callback,
+                                                         int parallelism,
+                                                         Executor executor) {
+        // Decode each raw payload before handing it to the typed callback.
+        Consumer<byte[]> backingQueueCallback = payload -> callback.accept(serializer.decode(payload));
+        return backingQueue.registerTaskProcessor(backingQueueCallback, parallelism, executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> stopProcessing() {
+        return backingQueue.stopProcessing();
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
index e4b6f9a..a095de1 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
@@ -30,6 +30,7 @@
import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.AsyncLeaderElector;
import org.onosproject.store.service.DistributedQueue;
+import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.Serializer;
import com.google.common.base.Charsets;
@@ -101,6 +102,11 @@
}
@Override
+ public <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer) {
+ return getCreator(name).newWorkQueue(name, serializer); // delegate to the creator chosen for this name
+ }
+
+ @Override
public Set<String> getAsyncConsistentMapNames() {
return members.values()
.stream()
@@ -118,6 +124,15 @@
.orElse(ImmutableSet.of());
}
+ @Override
+ public Set<String> getWorkQueueNames() {
+ return members.values()
+ .stream()
+ .map(DistributedPrimitiveCreator::getWorkQueueNames) // names reported by each member
+ .reduce(Sets::union) // merged into a single set
+ .orElse(ImmutableSet.of()); // no members: empty set
+ }
+
/**
* Returns the {@code DistributedPrimitiveCreator} to use for hosting a primitive.
* @param name primitive name
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index 4def1e9..8eb138a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -46,6 +46,7 @@
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.DistributedQueueBuilder;
import org.onosproject.store.service.DistributedSetBuilder;
+import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
import org.onosproject.store.service.LeaderElectorBuilder;
import org.onosproject.store.service.MapInfo;
@@ -54,6 +55,7 @@
import org.onosproject.store.service.StorageAdminService;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.TransactionContextBuilder;
+import org.onosproject.store.service.WorkQueueStats;
import org.slf4j.Logger;
import com.google.common.collect.Maps;
@@ -171,6 +173,12 @@
}
@Override
+ public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
+ checkPermission(STORAGE_WRITE); // checked before the queue is created
+ return federatedPrimitiveCreator.newWorkQueue(name, serializer);
+ }
+
+ @Override
public List<MapInfo> getMapInfo() {
return listMapInfo(federatedPrimitiveCreator);
}
@@ -185,6 +193,18 @@
}
@Override
+ public Map<String, WorkQueueStats> getQueueStats() {
+ Map<String, WorkQueueStats> workQueueStats = Maps.newConcurrentMap(); // keyed by queue name
+ federatedPrimitiveCreator.getWorkQueueNames()
+ .forEach(name -> workQueueStats.put(name,
+ federatedPrimitiveCreator.newWorkQueue(name,
+ Serializer.using(KryoNamespaces.BASIC))
+ .stats()
+ .join())); // blocks on each queue's stats in turn
+ return workQueueStats;
+ }
+
+ @Override
public List<PartitionInfo> getPartitionInfo() {
return partitionAdminService.partitionInfo();
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index 0c1d8a6..d635a79 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -41,6 +41,7 @@
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
import org.onosproject.store.primitives.resources.impl.AtomixCounter;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElector;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueue;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncAtomicCounter;
import org.onosproject.store.service.AsyncAtomicValue;
@@ -49,6 +50,7 @@
import org.onosproject.store.service.AsyncLeaderElector;
import org.onosproject.store.service.DistributedPrimitive.Status;
import org.onosproject.store.service.DistributedQueue;
+import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.PartitionClientInfo;
import org.onosproject.store.service.Serializer;
import org.slf4j.Logger;
@@ -159,11 +161,16 @@
@Override
public <E> DistributedQueue<E> newDistributedQueue(String name, Serializer serializer) {
- // TODO: Implement
throw new UnsupportedOperationException();
}
@Override
+ public <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer) { // waits on getResource via join()
+ AtomixWorkQueue workQueue = client.getResource(name, AtomixWorkQueue.class).join();
+ return new DefaultDistributedWorkQueue<>(workQueue, serializer); // wrap the resource with the serializer
+ }
+
+ @Override
public AsyncLeaderElector newAsyncLeaderElector(String name) {
AtomixLeaderElector leaderElector = client.getResource(name, AtomixLeaderElector.class)
.thenCompose(AtomixLeaderElector::setupCache)
@@ -187,6 +194,11 @@
}
@Override
+ public Set<String> getWorkQueueNames() {
+ return client.keys(AtomixWorkQueue.class).join(); // join() waits for the key lookup result
+ }
+
+ @Override
public boolean isOpen() {
return resourceClient.client().state() != State.CLOSED;
}