/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.onosproject.store.impl;

import com.google.common.base.MoreObjects;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.minPriority;
import static org.onlab.util.Tools.namedThreads;

/**
 * Distributed map implementation which uses optimistic replication and
 * gossip-based techniques to provide an eventually consistent data store.
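 *
 * Each update is stamped with a timestamp from the supplied clock service;
 * conflicting writes to the same key are resolved last-writer-wins on that
 * timestamp, removals are retained as timestamped tombstones, and accepted
 * changes are broadcast to peers over the cluster communication service.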
*/
public class EventuallyConsistentMapImpl<K, V>
implements EventuallyConsistentMap<K, V> {
private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
private final Map<K, Timestamped<V>> items;
private final Map<K, Timestamp> removedItems;
private final String mapName;
private final ClusterService clusterService;
private final ClusterCommunicationService clusterCommunicator;
private final KryoSerializer serializer;
private final ClockService<K> clockService;
private final MessageSubject updateMessageSubject;
private final MessageSubject removeMessageSubject;
private final Set<EventuallyConsistentMapListener> listeners
= new CopyOnWriteArraySet<>();
private final ExecutorService executor;
private final ScheduledExecutorService backgroundExecutor;
private volatile boolean destroyed = false;
private static final String ERROR_DESTROYED = " is already destroyed";
// TODO: Make these anti-entropy params configurable
private long initialDelaySec = 5;
private long periodSec = 5;
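    // NOTE: the periodic anti-entropy task is not yet scheduled in this
    // version; these parameters and the background executor above are
    // currently unused placeholders for it.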
/**
* Creates a new eventually consistent map shared amongst multiple instances.
*
* Each map is identified by a string map name. EventuallyConsistentMapImpl
* objects in different JVMs that use the same map name will form a
* distributed map across JVMs (provided the cluster service is aware of
* both nodes).
*
* The client is expected to provide an
* {@link org.onlab.util.KryoNamespace.Builder} with which all classes that
* will be stored in this map have been registered (including referenced
* classes). This serializer will be used to serialize both K and V for
* inter-node notifications.
*
* The client must provide an {@link org.onosproject.store.impl.ClockService}
     * which can generate timestamps for a given key. The clock service is
     * free to generate timestamps however it wishes; these timestamps are
     * used to order updates to the map, so they must be strict enough to
     * ensure updates are properly ordered for the use case (i.e. in some
     * cases wall-clock time will suffice, whereas in other cases logical
     * time will be necessary).
*
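     * A minimal construction sketch, for illustration only (the key and
     * value types and the service handles shown here are placeholders
     * supplied by the caller, not part of this class):
     * <pre>{@code
     * KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
     *         .register(HostId.class)
     *         .register(Host.class);
     *
     * EventuallyConsistentMap<HostId, Host> hosts =
     *         new EventuallyConsistentMapImpl<>("hosts", clusterService,
     *                 clusterCommunicator, serializer, clockService);
     * }</pre>
     *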
* @param mapName a String identifier for the map.
* @param clusterService the cluster service
* @param clusterCommunicator the cluster communications service
* @param serializerBuilder a Kryo namespace builder that can serialize
* both K and V
* @param clockService a clock service able to generate timestamps
* for K
*/
public EventuallyConsistentMapImpl(String mapName,
ClusterService clusterService,
ClusterCommunicationService clusterCommunicator,
KryoNamespace.Builder serializerBuilder,
ClockService<K> clockService) {
this.mapName = checkNotNull(mapName);
this.clusterService = checkNotNull(clusterService);
this.clusterCommunicator = checkNotNull(clusterCommunicator);
serializer = createSerializer(checkNotNull(serializerBuilder));
this.clockService = checkNotNull(clockService);
items = new ConcurrentHashMap<>();
removedItems = new ConcurrentHashMap<>();
executor = Executors
.newCachedThreadPool(namedThreads("onos-ecm-" + mapName + "-fg-%d"));
backgroundExecutor =
newSingleThreadScheduledExecutor(minPriority(
namedThreads("onos-ecm-" + mapName + "-bg-%d")));
updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
clusterCommunicator.addSubscriber(updateMessageSubject,
new InternalPutEventListener());
removeMessageSubject = new MessageSubject("ecm-" + mapName + "-remove");
clusterCommunicator.addSubscriber(removeMessageSubject,
new InternalRemoveEventListener());
}
private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
return new KryoSerializer() {
@Override
protected void setupKryoPool() {
// Add the map's internal helper classes to the user-supplied serializer
serializerPool = builder
.register(WallClockTimestamp.class)
.register(PutEntry.class)
.register(ArrayList.class)
.register(InternalPutEvent.class)
.register(InternalRemoveEvent.class)
.build();
// TODO anti-entropy classes
}
};
}
@Override
public int size() {
        checkState(!destroyed, mapName + ERROR_DESTROYED);
return items.size();
}
@Override
public boolean isEmpty() {
        checkState(!destroyed, mapName + ERROR_DESTROYED);
return items.isEmpty();
}
@Override
public boolean containsKey(K key) {
        checkState(!destroyed, mapName + ERROR_DESTROYED);
return items.containsKey(key);
}
@Override
public boolean containsValue(V value) {
        checkState(!destroyed, mapName + ERROR_DESTROYED);
return items.values().stream()
.anyMatch(timestamped -> timestamped.value().equals(value));
}
@Override
public V get(K key) {
        checkState(!destroyed, mapName + ERROR_DESTROYED);
Timestamped<V> value = items.get(key);
if (value != null) {
return value.value();
}
return null;
}
@Override
public void put(K key, V value) {
        checkState(!destroyed, mapName + ERROR_DESTROYED);
Timestamp timestamp = clockService.getTimestamp(key);
if (putInternal(key, value, timestamp)) {
notifyPeers(new InternalPutEvent<>(key, value, timestamp));
EventuallyConsistentMapEvent<K, V> externalEvent
= new EventuallyConsistentMapEvent<>(
EventuallyConsistentMapEvent.Type.PUT, key, value);
notifyListeners(externalEvent);
}
}
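    // Applies a put locally using last-writer-wins resolution: the update is
    // dropped if the key has a newer tombstone or a newer existing value.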
private boolean putInternal(K key, V value, Timestamp timestamp) {
synchronized (this) {
Timestamp removed = removedItems.get(key);
if (removed != null && removed.compareTo(timestamp) > 0) {
return false;
}
Timestamped<V> existing = items.get(key);
if (existing != null && existing.isNewer(timestamp)) {
return false;
} else {
items.put(key, new Timestamped<>(value, timestamp));
removedItems.remove(key);
return true;
}
}
}
@Override
public void remove(K key) {
        checkState(!destroyed, mapName + ERROR_DESTROYED);
Timestamp timestamp = clockService.getTimestamp(key);
if (removeInternal(key, timestamp)) {
notifyPeers(new InternalRemoveEvent<>(key, timestamp));
EventuallyConsistentMapEvent<K, V> externalEvent
= new EventuallyConsistentMapEvent<>(
EventuallyConsistentMapEvent.Type.REMOVE, key, null);
notifyListeners(externalEvent);
}
}
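    // Applies a remove locally; the removal timestamp is kept as a tombstone
    // so that older, late-arriving puts cannot resurrect the entry.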
private boolean removeInternal(K key, Timestamp timestamp) {
synchronized (this) {
            Timestamped<V> existing = items.get(key);
            if (existing != null && existing.isNewer(timestamp)) {
                return false;
            }
items.remove(key);
removedItems.put(key, timestamp);
return true;
}
}
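    // The bulk operations below broadcast and notify only the entries that
    // were actually applied locally (i.e. that won the timestamp comparison).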
@Override
public void putAll(Map<? extends K, ? extends V> m) {
        checkState(!destroyed, mapName + ERROR_DESTROYED);
List<PutEntry<K, V>> updates = new ArrayList<>(m.size());
for (Map.Entry<? extends K, ? extends V> entry : m.entrySet()) {
K key = entry.getKey();
V value = entry.getValue();
Timestamp timestamp = clockService.getTimestamp(entry.getKey());
if (putInternal(key, value, timestamp)) {
updates.add(new PutEntry<>(key, value, timestamp));
}
}
notifyPeers(new InternalPutEvent<>(updates));
for (PutEntry<K, V> entry : updates) {
EventuallyConsistentMapEvent<K, V> externalEvent =
new EventuallyConsistentMapEvent<>(
EventuallyConsistentMapEvent.Type.PUT, entry.key(), entry.value());
notifyListeners(externalEvent);
}
}
@Override
public void clear() {
        checkState(!destroyed, mapName + ERROR_DESTROYED);
List<RemoveEntry<K>> removed = new ArrayList<>(items.size());
for (K key : items.keySet()) {
Timestamp timestamp = clockService.getTimestamp(key);
if (removeInternal(key, timestamp)) {
removed.add(new RemoveEntry<>(key, timestamp));
}
}
notifyPeers(new InternalRemoveEvent<>(removed));
for (RemoveEntry<K> entry : removed) {
EventuallyConsistentMapEvent<K, V> externalEvent =
new EventuallyConsistentMapEvent<>(
EventuallyConsistentMapEvent.Type.REMOVE, entry.key(), null);
notifyListeners(externalEvent);
}
}
@Override
public Set<K> keySet() {
        checkState(!destroyed, mapName + ERROR_DESTROYED);
return items.keySet();
}
@Override
public Collection<V> values() {
        checkState(!destroyed, mapName + ERROR_DESTROYED);
return items.values().stream()
.map(Timestamped::value)
.collect(Collectors.toList());
}
@Override
public Set<Map.Entry<K, V>> entrySet() {
        checkState(!destroyed, mapName + ERROR_DESTROYED);
return items.entrySet().stream()
.map(e -> new Entry(e.getKey(), e.getValue().value()))
.collect(Collectors.toSet());
}
@Override
public void addListener(EventuallyConsistentMapListener listener) {
        checkState(!destroyed, mapName + ERROR_DESTROYED);
listeners.add(checkNotNull(listener));
}
@Override
public void removeListener(EventuallyConsistentMapListener listener) {
        checkState(!destroyed, mapName + ERROR_DESTROYED);
listeners.remove(checkNotNull(listener));
}
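    // Tears down the executors and message subscriptions; once destroyed,
    // every other public operation fails its destroyed-state check.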
@Override
public void destroy() {
destroyed = true;
executor.shutdown();
backgroundExecutor.shutdown();
clusterCommunicator.removeSubscriber(updateMessageSubject);
clusterCommunicator.removeSubscriber(removeMessageSubject);
}
    private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
for (EventuallyConsistentMapListener listener : listeners) {
listener.event(event);
}
}
    private void notifyPeers(InternalPutEvent<K, V> event) {
try {
log.debug("sending put {}", event);
broadcastMessage(updateMessageSubject, event);
} catch (IOException e) {
// TODO this won't happen; remove from API
log.debug("IOException broadcasting update", e);
}
}
    private void notifyPeers(InternalRemoveEvent<K> event) {
try {
broadcastMessage(removeMessageSubject, event);
} catch (IOException e) {
// TODO this won't happen; remove from API
log.debug("IOException broadcasting update", e);
}
}
private void broadcastMessage(MessageSubject subject, Object event) throws
IOException {
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
subject,
serializer.encode(event));
clusterCommunicator.broadcast(message);
}
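    // Not used yet by this implementation; presumably intended for the
    // point-to-point anti-entropy exchange flagged in the TODOs above.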
private void unicastMessage(NodeId peer,
MessageSubject subject,
Object event) throws IOException {
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
subject,
serializer.encode(event));
clusterCommunicator.unicast(message, peer);
}
private final class Entry implements Map.Entry<K, V> {
private final K key;
private final V value;
public Entry(K key, V value) {
this.key = key;
this.value = value;
}
@Override
public K getKey() {
return key;
}
@Override
public V getValue() {
return value;
}
@Override
public V setValue(V value) {
throw new UnsupportedOperationException();
}
}
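    // The handlers below decode peer messages on the messaging thread and
    // apply them (and fire listener events) on the foreground executor.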
private final class InternalPutEventListener implements
ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.debug("Received put event from peer: {}", message.sender());
InternalPutEvent<K, V> event = serializer.decode(message.payload());
executor.submit(() -> {
try {
for (PutEntry<K, V> entry : event.entries()) {
K key = entry.key();
V value = entry.value();
Timestamp timestamp = entry.timestamp();
if (putInternal(key, value, timestamp)) {
                        EventuallyConsistentMapEvent<K, V> externalEvent =
                                new EventuallyConsistentMapEvent<>(
                                        EventuallyConsistentMapEvent.Type.PUT,
                                        key, value);
notifyListeners(externalEvent);
}
}
} catch (Exception e) {
log.warn("Exception thrown handling put", e);
}
});
}
}
private final class InternalRemoveEventListener implements
ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.debug("Received remove event from peer: {}", message.sender());
InternalRemoveEvent<K> event = serializer.decode(message.payload());
executor.submit(() -> {
try {
for (RemoveEntry<K> entry : event.entries()) {
K key = entry.key();
Timestamp timestamp = entry.timestamp();
if (removeInternal(key, timestamp)) {
                        EventuallyConsistentMapEvent<K, V> externalEvent =
                                new EventuallyConsistentMapEvent<>(
                                        EventuallyConsistentMapEvent.Type.REMOVE,
                                        key, null);
notifyListeners(externalEvent);
}
}
} catch (Exception e) {
log.warn("Exception thrown handling remove", e);
}
});
}
}
private static final class InternalPutEvent<K, V> {
private final List<PutEntry<K, V>> entries;
public InternalPutEvent(K key, V value, Timestamp timestamp) {
entries = Collections
.singletonList(new PutEntry<>(key, value, timestamp));
}
public InternalPutEvent(List<PutEntry<K, V>> entries) {
this.entries = checkNotNull(entries);
}
// Needed for serialization.
@SuppressWarnings("unused")
private InternalPutEvent() {
entries = null;
}
public List<PutEntry<K, V>> entries() {
return entries;
}
}
private static final class PutEntry<K, V> {
private final K key;
private final V value;
private final Timestamp timestamp;
public PutEntry(K key, V value, Timestamp timestamp) {
this.key = checkNotNull(key);
this.value = checkNotNull(value);
this.timestamp = checkNotNull(timestamp);
}
// Needed for serialization.
@SuppressWarnings("unused")
private PutEntry() {
this.key = null;
this.value = null;
this.timestamp = null;
}
public K key() {
return key;
}
public V value() {
return value;
}
public Timestamp timestamp() {
return timestamp;
}
        @Override
        public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("key", key)
.add("value", value)
.add("timestamp", timestamp)
.toString();
}
}
private static final class InternalRemoveEvent<K> {
private final List<RemoveEntry<K>> entries;
public InternalRemoveEvent(K key, Timestamp timestamp) {
entries = Collections.singletonList(
new RemoveEntry<>(key, timestamp));
}
public InternalRemoveEvent(List<RemoveEntry<K>> entries) {
this.entries = checkNotNull(entries);
}
// Needed for serialization.
@SuppressWarnings("unused")
private InternalRemoveEvent() {
entries = null;
}
public List<RemoveEntry<K>> entries() {
return entries;
}
}
private static final class RemoveEntry<K> {
private final K key;
private final Timestamp timestamp;
public RemoveEntry(K key, Timestamp timestamp) {
this.key = checkNotNull(key);
this.timestamp = checkNotNull(timestamp);
}
// Needed for serialization.
@SuppressWarnings("unused")
private RemoveEntry() {
this.key = null;
this.timestamp = null;
}
public K key() {
return key;
}
public Timestamp timestamp() {
return timestamp;
}
}
}