blob: fbc67a44fbb8d7e9d045c616fc9735976823fc05 [file] [log] [blame]
Jonathan Hart6df90172014-04-03 10:13:11 -07001package net.onrc.onos.core.datagrid;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -08002
Pavlin Radoslavov7940b652014-02-13 19:42:05 -08003import java.util.Collection;
Yuta HIGUCHIa5dcd192014-06-06 18:00:35 -07004import java.util.Iterator;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -08005import java.util.LinkedList;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -08006import java.util.Set;
Pavlin Radoslavov1c8d8092014-02-14 15:47:24 -08007import java.util.concurrent.CopyOnWriteArrayList;
Pavlin Radoslavovbcf14332014-03-27 18:15:30 -07008import java.util.concurrent.TimeUnit;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -08009
Jonathan Harta99ec672014-04-03 11:30:34 -070010import net.onrc.onos.core.util.serializers.KryoFactory;
11
Jonathan Hart14c4a762014-04-15 10:35:57 -070012import org.slf4j.Logger;
13import org.slf4j.LoggerFactory;
14
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080015import com.esotericsoftware.kryo.Kryo;
16import com.esotericsoftware.kryo.io.Input;
17import com.esotericsoftware.kryo.io.Output;
18import com.hazelcast.core.EntryEvent;
19import com.hazelcast.core.EntryListener;
20import com.hazelcast.core.HazelcastInstance;
21import com.hazelcast.core.IMap;
22
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080023/**
24 * A datagrid event channel that uses Hazelcast as a datagrid.
Ray Milkeye88b2b82014-03-21 11:40:04 -070025 *
26 * @param <K> The class type of the key.
27 * @param <V> The class type of the value.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080028 */
29public class HazelcastEventChannel<K, V> implements IEventChannel<K, V> {
Jonathan Hart14c4a762014-04-15 10:35:57 -070030 private static final Logger log =
31 LoggerFactory.getLogger(HazelcastEventChannel.class);
32
Ray Milkeye88b2b82014-03-21 11:40:04 -070033 private final HazelcastInstance hazelcastInstance; // The Hazelcast instance
34 private final String channelName; // The event channel name
Pavlin Radoslavove561a4c2014-04-01 14:10:55 -070035 private final Class<K> typeK; // The class type of the key
36 private final Class<V> typeV; // The class type of the value
Ray Milkeye88b2b82014-03-21 11:40:04 -070037 private IMap<K, byte[]> channelMap; // The Hazelcast channel map
Pavlin Radoslavov1c8d8092014-02-14 15:47:24 -080038 // The channel listeners
Ray Milkeye88b2b82014-03-21 11:40:04 -070039 private final CopyOnWriteArrayList<IEventChannelListener<K, V>> listeners =
40 new CopyOnWriteArrayList<>();
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080041
42 // The map entry listener
Pavlin Radoslavove561a4c2014-04-01 14:10:55 -070043 private final EntryListener<K, byte[]> mapEntryListener = new MapEntryListener();
Ray Milkeye88b2b82014-03-21 11:40:04 -070044 private String mapListenerId; // The map listener ID
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080045
46 // TODO: We should use a single global KryoFactory instance
Ray Milkeye88b2b82014-03-21 11:40:04 -070047 private final KryoFactory kryoFactory = new KryoFactory();
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080048
49 // Maximum serialized event size
Ray Milkeye88b2b82014-03-21 11:40:04 -070050 private static final int MAX_BUFFER_SIZE = 64 * 1024;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080051
52 /**
53 * Constructor for a given event channel name.
54 *
Ray Milkey9c8a2132014-04-02 15:16:42 -070055 * @param newHazelcastInstance the Hazelcast instance to use.
56 * @param newChannelName the event channel name.
57 * @param newTypeK the type of the Key in the Key-Value store.
58 * @param newTypeV the type of the Value in the Key-Value store.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080059 */
Ray Milkeye88b2b82014-03-21 11:40:04 -070060 public HazelcastEventChannel(HazelcastInstance newHazelcastInstance,
61 String newChannelName, Class<K> newTypeK,
62 Class<V> newTypeV) {
63 hazelcastInstance = newHazelcastInstance;
64 channelName = newChannelName;
65 typeK = newTypeK;
66 typeV = newTypeV;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080067 }
68
69 /**
70 * Verify the key and value types of a channel.
71 *
Ray Milkeye88b2b82014-03-21 11:40:04 -070072 * @param typeKToVerify the type of the key to verify.
73 * @param typeVToVerify the type of the value to verify.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080074 * @return true if the key and value types of the channel match,
75 * otherwise false.
76 */
77 @Override
Jonathan Hart14c4a762014-04-15 10:35:57 -070078 public boolean verifyKeyValueTypes(Class<?> typeKToVerify,
79 Class<?> typeVToVerify) {
Ray Milkey73019652014-03-24 11:12:44 -070080 return (typeK.equals(typeKToVerify)) && (typeV.equals(typeVToVerify));
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080081 }
82
83 /**
84 * Cleanup and destroy the channel.
85 */
86 @Override
87 protected void finalize() {
Ray Milkeye88b2b82014-03-21 11:40:04 -070088 shutdown();
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080089 }
90
91 /**
92 * Startup the channel operation.
93 */
94 @Override
95 public void startup() {
Ray Milkeye88b2b82014-03-21 11:40:04 -070096 if (channelMap == null) {
97 channelMap = hazelcastInstance.getMap(channelName);
98 mapListenerId = channelMap.addEntryListener(mapEntryListener,
99 true);
100 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800101 }
102
103 /**
104 * Shutdown the channel operation.
105 */
106 @Override
107 public void shutdown() {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700108 if (channelMap != null) {
109 channelMap.removeEntryListener(mapListenerId);
110 channelMap = null;
111 mapListenerId = null;
112 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800113 }
114
115 /**
116 * Add event channel listener.
117 *
118 * @param listener the listener to add.
119 */
120 @Override
121 public void addListener(IEventChannelListener<K, V> listener) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700122 if (listeners.contains(listener)) {
123 return; // Nothing to do: already a listener
124 }
125 listeners.add(listener);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800126 }
127
128 /**
129 * Remove event channel listener.
130 *
131 * @param listener the listener to remove.
132 */
133 @Override
134 public void removeListener(IEventChannelListener<K, V> listener) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700135 listeners.remove(listener);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800136 }
137
138 /**
139 * Add an entry to the channel.
140 *
Ray Milkeye88b2b82014-03-21 11:40:04 -0700141 * @param key the key of the entry to add.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800142 * @param value the value of the entry to add.
143 */
144 @Override
145 public void addEntry(K key, V value) {
Pavlin Radoslavovbcf14332014-03-27 18:15:30 -0700146 byte[] valueBytes = serializeValue(value);
147 //
148 // Put the entry in the map:
149 // - Key : Type <K>
150 // - Value : Serialized Value (byte[])
151 //
152 channelMap.putAsync(key, valueBytes);
153 }
154
155 /**
156 * Add a transient entry to the channel.
Ray Milkey9c8a2132014-04-02 15:16:42 -0700157 * <p/>
Pavlin Radoslavovbcf14332014-03-27 18:15:30 -0700158 * The added entry is transient and will automatically timeout after 1ms.
159 *
160 * @param key the key of the entry to add.
161 * @param value the value of the entry to add.
162 */
163 @Override
164 public void addTransientEntry(K key, V value) {
165 byte[] valueBytes = serializeValue(value);
166 //
167 // Put the entry in the map:
168 // - Key : Type <K>
169 // - Value : Serialized Value (byte[])
170 // - Timeout: 1ms
171 //
172 channelMap.putAsync(key, valueBytes, 1L, TimeUnit.MILLISECONDS);
173 }
174
175 /**
176 * Serialize the value.
177 *
178 * @param value the value to serialize.
179 * @return the serialized value.
180 */
181 private byte[] serializeValue(V value) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700182 //
183 // Encode the value
184 //
185 byte[] buffer = new byte[MAX_BUFFER_SIZE];
186 Kryo kryo = kryoFactory.newKryo();
187 Output output = new Output(buffer, -1);
Jonathan Hart14c4a762014-04-15 10:35:57 -0700188 kryo.writeClassAndObject(output, value);
Ray Milkeye88b2b82014-03-21 11:40:04 -0700189 byte[] valueBytes = output.toBytes();
190 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800191
Pavlin Radoslavovbcf14332014-03-27 18:15:30 -0700192 return valueBytes;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800193 }
194
195 /**
Pavlin Radoslavov7439db12014-04-17 13:50:29 -0700196 * Deserialize the value.
197 *
198 * @param kryo the Kryo instance to use for the deserialization.
199 * @param valueBytes the buffer with the serialized value.
200 * @return the deserialized value.
201 */
202 private V deserializeValue(Kryo kryo, byte[] valueBytes) {
203 V value;
204
205 //
206 // Decode the value
207 //
208 Input input = new Input(valueBytes);
209 Object objValue = kryo.readClassAndObject(input);
210 try {
211 value = typeV.cast(objValue);
212 } catch (ClassCastException e) {
213 log.error("Received notification value cast failed", e);
214 return null;
215 }
216
217 return value;
218 }
219
220 /**
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800221 * Remove an entry from the channel.
222 *
223 * @param key the key of the entry to remove.
224 */
225 @Override
226 public void removeEntry(K key) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700227 //
228 // Remove the entry:
229 // - Key : Type <K>
230 // - Value : Serialized Value (byte[])
231 //
232 channelMap.removeAsync(key);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800233 }
234
235 /**
236 * Update an entry in the channel.
237 *
Ray Milkeye88b2b82014-03-21 11:40:04 -0700238 * @param key the key of the entry to update.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800239 * @param value the value of the entry to update.
240 */
241 @Override
242 public void updateEntry(K key, V value) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700243 // NOTE: Adding an entry with an existing key automatically updates it
244 addEntry(key, value);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800245 }
246
247 /**
248 * Get an entry from the channel.
249 *
250 * @param key the key of the entry to get.
251 * @return the entry if found, otherwise null.
252 */
253 @Override
254 @Deprecated
255 public V getEntry(K key) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700256 byte[] valueBytes = channelMap.get(key);
257 if (valueBytes == null) {
258 return null;
259 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800260
Ray Milkeye88b2b82014-03-21 11:40:04 -0700261 //
262 // Decode the value
263 //
Pavlin Radoslavov7439db12014-04-17 13:50:29 -0700264 Kryo kryo = kryoFactory.newKryo();
265 V value = deserializeValue(kryo, valueBytes);
Ray Milkeye88b2b82014-03-21 11:40:04 -0700266 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800267
Ray Milkeye88b2b82014-03-21 11:40:04 -0700268 return value;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800269 }
270
271 /**
272 * Get all entries in the channel.
273 *
274 * @return all entries that are currently in the channel.
275 */
276 @Override
277 @Deprecated
278 public Collection<V> getAllEntries() {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700279 Collection<V> allEntries = new LinkedList<V>();
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800280
Ray Milkeye88b2b82014-03-21 11:40:04 -0700281 if (channelMap == null) {
282 return allEntries; // Nothing found
283 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800284
Ray Milkeye88b2b82014-03-21 11:40:04 -0700285 //
286 // Get all entries
287 //
288 Collection<byte[]> values = channelMap.values();
289 Kryo kryo = kryoFactory.newKryo();
290 for (byte[] valueBytes : values) {
291 //
292 // Decode the value
293 //
Pavlin Radoslavov7439db12014-04-17 13:50:29 -0700294 V value = deserializeValue(kryo, valueBytes);
Ray Milkeye88b2b82014-03-21 11:40:04 -0700295 allEntries.add(value);
296 }
297 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800298
Ray Milkeye88b2b82014-03-21 11:40:04 -0700299 return allEntries;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800300 }
301
302 /**
303 * Remove all entries in the channel.
304 */
305 @Override
306 @Deprecated
307 public void removeAllEntries() {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700308 //
309 // Remove all entries
310 //
311 // NOTE: We remove the entries one-by-one so the per-entry
312 // notifications will be delivered.
313 //
314 // channelMap.clear();
315 Set<K> keySet = channelMap.keySet();
316 for (K key : keySet) {
317 channelMap.removeAsync(key);
318 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800319 }
320
321 /**
322 * Class for receiving event notifications for the channel.
Ray Milkeye88b2b82014-03-21 11:40:04 -0700323 * <p/>
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800324 * The datagrid map is:
Ray Milkeye88b2b82014-03-21 11:40:04 -0700325 * - Key: Type K
326 * - Value: Serialized V (byte[])
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800327 */
Pavlin Radoslavove561a4c2014-04-01 14:10:55 -0700328 private class MapEntryListener implements EntryListener<K, byte[]> {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700329 /**
330 * Receive a notification that an entry is added.
331 *
332 * @param event the notification event for the entry.
333 */
334 @Override
335 public void entryAdded(EntryEvent<K, byte[]> event) {
336 //
337 // Decode the value
338 //
339 byte[] valueBytes = event.getValue();
340 Kryo kryo = kryoFactory.newKryo();
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700341 try {
342 V value = deserializeValue(kryo, valueBytes);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800343
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700344 //
345 // Deliver the notification
346 //
Yuta HIGUCHIa5dcd192014-06-06 18:00:35 -0700347 final Iterator<IEventChannelListener<K, V>> it = listeners.iterator();
348 while (it.hasNext()) {
349 final IEventChannelListener<K, V> listener = it.next();
350 if (it.hasNext()) {
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700351 // Each listener should get a deep copy of the value
Yuta HIGUCHIa5dcd192014-06-06 18:00:35 -0700352 // TODO: compare which is faster
353 // - kryo.copy(value)
354 // - deserializeValue(kryo, valueBytes)
355 listener.entryAdded(kryo.copy(value));
356 } else {
357 // Last listener can use the value
358 listener.entryAdded(value);
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700359 }
Ray Milkeye88b2b82014-03-21 11:40:04 -0700360 }
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700361 } finally {
362 kryoFactory.deleteKryo(kryo);
Ray Milkeye88b2b82014-03-21 11:40:04 -0700363 }
Ray Milkeye88b2b82014-03-21 11:40:04 -0700364 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800365
Ray Milkeye88b2b82014-03-21 11:40:04 -0700366 /**
367 * Receive a notification that an entry is removed.
368 *
369 * @param event the notification event for the entry.
370 */
371 @Override
372 public void entryRemoved(EntryEvent<K, byte[]> event) {
373 //
374 // Decode the value
375 //
376 byte[] valueBytes = event.getValue();
377 Kryo kryo = kryoFactory.newKryo();
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700378 try {
379 V value = deserializeValue(kryo, valueBytes);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800380
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700381 //
382 // Deliver the notification
383 //
Yuta HIGUCHIa5dcd192014-06-06 18:00:35 -0700384 final Iterator<IEventChannelListener<K, V>> it = listeners.iterator();
385 while (it.hasNext()) {
386 final IEventChannelListener<K, V> listener = it.next();
387 if (it.hasNext()) {
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700388 // Each listener should get a deep copy of the value
Yuta HIGUCHIa5dcd192014-06-06 18:00:35 -0700389 listener.entryRemoved(kryo.copy(value));
390 } else {
391 // Last listener can use the value
392 listener.entryRemoved(value);
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700393 }
Ray Milkeye88b2b82014-03-21 11:40:04 -0700394 }
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700395 } finally {
396 kryoFactory.deleteKryo(kryo);
Ray Milkeye88b2b82014-03-21 11:40:04 -0700397 }
Ray Milkeye88b2b82014-03-21 11:40:04 -0700398 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800399
Ray Milkeye88b2b82014-03-21 11:40:04 -0700400 /**
401 * Receive a notification that an entry is updated.
402 *
403 * @param event the notification event for the entry.
404 */
405 @Override
406 public void entryUpdated(EntryEvent<K, byte[]> event) {
407 //
408 // Decode the value
409 //
410 byte[] valueBytes = event.getValue();
411 Kryo kryo = kryoFactory.newKryo();
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700412 try {
413 V value = deserializeValue(kryo, valueBytes);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800414
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700415 //
416 // Deliver the notification
417 //
Yuta HIGUCHIa5dcd192014-06-06 18:00:35 -0700418 final Iterator<IEventChannelListener<K, V>> it = listeners.iterator();
419 while (it.hasNext()) {
420 final IEventChannelListener<K, V> listener = it.next();
421 if (it.hasNext()) {
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700422 // Each listener should get a deep copy of the value
Yuta HIGUCHIa5dcd192014-06-06 18:00:35 -0700423 listener.entryUpdated(kryo.copy(value));
424 } else {
425 // Last listener can use the value
426 listener.entryUpdated(value);
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700427 }
Ray Milkeye88b2b82014-03-21 11:40:04 -0700428 }
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700429 } finally {
430 kryoFactory.deleteKryo(kryo);
Ray Milkeye88b2b82014-03-21 11:40:04 -0700431 }
Ray Milkeye88b2b82014-03-21 11:40:04 -0700432 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800433
Ray Milkeye88b2b82014-03-21 11:40:04 -0700434 /**
435 * Receive a notification that an entry is evicted.
436 *
437 * @param event the notification event for the entry.
438 */
439 @Override
440 public void entryEvicted(EntryEvent<K, byte[]> event) {
441 // NOTE: We don't use eviction for this map
442 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800443 }
444}