ONOS-2440: Simplify DistributedQueue implementation by leveraging state change notification support

Change-Id: Id0a48f07535d8b7e1d0f964bd1c0623ca81d4605
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
index 1bccf2e..b7c3794 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -16,7 +16,6 @@
 
 package org.onosproject.store.consistent.impl;
 
-import com.google.common.base.Charsets;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
@@ -49,8 +48,6 @@
 import org.apache.felix.scr.annotations.ReferencePolicy;
 import org.apache.felix.scr.annotations.Service;
 
-import static org.onlab.util.Tools.groupedThreads;
-
 import org.onosproject.app.ApplicationEvent;
 import org.onosproject.app.ApplicationListener;
 import org.onosproject.app.ApplicationService;
@@ -61,7 +58,6 @@
 import org.onosproject.store.cluster.impl.ClusterDefinitionManager;
 import org.onosproject.store.cluster.impl.NodeInfo;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.ecmap.EventuallyConsistentMapBuilderImpl;
 import org.onosproject.store.service.AtomicCounterBuilder;
 import org.onosproject.store.service.AtomicValueBuilder;
@@ -86,7 +82,6 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -112,8 +107,6 @@
     private static final int RAFT_ELECTION_TIMEOUT_MILLIS = 3000;
     private static final int DATABASE_OPERATION_TIMEOUT_MILLIS = 5000;
 
-    protected static final MessageSubject QUEUE_UPDATED_TOPIC = new MessageSubject("distributed-queue-updated");
-
     private ClusterCoordinator coordinator;
     protected PartitionedDatabase partitionedDatabase;
     protected Database inMemoryDatabase;
@@ -122,15 +115,12 @@
     private TransactionManager transactionManager;
     private final IdGenerator transactionIdGenerator = () -> RandomUtils.nextLong();
 
-    private ExecutorService eventDispatcher;
-    private ExecutorService queuePollExecutor;
     private ApplicationListener appListener = new InternalApplicationListener();
 
     private final Multimap<String, DefaultAsyncConsistentMap> maps =
             Multimaps.synchronizedMultimap(ArrayListMultimap.create());
     private final Multimap<ApplicationId, DefaultAsyncConsistentMap> mapsByApplication =
             Multimaps.synchronizedMultimap(ArrayListMultimap.create());
-    private final Map<String, DefaultDistributedQueue> queues = Maps.newConcurrentMap();
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
@@ -237,21 +227,6 @@
         transactionManager = new TransactionManager(partitionedDatabase, consistentMapBuilder());
         partitionedDatabase.setTransactionManager(transactionManager);
 
-        eventDispatcher = Executors.newSingleThreadExecutor(
-                groupedThreads("onos/store/manager", "map-event-dispatcher"));
-
-        queuePollExecutor = Executors.newFixedThreadPool(4,
-                groupedThreads("onos/store/manager", "queue-poll-handler"));
-
-        clusterCommunicator.<String>addSubscriber(QUEUE_UPDATED_TOPIC,
-                data -> new String(data, Charsets.UTF_8),
-                name -> {
-                    DefaultDistributedQueue q = queues.get(name);
-                    if (q != null) {
-                        q.tryPoll();
-                    }
-                },
-                queuePollExecutor);
         log.info("Started");
     }
 
@@ -277,13 +252,10 @@
                     log.info("Successfully closed databases.");
                 }
             });
-        clusterCommunicator.removeSubscriber(QUEUE_UPDATED_TOPIC);
         maps.values().forEach(this::unregisterMap);
         if (applicationService != null) {
             applicationService.removeListener(appListener);
         }
-        eventDispatcher.shutdown();
-        queuePollExecutor.shutdown();
         log.info("Stopped");
     }
 
@@ -467,13 +439,6 @@
         }
     }
 
-    protected <E> void registerQueue(DefaultDistributedQueue<E> queue) {
-        // TODO: Support multiple local instances of the same queue.
-        if (queues.putIfAbsent(queue.name(), queue) != null) {
-            throw new IllegalStateException("Queue by name " + queue.name() + " already exists");
-        }
-    }
-
     private class InternalApplicationListener implements ApplicationListener {
         @Override
         public void event(ApplicationEvent event) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
index 08317b5..95f9e39 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
@@ -21,7 +21,6 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
-import org.onosproject.cluster.NodeId;
 import org.onosproject.store.service.Transaction;
 import org.onosproject.store.service.Versioned;
 
@@ -168,17 +167,16 @@
      * Inserts an entry into the queue.
      * @param queueName queue name
      * @param entry queue entry
-     * @return set of nodes to notify about the queue update
+     * @return future that is completed once the entry has been pushed onto the queue
      */
-    CompletableFuture<Set<NodeId>> queuePush(String queueName, byte[] entry);
+    CompletableFuture<Void> queuePush(String queueName, byte[] entry);
 
     /**
      * Removes an entry from the queue if the queue is non-empty.
      * @param queueName queue name
-     * @param nodeId If the queue is empty the identifier of node to notify when an entry becomes available
-     * @return entry. Can be null if queue is empty
+     * @return future for the removed entry; it is completed with null if the queue is empty
      */
-    CompletableFuture<byte[]> queuePop(String queueName, NodeId nodeId);
+    CompletableFuture<byte[]> queuePop(String queueName);
 
     /**
      * Returns but does not remove an entry from the queue.
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
index 8b6db1e..b3dd1c4 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
@@ -21,7 +21,6 @@
 import java.util.Map.Entry;
 import java.util.Set;
 
-import org.onosproject.cluster.NodeId;
 import org.onosproject.store.service.Transaction;
 import org.onosproject.store.service.Versioned;
 
@@ -93,10 +92,10 @@
   byte[] queuePeek(String queueName);
 
   @Command
-  byte[] queuePop(String queueName, NodeId requestor);
+  byte[] queuePop(String queueName);
 
   @Command
-  Set<NodeId> queuePush(String queueName, byte[] entry);
+  void queuePush(String queueName, byte[] entry);
 
   @Query
   Long counterGet(String counterName);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
index c9311c9..9082ba6 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
@@ -47,7 +47,7 @@
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.MAP;
+import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.MAP_UPDATE;
 import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.TX_COMMIT;
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -122,7 +122,7 @@
         this.purgeOnUninstall = purgeOnUninstall;
         this.database.registerConsumer(update -> {
             SharedExecutors.getSingleThreadExecutor().execute(() -> {
-                if (update.target() == MAP) {
+                if (update.target() == MAP_UPDATE) {
                     Result<UpdateResult<String, byte[]>> result = update.output();
                     if (result.success() && result.value().mapName().equals(name)) {
                         MapEvent<K, V> mapEvent = result.value().<K, V>map(this::dK, serializer::decode).toMapEvent();
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
index ba0b1be..4d9776e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
@@ -30,7 +30,6 @@
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
-import org.onosproject.cluster.NodeId;
 import org.onosproject.store.service.Transaction;
 import org.onosproject.store.service.Versioned;
 
@@ -159,13 +158,13 @@
     }
 
     @Override
-    public CompletableFuture<Set<NodeId>> queuePush(String queueName, byte[] entry) {
+    public CompletableFuture<Void> queuePush(String queueName, byte[] entry) {
         return checkOpen(() -> proxy.queuePush(queueName, entry));
     }
 
     @Override
-    public CompletableFuture<byte[]> queuePop(String queueName, NodeId nodeId) {
-        return checkOpen(() -> proxy.queuePop(queueName, nodeId));
+    public CompletableFuture<byte[]> queuePop(String queueName) {
+        return checkOpen(() -> proxy.queuePop(queueName));
     }
 
     @Override
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
index 219b847..9d3505b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
@@ -18,7 +18,6 @@
 
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -27,7 +26,6 @@
 import java.util.stream.Collectors;
 import java.util.Set;
 
-import org.onosproject.cluster.NodeId;
 import org.onosproject.store.service.DatabaseUpdate;
 import org.onosproject.store.service.Transaction;
 import org.onosproject.store.service.Versioned;
@@ -48,7 +46,6 @@
     private Map<String, AtomicLong> counters;
     private Map<String, Map<String, Versioned<byte[]>>> maps;
     private Map<String, Queue<byte[]>> queues;
-    private Map<String, Set<NodeId>> queueUpdateNotificationTargets;
 
     /**
      * This locks map has a structure similar to the "tables" map above and
@@ -85,11 +82,6 @@
             queues = Maps.newConcurrentMap();
             context.put("queues", queues);
         }
-        queueUpdateNotificationTargets = context.get("queueUpdateNotificationTargets");
-        if (queueUpdateNotificationTargets == null) {
-            queueUpdateNotificationTargets = Maps.newConcurrentMap();
-            context.put("queueUpdateNotificationTargets", queueUpdateNotificationTargets);
-        }
         nextVersion = context.get("nextVersion");
         if (nextVersion == null) {
             nextVersion = new Long(0);
@@ -214,27 +206,17 @@
 
     @Override
     public byte[] queuePeek(String queueName) {
-        Queue<byte[]> queue = getQueue(queueName);
-        return queue.peek();
+        return getQueue(queueName).peek();
     }
 
     @Override
-    public byte[] queuePop(String queueName, NodeId requestor) {
-        Queue<byte[]> queue = getQueue(queueName);
-        if (queue.size() == 0 && requestor != null) {
-            getQueueUpdateNotificationTargets(queueName).add(requestor);
-            return null;
-        } else {
-            return queue.remove();
-        }
+    public byte[] queuePop(String queueName) {
+        return getQueue(queueName).poll();
     }
 
     @Override
-    public Set<NodeId> queuePush(String queueName, byte[] entry) {
-        getQueue(queueName).add(entry);
-        Set<NodeId> notifyList = ImmutableSet.copyOf(getQueueUpdateNotificationTargets(queueName));
-        getQueueUpdateNotificationTargets(queueName).clear();
-        return notifyList;
+    public void queuePush(String queueName, byte[] entry) {
+        getQueue(queueName).offer(entry);
     }
 
     @Override
@@ -289,10 +271,6 @@
         return queues.computeIfAbsent(queueName, name -> new LinkedList<>());
     }
 
-    private Set<NodeId> getQueueUpdateNotificationTargets(String queueName) {
-        return queueUpdateNotificationTargets.computeIfAbsent(queueName, name -> new HashSet<>());
-    }
-
     private boolean isUpdatePossible(DatabaseUpdate update) {
         Versioned<byte[]> existingEntry = mapGet(update.mapName(), update.key());
         switch (update.type()) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java
index c27774a..e4b2641 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java
@@ -17,15 +17,16 @@
 
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Futures;
-import org.onosproject.cluster.NodeId;
+
+import org.onlab.util.SharedExecutors;
 import org.onosproject.store.service.DistributedQueue;
 import org.onosproject.store.service.Serializer;
 
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
-
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.QUEUE_PUSH;
 
 /**
  * DistributedQueue implementation that provides FIFO ordering semantics.
@@ -37,9 +38,7 @@
     private final String name;
     private final Database database;
     private final Serializer serializer;
-    private final NodeId localNodeId;
     private final Set<CompletableFuture<E>> pendingFutures = Sets.newIdentityHashSet();
-    private final Consumer<Set<NodeId>> notifyConsumers;
 
     private static final String PRIMITIVE_NAME = "distributedQueue";
     private static final String SIZE = "size";
@@ -53,66 +52,59 @@
     public DefaultDistributedQueue(String name,
                                    Database database,
                                    Serializer serializer,
-                                   NodeId localNodeId,
-                                   boolean meteringEnabled,
-                                   Consumer<Set<NodeId>> notifyConsumers) {
+                                   boolean meteringEnabled) {
         this.name = checkNotNull(name, "queue name cannot be null");
         this.database = checkNotNull(database, "database cannot be null");
         this.serializer = checkNotNull(serializer, "serializer cannot be null");
-        this.localNodeId = localNodeId;
-        this.notifyConsumers = notifyConsumers;
         this.monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled);
-
+        this.database.registerConsumer(update -> {
+            SharedExecutors.getSingleThreadExecutor().execute(() -> {
+                if (update.target() == QUEUE_PUSH) {
+                    List<Object> input = update.input();
+                    String queueName = (String) input.get(0);
+                    if (queueName.equals(name)) {
+                        tryPoll();
+                    }
+                }
+            });
+        });
     }
 
     @Override
     public long size() {
         final MeteringAgent.Context timer = monitor.startTimer(SIZE);
-        try {
-            return Futures.getUnchecked(database.queueSize(name));
-        } finally {
-            timer.stop();
-        }
+        return Futures.getUnchecked(database.queueSize(name).whenComplete((r, e) -> timer.stop()));
     }
 
     @Override
     public void push(E entry) {
+        checkNotNull(entry, ERROR_NULL_ENTRY);
         final MeteringAgent.Context timer = monitor.startTimer(PUSH);
-        try {
-            checkNotNull(entry, ERROR_NULL_ENTRY);
-            Futures.getUnchecked(database.queuePush(name, serializer.encode(entry))
-                                         .thenAccept(notifyConsumers)
-                                         .thenApply(v -> null));
-        } finally {
-            timer.stop();
-        }
+        Futures.getUnchecked(database.queuePush(name, serializer.encode(entry))
+                                     .whenComplete((r, e) -> timer.stop()));
     }
 
     @Override
     public CompletableFuture<E> pop() {
         final MeteringAgent.Context timer = monitor.startTimer(POP);
-        return database.queuePop(name, localNodeId)
+        return database.queuePop(name)
+                       .whenComplete((r, e) -> timer.stop())
                        .thenCompose(v -> {
                            if (v != null) {
                                return CompletableFuture.completedFuture(serializer.decode(v));
-                           } else {
-                               CompletableFuture<E> newPendingFuture = new CompletableFuture<>();
-                               pendingFutures.add(newPendingFuture);
-                               return newPendingFuture;
                            }
-                       })
-                .whenComplete((r, e) -> timer.stop());
+                           CompletableFuture<E> newPendingFuture = new CompletableFuture<>();
+                           pendingFutures.add(newPendingFuture);
+                           return newPendingFuture;
+                       });
     }
 
     @Override
     public E peek() {
         final MeteringAgent.Context timer = monitor.startTimer(PEEK);
-        try {
-            return Futures.getUnchecked(database.queuePeek(name)
-                                                .thenApply(v -> v != null ? serializer.decode(v) : null));
-        } finally {
-            timer.stop();
-        }
+        return Futures.getUnchecked(database.queuePeek(name)
+                                            .thenApply(v -> v != null ? serializer.<E>decode(v) : null)
+                                            .whenComplete((r, e) -> timer.stop()));
     }
 
     public String name() {
@@ -122,7 +114,7 @@
     protected void tryPoll() {
         Set<CompletableFuture<E>> completedFutures = Sets.newHashSet();
         for (CompletableFuture<E> future : pendingFutures) {
-            E entry = Futures.getUnchecked(database.queuePop(name, localNodeId)
+            E entry = Futures.getUnchecked(database.queuePop(name)
                                                    .thenApply(v -> v != null ? serializer.decode(v) : null));
             if (entry != null) {
                 future.complete(entry);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueueBuilder.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueueBuilder.java
index b463aff..d6654e2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueueBuilder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueueBuilder.java
@@ -15,15 +15,10 @@
  */
 package org.onosproject.store.consistent.impl;
 
-import com.google.common.base.Charsets;
-import org.onosproject.cluster.NodeId;
 import org.onosproject.store.service.DistributedQueue;
 import org.onosproject.store.service.DistributedQueueBuilder;
 import org.onosproject.store.service.Serializer;
 
-import java.util.Set;
-import java.util.function.Consumer;
-
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
@@ -40,8 +35,7 @@
     private final DatabaseManager databaseManager;
     private boolean metering = true;
 
-    public DefaultDistributedQueueBuilder(
-            DatabaseManager databaseManager) {
+    public DefaultDistributedQueueBuilder(DatabaseManager databaseManager) {
         this.databaseManager = databaseManager;
     }
 
@@ -78,18 +72,10 @@
     @Override
     public DistributedQueue<E> build() {
         checkState(validInputs());
-        Consumer<Set<NodeId>> notifyOthers = nodes -> databaseManager.clusterCommunicator.multicast(name,
-                        DatabaseManager.QUEUE_UPDATED_TOPIC,
-                        s -> s.getBytes(Charsets.UTF_8),
-                        nodes);
-        DefaultDistributedQueue<E> queue = new DefaultDistributedQueue<>(
+        return new DefaultDistributedQueue<>(
                 name,
                 persistenceEnabled ? databaseManager.partitionedDatabase : databaseManager.inMemoryDatabase,
                 serializer,
-                databaseManager.localNodeId,
-                metering,
-                notifyOthers);
-        databaseManager.registerQueue(queue);
-        return queue;
+                metering);
     }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
index 09b3f59..a294681 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -28,7 +28,6 @@
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
-import org.onosproject.cluster.NodeId;
 import org.onosproject.store.service.DatabaseUpdate;
 import org.onosproject.store.service.Transaction;
 import org.onosproject.store.service.Versioned;
@@ -229,15 +228,15 @@
     }
 
     @Override
-    public CompletableFuture<Set<NodeId>> queuePush(String queueName, byte[] entry) {
+    public CompletableFuture<Void> queuePush(String queueName, byte[] entry) {
         checkState(isOpen.get(), DB_NOT_OPEN);
         return partitioner.getPartition(queueName, queueName).queuePush(queueName, entry);
     }
 
     @Override
-    public CompletableFuture<byte[]> queuePop(String queueName, NodeId nodeId) {
+    public CompletableFuture<byte[]> queuePop(String queueName) {
         checkState(isOpen.get(), DB_NOT_OPEN);
-        return partitioner.getPartition(queueName, queueName).queuePop(queueName, nodeId);
+        return partitioner.getPartition(queueName, queueName).queuePop(queueName);
     }
 
     @Override
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/StateMachineUpdate.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/StateMachineUpdate.java
index 9b064b0..72356d0 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/StateMachineUpdate.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/StateMachineUpdate.java
@@ -29,7 +29,7 @@
         /**
          * Update is for a map.
          */
-        MAP,
+        MAP_UPDATE,
 
         /**
          * Update is a transaction commit.
@@ -37,7 +37,12 @@
         TX_COMMIT,
 
         /**
-         * Update is for a non-map data structure.
+         * Update is a queue push.
+         */
+        QUEUE_PUSH,
+
+        /**
+         * Update is for some other operation.
          */
         OTHER
     }
@@ -55,9 +60,11 @@
     public Target target() {
         // FIXME: This check is brittle
         if (operationName.contains("mapUpdate")) {
-            return Target.MAP;
+            return Target.MAP_UPDATE;
         } else if (operationName.contains("commit") || operationName.contains("prepareAndCommit")) {
             return Target.TX_COMMIT;
+        } else if (operationName.contains("queuePush")) {
+            return Target.QUEUE_PUSH;
         } else {
             return Target.OTHER;
         }