Wrapper around Hazelcast IMap
Change-Id: I43353b97483353be83b49214970a5fceedaea980
diff --git a/core/store/hz/common/src/main/java/org/onlab/onos/store/common/SMap.java b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/SMap.java
new file mode 100644
index 0000000..93a7b0d
--- /dev/null
+++ b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/SMap.java
@@ -0,0 +1,612 @@
+package org.onlab.onos.store.common;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.onlab.onos.store.serializers.StoreSerializer;
+
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.Futures;
+import com.hazelcast.core.EntryEvent;
+import com.hazelcast.core.EntryListener;
+import com.hazelcast.core.EntryView;
+import com.hazelcast.core.ExecutionCallback;
+import com.hazelcast.core.IMap;
+import com.hazelcast.core.MapEvent;
+import com.hazelcast.map.EntryProcessor;
+import com.hazelcast.map.MapInterceptor;
+import com.hazelcast.mapreduce.JobTracker;
+import com.hazelcast.mapreduce.aggregation.Aggregation;
+import com.hazelcast.mapreduce.aggregation.Supplier;
+import com.hazelcast.monitor.LocalMapStats;
+import com.hazelcast.query.Predicate;
+
+// TODO: implement Predicate, etc. if we need them.
+/**
+ * Wrapper around IMap<byte[], byte[]> which serializes/deserializes
+ * Key and Value using StoreSerializer.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class SMap<K, V> implements IMap<K, V> {
+
+ private final IMap<byte[], byte[]> m;
+ private final StoreSerializer serializer;
+
+ /**
+ * Creates a SMap instance.
+ *
+ * @param baseMap base IMap to use
+ * @param serializer serializer to use for both key and value
+ */
+ public SMap(IMap<byte[], byte[]> baseMap, StoreSerializer serializer) {
+ this.m = checkNotNull(baseMap);
+ this.serializer = checkNotNull(serializer);
+ }
+
+ @Override
+ public int size() {
+ return m.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return m.isEmpty();
+ }
+
+ @Override
+ public void putAll(Map<? extends K, ? extends V> map) {
+ Map<byte[], byte[]> sm = new IdentityHashMap<>(map.size());
+ for (java.util.Map.Entry<? extends K, ? extends V> e : map.entrySet()) {
+ sm.put(serializeKey(e.getKey()), serializeVal(e.getValue()));
+ }
+ m.putAll(sm);
+ }
+
+ @Deprecated
+ @Override
+ public Object getId() {
+ return m.getId();
+ }
+
+ @Override
+ public String getPartitionKey() {
+ return m.getPartitionKey();
+ }
+
+ @Override
+ public String getName() {
+ return m.getName();
+ }
+
+ @Override
+ public String getServiceName() {
+ return m.getServiceName();
+ }
+
+ @Override
+ public void destroy() {
+ m.destroy();
+ }
+
+ @Override
+ public boolean containsKey(Object key) {
+ return m.containsKey(serializeKey(key));
+ }
+
+ @Override
+ public boolean containsValue(Object value) {
+ return m.containsValue(serializeVal(value));
+ }
+
+ @Override
+ public V get(Object key) {
+ return deserializeVal(m.get(serializeKey(key)));
+ }
+
+ @Override
+ public V put(K key, V value) {
+ return deserializeVal(m.put(serializeKey(key), serializeVal(value)));
+ }
+
+ @Override
+ public V remove(Object key) {
+ return deserializeVal(m.remove(serializeKey(key)));
+ }
+
+ @Override
+ public boolean remove(Object key, Object value) {
+ return m.remove(serializeKey(key), serializeVal(value));
+ }
+
+ @Override
+ public void delete(Object key) {
+ m.delete(serializeKey(key));
+ }
+
+ @Override
+ public void flush() {
+ m.flush();
+ }
+
+ @Override
+ public Map<K, V> getAll(Set<K> keys) {
+ Set<byte[]> sk = serializeKeySet(keys);
+ Map<byte[], byte[]> bm = m.getAll(sk);
+ Map<K, V> dsm = new HashMap<>(bm.size());
+ for (java.util.Map.Entry<byte[], byte[]> e : bm.entrySet()) {
+ dsm.put(deserializeKey(e.getKey()), deserializeVal(e.getValue()));
+ }
+ return dsm;
+ }
+
+ @Override
+ public void loadAll(boolean replaceExistingValues) {
+ m.loadAll(replaceExistingValues);
+ }
+
+ @Override
+ public void loadAll(Set<K> keys, boolean replaceExistingValues) {
+ Set<byte[]> sk = serializeKeySet(keys);
+ m.loadAll(sk, replaceExistingValues);
+ }
+
+ @Override
+ public void clear() {
+ m.clear();
+ }
+
+ @Override
+ public Future<V> getAsync(K key) {
+ Future<byte[]> f = m.getAsync(serializeKey(key));
+ return Futures.lazyTransform(f, new DeserializeVal());
+ }
+
+ @Override
+ public Future<V> putAsync(K key, V value) {
+ Future<byte[]> f = m.putAsync(serializeKey(key), serializeVal(value));
+ return Futures.lazyTransform(f, new DeserializeVal());
+ }
+
+ @Override
+ public Future<V> putAsync(K key, V value, long ttl, TimeUnit timeunit) {
+ Future<byte[]> f = m.putAsync(serializeKey(key), serializeVal(value), ttl, timeunit);
+ return Futures.lazyTransform(f, new DeserializeVal());
+ }
+
+ @Override
+ public Future<V> removeAsync(K key) {
+ Future<byte[]> f = m.removeAsync(serializeKey(key));
+ return Futures.lazyTransform(f, new DeserializeVal());
+ }
+
+ @Override
+ public boolean tryRemove(K key, long timeout, TimeUnit timeunit) {
+ return m.tryRemove(serializeKey(key), timeout, timeunit);
+ }
+
+ @Override
+ public boolean tryPut(K key, V value, long timeout, TimeUnit timeunit) {
+ return m.tryPut(serializeKey(key), serializeVal(value), timeout, timeunit);
+ }
+
+ @Override
+ public V put(K key, V value, long ttl, TimeUnit timeunit) {
+ return deserializeVal(m.put(serializeKey(key), serializeVal(value), ttl, timeunit));
+ }
+
+ @Override
+ public void putTransient(K key, V value, long ttl, TimeUnit timeunit) {
+ m.putTransient(serializeKey(key), serializeVal(value), ttl, timeunit);
+ }
+
+ @Override
+ public V putIfAbsent(K key, V value) {
+ return deserializeVal(m.putIfAbsent(serializeKey(key), serializeVal(value)));
+ }
+
+ @Override
+ public V putIfAbsent(K key, V value, long ttl, TimeUnit timeunit) {
+ return deserializeVal(m.putIfAbsent(serializeKey(key), serializeVal(value), ttl, timeunit));
+ }
+
+ @Override
+ public boolean replace(K key, V oldValue, V newValue) {
+ return m.replace(serializeKey(key), serializeVal(oldValue), serializeVal(newValue));
+ }
+
+ @Override
+ public V replace(K key, V value) {
+ return deserializeVal(m.replace(serializeKey(key), serializeVal(value)));
+ }
+
+ @Override
+ public void set(K key, V value) {
+ m.set(serializeKey(key), serializeVal(value));
+ }
+
+ @Override
+ public void set(K key, V value, long ttl, TimeUnit timeunit) {
+ m.set(serializeKey(key), serializeVal(value), ttl, timeunit);
+ }
+
+ @Override
+ public void lock(K key) {
+ m.lock(serializeKey(key));
+ }
+
+ @Override
+ public void lock(K key, long leaseTime, TimeUnit timeUnit) {
+ m.lock(serializeKey(key), leaseTime, timeUnit);
+ }
+
+ @Override
+ public boolean isLocked(K key) {
+ return m.isLocked(serializeKey(key));
+ }
+
+ @Override
+ public boolean tryLock(K key) {
+ return m.tryLock(serializeKey(key));
+ }
+
+ @Override
+ public boolean tryLock(K key, long time, TimeUnit timeunit)
+ throws InterruptedException {
+ return m.tryLock(serializeKey(key), time, timeunit);
+ }
+
+ @Override
+ public void unlock(K key) {
+ m.unlock(serializeKey(key));
+ }
+
+ @Override
+ public void forceUnlock(K key) {
+ m.forceUnlock(serializeKey(key));
+ }
+
+ @Override
+ public String addLocalEntryListener(EntryListener<K, V> listener) {
+ return m.addLocalEntryListener(new BaseEntryListener(listener));
+ }
+
+ @Deprecated // marking method not implemented
+ @Override
+ public String addLocalEntryListener(EntryListener<K, V> listener,
+ Predicate<K, V> predicate, boolean includeValue) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Deprecated // marking method not implemented
+ @Override
+ public String addLocalEntryListener(EntryListener<K, V> listener,
+ Predicate<K, V> predicate, K key, boolean includeValue) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Deprecated // marking method not implemented
+ @Override
+ public String addInterceptor(MapInterceptor interceptor) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void removeInterceptor(String id) {
+ m.removeInterceptor(id);
+ }
+
+ @Override
+ public String addEntryListener(EntryListener<K, V> listener,
+ boolean includeValue) {
+ return m.addEntryListener(new BaseEntryListener(listener), includeValue);
+ }
+
+ @Override
+ public boolean removeEntryListener(String id) {
+ return m.removeEntryListener(id);
+ }
+
+ @Override
+ public String addEntryListener(EntryListener<K, V> listener, K key,
+ boolean includeValue) {
+ return m.addEntryListener(new BaseEntryListener(listener),
+ serializeKey(key), includeValue);
+ }
+
+ @Deprecated // marking method not implemented
+ @Override
+ public String addEntryListener(EntryListener<K, V> listener,
+ Predicate<K, V> predicate, boolean includeValue) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Deprecated // marking method not implemented
+ @Override
+ public String addEntryListener(EntryListener<K, V> listener,
+ Predicate<K, V> predicate, K key, boolean includeValue) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Deprecated // marking method not implemented
+ @Override
+ public EntryView<K, V> getEntryView(K key) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean evict(K key) {
+ return m.evict(serializeKey(key));
+ }
+
+ @Override
+ public void evictAll() {
+ m.evictAll();
+ }
+
+ @Override
+ public Set<K> keySet() {
+ return deserializeKeySet(m.keySet());
+ }
+
+ @Override
+ public Collection<V> values() {
+ return deserializeVal(m.values());
+ }
+
+ @Override
+ public Set<java.util.Map.Entry<K, V>> entrySet() {
+ return deserializeEntrySet(m.entrySet());
+ }
+
+ @Deprecated // marking method not implemented
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Set<K> keySet(Predicate predicate) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Deprecated // marking method not implemented
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Set<java.util.Map.Entry<K, V>> entrySet(Predicate predicate) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Deprecated // marking method not implemented
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Collection<V> values(Predicate predicate) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Set<K> localKeySet() {
+ return deserializeKeySet(m.localKeySet());
+ }
+
+ @Deprecated // marking method not implemented
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Set<K> localKeySet(Predicate predicate) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Deprecated // marking method not implemented
+ @Override
+ public void addIndex(String attribute, boolean ordered) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public LocalMapStats getLocalMapStats() {
+ return m.getLocalMapStats();
+ }
+
+ @Deprecated // marking method not implemented
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Object executeOnKey(K key, EntryProcessor entryProcessor) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Deprecated // marking method not implemented
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Map<K, Object> executeOnKeys(Set<K> keys,
+ EntryProcessor entryProcessor) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Deprecated // marking method not implemented
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void submitToKey(K key, EntryProcessor entryProcessor,
+ ExecutionCallback callback) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Deprecated // marking method not implemented
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Future submitToKey(K key, EntryProcessor entryProcessor) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Deprecated // marking method not implemented
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Map<K, Object> executeOnEntries(EntryProcessor entryProcessor) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Deprecated // marking method not implemented
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Map<K, Object> executeOnEntries(EntryProcessor entryProcessor,
+ Predicate predicate) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Deprecated // marking method not implemented
+ @Override
+ public <SuppliedValue, Result> Result aggregate(
+ Supplier<K, V, SuppliedValue> supplier,
+ Aggregation<K, SuppliedValue, Result> aggregation) {
+
+ throw new UnsupportedOperationException();
+ }
+
+ @Deprecated // marking method not implemented
+ @Override
+ public <SuppliedValue, Result> Result aggregate(
+ Supplier<K, V, SuppliedValue> supplier,
+ Aggregation<K, SuppliedValue, Result> aggregation,
+ JobTracker jobTracker) {
+
+ throw new UnsupportedOperationException();
+ }
+
+ private byte[] serializeKey(Object key) {
+ return serializer.encode(key);
+ }
+
+ private K deserializeKey(byte[] key) {
+ return serializer.decode(key);
+ }
+
+ private byte[] serializeVal(Object val) {
+ return serializer.encode(val);
+ }
+
+ private V deserializeVal(byte[] val) {
+ return serializer.decode(val);
+ }
+
+ private Set<byte[]> serializeKeySet(Set<K> keys) {
+ Set<byte[]> sk = Collections.newSetFromMap(new IdentityHashMap<byte[], Boolean>(keys.size()));
+ for (K key : keys) {
+ sk.add(serializeKey(key));
+ }
+ return sk;
+ }
+
+ private Set<K> deserializeKeySet(Set<byte[]> keys) {
+ Set<K> dsk = new HashSet<>(keys.size());
+ for (byte[] key : keys) {
+ dsk.add(deserializeKey(key));
+ }
+ return dsk;
+ }
+
+ private Collection<V> deserializeVal(Collection<byte[]> vals) {
+ Collection<V> dsl = new ArrayList<>(vals.size());
+ for (byte[] val : vals) {
+ dsl.add(deserializeVal(val));
+ }
+ return dsl;
+ }
+
+ private Set<java.util.Map.Entry<K, V>> deserializeEntrySet(
+ Set<java.util.Map.Entry<byte[], byte[]>> entries) {
+
+ Set<java.util.Map.Entry<K, V>> dse = new HashSet<>(entries.size());
+ for (java.util.Map.Entry<byte[], byte[]> entry : entries) {
+ dse.add(Pair.of(deserializeKey(entry.getKey()),
+ deserializeVal(entry.getValue())));
+ }
+ return dse;
+ }
+
+ private final class BaseEntryListener
+ implements EntryListener<byte[], byte[]> {
+
+ private final EntryListener<K, V> listener;
+
+ public BaseEntryListener(EntryListener<K, V> listener) {
+ this.listener = listener;
+ }
+
+ @Override
+ public void mapEvicted(MapEvent event) {
+ listener.mapEvicted(event);
+ }
+
+ @Override
+ public void mapCleared(MapEvent event) {
+ listener.mapCleared(event);
+ }
+
+ @Override
+ public void entryUpdated(EntryEvent<byte[], byte[]> event) {
+ EntryEvent<K, V> evt = new EntryEvent<K, V>(
+ event.getSource(),
+ event.getMember(),
+ event.getEventType().getType(),
+ deserializeKey(event.getKey()),
+ deserializeVal(event.getOldValue()),
+ deserializeVal(event.getValue()));
+
+ listener.entryUpdated(evt);
+ }
+
+ @Override
+ public void entryRemoved(EntryEvent<byte[], byte[]> event) {
+ EntryEvent<K, V> evt = new EntryEvent<K, V>(
+ event.getSource(),
+ event.getMember(),
+ event.getEventType().getType(),
+ deserializeKey(event.getKey()),
+ deserializeVal(event.getOldValue()),
+ null);
+
+ listener.entryRemoved(evt);
+ }
+
+ @Override
+ public void entryEvicted(EntryEvent<byte[], byte[]> event) {
+ EntryEvent<K, V> evt = new EntryEvent<K, V>(
+ event.getSource(),
+ event.getMember(),
+ event.getEventType().getType(),
+ deserializeKey(event.getKey()),
+ deserializeVal(event.getOldValue()),
+ deserializeVal(event.getValue()));
+
+ listener.entryEvicted(evt);
+ }
+
+ @Override
+ public void entryAdded(EntryEvent<byte[], byte[]> event) {
+ EntryEvent<K, V> evt = new EntryEvent<K, V>(
+ event.getSource(),
+ event.getMember(),
+ event.getEventType().getType(),
+ deserializeKey(event.getKey()),
+ null,
+ deserializeVal(event.getValue()));
+
+ listener.entryAdded(evt);
+ }
+ }
+
+ private final class DeserializeVal implements Function<byte[], V> {
+ @Override
+ public V apply(byte[] input) {
+ return deserializeVal(input);
+ }
+ }
+
+}