blob: 939d3f0b7b3989582b118c3fd2391ac20b54ce4e [file] [log] [blame]
Madan Jampani79924fa2016-09-13 13:57:03 -07001/*
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.store.primitives.resources.impl;
18
19import static com.google.common.base.Preconditions.checkNotNull;
20import io.atomix.copycat.client.CopycatClient;
21import io.atomix.resource.AbstractResource;
22import io.atomix.resource.ResourceTypeInfo;
23
24import java.util.HashMap;
25import java.util.List;
26import java.util.Map;
27import java.util.Properties;
28import java.util.concurrent.CompletableFuture;
29import java.util.concurrent.Executor;
30
31import org.onlab.util.Match;
32import org.onlab.util.Tools;
33import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten;
34import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Clear;
35import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Get;
36import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.GetChildren;
37import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Listen;
38import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Update;
39import org.onosproject.store.service.AsyncDocumentTree;
40import org.onosproject.store.service.DocumentPath;
41import org.onosproject.store.service.DocumentTreeEvent;
42import org.onosproject.store.service.DocumentTreeListener;
43import org.onosproject.store.service.IllegalDocumentModificationException;
44import org.onosproject.store.service.NoSuchDocumentPathException;
45import org.onosproject.store.service.Versioned;
46
47import com.google.common.util.concurrent.MoreExecutors;
48
49/**
50 * Distributed resource providing the {@link AsyncDocumentTree} primitive.
51 */
52@ResourceTypeInfo(id = -156, factory = AtomixDocumentTreeFactory.class)
53public class AtomixDocumentTree extends AbstractResource<AtomixDocumentTree>
54 implements AsyncDocumentTree<byte[]> {
55
56 private final Map<DocumentTreeListener<byte[]>, Executor> eventListeners = new HashMap<>();
57 public static final String CHANGE_SUBJECT = "changeEvents";
58
59 protected AtomixDocumentTree(CopycatClient client, Properties options) {
60 super(client, options);
61 }
62
63 @Override
64 public CompletableFuture<AtomixDocumentTree> open() {
65 return super.open().thenApply(result -> {
66 client.onStateChange(state -> {
67 if (state == CopycatClient.State.CONNECTED && isListening()) {
68 client.submit(new Listen());
69 }
70 });
71 client.onEvent(CHANGE_SUBJECT, this::processTreeUpdates);
72 return result;
73 });
74 }
75
76 @Override
77 public String name() {
78 return null;
79 }
80
81 @Override
82 public Type primitiveType() {
83 return Type.DOCUMENT_TREE;
84 }
85
86 @Override
87 public CompletableFuture<Void> destroy() {
88 return client.submit(new Clear());
89 }
90
91 @Override
92 public DocumentPath root() {
93 return DocumentPath.from("root");
94 }
95
96 @Override
97 public CompletableFuture<Map<String, Versioned<byte[]>>> getChildren(DocumentPath path) {
98 return client.submit(new GetChildren(checkNotNull(path)));
99 }
100
101 @Override
102 public CompletableFuture<Versioned<byte[]>> get(DocumentPath path) {
103 return client.submit(new Get(checkNotNull(path)));
104 }
105
106 @Override
107 public CompletableFuture<Versioned<byte[]>> set(DocumentPath path, byte[] value) {
108 return client.submit(new Update(checkNotNull(path), checkNotNull(value), Match.any(), Match.any()))
109 .thenCompose(result -> {
110 if (result.status() == DocumentTreeUpdateResult.Status.INVALID_PATH) {
111 return Tools.exceptionalFuture(new NoSuchDocumentPathException());
112 } else if (result.status() == DocumentTreeUpdateResult.Status.ILLEGAL_MODIFICATION) {
113 return Tools.exceptionalFuture(new IllegalDocumentModificationException());
114 } else {
115 return CompletableFuture.completedFuture(result);
116 }
117 }).thenApply(result -> result.oldValue());
118 }
119
120 @Override
121 public CompletableFuture<Boolean> create(DocumentPath path, byte[] value) {
122 return client.submit(new Update(checkNotNull(path), checkNotNull(value), Match.ifNull(), Match.any()))
123 .thenCompose(result -> {
124 if (result.status() == DocumentTreeUpdateResult.Status.INVALID_PATH) {
125 return Tools.exceptionalFuture(new NoSuchDocumentPathException());
126 } else if (result.status() == DocumentTreeUpdateResult.Status.ILLEGAL_MODIFICATION) {
127 return Tools.exceptionalFuture(new IllegalDocumentModificationException());
128 } else {
129 return CompletableFuture.completedFuture(result);
130 }
131 }).thenApply(result -> result.created());
132 }
133
134 @Override
135 public CompletableFuture<Boolean> replace(DocumentPath path, byte[] newValue, long version) {
136 return client.submit(new Update(checkNotNull(path), newValue, Match.any(), Match.ifValue(version)))
137 .thenApply(result -> result.updated());
138 }
139
140 @Override
141 public CompletableFuture<Boolean> replace(DocumentPath path, byte[] newValue, byte[] currentValue) {
142 return client.submit(new Update(checkNotNull(path), newValue, Match.ifValue(currentValue), Match.any()))
143 .thenCompose(result -> {
144 if (result.status() == DocumentTreeUpdateResult.Status.INVALID_PATH) {
145 return Tools.exceptionalFuture(new NoSuchDocumentPathException());
146 } else if (result.status() == DocumentTreeUpdateResult.Status.ILLEGAL_MODIFICATION) {
147 return Tools.exceptionalFuture(new IllegalDocumentModificationException());
148 } else {
149 return CompletableFuture.completedFuture(result);
150 }
151 }).thenApply(result -> result.updated());
152 }
153
154 @Override
155 public CompletableFuture<Versioned<byte[]>> removeNode(DocumentPath path) {
156 if (path.equals(DocumentPath.from("root"))) {
157 return Tools.exceptionalFuture(new IllegalDocumentModificationException());
158 }
159 return client.submit(new Update(checkNotNull(path), null, Match.ifNotNull(), Match.any()))
160 .thenCompose(result -> {
161 if (result.status() == DocumentTreeUpdateResult.Status.INVALID_PATH) {
162 return Tools.exceptionalFuture(new NoSuchDocumentPathException());
163 } else if (result.status() == DocumentTreeUpdateResult.Status.ILLEGAL_MODIFICATION) {
164 return Tools.exceptionalFuture(new IllegalDocumentModificationException());
165 } else {
166 return CompletableFuture.completedFuture(result);
167 }
168 }).thenApply(result -> result.oldValue());
169 }
170
171 @Override
172 public CompletableFuture<Void> addListener(DocumentPath path, DocumentTreeListener<byte[]> listener) {
173 checkNotNull(path);
174 checkNotNull(listener);
175 // TODO: Support API that takes an executor
176 if (isListening()) {
177 eventListeners.putIfAbsent(listener, MoreExecutors.directExecutor());
178 return CompletableFuture.completedFuture(null);
179 } else {
180 return client.submit(new Listen(path))
181 .thenRun(() -> eventListeners.put(listener, MoreExecutors.directExecutor()));
182 }
183 }
184
185 @Override
186 public CompletableFuture<Void> removeListener(DocumentTreeListener<byte[]> listener) {
187 checkNotNull(listener);
188 if (eventListeners.remove(listener) != null && eventListeners.isEmpty()) {
189 return client.submit(new Unlisten()).thenApply(v -> null);
190 }
191 return CompletableFuture.completedFuture(null);
192 }
193
194 private boolean isListening() {
195 return !eventListeners.isEmpty();
196 }
197
198 private void processTreeUpdates(List<DocumentTreeEvent<byte[]>> events) {
199 events.forEach(event ->
200 eventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event))));
201 }
202}