/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.store.primitives.resources.impl;
18
19import static org.slf4j.LoggerFactory.getLogger;
20import io.atomix.copycat.server.Commit;
21import io.atomix.copycat.server.Snapshottable;
22import io.atomix.copycat.server.StateMachineExecutor;
23import io.atomix.copycat.server.session.ServerSession;
24import io.atomix.copycat.server.session.SessionListener;
25import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
26import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
27import io.atomix.resource.ResourceStateMachine;
28
Madan Jampani98094222016-09-15 21:12:46 -070029import java.util.Arrays;
Madan Jampani79924fa2016-09-13 13:57:03 -070030import java.util.HashMap;
Madan Jampani98094222016-09-15 21:12:46 -070031import java.util.Iterator;
32import java.util.List;
Madan Jampani79924fa2016-09-13 13:57:03 -070033import java.util.Map;
34import java.util.Optional;
35import java.util.Properties;
36import java.util.Queue;
37import java.util.concurrent.atomic.AtomicLong;
38import java.util.stream.Collectors;
39
40import org.onlab.util.Match;
41import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Clear;
42import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Get;
43import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.GetChildren;
44import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Listen;
45import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Unlisten;
46import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Update;
47import org.onosproject.store.primitives.resources.impl.DocumentTreeUpdateResult.Status;
48import org.onosproject.store.service.DocumentPath;
49import org.onosproject.store.service.DocumentTree;
50import org.onosproject.store.service.DocumentTreeEvent;
51import org.onosproject.store.service.DocumentTreeEvent.Type;
52import org.onosproject.store.service.IllegalDocumentModificationException;
53import org.onosproject.store.service.NoSuchDocumentPathException;
54import org.onosproject.store.service.Versioned;
55import org.slf4j.Logger;
56
57import com.google.common.base.Throwables;
Madan Jampani98094222016-09-15 21:12:46 -070058import com.google.common.collect.Lists;
Madan Jampani79924fa2016-09-13 13:57:03 -070059import com.google.common.collect.Maps;
60import com.google.common.collect.Queues;
61
62/**
63 * State Machine for {@link AtomixDocumentTree} resource.
64 */
65public class AtomixDocumentTreeState
66 extends ResourceStateMachine
67 implements SessionListener, Snapshottable {
68
69 private final Logger log = getLogger(getClass());
Madan Jampani98094222016-09-15 21:12:46 -070070 private final Map<Long, SessionListenCommits> listeners = new HashMap<>();
Madan Jampani2914e4e2016-09-13 17:48:56 -070071 private AtomicLong versionCounter = new AtomicLong(0);
Madan Jampani79924fa2016-09-13 13:57:03 -070072 private final DocumentTree<TreeNodeValue> docTree = new DefaultDocumentTree<>(versionCounter::incrementAndGet);
73
74 public AtomixDocumentTreeState(Properties properties) {
75 super(properties);
76 }
77
78 @Override
79 public void snapshot(SnapshotWriter writer) {
80 writer.writeLong(versionCounter.get());
81 }
82
83 @Override
84 public void install(SnapshotReader reader) {
Madan Jampani2914e4e2016-09-13 17:48:56 -070085 versionCounter = new AtomicLong(reader.readLong());
Madan Jampani79924fa2016-09-13 13:57:03 -070086 }
87
88 @Override
89 protected void configure(StateMachineExecutor executor) {
90 // Listeners
91 executor.register(Listen.class, this::listen);
92 executor.register(Unlisten.class, this::unlisten);
93 // queries
94 executor.register(Get.class, this::get);
95 executor.register(GetChildren.class, this::getChildren);
96 // commands
97 executor.register(Update.class, this::update);
98 executor.register(Clear.class, this::clear);
99 }
100
101 protected void listen(Commit<? extends Listen> commit) {
102 Long sessionId = commit.session().id();
Madan Jampani98094222016-09-15 21:12:46 -0700103 listeners.computeIfAbsent(sessionId, k -> new SessionListenCommits()).add(commit);
Madan Jampani2914e4e2016-09-13 17:48:56 -0700104 commit.session().onStateChange(
Madan Jampani79924fa2016-09-13 13:57:03 -0700105 state -> {
106 if (state == ServerSession.State.CLOSED
107 || state == ServerSession.State.EXPIRED) {
Madan Jampani98094222016-09-15 21:12:46 -0700108 closeListener(commit.session().id());
Madan Jampani79924fa2016-09-13 13:57:03 -0700109 }
110 });
111 }
112
113 protected void unlisten(Commit<? extends Unlisten> commit) {
Madan Jampani98094222016-09-15 21:12:46 -0700114 Long sessionId = commit.session().id();
Madan Jampani79924fa2016-09-13 13:57:03 -0700115 try {
Madan Jampani98094222016-09-15 21:12:46 -0700116 SessionListenCommits listenCommits = listeners.get(sessionId);
117 if (listenCommits != null) {
118 listenCommits.remove(commit);
119 }
Madan Jampani79924fa2016-09-13 13:57:03 -0700120 } finally {
121 commit.close();
122 }
123 }
124
125 protected Versioned<byte[]> get(Commit<? extends Get> commit) {
126 try {
127 Versioned<TreeNodeValue> value = docTree.get(commit.operation().path());
128 return value == null ? null : value.map(node -> node == null ? null : node.value());
Aaron Kruglikovc7ec0fa2016-10-24 15:45:33 -0700129 } catch (IllegalStateException e) {
130 return null;
Madan Jampani79924fa2016-09-13 13:57:03 -0700131 } finally {
132 commit.close();
133 }
134 }
135
136 protected Map<String, Versioned<byte[]>> getChildren(Commit<? extends GetChildren> commit) {
137 try {
138 Map<String, Versioned<TreeNodeValue>> children = docTree.getChildren(commit.operation().path());
139 return children == null
140 ? null : Maps.newHashMap(Maps.transformValues(children,
141 value -> value.map(TreeNodeValue::value)));
142 } finally {
143 commit.close();
144 }
145 }
146
147 protected DocumentTreeUpdateResult<byte[]> update(Commit<? extends Update> commit) {
148 DocumentTreeUpdateResult<byte[]> result = null;
149 DocumentPath path = commit.operation().path();
150 boolean updated = false;
151 Versioned<TreeNodeValue> currentValue = docTree.get(path);
152 try {
153 Match<Long> versionMatch = commit.operation().versionMatch();
154 Match<byte[]> valueMatch = commit.operation().valueMatch();
155
156 if (versionMatch.matches(currentValue == null ? null : currentValue.version())
157 && valueMatch.matches(currentValue == null ? null : currentValue.value().value())) {
158 if (commit.operation().value() == null) {
159 docTree.removeNode(path);
160 } else {
161 docTree.set(path, new NonTransactionalCommit(commit));
162 }
163 updated = true;
164 }
165 Versioned<TreeNodeValue> newValue = updated ? docTree.get(path) : currentValue;
166 Status updateStatus = updated
167 ? Status.OK : commit.operation().value() == null ? Status.INVALID_PATH : Status.NOOP;
168 result = new DocumentTreeUpdateResult<>(path,
169 updateStatus,
170 newValue == null
171 ? null : newValue.map(TreeNodeValue::value),
172 currentValue == null
173 ? null : currentValue.map(TreeNodeValue::value));
174 } catch (IllegalDocumentModificationException e) {
175 result = DocumentTreeUpdateResult.illegalModification(path);
176 } catch (NoSuchDocumentPathException e) {
177 result = DocumentTreeUpdateResult.invalidPath(path);
178 } catch (Exception e) {
179 log.error("Failed to apply {} to state machine", commit.operation(), e);
180 throw Throwables.propagate(e);
181 } finally {
182 if (updated) {
183 if (currentValue != null) {
184 currentValue.value().discard();
185 }
186 } else {
187 commit.close();
188 }
189 }
190 notifyListeners(path, result);
191 return result;
192 }
193
194 protected void clear(Commit<? extends Clear> commit) {
195 try {
196 Queue<DocumentPath> toClearQueue = Queues.newArrayDeque();
197 Map<String, Versioned<TreeNodeValue>> topLevelChildren = docTree.getChildren(DocumentPath.from("root"));
198 toClearQueue.addAll(topLevelChildren.keySet()
199 .stream()
200 .map(name -> new DocumentPath(name, DocumentPath.from("root")))
201 .collect(Collectors.toList()));
202 while (!toClearQueue.isEmpty()) {
203 DocumentPath path = toClearQueue.remove();
204 Map<String, Versioned<TreeNodeValue>> children = docTree.getChildren(path);
205 if (children.size() == 0) {
206 docTree.removeNode(path).value().discard();
207 } else {
208 children.keySet()
209 .stream()
210 .forEach(name -> toClearQueue.add(new DocumentPath(name, path)));
211 toClearQueue.add(path);
212 }
213 }
214 } finally {
215 commit.close();
216 }
217 }
218
219 /**
220 * Interface implemented by tree node values.
221 */
222 private interface TreeNodeValue {
223 /**
224 * Returns the raw {@code byte[]}.
225 *
226 * @return raw value
227 */
228 byte[] value();
229
230 /**
231 * Discards the value by invoke appropriate clean up actions.
232 */
233 void discard();
234 }
235
236 /**
237 * A {@code TreeNodeValue} that is derived from a non-transactional update
238 * i.e. via any standard tree update operation.
239 */
240 private class NonTransactionalCommit implements TreeNodeValue {
241 private final Commit<? extends Update> commit;
242
243 public NonTransactionalCommit(Commit<? extends Update> commit) {
244 this.commit = commit;
245 }
246
247 @Override
248 public byte[] value() {
Madan Jampani4c8e3fe2016-09-16 16:20:28 -0700249 return commit.operation().value().orElse(null);
Madan Jampani79924fa2016-09-13 13:57:03 -0700250 }
251
252 @Override
253 public void discard() {
254 commit.close();
255 }
256 }
257
258 private void notifyListeners(DocumentPath path, DocumentTreeUpdateResult<byte[]> result) {
259 if (result.status() != Status.OK) {
260 return;
261 }
262 DocumentTreeEvent<byte[]> event =
263 new DocumentTreeEvent<>(path,
264 result.created() ? Type.CREATED : result.newValue() == null ? Type.DELETED : Type.UPDATED,
265 Optional.ofNullable(result.newValue()),
266 Optional.ofNullable(result.oldValue()));
Madan Jampani98094222016-09-15 21:12:46 -0700267
Madan Jampani2914e4e2016-09-13 17:48:56 -0700268 listeners.values()
Madan Jampani98094222016-09-15 21:12:46 -0700269 .stream()
270 .filter(l -> event.path().isDescendentOf(l.leastCommonAncestorPath()))
271 .forEach(listener -> listener.publish(AtomixDocumentTree.CHANGE_SUBJECT, Arrays.asList(event)));
Madan Jampani79924fa2016-09-13 13:57:03 -0700272 }
273
274 @Override
275 public void register(ServerSession session) {
276 }
277
278 @Override
279 public void unregister(ServerSession session) {
280 closeListener(session.id());
281 }
282
283 @Override
284 public void expire(ServerSession session) {
285 closeListener(session.id());
286 }
287
288 @Override
289 public void close(ServerSession session) {
290 closeListener(session.id());
291 }
292
293 private void closeListener(Long sessionId) {
Madan Jampani98094222016-09-15 21:12:46 -0700294 SessionListenCommits listenCommits = listeners.remove(sessionId);
295 if (listenCommits != null) {
296 listenCommits.close();
297 }
298 }
299
300 private class SessionListenCommits {
301 private final List<Commit<? extends Listen>> commits = Lists.newArrayList();
302 private DocumentPath leastCommonAncestorPath;
303
304 public void add(Commit<? extends Listen> commit) {
305 commits.add(commit);
306 recomputeLeastCommonAncestor();
307 }
308
309 public void remove(Commit<? extends Unlisten> commit) {
310 // Remove the first listen commit with path matching path in unlisten commit
311 Iterator<Commit<? extends Listen>> iterator = commits.iterator();
312 while (iterator.hasNext()) {
313 Commit<? extends Listen> listenCommit = iterator.next();
314 if (listenCommit.operation().path().equals(commit.operation().path())) {
315 iterator.remove();
316 listenCommit.close();
317 }
318 }
319 recomputeLeastCommonAncestor();
320 }
321
322 public DocumentPath leastCommonAncestorPath() {
323 return leastCommonAncestorPath;
324 }
325
326 public <M> void publish(String topic, M message) {
327 commits.stream().findAny().ifPresent(commit -> commit.session().publish(topic, message));
328 }
329
330 public void close() {
331 commits.forEach(Commit::close);
332 commits.clear();
333 leastCommonAncestorPath = null;
334 }
335
336 private void recomputeLeastCommonAncestor() {
337 this.leastCommonAncestorPath = DocumentPath.leastCommonAncestor(commits.stream()
338 .map(c -> c.operation().path())
339 .collect(Collectors.toList()));
Madan Jampani79924fa2016-09-13 13:57:03 -0700340 }
341 }
342}