blob: eadaa7e620c6b8ac967f7b00b4aac5be709ca8bd [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.store.primitives.resources.impl;
18
Madan Jampani79924fa2016-09-13 13:57:03 -070019import java.util.HashMap;
20import java.util.List;
21import java.util.Map;
Madan Jampani4c8e3fe2016-09-16 16:20:28 -070022import java.util.Optional;
Madan Jampani79924fa2016-09-13 13:57:03 -070023import java.util.concurrent.CompletableFuture;
24import java.util.concurrent.Executor;
25
Jordan Halterman2bf177c2017-06-29 01:49:08 -070026import com.google.common.util.concurrent.MoreExecutors;
27import io.atomix.protocols.raft.proxy.RaftProxy;
28import org.onlab.util.KryoNamespace;
Madan Jampani79924fa2016-09-13 13:57:03 -070029import org.onlab.util.Match;
30import org.onlab.util.Tools;
Sithara Punnassery61a80252017-08-07 11:16:08 -070031import org.onosproject.store.primitives.NodeUpdate;
32import org.onosproject.store.primitives.TransactionId;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070033import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Get;
34import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GetChildren;
35import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Listen;
36import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Unlisten;
37import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Update;
Sithara Punnassery61a80252017-08-07 11:16:08 -070038import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionBegin;
39import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionPrepare;
40import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionPrepareAndCommit;
41import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionCommit;
42import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.TransactionRollback;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070043import org.onosproject.store.serializers.KryoNamespaces;
Madan Jampani79924fa2016-09-13 13:57:03 -070044import org.onosproject.store.service.AsyncDocumentTree;
45import org.onosproject.store.service.DocumentPath;
46import org.onosproject.store.service.DocumentTreeEvent;
47import org.onosproject.store.service.DocumentTreeListener;
48import org.onosproject.store.service.IllegalDocumentModificationException;
49import org.onosproject.store.service.NoSuchDocumentPathException;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070050import org.onosproject.store.service.Serializer;
Sithara Punnassery61a80252017-08-07 11:16:08 -070051import org.onosproject.store.service.TransactionLog;
52import org.onosproject.store.service.Version;
Madan Jampani79924fa2016-09-13 13:57:03 -070053import org.onosproject.store.service.Versioned;
54
Jordan Halterman2bf177c2017-06-29 01:49:08 -070055import static com.google.common.base.Preconditions.checkNotNull;
56import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeEvents.CHANGE;
Sithara Punnassery61a80252017-08-07 11:16:08 -070057import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.BEGIN;
58import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.PREPARE;
59import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.PREPARE_AND_COMMIT;
60import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.COMMIT;
61import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.ROLLBACK;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070062import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.CLEAR;
63import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GET;
64import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GET_CHILDREN;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070065import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.UPDATE;
Sithara Punnassery61a80252017-08-07 11:16:08 -070066import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.ADD_LISTENER;
67import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.REMOVE_LISTENER;
Jordan Haltermane853d032017-08-01 15:10:28 -070068import static org.onosproject.store.primitives.resources.impl.DocumentTreeResult.Status.ILLEGAL_MODIFICATION;
69import static org.onosproject.store.primitives.resources.impl.DocumentTreeResult.Status.INVALID_PATH;
70import static org.onosproject.store.primitives.resources.impl.DocumentTreeResult.Status.OK;
Madan Jampani79924fa2016-09-13 13:57:03 -070071
72/**
73 * Distributed resource providing the {@link AsyncDocumentTree} primitive.
74 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -070075public class AtomixDocumentTree extends AbstractRaftPrimitive implements AsyncDocumentTree<byte[]> {
76 private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
77 .register(KryoNamespaces.BASIC)
78 .register(AtomixDocumentTreeOperations.NAMESPACE)
79 .register(AtomixDocumentTreeEvents.NAMESPACE)
80 .build());
Madan Jampani79924fa2016-09-13 13:57:03 -070081
Madan Jampani98094222016-09-15 21:12:46 -070082 private final Map<DocumentTreeListener<byte[]>, InternalListener> eventListeners = new HashMap<>();
Madan Jampani79924fa2016-09-13 13:57:03 -070083
Jordan Halterman2bf177c2017-06-29 01:49:08 -070084 public AtomixDocumentTree(RaftProxy proxy) {
85 super(proxy);
86 proxy.addStateChangeListener(state -> {
87 if (state == RaftProxy.State.CONNECTED && isListening()) {
88 proxy.invoke(ADD_LISTENER, SERIALIZER::encode, new Listen());
89 }
Madan Jampani79924fa2016-09-13 13:57:03 -070090 });
Jordan Halterman2bf177c2017-06-29 01:49:08 -070091 proxy.addEventListener(CHANGE, SERIALIZER::decode, this::processTreeUpdates);
Madan Jampani79924fa2016-09-13 13:57:03 -070092 }
93
94 @Override
95 public Type primitiveType() {
96 return Type.DOCUMENT_TREE;
97 }
98
99 @Override
100 public CompletableFuture<Void> destroy() {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700101 return proxy.invoke(CLEAR);
Madan Jampani79924fa2016-09-13 13:57:03 -0700102 }
103
104 @Override
105 public DocumentPath root() {
Jordan Haltermancb1e02c2017-08-25 16:20:43 -0700106 return DocumentPath.ROOT;
Madan Jampani79924fa2016-09-13 13:57:03 -0700107 }
108
109 @Override
110 public CompletableFuture<Map<String, Versioned<byte[]>>> getChildren(DocumentPath path) {
Jordan Haltermane853d032017-08-01 15:10:28 -0700111 return proxy.<GetChildren, DocumentTreeResult<Map<String, Versioned<byte[]>>>>invoke(
112 GET_CHILDREN,
113 SERIALIZER::encode,
114 new GetChildren(checkNotNull(path)),
115 SERIALIZER::decode)
116 .thenCompose(result -> {
117 if (result.status() == INVALID_PATH) {
118 return Tools.exceptionalFuture(new NoSuchDocumentPathException());
119 } else if (result.status() == ILLEGAL_MODIFICATION) {
120 return Tools.exceptionalFuture(new IllegalDocumentModificationException());
121 } else {
122 return CompletableFuture.completedFuture(result);
123 }
124 }).thenApply(result -> result.result());
Madan Jampani79924fa2016-09-13 13:57:03 -0700125 }
126
127 @Override
128 public CompletableFuture<Versioned<byte[]>> get(DocumentPath path) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700129 return proxy.invoke(GET, SERIALIZER::encode, new Get(checkNotNull(path)), SERIALIZER::decode);
Madan Jampani79924fa2016-09-13 13:57:03 -0700130 }
131
132 @Override
133 public CompletableFuture<Versioned<byte[]>> set(DocumentPath path, byte[] value) {
Jordan Haltermane853d032017-08-01 15:10:28 -0700134 return proxy.<Update, DocumentTreeResult<Versioned<byte[]>>>invoke(UPDATE,
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700135 SERIALIZER::encode,
136 new Update(checkNotNull(path), Optional.ofNullable(value), Match.any(), Match.any()),
137 SERIALIZER::decode)
Madan Jampani79924fa2016-09-13 13:57:03 -0700138 .thenCompose(result -> {
Madan Jampani86983282016-09-15 14:55:48 -0700139 if (result.status() == INVALID_PATH) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700140 return Tools.exceptionalFuture(new NoSuchDocumentPathException());
Madan Jampani86983282016-09-15 14:55:48 -0700141 } else if (result.status() == ILLEGAL_MODIFICATION) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700142 return Tools.exceptionalFuture(new IllegalDocumentModificationException());
143 } else {
144 return CompletableFuture.completedFuture(result);
145 }
Jordan Haltermane853d032017-08-01 15:10:28 -0700146 }).thenApply(result -> result.result());
Madan Jampani79924fa2016-09-13 13:57:03 -0700147 }
148
149 @Override
150 public CompletableFuture<Boolean> create(DocumentPath path, byte[] value) {
Madan Jampani86983282016-09-15 14:55:48 -0700151 return createInternal(path, value)
152 .thenCompose(status -> {
153 if (status == ILLEGAL_MODIFICATION) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700154 return Tools.exceptionalFuture(new IllegalDocumentModificationException());
Madan Jampani79924fa2016-09-13 13:57:03 -0700155 }
Madan Jampani86983282016-09-15 14:55:48 -0700156 return CompletableFuture.completedFuture(true);
157 });
158 }
159
160 @Override
161 public CompletableFuture<Boolean> createRecursive(DocumentPath path, byte[] value) {
162 return createInternal(path, value)
163 .thenCompose(status -> {
164 if (status == ILLEGAL_MODIFICATION) {
Madan Jampani4c8e3fe2016-09-16 16:20:28 -0700165 return createRecursive(path.parent(), null)
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700166 .thenCompose(r -> createInternal(path, value).thenApply(v -> true));
Madan Jampani86983282016-09-15 14:55:48 -0700167 }
168 return CompletableFuture.completedFuture(status == OK);
169 });
Madan Jampani79924fa2016-09-13 13:57:03 -0700170 }
171
172 @Override
173 public CompletableFuture<Boolean> replace(DocumentPath path, byte[] newValue, long version) {
Jordan Haltermane853d032017-08-01 15:10:28 -0700174 return proxy.<Update, DocumentTreeResult<byte[]>>invoke(UPDATE,
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700175 SERIALIZER::encode,
176 new Update(checkNotNull(path),
177 Optional.ofNullable(newValue),
178 Match.any(),
179 Match.ifValue(version)), SERIALIZER::decode)
Madan Jampani79924fa2016-09-13 13:57:03 -0700180 .thenApply(result -> result.updated());
181 }
182
183 @Override
184 public CompletableFuture<Boolean> replace(DocumentPath path, byte[] newValue, byte[] currentValue) {
Jordan Haltermane853d032017-08-01 15:10:28 -0700185 return proxy.<Update, DocumentTreeResult<byte[]>>invoke(UPDATE,
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700186 SERIALIZER::encode,
187 new Update(checkNotNull(path),
188 Optional.ofNullable(newValue),
189 Match.ifValue(currentValue),
190 Match.any()),
191 SERIALIZER::decode)
Madan Jampani79924fa2016-09-13 13:57:03 -0700192 .thenCompose(result -> {
Madan Jampani86983282016-09-15 14:55:48 -0700193 if (result.status() == INVALID_PATH) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700194 return Tools.exceptionalFuture(new NoSuchDocumentPathException());
Madan Jampani86983282016-09-15 14:55:48 -0700195 } else if (result.status() == ILLEGAL_MODIFICATION) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700196 return Tools.exceptionalFuture(new IllegalDocumentModificationException());
197 } else {
198 return CompletableFuture.completedFuture(result);
199 }
200 }).thenApply(result -> result.updated());
201 }
202
203 @Override
204 public CompletableFuture<Versioned<byte[]>> removeNode(DocumentPath path) {
205 if (path.equals(DocumentPath.from("root"))) {
206 return Tools.exceptionalFuture(new IllegalDocumentModificationException());
207 }
Jordan Haltermane853d032017-08-01 15:10:28 -0700208 return proxy.<Update, DocumentTreeResult<Versioned<byte[]>>>invoke(UPDATE,
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700209 SERIALIZER::encode,
210 new Update(checkNotNull(path), null, Match.any(), Match.ifNotNull()),
211 SERIALIZER::decode)
Madan Jampani79924fa2016-09-13 13:57:03 -0700212 .thenCompose(result -> {
Madan Jampani86983282016-09-15 14:55:48 -0700213 if (result.status() == INVALID_PATH) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700214 return Tools.exceptionalFuture(new NoSuchDocumentPathException());
Madan Jampani86983282016-09-15 14:55:48 -0700215 } else if (result.status() == ILLEGAL_MODIFICATION) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700216 return Tools.exceptionalFuture(new IllegalDocumentModificationException());
217 } else {
218 return CompletableFuture.completedFuture(result);
219 }
Jordan Haltermane853d032017-08-01 15:10:28 -0700220 }).thenApply(result -> result.result());
Madan Jampani79924fa2016-09-13 13:57:03 -0700221 }
222
223 @Override
224 public CompletableFuture<Void> addListener(DocumentPath path, DocumentTreeListener<byte[]> listener) {
225 checkNotNull(path);
226 checkNotNull(listener);
Madan Jampani98094222016-09-15 21:12:46 -0700227 InternalListener internalListener = new InternalListener(path, listener, MoreExecutors.directExecutor());
Madan Jampani79924fa2016-09-13 13:57:03 -0700228 // TODO: Support API that takes an executor
Madan Jampani98094222016-09-15 21:12:46 -0700229 if (!eventListeners.containsKey(listener)) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700230 return proxy.invoke(ADD_LISTENER, SERIALIZER::encode, new Listen(path))
231 .thenRun(() -> eventListeners.put(listener, internalListener));
Madan Jampani79924fa2016-09-13 13:57:03 -0700232 }
Madan Jampani98094222016-09-15 21:12:46 -0700233 return CompletableFuture.completedFuture(null);
Madan Jampani79924fa2016-09-13 13:57:03 -0700234 }
235
236 @Override
237 public CompletableFuture<Void> removeListener(DocumentTreeListener<byte[]> listener) {
238 checkNotNull(listener);
Madan Jampani98094222016-09-15 21:12:46 -0700239 InternalListener internalListener = eventListeners.remove(listener);
240 if (internalListener != null && eventListeners.isEmpty()) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700241 return proxy.invoke(REMOVE_LISTENER, SERIALIZER::encode, new Unlisten(internalListener.path))
242 .thenApply(v -> null);
Madan Jampani79924fa2016-09-13 13:57:03 -0700243 }
244 return CompletableFuture.completedFuture(null);
245 }
246
Jordan Haltermane853d032017-08-01 15:10:28 -0700247 private CompletableFuture<DocumentTreeResult.Status> createInternal(DocumentPath path, byte[] value) {
248 return proxy.<Update, DocumentTreeResult<byte[]>>invoke(UPDATE,
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700249 SERIALIZER::encode,
250 new Update(checkNotNull(path), Optional.ofNullable(value), Match.any(), Match.ifNull()),
251 SERIALIZER::decode)
252 .thenApply(result -> result.status());
Madan Jampani86983282016-09-15 14:55:48 -0700253 }
254
Madan Jampani79924fa2016-09-13 13:57:03 -0700255 private boolean isListening() {
256 return !eventListeners.isEmpty();
257 }
258
259 private void processTreeUpdates(List<DocumentTreeEvent<byte[]>> events) {
Madan Jampani98094222016-09-15 21:12:46 -0700260 events.forEach(event -> eventListeners.values().forEach(listener -> listener.event(event)));
261 }
262
Sithara Punnassery61a80252017-08-07 11:16:08 -0700263 @Override
264 public CompletableFuture<Version> begin(TransactionId transactionId) {
265 return proxy.<TransactionBegin, Long>invoke(
266 BEGIN,
267 SERIALIZER::encode,
268 new TransactionBegin(transactionId),
269 SERIALIZER::decode)
270 .thenApply(Version::new);
271 }
272
273 @Override
274 public CompletableFuture<Boolean> prepare(TransactionLog<NodeUpdate<byte[]>> transactionLog) {
275 return proxy.<TransactionPrepare, PrepareResult>invoke(
276 PREPARE,
277 SERIALIZER::encode,
278 new TransactionPrepare(transactionLog),
279 SERIALIZER::decode)
280 .thenApply(v -> v == PrepareResult.OK);
281 }
282
283 @Override
284 public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<NodeUpdate<byte[]>> transactionLog) {
285 return proxy.<TransactionPrepareAndCommit, PrepareResult>invoke(
286 PREPARE_AND_COMMIT,
287 SERIALIZER::encode,
288 new TransactionPrepareAndCommit(transactionLog),
289 SERIALIZER::decode)
290 .thenApply(v -> v == PrepareResult.OK);
291 }
292
293 @Override
294 public CompletableFuture<Void> commit(TransactionId transactionId) {
295 return proxy.<TransactionCommit, CommitResult>invoke(
296 COMMIT,
297 SERIALIZER::encode,
298 new TransactionCommit(transactionId),
299 SERIALIZER::decode)
300 .thenApply(v -> null);
301 }
302
303 @Override
304 public CompletableFuture<Void> rollback(TransactionId transactionId) {
305 return proxy.invoke(
306 ROLLBACK,
307 SERIALIZER::encode,
308 new TransactionRollback(transactionId),
309 SERIALIZER::decode)
310 .thenApply(v -> null);
311 }
312
Madan Jampani98094222016-09-15 21:12:46 -0700313 private class InternalListener implements DocumentTreeListener<byte[]> {
314
315 private final DocumentPath path;
316 private final DocumentTreeListener<byte[]> listener;
317 private final Executor executor;
318
319 public InternalListener(DocumentPath path, DocumentTreeListener<byte[]> listener, Executor executor) {
320 this.path = path;
321 this.listener = listener;
322 this.executor = executor;
323 }
324
325 @Override
326 public void event(DocumentTreeEvent<byte[]> event) {
327 if (event.path().isDescendentOf(path)) {
328 executor.execute(() -> listener.event(event));
329 }
330 }
Madan Jampani79924fa2016-09-13 13:57:03 -0700331 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700332}