Initial implementation of distributed intent batch queue

Change-Id: I7ffed03651569ade1be1e8dca905bfaf369b7e03
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/hz/SQueue.java b/core/store/dist/src/main/java/org/onlab/onos/store/hz/SQueue.java
new file mode 100644
index 0000000..f560cd2
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/hz/SQueue.java
@@ -0,0 +1,226 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.onos.store.hz;
+
+import com.hazelcast.core.IQueue;
+import com.hazelcast.core.ItemListener;
+import com.hazelcast.monitor.LocalQueueStats;
+import org.onlab.onos.store.serializers.StoreSerializer;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+// TODO: implementation is incomplete
+
+/**
+ * Wrapper around IQueue<byte[]> which serializes/deserializes
+ * key and value using StoreSerializer.
+ *
+ * @param <T> type
+ */
+public class SQueue<T> implements IQueue<T> {
+
+    private final IQueue<byte[]> q;
+    private final StoreSerializer serializer;
+
+    /**
+     * Creates a SQueue instance.
+     *
+     * @param baseQueue base IQueue to use
+     * @param serializer serializer to use for both key and value
+     */
+    public SQueue(IQueue<byte[]> baseQueue, StoreSerializer serializer) {
+        this.q = checkNotNull(baseQueue);
+        this.serializer = checkNotNull(serializer);
+    }
+
+    private byte[] serialize(Object key) {
+        return serializer.encode(key);
+    }
+
+    private T deserialize(byte[] key) {
+        return serializer.decode(key);
+    }
+
+    @Override
+    public boolean add(T t) {
+        return q.add(serialize(t));
+    }
+
+    @Override
+    public boolean offer(T t) {
+        return q.offer(serialize(t));
+    }
+
+    @Override
+    public void put(T t) throws InterruptedException {
+        q.put(serialize(t));
+    }
+
+    @Override
+    public boolean offer(T t, long l, TimeUnit timeUnit) throws InterruptedException {
+        return q.offer(serialize(t), l, timeUnit);
+    }
+
+    @Override
+    public T take() throws InterruptedException {
+        return deserialize(q.take());
+    }
+
+    @Override
+    public T poll(long l, TimeUnit timeUnit) throws InterruptedException {
+        return deserialize(q.poll(l, timeUnit));
+    }
+
+    @Override
+    public int remainingCapacity() {
+        return q.remainingCapacity();
+    }
+
+    @Override
+    public boolean remove(Object o) {
+        return q.remove(serialize(o));
+    }
+
+    @Override
+    public boolean contains(Object o) {
+        return q.contains(serialize(o));
+    }
+
+    @Override
+    public int drainTo(Collection<? super T> collection) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int drainTo(Collection<? super T> collection, int i) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public T remove() {
+        return deserialize(q.remove());
+    }
+
+    @Override
+    public T poll() {
+        return deserialize(q.poll());
+    }
+
+    @Override
+    public T element() {
+        return deserialize(q.element());
+    }
+
+    @Override
+    public T peek() {
+        return deserialize(q.peek());
+    }
+
+    @Override
+    public int size() {
+        return q.size();
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return q.isEmpty();
+    }
+
+    @Override
+    public Iterator<T> iterator() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Object[] toArray() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public <T1> T1[] toArray(T1[] t1s) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean containsAll(Collection<?> collection) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean addAll(Collection<? extends T> collection) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean removeAll(Collection<?> collection) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean retainAll(Collection<?> collection) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void clear() {
+        q.clear();
+    }
+
+    @Override
+    public LocalQueueStats getLocalQueueStats() {
+        return q.getLocalQueueStats();
+    }
+
+    @Override
+    public String addItemListener(ItemListener<T> itemListener, boolean b) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean removeItemListener(String s) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Deprecated
+    @Override
+    public Object getId() {
+        return q.getId();
+    }
+
+    @Override
+    public String getPartitionKey() {
+        return q.getPartitionKey();
+    }
+
+    @Override
+    public String getName() {
+        return q.getName();
+    }
+
+    @Override
+    public String getServiceName() {
+        return q.getServiceName();
+    }
+
+    @Override
+    public void destroy() {
+        q.destroy();
+    }
+}