ONOS-2495: Workaround for the fact that kryo deserialization is not thread safe
Change-Id: Id1bd3334f9a5c122984d08f97dbbbf622b27cf33
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
index 9082ba6..a661cc6 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
@@ -32,7 +32,6 @@
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
-
import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
@@ -122,22 +121,35 @@
this.purgeOnUninstall = purgeOnUninstall;
this.database.registerConsumer(update -> {
SharedExecutors.getSingleThreadExecutor().execute(() -> {
- if (update.target() == MAP_UPDATE) {
- Result<UpdateResult<String, byte[]>> result = update.output();
- if (result.success() && result.value().mapName().equals(name)) {
- MapEvent<K, V> mapEvent = result.value().<K, V>map(this::dK, serializer::decode).toMapEvent();
- notifyListeners(mapEvent);
+ if (listeners.isEmpty()) {
+ return;
+ }
+ try {
+ if (update.target() == MAP_UPDATE) {
+ Result<UpdateResult<String, byte[]>> result = update.output();
+ if (result.success() && result.value().mapName().equals(name)) {
+ MapEvent<K, V> mapEvent = result.value()
+ .<K, V>map(this::dK,
+ v -> serializer.decode(Tools.copyOf(v)))
+ .toMapEvent();
+ notifyListeners(mapEvent);
+ }
+ } else if (update.target() == TX_COMMIT) {
+ CommitResponse response = update.output();
+ if (response.success()) {
+ response.updates().forEach(u -> {
+ if (u.mapName().equals(name)) {
+ MapEvent<K, V> mapEvent =
+ u.<K, V>map(this::dK,
+ v -> serializer.decode(Tools.copyOf(v)))
+ .toMapEvent();
+ notifyListeners(mapEvent);
+ }
+ });
+ }
}
- } else if (update.target() == TX_COMMIT) {
- CommitResponse response = update.output();
- if (response.success()) {
- response.updates().forEach(u -> {
- if (u.mapName().equals(name)) {
- MapEvent<K, V> mapEvent = u.<K, V>map(this::dK, serializer::decode).toMapEvent();
- notifyListeners(mapEvent);
- }
- });
- }
+ } catch (Exception e) {
+ log.warn("Error notifying listeners", e);
}
});
});
diff --git a/utils/misc/src/main/java/org/onlab/util/Tools.java b/utils/misc/src/main/java/org/onlab/util/Tools.java
index ec6b6da..301f411 100644
--- a/utils/misc/src/main/java/org/onlab/util/Tools.java
+++ b/utils/misc/src/main/java/org/onlab/util/Tools.java
@@ -190,6 +190,16 @@
}
/**
+ * Returns a copy of the input byte array.
+ *
+ * @param original input
+ * @return copy of original
+ */
+ public static byte[] copyOf(byte[] original) {
+ return Arrays.copyOf(original, original.length);
+ }
+
+ /**
* Get property as a string value.
*
* @param properties properties to be looked up