Merge branch 'ONOS' into syncdev (adopt latest one)
Conflicts:
src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
src/main/java/net/onrc/onos/datagrid/IDatagridService.java
src/main/java/net/onrc/onos/ofcontroller/proxyarp/IArpEventHandler.java
src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java
diff --git a/conf/onos-embedded.properties b/conf/onos-embedded.properties
index 777ff2b..e280e41 100644
--- a/conf/onos-embedded.properties
+++ b/conf/onos-embedded.properties
@@ -9,6 +9,7 @@
net.floodlightcontroller.ui.web.StaticWebRoutable,\
net.onrc.onos.datagrid.HazelcastDatagrid,\
net.onrc.onos.ofcontroller.flowmanager.FlowManager,\
+net.onrc.onos.ofcontroller.flowprogrammer.FlowProgrammer,\
net.onrc.onos.ofcontroller.topology.TopologyManager,\
net.onrc.onos.registry.controller.ZookeeperRegistry
net.floodlightcontroller.restserver.RestApiServer.port = 8080
diff --git a/conf/onos.properties b/conf/onos.properties
index e858adb..f7bffb2 100644
--- a/conf/onos.properties
+++ b/conf/onos.properties
@@ -9,6 +9,7 @@
net.floodlightcontroller.ui.web.StaticWebRoutable,\
net.onrc.onos.datagrid.HazelcastDatagrid,\
net.onrc.onos.ofcontroller.flowmanager.FlowManager,\
+net.onrc.onos.ofcontroller.flowprogrammer.FlowProgrammer,\
net.onrc.onos.ofcontroller.topology.TopologyManager,\
net.onrc.onos.registry.controller.ZookeeperRegistry
net.floodlightcontroller.restserver.RestApiServer.port = 8080
diff --git a/src/main/java/net/onrc/onos/flow/FlowManagerImpl.java b/src/main/java/net/onrc/onos/flow/FlowManagerImpl.java
index 097ddb7..9865deb 100644
--- a/src/main/java/net/onrc/onos/flow/FlowManagerImpl.java
+++ b/src/main/java/net/onrc/onos/flow/FlowManagerImpl.java
@@ -274,6 +274,7 @@
flowEntryAction.setActionOutput(flowEntry.outPort());
flowEntryActions.addAction(flowEntryAction);
dataPath.flowEntries().add(flowEntry);
+ // TODO (BOC): why is this twice?
dataPath.flowEntries().add(flowEntry);
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
index 926788f..f623541 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
@@ -888,7 +888,7 @@
* @param flowEntryObj the object to extract the Flow Entry State from.
* @return the extracted Flow Entry State.
*/
- private static FlowEntry extractFlowEntry(IFlowEntry flowEntryObj) {
+ public static FlowEntry extractFlowEntry(IFlowEntry flowEntryObj) {
String flowEntryIdStr = flowEntryObj.getFlowEntryId();
String switchDpidStr = flowEntryObj.getSwitchDpid();
String userState = flowEntryObj.getUserState();
@@ -898,7 +898,7 @@
(switchDpidStr == null) ||
(userState == null) ||
(switchState == null)) {
- // TODO: A work-around, becauuse of some bogus database objects
+ // TODO: A work-around, because of some bogus database objects
return null;
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
index c8e10c6..0927e49 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
@@ -19,7 +19,6 @@
import net.floodlightcontroller.core.module.IFloodlightService;
import net.floodlightcontroller.restserver.IRestApiService;
import net.floodlightcontroller.util.OFMessageDamper;
-
import net.onrc.onos.datagrid.IDatagridService;
import net.onrc.onos.graph.GraphDBOperation;
import net.onrc.onos.ofcontroller.core.INetMapStorage;
@@ -27,9 +26,9 @@
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
import net.onrc.onos.ofcontroller.floodlightlistener.INetworkGraphService;
import net.onrc.onos.ofcontroller.flowmanager.web.FlowWebRoutable;
+import net.onrc.onos.ofcontroller.flowprogrammer.IFlowPusherService;
import net.onrc.onos.ofcontroller.topology.ITopologyNetService;
import net.onrc.onos.ofcontroller.topology.Topology;
-import net.onrc.onos.ofcontroller.topology.TopologyElement;
import net.onrc.onos.ofcontroller.util.*;
import org.openflow.protocol.OFType;
@@ -46,6 +45,9 @@
// notification mechanism for the Flow Manager.
//
private final static boolean enableNotifications = false;
+
+ // flag to use FlowPusher instead of FlowSwitchOperation/MessageDamper
+ private final static boolean enableFlowPusher = false;
protected GraphDBOperation dbHandlerApi;
protected GraphDBOperation dbHandlerInner;
@@ -57,8 +59,10 @@
protected FloodlightModuleContext context;
protected FlowEventHandler flowEventHandler;
+ protected IFlowPusherService pusher;
+
protected OFMessageDamper messageDamper;
-
+
//
// TODO: Values copied from elsewhere (class LearningSwitch).
// The local copy should go away!
@@ -122,6 +126,8 @@
Iterable<IFlowEntry> allFlowEntries =
dbHandlerInner.getAllSwitchNotUpdatedFlowEntries();
for (IFlowEntry flowEntryObj : allFlowEntries) {
+ log.debug("flowEntryobj : {}", flowEntryObj);
+
counterAllFlowEntries++;
String dpidStr = flowEntryObj.getSwitchDpid();
@@ -156,6 +162,8 @@
}
counterMyNotUpdatedFlowEntries++;
}
+
+ log.debug("addFlowEntries : {}", addFlowEntries);
//
// Process the Flow Entries that need to be added
@@ -458,9 +466,13 @@
datagridService = context.getServiceImpl(IDatagridService.class);
restApi = context.getServiceImpl(IRestApiService.class);
- messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
- EnumSet.of(OFType.FLOW_MOD),
- OFMESSAGE_DAMPER_TIMEOUT);
+ if (enableFlowPusher) {
+ pusher = context.getServiceImpl(IFlowPusherService.class);
+ } else {
+ messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
+ EnumSet.of(OFType.FLOW_MOD),
+ OFMESSAGE_DAMPER_TIMEOUT);
+ }
this.init("");
@@ -503,7 +515,7 @@
// Initialize the Flow Entry ID generator
nextFlowEntryIdPrefix = randomGenerator.nextInt();
-
+
//
// Create the Flow Event Handler thread and register it with the
// Datagrid Service
@@ -747,7 +759,7 @@
/**
* Reconcile a flow.
*
- * @param flowObj the flow that needs to be reconciliated.
+ * @param flowObj the flow that needs to be reconciled.
* @param newDataPath the new data path to use.
* @return true on success, otherwise false.
*/
@@ -835,9 +847,13 @@
*/
private boolean installFlowEntry(IOFSwitch mySwitch, IFlowPath flowObj,
IFlowEntry flowEntryObj) {
- return FlowSwitchOperation.installFlowEntry(
- floodlightProvider.getOFMessageFactory(),
- messageDamper, mySwitch, flowObj, flowEntryObj);
+ if (enableFlowPusher) {
+ return pusher.add(mySwitch, flowObj, flowEntryObj);
+ } else {
+ return FlowSwitchOperation.installFlowEntry(
+ floodlightProvider.getOFMessageFactory(),
+ messageDamper, mySwitch, flowObj, flowEntryObj);
+ }
}
/**
@@ -850,9 +866,13 @@
*/
private boolean installFlowEntry(IOFSwitch mySwitch, FlowPath flowPath,
FlowEntry flowEntry) {
- return FlowSwitchOperation.installFlowEntry(
- floodlightProvider.getOFMessageFactory(),
- messageDamper, mySwitch, flowPath, flowEntry);
+ if (enableFlowPusher) {
+ return pusher.add(mySwitch, flowPath, flowEntry);
+ } else {
+ return FlowSwitchOperation.installFlowEntry(
+ floodlightProvider.getOFMessageFactory(),
+ messageDamper, mySwitch, flowPath, flowEntry);
+ }
}
/**
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowSwitchOperation.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowSwitchOperation.java
index 9741b04..8bed120 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowSwitchOperation.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowSwitchOperation.java
@@ -36,11 +36,14 @@
public static final short FLOWMOD_DEFAULT_IDLE_TIMEOUT = 0; // infinity
public static final short FLOWMOD_DEFAULT_HARD_TIMEOUT = 0; // infinite
+ // TODO add Pusher instance member
+ //
+
/**
* Install a Flow Entry on a switch.
*
* @param messageFactory the OpenFlow message factory to use.
- * @maram messageDamper the OpenFlow message damper to use.
+ * @param messageDamper the OpenFlow message damper to use.
* @param mySwitch the switch to install the Flow Entry into.
* @param flowObj the flow path object for the flow entry to install.
* @param flowEntryObj the flow entry object to install.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
new file mode 100644
index 0000000..a59a9f9
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
@@ -0,0 +1,81 @@
+package net.onrc.onos.ofcontroller.flowprogrammer;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import net.floodlightcontroller.core.IFloodlightProviderService;
+import net.floodlightcontroller.core.module.FloodlightModuleContext;
+import net.floodlightcontroller.core.module.FloodlightModuleException;
+import net.floodlightcontroller.core.module.IFloodlightModule;
+import net.floodlightcontroller.core.module.IFloodlightService;
+
+public class FlowProgrammer implements IFloodlightModule {
+ private static final boolean enableFlowSync = false;
+
+ protected volatile IFloodlightProviderService floodlightProvider;
+
+ protected FlowPusher pusher;
+ private static final int NUM_PUSHER_THREAD = 1;
+
+ protected FlowSynchronizer synchronizer;
+
+ public FlowProgrammer() {
+ pusher = new FlowPusher(NUM_PUSHER_THREAD);
+ if (enableFlowSync) {
+ synchronizer = new FlowSynchronizer();
+ }
+ }
+
+ @Override
+ public void init(FloodlightModuleContext context)
+ throws FloodlightModuleException {
+ floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
+ pusher.init(null, floodlightProvider.getOFMessageFactory(), null);
+ if (enableFlowSync) {
+ synchronizer.init(context);
+ }
+ }
+
+ @Override
+ public void startUp(FloodlightModuleContext context) {
+ pusher.start();
+ if (enableFlowSync) {
+ synchronizer.startUp(context);
+ }
+ }
+
+ @Override
+ public Collection<Class<? extends IFloodlightService>> getModuleServices() {
+ Collection<Class<? extends IFloodlightService>> l =
+ new ArrayList<Class<? extends IFloodlightService>>();
+ l.add(IFlowPusherService.class);
+ if (enableFlowSync) {
+ l.add(IFlowSyncService.class);
+ }
+ return l;
+ }
+
+ @Override
+ public Map<Class<? extends IFloodlightService>, IFloodlightService> getServiceImpls() {
+ Map<Class<? extends IFloodlightService>,
+ IFloodlightService> m =
+ new HashMap<Class<? extends IFloodlightService>,
+ IFloodlightService>();
+ m.put(IFlowPusherService.class, pusher);
+ if (enableFlowSync) {
+ m.put(IFlowSyncService.class, synchronizer);
+ }
+ return m;
+ }
+
+ @Override
+ public Collection<Class<? extends IFloodlightService>> getModuleDependencies() {
+ Collection<Class<? extends IFloodlightService>> l =
+ new ArrayList<Class<? extends IFloodlightService>>();
+ l.add(IFloodlightProviderService.class);
+ return l;
+ }
+
+
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
new file mode 100644
index 0000000..532477a
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
@@ -0,0 +1,1006 @@
+package net.onrc.onos.ofcontroller.flowprogrammer;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.openflow.protocol.*;
+import org.openflow.protocol.action.*;
+import org.openflow.protocol.factory.BasicFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.floodlightcontroller.core.FloodlightContext;
+import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.util.MACAddress;
+import net.floodlightcontroller.util.OFMessageDamper;
+import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
+import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
+import net.onrc.onos.ofcontroller.util.FlowEntryAction;
+import net.onrc.onos.ofcontroller.util.FlowEntryAction.*;
+import net.onrc.onos.ofcontroller.util.FlowEntry;
+import net.onrc.onos.ofcontroller.util.FlowEntryActions;
+import net.onrc.onos.ofcontroller.util.FlowEntryId;
+import net.onrc.onos.ofcontroller.util.FlowEntryMatch;
+import net.onrc.onos.ofcontroller.util.FlowEntryUserState;
+import net.onrc.onos.ofcontroller.util.FlowPath;
+import net.onrc.onos.ofcontroller.util.IPv4Net;
+import net.onrc.onos.ofcontroller.util.Port;
+
+/**
+ * FlowPusher intermediates FlowManager/FlowSynchronizer and switches to push OpenFlow
+ * messages to switches in proper rate.
+ * @author Naoki Shiota
+ *
+ */
+public class FlowPusher implements IFlowPusherService {
+ private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
+
+ // NOTE: Below are moved from FlowManager.
+ // TODO: Values copied from elsewhere (class LearningSwitch).
+ // The local copy should go away!
+ //
+ protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
+ protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
+
+ // Interval of sleep when queue is empty
+ protected static final long SLEEP_MILLI_SEC = 10;
+ protected static final int SLEEP_NANO_SEC = 0;
+
+ // Number of messages sent to switch at once
+ protected static final int MAX_MESSAGE_SEND = 100;
+
+ public static final short PRIORITY_DEFAULT = 100;
+ public static final short FLOWMOD_DEFAULT_IDLE_TIMEOUT = 0; // infinity
+ public static final short FLOWMOD_DEFAULT_HARD_TIMEOUT = 0; // infinite
+
+ public enum QueueState {
+ READY,
+ SUSPENDED,
+ }
+
+ @SuppressWarnings("serial")
+ private class SwitchQueue extends ArrayDeque<OFMessage> {
+ QueueState state;
+
+ // Max rate of sending message (bytes/sec). 0 implies no limitation.
+ long max_rate = 0; // 0 indicates no limitation
+ long last_sent_time = 0;
+ long last_sent_size = 0;
+
+ /**
+ * Check if sending rate is within the rate
+ * @param current Current time
+ * @return true if within the rate
+ */
+ boolean isSendable(long current) {
+ if (max_rate == 0) {
+ // no limitation
+ return true;
+ }
+
+ long rate = last_sent_size / (current - last_sent_time);
+
+ if (rate < max_rate) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ void logSentData(long current, long size) {
+ last_sent_time = current;
+ last_sent_size = size;
+ }
+
+ }
+
+ private OFMessageDamper messageDamper;
+
+ private FloodlightContext context = null;
+ private BasicFactory factory = null;
+ private Map<Long, FlowPusherProcess> threadMap = null;
+
+ private int number_thread = 1;
+
+ /**
+ * Main thread that reads messages from queues and sends them to switches.
+ * @author Naoki Shiota
+ *
+ */
+ private class FlowPusherProcess implements Runnable {
+ private Map<IOFSwitch,SwitchQueue> queues
+ = new HashMap<IOFSwitch,SwitchQueue>();
+
+ private boolean isStopped = false;
+ private boolean isMsgAdded = false;
+
+ @Override
+ public void run() {
+ log.debug("Begin Flow Pusher Process");
+
+ while (true) {
+ Set< Map.Entry<IOFSwitch,SwitchQueue> > entries;
+ synchronized (queues) {
+ entries = queues.entrySet();
+ }
+
+ // Set taint flag to false at this moment.
+ isMsgAdded = false;
+
+ for (Map.Entry<IOFSwitch,SwitchQueue> entry : entries) {
+ IOFSwitch sw = entry.getKey();
+ SwitchQueue queue = entry.getValue();
+
+ // Skip if queue is suspended
+ if (sw == null || queue == null ||
+ queue.state != QueueState.READY) {
+ continue;
+ }
+
+ // check sending rate and determine it to be sent or not
+ long current_time = System.nanoTime();
+ long size = 0;
+
+ synchronized (queue) {
+ if (queue.isSendable(current_time)) {
+ int i = 0;
+ while (! queue.isEmpty()) {
+ // Number of messages excess the limit
+ if (++i >= MAX_MESSAGE_SEND) {
+ // Messages remains in queue
+ isMsgAdded = true;
+ break;
+ }
+
+ OFMessage msg = queue.poll();
+
+ // if need to send, call IOFSwitch#write()
+ try {
+ messageDamper.write(sw, msg, context);
+ log.debug("Pusher sends message : {}", msg);
+ size += msg.getLength();
+ } catch (IOException e) {
+ e.printStackTrace();
+ log.error("Exception in sending message ({}) : {}", msg, e);
+ }
+ }
+ sw.flush();
+ queue.logSentData(current_time, size);
+ }
+ }
+ }
+
+ // sleep while all queues are empty
+ while (! (isMsgAdded || isStopped)) {
+ try {
+ Thread.sleep(SLEEP_MILLI_SEC, SLEEP_NANO_SEC);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ log.error("Thread.sleep failed");
+ }
+ }
+
+ log.debug("Exit sleep loop.");
+
+ if (isStopped) {
+ log.debug("Pusher Process finished.");
+ return;
+ }
+
+ }
+ }
+ }
+
+ public FlowPusher() {
+
+ }
+
+ public FlowPusher(int number_thread) {
+ this.number_thread = number_thread;
+ }
+
+ public void init(FloodlightContext context, BasicFactory factory, OFMessageDamper damper) {
+ this.context = context;
+ this.factory = factory;
+
+ if (damper != null) {
+ messageDamper = damper;
+ } else {
+ // use default value
+ messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
+ EnumSet.of(OFType.FLOW_MOD),
+ OFMESSAGE_DAMPER_TIMEOUT);
+ }
+ }
+
+ /**
+ * Begin processing queue.
+ */
+ public void start() {
+ if (factory == null) {
+ log.error("FlowPusher not yet initialized.");
+ return;
+ }
+
+ threadMap = new HashMap<Long,FlowPusherProcess>();
+ for (long i = 0; i < number_thread; ++i) {
+ FlowPusherProcess runnable = new FlowPusherProcess();
+ threadMap.put(i, runnable);
+
+ Thread thread = new Thread(runnable);
+ thread.start();
+ }
+ }
+
+ /**
+ * Suspend sending messages to switch.
+ * @param sw
+ */
+ @Override
+ public boolean suspend(IOFSwitch sw) {
+ SwitchQueue queue = getQueue(sw);
+
+ if (queue == null) {
+ return false;
+ }
+
+ synchronized (queue) {
+ if (queue.state == QueueState.READY) {
+ queue.state = QueueState.SUSPENDED;
+ return true;
+ }
+ return false;
+ }
+ }
+
+ /**
+ * Resume sending messages to switch.
+ */
+ @Override
+ public boolean resume(IOFSwitch sw) {
+ SwitchQueue queue = getQueue(sw);
+
+ if (queue == null) {
+ return false;
+ }
+
+ synchronized (queue) {
+ if (queue.state == QueueState.SUSPENDED) {
+ queue.state = QueueState.READY;
+ return true;
+ }
+ return false;
+ }
+ }
+
+ /**
+ * Check if given switch is suspended.
+ */
+ @Override
+ public boolean isSuspended(IOFSwitch sw) {
+ SwitchQueue queue = getQueue(sw);
+
+ if (queue == null) {
+ // TODO Is true suitable for this case?
+ return true;
+ }
+
+ return (queue.state == QueueState.SUSPENDED);
+ }
+
+ /**
+ * Stop processing queue and exit thread.
+ */
+ public void stop() {
+ if (threadMap == null) {
+ return;
+ }
+
+ for (FlowPusherProcess runnable : threadMap.values()) {
+ if (! runnable.isStopped) {
+ runnable.isStopped = true;
+ }
+ }
+ }
+
+ /**
+ * Set sending rate to a switch.
+ * @param sw Switch.
+ * @param rate Rate in bytes/sec.
+ */
+ public void setRate(IOFSwitch sw, long rate) {
+ SwitchQueue queue = getQueue(sw);
+ if (queue == null) {
+ return;
+ }
+
+ if (rate > 0) {
+ queue.max_rate = rate;
+ }
+ }
+
+ /**
+ * Add OFMessage to the queue related to given switch.
+ * @param sw Switch to which message is sent.
+ * @param msg Message to be sent.
+ * @return true if succeed.
+ */
+ @Override
+ public boolean add(IOFSwitch sw, OFMessage msg) {
+ FlowPusherProcess proc = getProcess(sw);
+ SwitchQueue queue = proc.queues.get(sw);
+
+ if (queue == null) {
+ queue = new SwitchQueue();
+ queue.state = QueueState.READY;
+ synchronized (proc) {
+ proc.queues.put(sw, queue);
+ }
+ }
+
+ synchronized (queue) {
+ queue.add(msg);
+ log.debug("Message is pushed : {}", msg);
+ }
+
+ proc.isMsgAdded = true;
+
+ return true;
+ }
+
+ /**
+ * Create OFMessage from given flow information and add it to the queue.
+ * @param sw Switch to which message is sent.
+ * @param flowObj FlowPath.
+ * @param flowEntryObj FlowEntry.
+ * @return true if succeed.
+ */
+ @Override
+ public boolean add(IOFSwitch sw, IFlowPath flowObj, IFlowEntry flowEntryObj) {
+ log.debug("sending : {}, {}", sw, flowObj);
+ String flowEntryIdStr = flowEntryObj.getFlowEntryId();
+ if (flowEntryIdStr == null)
+ return false;
+ FlowEntryId flowEntryId = new FlowEntryId(flowEntryIdStr);
+ String userState = flowEntryObj.getUserState();
+ if (userState == null)
+ return false;
+
+ //
+ // Create the Open Flow Flow Modification Entry to push
+ //
+ OFFlowMod fm = (OFFlowMod)factory.getMessage(OFType.FLOW_MOD);
+ long cookie = flowEntryId.value();
+
+ short flowModCommand = OFFlowMod.OFPFC_ADD;
+ if (userState.equals("FE_USER_ADD")) {
+ flowModCommand = OFFlowMod.OFPFC_ADD;
+ } else if (userState.equals("FE_USER_MODIFY")) {
+ flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
+ } else if (userState.equals("FE_USER_DELETE")) {
+ flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
+ } else {
+ // Unknown user state. Ignore the entry
+ log.debug("Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
+ flowEntryId.toString(), userState);
+ return false;
+ }
+
+ //
+ // Fetch the match conditions.
+ //
+ // NOTE: The Flow matching conditions common for all Flow Entries are
+ // used ONLY if a Flow Entry does NOT have the corresponding matching
+ // condition set.
+ //
+ OFMatch match = new OFMatch();
+ match.setWildcards(OFMatch.OFPFW_ALL);
+
+ // Match the Incoming Port
+ Short matchInPort = flowEntryObj.getMatchInPort();
+ if (matchInPort != null) {
+ match.setInputPort(matchInPort);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
+ }
+
+ // Match the Source MAC address
+ String matchSrcMac = flowEntryObj.getMatchSrcMac();
+ if (matchSrcMac == null)
+ matchSrcMac = flowObj.getMatchSrcMac();
+ if (matchSrcMac != null) {
+ match.setDataLayerSource(matchSrcMac);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
+ }
+
+ // Match the Destination MAC address
+ String matchDstMac = flowEntryObj.getMatchDstMac();
+ if (matchDstMac == null)
+ matchDstMac = flowObj.getMatchDstMac();
+ if (matchDstMac != null) {
+ match.setDataLayerDestination(matchDstMac);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
+ }
+
+ // Match the Ethernet Frame Type
+ Short matchEthernetFrameType = flowEntryObj.getMatchEthernetFrameType();
+ if (matchEthernetFrameType == null)
+ matchEthernetFrameType = flowObj.getMatchEthernetFrameType();
+ if (matchEthernetFrameType != null) {
+ match.setDataLayerType(matchEthernetFrameType);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
+ }
+
+ // Match the VLAN ID
+ Short matchVlanId = flowEntryObj.getMatchVlanId();
+ if (matchVlanId == null)
+ matchVlanId = flowObj.getMatchVlanId();
+ if (matchVlanId != null) {
+ match.setDataLayerVirtualLan(matchVlanId);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
+ }
+
+ // Match the VLAN priority
+ Byte matchVlanPriority = flowEntryObj.getMatchVlanPriority();
+ if (matchVlanPriority == null)
+ matchVlanPriority = flowObj.getMatchVlanPriority();
+ if (matchVlanPriority != null) {
+ match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN_PCP);
+ }
+
+ // Match the Source IPv4 Network prefix
+ String matchSrcIPv4Net = flowEntryObj.getMatchSrcIPv4Net();
+ if (matchSrcIPv4Net == null)
+ matchSrcIPv4Net = flowObj.getMatchSrcIPv4Net();
+ if (matchSrcIPv4Net != null) {
+ match.setFromCIDR(matchSrcIPv4Net, OFMatch.STR_NW_SRC);
+ }
+
+ // Match the Destination IPv4 Network prefix
+ String matchDstIPv4Net = flowEntryObj.getMatchDstIPv4Net();
+ if (matchDstIPv4Net == null)
+ matchDstIPv4Net = flowObj.getMatchDstIPv4Net();
+ if (matchDstIPv4Net != null) {
+ match.setFromCIDR(matchDstIPv4Net, OFMatch.STR_NW_DST);
+ }
+
+ // Match the IP protocol
+ Byte matchIpProto = flowEntryObj.getMatchIpProto();
+ if (matchIpProto == null)
+ matchIpProto = flowObj.getMatchIpProto();
+ if (matchIpProto != null) {
+ match.setNetworkProtocol(matchIpProto);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
+ }
+
+ // Match the IP ToS (DSCP field, 6 bits)
+ Byte matchIpToS = flowEntryObj.getMatchIpToS();
+ if (matchIpToS == null)
+ matchIpToS = flowObj.getMatchIpToS();
+ if (matchIpToS != null) {
+ match.setNetworkTypeOfService(matchIpToS);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
+ }
+
+ // Match the Source TCP/UDP port
+ Short matchSrcTcpUdpPort = flowEntryObj.getMatchSrcTcpUdpPort();
+ if (matchSrcTcpUdpPort == null)
+ matchSrcTcpUdpPort = flowObj.getMatchSrcTcpUdpPort();
+ if (matchSrcTcpUdpPort != null) {
+ match.setTransportSource(matchSrcTcpUdpPort);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
+ }
+
+ // Match the Destination TCP/UDP port
+ Short matchDstTcpUdpPort = flowEntryObj.getMatchDstTcpUdpPort();
+ if (matchDstTcpUdpPort == null)
+ matchDstTcpUdpPort = flowObj.getMatchDstTcpUdpPort();
+ if (matchDstTcpUdpPort != null) {
+ match.setTransportDestination(matchDstTcpUdpPort);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
+ }
+
+ //
+ // Fetch the actions
+ //
+ Short actionOutputPort = null;
+ List<OFAction> openFlowActions = new ArrayList<OFAction>();
+ int actionsLen = 0;
+ FlowEntryActions flowEntryActions = null;
+ String actionsStr = flowEntryObj.getActions();
+ if (actionsStr != null)
+ flowEntryActions = new FlowEntryActions(actionsStr);
+ else
+ flowEntryActions = new FlowEntryActions();
+ for (FlowEntryAction action : flowEntryActions.actions()) {
+ ActionOutput actionOutput = action.actionOutput();
+ ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
+ ActionSetVlanPriority actionSetVlanPriority = action.actionSetVlanPriority();
+ ActionStripVlan actionStripVlan = action.actionStripVlan();
+ ActionSetEthernetAddr actionSetEthernetSrcAddr = action.actionSetEthernetSrcAddr();
+ ActionSetEthernetAddr actionSetEthernetDstAddr = action.actionSetEthernetDstAddr();
+ ActionSetIPv4Addr actionSetIPv4SrcAddr = action.actionSetIPv4SrcAddr();
+ ActionSetIPv4Addr actionSetIPv4DstAddr = action.actionSetIPv4DstAddr();
+ ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
+ ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action.actionSetTcpUdpSrcPort();
+ ActionSetTcpUdpPort actionSetTcpUdpDstPort = action.actionSetTcpUdpDstPort();
+ ActionEnqueue actionEnqueue = action.actionEnqueue();
+
+ if (actionOutput != null) {
+ actionOutputPort = actionOutput.port().value();
+ // XXX: The max length is hard-coded for now
+ OFActionOutput ofa =
+ new OFActionOutput(actionOutput.port().value(),
+ (short)0xffff);
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetVlanId != null) {
+ OFActionVirtualLanIdentifier ofa =
+ new OFActionVirtualLanIdentifier(actionSetVlanId.vlanId());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetVlanPriority != null) {
+ OFActionVirtualLanPriorityCodePoint ofa =
+ new OFActionVirtualLanPriorityCodePoint(actionSetVlanPriority.vlanPriority());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionStripVlan != null) {
+ if (actionStripVlan.stripVlan() == true) {
+ OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+ }
+
+ if (actionSetEthernetSrcAddr != null) {
+ OFActionDataLayerSource ofa =
+ new OFActionDataLayerSource(actionSetEthernetSrcAddr.addr().toBytes());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetEthernetDstAddr != null) {
+ OFActionDataLayerDestination ofa =
+ new OFActionDataLayerDestination(actionSetEthernetDstAddr.addr().toBytes());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetIPv4SrcAddr != null) {
+ OFActionNetworkLayerSource ofa =
+ new OFActionNetworkLayerSource(actionSetIPv4SrcAddr.addr().value());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetIPv4DstAddr != null) {
+ OFActionNetworkLayerDestination ofa =
+ new OFActionNetworkLayerDestination(actionSetIPv4DstAddr.addr().value());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetIpToS != null) {
+ OFActionNetworkTypeOfService ofa =
+ new OFActionNetworkTypeOfService(actionSetIpToS.ipToS());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetTcpUdpSrcPort != null) {
+ OFActionTransportLayerSource ofa =
+ new OFActionTransportLayerSource(actionSetTcpUdpSrcPort.port());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetTcpUdpDstPort != null) {
+ OFActionTransportLayerDestination ofa =
+ new OFActionTransportLayerDestination(actionSetTcpUdpDstPort.port());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionEnqueue != null) {
+ OFActionEnqueue ofa =
+ new OFActionEnqueue(actionEnqueue.port().value(),
+ actionEnqueue.queueId());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+ }
+
+ fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
+ .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
+ .setPriority(PRIORITY_DEFAULT)
+ .setBufferId(OFPacketOut.BUFFER_ID_NONE)
+ .setCookie(cookie)
+ .setCommand(flowModCommand)
+ .setMatch(match)
+ .setActions(openFlowActions)
+ .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
+ fm.setOutPort(OFPort.OFPP_NONE.getValue());
+ if ((flowModCommand == OFFlowMod.OFPFC_DELETE) ||
+ (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
+ if (actionOutputPort != null)
+ fm.setOutPort(actionOutputPort);
+ }
+
+ //
+ // TODO: Set the following flag
+ // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
+ // See method ForwardingBase::pushRoute()
+ //
+
+ //
+ // Write the message to the switch
+ //
+ log.debug("MEASUREMENT: Installing flow entry " + userState +
+ " into switch DPID: " +
+ sw.getStringId() +
+ " flowEntryId: " + flowEntryId.toString() +
+ " srcMac: " + matchSrcMac + " dstMac: " + matchDstMac +
+ " inPort: " + matchInPort + " outPort: " + actionOutputPort
+ );
+ add(sw,fm);
+ //
+ // TODO: We should use the OpenFlow Barrier mechanism
+ // to check for errors, and update the SwitchState
+ // for a flow entry after the Barrier message is
+ // is received.
+ //
+ flowEntryObj.setSwitchState("FE_SWITCH_UPDATED");
+
+ return true;
+ }
+
+ /**
+ * Create OFMessage from given flow information and add it to the queue.
+ * @param sw Switch to which message is sent.
+ * @param flowPath FlowPath.
+ * @param flowEntry FlowEntry.
+ * @return true if secceed.
+ */
+ @Override
+ public boolean add(IOFSwitch sw, FlowPath flowPath, FlowEntry flowEntry) {
+ //
+ // Create the OpenFlow Flow Modification Entry to push
+ //
+ OFFlowMod fm = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
+ long cookie = flowEntry.flowEntryId().value();
+
+ short flowModCommand = OFFlowMod.OFPFC_ADD;
+ if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
+ flowModCommand = OFFlowMod.OFPFC_ADD;
+ } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
+ flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
+ } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
+ flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
+ } else {
+ // Unknown user state. Ignore the entry
+ log.debug(
+ "Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
+ flowEntry.flowEntryId().toString(),
+ flowEntry.flowEntryUserState());
+ return false;
+ }
+
+ //
+ // Fetch the match conditions.
+ //
+ // NOTE: The Flow matching conditions common for all Flow Entries are
+ // used ONLY if a Flow Entry does NOT have the corresponding matching
+ // condition set.
+ //
+ OFMatch match = new OFMatch();
+ match.setWildcards(OFMatch.OFPFW_ALL);
+ FlowEntryMatch flowPathMatch = flowPath.flowEntryMatch();
+ FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
+
+ // Match the Incoming Port
+ Port matchInPort = flowEntryMatch.inPort();
+ if (matchInPort != null) {
+ match.setInputPort(matchInPort.value());
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
+ }
+
+ // Match the Source MAC address
+ MACAddress matchSrcMac = flowEntryMatch.srcMac();
+ if ((matchSrcMac == null) && (flowPathMatch != null)) {
+ matchSrcMac = flowPathMatch.srcMac();
+ }
+ if (matchSrcMac != null) {
+ match.setDataLayerSource(matchSrcMac.toString());
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
+ }
+
+ // Match the Destination MAC address
+ MACAddress matchDstMac = flowEntryMatch.dstMac();
+ if ((matchDstMac == null) && (flowPathMatch != null)) {
+ matchDstMac = flowPathMatch.dstMac();
+ }
+ if (matchDstMac != null) {
+ match.setDataLayerDestination(matchDstMac.toString());
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
+ }
+
+ // Match the Ethernet Frame Type
+ Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
+ if ((matchEthernetFrameType == null) && (flowPathMatch != null)) {
+ matchEthernetFrameType = flowPathMatch.ethernetFrameType();
+ }
+ if (matchEthernetFrameType != null) {
+ match.setDataLayerType(matchEthernetFrameType);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
+ }
+
+ // Match the VLAN ID
+ Short matchVlanId = flowEntryMatch.vlanId();
+ if ((matchVlanId == null) && (flowPathMatch != null)) {
+ matchVlanId = flowPathMatch.vlanId();
+ }
+ if (matchVlanId != null) {
+ match.setDataLayerVirtualLan(matchVlanId);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
+ }
+
+ // Match the VLAN priority
+ Byte matchVlanPriority = flowEntryMatch.vlanPriority();
+ if ((matchVlanPriority == null) && (flowPathMatch != null)) {
+ matchVlanPriority = flowPathMatch.vlanPriority();
+ }
+ if (matchVlanPriority != null) {
+ match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
+ match.setWildcards(match.getWildcards()
+ & ~OFMatch.OFPFW_DL_VLAN_PCP);
+ }
+
+ // Match the Source IPv4 Network prefix
+ IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
+ if ((matchSrcIPv4Net == null) && (flowPathMatch != null)) {
+ matchSrcIPv4Net = flowPathMatch.srcIPv4Net();
+ }
+ if (matchSrcIPv4Net != null) {
+ match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
+ }
+
+ // Natch the Destination IPv4 Network prefix
+ IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
+ if ((matchDstIPv4Net == null) && (flowPathMatch != null)) {
+ matchDstIPv4Net = flowPathMatch.dstIPv4Net();
+ }
+ if (matchDstIPv4Net != null) {
+ match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
+ }
+
+ // Match the IP protocol
+ Byte matchIpProto = flowEntryMatch.ipProto();
+ if ((matchIpProto == null) && (flowPathMatch != null)) {
+ matchIpProto = flowPathMatch.ipProto();
+ }
+ if (matchIpProto != null) {
+ match.setNetworkProtocol(matchIpProto);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
+ }
+
+ // Match the IP ToS (DSCP field, 6 bits)
+ Byte matchIpToS = flowEntryMatch.ipToS();
+ if ((matchIpToS == null) && (flowPathMatch != null)) {
+ matchIpToS = flowPathMatch.ipToS();
+ }
+ if (matchIpToS != null) {
+ match.setNetworkTypeOfService(matchIpToS);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
+ }
+
+ // Match the Source TCP/UDP port
+ Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
+ if ((matchSrcTcpUdpPort == null) && (flowPathMatch != null)) {
+ matchSrcTcpUdpPort = flowPathMatch.srcTcpUdpPort();
+ }
+ if (matchSrcTcpUdpPort != null) {
+ match.setTransportSource(matchSrcTcpUdpPort);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
+ }
+
+ // Match the Destination TCP/UDP port
+ Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
+ if ((matchDstTcpUdpPort == null) && (flowPathMatch != null)) {
+ matchDstTcpUdpPort = flowPathMatch.dstTcpUdpPort();
+ }
+ if (matchDstTcpUdpPort != null) {
+ match.setTransportDestination(matchDstTcpUdpPort);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
+ }
+
+ //
+ // Fetch the actions
+ //
+ Short actionOutputPort = null;
+ List<OFAction> openFlowActions = new ArrayList<OFAction>();
+ int actionsLen = 0;
+ FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
+ //
+ for (FlowEntryAction action : flowEntryActions.actions()) {
+ ActionOutput actionOutput = action.actionOutput();
+ ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
+ ActionSetVlanPriority actionSetVlanPriority = action
+ .actionSetVlanPriority();
+ ActionStripVlan actionStripVlan = action.actionStripVlan();
+ ActionSetEthernetAddr actionSetEthernetSrcAddr = action
+ .actionSetEthernetSrcAddr();
+ ActionSetEthernetAddr actionSetEthernetDstAddr = action
+ .actionSetEthernetDstAddr();
+ ActionSetIPv4Addr actionSetIPv4SrcAddr = action
+ .actionSetIPv4SrcAddr();
+ ActionSetIPv4Addr actionSetIPv4DstAddr = action
+ .actionSetIPv4DstAddr();
+ ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
+ ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action
+ .actionSetTcpUdpSrcPort();
+ ActionSetTcpUdpPort actionSetTcpUdpDstPort = action
+ .actionSetTcpUdpDstPort();
+ ActionEnqueue actionEnqueue = action.actionEnqueue();
+
+ if (actionOutput != null) {
+ actionOutputPort = actionOutput.port().value();
+ // XXX: The max length is hard-coded for now
+ OFActionOutput ofa = new OFActionOutput(actionOutput.port()
+ .value(), (short) 0xffff);
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetVlanId != null) {
+ OFActionVirtualLanIdentifier ofa = new OFActionVirtualLanIdentifier(
+ actionSetVlanId.vlanId());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetVlanPriority != null) {
+ OFActionVirtualLanPriorityCodePoint ofa = new OFActionVirtualLanPriorityCodePoint(
+ actionSetVlanPriority.vlanPriority());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionStripVlan != null) {
+ if (actionStripVlan.stripVlan() == true) {
+ OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+ }
+
+ if (actionSetEthernetSrcAddr != null) {
+ OFActionDataLayerSource ofa = new OFActionDataLayerSource(
+ actionSetEthernetSrcAddr.addr().toBytes());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetEthernetDstAddr != null) {
+ OFActionDataLayerDestination ofa = new OFActionDataLayerDestination(
+ actionSetEthernetDstAddr.addr().toBytes());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetIPv4SrcAddr != null) {
+ OFActionNetworkLayerSource ofa = new OFActionNetworkLayerSource(
+ actionSetIPv4SrcAddr.addr().value());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetIPv4DstAddr != null) {
+ OFActionNetworkLayerDestination ofa = new OFActionNetworkLayerDestination(
+ actionSetIPv4DstAddr.addr().value());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetIpToS != null) {
+ OFActionNetworkTypeOfService ofa = new OFActionNetworkTypeOfService(
+ actionSetIpToS.ipToS());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetTcpUdpSrcPort != null) {
+ OFActionTransportLayerSource ofa = new OFActionTransportLayerSource(
+ actionSetTcpUdpSrcPort.port());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetTcpUdpDstPort != null) {
+ OFActionTransportLayerDestination ofa = new OFActionTransportLayerDestination(
+ actionSetTcpUdpDstPort.port());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionEnqueue != null) {
+ OFActionEnqueue ofa = new OFActionEnqueue(actionEnqueue.port()
+ .value(), actionEnqueue.queueId());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+ }
+
+ fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
+ .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
+ .setPriority(PRIORITY_DEFAULT)
+ .setBufferId(OFPacketOut.BUFFER_ID_NONE).setCookie(cookie)
+ .setCommand(flowModCommand).setMatch(match)
+ .setActions(openFlowActions)
+ .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
+ fm.setOutPort(OFPort.OFPP_NONE.getValue());
+ if ((flowModCommand == OFFlowMod.OFPFC_DELETE)
+ || (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
+ if (actionOutputPort != null)
+ fm.setOutPort(actionOutputPort);
+ }
+
+ //
+ // TODO: Set the following flag
+ // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
+ // See method ForwardingBase::pushRoute()
+ //
+
+ //
+ // Write the message to the switch
+ //
+ log.debug("MEASUREMENT: Installing flow entry "
+ + flowEntry.flowEntryUserState() + " into switch DPID: "
+ + sw.getStringId() + " flowEntryId: "
+ + flowEntry.flowEntryId().toString() + " srcMac: "
+ + matchSrcMac + " dstMac: " + matchDstMac + " inPort: "
+ + matchInPort + " outPort: " + actionOutputPort);
+
+ //
+ // TODO: We should use the OpenFlow Barrier mechanism
+ // to check for errors, and update the SwitchState
+ // for a flow entry after the Barrier message is
+ // is received.
+ //
+ // TODO: The FlowEntry Object in Titan should be set
+ // to FE_SWITCH_UPDATED.
+ //
+
+ return add(sw,fm);
+ }
+
+ private SwitchQueue getQueue(IOFSwitch sw) {
+ if (sw == null) {
+ return null;
+ }
+
+ return getProcess(sw).queues.get(sw);
+ }
+
+ private long getHash(IOFSwitch sw) {
+ // TODO should consider equalization algorithm
+ return sw.getId() % number_thread;
+ }
+
+ private FlowPusherProcess getProcess(IOFSwitch sw) {
+ long hash = getHash(sw);
+
+ return threadMap.get(hash);
+ }
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
new file mode 100644
index 0000000..b2e4552
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
@@ -0,0 +1,306 @@
+package net.onrc.onos.ofcontroller.flowprogrammer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.openflow.protocol.OFFlowMod;
+import org.openflow.protocol.OFMatch;
+import org.openflow.protocol.OFMessage;
+import org.openflow.protocol.OFPacketOut;
+import org.openflow.protocol.OFPort;
+import org.openflow.protocol.OFStatisticsRequest;
+import org.openflow.protocol.action.OFAction;
+import org.openflow.protocol.action.OFActionDataLayerDestination;
+import org.openflow.protocol.action.OFActionDataLayerSource;
+import org.openflow.protocol.action.OFActionEnqueue;
+import org.openflow.protocol.action.OFActionNetworkLayerDestination;
+import org.openflow.protocol.action.OFActionNetworkLayerSource;
+import org.openflow.protocol.action.OFActionNetworkTypeOfService;
+import org.openflow.protocol.action.OFActionOutput;
+import org.openflow.protocol.action.OFActionStripVirtualLan;
+import org.openflow.protocol.action.OFActionTransportLayerDestination;
+import org.openflow.protocol.action.OFActionTransportLayerSource;
+import org.openflow.protocol.action.OFActionVirtualLanIdentifier;
+import org.openflow.protocol.action.OFActionVirtualLanPriorityCodePoint;
+import org.openflow.protocol.statistics.OFFlowStatisticsReply;
+import org.openflow.protocol.statistics.OFFlowStatisticsRequest;
+import org.openflow.protocol.statistics.OFStatistics;
+import org.openflow.protocol.statistics.OFStatisticsType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.tinkerpop.blueprints.Direction;
+
+import net.floodlightcontroller.core.IFloodlightProviderService;
+import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.IOFSwitchListener;
+import net.floodlightcontroller.core.module.FloodlightModuleContext;
+import net.floodlightcontroller.core.module.FloodlightModuleException;
+import net.floodlightcontroller.core.module.IFloodlightModule;
+import net.floodlightcontroller.core.module.IFloodlightService;
+import net.floodlightcontroller.restserver.IRestApiService;
+import net.onrc.onos.datagrid.IDatagridService;
+import net.onrc.onos.graph.GraphDBOperation;
+import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
+import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.ISwitchObject;
+import net.onrc.onos.ofcontroller.core.module.IOnosService;
+import net.onrc.onos.ofcontroller.floodlightlistener.INetworkGraphService;
+import net.onrc.onos.ofcontroller.util.Dpid;
+import net.onrc.onos.ofcontroller.util.FlowEntryAction;
+import net.onrc.onos.ofcontroller.util.FlowEntryActions;
+import net.onrc.onos.ofcontroller.util.FlowEntryId;
+import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionEnqueue;
+import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionOutput;
+import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionSetEthernetAddr;
+import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionSetIPv4Addr;
+import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionSetIpToS;
+import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionSetTcpUdpPort;
+import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionSetVlanId;
+import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionSetVlanPriority;
+import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionStripVlan;
+import net.onrc.onos.registry.controller.IControllerRegistryService;
+
+public class FlowSynchronizer implements IFlowSyncService, IOFSwitchListener {
+
+ protected static Logger log = LoggerFactory.getLogger(FlowSynchronizer.class);
+ protected IFloodlightProviderService floodlightProvider;
+ protected IControllerRegistryService registryService;
+ protected IFlowPusherService pusher;
+
+ private GraphDBOperation dbHandler;
+ private Map<IOFSwitch, Thread> switchThread = new HashMap<IOFSwitch, Thread>();
+
+ public FlowSynchronizer() {
+ dbHandler = new GraphDBOperation("");
+ }
+
+ public void synchronize(IOFSwitch sw) {
+ Synchroizer sync = new Synchroizer(sw);
+ Thread t = new Thread(sync);
+ t.start();
+ switchThread.put(sw, t);
+ }
+
+ @Override
+ public void addedSwitch(IOFSwitch sw) {
+ log.debug("Switch added: {}", sw.getId());
+
+ if (registryService.hasControl(sw.getId())) {
+ synchronize(sw);
+ }
+ }
+
+ @Override
+ public void removedSwitch(IOFSwitch sw) {
+ log.debug("Switch removed: {}", sw.getId());
+
+ Thread t = switchThread.remove(sw);
+ if(t != null) {
+ t.interrupt();
+ }
+
+ }
+
+ @Override
+ public void switchPortChanged(Long switchId) {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public String getName() {
+ return "FlowSynchronizer";
+ }
+
+ //@Override
+ public void init(FloodlightModuleContext context)
+ throws FloodlightModuleException {
+ floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
+ registryService = context.getServiceImpl(IControllerRegistryService.class);
+ pusher = context.getServiceImpl(IFlowPusherService.class);
+ }
+
+ //@Override
+ public void startUp(FloodlightModuleContext context) {
+ floodlightProvider.addOFSwitchListener(this);
+ }
+
+ protected class Synchroizer implements Runnable {
+ IOFSwitch sw;
+ ISwitchObject swObj;
+
+ public Synchroizer(IOFSwitch sw) {
+ this.sw = sw;
+ Dpid dpid = new Dpid(sw.getId());
+ this.swObj = dbHandler.searchSwitch(dpid.toString());
+ }
+
+ @Override
+ public void run() {
+ Set<FlowEntryWrapper> graphEntries = getFlowEntriesFromGraph();
+ Set<FlowEntryWrapper> switchEntries = getFlowEntriesFromSwitch();
+ compare(graphEntries, switchEntries);
+ }
+
+ private void compare(Set<FlowEntryWrapper> graphEntries, Set<FlowEntryWrapper> switchEntries) {
+ int added = 0, removed = 0, skipped = 0;
+ for(FlowEntryWrapper entry : switchEntries) {
+ if(graphEntries.contains(entry)) {
+ graphEntries.remove(entry);
+ skipped++;
+ }
+ else {
+ // remove flow entry from the switch
+ entry.removeFromSwitch(sw);
+ removed++;
+ }
+ }
+ for(FlowEntryWrapper entry : graphEntries) {
+ // add flow entry to switch
+ entry.addToSwitch(sw);
+ added++;
+ }
+ log.debug("Flow entries added "+ added + ", " +
+ "Flow entries removed "+ removed + ", " +
+ "Flow entries skipped " + skipped);
+ }
+
+ private Set<FlowEntryWrapper> getFlowEntriesFromGraph() {
+ Set<FlowEntryWrapper> entries = new HashSet<FlowEntryWrapper>();
+ for(IFlowEntry entry : swObj.getFlowEntries()) {
+ FlowEntryWrapper fe = new FlowEntryWrapper(entry);
+ entries.add(fe);
+ }
+ return entries;
+ }
+
+ private Set<FlowEntryWrapper> getFlowEntriesFromSwitch() {
+
+ int lengthU = 0;
+ OFMatch match = new OFMatch();
+ match.setWildcards(OFMatch.OFPFW_ALL);
+
+ OFFlowStatisticsRequest stat = new OFFlowStatisticsRequest();
+ stat.setOutPort((short) 0xffff); //TODO: OFPort.OFPP_NONE
+ stat.setTableId((byte) 0xff); // TODO: fix this with enum (ALL TABLES)
+ stat.setMatch(match);
+ List<OFStatistics> stats = new ArrayList<OFStatistics>();
+ stats.add(stat);
+ lengthU += stat.getLength();
+
+ OFStatisticsRequest req = new OFStatisticsRequest();
+ req.setStatisticType(OFStatisticsType.FLOW);
+ req.setStatistics(stats);
+ lengthU += req.getLengthU();
+ req.setLengthU(lengthU);
+
+ List<OFStatistics> entries = null;
+ try {
+ Future<List<OFStatistics>> dfuture = sw.getStatistics(req);
+ entries = dfuture.get();
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (ExecutionException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ Set<FlowEntryWrapper> results = new HashSet<FlowEntryWrapper>();
+ for(OFStatistics result : entries){
+ OFFlowStatisticsReply entry = (OFFlowStatisticsReply) result;
+ FlowEntryWrapper fe = new FlowEntryWrapper(entry);
+ results.add(fe);
+ }
+ return results;
+ }
+
+ }
+
+ class FlowEntryWrapper {
+ FlowEntryId id;
+ IFlowEntry iflowEntry;
+ OFFlowStatisticsReply statisticsReply;
+
+ public FlowEntryWrapper(IFlowEntry entry) {
+ iflowEntry = entry;
+ id = new FlowEntryId(entry.getFlowEntryId());
+ }
+
+ public FlowEntryWrapper(OFFlowStatisticsReply entry) {
+ statisticsReply = entry;
+ id = new FlowEntryId(entry.getCookie());
+ }
+
+ public void addToSwitch(IOFSwitch sw) {
+ if(iflowEntry != null) {
+ pusher.add(sw, iflowEntry.getFlow(), iflowEntry);
+ }
+ else if(statisticsReply != null) {
+ log.error("Adding existing flow entry {} to sw {}",
+ statisticsReply.getCookie(), sw.getId());
+ }
+ }
+
+ public void removeFromSwitch(IOFSwitch sw){
+ if(iflowEntry != null) {
+ log.error("Removing non-existent flow entry {} from sw {}",
+ iflowEntry.getFlowEntryId(), sw.getId());
+
+ }
+ else if(statisticsReply != null) {
+ // Convert Statistics Reply to Flow Mod, then write it
+ OFFlowMod fm = new OFFlowMod();
+ fm.setCookie(statisticsReply.getCookie());
+ fm.setCommand(OFFlowMod.OFPFC_DELETE_STRICT);
+ fm.setLengthU(OFFlowMod.MINIMUM_LENGTH);
+ fm.setMatch(statisticsReply.getMatch());
+ fm.setPriority(statisticsReply.getPriority());
+ fm.setOutPort(OFPort.OFPP_NONE);
+ pusher.add(sw, fm);
+ }
+ }
+
+ /**
+ * Return the hash code of the Flow Entry ID
+ */
+ @Override
+ public int hashCode() {
+ return id.hashCode();
+ }
+
+ /**
+ * Returns true of the object is another Flow Entry ID with
+ * the same value; otherwise, returns false.
+ *
+ * @param Object to compare
+ */
+ @Override
+ public boolean equals(Object obj){
+ if(obj.getClass() == this.getClass()) {
+ FlowEntryWrapper entry = (FlowEntryWrapper) obj;
+ // TODO: we need to actually compare the match + actions
+ return this.id.equals(entry.id);
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return id.toString();
+ }
+ }
+}
+
+
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
new file mode 100644
index 0000000..e16dd20
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
@@ -0,0 +1,43 @@
+package net.onrc.onos.ofcontroller.flowprogrammer;
+
+import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.module.IFloodlightService;
+import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
+import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
+import net.onrc.onos.ofcontroller.util.FlowEntry;
+import net.onrc.onos.ofcontroller.util.FlowPath;
+
+import org.openflow.protocol.OFMessage;
+
+public interface IFlowPusherService extends IFloodlightService {
+ /**
+ * Add a message to the queue of a switch.
+ * @param sw
+ * @param msg
+ * @return
+ */
+ boolean add(IOFSwitch sw, OFMessage msg);
+ boolean add(IOFSwitch sw, FlowPath flowPath, FlowEntry flowEntry);
+ boolean add(IOFSwitch sw, IFlowPath flowObj, IFlowEntry flowEntryObj);
+
+ /**
+ * Suspend pushing message to a switch.
+ * @param sw
+ * @return true if success
+ */
+ boolean suspend(IOFSwitch sw);
+
+ /**
+ * Resume pushing message to a switch.
+ * @param sw
+ * @return true if success
+ */
+ boolean resume(IOFSwitch sw);
+
+ /**
+ * Get whether pushing of message is suspended or not.
+ * @param sw
+ * @return true if suspended
+ */
+ boolean isSuspended(IOFSwitch sw);
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowSyncService.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowSyncService.java
new file mode 100644
index 0000000..1e71f6a
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowSyncService.java
@@ -0,0 +1,13 @@
+package net.onrc.onos.ofcontroller.flowprogrammer;
+
+import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.IOFSwitchListener;
+import net.floodlightcontroller.core.module.IFloodlightService;
+
+/**
+ * @author bocon
+ *
+ */
+public interface IFlowSyncService extends IFloodlightService {
+ public void synchronize(IOFSwitch sw);
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/util/FlowEntryId.java b/src/main/java/net/onrc/onos/ofcontroller/util/FlowEntryId.java
index c4e75a5..29efe4c 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/util/FlowEntryId.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/util/FlowEntryId.java
@@ -66,6 +66,29 @@
public void setValue(long value) {
this.value = value;
}
+
+ /**
+ * Returns true of the object is another Flow Entry ID with
+ * the same value; otherwise, returns false.
+ *
+ * @param Object to compare
+ */
+ @Override
+ public boolean equals(Object obj){
+ if(obj.getClass() == this.getClass()) {
+ FlowEntryId entry = (FlowEntryId) obj;
+ return this.value() == entry.value();
+ }
+ return false;
+ }
+
+ /**
+ * Return the hash code of the Flow Entry ID
+ */
+ @Override
+ public int hashCode() {
+ return Long.valueOf(value).hashCode();
+ }
/**
* Convert the Flow Entry ID value to a hexadecimal string.
diff --git a/src/main/resources/META-INF/services/net.floodlightcontroller.core.module.IFloodlightModule b/src/main/resources/META-INF/services/net.floodlightcontroller.core.module.IFloodlightModule
index 7c4bc1a..fcdd6b5 100644
--- a/src/main/resources/META-INF/services/net.floodlightcontroller.core.module.IFloodlightModule
+++ b/src/main/resources/META-INF/services/net.floodlightcontroller.core.module.IFloodlightModule
@@ -20,8 +20,9 @@
net.floodlightcontroller.core.test.MockThreadPoolService
net.onrc.onos.datagrid.HazelcastDatagrid
net.onrc.onos.ofcontroller.flowmanager.FlowManager
+net.onrc.onos.ofcontroller.flowprogrammer.FlowProgrammer
net.onrc.onos.ofcontroller.topology.TopologyManager
net.onrc.onos.ofcontroller.bgproute.BgpRoute
net.onrc.onos.registry.controller.ZookeeperRegistry
net.onrc.onos.registry.controller.StandaloneRegistry
-net.onrc.onos.ofcontroller.core.module.OnosModuleLoader
\ No newline at end of file
+net.onrc.onos.ofcontroller.core.module.OnosModuleLoader