blob: 9e873256e8ea9162bac535828ae7eec2837589a6 [file] [log] [blame]
Jordan Halterman2bf177c2017-06-29 01:49:08 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Jordan Halterman2bf177c2017-06-29 01:49:08 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.store.primitives.resources.impl;
18
19import java.util.Arrays;
20import java.util.HashMap;
21import java.util.Iterator;
22import java.util.List;
23import java.util.Map;
24import java.util.Optional;
25import java.util.Queue;
26import java.util.TreeMap;
27import java.util.concurrent.atomic.AtomicLong;
28import java.util.stream.Collectors;
29
30import com.esotericsoftware.kryo.Kryo;
31import com.esotericsoftware.kryo.io.Input;
32import com.esotericsoftware.kryo.io.Output;
33import com.google.common.base.Throwables;
34import com.google.common.collect.Lists;
35import com.google.common.collect.Queues;
36import io.atomix.protocols.raft.event.EventType;
37import io.atomix.protocols.raft.service.AbstractRaftService;
38import io.atomix.protocols.raft.service.Commit;
39import io.atomix.protocols.raft.service.RaftServiceExecutor;
40import io.atomix.protocols.raft.session.RaftSession;
41import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
42import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
43import org.onlab.util.KryoNamespace;
44import org.onlab.util.Match;
45import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Get;
46import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GetChildren;
47import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Listen;
48import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Unlisten;
49import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Update;
Jordan Haltermane853d032017-08-01 15:10:28 -070050import org.onosproject.store.primitives.resources.impl.DocumentTreeResult.Status;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070051import org.onosproject.store.serializers.KryoNamespaces;
52import org.onosproject.store.service.DocumentPath;
53import org.onosproject.store.service.DocumentTree;
54import org.onosproject.store.service.DocumentTreeEvent;
55import org.onosproject.store.service.DocumentTreeEvent.Type;
56import org.onosproject.store.service.IllegalDocumentModificationException;
57import org.onosproject.store.service.NoSuchDocumentPathException;
Jordan Haltermand0d80352017-08-10 15:08:27 -070058import org.onosproject.store.service.Ordering;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070059import org.onosproject.store.service.Serializer;
60import org.onosproject.store.service.Versioned;
61
62import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeEvents.CHANGE;
63import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.ADD_LISTENER;
64import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.CLEAR;
65import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GET;
66import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GET_CHILDREN;
67import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.REMOVE_LISTENER;
68import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.UPDATE;
69
70/**
71 * State Machine for {@link AtomixDocumentTree} resource.
72 */
73public class AtomixDocumentTreeService extends AbstractRaftService {
74 private final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
75 .register(KryoNamespaces.BASIC)
76 .register(AtomixDocumentTreeOperations.NAMESPACE)
77 .register(AtomixDocumentTreeEvents.NAMESPACE)
78 .register(new com.esotericsoftware.kryo.Serializer<Listener>() {
79 @Override
80 public void write(Kryo kryo, Output output, Listener listener) {
81 output.writeLong(listener.session.sessionId().id());
82 kryo.writeObject(output, listener.path);
83 }
84
85 @Override
86 public Listener read(Kryo kryo, Input input, Class<Listener> type) {
87 return new Listener(getSessions().getSession(input.readLong()),
88 kryo.readObjectOrNull(input, DocumentPath.class));
89 }
90 }, Listener.class)
91 .register(Versioned.class)
92 .register(DocumentPath.class)
93 .register(new HashMap().keySet().getClass())
94 .register(TreeMap.class)
Jordan Haltermand0d80352017-08-10 15:08:27 -070095 .register(Ordering.class)
Jordan Halterman2bf177c2017-06-29 01:49:08 -070096 .register(SessionListenCommits.class)
97 .register(new com.esotericsoftware.kryo.Serializer<DefaultDocumentTree>() {
98 @Override
99 public void write(Kryo kryo, Output output, DefaultDocumentTree object) {
100 kryo.writeObject(output, object.root);
101 }
102
103 @Override
104 @SuppressWarnings("unchecked")
105 public DefaultDocumentTree read(Kryo kryo, Input input, Class<DefaultDocumentTree> type) {
106 return new DefaultDocumentTree(versionCounter::incrementAndGet,
107 kryo.readObject(input, DefaultDocumentTreeNode.class));
108 }
109 }, DefaultDocumentTree.class)
110 .register(DefaultDocumentTreeNode.class)
111 .build());
112
113 private Map<Long, SessionListenCommits> listeners = new HashMap<>();
114 private AtomicLong versionCounter = new AtomicLong(0);
Jordan Haltermand0d80352017-08-10 15:08:27 -0700115 private DocumentTree<byte[]> docTree;
116
117 public AtomixDocumentTreeService(Ordering ordering) {
118 this.docTree = new DefaultDocumentTree<>(versionCounter::incrementAndGet, ordering);
119 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700120
121 @Override
122 public void snapshot(SnapshotWriter writer) {
123 writer.writeLong(versionCounter.get());
124 writer.writeObject(listeners, serializer::encode);
125 writer.writeObject(docTree, serializer::encode);
126 }
127
128 @Override
129 public void install(SnapshotReader reader) {
130 versionCounter = new AtomicLong(reader.readLong());
131 listeners = reader.readObject(serializer::decode);
132 docTree = reader.readObject(serializer::decode);
133 }
134
135 @Override
136 protected void configure(RaftServiceExecutor executor) {
137 // Listeners
138 executor.register(ADD_LISTENER, serializer::decode, this::listen);
139 executor.register(REMOVE_LISTENER, serializer::decode, this::unlisten);
140 // queries
141 executor.register(GET, serializer::decode, this::get, serializer::encode);
142 executor.register(GET_CHILDREN, serializer::decode, this::getChildren, serializer::encode);
143 // commands
144 executor.register(UPDATE, serializer::decode, this::update, serializer::encode);
145 executor.register(CLEAR, this::clear);
146 }
147
148 protected void listen(Commit<? extends Listen> commit) {
149 Long sessionId = commit.session().sessionId().id();
150 listeners.computeIfAbsent(sessionId, k -> new SessionListenCommits())
151 .add(new Listener(commit.session(), commit.value().path()));
152 }
153
154 protected void unlisten(Commit<? extends Unlisten> commit) {
155 Long sessionId = commit.session().sessionId().id();
156 SessionListenCommits listenCommits = listeners.get(sessionId);
157 if (listenCommits != null) {
158 listenCommits.remove(commit);
159 }
160 }
161
162 protected Versioned<byte[]> get(Commit<? extends Get> commit) {
163 try {
164 Versioned<byte[]> value = docTree.get(commit.value().path());
Jordan Haltermane853d032017-08-01 15:10:28 -0700165 return value == null ? null : value.map(node -> node);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700166 } catch (IllegalStateException e) {
167 return null;
168 }
169 }
170
Jordan Haltermane853d032017-08-01 15:10:28 -0700171 protected DocumentTreeResult<Map<String, Versioned<byte[]>>> getChildren(Commit<? extends GetChildren> commit) {
172 try {
173 return DocumentTreeResult.ok(docTree.getChildren(commit.value().path()));
174 } catch (NoSuchDocumentPathException e) {
175 return DocumentTreeResult.invalidPath();
176 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700177 }
178
Jordan Haltermane853d032017-08-01 15:10:28 -0700179 protected DocumentTreeResult<Versioned<byte[]>> update(Commit<? extends Update> commit) {
180 DocumentTreeResult<Versioned<byte[]>> result = null;
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700181 DocumentPath path = commit.value().path();
182 boolean updated = false;
183 Versioned<byte[]> currentValue = docTree.get(path);
184 try {
185 Match<Long> versionMatch = commit.value().versionMatch();
186 Match<byte[]> valueMatch = commit.value().valueMatch();
187
188 if (versionMatch.matches(currentValue == null ? null : currentValue.version())
189 && valueMatch.matches(currentValue == null ? null : currentValue.value())) {
190 if (commit.value().value() == null) {
Jordan Haltermane853d032017-08-01 15:10:28 -0700191 Versioned<byte[]> oldValue = docTree.removeNode(path);
192 result = new DocumentTreeResult<>(Status.OK, oldValue);
193 if (oldValue != null) {
194 notifyListeners(new DocumentTreeEvent<>(
195 path,
196 Type.DELETED,
197 Optional.empty(),
198 Optional.of(oldValue)));
199 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700200 } else {
Jordan Haltermane853d032017-08-01 15:10:28 -0700201 Versioned<byte[]> oldValue = docTree.set(path, commit.value().value().orElse(null));
202 Versioned<byte[]> newValue = docTree.get(path);
203 result = new DocumentTreeResult<>(Status.OK, newValue);
204 if (oldValue == null) {
205 notifyListeners(new DocumentTreeEvent<>(
206 path,
207 Type.CREATED,
208 Optional.of(newValue),
209 Optional.empty()));
210 } else {
211 notifyListeners(new DocumentTreeEvent<>(
212 path,
213 Type.UPDATED,
214 Optional.of(newValue),
215 Optional.of(oldValue)));
216 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700217 }
Jordan Haltermane853d032017-08-01 15:10:28 -0700218 } else {
219 result = new DocumentTreeResult<>(
220 commit.value().value() == null ? Status.INVALID_PATH : Status.NOOP,
221 currentValue);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700222 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700223 } catch (IllegalDocumentModificationException e) {
Jordan Haltermane853d032017-08-01 15:10:28 -0700224 result = DocumentTreeResult.illegalModification();
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700225 } catch (NoSuchDocumentPathException e) {
Jordan Haltermane853d032017-08-01 15:10:28 -0700226 result = DocumentTreeResult.invalidPath();
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700227 } catch (Exception e) {
228 getLogger().error("Failed to apply {} to state machine", commit.value(), e);
229 throw Throwables.propagate(e);
230 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700231 return result;
232 }
233
234 protected void clear(Commit<Void> commit) {
235 Queue<DocumentPath> toClearQueue = Queues.newArrayDeque();
236 Map<String, Versioned<byte[]>> topLevelChildren = docTree.getChildren(DocumentPath.from("root"));
237 toClearQueue.addAll(topLevelChildren.keySet()
238 .stream()
239 .map(name -> new DocumentPath(name, DocumentPath.from("root")))
240 .collect(Collectors.toList()));
241 while (!toClearQueue.isEmpty()) {
242 DocumentPath path = toClearQueue.remove();
243 Map<String, Versioned<byte[]>> children = docTree.getChildren(path);
244 if (children.size() == 0) {
245 docTree.removeNode(path);
246 } else {
247 children.keySet().forEach(name -> toClearQueue.add(new DocumentPath(name, path)));
248 toClearQueue.add(path);
249 }
250 }
251 }
252
Jordan Haltermane853d032017-08-01 15:10:28 -0700253 private void notifyListeners(DocumentTreeEvent<byte[]> event) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700254 listeners.values()
255 .stream()
256 .filter(l -> event.path().isDescendentOf(l.leastCommonAncestorPath()))
257 .forEach(listener -> listener.publish(CHANGE, Arrays.asList(event)));
258 }
259
260 @Override
261 public void onExpire(RaftSession session) {
262 closeListener(session.sessionId().id());
263 }
264
265 @Override
266 public void onClose(RaftSession session) {
267 closeListener(session.sessionId().id());
268 }
269
270 private void closeListener(Long sessionId) {
271 listeners.remove(sessionId);
272 }
273
274 private class SessionListenCommits {
275 private final List<Listener> listeners = Lists.newArrayList();
276 private DocumentPath leastCommonAncestorPath;
277
278 public void add(Listener listener) {
279 listeners.add(listener);
280 recomputeLeastCommonAncestor();
281 }
282
283 public void remove(Commit<? extends Unlisten> commit) {
284 // Remove the first listen commit with path matching path in unlisten commit
285 Iterator<Listener> iterator = listeners.iterator();
286 while (iterator.hasNext()) {
287 Listener listener = iterator.next();
288 if (listener.path().equals(commit.value().path())) {
289 iterator.remove();
290 }
291 }
292 recomputeLeastCommonAncestor();
293 }
294
295 public DocumentPath leastCommonAncestorPath() {
296 return leastCommonAncestorPath;
297 }
298
299 public <M> void publish(EventType topic, M message) {
300 listeners.stream().findAny().ifPresent(listener ->
301 listener.session().publish(topic, serializer::encode, message));
302 }
303
304 private void recomputeLeastCommonAncestor() {
305 this.leastCommonAncestorPath = DocumentPath.leastCommonAncestor(listeners.stream()
306 .map(Listener::path)
307 .collect(Collectors.toList()));
308 }
309 }
310
311 private static class Listener {
312 private final RaftSession session;
313 private final DocumentPath path;
314
315 public Listener(RaftSession session, DocumentPath path) {
316 this.session = session;
317 this.path = path;
318 }
319
320 public DocumentPath path() {
321 return path;
322 }
323
324 public RaftSession session() {
325 return session;
326 }
327 }
328}