implemented a distributed default flow registration mechanism to avoid duplicate requests from other onos instances
Change-Id: Ib2abb483456538e3e08e9790c4b4b0d50db8b384
implemented a distributed default flow registration mechanism to avoid
duplicate requests from other onos instances
Change-Id: I620cc51ac29cddaffa73cdbb20e9a9acbdd9ea69
diff --git a/core/api/src/main/java/org/onosproject/net/packet/DefaultPacketRequest.java b/core/api/src/main/java/org/onosproject/net/packet/DefaultPacketRequest.java
new file mode 100644
index 0000000..0efcc7f
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/packet/DefaultPacketRequest.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.packet;
+
+import com.google.common.base.MoreObjects;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.TrafficSelector;
+
+/**
+ * Default implementation of a packet request.
+ */
+public final class DefaultPacketRequest implements PacketRequest {
+ private final TrafficSelector selector;
+ private final PacketPriority priority;
+ private final ApplicationId appId;
+ private final FlowRule.Type tableType;
+
+ public DefaultPacketRequest(TrafficSelector selector, PacketPriority priority,
+ ApplicationId appId, FlowRule.Type tableType) {
+ this.selector = selector;
+ this.priority = priority;
+ this.appId = appId;
+ this.tableType = tableType;
+ }
+
+ public TrafficSelector selector() {
+ return selector;
+ }
+
+ public PacketPriority priority() {
+ return priority;
+ }
+
+ public ApplicationId appId() {
+ return appId;
+ }
+
+ public FlowRule.Type tableType() {
+ return tableType;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ DefaultPacketRequest that = (DefaultPacketRequest) o;
+
+ if (priority != that.priority) {
+ return false;
+ }
+ if (!selector.equals(that.selector)) {
+ return false;
+ }
+ if (!tableType.equals(that.tableType)) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = selector.hashCode();
+ result = 31 * result + priority.hashCode();
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this.getClass())
+ .add("selector", selector)
+ .add("priority", priority)
+ .add("appId", appId)
+ .add("table-type", tableType).toString();
+ }
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onosproject/net/packet/PacketPriority.java b/core/api/src/main/java/org/onosproject/net/packet/PacketPriority.java
index de9fe31..68c0a83 100644
--- a/core/api/src/main/java/org/onosproject/net/packet/PacketPriority.java
+++ b/core/api/src/main/java/org/onosproject/net/packet/PacketPriority.java
@@ -46,4 +46,8 @@
public int priorityValue() {
return priorityValue;
}
+
+ public String toString() {
+ return String.valueOf(priorityValue);
+ }
}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onosproject/net/packet/PacketRequest.java b/core/api/src/main/java/org/onosproject/net/packet/PacketRequest.java
new file mode 100644
index 0000000..7a95ef4
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/packet/PacketRequest.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.packet;
+
+import org.onosproject.core.ApplicationId;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.TrafficSelector;
+
+/**
+ * Represents a packet request made to devices.
+ */
+public interface PacketRequest {
+
+ /**
+ * Obtain the traffic selector.
+ * @return a traffic selector
+ */
+ public TrafficSelector selector();
+
+ /**
+ * Obtain the priority.
+ * @return a PacketPriority
+ */
+ public PacketPriority priority();
+
+ /**
+ * Obtain the application id.
+ * @return an application id
+ */
+ public ApplicationId appId();
+
+ /**
+ * Obtain the table type.
+ * @return a table type
+ */
+ public FlowRule.Type tableType();
+
+}
diff --git a/core/api/src/main/java/org/onosproject/net/packet/PacketStore.java b/core/api/src/main/java/org/onosproject/net/packet/PacketStore.java
index c38605d..a54b8fa 100644
--- a/core/api/src/main/java/org/onosproject/net/packet/PacketStore.java
+++ b/core/api/src/main/java/org/onosproject/net/packet/PacketStore.java
@@ -17,6 +17,8 @@
import org.onosproject.store.Store;
+import java.util.Set;
+
/**
* Manages routing of outbound packets.
*/
@@ -31,4 +33,21 @@
*/
void emit(OutboundPacket packet);
+ /**
+ * Register a request for packets. If the registration
+ * is successful the manager can proceed, otherwise it should
+ * consider these packet already available in the system.
+ *
+ * @param request a packet request
+ * @return a boolean indicating registration state.
+ */
+ boolean requestPackets(PacketRequest request);
+
+ /**
+ * Obtains all existing requests in the system.
+ *
+ * @return a set of packet requests
+ */
+ Set<PacketRequest> existingRequests();
+
}
diff --git a/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java b/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
index 8908601..9177d3a 100644
--- a/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
+++ b/core/net/src/main/java/org/onosproject/net/packet/impl/PacketManager.java
@@ -23,7 +23,6 @@
import org.apache.felix.scr.annotations.Service;
import org.onosproject.core.ApplicationId;
import org.onosproject.net.Device;
-import org.onosproject.net.MastershipRole;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
@@ -33,6 +32,7 @@
import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.packet.DefaultPacketRequest;
import org.onosproject.net.packet.OutboundPacket;
import org.onosproject.net.packet.PacketContext;
import org.onosproject.net.packet.PacketEvent;
@@ -41,6 +41,7 @@
import org.onosproject.net.packet.PacketProvider;
import org.onosproject.net.packet.PacketProviderRegistry;
import org.onosproject.net.packet.PacketProviderService;
+import org.onosproject.net.packet.PacketRequest;
import org.onosproject.net.packet.PacketService;
import org.onosproject.net.packet.PacketStore;
import org.onosproject.net.packet.PacketStoreDelegate;
@@ -48,9 +49,7 @@
import org.onosproject.net.provider.AbstractProviderService;
import org.slf4j.Logger;
-import java.util.Collections;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -82,68 +81,6 @@
private final Map<Integer, PacketProcessor> processors = new ConcurrentHashMap<>();
- private Set<PacketRequest> packetRequests =
- Collections.newSetFromMap(new ConcurrentHashMap<>());
-
- private final class PacketRequest {
- private final TrafficSelector selector;
- private final PacketPriority priority;
- private final ApplicationId appId;
- private final FlowRule.Type tableType;
-
- public PacketRequest(TrafficSelector selector, PacketPriority priority,
- ApplicationId appId, FlowRule.Type tableType) {
- this.selector = selector;
- this.priority = priority;
- this.appId = appId;
- this.tableType = tableType;
- }
-
- public TrafficSelector selector() {
- return selector;
- }
-
- public PacketPriority priority() {
- return priority;
- }
-
- public ApplicationId appId() {
- return appId;
- }
-
- public FlowRule.Type tableType() {
- return tableType;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- PacketRequest that = (PacketRequest) o;
-
- if (priority != that.priority) {
- return false;
- }
- if (!selector.equals(that.selector)) {
- return false;
- }
-
- return true;
- }
-
- @Override
- public int hashCode() {
- int result = selector.hashCode();
- result = 31 * result + priority.hashCode();
- return result;
- }
- }
-
@Activate
public void activate() {
store.setDelegate(delegate);
@@ -177,10 +114,11 @@
checkNotNull(appId, "Application ID cannot be null");
PacketRequest request =
- new PacketRequest(selector, priority, appId, FlowRule.Type.DEFAULT);
+ new DefaultPacketRequest(selector, priority, appId, FlowRule.Type.DEFAULT);
- packetRequests.add(request);
- pushToAllDevices(request);
+ if (store.requestPackets(request)) {
+ pushToAllDevices(request);
+ }
}
@Override
@@ -192,11 +130,12 @@
+ "without table hints, use other methods in the packetService API");
PacketRequest request =
- new PacketRequest(selector, priority, appId, tableType);
+ new DefaultPacketRequest(selector, priority, appId, tableType);
- if (packetRequests.add(request)) {
+ if (store.requestPackets(request)) {
pushToAllDevices(request);
}
+
}
/**
@@ -206,9 +145,7 @@
*/
private void pushToAllDevices(PacketRequest request) {
for (Device device : deviceService.getDevices()) {
- if (deviceService.getRole(device.id()) == MastershipRole.MASTER) {
- pushRule(device, request);
- }
+ pushRule(device, request);
}
}
@@ -303,7 +240,7 @@
public void event(DeviceEvent event) {
Device device = event.subject();
if (event.type() == DeviceEvent.Type.DEVICE_ADDED) {
- for (PacketRequest request : packetRequests) {
+ for (PacketRequest request : store.existingRequests()) {
pushRule(device, request);
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index 36f9f54..7ee8712 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -28,6 +28,7 @@
import org.onosproject.net.packet.OutboundPacket;
import org.onosproject.net.packet.PacketEvent;
import org.onosproject.net.packet.PacketEvent.Type;
+import org.onosproject.net.packet.PacketRequest;
import org.onosproject.net.packet.PacketStore;
import org.onosproject.net.packet.PacketStoreDelegate;
import org.onosproject.store.AbstractStore;
@@ -37,8 +38,12 @@
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -69,6 +74,11 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService communicationService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
+ private PacketRequestTracker tracker;
+
private static final MessageSubject PACKET_OUT_SUBJECT =
new MessageSubject("packet-out");
@@ -94,6 +104,8 @@
new InternalClusterMessageHandler(),
messageHandlingExecutor);
+ tracker = new PacketRequestTracker();
+
log.info("Started");
}
@@ -125,6 +137,16 @@
// error log: log.warn("Failed to send packet-out to {}", master);
}
+ @Override
+ public boolean requestPackets(PacketRequest request) {
+ return tracker.add(request);
+ }
+
+ @Override
+ public Set<PacketRequest> existingRequests() {
+ return tracker.requests();
+ }
+
/**
* Handles incoming cluster messages.
*/
@@ -140,4 +162,46 @@
}
}
+ private class PacketRequestTracker {
+
+ private ConsistentMap<PacketRequest, Boolean> requests;
+
+ public PacketRequestTracker() {
+ requests = storageService.<PacketRequest, Boolean>consistentMapBuilder()
+ .withName("packet-requests")
+ .withSerializer(new Serializer() {
+ KryoNamespace kryo = new KryoNamespace.Builder()
+ .register(KryoNamespaces.API)
+ .build();
+ @Override
+ public <T> byte[] encode(T object) {
+ return kryo.serialize(object);
+ }
+
+ @Override
+ public <T> T decode(byte[] bytes) {
+ return kryo.deserialize(bytes);
+ }
+ }).build();
+ }
+
+ public boolean add(PacketRequest request) {
+ if (requests.putIfAbsent(request, true) == null) {
+ return true;
+ }
+ return false;
+ }
+
+ public boolean remove(PacketRequest request) {
+ if (requests.remove(request) == null) {
+ return false;
+ }
+ return true;
+ }
+
+ public Set<PacketRequest> requests() {
+ return requests.keySet();
+ }
+
+ }
}
diff --git a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
index 33631da..73a0e00 100644
--- a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
@@ -106,6 +106,8 @@
import org.onosproject.net.intent.constraint.WaypointConstraint;
import org.onosproject.net.link.DefaultLinkDescription;
import org.onosproject.net.packet.DefaultOutboundPacket;
+import org.onosproject.net.packet.DefaultPacketRequest;
+import org.onosproject.net.packet.PacketPriority;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.net.resource.Bandwidth;
import org.onosproject.net.resource.BandwidthResourceAllocation;
@@ -225,6 +227,8 @@
FlowRule.Type.class,
DefaultFlowRule.class,
DefaultFlowEntry.class,
+ DefaultPacketRequest.class,
+ PacketPriority.class,
FlowEntry.FlowEntryState.class,
FlowId.class,
DefaultTrafficSelector.class,
diff --git a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimplePacketStore.java b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimplePacketStore.java
index 30f3910..68da758 100644
--- a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimplePacketStore.java
+++ b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimplePacketStore.java
@@ -15,15 +15,21 @@
*/
package org.onosproject.store.trivial.impl;
+import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.net.packet.OutboundPacket;
import org.onosproject.net.packet.PacketEvent;
import org.onosproject.net.packet.PacketEvent.Type;
+import org.onosproject.net.packet.PacketRequest;
import org.onosproject.net.packet.PacketStore;
import org.onosproject.net.packet.PacketStoreDelegate;
import org.onosproject.store.AbstractStore;
+
+import java.util.Collections;
+import java.util.Set;
+
/**
* Simple single instance implementation of the packet store.
*/
@@ -33,9 +39,21 @@
extends AbstractStore<PacketEvent, PacketStoreDelegate>
implements PacketStore {
+ private Set<PacketRequest> requests = Sets.newConcurrentHashSet();
+
@Override
public void emit(OutboundPacket packet) {
notifyDelegate(new PacketEvent(Type.EMIT, packet));
}
+ @Override
+ public boolean requestPackets(PacketRequest request) {
+ return requests.add(request);
+ }
+
+ @Override
+ public Set<PacketRequest> existingRequests() {
+ return Collections.unmodifiableSet(requests);
+ }
+
}