blob: b9e71356f94215f4b40dac9fc8fc835235e63571 [file] [log] [blame]
Jordan Halterman2bf177c2017-06-29 01:49:08 -07001/*
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.store.primitives.resources.impl;
18
19import java.util.AbstractMap.SimpleImmutableEntry;
20import java.util.Collection;
21import java.util.HashMap;
22import java.util.Iterator;
23import java.util.List;
24import java.util.Map;
25import java.util.NavigableMap;
26import java.util.Set;
27import java.util.TreeMap;
28import java.util.stream.Collectors;
29
30import com.google.common.collect.Lists;
31import com.google.common.collect.Maps;
32import com.google.common.collect.Sets;
33import io.atomix.protocols.raft.service.AbstractRaftService;
34import io.atomix.protocols.raft.service.Commit;
35import io.atomix.protocols.raft.service.RaftServiceExecutor;
36import io.atomix.protocols.raft.session.RaftSession;
37import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
38import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
39import org.onlab.util.KryoNamespace;
40import org.onlab.util.Match;
41import org.onosproject.store.serializers.KryoNamespaces;
42import org.onosproject.store.service.MapEvent;
43import org.onosproject.store.service.Serializer;
44import org.onosproject.store.service.Versioned;
45
46import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapEvents.CHANGE;
47import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.ADD_LISTENER;
48import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CEILING_ENTRY;
49import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CEILING_KEY;
50import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CLEAR;
51import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CONTAINS_KEY;
52import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CONTAINS_VALUE;
53import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CeilingEntry;
54import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CeilingKey;
55import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.ContainsKey;
56import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.ContainsValue;
57import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.ENTRY_SET;
58import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.FIRST_ENTRY;
59import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.FIRST_KEY;
60import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.FLOOR_ENTRY;
61import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.FLOOR_KEY;
62import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.FloorEntry;
63import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.FloorKey;
64import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.GET;
65import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.GET_OR_DEFAULT;
66import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.Get;
67import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.GetOrDefault;
68import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.HIGHER_ENTRY;
69import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.HIGHER_KEY;
70import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.HigherEntry;
71import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.HigherKey;
72import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.IS_EMPTY;
73import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.KEY_SET;
74import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.LAST_ENTRY;
75import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.LAST_KEY;
76import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.LOWER_ENTRY;
77import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.LOWER_KEY;
78import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.LowerEntry;
79import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.LowerKey;
80import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.POLL_FIRST_ENTRY;
81import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.POLL_LAST_ENTRY;
82import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.REMOVE_LISTENER;
83import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.SIZE;
84import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.SUB_MAP;
85import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.SubMap;
86import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.UPDATE_AND_GET;
87import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.UpdateAndGet;
88import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.VALUES;
89import static org.onosproject.store.primitives.resources.impl.MapEntryUpdateResult.Status;
90
91/**
92 * State machine corresponding to {@link AtomixConsistentTreeMap} backed by a
93 * {@link TreeMap}.
94 */
95public class AtomixConsistentTreeMapService extends AbstractRaftService {
96
97 private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
98 .register(KryoNamespaces.BASIC)
99 .register(AtomixConsistentTreeMapOperations.NAMESPACE)
100 .register(AtomixConsistentTreeMapEvents.NAMESPACE)
101 .register(TreeMapEntryValue.class)
102 .register(new HashMap<>().keySet().getClass())
103 .register(TreeMap.class)
104 .build());
105
106 private final Map<Long, RaftSession> listeners = Maps.newHashMap();
107 private TreeMap<String, TreeMapEntryValue> tree = Maps.newTreeMap();
108 private final Set<String> preparedKeys = Sets.newHashSet();
109
110 @Override
111 public void snapshot(SnapshotWriter writer) {
112 writer.writeObject(Sets.newHashSet(listeners.keySet()), SERIALIZER::encode);
113 writer.writeObject(preparedKeys, SERIALIZER::encode);
114 writer.writeObject(tree, SERIALIZER::encode);
115 }
116
117 @Override
118 public void install(SnapshotReader reader) {
119 listeners.clear();
120 for (long sessionId : reader.<Set<Long>>readObject(SERIALIZER::decode)) {
121 listeners.put(sessionId, getSessions().getSession(sessionId));
122 }
123
124 preparedKeys.clear();
125 preparedKeys.addAll(reader.readObject(SERIALIZER::decode));
126
127 tree.clear();
128 tree.putAll(reader.readObject(SERIALIZER::decode));
129 }
130
131 @Override
132 public void configure(RaftServiceExecutor executor) {
133 // Listeners
134 executor.register(ADD_LISTENER, this::listen);
135 executor.register(REMOVE_LISTENER, this::unlisten);
136 // Queries
137 executor.register(CONTAINS_KEY, SERIALIZER::decode, this::containsKey, SERIALIZER::encode);
138 executor.register(CONTAINS_VALUE, SERIALIZER::decode, this::containsValue, SERIALIZER::encode);
139 executor.register(ENTRY_SET, this::entrySet, SERIALIZER::encode);
140 executor.register(GET, SERIALIZER::decode, this::get, SERIALIZER::encode);
141 executor.register(GET_OR_DEFAULT, SERIALIZER::decode, this::getOrDefault, SERIALIZER::encode);
142 executor.register(IS_EMPTY, this::isEmpty, SERIALIZER::encode);
143 executor.register(KEY_SET, this::keySet, SERIALIZER::encode);
144 executor.register(SIZE, this::size, SERIALIZER::encode);
145 executor.register(VALUES, this::values, SERIALIZER::encode);
146 executor.register(SUB_MAP, SERIALIZER::decode, this::subMap, SERIALIZER::encode);
147 executor.register(FIRST_KEY, this::firstKey, SERIALIZER::encode);
148 executor.register(LAST_KEY, this::lastKey, SERIALIZER::encode);
149 executor.register(FIRST_ENTRY, this::firstEntry, SERIALIZER::encode);
150 executor.register(LAST_ENTRY, this::lastEntry, SERIALIZER::encode);
151 executor.register(POLL_FIRST_ENTRY, this::pollFirstEntry, SERIALIZER::encode);
152 executor.register(POLL_LAST_ENTRY, this::pollLastEntry, SERIALIZER::encode);
153 executor.register(LOWER_ENTRY, SERIALIZER::decode, this::lowerEntry, SERIALIZER::encode);
154 executor.register(LOWER_KEY, SERIALIZER::decode, this::lowerKey, SERIALIZER::encode);
155 executor.register(FLOOR_ENTRY, SERIALIZER::decode, this::floorEntry, SERIALIZER::encode);
156 executor.register(FLOOR_KEY, SERIALIZER::decode, this::floorKey, SERIALIZER::encode);
157 executor.register(CEILING_ENTRY, SERIALIZER::decode, this::ceilingEntry, SERIALIZER::encode);
158 executor.register(CEILING_KEY, SERIALIZER::decode, this::ceilingKey, SERIALIZER::encode);
159 executor.register(HIGHER_ENTRY, SERIALIZER::decode, this::higherEntry, SERIALIZER::encode);
160 executor.register(HIGHER_KEY, SERIALIZER::decode, this::higherKey, SERIALIZER::encode);
161
162 // Commands
163 executor.register(UPDATE_AND_GET, SERIALIZER::decode, this::updateAndGet, SERIALIZER::encode);
164 executor.register(CLEAR, this::clear, SERIALIZER::encode);
165 }
166
167 protected boolean containsKey(Commit<? extends ContainsKey> commit) {
168 return toVersioned(tree.get((commit.value().key()))) != null;
169 }
170
171 protected boolean containsValue(Commit<? extends ContainsValue> commit) {
172 Match<byte[]> valueMatch = Match
173 .ifValue(commit.value().value());
174 return tree.values().stream().anyMatch(
175 value -> valueMatch.matches(value.value()));
176 }
177
178 protected Versioned<byte[]> get(Commit<? extends Get> commit) {
179 return toVersioned(tree.get(commit.value().key()));
180 }
181
182 protected Versioned<byte[]> getOrDefault(Commit<? extends GetOrDefault> commit) {
183 Versioned<byte[]> value = toVersioned(tree.get(commit.value().key()));
184 return value != null ? value : new Versioned<>(commit.value().defaultValue(), 0);
185 }
186
187 protected int size(Commit<Void> commit) {
188 return tree.size();
189 }
190
191 protected boolean isEmpty(Commit<Void> commit) {
192 return tree.isEmpty();
193 }
194
195 protected Set<String> keySet(Commit<Void> commit) {
196 return tree.keySet().stream().collect(Collectors.toSet());
197 }
198
199 protected Collection<Versioned<byte[]>> values(Commit<Void> commit) {
200 return tree.values().stream().map(this::toVersioned)
201 .collect(Collectors.toList());
202 }
203
204 protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet(Commit<Void> commit) {
205 return tree
206 .entrySet()
207 .stream()
208 .map(e -> Maps.immutableEntry(e.getKey(),
209 toVersioned(e.getValue())))
210 .collect(Collectors.toSet());
211 }
212
213 protected MapEntryUpdateResult<String, byte[]> updateAndGet(Commit<? extends UpdateAndGet> commit) {
214 Status updateStatus = validate(commit.value());
215 String key = commit.value().key();
216 TreeMapEntryValue oldCommitValue = tree.get(commit.value().key());
217 Versioned<byte[]> oldTreeValue = toVersioned(oldCommitValue);
218
219 if (updateStatus != Status.OK) {
220 return new MapEntryUpdateResult<>(updateStatus, "", key,
221 oldTreeValue, oldTreeValue);
222 }
223
224 byte[] newValue = commit.value().value();
225 long newVersion = commit.index();
226 Versioned<byte[]> newTreeValue = newValue == null ? null
227 : new Versioned<byte[]>(newValue, newVersion);
228
229 MapEvent.Type updateType = newValue == null ? MapEvent.Type.REMOVE
230 : oldCommitValue == null ? MapEvent.Type.INSERT :
231 MapEvent.Type.UPDATE;
232 if (updateType == MapEvent.Type.REMOVE ||
233 updateType == MapEvent.Type.UPDATE) {
234 tree.remove(key);
235 }
236 if (updateType == MapEvent.Type.INSERT ||
237 updateType == MapEvent.Type.UPDATE) {
238 tree.put(key, new TreeMapEntryValue(newVersion, commit.value().value()));
239 }
240 publish(Lists.newArrayList(new MapEvent<>("", key, newTreeValue,
241 oldTreeValue)));
242 return new MapEntryUpdateResult<>(updateStatus, "", key, oldTreeValue,
243 newTreeValue);
244 }
245
246 protected Status clear(Commit<Void> commit) {
247 Iterator<Map.Entry<String, TreeMapEntryValue>> iterator = tree
248 .entrySet()
249 .iterator();
250 while (iterator.hasNext()) {
251 Map.Entry<String, TreeMapEntryValue> entry = iterator.next();
252 String key = entry.getKey();
253 TreeMapEntryValue value = entry.getValue();
254 Versioned<byte[]> removedValue =
255 new Versioned<byte[]>(value.value(),
256 value.version());
257 publish(Lists.newArrayList(new MapEvent<>("", key, null,
258 removedValue)));
259 iterator.remove();
260 }
261 return Status.OK;
262 }
263
264 protected void listen(Commit<Void> commit) {
265 listeners.put(commit.session().sessionId().id(), commit.session());
266 }
267
268 protected void unlisten(Commit<Void> commit) {
269 closeListener(commit.session().sessionId().id());
270 }
271
272 private Status validate(UpdateAndGet update) {
273 TreeMapEntryValue existingValue = tree.get(update.key());
274 if (existingValue == null && update.value() == null) {
275 return Status.NOOP;
276 }
277 if (preparedKeys.contains(update.key())) {
278 return Status.WRITE_LOCK;
279 }
280 byte[] existingRawValue = existingValue == null ? null :
281 existingValue.value();
282 Long existingVersion = existingValue == null ? null :
283 existingValue.version();
284 return update.valueMatch().matches(existingRawValue)
285 && update.versionMatch().matches(existingVersion) ?
286 Status.OK
287 : Status.PRECONDITION_FAILED;
288 }
289
290 protected NavigableMap<String, TreeMapEntryValue> subMap(
291 Commit<? extends SubMap> commit) {
292 // Do not support this until lazy communication is possible. At present
293 // it transmits up to the entire map.
294 SubMap<String, TreeMapEntryValue> subMap = commit.value();
295 return tree.subMap(subMap.fromKey(), subMap.isInclusiveFrom(),
296 subMap.toKey(), subMap.isInclusiveTo());
297 }
298
299 protected String firstKey(Commit<Void> commit) {
300 if (tree.isEmpty()) {
301 return null;
302 }
303 return tree.firstKey();
304 }
305
306 protected String lastKey(Commit<Void> commit) {
307 return tree.isEmpty() ? null : tree.lastKey();
308 }
309
310 protected Map.Entry<String, Versioned<byte[]>> higherEntry(Commit<? extends HigherEntry> commit) {
311 if (tree.isEmpty()) {
312 return null;
313 }
314 return toVersionedEntry(
315 tree.higherEntry(commit.value().key()));
316 }
317
318 protected Map.Entry<String, Versioned<byte[]>> firstEntry(Commit<Void> commit) {
319 if (tree.isEmpty()) {
320 return null;
321 }
322 return toVersionedEntry(tree.firstEntry());
323 }
324
325 protected Map.Entry<String, Versioned<byte[]>> lastEntry(Commit<Void> commit) {
326 if (tree.isEmpty()) {
327 return null;
328 }
329 return toVersionedEntry(tree.lastEntry());
330 }
331
332 protected Map.Entry<String, Versioned<byte[]>> pollFirstEntry(Commit<Void> commit) {
333 return toVersionedEntry(tree.pollFirstEntry());
334 }
335
336 protected Map.Entry<String, Versioned<byte[]>> pollLastEntry(Commit<Void> commit) {
337 return toVersionedEntry(tree.pollLastEntry());
338 }
339
340 protected Map.Entry<String, Versioned<byte[]>> lowerEntry(Commit<? extends LowerEntry> commit) {
341 return toVersionedEntry(tree.lowerEntry(commit.value().key()));
342 }
343
344 protected String lowerKey(Commit<? extends LowerKey> commit) {
345 return tree.lowerKey(commit.value().key());
346 }
347
348 protected Map.Entry<String, Versioned<byte[]>> floorEntry(Commit<? extends FloorEntry> commit) {
349 return toVersionedEntry(tree.floorEntry(commit.value().key()));
350 }
351
352 protected String floorKey(Commit<? extends FloorKey> commit) {
353 return tree.floorKey(commit.value().key());
354 }
355
356 protected Map.Entry<String, Versioned<byte[]>> ceilingEntry(Commit<CeilingEntry> commit) {
357 return toVersionedEntry(
358 tree.ceilingEntry(commit.value().key()));
359 }
360
361 protected String ceilingKey(Commit<CeilingKey> commit) {
362 return tree.ceilingKey(commit.value().key());
363 }
364
365 protected String higherKey(Commit<HigherKey> commit) {
366 return tree.higherKey(commit.value().key());
367 }
368
369 private Versioned<byte[]> toVersioned(TreeMapEntryValue value) {
370 return value == null ? null :
371 new Versioned<byte[]>(value.value(), value.version());
372 }
373
374 private Map.Entry<String, Versioned<byte[]>> toVersionedEntry(
375 Map.Entry<String, TreeMapEntryValue> entry) {
376 //FIXME is this the best type of entry to return?
377 return entry == null ? null : new SimpleImmutableEntry<>(
378 entry.getKey(), toVersioned(entry.getValue()));
379 }
380
381 private void publish(List<MapEvent<String, byte[]>> events) {
382 listeners.values().forEach(session -> session.publish(CHANGE, SERIALIZER::encode, events));
383 }
384
385 @Override
386 public void onExpire(RaftSession session) {
387 closeListener(session.sessionId().id());
388 }
389
390 @Override
391 public void onClose(RaftSession session) {
392 closeListener(session.sessionId().id());
393 }
394
395 private void closeListener(Long sessionId) {
396 listeners.remove(sessionId);
397 }
398
399 private static class TreeMapEntryValue {
400 private final long version;
401 private final byte[] value;
402
403 public TreeMapEntryValue(long version, byte[] value) {
404 this.version = version;
405 this.value = value;
406 }
407
408 public byte[] value() {
409 return value;
410 }
411
412 public long version() {
413 return version;
414 }
415 }
416}