blob: 576bfea5d6dbff65dade3627c13a6a2c87ac4f29 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.store.primitives.resources.impl;
18
19import com.google.common.collect.Lists;
20import com.google.common.collect.Maps;
21import com.google.common.collect.Sets;
22import io.atomix.copycat.server.Commit;
Aaron Kruglikov3e29f662016-07-13 10:18:10 -070023import io.atomix.copycat.server.StateMachineExecutor;
24import io.atomix.copycat.server.session.ServerSession;
25import io.atomix.copycat.server.session.SessionListener;
Aaron Kruglikov3e29f662016-07-13 10:18:10 -070026import io.atomix.resource.ResourceStateMachine;
27import org.onlab.util.Match;
28import org.onosproject.store.service.MapEvent;
29import org.onosproject.store.service.Versioned;
30
31import java.util.AbstractMap.SimpleImmutableEntry;
32import java.util.Collection;
33import java.util.Iterator;
34import java.util.List;
35import java.util.Map;
36import java.util.NavigableMap;
37import java.util.Properties;
38import java.util.Set;
39import java.util.TreeMap;
Aaron Kruglikov3e29f662016-07-13 10:18:10 -070040import java.util.function.Function;
41import java.util.stream.Collectors;
42
43import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.CeilingEntry;
44import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.CeilingKey;
45import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Clear;
46import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.ContainsKey;
47import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.ContainsValue;
48import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.EntrySet;
49import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.FirstEntry;
50import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.FirstKey;
51import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.FloorEntry;
52import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.FloorKey;
53import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Get;
Jordan Haltermanf6272442017-04-20 02:18:08 -070054import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.GetOrDefault;
Aaron Kruglikov3e29f662016-07-13 10:18:10 -070055import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.HigherEntry;
56import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.HigherKey;
57import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.IsEmpty;
58import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.KeySet;
59import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.LastEntry;
60import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.LastKey;
61import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Listen;
62import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.LowerEntry;
63import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.LowerKey;
64import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.PollFirstEntry;
65import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.PollLastEntry;
66import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Size;
67import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.SubMap;
68import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Unlisten;
69import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.UpdateAndGet;
70import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Values;
71import static org.onosproject.store.primitives.resources.impl.MapEntryUpdateResult.*;
72
73/**
74 * State machine corresponding to {@link AtomixConsistentTreeMap} backed by a
75 * {@link TreeMap}.
76 */
Jordan Halterman27947022017-04-20 01:30:13 -070077public class AtomixConsistentTreeMapState extends ResourceStateMachine implements SessionListener {
Aaron Kruglikov3e29f662016-07-13 10:18:10 -070078
79 private final Map<Long, Commit<? extends Listen>> listeners =
80 Maps.newHashMap();
81 private TreeMap<String, TreeMapEntryValue> tree = Maps.newTreeMap();
82 private final Set<String> preparedKeys = Sets.newHashSet();
Aaron Kruglikov3e29f662016-07-13 10:18:10 -070083
84 private Function<Commit<SubMap>, NavigableMap<String, TreeMapEntryValue>> subMapFunction = this::subMap;
85 private Function<Commit<FirstKey>, String> firstKeyFunction = this::firstKey;
86 private Function<Commit<LastKey>, String> lastKeyFunction = this::lastKey;
87 private Function<Commit<HigherEntry>, Map.Entry<String, Versioned<byte[]>>> higherEntryFunction =
88 this::higherEntry;
89 private Function<Commit<FirstEntry>, Map.Entry<String, Versioned<byte[]>>> firstEntryFunction =
90 this::firstEntry;
91 private Function<Commit<LastEntry>, Map.Entry<String, Versioned<byte[]>>> lastEntryFunction =
92 this::lastEntry;
93 private Function<Commit<PollFirstEntry>, Map.Entry<String, Versioned<byte[]>>> pollFirstEntryFunction =
94 this::pollFirstEntry;
95 private Function<Commit<PollLastEntry>, Map.Entry<String, Versioned<byte[]>>> pollLastEntryFunction =
96 this::pollLastEntry;
97 private Function<Commit<LowerEntry>, Map.Entry<String, Versioned<byte[]>>> lowerEntryFunction =
98 this::lowerEntry;
99 private Function<Commit<LowerKey>, String> lowerKeyFunction = this::lowerKey;
100 private Function<Commit<FloorEntry>, Map.Entry<String, Versioned<byte[]>>> floorEntryFunction =
101 this::floorEntry;
102 private Function<Commit<CeilingEntry>, Map.Entry<String, Versioned<byte[]>>> ceilingEntryFunction =
103 this::ceilingEntry;
104 private Function<Commit<FloorKey>, String> floorKeyFunction = this::floorKey;
105 private Function<Commit<CeilingKey>, String> ceilingKeyFunction = this::ceilingKey;
106 private Function<Commit<HigherKey>, String> higherKeyFunction = this::higherKey;
107
/**
 * Creates the state machine, passing the resource configuration to the superclass.
 *
 * @param properties resource properties forwarded to {@link ResourceStateMachine}
 */
public AtomixConsistentTreeMapState(Properties properties) {
    super(properties);
}
111
112 @Override
Aaron Kruglikov3e29f662016-07-13 10:18:10 -0700113 public void configure(StateMachineExecutor executor) {
114 // Listeners
115 executor.register(Listen.class, this::listen);
116 executor.register(Unlisten.class, this::unlisten);
117 // Queries
118 executor.register(ContainsKey.class, this::containsKey);
119 executor.register(ContainsValue.class, this::containsValue);
120 executor.register(EntrySet.class, this::entrySet);
121 executor.register(Get.class, this::get);
Jordan Haltermanf6272442017-04-20 02:18:08 -0700122 executor.register(GetOrDefault.class, this::getOrDefault);
Aaron Kruglikov3e29f662016-07-13 10:18:10 -0700123 executor.register(IsEmpty.class, this::isEmpty);
124 executor.register(KeySet.class, this::keySet);
125 executor.register(Size.class, this::size);
126 executor.register(Values.class, this::values);
127 executor.register(SubMap.class, subMapFunction);
128 executor.register(FirstKey.class, firstKeyFunction);
129 executor.register(LastKey.class, lastKeyFunction);
130 executor.register(FirstEntry.class, firstEntryFunction);
131 executor.register(LastEntry.class, lastEntryFunction);
132 executor.register(PollFirstEntry.class, pollFirstEntryFunction);
133 executor.register(PollLastEntry.class, pollLastEntryFunction);
134 executor.register(LowerEntry.class, lowerEntryFunction);
135 executor.register(LowerKey.class, lowerKeyFunction);
136 executor.register(FloorEntry.class, floorEntryFunction);
137 executor.register(FloorKey.class, floorKeyFunction);
138 executor.register(CeilingEntry.class, ceilingEntryFunction);
139 executor.register(CeilingKey.class, ceilingKeyFunction);
140 executor.register(HigherEntry.class, higherEntryFunction);
141 executor.register(HigherKey.class, higherKeyFunction);
142
143 // Commands
144 executor.register(UpdateAndGet.class, this::updateAndGet);
145 executor.register(Clear.class, this::clear);
146 }
147
148 @Override
149 public void delete() {
150 listeners.values().forEach(Commit::close);
151 listeners.clear();
152 tree.values().forEach(TreeMapEntryValue::discard);
153 tree.clear();
154 }
155
156 protected boolean containsKey(Commit<? extends ContainsKey> commit) {
157 try {
158 return toVersioned(tree.get((commit.operation().key()))) != null;
159 } finally {
160 commit.close();
161 }
162 }
163
164 protected boolean containsValue(Commit<? extends ContainsValue> commit) {
165 try {
166 Match<byte[]> valueMatch = Match
167 .ifValue(commit.operation().value());
168 return tree.values().stream().anyMatch(
169 value -> valueMatch.matches(value.value()));
170 } finally {
171 commit.close();
172 }
173 }
174
175 protected Versioned<byte[]> get(Commit<? extends Get> commit) {
176 try {
177 return toVersioned(tree.get(commit.operation().key()));
178 } finally {
179 commit.close();
180 }
181 }
182
Jordan Haltermanf6272442017-04-20 02:18:08 -0700183 protected Versioned<byte[]> getOrDefault(Commit<? extends GetOrDefault> commit) {
184 try {
185 Versioned<byte[]> value = toVersioned(tree.get(commit.operation().key()));
186 return value != null ? value : new Versioned<>(commit.operation().defaultValue(), 0);
187 } finally {
188 commit.close();
189 }
190 }
191
Aaron Kruglikov3e29f662016-07-13 10:18:10 -0700192 protected int size(Commit<? extends Size> commit) {
193 try {
194 return tree.size();
195 } finally {
196 commit.close();
197 }
198 }
199
200 protected boolean isEmpty(Commit<? extends IsEmpty> commit) {
201 try {
202 return tree.isEmpty();
203 } finally {
204 commit.close();
205 }
206 }
207
208 protected Set<String> keySet(Commit<? extends KeySet> commit) {
209 try {
210 return tree.keySet().stream().collect(Collectors.toSet());
211 } finally {
212 commit.close();
213 }
214 }
215
216 protected Collection<Versioned<byte[]>> values(
217 Commit<? extends Values> commit) {
218 try {
219 return tree.values().stream().map(this::toVersioned)
220 .collect(Collectors.toList());
221 } finally {
222 commit.close();
223 }
224 }
225
226 protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet(
227 Commit<? extends EntrySet> commit) {
228 try {
229 return tree
230 .entrySet()
231 .stream()
232 .map(e -> Maps.immutableEntry(e.getKey(),
233 toVersioned(e.getValue())))
234 .collect(Collectors.toSet());
235 } finally {
236 commit.close();
237 }
238 }
239
240 protected MapEntryUpdateResult<String, byte[]> updateAndGet(
241 Commit<? extends UpdateAndGet> commit) {
242 Status updateStatus = validate(commit.operation());
243 String key = commit.operation().key();
244 TreeMapEntryValue oldCommitValue = tree.get(commit.operation().key());
245 Versioned<byte[]> oldTreeValue = toVersioned(oldCommitValue);
246
247 if (updateStatus != Status.OK) {
248 commit.close();
249 return new MapEntryUpdateResult<>(updateStatus, "", key,
250 oldTreeValue, oldTreeValue);
251 }
252
253 byte[] newValue = commit.operation().value();
Jordan Halterman27947022017-04-20 01:30:13 -0700254 long newVersion = commit.index();
Aaron Kruglikov3e29f662016-07-13 10:18:10 -0700255 Versioned<byte[]> newTreeValue = newValue == null ? null
256 : new Versioned<byte[]>(newValue, newVersion);
257
258 MapEvent.Type updateType = newValue == null ? MapEvent.Type.REMOVE
259 : oldCommitValue == null ? MapEvent.Type.INSERT :
260 MapEvent.Type.UPDATE;
261 if (updateType == MapEvent.Type.REMOVE ||
262 updateType == MapEvent.Type.UPDATE) {
263 tree.remove(key);
264 oldCommitValue.discard();
265 }
266 if (updateType == MapEvent.Type.INSERT ||
267 updateType == MapEvent.Type.UPDATE) {
268 tree.put(key, new NonTransactionalCommit(newVersion, commit));
269 } else {
270 commit.close();
271 }
272 publish(Lists.newArrayList(new MapEvent<>("", key, newTreeValue,
273 oldTreeValue)));
274 return new MapEntryUpdateResult<>(updateStatus, "", key, oldTreeValue,
275 newTreeValue);
276 }
277
278 protected Status clear(
279 Commit<? extends Clear> commit) {
280 try {
281 Iterator<Map.Entry<String, TreeMapEntryValue>> iterator = tree
282 .entrySet()
283 .iterator();
284 while (iterator.hasNext()) {
285 Map.Entry<String, TreeMapEntryValue> entry = iterator.next();
286 String key = entry.getKey();
287 TreeMapEntryValue value = entry.getValue();
288 Versioned<byte[]> removedValue =
289 new Versioned<byte[]>(value.value(),
290 value.version());
291 publish(Lists.newArrayList(new MapEvent<>("", key, null,
292 removedValue)));
293 value.discard();
294 iterator.remove();
295 }
296 return Status.OK;
297 } finally {
298 commit.close();
299 }
300 }
301
302 protected void listen(
303 Commit<? extends Listen> commit) {
304 Long sessionId = commit.session().id();
305 listeners.put(sessionId, commit);
306 commit.session()
307 .onStateChange(
308 state -> {
309 if (state == ServerSession.State.CLOSED
310 || state == ServerSession.State.EXPIRED) {
311 Commit<? extends Listen> listener =
312 listeners.remove(sessionId);
313 if (listener != null) {
314 listener.close();
315 }
316 }
317 });
318 }
319
320 protected void unlisten(
321 Commit<? extends Unlisten> commit) {
322 try {
323 Commit<? extends AtomixConsistentTreeMapCommands.Listen> listener =
Yuta HIGUCHI3f084df2016-12-23 13:11:39 -0800324 listeners.remove(commit.session().id());
Aaron Kruglikov3e29f662016-07-13 10:18:10 -0700325 if (listener != null) {
326 listener.close();
327 }
328 } finally {
329 commit.close();
330 }
331 }
332
333 private Status validate(UpdateAndGet update) {
334 TreeMapEntryValue existingValue = tree.get(update.key());
335 if (existingValue == null && update.value() == null) {
336 return Status.NOOP;
337 }
338 if (preparedKeys.contains(update.key())) {
339 return Status.WRITE_LOCK;
340 }
341 byte[] existingRawValue = existingValue == null ? null :
342 existingValue.value();
343 Long existingVersion = existingValue == null ? null :
344 existingValue.version();
345 return update.valueMatch().matches(existingRawValue)
346 && update.versionMatch().matches(existingVersion) ?
347 Status.OK
348 : Status.PRECONDITION_FAILED;
349 }
350
351 protected NavigableMap<String, TreeMapEntryValue> subMap(
352 Commit<? extends SubMap> commit) {
353 //Do not support this until lazy communication is possible. At present
354 // it transmits up to the entire map.
355 try {
356 SubMap<String, TreeMapEntryValue> subMap = commit.operation();
357 return tree.subMap(subMap.fromKey(), subMap.isInclusiveFrom(),
358 subMap.toKey(), subMap.isInclusiveTo());
359 } finally {
360 commit.close();
361 }
362 }
363
364 protected String firstKey(Commit<? extends FirstKey> commit) {
365 try {
366 if (tree.isEmpty()) {
367 return null;
368 }
369 return tree.firstKey();
370 } finally {
371 commit.close();
372 }
373 }
374
375 protected String lastKey(Commit<? extends LastKey> commit) {
376 try {
377 return tree.isEmpty() ? null : tree.lastKey();
378 } finally {
379 commit.close();
380 }
381 }
382
383 protected Map.Entry<String, Versioned<byte[]>> higherEntry(
384 Commit<? extends HigherEntry> commit) {
385 try {
386 if (tree.isEmpty()) {
387 return null;
388 }
389 return toVersionedEntry(
390 tree.higherEntry(commit.operation().key()));
391 } finally {
392 commit.close();
393 }
394 }
395
396 protected Map.Entry<String, Versioned<byte[]>> firstEntry(
397 Commit<? extends FirstEntry> commit) {
398 try {
399 if (tree.isEmpty()) {
400 return null;
401 }
402 return toVersionedEntry(tree.firstEntry());
403 } finally {
404 commit.close();
405 }
406 }
407
408 protected Map.Entry<String, Versioned<byte[]>> lastEntry(
409 Commit<? extends LastEntry> commit) {
410 try {
411 if (tree.isEmpty()) {
412 return null;
413 }
414 return toVersionedEntry(tree.lastEntry());
415 } finally {
416 commit.close();
417 }
418 }
419
420 protected Map.Entry<String, Versioned<byte[]>> pollFirstEntry(
421 Commit<? extends PollFirstEntry> commit) {
422 try {
423 return toVersionedEntry(tree.pollFirstEntry());
424 } finally {
425 commit.close();
426 }
427 }
428
429 protected Map.Entry<String, Versioned<byte[]>> pollLastEntry(
430 Commit<? extends PollLastEntry> commit) {
431 try {
432 return toVersionedEntry(tree.pollLastEntry());
433 } finally {
434 commit.close();
435 }
436 }
437
438 protected Map.Entry<String, Versioned<byte[]>> lowerEntry(
439 Commit<? extends LowerEntry> commit) {
440 try {
441 return toVersionedEntry(tree.lowerEntry(commit.operation().key()));
442 } finally {
443 commit.close();
444 }
445 }
446
447 protected String lowerKey(Commit<? extends LowerKey> commit) {
448 try {
449 return tree.lowerKey(commit.operation().key());
450 } finally {
451 commit.close();
452 }
453 }
454
455 protected Map.Entry<String, Versioned<byte[]>> floorEntry(
456 Commit<? extends FloorEntry> commit) {
457 try {
458 return toVersionedEntry(tree.floorEntry(commit.operation().key()));
459 } finally {
460 commit.close();
461 }
462 }
463
464 protected String floorKey(Commit<? extends FloorKey> commit) {
465 try {
466 return tree.floorKey(commit.operation().key());
467 } finally {
468 commit.close();
469 }
470 }
471
472 protected Map.Entry<String, Versioned<byte[]>> ceilingEntry(
473 Commit<CeilingEntry> commit) {
474 try {
475 return toVersionedEntry(
476 tree.ceilingEntry(commit.operation().key()));
477 } finally {
478 commit.close();
479 }
480 }
481
482 protected String ceilingKey(Commit<CeilingKey> commit) {
483 try {
484 return tree.ceilingKey(commit.operation().key());
485 } finally {
486 commit.close();
487 }
488 }
489
490 protected String higherKey(Commit<HigherKey> commit) {
491 try {
492 return tree.higherKey(commit.operation().key());
493 } finally {
494 commit.close();
495 }
496 }
497
498 private Versioned<byte[]> toVersioned(TreeMapEntryValue value) {
499 return value == null ? null :
500 new Versioned<byte[]>(value.value(), value.version());
501 }
502
503 private Map.Entry<String, Versioned<byte[]>> toVersionedEntry(
504 Map.Entry<String, TreeMapEntryValue> entry) {
505 //FIXME is this the best type of entry to return?
506 return entry == null ? null : new SimpleImmutableEntry<>(
507 entry.getKey(), toVersioned(entry.getValue()));
508 }
509
510 private void publish(List<MapEvent<String, byte[]>> events) {
511 listeners.values().forEach(commit -> commit.session()
512 .publish(AtomixConsistentTreeMap.CHANGE_SUBJECT, events));
513 }
514
515 @Override
516 public void register(ServerSession session) {
517 }
518
519 @Override
520 public void unregister(ServerSession session) {
521 closeListener(session.id());
522 }
523
524 @Override
525 public void expire(ServerSession session) {
526 closeListener(session.id());
527 }
528
529 @Override
530 public void close(ServerSession session) {
531 closeListener(session.id());
532 }
533
534 private void closeListener(Long sessionId) {
535 Commit<? extends Listen> commit = listeners.remove(sessionId);
536 if (commit != null) {
537 commit.close();
538 }
539 }
540
541 private interface TreeMapEntryValue {
542
543 byte[] value();
544
545 long version();
546
547 void discard();
548 }
549
550 private class NonTransactionalCommit implements TreeMapEntryValue {
551 private final long version;
552 private final Commit<? extends UpdateAndGet> commit;
553
554 public NonTransactionalCommit(long version,
555 Commit<? extends UpdateAndGet> commit) {
556 this.version = version;
557 this.commit = commit;
558 }
559
560 @Override
561 public byte[] value() {
562 return commit.operation().value();
563 }
564
565 @Override
566 public long version() {
567 return version;
568 }
569
570 @Override
571 public void discard() {
572 commit.close();
573 }
574 }
575}