implemented a distributed default flow registration mechanism to avoid duplicate requests from other onos instances

Change-Id: Ib2abb483456538e3e08e9790c4b4b0d50db8b384

implemented a distributed default flow registration mechanism to avoid
duplicate requests from other onos instances

Change-Id: I620cc51ac29cddaffa73cdbb20e9a9acbdd9ea69
diff --git a/core/api/src/main/java/org/onosproject/net/packet/DefaultPacketRequest.java b/core/api/src/main/java/org/onosproject/net/packet/DefaultPacketRequest.java
new file mode 100644
index 0000000..0efcc7f
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/packet/DefaultPacketRequest.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.packet;
+
+import com.google.common.base.MoreObjects;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.TrafficSelector;
+
+/**
+ * Default implementation of a packet request.
+ */
+public final class DefaultPacketRequest implements PacketRequest {
+    private final TrafficSelector selector;
+    private final PacketPriority priority;
+    private final ApplicationId appId;
+    private final FlowRule.Type tableType;
+
+    public DefaultPacketRequest(TrafficSelector selector, PacketPriority priority,
+                                ApplicationId appId, FlowRule.Type tableType) {
+        this.selector = selector;
+        this.priority = priority;
+        this.appId = appId;
+        this.tableType = tableType;
+    }
+
+    public TrafficSelector selector() {
+        return selector;
+    }
+
+    public PacketPriority priority() {
+        return priority;
+    }
+
+    public ApplicationId appId() {
+        return appId;
+    }
+
+    public FlowRule.Type tableType() {
+        return tableType;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        DefaultPacketRequest that = (DefaultPacketRequest) o;
+
+        if (priority != that.priority) {
+            return false;
+        }
+        if (!selector.equals(that.selector)) {
+            return false;
+        }
+        if (!tableType.equals(that.tableType)) {
+            return false;
+        }
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = selector.hashCode();
+        result = 31 * result + priority.hashCode();
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this.getClass())
+                .add("selector", selector)
+                .add("priority", priority)
+                .add("appId", appId)
+                .add("table-type", tableType).toString();
+    }
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onosproject/net/packet/PacketPriority.java b/core/api/src/main/java/org/onosproject/net/packet/PacketPriority.java
index de9fe31..68c0a83 100644
--- a/core/api/src/main/java/org/onosproject/net/packet/PacketPriority.java
+++ b/core/api/src/main/java/org/onosproject/net/packet/PacketPriority.java
@@ -46,4 +46,8 @@
     public int priorityValue() {
         return priorityValue;
     }
+
+    public String toString() {
+        return String.valueOf(priorityValue);
+    }
 }
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onosproject/net/packet/PacketRequest.java b/core/api/src/main/java/org/onosproject/net/packet/PacketRequest.java
new file mode 100644
index 0000000..7a95ef4
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/packet/PacketRequest.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.packet;
+
+import org.onosproject.core.ApplicationId;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.TrafficSelector;
+
+/**
+ * Represents a packet request made to devices.
+ */
+public interface PacketRequest {
+
+    /**
+     * Obtain the traffic selector.
+     * @return a traffic selector
+     */
+    public TrafficSelector selector();
+
+    /**
+     * Obtain the priority.
+     * @return a PacketPriority
+     */
+    public PacketPriority priority();
+
+    /**
+     * Obtain the application id.
+     * @return an application id
+     */
+    public ApplicationId appId();
+
+    /**
+     * Obtain the table type.
+     * @return a table type
+     */
+    public FlowRule.Type tableType();
+
+}
diff --git a/core/api/src/main/java/org/onosproject/net/packet/PacketStore.java b/core/api/src/main/java/org/onosproject/net/packet/PacketStore.java
index c38605d..a54b8fa 100644
--- a/core/api/src/main/java/org/onosproject/net/packet/PacketStore.java
+++ b/core/api/src/main/java/org/onosproject/net/packet/PacketStore.java
@@ -17,6 +17,8 @@
 
 import org.onosproject.store.Store;
 
+import java.util.Set;
+
 /**
  * Manages routing of outbound packets.
  */
@@ -31,4 +33,21 @@
      */
     void emit(OutboundPacket packet);
 
+    /**
+     * Register a request for packets. If the registration
+     * is successful the manager can proceed, otherwise it should
+     * consider these packet already available in the system.
+     *
+     * @param request a packet request
+     * @return a boolean indicating registration state.
+     */
+    boolean requestPackets(PacketRequest request);
+
+    /**
+     * Obtains all existing requests in the system.
+     *
+     * @return a set of packet requests
+     */
+    Set<PacketRequest> existingRequests();
+
 }
diff --git a/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java b/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
index 8908601..9177d3a 100644
--- a/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
+++ b/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
@@ -23,7 +23,6 @@
 import org.apache.felix.scr.annotations.Service;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.net.Device;
-import org.onosproject.net.MastershipRole;
 import org.onosproject.net.device.DeviceEvent;
 import org.onosproject.net.device.DeviceListener;
 import org.onosproject.net.device.DeviceService;
@@ -33,6 +32,7 @@
 import org.onosproject.net.flow.FlowRuleService;
 import org.onosproject.net.flow.TrafficSelector;
 import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.packet.DefaultPacketRequest;
 import org.onosproject.net.packet.OutboundPacket;
 import org.onosproject.net.packet.PacketContext;
 import org.onosproject.net.packet.PacketEvent;
@@ -41,6 +41,7 @@
 import org.onosproject.net.packet.PacketProvider;
 import org.onosproject.net.packet.PacketProviderRegistry;
 import org.onosproject.net.packet.PacketProviderService;
+import org.onosproject.net.packet.PacketRequest;
 import org.onosproject.net.packet.PacketService;
 import org.onosproject.net.packet.PacketStore;
 import org.onosproject.net.packet.PacketStoreDelegate;
@@ -48,9 +49,7 @@
 import org.onosproject.net.provider.AbstractProviderService;
 import org.slf4j.Logger;
 
-import java.util.Collections;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -82,68 +81,6 @@
 
     private final Map<Integer, PacketProcessor> processors = new ConcurrentHashMap<>();
 
-    private Set<PacketRequest> packetRequests =
-            Collections.newSetFromMap(new ConcurrentHashMap<>());
-
-    private final class PacketRequest {
-        private final TrafficSelector selector;
-        private final PacketPriority priority;
-        private final ApplicationId appId;
-        private final FlowRule.Type tableType;
-
-        public PacketRequest(TrafficSelector selector, PacketPriority priority,
-                             ApplicationId appId, FlowRule.Type tableType) {
-            this.selector = selector;
-            this.priority = priority;
-            this.appId = appId;
-            this.tableType = tableType;
-        }
-
-        public TrafficSelector selector() {
-            return selector;
-        }
-
-        public PacketPriority priority() {
-            return priority;
-        }
-
-        public ApplicationId appId() {
-            return appId;
-        }
-
-        public FlowRule.Type tableType() {
-            return tableType;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-
-            PacketRequest that = (PacketRequest) o;
-
-            if (priority != that.priority) {
-                return false;
-            }
-            if (!selector.equals(that.selector)) {
-                return false;
-            }
-
-            return true;
-        }
-
-        @Override
-        public int hashCode() {
-            int result = selector.hashCode();
-            result = 31 * result + priority.hashCode();
-            return result;
-        }
-    }
-
     @Activate
     public void activate() {
         store.setDelegate(delegate);
@@ -177,10 +114,11 @@
         checkNotNull(appId, "Application ID cannot be null");
 
         PacketRequest request =
-                new PacketRequest(selector, priority, appId, FlowRule.Type.DEFAULT);
+                new DefaultPacketRequest(selector, priority, appId, FlowRule.Type.DEFAULT);
 
-        packetRequests.add(request);
-        pushToAllDevices(request);
+        if (store.requestPackets(request)) {
+            pushToAllDevices(request);
+        }
     }
 
     @Override
@@ -192,11 +130,12 @@
                 + "without table hints, use other methods in the packetService API");
 
         PacketRequest request =
-                new PacketRequest(selector, priority, appId, tableType);
+                new DefaultPacketRequest(selector, priority, appId, tableType);
 
-        if (packetRequests.add(request)) {
+        if (store.requestPackets(request)) {
             pushToAllDevices(request);
         }
+
     }
 
     /**
@@ -206,9 +145,7 @@
      */
     private void pushToAllDevices(PacketRequest request) {
         for (Device device : deviceService.getDevices()) {
-            if (deviceService.getRole(device.id()) == MastershipRole.MASTER) {
-                pushRule(device, request);
-            }
+            pushRule(device, request);
         }
     }
 
@@ -303,7 +240,7 @@
         public void event(DeviceEvent event) {
             Device device = event.subject();
             if (event.type() == DeviceEvent.Type.DEVICE_ADDED) {
-                for (PacketRequest request : packetRequests) {
+                for (PacketRequest request : store.existingRequests()) {
                     pushRule(device, request);
                 }
             }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index 36f9f54..7ee8712 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -28,6 +28,7 @@
 import org.onosproject.net.packet.OutboundPacket;
 import org.onosproject.net.packet.PacketEvent;
 import org.onosproject.net.packet.PacketEvent.Type;
+import org.onosproject.net.packet.PacketRequest;
 import org.onosproject.net.packet.PacketStore;
 import org.onosproject.net.packet.PacketStoreDelegate;
 import org.onosproject.store.AbstractStore;
@@ -37,8 +38,12 @@
 import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.serializers.KryoSerializer;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
 import org.slf4j.Logger;
 
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -69,6 +74,11 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterCommunicationService communicationService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
+    private PacketRequestTracker tracker;
+
     private static final MessageSubject PACKET_OUT_SUBJECT =
             new MessageSubject("packet-out");
 
@@ -94,6 +104,8 @@
                                            new InternalClusterMessageHandler(),
                                            messageHandlingExecutor);
 
+        tracker =  new PacketRequestTracker();
+
         log.info("Started");
     }
 
@@ -125,6 +137,16 @@
         // error log: log.warn("Failed to send packet-out to {}", master);
     }
 
+    @Override
+    public boolean requestPackets(PacketRequest request) {
+        return tracker.add(request);
+    }
+
+    @Override
+    public Set<PacketRequest> existingRequests() {
+        return tracker.requests();
+    }
+
     /**
      * Handles incoming cluster messages.
      */
@@ -140,4 +162,46 @@
         }
     }
 
+    private class PacketRequestTracker {
+
+        private ConsistentMap<PacketRequest, Boolean> requests;
+
+        public PacketRequestTracker() {
+            requests = storageService.<PacketRequest, Boolean>consistentMapBuilder()
+                    .withName("packet-requests")
+                    .withSerializer(new Serializer() {
+                        KryoNamespace kryo = new KryoNamespace.Builder()
+                                .register(KryoNamespaces.API)
+                                .build();
+                        @Override
+                        public <T> byte[] encode(T object) {
+                            return kryo.serialize(object);
+                        }
+
+                        @Override
+                        public <T> T decode(byte[] bytes) {
+                            return kryo.deserialize(bytes);
+                        }
+                    }).build();
+        }
+
+        public boolean add(PacketRequest request) {
+            if (requests.putIfAbsent(request, true) == null) {
+                return true;
+            }
+            return false;
+        }
+
+        public boolean remove(PacketRequest request) {
+            if (requests.remove(request) == null) {
+                return false;
+            }
+            return true;
+        }
+
+        public Set<PacketRequest> requests() {
+            return requests.keySet();
+        }
+
+    }
 }
diff --git a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
index 33631da..73a0e00 100644
--- a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
@@ -106,6 +106,8 @@
 import org.onosproject.net.intent.constraint.WaypointConstraint;
 import org.onosproject.net.link.DefaultLinkDescription;
 import org.onosproject.net.packet.DefaultOutboundPacket;
+import org.onosproject.net.packet.DefaultPacketRequest;
+import org.onosproject.net.packet.PacketPriority;
 import org.onosproject.net.provider.ProviderId;
 import org.onosproject.net.resource.Bandwidth;
 import org.onosproject.net.resource.BandwidthResourceAllocation;
@@ -225,6 +227,8 @@
                     FlowRule.Type.class,
                     DefaultFlowRule.class,
                     DefaultFlowEntry.class,
+                    DefaultPacketRequest.class,
+                    PacketPriority.class,
                     FlowEntry.FlowEntryState.class,
                     FlowId.class,
                     DefaultTrafficSelector.class,
diff --git a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimplePacketStore.java b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimplePacketStore.java
index 30f3910..68da758 100644
--- a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimplePacketStore.java
+++ b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimplePacketStore.java
@@ -15,15 +15,21 @@
  */
 package org.onosproject.store.trivial.impl;
 
+import com.google.common.collect.Sets;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Service;
 import org.onosproject.net.packet.OutboundPacket;
 import org.onosproject.net.packet.PacketEvent;
 import org.onosproject.net.packet.PacketEvent.Type;
+import org.onosproject.net.packet.PacketRequest;
 import org.onosproject.net.packet.PacketStore;
 import org.onosproject.net.packet.PacketStoreDelegate;
 import org.onosproject.store.AbstractStore;
 
+
+import java.util.Collections;
+import java.util.Set;
+
 /**
  * Simple single instance implementation of the packet store.
  */
@@ -33,9 +39,21 @@
         extends AbstractStore<PacketEvent, PacketStoreDelegate>
         implements PacketStore {
 
+    private Set<PacketRequest> requests = Sets.newConcurrentHashSet();
+
     @Override
     public void emit(OutboundPacket packet) {
         notifyDelegate(new PacketEvent(Type.EMIT, packet));
     }
 
+    @Override
+    public boolean requestPackets(PacketRequest request) {
+        return requests.add(request);
+    }
+
+    @Override
+    public Set<PacketRequest> existingRequests() {
+        return Collections.unmodifiableSet(requests);
+    }
+
 }