blob: 135312d6cb97d49e0ec8800d53cf5c43a4c304e7 [file] [log] [blame]
/*
 * Copyright 2014 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Yuta HIGUCHI41f2ec02014-10-27 09:54:43 -070016package org.onlab.onos.store.hz;
Yuta HIGUCHIa97b5e42014-10-13 14:04:58 -070017
18import static com.google.common.base.Preconditions.checkNotNull;
19
20import java.util.ArrayList;
21import java.util.Collection;
22import java.util.Collections;
23import java.util.HashMap;
24import java.util.HashSet;
25import java.util.IdentityHashMap;
26import java.util.Map;
27import java.util.Set;
28import java.util.concurrent.Future;
29import java.util.concurrent.TimeUnit;
30
31import org.apache.commons.lang3.tuple.Pair;
32import org.onlab.onos.store.serializers.StoreSerializer;
33
34import com.google.common.base.Function;
35import com.google.common.util.concurrent.Futures;
36import com.hazelcast.core.EntryEvent;
37import com.hazelcast.core.EntryListener;
38import com.hazelcast.core.EntryView;
39import com.hazelcast.core.ExecutionCallback;
40import com.hazelcast.core.IMap;
41import com.hazelcast.core.MapEvent;
42import com.hazelcast.map.EntryProcessor;
43import com.hazelcast.map.MapInterceptor;
44import com.hazelcast.mapreduce.JobTracker;
45import com.hazelcast.mapreduce.aggregation.Aggregation;
46import com.hazelcast.mapreduce.aggregation.Supplier;
47import com.hazelcast.monitor.LocalMapStats;
48import com.hazelcast.query.Predicate;
49
50// TODO: implement Predicate, etc. if we need them.
51/**
Thomas Vachuska4b420772014-10-30 16:46:17 -070052 * Wrapper around IMap<byte[], byte[]> which serializes/deserializes
53 * key and value using StoreSerializer.
Yuta HIGUCHIa97b5e42014-10-13 14:04:58 -070054 *
55 * @param <K> key type
56 * @param <V> value type
57 */
58public class SMap<K, V> implements IMap<K, V> {
59
60 private final IMap<byte[], byte[]> m;
61 private final StoreSerializer serializer;
62
63 /**
64 * Creates a SMap instance.
65 *
66 * @param baseMap base IMap to use
67 * @param serializer serializer to use for both key and value
68 */
69 public SMap(IMap<byte[], byte[]> baseMap, StoreSerializer serializer) {
70 this.m = checkNotNull(baseMap);
71 this.serializer = checkNotNull(serializer);
72 }
73
74 @Override
75 public int size() {
76 return m.size();
77 }
78
79 @Override
80 public boolean isEmpty() {
81 return m.isEmpty();
82 }
83
84 @Override
85 public void putAll(Map<? extends K, ? extends V> map) {
86 Map<byte[], byte[]> sm = new IdentityHashMap<>(map.size());
87 for (java.util.Map.Entry<? extends K, ? extends V> e : map.entrySet()) {
88 sm.put(serializeKey(e.getKey()), serializeVal(e.getValue()));
89 }
90 m.putAll(sm);
91 }
92
93 @Deprecated
94 @Override
95 public Object getId() {
96 return m.getId();
97 }
98
99 @Override
100 public String getPartitionKey() {
101 return m.getPartitionKey();
102 }
103
104 @Override
105 public String getName() {
106 return m.getName();
107 }
108
109 @Override
110 public String getServiceName() {
111 return m.getServiceName();
112 }
113
114 @Override
115 public void destroy() {
116 m.destroy();
117 }
118
119 @Override
120 public boolean containsKey(Object key) {
121 return m.containsKey(serializeKey(key));
122 }
123
124 @Override
125 public boolean containsValue(Object value) {
126 return m.containsValue(serializeVal(value));
127 }
128
129 @Override
130 public V get(Object key) {
131 return deserializeVal(m.get(serializeKey(key)));
132 }
133
134 @Override
135 public V put(K key, V value) {
136 return deserializeVal(m.put(serializeKey(key), serializeVal(value)));
137 }
138
139 @Override
140 public V remove(Object key) {
141 return deserializeVal(m.remove(serializeKey(key)));
142 }
143
144 @Override
145 public boolean remove(Object key, Object value) {
146 return m.remove(serializeKey(key), serializeVal(value));
147 }
148
149 @Override
150 public void delete(Object key) {
151 m.delete(serializeKey(key));
152 }
153
154 @Override
155 public void flush() {
156 m.flush();
157 }
158
159 @Override
160 public Map<K, V> getAll(Set<K> keys) {
161 Set<byte[]> sk = serializeKeySet(keys);
162 Map<byte[], byte[]> bm = m.getAll(sk);
163 Map<K, V> dsm = new HashMap<>(bm.size());
164 for (java.util.Map.Entry<byte[], byte[]> e : bm.entrySet()) {
165 dsm.put(deserializeKey(e.getKey()), deserializeVal(e.getValue()));
166 }
167 return dsm;
168 }
169
170 @Override
171 public void loadAll(boolean replaceExistingValues) {
172 m.loadAll(replaceExistingValues);
173 }
174
175 @Override
176 public void loadAll(Set<K> keys, boolean replaceExistingValues) {
177 Set<byte[]> sk = serializeKeySet(keys);
178 m.loadAll(sk, replaceExistingValues);
179 }
180
181 @Override
182 public void clear() {
183 m.clear();
184 }
185
186 @Override
187 public Future<V> getAsync(K key) {
188 Future<byte[]> f = m.getAsync(serializeKey(key));
189 return Futures.lazyTransform(f, new DeserializeVal());
190 }
191
192 @Override
193 public Future<V> putAsync(K key, V value) {
194 Future<byte[]> f = m.putAsync(serializeKey(key), serializeVal(value));
195 return Futures.lazyTransform(f, new DeserializeVal());
196 }
197
198 @Override
199 public Future<V> putAsync(K key, V value, long ttl, TimeUnit timeunit) {
200 Future<byte[]> f = m.putAsync(serializeKey(key), serializeVal(value), ttl, timeunit);
201 return Futures.lazyTransform(f, new DeserializeVal());
202 }
203
204 @Override
205 public Future<V> removeAsync(K key) {
206 Future<byte[]> f = m.removeAsync(serializeKey(key));
207 return Futures.lazyTransform(f, new DeserializeVal());
208 }
209
210 @Override
211 public boolean tryRemove(K key, long timeout, TimeUnit timeunit) {
212 return m.tryRemove(serializeKey(key), timeout, timeunit);
213 }
214
215 @Override
216 public boolean tryPut(K key, V value, long timeout, TimeUnit timeunit) {
217 return m.tryPut(serializeKey(key), serializeVal(value), timeout, timeunit);
218 }
219
220 @Override
221 public V put(K key, V value, long ttl, TimeUnit timeunit) {
222 return deserializeVal(m.put(serializeKey(key), serializeVal(value), ttl, timeunit));
223 }
224
225 @Override
226 public void putTransient(K key, V value, long ttl, TimeUnit timeunit) {
227 m.putTransient(serializeKey(key), serializeVal(value), ttl, timeunit);
228 }
229
230 @Override
231 public V putIfAbsent(K key, V value) {
232 return deserializeVal(m.putIfAbsent(serializeKey(key), serializeVal(value)));
233 }
234
235 @Override
236 public V putIfAbsent(K key, V value, long ttl, TimeUnit timeunit) {
237 return deserializeVal(m.putIfAbsent(serializeKey(key), serializeVal(value), ttl, timeunit));
238 }
239
240 @Override
241 public boolean replace(K key, V oldValue, V newValue) {
242 return m.replace(serializeKey(key), serializeVal(oldValue), serializeVal(newValue));
243 }
244
245 @Override
246 public V replace(K key, V value) {
247 return deserializeVal(m.replace(serializeKey(key), serializeVal(value)));
248 }
249
250 @Override
251 public void set(K key, V value) {
252 m.set(serializeKey(key), serializeVal(value));
253 }
254
255 @Override
256 public void set(K key, V value, long ttl, TimeUnit timeunit) {
257 m.set(serializeKey(key), serializeVal(value), ttl, timeunit);
258 }
259
260 @Override
261 public void lock(K key) {
262 m.lock(serializeKey(key));
263 }
264
265 @Override
266 public void lock(K key, long leaseTime, TimeUnit timeUnit) {
267 m.lock(serializeKey(key), leaseTime, timeUnit);
268 }
269
270 @Override
271 public boolean isLocked(K key) {
272 return m.isLocked(serializeKey(key));
273 }
274
275 @Override
276 public boolean tryLock(K key) {
277 return m.tryLock(serializeKey(key));
278 }
279
280 @Override
281 public boolean tryLock(K key, long time, TimeUnit timeunit)
282 throws InterruptedException {
283 return m.tryLock(serializeKey(key), time, timeunit);
284 }
285
286 @Override
287 public void unlock(K key) {
288 m.unlock(serializeKey(key));
289 }
290
291 @Override
292 public void forceUnlock(K key) {
293 m.forceUnlock(serializeKey(key));
294 }
295
296 @Override
297 public String addLocalEntryListener(EntryListener<K, V> listener) {
298 return m.addLocalEntryListener(new BaseEntryListener(listener));
299 }
300
301 @Deprecated // marking method not implemented
302 @Override
303 public String addLocalEntryListener(EntryListener<K, V> listener,
304 Predicate<K, V> predicate, boolean includeValue) {
305 throw new UnsupportedOperationException();
306 }
307
308 @Deprecated // marking method not implemented
309 @Override
310 public String addLocalEntryListener(EntryListener<K, V> listener,
311 Predicate<K, V> predicate, K key, boolean includeValue) {
312 throw new UnsupportedOperationException();
313 }
314
315 @Deprecated // marking method not implemented
316 @Override
317 public String addInterceptor(MapInterceptor interceptor) {
318 throw new UnsupportedOperationException();
319 }
320
321 @Override
322 public void removeInterceptor(String id) {
323 m.removeInterceptor(id);
324 }
325
326 @Override
327 public String addEntryListener(EntryListener<K, V> listener,
328 boolean includeValue) {
329 return m.addEntryListener(new BaseEntryListener(listener), includeValue);
330 }
331
332 @Override
333 public boolean removeEntryListener(String id) {
334 return m.removeEntryListener(id);
335 }
336
337 @Override
338 public String addEntryListener(EntryListener<K, V> listener, K key,
339 boolean includeValue) {
340 return m.addEntryListener(new BaseEntryListener(listener),
341 serializeKey(key), includeValue);
342 }
343
344 @Deprecated // marking method not implemented
345 @Override
346 public String addEntryListener(EntryListener<K, V> listener,
347 Predicate<K, V> predicate, boolean includeValue) {
348 throw new UnsupportedOperationException();
349 }
350
351 @Deprecated // marking method not implemented
352 @Override
353 public String addEntryListener(EntryListener<K, V> listener,
354 Predicate<K, V> predicate, K key, boolean includeValue) {
355 throw new UnsupportedOperationException();
356 }
357
358 @Deprecated // marking method not implemented
359 @Override
360 public EntryView<K, V> getEntryView(K key) {
361 throw new UnsupportedOperationException();
362 }
363
364 @Override
365 public boolean evict(K key) {
366 return m.evict(serializeKey(key));
367 }
368
369 @Override
370 public void evictAll() {
371 m.evictAll();
372 }
373
374 @Override
375 public Set<K> keySet() {
376 return deserializeKeySet(m.keySet());
377 }
378
379 @Override
380 public Collection<V> values() {
381 return deserializeVal(m.values());
382 }
383
384 @Override
385 public Set<java.util.Map.Entry<K, V>> entrySet() {
386 return deserializeEntrySet(m.entrySet());
387 }
388
389 @Deprecated // marking method not implemented
390 @SuppressWarnings("rawtypes")
391 @Override
392 public Set<K> keySet(Predicate predicate) {
393 throw new UnsupportedOperationException();
394 }
395
396 @Deprecated // marking method not implemented
397 @SuppressWarnings("rawtypes")
398 @Override
399 public Set<java.util.Map.Entry<K, V>> entrySet(Predicate predicate) {
400 throw new UnsupportedOperationException();
401 }
402
403 @Deprecated // marking method not implemented
404 @SuppressWarnings("rawtypes")
405 @Override
406 public Collection<V> values(Predicate predicate) {
407 throw new UnsupportedOperationException();
408 }
409
410 @Override
411 public Set<K> localKeySet() {
412 return deserializeKeySet(m.localKeySet());
413 }
414
415 @Deprecated // marking method not implemented
416 @SuppressWarnings("rawtypes")
417 @Override
418 public Set<K> localKeySet(Predicate predicate) {
419 throw new UnsupportedOperationException();
420 }
421
422 @Deprecated // marking method not implemented
423 @Override
424 public void addIndex(String attribute, boolean ordered) {
425 throw new UnsupportedOperationException();
426 }
427
428 @Override
429 public LocalMapStats getLocalMapStats() {
430 return m.getLocalMapStats();
431 }
432
433 @Deprecated // marking method not implemented
434 @SuppressWarnings("rawtypes")
435 @Override
436 public Object executeOnKey(K key, EntryProcessor entryProcessor) {
437 throw new UnsupportedOperationException();
438 }
439
440 @Deprecated // marking method not implemented
441 @SuppressWarnings("rawtypes")
442 @Override
443 public Map<K, Object> executeOnKeys(Set<K> keys,
444 EntryProcessor entryProcessor) {
445 throw new UnsupportedOperationException();
446 }
447
448 @Deprecated // marking method not implemented
449 @SuppressWarnings("rawtypes")
450 @Override
451 public void submitToKey(K key, EntryProcessor entryProcessor,
452 ExecutionCallback callback) {
453 throw new UnsupportedOperationException();
454 }
455
456 @Deprecated // marking method not implemented
457 @SuppressWarnings("rawtypes")
458 @Override
459 public Future submitToKey(K key, EntryProcessor entryProcessor) {
460 throw new UnsupportedOperationException();
461 }
462
463 @Deprecated // marking method not implemented
464 @SuppressWarnings("rawtypes")
465 @Override
466 public Map<K, Object> executeOnEntries(EntryProcessor entryProcessor) {
467 throw new UnsupportedOperationException();
468 }
469
470 @Deprecated // marking method not implemented
471 @SuppressWarnings("rawtypes")
472 @Override
473 public Map<K, Object> executeOnEntries(EntryProcessor entryProcessor,
474 Predicate predicate) {
475 throw new UnsupportedOperationException();
476 }
477
478 @Deprecated // marking method not implemented
479 @Override
480 public <SuppliedValue, Result> Result aggregate(
481 Supplier<K, V, SuppliedValue> supplier,
482 Aggregation<K, SuppliedValue, Result> aggregation) {
483
484 throw new UnsupportedOperationException();
485 }
486
487 @Deprecated // marking method not implemented
488 @Override
489 public <SuppliedValue, Result> Result aggregate(
490 Supplier<K, V, SuppliedValue> supplier,
491 Aggregation<K, SuppliedValue, Result> aggregation,
492 JobTracker jobTracker) {
493
494 throw new UnsupportedOperationException();
495 }
496
497 private byte[] serializeKey(Object key) {
498 return serializer.encode(key);
499 }
500
501 private K deserializeKey(byte[] key) {
502 return serializer.decode(key);
503 }
504
505 private byte[] serializeVal(Object val) {
506 return serializer.encode(val);
507 }
508
509 private V deserializeVal(byte[] val) {
Yuta HIGUCHI871025412014-10-23 11:58:56 -0700510 if (val == null) {
511 return null;
512 }
513 return serializer.decode(val.clone());
Yuta HIGUCHIa97b5e42014-10-13 14:04:58 -0700514 }
515
516 private Set<byte[]> serializeKeySet(Set<K> keys) {
517 Set<byte[]> sk = Collections.newSetFromMap(new IdentityHashMap<byte[], Boolean>(keys.size()));
518 for (K key : keys) {
519 sk.add(serializeKey(key));
520 }
521 return sk;
522 }
523
524 private Set<K> deserializeKeySet(Set<byte[]> keys) {
525 Set<K> dsk = new HashSet<>(keys.size());
526 for (byte[] key : keys) {
527 dsk.add(deserializeKey(key));
528 }
529 return dsk;
530 }
531
532 private Collection<V> deserializeVal(Collection<byte[]> vals) {
533 Collection<V> dsl = new ArrayList<>(vals.size());
534 for (byte[] val : vals) {
535 dsl.add(deserializeVal(val));
536 }
537 return dsl;
538 }
539
540 private Set<java.util.Map.Entry<K, V>> deserializeEntrySet(
541 Set<java.util.Map.Entry<byte[], byte[]>> entries) {
542
543 Set<java.util.Map.Entry<K, V>> dse = new HashSet<>(entries.size());
544 for (java.util.Map.Entry<byte[], byte[]> entry : entries) {
545 dse.add(Pair.of(deserializeKey(entry.getKey()),
546 deserializeVal(entry.getValue())));
547 }
548 return dse;
549 }
550
551 private final class BaseEntryListener
552 implements EntryListener<byte[], byte[]> {
553
554 private final EntryListener<K, V> listener;
555
556 public BaseEntryListener(EntryListener<K, V> listener) {
557 this.listener = listener;
558 }
559
560 @Override
561 public void mapEvicted(MapEvent event) {
562 listener.mapEvicted(event);
563 }
564
565 @Override
566 public void mapCleared(MapEvent event) {
567 listener.mapCleared(event);
568 }
569
570 @Override
571 public void entryUpdated(EntryEvent<byte[], byte[]> event) {
572 EntryEvent<K, V> evt = new EntryEvent<K, V>(
573 event.getSource(),
574 event.getMember(),
575 event.getEventType().getType(),
576 deserializeKey(event.getKey()),
577 deserializeVal(event.getOldValue()),
578 deserializeVal(event.getValue()));
579
580 listener.entryUpdated(evt);
581 }
582
583 @Override
584 public void entryRemoved(EntryEvent<byte[], byte[]> event) {
585 EntryEvent<K, V> evt = new EntryEvent<K, V>(
586 event.getSource(),
587 event.getMember(),
588 event.getEventType().getType(),
589 deserializeKey(event.getKey()),
590 deserializeVal(event.getOldValue()),
591 null);
592
593 listener.entryRemoved(evt);
594 }
595
596 @Override
597 public void entryEvicted(EntryEvent<byte[], byte[]> event) {
598 EntryEvent<K, V> evt = new EntryEvent<K, V>(
599 event.getSource(),
600 event.getMember(),
601 event.getEventType().getType(),
602 deserializeKey(event.getKey()),
603 deserializeVal(event.getOldValue()),
604 deserializeVal(event.getValue()));
605
606 listener.entryEvicted(evt);
607 }
608
609 @Override
610 public void entryAdded(EntryEvent<byte[], byte[]> event) {
611 EntryEvent<K, V> evt = new EntryEvent<K, V>(
612 event.getSource(),
613 event.getMember(),
614 event.getEventType().getType(),
615 deserializeKey(event.getKey()),
616 null,
617 deserializeVal(event.getValue()));
618
619 listener.entryAdded(evt);
620 }
621 }
622
623 private final class DeserializeVal implements Function<byte[], V> {
624 @Override
625 public V apply(byte[] input) {
626 return deserializeVal(input);
627 }
628 }
629
630}