blob: 00ff7a8a835b50ae6c128d6ea787fa04ae0cb7f7 [file] [log] [blame]
Jonathan Hart6df90172014-04-03 10:13:11 -07001package net.onrc.onos.core.datagrid;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -08002
Pavlin Radoslavov7940b652014-02-13 19:42:05 -08003import java.util.Collection;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -08004import java.util.LinkedList;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -08005import java.util.Set;
Pavlin Radoslavov1c8d8092014-02-14 15:47:24 -08006import java.util.concurrent.CopyOnWriteArrayList;
Pavlin Radoslavovbcf14332014-03-27 18:15:30 -07007import java.util.concurrent.TimeUnit;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -08008
Jonathan Harta99ec672014-04-03 11:30:34 -07009import net.onrc.onos.core.util.serializers.KryoFactory;
10
Jonathan Hart14c4a762014-04-15 10:35:57 -070011import org.slf4j.Logger;
12import org.slf4j.LoggerFactory;
13
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080014import com.esotericsoftware.kryo.Kryo;
15import com.esotericsoftware.kryo.io.Input;
16import com.esotericsoftware.kryo.io.Output;
17import com.hazelcast.core.EntryEvent;
18import com.hazelcast.core.EntryListener;
19import com.hazelcast.core.HazelcastInstance;
20import com.hazelcast.core.IMap;
21
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080022/**
23 * A datagrid event channel that uses Hazelcast as a datagrid.
Ray Milkeye88b2b82014-03-21 11:40:04 -070024 *
25 * @param <K> The class type of the key.
26 * @param <V> The class type of the value.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080027 */
28public class HazelcastEventChannel<K, V> implements IEventChannel<K, V> {
Jonathan Hart14c4a762014-04-15 10:35:57 -070029 private static final Logger log =
30 LoggerFactory.getLogger(HazelcastEventChannel.class);
31
Ray Milkeye88b2b82014-03-21 11:40:04 -070032 private final HazelcastInstance hazelcastInstance; // The Hazelcast instance
33 private final String channelName; // The event channel name
Pavlin Radoslavove561a4c2014-04-01 14:10:55 -070034 private final Class<K> typeK; // The class type of the key
35 private final Class<V> typeV; // The class type of the value
Ray Milkeye88b2b82014-03-21 11:40:04 -070036 private IMap<K, byte[]> channelMap; // The Hazelcast channel map
Pavlin Radoslavov1c8d8092014-02-14 15:47:24 -080037 // The channel listeners
Ray Milkeye88b2b82014-03-21 11:40:04 -070038 private final CopyOnWriteArrayList<IEventChannelListener<K, V>> listeners =
39 new CopyOnWriteArrayList<>();
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080040
41 // The map entry listener
Pavlin Radoslavove561a4c2014-04-01 14:10:55 -070042 private final EntryListener<K, byte[]> mapEntryListener = new MapEntryListener();
Ray Milkeye88b2b82014-03-21 11:40:04 -070043 private String mapListenerId; // The map listener ID
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080044
45 // TODO: We should use a single global KryoFactory instance
Ray Milkeye88b2b82014-03-21 11:40:04 -070046 private final KryoFactory kryoFactory = new KryoFactory();
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080047
48 // Maximum serialized event size
Ray Milkeye88b2b82014-03-21 11:40:04 -070049 private static final int MAX_BUFFER_SIZE = 64 * 1024;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080050
51 /**
52 * Constructor for a given event channel name.
53 *
Ray Milkey9c8a2132014-04-02 15:16:42 -070054 * @param newHazelcastInstance the Hazelcast instance to use.
55 * @param newChannelName the event channel name.
56 * @param newTypeK the type of the Key in the Key-Value store.
57 * @param newTypeV the type of the Value in the Key-Value store.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080058 */
Ray Milkeye88b2b82014-03-21 11:40:04 -070059 public HazelcastEventChannel(HazelcastInstance newHazelcastInstance,
60 String newChannelName, Class<K> newTypeK,
61 Class<V> newTypeV) {
62 hazelcastInstance = newHazelcastInstance;
63 channelName = newChannelName;
64 typeK = newTypeK;
65 typeV = newTypeV;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080066 }
67
68 /**
69 * Verify the key and value types of a channel.
70 *
Ray Milkeye88b2b82014-03-21 11:40:04 -070071 * @param typeKToVerify the type of the key to verify.
72 * @param typeVToVerify the type of the value to verify.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080073 * @return true if the key and value types of the channel match,
74 * otherwise false.
75 */
76 @Override
Jonathan Hart14c4a762014-04-15 10:35:57 -070077 public boolean verifyKeyValueTypes(Class<?> typeKToVerify,
78 Class<?> typeVToVerify) {
Ray Milkey73019652014-03-24 11:12:44 -070079 return (typeK.equals(typeKToVerify)) && (typeV.equals(typeVToVerify));
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080080 }
81
82 /**
83 * Cleanup and destroy the channel.
84 */
85 @Override
86 protected void finalize() {
Ray Milkeye88b2b82014-03-21 11:40:04 -070087 shutdown();
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080088 }
89
90 /**
91 * Startup the channel operation.
92 */
93 @Override
94 public void startup() {
Ray Milkeye88b2b82014-03-21 11:40:04 -070095 if (channelMap == null) {
96 channelMap = hazelcastInstance.getMap(channelName);
97 mapListenerId = channelMap.addEntryListener(mapEntryListener,
98 true);
99 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800100 }
101
102 /**
103 * Shutdown the channel operation.
104 */
105 @Override
106 public void shutdown() {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700107 if (channelMap != null) {
108 channelMap.removeEntryListener(mapListenerId);
109 channelMap = null;
110 mapListenerId = null;
111 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800112 }
113
114 /**
115 * Add event channel listener.
116 *
117 * @param listener the listener to add.
118 */
119 @Override
120 public void addListener(IEventChannelListener<K, V> listener) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700121 if (listeners.contains(listener)) {
122 return; // Nothing to do: already a listener
123 }
124 listeners.add(listener);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800125 }
126
127 /**
128 * Remove event channel listener.
129 *
130 * @param listener the listener to remove.
131 */
132 @Override
133 public void removeListener(IEventChannelListener<K, V> listener) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700134 listeners.remove(listener);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800135 }
136
137 /**
138 * Add an entry to the channel.
139 *
Ray Milkeye88b2b82014-03-21 11:40:04 -0700140 * @param key the key of the entry to add.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800141 * @param value the value of the entry to add.
142 */
143 @Override
144 public void addEntry(K key, V value) {
Pavlin Radoslavovbcf14332014-03-27 18:15:30 -0700145 byte[] valueBytes = serializeValue(value);
146 //
147 // Put the entry in the map:
148 // - Key : Type <K>
149 // - Value : Serialized Value (byte[])
150 //
151 channelMap.putAsync(key, valueBytes);
152 }
153
154 /**
155 * Add a transient entry to the channel.
Ray Milkey9c8a2132014-04-02 15:16:42 -0700156 * <p/>
Pavlin Radoslavovbcf14332014-03-27 18:15:30 -0700157 * The added entry is transient and will automatically timeout after 1ms.
158 *
159 * @param key the key of the entry to add.
160 * @param value the value of the entry to add.
161 */
162 @Override
163 public void addTransientEntry(K key, V value) {
164 byte[] valueBytes = serializeValue(value);
165 //
166 // Put the entry in the map:
167 // - Key : Type <K>
168 // - Value : Serialized Value (byte[])
169 // - Timeout: 1ms
170 //
171 channelMap.putAsync(key, valueBytes, 1L, TimeUnit.MILLISECONDS);
172 }
173
174 /**
175 * Serialize the value.
176 *
177 * @param value the value to serialize.
178 * @return the serialized value.
179 */
180 private byte[] serializeValue(V value) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700181 //
182 // Encode the value
183 //
184 byte[] buffer = new byte[MAX_BUFFER_SIZE];
185 Kryo kryo = kryoFactory.newKryo();
186 Output output = new Output(buffer, -1);
Jonathan Hart14c4a762014-04-15 10:35:57 -0700187 kryo.writeClassAndObject(output, value);
Ray Milkeye88b2b82014-03-21 11:40:04 -0700188 byte[] valueBytes = output.toBytes();
189 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800190
Pavlin Radoslavovbcf14332014-03-27 18:15:30 -0700191 return valueBytes;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800192 }
193
194 /**
Pavlin Radoslavov7439db12014-04-17 13:50:29 -0700195 * Deserialize the value.
196 *
197 * @param kryo the Kryo instance to use for the deserialization.
198 * @param valueBytes the buffer with the serialized value.
199 * @return the deserialized value.
200 */
201 private V deserializeValue(Kryo kryo, byte[] valueBytes) {
202 V value;
203
204 //
205 // Decode the value
206 //
207 Input input = new Input(valueBytes);
208 Object objValue = kryo.readClassAndObject(input);
209 try {
210 value = typeV.cast(objValue);
211 } catch (ClassCastException e) {
212 log.error("Received notification value cast failed", e);
213 return null;
214 }
215
216 return value;
217 }
218
219 /**
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800220 * Remove an entry from the channel.
221 *
222 * @param key the key of the entry to remove.
223 */
224 @Override
225 public void removeEntry(K key) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700226 //
227 // Remove the entry:
228 // - Key : Type <K>
229 // - Value : Serialized Value (byte[])
230 //
231 channelMap.removeAsync(key);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800232 }
233
234 /**
235 * Update an entry in the channel.
236 *
Ray Milkeye88b2b82014-03-21 11:40:04 -0700237 * @param key the key of the entry to update.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800238 * @param value the value of the entry to update.
239 */
240 @Override
241 public void updateEntry(K key, V value) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700242 // NOTE: Adding an entry with an existing key automatically updates it
243 addEntry(key, value);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800244 }
245
246 /**
247 * Get an entry from the channel.
248 *
249 * @param key the key of the entry to get.
250 * @return the entry if found, otherwise null.
251 */
252 @Override
253 @Deprecated
254 public V getEntry(K key) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700255 byte[] valueBytes = channelMap.get(key);
256 if (valueBytes == null) {
257 return null;
258 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800259
Ray Milkeye88b2b82014-03-21 11:40:04 -0700260 //
261 // Decode the value
262 //
Pavlin Radoslavov7439db12014-04-17 13:50:29 -0700263 Kryo kryo = kryoFactory.newKryo();
264 V value = deserializeValue(kryo, valueBytes);
Ray Milkeye88b2b82014-03-21 11:40:04 -0700265 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800266
Ray Milkeye88b2b82014-03-21 11:40:04 -0700267 return value;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800268 }
269
270 /**
271 * Get all entries in the channel.
272 *
273 * @return all entries that are currently in the channel.
274 */
275 @Override
276 @Deprecated
277 public Collection<V> getAllEntries() {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700278 Collection<V> allEntries = new LinkedList<V>();
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800279
Ray Milkeye88b2b82014-03-21 11:40:04 -0700280 if (channelMap == null) {
281 return allEntries; // Nothing found
282 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800283
Ray Milkeye88b2b82014-03-21 11:40:04 -0700284 //
285 // Get all entries
286 //
287 Collection<byte[]> values = channelMap.values();
288 Kryo kryo = kryoFactory.newKryo();
289 for (byte[] valueBytes : values) {
290 //
291 // Decode the value
292 //
Pavlin Radoslavov7439db12014-04-17 13:50:29 -0700293 V value = deserializeValue(kryo, valueBytes);
Ray Milkeye88b2b82014-03-21 11:40:04 -0700294 allEntries.add(value);
295 }
296 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800297
Ray Milkeye88b2b82014-03-21 11:40:04 -0700298 return allEntries;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800299 }
300
301 /**
302 * Remove all entries in the channel.
303 */
304 @Override
305 @Deprecated
306 public void removeAllEntries() {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700307 //
308 // Remove all entries
309 //
310 // NOTE: We remove the entries one-by-one so the per-entry
311 // notifications will be delivered.
312 //
313 // channelMap.clear();
314 Set<K> keySet = channelMap.keySet();
315 for (K key : keySet) {
316 channelMap.removeAsync(key);
317 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800318 }
319
320 /**
321 * Class for receiving event notifications for the channel.
Ray Milkeye88b2b82014-03-21 11:40:04 -0700322 * <p/>
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800323 * The datagrid map is:
Ray Milkeye88b2b82014-03-21 11:40:04 -0700324 * - Key: Type K
325 * - Value: Serialized V (byte[])
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800326 */
Pavlin Radoslavove561a4c2014-04-01 14:10:55 -0700327 private class MapEntryListener implements EntryListener<K, byte[]> {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700328 /**
329 * Receive a notification that an entry is added.
330 *
331 * @param event the notification event for the entry.
332 */
333 @Override
334 public void entryAdded(EntryEvent<K, byte[]> event) {
335 //
336 // Decode the value
337 //
338 byte[] valueBytes = event.getValue();
339 Kryo kryo = kryoFactory.newKryo();
Pavlin Radoslavov7439db12014-04-17 13:50:29 -0700340 V value = deserializeValue(kryo, valueBytes);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800341
Ray Milkeye88b2b82014-03-21 11:40:04 -0700342 //
343 // Deliver the notification
344 //
345 int index = 0;
Pavlin Radoslavove561a4c2014-04-01 14:10:55 -0700346 for (IEventChannelListener<K, V> listener : listeners) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700347 V copyValue = value;
348 if (index++ > 0) {
349 // Each listener should get a deep copy of the value
350 copyValue = kryo.copy(value);
351 }
352 listener.entryAdded(copyValue);
353 }
354 kryoFactory.deleteKryo(kryo);
355 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800356
Ray Milkeye88b2b82014-03-21 11:40:04 -0700357 /**
358 * Receive a notification that an entry is removed.
359 *
360 * @param event the notification event for the entry.
361 */
362 @Override
363 public void entryRemoved(EntryEvent<K, byte[]> event) {
364 //
365 // Decode the value
366 //
367 byte[] valueBytes = event.getValue();
368 Kryo kryo = kryoFactory.newKryo();
Pavlin Radoslavov7439db12014-04-17 13:50:29 -0700369 V value = deserializeValue(kryo, valueBytes);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800370
Ray Milkeye88b2b82014-03-21 11:40:04 -0700371 //
372 // Deliver the notification
373 //
374 int index = 0;
Pavlin Radoslavove561a4c2014-04-01 14:10:55 -0700375 for (IEventChannelListener<K, V> listener : listeners) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700376 V copyValue = value;
377 if (index++ > 0) {
378 // Each listener should get a deep copy of the value
379 copyValue = kryo.copy(value);
380 }
381 listener.entryRemoved(copyValue);
382 }
383 kryoFactory.deleteKryo(kryo);
384 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800385
Ray Milkeye88b2b82014-03-21 11:40:04 -0700386 /**
387 * Receive a notification that an entry is updated.
388 *
389 * @param event the notification event for the entry.
390 */
391 @Override
392 public void entryUpdated(EntryEvent<K, byte[]> event) {
393 //
394 // Decode the value
395 //
396 byte[] valueBytes = event.getValue();
397 Kryo kryo = kryoFactory.newKryo();
Pavlin Radoslavov7439db12014-04-17 13:50:29 -0700398 V value = deserializeValue(kryo, valueBytes);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800399
Ray Milkeye88b2b82014-03-21 11:40:04 -0700400 //
401 // Deliver the notification
402 //
403 int index = 0;
Pavlin Radoslavove561a4c2014-04-01 14:10:55 -0700404 for (IEventChannelListener<K, V> listener : listeners) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700405 V copyValue = value;
406 if (index++ > 0) {
407 // Each listener should get a deep copy of the value
408 copyValue = kryo.copy(value);
409 }
410 listener.entryUpdated(copyValue);
411 }
412 kryoFactory.deleteKryo(kryo);
413 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800414
Ray Milkeye88b2b82014-03-21 11:40:04 -0700415 /**
416 * Receive a notification that an entry is evicted.
417 *
418 * @param event the notification event for the entry.
419 */
420 @Override
421 public void entryEvicted(EntryEvent<K, byte[]> event) {
422 // NOTE: We don't use eviction for this map
423 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800424 }
425}