blob: 41f78d293c095f43554945927e7ea888c9c8dfeb [file] [log] [blame]
Jonathan Hart6df90172014-04-03 10:13:11 -07001package net.onrc.onos.core.datagrid;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -08002
Yuta HIGUCHI62d86372014-07-03 12:04:13 -07003import java.util.Arrays;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -08004import java.util.Collection;
Yuta HIGUCHIa5dcd192014-06-06 18:00:35 -07005import java.util.Iterator;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -08006import java.util.LinkedList;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -08007import java.util.Set;
Pavlin Radoslavov1c8d8092014-02-14 15:47:24 -08008import java.util.concurrent.CopyOnWriteArrayList;
Pavlin Radoslavovbcf14332014-03-27 18:15:30 -07009import java.util.concurrent.TimeUnit;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080010
Jonathan Harta99ec672014-04-03 11:30:34 -070011import net.onrc.onos.core.util.serializers.KryoFactory;
12
Jonathan Hart14c4a762014-04-15 10:35:57 -070013import org.slf4j.Logger;
14import org.slf4j.LoggerFactory;
15
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080016import com.esotericsoftware.kryo.Kryo;
17import com.esotericsoftware.kryo.io.Input;
18import com.esotericsoftware.kryo.io.Output;
19import com.hazelcast.core.EntryEvent;
20import com.hazelcast.core.EntryListener;
21import com.hazelcast.core.HazelcastInstance;
22import com.hazelcast.core.IMap;
23
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080024/**
25 * A datagrid event channel that uses Hazelcast as a datagrid.
Ray Milkeye88b2b82014-03-21 11:40:04 -070026 *
27 * @param <K> The class type of the key.
28 * @param <V> The class type of the value.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080029 */
30public class HazelcastEventChannel<K, V> implements IEventChannel<K, V> {
Jonathan Hart14c4a762014-04-15 10:35:57 -070031 private static final Logger log =
32 LoggerFactory.getLogger(HazelcastEventChannel.class);
33
Ray Milkeye88b2b82014-03-21 11:40:04 -070034 private final HazelcastInstance hazelcastInstance; // The Hazelcast instance
35 private final String channelName; // The event channel name
Pavlin Radoslavove561a4c2014-04-01 14:10:55 -070036 private final Class<K> typeK; // The class type of the key
37 private final Class<V> typeV; // The class type of the value
Ray Milkeye88b2b82014-03-21 11:40:04 -070038 private IMap<K, byte[]> channelMap; // The Hazelcast channel map
Pavlin Radoslavov1c8d8092014-02-14 15:47:24 -080039 // The channel listeners
Ray Milkeye88b2b82014-03-21 11:40:04 -070040 private final CopyOnWriteArrayList<IEventChannelListener<K, V>> listeners =
41 new CopyOnWriteArrayList<>();
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080042
43 // The map entry listener
Pavlin Radoslavove561a4c2014-04-01 14:10:55 -070044 private final EntryListener<K, byte[]> mapEntryListener = new MapEntryListener();
Ray Milkeye88b2b82014-03-21 11:40:04 -070045 private String mapListenerId; // The map listener ID
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080046
47 // TODO: We should use a single global KryoFactory instance
Ray Milkeye88b2b82014-03-21 11:40:04 -070048 private final KryoFactory kryoFactory = new KryoFactory();
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080049
50 // Maximum serialized event size
Ray Milkeye88b2b82014-03-21 11:40:04 -070051 private static final int MAX_BUFFER_SIZE = 64 * 1024;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080052
53 /**
54 * Constructor for a given event channel name.
55 *
Ray Milkey9c8a2132014-04-02 15:16:42 -070056 * @param newHazelcastInstance the Hazelcast instance to use.
57 * @param newChannelName the event channel name.
58 * @param newTypeK the type of the Key in the Key-Value store.
59 * @param newTypeV the type of the Value in the Key-Value store.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080060 */
Ray Milkeye88b2b82014-03-21 11:40:04 -070061 public HazelcastEventChannel(HazelcastInstance newHazelcastInstance,
62 String newChannelName, Class<K> newTypeK,
63 Class<V> newTypeV) {
64 hazelcastInstance = newHazelcastInstance;
65 channelName = newChannelName;
66 typeK = newTypeK;
67 typeV = newTypeV;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080068 }
69
70 /**
71 * Verify the key and value types of a channel.
72 *
Ray Milkeye88b2b82014-03-21 11:40:04 -070073 * @param typeKToVerify the type of the key to verify.
74 * @param typeVToVerify the type of the value to verify.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080075 * @return true if the key and value types of the channel match,
76 * otherwise false.
77 */
78 @Override
Jonathan Hart14c4a762014-04-15 10:35:57 -070079 public boolean verifyKeyValueTypes(Class<?> typeKToVerify,
80 Class<?> typeVToVerify) {
Ray Milkey73019652014-03-24 11:12:44 -070081 return (typeK.equals(typeKToVerify)) && (typeV.equals(typeVToVerify));
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080082 }
83
84 /**
85 * Cleanup and destroy the channel.
86 */
87 @Override
88 protected void finalize() {
Ray Milkeye88b2b82014-03-21 11:40:04 -070089 shutdown();
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080090 }
91
92 /**
93 * Startup the channel operation.
94 */
95 @Override
96 public void startup() {
Ray Milkeye88b2b82014-03-21 11:40:04 -070097 if (channelMap == null) {
98 channelMap = hazelcastInstance.getMap(channelName);
99 mapListenerId = channelMap.addEntryListener(mapEntryListener,
100 true);
101 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800102 }
103
104 /**
105 * Shutdown the channel operation.
106 */
107 @Override
108 public void shutdown() {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700109 if (channelMap != null) {
110 channelMap.removeEntryListener(mapListenerId);
111 channelMap = null;
112 mapListenerId = null;
113 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800114 }
115
116 /**
117 * Add event channel listener.
118 *
119 * @param listener the listener to add.
120 */
121 @Override
122 public void addListener(IEventChannelListener<K, V> listener) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700123 if (listeners.contains(listener)) {
124 return; // Nothing to do: already a listener
125 }
126 listeners.add(listener);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800127 }
128
129 /**
130 * Remove event channel listener.
131 *
132 * @param listener the listener to remove.
133 */
134 @Override
135 public void removeListener(IEventChannelListener<K, V> listener) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700136 listeners.remove(listener);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800137 }
138
139 /**
140 * Add an entry to the channel.
141 *
Ray Milkeye88b2b82014-03-21 11:40:04 -0700142 * @param key the key of the entry to add.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800143 * @param value the value of the entry to add.
144 */
145 @Override
146 public void addEntry(K key, V value) {
Pavlin Radoslavovbcf14332014-03-27 18:15:30 -0700147 byte[] valueBytes = serializeValue(value);
148 //
149 // Put the entry in the map:
150 // - Key : Type <K>
151 // - Value : Serialized Value (byte[])
152 //
153 channelMap.putAsync(key, valueBytes);
154 }
155
156 /**
157 * Add a transient entry to the channel.
Ray Milkey9c8a2132014-04-02 15:16:42 -0700158 * <p/>
Pavlin Radoslavovbcf14332014-03-27 18:15:30 -0700159 * The added entry is transient and will automatically timeout after 1ms.
160 *
161 * @param key the key of the entry to add.
162 * @param value the value of the entry to add.
163 */
164 @Override
165 public void addTransientEntry(K key, V value) {
166 byte[] valueBytes = serializeValue(value);
167 //
168 // Put the entry in the map:
169 // - Key : Type <K>
170 // - Value : Serialized Value (byte[])
171 // - Timeout: 1ms
172 //
173 channelMap.putAsync(key, valueBytes, 1L, TimeUnit.MILLISECONDS);
174 }
175
176 /**
177 * Serialize the value.
178 *
179 * @param value the value to serialize.
180 * @return the serialized value.
181 */
182 private byte[] serializeValue(V value) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700183 //
184 // Encode the value
185 //
186 byte[] buffer = new byte[MAX_BUFFER_SIZE];
187 Kryo kryo = kryoFactory.newKryo();
Yuta HIGUCHIe8bfe612014-06-08 14:20:51 -0700188 try {
189 Output output = new Output(buffer, -1);
190 kryo.writeClassAndObject(output, value);
191 byte[] valueBytes = output.toBytes();
192 return valueBytes;
193 } finally {
194 kryoFactory.deleteKryo(kryo);
195 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800196 }
197
198 /**
Pavlin Radoslavov7439db12014-04-17 13:50:29 -0700199 * Deserialize the value.
200 *
201 * @param kryo the Kryo instance to use for the deserialization.
202 * @param valueBytes the buffer with the serialized value.
203 * @return the deserialized value.
204 */
205 private V deserializeValue(Kryo kryo, byte[] valueBytes) {
206 V value;
207
208 //
209 // Decode the value
210 //
211 Input input = new Input(valueBytes);
212 Object objValue = kryo.readClassAndObject(input);
213 try {
214 value = typeV.cast(objValue);
215 } catch (ClassCastException e) {
216 log.error("Received notification value cast failed", e);
217 return null;
218 }
219
220 return value;
221 }
222
223 /**
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800224 * Remove an entry from the channel.
225 *
226 * @param key the key of the entry to remove.
227 */
228 @Override
229 public void removeEntry(K key) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700230 //
231 // Remove the entry:
232 // - Key : Type <K>
233 // - Value : Serialized Value (byte[])
234 //
235 channelMap.removeAsync(key);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800236 }
237
238 /**
239 * Update an entry in the channel.
240 *
Ray Milkeye88b2b82014-03-21 11:40:04 -0700241 * @param key the key of the entry to update.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800242 * @param value the value of the entry to update.
243 */
244 @Override
245 public void updateEntry(K key, V value) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700246 // NOTE: Adding an entry with an existing key automatically updates it
247 addEntry(key, value);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800248 }
249
250 /**
251 * Get an entry from the channel.
252 *
253 * @param key the key of the entry to get.
254 * @return the entry if found, otherwise null.
255 */
256 @Override
257 @Deprecated
258 public V getEntry(K key) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700259 byte[] valueBytes = channelMap.get(key);
260 if (valueBytes == null) {
261 return null;
262 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800263
Ray Milkeye88b2b82014-03-21 11:40:04 -0700264 //
265 // Decode the value
266 //
Pavlin Radoslavov7439db12014-04-17 13:50:29 -0700267 Kryo kryo = kryoFactory.newKryo();
Yuta HIGUCHIe8bfe612014-06-08 14:20:51 -0700268 try {
269 V value = deserializeValue(kryo, valueBytes);
270 return value;
271 } finally {
272 kryoFactory.deleteKryo(kryo);
273 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800274 }
275
276 /**
277 * Get all entries in the channel.
278 *
279 * @return all entries that are currently in the channel.
280 */
281 @Override
282 @Deprecated
283 public Collection<V> getAllEntries() {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700284 Collection<V> allEntries = new LinkedList<V>();
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800285
Ray Milkeye88b2b82014-03-21 11:40:04 -0700286 if (channelMap == null) {
287 return allEntries; // Nothing found
288 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800289
Ray Milkeye88b2b82014-03-21 11:40:04 -0700290 //
291 // Get all entries
292 //
293 Collection<byte[]> values = channelMap.values();
294 Kryo kryo = kryoFactory.newKryo();
Yuta HIGUCHIe8bfe612014-06-08 14:20:51 -0700295 try {
296 for (byte[] valueBytes : values) {
297 //
298 // Decode the value
299 //
300 V value = deserializeValue(kryo, valueBytes);
301 allEntries.add(value);
302 }
303 } finally {
304 kryoFactory.deleteKryo(kryo);
Ray Milkeye88b2b82014-03-21 11:40:04 -0700305 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800306
Ray Milkeye88b2b82014-03-21 11:40:04 -0700307 return allEntries;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800308 }
309
310 /**
311 * Remove all entries in the channel.
312 */
313 @Override
314 @Deprecated
315 public void removeAllEntries() {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700316 //
317 // Remove all entries
318 //
319 // NOTE: We remove the entries one-by-one so the per-entry
320 // notifications will be delivered.
321 //
322 // channelMap.clear();
323 Set<K> keySet = channelMap.keySet();
324 for (K key : keySet) {
325 channelMap.removeAsync(key);
326 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800327 }
328
329 /**
330 * Class for receiving event notifications for the channel.
Ray Milkeye88b2b82014-03-21 11:40:04 -0700331 * <p/>
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800332 * The datagrid map is:
Ray Milkeye88b2b82014-03-21 11:40:04 -0700333 * - Key: Type K
334 * - Value: Serialized V (byte[])
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800335 */
Pavlin Radoslavove561a4c2014-04-01 14:10:55 -0700336 private class MapEntryListener implements EntryListener<K, byte[]> {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700337 /**
338 * Receive a notification that an entry is added.
339 *
340 * @param event the notification event for the entry.
341 */
342 @Override
343 public void entryAdded(EntryEvent<K, byte[]> event) {
344 //
345 // Decode the value
346 //
Yuta HIGUCHI62d86372014-07-03 12:04:13 -0700347 byte[] original = event.getValue();
348 // Copying byte[], to see if it resolves ONOS-1343.
349 byte[] valueBytes = Arrays.copyOf(original, original.length);
Ray Milkeye88b2b82014-03-21 11:40:04 -0700350 Kryo kryo = kryoFactory.newKryo();
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700351 try {
352 V value = deserializeValue(kryo, valueBytes);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800353
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700354 //
355 // Deliver the notification
356 //
Yuta HIGUCHIa5dcd192014-06-06 18:00:35 -0700357 final Iterator<IEventChannelListener<K, V>> it = listeners.iterator();
358 while (it.hasNext()) {
359 final IEventChannelListener<K, V> listener = it.next();
360 if (it.hasNext()) {
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700361 // Each listener should get a deep copy of the value
Yuta HIGUCHIa5dcd192014-06-06 18:00:35 -0700362 // TODO: compare which is faster
363 // - kryo.copy(value)
364 // - deserializeValue(kryo, valueBytes)
365 listener.entryAdded(kryo.copy(value));
366 } else {
367 // Last listener can use the value
368 listener.entryAdded(value);
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700369 }
Ray Milkeye88b2b82014-03-21 11:40:04 -0700370 }
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700371 } finally {
372 kryoFactory.deleteKryo(kryo);
Ray Milkeye88b2b82014-03-21 11:40:04 -0700373 }
Ray Milkeye88b2b82014-03-21 11:40:04 -0700374 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800375
Ray Milkeye88b2b82014-03-21 11:40:04 -0700376 /**
377 * Receive a notification that an entry is removed.
378 *
379 * @param event the notification event for the entry.
380 */
381 @Override
382 public void entryRemoved(EntryEvent<K, byte[]> event) {
383 //
384 // Decode the value
385 //
Yuta HIGUCHI62d86372014-07-03 12:04:13 -0700386 byte[] original = event.getValue();
387 // Copying byte[], to see if it resolves ONOS-1343.
388 byte[] valueBytes = Arrays.copyOf(original, original.length);
Ray Milkeye88b2b82014-03-21 11:40:04 -0700389 Kryo kryo = kryoFactory.newKryo();
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700390 try {
391 V value = deserializeValue(kryo, valueBytes);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800392
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700393 //
394 // Deliver the notification
395 //
Yuta HIGUCHIa5dcd192014-06-06 18:00:35 -0700396 final Iterator<IEventChannelListener<K, V>> it = listeners.iterator();
397 while (it.hasNext()) {
398 final IEventChannelListener<K, V> listener = it.next();
399 if (it.hasNext()) {
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700400 // Each listener should get a deep copy of the value
Yuta HIGUCHIa5dcd192014-06-06 18:00:35 -0700401 listener.entryRemoved(kryo.copy(value));
402 } else {
403 // Last listener can use the value
404 listener.entryRemoved(value);
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700405 }
Ray Milkeye88b2b82014-03-21 11:40:04 -0700406 }
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700407 } finally {
408 kryoFactory.deleteKryo(kryo);
Ray Milkeye88b2b82014-03-21 11:40:04 -0700409 }
Ray Milkeye88b2b82014-03-21 11:40:04 -0700410 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800411
Ray Milkeye88b2b82014-03-21 11:40:04 -0700412 /**
413 * Receive a notification that an entry is updated.
414 *
415 * @param event the notification event for the entry.
416 */
417 @Override
418 public void entryUpdated(EntryEvent<K, byte[]> event) {
419 //
420 // Decode the value
421 //
Yuta HIGUCHI62d86372014-07-03 12:04:13 -0700422 byte[] original = event.getValue();
423 // Copying byte[], to see if it resolves ONOS-1343.
424 byte[] valueBytes = Arrays.copyOf(original, original.length);
Ray Milkeye88b2b82014-03-21 11:40:04 -0700425 Kryo kryo = kryoFactory.newKryo();
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700426 try {
427 V value = deserializeValue(kryo, valueBytes);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800428
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700429 //
430 // Deliver the notification
431 //
Yuta HIGUCHIa5dcd192014-06-06 18:00:35 -0700432 final Iterator<IEventChannelListener<K, V>> it = listeners.iterator();
433 while (it.hasNext()) {
434 final IEventChannelListener<K, V> listener = it.next();
435 if (it.hasNext()) {
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700436 // Each listener should get a deep copy of the value
Yuta HIGUCHIa5dcd192014-06-06 18:00:35 -0700437 listener.entryUpdated(kryo.copy(value));
438 } else {
439 // Last listener can use the value
440 listener.entryUpdated(value);
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700441 }
Ray Milkeye88b2b82014-03-21 11:40:04 -0700442 }
Yuta HIGUCHI57701af2014-06-06 17:56:28 -0700443 } finally {
444 kryoFactory.deleteKryo(kryo);
Ray Milkeye88b2b82014-03-21 11:40:04 -0700445 }
Ray Milkeye88b2b82014-03-21 11:40:04 -0700446 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800447
Ray Milkeye88b2b82014-03-21 11:40:04 -0700448 /**
449 * Receive a notification that an entry is evicted.
450 *
451 * @param event the notification event for the entry.
452 */
453 @Override
454 public void entryEvicted(EntryEvent<K, byte[]> event) {
455 // NOTE: We don't use eviction for this map
456 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800457 }
458}