blob: 1730fe843e6e5e76d78e4cf939117ee5a541a738 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.store.primitives.resources.impl;
18
Madan Jampani79924fa2016-09-13 13:57:03 -070019import java.util.HashMap;
20import java.util.List;
21import java.util.Map;
Madan Jampani4c8e3fe2016-09-16 16:20:28 -070022import java.util.Optional;
Madan Jampani79924fa2016-09-13 13:57:03 -070023import java.util.concurrent.CompletableFuture;
24import java.util.concurrent.Executor;
25
Jordan Halterman2bf177c2017-06-29 01:49:08 -070026import com.google.common.util.concurrent.MoreExecutors;
27import io.atomix.protocols.raft.proxy.RaftProxy;
28import org.onlab.util.KryoNamespace;
Madan Jampani79924fa2016-09-13 13:57:03 -070029import org.onlab.util.Match;
30import org.onlab.util.Tools;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070031import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Get;
32import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GetChildren;
33import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Listen;
34import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Unlisten;
35import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Update;
36import org.onosproject.store.serializers.KryoNamespaces;
Madan Jampani79924fa2016-09-13 13:57:03 -070037import org.onosproject.store.service.AsyncDocumentTree;
38import org.onosproject.store.service.DocumentPath;
39import org.onosproject.store.service.DocumentTreeEvent;
40import org.onosproject.store.service.DocumentTreeListener;
41import org.onosproject.store.service.IllegalDocumentModificationException;
42import org.onosproject.store.service.NoSuchDocumentPathException;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070043import org.onosproject.store.service.Serializer;
Madan Jampani79924fa2016-09-13 13:57:03 -070044import org.onosproject.store.service.Versioned;
45
Jordan Halterman2bf177c2017-06-29 01:49:08 -070046import static com.google.common.base.Preconditions.checkNotNull;
47import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeEvents.CHANGE;
48import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.ADD_LISTENER;
49import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.CLEAR;
50import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GET;
51import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GET_CHILDREN;
52import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.REMOVE_LISTENER;
53import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.UPDATE;
54import static org.onosproject.store.primitives.resources.impl.DocumentTreeUpdateResult.Status.ILLEGAL_MODIFICATION;
55import static org.onosproject.store.primitives.resources.impl.DocumentTreeUpdateResult.Status.INVALID_PATH;
56import static org.onosproject.store.primitives.resources.impl.DocumentTreeUpdateResult.Status.OK;
Madan Jampani79924fa2016-09-13 13:57:03 -070057
58/**
59 * Distributed resource providing the {@link AsyncDocumentTree} primitive.
60 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -070061public class AtomixDocumentTree extends AbstractRaftPrimitive implements AsyncDocumentTree<byte[]> {
62 private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
63 .register(KryoNamespaces.BASIC)
64 .register(AtomixDocumentTreeOperations.NAMESPACE)
65 .register(AtomixDocumentTreeEvents.NAMESPACE)
66 .build());
Madan Jampani79924fa2016-09-13 13:57:03 -070067
Madan Jampani98094222016-09-15 21:12:46 -070068 private final Map<DocumentTreeListener<byte[]>, InternalListener> eventListeners = new HashMap<>();
Madan Jampani79924fa2016-09-13 13:57:03 -070069
Jordan Halterman2bf177c2017-06-29 01:49:08 -070070 public AtomixDocumentTree(RaftProxy proxy) {
71 super(proxy);
72 proxy.addStateChangeListener(state -> {
73 if (state == RaftProxy.State.CONNECTED && isListening()) {
74 proxy.invoke(ADD_LISTENER, SERIALIZER::encode, new Listen());
75 }
Madan Jampani79924fa2016-09-13 13:57:03 -070076 });
Jordan Halterman2bf177c2017-06-29 01:49:08 -070077 proxy.addEventListener(CHANGE, SERIALIZER::decode, this::processTreeUpdates);
Madan Jampani79924fa2016-09-13 13:57:03 -070078 }
79
80 @Override
81 public Type primitiveType() {
82 return Type.DOCUMENT_TREE;
83 }
84
85 @Override
86 public CompletableFuture<Void> destroy() {
Jordan Halterman2bf177c2017-06-29 01:49:08 -070087 return proxy.invoke(CLEAR);
Madan Jampani79924fa2016-09-13 13:57:03 -070088 }
89
90 @Override
91 public DocumentPath root() {
92 return DocumentPath.from("root");
93 }
94
95 @Override
96 public CompletableFuture<Map<String, Versioned<byte[]>>> getChildren(DocumentPath path) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -070097 return proxy.invoke(GET_CHILDREN, SERIALIZER::encode, new GetChildren(checkNotNull(path)), SERIALIZER::decode);
Madan Jampani79924fa2016-09-13 13:57:03 -070098 }
99
100 @Override
101 public CompletableFuture<Versioned<byte[]>> get(DocumentPath path) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700102 return proxy.invoke(GET, SERIALIZER::encode, new Get(checkNotNull(path)), SERIALIZER::decode);
Madan Jampani79924fa2016-09-13 13:57:03 -0700103 }
104
105 @Override
106 public CompletableFuture<Versioned<byte[]>> set(DocumentPath path, byte[] value) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700107 return proxy.<Update, DocumentTreeUpdateResult<byte[]>>invoke(UPDATE,
108 SERIALIZER::encode,
109 new Update(checkNotNull(path), Optional.ofNullable(value), Match.any(), Match.any()),
110 SERIALIZER::decode)
Madan Jampani79924fa2016-09-13 13:57:03 -0700111 .thenCompose(result -> {
Madan Jampani86983282016-09-15 14:55:48 -0700112 if (result.status() == INVALID_PATH) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700113 return Tools.exceptionalFuture(new NoSuchDocumentPathException());
Madan Jampani86983282016-09-15 14:55:48 -0700114 } else if (result.status() == ILLEGAL_MODIFICATION) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700115 return Tools.exceptionalFuture(new IllegalDocumentModificationException());
116 } else {
117 return CompletableFuture.completedFuture(result);
118 }
119 }).thenApply(result -> result.oldValue());
120 }
121
122 @Override
123 public CompletableFuture<Boolean> create(DocumentPath path, byte[] value) {
Madan Jampani86983282016-09-15 14:55:48 -0700124 return createInternal(path, value)
125 .thenCompose(status -> {
126 if (status == ILLEGAL_MODIFICATION) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700127 return Tools.exceptionalFuture(new IllegalDocumentModificationException());
Madan Jampani79924fa2016-09-13 13:57:03 -0700128 }
Madan Jampani86983282016-09-15 14:55:48 -0700129 return CompletableFuture.completedFuture(true);
130 });
131 }
132
133 @Override
134 public CompletableFuture<Boolean> createRecursive(DocumentPath path, byte[] value) {
135 return createInternal(path, value)
136 .thenCompose(status -> {
137 if (status == ILLEGAL_MODIFICATION) {
Madan Jampani4c8e3fe2016-09-16 16:20:28 -0700138 return createRecursive(path.parent(), null)
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700139 .thenCompose(r -> createInternal(path, value).thenApply(v -> true));
Madan Jampani86983282016-09-15 14:55:48 -0700140 }
141 return CompletableFuture.completedFuture(status == OK);
142 });
Madan Jampani79924fa2016-09-13 13:57:03 -0700143 }
144
145 @Override
146 public CompletableFuture<Boolean> replace(DocumentPath path, byte[] newValue, long version) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700147 return proxy.<Update, DocumentTreeUpdateResult<byte[]>>invoke(UPDATE,
148 SERIALIZER::encode,
149 new Update(checkNotNull(path),
150 Optional.ofNullable(newValue),
151 Match.any(),
152 Match.ifValue(version)), SERIALIZER::decode)
Madan Jampani79924fa2016-09-13 13:57:03 -0700153 .thenApply(result -> result.updated());
154 }
155
156 @Override
157 public CompletableFuture<Boolean> replace(DocumentPath path, byte[] newValue, byte[] currentValue) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700158 return proxy.<Update, DocumentTreeUpdateResult<byte[]>>invoke(UPDATE,
159 SERIALIZER::encode,
160 new Update(checkNotNull(path),
161 Optional.ofNullable(newValue),
162 Match.ifValue(currentValue),
163 Match.any()),
164 SERIALIZER::decode)
Madan Jampani79924fa2016-09-13 13:57:03 -0700165 .thenCompose(result -> {
Madan Jampani86983282016-09-15 14:55:48 -0700166 if (result.status() == INVALID_PATH) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700167 return Tools.exceptionalFuture(new NoSuchDocumentPathException());
Madan Jampani86983282016-09-15 14:55:48 -0700168 } else if (result.status() == ILLEGAL_MODIFICATION) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700169 return Tools.exceptionalFuture(new IllegalDocumentModificationException());
170 } else {
171 return CompletableFuture.completedFuture(result);
172 }
173 }).thenApply(result -> result.updated());
174 }
175
176 @Override
177 public CompletableFuture<Versioned<byte[]>> removeNode(DocumentPath path) {
178 if (path.equals(DocumentPath.from("root"))) {
179 return Tools.exceptionalFuture(new IllegalDocumentModificationException());
180 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700181 return proxy.<Update, DocumentTreeUpdateResult<byte[]>>invoke(UPDATE,
182 SERIALIZER::encode,
183 new Update(checkNotNull(path), null, Match.any(), Match.ifNotNull()),
184 SERIALIZER::decode)
Madan Jampani79924fa2016-09-13 13:57:03 -0700185 .thenCompose(result -> {
Madan Jampani86983282016-09-15 14:55:48 -0700186 if (result.status() == INVALID_PATH) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700187 return Tools.exceptionalFuture(new NoSuchDocumentPathException());
Madan Jampani86983282016-09-15 14:55:48 -0700188 } else if (result.status() == ILLEGAL_MODIFICATION) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700189 return Tools.exceptionalFuture(new IllegalDocumentModificationException());
190 } else {
191 return CompletableFuture.completedFuture(result);
192 }
193 }).thenApply(result -> result.oldValue());
194 }
195
196 @Override
197 public CompletableFuture<Void> addListener(DocumentPath path, DocumentTreeListener<byte[]> listener) {
198 checkNotNull(path);
199 checkNotNull(listener);
Madan Jampani98094222016-09-15 21:12:46 -0700200 InternalListener internalListener = new InternalListener(path, listener, MoreExecutors.directExecutor());
Madan Jampani79924fa2016-09-13 13:57:03 -0700201 // TODO: Support API that takes an executor
Madan Jampani98094222016-09-15 21:12:46 -0700202 if (!eventListeners.containsKey(listener)) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700203 return proxy.invoke(ADD_LISTENER, SERIALIZER::encode, new Listen(path))
204 .thenRun(() -> eventListeners.put(listener, internalListener));
Madan Jampani79924fa2016-09-13 13:57:03 -0700205 }
Madan Jampani98094222016-09-15 21:12:46 -0700206 return CompletableFuture.completedFuture(null);
Madan Jampani79924fa2016-09-13 13:57:03 -0700207 }
208
209 @Override
210 public CompletableFuture<Void> removeListener(DocumentTreeListener<byte[]> listener) {
211 checkNotNull(listener);
Madan Jampani98094222016-09-15 21:12:46 -0700212 InternalListener internalListener = eventListeners.remove(listener);
213 if (internalListener != null && eventListeners.isEmpty()) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700214 return proxy.invoke(REMOVE_LISTENER, SERIALIZER::encode, new Unlisten(internalListener.path))
215 .thenApply(v -> null);
Madan Jampani79924fa2016-09-13 13:57:03 -0700216 }
217 return CompletableFuture.completedFuture(null);
218 }
219
Madan Jampani86983282016-09-15 14:55:48 -0700220 private CompletableFuture<DocumentTreeUpdateResult.Status> createInternal(DocumentPath path, byte[] value) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700221 return proxy.<Update, DocumentTreeUpdateResult<byte[]>>invoke(UPDATE,
222 SERIALIZER::encode,
223 new Update(checkNotNull(path), Optional.ofNullable(value), Match.any(), Match.ifNull()),
224 SERIALIZER::decode)
225 .thenApply(result -> result.status());
Madan Jampani86983282016-09-15 14:55:48 -0700226 }
227
Madan Jampani79924fa2016-09-13 13:57:03 -0700228 private boolean isListening() {
229 return !eventListeners.isEmpty();
230 }
231
232 private void processTreeUpdates(List<DocumentTreeEvent<byte[]>> events) {
Madan Jampani98094222016-09-15 21:12:46 -0700233 events.forEach(event -> eventListeners.values().forEach(listener -> listener.event(event)));
234 }
235
236 private class InternalListener implements DocumentTreeListener<byte[]> {
237
238 private final DocumentPath path;
239 private final DocumentTreeListener<byte[]> listener;
240 private final Executor executor;
241
242 public InternalListener(DocumentPath path, DocumentTreeListener<byte[]> listener, Executor executor) {
243 this.path = path;
244 this.listener = listener;
245 this.executor = executor;
246 }
247
248 @Override
249 public void event(DocumentTreeEvent<byte[]> event) {
250 if (event.path().isDescendentOf(path)) {
251 executor.execute(() -> listener.event(event));
252 }
253 }
Madan Jampani79924fa2016-09-13 13:57:03 -0700254 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700255}