blob: 6dd4bfbd340b0191bf8bedd7f9a0a4d705bc0fd5 [file] [log] [blame]
Yuta HIGUCHIa97b5e42014-10-13 14:04:58 -07001package org.onlab.onos.store.common;
2
3import static com.google.common.base.Preconditions.checkNotNull;
4
5import java.util.ArrayList;
6import java.util.Collection;
7import java.util.Collections;
8import java.util.HashMap;
9import java.util.HashSet;
10import java.util.IdentityHashMap;
11import java.util.Map;
12import java.util.Set;
13import java.util.concurrent.Future;
14import java.util.concurrent.TimeUnit;
15
16import org.apache.commons.lang3.tuple.Pair;
17import org.onlab.onos.store.serializers.StoreSerializer;
18
19import com.google.common.base.Function;
20import com.google.common.util.concurrent.Futures;
21import com.hazelcast.core.EntryEvent;
22import com.hazelcast.core.EntryListener;
23import com.hazelcast.core.EntryView;
24import com.hazelcast.core.ExecutionCallback;
25import com.hazelcast.core.IMap;
26import com.hazelcast.core.MapEvent;
27import com.hazelcast.map.EntryProcessor;
28import com.hazelcast.map.MapInterceptor;
29import com.hazelcast.mapreduce.JobTracker;
30import com.hazelcast.mapreduce.aggregation.Aggregation;
31import com.hazelcast.mapreduce.aggregation.Supplier;
32import com.hazelcast.monitor.LocalMapStats;
33import com.hazelcast.query.Predicate;
34
35// TODO: implement Predicate, etc. if we need them.
36/**
37 * Wrapper around IMap<byte[], byte[]> which serializes/deserializes
38 * Key and Value using StoreSerializer.
39 *
40 * @param <K> key type
41 * @param <V> value type
42 */
43public class SMap<K, V> implements IMap<K, V> {
44
45 private final IMap<byte[], byte[]> m;
46 private final StoreSerializer serializer;
47
48 /**
49 * Creates a SMap instance.
50 *
51 * @param baseMap base IMap to use
52 * @param serializer serializer to use for both key and value
53 */
54 public SMap(IMap<byte[], byte[]> baseMap, StoreSerializer serializer) {
55 this.m = checkNotNull(baseMap);
56 this.serializer = checkNotNull(serializer);
57 }
58
59 @Override
60 public int size() {
61 return m.size();
62 }
63
64 @Override
65 public boolean isEmpty() {
66 return m.isEmpty();
67 }
68
69 @Override
70 public void putAll(Map<? extends K, ? extends V> map) {
71 Map<byte[], byte[]> sm = new IdentityHashMap<>(map.size());
72 for (java.util.Map.Entry<? extends K, ? extends V> e : map.entrySet()) {
73 sm.put(serializeKey(e.getKey()), serializeVal(e.getValue()));
74 }
75 m.putAll(sm);
76 }
77
78 @Deprecated
79 @Override
80 public Object getId() {
81 return m.getId();
82 }
83
84 @Override
85 public String getPartitionKey() {
86 return m.getPartitionKey();
87 }
88
89 @Override
90 public String getName() {
91 return m.getName();
92 }
93
94 @Override
95 public String getServiceName() {
96 return m.getServiceName();
97 }
98
99 @Override
100 public void destroy() {
101 m.destroy();
102 }
103
104 @Override
105 public boolean containsKey(Object key) {
106 return m.containsKey(serializeKey(key));
107 }
108
109 @Override
110 public boolean containsValue(Object value) {
111 return m.containsValue(serializeVal(value));
112 }
113
114 @Override
115 public V get(Object key) {
116 return deserializeVal(m.get(serializeKey(key)));
117 }
118
119 @Override
120 public V put(K key, V value) {
121 return deserializeVal(m.put(serializeKey(key), serializeVal(value)));
122 }
123
124 @Override
125 public V remove(Object key) {
126 return deserializeVal(m.remove(serializeKey(key)));
127 }
128
129 @Override
130 public boolean remove(Object key, Object value) {
131 return m.remove(serializeKey(key), serializeVal(value));
132 }
133
134 @Override
135 public void delete(Object key) {
136 m.delete(serializeKey(key));
137 }
138
139 @Override
140 public void flush() {
141 m.flush();
142 }
143
144 @Override
145 public Map<K, V> getAll(Set<K> keys) {
146 Set<byte[]> sk = serializeKeySet(keys);
147 Map<byte[], byte[]> bm = m.getAll(sk);
148 Map<K, V> dsm = new HashMap<>(bm.size());
149 for (java.util.Map.Entry<byte[], byte[]> e : bm.entrySet()) {
150 dsm.put(deserializeKey(e.getKey()), deserializeVal(e.getValue()));
151 }
152 return dsm;
153 }
154
155 @Override
156 public void loadAll(boolean replaceExistingValues) {
157 m.loadAll(replaceExistingValues);
158 }
159
160 @Override
161 public void loadAll(Set<K> keys, boolean replaceExistingValues) {
162 Set<byte[]> sk = serializeKeySet(keys);
163 m.loadAll(sk, replaceExistingValues);
164 }
165
166 @Override
167 public void clear() {
168 m.clear();
169 }
170
171 @Override
172 public Future<V> getAsync(K key) {
173 Future<byte[]> f = m.getAsync(serializeKey(key));
174 return Futures.lazyTransform(f, new DeserializeVal());
175 }
176
177 @Override
178 public Future<V> putAsync(K key, V value) {
179 Future<byte[]> f = m.putAsync(serializeKey(key), serializeVal(value));
180 return Futures.lazyTransform(f, new DeserializeVal());
181 }
182
183 @Override
184 public Future<V> putAsync(K key, V value, long ttl, TimeUnit timeunit) {
185 Future<byte[]> f = m.putAsync(serializeKey(key), serializeVal(value), ttl, timeunit);
186 return Futures.lazyTransform(f, new DeserializeVal());
187 }
188
189 @Override
190 public Future<V> removeAsync(K key) {
191 Future<byte[]> f = m.removeAsync(serializeKey(key));
192 return Futures.lazyTransform(f, new DeserializeVal());
193 }
194
195 @Override
196 public boolean tryRemove(K key, long timeout, TimeUnit timeunit) {
197 return m.tryRemove(serializeKey(key), timeout, timeunit);
198 }
199
200 @Override
201 public boolean tryPut(K key, V value, long timeout, TimeUnit timeunit) {
202 return m.tryPut(serializeKey(key), serializeVal(value), timeout, timeunit);
203 }
204
205 @Override
206 public V put(K key, V value, long ttl, TimeUnit timeunit) {
207 return deserializeVal(m.put(serializeKey(key), serializeVal(value), ttl, timeunit));
208 }
209
210 @Override
211 public void putTransient(K key, V value, long ttl, TimeUnit timeunit) {
212 m.putTransient(serializeKey(key), serializeVal(value), ttl, timeunit);
213 }
214
215 @Override
216 public V putIfAbsent(K key, V value) {
217 return deserializeVal(m.putIfAbsent(serializeKey(key), serializeVal(value)));
218 }
219
220 @Override
221 public V putIfAbsent(K key, V value, long ttl, TimeUnit timeunit) {
222 return deserializeVal(m.putIfAbsent(serializeKey(key), serializeVal(value), ttl, timeunit));
223 }
224
225 @Override
226 public boolean replace(K key, V oldValue, V newValue) {
227 return m.replace(serializeKey(key), serializeVal(oldValue), serializeVal(newValue));
228 }
229
230 @Override
231 public V replace(K key, V value) {
232 return deserializeVal(m.replace(serializeKey(key), serializeVal(value)));
233 }
234
235 @Override
236 public void set(K key, V value) {
237 m.set(serializeKey(key), serializeVal(value));
238 }
239
240 @Override
241 public void set(K key, V value, long ttl, TimeUnit timeunit) {
242 m.set(serializeKey(key), serializeVal(value), ttl, timeunit);
243 }
244
245 @Override
246 public void lock(K key) {
247 m.lock(serializeKey(key));
248 }
249
250 @Override
251 public void lock(K key, long leaseTime, TimeUnit timeUnit) {
252 m.lock(serializeKey(key), leaseTime, timeUnit);
253 }
254
255 @Override
256 public boolean isLocked(K key) {
257 return m.isLocked(serializeKey(key));
258 }
259
260 @Override
261 public boolean tryLock(K key) {
262 return m.tryLock(serializeKey(key));
263 }
264
265 @Override
266 public boolean tryLock(K key, long time, TimeUnit timeunit)
267 throws InterruptedException {
268 return m.tryLock(serializeKey(key), time, timeunit);
269 }
270
271 @Override
272 public void unlock(K key) {
273 m.unlock(serializeKey(key));
274 }
275
276 @Override
277 public void forceUnlock(K key) {
278 m.forceUnlock(serializeKey(key));
279 }
280
281 @Override
282 public String addLocalEntryListener(EntryListener<K, V> listener) {
283 return m.addLocalEntryListener(new BaseEntryListener(listener));
284 }
285
286 @Deprecated // marking method not implemented
287 @Override
288 public String addLocalEntryListener(EntryListener<K, V> listener,
289 Predicate<K, V> predicate, boolean includeValue) {
290 throw new UnsupportedOperationException();
291 }
292
293 @Deprecated // marking method not implemented
294 @Override
295 public String addLocalEntryListener(EntryListener<K, V> listener,
296 Predicate<K, V> predicate, K key, boolean includeValue) {
297 throw new UnsupportedOperationException();
298 }
299
300 @Deprecated // marking method not implemented
301 @Override
302 public String addInterceptor(MapInterceptor interceptor) {
303 throw new UnsupportedOperationException();
304 }
305
306 @Override
307 public void removeInterceptor(String id) {
308 m.removeInterceptor(id);
309 }
310
311 @Override
312 public String addEntryListener(EntryListener<K, V> listener,
313 boolean includeValue) {
314 return m.addEntryListener(new BaseEntryListener(listener), includeValue);
315 }
316
317 @Override
318 public boolean removeEntryListener(String id) {
319 return m.removeEntryListener(id);
320 }
321
322 @Override
323 public String addEntryListener(EntryListener<K, V> listener, K key,
324 boolean includeValue) {
325 return m.addEntryListener(new BaseEntryListener(listener),
326 serializeKey(key), includeValue);
327 }
328
329 @Deprecated // marking method not implemented
330 @Override
331 public String addEntryListener(EntryListener<K, V> listener,
332 Predicate<K, V> predicate, boolean includeValue) {
333 throw new UnsupportedOperationException();
334 }
335
336 @Deprecated // marking method not implemented
337 @Override
338 public String addEntryListener(EntryListener<K, V> listener,
339 Predicate<K, V> predicate, K key, boolean includeValue) {
340 throw new UnsupportedOperationException();
341 }
342
343 @Deprecated // marking method not implemented
344 @Override
345 public EntryView<K, V> getEntryView(K key) {
346 throw new UnsupportedOperationException();
347 }
348
349 @Override
350 public boolean evict(K key) {
351 return m.evict(serializeKey(key));
352 }
353
354 @Override
355 public void evictAll() {
356 m.evictAll();
357 }
358
359 @Override
360 public Set<K> keySet() {
361 return deserializeKeySet(m.keySet());
362 }
363
364 @Override
365 public Collection<V> values() {
366 return deserializeVal(m.values());
367 }
368
369 @Override
370 public Set<java.util.Map.Entry<K, V>> entrySet() {
371 return deserializeEntrySet(m.entrySet());
372 }
373
374 @Deprecated // marking method not implemented
375 @SuppressWarnings("rawtypes")
376 @Override
377 public Set<K> keySet(Predicate predicate) {
378 throw new UnsupportedOperationException();
379 }
380
381 @Deprecated // marking method not implemented
382 @SuppressWarnings("rawtypes")
383 @Override
384 public Set<java.util.Map.Entry<K, V>> entrySet(Predicate predicate) {
385 throw new UnsupportedOperationException();
386 }
387
388 @Deprecated // marking method not implemented
389 @SuppressWarnings("rawtypes")
390 @Override
391 public Collection<V> values(Predicate predicate) {
392 throw new UnsupportedOperationException();
393 }
394
395 @Override
396 public Set<K> localKeySet() {
397 return deserializeKeySet(m.localKeySet());
398 }
399
400 @Deprecated // marking method not implemented
401 @SuppressWarnings("rawtypes")
402 @Override
403 public Set<K> localKeySet(Predicate predicate) {
404 throw new UnsupportedOperationException();
405 }
406
407 @Deprecated // marking method not implemented
408 @Override
409 public void addIndex(String attribute, boolean ordered) {
410 throw new UnsupportedOperationException();
411 }
412
413 @Override
414 public LocalMapStats getLocalMapStats() {
415 return m.getLocalMapStats();
416 }
417
418 @Deprecated // marking method not implemented
419 @SuppressWarnings("rawtypes")
420 @Override
421 public Object executeOnKey(K key, EntryProcessor entryProcessor) {
422 throw new UnsupportedOperationException();
423 }
424
425 @Deprecated // marking method not implemented
426 @SuppressWarnings("rawtypes")
427 @Override
428 public Map<K, Object> executeOnKeys(Set<K> keys,
429 EntryProcessor entryProcessor) {
430 throw new UnsupportedOperationException();
431 }
432
433 @Deprecated // marking method not implemented
434 @SuppressWarnings("rawtypes")
435 @Override
436 public void submitToKey(K key, EntryProcessor entryProcessor,
437 ExecutionCallback callback) {
438 throw new UnsupportedOperationException();
439 }
440
441 @Deprecated // marking method not implemented
442 @SuppressWarnings("rawtypes")
443 @Override
444 public Future submitToKey(K key, EntryProcessor entryProcessor) {
445 throw new UnsupportedOperationException();
446 }
447
448 @Deprecated // marking method not implemented
449 @SuppressWarnings("rawtypes")
450 @Override
451 public Map<K, Object> executeOnEntries(EntryProcessor entryProcessor) {
452 throw new UnsupportedOperationException();
453 }
454
455 @Deprecated // marking method not implemented
456 @SuppressWarnings("rawtypes")
457 @Override
458 public Map<K, Object> executeOnEntries(EntryProcessor entryProcessor,
459 Predicate predicate) {
460 throw new UnsupportedOperationException();
461 }
462
463 @Deprecated // marking method not implemented
464 @Override
465 public <SuppliedValue, Result> Result aggregate(
466 Supplier<K, V, SuppliedValue> supplier,
467 Aggregation<K, SuppliedValue, Result> aggregation) {
468
469 throw new UnsupportedOperationException();
470 }
471
472 @Deprecated // marking method not implemented
473 @Override
474 public <SuppliedValue, Result> Result aggregate(
475 Supplier<K, V, SuppliedValue> supplier,
476 Aggregation<K, SuppliedValue, Result> aggregation,
477 JobTracker jobTracker) {
478
479 throw new UnsupportedOperationException();
480 }
481
482 private byte[] serializeKey(Object key) {
483 return serializer.encode(key);
484 }
485
486 private K deserializeKey(byte[] key) {
487 return serializer.decode(key);
488 }
489
490 private byte[] serializeVal(Object val) {
491 return serializer.encode(val);
492 }
493
494 private V deserializeVal(byte[] val) {
Yuta HIGUCHI871025412014-10-23 11:58:56 -0700495 if (val == null) {
496 return null;
497 }
498 return serializer.decode(val.clone());
Yuta HIGUCHIa97b5e42014-10-13 14:04:58 -0700499 }
500
501 private Set<byte[]> serializeKeySet(Set<K> keys) {
502 Set<byte[]> sk = Collections.newSetFromMap(new IdentityHashMap<byte[], Boolean>(keys.size()));
503 for (K key : keys) {
504 sk.add(serializeKey(key));
505 }
506 return sk;
507 }
508
509 private Set<K> deserializeKeySet(Set<byte[]> keys) {
510 Set<K> dsk = new HashSet<>(keys.size());
511 for (byte[] key : keys) {
512 dsk.add(deserializeKey(key));
513 }
514 return dsk;
515 }
516
517 private Collection<V> deserializeVal(Collection<byte[]> vals) {
518 Collection<V> dsl = new ArrayList<>(vals.size());
519 for (byte[] val : vals) {
520 dsl.add(deserializeVal(val));
521 }
522 return dsl;
523 }
524
525 private Set<java.util.Map.Entry<K, V>> deserializeEntrySet(
526 Set<java.util.Map.Entry<byte[], byte[]>> entries) {
527
528 Set<java.util.Map.Entry<K, V>> dse = new HashSet<>(entries.size());
529 for (java.util.Map.Entry<byte[], byte[]> entry : entries) {
530 dse.add(Pair.of(deserializeKey(entry.getKey()),
531 deserializeVal(entry.getValue())));
532 }
533 return dse;
534 }
535
536 private final class BaseEntryListener
537 implements EntryListener<byte[], byte[]> {
538
539 private final EntryListener<K, V> listener;
540
541 public BaseEntryListener(EntryListener<K, V> listener) {
542 this.listener = listener;
543 }
544
545 @Override
546 public void mapEvicted(MapEvent event) {
547 listener.mapEvicted(event);
548 }
549
550 @Override
551 public void mapCleared(MapEvent event) {
552 listener.mapCleared(event);
553 }
554
555 @Override
556 public void entryUpdated(EntryEvent<byte[], byte[]> event) {
557 EntryEvent<K, V> evt = new EntryEvent<K, V>(
558 event.getSource(),
559 event.getMember(),
560 event.getEventType().getType(),
561 deserializeKey(event.getKey()),
562 deserializeVal(event.getOldValue()),
563 deserializeVal(event.getValue()));
564
565 listener.entryUpdated(evt);
566 }
567
568 @Override
569 public void entryRemoved(EntryEvent<byte[], byte[]> event) {
570 EntryEvent<K, V> evt = new EntryEvent<K, V>(
571 event.getSource(),
572 event.getMember(),
573 event.getEventType().getType(),
574 deserializeKey(event.getKey()),
575 deserializeVal(event.getOldValue()),
576 null);
577
578 listener.entryRemoved(evt);
579 }
580
581 @Override
582 public void entryEvicted(EntryEvent<byte[], byte[]> event) {
583 EntryEvent<K, V> evt = new EntryEvent<K, V>(
584 event.getSource(),
585 event.getMember(),
586 event.getEventType().getType(),
587 deserializeKey(event.getKey()),
588 deserializeVal(event.getOldValue()),
589 deserializeVal(event.getValue()));
590
591 listener.entryEvicted(evt);
592 }
593
594 @Override
595 public void entryAdded(EntryEvent<byte[], byte[]> event) {
596 EntryEvent<K, V> evt = new EntryEvent<K, V>(
597 event.getSource(),
598 event.getMember(),
599 event.getEventType().getType(),
600 deserializeKey(event.getKey()),
601 null,
602 deserializeVal(event.getValue()));
603
604 listener.entryAdded(evt);
605 }
606 }
607
608 private final class DeserializeVal implements Function<byte[], V> {
609 @Override
610 public V apply(byte[] input) {
611 return deserializeVal(input);
612 }
613 }
614
615}