Refactor DistributedPacketStore to store packet requests in a ConsistentMultimap
Change-Id: Ia4a93c47fee726009673e99609b2f8800807e675
diff --git a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index 5fdea05..1c02da9 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -15,9 +15,7 @@
*/
package org.onosproject.store.packet.impl;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -43,20 +41,20 @@
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.ConsistentMultimap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
+import java.util.Collection;
import java.util.Dictionary;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
-import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
@@ -205,10 +203,10 @@
private final class PacketRequestTracker {
- private ConsistentMap<RequestKey, Set<PacketRequest>> requests;
+ private ConsistentMultimap<RequestKey, PacketRequest> requests;
private PacketRequestTracker() {
- requests = storageService.<RequestKey, Set<PacketRequest>>consistentMapBuilder()
+ requests = storageService.<RequestKey, PacketRequest>consistentMultimapBuilder()
.withName("onos-packet-requests")
.withSerializer(Serializer.using(KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
@@ -218,68 +216,36 @@
}
+ // Records the request in the requests multimap; when addInternal reports it
+ // as the first request and a delegate is set, asks the delegate to request
+ // the packets.
private void add(PacketRequest request) {
- AtomicBoolean firstRequest = addInternal(request);
- if (firstRequest.get() && delegate != null) {
+ boolean firstRequest = addInternal(request);
+ if (firstRequest && delegate != null) {
// The instance that makes the first request will push to all devices
delegate.requestPackets(request);
}
}
- private AtomicBoolean addInternal(PacketRequest request) {
- AtomicBoolean firstRequest = new AtomicBoolean(false);
- requests.compute(key(request), (s, existingRequests) -> {
- // Reset to false just in case this is a retry due to
- // ConcurrentModificationException
- firstRequest.set(false);
- if (existingRequests == null) {
- firstRequest.set(true);
- return ImmutableSet.of(request);
- } else if (!existingRequests.contains(request)) {
- firstRequest.set(true);
- return ImmutableSet.<PacketRequest>builder()
- .addAll(existingRequests)
- .add(request)
- .build();
- } else {
- return existingRequests;
- }
- });
- return firstRequest;
+ // Adds the request under its key and reports whether it is now the only
+ // request stored for that key.
+ // NOTE(review): re-adding the sole existing request presumably still yields
+ // a size of 1 and reports true, unlike the removed compute-based version,
+ // which reported true only for a newly added request -- confirm intended.
+ private boolean addInternal(PacketRequest request) {
+ Collection<? extends PacketRequest> values =
+ Versioned.valueOrNull(requests.putAndGet(key(request), request));
+ // valueOrNull can return null (removeInternal guards for it as well);
+ // treat that as "not the first request" rather than throwing a
+ // NullPointerException.
+ return values != null && values.size() == 1;
}
+ // Removes the request from the requests multimap; when removeInternal
+ // reports that no requests remain for its key and a delegate is set, asks
+ // the delegate to cancel the packets.
private void remove(PacketRequest request) {
- AtomicBoolean removedLast = removeInternal(request);
- if (removedLast.get() && delegate != null) {
+ boolean removedLast = removeInternal(request);
+ if (removedLast && delegate != null) {
// The instance that removes the last request will remove from all devices
delegate.cancelPackets(request);
}
}
- private AtomicBoolean removeInternal(PacketRequest request) {
- AtomicBoolean removedLast = new AtomicBoolean(false);
- requests.computeIfPresent(key(request), (s, existingRequests) -> {
- // Reset to false just in case this is a retry due to
- // ConcurrentModificationException
- removedLast.set(false);
- if (existingRequests.contains(request)) {
- Set<PacketRequest> newRequests = Sets.newHashSet(existingRequests);
- newRequests.remove(request);
- if (newRequests.size() > 0) {
- return ImmutableSet.copyOf(newRequests);
- } else {
- removedLast.set(true);
- return null;
- }
- } else {
- return existingRequests;
- }
- });
- return removedLast;
+ // Removes the request from the values stored under its key and reports
+ // whether no values remain for that key (null or empty result).
+ // NOTE(review): this also returns true when the request was not stored and
+ // the key has no values, whereas the removed computeIfPresent version
+ // reported true only after actually removing the last request -- confirm
+ // the extra cancelPackets call this can trigger in remove() is acceptable.
+ private boolean removeInternal(PacketRequest request) {
+ Collection<? extends PacketRequest> values =
+ Versioned.valueOrNull(requests.removeAndGet(key(request), request));
+ return values == null || values.isEmpty();
}
+ // Copies every stored request into a new list and sorts it by ascending
+ // priority value.
private List<PacketRequest> requests() {
List<PacketRequest> list = Lists.newArrayList();
- requests.values().forEach(v -> list.addAll(v.value()));
+ requests.values().forEach(v -> list.add(v));
list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());
return list;
}