blob: bca585dd701aba210f3bd7d849b4cbbe687d1af3 [file] [log] [blame]
tomb41d1ac2014-09-24 01:51:24 -07001package org.onlab.onos.store.impl;
2
3import com.google.common.base.Optional;
4import com.google.common.cache.LoadingCache;
5import com.hazelcast.core.EntryAdapter;
6import com.hazelcast.core.EntryEvent;
7import com.hazelcast.core.HazelcastInstance;
8import com.hazelcast.core.MapEvent;
9import org.apache.felix.scr.annotations.Activate;
10import org.apache.felix.scr.annotations.Component;
11import org.apache.felix.scr.annotations.Reference;
12import org.apache.felix.scr.annotations.ReferenceCardinality;
tom0755a362014-09-24 11:54:43 -070013import org.onlab.onos.event.Event;
14import org.onlab.onos.store.AbstractStore;
15import org.onlab.onos.store.StoreDelegate;
16import org.onlab.onos.store.common.StoreService;
tomb41d1ac2014-09-24 01:51:24 -070017import org.slf4j.Logger;
18
19import static com.google.common.base.Preconditions.checkNotNull;
20import static org.slf4j.LoggerFactory.getLogger;
21
22/**
23 * Abstraction of a distributed store based on Hazelcast.
24 */
25@Component(componentAbstract = true)
tom0755a362014-09-24 11:54:43 -070026public abstract class AbstractDistributedStore<E extends Event, D extends StoreDelegate<E>>
27 extends AbstractStore<E, D> {
tomb41d1ac2014-09-24 01:51:24 -070028
29 protected final Logger log = getLogger(getClass());
30
31 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
32 protected StoreService storeService;
33
34 protected HazelcastInstance theInstance;
35
36 @Activate
37 public void activate() {
38 theInstance = storeService.getHazelcastInstance();
39 }
40
41 /**
42 * Serializes the specified object using the backing store service.
43 *
44 * @param obj object to be serialized
45 * @return serialized object
46 */
47 protected byte[] serialize(Object obj) {
48 return storeService.serialize(obj);
49 }
50
51 /**
52 * Deserializes the specified object using the backing store service.
53 *
54 * @param bytes bytes to be deserialized
55 * @param <T> type of object
56 * @return deserialized object
57 */
58 protected <T> T deserialize(byte[] bytes) {
59 return storeService.deserialize(bytes);
60 }
61
62
63 /**
64 * An IMap entry listener, which reflects each remote event to the cache.
65 *
66 * @param <K> IMap key type after deserialization
67 * @param <V> IMap value type after deserialization
68 */
69 public final class RemoteEventHandler<K, V> extends EntryAdapter<byte[], byte[]> {
70
71 private LoadingCache<K, Optional<V>> cache;
72
73 /**
74 * Constructor.
75 *
76 * @param cache cache to update
77 */
78 public RemoteEventHandler(LoadingCache<K, Optional<V>> cache) {
79 this.cache = checkNotNull(cache);
80 }
81
82 @Override
83 public void mapCleared(MapEvent event) {
84 cache.invalidateAll();
85 }
86
87 @Override
88 public void entryUpdated(EntryEvent<byte[], byte[]> event) {
89 cache.put(storeService.<K>deserialize(event.getKey()),
90 Optional.of(storeService.<V>deserialize(event.getValue())));
91 }
92
93 @Override
94 public void entryRemoved(EntryEvent<byte[], byte[]> event) {
95 cache.invalidate(storeService.<K>deserialize(event.getKey()));
96 }
97
98 @Override
99 public void entryAdded(EntryEvent<byte[], byte[]> event) {
100 entryUpdated(event);
101 }
102 }
103
104}