| package net.onrc.onos.core.datagrid; |
| |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.Set; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.TimeUnit; |
| |
| import net.onrc.onos.core.util.serializers.KryoFactory; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.esotericsoftware.kryo.Kryo; |
| import com.esotericsoftware.kryo.io.Input; |
| import com.esotericsoftware.kryo.io.Output; |
| import com.hazelcast.core.EntryEvent; |
| import com.hazelcast.core.EntryListener; |
| import com.hazelcast.core.HazelcastInstance; |
| import com.hazelcast.core.IMap; |
| |
| /** |
| * A datagrid event channel that uses Hazelcast as a datagrid. |
| * |
| * @param <K> The class type of the key. |
| * @param <V> The class type of the value. |
| */ |
| public class HazelcastEventChannel<K, V> implements IEventChannel<K, V> { |
| private static final Logger log = |
| LoggerFactory.getLogger(HazelcastEventChannel.class); |
| |
| private final HazelcastInstance hazelcastInstance; // The Hazelcast instance |
| private final String channelName; // The event channel name |
| private final Class<K> typeK; // The class type of the key |
| private final Class<V> typeV; // The class type of the value |
| private IMap<K, byte[]> channelMap; // The Hazelcast channel map |
| // The channel listeners |
| private final CopyOnWriteArrayList<IEventChannelListener<K, V>> listeners = |
| new CopyOnWriteArrayList<>(); |
| |
| // The map entry listener |
| private final EntryListener<K, byte[]> mapEntryListener = new MapEntryListener(); |
| private String mapListenerId; // The map listener ID |
| |
| // TODO: We should use a single global KryoFactory instance |
| private final KryoFactory kryoFactory = new KryoFactory(); |
| |
| // Maximum serialized event size |
| private static final int MAX_BUFFER_SIZE = 64 * 1024; |
| |
| /** |
| * Constructor for a given event channel name. |
| * |
| * @param newHazelcastInstance the Hazelcast instance to use. |
| * @param newChannelName the event channel name. |
| * @param newTypeK the type of the Key in the Key-Value store. |
| * @param newTypeV the type of the Value in the Key-Value store. |
| */ |
| public HazelcastEventChannel(HazelcastInstance newHazelcastInstance, |
| String newChannelName, Class<K> newTypeK, |
| Class<V> newTypeV) { |
| hazelcastInstance = newHazelcastInstance; |
| channelName = newChannelName; |
| typeK = newTypeK; |
| typeV = newTypeV; |
| } |
| |
| /** |
| * Verify the key and value types of a channel. |
| * |
| * @param typeKToVerify the type of the key to verify. |
| * @param typeVToVerify the type of the value to verify. |
| * @return true if the key and value types of the channel match, |
| * otherwise false. |
| */ |
| @Override |
| public boolean verifyKeyValueTypes(Class<?> typeKToVerify, |
| Class<?> typeVToVerify) { |
| return (typeK.equals(typeKToVerify)) && (typeV.equals(typeVToVerify)); |
| } |
| |
| /** |
| * Cleanup and destroy the channel. |
| */ |
| @Override |
| protected void finalize() { |
| shutdown(); |
| } |
| |
| /** |
| * Startup the channel operation. |
| */ |
| @Override |
| public void startup() { |
| if (channelMap == null) { |
| channelMap = hazelcastInstance.getMap(channelName); |
| mapListenerId = channelMap.addEntryListener(mapEntryListener, |
| true); |
| } |
| } |
| |
| /** |
| * Shutdown the channel operation. |
| */ |
| @Override |
| public void shutdown() { |
| if (channelMap != null) { |
| channelMap.removeEntryListener(mapListenerId); |
| channelMap = null; |
| mapListenerId = null; |
| } |
| } |
| |
| /** |
| * Add event channel listener. |
| * |
| * @param listener the listener to add. |
| */ |
| @Override |
| public void addListener(IEventChannelListener<K, V> listener) { |
| if (listeners.contains(listener)) { |
| return; // Nothing to do: already a listener |
| } |
| listeners.add(listener); |
| } |
| |
| /** |
| * Remove event channel listener. |
| * |
| * @param listener the listener to remove. |
| */ |
| @Override |
| public void removeListener(IEventChannelListener<K, V> listener) { |
| listeners.remove(listener); |
| } |
| |
| /** |
| * Add an entry to the channel. |
| * |
| * @param key the key of the entry to add. |
| * @param value the value of the entry to add. |
| */ |
| @Override |
| public void addEntry(K key, V value) { |
| byte[] valueBytes = serializeValue(value); |
| // |
| // Put the entry in the map: |
| // - Key : Type <K> |
| // - Value : Serialized Value (byte[]) |
| // |
| channelMap.putAsync(key, valueBytes); |
| } |
| |
| /** |
| * Add a transient entry to the channel. |
| * <p/> |
| * The added entry is transient and will automatically timeout after 1ms. |
| * |
| * @param key the key of the entry to add. |
| * @param value the value of the entry to add. |
| */ |
| @Override |
| public void addTransientEntry(K key, V value) { |
| byte[] valueBytes = serializeValue(value); |
| // |
| // Put the entry in the map: |
| // - Key : Type <K> |
| // - Value : Serialized Value (byte[]) |
| // - Timeout: 1ms |
| // |
| channelMap.putAsync(key, valueBytes, 1L, TimeUnit.MILLISECONDS); |
| } |
| |
| /** |
| * Serialize the value. |
| * |
| * @param value the value to serialize. |
| * @return the serialized value. |
| */ |
| private byte[] serializeValue(V value) { |
| // |
| // Encode the value |
| // |
| byte[] buffer = new byte[MAX_BUFFER_SIZE]; |
| Kryo kryo = kryoFactory.newKryo(); |
| try { |
| Output output = new Output(buffer, -1); |
| kryo.writeClassAndObject(output, value); |
| byte[] valueBytes = output.toBytes(); |
| return valueBytes; |
| } finally { |
| kryoFactory.deleteKryo(kryo); |
| } |
| } |
| |
| /** |
| * Deserialize the value. |
| * |
| * @param kryo the Kryo instance to use for the deserialization. |
| * @param valueBytes the buffer with the serialized value. |
| * @return the deserialized value. |
| */ |
| private V deserializeValue(Kryo kryo, byte[] valueBytes) { |
| V value; |
| |
| // |
| // Decode the value |
| // |
| Input input = new Input(valueBytes); |
| Object objValue = kryo.readClassAndObject(input); |
| try { |
| value = typeV.cast(objValue); |
| } catch (ClassCastException e) { |
| log.error("Received notification value cast failed", e); |
| return null; |
| } |
| |
| return value; |
| } |
| |
| /** |
| * Remove an entry from the channel. |
| * |
| * @param key the key of the entry to remove. |
| */ |
| @Override |
| public void removeEntry(K key) { |
| // |
| // Remove the entry: |
| // - Key : Type <K> |
| // - Value : Serialized Value (byte[]) |
| // |
| channelMap.removeAsync(key); |
| } |
| |
| /** |
| * Update an entry in the channel. |
| * |
| * @param key the key of the entry to update. |
| * @param value the value of the entry to update. |
| */ |
| @Override |
| public void updateEntry(K key, V value) { |
| // NOTE: Adding an entry with an existing key automatically updates it |
| addEntry(key, value); |
| } |
| |
| /** |
| * Get an entry from the channel. |
| * |
| * @param key the key of the entry to get. |
| * @return the entry if found, otherwise null. |
| */ |
| @Override |
| @Deprecated |
| public V getEntry(K key) { |
| byte[] valueBytes = channelMap.get(key); |
| if (valueBytes == null) { |
| return null; |
| } |
| |
| // |
| // Decode the value |
| // |
| Kryo kryo = kryoFactory.newKryo(); |
| try { |
| V value = deserializeValue(kryo, valueBytes); |
| return value; |
| } finally { |
| kryoFactory.deleteKryo(kryo); |
| } |
| } |
| |
| /** |
| * Get all entries in the channel. |
| * |
| * @return all entries that are currently in the channel. |
| */ |
| @Override |
| @Deprecated |
| public Collection<V> getAllEntries() { |
| Collection<V> allEntries = new LinkedList<V>(); |
| |
| if (channelMap == null) { |
| return allEntries; // Nothing found |
| } |
| |
| // |
| // Get all entries |
| // |
| Collection<byte[]> values = channelMap.values(); |
| Kryo kryo = kryoFactory.newKryo(); |
| try { |
| for (byte[] valueBytes : values) { |
| // |
| // Decode the value |
| // |
| V value = deserializeValue(kryo, valueBytes); |
| allEntries.add(value); |
| } |
| } finally { |
| kryoFactory.deleteKryo(kryo); |
| } |
| |
| return allEntries; |
| } |
| |
| /** |
| * Remove all entries in the channel. |
| */ |
| @Override |
| @Deprecated |
| public void removeAllEntries() { |
| // |
| // Remove all entries |
| // |
| // NOTE: We remove the entries one-by-one so the per-entry |
| // notifications will be delivered. |
| // |
| // channelMap.clear(); |
| Set<K> keySet = channelMap.keySet(); |
| for (K key : keySet) { |
| channelMap.removeAsync(key); |
| } |
| } |
| |
| /** |
| * Class for receiving event notifications for the channel. |
| * <p/> |
| * The datagrid map is: |
| * - Key: Type K |
| * - Value: Serialized V (byte[]) |
| */ |
| private class MapEntryListener implements EntryListener<K, byte[]> { |
| /** |
| * Receive a notification that an entry is added. |
| * |
| * @param event the notification event for the entry. |
| */ |
| @Override |
| public void entryAdded(EntryEvent<K, byte[]> event) { |
| // |
| // Decode the value |
| // |
| byte[] original = event.getValue(); |
| // Copying byte[], to see if it resolves ONOS-1343. |
| byte[] valueBytes = Arrays.copyOf(original, original.length); |
| Kryo kryo = kryoFactory.newKryo(); |
| try { |
| V value = deserializeValue(kryo, valueBytes); |
| |
| // |
| // Deliver the notification |
| // |
| final Iterator<IEventChannelListener<K, V>> it = listeners.iterator(); |
| while (it.hasNext()) { |
| final IEventChannelListener<K, V> listener = it.next(); |
| if (it.hasNext()) { |
| // Each listener should get a deep copy of the value |
| // TODO: compare which is faster |
| // - kryo.copy(value) |
| // - deserializeValue(kryo, valueBytes) |
| listener.entryAdded(kryo.copy(value)); |
| } else { |
| // Last listener can use the value |
| listener.entryAdded(value); |
| } |
| } |
| } finally { |
| kryoFactory.deleteKryo(kryo); |
| } |
| } |
| |
| /** |
| * Receive a notification that an entry is removed. |
| * |
| * @param event the notification event for the entry. |
| */ |
| @Override |
| public void entryRemoved(EntryEvent<K, byte[]> event) { |
| // |
| // Decode the value |
| // |
| byte[] original = event.getValue(); |
| // Copying byte[], to see if it resolves ONOS-1343. |
| byte[] valueBytes = Arrays.copyOf(original, original.length); |
| Kryo kryo = kryoFactory.newKryo(); |
| try { |
| V value = deserializeValue(kryo, valueBytes); |
| |
| // |
| // Deliver the notification |
| // |
| final Iterator<IEventChannelListener<K, V>> it = listeners.iterator(); |
| while (it.hasNext()) { |
| final IEventChannelListener<K, V> listener = it.next(); |
| if (it.hasNext()) { |
| // Each listener should get a deep copy of the value |
| listener.entryRemoved(kryo.copy(value)); |
| } else { |
| // Last listener can use the value |
| listener.entryRemoved(value); |
| } |
| } |
| } finally { |
| kryoFactory.deleteKryo(kryo); |
| } |
| } |
| |
| /** |
| * Receive a notification that an entry is updated. |
| * |
| * @param event the notification event for the entry. |
| */ |
| @Override |
| public void entryUpdated(EntryEvent<K, byte[]> event) { |
| // |
| // Decode the value |
| // |
| byte[] original = event.getValue(); |
| // Copying byte[], to see if it resolves ONOS-1343. |
| byte[] valueBytes = Arrays.copyOf(original, original.length); |
| Kryo kryo = kryoFactory.newKryo(); |
| try { |
| V value = deserializeValue(kryo, valueBytes); |
| |
| // |
| // Deliver the notification |
| // |
| final Iterator<IEventChannelListener<K, V>> it = listeners.iterator(); |
| while (it.hasNext()) { |
| final IEventChannelListener<K, V> listener = it.next(); |
| if (it.hasNext()) { |
| // Each listener should get a deep copy of the value |
| listener.entryUpdated(kryo.copy(value)); |
| } else { |
| // Last listener can use the value |
| listener.entryUpdated(value); |
| } |
| } |
| } finally { |
| kryoFactory.deleteKryo(kryo); |
| } |
| } |
| |
| /** |
| * Receive a notification that an entry is evicted. |
| * |
| * @param event the notification event for the entry. |
| */ |
| @Override |
| public void entryEvicted(EntryEvent<K, byte[]> event) { |
| // NOTE: We don't use eviction for this map |
| } |
| } |
| } |