blob: 89c8da62db9ed23d47ae83c1246e0973448be7f0 [file] [log] [blame]
/*
* Copyright 2016-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;
import com.google.common.collect.Maps;
import io.atomix.protocols.raft.service.Commit;
import io.atomix.protocols.raft.service.RaftServiceExecutor;
import io.atomix.protocols.raft.session.RaftSession;
import org.onlab.util.KryoNamespace;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.TransactionLog;
import org.onosproject.store.service.Versioned;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CEILING_ENTRY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CEILING_KEY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CeilingEntry;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.CeilingKey;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.FIRST_ENTRY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.FIRST_KEY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.FLOOR_ENTRY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.FLOOR_KEY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.FloorEntry;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.FloorKey;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.HIGHER_ENTRY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.HIGHER_KEY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.HigherEntry;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.HigherKey;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.LAST_ENTRY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.LAST_KEY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.LOWER_ENTRY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.LOWER_KEY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.LowerEntry;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.LowerKey;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.POLL_FIRST_ENTRY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.POLL_LAST_ENTRY;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.SUB_MAP;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations.SubMap;
/**
* State machine corresponding to {@link AtomixConsistentTreeMap} backed by a
* {@link TreeMap}.
*/
public class AtomixConsistentTreeMapService extends AtomixConsistentMapService {
private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
.register(KryoNamespaces.BASIC)
.register(AtomixConsistentMapOperations.NAMESPACE)
.register(AtomixConsistentTreeMapOperations.NAMESPACE)
.register(AtomixConsistentMapEvents.NAMESPACE)
.nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 150)
.register(TransactionScope.class)
.register(TransactionLog.class)
.register(TransactionId.class)
.register(MapEntryValue.class)
.register(MapEntryValue.Type.class)
.register(new HashMap().keySet().getClass())
.register(ConcurrentSkipListMap.class)
.build());
@Override
protected NavigableMap<String, MapEntryValue> createMap() {
return new ConcurrentSkipListMap<>();
}
@Override
protected NavigableMap<String, MapEntryValue> entries() {
return (NavigableMap<String, MapEntryValue>) super.entries();
}
@Override
protected Serializer serializer() {
return SERIALIZER;
}
@Override
public void configure(RaftServiceExecutor executor) {
super.configure(executor);
executor.register(SUB_MAP, serializer()::decode, this::subMap, serializer()::encode);
executor.register(FIRST_KEY, (Commit<Void> c) -> firstKey(), serializer()::encode);
executor.register(LAST_KEY, (Commit<Void> c) -> lastKey(), serializer()::encode);
executor.register(FIRST_ENTRY, (Commit<Void> c) -> firstEntry(), serializer()::encode);
executor.register(LAST_ENTRY, (Commit<Void> c) -> lastEntry(), serializer()::encode);
executor.register(POLL_FIRST_ENTRY, (Commit<Void> c) -> pollFirstEntry(), serializer()::encode);
executor.register(POLL_LAST_ENTRY, (Commit<Void> c) -> pollLastEntry(), serializer()::encode);
executor.register(LOWER_ENTRY, serializer()::decode, this::lowerEntry, serializer()::encode);
executor.register(LOWER_KEY, serializer()::decode, this::lowerKey, serializer()::encode);
executor.register(FLOOR_ENTRY, serializer()::decode, this::floorEntry, serializer()::encode);
executor.register(FLOOR_KEY, serializer()::decode, this::floorKey, serializer()::encode);
executor.register(CEILING_ENTRY, serializer()::decode, this::ceilingEntry, serializer()::encode);
executor.register(CEILING_KEY, serializer()::decode, this::ceilingKey, serializer()::encode);
executor.register(HIGHER_ENTRY, serializer()::decode, this::higherEntry, serializer()::encode);
executor.register(HIGHER_KEY, serializer()::decode, this::higherKey, serializer()::encode);
}
protected NavigableMap<String, MapEntryValue> subMap(
Commit<? extends SubMap> commit) {
// Do not support this until lazy communication is possible. At present
// it transmits up to the entire map.
SubMap<String, MapEntryValue> subMap = commit.value();
return entries().subMap(subMap.fromKey(), subMap.isInclusiveFrom(),
subMap.toKey(), subMap.isInclusiveTo());
}
protected String firstKey() {
return isEmpty() ? null : entries().firstKey();
}
protected String lastKey() {
return isEmpty() ? null : entries().lastKey();
}
protected Map.Entry<String, Versioned<byte[]>> higherEntry(Commit<? extends HigherEntry> commit) {
return isEmpty() ? null : toVersionedEntry(entries().higherEntry(commit.value().key()));
}
protected Map.Entry<String, Versioned<byte[]>> firstEntry() {
return isEmpty() ? null : toVersionedEntry(entries().firstEntry());
}
protected Map.Entry<String, Versioned<byte[]>> lastEntry() {
return isEmpty() ? null : toVersionedEntry(entries().lastEntry());
}
protected Map.Entry<String, Versioned<byte[]>> pollFirstEntry() {
return toVersionedEntry(entries().pollFirstEntry());
}
protected Map.Entry<String, Versioned<byte[]>> pollLastEntry() {
return toVersionedEntry(entries().pollLastEntry());
}
protected Map.Entry<String, Versioned<byte[]>> lowerEntry(Commit<? extends LowerEntry> commit) {
return toVersionedEntry(entries().lowerEntry(commit.value().key()));
}
protected String lowerKey(Commit<? extends LowerKey> commit) {
return entries().lowerKey(commit.value().key());
}
protected Map.Entry<String, Versioned<byte[]>> floorEntry(Commit<? extends FloorEntry> commit) {
return toVersionedEntry(entries().floorEntry(commit.value().key()));
}
protected String floorKey(Commit<? extends FloorKey> commit) {
return entries().floorKey(commit.value().key());
}
protected Map.Entry<String, Versioned<byte[]>> ceilingEntry(Commit<CeilingEntry> commit) {
return toVersionedEntry(entries().ceilingEntry(commit.value().key()));
}
protected String ceilingKey(Commit<CeilingKey> commit) {
return entries().ceilingKey(commit.value().key());
}
protected String higherKey(Commit<HigherKey> commit) {
return entries().higherKey(commit.value().key());
}
private Map.Entry<String, Versioned<byte[]>> toVersionedEntry(
Map.Entry<String, MapEntryValue> entry) {
return entry == null || valueIsNull(entry.getValue())
? null : Maps.immutableEntry(entry.getKey(), toVersioned(entry.getValue()));
}
@Override
public void onExpire(RaftSession session) {
closeListener(session.sessionId().id());
}
@Override
public void onClose(RaftSession session) {
closeListener(session.sessionId().id());
}
private void closeListener(Long sessionId) {
listeners.remove(sessionId);
}
}