blob: 2f8f5a8e6398fe793ebdfe75957f4e96a0b54ef5 [file] [log] [blame]
Aaron Kruglikov3e29f662016-07-13 10:18:10 -07001/*
2 * Copyright 2016 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.store.primitives.resources.impl;
18
19import com.google.common.collect.Lists;
20import com.google.common.collect.Maps;
21import com.google.common.collect.Sets;
22import io.atomix.copycat.server.Commit;
23import io.atomix.copycat.server.Snapshottable;
24import io.atomix.copycat.server.StateMachineExecutor;
25import io.atomix.copycat.server.session.ServerSession;
26import io.atomix.copycat.server.session.SessionListener;
27import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
28import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
29import io.atomix.resource.ResourceStateMachine;
30import org.onlab.util.Match;
31import org.onosproject.store.service.MapEvent;
32import org.onosproject.store.service.Versioned;
33
34import java.util.AbstractMap.SimpleImmutableEntry;
35import java.util.Collection;
36import java.util.Iterator;
37import java.util.List;
38import java.util.Map;
39import java.util.NavigableMap;
40import java.util.Properties;
41import java.util.Set;
42import java.util.TreeMap;
43import java.util.concurrent.atomic.AtomicLong;
44import java.util.function.Function;
45import java.util.stream.Collectors;
46
47import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.CeilingEntry;
48import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.CeilingKey;
49import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Clear;
50import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.ContainsKey;
51import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.ContainsValue;
52import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.EntrySet;
53import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.FirstEntry;
54import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.FirstKey;
55import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.FloorEntry;
56import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.FloorKey;
57import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Get;
58import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.HigherEntry;
59import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.HigherKey;
60import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.IsEmpty;
61import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.KeySet;
62import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.LastEntry;
63import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.LastKey;
64import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Listen;
65import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.LowerEntry;
66import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.LowerKey;
67import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.PollFirstEntry;
68import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.PollLastEntry;
69import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Size;
70import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.SubMap;
71import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Unlisten;
72import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.UpdateAndGet;
73import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Values;
74import static org.onosproject.store.primitives.resources.impl.MapEntryUpdateResult.*;
75
76/**
77 * State machine corresponding to {@link AtomixConsistentTreeMap} backed by a
78 * {@link TreeMap}.
79 */
80public class AtomixConsistentTreeMapState extends ResourceStateMachine implements SessionListener, Snapshottable {
81
82 private final Map<Long, Commit<? extends Listen>> listeners =
83 Maps.newHashMap();
84 private TreeMap<String, TreeMapEntryValue> tree = Maps.newTreeMap();
85 private final Set<String> preparedKeys = Sets.newHashSet();
86 private AtomicLong versionCounter = new AtomicLong(0);
87
88 private Function<Commit<SubMap>, NavigableMap<String, TreeMapEntryValue>> subMapFunction = this::subMap;
89 private Function<Commit<FirstKey>, String> firstKeyFunction = this::firstKey;
90 private Function<Commit<LastKey>, String> lastKeyFunction = this::lastKey;
91 private Function<Commit<HigherEntry>, Map.Entry<String, Versioned<byte[]>>> higherEntryFunction =
92 this::higherEntry;
93 private Function<Commit<FirstEntry>, Map.Entry<String, Versioned<byte[]>>> firstEntryFunction =
94 this::firstEntry;
95 private Function<Commit<LastEntry>, Map.Entry<String, Versioned<byte[]>>> lastEntryFunction =
96 this::lastEntry;
97 private Function<Commit<PollFirstEntry>, Map.Entry<String, Versioned<byte[]>>> pollFirstEntryFunction =
98 this::pollFirstEntry;
99 private Function<Commit<PollLastEntry>, Map.Entry<String, Versioned<byte[]>>> pollLastEntryFunction =
100 this::pollLastEntry;
101 private Function<Commit<LowerEntry>, Map.Entry<String, Versioned<byte[]>>> lowerEntryFunction =
102 this::lowerEntry;
103 private Function<Commit<LowerKey>, String> lowerKeyFunction = this::lowerKey;
104 private Function<Commit<FloorEntry>, Map.Entry<String, Versioned<byte[]>>> floorEntryFunction =
105 this::floorEntry;
106 private Function<Commit<CeilingEntry>, Map.Entry<String, Versioned<byte[]>>> ceilingEntryFunction =
107 this::ceilingEntry;
108 private Function<Commit<FloorKey>, String> floorKeyFunction = this::floorKey;
109 private Function<Commit<CeilingKey>, String> ceilingKeyFunction = this::ceilingKey;
110 private Function<Commit<HigherKey>, String> higherKeyFunction = this::higherKey;
111
112 public AtomixConsistentTreeMapState(Properties properties) {
113 super(properties);
114 }
115
116 @Override
117 public void snapshot(SnapshotWriter writer) {
118 writer.writeLong(versionCounter.get());
119 }
120
121 @Override
122 public void install(SnapshotReader reader) {
123 versionCounter = new AtomicLong(reader.readLong());
124 }
125
126 @Override
127 public void configure(StateMachineExecutor executor) {
128 // Listeners
129 executor.register(Listen.class, this::listen);
130 executor.register(Unlisten.class, this::unlisten);
131 // Queries
132 executor.register(ContainsKey.class, this::containsKey);
133 executor.register(ContainsValue.class, this::containsValue);
134 executor.register(EntrySet.class, this::entrySet);
135 executor.register(Get.class, this::get);
136 executor.register(IsEmpty.class, this::isEmpty);
137 executor.register(KeySet.class, this::keySet);
138 executor.register(Size.class, this::size);
139 executor.register(Values.class, this::values);
140 executor.register(SubMap.class, subMapFunction);
141 executor.register(FirstKey.class, firstKeyFunction);
142 executor.register(LastKey.class, lastKeyFunction);
143 executor.register(FirstEntry.class, firstEntryFunction);
144 executor.register(LastEntry.class, lastEntryFunction);
145 executor.register(PollFirstEntry.class, pollFirstEntryFunction);
146 executor.register(PollLastEntry.class, pollLastEntryFunction);
147 executor.register(LowerEntry.class, lowerEntryFunction);
148 executor.register(LowerKey.class, lowerKeyFunction);
149 executor.register(FloorEntry.class, floorEntryFunction);
150 executor.register(FloorKey.class, floorKeyFunction);
151 executor.register(CeilingEntry.class, ceilingEntryFunction);
152 executor.register(CeilingKey.class, ceilingKeyFunction);
153 executor.register(HigherEntry.class, higherEntryFunction);
154 executor.register(HigherKey.class, higherKeyFunction);
155
156 // Commands
157 executor.register(UpdateAndGet.class, this::updateAndGet);
158 executor.register(Clear.class, this::clear);
159 }
160
161 @Override
162 public void delete() {
163 listeners.values().forEach(Commit::close);
164 listeners.clear();
165 tree.values().forEach(TreeMapEntryValue::discard);
166 tree.clear();
167 }
168
169 protected boolean containsKey(Commit<? extends ContainsKey> commit) {
170 try {
171 return toVersioned(tree.get((commit.operation().key()))) != null;
172 } finally {
173 commit.close();
174 }
175 }
176
177 protected boolean containsValue(Commit<? extends ContainsValue> commit) {
178 try {
179 Match<byte[]> valueMatch = Match
180 .ifValue(commit.operation().value());
181 return tree.values().stream().anyMatch(
182 value -> valueMatch.matches(value.value()));
183 } finally {
184 commit.close();
185 }
186 }
187
188 protected Versioned<byte[]> get(Commit<? extends Get> commit) {
189 try {
190 return toVersioned(tree.get(commit.operation().key()));
191 } finally {
192 commit.close();
193 }
194 }
195
196 protected int size(Commit<? extends Size> commit) {
197 try {
198 return tree.size();
199 } finally {
200 commit.close();
201 }
202 }
203
204 protected boolean isEmpty(Commit<? extends IsEmpty> commit) {
205 try {
206 return tree.isEmpty();
207 } finally {
208 commit.close();
209 }
210 }
211
212 protected Set<String> keySet(Commit<? extends KeySet> commit) {
213 try {
214 return tree.keySet().stream().collect(Collectors.toSet());
215 } finally {
216 commit.close();
217 }
218 }
219
220 protected Collection<Versioned<byte[]>> values(
221 Commit<? extends Values> commit) {
222 try {
223 return tree.values().stream().map(this::toVersioned)
224 .collect(Collectors.toList());
225 } finally {
226 commit.close();
227 }
228 }
229
230 protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet(
231 Commit<? extends EntrySet> commit) {
232 try {
233 return tree
234 .entrySet()
235 .stream()
236 .map(e -> Maps.immutableEntry(e.getKey(),
237 toVersioned(e.getValue())))
238 .collect(Collectors.toSet());
239 } finally {
240 commit.close();
241 }
242 }
243
244 protected MapEntryUpdateResult<String, byte[]> updateAndGet(
245 Commit<? extends UpdateAndGet> commit) {
246 Status updateStatus = validate(commit.operation());
247 String key = commit.operation().key();
248 TreeMapEntryValue oldCommitValue = tree.get(commit.operation().key());
249 Versioned<byte[]> oldTreeValue = toVersioned(oldCommitValue);
250
251 if (updateStatus != Status.OK) {
252 commit.close();
253 return new MapEntryUpdateResult<>(updateStatus, "", key,
254 oldTreeValue, oldTreeValue);
255 }
256
257 byte[] newValue = commit.operation().value();
258 long newVersion = versionCounter.incrementAndGet();
259 Versioned<byte[]> newTreeValue = newValue == null ? null
260 : new Versioned<byte[]>(newValue, newVersion);
261
262 MapEvent.Type updateType = newValue == null ? MapEvent.Type.REMOVE
263 : oldCommitValue == null ? MapEvent.Type.INSERT :
264 MapEvent.Type.UPDATE;
265 if (updateType == MapEvent.Type.REMOVE ||
266 updateType == MapEvent.Type.UPDATE) {
267 tree.remove(key);
268 oldCommitValue.discard();
269 }
270 if (updateType == MapEvent.Type.INSERT ||
271 updateType == MapEvent.Type.UPDATE) {
272 tree.put(key, new NonTransactionalCommit(newVersion, commit));
273 } else {
274 commit.close();
275 }
276 publish(Lists.newArrayList(new MapEvent<>("", key, newTreeValue,
277 oldTreeValue)));
278 return new MapEntryUpdateResult<>(updateStatus, "", key, oldTreeValue,
279 newTreeValue);
280 }
281
282 protected Status clear(
283 Commit<? extends Clear> commit) {
284 try {
285 Iterator<Map.Entry<String, TreeMapEntryValue>> iterator = tree
286 .entrySet()
287 .iterator();
288 while (iterator.hasNext()) {
289 Map.Entry<String, TreeMapEntryValue> entry = iterator.next();
290 String key = entry.getKey();
291 TreeMapEntryValue value = entry.getValue();
292 Versioned<byte[]> removedValue =
293 new Versioned<byte[]>(value.value(),
294 value.version());
295 publish(Lists.newArrayList(new MapEvent<>("", key, null,
296 removedValue)));
297 value.discard();
298 iterator.remove();
299 }
300 return Status.OK;
301 } finally {
302 commit.close();
303 }
304 }
305
306 protected void listen(
307 Commit<? extends Listen> commit) {
308 Long sessionId = commit.session().id();
309 listeners.put(sessionId, commit);
310 commit.session()
311 .onStateChange(
312 state -> {
313 if (state == ServerSession.State.CLOSED
314 || state == ServerSession.State.EXPIRED) {
315 Commit<? extends Listen> listener =
316 listeners.remove(sessionId);
317 if (listener != null) {
318 listener.close();
319 }
320 }
321 });
322 }
323
324 protected void unlisten(
325 Commit<? extends Unlisten> commit) {
326 try {
327 Commit<? extends AtomixConsistentTreeMapCommands.Listen> listener =
328 listeners.remove(commit.session());
329 if (listener != null) {
330 listener.close();
331 }
332 } finally {
333 commit.close();
334 }
335 }
336
337 private Status validate(UpdateAndGet update) {
338 TreeMapEntryValue existingValue = tree.get(update.key());
339 if (existingValue == null && update.value() == null) {
340 return Status.NOOP;
341 }
342 if (preparedKeys.contains(update.key())) {
343 return Status.WRITE_LOCK;
344 }
345 byte[] existingRawValue = existingValue == null ? null :
346 existingValue.value();
347 Long existingVersion = existingValue == null ? null :
348 existingValue.version();
349 return update.valueMatch().matches(existingRawValue)
350 && update.versionMatch().matches(existingVersion) ?
351 Status.OK
352 : Status.PRECONDITION_FAILED;
353 }
354
355 protected NavigableMap<String, TreeMapEntryValue> subMap(
356 Commit<? extends SubMap> commit) {
357 //Do not support this until lazy communication is possible. At present
358 // it transmits up to the entire map.
359 try {
360 SubMap<String, TreeMapEntryValue> subMap = commit.operation();
361 return tree.subMap(subMap.fromKey(), subMap.isInclusiveFrom(),
362 subMap.toKey(), subMap.isInclusiveTo());
363 } finally {
364 commit.close();
365 }
366 }
367
368 protected String firstKey(Commit<? extends FirstKey> commit) {
369 try {
370 if (tree.isEmpty()) {
371 return null;
372 }
373 return tree.firstKey();
374 } finally {
375 commit.close();
376 }
377 }
378
379 protected String lastKey(Commit<? extends LastKey> commit) {
380 try {
381 return tree.isEmpty() ? null : tree.lastKey();
382 } finally {
383 commit.close();
384 }
385 }
386
387 protected Map.Entry<String, Versioned<byte[]>> higherEntry(
388 Commit<? extends HigherEntry> commit) {
389 try {
390 if (tree.isEmpty()) {
391 return null;
392 }
393 return toVersionedEntry(
394 tree.higherEntry(commit.operation().key()));
395 } finally {
396 commit.close();
397 }
398 }
399
400 protected Map.Entry<String, Versioned<byte[]>> firstEntry(
401 Commit<? extends FirstEntry> commit) {
402 try {
403 if (tree.isEmpty()) {
404 return null;
405 }
406 return toVersionedEntry(tree.firstEntry());
407 } finally {
408 commit.close();
409 }
410 }
411
412 protected Map.Entry<String, Versioned<byte[]>> lastEntry(
413 Commit<? extends LastEntry> commit) {
414 try {
415 if (tree.isEmpty()) {
416 return null;
417 }
418 return toVersionedEntry(tree.lastEntry());
419 } finally {
420 commit.close();
421 }
422 }
423
424 protected Map.Entry<String, Versioned<byte[]>> pollFirstEntry(
425 Commit<? extends PollFirstEntry> commit) {
426 try {
427 return toVersionedEntry(tree.pollFirstEntry());
428 } finally {
429 commit.close();
430 }
431 }
432
433 protected Map.Entry<String, Versioned<byte[]>> pollLastEntry(
434 Commit<? extends PollLastEntry> commit) {
435 try {
436 return toVersionedEntry(tree.pollLastEntry());
437 } finally {
438 commit.close();
439 }
440 }
441
442 protected Map.Entry<String, Versioned<byte[]>> lowerEntry(
443 Commit<? extends LowerEntry> commit) {
444 try {
445 return toVersionedEntry(tree.lowerEntry(commit.operation().key()));
446 } finally {
447 commit.close();
448 }
449 }
450
451 protected String lowerKey(Commit<? extends LowerKey> commit) {
452 try {
453 return tree.lowerKey(commit.operation().key());
454 } finally {
455 commit.close();
456 }
457 }
458
459 protected Map.Entry<String, Versioned<byte[]>> floorEntry(
460 Commit<? extends FloorEntry> commit) {
461 try {
462 return toVersionedEntry(tree.floorEntry(commit.operation().key()));
463 } finally {
464 commit.close();
465 }
466 }
467
468 protected String floorKey(Commit<? extends FloorKey> commit) {
469 try {
470 return tree.floorKey(commit.operation().key());
471 } finally {
472 commit.close();
473 }
474 }
475
476 protected Map.Entry<String, Versioned<byte[]>> ceilingEntry(
477 Commit<CeilingEntry> commit) {
478 try {
479 return toVersionedEntry(
480 tree.ceilingEntry(commit.operation().key()));
481 } finally {
482 commit.close();
483 }
484 }
485
486 protected String ceilingKey(Commit<CeilingKey> commit) {
487 try {
488 return tree.ceilingKey(commit.operation().key());
489 } finally {
490 commit.close();
491 }
492 }
493
494 protected String higherKey(Commit<HigherKey> commit) {
495 try {
496 return tree.higherKey(commit.operation().key());
497 } finally {
498 commit.close();
499 }
500 }
501
502 private Versioned<byte[]> toVersioned(TreeMapEntryValue value) {
503 return value == null ? null :
504 new Versioned<byte[]>(value.value(), value.version());
505 }
506
507 private Map.Entry<String, Versioned<byte[]>> toVersionedEntry(
508 Map.Entry<String, TreeMapEntryValue> entry) {
509 //FIXME is this the best type of entry to return?
510 return entry == null ? null : new SimpleImmutableEntry<>(
511 entry.getKey(), toVersioned(entry.getValue()));
512 }
513
514 private void publish(List<MapEvent<String, byte[]>> events) {
515 listeners.values().forEach(commit -> commit.session()
516 .publish(AtomixConsistentTreeMap.CHANGE_SUBJECT, events));
517 }
518
519 @Override
520 public void register(ServerSession session) {
521 }
522
523 @Override
524 public void unregister(ServerSession session) {
525 closeListener(session.id());
526 }
527
528 @Override
529 public void expire(ServerSession session) {
530 closeListener(session.id());
531 }
532
533 @Override
534 public void close(ServerSession session) {
535 closeListener(session.id());
536 }
537
538 private void closeListener(Long sessionId) {
539 Commit<? extends Listen> commit = listeners.remove(sessionId);
540 if (commit != null) {
541 commit.close();
542 }
543 }
544
545 private interface TreeMapEntryValue {
546
547 byte[] value();
548
549 long version();
550
551 void discard();
552 }
553
554 private class NonTransactionalCommit implements TreeMapEntryValue {
555 private final long version;
556 private final Commit<? extends UpdateAndGet> commit;
557
558 public NonTransactionalCommit(long version,
559 Commit<? extends UpdateAndGet> commit) {
560 this.version = version;
561 this.commit = commit;
562 }
563
564 @Override
565 public byte[] value() {
566 return commit.operation().value();
567 }
568
569 @Override
570 public long version() {
571 return version;
572 }
573
574 @Override
575 public void discard() {
576 commit.close();
577 }
578 }
579}