Simplified ECMap implementation by merging items and tombstones maps
Change-Id: If4253722d91c35a7e57dec3c2fceb216d14a7314
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 118ef78..1fd27d3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -18,9 +18,8 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
-import org.apache.commons.lang3.RandomUtils;
-import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.Pair;
import org.onlab.util.AbstractAccumulator;
import org.onlab.util.KryoNamespace;
@@ -30,12 +29,10 @@
import org.onosproject.cluster.NodeId;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.impl.LogicalTimestamp;
-import org.onosproject.store.impl.Timestamped;
import org.onosproject.store.service.WallClockTimestamp;
+import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
@@ -43,30 +40,27 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
+import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
+import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
+
import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.Timer;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
-import static java.util.Objects.isNull;
-import static java.util.Objects.nonNull;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
import static org.onlab.util.Tools.groupedThreads;
@@ -80,12 +74,12 @@
private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
- private final ConcurrentMap<K, Timestamped<V>> items;
- private final ConcurrentMap<K, Timestamp> removedItems;
+ private final Map<K, MapValue<V>> items;
private final ClusterService clusterService;
private final ClusterCommunicationService clusterCommunicator;
private final KryoSerializer serializer;
+ private final NodeId localNodeId;
private final BiFunction<K, V, Timestamp> timestampProvider;
@@ -93,7 +87,7 @@
private final MessageSubject antiEntropyAdvertisementSubject;
private final Set<EventuallyConsistentMapListener<K, V>> listeners
- = new CopyOnWriteArraySet<>();
+ = Sets.newCopyOnWriteArraySet();
private final ExecutorService executor;
private final ScheduledExecutorService backgroundExecutor;
@@ -162,13 +156,13 @@
TimeUnit antiEntropyTimeUnit,
boolean convergeFaster,
boolean persistent) {
- items = new ConcurrentHashMap<>();
- removedItems = new ConcurrentHashMap<>();
+ items = Maps.newConcurrentMap();
senderPending = Maps.newConcurrentMap();
destroyedMessage = mapName + ERROR_DESTROYED;
this.clusterService = clusterService;
this.clusterCommunicator = clusterCommunicator;
+ this.localNodeId = clusterService.getLocalNode().id();
this.serializer = createSerializer(serializerBuilder);
@@ -179,7 +173,7 @@
} else {
this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
.map(ControllerNode::id)
- .filter(nodeId -> !nodeId.equals(clusterService.getLocalNode().id()))
+ .filter(nodeId -> !nodeId.equals(localNodeId))
.collect(Collectors.toList());
}
@@ -210,7 +204,7 @@
newFixedThreadPool(1, groupedThreads("onos/ecm", mapName + "-dbwriter"));
persistentStore = new MapDbPersistentStore<>(filename, dbExecutor, serializer);
- persistentStore.readInto(items, removedItems);
+ persistentStore.readInto(items);
} else {
this.persistentStore = null;
}
@@ -223,17 +217,21 @@
}
// start anti-entropy thread
- this.backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
+ this.backgroundExecutor.scheduleAtFixedRate(this::sendAdvertisement,
initialDelaySec, antiEntropyPeriod,
antiEntropyTimeUnit);
updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
clusterCommunicator.addSubscriber(updateMessageSubject,
- new InternalEventListener(), this.executor);
+ serializer::decode,
+ this::processUpdates,
+ this.executor);
antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
- new InternalAntiEntropyListener(), this.backgroundExecutor);
+ serializer::decode,
+ this::handleAntiEntropyAdvertisement,
+ this.backgroundExecutor);
this.tombstonesDisabled = tombstonesDisabled;
this.lightweightAntiEntropy = !convergeFaster;
@@ -245,14 +243,13 @@
protected void setupKryoPool() {
// Add the map's internal helper classes to the user-supplied serializer
serializerPool = builder
+ .register(KryoNamespaces.BASIC)
.register(LogicalTimestamp.class)
.register(WallClockTimestamp.class)
- .register(PutEntry.class)
- .register(RemoveEntry.class)
- .register(ArrayList.class)
.register(AntiEntropyAdvertisement.class)
- .register(HashMap.class)
- .register(Timestamped.class)
+ .register(UpdateEntry.class)
+ .register(MapValue.class)
+ .register(MapValue.Digest.class)
.build();
}
};
@@ -261,29 +258,31 @@
@Override
public int size() {
checkState(!destroyed, destroyedMessage);
- return items.size();
+ // TODO: Maintain a separate counter for tracking live elements in map.
+ return Maps.filterValues(items, MapValue::isAlive).size();
}
@Override
public boolean isEmpty() {
checkState(!destroyed, destroyedMessage);
- return items.isEmpty();
+ return size() == 0;
}
@Override
public boolean containsKey(K key) {
checkState(!destroyed, destroyedMessage);
checkNotNull(key, ERROR_NULL_KEY);
- return items.containsKey(key);
+ return get(key) != null;
}
@Override
public boolean containsValue(V value) {
checkState(!destroyed, destroyedMessage);
checkNotNull(value, ERROR_NULL_VALUE);
-
- return items.values().stream()
- .anyMatch(timestamped -> timestamped.value().equals(value));
+ return items.values()
+ .stream()
+ .filter(MapValue::isAlive)
+ .anyMatch(v -> v.get().equals(value));
}
@Override
@@ -291,11 +290,8 @@
checkState(!destroyed, destroyedMessage);
checkNotNull(key, ERROR_NULL_KEY);
- Timestamped<V> value = items.get(key);
- if (value != null) {
- return value.value();
- }
- return null;
+ MapValue<V> value = items.get(key);
+ return (value == null || value.isTombstone()) ? null : value.get();
}
@Override
@@ -304,123 +300,18 @@
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
- Timestamp timestamp = timestampProvider.apply(key, value);
-
- if (putInternal(key, value, timestamp)) {
- notifyPeers(new PutEntry<>(key, value, timestamp),
- peerUpdateFunction.apply(key, value));
- notifyListeners(new EventuallyConsistentMapEvent<>(
- EventuallyConsistentMapEvent.Type.PUT, key, value));
+ MapValue<V> newValue = new MapValue<>(value, timestampProvider.apply(key, value));
+ if (updateInternal(key, newValue)) {
+ notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, value));
+ notifyListeners(new EventuallyConsistentMapEvent<>(PUT, key, value));
}
}
- private boolean putInternal(K key, V value, Timestamp timestamp) {
- counter.incrementCount();
- Timestamp removed = removedItems.get(key);
- if (removed != null && removed.isNewerThan(timestamp)) {
- log.debug("ecmap - removed was newer {}", value);
- return false;
- }
-
- final MutableBoolean updated = new MutableBoolean(false);
-
- items.compute(key, (k, existing) -> {
- if (existing != null && existing.isNewerThan(timestamp)) {
- updated.setFalse();
- return existing;
- } else {
- updated.setTrue();
- return new Timestamped<>(value, timestamp);
- }
- });
-
- boolean success = updated.booleanValue();
- if (!success) {
- log.debug("ecmap - existing was newer {}", value);
- }
-
- if (success && removed != null) {
- removedItems.remove(key, removed);
- }
-
- if (success && persistent) {
- persistentStore.put(key, value, timestamp);
- }
-
- return success;
- }
-
@Override
public V remove(K key) {
checkState(!destroyed, destroyedMessage);
checkNotNull(key, ERROR_NULL_KEY);
-
- // TODO prevent calls here if value is important for timestamp
- Timestamp timestamp = timestampProvider.apply(key, null);
-
- Optional<V> removedValue = removeInternal(key, timestamp);
- if (removedValue == null) {
- return null;
- }
- notifyPeers(new RemoveEntry<>(key, timestamp),
- peerUpdateFunction.apply(key, null));
- notifyListeners(new EventuallyConsistentMapEvent<>(
- EventuallyConsistentMapEvent.Type.REMOVE, key, removedValue.orElse(null)));
-
- return removedValue.orElse(null);
- }
-
- /**
- * Returns null if the timestamp is for a outdated request i.e.
- * the value is the map is more recent or a tombstone exists with a
- * more recent timestamp.
- * Returns non-empty optional if a value was indeed removed from the map.
- * Returns empty optional if map did not contain a value for the key but the existing
- * tombstone is older than this timestamp.
- * @param key key
- * @param timestamp timestamp for remove request
- * @return Optional value.
- */
- private Optional<V> removeInternal(K key, Timestamp timestamp) {
- if (timestamp == null) {
- return null;
- }
-
- counter.incrementCount();
- final AtomicReference<Optional<V>> removedValue = new AtomicReference<>(null);
- items.compute(key, (k, existing) -> {
- if (existing != null && existing.isNewerThan(timestamp)) {
- return existing;
- } else {
- removedValue.set(existing == null ? Optional.empty() : Optional.of(existing.value()));
- return null;
- }
- });
-
- if (isNull(removedValue.get())) {
- return null;
- }
-
- boolean updatedTombstone = false;
-
- if (!tombstonesDisabled) {
- Timestamp removedTimestamp = removedItems.get(key);
- if (removedTimestamp == null) {
- //Timestamp removed = removedItems.putIfAbsent(key, timestamp);
- updatedTombstone = (removedItems.putIfAbsent(key, timestamp) == null);
- } else if (timestamp.isNewerThan(removedTimestamp)) {
- updatedTombstone = removedItems.replace(key, removedTimestamp, timestamp);
- }
- }
-
- if (persistent) {
- persistentStore.remove(key, timestamp);
- }
-
- if (tombstonesDisabled || updatedTombstone) {
- return removedValue.get();
- }
- return null;
+ return removeInternal(key, Optional.empty());
}
@Override
@@ -428,15 +319,34 @@
checkState(!destroyed, destroyedMessage);
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
+ removeInternal(key, Optional.of(value));
+ }
- Timestamp timestamp = timestampProvider.apply(key, value);
+ private V removeInternal(K key, Optional<V> value) {
+ checkState(!destroyed, destroyedMessage);
+ checkNotNull(key, ERROR_NULL_KEY);
+ checkNotNull(value, ERROR_NULL_VALUE);
- if (nonNull(removeInternal(key, timestamp))) {
- notifyPeers(new RemoveEntry<>(key, timestamp),
- peerUpdateFunction.apply(key, value));
- notifyListeners(new EventuallyConsistentMapEvent<>(
- EventuallyConsistentMapEvent.Type.REMOVE, key, value));
+ MapValue<V> newValue = new MapValue<>(null, timestampProvider.apply(key, value.orElse(null)));
+ AtomicBoolean updated = new AtomicBoolean(false);
+ AtomicReference<V> previousValue = new AtomicReference<>();
+ items.compute(key, (k, existing) -> {
+ boolean alive = existing != null && existing.isAlive();
+ boolean valueMatches = !alive || !value.isPresent() || value.get().equals(existing.get());
+ previousValue.set(alive ? existing.get() : null);
+ // a value mismatch vetoes the removal even when the tombstone timestamp is newer
+ updated.set(valueMatches && (existing == null || newValue.isNewerThan(existing)));
+ return updated.get() ? newValue : existing;
+ });
+ if (updated.get()) {
+ notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, previousValue.get()));
+ notifyListeners(new EventuallyConsistentMapEvent<>(REMOVE, key, previousValue.get()));
+ if (persistent) {
+ persistentStore.update(key, newValue);
+ }
+ return previousValue.get();
}
+ return null;
}
@Override
@@ -448,30 +358,59 @@
@Override
public void clear() {
checkState(!destroyed, destroyedMessage);
- items.forEach((key, value) -> remove(key));
+ Maps.filterValues(items, MapValue::isAlive)
+ .forEach((k, v) -> remove(k));
}
@Override
public Set<K> keySet() {
checkState(!destroyed, destroyedMessage);
- return items.keySet();
+ return Maps.filterValues(items, MapValue::isAlive)
+ .keySet();
}
@Override
public Collection<V> values() {
checkState(!destroyed, destroyedMessage);
- return items.values().stream()
- .map(Timestamped::value)
- .collect(Collectors.toList());
+ return Maps.filterValues(items, MapValue::isAlive)
+ .values()
+ .stream()
+ .map(MapValue::get)
+ .collect(Collectors.toList());
}
@Override
public Set<Map.Entry<K, V>> entrySet() {
checkState(!destroyed, destroyedMessage);
+ return Maps.filterValues(items, MapValue::isAlive)
+ .entrySet()
+ .stream()
+ .map(e -> Pair.of(e.getKey(), e.getValue().get()))
+ .collect(Collectors.toSet());
+ }
- return items.entrySet().stream()
- .map(e -> Pair.of(e.getKey(), e.getValue().value()))
- .collect(Collectors.toSet());
+ /**
+ * Returns true if newValue was accepted i.e. map is updated.
+ * @param key key
+ * @param newValue proposed new value
+ * @return true if update happened; false if map already contains a more recent value for the key
+ */
+ private boolean updateInternal(K key, MapValue<V> newValue) {
+ AtomicBoolean updated = new AtomicBoolean(false);
+ items.compute(key, (k, existing) -> {
+ if (existing == null || newValue.isNewerThan(existing)) {
+ updated.set(true);
+ if (newValue.isTombstone()) {
+ return tombstonesDisabled ? null : newValue;
+ }
+ return newValue;
+ }
+ return existing;
+ });
+ if (updated.get() && persistent) {
+ persistentStore.update(key, newValue);
+ }
+ return updated.get();
}
@Override
@@ -503,26 +442,20 @@
}
private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
- for (EventuallyConsistentMapListener<K, V> listener : listeners) {
- listener.event(event);
- }
+ listeners.forEach(listener -> listener.event(event));
}
- private void notifyPeers(PutEntry<K, V> event, Collection<NodeId> peers) {
+ private void notifyPeers(UpdateEntry<K, V> event, Collection<NodeId> peers) {
queueUpdate(event, peers);
}
- private void notifyPeers(RemoveEntry<K, V> event, Collection<NodeId> peers) {
- queueUpdate(event, peers);
- }
-
- private void queueUpdate(AbstractEntry<K, V> event, Collection<NodeId> peers) {
+ private void queueUpdate(UpdateEntry<K, V> event, Collection<NodeId> peers) {
if (peers == null) {
// we have no friends :(
return;
}
peers.forEach(node ->
- senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
+ senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
);
}
@@ -530,276 +463,107 @@
return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
}
- private final class SendAdvertisementTask implements Runnable {
- @Override
- public void run() {
- if (Thread.currentThread().isInterrupted()) {
- log.info("Interrupted, quitting");
- return;
- }
-
+ private void sendAdvertisement() {
+ try {
if (underHighLoad() || destroyed) {
return;
}
-
- try {
- final NodeId self = clusterService.getLocalNode().id();
- Set<ControllerNode> nodes = clusterService.getNodes();
-
- List<NodeId> nodeIds = nodes.stream()
- .map(ControllerNode::id)
- .collect(Collectors.toList());
-
- if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
- log.trace("No other peers in the cluster.");
- return;
- }
-
- NodeId peer;
- do {
- int idx = RandomUtils.nextInt(0, nodeIds.size());
- peer = nodeIds.get(idx);
- } while (peer.equals(self));
-
- if (Thread.currentThread().isInterrupted()) {
- log.info("Interrupted, quitting");
- return;
- }
-
- AntiEntropyAdvertisement<K> ad = createAdvertisement();
- NodeId destination = peer;
- clusterCommunicator.unicast(ad, antiEntropyAdvertisementSubject, serializer::encode, peer)
- .whenComplete((result, error) -> {
- if (error != null) {
- log.debug("Failed to send anti-entropy advertisement to {}", destination);
- }
- });
-
- } catch (Exception e) {
- // Catch all exceptions to avoid scheduled task being suppressed.
- log.error("Exception thrown while sending advertisement", e);
- }
+ pickRandomActivePeer().ifPresent(this::sendAdvertisementToPeer);
+ } catch (Exception e) {
+ // Catch all exceptions to avoid scheduled task being suppressed.
+ log.error("Exception thrown while sending advertisement", e);
}
}
+ private Optional<NodeId> pickRandomActivePeer() {
+ List<NodeId> activePeers = clusterService.getNodes()
+ .stream()
+ .map(ControllerNode::id)
+ .filter(id -> !localNodeId.equals(id)) // compare NodeId with NodeId so the local node is really excluded
+ .filter(id -> clusterService.getState(id) == ControllerNode.State.ACTIVE)
+ .collect(Collectors.toList());
+ Collections.shuffle(activePeers);
+ return activePeers.isEmpty() ? Optional.empty() : Optional.of(activePeers.get(0));
+ }
+
+ private void sendAdvertisementToPeer(NodeId peer) {
+ clusterCommunicator.unicast(createAdvertisement(),
+ antiEntropyAdvertisementSubject,
+ serializer::encode,
+ peer)
+ .whenComplete((result, error) -> {
+ if (error != null) {
+ log.warn("Failed to send anti-entropy advertisement to {}", peer);
+ }
+ });
+ }
+
+
private AntiEntropyAdvertisement<K> createAdvertisement() {
- final NodeId self = clusterService.getLocalNode().id();
-
- Map<K, Timestamp> timestamps = new HashMap<>(items.size());
-
- items.forEach((key, value) -> timestamps.put(key, value.timestamp()));
-
- Map<K, Timestamp> tombstones = new HashMap<>(removedItems);
-
- return new AntiEntropyAdvertisement<>(self, timestamps, tombstones);
+ return new AntiEntropyAdvertisement<K>(localNodeId, Maps.transformValues(items, MapValue::digest));
}
private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
- List<EventuallyConsistentMapEvent<K, V>> externalEvents;
+ if (destroyed || underHighLoad()) {
+ return;
+ }
+ try {
+ antiEntropyCheckLocalItems(ad).forEach(this::notifyListeners);
- externalEvents = antiEntropyCheckLocalItems(ad);
-
- antiEntropyCheckLocalRemoved(ad);
-
- if (!lightweightAntiEntropy) {
- externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));
-
- // if remote ad has something unknown, actively sync
- for (K key : ad.timestamps().keySet()) {
- if (!items.containsKey(key)) {
+ if (!lightweightAntiEntropy) {
+ Set<K> missingKeys = Sets.difference(ad.digest().keySet(), items.keySet()); // advertised by peer, absent locally
+ // if remote ad has something unknown, actively sync
+ if (missingKeys.size() > 0) {
// Send the advertisement back if this peer is out-of-sync
- final NodeId sender = ad.sender();
- AntiEntropyAdvertisement<K> myAd = createAdvertisement();
-
- clusterCommunicator.unicast(myAd, antiEntropyAdvertisementSubject, serializer::encode, sender)
- .whenComplete((result, error) -> {
- if (error != null) {
- log.debug("Failed to send reactive "
- + "anti-entropy advertisement to {}", sender);
- }
- });
- break;
+ // TODO: Send ad for missing keys and for entries that are stale
+ sendAdvertisementToPeer(ad.sender());
}
}
+ } catch (Exception e) {
+ log.warn("Error handling anti-entropy advertisement", e);
}
- externalEvents.forEach(this::notifyListeners);
}
/**
- * Checks if any of the remote's live items or tombstones are out of date
- * according to our local live item list, or if our live items are out of
- * date according to the remote's tombstone list.
- * If the local copy is more recent, it will be pushed to the remote. If the
- * remote has a more recent remove, we apply that to the local state.
- *
- * @param ad remote anti-entropy advertisement
- * @return list of external events relating to local operations performed
+ * Processes anti-entropy ad from peer by taking following actions:
+ * 1. If peer has an old entry, updates peer.
+ * 2. If peer indicates an entry is removed and has a more recent
+ * timestamp than the local entry, update local state.
*/
private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
AntiEntropyAdvertisement<K> ad) {
- final List<EventuallyConsistentMapEvent<K, V>> externalEvents
- = new LinkedList<>();
+ final List<EventuallyConsistentMapEvent<K, V>> externalEvents = Lists.newLinkedList();
final NodeId sender = ad.sender();
-
- for (Map.Entry<K, Timestamped<V>> item : items.entrySet()) {
- K key = item.getKey();
- Timestamped<V> localValue = item.getValue();
-
- Timestamp remoteTimestamp = ad.timestamps().get(key);
- if (remoteTimestamp == null) {
- remoteTimestamp = ad.tombstones().get(key);
- }
- if (remoteTimestamp == null || localValue
- .isNewerThan(remoteTimestamp)) {
+ items.forEach((key, localValue) -> {
+ MapValue.Digest remoteValueDigest = ad.digest().get(key);
+ if (remoteValueDigest == null || localValue.isNewerThan(remoteValueDigest.timestamp())) {
// local value is more recent, push to sender
- queueUpdate(new PutEntry<>(key, localValue.value(),
- localValue.timestamp()), ImmutableList.of(sender));
- }
-
- Timestamp remoteDeadTimestamp = ad.tombstones().get(key);
- if (remoteDeadTimestamp != null &&
- remoteDeadTimestamp.isNewerThan(localValue.timestamp())) {
- // sender has a more recent remove
- if (nonNull(removeInternal(key, remoteDeadTimestamp))) {
- externalEvents.add(new EventuallyConsistentMapEvent<>(
- EventuallyConsistentMapEvent.Type.REMOVE, key, null));
+ queueUpdate(new UpdateEntry<>(key, localValue), ImmutableList.of(sender));
+ } else {
+ if (remoteValueDigest.isTombstone()
+ && remoteValueDigest.timestamp().isNewerThan(localValue.timestamp())) {
+ if (updateInternal(key, new MapValue<>(null, remoteValueDigest.timestamp()))) {
+ externalEvents.add(new EventuallyConsistentMapEvent<>(REMOVE, key, null));
+ }
}
}
- }
-
+ });
return externalEvents;
}
- /**
- * Checks if any items in the remote live list are out of date according
- * to our tombstone list. If we find we have a more up to date tombstone,
- * we'll send it to the remote.
- *
- * @param ad remote anti-entropy advertisement
- */
- private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
- final NodeId sender = ad.sender();
-
- for (Map.Entry<K, Timestamp> dead : removedItems.entrySet()) {
- K key = dead.getKey();
- Timestamp localDeadTimestamp = dead.getValue();
-
- Timestamp remoteLiveTimestamp = ad.timestamps().get(key);
- if (remoteLiveTimestamp != null
- && localDeadTimestamp.isNewerThan(remoteLiveTimestamp)) {
- // sender has zombie, push remove
- queueUpdate(new RemoveEntry<>(key, localDeadTimestamp), ImmutableList.of(sender));
- }
+ private void processUpdates(Collection<UpdateEntry<K, V>> updates) {
+ if (destroyed) {
+ return;
}
- }
+ updates.forEach(update -> {
+ final K key = update.key();
+ final MapValue<V> value = update.value();
- /**
- * Checks if any of the local live items are out of date according to the
- * remote's tombstone advertisements. If we find a local item is out of date,
- * we'll apply the remove operation to the local state.
- *
- * @param ad remote anti-entropy advertisement
- * @return list of external events relating to local operations performed
- */
- private List<EventuallyConsistentMapEvent<K, V>>
- antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
- final List<EventuallyConsistentMapEvent<K, V>> externalEvents
- = new LinkedList<>();
-
- for (Map.Entry<K, Timestamp> remoteDead : ad.tombstones().entrySet()) {
- K key = remoteDead.getKey();
- Timestamp remoteDeadTimestamp = remoteDead.getValue();
-
- Timestamped<V> local = items.get(key);
- Timestamp localDead = removedItems.get(key);
- if (local != null && remoteDeadTimestamp.isNewerThan(
- local.timestamp())) {
- // If the remote has a more recent tombstone than either our local
- // value, then do a remove with their timestamp
- if (nonNull(removeInternal(key, remoteDeadTimestamp))) {
- externalEvents.add(new EventuallyConsistentMapEvent<>(
- EventuallyConsistentMapEvent.Type.REMOVE, key, null));
- }
- } else if (localDead != null && remoteDeadTimestamp.isNewerThan(
- localDead)) {
- // If the remote has a more recent tombstone than us, update ours
- // to their timestamp
- removeInternal(key, remoteDeadTimestamp);
+ if (updateInternal(key, value)) {
+ final EventuallyConsistentMapEvent.Type type = value.isTombstone() ? REMOVE : PUT;
+ notifyListeners(new EventuallyConsistentMapEvent<>(type, key, value.get()));
}
- }
-
- return externalEvents;
- }
-
- private final class InternalAntiEntropyListener
- implements ClusterMessageHandler {
-
- @Override
- public void handle(ClusterMessage message) {
- log.trace("Received anti-entropy advertisement from peer: {}",
- message.sender());
- AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload());
- try {
- if (!underHighLoad()) {
- handleAntiEntropyAdvertisement(advertisement);
- }
- } catch (Exception e) {
- log.warn("Exception thrown handling advertisements", e);
- }
- }
- }
-
- private final class InternalEventListener implements ClusterMessageHandler {
- @Override
- public void handle(ClusterMessage message) {
- if (destroyed) {
- return;
- }
-
- log.debug("Received update event from peer: {}", message.sender());
- Collection<AbstractEntry<K, V>> events = serializer.decode(message.payload());
-
- try {
- // TODO clean this for loop up
- for (AbstractEntry<K, V> entry : events) {
- final K key = entry.key();
- V value;
- final Timestamp timestamp = entry.timestamp();
- final EventuallyConsistentMapEvent.Type type;
- if (entry instanceof PutEntry) {
- PutEntry<K, V> putEntry = (PutEntry<K, V>) entry;
- value = putEntry.value();
- type = EventuallyConsistentMapEvent.Type.PUT;
- } else if (entry instanceof RemoveEntry) {
- type = EventuallyConsistentMapEvent.Type.REMOVE;
- value = null;
- } else {
- throw new IllegalStateException("Unknown entry type " + entry.getClass());
- }
-
- boolean success;
- switch (type) {
- case PUT:
- success = putInternal(key, value, timestamp);
- break;
- case REMOVE:
- Optional<V> removedValue = removeInternal(key, timestamp);
- success = removedValue != null;
- if (success) {
- value = removedValue.orElse(null);
- }
- break;
- default:
- success = false;
- }
- if (success) {
- notifyListeners(new EventuallyConsistentMapEvent<>(type, key, value));
- }
- }
- } catch (Exception e) {
- log.warn("Exception thrown handling put", e);
- }
- }
+ });
}
// TODO pull this into the class if this gets pulled out...
@@ -808,7 +572,7 @@
private static final int DEFAULT_MAX_BATCH_MS = 50;
private static final Timer TIMER = new Timer("onos-ecm-sender-events");
- private final class EventAccumulator extends AbstractAccumulator<AbstractEntry<K, V>> {
+ private final class EventAccumulator extends AbstractAccumulator<UpdateEntry<K, V>> {
private final NodeId peer;
@@ -818,23 +582,21 @@
}
@Override
- public void processItems(List<AbstractEntry<K, V>> items) {
- Map<K, AbstractEntry<K, V>> map = Maps.newHashMap();
- items.forEach(item -> map.compute(item.key(), (key, oldValue) ->
- oldValue == null || item.compareTo(oldValue) > 0 ? item : oldValue
- )
- );
+ public void processItems(List<UpdateEntry<K, V>> items) {
+ Map<K, UpdateEntry<K, V>> map = Maps.newHashMap();
+ items.forEach(item -> map.compute(item.key(), (key, existing) ->
+ existing == null || item.compareTo(existing) > 0 ? item : existing));
communicationExecutor.submit(() -> {
- clusterCommunicator.unicast(Lists.newArrayList(map.values()),
+ clusterCommunicator.unicast(ImmutableList.copyOf(map.values()),
updateMessageSubject,
serializer::encode,
peer)
.whenComplete((result, error) -> {
if (error != null) {
- log.debug("Failed to send to {}", peer);
+ log.debug("Failed to send to {}", peer, error);
}
});
});
}
}
-}
+}
\ No newline at end of file