Fix for ONOS-4315
- Additional log on error
- Allow count=0 using CountDownCompleter
- test case to detect the issue (@Ignored by default right now)
- other bug fixes found along the way
Based on patch by Madan@China
Change-Id: I7d6cb8c214052859900ef7ee0337a7e1c8a9d295
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
index 4823902..48fd4a2 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
@@ -18,6 +18,7 @@
import static org.onosproject.store.service.MapEvent.Type.INSERT;
import static org.onosproject.store.service.MapEvent.Type.REMOVE;
import static org.onosproject.store.service.MapEvent.Type.UPDATE;
+import static org.slf4j.LoggerFactory.getLogger;
import io.atomix.copycat.server.session.ServerSession;
import io.atomix.copycat.server.Commit;
import io.atomix.copycat.server.Snapshottable;
@@ -60,7 +61,9 @@
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
+import org.slf4j.Logger;
+import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -72,6 +75,7 @@
*/
public class AtomixConsistentMapState extends ResourceStateMachine implements SessionListener, Snapshottable {
+ private final Logger log = getLogger(getClass());
private final Map<Long, Commit<? extends AtomixConsistentMapCommands.Listen>> listeners = new HashMap<>();
private final Map<String, MapEntryValue> mapEntries = new HashMap<>();
private final Set<String> preparedKeys = Sets.newHashSet();
@@ -384,7 +388,7 @@
}
MapEntryValue existingValue = mapEntries.get(key);
if (existingValue == null) {
- if (update.currentValue() != null) {
+ if (update.type() != MapUpdate.Type.PUT_IF_ABSENT) {
return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
}
} else {
@@ -399,6 +403,9 @@
transaction.updates().forEach(u -> preparedKeys.add(u.key()));
ok = true;
return PrepareResult.OK;
+ } catch (Exception e) {
+ log.warn("Failure applying {}", commit, e);
+ throw Throwables.propagate(e);
} finally {
if (!ok) {
commit.close();
@@ -416,6 +423,9 @@
TransactionId transactionId = commit.operation().transactionId();
try {
return commitInternal(transactionId);
+ } catch (Exception e) {
+ log.warn("Failure applying {}", commit, e);
+ throw Throwables.propagate(e);
} finally {
commit.close();
}
@@ -438,12 +448,11 @@
List<MapEvent<String, byte[]>> eventsToPublish = Lists.newArrayList();
for (MapUpdate<String, byte[]> update : transaction.updates()) {
String key = update.key();
+ checkState(preparedKeys.remove(key), "key is not prepared");
MapEntryValue previousValue = mapEntries.remove(key);
MapEntryValue newValue = null;
- checkState(preparedKeys.remove(key), "key is not prepared");
if (update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
- newValue = new TransactionalCommit(key,
- versionCounter.incrementAndGet(), completer);
+ newValue = new TransactionalCommit(key, versionCounter.incrementAndGet(), completer);
}
eventsToPublish.add(new MapEvent<>("", key, toVersioned(newValue), toVersioned(previousValue)));
if (newValue != null) {
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
index f7c44c2..a5fdb49 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
@@ -16,6 +16,8 @@
package org.onosproject.store.primitives.resources.impl;
import io.atomix.resource.ResourceType;
+
+import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import java.util.Arrays;
@@ -348,6 +350,7 @@
map.addListener(listener).join();
+ // PUT_IF_ABSENT
MapUpdate<String, byte[]> update1 =
MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_ABSENT)
.withKey("foo")
@@ -359,6 +362,7 @@
map.prepare(tx).thenAccept(result -> {
assertEquals(true, result);
}).join();
+ // verify changes in Tx is not visible yet until commit
assertFalse(listener.eventReceived());
map.size().thenAccept(result -> {
@@ -371,7 +375,7 @@
try {
map.put("foo", value2).join();
- assertTrue(false);
+ fail("update to map entry in open tx should fail with Exception");
} catch (CompletionException e) {
assertEquals(ConcurrentModificationException.class, e.getCause().getClass());
}
@@ -384,6 +388,7 @@
assertEquals(MapEvent.Type.INSERT, event.type());
assertTrue(Arrays.equals(value1, event.newValue().value()));
+ // map should be update-able after commit
map.put("foo", value2).thenAccept(result -> {
assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
}).join();
@@ -391,6 +396,43 @@
assertNotNull(event);
assertEquals(MapEvent.Type.UPDATE, event.type());
assertTrue(Arrays.equals(value2, event.newValue().value()));
+
+
+ // REMOVE_IF_VERSION_MATCH
+ byte[] currFoo = map.get("foo").get().value();
+ long currFooVersion = map.get("foo").get().version();
+ MapUpdate<String, byte[]> remove1 =
+ MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
+ .withKey("foo")
+ .withCurrentVersion(currFooVersion)
+ .build();
+
+ tx = new MapTransaction<>(TransactionId.from("tx2"), Arrays.asList(remove1));
+
+ map.prepare(tx).thenAccept(result -> {
+ assertTrue("prepare should succeed", result);
+ }).join();
+ // verify changes in Tx is not visible yet until commit
+ assertFalse(listener.eventReceived());
+
+ map.size().thenAccept(size -> {
+ assertThat(size, is(1));
+ }).join();
+
+ map.get("foo").thenAccept(result -> {
+ assertThat(result.value(), is(currFoo));
+ }).join();
+
+ map.commit(tx.transactionId()).join();
+ event = listener.event();
+ assertNotNull(event);
+ assertEquals(MapEvent.Type.REMOVE, event.type());
+ assertArrayEquals(currFoo, event.oldValue().value());
+
+ map.size().thenAccept(size -> {
+ assertThat(size, is(0));
+ }).join();
+
}
protected void transactionRollbackTests(int clusterSize) throws Throwable {
diff --git a/utils/misc/src/main/java/org/onlab/util/CountDownCompleter.java b/utils/misc/src/main/java/org/onlab/util/CountDownCompleter.java
index e3ef9be..ef17bc4 100644
--- a/utils/misc/src/main/java/org/onlab/util/CountDownCompleter.java
+++ b/utils/misc/src/main/java/org/onlab/util/CountDownCompleter.java
@@ -45,10 +45,13 @@
* @param onCompleteCallback callback to invoke when completer is completed
*/
public CountDownCompleter(T object, long count, Consumer<T> onCompleteCallback) {
- checkState(count > 0, "count must be positive");
+ checkState(count >= 0, "count must be non-negative");
this.counter = new AtomicLong(count);
this.object = checkNotNull(object);
this.onCompleteCallback = checkNotNull(onCompleteCallback);
+ if (count == 0) {
+ onCompleteCallback.accept(object);
+ }
}
/**