DefaultConsistentMap to automatically retry compute* calls failing due to ConcurrentModification
Change-Id: If59e432e423d323282eb8fe7b1b438899154aae9
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMap.java
index e09b624..f804611 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMap.java
@@ -19,6 +19,7 @@
import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -28,14 +29,18 @@
import java.util.function.Function;
import java.util.function.Predicate;
+import org.onlab.util.Tools;
import org.onosproject.store.primitives.ConsistentMapBackedJavaMap;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
+import org.onosproject.store.service.ConsistentMapException.ConcurrentModification;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Synchronous;
import org.onosproject.store.service.Versioned;
+import com.google.common.base.Throwables;
+
/**
* Default implementation of {@code ConsistentMap}.
*
@@ -45,6 +50,7 @@
public class DefaultConsistentMap<K, V> extends Synchronous<AsyncConsistentMap<K, V>> implements ConsistentMap<K, V> {
private static final int OPERATION_TIMEOUT_MILLIS = 5000;
+ private static final int MAX_DELAY_BETWEEN_RETY_MILLS = 50;
private final AsyncConsistentMap<K, V> asyncMap;
private Map<K, V> javaMap;
@@ -82,26 +88,29 @@
@Override
public Versioned<V> computeIfAbsent(K key,
Function<? super K, ? extends V> mappingFunction) {
- return complete(asyncMap.computeIfAbsent(key, mappingFunction));
+ return computeIf(key, Objects::isNull, (k, v) -> mappingFunction.apply(k));
}
@Override
public Versioned<V> computeIfPresent(K key,
BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
- return complete(asyncMap.computeIfPresent(key, remappingFunction));
+ return computeIf(key, Objects::nonNull, remappingFunction);
}
@Override
public Versioned<V> compute(K key,
BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
- return complete(asyncMap.compute(key, remappingFunction));
+ return computeIf(key, v -> true, remappingFunction);
}
@Override
public Versioned<V> computeIf(K key,
Predicate<? super V> condition,
BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
- return complete(asyncMap.computeIf(key, condition, remappingFunction));
+ return Tools.retryable(() -> complete(asyncMap.computeIf(key, condition, remappingFunction)),
+ ConcurrentModification.class,
+ Integer.MAX_VALUE,
+ MAX_DELAY_BETWEEN_RETY_MILLS).get();
}
@Override
@@ -203,11 +212,8 @@
} catch (TimeoutException e) {
throw new ConsistentMapException.Timeout();
} catch (ExecutionException e) {
- if (e.getCause() instanceof ConsistentMapException) {
- throw (ConsistentMapException) e.getCause();
- } else {
- throw new ConsistentMapException(e.getCause());
- }
+ Throwables.propagateIfPossible(e.getCause());
+ throw new ConsistentMapException(e.getCause());
}
}
}
\ No newline at end of file