blob: e42ec453fce41216068c6bdf55cbb21fba235539 [file] [log] [blame]
Yuta HIGUCHI41f2ec02014-10-27 09:54:43 -07001package org.onlab.onos.store.hz;
tomb41d1ac2014-09-24 01:51:24 -07002
3import com.google.common.base.Optional;
4import com.google.common.cache.LoadingCache;
5import com.hazelcast.core.EntryAdapter;
6import com.hazelcast.core.EntryEvent;
7import com.hazelcast.core.HazelcastInstance;
8import com.hazelcast.core.MapEvent;
Yuta HIGUCHIfec9e192014-09-28 14:58:02 -07009import com.hazelcast.core.Member;
Yuta HIGUCHIb5df76d2014-09-27 20:54:00 -070010
tomb41d1ac2014-09-24 01:51:24 -070011import org.apache.felix.scr.annotations.Activate;
12import org.apache.felix.scr.annotations.Component;
13import org.apache.felix.scr.annotations.Reference;
14import org.apache.felix.scr.annotations.ReferenceCardinality;
tom0755a362014-09-24 11:54:43 -070015import org.onlab.onos.event.Event;
16import org.onlab.onos.store.AbstractStore;
17import org.onlab.onos.store.StoreDelegate;
Yuta HIGUCHI672488d2014-10-07 09:23:43 -070018import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHI971addc2014-10-07 23:23:17 -070019import org.onlab.onos.store.serializers.StoreSerializer;
tomb41d1ac2014-09-24 01:51:24 -070020import org.slf4j.Logger;
21
22import static com.google.common.base.Preconditions.checkNotNull;
23import static org.slf4j.LoggerFactory.getLogger;
24
25/**
26 * Abstraction of a distributed store based on Hazelcast.
27 */
Yuta HIGUCHI672488d2014-10-07 09:23:43 -070028@Component
Yuta HIGUCHI2e963892014-09-27 13:00:39 -070029public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDelegate<E>>
tomca55e642014-09-24 18:28:38 -070030 extends AbstractStore<E, D> {
tomb41d1ac2014-09-24 01:51:24 -070031
32 protected final Logger log = getLogger(getClass());
33
34 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
35 protected StoreService storeService;
36
Yuta HIGUCHI971addc2014-10-07 23:23:17 -070037 protected StoreSerializer serializer;
Yuta HIGUCHIad4c2182014-09-29 11:16:23 -070038
tomb41d1ac2014-09-24 01:51:24 -070039 protected HazelcastInstance theInstance;
40
41 @Activate
42 public void activate() {
Yuta HIGUCHI672488d2014-10-07 09:23:43 -070043 serializer = new KryoSerializer();
tomb41d1ac2014-09-24 01:51:24 -070044 theInstance = storeService.getHazelcastInstance();
45 }
46
47 /**
48 * Serializes the specified object using the backing store service.
49 *
50 * @param obj object to be serialized
51 * @return serialized object
52 */
53 protected byte[] serialize(Object obj) {
Yuta HIGUCHI672488d2014-10-07 09:23:43 -070054 return serializer.encode(obj);
tomb41d1ac2014-09-24 01:51:24 -070055 }
56
57 /**
58 * Deserializes the specified object using the backing store service.
59 *
60 * @param bytes bytes to be deserialized
61 * @param <T> type of object
62 * @return deserialized object
63 */
64 protected <T> T deserialize(byte[] bytes) {
Yuta HIGUCHI672488d2014-10-07 09:23:43 -070065 return serializer.decode(bytes);
tomb41d1ac2014-09-24 01:51:24 -070066 }
67
68
69 /**
70 * An IMap entry listener, which reflects each remote event to the cache.
71 *
72 * @param <K> IMap key type after deserialization
73 * @param <V> IMap value type after deserialization
74 */
Yuta HIGUCHIfec9e192014-09-28 14:58:02 -070075 public class RemoteCacheEventHandler<K, V> extends EntryAdapter<byte[], byte[]> {
tomb41d1ac2014-09-24 01:51:24 -070076
Yuta HIGUCHIfec9e192014-09-28 14:58:02 -070077 private final Member localMember;
tomb41d1ac2014-09-24 01:51:24 -070078 private LoadingCache<K, Optional<V>> cache;
79
80 /**
81 * Constructor.
82 *
83 * @param cache cache to update
84 */
Yuta HIGUCHIfec9e192014-09-28 14:58:02 -070085 public RemoteCacheEventHandler(LoadingCache<K, Optional<V>> cache) {
86 this.localMember = theInstance.getCluster().getLocalMember();
tomb41d1ac2014-09-24 01:51:24 -070087 this.cache = checkNotNull(cache);
88 }
89
90 @Override
91 public void mapCleared(MapEvent event) {
Yuta HIGUCHIfec9e192014-09-28 14:58:02 -070092 if (localMember.equals(event.getMember())) {
93 // ignore locally triggered event
94 return;
95 }
tomb41d1ac2014-09-24 01:51:24 -070096 cache.invalidateAll();
97 }
98
99 @Override
tomca55e642014-09-24 18:28:38 -0700100 public void entryAdded(EntryEvent<byte[], byte[]> event) {
Yuta HIGUCHIfec9e192014-09-28 14:58:02 -0700101 if (localMember.equals(event.getMember())) {
102 // ignore locally triggered event
103 return;
104 }
tomca55e642014-09-24 18:28:38 -0700105 K key = deserialize(event.getKey());
106 V newVal = deserialize(event.getValue());
107 Optional<V> newValue = Optional.of(newVal);
108 cache.asMap().putIfAbsent(key, newValue);
109 onAdd(key, newVal);
110 }
111
112 @Override
tomb41d1ac2014-09-24 01:51:24 -0700113 public void entryUpdated(EntryEvent<byte[], byte[]> event) {
Yuta HIGUCHIfec9e192014-09-28 14:58:02 -0700114 if (localMember.equals(event.getMember())) {
115 // ignore locally triggered event
116 return;
117 }
tomca55e642014-09-24 18:28:38 -0700118 K key = deserialize(event.getKey());
119 V oldVal = deserialize(event.getOldValue());
Yuta HIGUCHIbb1fc722014-09-24 00:00:13 -0700120 Optional<V> oldValue = Optional.fromNullable(oldVal);
tomca55e642014-09-24 18:28:38 -0700121 V newVal = deserialize(event.getValue());
Yuta HIGUCHIbb1fc722014-09-24 00:00:13 -0700122 Optional<V> newValue = Optional.of(newVal);
123 cache.asMap().replace(key, oldValue, newValue);
Yuta HIGUCHIa8a53eb2014-09-25 17:47:55 -0700124 onUpdate(key, oldVal, newVal);
tomb41d1ac2014-09-24 01:51:24 -0700125 }
126
127 @Override
128 public void entryRemoved(EntryEvent<byte[], byte[]> event) {
Yuta HIGUCHIfec9e192014-09-28 14:58:02 -0700129 if (localMember.equals(event.getMember())) {
130 // ignore locally triggered event
131 return;
132 }
tomca55e642014-09-24 18:28:38 -0700133 K key = deserialize(event.getKey());
Yuta HIGUCHIf5479702014-09-25 00:09:24 -0700134 V val = deserialize(event.getOldValue());
tomca55e642014-09-24 18:28:38 -0700135 cache.invalidate(key);
136 onRemove(key, val);
tomb41d1ac2014-09-24 01:51:24 -0700137 }
138
tomca55e642014-09-24 18:28:38 -0700139 /**
140 * Cache entry addition hook.
141 *
142 * @param key new key
143 * @param newVal new value
144 */
145 protected void onAdd(K key, V newVal) {
146 }
147
148 /**
149 * Cache entry update hook.
150 *
151 * @param key new key
Yuta HIGUCHIa8a53eb2014-09-25 17:47:55 -0700152 * @param oldValue old value
tomca55e642014-09-24 18:28:38 -0700153 * @param newVal new value
154 */
Yuta HIGUCHIa8a53eb2014-09-25 17:47:55 -0700155 protected void onUpdate(K key, V oldValue, V newVal) {
tomca55e642014-09-24 18:28:38 -0700156 }
157
158 /**
159 * Cache entry remove hook.
160 *
161 * @param key new key
162 * @param val old value
163 */
164 protected void onRemove(K key, V val) {
tomb41d1ac2014-09-24 01:51:24 -0700165 }
166 }
167
Yuta HIGUCHIfec9e192014-09-28 14:58:02 -0700168 /**
169 * Distributed object remote event entry listener.
170 *
171 * @param <K> Entry key type after deserialization
172 * @param <V> Entry value type after deserialization
173 */
174 public class RemoteEventHandler<K, V> extends EntryAdapter<byte[], byte[]> {
175
176 private final Member localMember;
177
178 public RemoteEventHandler() {
179 this.localMember = theInstance.getCluster().getLocalMember();
180 }
181 @Override
182 public void entryAdded(EntryEvent<byte[], byte[]> event) {
183 if (localMember.equals(event.getMember())) {
184 // ignore locally triggered event
185 return;
186 }
187 K key = deserialize(event.getKey());
188 V newVal = deserialize(event.getValue());
189 onAdd(key, newVal);
190 }
191
192 @Override
193 public void entryRemoved(EntryEvent<byte[], byte[]> event) {
194 if (localMember.equals(event.getMember())) {
195 // ignore locally triggered event
196 return;
197 }
198 K key = deserialize(event.getKey());
199 V val = deserialize(event.getValue());
200 onRemove(key, val);
201 }
202
203 @Override
204 public void entryUpdated(EntryEvent<byte[], byte[]> event) {
205 if (localMember.equals(event.getMember())) {
206 // ignore locally triggered event
207 return;
208 }
209 K key = deserialize(event.getKey());
210 V oldVal = deserialize(event.getOldValue());
211 V newVal = deserialize(event.getValue());
212 onUpdate(key, oldVal, newVal);
213 }
214
215 /**
216 * Remote entry addition hook.
217 *
218 * @param key new key
219 * @param newVal new value
220 */
221 protected void onAdd(K key, V newVal) {
222 }
223
224 /**
225 * Remote entry update hook.
226 *
227 * @param key new key
228 * @param oldValue old value
229 * @param newVal new value
230 */
231 protected void onUpdate(K key, V oldValue, V newVal) {
232 }
233
234 /**
235 * Remote entry remove hook.
236 *
237 * @param key new key
238 * @param val old value
239 */
240 protected void onRemove(K key, V val) {
241 }
242 }
243
tomb41d1ac2014-09-24 01:51:24 -0700244}