Implemented a distributed default flow registration mechanism to avoid duplicate requests from other ONOS instances.
Change-Id: Ib2abb483456538e3e08e9790c4b4b0d50db8b384
Implemented a distributed default flow registration mechanism to avoid
duplicate requests from other ONOS instances.
Change-Id: I620cc51ac29cddaffa73cdbb20e9a9acbdd9ea69
diff --git a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index 36f9f54..7ee8712 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -28,6 +28,7 @@
import org.onosproject.net.packet.OutboundPacket;
import org.onosproject.net.packet.PacketEvent;
import org.onosproject.net.packet.PacketEvent.Type;
+import org.onosproject.net.packet.PacketRequest;
import org.onosproject.net.packet.PacketStore;
import org.onosproject.net.packet.PacketStoreDelegate;
import org.onosproject.store.AbstractStore;
@@ -37,8 +38,12 @@
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -69,6 +74,11 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService communicationService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
+ private PacketRequestTracker tracker; // assigned by the startup code via "new PacketRequestTracker()"; requestPackets()/existingRequests() delegate to it
+
private static final MessageSubject PACKET_OUT_SUBJECT =
new MessageSubject("packet-out");
@@ -94,6 +104,8 @@
new InternalClusterMessageHandler(),
messageHandlingExecutor);
+ tracker = new PacketRequestTracker();
+
log.info("Started");
}
@@ -125,6 +137,16 @@
// error log: log.warn("Failed to send packet-out to {}", master);
}
+    /**
+     * Hands the given packet request to the request tracker.
+     *
+     * @param request packet request to record
+     * @return the tracker's verdict: true when its putIfAbsent call found no
+     *         previous entry for the request, false otherwise
+     */
+    @Override
+    public boolean requestPackets(PacketRequest request) {
+        final boolean newlyTracked = tracker.add(request);
+        return newlyTracked;
+    }
+
+    /**
+     * Reports the packet requests currently held by the request tracker.
+     *
+     * @return the key set of the tracker's backing map
+     */
+    @Override
+    public Set<PacketRequest> existingRequests() {
+        final Set<PacketRequest> tracked = tracker.requests();
+        return tracked;
+    }
+
/**
* Handles incoming cluster messages.
*/
@@ -140,4 +162,46 @@
}
}
+    /**
+     * Records packet requests as keys of a consistent map obtained from the
+     * storage service under the name "packet-requests". Because entries are
+     * inserted with putIfAbsent, a given request is stored at most once.
+     */
+    private class PacketRequestTracker {
+
+        // Only the keys carry information; every stored value is simply true.
+        private final ConsistentMap<PacketRequest, Boolean> requestMap;
+
+        /**
+         * Builds the backing map through the enclosing store's storage service.
+         */
+        public PacketRequestTracker() {
+            requestMap = storageService.<PacketRequest, Boolean>consistentMapBuilder()
+                    .withName("packet-requests")
+                    .withSerializer(apiSerializer())
+                    .build();
+        }
+
+        /**
+         * Creates the serializer used by the backing map; it encodes and
+         * decodes through a Kryo namespace registered with KryoNamespaces.API.
+         *
+         * @return a new serializer instance
+         */
+        private Serializer apiSerializer() {
+            return new Serializer() {
+                KryoNamespace kryo = new KryoNamespace.Builder()
+                        .register(KryoNamespaces.API)
+                        .build();
+
+                @Override
+                public <T> byte[] encode(T object) {
+                    return kryo.serialize(object);
+                }
+
+                @Override
+                public <T> T decode(byte[] bytes) {
+                    return kryo.deserialize(bytes);
+                }
+            };
+        }
+
+        /**
+         * Inserts the request unless the map already holds it.
+         *
+         * @param request packet request to insert
+         * @return true when putIfAbsent returned null, false otherwise
+         */
+        public boolean add(PacketRequest request) {
+            return requestMap.putIfAbsent(request, true) == null;
+        }
+
+        /**
+         * Deletes the request from the map.
+         *
+         * @param request packet request to delete
+         * @return false when the map's remove call returned null, true otherwise
+         */
+        public boolean remove(PacketRequest request) {
+            return requestMap.remove(request) != null;
+        }
+
+        /**
+         * Exposes the requests currently stored.
+         *
+         * @return the key set of the backing map
+         */
+        public Set<PacketRequest> requests() {
+            return requestMap.keySet();
+        }
+
+    }
}