blob: 8f0c73a2ce13deaa52e45a5d7a4e44d0178d637f [file] [log] [blame]
Madan Jampani79924fa2016-09-13 13:57:03 -07001/*
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.store.primitives.resources.impl;
18
19import static com.google.common.base.Preconditions.checkNotNull;
Madan Jampani86983282016-09-15 14:55:48 -070020import static org.onosproject.store.primitives.resources.impl.DocumentTreeUpdateResult.Status.ILLEGAL_MODIFICATION;
21import static org.onosproject.store.primitives.resources.impl.DocumentTreeUpdateResult.Status.INVALID_PATH;
22import static org.onosproject.store.primitives.resources.impl.DocumentTreeUpdateResult.Status.OK;
Madan Jampani79924fa2016-09-13 13:57:03 -070023import io.atomix.copycat.client.CopycatClient;
24import io.atomix.resource.AbstractResource;
25import io.atomix.resource.ResourceTypeInfo;
26
27import java.util.HashMap;
28import java.util.List;
29import java.util.Map;
30import java.util.Properties;
31import java.util.concurrent.CompletableFuture;
32import java.util.concurrent.Executor;
33
34import org.onlab.util.Match;
35import org.onlab.util.Tools;
36import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten;
37import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Clear;
38import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Get;
39import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.GetChildren;
40import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Listen;
41import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Update;
42import org.onosproject.store.service.AsyncDocumentTree;
43import org.onosproject.store.service.DocumentPath;
44import org.onosproject.store.service.DocumentTreeEvent;
45import org.onosproject.store.service.DocumentTreeListener;
46import org.onosproject.store.service.IllegalDocumentModificationException;
47import org.onosproject.store.service.NoSuchDocumentPathException;
48import org.onosproject.store.service.Versioned;
49
50import com.google.common.util.concurrent.MoreExecutors;
51
52/**
53 * Distributed resource providing the {@link AsyncDocumentTree} primitive.
54 */
55@ResourceTypeInfo(id = -156, factory = AtomixDocumentTreeFactory.class)
56public class AtomixDocumentTree extends AbstractResource<AtomixDocumentTree>
57 implements AsyncDocumentTree<byte[]> {
58
59 private final Map<DocumentTreeListener<byte[]>, Executor> eventListeners = new HashMap<>();
60 public static final String CHANGE_SUBJECT = "changeEvents";
61
62 protected AtomixDocumentTree(CopycatClient client, Properties options) {
63 super(client, options);
64 }
65
66 @Override
67 public CompletableFuture<AtomixDocumentTree> open() {
68 return super.open().thenApply(result -> {
69 client.onStateChange(state -> {
70 if (state == CopycatClient.State.CONNECTED && isListening()) {
71 client.submit(new Listen());
72 }
73 });
74 client.onEvent(CHANGE_SUBJECT, this::processTreeUpdates);
75 return result;
76 });
77 }
78
79 @Override
80 public String name() {
81 return null;
82 }
83
84 @Override
85 public Type primitiveType() {
86 return Type.DOCUMENT_TREE;
87 }
88
89 @Override
90 public CompletableFuture<Void> destroy() {
91 return client.submit(new Clear());
92 }
93
94 @Override
95 public DocumentPath root() {
96 return DocumentPath.from("root");
97 }
98
99 @Override
100 public CompletableFuture<Map<String, Versioned<byte[]>>> getChildren(DocumentPath path) {
101 return client.submit(new GetChildren(checkNotNull(path)));
102 }
103
104 @Override
105 public CompletableFuture<Versioned<byte[]>> get(DocumentPath path) {
106 return client.submit(new Get(checkNotNull(path)));
107 }
108
109 @Override
110 public CompletableFuture<Versioned<byte[]>> set(DocumentPath path, byte[] value) {
111 return client.submit(new Update(checkNotNull(path), checkNotNull(value), Match.any(), Match.any()))
112 .thenCompose(result -> {
Madan Jampani86983282016-09-15 14:55:48 -0700113 if (result.status() == INVALID_PATH) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700114 return Tools.exceptionalFuture(new NoSuchDocumentPathException());
Madan Jampani86983282016-09-15 14:55:48 -0700115 } else if (result.status() == ILLEGAL_MODIFICATION) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700116 return Tools.exceptionalFuture(new IllegalDocumentModificationException());
117 } else {
118 return CompletableFuture.completedFuture(result);
119 }
120 }).thenApply(result -> result.oldValue());
121 }
122
123 @Override
124 public CompletableFuture<Boolean> create(DocumentPath path, byte[] value) {
Madan Jampani86983282016-09-15 14:55:48 -0700125 return createInternal(path, value)
126 .thenCompose(status -> {
127 if (status == ILLEGAL_MODIFICATION) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700128 return Tools.exceptionalFuture(new IllegalDocumentModificationException());
Madan Jampani79924fa2016-09-13 13:57:03 -0700129 }
Madan Jampani86983282016-09-15 14:55:48 -0700130 return CompletableFuture.completedFuture(true);
131 });
132 }
133
134 @Override
135 public CompletableFuture<Boolean> createRecursive(DocumentPath path, byte[] value) {
136 return createInternal(path, value)
137 .thenCompose(status -> {
138 if (status == ILLEGAL_MODIFICATION) {
139 return createRecursive(path.parent(), new byte[0])
140 .thenCompose(r -> createInternal(path, value).thenApply(v -> true));
141 }
142 return CompletableFuture.completedFuture(status == OK);
143 });
Madan Jampani79924fa2016-09-13 13:57:03 -0700144 }
145
146 @Override
147 public CompletableFuture<Boolean> replace(DocumentPath path, byte[] newValue, long version) {
148 return client.submit(new Update(checkNotNull(path), newValue, Match.any(), Match.ifValue(version)))
149 .thenApply(result -> result.updated());
150 }
151
152 @Override
153 public CompletableFuture<Boolean> replace(DocumentPath path, byte[] newValue, byte[] currentValue) {
154 return client.submit(new Update(checkNotNull(path), newValue, Match.ifValue(currentValue), Match.any()))
155 .thenCompose(result -> {
Madan Jampani86983282016-09-15 14:55:48 -0700156 if (result.status() == INVALID_PATH) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700157 return Tools.exceptionalFuture(new NoSuchDocumentPathException());
Madan Jampani86983282016-09-15 14:55:48 -0700158 } else if (result.status() == ILLEGAL_MODIFICATION) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700159 return Tools.exceptionalFuture(new IllegalDocumentModificationException());
160 } else {
161 return CompletableFuture.completedFuture(result);
162 }
163 }).thenApply(result -> result.updated());
164 }
165
166 @Override
167 public CompletableFuture<Versioned<byte[]>> removeNode(DocumentPath path) {
168 if (path.equals(DocumentPath.from("root"))) {
169 return Tools.exceptionalFuture(new IllegalDocumentModificationException());
170 }
171 return client.submit(new Update(checkNotNull(path), null, Match.ifNotNull(), Match.any()))
172 .thenCompose(result -> {
Madan Jampani86983282016-09-15 14:55:48 -0700173 if (result.status() == INVALID_PATH) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700174 return Tools.exceptionalFuture(new NoSuchDocumentPathException());
Madan Jampani86983282016-09-15 14:55:48 -0700175 } else if (result.status() == ILLEGAL_MODIFICATION) {
Madan Jampani79924fa2016-09-13 13:57:03 -0700176 return Tools.exceptionalFuture(new IllegalDocumentModificationException());
177 } else {
178 return CompletableFuture.completedFuture(result);
179 }
180 }).thenApply(result -> result.oldValue());
181 }
182
183 @Override
184 public CompletableFuture<Void> addListener(DocumentPath path, DocumentTreeListener<byte[]> listener) {
185 checkNotNull(path);
186 checkNotNull(listener);
187 // TODO: Support API that takes an executor
188 if (isListening()) {
189 eventListeners.putIfAbsent(listener, MoreExecutors.directExecutor());
190 return CompletableFuture.completedFuture(null);
191 } else {
192 return client.submit(new Listen(path))
193 .thenRun(() -> eventListeners.put(listener, MoreExecutors.directExecutor()));
194 }
195 }
196
197 @Override
198 public CompletableFuture<Void> removeListener(DocumentTreeListener<byte[]> listener) {
199 checkNotNull(listener);
200 if (eventListeners.remove(listener) != null && eventListeners.isEmpty()) {
201 return client.submit(new Unlisten()).thenApply(v -> null);
202 }
203 return CompletableFuture.completedFuture(null);
204 }
205
Madan Jampani86983282016-09-15 14:55:48 -0700206 private CompletableFuture<DocumentTreeUpdateResult.Status> createInternal(DocumentPath path, byte[] value) {
207 return client.submit(new Update(checkNotNull(path), checkNotNull(value), Match.ifNull(), Match.any()))
208 .thenApply(result -> result.status());
209 }
210
Madan Jampani79924fa2016-09-13 13:57:03 -0700211 private boolean isListening() {
212 return !eventListeners.isEmpty();
213 }
214
215 private void processTreeUpdates(List<DocumentTreeEvent<byte[]>> events) {
216 events.forEach(event ->
217 eventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event))));
218 }
219}