blob: cfaa6685f80902cfd45ff77124d5c198fb3c2bfa [file] [log] [blame]
Madan Jampani79924fa2016-09-13 13:57:03 -07001/*
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.store.primitives.resources.impl;
18
19import static org.slf4j.LoggerFactory.getLogger;
20import io.atomix.copycat.server.Commit;
21import io.atomix.copycat.server.Snapshottable;
22import io.atomix.copycat.server.StateMachineExecutor;
23import io.atomix.copycat.server.session.ServerSession;
24import io.atomix.copycat.server.session.SessionListener;
25import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
26import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
27import io.atomix.resource.ResourceStateMachine;
28
Madan Jampani98094222016-09-15 21:12:46 -070029import java.util.Arrays;
Madan Jampani79924fa2016-09-13 13:57:03 -070030import java.util.HashMap;
Madan Jampani98094222016-09-15 21:12:46 -070031import java.util.Iterator;
32import java.util.List;
Madan Jampani79924fa2016-09-13 13:57:03 -070033import java.util.Map;
34import java.util.Optional;
35import java.util.Properties;
36import java.util.Queue;
37import java.util.concurrent.atomic.AtomicLong;
38import java.util.stream.Collectors;
39
40import org.onlab.util.Match;
41import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Clear;
42import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Get;
43import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.GetChildren;
44import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Listen;
45import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Unlisten;
46import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Update;
47import org.onosproject.store.primitives.resources.impl.DocumentTreeUpdateResult.Status;
48import org.onosproject.store.service.DocumentPath;
49import org.onosproject.store.service.DocumentTree;
50import org.onosproject.store.service.DocumentTreeEvent;
51import org.onosproject.store.service.DocumentTreeEvent.Type;
52import org.onosproject.store.service.IllegalDocumentModificationException;
53import org.onosproject.store.service.NoSuchDocumentPathException;
54import org.onosproject.store.service.Versioned;
55import org.slf4j.Logger;
56
57import com.google.common.base.Throwables;
Madan Jampani98094222016-09-15 21:12:46 -070058import com.google.common.collect.Lists;
Madan Jampani79924fa2016-09-13 13:57:03 -070059import com.google.common.collect.Maps;
60import com.google.common.collect.Queues;
61
62/**
63 * State Machine for {@link AtomixDocumentTree} resource.
64 */
65public class AtomixDocumentTreeState
66 extends ResourceStateMachine
67 implements SessionListener, Snapshottable {
68
69 private final Logger log = getLogger(getClass());
Madan Jampani98094222016-09-15 21:12:46 -070070 private final Map<Long, SessionListenCommits> listeners = new HashMap<>();
Madan Jampani2914e4e2016-09-13 17:48:56 -070071 private AtomicLong versionCounter = new AtomicLong(0);
Madan Jampani79924fa2016-09-13 13:57:03 -070072 private final DocumentTree<TreeNodeValue> docTree = new DefaultDocumentTree<>(versionCounter::incrementAndGet);
73
74 public AtomixDocumentTreeState(Properties properties) {
75 super(properties);
76 }
77
78 @Override
79 public void snapshot(SnapshotWriter writer) {
80 writer.writeLong(versionCounter.get());
81 }
82
83 @Override
84 public void install(SnapshotReader reader) {
Madan Jampani2914e4e2016-09-13 17:48:56 -070085 versionCounter = new AtomicLong(reader.readLong());
Madan Jampani79924fa2016-09-13 13:57:03 -070086 }
87
88 @Override
89 protected void configure(StateMachineExecutor executor) {
90 // Listeners
91 executor.register(Listen.class, this::listen);
92 executor.register(Unlisten.class, this::unlisten);
93 // queries
94 executor.register(Get.class, this::get);
95 executor.register(GetChildren.class, this::getChildren);
96 // commands
97 executor.register(Update.class, this::update);
98 executor.register(Clear.class, this::clear);
99 }
100
101 protected void listen(Commit<? extends Listen> commit) {
102 Long sessionId = commit.session().id();
Madan Jampani98094222016-09-15 21:12:46 -0700103 listeners.computeIfAbsent(sessionId, k -> new SessionListenCommits()).add(commit);
Madan Jampani2914e4e2016-09-13 17:48:56 -0700104 commit.session().onStateChange(
Madan Jampani79924fa2016-09-13 13:57:03 -0700105 state -> {
106 if (state == ServerSession.State.CLOSED
107 || state == ServerSession.State.EXPIRED) {
Madan Jampani98094222016-09-15 21:12:46 -0700108 closeListener(commit.session().id());
Madan Jampani79924fa2016-09-13 13:57:03 -0700109 }
110 });
111 }
112
113 protected void unlisten(Commit<? extends Unlisten> commit) {
Madan Jampani98094222016-09-15 21:12:46 -0700114 Long sessionId = commit.session().id();
Madan Jampani79924fa2016-09-13 13:57:03 -0700115 try {
Madan Jampani98094222016-09-15 21:12:46 -0700116 SessionListenCommits listenCommits = listeners.get(sessionId);
117 if (listenCommits != null) {
118 listenCommits.remove(commit);
119 }
Madan Jampani79924fa2016-09-13 13:57:03 -0700120 } finally {
121 commit.close();
122 }
123 }
124
125 protected Versioned<byte[]> get(Commit<? extends Get> commit) {
126 try {
127 Versioned<TreeNodeValue> value = docTree.get(commit.operation().path());
128 return value == null ? null : value.map(node -> node == null ? null : node.value());
129 } finally {
130 commit.close();
131 }
132 }
133
134 protected Map<String, Versioned<byte[]>> getChildren(Commit<? extends GetChildren> commit) {
135 try {
136 Map<String, Versioned<TreeNodeValue>> children = docTree.getChildren(commit.operation().path());
137 return children == null
138 ? null : Maps.newHashMap(Maps.transformValues(children,
139 value -> value.map(TreeNodeValue::value)));
140 } finally {
141 commit.close();
142 }
143 }
144
145 protected DocumentTreeUpdateResult<byte[]> update(Commit<? extends Update> commit) {
146 DocumentTreeUpdateResult<byte[]> result = null;
147 DocumentPath path = commit.operation().path();
148 boolean updated = false;
149 Versioned<TreeNodeValue> currentValue = docTree.get(path);
150 try {
151 Match<Long> versionMatch = commit.operation().versionMatch();
152 Match<byte[]> valueMatch = commit.operation().valueMatch();
153
154 if (versionMatch.matches(currentValue == null ? null : currentValue.version())
155 && valueMatch.matches(currentValue == null ? null : currentValue.value().value())) {
156 if (commit.operation().value() == null) {
157 docTree.removeNode(path);
158 } else {
159 docTree.set(path, new NonTransactionalCommit(commit));
160 }
161 updated = true;
162 }
163 Versioned<TreeNodeValue> newValue = updated ? docTree.get(path) : currentValue;
164 Status updateStatus = updated
165 ? Status.OK : commit.operation().value() == null ? Status.INVALID_PATH : Status.NOOP;
166 result = new DocumentTreeUpdateResult<>(path,
167 updateStatus,
168 newValue == null
169 ? null : newValue.map(TreeNodeValue::value),
170 currentValue == null
171 ? null : currentValue.map(TreeNodeValue::value));
172 } catch (IllegalDocumentModificationException e) {
173 result = DocumentTreeUpdateResult.illegalModification(path);
174 } catch (NoSuchDocumentPathException e) {
175 result = DocumentTreeUpdateResult.invalidPath(path);
176 } catch (Exception e) {
177 log.error("Failed to apply {} to state machine", commit.operation(), e);
178 throw Throwables.propagate(e);
179 } finally {
180 if (updated) {
181 if (currentValue != null) {
182 currentValue.value().discard();
183 }
184 } else {
185 commit.close();
186 }
187 }
188 notifyListeners(path, result);
189 return result;
190 }
191
192 protected void clear(Commit<? extends Clear> commit) {
193 try {
194 Queue<DocumentPath> toClearQueue = Queues.newArrayDeque();
195 Map<String, Versioned<TreeNodeValue>> topLevelChildren = docTree.getChildren(DocumentPath.from("root"));
196 toClearQueue.addAll(topLevelChildren.keySet()
197 .stream()
198 .map(name -> new DocumentPath(name, DocumentPath.from("root")))
199 .collect(Collectors.toList()));
200 while (!toClearQueue.isEmpty()) {
201 DocumentPath path = toClearQueue.remove();
202 Map<String, Versioned<TreeNodeValue>> children = docTree.getChildren(path);
203 if (children.size() == 0) {
204 docTree.removeNode(path).value().discard();
205 } else {
206 children.keySet()
207 .stream()
208 .forEach(name -> toClearQueue.add(new DocumentPath(name, path)));
209 toClearQueue.add(path);
210 }
211 }
212 } finally {
213 commit.close();
214 }
215 }
216
217 /**
218 * Interface implemented by tree node values.
219 */
220 private interface TreeNodeValue {
221 /**
222 * Returns the raw {@code byte[]}.
223 *
224 * @return raw value
225 */
226 byte[] value();
227
228 /**
229 * Discards the value by invoke appropriate clean up actions.
230 */
231 void discard();
232 }
233
234 /**
235 * A {@code TreeNodeValue} that is derived from a non-transactional update
236 * i.e. via any standard tree update operation.
237 */
238 private class NonTransactionalCommit implements TreeNodeValue {
239 private final Commit<? extends Update> commit;
240
241 public NonTransactionalCommit(Commit<? extends Update> commit) {
242 this.commit = commit;
243 }
244
245 @Override
246 public byte[] value() {
Madan Jampani4c8e3fe2016-09-16 16:20:28 -0700247 return commit.operation().value().orElse(null);
Madan Jampani79924fa2016-09-13 13:57:03 -0700248 }
249
250 @Override
251 public void discard() {
252 commit.close();
253 }
254 }
255
256 private void notifyListeners(DocumentPath path, DocumentTreeUpdateResult<byte[]> result) {
257 if (result.status() != Status.OK) {
258 return;
259 }
260 DocumentTreeEvent<byte[]> event =
261 new DocumentTreeEvent<>(path,
262 result.created() ? Type.CREATED : result.newValue() == null ? Type.DELETED : Type.UPDATED,
263 Optional.ofNullable(result.newValue()),
264 Optional.ofNullable(result.oldValue()));
Madan Jampani98094222016-09-15 21:12:46 -0700265
Madan Jampani2914e4e2016-09-13 17:48:56 -0700266 listeners.values()
Madan Jampani98094222016-09-15 21:12:46 -0700267 .stream()
268 .filter(l -> event.path().isDescendentOf(l.leastCommonAncestorPath()))
269 .forEach(listener -> listener.publish(AtomixDocumentTree.CHANGE_SUBJECT, Arrays.asList(event)));
Madan Jampani79924fa2016-09-13 13:57:03 -0700270 }
271
272 @Override
273 public void register(ServerSession session) {
274 }
275
276 @Override
277 public void unregister(ServerSession session) {
278 closeListener(session.id());
279 }
280
281 @Override
282 public void expire(ServerSession session) {
283 closeListener(session.id());
284 }
285
286 @Override
287 public void close(ServerSession session) {
288 closeListener(session.id());
289 }
290
291 private void closeListener(Long sessionId) {
Madan Jampani98094222016-09-15 21:12:46 -0700292 SessionListenCommits listenCommits = listeners.remove(sessionId);
293 if (listenCommits != null) {
294 listenCommits.close();
295 }
296 }
297
298 private class SessionListenCommits {
299 private final List<Commit<? extends Listen>> commits = Lists.newArrayList();
300 private DocumentPath leastCommonAncestorPath;
301
302 public void add(Commit<? extends Listen> commit) {
303 commits.add(commit);
304 recomputeLeastCommonAncestor();
305 }
306
307 public void remove(Commit<? extends Unlisten> commit) {
308 // Remove the first listen commit with path matching path in unlisten commit
309 Iterator<Commit<? extends Listen>> iterator = commits.iterator();
310 while (iterator.hasNext()) {
311 Commit<? extends Listen> listenCommit = iterator.next();
312 if (listenCommit.operation().path().equals(commit.operation().path())) {
313 iterator.remove();
314 listenCommit.close();
315 }
316 }
317 recomputeLeastCommonAncestor();
318 }
319
320 public DocumentPath leastCommonAncestorPath() {
321 return leastCommonAncestorPath;
322 }
323
324 public <M> void publish(String topic, M message) {
325 commits.stream().findAny().ifPresent(commit -> commit.session().publish(topic, message));
326 }
327
328 public void close() {
329 commits.forEach(Commit::close);
330 commits.clear();
331 leastCommonAncestorPath = null;
332 }
333
334 private void recomputeLeastCommonAncestor() {
335 this.leastCommonAncestorPath = DocumentPath.leastCommonAncestor(commits.stream()
336 .map(c -> c.operation().path())
337 .collect(Collectors.toList()));
Madan Jampani79924fa2016-09-13 13:57:03 -0700338 }
339 }
340}