blob: 99f8df6ff5c4178a40544d821ebb23e0502a2bdc [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.store.primitives.resources.impl;
18
Madan Jampani79924fa2016-09-13 13:57:03 -070019import java.util.HashMap;
20import java.util.List;
21import java.util.Map;
Madan Jampani4c8e3fe2016-09-16 16:20:28 -070022import java.util.Optional;
Madan Jampani79924fa2016-09-13 13:57:03 -070023import java.util.concurrent.CompletableFuture;
24import java.util.concurrent.Executor;
25
Jordan Halterman2bf177c2017-06-29 01:49:08 -070026import com.google.common.util.concurrent.MoreExecutors;
27import io.atomix.protocols.raft.proxy.RaftProxy;
28import org.onlab.util.KryoNamespace;
Madan Jampani79924fa2016-09-13 13:57:03 -070029import org.onlab.util.Match;
30import org.onlab.util.Tools;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070031import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Get;
32import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GetChildren;
33import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Listen;
34import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Unlisten;
35import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Update;
36import org.onosproject.store.serializers.KryoNamespaces;
Madan Jampani79924fa2016-09-13 13:57:03 -070037import org.onosproject.store.service.AsyncDocumentTree;
38import org.onosproject.store.service.DocumentPath;
39import org.onosproject.store.service.DocumentTreeEvent;
40import org.onosproject.store.service.DocumentTreeListener;
41import org.onosproject.store.service.IllegalDocumentModificationException;
42import org.onosproject.store.service.NoSuchDocumentPathException;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070043import org.onosproject.store.service.Serializer;
Madan Jampani79924fa2016-09-13 13:57:03 -070044import org.onosproject.store.service.Versioned;
45
Jordan Halterman2bf177c2017-06-29 01:49:08 -070046import static com.google.common.base.Preconditions.checkNotNull;
47import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeEvents.CHANGE;
48import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.ADD_LISTENER;
49import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.CLEAR;
50import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GET;
51import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GET_CHILDREN;
52import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.REMOVE_LISTENER;
53import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.UPDATE;
Jordan Haltermane853d032017-08-01 15:10:28 -070054import static org.onosproject.store.primitives.resources.impl.DocumentTreeResult.Status.ILLEGAL_MODIFICATION;
55import static org.onosproject.store.primitives.resources.impl.DocumentTreeResult.Status.INVALID_PATH;
56import static org.onosproject.store.primitives.resources.impl.DocumentTreeResult.Status.OK;
Madan Jampani79924fa2016-09-13 13:57:03 -070057
58/**
59 * Distributed resource providing the {@link AsyncDocumentTree} primitive.
60 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -070061public class AtomixDocumentTree extends AbstractRaftPrimitive implements AsyncDocumentTree<byte[]> {
62 private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
63 .register(KryoNamespaces.BASIC)
64 .register(AtomixDocumentTreeOperations.NAMESPACE)
65 .register(AtomixDocumentTreeEvents.NAMESPACE)
66 .build());
Madan Jampani79924fa2016-09-13 13:57:03 -070067
Madan Jampani98094222016-09-15 21:12:46 -070068 private final Map<DocumentTreeListener<byte[]>, InternalListener> eventListeners = new HashMap<>();
Madan Jampani79924fa2016-09-13 13:57:03 -070069
Jordan Halterman2bf177c2017-06-29 01:49:08 -070070 public AtomixDocumentTree(RaftProxy proxy) {
71 super(proxy);
72 proxy.addStateChangeListener(state -> {
73 if (state == RaftProxy.State.CONNECTED && isListening()) {
74 proxy.invoke(ADD_LISTENER, SERIALIZER::encode, new Listen());
75 }
Madan Jampani79924fa2016-09-13 13:57:03 -070076 });
Jordan Halterman2bf177c2017-06-29 01:49:08 -070077 proxy.addEventListener(CHANGE, SERIALIZER::decode, this::processTreeUpdates);
Madan Jampani79924fa2016-09-13 13:57:03 -070078 }
79
80 @Override
81 public Type primitiveType() {
82 return Type.DOCUMENT_TREE;
83 }
84
85 @Override
86 public CompletableFuture<Void> destroy() {
Jordan Halterman2bf177c2017-06-29 01:49:08 -070087 return proxy.invoke(CLEAR);
Madan Jampani79924fa2016-09-13 13:57:03 -070088 }
89
90 @Override
91 public DocumentPath root() {
92 return DocumentPath.from("root");
93 }
94
95 @Override
96 public CompletableFuture<Map<String, Versioned<byte[]>>> getChildren(DocumentPath path) {
Jordan Haltermane853d032017-08-01 15:10:28 -070097 return proxy.<GetChildren, DocumentTreeResult<Map<String, Versioned<byte[]>>>>invoke(
98 GET_CHILDREN,
99 SERIALIZER::encode,
100 new GetChildren(checkNotNull(path)),
101 SERIALIZER::decode)
102 .thenCompose(result -> {
103 if (result.status() == INVALID_PATH) {
104 return Tools.exceptionalFuture(new NoSuchDocumentPathException());
105 } else if (result.status() == ILLEGAL_MODIFICATION) {
106 return Tools.exceptionalFuture(new IllegalDocumentModificationException());
107 } else {
108 return CompletableFuture.completedFuture(result);
109 }
110 }).thenApply(result -> result.result());
Madan Jampani79924fa2016-09-13 13:57:03 -0700111 }
112
113 @Override
114 public CompletableFuture<Versioned<byte[]>> get(DocumentPath path) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700115 return proxy.invoke(GET, SERIALIZER::encode, new Get(checkNotNull(path)), SERIALIZER::decode);
Madan Jampani79924fa2016-09-13 13:57:03 -0700116 }
117
118 @Override
119 public CompletableFuture<Versioned<byte[]>> set(DocumentPath path, byte[] value) {
Jordan Haltermane853d032017-08-01 15:10:28 -0700120 return proxy.<Update, DocumentTreeResult<Versioned<byte[]>>>invoke(UPDATE,
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700121 SERIALIZER::encode,
122 new Update(checkNotNull(path), Optional.ofNullable(value), Match.any(), Match.any()),
123 SERIALIZER::decode)
Madan Jampani79924fa2016-09-13 13:57:03 -0700124 .thenCompose(result -> {
Madan Jampani86983282016-09-15 14:55:48 -0700125 if (result.status() == INVALID_PATH) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700126 return Tools.exceptionalFuture(new NoSuchDocumentPathException());
Madan Jampani86983282016-09-15 14:55:48 -0700127 } else if (result.status() == ILLEGAL_MODIFICATION) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700128 return Tools.exceptionalFuture(new IllegalDocumentModificationException());
129 } else {
130 return CompletableFuture.completedFuture(result);
131 }
Jordan Haltermane853d032017-08-01 15:10:28 -0700132 }).thenApply(result -> result.result());
Madan Jampani79924fa2016-09-13 13:57:03 -0700133 }
134
135 @Override
136 public CompletableFuture<Boolean> create(DocumentPath path, byte[] value) {
Madan Jampani86983282016-09-15 14:55:48 -0700137 return createInternal(path, value)
138 .thenCompose(status -> {
139 if (status == ILLEGAL_MODIFICATION) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700140 return Tools.exceptionalFuture(new IllegalDocumentModificationException());
Madan Jampani79924fa2016-09-13 13:57:03 -0700141 }
Madan Jampani86983282016-09-15 14:55:48 -0700142 return CompletableFuture.completedFuture(true);
143 });
144 }
145
146 @Override
147 public CompletableFuture<Boolean> createRecursive(DocumentPath path, byte[] value) {
148 return createInternal(path, value)
149 .thenCompose(status -> {
150 if (status == ILLEGAL_MODIFICATION) {
Madan Jampani4c8e3fe2016-09-16 16:20:28 -0700151 return createRecursive(path.parent(), null)
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700152 .thenCompose(r -> createInternal(path, value).thenApply(v -> true));
Madan Jampani86983282016-09-15 14:55:48 -0700153 }
154 return CompletableFuture.completedFuture(status == OK);
155 });
Madan Jampani79924fa2016-09-13 13:57:03 -0700156 }
157
158 @Override
159 public CompletableFuture<Boolean> replace(DocumentPath path, byte[] newValue, long version) {
Jordan Haltermane853d032017-08-01 15:10:28 -0700160 return proxy.<Update, DocumentTreeResult<byte[]>>invoke(UPDATE,
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700161 SERIALIZER::encode,
162 new Update(checkNotNull(path),
163 Optional.ofNullable(newValue),
164 Match.any(),
165 Match.ifValue(version)), SERIALIZER::decode)
Madan Jampani79924fa2016-09-13 13:57:03 -0700166 .thenApply(result -> result.updated());
167 }
168
169 @Override
170 public CompletableFuture<Boolean> replace(DocumentPath path, byte[] newValue, byte[] currentValue) {
Jordan Haltermane853d032017-08-01 15:10:28 -0700171 return proxy.<Update, DocumentTreeResult<byte[]>>invoke(UPDATE,
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700172 SERIALIZER::encode,
173 new Update(checkNotNull(path),
174 Optional.ofNullable(newValue),
175 Match.ifValue(currentValue),
176 Match.any()),
177 SERIALIZER::decode)
Madan Jampani79924fa2016-09-13 13:57:03 -0700178 .thenCompose(result -> {
Madan Jampani86983282016-09-15 14:55:48 -0700179 if (result.status() == INVALID_PATH) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700180 return Tools.exceptionalFuture(new NoSuchDocumentPathException());
Madan Jampani86983282016-09-15 14:55:48 -0700181 } else if (result.status() == ILLEGAL_MODIFICATION) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700182 return Tools.exceptionalFuture(new IllegalDocumentModificationException());
183 } else {
184 return CompletableFuture.completedFuture(result);
185 }
186 }).thenApply(result -> result.updated());
187 }
188
189 @Override
190 public CompletableFuture<Versioned<byte[]>> removeNode(DocumentPath path) {
191 if (path.equals(DocumentPath.from("root"))) {
192 return Tools.exceptionalFuture(new IllegalDocumentModificationException());
193 }
Jordan Haltermane853d032017-08-01 15:10:28 -0700194 return proxy.<Update, DocumentTreeResult<Versioned<byte[]>>>invoke(UPDATE,
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700195 SERIALIZER::encode,
196 new Update(checkNotNull(path), null, Match.any(), Match.ifNotNull()),
197 SERIALIZER::decode)
Madan Jampani79924fa2016-09-13 13:57:03 -0700198 .thenCompose(result -> {
Madan Jampani86983282016-09-15 14:55:48 -0700199 if (result.status() == INVALID_PATH) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700200 return Tools.exceptionalFuture(new NoSuchDocumentPathException());
Madan Jampani86983282016-09-15 14:55:48 -0700201 } else if (result.status() == ILLEGAL_MODIFICATION) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700202 return Tools.exceptionalFuture(new IllegalDocumentModificationException());
203 } else {
204 return CompletableFuture.completedFuture(result);
205 }
Jordan Haltermane853d032017-08-01 15:10:28 -0700206 }).thenApply(result -> result.result());
Madan Jampani79924fa2016-09-13 13:57:03 -0700207 }
208
209 @Override
210 public CompletableFuture<Void> addListener(DocumentPath path, DocumentTreeListener<byte[]> listener) {
211 checkNotNull(path);
212 checkNotNull(listener);
Madan Jampani98094222016-09-15 21:12:46 -0700213 InternalListener internalListener = new InternalListener(path, listener, MoreExecutors.directExecutor());
Madan Jampani79924fa2016-09-13 13:57:03 -0700214 // TODO: Support API that takes an executor
Madan Jampani98094222016-09-15 21:12:46 -0700215 if (!eventListeners.containsKey(listener)) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700216 return proxy.invoke(ADD_LISTENER, SERIALIZER::encode, new Listen(path))
217 .thenRun(() -> eventListeners.put(listener, internalListener));
Madan Jampani79924fa2016-09-13 13:57:03 -0700218 }
Madan Jampani98094222016-09-15 21:12:46 -0700219 return CompletableFuture.completedFuture(null);
Madan Jampani79924fa2016-09-13 13:57:03 -0700220 }
221
222 @Override
223 public CompletableFuture<Void> removeListener(DocumentTreeListener<byte[]> listener) {
224 checkNotNull(listener);
Madan Jampani98094222016-09-15 21:12:46 -0700225 InternalListener internalListener = eventListeners.remove(listener);
226 if (internalListener != null && eventListeners.isEmpty()) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700227 return proxy.invoke(REMOVE_LISTENER, SERIALIZER::encode, new Unlisten(internalListener.path))
228 .thenApply(v -> null);
Madan Jampani79924fa2016-09-13 13:57:03 -0700229 }
230 return CompletableFuture.completedFuture(null);
231 }
232
Jordan Haltermane853d032017-08-01 15:10:28 -0700233 private CompletableFuture<DocumentTreeResult.Status> createInternal(DocumentPath path, byte[] value) {
234 return proxy.<Update, DocumentTreeResult<byte[]>>invoke(UPDATE,
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700235 SERIALIZER::encode,
236 new Update(checkNotNull(path), Optional.ofNullable(value), Match.any(), Match.ifNull()),
237 SERIALIZER::decode)
238 .thenApply(result -> result.status());
Madan Jampani86983282016-09-15 14:55:48 -0700239 }
240
Madan Jampani79924fa2016-09-13 13:57:03 -0700241 private boolean isListening() {
242 return !eventListeners.isEmpty();
243 }
244
245 private void processTreeUpdates(List<DocumentTreeEvent<byte[]>> events) {
Madan Jampani98094222016-09-15 21:12:46 -0700246 events.forEach(event -> eventListeners.values().forEach(listener -> listener.event(event)));
247 }
248
249 private class InternalListener implements DocumentTreeListener<byte[]> {
250
251 private final DocumentPath path;
252 private final DocumentTreeListener<byte[]> listener;
253 private final Executor executor;
254
255 public InternalListener(DocumentPath path, DocumentTreeListener<byte[]> listener, Executor executor) {
256 this.path = path;
257 this.listener = listener;
258 this.executor = executor;
259 }
260
261 @Override
262 public void event(DocumentTreeEvent<byte[]> event) {
263 if (event.path().isDescendentOf(path)) {
264 executor.execute(() -> listener.event(event));
265 }
266 }
Madan Jampani79924fa2016-09-13 13:57:03 -0700267 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700268}