blob: 1aed4a896d7e5bacefed66899062897ff842fc39 [file] [log] [blame]
Jordan Haltermanc955df72017-02-04 20:43:28 -08001/*
2 * Copyright 2017-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
18import io.atomix.copycat.server.Commit;
19import io.atomix.copycat.server.Snapshottable;
20import io.atomix.copycat.server.StateMachineExecutor;
21import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
22import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
23import io.atomix.resource.ResourceStateMachine;
24import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.AddAndGet;
25import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.Clear;
26import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.DecrementAndGet;
27import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.Get;
28import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.GetAndAdd;
29import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.GetAndDecrement;
30import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.GetAndIncrement;
31import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.IncrementAndGet;
32import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.IsEmpty;
33import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.Put;
34import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.PutIfAbsent;
35import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.Remove;
36import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.RemoveValue;
37import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.Replace;
38import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapCommands.Size;
39
40import java.util.HashMap;
41import java.util.Map;
42import java.util.Properties;
43
44/**
45 * Atomic counter map state for Atomix.
46 * <p>
47 * The counter map state is implemented as a snapshottable state machine. Snapshots are necessary
48 * since incremental compaction is impractical for counters where the value of a counter is the sum
49 * of all its increments. Note that this snapshotting large state machines may risk blocking of the
50 * Raft cluster with the current implementation of snapshotting in Copycat.
51 */
52public class AtomixAtomicCounterMapState extends ResourceStateMachine implements Snapshottable {
53 private Map<String, Long> map = new HashMap<>();
54
55 public AtomixAtomicCounterMapState(Properties config) {
56 super(config);
57 }
58
59 @Override
60 protected void configure(StateMachineExecutor executor) {
61 executor.register(Put.class, this::put);
62 executor.register(PutIfAbsent.class, this::putIfAbsent);
63 executor.register(Get.class, this::get);
64 executor.register(Replace.class, this::replace);
65 executor.register(Remove.class, this::remove);
66 executor.register(RemoveValue.class, this::removeValue);
67 executor.register(GetAndIncrement.class, this::getAndIncrement);
68 executor.register(GetAndDecrement.class, this::getAndDecrement);
69 executor.register(IncrementAndGet.class, this::incrementAndGet);
70 executor.register(DecrementAndGet.class, this::decrementAndGet);
71 executor.register(AddAndGet.class, this::addAndGet);
72 executor.register(GetAndAdd.class, this::getAndAdd);
73 executor.register(Size.class, this::size);
74 executor.register(IsEmpty.class, this::isEmpty);
75 executor.register(Clear.class, this::clear);
76 }
77
78 @Override
79 public void snapshot(SnapshotWriter writer) {
80 writer.writeObject(map);
81 }
82
83 @Override
84 public void install(SnapshotReader reader) {
85 map = reader.readObject();
86 }
87
88 /**
89 * Returns the primitive value for the given primitive wrapper.
90 */
91 private long primitive(Long value) {
92 if (value != null) {
93 return value;
94 } else {
95 return 0;
96 }
97 }
98
99 /**
100 * Handles a {@link Put} command which implements {@link AtomixAtomicCounterMap#put(String, long)}.
101 *
102 * @param commit put commit
103 * @return put result
104 */
105 protected long put(Commit<Put> commit) {
106 try {
107 return primitive(map.put(commit.operation().key(), commit.operation().value()));
108 } finally {
109 commit.close();
110 }
111 }
112
113 /**
114 * Handles a {@link PutIfAbsent} command which implements {@link AtomixAtomicCounterMap#putIfAbsent(String, long)}.
115 *
116 * @param commit putIfAbsent commit
117 * @return putIfAbsent result
118 */
119 protected long putIfAbsent(Commit<PutIfAbsent> commit) {
120 try {
121 return primitive(map.putIfAbsent(commit.operation().key(), commit.operation().value()));
122 } finally {
123 commit.close();
124 }
125 }
126
127 /**
128 * Handles a {@link Get} query which implements {@link AtomixAtomicCounterMap#get(String)}}.
129 *
130 * @param commit get commit
131 * @return get result
132 */
133 protected long get(Commit<Get> commit) {
134 try {
135 return primitive(map.get(commit.operation().key()));
136 } finally {
137 commit.close();
138 }
139 }
140
141 /**
142 * Handles a {@link Replace} command which implements {@link AtomixAtomicCounterMap#replace(String, long, long)}.
143 *
144 * @param commit replace commit
145 * @return replace result
146 */
147 protected boolean replace(Commit<Replace> commit) {
148 try {
149 Long value = map.get(commit.operation().key());
150 if (value == null) {
151 if (commit.operation().replace() == 0) {
152 map.put(commit.operation().key(), commit.operation().value());
153 return true;
154 } else {
155 return false;
156 }
157 } else if (value == commit.operation().replace()) {
158 map.put(commit.operation().key(), commit.operation().value());
159 return true;
160 }
161 return false;
162 } finally {
163 commit.close();
164 }
165 }
166
167 /**
168 * Handles a {@link Remove} command which implements {@link AtomixAtomicCounterMap#remove(String)}.
169 *
170 * @param commit remove commit
171 * @return remove result
172 */
173 protected long remove(Commit<Remove> commit) {
174 try {
175 return primitive(map.remove(commit.operation().key()));
176 } finally {
177 commit.close();
178 }
179 }
180
181 /**
182 * Handles a {@link RemoveValue} command which implements {@link AtomixAtomicCounterMap#remove(String, long)}.
183 *
184 * @param commit removeValue commit
185 * @return removeValue result
186 */
187 protected boolean removeValue(Commit<RemoveValue> commit) {
188 try {
189 Long value = map.get(commit.operation().key());
190 if (value == null) {
191 if (commit.operation().value() == 0) {
192 map.remove(commit.operation().key());
193 return true;
194 }
195 return false;
196 } else if (value == commit.operation().value()) {
197 map.remove(commit.operation().key());
198 return true;
199 }
200 return false;
201 } finally {
202 commit.close();
203 }
204 }
205
206 /**
207 * Handles a {@link GetAndIncrement} command which implements
208 * {@link AtomixAtomicCounterMap#getAndIncrement(String)}.
209 *
210 * @param commit getAndIncrement commit
211 * @return getAndIncrement result
212 */
213 protected long getAndIncrement(Commit<GetAndIncrement> commit) {
214 try {
215 long value = primitive(map.get(commit.operation().key()));
216 map.put(commit.operation().key(), value + 1);
217 return value;
218 } finally {
219 commit.close();
220 }
221 }
222
223 /**
224 * Handles a {@link GetAndDecrement} command which implements
225 * {@link AtomixAtomicCounterMap#getAndDecrement(String)}.
226 *
227 * @param commit getAndDecrement commit
228 * @return getAndDecrement result
229 */
230 protected long getAndDecrement(Commit<GetAndDecrement> commit) {
231 try {
232 long value = primitive(map.get(commit.operation().key()));
233 map.put(commit.operation().key(), value - 1);
234 return value;
235 } finally {
236 commit.close();
237 }
238 }
239
240 /**
241 * Handles a {@link IncrementAndGet} command which implements
242 * {@link AtomixAtomicCounterMap#incrementAndGet(String)}.
243 *
244 * @param commit incrementAndGet commit
245 * @return incrementAndGet result
246 */
247 protected long incrementAndGet(Commit<IncrementAndGet> commit) {
248 try {
249 long value = primitive(map.get(commit.operation().key()));
250 map.put(commit.operation().key(), ++value);
251 return value;
252 } finally {
253 commit.close();
254 }
255 }
256
257 /**
258 * Handles a {@link DecrementAndGet} command which implements
259 * {@link AtomixAtomicCounterMap#decrementAndGet(String)}.
260 *
261 * @param commit decrementAndGet commit
262 * @return decrementAndGet result
263 */
264 protected long decrementAndGet(Commit<DecrementAndGet> commit) {
265 try {
266 long value = primitive(map.get(commit.operation().key()));
267 map.put(commit.operation().key(), --value);
268 return value;
269 } finally {
270 commit.close();
271 }
272 }
273
274 /**
275 * Handles a {@link AddAndGet} command which implements {@link AtomixAtomicCounterMap#addAndGet(String, long)}.
276 *
277 * @param commit addAndGet commit
278 * @return addAndGet result
279 */
280 protected long addAndGet(Commit<AddAndGet> commit) {
281 try {
282 long value = primitive(map.get(commit.operation().key()));
283 value += commit.operation().delta();
284 map.put(commit.operation().key(), value);
285 return value;
286 } finally {
287 commit.close();
288 }
289 }
290
291 /**
292 * Handles a {@link GetAndAdd} command which implements {@link AtomixAtomicCounterMap#getAndAdd(String, long)}.
293 *
294 * @param commit getAndAdd commit
295 * @return getAndAdd result
296 */
297 protected long getAndAdd(Commit<GetAndAdd> commit) {
298 try {
299 long value = primitive(map.get(commit.operation().key()));
300 map.put(commit.operation().key(), value + commit.operation().delta());
301 return value;
302 } finally {
303 commit.close();
304 }
305 }
306
307 /**
308 * Handles a {@link Size} query which implements {@link AtomixAtomicCounterMap#size()}.
309 *
310 * @param commit size commit
311 * @return size result
312 */
313 protected int size(Commit<Size> commit) {
314 try {
315 return map.size();
316 } finally {
317 commit.close();
318 }
319 }
320
321 /**
322 * Handles an {@link IsEmpty} query which implements {@link AtomixAtomicCounterMap#isEmpty()}.
323 *
324 * @param commit isEmpty commit
325 * @return isEmpty result
326 */
327 protected boolean isEmpty(Commit<IsEmpty> commit) {
328 try {
329 return map.isEmpty();
330 } finally {
331 commit.close();
332 }
333 }
334
335 /**
336 * Handles a {@link Clear} command which implements {@link AtomixAtomicCounterMap#clear()}.
337 *
338 * @param commit clear commit
339 */
340 protected void clear(Commit<Clear> commit) {
341 try {
342 map.clear();
343 } finally {
344 commit.close();
345 }
346 }
347}