blob: 3875e5564f1a3c637f6387977eaff875bf0022f9 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.store.primitives.resources.impl;
18
19import static com.google.common.base.Preconditions.checkNotNull;
Madan Jampani86983282016-09-15 14:55:48 -070020import static org.onosproject.store.primitives.resources.impl.DocumentTreeUpdateResult.Status.ILLEGAL_MODIFICATION;
21import static org.onosproject.store.primitives.resources.impl.DocumentTreeUpdateResult.Status.INVALID_PATH;
22import static org.onosproject.store.primitives.resources.impl.DocumentTreeUpdateResult.Status.OK;
Madan Jampani79924fa2016-09-13 13:57:03 -070023import io.atomix.copycat.client.CopycatClient;
24import io.atomix.resource.AbstractResource;
25import io.atomix.resource.ResourceTypeInfo;
26
27import java.util.HashMap;
28import java.util.List;
29import java.util.Map;
Madan Jampani4c8e3fe2016-09-16 16:20:28 -070030import java.util.Optional;
Madan Jampani79924fa2016-09-13 13:57:03 -070031import java.util.Properties;
32import java.util.concurrent.CompletableFuture;
33import java.util.concurrent.Executor;
34
35import org.onlab.util.Match;
36import org.onlab.util.Tools;
Madan Jampani79924fa2016-09-13 13:57:03 -070037import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Clear;
38import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Get;
39import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.GetChildren;
40import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Listen;
Madan Jampani98094222016-09-15 21:12:46 -070041import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Unlisten;
Madan Jampani79924fa2016-09-13 13:57:03 -070042import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Update;
43import org.onosproject.store.service.AsyncDocumentTree;
44import org.onosproject.store.service.DocumentPath;
45import org.onosproject.store.service.DocumentTreeEvent;
46import org.onosproject.store.service.DocumentTreeListener;
47import org.onosproject.store.service.IllegalDocumentModificationException;
48import org.onosproject.store.service.NoSuchDocumentPathException;
49import org.onosproject.store.service.Versioned;
50
51import com.google.common.util.concurrent.MoreExecutors;
52
53/**
54 * Distributed resource providing the {@link AsyncDocumentTree} primitive.
55 */
56@ResourceTypeInfo(id = -156, factory = AtomixDocumentTreeFactory.class)
57public class AtomixDocumentTree extends AbstractResource<AtomixDocumentTree>
58 implements AsyncDocumentTree<byte[]> {
59
Madan Jampani98094222016-09-15 21:12:46 -070060 private final Map<DocumentTreeListener<byte[]>, InternalListener> eventListeners = new HashMap<>();
Madan Jampani79924fa2016-09-13 13:57:03 -070061 public static final String CHANGE_SUBJECT = "changeEvents";
62
63 protected AtomixDocumentTree(CopycatClient client, Properties options) {
64 super(client, options);
65 }
66
67 @Override
68 public CompletableFuture<AtomixDocumentTree> open() {
69 return super.open().thenApply(result -> {
70 client.onStateChange(state -> {
71 if (state == CopycatClient.State.CONNECTED && isListening()) {
72 client.submit(new Listen());
73 }
74 });
75 client.onEvent(CHANGE_SUBJECT, this::processTreeUpdates);
76 return result;
77 });
78 }
79
80 @Override
81 public String name() {
82 return null;
83 }
84
85 @Override
86 public Type primitiveType() {
87 return Type.DOCUMENT_TREE;
88 }
89
90 @Override
91 public CompletableFuture<Void> destroy() {
92 return client.submit(new Clear());
93 }
94
95 @Override
96 public DocumentPath root() {
97 return DocumentPath.from("root");
98 }
99
100 @Override
101 public CompletableFuture<Map<String, Versioned<byte[]>>> getChildren(DocumentPath path) {
102 return client.submit(new GetChildren(checkNotNull(path)));
103 }
104
105 @Override
106 public CompletableFuture<Versioned<byte[]>> get(DocumentPath path) {
107 return client.submit(new Get(checkNotNull(path)));
108 }
109
110 @Override
111 public CompletableFuture<Versioned<byte[]>> set(DocumentPath path, byte[] value) {
Madan Jampani4c8e3fe2016-09-16 16:20:28 -0700112 return client.submit(new Update(checkNotNull(path), Optional.ofNullable(value), Match.any(), Match.any()))
Madan Jampani79924fa2016-09-13 13:57:03 -0700113 .thenCompose(result -> {
Madan Jampani86983282016-09-15 14:55:48 -0700114 if (result.status() == INVALID_PATH) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700115 return Tools.exceptionalFuture(new NoSuchDocumentPathException());
Madan Jampani86983282016-09-15 14:55:48 -0700116 } else if (result.status() == ILLEGAL_MODIFICATION) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700117 return Tools.exceptionalFuture(new IllegalDocumentModificationException());
118 } else {
119 return CompletableFuture.completedFuture(result);
120 }
121 }).thenApply(result -> result.oldValue());
122 }
123
124 @Override
125 public CompletableFuture<Boolean> create(DocumentPath path, byte[] value) {
Madan Jampani86983282016-09-15 14:55:48 -0700126 return createInternal(path, value)
127 .thenCompose(status -> {
128 if (status == ILLEGAL_MODIFICATION) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700129 return Tools.exceptionalFuture(new IllegalDocumentModificationException());
Madan Jampani79924fa2016-09-13 13:57:03 -0700130 }
Madan Jampani86983282016-09-15 14:55:48 -0700131 return CompletableFuture.completedFuture(true);
132 });
133 }
134
135 @Override
136 public CompletableFuture<Boolean> createRecursive(DocumentPath path, byte[] value) {
137 return createInternal(path, value)
138 .thenCompose(status -> {
139 if (status == ILLEGAL_MODIFICATION) {
Madan Jampani4c8e3fe2016-09-16 16:20:28 -0700140 return createRecursive(path.parent(), null)
Madan Jampani86983282016-09-15 14:55:48 -0700141 .thenCompose(r -> createInternal(path, value).thenApply(v -> true));
142 }
143 return CompletableFuture.completedFuture(status == OK);
144 });
Madan Jampani79924fa2016-09-13 13:57:03 -0700145 }
146
147 @Override
148 public CompletableFuture<Boolean> replace(DocumentPath path, byte[] newValue, long version) {
Madan Jampani4c8e3fe2016-09-16 16:20:28 -0700149 return client.submit(new Update(checkNotNull(path),
150 Optional.ofNullable(newValue),
151 Match.any(),
152 Match.ifValue(version)))
Madan Jampani79924fa2016-09-13 13:57:03 -0700153 .thenApply(result -> result.updated());
154 }
155
156 @Override
157 public CompletableFuture<Boolean> replace(DocumentPath path, byte[] newValue, byte[] currentValue) {
Madan Jampani4c8e3fe2016-09-16 16:20:28 -0700158 return client.submit(new Update(checkNotNull(path),
159 Optional.ofNullable(newValue),
160 Match.ifValue(currentValue),
161 Match.any()))
Madan Jampani79924fa2016-09-13 13:57:03 -0700162 .thenCompose(result -> {
Madan Jampani86983282016-09-15 14:55:48 -0700163 if (result.status() == INVALID_PATH) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700164 return Tools.exceptionalFuture(new NoSuchDocumentPathException());
Madan Jampani86983282016-09-15 14:55:48 -0700165 } else if (result.status() == ILLEGAL_MODIFICATION) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700166 return Tools.exceptionalFuture(new IllegalDocumentModificationException());
167 } else {
168 return CompletableFuture.completedFuture(result);
169 }
170 }).thenApply(result -> result.updated());
171 }
172
173 @Override
174 public CompletableFuture<Versioned<byte[]>> removeNode(DocumentPath path) {
175 if (path.equals(DocumentPath.from("root"))) {
176 return Tools.exceptionalFuture(new IllegalDocumentModificationException());
177 }
Madan Jampani4c8e3fe2016-09-16 16:20:28 -0700178 return client.submit(new Update(checkNotNull(path), null, Match.any(), Match.ifNotNull()))
Madan Jampani79924fa2016-09-13 13:57:03 -0700179 .thenCompose(result -> {
Madan Jampani86983282016-09-15 14:55:48 -0700180 if (result.status() == INVALID_PATH) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700181 return Tools.exceptionalFuture(new NoSuchDocumentPathException());
Madan Jampani86983282016-09-15 14:55:48 -0700182 } else if (result.status() == ILLEGAL_MODIFICATION) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700183 return Tools.exceptionalFuture(new IllegalDocumentModificationException());
184 } else {
185 return CompletableFuture.completedFuture(result);
186 }
187 }).thenApply(result -> result.oldValue());
188 }
189
190 @Override
191 public CompletableFuture<Void> addListener(DocumentPath path, DocumentTreeListener<byte[]> listener) {
192 checkNotNull(path);
193 checkNotNull(listener);
Madan Jampani98094222016-09-15 21:12:46 -0700194 InternalListener internalListener = new InternalListener(path, listener, MoreExecutors.directExecutor());
Madan Jampani79924fa2016-09-13 13:57:03 -0700195 // TODO: Support API that takes an executor
Madan Jampani98094222016-09-15 21:12:46 -0700196 if (!eventListeners.containsKey(listener)) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700197 return client.submit(new Listen(path))
Madan Jampani98094222016-09-15 21:12:46 -0700198 .thenRun(() -> eventListeners.put(listener, internalListener));
Madan Jampani79924fa2016-09-13 13:57:03 -0700199 }
Madan Jampani98094222016-09-15 21:12:46 -0700200 return CompletableFuture.completedFuture(null);
Madan Jampani79924fa2016-09-13 13:57:03 -0700201 }
202
203 @Override
204 public CompletableFuture<Void> removeListener(DocumentTreeListener<byte[]> listener) {
205 checkNotNull(listener);
Madan Jampani98094222016-09-15 21:12:46 -0700206 InternalListener internalListener = eventListeners.remove(listener);
207 if (internalListener != null && eventListeners.isEmpty()) {
208 return client.submit(new Unlisten(internalListener.path)).thenApply(v -> null);
Madan Jampani79924fa2016-09-13 13:57:03 -0700209 }
210 return CompletableFuture.completedFuture(null);
211 }
212
Madan Jampani86983282016-09-15 14:55:48 -0700213 private CompletableFuture<DocumentTreeUpdateResult.Status> createInternal(DocumentPath path, byte[] value) {
Madan Jampani4c8e3fe2016-09-16 16:20:28 -0700214 return client.submit(new Update(checkNotNull(path), Optional.ofNullable(value), Match.any(), Match.ifNull()))
Madan Jampani86983282016-09-15 14:55:48 -0700215 .thenApply(result -> result.status());
216 }
217
Madan Jampani79924fa2016-09-13 13:57:03 -0700218 private boolean isListening() {
219 return !eventListeners.isEmpty();
220 }
221
222 private void processTreeUpdates(List<DocumentTreeEvent<byte[]>> events) {
Madan Jampani98094222016-09-15 21:12:46 -0700223 events.forEach(event -> eventListeners.values().forEach(listener -> listener.event(event)));
224 }
225
226 private class InternalListener implements DocumentTreeListener<byte[]> {
227
228 private final DocumentPath path;
229 private final DocumentTreeListener<byte[]> listener;
230 private final Executor executor;
231
232 public InternalListener(DocumentPath path, DocumentTreeListener<byte[]> listener, Executor executor) {
233 this.path = path;
234 this.listener = listener;
235 this.executor = executor;
236 }
237
238 @Override
239 public void event(DocumentTreeEvent<byte[]> event) {
240 if (event.path().isDescendentOf(path)) {
241 executor.execute(() -> listener.event(event));
242 }
243 }
Madan Jampani79924fa2016-09-13 13:57:03 -0700244 }
245}