blob: bef0243258ca57b7a86b323abeb176f6502342e8 [file] [log] [blame]
Pavlin Radoslavov7940b652014-02-13 19:42:05 -08001package net.onrc.onos.datagrid;
2
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.EntryListener;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;

import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
17
18/**
19 * A datagrid event channel that uses Hazelcast as a datagrid.
Ray Milkeye88b2b82014-03-21 11:40:04 -070020 *
21 * @param <K> The class type of the key.
22 * @param <V> The class type of the value.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080023 */
24public class HazelcastEventChannel<K, V> implements IEventChannel<K, V> {
Ray Milkeye88b2b82014-03-21 11:40:04 -070025 private final HazelcastInstance hazelcastInstance; // The Hazelcast instance
26 private final String channelName; // The event channel name
27 private final Class<?> typeK; // The class type of the key
28 private final Class<?> typeV; // The class type of the value
29 private IMap<K, byte[]> channelMap; // The Hazelcast channel map
Pavlin Radoslavov1c8d8092014-02-14 15:47:24 -080030 // The channel listeners
Ray Milkeye88b2b82014-03-21 11:40:04 -070031 private final CopyOnWriteArrayList<IEventChannelListener<K, V>> listeners =
32 new CopyOnWriteArrayList<>();
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080033
34 // The map entry listener
Ray Milkeye88b2b82014-03-21 11:40:04 -070035 private final MapEntryListener mapEntryListener = new MapEntryListener<K>();
36 private String mapListenerId; // The map listener ID
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080037
38 // TODO: We should use a single global KryoFactory instance
Ray Milkeye88b2b82014-03-21 11:40:04 -070039 private final KryoFactory kryoFactory = new KryoFactory();
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080040
41 // Maximum serialized event size
Ray Milkeye88b2b82014-03-21 11:40:04 -070042 private static final int MAX_BUFFER_SIZE = 64 * 1024;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080043
44 /**
45 * Constructor for a given event channel name.
46 *
Ray Milkeye88b2b82014-03-21 11:40:04 -070047 * @param newHazelcastInstance the Hazelcast instance to use.
48 * @param newChannelName the event channel name.
49 * @param newTypeK the type of the Key in the Key-Value store.
50 * @param newTypeV the type of the Value in the Key-Value store.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080051 */
Ray Milkeye88b2b82014-03-21 11:40:04 -070052 public HazelcastEventChannel(HazelcastInstance newHazelcastInstance,
53 String newChannelName, Class<K> newTypeK,
54 Class<V> newTypeV) {
55 hazelcastInstance = newHazelcastInstance;
56 channelName = newChannelName;
57 typeK = newTypeK;
58 typeV = newTypeV;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080059 }
60
61 /**
62 * Verify the key and value types of a channel.
63 *
Ray Milkeye88b2b82014-03-21 11:40:04 -070064 * @param typeKToVerify the type of the key to verify.
65 * @param typeVToVerify the type of the value to verify.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080066 * @return true if the key and value types of the channel match,
67 * otherwise false.
68 */
69 @Override
Ray Milkeye88b2b82014-03-21 11:40:04 -070070 public boolean verifyKeyValueTypes(Class typeKToVerify,
71 Class typeVToVerify) {
Ray Milkey73019652014-03-24 11:12:44 -070072 return (typeK.equals(typeKToVerify)) && (typeV.equals(typeVToVerify));
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080073 }
74
75 /**
76 * Cleanup and destroy the channel.
77 */
78 @Override
79 protected void finalize() {
Ray Milkeye88b2b82014-03-21 11:40:04 -070080 shutdown();
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080081 }
82
83 /**
84 * Startup the channel operation.
85 */
86 @Override
87 public void startup() {
Ray Milkeye88b2b82014-03-21 11:40:04 -070088 if (channelMap == null) {
89 channelMap = hazelcastInstance.getMap(channelName);
90 mapListenerId = channelMap.addEntryListener(mapEntryListener,
91 true);
92 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080093 }
94
95 /**
96 * Shutdown the channel operation.
97 */
98 @Override
99 public void shutdown() {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700100 if (channelMap != null) {
101 channelMap.removeEntryListener(mapListenerId);
102 channelMap = null;
103 mapListenerId = null;
104 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800105 }
106
107 /**
108 * Add event channel listener.
109 *
110 * @param listener the listener to add.
111 */
112 @Override
113 public void addListener(IEventChannelListener<K, V> listener) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700114 if (listeners.contains(listener)) {
115 return; // Nothing to do: already a listener
116 }
117 listeners.add(listener);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800118 }
119
120 /**
121 * Remove event channel listener.
122 *
123 * @param listener the listener to remove.
124 */
125 @Override
126 public void removeListener(IEventChannelListener<K, V> listener) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700127 listeners.remove(listener);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800128 }
129
130 /**
131 * Add an entry to the channel.
132 *
Ray Milkeye88b2b82014-03-21 11:40:04 -0700133 * @param key the key of the entry to add.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800134 * @param value the value of the entry to add.
135 */
136 @Override
137 public void addEntry(K key, V value) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700138 //
139 // Encode the value
140 //
141 byte[] buffer = new byte[MAX_BUFFER_SIZE];
142 Kryo kryo = kryoFactory.newKryo();
143 Output output = new Output(buffer, -1);
144 kryo.writeObject(output, value);
145 byte[] valueBytes = output.toBytes();
146 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800147
Ray Milkeye88b2b82014-03-21 11:40:04 -0700148 //
149 // Put the entry in the map:
150 // - Key : Type <K>
151 // - Value : Serialized Value (byte[])
152 //
153 channelMap.putAsync(key, valueBytes);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800154 }
155
156 /**
157 * Remove an entry from the channel.
158 *
159 * @param key the key of the entry to remove.
160 */
161 @Override
162 public void removeEntry(K key) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700163 //
164 // Remove the entry:
165 // - Key : Type <K>
166 // - Value : Serialized Value (byte[])
167 //
168 channelMap.removeAsync(key);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800169 }
170
171 /**
172 * Update an entry in the channel.
173 *
Ray Milkeye88b2b82014-03-21 11:40:04 -0700174 * @param key the key of the entry to update.
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800175 * @param value the value of the entry to update.
176 */
177 @Override
178 public void updateEntry(K key, V value) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700179 // NOTE: Adding an entry with an existing key automatically updates it
180 addEntry(key, value);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800181 }
182
183 /**
184 * Get an entry from the channel.
185 *
186 * @param key the key of the entry to get.
187 * @return the entry if found, otherwise null.
188 */
189 @Override
190 @Deprecated
191 public V getEntry(K key) {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700192 byte[] valueBytes = channelMap.get(key);
193 if (valueBytes == null) {
194 return null;
195 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800196
Ray Milkeye88b2b82014-03-21 11:40:04 -0700197 Kryo kryo = kryoFactory.newKryo();
198 //
199 // Decode the value
200 //
201 Input input = new Input(valueBytes);
202 V value = (V) kryo.readObject(input, typeV);
203 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800204
Ray Milkeye88b2b82014-03-21 11:40:04 -0700205 return value;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800206 }
207
208 /**
209 * Get all entries in the channel.
210 *
211 * @return all entries that are currently in the channel.
212 */
213 @Override
214 @Deprecated
215 public Collection<V> getAllEntries() {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700216 Collection<V> allEntries = new LinkedList<V>();
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800217
Ray Milkeye88b2b82014-03-21 11:40:04 -0700218 if (channelMap == null) {
219 return allEntries; // Nothing found
220 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800221
Ray Milkeye88b2b82014-03-21 11:40:04 -0700222 //
223 // Get all entries
224 //
225 Collection<byte[]> values = channelMap.values();
226 Kryo kryo = kryoFactory.newKryo();
227 for (byte[] valueBytes : values) {
228 //
229 // Decode the value
230 //
231 Input input = new Input(valueBytes);
232 V value = (V) kryo.readObject(input, typeV);
233 allEntries.add(value);
234 }
235 kryoFactory.deleteKryo(kryo);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800236
Ray Milkeye88b2b82014-03-21 11:40:04 -0700237 return allEntries;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800238 }
239
240 /**
241 * Remove all entries in the channel.
242 */
243 @Override
244 @Deprecated
245 public void removeAllEntries() {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700246 //
247 // Remove all entries
248 //
249 // NOTE: We remove the entries one-by-one so the per-entry
250 // notifications will be delivered.
251 //
252 // channelMap.clear();
253 Set<K> keySet = channelMap.keySet();
254 for (K key : keySet) {
255 channelMap.removeAsync(key);
256 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800257 }
258
259 /**
260 * Class for receiving event notifications for the channel.
Ray Milkeye88b2b82014-03-21 11:40:04 -0700261 * <p/>
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800262 * The datagrid map is:
Ray Milkeye88b2b82014-03-21 11:40:04 -0700263 * - Key: Type K
264 * - Value: Serialized V (byte[])
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800265 */
266 private class MapEntryListener<K> implements EntryListener<K, byte[]> {
Ray Milkeye88b2b82014-03-21 11:40:04 -0700267 /**
268 * Receive a notification that an entry is added.
269 *
270 * @param event the notification event for the entry.
271 */
272 @Override
273 public void entryAdded(EntryEvent<K, byte[]> event) {
274 //
275 // Decode the value
276 //
277 byte[] valueBytes = event.getValue();
278 Kryo kryo = kryoFactory.newKryo();
279 Input input = new Input(valueBytes);
280 V value = (V) kryo.readObject(input, typeV);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800281
Ray Milkeye88b2b82014-03-21 11:40:04 -0700282 //
283 // Deliver the notification
284 //
285 int index = 0;
286 for (IEventChannelListener listener : listeners) {
287 V copyValue = value;
288 if (index++ > 0) {
289 // Each listener should get a deep copy of the value
290 copyValue = kryo.copy(value);
291 }
292 listener.entryAdded(copyValue);
293 }
294 kryoFactory.deleteKryo(kryo);
295 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800296
Ray Milkeye88b2b82014-03-21 11:40:04 -0700297 /**
298 * Receive a notification that an entry is removed.
299 *
300 * @param event the notification event for the entry.
301 */
302 @Override
303 public void entryRemoved(EntryEvent<K, byte[]> event) {
304 //
305 // Decode the value
306 //
307 byte[] valueBytes = event.getValue();
308 Kryo kryo = kryoFactory.newKryo();
309 Input input = new Input(valueBytes);
310 V value = (V) kryo.readObject(input, typeV);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800311
Ray Milkeye88b2b82014-03-21 11:40:04 -0700312 //
313 // Deliver the notification
314 //
315 int index = 0;
316 for (IEventChannelListener listener : listeners) {
317 V copyValue = value;
318 if (index++ > 0) {
319 // Each listener should get a deep copy of the value
320 copyValue = kryo.copy(value);
321 }
322 listener.entryRemoved(copyValue);
323 }
324 kryoFactory.deleteKryo(kryo);
325 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800326
Ray Milkeye88b2b82014-03-21 11:40:04 -0700327 /**
328 * Receive a notification that an entry is updated.
329 *
330 * @param event the notification event for the entry.
331 */
332 @Override
333 public void entryUpdated(EntryEvent<K, byte[]> event) {
334 //
335 // Decode the value
336 //
337 byte[] valueBytes = event.getValue();
338 Kryo kryo = kryoFactory.newKryo();
339 Input input = new Input(valueBytes);
340 V value = (V) kryo.readObject(input, typeV);
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800341
Ray Milkeye88b2b82014-03-21 11:40:04 -0700342 //
343 // Deliver the notification
344 //
345 int index = 0;
346 for (IEventChannelListener listener : listeners) {
347 V copyValue = value;
348 if (index++ > 0) {
349 // Each listener should get a deep copy of the value
350 copyValue = kryo.copy(value);
351 }
352 listener.entryUpdated(copyValue);
353 }
354 kryoFactory.deleteKryo(kryo);
355 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800356
Ray Milkeye88b2b82014-03-21 11:40:04 -0700357 /**
358 * Receive a notification that an entry is evicted.
359 *
360 * @param event the notification event for the entry.
361 */
362 @Override
363 public void entryEvicted(EntryEvent<K, byte[]> event) {
364 // NOTE: We don't use eviction for this map
365 }
Pavlin Radoslavov7940b652014-02-13 19:42:05 -0800366 }
367}