/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.store.primitives.resources.impl;
18
19import java.util.Arrays;
20import java.util.HashMap;
21import java.util.Iterator;
22import java.util.List;
23import java.util.Map;
24import java.util.Optional;
25import java.util.Queue;
26import java.util.TreeMap;
27import java.util.concurrent.atomic.AtomicLong;
28import java.util.stream.Collectors;
29
30import com.esotericsoftware.kryo.Kryo;
31import com.esotericsoftware.kryo.io.Input;
32import com.esotericsoftware.kryo.io.Output;
33import com.google.common.base.Throwables;
34import com.google.common.collect.Lists;
35import com.google.common.collect.Queues;
36import io.atomix.protocols.raft.event.EventType;
37import io.atomix.protocols.raft.service.AbstractRaftService;
38import io.atomix.protocols.raft.service.Commit;
39import io.atomix.protocols.raft.service.RaftServiceExecutor;
40import io.atomix.protocols.raft.session.RaftSession;
41import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
42import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
43import org.onlab.util.KryoNamespace;
44import org.onlab.util.Match;
45import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Get;
46import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GetChildren;
47import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Listen;
48import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Unlisten;
49import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Update;
50import org.onosproject.store.primitives.resources.impl.DocumentTreeUpdateResult.Status;
51import org.onosproject.store.serializers.KryoNamespaces;
52import org.onosproject.store.service.DocumentPath;
53import org.onosproject.store.service.DocumentTree;
54import org.onosproject.store.service.DocumentTreeEvent;
55import org.onosproject.store.service.DocumentTreeEvent.Type;
56import org.onosproject.store.service.IllegalDocumentModificationException;
57import org.onosproject.store.service.NoSuchDocumentPathException;
58import org.onosproject.store.service.Serializer;
59import org.onosproject.store.service.Versioned;
60
61import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeEvents.CHANGE;
62import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.ADD_LISTENER;
63import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.CLEAR;
64import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GET;
65import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GET_CHILDREN;
66import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.REMOVE_LISTENER;
67import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.UPDATE;
68
69/**
70 * State Machine for {@link AtomixDocumentTree} resource.
71 */
72public class AtomixDocumentTreeService extends AbstractRaftService {
73 private final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
74 .register(KryoNamespaces.BASIC)
75 .register(AtomixDocumentTreeOperations.NAMESPACE)
76 .register(AtomixDocumentTreeEvents.NAMESPACE)
77 .register(new com.esotericsoftware.kryo.Serializer<Listener>() {
78 @Override
79 public void write(Kryo kryo, Output output, Listener listener) {
80 output.writeLong(listener.session.sessionId().id());
81 kryo.writeObject(output, listener.path);
82 }
83
84 @Override
85 public Listener read(Kryo kryo, Input input, Class<Listener> type) {
86 return new Listener(getSessions().getSession(input.readLong()),
87 kryo.readObjectOrNull(input, DocumentPath.class));
88 }
89 }, Listener.class)
90 .register(Versioned.class)
91 .register(DocumentPath.class)
92 .register(new HashMap().keySet().getClass())
93 .register(TreeMap.class)
94 .register(SessionListenCommits.class)
95 .register(new com.esotericsoftware.kryo.Serializer<DefaultDocumentTree>() {
96 @Override
97 public void write(Kryo kryo, Output output, DefaultDocumentTree object) {
98 kryo.writeObject(output, object.root);
99 }
100
101 @Override
102 @SuppressWarnings("unchecked")
103 public DefaultDocumentTree read(Kryo kryo, Input input, Class<DefaultDocumentTree> type) {
104 return new DefaultDocumentTree(versionCounter::incrementAndGet,
105 kryo.readObject(input, DefaultDocumentTreeNode.class));
106 }
107 }, DefaultDocumentTree.class)
108 .register(DefaultDocumentTreeNode.class)
109 .build());
110
111 private Map<Long, SessionListenCommits> listeners = new HashMap<>();
112 private AtomicLong versionCounter = new AtomicLong(0);
113 private DocumentTree<byte[]> docTree = new DefaultDocumentTree<>(versionCounter::incrementAndGet);
114
115 @Override
116 public void snapshot(SnapshotWriter writer) {
117 writer.writeLong(versionCounter.get());
118 writer.writeObject(listeners, serializer::encode);
119 writer.writeObject(docTree, serializer::encode);
120 }
121
122 @Override
123 public void install(SnapshotReader reader) {
124 versionCounter = new AtomicLong(reader.readLong());
125 listeners = reader.readObject(serializer::decode);
126 docTree = reader.readObject(serializer::decode);
127 }
128
129 @Override
130 protected void configure(RaftServiceExecutor executor) {
131 // Listeners
132 executor.register(ADD_LISTENER, serializer::decode, this::listen);
133 executor.register(REMOVE_LISTENER, serializer::decode, this::unlisten);
134 // queries
135 executor.register(GET, serializer::decode, this::get, serializer::encode);
136 executor.register(GET_CHILDREN, serializer::decode, this::getChildren, serializer::encode);
137 // commands
138 executor.register(UPDATE, serializer::decode, this::update, serializer::encode);
139 executor.register(CLEAR, this::clear);
140 }
141
142 protected void listen(Commit<? extends Listen> commit) {
143 Long sessionId = commit.session().sessionId().id();
144 listeners.computeIfAbsent(sessionId, k -> new SessionListenCommits())
145 .add(new Listener(commit.session(), commit.value().path()));
146 }
147
148 protected void unlisten(Commit<? extends Unlisten> commit) {
149 Long sessionId = commit.session().sessionId().id();
150 SessionListenCommits listenCommits = listeners.get(sessionId);
151 if (listenCommits != null) {
152 listenCommits.remove(commit);
153 }
154 }
155
156 protected Versioned<byte[]> get(Commit<? extends Get> commit) {
157 try {
158 Versioned<byte[]> value = docTree.get(commit.value().path());
159 return value == null ? null : value.map(node -> node == null ? null : node);
160 } catch (IllegalStateException e) {
161 return null;
162 }
163 }
164
165 protected Map<String, Versioned<byte[]>> getChildren(Commit<? extends GetChildren> commit) {
166 return docTree.getChildren(commit.value().path());
167 }
168
169 protected DocumentTreeUpdateResult<byte[]> update(Commit<? extends Update> commit) {
170 DocumentTreeUpdateResult<byte[]> result = null;
171 DocumentPath path = commit.value().path();
172 boolean updated = false;
173 Versioned<byte[]> currentValue = docTree.get(path);
174 try {
175 Match<Long> versionMatch = commit.value().versionMatch();
176 Match<byte[]> valueMatch = commit.value().valueMatch();
177
178 if (versionMatch.matches(currentValue == null ? null : currentValue.version())
179 && valueMatch.matches(currentValue == null ? null : currentValue.value())) {
180 if (commit.value().value() == null) {
181 docTree.removeNode(path);
182 } else {
183 docTree.set(path, commit.value().value().orElse(null));
184 }
185 updated = true;
186 }
187 Versioned<byte[]> newValue = updated ? docTree.get(path) : currentValue;
188 Status updateStatus = updated
189 ? Status.OK : commit.value().value() == null ? Status.INVALID_PATH : Status.NOOP;
190 result = new DocumentTreeUpdateResult<>(path, updateStatus, newValue, currentValue);
191 } catch (IllegalDocumentModificationException e) {
192 result = DocumentTreeUpdateResult.illegalModification(path);
193 } catch (NoSuchDocumentPathException e) {
194 result = DocumentTreeUpdateResult.invalidPath(path);
195 } catch (Exception e) {
196 getLogger().error("Failed to apply {} to state machine", commit.value(), e);
197 throw Throwables.propagate(e);
198 }
199 notifyListeners(path, result);
200 return result;
201 }
202
203 protected void clear(Commit<Void> commit) {
204 Queue<DocumentPath> toClearQueue = Queues.newArrayDeque();
205 Map<String, Versioned<byte[]>> topLevelChildren = docTree.getChildren(DocumentPath.from("root"));
206 toClearQueue.addAll(topLevelChildren.keySet()
207 .stream()
208 .map(name -> new DocumentPath(name, DocumentPath.from("root")))
209 .collect(Collectors.toList()));
210 while (!toClearQueue.isEmpty()) {
211 DocumentPath path = toClearQueue.remove();
212 Map<String, Versioned<byte[]>> children = docTree.getChildren(path);
213 if (children.size() == 0) {
214 docTree.removeNode(path);
215 } else {
216 children.keySet().forEach(name -> toClearQueue.add(new DocumentPath(name, path)));
217 toClearQueue.add(path);
218 }
219 }
220 }
221
222 private void notifyListeners(DocumentPath path, DocumentTreeUpdateResult<byte[]> result) {
223 if (result.status() != Status.OK) {
224 return;
225 }
226 DocumentTreeEvent<byte[]> event =
227 new DocumentTreeEvent<>(path,
228 result.created() ? Type.CREATED : result.newValue() == null ? Type.DELETED : Type.UPDATED,
229 Optional.ofNullable(result.newValue()),
230 Optional.ofNullable(result.oldValue()));
231
232 listeners.values()
233 .stream()
234 .filter(l -> event.path().isDescendentOf(l.leastCommonAncestorPath()))
235 .forEach(listener -> listener.publish(CHANGE, Arrays.asList(event)));
236 }
237
238 @Override
239 public void onExpire(RaftSession session) {
240 closeListener(session.sessionId().id());
241 }
242
243 @Override
244 public void onClose(RaftSession session) {
245 closeListener(session.sessionId().id());
246 }
247
248 private void closeListener(Long sessionId) {
249 listeners.remove(sessionId);
250 }
251
252 private class SessionListenCommits {
253 private final List<Listener> listeners = Lists.newArrayList();
254 private DocumentPath leastCommonAncestorPath;
255
256 public void add(Listener listener) {
257 listeners.add(listener);
258 recomputeLeastCommonAncestor();
259 }
260
261 public void remove(Commit<? extends Unlisten> commit) {
262 // Remove the first listen commit with path matching path in unlisten commit
263 Iterator<Listener> iterator = listeners.iterator();
264 while (iterator.hasNext()) {
265 Listener listener = iterator.next();
266 if (listener.path().equals(commit.value().path())) {
267 iterator.remove();
268 }
269 }
270 recomputeLeastCommonAncestor();
271 }
272
273 public DocumentPath leastCommonAncestorPath() {
274 return leastCommonAncestorPath;
275 }
276
277 public <M> void publish(EventType topic, M message) {
278 listeners.stream().findAny().ifPresent(listener ->
279 listener.session().publish(topic, serializer::encode, message));
280 }
281
282 private void recomputeLeastCommonAncestor() {
283 this.leastCommonAncestorPath = DocumentPath.leastCommonAncestor(listeners.stream()
284 .map(Listener::path)
285 .collect(Collectors.toList()));
286 }
287 }
288
289 private static class Listener {
290 private final RaftSession session;
291 private final DocumentPath path;
292
293 public Listener(RaftSession session, DocumentPath path) {
294 this.session = session;
295 this.path = path;
296 }
297
298 public DocumentPath path() {
299 return path;
300 }
301
302 public RaftSession session() {
303 return session;
304 }
305 }
306}