Flow Objective implementation
Provides an abstraction which isolates the application from any pipeline
knowledge. By using the provided objectives applications can express
their forwarding desires in a pipeline agnostic way. The objectives
are then consumed by a driver for the specific device who converts them
into the appropriate pipeline coherent flows.
Change-Id: I74a68b4971c367c0cd5b7de9d877abdd117afa98
diff --git a/core/api/src/main/java/org/onosproject/net/behaviour/DefaultNextGroup.java b/core/api/src/main/java/org/onosproject/net/behaviour/DefaultNextGroup.java
new file mode 100644
index 0000000..ef1f9de
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/behaviour/DefaultNextGroup.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.behaviour;
+
+/**
+ * Default implementation of a next group.
+ */
+public class DefaultNextGroup implements NextGroup {
+
+ private final byte[] data;
+
+ public DefaultNextGroup(byte[] data) {
+ this.data = data;
+ }
+
+ @Override
+ public byte[] data() {
+ return data;
+ }
+}
diff --git a/core/api/src/main/java/org/onosproject/net/behaviour/NextGroup.java b/core/api/src/main/java/org/onosproject/net/behaviour/NextGroup.java
new file mode 100644
index 0000000..b5a3891
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/behaviour/NextGroup.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.behaviour;
+
+/**
+ * Opaque data type for carrying group-like information.
+ * Only relevant to a pipeliner driver.
+ */
+public interface NextGroup {
+
+ /**
+ * Serialized form of the next group.
+ * @return a byte array.
+ */
+ byte[] data();
+
+}
diff --git a/core/api/src/main/java/org/onosproject/net/behaviour/Pipeliner.java b/core/api/src/main/java/org/onosproject/net/behaviour/Pipeliner.java
index eda131a..dcfc588 100644
--- a/core/api/src/main/java/org/onosproject/net/behaviour/Pipeliner.java
+++ b/core/api/src/main/java/org/onosproject/net/behaviour/Pipeliner.java
@@ -21,9 +21,6 @@
import org.onosproject.net.flowobjective.ForwardingObjective;
import org.onosproject.net.flowobjective.NextObjective;
-import java.util.Collection;
-import java.util.concurrent.Future;
-
/**
* Behaviour for handling various pipelines.
*/
@@ -40,24 +37,21 @@
/**
* Installs the filtering rules onto the device.
*
- * @param filterObjectives the collection of filters
- * @return a future indicating the success of the operation
+ * @param filterObjective a filtering objective
*/
- Future<Boolean> filter(Collection<FilteringObjective> filterObjectives);
+ void filter(FilteringObjective filterObjective);
/**
* Installs the forwarding rules onto the device.
*
- * @param forwardObjectives the collection of forwarding objectives
- * @return a future indicating the success of the operation
+ * @param forwardObjective a forwarding objective
*/
- Future<Boolean> forward(Collection<ForwardingObjective> forwardObjectives);
+ void forward(ForwardingObjective forwardObjective);
/**
* Installs the next hop elements into the device.
*
- * @param nextObjectives the collection of next objectives
- * @return a future indicating the success of the operation
+ * @param nextObjective a next objectives
*/
- Future<Boolean> next(Collection<NextObjective> nextObjectives);
+ void next(NextObjective nextObjective);
}
diff --git a/core/api/src/main/java/org/onosproject/net/behaviour/PipelinerContext.java b/core/api/src/main/java/org/onosproject/net/behaviour/PipelinerContext.java
index c2c6dfd..d0ca42b 100644
--- a/core/api/src/main/java/org/onosproject/net/behaviour/PipelinerContext.java
+++ b/core/api/src/main/java/org/onosproject/net/behaviour/PipelinerContext.java
@@ -16,6 +16,7 @@
package org.onosproject.net.behaviour;
import org.onlab.osgi.ServiceDirectory;
+import org.onosproject.net.flowobjective.FlowObjectiveStore;
/**
* Processing context and supporting services for the pipeline behaviour.
@@ -30,5 +31,11 @@
*/
ServiceDirectory directory();
+ /**
+ * Returns the Objective Store where data can be stored and retrieved.
+ * @return the flow objective store
+ */
+ FlowObjectiveStore store();
+
// TODO: add means to store and access shared state
}
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/DefaultFilteringObjective.java b/core/api/src/main/java/org/onosproject/net/flowobjective/DefaultFilteringObjective.java
index 33b8f5a..94519a9 100644
--- a/core/api/src/main/java/org/onosproject/net/flowobjective/DefaultFilteringObjective.java
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/DefaultFilteringObjective.java
@@ -23,6 +23,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Objects;
+import java.util.Optional;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -42,6 +43,7 @@
private final List<Criterion> conditions;
private final int id;
private final Operation op;
+ private final Optional<ObjectiveContext> context;
private DefaultFilteringObjective(Type type, boolean permanent, int timeout,
ApplicationId appId, int priority, Criterion key,
@@ -54,6 +56,25 @@
this.priority = priority;
this.conditions = conditions;
this.op = op;
+ this.context = Optional.empty();
+
+ this.id = Objects.hash(type, key, conditions, permanent,
+ timeout, appId, priority);
+ }
+
+ public DefaultFilteringObjective(Type type, boolean permanent, int timeout,
+ ApplicationId appId, int priority, Criterion key,
+ List<Criterion> conditions,
+ ObjectiveContext context, Operation op) {
+ this.key = key;
+ this.type = type;
+ this.permanent = permanent;
+ this.timeout = timeout;
+ this.appId = appId;
+ this.priority = priority;
+ this.conditions = conditions;
+ this.op = op;
+ this.context = Optional.ofNullable(context);
this.id = Objects.hash(type, key, conditions, permanent,
timeout, appId, priority);
@@ -104,6 +125,11 @@
return op;
}
+ @Override
+ public Optional<ObjectiveContext> context() {
+ return context;
+ }
+
/**
* Returns a new builder.
*
@@ -201,6 +227,31 @@
}
+ @Override
+ public FilteringObjective add(ObjectiveContext context) {
+ List<Criterion> conditions = listBuilder.build();
+ checkNotNull(type, "Must have a type.");
+ checkArgument(!conditions.isEmpty(), "Must have at least one condition.");
+ checkNotNull(appId, "Must supply an application id");
+
+ return new DefaultFilteringObjective(type, permanent, timeout,
+ appId, priority, key, conditions,
+ context, Operation.ADD);
+ }
+
+ @Override
+ public FilteringObjective remove(ObjectiveContext context) {
+ List<Criterion> conditions = listBuilder.build();
+ checkNotNull(type, "Must have a type.");
+ checkArgument(!conditions.isEmpty(), "Must have at least one condition.");
+ checkNotNull(appId, "Must supply an application id");
+
+
+ return new DefaultFilteringObjective(type, permanent, timeout,
+ appId, priority, key, conditions,
+ context, Operation.REMOVE);
+ }
+
}
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/DefaultForwardingObjective.java b/core/api/src/main/java/org/onosproject/net/flowobjective/DefaultForwardingObjective.java
index d110e07..6489bea 100644
--- a/core/api/src/main/java/org/onosproject/net/flowobjective/DefaultForwardingObjective.java
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/DefaultForwardingObjective.java
@@ -20,6 +20,7 @@
import org.onosproject.net.flow.TrafficTreatment;
import java.util.Objects;
+import java.util.Optional;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -38,6 +39,7 @@
private final int nextId;
private final TrafficTreatment treatment;
private final Operation op;
+ private final Optional<ObjectiveContext> context;
private final int id;
@@ -55,6 +57,29 @@
this.nextId = nextId;
this.treatment = treatment;
this.op = op;
+ this.context = Optional.empty();
+
+ this.id = Objects.hash(selector, flag, permanent,
+ timeout, appId, priority, nextId,
+ treatment, op);
+ }
+
+ private DefaultForwardingObjective(TrafficSelector selector,
+ Flag flag, boolean permanent,
+ int timeout, ApplicationId appId,
+ int priority, int nextId,
+ TrafficTreatment treatment,
+ ObjectiveContext context, Operation op) {
+ this.selector = selector;
+ this.flag = flag;
+ this.permanent = permanent;
+ this.timeout = timeout;
+ this.appId = appId;
+ this.priority = priority;
+ this.nextId = nextId;
+ this.treatment = treatment;
+ this.op = op;
+ this.context = Optional.ofNullable(context);
this.id = Objects.hash(selector, flag, permanent,
timeout, appId, priority, nextId,
@@ -113,6 +138,11 @@
return op;
}
+ @Override
+ public Optional<ObjectiveContext> context() {
+ return context;
+ }
+
/**
* Returns a new builder.
*
@@ -186,7 +216,7 @@
public ForwardingObjective add() {
checkNotNull(selector, "Must have a selector");
checkNotNull(flag, "A flag must be set");
- checkArgument(nextId != null && treatment != null, "Must supply at " +
+ checkArgument(nextId != null || treatment != null, "Must supply at " +
"least a treatment and/or a nextId");
checkNotNull(appId, "Must supply an application id");
return new DefaultForwardingObjective(selector, flag, permanent,
@@ -198,12 +228,38 @@
public ForwardingObjective remove() {
checkNotNull(selector, "Must have a selector");
checkNotNull(flag, "A flag must be set");
- checkArgument(nextId != null && treatment != null, "Must supply at " +
+ checkArgument(nextId != null || treatment != null, "Must supply at " +
"least a treatment and/or a nextId");
checkNotNull(appId, "Must supply an application id");
return new DefaultForwardingObjective(selector, flag, permanent,
timeout, appId, priority,
nextId, treatment, Operation.REMOVE);
}
+
+ @Override
+ public ForwardingObjective add(ObjectiveContext context) {
+ checkNotNull(selector, "Must have a selector");
+ checkNotNull(flag, "A flag must be set");
+ checkArgument(nextId != null || treatment != null, "Must supply at " +
+ "least a treatment and/or a nextId");
+ checkNotNull(appId, "Must supply an application id");
+ return new DefaultForwardingObjective(selector, flag, permanent,
+ timeout, appId, priority,
+ nextId, treatment,
+ context, Operation.ADD);
+ }
+
+ @Override
+ public ForwardingObjective remove(ObjectiveContext context) {
+ checkNotNull(selector, "Must have a selector");
+ checkNotNull(flag, "A flag must be set");
+ checkArgument(nextId != null || treatment != null, "Must supply at " +
+ "least a treatment and/or a nextId");
+ checkNotNull(appId, "Must supply an application id");
+ return new DefaultForwardingObjective(selector, flag, permanent,
+ timeout, appId, priority,
+ nextId, treatment,
+ context, Operation.REMOVE);
+ }
}
}
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/DefaultNextObjective.java b/core/api/src/main/java/org/onosproject/net/flowobjective/DefaultNextObjective.java
index 4ab79ff..cc316fe 100644
--- a/core/api/src/main/java/org/onosproject/net/flowobjective/DefaultNextObjective.java
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/DefaultNextObjective.java
@@ -21,6 +21,7 @@
import java.util.Collection;
import java.util.List;
+import java.util.Optional;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -34,13 +35,28 @@
private final ApplicationId appId;
private final Type type;
private final Integer id;
+ private final Operation op;
+ private final Optional<ObjectiveContext> context;
private DefaultNextObjective(Integer id, List<TrafficTreatment> treatments,
- ApplicationId appId, Type type) {
+ ApplicationId appId, Type type, Operation op) {
this.treatments = treatments;
this.appId = appId;
this.type = type;
this.id = id;
+ this.op = op;
+ this.context = Optional.empty();
+ }
+
+ private DefaultNextObjective(Integer id, List<TrafficTreatment> treatments,
+ ApplicationId appId, ObjectiveContext context,
+ Type type, Operation op) {
+ this.treatments = treatments;
+ this.appId = appId;
+ this.type = type;
+ this.id = id;
+ this.op = op;
+ this.context = Optional.ofNullable(context);
}
@Override
@@ -80,7 +96,12 @@
@Override
public Operation op() {
- throw new UnsupportedOperationException("Next Objective has no operation");
+ return op;
+ }
+
+ @Override
+ public Optional<ObjectiveContext> context() {
+ return context;
}
/**
@@ -101,8 +122,6 @@
private final ImmutableList.Builder<TrafficTreatment> listBuilder
= ImmutableList.builder();
-
-
@Override
public NextObjective.Builder withId(int nextId) {
this.id = nextId;
@@ -143,7 +162,7 @@
}
@Override
- public Builder fromApp(ApplicationId appId) {
+ public NextObjective.Builder fromApp(ApplicationId appId) {
this.appId = appId;
return this;
}
@@ -160,14 +179,49 @@
}
@Override
- public NextObjective build() {
+ public NextObjective add() {
List<TrafficTreatment> treatments = listBuilder.build();
checkNotNull(appId, "Must supply an application id");
checkNotNull(id, "id cannot be null");
checkNotNull(type, "The type cannot be null");
checkArgument(!treatments.isEmpty(), "Must have at least one treatment");
- return new DefaultNextObjective(id, treatments, appId, type);
+ return new DefaultNextObjective(id, treatments, appId, type, Operation.ADD);
+ }
+
+ @Override
+ public NextObjective remove() {
+ List<TrafficTreatment> treatments = listBuilder.build();
+ checkNotNull(appId, "Must supply an application id");
+ checkNotNull(id, "id cannot be null");
+ checkNotNull(type, "The type cannot be null");
+ checkArgument(!treatments.isEmpty(), "Must have at least one treatment");
+
+ return new DefaultNextObjective(id, treatments, appId, type, Operation.REMOVE);
+ }
+
+ @Override
+ public NextObjective add(ObjectiveContext context) {
+ List<TrafficTreatment> treatments = listBuilder.build();
+ checkNotNull(appId, "Must supply an application id");
+ checkNotNull(id, "id cannot be null");
+ checkNotNull(type, "The type cannot be null");
+ checkArgument(!treatments.isEmpty(), "Must have at least one treatment");
+
+ return new DefaultNextObjective(id, treatments, appId,
+ context, type, Operation.ADD);
+ }
+
+ @Override
+ public NextObjective remove(ObjectiveContext context) {
+ List<TrafficTreatment> treatments = listBuilder.build();
+ checkNotNull(appId, "Must supply an application id");
+ checkNotNull(id, "id cannot be null");
+ checkNotNull(type, "The type cannot be null");
+ checkArgument(!treatments.isEmpty(), "Must have at least one treatment");
+
+ return new DefaultNextObjective(id, treatments, appId,
+ context, type, Operation.REMOVE);
}
}
}
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/FilteringObjective.java b/core/api/src/main/java/org/onosproject/net/flowobjective/FilteringObjective.java
index d892a97..89b668d 100644
--- a/core/api/src/main/java/org/onosproject/net/flowobjective/FilteringObjective.java
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/FilteringObjective.java
@@ -114,6 +114,24 @@
*/
public FilteringObjective remove();
+ /**
+ * Builds the filtering objective that will be added.
+ * The context will be used to notify the calling application.
+ *
+ * @param context an objective context
+ * @return a filtering objective
+ */
+ public FilteringObjective add(ObjectiveContext context);
+
+ /**
+ * Builds the filtering objective that will be removed.
+ * The context will be used to notify the calling application.
+ *
+ * @param context an objective context
+ * @return a filtering objective
+ */
+ public FilteringObjective remove(ObjectiveContext context);
+
}
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/FlowObjectiveService.java b/core/api/src/main/java/org/onosproject/net/flowobjective/FlowObjectiveService.java
index 14af2b8..690fcc7 100644
--- a/core/api/src/main/java/org/onosproject/net/flowobjective/FlowObjectiveService.java
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/FlowObjectiveService.java
@@ -17,9 +17,6 @@
import org.onosproject.net.DeviceId;
-import java.util.Collection;
-import java.util.concurrent.Future;
-
/**
* Service for programming data plane flow rules in manner independent of
* specific device table pipeline configuration.
@@ -30,27 +27,24 @@
* Installs the filtering rules onto the specified device.
*
* @param deviceId device identifier
- * @param filteringObjectives the collection of filters
- * @return a future indicating the success of the operation
+ * @param filteringObjective the filtering objective
*/
- Future<Boolean> filter(DeviceId deviceId, Collection<FilteringObjective> filteringObjectives);
+ void filter(DeviceId deviceId, FilteringObjective filteringObjective);
/**
* Installs the forwarding rules onto the specified device.
*
* @param deviceId device identifier
- * @param forwardingObjectives the collection of forwarding objectives
- * @return a future indicating the success of the operation
+ * @param forwardingObjective the forwarding objective
*/
- Future<Boolean> forward(DeviceId deviceId, Collection<ForwardingObjective> forwardingObjectives);
+ void forward(DeviceId deviceId, ForwardingObjective forwardingObjective);
/**
* Installs the next hop elements into the specified device.
*
* @param deviceId device identifier
- * @param nextObjectives the collection of next objectives
- * @return a future indicating the success of the operation
+ * @param nextObjective a next objective
*/
- Future<Boolean> next(DeviceId deviceId, Collection<NextObjective> nextObjectives);
+ void next(DeviceId deviceId, NextObjective nextObjective);
}
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/FlowObjectiveStore.java b/core/api/src/main/java/org/onosproject/net/flowobjective/FlowObjectiveStore.java
new file mode 100644
index 0000000..e667618
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/FlowObjectiveStore.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.flowobjective;
+
+import org.onosproject.net.behaviour.NextGroup;
+import org.onosproject.store.Store;
+
+/**
+ * The flow objective store.
+ */
+public interface FlowObjectiveStore
+ extends Store<ObjectiveEvent, FlowObjectiveStoreDelegate> {
+
+ /**
+ * Adds a NextGroup to the store.
+ *
+ * @param nextId an integer
+ * @param group a next group opaque object
+ */
+ void putNextGroup(Integer nextId, NextGroup group);
+
+ /**
+ * Fetch a next group from the store.
+ * @param nextId an integer
+ * @return a next group
+ */
+ NextGroup getNextGroup(Integer nextId);
+}
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/FlowObjectiveStoreDelegate.java b/core/api/src/main/java/org/onosproject/net/flowobjective/FlowObjectiveStoreDelegate.java
new file mode 100644
index 0000000..5af7836
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/FlowObjectiveStoreDelegate.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.flowobjective;
+
+import org.onosproject.store.StoreDelegate;
+
+/**
+ * Flow Objective store delegate abstraction.
+ */
+public interface FlowObjectiveStoreDelegate extends StoreDelegate<ObjectiveEvent> {
+}
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/ForwardingObjective.java b/core/api/src/main/java/org/onosproject/net/flowobjective/ForwardingObjective.java
index 4fecc54..dcc377f 100644
--- a/core/api/src/main/java/org/onosproject/net/flowobjective/ForwardingObjective.java
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/ForwardingObjective.java
@@ -121,5 +121,23 @@
* @return a forwarding objective.
*/
public ForwardingObjective remove();
+
+ /**
+ * Builds the forwarding objective that will be added.
+ * The context will be used to notify the calling application.
+ *
+ * @param context an objective context
+ * @return a forwarding objective
+ */
+ public ForwardingObjective add(ObjectiveContext context);
+
+ /**
+ * Builds the forwarding objective that will be removed.
+ * The context will be used to notify the calling application.
+ *
+ * @param context an objective context
+ * @return a forwarding objective
+ */
+ public ForwardingObjective remove(ObjectiveContext context);
}
}
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/NextObjective.java b/core/api/src/main/java/org/onosproject/net/flowobjective/NextObjective.java
index 52e79ee..02c4d9e 100644
--- a/core/api/src/main/java/org/onosproject/net/flowobjective/NextObjective.java
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/NextObjective.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.net.flowobjective;
+import org.onosproject.core.ApplicationId;
import org.onosproject.net.flow.TrafficTreatment;
import java.util.Collection;
@@ -95,12 +96,40 @@
*/
public Builder addTreatment(TrafficTreatment treatment);
+ @Override
+ public Builder fromApp(ApplicationId appId);
+
/**
- * Builds a next step.
+ * Builds the next objective that will be added.
*
- * @return a next step
+ * @return a next objective
*/
- public NextObjective build();
+ public NextObjective add();
+
+ /**
+ * Builds the next objective that will be removed.
+ *
+ * @return a next objective.
+ */
+ public NextObjective remove();
+
+ /**
+ * Builds the next objective that will be added.
+ * The context will be used to notify the calling application.
+ *
+ * @param context an objective context
+ * @return a next objective
+ */
+ public NextObjective add(ObjectiveContext context);
+
+ /**
+ * Builds the next objective that will be removed.
+ * The context will be used to notify the calling application.
+ *
+ * @param context an objective context
+ * @return a next objective
+ */
+ public NextObjective remove(ObjectiveContext context);
}
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/Objective.java b/core/api/src/main/java/org/onosproject/net/flowobjective/Objective.java
index 3971b03..fa98b6d 100644
--- a/core/api/src/main/java/org/onosproject/net/flowobjective/Objective.java
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/Objective.java
@@ -17,6 +17,8 @@
import org.onosproject.core.ApplicationId;
+import java.util.Optional;
+
/**
* Base representation of an flow description.
*/
@@ -84,6 +86,14 @@
Operation op();
/**
+ * Obtains an optional context.
+ *
+ * @return optional; which will be empty if there is no context.
+ * Otherwise it will return the context.
+ */
+ Optional<ObjectiveContext> context();
+
+ /**
* An objective builder.
*/
public interface Builder {
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveContext.java b/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveContext.java
new file mode 100644
index 0000000..00e4ed8
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveContext.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.flowobjective;
+
+/**
+ * The context of a objective that will become the subject of
+ * the notification.
+ *
+ * Implementations of this class must be serializable.
+ */
+public interface ObjectiveContext {
+
+ default void onSuccess(Objective objective) {}
+
+ default void onError(Objective objective, ObjectiveError error) {}
+
+}
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveError.java b/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveError.java
new file mode 100644
index 0000000..6e60ab6
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveError.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.flowobjective;
+
+/**
+ * Represents the set of errors possible when processing an objective.
+ */
+public enum ObjectiveError {
+
+ /**
+ * The driver processing this objective does not know how to process it.
+ */
+ UNSUPPORTED,
+
+ /**
+ * The flow installation for this objective failed.
+ */
+ FLOWINSTALLATIONFAILED,
+
+ /**
+ * THe group installation for this objective failed.
+ */
+ GROUPINSTALLATIONFAILED,
+
+ /**
+ * The group was reported as installed but is not missing.
+ */
+ GROUPMISSING,
+
+ /**
+ * An unknown error occurred.
+ */
+ UNKNOWN
+}
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveEvent.java b/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveEvent.java
new file mode 100644
index 0000000..9f095cf
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveEvent.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.flowobjective;
+
+import org.onosproject.event.AbstractEvent;
+
+/**
+ * Describes a objective event.
+ */
+public class ObjectiveEvent extends AbstractEvent<ObjectiveEvent.Type, Integer> {
+
+ /**
+ * Type of objective events.
+ */
+ public enum Type {
+ /**
+ * Signifies that the objective has been added to the store.
+ */
+ ADD,
+
+ /**
+ * Signifies that the objective has been removed.
+ */
+ REMOVE
+ }
+
+ /**
+ * Creates an event of the given type for the specified objective id.
+ *
+ * @param type the type of the event
+ * @param objective the objective id the event is about
+ */
+ public ObjectiveEvent(Type type, Integer objective) {
+ super(type, objective);
+ }
+
+ /**
+ * Creates an event of the given type for the specified objective id at the given
+ * time.
+ *
+ * @param type the type of the event
+ * @param objective the objective id the event is about
+ * @param time the time of the event
+ */
+ public ObjectiveEvent(Type type, Integer objective, long time) {
+ super(type, objective, time);
+ }
+}
+
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
index 4e78d04..17b0aa7 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
@@ -17,7 +17,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.Futures;
+import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -43,9 +43,12 @@
import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.flowobjective.FilteringObjective;
import org.onosproject.net.flowobjective.FlowObjectiveService;
+import org.onosproject.net.flowobjective.FlowObjectiveStore;
+import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
import org.onosproject.net.flowobjective.ForwardingObjective;
import org.onosproject.net.flowobjective.NextObjective;
import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.ObjectiveEvent;
import org.onosproject.net.group.GroupService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,14 +56,14 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
-import java.util.concurrent.Future;
+import java.util.Set;
import static com.google.common.base.Preconditions.checkState;
/**
* Provides implementation of the flow objective programming service.
*/
-@Component(immediate = false)
+@Component(immediate = true)
@Service
public class FlowObjectiveManager implements FlowObjectiveService {
@@ -89,6 +92,10 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected GroupService groupService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected FlowObjectiveStore flowObjectiveStore;
+
+ private final FlowObjectiveStoreDelegate delegate = new InternalStoreDelegate();
private final Map<DeviceId, DriverHandler> driverHandlers = Maps.newConcurrentMap();
private final Map<DeviceId, Pipeliner> pipeliners = Maps.newConcurrentMap();
@@ -101,10 +108,16 @@
private final Map<DeviceId, Collection<Objective>> pendingObjectives =
Maps.newConcurrentMap();
+
private NodeId localNode;
+ private Map<Integer, Set<PendingNext>> pendingForwards =
+ Maps.newConcurrentMap();
+
+
@Activate
protected void activate() {
+ flowObjectiveStore.setDelegate(delegate);
localNode = clusterService.getLocalNode().id();
mastershipService.addListener(mastershipListener);
deviceService.addListener(deviceListener);
@@ -114,46 +127,64 @@
@Deactivate
protected void deactivate() {
+ flowObjectiveStore.unsetDelegate(delegate);
mastershipService.removeListener(mastershipListener);
deviceService.removeListener(deviceListener);
log.info("Stopped");
}
@Override
- public Future<Boolean> filter(DeviceId deviceId,
- Collection<FilteringObjective> filteringObjectives) {
+ public void filter(DeviceId deviceId,
+ FilteringObjective filteringObjective) {
if (deviceService.isAvailable(deviceId)) {
- return getDevicePipeliner(deviceId).filter(filteringObjectives);
+ getDevicePipeliner(deviceId).filter(filteringObjective);
} else {
- filteringObjectives.forEach(obj -> updatePendingMap(deviceId, obj));
+ updatePendingMap(deviceId, filteringObjective);
}
- return Futures.immediateFuture(true);
- }
-
-
- @Override
- public Future<Boolean> forward(DeviceId deviceId,
- Collection<ForwardingObjective> forwardingObjectives) {
- if (deviceService.isAvailable(deviceId)) {
- return getDevicePipeliner(deviceId).forward(forwardingObjectives);
- } else {
- forwardingObjectives.forEach(obj -> updatePendingMap(deviceId, obj));
- }
- return Futures.immediateFuture(true);
}
@Override
- public Future<Boolean> next(DeviceId deviceId,
- Collection<NextObjective> nextObjectives) {
- if (deviceService.isAvailable(deviceId)) {
- return getDevicePipeliner(deviceId).next(nextObjectives);
- } else {
- nextObjectives.forEach(obj -> updatePendingMap(deviceId, obj));
+ public void forward(DeviceId deviceId,
+ ForwardingObjective forwardingObjective) {
+
+ if (queueObjective(deviceId, forwardingObjective)) {
+ return;
}
- return Futures.immediateFuture(true);
+
+ if (deviceService.isAvailable(deviceId)) {
+ getDevicePipeliner(deviceId).forward(forwardingObjective);
+ } else {
+ updatePendingMap(deviceId, forwardingObjective);
+ }
+
}
+ @Override
+ public void next(DeviceId deviceId,
+ NextObjective nextObjective) {
+ if (deviceService.isAvailable(deviceId)) {
+ getDevicePipeliner(deviceId).next(nextObjective);
+ } else {
+ updatePendingMap(deviceId, nextObjective);
+ }
+ }
+
+ private boolean queueObjective(DeviceId deviceId, ForwardingObjective fwd) {
+ if (fwd.nextId() != null &&
+ flowObjectiveStore.getNextGroup(fwd.nextId()) == null) {
+ log.warn("Queuing forwarding objective.");
+ if (pendingForwards.putIfAbsent(fwd.nextId(),
+ Sets.newHashSet(new PendingNext(deviceId, fwd))) != null) {
+ Set<PendingNext> pending = pendingForwards.get(fwd.nextId());
+ pending.add(new PendingNext(deviceId, fwd));
+ }
+ return true;
+ }
+ return false;
+ }
+
+
private void updatePendingMap(DeviceId deviceId, Objective pending) {
if (pendingObjectives.putIfAbsent(deviceId, Lists.newArrayList(pending)) != null) {
Collection<Objective> objectives = pendingObjectives.get(deviceId);
@@ -169,6 +200,33 @@
return pipeliner;
}
+ private void setupPipelineHandler(DeviceId deviceId) {
+ if (localNode.equals(mastershipService.getMasterFor(deviceId))) {
+ // Attempt to lookup the handler in the cache
+ DriverHandler handler = driverHandlers.get(deviceId);
+ if (handler == null) {
+ try {
+ // Otherwise create it and if it has pipeline behaviour, cache it
+ handler = driverService.createHandler(deviceId);
+ if (!handler.driver().hasBehaviour(Pipeliner.class)) {
+ log.warn("Pipeline behaviour not supported for device {}",
+ deviceId);
+ return;
+ }
+ } catch (ItemNotFoundException e) {
+ log.warn("No applicable driver for device {}", deviceId);
+ return;
+ }
+ driverHandlers.put(deviceId, handler);
+ }
+
+ // Always (re)initialize the pipeline behaviour
+ Pipeliner pipeliner = handler.behaviour(Pipeliner.class);
+ pipeliner.init(deviceId, context);
+ pipeliners.putIfAbsent(deviceId, pipeliner);
+ log.info("Driver {} bound to device {}", handler.driver().name(), deviceId);
+ }
+ }
// Triggers driver setup when the local node becomes a device master.
private class InnerMastershipListener implements MastershipListener {
@@ -221,52 +279,70 @@
pendingObjectives.getOrDefault(deviceId,
Collections.emptySet()).forEach(obj -> {
if (obj instanceof NextObjective) {
- getDevicePipeliner(deviceId)
- .next(Collections.singletonList((NextObjective) obj));
+ next(deviceId, (NextObjective) obj);
} else if (obj instanceof ForwardingObjective) {
- getDevicePipeliner(deviceId)
- .forward(Collections.singletonList((ForwardingObjective) obj));
+ forward(deviceId, (ForwardingObjective) obj);
} else {
getDevicePipeliner(deviceId)
- .filter(Collections.singletonList((FilteringObjective) obj));
+ .filter((FilteringObjective) obj);
}
});
}
}
- private void setupPipelineHandler(DeviceId deviceId) {
- if (localNode.equals(mastershipService.getMasterFor(deviceId))) {
- // Attempt to lookup the handler in the cache
- DriverHandler handler = driverHandlers.get(deviceId);
- if (handler == null) {
- try {
- // Otherwise create it and if it has pipeline behaviour, cache it
- handler = driverService.createHandler(deviceId);
- if (!handler.driver().hasBehaviour(Pipeliner.class)) {
- log.warn("Pipeline behaviour not supported for device {}",
- deviceId);
- return;
- }
- } catch (ItemNotFoundException e) {
- log.warn("No applicable driver for device {}", deviceId);
- return;
- }
- driverHandlers.put(deviceId, handler);
- }
-
- // Always (re)initialize the pipeline behaviour
- Pipeliner pipeliner = handler.behaviour(Pipeliner.class);
- pipeliner.init(deviceId, context);
- pipeliners.putIfAbsent(deviceId, pipeliner);
- log.info("Driver {} bound to device {}", handler.driver().name(), deviceId);
- }
- }
-
// Processing context for initializing pipeline driver behaviours.
private class InnerPipelineContext implements PipelinerContext {
@Override
public ServiceDirectory directory() {
return serviceDirectory;
}
+
+ @Override
+ public FlowObjectiveStore store() {
+ return flowObjectiveStore;
+ }
+
+
+ }
+
+ private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
+ @Override
+ public void notify(ObjectiveEvent event) {
+ Set<PendingNext> pending = pendingForwards.remove(event.subject());
+
+ if (pending == null) {
+ return;
+ }
+
+ log.info("Processing pending objectives {}", pending.size());
+
+ pending.forEach(p -> getDevicePipeliner(p.deviceId())
+ .forward(p.forwardingObjective()));
+
+ }
+ }
+
+ /**
+ * Data class used to hold a pending forwarding objective that could not
+ * be processed because the associated next object was not present.
+ */
+ private class PendingNext {
+ private final DeviceId deviceId;
+ private final ForwardingObjective fwd;
+
+ public PendingNext(DeviceId deviceId, ForwardingObjective fwd) {
+ this.deviceId = deviceId;
+ this.fwd = fwd;
+ }
+
+ public DeviceId deviceId() {
+ return deviceId;
+ }
+
+ public ForwardingObjective forwardingObjective() {
+ return fwd;
+ }
+
+
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flowobjective/impl/DistributedFlowObjectiveStore.java b/core/store/dist/src/main/java/org/onosproject/store/flowobjective/impl/DistributedFlowObjectiveStore.java
new file mode 100644
index 0000000..94d72ec
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flowobjective/impl/DistributedFlowObjectiveStore.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flowobjective.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.net.behaviour.DefaultNextGroup;
+import org.onosproject.net.behaviour.NextGroup;
+import org.onosproject.net.flowobjective.FlowObjectiveStore;
+import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
+import org.onosproject.net.flowobjective.ObjectiveEvent;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
+import org.slf4j.Logger;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Manages the inventory of created next groups.
+ */
+@Component(immediate = true, enabled = true)
+@Service
+public class DistributedFlowObjectiveStore
+ extends AbstractStore<ObjectiveEvent, FlowObjectiveStoreDelegate>
+ implements FlowObjectiveStore {
+
+ private final Logger log = getLogger(getClass());
+
+ private ConsistentMap<Integer, byte[]> nextGroups;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
+ @Activate
+ public void activate() {
+ nextGroups = storageService.<Integer, byte[]>consistentMapBuilder()
+ .withName("flowobjective-groups")
+ .withSerializer(Serializer.using(
+ new KryoNamespace.Builder()
+ .register(byte[].class)
+ .build()))
+ .build();
+
+ log.info("Started");
+ }
+
+
+ @Deactivate
+ public void deactivate() {
+ log.info("Stopped");
+ }
+
+
+ @Override
+ public void putNextGroup(Integer nextId, NextGroup group) {
+ nextGroups.putIfAbsent(nextId, group.data());
+ notifyDelegate(new ObjectiveEvent(ObjectiveEvent.Type.ADD, nextId));
+ }
+
+ @Override
+ public NextGroup getNextGroup(Integer nextId) {
+ Versioned<byte[]> versionGroup = nextGroups.get(nextId);
+ if (versionGroup != null) {
+ return new DefaultNextGroup(versionGroup.value());
+ }
+ return null;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index 3081194..bc9d609 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -167,10 +167,13 @@
public PacketRequestTracker() {
requests = storageService.<PacketRequest, Boolean>consistentMapBuilder()
.withName("packet-requests")
+ .withSerializer(Serializer.using(
+ new KryoNamespace.Builder().register(KryoNamespaces.API).build()))
.withSerializer(new Serializer() {
KryoNamespace kryo = new KryoNamespace.Builder()
.register(KryoNamespaces.API)
.build();
+
@Override
public <T> byte[] encode(T object) {
return kryo.serialize(object);