blob: 354db264d983a3253871faf4049cc35241ccdee3 [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.resources.impl;
17
18import java.util.HashMap;
19import java.util.Map;
20
21import io.atomix.protocols.raft.service.AbstractRaftService;
22import io.atomix.protocols.raft.service.Commit;
23import io.atomix.protocols.raft.service.RaftServiceExecutor;
24import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
25import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
26import org.onlab.util.KryoNamespace;
27import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.AddAndGet;
28import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.DecrementAndGet;
29import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.Get;
30import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.GetAndAdd;
31import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.GetAndDecrement;
32import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.GetAndIncrement;
33import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.IncrementAndGet;
34import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.Put;
35import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.PutIfAbsent;
36import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.Remove;
37import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.RemoveValue;
38import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.Replace;
39import org.onosproject.store.serializers.KryoNamespaces;
40import org.onosproject.store.service.Serializer;
41
42import static org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.ADD_AND_GET;
43import static org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.CLEAR;
44import static org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.DECREMENT_AND_GET;
45import static org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.GET;
46import static org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.GET_AND_ADD;
47import static org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.GET_AND_DECREMENT;
48import static org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.GET_AND_INCREMENT;
49import static org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.INCREMENT_AND_GET;
50import static org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.IS_EMPTY;
51import static org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.PUT;
52import static org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.PUT_IF_ABSENT;
53import static org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.REMOVE;
54import static org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.REMOVE_VALUE;
55import static org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.REPLACE;
56import static org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.SIZE;
57
58/**
59 * Atomic counter map state for Atomix.
60 * <p>
61 * The counter map state is implemented as a snapshottable state machine. Snapshots are necessary
62 * since incremental compaction is impractical for counters where the value of a counter is the sum
63 * of all its increments. Note that this snapshotting large state machines may risk blocking of the
64 * Raft cluster with the current implementation of snapshotting in Copycat.
65 */
66public class AtomixAtomicCounterMapService extends AbstractRaftService {
67
68 private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
69 .register(KryoNamespaces.BASIC)
70 .register(AtomixAtomicCounterMapOperations.NAMESPACE)
71 .build());
72
73 private Map<String, Long> map = new HashMap<>();
74
75 @Override
76 protected void configure(RaftServiceExecutor executor) {
77 executor.register(PUT, SERIALIZER::decode, this::put, SERIALIZER::encode);
78 executor.register(PUT_IF_ABSENT, SERIALIZER::decode, this::putIfAbsent, SERIALIZER::encode);
79 executor.register(GET, SERIALIZER::decode, this::get, SERIALIZER::encode);
80 executor.register(REPLACE, SERIALIZER::decode, this::replace, SERIALIZER::encode);
81 executor.register(REMOVE, SERIALIZER::decode, this::remove, SERIALIZER::encode);
82 executor.register(REMOVE_VALUE, SERIALIZER::decode, this::removeValue, SERIALIZER::encode);
83 executor.register(GET_AND_INCREMENT, SERIALIZER::decode, this::getAndIncrement, SERIALIZER::encode);
84 executor.register(GET_AND_DECREMENT, SERIALIZER::decode, this::getAndDecrement, SERIALIZER::encode);
85 executor.register(INCREMENT_AND_GET, SERIALIZER::decode, this::incrementAndGet, SERIALIZER::encode);
86 executor.register(DECREMENT_AND_GET, SERIALIZER::decode, this::decrementAndGet, SERIALIZER::encode);
87 executor.register(ADD_AND_GET, SERIALIZER::decode, this::addAndGet, SERIALIZER::encode);
88 executor.register(GET_AND_ADD, SERIALIZER::decode, this::getAndAdd, SERIALIZER::encode);
89 executor.register(SIZE, this::size, SERIALIZER::encode);
90 executor.register(IS_EMPTY, this::isEmpty, SERIALIZER::encode);
91 executor.register(CLEAR, this::clear);
92 }
93
94 @Override
95 public void snapshot(SnapshotWriter writer) {
96 writer.writeObject(map, SERIALIZER::encode);
97 }
98
99 @Override
100 public void install(SnapshotReader reader) {
101 map = reader.readObject(SERIALIZER::decode);
102 }
103
104 /**
105 * Returns the primitive value for the given primitive wrapper.
106 */
107 private long primitive(Long value) {
108 if (value != null) {
109 return value;
110 } else {
111 return 0;
112 }
113 }
114
115 /**
116 * Handles a {@link Put} command which implements {@link AtomixAtomicCounterMap#put(String, long)}.
117 *
118 * @param commit put commit
119 * @return put result
120 */
121 protected long put(Commit<Put> commit) {
122 return primitive(map.put(commit.value().key(), commit.value().value()));
123 }
124
125 /**
126 * Handles a {@link PutIfAbsent} command which implements {@link AtomixAtomicCounterMap#putIfAbsent(String, long)}.
127 *
128 * @param commit putIfAbsent commit
129 * @return putIfAbsent result
130 */
131 protected long putIfAbsent(Commit<PutIfAbsent> commit) {
132 return primitive(map.putIfAbsent(commit.value().key(), commit.value().value()));
133 }
134
135 /**
136 * Handles a {@link Get} query which implements {@link AtomixAtomicCounterMap#get(String)}}.
137 *
138 * @param commit get commit
139 * @return get result
140 */
141 protected long get(Commit<Get> commit) {
142 return primitive(map.get(commit.value().key()));
143 }
144
145 /**
146 * Handles a {@link Replace} command which implements {@link AtomixAtomicCounterMap#replace(String, long, long)}.
147 *
148 * @param commit replace commit
149 * @return replace result
150 */
151 protected boolean replace(Commit<Replace> commit) {
152 Long value = map.get(commit.value().key());
153 if (value == null) {
154 if (commit.value().replace() == 0) {
155 map.put(commit.value().key(), commit.value().value());
156 return true;
157 } else {
158 return false;
159 }
160 } else if (value == commit.value().replace()) {
161 map.put(commit.value().key(), commit.value().value());
162 return true;
163 }
164 return false;
165 }
166
167 /**
168 * Handles a {@link Remove} command which implements {@link AtomixAtomicCounterMap#remove(String)}.
169 *
170 * @param commit remove commit
171 * @return remove result
172 */
173 protected long remove(Commit<Remove> commit) {
174 return primitive(map.remove(commit.value().key()));
175 }
176
177 /**
178 * Handles a {@link RemoveValue} command which implements {@link AtomixAtomicCounterMap#remove(String, long)}.
179 *
180 * @param commit removeValue commit
181 * @return removeValue result
182 */
183 protected boolean removeValue(Commit<RemoveValue> commit) {
184 Long value = map.get(commit.value().key());
185 if (value == null) {
186 if (commit.value().value() == 0) {
187 map.remove(commit.value().key());
188 return true;
189 }
190 return false;
191 } else if (value == commit.value().value()) {
192 map.remove(commit.value().key());
193 return true;
194 }
195 return false;
196 }
197
198 /**
199 * Handles a {@link GetAndIncrement} command which implements
200 * {@link AtomixAtomicCounterMap#getAndIncrement(String)}.
201 *
202 * @param commit getAndIncrement commit
203 * @return getAndIncrement result
204 */
205 protected long getAndIncrement(Commit<GetAndIncrement> commit) {
206 long value = primitive(map.get(commit.value().key()));
207 map.put(commit.value().key(), value + 1);
208 return value;
209 }
210
211 /**
212 * Handles a {@link GetAndDecrement} command which implements
213 * {@link AtomixAtomicCounterMap#getAndDecrement(String)}.
214 *
215 * @param commit getAndDecrement commit
216 * @return getAndDecrement result
217 */
218 protected long getAndDecrement(Commit<GetAndDecrement> commit) {
219 long value = primitive(map.get(commit.value().key()));
220 map.put(commit.value().key(), value - 1);
221 return value;
222 }
223
224 /**
225 * Handles a {@link IncrementAndGet} command which implements
226 * {@link AtomixAtomicCounterMap#incrementAndGet(String)}.
227 *
228 * @param commit incrementAndGet commit
229 * @return incrementAndGet result
230 */
231 protected long incrementAndGet(Commit<IncrementAndGet> commit) {
232 long value = primitive(map.get(commit.value().key()));
233 map.put(commit.value().key(), ++value);
234 return value;
235 }
236
237 /**
238 * Handles a {@link DecrementAndGet} command which implements
239 * {@link AtomixAtomicCounterMap#decrementAndGet(String)}.
240 *
241 * @param commit decrementAndGet commit
242 * @return decrementAndGet result
243 */
244 protected long decrementAndGet(Commit<DecrementAndGet> commit) {
245 long value = primitive(map.get(commit.value().key()));
246 map.put(commit.value().key(), --value);
247 return value;
248 }
249
250 /**
251 * Handles a {@link AddAndGet} command which implements {@link AtomixAtomicCounterMap#addAndGet(String, long)}.
252 *
253 * @param commit addAndGet commit
254 * @return addAndGet result
255 */
256 protected long addAndGet(Commit<AddAndGet> commit) {
257 long value = primitive(map.get(commit.value().key()));
258 value += commit.value().delta();
259 map.put(commit.value().key(), value);
260 return value;
261 }
262
263 /**
264 * Handles a {@link GetAndAdd} command which implements {@link AtomixAtomicCounterMap#getAndAdd(String, long)}.
265 *
266 * @param commit getAndAdd commit
267 * @return getAndAdd result
268 */
269 protected long getAndAdd(Commit<GetAndAdd> commit) {
270 long value = primitive(map.get(commit.value().key()));
271 map.put(commit.value().key(), value + commit.value().delta());
272 return value;
273 }
274
275 /**
276 * Handles a {@code Size} query which implements {@link AtomixAtomicCounterMap#size()}.
277 *
278 * @param commit size commit
279 * @return size result
280 */
281 protected int size(Commit<Void> commit) {
282 return map.size();
283 }
284
285 /**
286 * Handles an {@code IsEmpty} query which implements {@link AtomixAtomicCounterMap#isEmpty()}.
287 *
288 * @param commit isEmpty commit
289 * @return isEmpty result
290 */
291 protected boolean isEmpty(Commit<Void> commit) {
292 return map.isEmpty();
293 }
294
295 /**
296 * Handles a {@code Clear} command which implements {@link AtomixAtomicCounterMap#clear()}.
297 *
298 * @param commit clear commit
299 */
300 protected void clear(Commit<Void> commit) {
301 map.clear();
302 }
303}