AtomicValue updates
- events now have old and new values
- simplify implementation by using map.replace for compareAndSet
- Removed the builder option for disabling metering (metering is now always enabled). It will become a system-wide option in a subsequent update
Change-Id: I7c1424f8671c0e54688172c273b9262f305b0920
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncAtomicValue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncAtomicValue.java
index 912d6d0..7bbd9d3 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncAtomicValue.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAsyncAtomicValue.java
@@ -13,31 +13,34 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.onosproject.store.primitives.impl;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
import org.onosproject.store.service.AsyncAtomicValue;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AtomicValueEvent;
import org.onosproject.store.service.AtomicValueEventListener;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Versioned;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CopyOnWriteArraySet;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
-/**
- * Default implementation of {@link AsyncAtomicValue}.
- *
- * @param <V> value type
- */
+
public class DefaultAsyncAtomicValue<V> implements AsyncAtomicValue<V> {
- private final Set<AtomicValueEventListener<V>> listeners = new CopyOnWriteArraySet<>();
- private final AsyncConsistentMap<String, V> valueMap;
private final String name;
- private final MapEventListener<String, V> mapEventListener = new InternalMapEventListener();
+ private final Serializer serializer;
+ private final AsyncConsistentMap<String, byte[]> backingMap;
+ private final Map<AtomicValueEventListener<V>, MapEventListener<String, byte[]>> listeners =
+ Maps.newIdentityHashMap();
private final MeteringAgent monitor;
private static final String COMPONENT_NAME = "atomicValue";
@@ -45,13 +48,15 @@
private static final String GET_AND_SET = "getAndSet";
private static final String SET = "set";
private static final String COMPARE_AND_SET = "compareAndSet";
+ private static final String ADD_LISTENER = "addListener";
+ private static final String REMOVE_LISTENER = "removeListener";
+ private static final String NOTIFY_LISTENER = "notifyListener";
- public DefaultAsyncAtomicValue(AsyncConsistentMap<String, V> valueMap,
- String name,
- boolean meteringEnabled) {
- this.valueMap = valueMap;
- this.name = name;
- this.monitor = new MeteringAgent(COMPONENT_NAME, name, meteringEnabled);
+ public DefaultAsyncAtomicValue(String name, Serializer serializer, AsyncConsistentMap<String, byte[]> backingMap) {
+ this.name = checkNotNull(name, "name must not be null");
+ this.serializer = checkNotNull(serializer, "serializer must not be null");
+ this.backingMap = checkNotNull(backingMap, "backingMap must not be null");
+ this.monitor = new MeteringAgent(COMPONENT_NAME, name, true);
}
@Override
@@ -62,76 +67,95 @@
@Override
public CompletableFuture<Boolean> compareAndSet(V expect, V update) {
final MeteringAgent.Context newTimer = monitor.startTimer(COMPARE_AND_SET);
- CompletableFuture<Boolean> response;
- if (expect == null) {
- if (update == null) {
- response = CompletableFuture.completedFuture(true);
- }
- response = valueMap.putIfAbsent(name, update).thenApply(v -> v == null);
- } else {
- response = update == null
- ? valueMap.remove(name, expect)
- : valueMap.replace(name, expect, update);
- }
- return response.whenComplete((r, e) -> newTimer.stop(null));
+ return backingMap.replace(name, serializer.encode(expect), serializer.encode(update))
+ .whenComplete((r, e) -> newTimer.stop(e));
}
@Override
public CompletableFuture<V> get() {
final MeteringAgent.Context newTimer = monitor.startTimer(GET);
- return valueMap.get(name)
- .thenApply(Versioned::valueOrNull)
- .whenComplete((r, e) -> newTimer.stop(null));
+ return backingMap.get(name)
+ .thenApply(Versioned::valueOrNull)
+ .thenApply(v -> v == null ? null : serializer.<V>decode(v))
+ .whenComplete((r, e) -> newTimer.stop(e));
}
@Override
public CompletableFuture<V> getAndSet(V value) {
final MeteringAgent.Context newTimer = monitor.startTimer(GET_AND_SET);
- CompletableFuture<Versioned<V>> previousValue = value == null ?
- valueMap.remove(name) : valueMap.put(name, value);
- return previousValue.thenApply(Versioned::valueOrNull)
- .whenComplete((r, e) -> newTimer.stop(null));
+ if (value == null) {
+ return backingMap.remove(name)
+ .thenApply(Versioned::valueOrNull)
+ .thenApply(v -> v == null ? null : serializer.<V>decode(v))
+ .whenComplete((r, e) -> newTimer.stop(e));
+ }
+ return backingMap.put(name, serializer.encode(value))
+ .thenApply(Versioned::valueOrNull)
+ .thenApply(v -> v == null ? null : serializer.<V>decode(v))
+ .whenComplete((r, e) -> newTimer.stop(e));
}
@Override
public CompletableFuture<Void> set(V value) {
final MeteringAgent.Context newTimer = monitor.startTimer(SET);
- CompletableFuture<Void> previousValue = value == null ?
- valueMap.remove(name).thenApply(v -> null) : valueMap.put(name, value).thenApply(v -> null);
- return previousValue.whenComplete((r, e) -> newTimer.stop(null));
+ if (value == null) {
+ return backingMap.remove(name)
+ .whenComplete((r, e) -> newTimer.stop(e))
+ .thenApply(v -> null);
+
+ }
+ return backingMap.put(name, serializer.encode(value))
+ .whenComplete((r, e) -> newTimer.stop(e))
+ .thenApply(v -> null);
}
@Override
public CompletableFuture<Void> addListener(AtomicValueEventListener<V> listener) {
- synchronized (listeners) {
- if (listeners.add(listener)) {
- if (listeners.size() == 1) {
- return valueMap.addListener(mapEventListener);
- }
- }
- }
- return CompletableFuture.completedFuture(null);
+ checkNotNull(listener, "listener must not be null");
+ final MeteringAgent.Context newTimer = monitor.startTimer(ADD_LISTENER);
+ MapEventListener<String, byte[]> mapListener =
+ listeners.computeIfAbsent(listener, key -> new InternalMapValueEventListener(listener));
+ return backingMap.addListener(mapListener).whenComplete((r, e) -> newTimer.stop(e));
}
@Override
public CompletableFuture<Void> removeListener(AtomicValueEventListener<V> listener) {
- synchronized (listeners) {
- if (listeners.remove(listener)) {
- if (listeners.size() == 0) {
- return valueMap.removeListener(mapEventListener);
+ checkNotNull(listener, "listener must not be null");
+ final MeteringAgent.Context newTimer = monitor.startTimer(REMOVE_LISTENER);
+ MapEventListener<String, byte[]> mapListener = listeners.remove(listener);
+ if (mapListener != null) {
+ return backingMap.removeListener(mapListener)
+ .whenComplete((r, e) -> newTimer.stop(e));
+ } else {
+ newTimer.stop(null);
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+
+ private class InternalMapValueEventListener implements MapEventListener<String, byte[]> {
+
+ private final AtomicValueEventListener<V> listener;
+
+ InternalMapValueEventListener(AtomicValueEventListener<V> listener) {
+ this.listener = listener;
+ }
+
+ @Override
+ public void event(MapEvent<String, byte[]> event) {
+ if (event.key().equals(name)) {
+ final MeteringAgent.Context newTimer = monitor.startTimer(NOTIFY_LISTENER);
+ byte[] rawNewValue = Versioned.valueOrNull(event.newValue());
+ byte[] rawOldValue = Versioned.valueOrNull(event.oldValue());
+ try {
+ listener.event(new AtomicValueEvent<>(name,
+ rawNewValue == null ? null : serializer.decode(rawNewValue),
+ rawOldValue == null ? null : serializer.decode(rawOldValue)));
+ newTimer.stop(null);
+ } catch (Exception e) {
+ newTimer.stop(e);
+ Throwables.propagate(e);
}
}
}
- return CompletableFuture.completedFuture(null);
}
-
- private class InternalMapEventListener implements MapEventListener<String, V> {
-
- @Override
- public void event(MapEvent<String, V> mapEvent) {
- V newValue = mapEvent.type() == MapEvent.Type.REMOVE ? null : mapEvent.value().value();
- AtomicValueEvent<V> atomicValueEvent = new AtomicValueEvent<>(name, AtomicValueEvent.Type.UPDATE, newValue);
- listeners.forEach(l -> l.event(atomicValueEvent));
- }
- }
-}
+}
\ No newline at end of file