Merge branch 'optical-integration' of ssh://gerrit.onlab.us:29418/onos-next into optical-integration
diff --git a/apps/optical/src/main/java/org/onlab/onos/optical/testapp/LambdaForwarding.java b/apps/optical/src/main/java/org/onlab/onos/optical/testapp/LambdaForwarding.java
new file mode 100644
index 0000000..62148c8
--- /dev/null
+++ b/apps/optical/src/main/java/org/onlab/onos/optical/testapp/LambdaForwarding.java
@@ -0,0 +1,156 @@
+package org.onlab.onos.optical.testapp;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onlab.onos.ApplicationId;
+import org.onlab.onos.CoreService;
+import org.onlab.onos.net.Device;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.PortNumber;
+import org.onlab.onos.net.device.DeviceEvent;
+import org.onlab.onos.net.device.DeviceListener;
+import org.onlab.onos.net.device.DeviceService;
+import org.onlab.onos.net.flow.DefaultFlowRule;
+import org.onlab.onos.net.flow.DefaultTrafficSelector;
+import org.onlab.onos.net.flow.DefaultTrafficTreatment;
+import org.onlab.onos.net.flow.FlowRule;
+import org.onlab.onos.net.flow.FlowRuleService;
+import org.onlab.onos.net.flow.TrafficSelector;
+import org.onlab.onos.net.flow.TrafficTreatment;
+import org.slf4j.Logger;
+
+/**
+ * Test application that installs hard-coded lambda forwarding rules on a
+ * fixed set of devices as they are added.
+ */
+@Component(immediate = true)
+public class LambdaForwarding {
+
+    private final Logger log = getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected FlowRuleService flowRuleService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected CoreService coreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DeviceService deviceService;
+
+    private ApplicationId appId;
+
+    private final InternalDeviceListener listener = new InternalDeviceListener();
+
+    // Maps each known device to the switch number that selects its rule in pushRules().
+    private final Map<DeviceId, Integer> uglyMap = new HashMap<>();
+
+    /**
+     * Registers the application, fills the device-to-switch-number map and
+     * starts listening for device events.
+     */
+    @Activate
+    public void activate() {
+        // NOTE(review): this ID looks copied from another app; confirm it is intended.
+        appId = coreService.registerApplication("org.onlab.onos.fwd");
+
+        // Populate the map before listening so early events find their device.
+        uglyMap.put(DeviceId.deviceId("of:0000ffffffffff01"), 1);
+        uglyMap.put(DeviceId.deviceId("of:0000ffffffffff02"), 2);
+        uglyMap.put(DeviceId.deviceId("of:0000ffffffffff03"), 3);
+
+        deviceService.addListener(listener);
+
+        log.info("Started with Application ID {}", appId.id());
+    }
+
+    /**
+     * Stops listening for device events and removes this application's flow rules.
+     */
+    @Deactivate
+    public void deactivate() {
+        // Unregister first so no new rules are pushed while tearing down.
+        deviceService.removeListener(listener);
+        flowRuleService.removeFlowRulesById(appId);
+        log.info("Stopped");
+    }
+
+    /**
+     * Builds and applies the hard-coded flow rule for the given device.
+     * Devices that are not present in the switch-number map are ignored.
+     *
+     * @param device device that was added
+     */
+    private void pushRules(Device device) {
+        // Stay boxed: unboxing a missing entry would throw NullPointerException.
+        Integer switchNumber = uglyMap.get(device.id());
+        if (switchNumber == null) {
+            log.debug("No lambda rules configured for device {}", device.id());
+            return;
+        }
+
+        TrafficSelector.Builder sbuilder = DefaultTrafficSelector.builder();
+        TrafficTreatment.Builder tbuilder = DefaultTrafficTreatment.builder();
+        int inport;
+        int outport;
+        short lambda = 10;
+
+        switch (switchNumber) {
+        case 1:
+            inport = 10;
+            outport = 20;
+            sbuilder.matchInport(PortNumber.portNumber(inport));
+            tbuilder.setOutput(PortNumber.portNumber(outport)).setLambda(lambda);
+            break;
+        case 2:
+            inport = 21;
+            outport = 11;
+            sbuilder.matchLambda(lambda).matchInport(PortNumber.portNumber(inport)); // match sigtype
+            tbuilder.setOutput(PortNumber.portNumber(outport));
+            break;
+        case 3:
+            inport = 30;
+            outport = 31;
+            sbuilder.matchLambda(lambda).matchInport(PortNumber.portNumber(inport));
+            tbuilder.setOutput(PortNumber.portNumber(outport)).setLambda(lambda);
+            break;
+        default:
+        }
+        // NOTE(review): applied on top of the per-switch values above; confirm intent.
+        sbuilder.matchLambda((short) 25).matchInport(PortNumber.portNumber(5));
+        tbuilder.setOutput(PortNumber.portNumber(5));
+
+        TrafficTreatment treatement = tbuilder.build();
+        TrafficSelector selector = sbuilder.build();
+
+        FlowRule f = new DefaultFlowRule(device.id(), selector,
+                treatement, 100, appId, 600, false);
+        flowRuleService.applyFlowRules(f);
+    }
+
+    /**
+     * Pushes the lambda rules whenever a device is added; other events are ignored.
+     */
+    public class InternalDeviceListener implements DeviceListener {
+
+        @Override
+        public void event(DeviceEvent event) {
+            switch (event.type()) {
+            case DEVICE_ADDED:
+                pushRules(event.subject());
+                break;
+            default:
+                break;
+            }
+        }
+    }
+}
+
+
diff --git a/apps/sdnip/src/main/resources/config-examples/sdnip.json b/apps/sdnip/src/main/resources/config-examples/sdnip.json
index b9a2d56..13f4db8 100644
--- a/apps/sdnip/src/main/resources/config-examples/sdnip.json
+++ b/apps/sdnip/src/main/resources/config-examples/sdnip.json
@@ -14,6 +14,16 @@
"attachmentDpid" : "00:00:00:00:00:00:00:a2",
"attachmentPort" : "1",
"ipAddress" : "192.168.30.1"
+ },
+ {
+ "attachmentDpid" : "00:00:00:00:00:00:00:a6",
+ "attachmentPort" : "1",
+ "ipAddress" : "192.168.40.1"
+ },
+ {
+ "attachmentDpid" : "00:00:00:00:00:00:00:a4",
+ "attachmentPort" : "4",
+ "ipAddress" : "192.168.60.1"
}
],
"bgpSpeakers" : [
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/BatchOperationResult.java b/core/api/src/main/java/org/onlab/onos/net/flow/BatchOperationResult.java
index 6352b53..d2db96b 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/BatchOperationResult.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/BatchOperationResult.java
@@ -18,7 +18,7 @@
*/
package org.onlab.onos.net.flow;
-import java.util.List;
+import java.util.Set;
/**
* Interface capturing the result of a batch operation.
@@ -33,9 +33,9 @@
boolean isSuccess();
/**
- * Obtains a list of items which failed.
- * @return a list of failures
+ * Obtains a set of items which failed.
+ * @return a set of failures
*/
- List<T> failedItems();
+ Set<T> failedItems();
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java b/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
index 841e948..363831c 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/CompletedBatchOperation.java
@@ -18,19 +18,19 @@
*/
package org.onlab.onos.net.flow;
-import java.util.List;
+import java.util.Set;
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
public class CompletedBatchOperation implements BatchOperationResult<FlowEntry> {
private final boolean success;
- private final List<FlowEntry> failures;
+ private final Set<FlowEntry> failures;
- public CompletedBatchOperation(boolean success, List<FlowEntry> failures) {
+ public CompletedBatchOperation(boolean success, Set<FlowEntry> failures) {
this.success = success;
- this.failures = ImmutableList.copyOf(failures);
+ this.failures = ImmutableSet.copyOf(failures);
}
@Override
@@ -39,7 +39,7 @@
}
@Override
- public List<FlowEntry> failedItems() {
+ public Set<FlowEntry> failedItems() {
return failures;
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/DefaultTrafficSelector.java b/core/api/src/main/java/org/onlab/onos/net/flow/DefaultTrafficSelector.java
index abb29a6..63e7e24 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/DefaultTrafficSelector.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/DefaultTrafficSelector.java
@@ -176,6 +176,11 @@
}
@Override
+ public Builder matchLambda(short lambda) {
+ return add(Criteria.matchLambda(lambda));
+ }
+
+ @Override
public TrafficSelector build() {
return new DefaultTrafficSelector(ImmutableSet.copyOf(selector.values()));
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/DefaultTrafficTreatment.java b/core/api/src/main/java/org/onlab/onos/net/flow/DefaultTrafficTreatment.java
index b4d8c3e..0300079 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/DefaultTrafficTreatment.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/DefaultTrafficTreatment.java
@@ -137,6 +137,7 @@
case OUTPUT:
outputs.add(instruction);
break;
+ case L0MODIFICATION:
case L2MODIFICATION:
case L3MODIFICATION:
// TODO: enforce modification order if any
@@ -193,6 +194,11 @@
}
@Override
+ public Builder setLambda(short lambda) {
+ return add(Instructions.modL0Lambda(lambda));
+ }
+
+ @Override
public TrafficTreatment build() {
//If we are dropping should we just return an emptry list?
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchEvent.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchEvent.java
new file mode 100644
index 0000000..4ba3366
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchEvent.java
@@ -0,0 +1,67 @@
+package org.onlab.onos.net.flow;
+
+import org.onlab.onos.event.AbstractEvent;
+
+/**
+ * Event describing the request or the completion of a flow rule batch operation.
+ */
+public final class FlowRuleBatchEvent extends AbstractEvent<FlowRuleBatchEvent.Type, FlowRuleBatchRequest> {
+
+    /**
+     * Type of flow rule batch events.
+     */
+    public enum Type {
+
+        /**
+         * Signifies that a batch operation has been initiated.
+         */
+        BATCH_OPERATION_REQUESTED,
+
+        /**
+         * Signifies that a batch operation has completed.
+         */
+        BATCH_OPERATION_COMPLETED,
+    }
+
+    // Outcome of the batch operation; null for events created through
+    // create(FlowRuleBatchRequest), i.e. BATCH_OPERATION_REQUESTED events.
+    private final CompletedBatchOperation result;
+
+    /**
+     * Creates an event of the given type for the specified batch request.
+     * @param type flow rule batch event type
+     * @param request batch operation request, used as the event subject
+     * @param result completed batch operation result; may be null
+     */
+    private FlowRuleBatchEvent(Type type, FlowRuleBatchRequest request, CompletedBatchOperation result) {
+        super(type, request);
+        this.result = result;
+    }
+
+    /**
+     * Creates an event signalling that the given batch has been requested.
+     * @param request batch operation request.
+     * @return a BATCH_OPERATION_REQUESTED event with no result.
+     */
+    public static FlowRuleBatchEvent create(FlowRuleBatchRequest request) {
+        return new FlowRuleBatchEvent(Type.BATCH_OPERATION_REQUESTED, request, null);
+    }
+
+    /**
+     * Creates an event signalling that the given batch has completed.
+     * @param request batch operation request.
+     * @param result completed batch operation result.
+     * @return a BATCH_OPERATION_COMPLETED event carrying the result.
+     */
+    public static FlowRuleBatchEvent create(FlowRuleBatchRequest request, CompletedBatchOperation result) {
+        return new FlowRuleBatchEvent(Type.BATCH_OPERATION_COMPLETED, request, result);
+    }
+
+    /**
+     * Returns the result of this batch operation.
+     * @return batch operation result, or null if the operation was only requested.
+     */
+    public CompletedBatchOperation result() {
+        return result;
+    }
+}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchRequest.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchRequest.java
new file mode 100644
index 0000000..0414fcb
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchRequest.java
@@ -0,0 +1,38 @@
+package org.onlab.onos.net.flow;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
+
+import com.google.common.collect.Lists;
+
+/** Describes a batch of flow entries to be added and removed together. */
+public class FlowRuleBatchRequest {
+    private final List<FlowEntry> toAdd;
+    private final List<FlowEntry> toRemove;
+    /** Snapshots both lists so later changes by the caller cannot alter the request. */
+    public FlowRuleBatchRequest(List<FlowEntry> toAdd, List<FlowEntry> toRemove) {
+        this.toAdd = Collections.unmodifiableList(Lists.newArrayList(toAdd));
+        this.toRemove = Collections.unmodifiableList(Lists.newArrayList(toRemove));
+    }
+    /** @return unmodifiable list of the flow entries to add */
+    public List<FlowEntry> toAdd() {
+        return toAdd;
+    }
+    /** @return unmodifiable list of the flow entries to remove */
+    public List<FlowEntry> toRemove() {
+        return toRemove;
+    }
+    /** @return batch operation holding an ADD entry per addition, then a REMOVE entry per removal */
+    public FlowRuleBatchOperation asBatchOperation() {
+        List<FlowRuleBatchEntry> entries = Lists.newArrayList();
+        for (FlowEntry e : toAdd) {
+            entries.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, e));
+        }
+        for (FlowEntry e : toRemove) {
+            entries.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, e));
+        }
+        return new FlowRuleBatchOperation(entries);
+    }
+}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
index 0b2c3d8..ae74ac5 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
@@ -18,11 +18,11 @@
*/
package org.onlab.onos.net.flow;
-import java.util.concurrent.Future;
-
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.provider.Provider;
+import com.google.common.util.concurrent.ListenableFuture;
+
/**
* Abstraction of a flow rule provider.
*/
@@ -60,6 +60,6 @@
* @param batch a batch of flow rules
* @return a future indicating the status of this execution
*/
- Future<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch);
+ ListenableFuture<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch);
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleStore.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleStore.java
index 63b7f77..11bd4ad 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleStore.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleStore.java
@@ -18,6 +18,8 @@
*/
package org.onlab.onos.net.flow;
+import java.util.concurrent.Future;
+
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.Store;
@@ -25,7 +27,7 @@
/**
* Manages inventory of flow rules; not intended for direct use.
*/
-public interface FlowRuleStore extends Store<FlowRuleEvent, FlowRuleStoreDelegate> {
+public interface FlowRuleStore extends Store<FlowRuleBatchEvent, FlowRuleStoreDelegate> {
/**
* Returns the number of flow rule in the store.
@@ -59,12 +61,26 @@
Iterable<FlowRule> getFlowRulesByAppId(ApplicationId appId);
/**
+ // TODO: Better description of method behavior.
* Stores a new flow rule without generating events.
*
* @param rule the flow rule to add
- * @return true if the rule should be handled locally
*/
- boolean storeFlowRule(FlowRule rule);
+ void storeFlowRule(FlowRule rule);
+
+ /**
+ * Stores a batch of flow rules.
+ * @param batchOperation batch of flow rules.
+ * @return Future response indicating success/failure of the batch operation
+ * all the way down to the device.
+ */
+ Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation batchOperation);
+
+ /**
+ * Invoked on the completion of a storeBatch operation.
+ * @param event flow rule batch event describing the completed operation
+ */
+ void batchOperationComplete(FlowRuleBatchEvent event);
/**
* Marks a flow rule for deletion. Actual deletion will occur
@@ -73,7 +89,7 @@
* @param rule the flow rule to delete
* @return true if the rule should be handled locally
*/
- boolean deleteFlowRule(FlowRule rule);
+ void deleteFlowRule(FlowRule rule);
/**
* Stores a new flow rule, or updates an existing entry.
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleStoreDelegate.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleStoreDelegate.java
index 4e5ebf6..fbd6b55 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleStoreDelegate.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleStoreDelegate.java
@@ -23,5 +23,5 @@
/**
* Flow rule store delegate abstraction.
*/
-public interface FlowRuleStoreDelegate extends StoreDelegate<FlowRuleEvent> {
+public interface FlowRuleStoreDelegate extends StoreDelegate<FlowRuleBatchEvent> {
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/TrafficSelector.java b/core/api/src/main/java/org/onlab/onos/net/flow/TrafficSelector.java
index b4d566c..49815db 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/TrafficSelector.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/TrafficSelector.java
@@ -130,6 +130,13 @@
public Builder matchTcpDst(Short tcpPort);
/**
+ * Matches an optical signal ID or lambda.
+ * @param lambda optical signal ID or lambda to match
+ * @return a selection builder
+ */
+ public Builder matchLambda(short lambda);
+
+ /**
* Builds an immutable traffic selector.
*
* @return traffic selector
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/TrafficTreatment.java b/core/api/src/main/java/org/onlab/onos/net/flow/TrafficTreatment.java
index a576138..9b135ba 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/TrafficTreatment.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/TrafficTreatment.java
@@ -105,6 +105,13 @@
public Builder setIpDst(IpPrefix addr);
/**
+ * Sets the optical channel ID or lambda.
+ * @param lambda optical channel ID
+ * @return a treatment builder
+ */
+ public Builder setLambda(short lambda);
+
+ /**
* Builds an immutable traffic treatment descriptor.
*
* @return traffic treatment
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/criteria/Criteria.java b/core/api/src/main/java/org/onlab/onos/net/flow/criteria/Criteria.java
index fb5fb97..2e177f7 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/criteria/Criteria.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/criteria/Criteria.java
@@ -151,10 +151,19 @@
return new TcpPortCriterion(tcpPort, Type.TCP_DST);
}
- /*
+ /**
+ * Creates a match on lambda field using the specified value.
+ *
+ * @param lambda lambda value to match
+ * @return match criterion
+ */
+ public static Criterion matchLambda(Short lambda) {
+ return new LambdaCriterion(lambda, Type.OCH_SIGID);
+ }
+
+ /**
* Implementations of criteria.
*/
-
public static final class PortCriterion implements Criterion {
private final PortNumber port;
@@ -523,4 +532,49 @@
return false;
}
}
+
+    /** Criterion that matches on a lambda value. */
+    public static final class LambdaCriterion implements Criterion {
+        private final short lambda;
+        private final Type type;
+
+        public LambdaCriterion(short lambda, Type type) {
+            this.lambda = lambda;
+            this.type = type;
+        }
+
+        @Override
+        public Type type() {
+            return type;
+        }
+
+        /** @return the lambda value to match, boxed as a Short */
+        public Short lambda() {
+            return lambda;
+        }
+
+        @Override
+        public String toString() {
+            String name = type().toString();
+            return toStringHelper(name).add("lambda", lambda).toString();
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(lambda, type);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof LambdaCriterion)) {
+                return false;
+            }
+            LambdaCriterion other = (LambdaCriterion) obj;
+            return lambda == other.lambda && type == other.type;
+        }
+    }
+
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/criteria/Criterion.java b/core/api/src/main/java/org/onlab/onos/net/flow/criteria/Criterion.java
index 5337852..6110892 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/criteria/Criterion.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/criteria/Criterion.java
@@ -108,7 +108,11 @@
/** Logical Port Metadata. */
TUNNEL_ID,
/** IPv6 Extension Header pseudo-field. */
- IPV6_EXTHDR
+ IPV6_EXTHDR,
+ /** Optical channel signal ID (lambda). */
+ OCH_SIGID,
+ /** Optical channel signal type (fixed or flexible). */
+ OCH_SIGTYPE
}
/**
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/instructions/Instruction.java b/core/api/src/main/java/org/onlab/onos/net/flow/instructions/Instruction.java
index 084ffe4..9b578b6 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/instructions/Instruction.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/instructions/Instruction.java
@@ -43,6 +43,11 @@
GROUP,
/**
+ * Signifies that the traffic should be modified in L0 way.
+ */
+ L0MODIFICATION,
+
+ /**
* Signifies that the traffic should be modified in L2 way.
*/
L2MODIFICATION,
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/instructions/Instructions.java b/core/api/src/main/java/org/onlab/onos/net/flow/instructions/Instructions.java
index 988c52f..b18d7ef 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/instructions/Instructions.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/instructions/Instructions.java
@@ -24,6 +24,8 @@
import java.util.Objects;
import org.onlab.onos.net.PortNumber;
+import org.onlab.onos.net.flow.instructions.L0ModificationInstruction.L0SubType;
+import org.onlab.onos.net.flow.instructions.L0ModificationInstruction.ModLambdaInstruction;
import org.onlab.onos.net.flow.instructions.L2ModificationInstruction.L2SubType;
import org.onlab.onos.net.flow.instructions.L2ModificationInstruction.ModEtherInstruction;
import org.onlab.onos.net.flow.instructions.L3ModificationInstruction.L3SubType;
@@ -62,6 +64,16 @@
}
/**
+ * Creates a l0 modification.
+ * @param lambda the lambda to modify to.
+ * @return a l0 modification
+ */
+    public static L0ModificationInstruction modL0Lambda(short lambda) {
+        // lambda is a primitive short and can never be null; a null check would only box it.
+        return new ModLambdaInstruction(L0SubType.LAMBDA, lambda);
+    }
+
+ /**
* Creates a l2 src modification.
* @param addr the mac address to modify to.
* @return a l2 modification
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/instructions/L0ModificationInstruction.java b/core/api/src/main/java/org/onlab/onos/net/flow/instructions/L0ModificationInstruction.java
new file mode 100644
index 0000000..23e5f2a
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/instructions/L0ModificationInstruction.java
@@ -0,0 +1,75 @@
+package org.onlab.onos.net.flow.instructions;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+import java.util.Objects;
+
+/** Base class for instructions that modify traffic in an L0 way. */
+public abstract class L0ModificationInstruction implements Instruction {
+
+    /**
+     * Subtypes of L0 modification.
+     */
+    public enum L0SubType {
+        /**
+         * Lambda modification.
+         */
+        LAMBDA
+        //TODO: remaining types
+    }
+
+    /** @return the subtype of this L0 modification */
+    public abstract L0SubType subtype();
+
+    @Override
+    public Type type() {
+        return Type.L0MODIFICATION;
+    }
+
+    /**
+     * L0 modification instruction that carries a lambda value.
+     */
+    public static final class ModLambdaInstruction extends L0ModificationInstruction {
+
+        private final L0SubType subtype;
+        private final short lambda;
+
+        public ModLambdaInstruction(L0SubType subType, short lambda) {
+            this.subtype = subType;
+            this.lambda = lambda;
+        }
+
+        @Override
+        public L0SubType subtype() {
+            return subtype;
+        }
+
+        public short lambda() {
+            return lambda;
+        }
+
+        @Override
+        public String toString() {
+            String name = subtype().toString();
+            return toStringHelper(name).add("lambda", lambda).toString();
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(lambda, type(), subtype);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof ModLambdaInstruction)) {
+                return false;
+            }
+            ModLambdaInstruction other = (ModLambdaInstruction) obj;
+            return lambda == other.lambda && type() == other.type()
+                    && subtype == other.subtype;
+        }
+    }
+}
diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
index d8f89ae..3ef9fc8 100644
--- a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
@@ -5,8 +5,10 @@
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -30,7 +32,9 @@
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRuleBatchEntry;
import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
+import org.onlab.onos.net.flow.FlowRuleBatchEvent;
import org.onlab.onos.net.flow.FlowRuleBatchOperation;
+import org.onlab.onos.net.flow.FlowRuleBatchRequest;
import org.onlab.onos.net.flow.FlowRuleEvent;
import org.onlab.onos.net.flow.FlowRuleListener;
import org.onlab.onos.net.flow.FlowRuleProvider;
@@ -47,6 +51,9 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
/**
* Provides implementation of the flow NB & SB APIs.
@@ -104,14 +111,7 @@
public void applyFlowRules(FlowRule... flowRules) {
for (int i = 0; i < flowRules.length; i++) {
FlowRule f = flowRules[i];
- boolean local = store.storeFlowRule(f);
- if (local) {
- // TODO: aggregate all local rules and push down once?
- applyFlowRulesToProviders(f);
- eventDispatcher.post(
- new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADD_REQUESTED, f));
-
- }
+ store.storeFlowRule(f);
}
}
@@ -135,13 +135,7 @@
FlowRule f;
for (int i = 0; i < flowRules.length; i++) {
f = flowRules[i];
- boolean local = store.deleteFlowRule(f);
- if (local) {
- // TODO: aggregate all local rules and push down once?
- removeFlowRulesFromProviders(f);
- eventDispatcher.post(
- new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVE_REQUESTED, f));
- }
+ store.deleteFlowRule(f);
}
}
@@ -185,33 +179,21 @@
@Override
public Future<CompletedBatchOperation> applyBatch(
FlowRuleBatchOperation batch) {
- Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches =
+ Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches =
ArrayListMultimap.create();
List<Future<CompletedBatchOperation>> futures = Lists.newArrayList();
for (FlowRuleBatchEntry fbe : batch.getOperations()) {
final FlowRule f = fbe.getTarget();
- final Device device = deviceService.getDevice(f.deviceId());
- final FlowRuleProvider frp = getProvider(device.providerId());
- batches.put(frp, fbe);
- switch (fbe.getOperator()) {
- case ADD:
- store.storeFlowRule(f);
- break;
- case REMOVE:
- store.deleteFlowRule(f);
- break;
- case MODIFY:
- default:
- log.error("Batch operation type {} unsupported.", fbe.getOperator());
- }
+ perDeviceBatches.put(f.deviceId(), fbe);
}
- for (FlowRuleProvider provider : batches.keySet()) {
+
+ for (DeviceId deviceId : perDeviceBatches.keySet()) {
FlowRuleBatchOperation b =
- new FlowRuleBatchOperation(batches.get(provider));
- Future<CompletedBatchOperation> future = provider.executeBatch(b);
+ new FlowRuleBatchOperation(perDeviceBatches.get(deviceId));
+ Future<CompletedBatchOperation> future = store.storeBatch(b);
futures.add(future);
}
- return new FlowRuleBatchFuture(futures, batches);
+ return new FlowRuleBatchFuture(futures, perDeviceBatches);
}
@Override
@@ -324,6 +306,7 @@
post(event);
}
} else {
+ log.info("Removing flow rules....");
removeFlowRules(flowEntry);
}
@@ -391,21 +374,47 @@
// Store delegate to re-post events emitted from the store.
private class InternalStoreDelegate implements FlowRuleStoreDelegate {
+ // TODO: Right now we only dispatch events at individual flowEntry level.
+ // It may be more efficient for also dispatch events as a batch.
@Override
- public void notify(FlowRuleEvent event) {
+ public void notify(FlowRuleBatchEvent event) {
+ final FlowRuleBatchRequest request = event.subject();
switch (event.type()) {
- case RULE_ADD_REQUESTED:
- applyFlowRulesToProviders(event.subject());
- break;
- case RULE_REMOVE_REQUESTED:
- removeFlowRulesFromProviders(event.subject());
- break;
+ case BATCH_OPERATION_REQUESTED:
+ for (FlowEntry entry : request.toAdd()) {
+ eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADD_REQUESTED, entry));
+ }
+ for (FlowEntry entry : request.toRemove()) {
+ eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVE_REQUESTED, entry));
+ }
+ // FIXME: what about op.equals(FlowRuleOperation.MODIFY) ?
- case RULE_ADDED:
- case RULE_REMOVED:
- case RULE_UPDATED:
- // only dispatch events related to switch
- eventDispatcher.post(event);
+ FlowRuleBatchOperation batchOperation = request.asBatchOperation();
+
+ FlowRuleProvider flowRuleProvider =
+ getProvider(batchOperation.getOperations().get(0).getTarget().deviceId());
+ final ListenableFuture<CompletedBatchOperation> result =
+ flowRuleProvider.executeBatch(batchOperation);
+ result.addListener(new Runnable() {
+ @Override
+ public void run() {
+ store.batchOperationComplete(FlowRuleBatchEvent.create(request, Futures.getUnchecked(result)));
+ }
+ }, Executors.newCachedThreadPool());
+
+ break;
+ case BATCH_OPERATION_COMPLETED:
+ Set<FlowEntry> failedItems = event.result().failedItems();
+ for (FlowEntry entry : request.toAdd()) {
+ if (!failedItems.contains(entry)) {
+ eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADDED, entry));
+ }
+ }
+ for (FlowEntry entry : request.toRemove()) {
+ if (!failedItems.contains(entry)) {
+ eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVED, entry));
+ }
+ }
break;
default:
break;
@@ -413,18 +422,15 @@
}
}
- private class FlowRuleBatchFuture
- implements Future<CompletedBatchOperation> {
+ private class FlowRuleBatchFuture implements Future<CompletedBatchOperation> {
private final List<Future<CompletedBatchOperation>> futures;
- private final Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches;
+ private final Multimap<DeviceId, FlowRuleBatchEntry> batches;
private final AtomicReference<BatchState> state;
private CompletedBatchOperation overall;
-
-
public FlowRuleBatchFuture(List<Future<CompletedBatchOperation>> futures,
- Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches) {
+ Multimap<DeviceId, FlowRuleBatchEntry> batches) {
this.futures = futures;
this.batches = batches;
state = new AtomicReference<FlowRuleManager.BatchState>();
@@ -466,7 +472,7 @@
}
boolean success = true;
- List<FlowEntry> failed = Lists.newLinkedList();
+ Set<FlowEntry> failed = Sets.newHashSet();
CompletedBatchOperation completed;
for (Future<CompletedBatchOperation> future : futures) {
completed = future.get();
@@ -486,7 +492,7 @@
return overall;
}
boolean success = true;
- List<FlowEntry> failed = Lists.newLinkedList();
+ Set<FlowEntry> failed = Sets.newHashSet();
CompletedBatchOperation completed;
long start = System.nanoTime();
long end = start + unit.toNanos(timeout);
@@ -500,7 +506,7 @@
return finalizeBatchOperation(success, failed);
}
- private boolean validateBatchOperation(List<FlowEntry> failed,
+ private boolean validateBatchOperation(Set<FlowEntry> failed,
CompletedBatchOperation completed) {
if (isCancelled()) {
@@ -522,7 +528,7 @@
}
private CompletedBatchOperation finalizeBatchOperation(boolean success,
- List<FlowEntry> failed) {
+ Set<FlowEntry> failed) {
synchronized (this) {
if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) {
if (state.get() == BatchState.FINISHED) {
@@ -545,11 +551,6 @@
store.storeFlowRule(fbe.getTarget());
}
}
-
}
}
-
-
-
-
}
diff --git a/core/net/src/main/java/org/onlab/onos/net/link/impl/LinkManager.java b/core/net/src/main/java/org/onlab/onos/net/link/impl/LinkManager.java
index e59eb9f..ea1d09c 100644
--- a/core/net/src/main/java/org/onlab/onos/net/link/impl/LinkManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/link/impl/LinkManager.java
@@ -197,14 +197,7 @@
checkNotNull(linkDescription, LINK_DESC_NULL);
checkValidity();
- ConnectPoint src = linkDescription.src();
- ConnectPoint dst = linkDescription.dst();
- // if we aren't master for the device associated with the ConnectPoint
- // we probably shouldn't be doing this.
-// if (deviceService.getRole(dst.deviceId()) != MastershipRole.MASTER) {
-// return;
-// }
LinkEvent event = store.createOrUpdateLink(provider().id(),
linkDescription);
if (event != null) {
@@ -232,11 +225,7 @@
public void linksVanished(ConnectPoint connectPoint) {
checkNotNull(connectPoint, "Connect point cannot be null");
checkValidity();
- // if we aren't master for the device associated with the ConnectPoint
- // we probably shouldn't be doing this.
- if (deviceService.getRole(connectPoint.deviceId()) != MastershipRole.MASTER) {
- return;
- }
+
log.info("Links for connection point {} vanished", connectPoint);
// FIXME: This will remove links registered by other providers
removeLinks(getLinks(connectPoint));
@@ -246,11 +235,7 @@
public void linksVanished(DeviceId deviceId) {
checkNotNull(deviceId, DEVICE_ID_NULL);
checkValidity();
- // if we aren't master for the device associated with the ConnectPoint
- // we probably shouldn't be doing this.
- if (deviceService.getRole(deviceId) != MastershipRole.MASTER) {
- return;
- }
+
log.info("Links for device {} vanished", deviceId);
removeLinks(getDeviceLinks(deviceId));
}
diff --git a/core/net/src/main/java/org/onlab/onos/net/statistic/impl/StatisticManager.java b/core/net/src/main/java/org/onlab/onos/net/statistic/impl/StatisticManager.java
index 90db729..edd7db9 100644
--- a/core/net/src/main/java/org/onlab/onos/net/statistic/impl/StatisticManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/statistic/impl/StatisticManager.java
@@ -20,7 +20,6 @@
import org.onlab.onos.net.statistic.StatisticService;
import org.onlab.onos.net.statistic.StatisticStore;
import org.slf4j.Logger;
-
import java.util.Set;
import static org.slf4j.LoggerFactory.getLogger;
@@ -68,17 +67,52 @@
@Override
public Link max(Path path) {
- return null;
+ if (path.links().isEmpty()) {
+ return null;
+ }
+ Load maxLoad = new DefaultLoad();
+ Link maxLink = null;
+ for (Link link : path.links()) {
+ Load load = loadInternal(link.src());
+ if (load.rate() > maxLoad.rate()) {
+ maxLoad = load;
+ maxLink = link;
+ }
+ }
+ return maxLink;
}
@Override
public Link min(Path path) {
- return null;
+ if (path.links().isEmpty()) {
+ return null;
+ }
+ Load minLoad = null; // seeded by the first link, not by a placeholder load
+ Link minLink = null;
+ for (Link link : path.links()) {
+ Load load = loadInternal(link.src());
+ if (minLoad == null || load.rate() < minLoad.rate()) {
+ minLoad = load;
+ minLink = link;
+ }
+ }
+ return minLink;
}
@Override
public FlowRule highestHitter(ConnectPoint connectPoint) {
- return null;
+ Set<FlowEntry> hitters = statisticStore.getCurrentStatistic(connectPoint);
+ if (hitters.isEmpty()) {
+ return null;
+ }
+
+ FlowEntry max = hitters.iterator().next();
+ for (FlowEntry entry : hitters) {
+ if (entry.bytes() > max.bytes()) {
+ max = entry;
+ }
+ }
+ return max;
}
private Load loadInternal(ConnectPoint connectPoint) {
@@ -123,16 +157,12 @@
case RULE_UPDATED:
if (rule instanceof FlowEntry) {
statisticStore.addOrUpdateStatistic((FlowEntry) rule);
- } else {
- log.warn("IT AIN'T A FLOWENTRY");
}
break;
case RULE_ADD_REQUESTED:
- log.info("Preparing for stats");
statisticStore.prepareForStatistics(rule);
break;
case RULE_REMOVE_REQUESTED:
- log.info("Removing stats");
statisticStore.removeFromStatistics(rule);
break;
case RULE_REMOVED:
diff --git a/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java b/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
index a2fbc9a..1677af6 100644
--- a/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
+++ b/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
@@ -1,10 +1,17 @@
package org.onlab.onos.net.flow.impl;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_ADDED;
+import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
+import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_UPDATED;
import static org.onlab.onos.net.flow.FlowRuleEvent.Type.*;
-
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -12,6 +19,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -59,16 +67,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-
-import static java.util.Collections.EMPTY_LIST;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_ADDED;
-import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
-import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_UPDATED;
+import com.google.common.util.concurrent.ListenableFuture;
/**
* Test codifying the flow rule service & flow rule provider service contracts.
@@ -182,7 +181,6 @@
// TODO: If preserving iteration order is a requirement, redo FlowRuleStore.
//backing store is sensitive to the order of additions/removals
- @SuppressWarnings("unchecked")
private boolean validateState(Map<FlowRule, FlowEntryState> expected) {
Map<FlowRule, FlowEntryState> expectedToCheck = new HashMap<>(expected);
Iterable<FlowEntry> rules = service.getFlowEntries(DID);
@@ -526,13 +524,13 @@
}
@Override
- public Future<CompletedBatchOperation> executeBatch(
+ public ListenableFuture<CompletedBatchOperation> executeBatch(
BatchOperation<FlowRuleBatchEntry> batch) {
return new TestInstallationFuture();
}
private class TestInstallationFuture
- implements Future<CompletedBatchOperation> {
+ implements ListenableFuture<CompletedBatchOperation> {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
@@ -550,10 +548,9 @@
}
@Override
- @SuppressWarnings("unchecked")
public CompletedBatchOperation get()
throws InterruptedException, ExecutionException {
- return new CompletedBatchOperation(true, EMPTY_LIST);
+ return new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet());
}
@Override
@@ -562,6 +559,11 @@
ExecutionException, TimeoutException {
return null;
}
+
+ @Override
+ public void addListener(Runnable task, Executor executor) {
+ // TODO: add stuff.
+ }
}
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index bde57c6..85f928a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -5,10 +5,14 @@
import static org.onlab.onos.store.flow.impl.FlowStoreMessageSubjects.*;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.List;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -19,11 +23,17 @@
import org.onlab.onos.ApplicationId;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.DefaultFlowEntry;
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
import org.onlab.onos.net.flow.FlowRule;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry;
+import org.onlab.onos.net.flow.FlowRuleBatchEvent;
+import org.onlab.onos.net.flow.FlowRuleBatchOperation;
+import org.onlab.onos.net.flow.FlowRuleBatchRequest;
import org.onlab.onos.net.flow.FlowRuleEvent;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
import org.onlab.onos.net.flow.FlowRuleEvent.Type;
import org.onlab.onos.net.flow.FlowRuleStore;
import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
@@ -43,6 +53,7 @@
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.Futures;
/**
* Manages inventory of flow rules using a distributed state management protocol.
@@ -50,7 +61,7 @@
@Component(immediate = true)
@Service
public class DistributedFlowRuleStore
- extends AbstractStore<FlowRuleEvent, FlowRuleStoreDelegate>
+ extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
implements FlowRuleStore {
private final Logger log = getLogger(getClass());
@@ -92,7 +103,7 @@
public void handle(ClusterMessage message) {
FlowRule rule = SERIALIZER.decode(message.payload());
log.info("received add request for {}", rule);
- storeFlowEntryInternal(rule);
+ storeFlowRule(rule);
// FIXME what to respond.
try {
message.respond(SERIALIZER.encode("ACK"));
@@ -108,7 +119,7 @@
public void handle(ClusterMessage message) {
FlowRule rule = SERIALIZER.decode(message.payload());
log.info("received delete request for {}", rule);
- deleteFlowRuleInternal(rule);
+ deleteFlowRule(rule);
// FIXME what to respond.
try {
message.respond(SERIALIZER.encode("ACK"));
@@ -118,6 +129,22 @@
}
});
+
+ clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
+
+ @Override
+ public void handle(ClusterMessage message) {
+ FlowRule rule = SERIALIZER.decode(message.payload());
+ log.info("received get flow entry request for {}", rule);
+ FlowEntry flowEntry = getFlowEntryInternal(rule);
+ try {
+ message.respond(SERIALIZER.encode(flowEntry));
+ } catch (IOException e) {
+ log.error("Failed to respond back", e);
+ }
+ }
+ });
+
log.info("Started");
}
@@ -127,6 +154,9 @@
}
+ // TODO: This is not an efficient operation on a distributed sharded
+ // flow store. We need to revisit the need for this operation or at least
+ // make it device specific.
@Override
public int getFlowRuleCount() {
return flowEntries.size();
@@ -134,7 +164,26 @@
@Override
public synchronized FlowEntry getFlowEntry(FlowRule rule) {
- return getFlowEntryInternal(rule);
+ ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
+ if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
+ return getFlowEntryInternal(rule);
+ }
+
+ log.info("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
+ replicaInfo.master().orNull(), rule.deviceId());
+
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ FlowStoreMessageSubjects.GET_FLOW_ENTRY,
+ SERIALIZER.encode(rule));
+
+ try {
+ ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+ return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
+ } catch (IOException | TimeoutException e) {
+ // FIXME: throw a FlowStoreException
+ throw new RuntimeException(e);
+ }
}
private synchronized StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
@@ -165,19 +214,30 @@
}
@Override
- public boolean storeFlowRule(FlowRule rule) {
- ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
- if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
- return storeFlowEntryInternal(rule);
+ public void storeFlowRule(FlowRule rule) {
+ storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule))));
+ }
+
+ public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
+ if (operation.getOperations().isEmpty()) {
+ return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
}
- log.info("Forwarding storeFlowRule to {}, which is the primary (master) for device {}",
- replicaInfo.master().orNull(), rule.deviceId());
+ DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
+
+ ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
+
+ if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
+ return storeBatchInternal(operation);
+ }
+
+ log.info("Forwarding storeBatch to {}, which is the primary (master) for device {}",
+ replicaInfo.master().orNull(), deviceId);
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
FlowStoreMessageSubjects.STORE_FLOW_RULE,
- SERIALIZER.encode(rule));
+ SERIALIZER.encode(operation));
try {
ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
@@ -186,58 +246,44 @@
// FIXME: throw a FlowStoreException
throw new RuntimeException(e);
}
- return false;
+
+ return null;
}
- private synchronized boolean storeFlowEntryInternal(FlowRule flowRule) {
- StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
- DeviceId deviceId = flowRule.deviceId();
- // write to local copy.
- if (!flowEntries.containsEntry(deviceId, flowEntry)) {
- flowEntries.put(deviceId, flowEntry);
- flowEntriesById.put(flowRule.appId(), flowEntry);
- notifyDelegate(new FlowRuleEvent(Type.RULE_ADD_REQUESTED, flowRule));
- return true;
+ private Future<CompletedBatchOperation> storeBatchInternal(FlowRuleBatchOperation operation) {
+ List<FlowEntry> toRemove = new ArrayList<>();
+ List<FlowEntry> toAdd = new ArrayList<>();
+ // TODO: backup changes to hazelcast map
+ for (FlowRuleBatchEntry batchEntry : operation.getOperations()) {
+ FlowRule flowRule = batchEntry.getTarget();
+ FlowRuleOperation op = batchEntry.getOperator();
+ if (op.equals(FlowRuleOperation.REMOVE)) {
+ StoredFlowEntry entry = getFlowEntryInternal(flowRule);
+ if (entry != null) {
+ entry.setState(FlowEntryState.PENDING_REMOVE);
+ toRemove.add(entry); // unknown rules are skipped so no null reaches the delegate
+ }
+ } else if (op.equals(FlowRuleOperation.ADD)) {
+ StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
+ DeviceId deviceId = flowRule.deviceId();
+ if (!flowEntries.containsEntry(deviceId, flowEntry)) {
+ flowEntries.put(deviceId, flowEntry);
+ flowEntriesById.put(flowRule.appId(), flowEntry);
+ toAdd.add(flowEntry);
+ }
+ }
}
- // write to backup.
- // TODO: write to a hazelcast map.
- return false;
+ if (toAdd.isEmpty() && toRemove.isEmpty()) {
+ return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
+ }
+ notifyDelegate(FlowRuleBatchEvent.create(new FlowRuleBatchRequest(toAdd, toRemove)));
+ // TODO: implement this.
+ return Futures.immediateFailedFuture(new RuntimeException("Implement this."));
}
@Override
- public synchronized boolean deleteFlowRule(FlowRule rule) {
- ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
- if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
- return deleteFlowRuleInternal(rule);
- }
-
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- FlowStoreMessageSubjects.DELETE_FLOW_RULE,
- SERIALIZER.encode(rule));
-
- try {
- ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
- response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
- } catch (IOException | TimeoutException e) {
- // FIXME: throw a FlowStoreException
- throw new RuntimeException(e);
- }
- return false;
- }
-
- private synchronized boolean deleteFlowRuleInternal(FlowRule flowRule) {
- StoredFlowEntry entry = getFlowEntryInternal(flowRule);
- if (entry == null) {
- return false;
- }
- entry.setState(FlowEntryState.PENDING_REMOVE);
-
- // TODO: also update backup.
-
- notifyDelegate(new FlowRuleEvent(Type.RULE_REMOVE_REQUESTED, flowRule));
-
- return true;
+ public void deleteFlowRule(FlowRule rule) {
+ storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule))));
}
@Override
@@ -315,4 +361,9 @@
}
// TODO: also update backup.
}
+
+ @Override
+ public void batchOperationComplete(FlowRuleBatchEvent event) {
+ notifyDelegate(event);
+ }
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java
index a43dad6..ca833b8 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java
@@ -12,4 +12,5 @@
public static final MessageSubject ADD_OR_UPDATE_FLOW_RULE =
new MessageSubject("peer-forward-add-or-update-flow-rule");
public static final MessageSubject REMOVE_FLOW_RULE = new MessageSubject("peer-forward-remove-flow-rule");
+ public static final MessageSubject GET_FLOW_ENTRY = new MessageSubject("peer-forward-get-flow-entry");
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
index e3d8fe0..ee3fb45 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
@@ -399,7 +399,7 @@
}
// Auxiliary extension to allow location to mutate.
- private class StoredHost extends DefaultHost {
+ private static final class StoredHost extends DefaultHost {
private Timestamped<HostLocation> location;
/**
diff --git a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/RoleValue.java b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/RoleValue.java
index c156143..7447161 100644
--- a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/RoleValue.java
+++ b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/RoleValue.java
@@ -1,7 +1,7 @@
package org.onlab.onos.store.mastership.impl;
import java.util.Collections;
-import java.util.HashMap;
+import java.util.EnumMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -17,9 +17,9 @@
* A structure that holds node mastership roles associated with a
* {@link DeviceId}. This structure needs to be locked through IMap.
*/
-public class RoleValue {
+final class RoleValue {
- protected Map<MastershipRole, List<NodeId>> value = new HashMap<>();
+ protected final Map<MastershipRole, List<NodeId>> value = new EnumMap<>(MastershipRole.class);
public RoleValue() {
value.put(MastershipRole.MASTER, new LinkedList<NodeId>());
@@ -27,7 +27,8 @@
value.put(MastershipRole.NONE, new LinkedList<NodeId>());
}
- public Map<MastershipRole, List<NodeId>> value() {
+ // exposing internals for serialization purpose only
+ Map<MastershipRole, List<NodeId>> value() {
return Collections.unmodifiableMap(value);
}
diff --git a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/RoleValueSerializer.java b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/RoleValueSerializer.java
index 22d1b35..4450e5b 100644
--- a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/RoleValueSerializer.java
+++ b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/RoleValueSerializer.java
@@ -35,10 +35,10 @@
@Override
public void write(Kryo kryo, Output output, RoleValue type) {
- output.writeInt(type.value().size());
+ final Map<MastershipRole, List<NodeId>> map = type.value();
+ output.writeInt(map.size());
- for (Map.Entry<MastershipRole, List<NodeId>> el :
- type.value().entrySet()) {
+ for (Map.Entry<MastershipRole, List<NodeId>> el : map.entrySet()) {
output.writeInt(el.getKey().ordinal());
List<NodeId> nodes = el.getValue();
diff --git a/core/store/hz/common/src/main/java/org/onlab/onos/store/common/SMap.java b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/SMap.java
index 93a7b0d..6dd4bfb 100644
--- a/core/store/hz/common/src/main/java/org/onlab/onos/store/common/SMap.java
+++ b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/SMap.java
@@ -492,7 +492,10 @@
}
private V deserializeVal(byte[] val) {
- return serializer.decode(val);
+ if (val == null) {
+ return null;
+ }
+ return serializer.decode(val.clone());
}
private Set<byte[]> serializeKeySet(Set<K> keys) {
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java
index 9b75cea..7fddb01 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java
@@ -33,6 +33,7 @@
import org.onlab.onos.net.flow.DefaultTrafficTreatment;
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowId;
+import org.onlab.onos.net.flow.StoredFlowEntry;
import org.onlab.onos.net.flow.criteria.Criteria;
import org.onlab.onos.net.flow.criteria.Criterion;
import org.onlab.onos.net.flow.instructions.Instructions;
@@ -97,6 +98,8 @@
HostId.class,
HostDescription.class,
DefaultHostDescription.class,
+ DefaultFlowEntry.class,
+ StoredFlowEntry.class,
DefaultFlowRule.class,
DefaultFlowEntry.class,
FlowEntry.FlowEntryState.class,
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
index d312af5..bbfc263 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
@@ -3,6 +3,8 @@
import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
import static org.slf4j.LoggerFactory.getLogger;
import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
+
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -10,6 +12,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Future;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -17,11 +20,17 @@
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.DefaultFlowEntry;
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
import org.onlab.onos.net.flow.FlowId;
import org.onlab.onos.net.flow.FlowRule;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
+import org.onlab.onos.net.flow.FlowRuleBatchEvent;
+import org.onlab.onos.net.flow.FlowRuleBatchOperation;
+import org.onlab.onos.net.flow.FlowRuleBatchRequest;
import org.onlab.onos.net.flow.FlowRuleEvent;
import org.onlab.onos.net.flow.FlowRuleEvent.Type;
import org.onlab.onos.net.flow.FlowRuleStore;
@@ -33,6 +42,7 @@
import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
+import com.google.common.util.concurrent.Futures;
/**
* Manages inventory of flow rules using trivial in-memory implementation.
@@ -40,7 +50,7 @@
@Component(immediate = true)
@Service
public class SimpleFlowRuleStore
- extends AbstractStore<FlowRuleEvent, FlowRuleStoreDelegate>
+ extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
implements FlowRuleStore {
private final Logger log = getLogger(getClass());
@@ -148,12 +158,11 @@
}
@Override
- public boolean storeFlowRule(FlowRule rule) {
- final boolean added = storeFlowRuleInternal(rule);
- return added;
+ public void storeFlowRule(FlowRule rule) {
+ storeFlowRuleInternal(rule);
}
- private boolean storeFlowRuleInternal(FlowRule rule) {
+ private void storeFlowRuleInternal(FlowRule rule) {
StoredFlowEntry f = new DefaultFlowEntry(rule);
final DeviceId did = f.deviceId();
final FlowId fid = f.id();
@@ -162,19 +171,20 @@
for (StoredFlowEntry fe : existing) {
if (fe.equals(rule)) {
// was already there? ignore
- return false;
+ return;
}
}
// new flow rule added
existing.add(f);
- // TODO: Should we notify only if it's "remote" event?
- //notifyDelegate(new FlowRuleEvent(Type.RULE_ADD_REQUESTED, rule));
- return true;
+ notifyDelegate(FlowRuleBatchEvent.create(
+ new FlowRuleBatchRequest(
+ Arrays.<FlowEntry>asList(f),
+ Collections.<FlowEntry>emptyList())));
}
}
@Override
- public boolean deleteFlowRule(FlowRule rule) {
+ public void deleteFlowRule(FlowRule rule) {
List<StoredFlowEntry> entries = getFlowEntries(rule.deviceId(), rule.id());
@@ -184,14 +194,17 @@
synchronized (entry) {
entry.setState(FlowEntryState.PENDING_REMOVE);
// TODO: Should we notify only if it's "remote" event?
- //notifyDelegate(new FlowRuleEvent(Type.RULE_REMOVE_REQUESTED, rule));
- return true;
+ notifyDelegate(FlowRuleBatchEvent.create(
+ new FlowRuleBatchRequest(
+ Collections.<FlowEntry>emptyList(),
+ Arrays.<FlowEntry>asList(entry))));
}
}
}
}
+
+
//log.warn("Cannot find rule {}", rule);
- return false;
}
@Override
@@ -237,4 +250,24 @@
}
return null;
}
+
+ @Override
+ public Future<CompletedBatchOperation> storeBatch(
+ FlowRuleBatchOperation batchOperation) {
+ for (FlowRuleBatchEntry entry : batchOperation.getOperations()) {
+ if (entry.getOperator().equals(FlowRuleOperation.ADD)) {
+ storeFlowRule(entry.getTarget());
+ } else if (entry.getOperator().equals(FlowRuleOperation.REMOVE)) {
+ deleteFlowRule(entry.getTarget());
+ } else {
+ throw new UnsupportedOperationException("Unsupported operation type");
+ }
+ }
+ return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
+ }
+
+ @Override
+ public void batchOperationComplete(FlowRuleBatchEvent event) {
+ notifyDelegate(event);
+ }
}
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleHostStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleHostStore.java
index ef80b72..ee8570d 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleHostStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleHostStore.java
@@ -269,7 +269,7 @@
}
// Auxiliary extension to allow location to mutate.
- private class StoredHost extends DefaultHost {
+ private static final class StoredHost extends DefaultHost {
private HostLocation location;
/**
diff --git a/openflow/api/src/main/java/org/onlab/onos/openflow/controller/DefaultOpenFlowPacketContext.java b/openflow/api/src/main/java/org/onlab/onos/openflow/controller/DefaultOpenFlowPacketContext.java
index e56a4f9..b1536fb 100644
--- a/openflow/api/src/main/java/org/onlab/onos/openflow/controller/DefaultOpenFlowPacketContext.java
+++ b/openflow/api/src/main/java/org/onlab/onos/openflow/controller/DefaultOpenFlowPacketContext.java
@@ -1,5 +1,9 @@
package org.onlab.onos.openflow.controller;
+
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.onlab.packet.Ethernet;
import org.projectfloodlight.openflow.protocol.OFPacketIn;
import org.projectfloodlight.openflow.protocol.OFPacketOut;
@@ -9,9 +13,6 @@
import org.projectfloodlight.openflow.types.OFBufferId;
import org.projectfloodlight.openflow.types.OFPort;
-import java.util.Collections;
-import java.util.concurrent.atomic.AtomicBoolean;
-
public final class DefaultOpenFlowPacketContext implements OpenFlowPacketContext {
private final AtomicBoolean free = new AtomicBoolean(true);
diff --git a/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OFChannelHandler.java b/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OFChannelHandler.java
index 5047867..4ea2f71 100644
--- a/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OFChannelHandler.java
+++ b/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OFChannelHandler.java
@@ -41,7 +41,6 @@
import org.projectfloodlight.openflow.protocol.OFHelloElem;
import org.projectfloodlight.openflow.protocol.OFMessage;
import org.projectfloodlight.openflow.protocol.OFPacketIn;
-import org.projectfloodlight.openflow.protocol.OFPacketOut;
import org.projectfloodlight.openflow.protocol.OFPortDescStatsReply;
import org.projectfloodlight.openflow.protocol.OFPortDescStatsRequest;
import org.projectfloodlight.openflow.protocol.OFPortStatus;
@@ -661,10 +660,9 @@
* However, we could be more forgiving
* @param h the channel handler that received the message
* @param m the message
- * @throws SwitchStateException
- * @throws SwitchStateExeption we always through the execption
+ * @throws SwitchStateException we always throw the exception
*/
- // needs to be protected because enum members are acutally subclasses
+ // needs to be protected because enum members are actually subclasses
protected void illegalMessageReceived(OFChannelHandler h, OFMessage m)
throws SwitchStateException {
String msg = getSwitchStateMessage(h, m,
@@ -1025,7 +1023,9 @@
// all state for the original switch (with the same dpid),
// which we obviously don't want.
log.info("{}:removal called", getSwitchInfoString());
- sw.removeConnectedSwitch();
+ if (sw != null) {
+ sw.removeConnectedSwitch();
+ }
} else {
// A duplicate was disconnected on this ChannelHandler,
// this is the same switch reconnecting, but the original state was
diff --git a/openflow/ctl/src/main/java/org/onlab/onos/openflow/drivers/impl/OFSwitchImplCPqD13.java b/openflow/ctl/src/main/java/org/onlab/onos/openflow/drivers/impl/OFSwitchImplCPqD13.java
index c4c2e19..3d60dfa 100644
--- a/openflow/ctl/src/main/java/org/onlab/onos/openflow/drivers/impl/OFSwitchImplCPqD13.java
+++ b/openflow/ctl/src/main/java/org/onlab/onos/openflow/drivers/impl/OFSwitchImplCPqD13.java
@@ -1188,7 +1188,8 @@
.setHardTimeout(0)
.setXid(getNextTransactionId())
.build();
- sendMsg(tableMissEntry);
+
+ write(tableMissEntry);
}
private void sendBarrier(boolean finalBarrier) {
@@ -1200,7 +1201,8 @@
.buildBarrierRequest()
.setXid(xid)
.build();
- sendMsg(br);
+
+ write(br);
}
@Override
@@ -1210,7 +1212,7 @@
@Override
public void write(OFMessage msg) {
- this.channel.write(msg);
+ this.channel.write(Collections.singletonList(msg));
}
diff --git a/providers/lldp/src/main/java/org/onlab/onos/provider/lldp/impl/LinkDiscovery.java b/providers/lldp/src/main/java/org/onlab/onos/provider/lldp/impl/LinkDiscovery.java
index bf4fee0..e60ed90 100644
--- a/providers/lldp/src/main/java/org/onlab/onos/provider/lldp/impl/LinkDiscovery.java
+++ b/providers/lldp/src/main/java/org/onlab/onos/provider/lldp/impl/LinkDiscovery.java
@@ -90,7 +90,7 @@
* Instantiates discovery manager for the given physical switch. Creates a
* generic LLDP packet that will be customized for the port it is sent out on.
* Starts the the timer for the discovery process.
- * @param device the physical switch
+ * @param device the physical switch
* @param masterService
* @param useBDDP flag to also use BDDP for discovery
*/
@@ -217,7 +217,7 @@
final PortNumber srcPort = PortNumber.portNumber(onoslldp.getPort());
final DeviceId srcDeviceId = DeviceId.deviceId(onoslldp.getDeviceString());
final DeviceId dstDeviceId = context.inPacket().receivedFrom().deviceId();
- this.ackProbe(srcPort.toLong());
+ this.ackProbe(dstPort.toLong());
ConnectPoint src = new ConnectPoint(srcDeviceId, srcPort);
ConnectPoint dst = new ConnectPoint(dstDeviceId, dstPort);
@@ -245,7 +245,7 @@
*/
@Override
public void run(final Timeout t) {
- this.log.debug("sending probes");
+ this.log.trace("sending probes");
synchronized (this) {
final Iterator<Long> fastIterator = this.fastPorts.iterator();
Long portNumber;
@@ -256,7 +256,7 @@
.getAndIncrement();
if (probeCount < LinkDiscovery.MAX_PROBE_COUNT) {
- this.log.debug("sending fast probe to port");
+ this.log.trace("sending fast probe to port");
sendProbes(portNumber);
} else {
// Update fast and slow ports
@@ -278,7 +278,7 @@
Iterator<Long> slowIterator = this.slowPorts.iterator();
while (slowIterator.hasNext()) {
portNumber = slowIterator.next();
- this.log.debug("sending slow probe to port {}", portNumber);
+ this.log.trace("sending slow probe to port {}", portNumber);
sendProbes(portNumber);
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowEntryBuilder.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowEntryBuilder.java
index cfc3134..e04d87c 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowEntryBuilder.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowEntryBuilder.java
@@ -23,6 +23,8 @@
import org.projectfloodlight.openflow.protocol.OFFlowStatsEntry;
import org.projectfloodlight.openflow.protocol.OFInstructionType;
import org.projectfloodlight.openflow.protocol.action.OFAction;
+import org.projectfloodlight.openflow.protocol.action.OFActionCircuit;
+import org.projectfloodlight.openflow.protocol.action.OFActionExperimenter;
import org.projectfloodlight.openflow.protocol.action.OFActionOutput;
import org.projectfloodlight.openflow.protocol.action.OFActionSetDlDst;
import org.projectfloodlight.openflow.protocol.action.OFActionSetDlSrc;
@@ -34,6 +36,7 @@
import org.projectfloodlight.openflow.protocol.instruction.OFInstructionApplyActions;
import org.projectfloodlight.openflow.protocol.match.Match;
import org.projectfloodlight.openflow.protocol.match.MatchField;
+import org.projectfloodlight.openflow.protocol.oxm.OFOxmOchSigidBasic;
import org.projectfloodlight.openflow.types.IPv4Address;
import org.projectfloodlight.openflow.types.Masked;
import org.slf4j.Logger;
@@ -166,6 +169,15 @@
builder.setIpSrc(IpPrefix.valueOf(si.getInt()));
}
break;
+ case EXPERIMENTER:
+ OFActionExperimenter exp = (OFActionExperimenter) act;
+ if (exp.getExperimenter() == 0x80005A06) {
+ OFActionCircuit ct = (OFActionCircuit) exp;
+ builder.setLambda(((OFOxmOchSigidBasic) ct.getField()).getValue().getChannelNumber());
+ } else {
+ log.warn("Unsupported OFActionExperimenter {}", exp.getExperimenter());
+ }
+ break;
case SET_TP_DST:
case SET_TP_SRC:
case POP_MPLS:
@@ -188,7 +200,7 @@
case DEC_MPLS_TTL:
case DEC_NW_TTL:
case ENQUEUE:
- case EXPERIMENTER:
+
case GROUP:
default:
log.warn("Action type {} not yet implemented.", act.getType());
@@ -268,6 +280,10 @@
case TCP_SRC:
builder.matchTcpSrc((short) match.get(MatchField.TCP_SRC).getPort());
break;
+ case OCH_SIGID:
+ builder.matchLambda(match.get(MatchField.OCH_SIGID).getChannelNumber());
+ break;
+ case OCH_SIGTYPE_BASIC:
case ARP_OP:
case ARP_SHA:
case ARP_SPA:
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java
index aa50833..bb881d2 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java
@@ -14,6 +14,7 @@
import org.onlab.onos.net.flow.criteria.Criteria.EthTypeCriterion;
import org.onlab.onos.net.flow.criteria.Criteria.IPCriterion;
import org.onlab.onos.net.flow.criteria.Criteria.IPProtocolCriterion;
+import org.onlab.onos.net.flow.criteria.Criteria.LambdaCriterion;
import org.onlab.onos.net.flow.criteria.Criteria.PortCriterion;
import org.onlab.onos.net.flow.criteria.Criteria.TcpPortCriterion;
import org.onlab.onos.net.flow.criteria.Criteria.VlanIdCriterion;
@@ -21,6 +22,8 @@
import org.onlab.onos.net.flow.criteria.Criterion;
import org.onlab.onos.net.flow.instructions.Instruction;
import org.onlab.onos.net.flow.instructions.Instructions.OutputInstruction;
+import org.onlab.onos.net.flow.instructions.L0ModificationInstruction;
+import org.onlab.onos.net.flow.instructions.L0ModificationInstruction.ModLambdaInstruction;
import org.onlab.onos.net.flow.instructions.L2ModificationInstruction;
import org.onlab.onos.net.flow.instructions.L2ModificationInstruction.ModEtherInstruction;
import org.onlab.onos.net.flow.instructions.L2ModificationInstruction.ModVlanIdInstruction;
@@ -35,6 +38,7 @@
import org.projectfloodlight.openflow.protocol.action.OFAction;
import org.projectfloodlight.openflow.protocol.match.Match;
import org.projectfloodlight.openflow.protocol.match.MatchField;
+import org.projectfloodlight.openflow.types.CircuitSignalID;
import org.projectfloodlight.openflow.types.EthType;
import org.projectfloodlight.openflow.types.IPv4Address;
import org.projectfloodlight.openflow.types.IpProtocol;
@@ -137,6 +141,9 @@
case DROP:
log.warn("Saw drop action; assigning drop action");
return new LinkedList<>();
+ case L0MODIFICATION:
+ acts.add(buildL0Modification(i));
+ break; // must not fall through into the L2 builder
case L2MODIFICATION:
acts.add(buildL2Modification(i));
break;
@@ -157,6 +163,20 @@
return acts;
}
+ private OFAction buildL0Modification(Instruction i) {
+ L0ModificationInstruction l0m = (L0ModificationInstruction) i;
+ switch (l0m.subtype()) {
+ case LAMBDA:
+ ModLambdaInstruction ml = (ModLambdaInstruction) i;
+ return factory.actions().circuit(factory.oxms().ochSigidBasic(
+ new CircuitSignalID((byte) 1, (byte) 2, ml.lambda(), (short) 1)));
+ default:
+ log.warn("Unimplemented action type {}.", l0m.subtype());
+ break;
+ }
+ return null;
+ }
+
private OFAction buildL3Modification(Instruction i) {
L3ModificationInstruction l3m = (L3ModificationInstruction) i;
ModIPInstruction ip;
@@ -261,6 +281,11 @@
tp = (TcpPortCriterion) c;
mBuilder.setExact(MatchField.TCP_SRC, TransportPort.of(tp.tcpPort()));
break;
+ case OCH_SIGID:
+ LambdaCriterion lc = (LambdaCriterion) c;
+ mBuilder.setExact(MatchField.OCH_SIGID,
+ new CircuitSignalID((byte) 1, (byte) 2, lc.lambda(), (short) 1));
+ break;
case ARP_OP:
case ARP_SHA:
case ARP_SPA:
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
index 6fb54e8..8d3c018 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -10,7 +10,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -69,9 +69,11 @@
import org.slf4j.Logger;
import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ExecutionList;
+import com.google.common.util.concurrent.ListenableFuture;
/**
* Provider which uses an OpenFlow controller to detect network
@@ -97,6 +99,8 @@
private final InternalFlowProvider listener = new InternalFlowProvider();
+ // FIXME: This should be an expiring map so that futures which never
+ // complete are eventually evicted and can be garbage collected.
private final Map<Long, InstallationFuture> pendingFutures =
new ConcurrentHashMap<Long, InstallationFuture>();
@@ -169,7 +173,7 @@
}
@Override
- public Future<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
+ public ListenableFuture<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
final Set<Dpid> sws =
Collections.newSetFromMap(new ConcurrentHashMap<Dpid, Boolean>());
final Map<Long, FlowRuleBatchEntry> fmXids = new HashMap<Long, FlowRuleBatchEntry>();
@@ -330,18 +334,20 @@
}
- private class InstallationFuture implements Future<CompletedBatchOperation> {
+ private class InstallationFuture implements ListenableFuture<CompletedBatchOperation> {
private final Set<Dpid> sws;
private final AtomicBoolean ok = new AtomicBoolean(true);
private final Map<Long, FlowRuleBatchEntry> fms;
- private final List<FlowEntry> offendingFlowMods = Lists.newLinkedList();
+ private final Set<FlowEntry> offendingFlowMods = Sets.newHashSet();
private final CountDownLatch countDownLatch;
private Long pendingXid;
private BatchState state;
+ private final ExecutionList executionList = new ExecutionList();
+
public InstallationFuture(Set<Dpid> sws, Map<Long, FlowRuleBatchEntry> fmXids) {
this.state = BatchState.STARTED;
this.sws = sws;
@@ -350,6 +356,7 @@
}
public void fail(OFErrorMsg msg, Dpid dpid) {
+
ok.set(false);
removeRequirement(dpid);
FlowEntry fe = null;
@@ -422,6 +429,9 @@
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
+ if (isDone()) {
+ return false;
+ }
ok.set(false);
this.state = BatchState.CANCELLED;
cleanUp();
@@ -434,7 +444,8 @@
}
}
- return isCancelled();
+ invokeCallbacks();
+ return true;
}
@Override
@@ -444,14 +455,15 @@
@Override
public boolean isDone() {
- return this.state == BatchState.FINISHED;
+ return this.state == BatchState.FINISHED || isCancelled();
}
@Override
public CompletedBatchOperation get() throws InterruptedException, ExecutionException {
countDownLatch.await();
this.state = BatchState.FINISHED;
- return new CompletedBatchOperation(ok.get(), offendingFlowMods);
+ CompletedBatchOperation result = new CompletedBatchOperation(ok.get(), offendingFlowMods);
+ return result;
}
@Override
@@ -460,7 +472,8 @@
TimeoutException {
if (countDownLatch.await(timeout, unit)) {
this.state = BatchState.FINISHED;
- return new CompletedBatchOperation(ok.get(), offendingFlowMods);
+ CompletedBatchOperation result = new CompletedBatchOperation(ok.get(), offendingFlowMods);
+ return result;
}
throw new TimeoutException();
}
@@ -478,10 +491,21 @@
private void removeRequirement(Dpid dpid) {
countDownLatch.countDown();
+ if (countDownLatch.getCount() == 0) {
+ invokeCallbacks();
+ }
sws.remove(dpid);
cleanUp();
}
+ @Override
+ public void addListener(Runnable runnable, Executor executor) {
+ executionList.add(runnable, executor);
+ }
+
+ private void invokeCallbacks() {
+ executionList.execute();
+ }
}
}
diff --git a/utils/misc/src/main/java/org/onlab/packet/ChassisId.java b/utils/misc/src/main/java/org/onlab/packet/ChassisId.java
index 5b48e63..4555124 100644
--- a/utils/misc/src/main/java/org/onlab/packet/ChassisId.java
+++ b/utils/misc/src/main/java/org/onlab/packet/ChassisId.java
@@ -32,7 +32,7 @@
* @param value the value to use.
*/
public ChassisId(String value) {
- this.value = Long.valueOf(value, 16);
+ this.value = Long.parseLong(value, 16);
}
/**
diff --git a/utils/misc/src/main/java/org/onlab/packet/DHCP.java b/utils/misc/src/main/java/org/onlab/packet/DHCP.java
index 2a116b1..119faf9 100644
--- a/utils/misc/src/main/java/org/onlab/packet/DHCP.java
+++ b/utils/misc/src/main/java/org/onlab/packet/DHCP.java
@@ -379,7 +379,7 @@
// 300
int optionsLength = 0;
for (final DHCPOption option : this.options) {
- if (option.getCode() == 0 || option.getCode() == 255) {
+ if (option.getCode() == 0 || option.getCode() == ((byte) 255)) {
optionsLength += 1;
} else {
optionsLength += 2 + (0xff & option.getLength());
diff --git a/utils/misc/src/main/java/org/onlab/packet/IPv4.java b/utils/misc/src/main/java/org/onlab/packet/IPv4.java
index 4b9fd66..634ceff 100644
--- a/utils/misc/src/main/java/org/onlab/packet/IPv4.java
+++ b/utils/misc/src/main/java/org/onlab/packet/IPv4.java
@@ -438,7 +438,7 @@
int result = 0;
for (int i = 0; i < 4; ++i) {
- result |= Integer.valueOf(octets[i]) << (3 - i) * 8;
+ result |= Integer.parseInt(octets[i]) << (3 - i) * 8;
}
return result;
}
@@ -471,7 +471,7 @@
int result = 0;
for (int i = 0; i < 4; ++i) {
result = ipAddress >> (3 - i) * 8 & 0xff;
- sb.append(Integer.valueOf(result).toString());
+ sb.append(result);
if (i != 3) {
sb.append(".");
}
diff --git a/utils/misc/src/main/java/org/onlab/util/HexString.java b/utils/misc/src/main/java/org/onlab/util/HexString.java
index db12aa3..2b91d8e 100644
--- a/utils/misc/src/main/java/org/onlab/util/HexString.java
+++ b/utils/misc/src/main/java/org/onlab/util/HexString.java
@@ -14,7 +14,7 @@
*/
public static String toHexString(final byte[] bytes) {
int i;
- StringBuilder ret = new StringBuilder();
+ StringBuilder ret = new StringBuilder(Math.max(0, bytes.length * 3 - 1)); // 0 for empty input
String tmp;
for (i = 0; i < bytes.length; i++) {
if (i > 0) {
@@ -31,22 +31,22 @@
public static String toHexString(final long val, final int padTo) {
char[] arr = Long.toHexString(val).toCharArray();
- String ret = "";
+ StringBuilder ret = new StringBuilder(Math.max(0, padTo * 3 - 1)); // 0 when padTo <= 0
// prepend the right number of leading zeros
int i = 0;
for (; i < (padTo * 2 - arr.length); i++) {
- ret += "0";
+ ret.append('0');
if ((i % 2) != 0) {
- ret += ":";
+ ret.append(':');
}
}
for (int j = 0; j < arr.length; j++) {
- ret += arr[j];
+ ret.append(arr[j]);
if ((((i + j) % 2) != 0) && (j < (arr.length - 1))) {
- ret += ":";
+ ret.append(':');
}
}
- return ret;
+ return ret.toString();
}
public static String toHexString(final long val) {
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index 5ef1768..26d835d 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -163,6 +163,7 @@
handlers.putIfAbsent(type, handler);
}
+ @Override
public void unregisterHandler(String type) {
handlers.remove(type);
}
@@ -242,7 +243,7 @@
}
}
- private class WriteTask implements Runnable {
+ private static class WriteTask implements Runnable {
private final InternalMessage message;
private final Channel channel;