blob: 29af33a3fa1f00d41f94c0b7ceb3e169a072f432 [file] [log] [blame]
Yuta HIGUCHIb5df76d2014-09-27 20:54:00 -07001package org.onlab.onos.store.common;
tomb41d1ac2014-09-24 01:51:24 -07002
3import com.google.common.base.Optional;
4import com.google.common.cache.LoadingCache;
5import com.hazelcast.core.EntryAdapter;
6import com.hazelcast.core.EntryEvent;
7import com.hazelcast.core.HazelcastInstance;
8import com.hazelcast.core.MapEvent;
Yuta HIGUCHIfec9e192014-09-28 14:58:02 -07009import com.hazelcast.core.Member;
Yuta HIGUCHIb5df76d2014-09-27 20:54:00 -070010
tomb41d1ac2014-09-24 01:51:24 -070011import org.apache.felix.scr.annotations.Activate;
12import org.apache.felix.scr.annotations.Component;
13import org.apache.felix.scr.annotations.Reference;
14import org.apache.felix.scr.annotations.ReferenceCardinality;
tom0755a362014-09-24 11:54:43 -070015import org.onlab.onos.event.Event;
16import org.onlab.onos.store.AbstractStore;
17import org.onlab.onos.store.StoreDelegate;
Yuta HIGUCHIad4c2182014-09-29 11:16:23 -070018import org.onlab.onos.store.serializers.KryoSerializationService;
tomb41d1ac2014-09-24 01:51:24 -070019import org.slf4j.Logger;
20
21import static com.google.common.base.Preconditions.checkNotNull;
22import static org.slf4j.LoggerFactory.getLogger;
23
24/**
25 * Abstraction of a distributed store based on Hazelcast.
26 */
27@Component(componentAbstract = true)
Yuta HIGUCHI2e963892014-09-27 13:00:39 -070028public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDelegate<E>>
tomca55e642014-09-24 18:28:38 -070029 extends AbstractStore<E, D> {
tomb41d1ac2014-09-24 01:51:24 -070030
31 protected final Logger log = getLogger(getClass());
32
33 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
34 protected StoreService storeService;
35
Yuta HIGUCHIad4c2182014-09-29 11:16:23 -070036 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
37 protected KryoSerializationService kryoSerializationService;
38
tomb41d1ac2014-09-24 01:51:24 -070039 protected HazelcastInstance theInstance;
40
41 @Activate
42 public void activate() {
43 theInstance = storeService.getHazelcastInstance();
44 }
45
46 /**
47 * Serializes the specified object using the backing store service.
48 *
49 * @param obj object to be serialized
50 * @return serialized object
51 */
52 protected byte[] serialize(Object obj) {
Yuta HIGUCHI53a285d2014-10-06 23:58:01 -070053 return kryoSerializationService.encode(obj);
tomb41d1ac2014-09-24 01:51:24 -070054 }
55
56 /**
57 * Deserializes the specified object using the backing store service.
58 *
59 * @param bytes bytes to be deserialized
60 * @param <T> type of object
61 * @return deserialized object
62 */
63 protected <T> T deserialize(byte[] bytes) {
Yuta HIGUCHI53a285d2014-10-06 23:58:01 -070064 return kryoSerializationService.decode(bytes);
tomb41d1ac2014-09-24 01:51:24 -070065 }
66
67
68 /**
69 * An IMap entry listener, which reflects each remote event to the cache.
70 *
71 * @param <K> IMap key type after deserialization
72 * @param <V> IMap value type after deserialization
73 */
Yuta HIGUCHIfec9e192014-09-28 14:58:02 -070074 public class RemoteCacheEventHandler<K, V> extends EntryAdapter<byte[], byte[]> {
tomb41d1ac2014-09-24 01:51:24 -070075
Yuta HIGUCHIfec9e192014-09-28 14:58:02 -070076 private final Member localMember;
tomb41d1ac2014-09-24 01:51:24 -070077 private LoadingCache<K, Optional<V>> cache;
78
79 /**
80 * Constructor.
81 *
82 * @param cache cache to update
83 */
Yuta HIGUCHIfec9e192014-09-28 14:58:02 -070084 public RemoteCacheEventHandler(LoadingCache<K, Optional<V>> cache) {
85 this.localMember = theInstance.getCluster().getLocalMember();
tomb41d1ac2014-09-24 01:51:24 -070086 this.cache = checkNotNull(cache);
87 }
88
89 @Override
90 public void mapCleared(MapEvent event) {
Yuta HIGUCHIfec9e192014-09-28 14:58:02 -070091 if (localMember.equals(event.getMember())) {
92 // ignore locally triggered event
93 return;
94 }
tomb41d1ac2014-09-24 01:51:24 -070095 cache.invalidateAll();
96 }
97
98 @Override
tomca55e642014-09-24 18:28:38 -070099 public void entryAdded(EntryEvent<byte[], byte[]> event) {
Yuta HIGUCHIfec9e192014-09-28 14:58:02 -0700100 if (localMember.equals(event.getMember())) {
101 // ignore locally triggered event
102 return;
103 }
tomca55e642014-09-24 18:28:38 -0700104 K key = deserialize(event.getKey());
105 V newVal = deserialize(event.getValue());
106 Optional<V> newValue = Optional.of(newVal);
107 cache.asMap().putIfAbsent(key, newValue);
108 onAdd(key, newVal);
109 }
110
111 @Override
tomb41d1ac2014-09-24 01:51:24 -0700112 public void entryUpdated(EntryEvent<byte[], byte[]> event) {
Yuta HIGUCHIfec9e192014-09-28 14:58:02 -0700113 if (localMember.equals(event.getMember())) {
114 // ignore locally triggered event
115 return;
116 }
tomca55e642014-09-24 18:28:38 -0700117 K key = deserialize(event.getKey());
118 V oldVal = deserialize(event.getOldValue());
Yuta HIGUCHIbb1fc722014-09-24 00:00:13 -0700119 Optional<V> oldValue = Optional.fromNullable(oldVal);
tomca55e642014-09-24 18:28:38 -0700120 V newVal = deserialize(event.getValue());
Yuta HIGUCHIbb1fc722014-09-24 00:00:13 -0700121 Optional<V> newValue = Optional.of(newVal);
122 cache.asMap().replace(key, oldValue, newValue);
Yuta HIGUCHIa8a53eb2014-09-25 17:47:55 -0700123 onUpdate(key, oldVal, newVal);
tomb41d1ac2014-09-24 01:51:24 -0700124 }
125
126 @Override
127 public void entryRemoved(EntryEvent<byte[], byte[]> event) {
Yuta HIGUCHIfec9e192014-09-28 14:58:02 -0700128 if (localMember.equals(event.getMember())) {
129 // ignore locally triggered event
130 return;
131 }
tomca55e642014-09-24 18:28:38 -0700132 K key = deserialize(event.getKey());
Yuta HIGUCHIf5479702014-09-25 00:09:24 -0700133 V val = deserialize(event.getOldValue());
tomca55e642014-09-24 18:28:38 -0700134 cache.invalidate(key);
135 onRemove(key, val);
tomb41d1ac2014-09-24 01:51:24 -0700136 }
137
tomca55e642014-09-24 18:28:38 -0700138 /**
139 * Cache entry addition hook.
140 *
141 * @param key new key
142 * @param newVal new value
143 */
144 protected void onAdd(K key, V newVal) {
145 }
146
147 /**
148 * Cache entry update hook.
149 *
150 * @param key new key
Yuta HIGUCHIa8a53eb2014-09-25 17:47:55 -0700151 * @param oldValue old value
tomca55e642014-09-24 18:28:38 -0700152 * @param newVal new value
153 */
Yuta HIGUCHIa8a53eb2014-09-25 17:47:55 -0700154 protected void onUpdate(K key, V oldValue, V newVal) {
tomca55e642014-09-24 18:28:38 -0700155 }
156
157 /**
158 * Cache entry remove hook.
159 *
160 * @param key new key
161 * @param val old value
162 */
163 protected void onRemove(K key, V val) {
tomb41d1ac2014-09-24 01:51:24 -0700164 }
165 }
166
Yuta HIGUCHIfec9e192014-09-28 14:58:02 -0700167 /**
168 * Distributed object remote event entry listener.
169 *
170 * @param <K> Entry key type after deserialization
171 * @param <V> Entry value type after deserialization
172 */
173 public class RemoteEventHandler<K, V> extends EntryAdapter<byte[], byte[]> {
174
175 private final Member localMember;
176
177 public RemoteEventHandler() {
178 this.localMember = theInstance.getCluster().getLocalMember();
179 }
180 @Override
181 public void entryAdded(EntryEvent<byte[], byte[]> event) {
182 if (localMember.equals(event.getMember())) {
183 // ignore locally triggered event
184 return;
185 }
186 K key = deserialize(event.getKey());
187 V newVal = deserialize(event.getValue());
188 onAdd(key, newVal);
189 }
190
191 @Override
192 public void entryRemoved(EntryEvent<byte[], byte[]> event) {
193 if (localMember.equals(event.getMember())) {
194 // ignore locally triggered event
195 return;
196 }
197 K key = deserialize(event.getKey());
198 V val = deserialize(event.getValue());
199 onRemove(key, val);
200 }
201
202 @Override
203 public void entryUpdated(EntryEvent<byte[], byte[]> event) {
204 if (localMember.equals(event.getMember())) {
205 // ignore locally triggered event
206 return;
207 }
208 K key = deserialize(event.getKey());
209 V oldVal = deserialize(event.getOldValue());
210 V newVal = deserialize(event.getValue());
211 onUpdate(key, oldVal, newVal);
212 }
213
214 /**
215 * Remote entry addition hook.
216 *
217 * @param key new key
218 * @param newVal new value
219 */
220 protected void onAdd(K key, V newVal) {
221 }
222
223 /**
224 * Remote entry update hook.
225 *
226 * @param key new key
227 * @param oldValue old value
228 * @param newVal new value
229 */
230 protected void onUpdate(K key, V oldValue, V newVal) {
231 }
232
233 /**
234 * Remote entry remove hook.
235 *
236 * @param key new key
237 * @param val old value
238 */
239 protected void onRemove(K key, V val) {
240 }
241 }
242
tomb41d1ac2014-09-24 01:51:24 -0700243}