blob: 7edeb44963378bdcd0b69e2921d8d63878956424 [file] [log] [blame]
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.consistent.impl;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.Set;
import org.apache.commons.lang3.tuple.Pair;
import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
import org.onosproject.store.service.DatabaseUpdate.Type;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import net.kuujo.copycat.state.Initializer;
import net.kuujo.copycat.state.StateContext;
/**
* Default database state.
*/
public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
private Long nextVersion;
private Map<String, AtomicLong> counters;
private Map<String, Map<String, Versioned<byte[]>>> tables;
/**
* This locks map has a structure similar to the "tables" map above and
* holds all the provisional updates made during a transaction's prepare phase.
* The entry value is represented as the tuple: (transactionId, newValue)
* If newValue == null that signifies this update is attempting to
* delete the existing value.
* This map also serves as a lock on the entries that are being updated.
* The presence of a entry in this map indicates that element is
* participating in a transaction and is currently locked for updates.
*/
private Map<String, Map<String, Update>> locks;
@Initializer
@Override
public void init(StateContext<DatabaseState<String, byte[]>> context) {
counters = context.get("counters");
if (counters == null) {
counters = Maps.newConcurrentMap();
context.put("counters", counters);
}
tables = context.get("tables");
if (tables == null) {
tables = Maps.newConcurrentMap();
context.put("tables", tables);
}
locks = context.get("locks");
if (locks == null) {
locks = Maps.newConcurrentMap();
context.put("locks", locks);
}
nextVersion = context.get("nextVersion");
if (nextVersion == null) {
nextVersion = new Long(0);
context.put("nextVersion", nextVersion);
}
}
@Override
public Set<String> tableNames() {
return new HashSet<>(tables.keySet());
}
@Override
public Map<String, Long> counters() {
Map<String, Long> counterMap = Maps.newHashMap();
counters.forEach((k, v) -> counterMap.put(k, v.get()));
return counterMap;
}
@Override
public int size(String tableName) {
return getTableMap(tableName).size();
}
@Override
public boolean isEmpty(String tableName) {
return getTableMap(tableName).isEmpty();
}
@Override
public boolean containsKey(String tableName, String key) {
return getTableMap(tableName).containsKey(key);
}
@Override
public boolean containsValue(String tableName, byte[] value) {
return getTableMap(tableName).values().stream().anyMatch(v -> Arrays.equals(v.value(), value));
}
@Override
public Versioned<byte[]> get(String tableName, String key) {
return getTableMap(tableName).get(key);
}
@Override
public Result<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
return isLockedForUpdates(tableName, key)
? Result.locked()
: Result.ok(getTableMap(tableName).put(key, new Versioned<>(value, ++nextVersion)));
}
@Override
public Result<UpdateResult<Versioned<byte[]>>> putAndGet(String tableName,
String key,
byte[] value) {
if (isLockedForUpdates(tableName, key)) {
return Result.locked();
} else {
Versioned<byte[]> newValue = new Versioned<>(value, ++nextVersion);
Versioned<byte[]> oldValue = getTableMap(tableName).put(key, newValue);
return Result.ok(new UpdateResult<>(true, oldValue, newValue));
}
}
@Override
public Result<UpdateResult<Versioned<byte[]>>> putIfAbsentAndGet(String tableName,
String key,
byte[] value) {
if (isLockedForUpdates(tableName, key)) {
return Result.locked();
}
Versioned<byte[]> currentValue = getTableMap(tableName).get(key);
if (currentValue != null) {
return Result.ok(new UpdateResult<>(false, currentValue, currentValue));
} else {
Versioned<byte[]> newValue = new Versioned<>(value, ++nextVersion);
getTableMap(tableName).put(key, newValue);
return Result.ok(new UpdateResult<>(true, null, newValue));
}
}
@Override
public Result<Versioned<byte[]>> remove(String tableName, String key) {
return isLockedForUpdates(tableName, key)
? Result.locked()
: Result.ok(getTableMap(tableName).remove(key));
}
@Override
public Result<Void> clear(String tableName) {
if (areTransactionsInProgress(tableName)) {
return Result.locked();
}
getTableMap(tableName).clear();
return Result.ok(null);
}
@Override
public Set<String> keySet(String tableName) {
return ImmutableSet.copyOf(getTableMap(tableName).keySet());
}
@Override
public Collection<Versioned<byte[]>> values(String tableName) {
return ImmutableList.copyOf(getTableMap(tableName).values());
}
@Override
public Set<Entry<String, Versioned<byte[]>>> entrySet(String tableName) {
return ImmutableSet.copyOf(getTableMap(tableName)
.entrySet()
.stream()
.map(entry -> Pair.of(entry.getKey(), entry.getValue()))
.collect(Collectors.toSet()));
}
@Override
public Result<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
if (isLockedForUpdates(tableName, key)) {
return Result.locked();
}
Versioned<byte[]> existingValue = get(tableName, key);
Versioned<byte[]> currentValue = existingValue != null ? existingValue : put(tableName, key, value).value();
return Result.ok(currentValue);
}
@Override
public Result<Boolean> remove(String tableName, String key, byte[] value) {
if (isLockedForUpdates(tableName, key)) {
return Result.locked();
}
Versioned<byte[]> existing = get(tableName, key);
if (existing != null && Arrays.equals(existing.value(), value)) {
getTableMap(tableName).remove(key);
return Result.ok(true);
}
return Result.ok(false);
}
@Override
public Result<Boolean> remove(String tableName, String key, long version) {
if (isLockedForUpdates(tableName, key)) {
return Result.locked();
}
Versioned<byte[]> existing = get(tableName, key);
if (existing != null && existing.version() == version) {
remove(tableName, key);
return Result.ok(true);
}
return Result.ok(false);
}
@Override
public Result<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
if (isLockedForUpdates(tableName, key)) {
return Result.locked();
}
Versioned<byte[]> existing = get(tableName, key);
if (existing != null && Arrays.equals(existing.value(), oldValue)) {
put(tableName, key, newValue);
return Result.ok(true);
}
return Result.ok(false);
}
@Override
public Result<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
if (isLockedForUpdates(tableName, key)) {
return Result.locked();
}
Versioned<byte[]> existing = get(tableName, key);
if (existing != null && existing.version() == oldVersion) {
put(tableName, key, newValue);
return Result.ok(true);
}
return Result.ok(false);
}
@Override
public Result<UpdateResult<Versioned<byte[]>>> replaceAndGet(
String tableName, String key, long oldVersion, byte[] newValue) {
if (isLockedForUpdates(tableName, key)) {
return Result.locked();
}
boolean updated = false;
Versioned<byte[]> previous = get(tableName, key);
Versioned<byte[]> current = previous;
if (previous != null && previous.version() == oldVersion) {
current = new Versioned<>(newValue, ++nextVersion);
getTableMap(tableName).put(key, current);
updated = true;
}
return Result.ok(new UpdateResult<>(updated, previous, current));
}
@Override
public Long counterAddAndGet(String counterName, long delta) {
return getCounter(counterName).addAndGet(delta);
}
@Override
public Long counterGetAndAdd(String counterName, long delta) {
return getCounter(counterName).getAndAdd(delta);
}
@Override
public Long counterGet(String counterName) {
return getCounter(counterName).get();
}
@Override
public boolean prepareAndCommit(Transaction transaction) {
if (prepare(transaction)) {
return commit(transaction);
}
return false;
}
@Override
public boolean prepare(Transaction transaction) {
if (transaction.updates().stream().anyMatch(update ->
isLockedByAnotherTransaction(update.tableName(),
update.key(),
transaction.id()))) {
return false;
}
if (transaction.updates().stream().allMatch(this::isUpdatePossible)) {
transaction.updates().forEach(update -> doProvisionalUpdate(update, transaction.id()));
return true;
}
return false;
}
@Override
public boolean commit(Transaction transaction) {
transaction.updates().forEach(update -> commitProvisionalUpdate(update, transaction.id()));
return true;
}
@Override
public boolean rollback(Transaction transaction) {
transaction.updates().forEach(update -> undoProvisionalUpdate(update, transaction.id()));
return true;
}
private Map<String, Versioned<byte[]>> getTableMap(String tableName) {
return tables.computeIfAbsent(tableName, name -> Maps.newConcurrentMap());
}
private Map<String, Update> getLockMap(String tableName) {
return locks.computeIfAbsent(tableName, name -> Maps.newConcurrentMap());
}
private AtomicLong getCounter(String counterName) {
return counters.computeIfAbsent(counterName, name -> new AtomicLong(0));
}
private boolean isUpdatePossible(DatabaseUpdate update) {
Versioned<byte[]> existingEntry = get(update.tableName(), update.key());
switch (update.type()) {
case PUT:
case REMOVE:
return true;
case PUT_IF_ABSENT:
return existingEntry == null;
case PUT_IF_VERSION_MATCH:
return existingEntry != null && existingEntry.version() == update.currentVersion();
case PUT_IF_VALUE_MATCH:
return existingEntry != null && Arrays.equals(existingEntry.value(), update.currentValue());
case REMOVE_IF_VERSION_MATCH:
return existingEntry == null || existingEntry.version() == update.currentVersion();
case REMOVE_IF_VALUE_MATCH:
return existingEntry == null || Arrays.equals(existingEntry.value(), update.currentValue());
default:
throw new IllegalStateException("Unsupported type: " + update.type());
}
}
private void doProvisionalUpdate(DatabaseUpdate update, long transactionId) {
Map<String, Update> lockMap = getLockMap(update.tableName());
switch (update.type()) {
case PUT:
case PUT_IF_ABSENT:
case PUT_IF_VERSION_MATCH:
case PUT_IF_VALUE_MATCH:
lockMap.put(update.key(), new Update(transactionId, update.value()));
break;
case REMOVE:
case REMOVE_IF_VERSION_MATCH:
case REMOVE_IF_VALUE_MATCH:
lockMap.put(update.key(), new Update(transactionId, null));
break;
default:
throw new IllegalStateException("Unsupported type: " + update.type());
}
}
private void commitProvisionalUpdate(DatabaseUpdate update, long transactionId) {
String tableName = update.tableName();
String key = update.key();
Type type = update.type();
Update provisionalUpdate = getLockMap(tableName).get(key);
if (Objects.equal(transactionId, provisionalUpdate.transactionId())) {
getLockMap(tableName).remove(key);
} else {
return;
}
switch (type) {
case PUT:
case PUT_IF_ABSENT:
case PUT_IF_VERSION_MATCH:
case PUT_IF_VALUE_MATCH:
put(tableName, key, provisionalUpdate.value());
break;
case REMOVE:
case REMOVE_IF_VERSION_MATCH:
case REMOVE_IF_VALUE_MATCH:
remove(tableName, key);
break;
default:
break;
}
}
private void undoProvisionalUpdate(DatabaseUpdate update, long transactionId) {
String tableName = update.tableName();
String key = update.key();
Update provisionalUpdate = getLockMap(tableName).get(key);
if (provisionalUpdate == null) {
return;
}
if (Objects.equal(transactionId, provisionalUpdate.transactionId())) {
getLockMap(tableName).remove(key);
}
}
private boolean isLockedByAnotherTransaction(String tableName, String key, long transactionId) {
Update update = getLockMap(tableName).get(key);
return update != null && !Objects.equal(transactionId, update.transactionId());
}
private boolean isLockedForUpdates(String tableName, String key) {
return getLockMap(tableName).containsKey(key);
}
private boolean areTransactionsInProgress(String tableName) {
return !getLockMap(tableName).isEmpty();
}
private class Update {
private final long transactionId;
private final byte[] value;
public Update(long txId, byte[] value) {
this.transactionId = txId;
this.value = value;
}
public long transactionId() {
return this.transactionId;
}
public byte[] value() {
return this.value;
}
}
}