blob: 5774af25ff3119fe199a90ff9ad098576898892c [file] [log] [blame]
Jonathan Hart6df90172014-04-03 10:13:11 -07001package net.onrc.onos.core.datagrid;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -08002
Pavlin Radoslavov7940b652014-02-13 19:42:05 -08003import java.util.Collection;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -08004import java.util.LinkedList;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -08005import java.util.Set;
Pavlin Radoslavov1c8d8092014-02-14 15:47:24 -08006import java.util.concurrent.CopyOnWriteArrayList;
Pavlin Radoslavovbcf14332014-03-27 18:15:30 -07007import java.util.concurrent.TimeUnit;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -08008
Jonathan Harta99ec672014-04-03 11:30:34 -07009import net.onrc.onos.core.util.serializers.KryoFactory;
10
Jonathan Hart14c4a762014-04-15 10:35:57 -070011import org.slf4j.Logger;
12import org.slf4j.LoggerFactory;
13
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080014import com.esotericsoftware.kryo.Kryo;
15import com.esotericsoftware.kryo.io.Input;
16import com.esotericsoftware.kryo.io.Output;
17import com.hazelcast.core.EntryEvent;
18import com.hazelcast.core.EntryListener;
19import com.hazelcast.core.HazelcastInstance;
20import com.hazelcast.core.IMap;
21
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080022/**
23 * A datagrid event channel that uses Hazelcast as a datagrid.
Ray Milkeye88b2b82014-03-21 11:40:04 -070024 *
25 * @param <K> The class type of the key.
26 * @param <V> The class type of the value.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080027 */
28public class HazelcastEventChannel<K, V> implements IEventChannel<K, V> {
Jonathan Hart14c4a762014-04-15 10:35:57 -070029 private static final Logger log =
30 LoggerFactory.getLogger(HazelcastEventChannel.class);
31
Ray Milkeye88b2b82014-03-21 11:40:04 -070032 private final HazelcastInstance hazelcastInstance; // The Hazelcast instance
33 private final String channelName; // The event channel name
Pavlin Radoslavove561a4c2014-04-01 14:10:55 -070034 private final Class<K> typeK; // The class type of the key
35 private final Class<V> typeV; // The class type of the value
Ray Milkeye88b2b82014-03-21 11:40:04 -070036 private IMap<K, byte[]> channelMap; // The Hazelcast channel map
Pavlin Radoslavov1c8d8092014-02-14 15:47:24 -080037 // The channel listeners
Ray Milkeye88b2b82014-03-21 11:40:04 -070038 private final CopyOnWriteArrayList<IEventChannelListener<K, V>> listeners =
39 new CopyOnWriteArrayList<>();
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080040
41 // The map entry listener
Pavlin Radoslavove561a4c2014-04-01 14:10:55 -070042 private final EntryListener<K, byte[]> mapEntryListener = new MapEntryListener();
Ray Milkeye88b2b82014-03-21 11:40:04 -070043 private String mapListenerId; // The map listener ID
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080044
45 // TODO: We should use a single global KryoFactory instance
Ray Milkeye88b2b82014-03-21 11:40:04 -070046 private final KryoFactory kryoFactory = new KryoFactory();
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080047
48 // Maximum serialized event size
Ray Milkeye88b2b82014-03-21 11:40:04 -070049 private static final int MAX_BUFFER_SIZE = 64 * 1024;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080050
51 /**
52 * Constructor for a given event channel name.
53 *
Ray Milkey9c8a2132014-04-02 15:16:42 -070054 * @param newHazelcastInstance the Hazelcast instance to use.
55 * @param newChannelName the event channel name.
56 * @param newTypeK the type of the Key in the Key-Value store.
57 * @param newTypeV the type of the Value in the Key-Value store.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080058 */
Ray Milkeye88b2b82014-03-21 11:40:04 -070059 public HazelcastEventChannel(HazelcastInstance newHazelcastInstance,
60 String newChannelName, Class<K> newTypeK,
61 Class<V> newTypeV) {
62 hazelcastInstance = newHazelcastInstance;
63 channelName = newChannelName;
64 typeK = newTypeK;
65 typeV = newTypeV;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080066 }
67
68 /**
69 * Verify the key and value types of a channel.
70 *
Ray Milkeye88b2b82014-03-21 11:40:04 -070071 * @param typeKToVerify the type of the key to verify.
72 * @param typeVToVerify the type of the value to verify.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080073 * @return true if the key and value types of the channel match,
74 * otherwise false.
75 */
76 @Override
Jonathan Hart14c4a762014-04-15 10:35:57 -070077 public boolean verifyKeyValueTypes(Class<?> typeKToVerify,
78 Class<?> typeVToVerify) {
Ray Milkey73019652014-03-24 11:12:44 -070079 return (typeK.equals(typeKToVerify)) && (typeV.equals(typeVToVerify));
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080080 }
81
82 /**
83 * Cleanup and destroy the channel.
84 */
85 @Override
86 protected void finalize() {
Ray Milkeye88b2b82014-03-21 11:40:04 -070087 shutdown();
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080088 }
89
90 /**
91 * Startup the channel operation.
92 */
93 @Override
94 public void startup() {
Ray Milkeye88b2b82014-03-21 11:40:04 -070095 if (channelMap == null) {
96 channelMap = hazelcastInstance.getMap(channelName);
97 mapListenerId = channelMap.addEntryListener(mapEntryListener,
98 true);
99 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800100 }
101
102 /**
103 * Shutdown the channel operation.
104 */
105 @Override
106 public void shutdown() {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700107 if (channelMap != null) {
108 channelMap.removeEntryListener(mapListenerId);
109 channelMap = null;
110 mapListenerId = null;
111 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800112 }
113
114 /**
115 * Add event channel listener.
116 *
117 * @param listener the listener to add.
118 */
119 @Override
120 public void addListener(IEventChannelListener<K, V> listener) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700121 if (listeners.contains(listener)) {
122 return; // Nothing to do: already a listener
123 }
124 listeners.add(listener);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800125 }
126
127 /**
128 * Remove event channel listener.
129 *
130 * @param listener the listener to remove.
131 */
132 @Override
133 public void removeListener(IEventChannelListener<K, V> listener) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700134 listeners.remove(listener);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800135 }
136
137 /**
138 * Add an entry to the channel.
139 *
Ray Milkeye88b2b82014-03-21 11:40:04 -0700140 * @param key the key of the entry to add.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800141 * @param value the value of the entry to add.
142 */
143 @Override
144 public void addEntry(K key, V value) {
Pavlin Radoslavovbcf14332014-03-27 18:15:30 -0700145 byte[] valueBytes = serializeValue(value);
146 //
147 // Put the entry in the map:
148 // - Key : Type <K>
149 // - Value : Serialized Value (byte[])
150 //
151 channelMap.putAsync(key, valueBytes);
152 }
153
154 /**
155 * Add a transient entry to the channel.
Ray Milkey9c8a2132014-04-02 15:16:42 -0700156 * <p/>
Pavlin Radoslavovbcf14332014-03-27 18:15:30 -0700157 * The added entry is transient and will automatically timeout after 1ms.
158 *
159 * @param key the key of the entry to add.
160 * @param value the value of the entry to add.
161 */
162 @Override
163 public void addTransientEntry(K key, V value) {
164 byte[] valueBytes = serializeValue(value);
165 //
166 // Put the entry in the map:
167 // - Key : Type <K>
168 // - Value : Serialized Value (byte[])
169 // - Timeout: 1ms
170 //
171 channelMap.putAsync(key, valueBytes, 1L, TimeUnit.MILLISECONDS);
172 }
173
174 /**
175 * Serialize the value.
176 *
177 * @param value the value to serialize.
178 * @return the serialized value.
179 */
180 private byte[] serializeValue(V value) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700181 //
182 // Encode the value
183 //
184 byte[] buffer = new byte[MAX_BUFFER_SIZE];
185 Kryo kryo = kryoFactory.newKryo();
186 Output output = new Output(buffer, -1);
Jonathan Hart14c4a762014-04-15 10:35:57 -0700187 kryo.writeClassAndObject(output, value);
Ray Milkeye88b2b82014-03-21 11:40:04 -0700188 byte[] valueBytes = output.toBytes();
189 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800190
Pavlin Radoslavovbcf14332014-03-27 18:15:30 -0700191 return valueBytes;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800192 }
193
194 /**
195 * Remove an entry from the channel.
196 *
197 * @param key the key of the entry to remove.
198 */
199 @Override
200 public void removeEntry(K key) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700201 //
202 // Remove the entry:
203 // - Key : Type <K>
204 // - Value : Serialized Value (byte[])
205 //
206 channelMap.removeAsync(key);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800207 }
208
209 /**
210 * Update an entry in the channel.
211 *
Ray Milkeye88b2b82014-03-21 11:40:04 -0700212 * @param key the key of the entry to update.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800213 * @param value the value of the entry to update.
214 */
215 @Override
216 public void updateEntry(K key, V value) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700217 // NOTE: Adding an entry with an existing key automatically updates it
218 addEntry(key, value);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800219 }
220
221 /**
222 * Get an entry from the channel.
223 *
224 * @param key the key of the entry to get.
225 * @return the entry if found, otherwise null.
226 */
227 @Override
228 @Deprecated
229 public V getEntry(K key) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700230 byte[] valueBytes = channelMap.get(key);
231 if (valueBytes == null) {
232 return null;
233 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800234
Ray Milkeye88b2b82014-03-21 11:40:04 -0700235 Kryo kryo = kryoFactory.newKryo();
236 //
237 // Decode the value
238 //
239 Input input = new Input(valueBytes);
240 V value = (V) kryo.readObject(input, typeV);
241 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800242
Ray Milkeye88b2b82014-03-21 11:40:04 -0700243 return value;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800244 }
245
246 /**
247 * Get all entries in the channel.
248 *
249 * @return all entries that are currently in the channel.
250 */
251 @Override
252 @Deprecated
253 public Collection<V> getAllEntries() {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700254 Collection<V> allEntries = new LinkedList<V>();
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800255
Ray Milkeye88b2b82014-03-21 11:40:04 -0700256 if (channelMap == null) {
257 return allEntries; // Nothing found
258 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800259
Ray Milkeye88b2b82014-03-21 11:40:04 -0700260 //
261 // Get all entries
262 //
263 Collection<byte[]> values = channelMap.values();
264 Kryo kryo = kryoFactory.newKryo();
265 for (byte[] valueBytes : values) {
266 //
267 // Decode the value
268 //
269 Input input = new Input(valueBytes);
270 V value = (V) kryo.readObject(input, typeV);
271 allEntries.add(value);
272 }
273 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800274
Ray Milkeye88b2b82014-03-21 11:40:04 -0700275 return allEntries;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800276 }
277
278 /**
279 * Remove all entries in the channel.
280 */
281 @Override
282 @Deprecated
283 public void removeAllEntries() {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700284 //
285 // Remove all entries
286 //
287 // NOTE: We remove the entries one-by-one so the per-entry
288 // notifications will be delivered.
289 //
290 // channelMap.clear();
291 Set<K> keySet = channelMap.keySet();
292 for (K key : keySet) {
293 channelMap.removeAsync(key);
294 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800295 }
296
297 /**
298 * Class for receiving event notifications for the channel.
Ray Milkeye88b2b82014-03-21 11:40:04 -0700299 * <p/>
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800300 * The datagrid map is:
Ray Milkeye88b2b82014-03-21 11:40:04 -0700301 * - Key: Type K
302 * - Value: Serialized V (byte[])
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800303 */
Pavlin Radoslavove561a4c2014-04-01 14:10:55 -0700304 private class MapEntryListener implements EntryListener<K, byte[]> {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700305 /**
306 * Receive a notification that an entry is added.
307 *
308 * @param event the notification event for the entry.
309 */
310 @Override
311 public void entryAdded(EntryEvent<K, byte[]> event) {
312 //
313 // Decode the value
314 //
315 byte[] valueBytes = event.getValue();
316 Kryo kryo = kryoFactory.newKryo();
317 Input input = new Input(valueBytes);
Jonathan Hart14c4a762014-04-15 10:35:57 -0700318
319 Object objValue = kryo.readClassAndObject(input);
320 V value;
321 try {
322 value = typeV.cast(objValue);
323 } catch (ClassCastException e) {
324 log.error("Received notification value cast failed", e);
325 return;
326 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800327
Ray Milkeye88b2b82014-03-21 11:40:04 -0700328 //
329 // Deliver the notification
330 //
331 int index = 0;
Pavlin Radoslavove561a4c2014-04-01 14:10:55 -0700332 for (IEventChannelListener<K, V> listener : listeners) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700333 V copyValue = value;
334 if (index++ > 0) {
335 // Each listener should get a deep copy of the value
336 copyValue = kryo.copy(value);
337 }
338 listener.entryAdded(copyValue);
339 }
340 kryoFactory.deleteKryo(kryo);
341 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800342
Ray Milkeye88b2b82014-03-21 11:40:04 -0700343 /**
344 * Receive a notification that an entry is removed.
345 *
346 * @param event the notification event for the entry.
347 */
348 @Override
349 public void entryRemoved(EntryEvent<K, byte[]> event) {
350 //
351 // Decode the value
352 //
353 byte[] valueBytes = event.getValue();
354 Kryo kryo = kryoFactory.newKryo();
355 Input input = new Input(valueBytes);
Jonathan Hart14c4a762014-04-15 10:35:57 -0700356
357 Object objValue = kryo.readClassAndObject(input);
358 V value;
359 try {
360 value = typeV.cast(objValue);
361 } catch (ClassCastException e) {
362 log.error("Received notification value cast failed", e);
363 return;
364 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800365
Ray Milkeye88b2b82014-03-21 11:40:04 -0700366 //
367 // Deliver the notification
368 //
369 int index = 0;
Pavlin Radoslavove561a4c2014-04-01 14:10:55 -0700370 for (IEventChannelListener<K, V> listener : listeners) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700371 V copyValue = value;
372 if (index++ > 0) {
373 // Each listener should get a deep copy of the value
374 copyValue = kryo.copy(value);
375 }
376 listener.entryRemoved(copyValue);
377 }
378 kryoFactory.deleteKryo(kryo);
379 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800380
Ray Milkeye88b2b82014-03-21 11:40:04 -0700381 /**
382 * Receive a notification that an entry is updated.
383 *
384 * @param event the notification event for the entry.
385 */
386 @Override
387 public void entryUpdated(EntryEvent<K, byte[]> event) {
388 //
389 // Decode the value
390 //
391 byte[] valueBytes = event.getValue();
392 Kryo kryo = kryoFactory.newKryo();
393 Input input = new Input(valueBytes);
Jonathan Hart14c4a762014-04-15 10:35:57 -0700394
395 Object objValue = kryo.readClassAndObject(input);
396 V value;
397 try {
398 value = typeV.cast(objValue);
399 } catch (ClassCastException e) {
400 log.error("Received notification value cast failed", e);
401 return;
402 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800403
Ray Milkeye88b2b82014-03-21 11:40:04 -0700404 //
405 // Deliver the notification
406 //
407 int index = 0;
Pavlin Radoslavove561a4c2014-04-01 14:10:55 -0700408 for (IEventChannelListener<K, V> listener : listeners) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700409 V copyValue = value;
410 if (index++ > 0) {
411 // Each listener should get a deep copy of the value
412 copyValue = kryo.copy(value);
413 }
414 listener.entryUpdated(copyValue);
415 }
416 kryoFactory.deleteKryo(kryo);
417 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800418
Ray Milkeye88b2b82014-03-21 11:40:04 -0700419 /**
420 * Receive a notification that an entry is evicted.
421 *
422 * @param event the notification event for the entry.
423 */
424 @Override
425 public void entryEvicted(EntryEvent<K, byte[]> event) {
426 // NOTE: We don't use eviction for this map
427 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800428 }
429}