blob: 9ee352c6a97dd91d0d71b47b99966cfe3a8ff9d3 [file] [log] [blame]
Jordan Halterman2bf177c2017-06-29 01:49:08 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Jordan Halterman2bf177c2017-06-29 01:49:08 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.store.primitives.resources.impl;
18
19import java.util.Arrays;
20import java.util.HashMap;
21import java.util.Iterator;
22import java.util.List;
23import java.util.Map;
24import java.util.Optional;
25import java.util.Queue;
26import java.util.TreeMap;
27import java.util.concurrent.atomic.AtomicLong;
28import java.util.stream.Collectors;
29
30import com.esotericsoftware.kryo.Kryo;
31import com.esotericsoftware.kryo.io.Input;
32import com.esotericsoftware.kryo.io.Output;
33import com.google.common.base.Throwables;
34import com.google.common.collect.Lists;
35import com.google.common.collect.Queues;
36import io.atomix.protocols.raft.event.EventType;
37import io.atomix.protocols.raft.service.AbstractRaftService;
38import io.atomix.protocols.raft.service.Commit;
39import io.atomix.protocols.raft.service.RaftServiceExecutor;
40import io.atomix.protocols.raft.session.RaftSession;
41import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
42import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
43import org.onlab.util.KryoNamespace;
44import org.onlab.util.Match;
45import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Get;
46import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GetChildren;
47import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Listen;
48import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Unlisten;
49import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Update;
Jordan Haltermane853d032017-08-01 15:10:28 -070050import org.onosproject.store.primitives.resources.impl.DocumentTreeResult.Status;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070051import org.onosproject.store.serializers.KryoNamespaces;
52import org.onosproject.store.service.DocumentPath;
53import org.onosproject.store.service.DocumentTree;
54import org.onosproject.store.service.DocumentTreeEvent;
55import org.onosproject.store.service.DocumentTreeEvent.Type;
56import org.onosproject.store.service.IllegalDocumentModificationException;
57import org.onosproject.store.service.NoSuchDocumentPathException;
58import org.onosproject.store.service.Serializer;
59import org.onosproject.store.service.Versioned;
60
61import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeEvents.CHANGE;
62import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.ADD_LISTENER;
63import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.CLEAR;
64import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GET;
65import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GET_CHILDREN;
66import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.REMOVE_LISTENER;
67import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.UPDATE;
68
69/**
70 * State Machine for {@link AtomixDocumentTree} resource.
71 */
72public class AtomixDocumentTreeService extends AbstractRaftService {
73 private final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
74 .register(KryoNamespaces.BASIC)
75 .register(AtomixDocumentTreeOperations.NAMESPACE)
76 .register(AtomixDocumentTreeEvents.NAMESPACE)
77 .register(new com.esotericsoftware.kryo.Serializer<Listener>() {
78 @Override
79 public void write(Kryo kryo, Output output, Listener listener) {
80 output.writeLong(listener.session.sessionId().id());
81 kryo.writeObject(output, listener.path);
82 }
83
84 @Override
85 public Listener read(Kryo kryo, Input input, Class<Listener> type) {
86 return new Listener(getSessions().getSession(input.readLong()),
87 kryo.readObjectOrNull(input, DocumentPath.class));
88 }
89 }, Listener.class)
90 .register(Versioned.class)
91 .register(DocumentPath.class)
92 .register(new HashMap().keySet().getClass())
93 .register(TreeMap.class)
94 .register(SessionListenCommits.class)
95 .register(new com.esotericsoftware.kryo.Serializer<DefaultDocumentTree>() {
96 @Override
97 public void write(Kryo kryo, Output output, DefaultDocumentTree object) {
98 kryo.writeObject(output, object.root);
99 }
100
101 @Override
102 @SuppressWarnings("unchecked")
103 public DefaultDocumentTree read(Kryo kryo, Input input, Class<DefaultDocumentTree> type) {
104 return new DefaultDocumentTree(versionCounter::incrementAndGet,
105 kryo.readObject(input, DefaultDocumentTreeNode.class));
106 }
107 }, DefaultDocumentTree.class)
108 .register(DefaultDocumentTreeNode.class)
109 .build());
110
111 private Map<Long, SessionListenCommits> listeners = new HashMap<>();
112 private AtomicLong versionCounter = new AtomicLong(0);
113 private DocumentTree<byte[]> docTree = new DefaultDocumentTree<>(versionCounter::incrementAndGet);
114
115 @Override
116 public void snapshot(SnapshotWriter writer) {
117 writer.writeLong(versionCounter.get());
118 writer.writeObject(listeners, serializer::encode);
119 writer.writeObject(docTree, serializer::encode);
120 }
121
122 @Override
123 public void install(SnapshotReader reader) {
124 versionCounter = new AtomicLong(reader.readLong());
125 listeners = reader.readObject(serializer::decode);
126 docTree = reader.readObject(serializer::decode);
127 }
128
129 @Override
130 protected void configure(RaftServiceExecutor executor) {
131 // Listeners
132 executor.register(ADD_LISTENER, serializer::decode, this::listen);
133 executor.register(REMOVE_LISTENER, serializer::decode, this::unlisten);
134 // queries
135 executor.register(GET, serializer::decode, this::get, serializer::encode);
136 executor.register(GET_CHILDREN, serializer::decode, this::getChildren, serializer::encode);
137 // commands
138 executor.register(UPDATE, serializer::decode, this::update, serializer::encode);
139 executor.register(CLEAR, this::clear);
140 }
141
142 protected void listen(Commit<? extends Listen> commit) {
143 Long sessionId = commit.session().sessionId().id();
144 listeners.computeIfAbsent(sessionId, k -> new SessionListenCommits())
145 .add(new Listener(commit.session(), commit.value().path()));
146 }
147
148 protected void unlisten(Commit<? extends Unlisten> commit) {
149 Long sessionId = commit.session().sessionId().id();
150 SessionListenCommits listenCommits = listeners.get(sessionId);
151 if (listenCommits != null) {
152 listenCommits.remove(commit);
153 }
154 }
155
156 protected Versioned<byte[]> get(Commit<? extends Get> commit) {
157 try {
158 Versioned<byte[]> value = docTree.get(commit.value().path());
Jordan Haltermane853d032017-08-01 15:10:28 -0700159 return value == null ? null : value.map(node -> node);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700160 } catch (IllegalStateException e) {
161 return null;
162 }
163 }
164
Jordan Haltermane853d032017-08-01 15:10:28 -0700165 protected DocumentTreeResult<Map<String, Versioned<byte[]>>> getChildren(Commit<? extends GetChildren> commit) {
166 try {
167 return DocumentTreeResult.ok(docTree.getChildren(commit.value().path()));
168 } catch (NoSuchDocumentPathException e) {
169 return DocumentTreeResult.invalidPath();
170 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700171 }
172
Jordan Haltermane853d032017-08-01 15:10:28 -0700173 protected DocumentTreeResult<Versioned<byte[]>> update(Commit<? extends Update> commit) {
174 DocumentTreeResult<Versioned<byte[]>> result = null;
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700175 DocumentPath path = commit.value().path();
176 boolean updated = false;
177 Versioned<byte[]> currentValue = docTree.get(path);
178 try {
179 Match<Long> versionMatch = commit.value().versionMatch();
180 Match<byte[]> valueMatch = commit.value().valueMatch();
181
182 if (versionMatch.matches(currentValue == null ? null : currentValue.version())
183 && valueMatch.matches(currentValue == null ? null : currentValue.value())) {
184 if (commit.value().value() == null) {
Jordan Haltermane853d032017-08-01 15:10:28 -0700185 Versioned<byte[]> oldValue = docTree.removeNode(path);
186 result = new DocumentTreeResult<>(Status.OK, oldValue);
187 if (oldValue != null) {
188 notifyListeners(new DocumentTreeEvent<>(
189 path,
190 Type.DELETED,
191 Optional.empty(),
192 Optional.of(oldValue)));
193 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700194 } else {
Jordan Haltermane853d032017-08-01 15:10:28 -0700195 Versioned<byte[]> oldValue = docTree.set(path, commit.value().value().orElse(null));
196 Versioned<byte[]> newValue = docTree.get(path);
197 result = new DocumentTreeResult<>(Status.OK, newValue);
198 if (oldValue == null) {
199 notifyListeners(new DocumentTreeEvent<>(
200 path,
201 Type.CREATED,
202 Optional.of(newValue),
203 Optional.empty()));
204 } else {
205 notifyListeners(new DocumentTreeEvent<>(
206 path,
207 Type.UPDATED,
208 Optional.of(newValue),
209 Optional.of(oldValue)));
210 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700211 }
Jordan Haltermane853d032017-08-01 15:10:28 -0700212 } else {
213 result = new DocumentTreeResult<>(
214 commit.value().value() == null ? Status.INVALID_PATH : Status.NOOP,
215 currentValue);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700216 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700217 } catch (IllegalDocumentModificationException e) {
Jordan Haltermane853d032017-08-01 15:10:28 -0700218 result = DocumentTreeResult.illegalModification();
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700219 } catch (NoSuchDocumentPathException e) {
Jordan Haltermane853d032017-08-01 15:10:28 -0700220 result = DocumentTreeResult.invalidPath();
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700221 } catch (Exception e) {
222 getLogger().error("Failed to apply {} to state machine", commit.value(), e);
223 throw Throwables.propagate(e);
224 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700225 return result;
226 }
227
228 protected void clear(Commit<Void> commit) {
229 Queue<DocumentPath> toClearQueue = Queues.newArrayDeque();
230 Map<String, Versioned<byte[]>> topLevelChildren = docTree.getChildren(DocumentPath.from("root"));
231 toClearQueue.addAll(topLevelChildren.keySet()
232 .stream()
233 .map(name -> new DocumentPath(name, DocumentPath.from("root")))
234 .collect(Collectors.toList()));
235 while (!toClearQueue.isEmpty()) {
236 DocumentPath path = toClearQueue.remove();
237 Map<String, Versioned<byte[]>> children = docTree.getChildren(path);
238 if (children.size() == 0) {
239 docTree.removeNode(path);
240 } else {
241 children.keySet().forEach(name -> toClearQueue.add(new DocumentPath(name, path)));
242 toClearQueue.add(path);
243 }
244 }
245 }
246
Jordan Haltermane853d032017-08-01 15:10:28 -0700247 private void notifyListeners(DocumentTreeEvent<byte[]> event) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700248 listeners.values()
249 .stream()
250 .filter(l -> event.path().isDescendentOf(l.leastCommonAncestorPath()))
251 .forEach(listener -> listener.publish(CHANGE, Arrays.asList(event)));
252 }
253
254 @Override
255 public void onExpire(RaftSession session) {
256 closeListener(session.sessionId().id());
257 }
258
259 @Override
260 public void onClose(RaftSession session) {
261 closeListener(session.sessionId().id());
262 }
263
264 private void closeListener(Long sessionId) {
265 listeners.remove(sessionId);
266 }
267
268 private class SessionListenCommits {
269 private final List<Listener> listeners = Lists.newArrayList();
270 private DocumentPath leastCommonAncestorPath;
271
272 public void add(Listener listener) {
273 listeners.add(listener);
274 recomputeLeastCommonAncestor();
275 }
276
277 public void remove(Commit<? extends Unlisten> commit) {
278 // Remove the first listen commit with path matching path in unlisten commit
279 Iterator<Listener> iterator = listeners.iterator();
280 while (iterator.hasNext()) {
281 Listener listener = iterator.next();
282 if (listener.path().equals(commit.value().path())) {
283 iterator.remove();
284 }
285 }
286 recomputeLeastCommonAncestor();
287 }
288
289 public DocumentPath leastCommonAncestorPath() {
290 return leastCommonAncestorPath;
291 }
292
293 public <M> void publish(EventType topic, M message) {
294 listeners.stream().findAny().ifPresent(listener ->
295 listener.session().publish(topic, serializer::encode, message));
296 }
297
298 private void recomputeLeastCommonAncestor() {
299 this.leastCommonAncestorPath = DocumentPath.leastCommonAncestor(listeners.stream()
300 .map(Listener::path)
301 .collect(Collectors.toList()));
302 }
303 }
304
305 private static class Listener {
306 private final RaftSession session;
307 private final DocumentPath path;
308
309 public Listener(RaftSession session, DocumentPath path) {
310 this.session = session;
311 this.path = path;
312 }
313
314 public DocumentPath path() {
315 return path;
316 }
317
318 public RaftSession session() {
319 return session;
320 }
321 }
322}