blob: c4bed4cadcffc6d2f2c54e5d76d505830a80c2cc [file] [log] [blame]
tomb41d1ac2014-09-24 01:51:24 -07001package org.onlab.onos.store.impl;
2
3import com.google.common.base.Optional;
4import com.google.common.cache.LoadingCache;
5import com.hazelcast.core.EntryAdapter;
6import com.hazelcast.core.EntryEvent;
7import com.hazelcast.core.HazelcastInstance;
8import com.hazelcast.core.MapEvent;
9import org.apache.felix.scr.annotations.Activate;
10import org.apache.felix.scr.annotations.Component;
11import org.apache.felix.scr.annotations.Reference;
12import org.apache.felix.scr.annotations.ReferenceCardinality;
tom0755a362014-09-24 11:54:43 -070013import org.onlab.onos.event.Event;
14import org.onlab.onos.store.AbstractStore;
15import org.onlab.onos.store.StoreDelegate;
16import org.onlab.onos.store.common.StoreService;
tomb41d1ac2014-09-24 01:51:24 -070017import org.slf4j.Logger;
18
19import static com.google.common.base.Preconditions.checkNotNull;
20import static org.slf4j.LoggerFactory.getLogger;
21
22/**
23 * Abstraction of a distributed store based on Hazelcast.
24 */
25@Component(componentAbstract = true)
Yuta HIGUCHI2e963892014-09-27 13:00:39 -070026public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDelegate<E>>
tomca55e642014-09-24 18:28:38 -070027 extends AbstractStore<E, D> {
tomb41d1ac2014-09-24 01:51:24 -070028
29 protected final Logger log = getLogger(getClass());
30
31 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
32 protected StoreService storeService;
33
34 protected HazelcastInstance theInstance;
35
36 @Activate
37 public void activate() {
38 theInstance = storeService.getHazelcastInstance();
39 }
40
41 /**
42 * Serializes the specified object using the backing store service.
43 *
44 * @param obj object to be serialized
45 * @return serialized object
46 */
47 protected byte[] serialize(Object obj) {
48 return storeService.serialize(obj);
49 }
50
51 /**
52 * Deserializes the specified object using the backing store service.
53 *
54 * @param bytes bytes to be deserialized
55 * @param <T> type of object
56 * @return deserialized object
57 */
58 protected <T> T deserialize(byte[] bytes) {
59 return storeService.deserialize(bytes);
60 }
61
62
63 /**
64 * An IMap entry listener, which reflects each remote event to the cache.
65 *
66 * @param <K> IMap key type after deserialization
67 * @param <V> IMap value type after deserialization
68 */
tomca55e642014-09-24 18:28:38 -070069 public class RemoteEventHandler<K, V> extends EntryAdapter<byte[], byte[]> {
tomb41d1ac2014-09-24 01:51:24 -070070
71 private LoadingCache<K, Optional<V>> cache;
72
73 /**
74 * Constructor.
75 *
76 * @param cache cache to update
77 */
78 public RemoteEventHandler(LoadingCache<K, Optional<V>> cache) {
79 this.cache = checkNotNull(cache);
80 }
81
82 @Override
83 public void mapCleared(MapEvent event) {
84 cache.invalidateAll();
85 }
86
87 @Override
tomca55e642014-09-24 18:28:38 -070088 public void entryAdded(EntryEvent<byte[], byte[]> event) {
89 K key = deserialize(event.getKey());
90 V newVal = deserialize(event.getValue());
91 Optional<V> newValue = Optional.of(newVal);
92 cache.asMap().putIfAbsent(key, newValue);
93 onAdd(key, newVal);
94 }
95
96 @Override
tomb41d1ac2014-09-24 01:51:24 -070097 public void entryUpdated(EntryEvent<byte[], byte[]> event) {
tomca55e642014-09-24 18:28:38 -070098 K key = deserialize(event.getKey());
99 V oldVal = deserialize(event.getOldValue());
Yuta HIGUCHIbb1fc722014-09-24 00:00:13 -0700100 Optional<V> oldValue = Optional.fromNullable(oldVal);
tomca55e642014-09-24 18:28:38 -0700101 V newVal = deserialize(event.getValue());
Yuta HIGUCHIbb1fc722014-09-24 00:00:13 -0700102 Optional<V> newValue = Optional.of(newVal);
103 cache.asMap().replace(key, oldValue, newValue);
Yuta HIGUCHIa8a53eb2014-09-25 17:47:55 -0700104 onUpdate(key, oldVal, newVal);
tomb41d1ac2014-09-24 01:51:24 -0700105 }
106
107 @Override
108 public void entryRemoved(EntryEvent<byte[], byte[]> event) {
tomca55e642014-09-24 18:28:38 -0700109 K key = deserialize(event.getKey());
Yuta HIGUCHIf5479702014-09-25 00:09:24 -0700110 V val = deserialize(event.getOldValue());
tomca55e642014-09-24 18:28:38 -0700111 cache.invalidate(key);
112 onRemove(key, val);
tomb41d1ac2014-09-24 01:51:24 -0700113 }
114
tomca55e642014-09-24 18:28:38 -0700115 /**
116 * Cache entry addition hook.
117 *
118 * @param key new key
119 * @param newVal new value
120 */
121 protected void onAdd(K key, V newVal) {
122 }
123
124 /**
125 * Cache entry update hook.
126 *
127 * @param key new key
Yuta HIGUCHIa8a53eb2014-09-25 17:47:55 -0700128 * @param oldValue old value
tomca55e642014-09-24 18:28:38 -0700129 * @param newVal new value
130 */
Yuta HIGUCHIa8a53eb2014-09-25 17:47:55 -0700131 protected void onUpdate(K key, V oldValue, V newVal) {
tomca55e642014-09-24 18:28:38 -0700132 }
133
134 /**
135 * Cache entry remove hook.
136 *
137 * @param key new key
138 * @param val old value
139 */
140 protected void onRemove(K key, V val) {
tomb41d1ac2014-09-24 01:51:24 -0700141 }
142 }
143
144}