blob: cd725d9d60c9d83684d80e592e1d3b99d4321543 [file] [log] [blame]
Madan Jampani79924fa2016-09-13 13:57:03 -07001/*
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.store.primitives.resources.impl;
18
19import static org.slf4j.LoggerFactory.getLogger;
20import io.atomix.copycat.server.Commit;
21import io.atomix.copycat.server.Snapshottable;
22import io.atomix.copycat.server.StateMachineExecutor;
23import io.atomix.copycat.server.session.ServerSession;
24import io.atomix.copycat.server.session.SessionListener;
25import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
26import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
27import io.atomix.resource.ResourceStateMachine;
28
29import java.util.HashMap;
30import java.util.Map;
31import java.util.Optional;
32import java.util.Properties;
33import java.util.Queue;
34import java.util.concurrent.atomic.AtomicLong;
35import java.util.stream.Collectors;
36
37import org.onlab.util.Match;
38import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Clear;
39import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Get;
40import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.GetChildren;
41import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Listen;
42import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Unlisten;
43import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Update;
44import org.onosproject.store.primitives.resources.impl.DocumentTreeUpdateResult.Status;
45import org.onosproject.store.service.DocumentPath;
46import org.onosproject.store.service.DocumentTree;
47import org.onosproject.store.service.DocumentTreeEvent;
48import org.onosproject.store.service.DocumentTreeEvent.Type;
49import org.onosproject.store.service.IllegalDocumentModificationException;
50import org.onosproject.store.service.NoSuchDocumentPathException;
51import org.onosproject.store.service.Versioned;
52import org.slf4j.Logger;
53
54import com.google.common.base.Throwables;
55import com.google.common.collect.ImmutableList;
56import com.google.common.collect.Maps;
57import com.google.common.collect.Queues;
58
59/**
60 * State Machine for {@link AtomixDocumentTree} resource.
61 */
62public class AtomixDocumentTreeState
63 extends ResourceStateMachine
64 implements SessionListener, Snapshottable {
65
66 private final Logger log = getLogger(getClass());
67 private final Map<Long, Commit<? extends Listen>> listeners = new HashMap<>();
Madan Jampani2914e4e2016-09-13 17:48:56 -070068 private AtomicLong versionCounter = new AtomicLong(0);
Madan Jampani79924fa2016-09-13 13:57:03 -070069 private final DocumentTree<TreeNodeValue> docTree = new DefaultDocumentTree<>(versionCounter::incrementAndGet);
70
71 public AtomixDocumentTreeState(Properties properties) {
72 super(properties);
73 }
74
75 @Override
76 public void snapshot(SnapshotWriter writer) {
77 writer.writeLong(versionCounter.get());
78 }
79
80 @Override
81 public void install(SnapshotReader reader) {
Madan Jampani2914e4e2016-09-13 17:48:56 -070082 versionCounter = new AtomicLong(reader.readLong());
Madan Jampani79924fa2016-09-13 13:57:03 -070083 }
84
85 @Override
86 protected void configure(StateMachineExecutor executor) {
87 // Listeners
88 executor.register(Listen.class, this::listen);
89 executor.register(Unlisten.class, this::unlisten);
90 // queries
91 executor.register(Get.class, this::get);
92 executor.register(GetChildren.class, this::getChildren);
93 // commands
94 executor.register(Update.class, this::update);
95 executor.register(Clear.class, this::clear);
96 }
97
98 protected void listen(Commit<? extends Listen> commit) {
99 Long sessionId = commit.session().id();
100 if (listeners.putIfAbsent(sessionId, commit) != null) {
101 commit.close();
102 return;
103 }
Madan Jampani2914e4e2016-09-13 17:48:56 -0700104 commit.session().onStateChange(
Madan Jampani79924fa2016-09-13 13:57:03 -0700105 state -> {
106 if (state == ServerSession.State.CLOSED
107 || state == ServerSession.State.EXPIRED) {
108 Commit<? extends Listen> listener = listeners.remove(sessionId);
109 if (listener != null) {
110 listener.close();
111 }
112 }
113 });
114 }
115
116 protected void unlisten(Commit<? extends Unlisten> commit) {
117 try {
118 closeListener(commit.session().id());
119 } finally {
120 commit.close();
121 }
122 }
123
124 protected Versioned<byte[]> get(Commit<? extends Get> commit) {
125 try {
126 Versioned<TreeNodeValue> value = docTree.get(commit.operation().path());
127 return value == null ? null : value.map(node -> node == null ? null : node.value());
128 } finally {
129 commit.close();
130 }
131 }
132
133 protected Map<String, Versioned<byte[]>> getChildren(Commit<? extends GetChildren> commit) {
134 try {
135 Map<String, Versioned<TreeNodeValue>> children = docTree.getChildren(commit.operation().path());
136 return children == null
137 ? null : Maps.newHashMap(Maps.transformValues(children,
138 value -> value.map(TreeNodeValue::value)));
139 } finally {
140 commit.close();
141 }
142 }
143
144 protected DocumentTreeUpdateResult<byte[]> update(Commit<? extends Update> commit) {
145 DocumentTreeUpdateResult<byte[]> result = null;
146 DocumentPath path = commit.operation().path();
147 boolean updated = false;
148 Versioned<TreeNodeValue> currentValue = docTree.get(path);
149 try {
150 Match<Long> versionMatch = commit.operation().versionMatch();
151 Match<byte[]> valueMatch = commit.operation().valueMatch();
152
153 if (versionMatch.matches(currentValue == null ? null : currentValue.version())
154 && valueMatch.matches(currentValue == null ? null : currentValue.value().value())) {
155 if (commit.operation().value() == null) {
156 docTree.removeNode(path);
157 } else {
158 docTree.set(path, new NonTransactionalCommit(commit));
159 }
160 updated = true;
161 }
162 Versioned<TreeNodeValue> newValue = updated ? docTree.get(path) : currentValue;
163 Status updateStatus = updated
164 ? Status.OK : commit.operation().value() == null ? Status.INVALID_PATH : Status.NOOP;
165 result = new DocumentTreeUpdateResult<>(path,
166 updateStatus,
167 newValue == null
168 ? null : newValue.map(TreeNodeValue::value),
169 currentValue == null
170 ? null : currentValue.map(TreeNodeValue::value));
171 } catch (IllegalDocumentModificationException e) {
172 result = DocumentTreeUpdateResult.illegalModification(path);
173 } catch (NoSuchDocumentPathException e) {
174 result = DocumentTreeUpdateResult.invalidPath(path);
175 } catch (Exception e) {
176 log.error("Failed to apply {} to state machine", commit.operation(), e);
177 throw Throwables.propagate(e);
178 } finally {
179 if (updated) {
180 if (currentValue != null) {
181 currentValue.value().discard();
182 }
183 } else {
184 commit.close();
185 }
186 }
187 notifyListeners(path, result);
188 return result;
189 }
190
191 protected void clear(Commit<? extends Clear> commit) {
192 try {
193 Queue<DocumentPath> toClearQueue = Queues.newArrayDeque();
194 Map<String, Versioned<TreeNodeValue>> topLevelChildren = docTree.getChildren(DocumentPath.from("root"));
195 toClearQueue.addAll(topLevelChildren.keySet()
196 .stream()
197 .map(name -> new DocumentPath(name, DocumentPath.from("root")))
198 .collect(Collectors.toList()));
199 while (!toClearQueue.isEmpty()) {
200 DocumentPath path = toClearQueue.remove();
201 Map<String, Versioned<TreeNodeValue>> children = docTree.getChildren(path);
202 if (children.size() == 0) {
203 docTree.removeNode(path).value().discard();
204 } else {
205 children.keySet()
206 .stream()
207 .forEach(name -> toClearQueue.add(new DocumentPath(name, path)));
208 toClearQueue.add(path);
209 }
210 }
211 } finally {
212 commit.close();
213 }
214 }
215
216 /**
217 * Interface implemented by tree node values.
218 */
219 private interface TreeNodeValue {
220 /**
221 * Returns the raw {@code byte[]}.
222 *
223 * @return raw value
224 */
225 byte[] value();
226
227 /**
228 * Discards the value by invoke appropriate clean up actions.
229 */
230 void discard();
231 }
232
233 /**
234 * A {@code TreeNodeValue} that is derived from a non-transactional update
235 * i.e. via any standard tree update operation.
236 */
237 private class NonTransactionalCommit implements TreeNodeValue {
238 private final Commit<? extends Update> commit;
239
240 public NonTransactionalCommit(Commit<? extends Update> commit) {
241 this.commit = commit;
242 }
243
244 @Override
245 public byte[] value() {
246 return commit.operation().value();
247 }
248
249 @Override
250 public void discard() {
251 commit.close();
252 }
253 }
254
255 private void notifyListeners(DocumentPath path, DocumentTreeUpdateResult<byte[]> result) {
256 if (result.status() != Status.OK) {
257 return;
258 }
259 DocumentTreeEvent<byte[]> event =
260 new DocumentTreeEvent<>(path,
261 result.created() ? Type.CREATED : result.newValue() == null ? Type.DELETED : Type.UPDATED,
262 Optional.ofNullable(result.newValue()),
263 Optional.ofNullable(result.oldValue()));
Madan Jampani2914e4e2016-09-13 17:48:56 -0700264 listeners.values()
265 .forEach(commit -> commit.session()
266 .publish(AtomixDocumentTree.CHANGE_SUBJECT,
267 ImmutableList.of(event)));
Madan Jampani79924fa2016-09-13 13:57:03 -0700268 }
269
270 @Override
271 public void register(ServerSession session) {
272 }
273
274 @Override
275 public void unregister(ServerSession session) {
276 closeListener(session.id());
277 }
278
279 @Override
280 public void expire(ServerSession session) {
281 closeListener(session.id());
282 }
283
284 @Override
285 public void close(ServerSession session) {
286 closeListener(session.id());
287 }
288
289 private void closeListener(Long sessionId) {
290 Commit<? extends Listen> commit = listeners.remove(sessionId);
291 if (commit != null) {
292 commit.close();
293 }
294 }
295}