Fixes some issues that were preventing intent tests from running
- LeadershipStore to support serving getAllLeaderships from cache.
- Removed a changed to KryoNamespaces that was causing serialization issues. Instead moved that type registration to McastStore.
Change-Id: I06acf1a397b6a982c0dfd0ebc0830b2161cf23a4
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
index c6647b7..83d6f96 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
@@ -71,6 +71,8 @@
protected NodeId localNodeId;
protected ConsistentMap<String, InternalLeadership> leadershipMap;
+ protected Map<String, Versioned<InternalLeadership>> leadershipCache = Maps.newConcurrentMap();
+
private final MapEventListener<String, InternalLeadership> leadershipChangeListener =
event -> {
Leadership oldValue = InternalLeadership.toLeadership(Versioned.valueOrNull(event.oldValue()));
@@ -91,6 +93,12 @@
if (!leaderChanged && candidatesChanged) {
eventType = LeadershipEvent.Type.CANDIDATES_CHANGED;
}
+ leadershipCache.compute(event.key(), (k, v) -> {
+ if (v == null || v.version() < event.newValue().version()) {
+ return event.newValue();
+ }
+ return v;
+ });
notifyDelegate(new LeadershipEvent(eventType, newValue));
};
@@ -103,6 +111,7 @@
.withRelaxedReadConsistency()
.withSerializer(Serializer.using(KryoNamespaces.API, InternalLeadership.class))
.build();
+ leadershipMap.entrySet().forEach(e -> leadershipCache.put(e.getKey(), e.getValue()));
leadershipMap.addListener(leadershipChangeListener);
log.info("Started");
}
@@ -210,16 +219,13 @@
@Override
public Leadership getLeadership(String topic) {
- return InternalLeadership.toLeadership(Versioned.valueOrNull(leadershipMap.get(topic)));
+ InternalLeadership internalLeadership = Versioned.valueOrNull(leadershipCache.get(topic));
+ return internalLeadership == null ? null : internalLeadership.asLeadership();
}
@Override
public Map<String, Leadership> getLeaderships() {
- Map<String, Leadership> leaderships = Maps.newHashMap();
- leadershipMap.entrySet().forEach(e -> {
- leaderships.put(e.getKey(), e.getValue().value().asLeadership());
- });
- return ImmutableMap.copyOf(leaderships);
+ return ImmutableMap.copyOf(Maps.transformValues(leadershipCache, v -> v.value().asLeadership()));
}
private static class InternalLeadership {
diff --git a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
index ae27fc2..baf7f66 100644
--- a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
@@ -225,7 +225,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
public final class KryoNamespaces {
@@ -235,7 +234,6 @@
.register(AtomicBoolean.class)
.register(AtomicInteger.class)
.register(AtomicLong.class)
- .register(AtomicReference.class)
.register(new ImmutableListSerializer(),
ImmutableList.class,
ImmutableList.of(1).getClass(),
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/mcast/impl/DistributedMcastStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/mcast/impl/DistributedMcastStore.java
index 06e908b..e48852c 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/mcast/impl/DistributedMcastStore.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/mcast/impl/DistributedMcastStore.java
@@ -22,6 +22,7 @@
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
import static org.slf4j.LoggerFactory.getLogger;
@@ -56,6 +57,7 @@
.withSerializer(Serializer.using(KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.register(
+ AtomicReference.class,
MulticastData.class,
McastRoute.class,
McastRoute.Type.class
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/mcast/impl/MulticastData.java b/incubator/store/src/main/java/org/onosproject/incubator/store/mcast/impl/MulticastData.java
index 7bf4a10..0d2fa24 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/mcast/impl/MulticastData.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/mcast/impl/MulticastData.java
@@ -57,6 +57,7 @@
}
public void setSource(ConnectPoint source) {
+ // FIXME: violates immutability
isEmpty.set(false);
this.source.set(source);
}