Implemented a distributed default flow registration mechanism to avoid duplicate requests from other ONOS instances

Change-Id: Ib2abb483456538e3e08e9790c4b4b0d50db8b384

Implemented a distributed default flow registration mechanism to avoid
duplicate requests from other ONOS instances

Change-Id: I620cc51ac29cddaffa73cdbb20e9a9acbdd9ea69
diff --git a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index 36f9f54..7ee8712 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -28,6 +28,7 @@
 import org.onosproject.net.packet.OutboundPacket;
 import org.onosproject.net.packet.PacketEvent;
 import org.onosproject.net.packet.PacketEvent.Type;
+import org.onosproject.net.packet.PacketRequest;
 import org.onosproject.net.packet.PacketStore;
 import org.onosproject.net.packet.PacketStoreDelegate;
 import org.onosproject.store.AbstractStore;
@@ -37,8 +38,12 @@
 import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.serializers.KryoSerializer;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
 import org.slf4j.Logger;
 
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -69,6 +74,11 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterCommunicationService communicationService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
+    private PacketRequestTracker tracker;
+
     private static final MessageSubject PACKET_OUT_SUBJECT =
             new MessageSubject("packet-out");
 
@@ -94,6 +104,8 @@
                                            new InternalClusterMessageHandler(),
                                            messageHandlingExecutor);
 
+        tracker =  new PacketRequestTracker();
+
         log.info("Started");
     }
 
@@ -125,6 +137,16 @@
         // error log: log.warn("Failed to send packet-out to {}", master);
     }
 
+    @Override
+    public boolean requestPackets(PacketRequest request) {
+        return tracker.add(request); // true only when the tracker's putIfAbsent found no prior entry
+    }
+
+    @Override
+    public Set<PacketRequest> existingRequests() {
+        return tracker.requests(); // key set of the tracker's "packet-requests" map
+    }
+
     /**
      * Handles incoming cluster messages.
      */
@@ -140,4 +162,46 @@
         }
     }
 
+    /**
+     * Tracks packet requests in the "packet-requests" consistent map built via the storage service.
+     */
+    private class PacketRequestTracker {
+
+        private final ConsistentMap<PacketRequest, Boolean> requests;
+
+        public PacketRequestTracker() {
+            Serializer kryoSerializer = new Serializer() {
+                private final KryoNamespace namespace = new KryoNamespace.Builder()
+                        .register(KryoNamespaces.API)
+                        .build();
+                @Override
+                public <T> byte[] encode(T object) {
+                    return namespace.serialize(object);
+                }
+
+                @Override
+                public <T> T decode(byte[] bytes) {
+                    return namespace.deserialize(bytes);
+                }
+            };
+            requests = storageService.<PacketRequest, Boolean>consistentMapBuilder()
+                    .withName("packet-requests")
+                    .withSerializer(kryoSerializer)
+                    .build();
+        }
+
+        // Returns true when putIfAbsent reports no prior entry (null result).
+        public boolean add(PacketRequest request) {
+            return requests.putIfAbsent(request, true) == null;
+        }
+
+        // Returns true when remove reports a prior entry (non-null result).
+        public boolean remove(PacketRequest request) {
+            return requests.remove(request) != null;
+        }
+
+        public Set<PacketRequest> requests() {
+            return requests.keySet();
+        }
+    }
 }