blob: fc2acd358d5dc8bfae93024ff9f7c27eadba1c99 [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.hz;
tomb41d1ac2014-09-24 01:51:24 -070017
18import com.google.common.base.Optional;
19import com.google.common.cache.LoadingCache;
20import com.hazelcast.core.EntryAdapter;
21import com.hazelcast.core.EntryEvent;
22import com.hazelcast.core.HazelcastInstance;
23import com.hazelcast.core.MapEvent;
Yuta HIGUCHIfec9e192014-09-28 14:58:02 -070024import com.hazelcast.core.Member;
Yuta HIGUCHIb5df76d2014-09-27 20:54:00 -070025
tomb41d1ac2014-09-24 01:51:24 -070026import org.apache.felix.scr.annotations.Activate;
27import org.apache.felix.scr.annotations.Component;
28import org.apache.felix.scr.annotations.Reference;
29import org.apache.felix.scr.annotations.ReferenceCardinality;
Brian O'Connorabafb502014-12-02 22:26:20 -080030import org.onosproject.event.Event;
31import org.onosproject.store.AbstractStore;
32import org.onosproject.store.StoreDelegate;
33import org.onosproject.store.serializers.KryoSerializer;
34import org.onosproject.store.serializers.StoreSerializer;
tomb41d1ac2014-09-24 01:51:24 -070035import org.slf4j.Logger;
36
37import static com.google.common.base.Preconditions.checkNotNull;
38import static org.slf4j.LoggerFactory.getLogger;
39
40/**
41 * Abstraction of a distributed store based on Hazelcast.
42 */
Yuta HIGUCHI672488d2014-10-07 09:23:43 -070043@Component
Yuta HIGUCHI2e963892014-09-27 13:00:39 -070044public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDelegate<E>>
tomca55e642014-09-24 18:28:38 -070045 extends AbstractStore<E, D> {
tomb41d1ac2014-09-24 01:51:24 -070046
47 protected final Logger log = getLogger(getClass());
48
49 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
50 protected StoreService storeService;
51
Yuta HIGUCHI971addc2014-10-07 23:23:17 -070052 protected StoreSerializer serializer;
Yuta HIGUCHIad4c2182014-09-29 11:16:23 -070053
tomb41d1ac2014-09-24 01:51:24 -070054 protected HazelcastInstance theInstance;
55
56 @Activate
57 public void activate() {
Yuta HIGUCHI672488d2014-10-07 09:23:43 -070058 serializer = new KryoSerializer();
tomb41d1ac2014-09-24 01:51:24 -070059 theInstance = storeService.getHazelcastInstance();
60 }
61
62 /**
63 * Serializes the specified object using the backing store service.
64 *
65 * @param obj object to be serialized
66 * @return serialized object
67 */
68 protected byte[] serialize(Object obj) {
Yuta HIGUCHI672488d2014-10-07 09:23:43 -070069 return serializer.encode(obj);
tomb41d1ac2014-09-24 01:51:24 -070070 }
71
72 /**
73 * Deserializes the specified object using the backing store service.
74 *
75 * @param bytes bytes to be deserialized
76 * @param <T> type of object
77 * @return deserialized object
78 */
79 protected <T> T deserialize(byte[] bytes) {
Yuta HIGUCHI672488d2014-10-07 09:23:43 -070080 return serializer.decode(bytes);
tomb41d1ac2014-09-24 01:51:24 -070081 }
82
83
84 /**
85 * An IMap entry listener, which reflects each remote event to the cache.
86 *
87 * @param <K> IMap key type after deserialization
88 * @param <V> IMap value type after deserialization
89 */
Yuta HIGUCHIfec9e192014-09-28 14:58:02 -070090 public class RemoteCacheEventHandler<K, V> extends EntryAdapter<byte[], byte[]> {
tomb41d1ac2014-09-24 01:51:24 -070091
Yuta HIGUCHIfec9e192014-09-28 14:58:02 -070092 private final Member localMember;
tomb41d1ac2014-09-24 01:51:24 -070093 private LoadingCache<K, Optional<V>> cache;
94
95 /**
96 * Constructor.
97 *
98 * @param cache cache to update
99 */
Yuta HIGUCHIfec9e192014-09-28 14:58:02 -0700100 public RemoteCacheEventHandler(LoadingCache<K, Optional<V>> cache) {
101 this.localMember = theInstance.getCluster().getLocalMember();
tomb41d1ac2014-09-24 01:51:24 -0700102 this.cache = checkNotNull(cache);
103 }
104
105 @Override
106 public void mapCleared(MapEvent event) {
Yuta HIGUCHIfec9e192014-09-28 14:58:02 -0700107 if (localMember.equals(event.getMember())) {
108 // ignore locally triggered event
109 return;
110 }
tomb41d1ac2014-09-24 01:51:24 -0700111 cache.invalidateAll();
112 }
113
114 @Override
tomca55e642014-09-24 18:28:38 -0700115 public void entryAdded(EntryEvent<byte[], byte[]> event) {
Yuta HIGUCHIfec9e192014-09-28 14:58:02 -0700116 if (localMember.equals(event.getMember())) {
117 // ignore locally triggered event
118 return;
119 }
tomca55e642014-09-24 18:28:38 -0700120 K key = deserialize(event.getKey());
121 V newVal = deserialize(event.getValue());
122 Optional<V> newValue = Optional.of(newVal);
123 cache.asMap().putIfAbsent(key, newValue);
124 onAdd(key, newVal);
125 }
126
127 @Override
tomb41d1ac2014-09-24 01:51:24 -0700128 public void entryUpdated(EntryEvent<byte[], byte[]> event) {
Yuta HIGUCHIfec9e192014-09-28 14:58:02 -0700129 if (localMember.equals(event.getMember())) {
130 // ignore locally triggered event
131 return;
132 }
tomca55e642014-09-24 18:28:38 -0700133 K key = deserialize(event.getKey());
134 V oldVal = deserialize(event.getOldValue());
Yuta HIGUCHIbb1fc722014-09-24 00:00:13 -0700135 Optional<V> oldValue = Optional.fromNullable(oldVal);
tomca55e642014-09-24 18:28:38 -0700136 V newVal = deserialize(event.getValue());
Yuta HIGUCHIbb1fc722014-09-24 00:00:13 -0700137 Optional<V> newValue = Optional.of(newVal);
138 cache.asMap().replace(key, oldValue, newValue);
Yuta HIGUCHIa8a53eb2014-09-25 17:47:55 -0700139 onUpdate(key, oldVal, newVal);
tomb41d1ac2014-09-24 01:51:24 -0700140 }
141
142 @Override
143 public void entryRemoved(EntryEvent<byte[], byte[]> event) {
Yuta HIGUCHIfec9e192014-09-28 14:58:02 -0700144 if (localMember.equals(event.getMember())) {
145 // ignore locally triggered event
146 return;
147 }
tomca55e642014-09-24 18:28:38 -0700148 K key = deserialize(event.getKey());
Yuta HIGUCHIf5479702014-09-25 00:09:24 -0700149 V val = deserialize(event.getOldValue());
tomca55e642014-09-24 18:28:38 -0700150 cache.invalidate(key);
151 onRemove(key, val);
tomb41d1ac2014-09-24 01:51:24 -0700152 }
153
tomca55e642014-09-24 18:28:38 -0700154 /**
155 * Cache entry addition hook.
156 *
157 * @param key new key
158 * @param newVal new value
159 */
160 protected void onAdd(K key, V newVal) {
161 }
162
163 /**
164 * Cache entry update hook.
165 *
166 * @param key new key
Yuta HIGUCHIa8a53eb2014-09-25 17:47:55 -0700167 * @param oldValue old value
tomca55e642014-09-24 18:28:38 -0700168 * @param newVal new value
169 */
Yuta HIGUCHIa8a53eb2014-09-25 17:47:55 -0700170 protected void onUpdate(K key, V oldValue, V newVal) {
tomca55e642014-09-24 18:28:38 -0700171 }
172
173 /**
174 * Cache entry remove hook.
175 *
176 * @param key new key
177 * @param val old value
178 */
179 protected void onRemove(K key, V val) {
tomb41d1ac2014-09-24 01:51:24 -0700180 }
181 }
182
Yuta HIGUCHIfec9e192014-09-28 14:58:02 -0700183 /**
184 * Distributed object remote event entry listener.
185 *
186 * @param <K> Entry key type after deserialization
187 * @param <V> Entry value type after deserialization
188 */
189 public class RemoteEventHandler<K, V> extends EntryAdapter<byte[], byte[]> {
190
191 private final Member localMember;
192
193 public RemoteEventHandler() {
194 this.localMember = theInstance.getCluster().getLocalMember();
195 }
196 @Override
197 public void entryAdded(EntryEvent<byte[], byte[]> event) {
198 if (localMember.equals(event.getMember())) {
199 // ignore locally triggered event
200 return;
201 }
202 K key = deserialize(event.getKey());
203 V newVal = deserialize(event.getValue());
204 onAdd(key, newVal);
205 }
206
207 @Override
208 public void entryRemoved(EntryEvent<byte[], byte[]> event) {
209 if (localMember.equals(event.getMember())) {
210 // ignore locally triggered event
211 return;
212 }
213 K key = deserialize(event.getKey());
214 V val = deserialize(event.getValue());
215 onRemove(key, val);
216 }
217
218 @Override
219 public void entryUpdated(EntryEvent<byte[], byte[]> event) {
220 if (localMember.equals(event.getMember())) {
221 // ignore locally triggered event
222 return;
223 }
224 K key = deserialize(event.getKey());
225 V oldVal = deserialize(event.getOldValue());
226 V newVal = deserialize(event.getValue());
227 onUpdate(key, oldVal, newVal);
228 }
229
230 /**
231 * Remote entry addition hook.
232 *
233 * @param key new key
234 * @param newVal new value
235 */
236 protected void onAdd(K key, V newVal) {
237 }
238
239 /**
240 * Remote entry update hook.
241 *
242 * @param key new key
243 * @param oldValue old value
244 * @param newVal new value
245 */
246 protected void onUpdate(K key, V oldValue, V newVal) {
247 }
248
249 /**
250 * Remote entry remove hook.
251 *
252 * @param key new key
253 * @param val old value
254 */
255 protected void onRemove(K key, V val) {
256 }
257 }
258
tomb41d1ac2014-09-24 01:51:24 -0700259}