stats are being sent; now handle them
diff --git a/features/features.xml b/features/features.xml
index 205d78e..6d981a8 100644
--- a/features/features.xml
+++ b/features/features.xml
@@ -7,6 +7,8 @@
description="ONOS 3rd party dependencies">
<bundle>mvn:commons-lang/commons-lang/2.6</bundle>
<bundle>mvn:com.google.guava/guava/18.0</bundle>
+
+ <bundle>mvn:io.netty/netty/3.9.2.Final</bundle>
</feature>
<feature name="onos-thirdparty-web" version="1.0.0"
@@ -18,6 +20,7 @@
<bundle>mvn:com.sun.jersey/jersey-core/1.18.1</bundle>
<bundle>mvn:com.sun.jersey/jersey-server/1.18.1</bundle>
<bundle>mvn:com.sun.jersey/jersey-servlet/1.18.1</bundle>
+
</feature>
<feature name="onos-api" version="1.0.0"
@@ -60,7 +63,6 @@
<feature name="onos-openflow" version="1.0.0"
description="ONOS OpenFlow API, Controller & Providers">
<feature>onos-api</feature>
- <bundle>mvn:io.netty/netty/3.9.2.Final</bundle>
<bundle>mvn:org.onlab.onos/onos-of-api/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onos-of-ctl/1.0.0-SNAPSHOT</bundle>
diff --git a/openflow/api/src/main/java/org/onlab/onos/openflow/controller/OpenFlowController.java b/openflow/api/src/main/java/org/onlab/onos/openflow/controller/OpenFlowController.java
index 6b2ca9c..b7e98c3 100644
--- a/openflow/api/src/main/java/org/onlab/onos/openflow/controller/OpenFlowController.java
+++ b/openflow/api/src/main/java/org/onlab/onos/openflow/controller/OpenFlowController.java
@@ -78,6 +78,20 @@
public void removePacketListener(PacketListener listener);
/**
+ * Register a listener for OF msg events.
+ *
+ * @param listener the listener to notify
+ */
+ public void addEventListener(OpenFlowEventListener listener);
+
+ /**
+ * Unregister a listener.
+ *
+ * @param listener the listener to unregister
+ */
+ public void removeEventListener(OpenFlowEventListener listener);
+
+ /**
* Send a message to a particular switch.
* @param dpid the switch to send to.
* @param msg the message to send
diff --git a/openflow/api/src/main/java/org/onlab/onos/openflow/controller/OpenFlowEventListener.java b/openflow/api/src/main/java/org/onlab/onos/openflow/controller/OpenFlowEventListener.java
new file mode 100644
index 0000000..3fbacc7
--- /dev/null
+++ b/openflow/api/src/main/java/org/onlab/onos/openflow/controller/OpenFlowEventListener.java
@@ -0,0 +1,17 @@
+package org.onlab.onos.openflow.controller;
+
+import org.projectfloodlight.openflow.protocol.OFMessage;
+
+
+/**
+ * Notifies providers about openflow msg events.
+ */
+public interface OpenFlowEventListener {
+
+ /**
+ * Handles the message event.
+ *
+ * @param msg the message
+ */
+ public void handleMessage(Dpid dpid, OFMessage msg);
+}
diff --git a/openflow/api/src/test/java/org/onlab/onos/openflow/controller/OpenflowControllerAdapter.java b/openflow/api/src/test/java/org/onlab/onos/openflow/controller/OpenflowControllerAdapter.java
index 48484c3..7c6b9da 100644
--- a/openflow/api/src/test/java/org/onlab/onos/openflow/controller/OpenflowControllerAdapter.java
+++ b/openflow/api/src/test/java/org/onlab/onos/openflow/controller/OpenflowControllerAdapter.java
@@ -63,4 +63,16 @@
@Override
public void setRole(Dpid dpid, RoleState role) {
}
+
+ @Override
+ public void addEventListener(OpenFlowEventListener listener) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void removeEventListener(OpenFlowEventListener listener) {
+ // TODO Auto-generated method stub
+
+ }
}
diff --git a/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OpenFlowControllerImpl.java b/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OpenFlowControllerImpl.java
index 32bfcde..d74c497 100644
--- a/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OpenFlowControllerImpl.java
+++ b/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OpenFlowControllerImpl.java
@@ -3,8 +3,6 @@
import static org.onlab.util.Tools.namedThreads;
import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -16,10 +14,10 @@
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
-import org.onlab.onos.of.controller.OpenFlowEventListener;
import org.onlab.onos.openflow.controller.DefaultOpenFlowPacketContext;
import org.onlab.onos.openflow.controller.Dpid;
import org.onlab.onos.openflow.controller.OpenFlowController;
+import org.onlab.onos.openflow.controller.OpenFlowEventListener;
import org.onlab.onos.openflow.controller.OpenFlowPacketContext;
import org.onlab.onos.openflow.controller.OpenFlowSwitch;
import org.onlab.onos.openflow.controller.OpenFlowSwitchListener;
@@ -29,13 +27,12 @@
import org.projectfloodlight.openflow.protocol.OFMessage;
import org.projectfloodlight.openflow.protocol.OFPacketIn;
import org.projectfloodlight.openflow.protocol.OFPortStatus;
-import org.projectfloodlight.openflow.protocol.OFType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
@Component(immediate = true)
@Service
@@ -45,7 +42,7 @@
LoggerFactory.getLogger(OpenFlowControllerImpl.class);
private final ExecutorService executor = Executors.newFixedThreadPool(16,
- namedThreads("of-event-dispatch-%d"));
+ namedThreads("of-event-%d"));
protected ConcurrentHashMap<Dpid, OpenFlowSwitch> connectedSwitches =
@@ -61,7 +58,7 @@
protected Multimap<Integer, PacketListener> ofPacketListener =
ArrayListMultimap.create();
- protected Map<OFType, List<OpenFlowEventListener>> ofEventListener = Maps.newHashMap();
+ protected Set<OpenFlowEventListener> ofEventListener = Sets.newHashSet();
private final Controller ctrl = new Controller();
@@ -128,6 +125,16 @@
}
@Override
+ public void addEventListener(OpenFlowEventListener listener) {
+ ofEventListener.add(listener);
+ }
+
+ @Override
+ public void removeEventListener(OpenFlowEventListener listener) {
+ ofEventListener.remove(listener);
+ }
+
+ @Override
public void write(Dpid dpid, OFMessage msg) {
this.getSwitch(dpid).sendMsg(msg);
}
@@ -320,14 +327,11 @@
@Override
public void run() {
- List<OpenFlowEventListener> listeners =
- ofEventListener.get(OFType.FLOW_REMOVED);
- for (OpenFlowEventListener listener : listeners) {
+ for (OpenFlowEventListener listener : ofEventListener) {
listener.handleMessage(dpid, msg);
}
}
}
-
}
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java
new file mode 100644
index 0000000..ecdd17c
--- /dev/null
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java
@@ -0,0 +1,243 @@
+package org.onlab.onos.provider.of.flow.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.onlab.onos.net.flow.FlowRule;
+import org.onlab.onos.net.flow.TrafficSelector;
+import org.onlab.onos.net.flow.TrafficTreatment;
+import org.onlab.onos.net.flow.criteria.Criteria.EthCriterion;
+import org.onlab.onos.net.flow.criteria.Criteria.EthTypeCriterion;
+import org.onlab.onos.net.flow.criteria.Criteria.IPCriterion;
+import org.onlab.onos.net.flow.criteria.Criteria.IPProtocolCriterion;
+import org.onlab.onos.net.flow.criteria.Criteria.PortCriterion;
+import org.onlab.onos.net.flow.criteria.Criteria.VlanIdCriterion;
+import org.onlab.onos.net.flow.criteria.Criteria.VlanPcpCriterion;
+import org.onlab.onos.net.flow.criteria.Criterion;
+import org.onlab.onos.net.flow.instructions.Instruction;
+import org.onlab.onos.net.flow.instructions.Instructions.OutputInstruction;
+import org.onlab.onos.net.flow.instructions.L2ModificationInstruction;
+import org.onlab.onos.net.flow.instructions.L2ModificationInstruction.ModEtherInstruction;
+import org.onlab.onos.net.flow.instructions.L2ModificationInstruction.ModVlanIdInstruction;
+import org.onlab.onos.net.flow.instructions.L2ModificationInstruction.ModVlanPcpInstruction;
+import org.onlab.onos.net.flow.instructions.L3ModificationInstruction;
+import org.onlab.onos.net.flow.instructions.L3ModificationInstruction.ModIPInstruction;
+import org.projectfloodlight.openflow.protocol.OFFactory;
+import org.projectfloodlight.openflow.protocol.OFFlowMod;
+import org.projectfloodlight.openflow.protocol.OFFlowModFlags;
+import org.projectfloodlight.openflow.protocol.action.OFAction;
+import org.projectfloodlight.openflow.protocol.match.Match;
+import org.projectfloodlight.openflow.protocol.match.MatchField;
+import org.projectfloodlight.openflow.types.EthType;
+import org.projectfloodlight.openflow.types.IPv4Address;
+import org.projectfloodlight.openflow.types.IpProtocol;
+import org.projectfloodlight.openflow.types.MacAddress;
+import org.projectfloodlight.openflow.types.Masked;
+import org.projectfloodlight.openflow.types.OFBufferId;
+import org.projectfloodlight.openflow.types.OFPort;
+import org.projectfloodlight.openflow.types.OFVlanVidMatch;
+import org.projectfloodlight.openflow.types.VlanPcp;
+import org.projectfloodlight.openflow.types.VlanVid;
+import org.slf4j.Logger;
+
+
+public class FlowModBuilder {
+
+ private final Logger log = getLogger(getClass());
+
+ private final OFFactory factory;
+ private final TrafficTreatment treatment;
+ private final TrafficSelector selector;
+
+ private final int priority;
+
+
+
+ public FlowModBuilder(FlowRule flowRule, OFFactory factory) {
+ this.factory = factory;
+ this.treatment = flowRule.treatment();
+ this.selector = flowRule.selector();
+ this.priority = flowRule.priority();
+ }
+
+ public OFFlowMod buildFlowMod() {
+ Match match = buildMatch();
+ List<OFAction> actions = buildActions();
+
+ //TODO: what to do without bufferid? do we assume that there will be a pktout as well?
+ OFFlowMod fm = factory.buildFlowModify()
+ .setBufferId(OFBufferId.NO_BUFFER)
+ .setActions(actions)
+ .setMatch(match)
+ .setFlags(Collections.singleton(OFFlowModFlags.SEND_FLOW_REM))
+ .setIdleTimeout(10)
+ .setHardTimeout(10)
+ .setPriority(priority)
+ .build();
+
+ return fm;
+
+ }
+
+ private List<OFAction> buildActions() {
+ List<OFAction> acts = new LinkedList<>();
+ for (Instruction i : treatment.instructions()) {
+ switch (i.type()) {
+ case DROP:
+ log.warn("Saw drop action; assigning drop action");
+ return new LinkedList<>();
+ case L2MODIFICATION:
+ acts.add(buildL2Modification(i));
+ case L3MODIFICATION:
+ acts.add(buildL3Modification(i));
+ case OUTPUT:
+ OutputInstruction out = (OutputInstruction) i;
+ acts.add(factory.actions().buildOutput().setPort(
+ OFPort.of((int) out.port().toLong())).build());
+ break;
+ case GROUP:
+ default:
+ log.warn("Instruction type {} not yet implemented.", i.type());
+ }
+ }
+
+ return acts;
+ }
+
+ private OFAction buildL3Modification(Instruction i) {
+ L3ModificationInstruction l3m = (L3ModificationInstruction) i;
+ ModIPInstruction ip;
+ switch (l3m.subtype()) {
+ case L3_DST:
+ ip = (ModIPInstruction) i;
+ return factory.actions().setNwDst(IPv4Address.of(ip.ip().toInt()));
+ case L3_SRC:
+ ip = (ModIPInstruction) i;
+ return factory.actions().setNwSrc(IPv4Address.of(ip.ip().toInt()));
+ default:
+ log.warn("Unimplemented action type {}.", l3m.subtype());
+ break;
+ }
+ return null;
+ }
+
+ private OFAction buildL2Modification(Instruction i) {
+ L2ModificationInstruction l2m = (L2ModificationInstruction) i;
+ ModEtherInstruction eth;
+ switch (l2m.subtype()) {
+ case L2_DST:
+ eth = (ModEtherInstruction) l2m;
+ return factory.actions().setDlDst(MacAddress.of(eth.mac().toLong()));
+ case L2_SRC:
+ eth = (ModEtherInstruction) l2m;
+ return factory.actions().setDlSrc(MacAddress.of(eth.mac().toLong()));
+ case VLAN_ID:
+ ModVlanIdInstruction vlanId = (ModVlanIdInstruction) l2m;
+ return factory.actions().setVlanVid(VlanVid.ofVlan(vlanId.vlanId.toShort()));
+ case VLAN_PCP:
+ ModVlanPcpInstruction vlanPcp = (ModVlanPcpInstruction) l2m;
+ return factory.actions().setVlanPcp(VlanPcp.of(vlanPcp.vlanPcp()));
+ default:
+ log.warn("Unimplemented action type {}.", l2m.subtype());
+ break;
+ }
+ return null;
+ }
+
+ private Match buildMatch() {
+ Match.Builder mBuilder = factory.buildMatch();
+ EthCriterion eth;
+ IPCriterion ip;
+ for (Criterion c : selector.criteria()) {
+ switch (c.type()) {
+ case IN_PORT:
+ PortCriterion inport = (PortCriterion) c;
+ mBuilder.setExact(MatchField.IN_PORT, OFPort.of((int) inport.port().toLong()));
+ break;
+ case ETH_SRC:
+ eth = (EthCriterion) c;
+ mBuilder.setExact(MatchField.ETH_SRC, MacAddress.of(eth.mac().toLong()));
+ break;
+ case ETH_DST:
+ eth = (EthCriterion) c;
+ mBuilder.setExact(MatchField.ETH_DST, MacAddress.of(eth.mac().toLong()));
+ break;
+ case ETH_TYPE:
+ EthTypeCriterion ethType = (EthTypeCriterion) c;
+ mBuilder.setExact(MatchField.ETH_TYPE, EthType.of(ethType.ethType()));
+ break;
+ case IPV4_DST:
+ ip = (IPCriterion) c;
+ if (ip.ip().isMasked()) {
+ Masked<IPv4Address> maskedIp = Masked.of(IPv4Address.of(ip.ip().toInt()),
+ IPv4Address.of(ip.ip().netmask().toInt()));
+ mBuilder.setMasked(MatchField.IPV4_DST, maskedIp);
+ } else {
+ mBuilder.setExact(MatchField.IPV4_DST, IPv4Address.of(ip.ip().toInt()));
+ }
+ break;
+ case IPV4_SRC:
+ ip = (IPCriterion) c;
+ if (ip.ip().isMasked()) {
+ Masked<IPv4Address> maskedIp = Masked.of(IPv4Address.of(ip.ip().toInt()),
+ IPv4Address.of(ip.ip().netmask().toInt()));
+ mBuilder.setMasked(MatchField.IPV4_SRC, maskedIp);
+ } else {
+ mBuilder.setExact(MatchField.IPV4_SRC, IPv4Address.of(ip.ip().toInt()));
+ }
+ break;
+ case IP_PROTO:
+ IPProtocolCriterion p = (IPProtocolCriterion) c;
+ mBuilder.setExact(MatchField.IP_PROTO, IpProtocol.of(p.protocol()));
+ break;
+ case VLAN_PCP:
+ VlanPcpCriterion vpcp = (VlanPcpCriterion) c;
+ mBuilder.setExact(MatchField.VLAN_PCP, VlanPcp.of(vpcp.priority()));
+ break;
+ case VLAN_VID:
+ VlanIdCriterion vid = (VlanIdCriterion) c;
+ mBuilder.setExact(MatchField.VLAN_VID,
+ OFVlanVidMatch.ofVlanVid(VlanVid.ofVlan(vid.vlanId().toShort())));
+ break;
+ case ARP_OP:
+ case ARP_SHA:
+ case ARP_SPA:
+ case ARP_THA:
+ case ARP_TPA:
+ case ICMPV4_CODE:
+ case ICMPV4_TYPE:
+ case ICMPV6_CODE:
+ case ICMPV6_TYPE:
+ case IN_PHY_PORT:
+ case IPV6_DST:
+ case IPV6_EXTHDR:
+ case IPV6_FLABEL:
+ case IPV6_ND_SLL:
+ case IPV6_ND_TARGET:
+ case IPV6_ND_TLL:
+ case IPV6_SRC:
+ case IP_DSCP:
+ case IP_ECN:
+ case METADATA:
+ case MPLS_BOS:
+ case MPLS_LABEL:
+ case MPLS_TC:
+ case PBB_ISID:
+ case SCTP_DST:
+ case SCTP_SRC:
+ case TCP_DST:
+ case TCP_SRC:
+ case TUNNEL_ID:
+ case UDP_DST:
+ case UDP_SRC:
+ default:
+ log.warn("Match type {} not yet implemented.", c.type());
+ }
+ }
+ return mBuilder.build();
+ }
+
+}
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowStatsCollector.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowStatsCollector.java
new file mode 100644
index 0000000..6c2997f
--- /dev/null
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowStatsCollector.java
@@ -0,0 +1,78 @@
+package org.onlab.onos.provider.of.flow.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.concurrent.TimeUnit;
+
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timeout;
+import org.jboss.netty.util.TimerTask;
+import org.onlab.onos.openflow.controller.OpenFlowSwitch;
+import org.onlab.util.Timer;
+import org.projectfloodlight.openflow.protocol.OFFlowStatsRequest;
+import org.projectfloodlight.openflow.types.OFPort;
+import org.projectfloodlight.openflow.types.TableId;
+import org.slf4j.Logger;
+
+public class FlowStatsCollector implements TimerTask {
+
+ private final Logger log = getLogger(getClass());
+
+ private final HashedWheelTimer timer = Timer.getTimer();
+ private final OpenFlowSwitch sw;
+ private final int refreshInterval;
+
+ private Timeout timeout;
+
+ private boolean stopTimer = false;;
+
+ public FlowStatsCollector(OpenFlowSwitch sw, int refreshInterval) {
+ this.sw = sw;
+ this.refreshInterval = refreshInterval;
+ }
+
+ @Override
+ public void run(Timeout timeout) throws Exception {
+ log.debug("Collecting stats for {}", this.sw.getStringId());
+
+ sendFlowStatistics();
+
+ if (!this.stopTimer) {
+ log.debug("Scheduling stats collection in {} seconds for {}",
+ this.refreshInterval, this.sw.getStringId());
+ timeout.getTimer().newTimeout(this, refreshInterval,
+ TimeUnit.SECONDS);
+ }
+
+
+ }
+
+ private void sendFlowStatistics() {
+ OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()
+ .setMatch(sw.factory().matchWildcardAll())
+ .setTableId(TableId.ALL)
+ .setOutPort(OFPort.NO_MASK)
+ .build();
+
+ this.sw.sendMsg(request);
+
+ }
+
+ public void start() {
+
+ /*
+ * Initially start polling quickly. Then drop down to configured value
+ */
+ log.info("Starting Stats collection thread for {}",
+ this.sw.getStringId());
+ timeout = timer.newTimeout(this, 1, TimeUnit.SECONDS);
+ }
+
+ public void stop() {
+ log.info("Stopping Stats collection thread for {}",
+ this.sw.getStringId());
+ this.stopTimer = true;
+ timeout.cancel();
+ }
+
+}
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
index 9a85d69..9a5a9dd 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -2,6 +2,8 @@
import static org.slf4j.LoggerFactory.getLogger;
+import java.util.Map;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -17,9 +19,9 @@
import org.onlab.onos.net.provider.AbstractProvider;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.net.topology.TopologyService;
-import org.onlab.onos.of.controller.OpenFlowEventListener;
import org.onlab.onos.openflow.controller.Dpid;
import org.onlab.onos.openflow.controller.OpenFlowController;
+import org.onlab.onos.openflow.controller.OpenFlowEventListener;
import org.onlab.onos.openflow.controller.OpenFlowSwitch;
import org.onlab.onos.openflow.controller.OpenFlowSwitchListener;
import org.projectfloodlight.openflow.protocol.OFFlowRemoved;
@@ -27,6 +29,8 @@
import org.projectfloodlight.openflow.protocol.OFPortStatus;
import org.slf4j.Logger;
+import com.google.common.collect.Maps;
+
/**
* Provider which uses an OpenFlow controller to detect network
* end-station hosts.
@@ -47,6 +51,8 @@
private FlowRuleProviderService providerService;
+ private final InternalFlowProvider listener = new InternalFlowProvider();
+
/**
* Creates an OpenFlow host provider.
*/
@@ -57,6 +63,8 @@
@Activate
public void activate() {
providerService = providerRegistry.register(this);
+ controller.addListener(listener);
+ controller.addEventListener(listener);
log.info("Started");
}
@@ -99,16 +107,18 @@
private class InternalFlowProvider
implements OpenFlowSwitchListener, OpenFlowEventListener {
+ private final Map<Dpid, FlowStatsCollector> collectors = Maps.newHashMap();
@Override
public void switchAdded(Dpid dpid) {
-
-
+ FlowStatsCollector fsc = new FlowStatsCollector(controller.getSwitch(dpid), 1);
+ fsc.start();
+ collectors.put(dpid, fsc);
}
@Override
public void switchRemoved(Dpid dpid) {
-
+ collectors.remove(dpid).stop();
}
@@ -121,11 +131,13 @@
public void handleMessage(Dpid dpid, OFMessage msg) {
switch (msg.getType()) {
case FLOW_REMOVED:
+ //TODO: make this better
OFFlowRemoved removed = (OFFlowRemoved) msg;
FlowRule fr = new DefaultFlowRule(DeviceId.deviceId(Dpid.uri(dpid)), null, null);
providerService.flowRemoved(fr);
break;
case STATS_REPLY:
+ break;
case BARRIER_REPLY:
case ERROR:
default: