blob: 2d42d4f1395929c58fd5e814521ca6ae98cdc6af [file] [log] [blame]
Jordan Haltermancb1e02c2017-08-25 16:20:43 -07001/*
2 * Copyright 2017-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.impl;
17
18import java.util.Collection;
Yuta HIGUCHI47d96092017-11-17 14:05:26 -080019import java.util.List;
Jordan Haltermancb1e02c2017-08-25 16:20:43 -070020import java.util.Map;
21import java.util.Objects;
22import java.util.TreeMap;
23import java.util.concurrent.CompletableFuture;
24import java.util.stream.Collectors;
25
Yuta HIGUCHI47d96092017-11-17 14:05:26 -080026import com.google.common.collect.ImmutableList;
Jordan Haltermancb1e02c2017-08-25 16:20:43 -070027import com.google.common.collect.Maps;
28import org.onlab.util.Tools;
29import org.onosproject.cluster.PartitionId;
30import org.onosproject.store.primitives.NodeUpdate;
31import org.onosproject.store.primitives.TransactionId;
32import org.onosproject.store.service.AsyncDocumentTree;
33import org.onosproject.store.service.DocumentPath;
34import org.onosproject.store.service.DocumentTreeListener;
35import org.onosproject.store.service.NoSuchDocumentPathException;
36import org.onosproject.store.service.TransactionLog;
37import org.onosproject.store.service.Version;
38import org.onosproject.store.service.Versioned;
39
40import static com.google.common.base.Preconditions.checkNotNull;
41
42/**
43 * Partitioned asynchronous document tree.
44 */
45public class PartitionedAsyncDocumentTree<V> implements AsyncDocumentTree<V> {
46
47 private final String name;
48 private final TreeMap<PartitionId, AsyncDocumentTree<V>> partitions = Maps.newTreeMap();
49 private final Hasher<DocumentPath> pathHasher;
50
51 public PartitionedAsyncDocumentTree(
52 String name,
53 Map<PartitionId, AsyncDocumentTree<V>> partitions,
54 Hasher<DocumentPath> pathHasher) {
55 this.name = name;
56 this.partitions.putAll(checkNotNull(partitions));
57 this.pathHasher = checkNotNull(pathHasher);
58 }
59
60 @Override
61 public String name() {
62 return name;
63 }
64
65 @Override
66 public DocumentPath root() {
67 return DocumentPath.ROOT;
68 }
69
70 /**
71 * Returns the document tree (partition) to which the specified path maps.
72 *
73 * @param path path
74 * @return AsyncConsistentMap to which path maps
75 */
76 private AsyncDocumentTree<V> partition(DocumentPath path) {
77 return partitions.get(pathHasher.hash(path));
78 }
79
80 /**
81 * Returns all the constituent trees.
82 *
83 * @return collection of partitions.
84 */
85 private Collection<AsyncDocumentTree<V>> partitions() {
86 return partitions.values();
87 }
88
89 @Override
90 public CompletableFuture<Map<String, Versioned<V>>> getChildren(DocumentPath path) {
91 return Tools.allOf(partitions().stream()
92 .map(partition -> partition.getChildren(path).exceptionally(r -> null))
93 .collect(Collectors.toList())).thenApply(allChildren -> {
94 Map<String, Versioned<V>> children = Maps.newLinkedHashMap();
95 allChildren.stream().filter(Objects::nonNull).forEach(children::putAll);
96 return children;
97 });
98 }
99
100 @Override
101 public CompletableFuture<Versioned<V>> get(DocumentPath path) {
102 return partition(path).get(path);
103 }
104
105 @Override
106 public CompletableFuture<Versioned<V>> set(DocumentPath path, V value) {
107 return partition(path).set(path, value);
108 }
109
110 @Override
111 public CompletableFuture<Boolean> create(DocumentPath path, V value) {
Yuta HIGUCHIff9af3e2017-09-12 13:21:13 -0700112 if (path.parent() == null) {
113 // create value on root
114 return partition(path).createRecursive(path, value);
115 }
Jordan Haltermancb1e02c2017-08-25 16:20:43 -0700116 // TODO: This operation is not atomic
Yuta HIGUCHIff9af3e2017-09-12 13:21:13 -0700117 return partition(path.parent()).get(path.parent()).thenCompose(parentValue -> {
Jordan Haltermancb1e02c2017-08-25 16:20:43 -0700118 if (parentValue == null) {
Yuta HIGUCHI30161e72017-09-11 16:38:07 -0700119 return Tools.exceptionalFuture(new NoSuchDocumentPathException(String.valueOf(path.parent())));
Jordan Haltermancb1e02c2017-08-25 16:20:43 -0700120 } else {
Yuta HIGUCHIff9af3e2017-09-12 13:21:13 -0700121 // not atomic: parent did exist at some point, so moving forward
Jordan Haltermancb1e02c2017-08-25 16:20:43 -0700122 return partition(path).createRecursive(path, value);
123 }
124 });
125 }
126
127 @Override
128 public CompletableFuture<Boolean> createRecursive(DocumentPath path, V value) {
129 return partition(path).createRecursive(path, value);
130 }
131
132 @Override
133 public CompletableFuture<Boolean> replace(DocumentPath path, V newValue, long version) {
134 return partition(path).replace(path, newValue, version);
135 }
136
137 @Override
138 public CompletableFuture<Boolean> replace(DocumentPath path, V newValue, V currentValue) {
139 return partition(path).replace(path, newValue, currentValue);
140 }
141
142 @Override
143 public CompletableFuture<Versioned<V>> removeNode(DocumentPath path) {
144 return partition(path).removeNode(path);
145 }
146
147 @Override
148 public CompletableFuture<Void> addListener(DocumentPath path, DocumentTreeListener<V> listener) {
149 return CompletableFuture.allOf(partitions().stream()
150 .map(map -> map.addListener(path, listener))
151 .toArray(CompletableFuture[]::new));
152 }
153
154 @Override
155 public CompletableFuture<Void> removeListener(DocumentTreeListener<V> listener) {
156 return CompletableFuture.allOf(partitions().stream()
157 .map(map -> map.removeListener(listener))
158 .toArray(CompletableFuture[]::new));
159 }
160
161 @Override
162 public CompletableFuture<Version> begin(TransactionId transactionId) {
Yuta HIGUCHI47d96092017-11-17 14:05:26 -0800163 return partitions().stream()
164 .map(p -> p.begin(transactionId))
165 // returning lowest Version
166 .reduce((f1, f2) -> f1.thenCombine(f2, Tools::min))
167 .orElse(Tools.exceptionalFuture(new IllegalStateException("Empty partitions?")));
Jordan Haltermancb1e02c2017-08-25 16:20:43 -0700168 }
169
170 @Override
171 public CompletableFuture<Boolean> prepare(TransactionLog<NodeUpdate<V>> transactionLog) {
Yuta HIGUCHI47d96092017-11-17 14:05:26 -0800172 Map<AsyncDocumentTree<V>, List<NodeUpdate<V>>> perPart =
173 transactionLog.records().stream()
174 .collect(Collectors.groupingBy(nu -> partition(nu.path())));
175
176 // must walk all partitions to ensure empty TransactionLog will
177 // be issued against no-op partitions in order for commit to succeed
178 return partitions().stream()
179 .map(p -> p.prepare(new TransactionLog<>(transactionLog.transactionId(),
180 transactionLog.version(),
181 perPart.getOrDefault(p, ImmutableList.of()))))
182 .reduce((f1, f2) -> f1.thenCombine(f2, Boolean::logicalAnd))
183 .orElse(CompletableFuture.completedFuture(true));
Jordan Haltermancb1e02c2017-08-25 16:20:43 -0700184 }
185
186 @Override
187 public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<NodeUpdate<V>> transactionLog) {
Yuta HIGUCHI47d96092017-11-17 14:05:26 -0800188 // Note: cannot call prepareAndCommit on each partition,
189 // must check all partitions are prepare()-ed first.
190 return prepare(transactionLog)
191 .thenApply(prepOk -> {
192 if (prepOk) {
193 commit(transactionLog.transactionId());
194 return true;
195 } else {
196 return false;
197 }
198 });
199
Jordan Haltermancb1e02c2017-08-25 16:20:43 -0700200 }
201
202 @Override
203 public CompletableFuture<Void> commit(TransactionId transactionId) {
Yuta HIGUCHI47d96092017-11-17 14:05:26 -0800204 return CompletableFuture.allOf(partitions().stream()
205 .map(p -> p.commit(transactionId))
206 .toArray(CompletableFuture[]::new)
207 );
Jordan Haltermancb1e02c2017-08-25 16:20:43 -0700208 }
209
210 @Override
211 public CompletableFuture<Void> rollback(TransactionId transactionId) {
Yuta HIGUCHI47d96092017-11-17 14:05:26 -0800212 return CompletableFuture.allOf(partitions().stream()
213 .map(p -> p.rollback(transactionId))
214 .toArray(CompletableFuture[]::new)
215 );
Jordan Haltermancb1e02c2017-08-25 16:20:43 -0700216 }
217}