Misc fixes/improvments to ECMapImpl. Most notably:
- Fixed logic in determining random peer to do AE
- Fixed for logic for when to do active sync if lightWeightAE is disabled
- Fixed tracking of ECMap activity
Change-Id: I35da91d6ef684e16630be7bd5e518c8400debe14
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 18cb3bc..b91df44 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -15,7 +15,9 @@
*/
package org.onosproject.store.ecmap;
+import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -97,6 +99,8 @@
private final ExecutorService communicationExecutor;
private final Map<NodeId, EventAccumulator> senderPending;
+ private final String mapName;
+
private volatile boolean destroyed = false;
private static final String ERROR_DESTROYED = " map is already destroyed";
private final String destroyedMessage;
@@ -157,6 +161,7 @@
TimeUnit antiEntropyTimeUnit,
boolean convergeFaster,
boolean persistent) {
+ this.mapName = mapName;
items = Maps.newConcurrentMap();
senderPending = Maps.newConcurrentMap();
destroyedMessage = mapName + ERROR_DESTROYED;
@@ -284,7 +289,7 @@
return items.values()
.stream()
.filter(MapValue::isAlive)
- .anyMatch(v -> v.get().equals(value));
+ .anyMatch(v -> value.equals(v.get()));
}
@Override
@@ -303,7 +308,7 @@
checkNotNull(value, ERROR_NULL_VALUE);
MapValue<V> newValue = new MapValue<>(value, timestampProvider.apply(key, value));
- if (updateInternal(key, newValue)) {
+ if (putInternal(key, newValue)) {
notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, value));
notifyListeners(new EventuallyConsistentMapEvent<>(PUT, key, value));
}
@@ -313,9 +318,20 @@
public V remove(K key) {
checkState(!destroyed, destroyedMessage);
checkNotNull(key, ERROR_NULL_KEY);
- // TODO prevent calls here if value is important for timestamp
- MapValue<V> tombstone = MapValue.tombstone(timestampProvider.apply(key, null));
- MapValue<V> previousValue = removeInternal(key, Optional.empty(), tombstone);
+ return removeAndNotify(key, null);
+ }
+
+ @Override
+ public void remove(K key, V value) {
+ checkState(!destroyed, destroyedMessage);
+ checkNotNull(key, ERROR_NULL_KEY);
+ checkNotNull(value, ERROR_NULL_VALUE);
+ removeAndNotify(key, value);
+ }
+
+ private V removeAndNotify(K key, V value) {
+ MapValue<V> tombstone = MapValue.tombstone(timestampProvider.apply(key, value));
+ MapValue<V> previousValue = removeInternal(key, Optional.ofNullable(value), tombstone);
if (previousValue != null) {
notifyPeers(new UpdateEntry<>(key, tombstone), peerUpdateFunction.apply(key, previousValue.get()));
if (previousValue.isAlive()) {
@@ -325,27 +341,13 @@
return previousValue != null ? previousValue.get() : null;
}
- @Override
- public void remove(K key, V value) {
- checkState(!destroyed, destroyedMessage);
- checkNotNull(key, ERROR_NULL_KEY);
- checkNotNull(value, ERROR_NULL_VALUE);
- MapValue<V> tombstone = MapValue.tombstone(timestampProvider.apply(key, value));
- MapValue<V> previousValue = removeInternal(key, Optional.of(value), tombstone);
- if (previousValue != null) {
- notifyPeers(new UpdateEntry<>(key, tombstone), peerUpdateFunction.apply(key, previousValue.get()));
- if (previousValue.isAlive()) {
- notifyListeners(new EventuallyConsistentMapEvent<>(REMOVE, key, previousValue.get()));
- }
- }
- }
-
private MapValue<V> removeInternal(K key, Optional<V> value, MapValue<V> tombstone) {
checkState(!destroyed, destroyedMessage);
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
-
checkState(tombstone.isTombstone());
+
+ counter.incrementCount();
AtomicBoolean updated = new AtomicBoolean(false);
AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
items.compute(key, (k, existing) -> {
@@ -360,11 +362,19 @@
if (updated.get()) {
previousValue.set(existing);
}
- return updated.get() ? tombstone : existing;
+ if (updated.get()) {
+ return tombstonesDisabled ? null : tombstone;
+ } else {
+ return existing;
+ }
});
if (updated.get()) {
if (persistent) {
- persistentStore.update(key, tombstone);
+ if (tombstonesDisabled) {
+ persistentStore.remove(key);
+ } else {
+ persistentStore.update(key, tombstone);
+ }
}
}
return previousValue.get();
@@ -393,11 +403,7 @@
@Override
public Collection<V> values() {
checkState(!destroyed, destroyedMessage);
- return Maps.filterValues(items, MapValue::isAlive)
- .values()
- .stream()
- .map(MapValue::get)
- .collect(Collectors.toList());
+ return Collections2.transform(Maps.filterValues(items, MapValue::isAlive).values(), MapValue::get);
}
@Override
@@ -416,14 +422,16 @@
* @param newValue proposed new value
* @return true if update happened; false if map already contains a more recent value for the key
*/
- private boolean updateInternal(K key, MapValue<V> newValue) {
+ private boolean putInternal(K key, MapValue<V> newValue) {
+ checkState(!destroyed, destroyedMessage);
+ checkNotNull(key, ERROR_NULL_KEY);
+ checkNotNull(newValue, ERROR_NULL_VALUE);
+ checkState(newValue.isAlive());
+ counter.incrementCount();
AtomicBoolean updated = new AtomicBoolean(false);
items.compute(key, (k, existing) -> {
if (existing == null || newValue.isNewerThan(existing)) {
updated.set(true);
- if (newValue.isTombstone()) {
- return tombstonesDisabled ? null : newValue;
- }
return newValue;
}
return existing;
@@ -499,8 +507,8 @@
private Optional<NodeId> pickRandomActivePeer() {
List<NodeId> activePeers = clusterService.getNodes()
.stream()
- .filter(node -> !localNodeId.equals(node))
- .map(ControllerNode::id)
+ .map(ControllerNode::id)
+ .filter(id -> !localNodeId.equals(id))
.filter(id -> clusterService.getState(id) == ControllerNode.State.ACTIVE)
.collect(Collectors.toList());
Collections.shuffle(activePeers);
@@ -519,9 +527,9 @@
});
}
-
private AntiEntropyAdvertisement<K> createAdvertisement() {
- return new AntiEntropyAdvertisement<K>(localNodeId, Maps.transformValues(items, MapValue::digest));
+ return new AntiEntropyAdvertisement<K>(localNodeId,
+ ImmutableMap.copyOf(Maps.transformValues(items, MapValue::digest)));
}
private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
@@ -529,13 +537,14 @@
return;
}
try {
+ log.debug("Received anti-entropy advertisement from {} for {} with {} entries in it",
+ mapName, ad.sender(), ad.digest().size());
antiEntropyCheckLocalItems(ad).forEach(this::notifyListeners);
if (!lightweightAntiEntropy) {
- Set<K> missingKeys = Sets.difference(items.keySet(), ad.digest().keySet());
- // if remote ad has something unknown, actively sync
- if (missingKeys.size() > 0) {
- // Send the advertisement back if this peer is out-of-sync
+ // if remote ad has any entries that the local copy is missing, actively sync
+ // TODO: Missing keys is not the way local copy can be behind.
+ if (Sets.difference(ad.digest().keySet(), items.keySet()).size() > 0) {
// TODO: Send ad for missing keys and for entries that are stale
sendAdvertisementToPeer(ad.sender());
}
@@ -561,7 +570,9 @@
// local value is more recent, push to sender
queueUpdate(new UpdateEntry<>(key, localValue), ImmutableList.of(sender));
}
- if (remoteValueDigest != null && remoteValueDigest.isTombstone()) {
+ if (remoteValueDigest != null
+ && remoteValueDigest.isNewerThan(localValue.digest())
+ && remoteValueDigest.isTombstone()) {
MapValue<V> previousValue = removeInternal(key,
Optional.empty(),
MapValue.tombstone(remoteValueDigest.timestamp()));
@@ -582,10 +593,10 @@
final MapValue<V> value = update.value();
if (value.isTombstone()) {
MapValue<V> previousValue = removeInternal(key, Optional.empty(), value);
- if (previousValue != null && previousValue.get() != null) {
+ if (previousValue != null && previousValue.isAlive()) {
notifyListeners(new EventuallyConsistentMapEvent<>(REMOVE, key, previousValue.get()));
}
- } else if (updateInternal(key, value)) {
+ } else if (putInternal(key, value)) {
notifyListeners(new EventuallyConsistentMapEvent<>(PUT, key, value.get()));
}
});