/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.store.primitives.resources.impl;
18
19import static com.google.common.base.Preconditions.checkNotNull;
Madan Jampani86983282016-09-15 14:55:48 -070020import static org.onosproject.store.primitives.resources.impl.DocumentTreeUpdateResult.Status.ILLEGAL_MODIFICATION;
21import static org.onosproject.store.primitives.resources.impl.DocumentTreeUpdateResult.Status.INVALID_PATH;
22import static org.onosproject.store.primitives.resources.impl.DocumentTreeUpdateResult.Status.OK;
Madan Jampani79924fa2016-09-13 13:57:03 -070023import io.atomix.copycat.client.CopycatClient;
24import io.atomix.resource.AbstractResource;
25import io.atomix.resource.ResourceTypeInfo;
26
27import java.util.HashMap;
28import java.util.List;
29import java.util.Map;
30import java.util.Properties;
31import java.util.concurrent.CompletableFuture;
32import java.util.concurrent.Executor;
33
34import org.onlab.util.Match;
35import org.onlab.util.Tools;
Madan Jampani79924fa2016-09-13 13:57:03 -070036import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Clear;
37import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Get;
38import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.GetChildren;
39import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Listen;
Madan Jampani98094222016-09-15 21:12:46 -070040import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Unlisten;
Madan Jampani79924fa2016-09-13 13:57:03 -070041import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Update;
42import org.onosproject.store.service.AsyncDocumentTree;
43import org.onosproject.store.service.DocumentPath;
44import org.onosproject.store.service.DocumentTreeEvent;
45import org.onosproject.store.service.DocumentTreeListener;
46import org.onosproject.store.service.IllegalDocumentModificationException;
47import org.onosproject.store.service.NoSuchDocumentPathException;
48import org.onosproject.store.service.Versioned;
49
50import com.google.common.util.concurrent.MoreExecutors;
51
52/**
53 * Distributed resource providing the {@link AsyncDocumentTree} primitive.
54 */
55@ResourceTypeInfo(id = -156, factory = AtomixDocumentTreeFactory.class)
56public class AtomixDocumentTree extends AbstractResource<AtomixDocumentTree>
57 implements AsyncDocumentTree<byte[]> {
58
Madan Jampani98094222016-09-15 21:12:46 -070059 private final Map<DocumentTreeListener<byte[]>, InternalListener> eventListeners = new HashMap<>();
Madan Jampani79924fa2016-09-13 13:57:03 -070060 public static final String CHANGE_SUBJECT = "changeEvents";
61
62 protected AtomixDocumentTree(CopycatClient client, Properties options) {
63 super(client, options);
64 }
65
66 @Override
67 public CompletableFuture<AtomixDocumentTree> open() {
68 return super.open().thenApply(result -> {
69 client.onStateChange(state -> {
70 if (state == CopycatClient.State.CONNECTED && isListening()) {
71 client.submit(new Listen());
72 }
73 });
74 client.onEvent(CHANGE_SUBJECT, this::processTreeUpdates);
75 return result;
76 });
77 }
78
79 @Override
80 public String name() {
81 return null;
82 }
83
84 @Override
85 public Type primitiveType() {
86 return Type.DOCUMENT_TREE;
87 }
88
89 @Override
90 public CompletableFuture<Void> destroy() {
91 return client.submit(new Clear());
92 }
93
94 @Override
95 public DocumentPath root() {
96 return DocumentPath.from("root");
97 }
98
99 @Override
100 public CompletableFuture<Map<String, Versioned<byte[]>>> getChildren(DocumentPath path) {
101 return client.submit(new GetChildren(checkNotNull(path)));
102 }
103
104 @Override
105 public CompletableFuture<Versioned<byte[]>> get(DocumentPath path) {
106 return client.submit(new Get(checkNotNull(path)));
107 }
108
109 @Override
110 public CompletableFuture<Versioned<byte[]>> set(DocumentPath path, byte[] value) {
111 return client.submit(new Update(checkNotNull(path), checkNotNull(value), Match.any(), Match.any()))
112 .thenCompose(result -> {
Madan Jampani86983282016-09-15 14:55:48 -0700113 if (result.status() == INVALID_PATH) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700114 return Tools.exceptionalFuture(new NoSuchDocumentPathException());
Madan Jampani86983282016-09-15 14:55:48 -0700115 } else if (result.status() == ILLEGAL_MODIFICATION) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700116 return Tools.exceptionalFuture(new IllegalDocumentModificationException());
117 } else {
118 return CompletableFuture.completedFuture(result);
119 }
120 }).thenApply(result -> result.oldValue());
121 }
122
123 @Override
124 public CompletableFuture<Boolean> create(DocumentPath path, byte[] value) {
Madan Jampani86983282016-09-15 14:55:48 -0700125 return createInternal(path, value)
126 .thenCompose(status -> {
127 if (status == ILLEGAL_MODIFICATION) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700128 return Tools.exceptionalFuture(new IllegalDocumentModificationException());
Madan Jampani79924fa2016-09-13 13:57:03 -0700129 }
Madan Jampani86983282016-09-15 14:55:48 -0700130 return CompletableFuture.completedFuture(true);
131 });
132 }
133
134 @Override
135 public CompletableFuture<Boolean> createRecursive(DocumentPath path, byte[] value) {
136 return createInternal(path, value)
137 .thenCompose(status -> {
138 if (status == ILLEGAL_MODIFICATION) {
139 return createRecursive(path.parent(), new byte[0])
140 .thenCompose(r -> createInternal(path, value).thenApply(v -> true));
141 }
142 return CompletableFuture.completedFuture(status == OK);
143 });
Madan Jampani79924fa2016-09-13 13:57:03 -0700144 }
145
146 @Override
147 public CompletableFuture<Boolean> replace(DocumentPath path, byte[] newValue, long version) {
148 return client.submit(new Update(checkNotNull(path), newValue, Match.any(), Match.ifValue(version)))
149 .thenApply(result -> result.updated());
150 }
151
152 @Override
153 public CompletableFuture<Boolean> replace(DocumentPath path, byte[] newValue, byte[] currentValue) {
154 return client.submit(new Update(checkNotNull(path), newValue, Match.ifValue(currentValue), Match.any()))
155 .thenCompose(result -> {
Madan Jampani86983282016-09-15 14:55:48 -0700156 if (result.status() == INVALID_PATH) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700157 return Tools.exceptionalFuture(new NoSuchDocumentPathException());
Madan Jampani86983282016-09-15 14:55:48 -0700158 } else if (result.status() == ILLEGAL_MODIFICATION) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700159 return Tools.exceptionalFuture(new IllegalDocumentModificationException());
160 } else {
161 return CompletableFuture.completedFuture(result);
162 }
163 }).thenApply(result -> result.updated());
164 }
165
166 @Override
167 public CompletableFuture<Versioned<byte[]>> removeNode(DocumentPath path) {
168 if (path.equals(DocumentPath.from("root"))) {
169 return Tools.exceptionalFuture(new IllegalDocumentModificationException());
170 }
171 return client.submit(new Update(checkNotNull(path), null, Match.ifNotNull(), Match.any()))
172 .thenCompose(result -> {
Madan Jampani86983282016-09-15 14:55:48 -0700173 if (result.status() == INVALID_PATH) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700174 return Tools.exceptionalFuture(new NoSuchDocumentPathException());
Madan Jampani86983282016-09-15 14:55:48 -0700175 } else if (result.status() == ILLEGAL_MODIFICATION) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700176 return Tools.exceptionalFuture(new IllegalDocumentModificationException());
177 } else {
178 return CompletableFuture.completedFuture(result);
179 }
180 }).thenApply(result -> result.oldValue());
181 }
182
183 @Override
184 public CompletableFuture<Void> addListener(DocumentPath path, DocumentTreeListener<byte[]> listener) {
185 checkNotNull(path);
186 checkNotNull(listener);
Madan Jampani98094222016-09-15 21:12:46 -0700187 InternalListener internalListener = new InternalListener(path, listener, MoreExecutors.directExecutor());
Madan Jampani79924fa2016-09-13 13:57:03 -0700188 // TODO: Support API that takes an executor
Madan Jampani98094222016-09-15 21:12:46 -0700189 if (!eventListeners.containsKey(listener)) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700190 return client.submit(new Listen(path))
Madan Jampani98094222016-09-15 21:12:46 -0700191 .thenRun(() -> eventListeners.put(listener, internalListener));
Madan Jampani79924fa2016-09-13 13:57:03 -0700192 }
Madan Jampani98094222016-09-15 21:12:46 -0700193 return CompletableFuture.completedFuture(null);
Madan Jampani79924fa2016-09-13 13:57:03 -0700194 }
195
196 @Override
197 public CompletableFuture<Void> removeListener(DocumentTreeListener<byte[]> listener) {
198 checkNotNull(listener);
Madan Jampani98094222016-09-15 21:12:46 -0700199 InternalListener internalListener = eventListeners.remove(listener);
200 if (internalListener != null && eventListeners.isEmpty()) {
201 return client.submit(new Unlisten(internalListener.path)).thenApply(v -> null);
Madan Jampani79924fa2016-09-13 13:57:03 -0700202 }
203 return CompletableFuture.completedFuture(null);
204 }
205
Madan Jampani86983282016-09-15 14:55:48 -0700206 private CompletableFuture<DocumentTreeUpdateResult.Status> createInternal(DocumentPath path, byte[] value) {
207 return client.submit(new Update(checkNotNull(path), checkNotNull(value), Match.ifNull(), Match.any()))
208 .thenApply(result -> result.status());
209 }
210
Madan Jampani79924fa2016-09-13 13:57:03 -0700211 private boolean isListening() {
212 return !eventListeners.isEmpty();
213 }
214
215 private void processTreeUpdates(List<DocumentTreeEvent<byte[]>> events) {
Madan Jampani98094222016-09-15 21:12:46 -0700216 events.forEach(event -> eventListeners.values().forEach(listener -> listener.event(event)));
217 }
218
219 private class InternalListener implements DocumentTreeListener<byte[]> {
220
221 private final DocumentPath path;
222 private final DocumentTreeListener<byte[]> listener;
223 private final Executor executor;
224
225 public InternalListener(DocumentPath path, DocumentTreeListener<byte[]> listener, Executor executor) {
226 this.path = path;
227 this.listener = listener;
228 this.executor = executor;
229 }
230
231 @Override
232 public void event(DocumentTreeEvent<byte[]> event) {
233 if (event.path().isDescendentOf(path)) {
234 executor.execute(() -> listener.event(event));
235 }
236 }
Madan Jampani79924fa2016-09-13 13:57:03 -0700237 }
238}