blob: 4afd1443d5a9a2054adecfcc20040d490d914e2e [file] [log] [blame]
Jonathan Hart6df90172014-04-03 10:13:11 -07001package net.onrc.onos.core.datagrid;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -08002
Pavlin Radoslavov7940b652014-02-13 19:42:05 -08003import java.util.Collection;
Yuta HIGUCHIa5dcd192014-06-06 18:00:35 -07004import java.util.Iterator;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -08005import java.util.LinkedList;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -08006import java.util.Set;
Pavlin Radoslavov1c8d8092014-02-14 15:47:24 -08007import java.util.concurrent.CopyOnWriteArrayList;
Pavlin Radoslavovbcf14332014-03-27 18:15:30 -07008import java.util.concurrent.TimeUnit;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -08009
Jonathan Harta99ec672014-04-03 11:30:34 -070010import net.onrc.onos.core.util.serializers.KryoFactory;
11
Jonathan Hart14c4a762014-04-15 10:35:57 -070012import org.slf4j.Logger;
13import org.slf4j.LoggerFactory;
14
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080015import com.esotericsoftware.kryo.Kryo;
16import com.esotericsoftware.kryo.io.Input;
17import com.esotericsoftware.kryo.io.Output;
18import com.hazelcast.core.EntryEvent;
19import com.hazelcast.core.EntryListener;
20import com.hazelcast.core.HazelcastInstance;
21import com.hazelcast.core.IMap;
22
/**
 * A datagrid event channel that uses Hazelcast as a datagrid.
 *
 * @param <K> The class type of the key.
 * @param <V> The class type of the value.
 */
29public class HazelcastEventChannel<K, V> implements IEventChannel<K, V> {
Jonathan Hart14c4a762014-04-15 10:35:57 -070030 private static final Logger log =
31 LoggerFactory.getLogger(HazelcastEventChannel.class);
32
Ray Milkeye88b2b82014-03-21 11:40:04 -070033 private final HazelcastInstance hazelcastInstance; // The Hazelcast instance
34 private final String channelName; // The event channel name
Pavlin Radoslavove561a4c2014-04-01 14:10:55 -070035 private final Class<K> typeK; // The class type of the key
36 private final Class<V> typeV; // The class type of the value
Ray Milkeye88b2b82014-03-21 11:40:04 -070037 private IMap<K, byte[]> channelMap; // The Hazelcast channel map
Pavlin Radoslavov1c8d8092014-02-14 15:47:24 -080038 // The channel listeners
Ray Milkeye88b2b82014-03-21 11:40:04 -070039 private final CopyOnWriteArrayList<IEventChannelListener<K, V>> listeners =
40 new CopyOnWriteArrayList<>();
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080041
42 // The map entry listener
Pavlin Radoslavove561a4c2014-04-01 14:10:55 -070043 private final EntryListener<K, byte[]> mapEntryListener = new MapEntryListener();
Ray Milkeye88b2b82014-03-21 11:40:04 -070044 private String mapListenerId; // The map listener ID
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080045
46 // TODO: We should use a single global KryoFactory instance
Ray Milkeye88b2b82014-03-21 11:40:04 -070047 private final KryoFactory kryoFactory = new KryoFactory();
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080048
49 // Maximum serialized event size
Ray Milkeye88b2b82014-03-21 11:40:04 -070050 private static final int MAX_BUFFER_SIZE = 64 * 1024;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080051
/**
 * Creates an event channel with the given name.
 *
 * The constructor only records its arguments; the Hazelcast map is not
 * touched until {@link #startup()} is called.
 *
 * @param newHazelcastInstance the Hazelcast instance to use.
 * @param newChannelName the event channel name.
 * @param newTypeK the type of the Key in the Key-Value store.
 * @param newTypeV the type of the Value in the Key-Value store.
 */
public HazelcastEventChannel(HazelcastInstance newHazelcastInstance,
                             String newChannelName,
                             Class<K> newTypeK,
                             Class<V> newTypeV) {
    this.hazelcastInstance = newHazelcastInstance;
    this.channelName = newChannelName;
    this.typeK = newTypeK;
    this.typeV = newTypeV;
}
68
69 /**
70 * Verify the key and value types of a channel.
71 *
Ray Milkeye88b2b82014-03-21 11:40:04 -070072 * @param typeKToVerify the type of the key to verify.
73 * @param typeVToVerify the type of the value to verify.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080074 * @return true if the key and value types of the channel match,
75 * otherwise false.
76 */
77 @Override
Jonathan Hart14c4a762014-04-15 10:35:57 -070078 public boolean verifyKeyValueTypes(Class<?> typeKToVerify,
79 Class<?> typeVToVerify) {
Ray Milkey73019652014-03-24 11:12:44 -070080 return (typeK.equals(typeKToVerify)) && (typeV.equals(typeVToVerify));
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080081 }
82
83 /**
84 * Cleanup and destroy the channel.
85 */
86 @Override
87 protected void finalize() {
Ray Milkeye88b2b82014-03-21 11:40:04 -070088 shutdown();
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080089 }
90
91 /**
92 * Startup the channel operation.
93 */
94 @Override
95 public void startup() {
Ray Milkeye88b2b82014-03-21 11:40:04 -070096 if (channelMap == null) {
97 channelMap = hazelcastInstance.getMap(channelName);
98 mapListenerId = channelMap.addEntryListener(mapEntryListener,
99 true);
100 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800101 }
102
103 /**
104 * Shutdown the channel operation.
105 */
106 @Override
107 public void shutdown() {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700108 if (channelMap != null) {
109 channelMap.removeEntryListener(mapListenerId);
110 channelMap = null;
111 mapListenerId = null;
112 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800113 }
114
115 /**
116 * Add event channel listener.
117 *
118 * @param listener the listener to add.
119 */
120 @Override
121 public void addListener(IEventChannelListener<K, V> listener) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700122 if (listeners.contains(listener)) {
123 return; // Nothing to do: already a listener
124 }
125 listeners.add(listener);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800126 }
127
128 /**
129 * Remove event channel listener.
130 *
131 * @param listener the listener to remove.
132 */
133 @Override
134 public void removeListener(IEventChannelListener<K, V> listener) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700135 listeners.remove(listener);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800136 }
137
138 /**
139 * Add an entry to the channel.
140 *
Ray Milkeye88b2b82014-03-21 11:40:04 -0700141 * @param key the key of the entry to add.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800142 * @param value the value of the entry to add.
143 */
144 @Override
145 public void addEntry(K key, V value) {
Pavlin Radoslavovbcf14332014-03-27 18:15:30 -0700146 byte[] valueBytes = serializeValue(value);
147 //
148 // Put the entry in the map:
149 // - Key : Type <K>
150 // - Value : Serialized Value (byte[])
151 //
152 channelMap.putAsync(key, valueBytes);
153 }
154
155 /**
156 * Add a transient entry to the channel.
Ray Milkey9c8a2132014-04-02 15:16:42 -0700157 * <p/>
Pavlin Radoslavovbcf14332014-03-27 18:15:30 -0700158 * The added entry is transient and will automatically timeout after 1ms.
159 *
160 * @param key the key of the entry to add.
161 * @param value the value of the entry to add.
162 */
163 @Override
164 public void addTransientEntry(K key, V value) {
165 byte[] valueBytes = serializeValue(value);
166 //
167 // Put the entry in the map:
168 // - Key : Type <K>
169 // - Value : Serialized Value (byte[])
170 // - Timeout: 1ms
171 //
172 channelMap.putAsync(key, valueBytes, 1L, TimeUnit.MILLISECONDS);
173 }
174
175 /**
176 * Serialize the value.
177 *
178 * @param value the value to serialize.
179 * @return the serialized value.
180 */
181 private byte[] serializeValue(V value) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700182 //
183 // Encode the value
184 //
185 byte[] buffer = new byte[MAX_BUFFER_SIZE];
186 Kryo kryo = kryoFactory.newKryo();
Yuta HIGUCHIe8bfe612014-06-08 14:20:51 -0700187 try {
188 Output output = new Output(buffer, -1);
189 kryo.writeClassAndObject(output, value);
190 byte[] valueBytes = output.toBytes();
191 return valueBytes;
192 } finally {
193 kryoFactory.deleteKryo(kryo);
194 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800195 }
196
197 /**
Pavlin Radoslavov7439db12014-04-17 13:50:29 -0700198 * Deserialize the value.
199 *
200 * @param kryo the Kryo instance to use for the deserialization.
201 * @param valueBytes the buffer with the serialized value.
202 * @return the deserialized value.
203 */
204 private V deserializeValue(Kryo kryo, byte[] valueBytes) {
205 V value;
206
207 //
208 // Decode the value
209 //
210 Input input = new Input(valueBytes);
211 Object objValue = kryo.readClassAndObject(input);
212 try {
213 value = typeV.cast(objValue);
214 } catch (ClassCastException e) {
215 log.error("Received notification value cast failed", e);
216 return null;
217 }
218
219 return value;
220 }
221
222 /**
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800223 * Remove an entry from the channel.
224 *
225 * @param key the key of the entry to remove.
226 */
227 @Override
228 public void removeEntry(K key) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700229 //
230 // Remove the entry:
231 // - Key : Type <K>
232 // - Value : Serialized Value (byte[])
233 //
234 channelMap.removeAsync(key);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800235 }
236
237 /**
238 * Update an entry in the channel.
239 *
Ray Milkeye88b2b82014-03-21 11:40:04 -0700240 * @param key the key of the entry to update.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800241 * @param value the value of the entry to update.
242 */
243 @Override
244 public void updateEntry(K key, V value) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700245 // NOTE: Adding an entry with an existing key automatically updates it
246 addEntry(key, value);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800247 }
248
249 /**
250 * Get an entry from the channel.
251 *
252 * @param key the key of the entry to get.
253 * @return the entry if found, otherwise null.
254 */
255 @Override
256 @Deprecated
257 public V getEntry(K key) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700258 byte[] valueBytes = channelMap.get(key);
259 if (valueBytes == null) {
260 return null;
261 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800262
Ray Milkeye88b2b82014-03-21 11:40:04 -0700263 //
264 // Decode the value
265 //
Pavlin Radoslavov7439db12014-04-17 13:50:29 -0700266 Kryo kryo = kryoFactory.newKryo();
Yuta HIGUCHIe8bfe612014-06-08 14:20:51 -0700267 try {
268 V value = deserializeValue(kryo, valueBytes);
269 return value;
270 } finally {
271 kryoFactory.deleteKryo(kryo);
272 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800273 }
274
275 /**
276 * Get all entries in the channel.
277 *
278 * @return all entries that are currently in the channel.
279 */
280 @Override
281 @Deprecated
282 public Collection<V> getAllEntries() {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700283 Collection<V> allEntries = new LinkedList<V>();
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800284
Ray Milkeye88b2b82014-03-21 11:40:04 -0700285 if (channelMap == null) {
286 return allEntries; // Nothing found
287 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800288
Ray Milkeye88b2b82014-03-21 11:40:04 -0700289 //
290 // Get all entries
291 //
292 Collection<byte[]> values = channelMap.values();
293 Kryo kryo = kryoFactory.newKryo();
Yuta HIGUCHIe8bfe612014-06-08 14:20:51 -0700294 try {
295 for (byte[] valueBytes : values) {
296 //
297 // Decode the value
298 //
299 V value = deserializeValue(kryo, valueBytes);
300 allEntries.add(value);
301 }
302 } finally {
303 kryoFactory.deleteKryo(kryo);
Ray Milkeye88b2b82014-03-21 11:40:04 -0700304 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800305
Ray Milkeye88b2b82014-03-21 11:40:04 -0700306 return allEntries;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800307 }
308
309 /**
310 * Remove all entries in the channel.
311 */
312 @Override
313 @Deprecated
314 public void removeAllEntries() {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700315 //
316 // Remove all entries
317 //
318 // NOTE: We remove the entries one-by-one so the per-entry
319 // notifications will be delivered.
320 //
321 // channelMap.clear();
322 Set<K> keySet = channelMap.keySet();
323 for (K key : keySet) {
324 channelMap.removeAsync(key);
325 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800326 }
327
328 /**
329 * Class for receiving event notifications for the channel.
Ray Milkeye88b2b82014-03-21 11:40:04 -0700330 * <p/>
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800331 * The datagrid map is:
Ray Milkeye88b2b82014-03-21 11:40:04 -0700332 * - Key: Type K
333 * - Value: Serialized V (byte[])
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800334 */
Pavlin Radoslavove561a4c2014-04-01 14:10:55 -0700335 private class MapEntryListener implements EntryListener<K, byte[]> {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700336 /**
337 * Receive a notification that an entry is added.
338 *
339 * @param event the notification event for the entry.
340 */
341 @Override
342 public void entryAdded(EntryEvent<K, byte[]> event) {
343 //
344 // Decode the value
345 //
346 byte[] valueBytes = event.getValue();
347 Kryo kryo = kryoFactory.newKryo();
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700348 try {
349 V value = deserializeValue(kryo, valueBytes);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800350
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700351 //
352 // Deliver the notification
353 //
Yuta HIGUCHIa5dcd192014-06-06 18:00:35 -0700354 final Iterator<IEventChannelListener<K, V>> it = listeners.iterator();
355 while (it.hasNext()) {
356 final IEventChannelListener<K, V> listener = it.next();
357 if (it.hasNext()) {
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700358 // Each listener should get a deep copy of the value
Yuta HIGUCHIa5dcd192014-06-06 18:00:35 -0700359 // TODO: compare which is faster
360 // - kryo.copy(value)
361 // - deserializeValue(kryo, valueBytes)
362 listener.entryAdded(kryo.copy(value));
363 } else {
364 // Last listener can use the value
365 listener.entryAdded(value);
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700366 }
Ray Milkeye88b2b82014-03-21 11:40:04 -0700367 }
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700368 } finally {
369 kryoFactory.deleteKryo(kryo);
Ray Milkeye88b2b82014-03-21 11:40:04 -0700370 }
Ray Milkeye88b2b82014-03-21 11:40:04 -0700371 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800372
Ray Milkeye88b2b82014-03-21 11:40:04 -0700373 /**
374 * Receive a notification that an entry is removed.
375 *
376 * @param event the notification event for the entry.
377 */
378 @Override
379 public void entryRemoved(EntryEvent<K, byte[]> event) {
380 //
381 // Decode the value
382 //
383 byte[] valueBytes = event.getValue();
384 Kryo kryo = kryoFactory.newKryo();
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700385 try {
386 V value = deserializeValue(kryo, valueBytes);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800387
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700388 //
389 // Deliver the notification
390 //
Yuta HIGUCHIa5dcd192014-06-06 18:00:35 -0700391 final Iterator<IEventChannelListener<K, V>> it = listeners.iterator();
392 while (it.hasNext()) {
393 final IEventChannelListener<K, V> listener = it.next();
394 if (it.hasNext()) {
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700395 // Each listener should get a deep copy of the value
Yuta HIGUCHIa5dcd192014-06-06 18:00:35 -0700396 listener.entryRemoved(kryo.copy(value));
397 } else {
398 // Last listener can use the value
399 listener.entryRemoved(value);
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700400 }
Ray Milkeye88b2b82014-03-21 11:40:04 -0700401 }
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700402 } finally {
403 kryoFactory.deleteKryo(kryo);
Ray Milkeye88b2b82014-03-21 11:40:04 -0700404 }
Ray Milkeye88b2b82014-03-21 11:40:04 -0700405 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800406
Ray Milkeye88b2b82014-03-21 11:40:04 -0700407 /**
408 * Receive a notification that an entry is updated.
409 *
410 * @param event the notification event for the entry.
411 */
412 @Override
413 public void entryUpdated(EntryEvent<K, byte[]> event) {
414 //
415 // Decode the value
416 //
417 byte[] valueBytes = event.getValue();
418 Kryo kryo = kryoFactory.newKryo();
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700419 try {
420 V value = deserializeValue(kryo, valueBytes);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800421
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700422 //
423 // Deliver the notification
424 //
Yuta HIGUCHIa5dcd192014-06-06 18:00:35 -0700425 final Iterator<IEventChannelListener<K, V>> it = listeners.iterator();
426 while (it.hasNext()) {
427 final IEventChannelListener<K, V> listener = it.next();
428 if (it.hasNext()) {
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700429 // Each listener should get a deep copy of the value
Yuta HIGUCHIa5dcd192014-06-06 18:00:35 -0700430 listener.entryUpdated(kryo.copy(value));
431 } else {
432 // Last listener can use the value
433 listener.entryUpdated(value);
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700434 }
Ray Milkeye88b2b82014-03-21 11:40:04 -0700435 }
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700436 } finally {
437 kryoFactory.deleteKryo(kryo);
Ray Milkeye88b2b82014-03-21 11:40:04 -0700438 }
Ray Milkeye88b2b82014-03-21 11:40:04 -0700439 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800440
Ray Milkeye88b2b82014-03-21 11:40:04 -0700441 /**
442 * Receive a notification that an entry is evicted.
443 *
444 * @param event the notification event for the entry.
445 */
446 @Override
447 public void entryEvicted(EntryEvent<K, byte[]> event) {
448 // NOTE: We don't use eviction for this map
449 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800450 }
451}