blob: 14346cef7ae70bb991bfec1d36cc25197c258094 [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.impl;
17
18import java.util.Map;
19import java.util.stream.Stream;
20
21import com.google.common.collect.Maps;
22import org.apache.commons.lang3.tuple.Pair;
23import org.onosproject.store.primitives.MapUpdate;
24import org.onosproject.store.service.ConsistentMap;
Jordan Halterman5f97a302017-04-26 23:41:31 -070025import org.onosproject.store.service.Version;
Jordan Halterman948d6592017-04-20 17:18:24 -070026import org.onosproject.store.service.Versioned;
27
28/**
29 * Repeatable read based map participant.
30 */
31public class DefaultTransactionalMapParticipant<K, V> extends TransactionalMapParticipant<K, V> {
32 private final Map<K, Versioned<V>> readCache = Maps.newConcurrentMap();
33
34 public DefaultTransactionalMapParticipant(
35 ConsistentMap<K, V> backingMap, Transaction<MapUpdate<K, V>> transaction) {
36 super(backingMap, transaction);
37 }
38
39 @Override
40 protected V read(K key) {
Jordan Halterman5f97a302017-04-26 23:41:31 -070041 Versioned<V> value = backingMap.getOrDefault(key, null);
42 readCache.put(key, value);
43 return value.value();
Jordan Halterman948d6592017-04-20 17:18:24 -070044 }
45
46 @Override
Jordan Halterman5f97a302017-04-26 23:41:31 -070047 public boolean hasPendingUpdates() {
48 return !writeCache.isEmpty() || !deleteSet.isEmpty();
49 }
50
51 @Override
52 protected Stream<MapUpdate<K, V>> records(Version lockVersion) {
53 return Stream.concat(deleteStream(), writeStream(lockVersion));
Jordan Halterman948d6592017-04-20 17:18:24 -070054 }
55
56 /**
57 * Returns a transaction record stream for deleted keys.
58 */
59 private Stream<MapUpdate<K, V>> deleteStream() {
60 return deleteSet.stream()
61 .map(key -> Pair.of(key, readCache.get(key)))
Jordan Halterman948d6592017-04-20 17:18:24 -070062 .map(e -> MapUpdate.<K, V>newBuilder()
63 .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
64 .withKey(e.getKey())
Jordan Halterman5f97a302017-04-26 23:41:31 -070065 .withVersion(e.getValue().version())
Jordan Halterman948d6592017-04-20 17:18:24 -070066 .build());
67 }
68
69 /**
70 * Returns a transaction record stream for updated keys.
71 */
Jordan Halterman5f97a302017-04-26 23:41:31 -070072 private Stream<MapUpdate<K, V>> writeStream(Version lockVersion) {
73 return writeCache.entrySet().stream()
74 .map(entry -> {
75 Versioned<V> original = readCache.get(entry.getKey());
76 return MapUpdate.<K, V>newBuilder()
77 .withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
78 .withKey(entry.getKey())
79 .withValue(entry.getValue())
80 .withVersion(Math.max(original.version(), lockVersion.value()))
81 .build();
82 });
Jordan Halterman948d6592017-04-20 17:18:24 -070083 }
84}