blob: c58e41153c4d33b0b61a14fd42f0c1db932545a5 [file] [log] [blame]
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.onos.store.hz;
import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
import com.hazelcast.core.IQueue;
import com.hazelcast.core.ItemEvent;
import com.hazelcast.core.ItemListener;
import com.hazelcast.monitor.LocalQueueStats;
import org.onlab.onos.store.serializers.StoreSerializer;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkNotNull;
// TODO: implementation is incomplete
/**
* Wrapper around IQueue<byte[]> which serializes/deserializes
* key and value using StoreSerializer.
*
* @param <T> type
*/
public class SQueue<T> implements IQueue<T> {
private final IQueue<byte[]> q;
private final StoreSerializer serializer;
/**
* Creates a SQueue instance.
*
* @param baseQueue base IQueue to use
* @param serializer serializer to use for both key and value
*/
public SQueue(IQueue<byte[]> baseQueue, StoreSerializer serializer) {
this.q = checkNotNull(baseQueue);
this.serializer = checkNotNull(serializer);
}
private byte[] serialize(Object key) {
return serializer.encode(key);
}
private T deserialize(byte[] key) {
return serializer.decode(key);
}
@Override
public boolean add(T t) {
return q.add(serialize(t));
}
@Override
public boolean offer(T t) {
return q.offer(serialize(t));
}
@Override
public void put(T t) throws InterruptedException {
q.put(serialize(t));
}
@Override
public boolean offer(T t, long l, TimeUnit timeUnit) throws InterruptedException {
return q.offer(serialize(t), l, timeUnit);
}
@Override
public T take() throws InterruptedException {
return deserialize(q.take());
}
@Override
public T poll(long l, TimeUnit timeUnit) throws InterruptedException {
return deserialize(q.poll(l, timeUnit));
}
@Override
public int remainingCapacity() {
return q.remainingCapacity();
}
@Override
public boolean remove(Object o) {
return q.remove(serialize(o));
}
@Override
public boolean contains(Object o) {
return q.contains(serialize(o));
}
@Deprecated // not implemented yet
@Override
public int drainTo(Collection<? super T> collection) {
throw new UnsupportedOperationException();
}
@Deprecated // not implemented yet
@Override
public int drainTo(Collection<? super T> collection, int i) {
throw new UnsupportedOperationException();
}
@Override
public T remove() {
return deserialize(q.remove());
}
@Override
public T poll() {
return deserialize(q.poll());
}
@Override
public T element() {
return deserialize(q.element());
}
@Override
public T peek() {
return deserialize(q.peek());
}
@Override
public int size() {
return q.size();
}
@Override
public boolean isEmpty() {
return q.isEmpty();
}
@Override
public Iterator<T> iterator() {
return FluentIterable.from(q)
.transform(new DeserializeVal())
.iterator();
}
@Deprecated // not implemented yet
@Override
public Object[] toArray() {
throw new UnsupportedOperationException();
}
@Deprecated // not implemented yet
@Override
public <T1> T1[] toArray(T1[] t1s) {
throw new UnsupportedOperationException();
}
@Deprecated // not implemented yet
@Override
public boolean containsAll(Collection<?> collection) {
throw new UnsupportedOperationException();
}
@Deprecated // not implemented yet
@Override
public boolean addAll(Collection<? extends T> collection) {
throw new UnsupportedOperationException();
}
@Deprecated // not implemented yet
@Override
public boolean removeAll(Collection<?> collection) {
throw new UnsupportedOperationException();
}
@Deprecated // not implemented yet
@Override
public boolean retainAll(Collection<?> collection) {
throw new UnsupportedOperationException();
}
@Override
public void clear() {
q.clear();
}
@Override
public LocalQueueStats getLocalQueueStats() {
return q.getLocalQueueStats();
}
@Override
public String addItemListener(ItemListener<T> itemListener, boolean withValue) {
ItemListener<byte[]> il = new ItemListener<byte[]>() {
@Override
public void itemAdded(ItemEvent<byte[]> item) {
itemListener.itemAdded(new ItemEvent<T>(getName(item),
item.getEventType(),
deserialize(item.getItem()),
item.getMember()));
}
@Override
public void itemRemoved(ItemEvent<byte[]> item) {
itemListener.itemRemoved(new ItemEvent<T>(getName(item),
item.getEventType(),
deserialize(item.getItem()),
item.getMember()));
}
private String getName(ItemEvent<byte[]> item) {
return (item.getSource() instanceof String) ?
(String) item.getSource() : item.getSource().toString();
}
};
return q.addItemListener(il, withValue);
}
@Override
public boolean removeItemListener(String registrationId) {
return q.removeItemListener(registrationId);
}
@Deprecated
@Override
public Object getId() {
return q.getId();
}
@Override
public String getPartitionKey() {
return q.getPartitionKey();
}
@Override
public String getName() {
return q.getName();
}
@Override
public String getServiceName() {
return q.getServiceName();
}
@Override
public void destroy() {
q.destroy();
}
private final class DeserializeVal implements Function<byte[], T> {
@Override
public T apply(byte[] input) {
return deserialize(input);
}
}
}