blob: 7739e83edd52d9225bb0c0fa692a2d7ea63ac604 [file] [log] [blame]
Pavlin Radoslavov7940b652014-02-13 19:42:05 -08001package net.onrc.onos.datagrid;
2
Pavlin Radoslavov7940b652014-02-13 19:42:05 -08003import java.util.Collection;
4import java.util.HashMap;
5import java.util.LinkedList;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -08006import java.util.Map;
7import java.util.Set;
Pavlin Radoslavov1c8d8092014-02-14 15:47:24 -08008import java.util.concurrent.CopyOnWriteArrayList;
Pavlin Radoslavov7940b652014-02-13 19:42:05 -08009
10import com.esotericsoftware.kryo.Kryo;
11import com.esotericsoftware.kryo.io.Input;
12import com.esotericsoftware.kryo.io.Output;
13import com.hazelcast.core.EntryEvent;
14import com.hazelcast.core.EntryListener;
15import com.hazelcast.core.HazelcastInstance;
16import com.hazelcast.core.IMap;
17
18import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
19
20/**
21 * A datagrid event channel that uses Hazelcast as a datagrid.
22 */
23public class HazelcastEventChannel<K, V> implements IEventChannel<K, V> {
24 private HazelcastInstance hazelcastInstance; // The Hazelcast instance
25 private String channelName; // The event channel name
26 private Class<?> typeK; // The class type of the key
27 private Class<?> typeV; // The class type of the value
28 private IMap<K, byte[]> channelMap = null; // The Hazelcast channel map
Pavlin Radoslavov1c8d8092014-02-14 15:47:24 -080029 // The channel listeners
30 private CopyOnWriteArrayList<IEventChannelListener<K, V>> listeners =
31 new CopyOnWriteArrayList<IEventChannelListener<K, V>>();
Pavlin Radoslavov7940b652014-02-13 19:42:05 -080032
33 // The map entry listener
34 private MapEntryListener mapEntryListener = new MapEntryListener<K>();
35 private String mapListenerId = null; // The map listener ID
36
37 // TODO: We should use a single global KryoFactory instance
38 private KryoFactory kryoFactory = new KryoFactory();
39
40 // Maximum serialized event size
41 private final static int MAX_BUFFER_SIZE = 64*1024;
42
43 /**
44 * Constructor for a given event channel name.
45 *
46 * @param hazelcastInstance the Hazelcast instance to use.
47 * @param channelName the event channel name.
48 * @param typeK the type of the Key in the Key-Value store.
49 * @param typeV the type of the Value in the Key-Value store.
50 */
51 public HazelcastEventChannel(HazelcastInstance hazelcastInstance,
52 String channelName, Class<K> typeK,
53 Class<V> typeV) {
54 this.hazelcastInstance = hazelcastInstance;
55 this.channelName = channelName;
56 this.typeK = typeK;
57 this.typeV = typeV;
58 }
59
60 /**
61 * Verify the key and value types of a channel.
62 *
63 * @param typeK the type of the key to verify.
64 * @param typeV the type of the value to verify.
65 * @return true if the key and value types of the channel match,
66 * otherwise false.
67 */
68 @Override
69 public boolean verifyKeyValueTypes(Class typeK, Class typeV) {
70 return (this.typeK == typeK) && (this.typeV == typeV);
71 }
72
73 /**
74 * Cleanup and destroy the channel.
75 */
76 @Override
77 protected void finalize() {
78 shutdown();
79 }
80
81 /**
82 * Startup the channel operation.
83 */
84 @Override
85 public void startup() {
86 if (channelMap == null) {
87 channelMap = hazelcastInstance.getMap(channelName);
88 mapListenerId = channelMap.addEntryListener(mapEntryListener,
89 true);
90 }
91 }
92
93 /**
94 * Shutdown the channel operation.
95 */
96 @Override
97 public void shutdown() {
98 if (channelMap != null) {
99 channelMap.removeEntryListener(mapListenerId);
100 channelMap = null;
101 mapListenerId = null;
102 }
103 }
104
105 /**
106 * Add event channel listener.
107 *
108 * @param listener the listener to add.
109 */
110 @Override
111 public void addListener(IEventChannelListener<K, V> listener) {
112 if (listeners.contains(listener))
113 return; // Nothing to do: already a listener
114 listeners.add(listener);
115 }
116
117 /**
118 * Remove event channel listener.
119 *
120 * @param listener the listener to remove.
121 */
122 @Override
123 public void removeListener(IEventChannelListener<K, V> listener) {
124 listeners.remove(listener);
125 }
126
127 /**
128 * Add an entry to the channel.
129 *
130 * @param key the key of the entry to add.
131 * @param value the value of the entry to add.
132 */
133 @Override
134 public void addEntry(K key, V value) {
135 //
136 // Encode the value
137 //
138 byte[] buffer = new byte[MAX_BUFFER_SIZE];
139 Kryo kryo = kryoFactory.newKryo();
140 Output output = new Output(buffer, -1);
141 kryo.writeObject(output, value);
142 byte[] valueBytes = output.toBytes();
143 kryoFactory.deleteKryo(kryo);
144
145 //
146 // Put the entry in the map:
147 // - Key : Type <K>
148 // - Value : Serialized Value (byte[])
149 //
150 channelMap.putAsync(key, valueBytes);
151 }
152
153 /**
154 * Remove an entry from the channel.
155 *
156 * @param key the key of the entry to remove.
157 */
158 @Override
159 public void removeEntry(K key) {
160 //
161 // Remove the entry:
162 // - Key : Type <K>
163 // - Value : Serialized Value (byte[])
164 //
165 channelMap.removeAsync(key);
166 }
167
168 /**
169 * Update an entry in the channel.
170 *
171 * @param key the key of the entry to update.
172 * @param value the value of the entry to update.
173 */
174 @Override
175 public void updateEntry(K key, V value) {
176 // NOTE: Adding an entry with an existing key automatically updates it
177 addEntry(key, value);
178 }
179
180 /**
181 * Get an entry from the channel.
182 *
183 * @param key the key of the entry to get.
184 * @return the entry if found, otherwise null.
185 */
186 @Override
187 @Deprecated
188 public V getEntry(K key) {
189 byte[] valueBytes = channelMap.get(key);
190 if (valueBytes == null)
191 return null;
192
193 Kryo kryo = kryoFactory.newKryo();
194 //
195 // Decode the value
196 //
197 Input input = new Input(valueBytes);
198 V value = (V)kryo.readObject(input, typeV);
199 kryoFactory.deleteKryo(kryo);
200
201 return value;
202 }
203
204 /**
205 * Get all entries in the channel.
206 *
207 * @return all entries that are currently in the channel.
208 */
209 @Override
210 @Deprecated
211 public Collection<V> getAllEntries() {
212 Collection<V> allEntries = new LinkedList<V>();
213
214 if (channelMap == null)
215 return allEntries; // Nothing found
216
217 //
218 // Get all entries
219 //
220 Collection<byte[]> values = channelMap.values();
221 Kryo kryo = kryoFactory.newKryo();
222 for (byte[] valueBytes : values) {
223 //
224 // Decode the value
225 //
226 Input input = new Input(valueBytes);
227 V value = (V)kryo.readObject(input, typeV);
228 allEntries.add(value);
229 }
230 kryoFactory.deleteKryo(kryo);
231
232 return allEntries;
233 }
234
235 /**
236 * Remove all entries in the channel.
237 */
238 @Override
239 @Deprecated
240 public void removeAllEntries() {
241 //
242 // Remove all entries
243 //
244 // NOTE: We remove the entries one-by-one so the per-entry
245 // notifications will be delivered.
246 //
247 // channelMap.clear();
248 Set<K> keySet = channelMap.keySet();
249 for (K key : keySet) {
250 channelMap.removeAsync(key);
251 }
252 }
253
254 /**
255 * Class for receiving event notifications for the channel.
256 *
257 * The datagrid map is:
258 * - Key: Type K
259 * - Value: Serialized V (byte[])
260 */
261 private class MapEntryListener<K> implements EntryListener<K, byte[]> {
262 /**
263 * Receive a notification that an entry is added.
264 *
265 * @param event the notification event for the entry.
266 */
267 @Override
268 public void entryAdded(EntryEvent<K, byte[]> event) {
269 //
270 // Decode the value
271 //
272 byte[] valueBytes = event.getValue();
273 Kryo kryo = kryoFactory.newKryo();
274 Input input = new Input(valueBytes);
275 V value = (V)kryo.readObject(input, typeV);
276
277 //
278 // Deliver the notification
279 //
280 int index = 0;
281 for (IEventChannelListener listener : listeners) {
282 V copyValue = value;
283 if (index++ > 0) {
284 // Each listener should get a deep copy of the value
285 copyValue = kryo.copy(value);
286 }
287 listener.entryAdded(copyValue);
288 }
289 kryoFactory.deleteKryo(kryo);
290 }
291
292 /**
293 * Receive a notification that an entry is removed.
294 *
295 * @param event the notification event for the entry.
296 */
297 @Override
298 public void entryRemoved(EntryEvent<K, byte[]> event) {
299 //
300 // Decode the value
301 //
302 byte[] valueBytes = event.getValue();
303 Kryo kryo = kryoFactory.newKryo();
304 Input input = new Input(valueBytes);
305 V value = (V)kryo.readObject(input, typeV);
306
307 //
308 // Deliver the notification
309 //
310 int index = 0;
311 for (IEventChannelListener listener : listeners) {
312 V copyValue = value;
313 if (index++ > 0) {
314 // Each listener should get a deep copy of the value
315 copyValue = kryo.copy(value);
316 }
317 listener.entryRemoved(copyValue);
318 }
319 kryoFactory.deleteKryo(kryo);
320 }
321
322 /**
323 * Receive a notification that an entry is updated.
324 *
325 * @param event the notification event for the entry.
326 */
327 @Override
328 public void entryUpdated(EntryEvent<K, byte[]> event) {
329 //
330 // Decode the value
331 //
332 byte[] valueBytes = event.getValue();
333 Kryo kryo = kryoFactory.newKryo();
334 Input input = new Input(valueBytes);
335 V value = (V)kryo.readObject(input, typeV);
336
337 //
338 // Deliver the notification
339 //
340 int index = 0;
341 for (IEventChannelListener listener : listeners) {
342 V copyValue = value;
343 if (index++ > 0) {
344 // Each listener should get a deep copy of the value
345 copyValue = kryo.copy(value);
346 }
347 listener.entryUpdated(copyValue);
348 }
349 kryoFactory.deleteKryo(kryo);
350 }
351
352 /**
353 * Receive a notification that an entry is evicted.
354 *
355 * @param event the notification event for the entry.
356 */
357 @Override
358 public void entryEvicted(EntryEvent<K, byte[]> event) {
359 // NOTE: We don't use eviction for this map
360 }
361 }
362}