AtomicValue updates
- events now have old and new values
- simplify implementation by using map.replace for compareAndSet
- Removed option for disabling it from builder. Will make it a system wide option in a subsequent update
Change-Id: I7c1424f8671c0e54688172c273b9262f305b0920
diff --git a/core/api/src/main/java/org/onosproject/store/service/AtomicValueBuilder.java b/core/api/src/main/java/org/onosproject/store/service/AtomicValueBuilder.java
index d5f226f..d0ba25e 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AtomicValueBuilder.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AtomicValueBuilder.java
@@ -60,14 +60,6 @@
AtomicValueBuilder<V> withPartitionsDisabled();
/**
- * Instantiates Metering service to gather usage and performance metrics.
- * By default, usage data will be stored.
- *
- * @return this AtomicValueBuilder for method chaining
- */
- AtomicValueBuilder<V> withMeteringDisabled();
-
- /**
* Builds a AsyncAtomicValue based on the configuration options
* supplied to this builder.
*
diff --git a/core/api/src/main/java/org/onosproject/store/service/AtomicValueEvent.java b/core/api/src/main/java/org/onosproject/store/service/AtomicValueEvent.java
index 1bce136..e1916bc 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AtomicValueEvent.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AtomicValueEvent.java
@@ -38,20 +38,20 @@
}
private final String name;
- private final Type type;
- private final V value;
+ private final V newValue;
+ private final V oldValue;
/**
* Creates a new event object.
*
* @param name AtomicValue name
- * @param type the type of the event
- * @param value the new value
+ * @param newValue the new value
+ * @param oldValue the old value
*/
- public AtomicValueEvent(String name, Type type, V value) {
+ public AtomicValueEvent(String name, V newValue, V oldValue) {
this.name = name;
- this.type = type;
- this.value = value;
+ this.newValue = newValue;
+ this.oldValue = oldValue;
}
/**
@@ -69,16 +69,25 @@
* @return the type of the event
*/
public Type type() {
- return type;
+ return AtomicValueEvent.Type.UPDATE;
}
/**
- * Returns the new updated value.
+ * Returns the newly set value.
*
- * @return the value
+ * @return the new value
*/
- public V value() {
- return value;
+ public V newValue() {
+ return newValue;
+ }
+
+ /**
+ * Returns the old replaced value.
+ *
+ * @return the old value
+ */
+ public V oldValue() {
+ return oldValue;
}
@Override
@@ -89,21 +98,21 @@
AtomicValueEvent that = (AtomicValueEvent) o;
return Objects.equals(this.name, that.name) &&
- Objects.equals(this.type, that.type) &&
- Objects.equals(this.value, that.value);
+ Objects.equals(this.newValue, that.newValue) &&
+ Objects.equals(this.oldValue, that.oldValue);
}
@Override
public int hashCode() {
- return Objects.hash(name, type, value);
+ return Objects.hash(name, newValue, oldValue);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("name", name)
- .add("type", type)
- .add("value", value)
+ .add("newValue", newValue)
+ .add("oldValue", oldValue)
.toString();
}
}
diff --git a/core/api/src/test/java/org/onosproject/store/service/AtomicValueEventTest.java b/core/api/src/test/java/org/onosproject/store/service/AtomicValueEventTest.java
index 39481ca..4e9ca1c 100644
--- a/core/api/src/test/java/org/onosproject/store/service/AtomicValueEventTest.java
+++ b/core/api/src/test/java/org/onosproject/store/service/AtomicValueEventTest.java
@@ -30,13 +30,13 @@
public class AtomicValueEventTest {
AtomicValueEvent<String> event1 =
- new AtomicValueEvent<>("map1", UPDATE, "e1");
+ new AtomicValueEvent<>("map1", "e1", "e0");
AtomicValueEvent<String> event2 =
- new AtomicValueEvent<>("map1", UPDATE, "e2");
+ new AtomicValueEvent<>("map1", "e2", "e1");
AtomicValueEvent<String> sameAsEvent2 =
- new AtomicValueEvent<>("map1", UPDATE, "e2");
+ new AtomicValueEvent<>("map1", "e2", "e1");
AtomicValueEvent<String> event3 =
- new AtomicValueEvent<>("map2", UPDATE, "e2");
+ new AtomicValueEvent<>("map2", "e2", "e1");
/**
* Checks that the SetEvent class is immutable.
@@ -64,7 +64,8 @@
@Test
public void testConstruction() {
assertThat(event1.type(), is(UPDATE));
- assertThat(event1.value(), is("e1"));
+ assertThat(event1.newValue(), is("e1"));
+ assertThat(event1.oldValue(), is("e0"));
assertThat(event1.name(), is("map1"));
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncAtomicValue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncAtomicValue.java
index 912d6d0..7bbd9d3 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncAtomicValue.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncAtomicValue.java
@@ -13,31 +13,34 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.onosproject.store.primitives.impl;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
import org.onosproject.store.service.AsyncAtomicValue;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AtomicValueEvent;
import org.onosproject.store.service.AtomicValueEventListener;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Versioned;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CopyOnWriteArraySet;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
-/**
- * Default implementation of {@link AsyncAtomicValue}.
- *
- * @param <V> value type
- */
+
public class DefaultAsyncAtomicValue<V> implements AsyncAtomicValue<V> {
- private final Set<AtomicValueEventListener<V>> listeners = new CopyOnWriteArraySet<>();
- private final AsyncConsistentMap<String, V> valueMap;
private final String name;
- private final MapEventListener<String, V> mapEventListener = new InternalMapEventListener();
+ private final Serializer serializer;
+ private final AsyncConsistentMap<String, byte[]> backingMap;
+ private final Map<AtomicValueEventListener<V>, MapEventListener<String, byte[]>> listeners =
+ Maps.newIdentityHashMap();
private final MeteringAgent monitor;
private static final String COMPONENT_NAME = "atomicValue";
@@ -45,13 +48,15 @@
private static final String GET_AND_SET = "getAndSet";
private static final String SET = "set";
private static final String COMPARE_AND_SET = "compareAndSet";
+ private static final String ADD_LISTENER = "addListener";
+ private static final String REMOVE_LISTENER = "removeListener";
+ private static final String NOTIFY_LISTENER = "notifyListener";
- public DefaultAsyncAtomicValue(AsyncConsistentMap<String, V> valueMap,
- String name,
- boolean meteringEnabled) {
- this.valueMap = valueMap;
- this.name = name;
- this.monitor = new MeteringAgent(COMPONENT_NAME, name, meteringEnabled);
+ public DefaultAsyncAtomicValue(String name, Serializer serializer, AsyncConsistentMap<String, byte[]> backingMap) {
+ this.name = checkNotNull(name, "name must not be null");
+ this.serializer = checkNotNull(serializer, "serializer must not be null");
+ this.backingMap = checkNotNull(backingMap, "backingMap must not be null");
+ this.monitor = new MeteringAgent(COMPONENT_NAME, name, true);
}
@Override
@@ -62,76 +67,95 @@
@Override
public CompletableFuture<Boolean> compareAndSet(V expect, V update) {
final MeteringAgent.Context newTimer = monitor.startTimer(COMPARE_AND_SET);
- CompletableFuture<Boolean> response;
- if (expect == null) {
- if (update == null) {
- response = CompletableFuture.completedFuture(true);
- }
- response = valueMap.putIfAbsent(name, update).thenApply(v -> v == null);
- } else {
- response = update == null
- ? valueMap.remove(name, expect)
- : valueMap.replace(name, expect, update);
- }
- return response.whenComplete((r, e) -> newTimer.stop(null));
+ return backingMap.replace(name, serializer.encode(expect), serializer.encode(update))
+ .whenComplete((r, e) -> newTimer.stop(e));
}
@Override
public CompletableFuture<V> get() {
final MeteringAgent.Context newTimer = monitor.startTimer(GET);
- return valueMap.get(name)
- .thenApply(Versioned::valueOrNull)
- .whenComplete((r, e) -> newTimer.stop(null));
+ return backingMap.get(name)
+ .thenApply(Versioned::valueOrNull)
+ .thenApply(v -> v == null ? null : serializer.<V>decode(v))
+ .whenComplete((r, e) -> newTimer.stop(e));
}
@Override
public CompletableFuture<V> getAndSet(V value) {
final MeteringAgent.Context newTimer = monitor.startTimer(GET_AND_SET);
- CompletableFuture<Versioned<V>> previousValue = value == null ?
- valueMap.remove(name) : valueMap.put(name, value);
- return previousValue.thenApply(Versioned::valueOrNull)
- .whenComplete((r, e) -> newTimer.stop(null));
+ if (value == null) {
+ return backingMap.remove(name)
+ .thenApply(Versioned::valueOrNull)
+ .thenApply(v -> v == null ? null : serializer.<V>decode(v))
+ .whenComplete((r, e) -> newTimer.stop(e));
+ }
+ return backingMap.put(name, serializer.encode(value))
+ .thenApply(Versioned::valueOrNull)
+ .thenApply(v -> v == null ? null : serializer.<V>decode(v))
+ .whenComplete((r, e) -> newTimer.stop(e));
}
@Override
public CompletableFuture<Void> set(V value) {
final MeteringAgent.Context newTimer = monitor.startTimer(SET);
- CompletableFuture<Void> previousValue = value == null ?
- valueMap.remove(name).thenApply(v -> null) : valueMap.put(name, value).thenApply(v -> null);
- return previousValue.whenComplete((r, e) -> newTimer.stop(null));
+ if (value == null) {
+ return backingMap.remove(name)
+ .whenComplete((r, e) -> newTimer.stop(e))
+ .thenApply(v -> null);
+
+ }
+ return backingMap.put(name, serializer.encode(value))
+ .whenComplete((r, e) -> newTimer.stop(e))
+ .thenApply(v -> null);
}
@Override
public CompletableFuture<Void> addListener(AtomicValueEventListener<V> listener) {
- synchronized (listeners) {
- if (listeners.add(listener)) {
- if (listeners.size() == 1) {
- return valueMap.addListener(mapEventListener);
- }
- }
- }
- return CompletableFuture.completedFuture(null);
+ checkNotNull(listener, "listener must not be null");
+ final MeteringAgent.Context newTimer = monitor.startTimer(ADD_LISTENER);
+ MapEventListener<String, byte[]> mapListener =
+ listeners.computeIfAbsent(listener, key -> new InternalMapValueEventListener(listener));
+ return backingMap.addListener(mapListener).whenComplete((r, e) -> newTimer.stop(e));
}
@Override
public CompletableFuture<Void> removeListener(AtomicValueEventListener<V> listener) {
- synchronized (listeners) {
- if (listeners.remove(listener)) {
- if (listeners.size() == 0) {
- return valueMap.removeListener(mapEventListener);
+ checkNotNull(listener, "listener must not be null");
+ final MeteringAgent.Context newTimer = monitor.startTimer(REMOVE_LISTENER);
+ MapEventListener<String, byte[]> mapListener = listeners.remove(listener);
+ if (mapListener != null) {
+ return backingMap.removeListener(mapListener)
+ .whenComplete((r, e) -> newTimer.stop(e));
+ } else {
+ newTimer.stop(null);
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+
+ private class InternalMapValueEventListener implements MapEventListener<String, byte[]> {
+
+ private final AtomicValueEventListener<V> listener;
+
+ InternalMapValueEventListener(AtomicValueEventListener<V> listener) {
+ this.listener = listener;
+ }
+
+ @Override
+ public void event(MapEvent<String, byte[]> event) {
+ if (event.key().equals(name)) {
+ final MeteringAgent.Context newTimer = monitor.startTimer(NOTIFY_LISTENER);
+ byte[] rawNewValue = Versioned.valueOrNull(event.newValue());
+ byte[] rawOldValue = Versioned.valueOrNull(event.oldValue());
+ try {
+ listener.event(new AtomicValueEvent<>(name,
+ rawNewValue == null ? null : serializer.decode(rawNewValue),
+ rawOldValue == null ? null : serializer.decode(rawOldValue)));
+ newTimer.stop(null);
+ } catch (Exception e) {
+ newTimer.stop(e);
+ Throwables.propagate(e);
}
}
}
- return CompletableFuture.completedFuture(null);
}
-
- private class InternalMapEventListener implements MapEventListener<String, V> {
-
- @Override
- public void event(MapEvent<String, V> mapEvent) {
- V newValue = mapEvent.type() == MapEvent.Type.REMOVE ? null : mapEvent.value().value();
- AtomicValueEvent<V> atomicValueEvent = new AtomicValueEvent<>(name, AtomicValueEvent.Type.UPDATE, newValue);
- listeners.forEach(l -> l.event(atomicValueEvent));
- }
- }
-}
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java
index e2f4971..3c2243d 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java
@@ -30,11 +30,11 @@
public class DefaultAtomicValueBuilder<V> implements AtomicValueBuilder<V> {
private String name;
- private ConsistentMapBuilder<String, V> mapBuilder;
- private boolean metering = true;
+ private Serializer serializer;
+ private ConsistentMapBuilder<String, byte[]> mapBuilder;
public DefaultAtomicValueBuilder(DatabaseManager manager) {
- mapBuilder = manager.<String, V>consistentMapBuilder()
+ mapBuilder = manager.<String, byte[]>consistentMapBuilder()
.withName("onos-atomic-values")
.withMeteringDisabled()
.withSerializer(Serializer.using(KryoNamespaces.BASIC));
@@ -59,14 +59,8 @@
}
@Override
- public AtomicValueBuilder<V> withMeteringDisabled() {
- metering = false;
- return this;
- }
-
- @Override
public AsyncAtomicValue<V> buildAsyncValue() {
- return new DefaultAsyncAtomicValue<>(mapBuilder.buildAsyncMap(), name, metering);
+ return new DefaultAsyncAtomicValue<>(name, serializer, mapBuilder.buildAsyncMap());
}
@Override