blob: 354db264d983a3253871faf4049cc35241ccdee3 [file] [log] [blame]
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import java.util.HashMap;
import java.util.Map;
import io.atomix.protocols.raft.service.AbstractRaftService;
import io.atomix.protocols.raft.service.Commit;
import io.atomix.protocols.raft.service.RaftServiceExecutor;
import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
import org.onlab.util.KryoNamespace;
import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.AddAndGet;
import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.DecrementAndGet;
import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.Get;
import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.GetAndAdd;
import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.GetAndDecrement;
import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.GetAndIncrement;
import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.IncrementAndGet;
import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.Put;
import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.PutIfAbsent;
import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.Remove;
import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.RemoveValue;
import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.Replace;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.Serializer;
import static org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.ADD_AND_GET;
import static org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.CLEAR;
import static org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.DECREMENT_AND_GET;
import static org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.GET;
import static org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.GET_AND_ADD;
import static org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.GET_AND_DECREMENT;
import static org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.GET_AND_INCREMENT;
import static org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.INCREMENT_AND_GET;
import static org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.IS_EMPTY;
import static org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.PUT;
import static org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.PUT_IF_ABSENT;
import static org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.REMOVE;
import static org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.REMOVE_VALUE;
import static org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.REPLACE;
import static org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations.SIZE;
/**
* Atomic counter map state for Atomix.
* <p>
* The counter map state is implemented as a snapshottable state machine. Snapshots are necessary
* since incremental compaction is impractical for counters where the value of a counter is the sum
* of all its increments. Note that this snapshotting large state machines may risk blocking of the
* Raft cluster with the current implementation of snapshotting in Copycat.
*/
public class AtomixAtomicCounterMapService extends AbstractRaftService {
private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
.register(KryoNamespaces.BASIC)
.register(AtomixAtomicCounterMapOperations.NAMESPACE)
.build());
private Map<String, Long> map = new HashMap<>();
@Override
protected void configure(RaftServiceExecutor executor) {
executor.register(PUT, SERIALIZER::decode, this::put, SERIALIZER::encode);
executor.register(PUT_IF_ABSENT, SERIALIZER::decode, this::putIfAbsent, SERIALIZER::encode);
executor.register(GET, SERIALIZER::decode, this::get, SERIALIZER::encode);
executor.register(REPLACE, SERIALIZER::decode, this::replace, SERIALIZER::encode);
executor.register(REMOVE, SERIALIZER::decode, this::remove, SERIALIZER::encode);
executor.register(REMOVE_VALUE, SERIALIZER::decode, this::removeValue, SERIALIZER::encode);
executor.register(GET_AND_INCREMENT, SERIALIZER::decode, this::getAndIncrement, SERIALIZER::encode);
executor.register(GET_AND_DECREMENT, SERIALIZER::decode, this::getAndDecrement, SERIALIZER::encode);
executor.register(INCREMENT_AND_GET, SERIALIZER::decode, this::incrementAndGet, SERIALIZER::encode);
executor.register(DECREMENT_AND_GET, SERIALIZER::decode, this::decrementAndGet, SERIALIZER::encode);
executor.register(ADD_AND_GET, SERIALIZER::decode, this::addAndGet, SERIALIZER::encode);
executor.register(GET_AND_ADD, SERIALIZER::decode, this::getAndAdd, SERIALIZER::encode);
executor.register(SIZE, this::size, SERIALIZER::encode);
executor.register(IS_EMPTY, this::isEmpty, SERIALIZER::encode);
executor.register(CLEAR, this::clear);
}
@Override
public void snapshot(SnapshotWriter writer) {
writer.writeObject(map, SERIALIZER::encode);
}
@Override
public void install(SnapshotReader reader) {
map = reader.readObject(SERIALIZER::decode);
}
/**
* Returns the primitive value for the given primitive wrapper.
*/
private long primitive(Long value) {
if (value != null) {
return value;
} else {
return 0;
}
}
/**
* Handles a {@link Put} command which implements {@link AtomixAtomicCounterMap#put(String, long)}.
*
* @param commit put commit
* @return put result
*/
protected long put(Commit<Put> commit) {
return primitive(map.put(commit.value().key(), commit.value().value()));
}
/**
* Handles a {@link PutIfAbsent} command which implements {@link AtomixAtomicCounterMap#putIfAbsent(String, long)}.
*
* @param commit putIfAbsent commit
* @return putIfAbsent result
*/
protected long putIfAbsent(Commit<PutIfAbsent> commit) {
return primitive(map.putIfAbsent(commit.value().key(), commit.value().value()));
}
/**
* Handles a {@link Get} query which implements {@link AtomixAtomicCounterMap#get(String)}}.
*
* @param commit get commit
* @return get result
*/
protected long get(Commit<Get> commit) {
return primitive(map.get(commit.value().key()));
}
/**
* Handles a {@link Replace} command which implements {@link AtomixAtomicCounterMap#replace(String, long, long)}.
*
* @param commit replace commit
* @return replace result
*/
protected boolean replace(Commit<Replace> commit) {
Long value = map.get(commit.value().key());
if (value == null) {
if (commit.value().replace() == 0) {
map.put(commit.value().key(), commit.value().value());
return true;
} else {
return false;
}
} else if (value == commit.value().replace()) {
map.put(commit.value().key(), commit.value().value());
return true;
}
return false;
}
/**
* Handles a {@link Remove} command which implements {@link AtomixAtomicCounterMap#remove(String)}.
*
* @param commit remove commit
* @return remove result
*/
protected long remove(Commit<Remove> commit) {
return primitive(map.remove(commit.value().key()));
}
/**
* Handles a {@link RemoveValue} command which implements {@link AtomixAtomicCounterMap#remove(String, long)}.
*
* @param commit removeValue commit
* @return removeValue result
*/
protected boolean removeValue(Commit<RemoveValue> commit) {
Long value = map.get(commit.value().key());
if (value == null) {
if (commit.value().value() == 0) {
map.remove(commit.value().key());
return true;
}
return false;
} else if (value == commit.value().value()) {
map.remove(commit.value().key());
return true;
}
return false;
}
/**
* Handles a {@link GetAndIncrement} command which implements
* {@link AtomixAtomicCounterMap#getAndIncrement(String)}.
*
* @param commit getAndIncrement commit
* @return getAndIncrement result
*/
protected long getAndIncrement(Commit<GetAndIncrement> commit) {
long value = primitive(map.get(commit.value().key()));
map.put(commit.value().key(), value + 1);
return value;
}
/**
* Handles a {@link GetAndDecrement} command which implements
* {@link AtomixAtomicCounterMap#getAndDecrement(String)}.
*
* @param commit getAndDecrement commit
* @return getAndDecrement result
*/
protected long getAndDecrement(Commit<GetAndDecrement> commit) {
long value = primitive(map.get(commit.value().key()));
map.put(commit.value().key(), value - 1);
return value;
}
/**
* Handles a {@link IncrementAndGet} command which implements
* {@link AtomixAtomicCounterMap#incrementAndGet(String)}.
*
* @param commit incrementAndGet commit
* @return incrementAndGet result
*/
protected long incrementAndGet(Commit<IncrementAndGet> commit) {
long value = primitive(map.get(commit.value().key()));
map.put(commit.value().key(), ++value);
return value;
}
/**
* Handles a {@link DecrementAndGet} command which implements
* {@link AtomixAtomicCounterMap#decrementAndGet(String)}.
*
* @param commit decrementAndGet commit
* @return decrementAndGet result
*/
protected long decrementAndGet(Commit<DecrementAndGet> commit) {
long value = primitive(map.get(commit.value().key()));
map.put(commit.value().key(), --value);
return value;
}
/**
* Handles a {@link AddAndGet} command which implements {@link AtomixAtomicCounterMap#addAndGet(String, long)}.
*
* @param commit addAndGet commit
* @return addAndGet result
*/
protected long addAndGet(Commit<AddAndGet> commit) {
long value = primitive(map.get(commit.value().key()));
value += commit.value().delta();
map.put(commit.value().key(), value);
return value;
}
/**
* Handles a {@link GetAndAdd} command which implements {@link AtomixAtomicCounterMap#getAndAdd(String, long)}.
*
* @param commit getAndAdd commit
* @return getAndAdd result
*/
protected long getAndAdd(Commit<GetAndAdd> commit) {
long value = primitive(map.get(commit.value().key()));
map.put(commit.value().key(), value + commit.value().delta());
return value;
}
/**
* Handles a {@code Size} query which implements {@link AtomixAtomicCounterMap#size()}.
*
* @param commit size commit
* @return size result
*/
protected int size(Commit<Void> commit) {
return map.size();
}
/**
* Handles an {@code IsEmpty} query which implements {@link AtomixAtomicCounterMap#isEmpty()}.
*
* @param commit isEmpty commit
* @return isEmpty result
*/
protected boolean isEmpty(Commit<Void> commit) {
return map.isEmpty();
}
/**
* Handles a {@code Clear} command which implements {@link AtomixAtomicCounterMap#clear()}.
*
* @param commit clear commit
*/
protected void clear(Commit<Void> commit) {
map.clear();
}
}