blob: 135312d6cb97d49e0ec8800d53cf5c43a4c304e7 [file] [log] [blame]
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.onos.store.hz;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.Pair;
import org.onlab.onos.store.serializers.StoreSerializer;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.EntryListener;
import com.hazelcast.core.EntryView;
import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.core.IMap;
import com.hazelcast.core.MapEvent;
import com.hazelcast.map.EntryProcessor;
import com.hazelcast.map.MapInterceptor;
import com.hazelcast.mapreduce.JobTracker;
import com.hazelcast.mapreduce.aggregation.Aggregation;
import com.hazelcast.mapreduce.aggregation.Supplier;
import com.hazelcast.monitor.LocalMapStats;
import com.hazelcast.query.Predicate;
// TODO: implement Predicate, etc. if we need them.
/**
* Wrapper around IMap<byte[], byte[]> which serializes/deserializes
* key and value using StoreSerializer.
*
* @param <K> key type
* @param <V> value type
*/
public class SMap<K, V> implements IMap<K, V> {
private final IMap<byte[], byte[]> m;
private final StoreSerializer serializer;
/**
* Creates a SMap instance.
*
* @param baseMap base IMap to use
* @param serializer serializer to use for both key and value
*/
public SMap(IMap<byte[], byte[]> baseMap, StoreSerializer serializer) {
this.m = checkNotNull(baseMap);
this.serializer = checkNotNull(serializer);
}
@Override
public int size() {
return m.size();
}
@Override
public boolean isEmpty() {
return m.isEmpty();
}
@Override
public void putAll(Map<? extends K, ? extends V> map) {
Map<byte[], byte[]> sm = new IdentityHashMap<>(map.size());
for (java.util.Map.Entry<? extends K, ? extends V> e : map.entrySet()) {
sm.put(serializeKey(e.getKey()), serializeVal(e.getValue()));
}
m.putAll(sm);
}
@Deprecated
@Override
public Object getId() {
return m.getId();
}
@Override
public String getPartitionKey() {
return m.getPartitionKey();
}
@Override
public String getName() {
return m.getName();
}
@Override
public String getServiceName() {
return m.getServiceName();
}
@Override
public void destroy() {
m.destroy();
}
@Override
public boolean containsKey(Object key) {
return m.containsKey(serializeKey(key));
}
@Override
public boolean containsValue(Object value) {
return m.containsValue(serializeVal(value));
}
@Override
public V get(Object key) {
return deserializeVal(m.get(serializeKey(key)));
}
@Override
public V put(K key, V value) {
return deserializeVal(m.put(serializeKey(key), serializeVal(value)));
}
@Override
public V remove(Object key) {
return deserializeVal(m.remove(serializeKey(key)));
}
@Override
public boolean remove(Object key, Object value) {
return m.remove(serializeKey(key), serializeVal(value));
}
@Override
public void delete(Object key) {
m.delete(serializeKey(key));
}
@Override
public void flush() {
m.flush();
}
@Override
public Map<K, V> getAll(Set<K> keys) {
Set<byte[]> sk = serializeKeySet(keys);
Map<byte[], byte[]> bm = m.getAll(sk);
Map<K, V> dsm = new HashMap<>(bm.size());
for (java.util.Map.Entry<byte[], byte[]> e : bm.entrySet()) {
dsm.put(deserializeKey(e.getKey()), deserializeVal(e.getValue()));
}
return dsm;
}
@Override
public void loadAll(boolean replaceExistingValues) {
m.loadAll(replaceExistingValues);
}
@Override
public void loadAll(Set<K> keys, boolean replaceExistingValues) {
Set<byte[]> sk = serializeKeySet(keys);
m.loadAll(sk, replaceExistingValues);
}
@Override
public void clear() {
m.clear();
}
@Override
public Future<V> getAsync(K key) {
Future<byte[]> f = m.getAsync(serializeKey(key));
return Futures.lazyTransform(f, new DeserializeVal());
}
@Override
public Future<V> putAsync(K key, V value) {
Future<byte[]> f = m.putAsync(serializeKey(key), serializeVal(value));
return Futures.lazyTransform(f, new DeserializeVal());
}
@Override
public Future<V> putAsync(K key, V value, long ttl, TimeUnit timeunit) {
Future<byte[]> f = m.putAsync(serializeKey(key), serializeVal(value), ttl, timeunit);
return Futures.lazyTransform(f, new DeserializeVal());
}
@Override
public Future<V> removeAsync(K key) {
Future<byte[]> f = m.removeAsync(serializeKey(key));
return Futures.lazyTransform(f, new DeserializeVal());
}
@Override
public boolean tryRemove(K key, long timeout, TimeUnit timeunit) {
return m.tryRemove(serializeKey(key), timeout, timeunit);
}
@Override
public boolean tryPut(K key, V value, long timeout, TimeUnit timeunit) {
return m.tryPut(serializeKey(key), serializeVal(value), timeout, timeunit);
}
@Override
public V put(K key, V value, long ttl, TimeUnit timeunit) {
return deserializeVal(m.put(serializeKey(key), serializeVal(value), ttl, timeunit));
}
@Override
public void putTransient(K key, V value, long ttl, TimeUnit timeunit) {
m.putTransient(serializeKey(key), serializeVal(value), ttl, timeunit);
}
@Override
public V putIfAbsent(K key, V value) {
return deserializeVal(m.putIfAbsent(serializeKey(key), serializeVal(value)));
}
@Override
public V putIfAbsent(K key, V value, long ttl, TimeUnit timeunit) {
return deserializeVal(m.putIfAbsent(serializeKey(key), serializeVal(value), ttl, timeunit));
}
@Override
public boolean replace(K key, V oldValue, V newValue) {
return m.replace(serializeKey(key), serializeVal(oldValue), serializeVal(newValue));
}
@Override
public V replace(K key, V value) {
return deserializeVal(m.replace(serializeKey(key), serializeVal(value)));
}
@Override
public void set(K key, V value) {
m.set(serializeKey(key), serializeVal(value));
}
@Override
public void set(K key, V value, long ttl, TimeUnit timeunit) {
m.set(serializeKey(key), serializeVal(value), ttl, timeunit);
}
@Override
public void lock(K key) {
m.lock(serializeKey(key));
}
@Override
public void lock(K key, long leaseTime, TimeUnit timeUnit) {
m.lock(serializeKey(key), leaseTime, timeUnit);
}
@Override
public boolean isLocked(K key) {
return m.isLocked(serializeKey(key));
}
@Override
public boolean tryLock(K key) {
return m.tryLock(serializeKey(key));
}
@Override
public boolean tryLock(K key, long time, TimeUnit timeunit)
throws InterruptedException {
return m.tryLock(serializeKey(key), time, timeunit);
}
@Override
public void unlock(K key) {
m.unlock(serializeKey(key));
}
@Override
public void forceUnlock(K key) {
m.forceUnlock(serializeKey(key));
}
@Override
public String addLocalEntryListener(EntryListener<K, V> listener) {
return m.addLocalEntryListener(new BaseEntryListener(listener));
}
@Deprecated // marking method not implemented
@Override
public String addLocalEntryListener(EntryListener<K, V> listener,
Predicate<K, V> predicate, boolean includeValue) {
throw new UnsupportedOperationException();
}
@Deprecated // marking method not implemented
@Override
public String addLocalEntryListener(EntryListener<K, V> listener,
Predicate<K, V> predicate, K key, boolean includeValue) {
throw new UnsupportedOperationException();
}
@Deprecated // marking method not implemented
@Override
public String addInterceptor(MapInterceptor interceptor) {
throw new UnsupportedOperationException();
}
@Override
public void removeInterceptor(String id) {
m.removeInterceptor(id);
}
@Override
public String addEntryListener(EntryListener<K, V> listener,
boolean includeValue) {
return m.addEntryListener(new BaseEntryListener(listener), includeValue);
}
@Override
public boolean removeEntryListener(String id) {
return m.removeEntryListener(id);
}
@Override
public String addEntryListener(EntryListener<K, V> listener, K key,
boolean includeValue) {
return m.addEntryListener(new BaseEntryListener(listener),
serializeKey(key), includeValue);
}
@Deprecated // marking method not implemented
@Override
public String addEntryListener(EntryListener<K, V> listener,
Predicate<K, V> predicate, boolean includeValue) {
throw new UnsupportedOperationException();
}
@Deprecated // marking method not implemented
@Override
public String addEntryListener(EntryListener<K, V> listener,
Predicate<K, V> predicate, K key, boolean includeValue) {
throw new UnsupportedOperationException();
}
@Deprecated // marking method not implemented
@Override
public EntryView<K, V> getEntryView(K key) {
throw new UnsupportedOperationException();
}
@Override
public boolean evict(K key) {
return m.evict(serializeKey(key));
}
@Override
public void evictAll() {
m.evictAll();
}
@Override
public Set<K> keySet() {
return deserializeKeySet(m.keySet());
}
@Override
public Collection<V> values() {
return deserializeVal(m.values());
}
@Override
public Set<java.util.Map.Entry<K, V>> entrySet() {
return deserializeEntrySet(m.entrySet());
}
@Deprecated // marking method not implemented
@SuppressWarnings("rawtypes")
@Override
public Set<K> keySet(Predicate predicate) {
throw new UnsupportedOperationException();
}
@Deprecated // marking method not implemented
@SuppressWarnings("rawtypes")
@Override
public Set<java.util.Map.Entry<K, V>> entrySet(Predicate predicate) {
throw new UnsupportedOperationException();
}
@Deprecated // marking method not implemented
@SuppressWarnings("rawtypes")
@Override
public Collection<V> values(Predicate predicate) {
throw new UnsupportedOperationException();
}
@Override
public Set<K> localKeySet() {
return deserializeKeySet(m.localKeySet());
}
@Deprecated // marking method not implemented
@SuppressWarnings("rawtypes")
@Override
public Set<K> localKeySet(Predicate predicate) {
throw new UnsupportedOperationException();
}
@Deprecated // marking method not implemented
@Override
public void addIndex(String attribute, boolean ordered) {
throw new UnsupportedOperationException();
}
@Override
public LocalMapStats getLocalMapStats() {
return m.getLocalMapStats();
}
@Deprecated // marking method not implemented
@SuppressWarnings("rawtypes")
@Override
public Object executeOnKey(K key, EntryProcessor entryProcessor) {
throw new UnsupportedOperationException();
}
@Deprecated // marking method not implemented
@SuppressWarnings("rawtypes")
@Override
public Map<K, Object> executeOnKeys(Set<K> keys,
EntryProcessor entryProcessor) {
throw new UnsupportedOperationException();
}
@Deprecated // marking method not implemented
@SuppressWarnings("rawtypes")
@Override
public void submitToKey(K key, EntryProcessor entryProcessor,
ExecutionCallback callback) {
throw new UnsupportedOperationException();
}
@Deprecated // marking method not implemented
@SuppressWarnings("rawtypes")
@Override
public Future submitToKey(K key, EntryProcessor entryProcessor) {
throw new UnsupportedOperationException();
}
@Deprecated // marking method not implemented
@SuppressWarnings("rawtypes")
@Override
public Map<K, Object> executeOnEntries(EntryProcessor entryProcessor) {
throw new UnsupportedOperationException();
}
@Deprecated // marking method not implemented
@SuppressWarnings("rawtypes")
@Override
public Map<K, Object> executeOnEntries(EntryProcessor entryProcessor,
Predicate predicate) {
throw new UnsupportedOperationException();
}
@Deprecated // marking method not implemented
@Override
public <SuppliedValue, Result> Result aggregate(
Supplier<K, V, SuppliedValue> supplier,
Aggregation<K, SuppliedValue, Result> aggregation) {
throw new UnsupportedOperationException();
}
@Deprecated // marking method not implemented
@Override
public <SuppliedValue, Result> Result aggregate(
Supplier<K, V, SuppliedValue> supplier,
Aggregation<K, SuppliedValue, Result> aggregation,
JobTracker jobTracker) {
throw new UnsupportedOperationException();
}
private byte[] serializeKey(Object key) {
return serializer.encode(key);
}
private K deserializeKey(byte[] key) {
return serializer.decode(key);
}
private byte[] serializeVal(Object val) {
return serializer.encode(val);
}
private V deserializeVal(byte[] val) {
if (val == null) {
return null;
}
return serializer.decode(val.clone());
}
private Set<byte[]> serializeKeySet(Set<K> keys) {
Set<byte[]> sk = Collections.newSetFromMap(new IdentityHashMap<byte[], Boolean>(keys.size()));
for (K key : keys) {
sk.add(serializeKey(key));
}
return sk;
}
private Set<K> deserializeKeySet(Set<byte[]> keys) {
Set<K> dsk = new HashSet<>(keys.size());
for (byte[] key : keys) {
dsk.add(deserializeKey(key));
}
return dsk;
}
private Collection<V> deserializeVal(Collection<byte[]> vals) {
Collection<V> dsl = new ArrayList<>(vals.size());
for (byte[] val : vals) {
dsl.add(deserializeVal(val));
}
return dsl;
}
private Set<java.util.Map.Entry<K, V>> deserializeEntrySet(
Set<java.util.Map.Entry<byte[], byte[]>> entries) {
Set<java.util.Map.Entry<K, V>> dse = new HashSet<>(entries.size());
for (java.util.Map.Entry<byte[], byte[]> entry : entries) {
dse.add(Pair.of(deserializeKey(entry.getKey()),
deserializeVal(entry.getValue())));
}
return dse;
}
private final class BaseEntryListener
implements EntryListener<byte[], byte[]> {
private final EntryListener<K, V> listener;
public BaseEntryListener(EntryListener<K, V> listener) {
this.listener = listener;
}
@Override
public void mapEvicted(MapEvent event) {
listener.mapEvicted(event);
}
@Override
public void mapCleared(MapEvent event) {
listener.mapCleared(event);
}
@Override
public void entryUpdated(EntryEvent<byte[], byte[]> event) {
EntryEvent<K, V> evt = new EntryEvent<K, V>(
event.getSource(),
event.getMember(),
event.getEventType().getType(),
deserializeKey(event.getKey()),
deserializeVal(event.getOldValue()),
deserializeVal(event.getValue()));
listener.entryUpdated(evt);
}
@Override
public void entryRemoved(EntryEvent<byte[], byte[]> event) {
EntryEvent<K, V> evt = new EntryEvent<K, V>(
event.getSource(),
event.getMember(),
event.getEventType().getType(),
deserializeKey(event.getKey()),
deserializeVal(event.getOldValue()),
null);
listener.entryRemoved(evt);
}
@Override
public void entryEvicted(EntryEvent<byte[], byte[]> event) {
EntryEvent<K, V> evt = new EntryEvent<K, V>(
event.getSource(),
event.getMember(),
event.getEventType().getType(),
deserializeKey(event.getKey()),
deserializeVal(event.getOldValue()),
deserializeVal(event.getValue()));
listener.entryEvicted(evt);
}
@Override
public void entryAdded(EntryEvent<byte[], byte[]> event) {
EntryEvent<K, V> evt = new EntryEvent<K, V>(
event.getSource(),
event.getMember(),
event.getEventType().getType(),
deserializeKey(event.getKey()),
null,
deserializeVal(event.getValue()));
listener.entryAdded(evt);
}
}
private final class DeserializeVal implements Function<byte[], V> {
@Override
public V apply(byte[] input) {
return deserializeVal(input);
}
}
}