blob: 325b3d8bb29b2a4d230c657f359a93f2b1596c0b [file] [log] [blame]
Sho SHIMIZU5d62ba82014-08-21 10:23:47 -07001package net.onrc.onos.core.newintent;
2
3import com.esotericsoftware.kryo.Kryo;
4import com.esotericsoftware.kryo.io.Input;
5import com.esotericsoftware.kryo.io.Output;
6import com.hazelcast.core.EntryEvent;
7import com.hazelcast.core.EntryListener;
8import com.hazelcast.core.IMap;
9import net.onrc.onos.api.newintent.IntentId;
10import net.onrc.onos.core.datagrid.ISharedCollectionsService;
11import net.onrc.onos.core.util.serializers.KryoFactory;
12import org.slf4j.Logger;
13import org.slf4j.LoggerFactory;
14
15import java.util.ArrayList;
16import java.util.Collection;
17
18import static com.google.common.base.Preconditions.checkNotNull;
19
20/**
21 * A class representing map storing an intent related value associated with
22 * intent ID as key.
23 * <p>
24 * Implementation-Specific: The backend of this data structure is Hazelcast IMap.
25 * </p>
26 * FIXME: refactor this class to aggregate logic for distributed listenable map.
27 * Intent Service, Flow Manager, and Match-Action Service would want to have similar
28 * logic to store and load the data from a distributed data structure, but these logic
29 * is scattered in each package now.
30 *
31 * @param <V> the type of value
32 */
33class IntentMap<V> {
34 private static final Logger log = LoggerFactory.getLogger(IntentMap.class);
35 private static final int MAX_BUFFER_SIZE = 64 * 1024;
36
37 private final KryoFactory factory = new KryoFactory();
38 private final Class<V> valueType;
39 private final IMap<String, byte[]> map;
40
41 /**
42 * Constructs a map which stores intent related information with the specified arguments.
43 *
44 * @param name name of the map
45 * @param valueType type of value
46 * @param collectionsService service for creating Hazelcast IMap
47 */
48 public IntentMap(String name, Class<V> valueType, ISharedCollectionsService collectionsService) {
49 this.valueType = checkNotNull(valueType);
50
51 this.map = checkNotNull(collectionsService.getConcurrentMap(name, String.class, byte[].class));
52 }
53
54 /**
55 * Stores the specified value associated with the intent ID.
56 *
57 * @param id intent ID
58 * @param value value
59 */
60 public void put(IntentId id, V value) {
61 checkNotNull(id);
62 checkNotNull(value);
63
64 map.set(id.toString(), serializeValue(value));
65 }
66
67 /**
68 * Returns the value associated with the specified intent ID.
69 *
70 * @param id intent ID
71 * @return the value associated with the key
72 */
73 public V get(IntentId id) {
74 checkNotNull(id);
75
76 byte[] bytes = map.get(id.toString());
77 if (bytes == null) {
78 return null;
79 }
80
81 return deserializeValue(bytes);
82 }
83
84 /**
85 * Removes the value associated with the specified intent ID.
86 *
87 * @param id intent ID
88 */
89 public void remove(IntentId id) {
90 checkNotNull(id);
91
92 map.remove(id.toString());
93 }
94
95 /**
96 * Returns all values stored in the instance.
97 *
98 * @return all values stored in the sintance.
99 */
100 public Collection<V> values() {
101 Collection<V> values = new ArrayList<>();
102 for (byte[] bytes: map.values()) {
103 V value = deserializeValue(bytes);
104 if (value == null) {
105 continue;
106 }
107
108 values.add(value);
109 }
110
111 return values;
112 }
113
114 /**
115 * Adds an entry listener for this map. Listener will get notified for all events.
116 *
117 * @param listener entry listener
118 */
119 public void addListener(final EntryListener<IntentId, V> listener) {
120 checkNotNull(listener);
121
122 EntryListener<String, byte[]> internalListener = new EntryListener<String, byte[]>() {
123 @Override
124 public void entryAdded(EntryEvent<String, byte[]> event) {
125 listener.entryAdded(convertEntryEvent(event));
126 }
127
128 @Override
129 public void entryRemoved(EntryEvent<String, byte[]> event) {
130 listener.entryRemoved(convertEntryEvent(event));
131 }
132
133 @Override
134 public void entryUpdated(EntryEvent<String, byte[]> event) {
135 listener.entryUpdated(convertEntryEvent(event));
136 }
137
138 @Override
139 public void entryEvicted(EntryEvent<String, byte[]> event) {
140 listener.entryEvicted(convertEntryEvent(event));
141 }
142
143 /**
144 * Converts an entry event used internally to another entry event exposed externally.
145 *
146 * @param internalEvent entry event used internally used
147 * @return entry event exposed externally
148 */
149 private EntryEvent<IntentId, V> convertEntryEvent(EntryEvent<String, byte[]> internalEvent) {
150 EntryEvent<IntentId, V> converted =
151 new EntryEvent<>(
152 internalEvent.getSource(),
153 internalEvent.getMember(),
154 internalEvent.getEventType().getType(),
155 IntentId.valueOf(internalEvent.getKey()),
156 deserializeValue(internalEvent.getValue())
157 );
158 return converted;
159 }
160 };
161
162 map.addEntryListener(internalListener, true);
163 }
164
165 /**
166 * Destroys the backend Hazelcast IMap. This method is only for testing purpose.
167 */
168 void destroy() {
169 map.destroy();
170 }
171
172 // NOTE: this method was copied from HazelcastEventChannel due to necessity of quick hack
173 // TODO: remove the code duplication
174 /**
175 * Serialize the value.
176 *
177 * @param value the value to serialize.
178 * @return the serialized value.
179 */
180 private byte[] serializeValue(V value) {
181 //
182 // Encode the value
183 //
184 byte[] buffer = new byte[MAX_BUFFER_SIZE];
185 Kryo kryo = factory.newKryo();
186 try {
187 Output output = new Output(buffer, -1);
188 kryo.writeClassAndObject(output, value);
189 return output.toBytes();
190 } finally {
191 factory.deleteKryo(kryo);
192 }
193 }
194
195 // NOTE: this method was copied from HazelcastEventChannel due to necessity of quick hack
196 // TODO: remove the code duplication
197 /**
198 * Deserialize the value.
199 *
200 * @param bytes the buffer with the serialized value.
201 * @return the deserialized value.
202 */
203 private V deserializeValue(byte[] bytes) {
204 V value;
205
206 Kryo kryo = factory.newKryo();
207 try {
208 Input input = new Input(bytes);
209 Object objValue = kryo.readClassAndObject(input);
210 value = valueType.cast(objValue);
211 } catch (ClassCastException e) {
212 log.error("Received notification value cast failed", e);
213 return null;
214 } finally {
215 factory.deleteKryo(kryo);
216 }
217
218 return value;
219 }
220}