blob: fab69433995ea4c4eb9773f7abb8a1031de4e749 [file] [log] [blame]
Jonathan Hart6df90172014-04-03 10:13:11 -07001package net.onrc.onos.core.datagrid;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -08002
Pavlin Radoslavov7940b652014-02-13 19:42:05 -08003import java.util.Collection;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -08004import java.util.LinkedList;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -08005import java.util.Set;
Pavlin Radoslavov1c8d8092014-02-14 15:47:24 -08006import java.util.concurrent.CopyOnWriteArrayList;
Pavlin Radoslavovbcf14332014-03-27 18:15:30 -07007import java.util.concurrent.TimeUnit;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -08008
9import com.esotericsoftware.kryo.Kryo;
10import com.esotericsoftware.kryo.io.Input;
11import com.esotericsoftware.kryo.io.Output;
12import com.hazelcast.core.EntryEvent;
13import com.hazelcast.core.EntryListener;
14import com.hazelcast.core.HazelcastInstance;
15import com.hazelcast.core.IMap;
16
Jonathan Hart23701d12014-04-03 10:45:48 -070017import net.onrc.onos.core.util.serializers.KryoFactory;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080018
19/**
20 * A datagrid event channel that uses Hazelcast as a datagrid.
Ray Milkeye88b2b82014-03-21 11:40:04 -070021 *
22 * @param <K> The class type of the key.
23 * @param <V> The class type of the value.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080024 */
25public class HazelcastEventChannel<K, V> implements IEventChannel<K, V> {
Ray Milkeye88b2b82014-03-21 11:40:04 -070026 private final HazelcastInstance hazelcastInstance; // The Hazelcast instance
27 private final String channelName; // The event channel name
Pavlin Radoslavove561a4c2014-04-01 14:10:55 -070028 private final Class<K> typeK; // The class type of the key
29 private final Class<V> typeV; // The class type of the value
Ray Milkeye88b2b82014-03-21 11:40:04 -070030 private IMap<K, byte[]> channelMap; // The Hazelcast channel map
Pavlin Radoslavov1c8d8092014-02-14 15:47:24 -080031 // The channel listeners
Ray Milkeye88b2b82014-03-21 11:40:04 -070032 private final CopyOnWriteArrayList<IEventChannelListener<K, V>> listeners =
33 new CopyOnWriteArrayList<>();
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080034
35 // The map entry listener
Pavlin Radoslavove561a4c2014-04-01 14:10:55 -070036 private final EntryListener<K, byte[]> mapEntryListener = new MapEntryListener();
Ray Milkeye88b2b82014-03-21 11:40:04 -070037 private String mapListenerId; // The map listener ID
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080038
39 // TODO: We should use a single global KryoFactory instance
Ray Milkeye88b2b82014-03-21 11:40:04 -070040 private final KryoFactory kryoFactory = new KryoFactory();
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080041
42 // Maximum serialized event size
Ray Milkeye88b2b82014-03-21 11:40:04 -070043 private static final int MAX_BUFFER_SIZE = 64 * 1024;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080044
45 /**
46 * Constructor for a given event channel name.
47 *
Ray Milkey9c8a2132014-04-02 15:16:42 -070048 * @param newHazelcastInstance the Hazelcast instance to use.
49 * @param newChannelName the event channel name.
50 * @param newTypeK the type of the Key in the Key-Value store.
51 * @param newTypeV the type of the Value in the Key-Value store.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080052 */
Ray Milkeye88b2b82014-03-21 11:40:04 -070053 public HazelcastEventChannel(HazelcastInstance newHazelcastInstance,
54 String newChannelName, Class<K> newTypeK,
55 Class<V> newTypeV) {
56 hazelcastInstance = newHazelcastInstance;
57 channelName = newChannelName;
58 typeK = newTypeK;
59 typeV = newTypeV;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080060 }
61
62 /**
63 * Verify the key and value types of a channel.
64 *
Ray Milkeye88b2b82014-03-21 11:40:04 -070065 * @param typeKToVerify the type of the key to verify.
66 * @param typeVToVerify the type of the value to verify.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080067 * @return true if the key and value types of the channel match,
68 * otherwise false.
69 */
70 @Override
Ray Milkeye88b2b82014-03-21 11:40:04 -070071 public boolean verifyKeyValueTypes(Class typeKToVerify,
72 Class typeVToVerify) {
Ray Milkey73019652014-03-24 11:12:44 -070073 return (typeK.equals(typeKToVerify)) && (typeV.equals(typeVToVerify));
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080074 }
75
76 /**
77 * Cleanup and destroy the channel.
78 */
79 @Override
80 protected void finalize() {
Ray Milkeye88b2b82014-03-21 11:40:04 -070081 shutdown();
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080082 }
83
84 /**
85 * Startup the channel operation.
86 */
87 @Override
88 public void startup() {
Ray Milkeye88b2b82014-03-21 11:40:04 -070089 if (channelMap == null) {
90 channelMap = hazelcastInstance.getMap(channelName);
91 mapListenerId = channelMap.addEntryListener(mapEntryListener,
92 true);
93 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080094 }
95
96 /**
97 * Shutdown the channel operation.
98 */
99 @Override
100 public void shutdown() {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700101 if (channelMap != null) {
102 channelMap.removeEntryListener(mapListenerId);
103 channelMap = null;
104 mapListenerId = null;
105 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800106 }
107
108 /**
109 * Add event channel listener.
110 *
111 * @param listener the listener to add.
112 */
113 @Override
114 public void addListener(IEventChannelListener<K, V> listener) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700115 if (listeners.contains(listener)) {
116 return; // Nothing to do: already a listener
117 }
118 listeners.add(listener);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800119 }
120
121 /**
122 * Remove event channel listener.
123 *
124 * @param listener the listener to remove.
125 */
126 @Override
127 public void removeListener(IEventChannelListener<K, V> listener) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700128 listeners.remove(listener);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800129 }
130
131 /**
132 * Add an entry to the channel.
133 *
Ray Milkeye88b2b82014-03-21 11:40:04 -0700134 * @param key the key of the entry to add.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800135 * @param value the value of the entry to add.
136 */
137 @Override
138 public void addEntry(K key, V value) {
Pavlin Radoslavovbcf14332014-03-27 18:15:30 -0700139 byte[] valueBytes = serializeValue(value);
140 //
141 // Put the entry in the map:
142 // - Key : Type <K>
143 // - Value : Serialized Value (byte[])
144 //
145 channelMap.putAsync(key, valueBytes);
146 }
147
148 /**
149 * Add a transient entry to the channel.
Ray Milkey9c8a2132014-04-02 15:16:42 -0700150 * <p/>
Pavlin Radoslavovbcf14332014-03-27 18:15:30 -0700151 * The added entry is transient and will automatically timeout after 1ms.
152 *
153 * @param key the key of the entry to add.
154 * @param value the value of the entry to add.
155 */
156 @Override
157 public void addTransientEntry(K key, V value) {
158 byte[] valueBytes = serializeValue(value);
159 //
160 // Put the entry in the map:
161 // - Key : Type <K>
162 // - Value : Serialized Value (byte[])
163 // - Timeout: 1ms
164 //
165 channelMap.putAsync(key, valueBytes, 1L, TimeUnit.MILLISECONDS);
166 }
167
168 /**
169 * Serialize the value.
170 *
171 * @param value the value to serialize.
172 * @return the serialized value.
173 */
174 private byte[] serializeValue(V value) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700175 //
176 // Encode the value
177 //
178 byte[] buffer = new byte[MAX_BUFFER_SIZE];
179 Kryo kryo = kryoFactory.newKryo();
180 Output output = new Output(buffer, -1);
181 kryo.writeObject(output, value);
182 byte[] valueBytes = output.toBytes();
183 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800184
Pavlin Radoslavovbcf14332014-03-27 18:15:30 -0700185 return valueBytes;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800186 }
187
188 /**
189 * Remove an entry from the channel.
190 *
191 * @param key the key of the entry to remove.
192 */
193 @Override
194 public void removeEntry(K key) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700195 //
196 // Remove the entry:
197 // - Key : Type <K>
198 // - Value : Serialized Value (byte[])
199 //
200 channelMap.removeAsync(key);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800201 }
202
203 /**
204 * Update an entry in the channel.
205 *
Ray Milkeye88b2b82014-03-21 11:40:04 -0700206 * @param key the key of the entry to update.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800207 * @param value the value of the entry to update.
208 */
209 @Override
210 public void updateEntry(K key, V value) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700211 // NOTE: Adding an entry with an existing key automatically updates it
212 addEntry(key, value);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800213 }
214
215 /**
216 * Get an entry from the channel.
217 *
218 * @param key the key of the entry to get.
219 * @return the entry if found, otherwise null.
220 */
221 @Override
222 @Deprecated
223 public V getEntry(K key) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700224 byte[] valueBytes = channelMap.get(key);
225 if (valueBytes == null) {
226 return null;
227 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800228
Ray Milkeye88b2b82014-03-21 11:40:04 -0700229 Kryo kryo = kryoFactory.newKryo();
230 //
231 // Decode the value
232 //
233 Input input = new Input(valueBytes);
234 V value = (V) kryo.readObject(input, typeV);
235 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800236
Ray Milkeye88b2b82014-03-21 11:40:04 -0700237 return value;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800238 }
239
240 /**
241 * Get all entries in the channel.
242 *
243 * @return all entries that are currently in the channel.
244 */
245 @Override
246 @Deprecated
247 public Collection<V> getAllEntries() {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700248 Collection<V> allEntries = new LinkedList<V>();
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800249
Ray Milkeye88b2b82014-03-21 11:40:04 -0700250 if (channelMap == null) {
251 return allEntries; // Nothing found
252 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800253
Ray Milkeye88b2b82014-03-21 11:40:04 -0700254 //
255 // Get all entries
256 //
257 Collection<byte[]> values = channelMap.values();
258 Kryo kryo = kryoFactory.newKryo();
259 for (byte[] valueBytes : values) {
260 //
261 // Decode the value
262 //
263 Input input = new Input(valueBytes);
264 V value = (V) kryo.readObject(input, typeV);
265 allEntries.add(value);
266 }
267 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800268
Ray Milkeye88b2b82014-03-21 11:40:04 -0700269 return allEntries;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800270 }
271
272 /**
273 * Remove all entries in the channel.
274 */
275 @Override
276 @Deprecated
277 public void removeAllEntries() {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700278 //
279 // Remove all entries
280 //
281 // NOTE: We remove the entries one-by-one so the per-entry
282 // notifications will be delivered.
283 //
284 // channelMap.clear();
285 Set<K> keySet = channelMap.keySet();
286 for (K key : keySet) {
287 channelMap.removeAsync(key);
288 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800289 }
290
291 /**
292 * Class for receiving event notifications for the channel.
Ray Milkeye88b2b82014-03-21 11:40:04 -0700293 * <p/>
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800294 * The datagrid map is:
Ray Milkeye88b2b82014-03-21 11:40:04 -0700295 * - Key: Type K
296 * - Value: Serialized V (byte[])
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800297 */
Pavlin Radoslavove561a4c2014-04-01 14:10:55 -0700298 private class MapEntryListener implements EntryListener<K, byte[]> {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700299 /**
300 * Receive a notification that an entry is added.
301 *
302 * @param event the notification event for the entry.
303 */
304 @Override
305 public void entryAdded(EntryEvent<K, byte[]> event) {
306 //
307 // Decode the value
308 //
309 byte[] valueBytes = event.getValue();
310 Kryo kryo = kryoFactory.newKryo();
311 Input input = new Input(valueBytes);
312 V value = (V) kryo.readObject(input, typeV);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800313
Ray Milkeye88b2b82014-03-21 11:40:04 -0700314 //
315 // Deliver the notification
316 //
317 int index = 0;
Pavlin Radoslavove561a4c2014-04-01 14:10:55 -0700318 for (IEventChannelListener<K, V> listener : listeners) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700319 V copyValue = value;
320 if (index++ > 0) {
321 // Each listener should get a deep copy of the value
322 copyValue = kryo.copy(value);
323 }
324 listener.entryAdded(copyValue);
325 }
326 kryoFactory.deleteKryo(kryo);
327 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800328
Ray Milkeye88b2b82014-03-21 11:40:04 -0700329 /**
330 * Receive a notification that an entry is removed.
331 *
332 * @param event the notification event for the entry.
333 */
334 @Override
335 public void entryRemoved(EntryEvent<K, byte[]> event) {
336 //
337 // Decode the value
338 //
339 byte[] valueBytes = event.getValue();
340 Kryo kryo = kryoFactory.newKryo();
341 Input input = new Input(valueBytes);
342 V value = (V) kryo.readObject(input, typeV);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800343
Ray Milkeye88b2b82014-03-21 11:40:04 -0700344 //
345 // Deliver the notification
346 //
347 int index = 0;
Pavlin Radoslavove561a4c2014-04-01 14:10:55 -0700348 for (IEventChannelListener<K, V> listener : listeners) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700349 V copyValue = value;
350 if (index++ > 0) {
351 // Each listener should get a deep copy of the value
352 copyValue = kryo.copy(value);
353 }
354 listener.entryRemoved(copyValue);
355 }
356 kryoFactory.deleteKryo(kryo);
357 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800358
Ray Milkeye88b2b82014-03-21 11:40:04 -0700359 /**
360 * Receive a notification that an entry is updated.
361 *
362 * @param event the notification event for the entry.
363 */
364 @Override
365 public void entryUpdated(EntryEvent<K, byte[]> event) {
366 //
367 // Decode the value
368 //
369 byte[] valueBytes = event.getValue();
370 Kryo kryo = kryoFactory.newKryo();
371 Input input = new Input(valueBytes);
372 V value = (V) kryo.readObject(input, typeV);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800373
Ray Milkeye88b2b82014-03-21 11:40:04 -0700374 //
375 // Deliver the notification
376 //
377 int index = 0;
Pavlin Radoslavove561a4c2014-04-01 14:10:55 -0700378 for (IEventChannelListener<K, V> listener : listeners) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700379 V copyValue = value;
380 if (index++ > 0) {
381 // Each listener should get a deep copy of the value
382 copyValue = kryo.copy(value);
383 }
384 listener.entryUpdated(copyValue);
385 }
386 kryoFactory.deleteKryo(kryo);
387 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800388
Ray Milkeye88b2b82014-03-21 11:40:04 -0700389 /**
390 * Receive a notification that an entry is evicted.
391 *
392 * @param event the notification event for the entry.
393 */
394 @Override
395 public void entryEvicted(EntryEvent<K, byte[]> event) {
396 // NOTE: We don't use eviction for this map
397 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800398 }
399}