Renamed devicemanager, flowprogrammer, linkdiscovery and util packages
net.onrc.onos.ofcontroller.devicemanager.* => net.onrc.onos.core.devicemanager.*
net.onrc.onos.ofcontroller.flowprogrammer.* => net.onrc.onos.core.flowprogrammer.*
net.onrc.onos.ofcontroller.linkdiscovery.* => net.onrc.onos.core.linkdiscovery.*
net.onrc.onos.ofcontroller.util.* => net.onrc.onos.core.util.*
Change-Id: Iaa865af552e8fb3a589e73d006569ac79f5a0f08
diff --git a/src/main/java/net/onrc/onos/core/flowprogrammer/FlowProgrammer.java b/src/main/java/net/onrc/onos/core/flowprogrammer/FlowProgrammer.java
new file mode 100644
index 0000000..d30c10f
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/flowprogrammer/FlowProgrammer.java
@@ -0,0 +1,175 @@
+package net.onrc.onos.core.flowprogrammer;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.openflow.protocol.OFFlowRemoved;
+import org.openflow.protocol.OFMessage;
+import org.openflow.protocol.OFType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.floodlightcontroller.core.FloodlightContext;
+import net.floodlightcontroller.core.IFloodlightProviderService;
+import net.floodlightcontroller.core.IOFMessageListener;
+import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.IOFSwitchListener;
+import net.floodlightcontroller.core.module.FloodlightModuleContext;
+import net.floodlightcontroller.core.module.FloodlightModuleException;
+import net.floodlightcontroller.core.module.IFloodlightModule;
+import net.floodlightcontroller.core.module.IFloodlightService;
+import net.floodlightcontroller.restserver.IRestApiService;
+import net.onrc.onos.core.flowprogrammer.web.FlowProgrammerWebRoutable;
+import net.onrc.onos.core.util.FlowEntryId;
+import net.onrc.onos.registry.controller.IControllerRegistryService;
+
+/**
+ * FlowProgrammer is a module responsible to maintain flows installed to switches.
+ * FlowProgrammer consists of FlowPusher and FlowSynchronizer.
+ * FlowPusher manages the rate of installation, and FlowSynchronizer synchronizes
+ * flows between GraphDB and switches.
+ * FlowProgrammer also watch the event of addition/deletion of switches to
+ * start/stop synchronization. When a switch is added to network, FlowProgrammer
+ * immediately kicks synchronization to keep switch's flow table latest state.
+ * Adversely, when a switch is removed from network, FlowProgrammer immediately
+ * stops synchronization.
+ *
+ * @author Brian
+ */
+public class FlowProgrammer implements IFloodlightModule,
+ IOFMessageListener,
+ IOFSwitchListener {
+ // flag to enable FlowSynchronizer
+ private static final boolean enableFlowSync = false;
+ protected static final Logger log = LoggerFactory.getLogger(FlowProgrammer.class);
+ protected volatile IFloodlightProviderService floodlightProvider;
+ protected volatile IControllerRegistryService registryService;
+ protected volatile IRestApiService restApi;
+
+ protected FlowPusher pusher;
+ private static final int NUM_PUSHER_THREAD = 1;
+
+ protected FlowSynchronizer synchronizer;
+
+ public FlowProgrammer() {
+ pusher = new FlowPusher(NUM_PUSHER_THREAD);
+ if (enableFlowSync) {
+ synchronizer = new FlowSynchronizer();
+ }
+ }
+
+ @Override
+ public void init(FloodlightModuleContext context)
+ throws FloodlightModuleException {
+ floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
+ registryService = context.getServiceImpl(IControllerRegistryService.class);
+ restApi = context.getServiceImpl(IRestApiService.class);
+ pusher.init(null, context, floodlightProvider.getOFMessageFactory(), null);
+ if (enableFlowSync) {
+ synchronizer.init(pusher);
+ }
+ }
+
+ @Override
+ public void startUp(FloodlightModuleContext context) {
+ restApi.addRestletRoutable(new FlowProgrammerWebRoutable());
+ pusher.start();
+ floodlightProvider.addOFMessageListener(OFType.FLOW_REMOVED, this);
+ floodlightProvider.addOFSwitchListener(this);
+ }
+
+ @Override
+ public Collection<Class<? extends IFloodlightService>> getModuleServices() {
+ Collection<Class<? extends IFloodlightService>> l =
+ new ArrayList<Class<? extends IFloodlightService>>();
+ l.add(IFlowPusherService.class);
+ if (enableFlowSync) {
+ l.add(IFlowSyncService.class);
+ }
+ return l;
+ }
+
+ @Override
+ public Map<Class<? extends IFloodlightService>, IFloodlightService> getServiceImpls() {
+ Map<Class<? extends IFloodlightService>,
+ IFloodlightService> m =
+ new HashMap<Class<? extends IFloodlightService>,
+ IFloodlightService>();
+ m.put(IFlowPusherService.class, pusher);
+ if (enableFlowSync) {
+ m.put(IFlowSyncService.class, synchronizer);
+ }
+ return m;
+ }
+
+ @Override
+ public Collection<Class<? extends IFloodlightService>> getModuleDependencies() {
+ Collection<Class<? extends IFloodlightService>> l =
+ new ArrayList<Class<? extends IFloodlightService>>();
+ l.add(IFloodlightProviderService.class);
+ l.add(IRestApiService.class);
+ return l;
+ }
+
+ @Override
+ public String getName() {
+ // TODO Auto-generated method stub
+ return "FlowProgrammer";
+ }
+
+ @Override
+ public boolean isCallbackOrderingPrereq(OFType type, String name) {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public boolean isCallbackOrderingPostreq(OFType type, String name) {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
+ switch (msg.getType()) {
+ case FLOW_REMOVED:
+ OFFlowRemoved flowMsg = (OFFlowRemoved) msg;
+ FlowEntryId id = new FlowEntryId(flowMsg.getCookie());
+ log.debug("Got flow entry removed from {}: {}", sw.getId(), id);
+ // TODO: Inform the Forwarding module that a flow has expired
+ break;
+ default:
+ break;
+ }
+
+ return Command.CONTINUE;
+ }
+
+ @Override
+ public void addedSwitch(IOFSwitch sw) {
+ log.debug("Switch added: {}", sw.getId());
+
+ if (enableFlowSync) {
+ if (registryService.hasControl(sw.getId())) {
+ synchronizer.synchronize(sw);
+ }
+ }
+ }
+
+ @Override
+ public void removedSwitch(IOFSwitch sw) {
+ log.debug("Switch removed: {}", sw.getId());
+
+ if (enableFlowSync) {
+ synchronizer.interrupt(sw);
+ }
+ }
+
+ @Override
+ public void switchPortChanged(Long switchId) {
+ // TODO Auto-generated method stub
+ }
+
+}
diff --git a/src/main/java/net/onrc/onos/core/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/core/flowprogrammer/FlowPusher.java
new file mode 100644
index 0000000..bdb1f43
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/flowprogrammer/FlowPusher.java
@@ -0,0 +1,1068 @@
+package net.onrc.onos.core.flowprogrammer;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.openflow.protocol.*;
+import org.openflow.protocol.action.*;
+import org.openflow.protocol.factory.BasicFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.floodlightcontroller.core.FloodlightContext;
+import net.floodlightcontroller.core.IFloodlightProviderService;
+import net.floodlightcontroller.core.IOFMessageListener;
+import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.internal.OFMessageFuture;
+import net.floodlightcontroller.core.module.FloodlightModuleContext;
+import net.floodlightcontroller.threadpool.IThreadPoolService;
+import net.floodlightcontroller.util.MACAddress;
+import net.floodlightcontroller.util.OFMessageDamper;
+import net.onrc.onos.core.util.FlowEntry;
+import net.onrc.onos.core.util.FlowEntryAction;
+import net.onrc.onos.core.util.FlowEntryActions;
+import net.onrc.onos.core.util.FlowEntryMatch;
+import net.onrc.onos.core.util.FlowEntryUserState;
+import net.onrc.onos.core.util.IPv4Net;
+import net.onrc.onos.core.util.Pair;
+import net.onrc.onos.core.util.Port;
+import net.onrc.onos.core.util.FlowEntryAction.*;
+
+/**
+ * FlowPusher is a implementation of FlowPusherService.
+ * FlowPusher assigns one message queue instance for each one switch.
+ * Number of message processing threads is configurable by constructor, and
+ * one thread can handle multiple message queues. Each queue will be assigned to
+ * a thread according to hash function defined by getHash().
+ * Each processing thread reads messages from queues and sends it to switches
+ * in round-robin. Processing thread also calculates rate of sending to suppress
+ * excessive message sending.
+ *
+ * @author Naoki Shiota
+ */
+public class FlowPusher implements IFlowPusherService, IOFMessageListener {
+ private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
+ protected static final int DEFAULT_NUMBER_THREAD = 1;
+
+ // TODO: Values copied from elsewhere (class LearningSwitch).
+ // The local copy should go away!
+ //
+ protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
+ protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
+
+ // Number of messages sent to switch at once
+ protected static final int MAX_MESSAGE_SEND = 100;
+
+ private static class SwitchQueueEntry {
+ OFMessage msg;
+
+ public SwitchQueueEntry(OFMessage msg) {
+ this.msg = msg;
+ }
+
+ public OFMessage getOFMessage() {
+ return msg;
+ }
+ }
+
+ /**
+ * SwitchQueue represents message queue attached to a switch.
+ * This consists of queue itself and variables used for limiting sending rate.
+ *
+ * @author Naoki Shiota
+ */
+ private class SwitchQueue {
+ List<Queue<SwitchQueueEntry>> raw_queues;
+ QueueState state;
+
+ // Max rate of sending message (bytes/ms). 0 implies no limitation.
+ long max_rate = 0; // 0 indicates no limitation
+ long last_sent_time = 0;
+ long last_sent_size = 0;
+
+ // "To be deleted" flag
+ boolean toBeDeleted = false;
+
+ SwitchQueue() {
+ raw_queues = new ArrayList<Queue<SwitchQueueEntry>>(
+ MsgPriority.values().length);
+ for (int i = 0; i < MsgPriority.values().length; ++i) {
+ raw_queues.add(i, new ArrayDeque<SwitchQueueEntry>());
+ }
+
+ state = QueueState.READY;
+ }
+
+ /**
+ * Check if sending rate is within the rate
+ *
+ * @param current Current time
+ * @return true if within the rate
+ */
+ boolean isSendable(long current) {
+ if (max_rate == 0) {
+ // no limitation
+ return true;
+ }
+
+ if (current == last_sent_time) {
+ return false;
+ }
+
+ // Check if sufficient time (from aspect of rate) elapsed or not.
+ long rate = last_sent_size / (current - last_sent_time);
+ return (rate < max_rate);
+ }
+
+ /**
+ * Log time and size of last sent data.
+ *
+ * @param current Time to be sent.
+ * @param size Size of sent data (in bytes).
+ */
+ void logSentData(long current, long size) {
+ last_sent_time = current;
+ last_sent_size = size;
+ }
+
+ boolean add(SwitchQueueEntry entry, MsgPriority priority) {
+ Queue<SwitchQueueEntry> queue = getQueue(priority);
+ if (queue == null) {
+ log.error("Unexpected priority : ", priority);
+ return false;
+ }
+ return queue.add(entry);
+ }
+
+ /**
+ * Poll single appropriate entry object according to QueueState.
+ *
+ * @return Entry object.
+ */
+ SwitchQueueEntry poll() {
+ switch (state) {
+ case READY: {
+ for (int i = 0; i < raw_queues.size(); ++i) {
+ SwitchQueueEntry entry = raw_queues.get(i).poll();
+ if (entry != null) {
+ return entry;
+ }
+ }
+
+ return null;
+ }
+ case SUSPENDED: {
+ // Only polling from high priority queue
+ SwitchQueueEntry entry = getQueue(MsgPriority.HIGH).poll();
+ return entry;
+ }
+ default:
+ log.error("Unexpected QueueState : ", state);
+ return null;
+ }
+ }
+
+ /**
+ * Check if this object has any messages in the queues to be sent
+ *
+ * @return True if there are some messages to be sent.
+ */
+ boolean hasMessageToSend() {
+ switch (state) {
+ case READY:
+ for (Queue<SwitchQueueEntry> queue : raw_queues) {
+ if (!queue.isEmpty()) {
+ return true;
+ }
+ }
+ break;
+ case SUSPENDED:
+ // Only checking high priority queue
+ return (!getQueue(MsgPriority.HIGH).isEmpty());
+ default:
+ log.error("Unexpected QueueState : ", state);
+ return false;
+ }
+
+ return false;
+ }
+
+ Queue<SwitchQueueEntry> getQueue(MsgPriority priority) {
+ return raw_queues.get(priority.ordinal());
+ }
+ }
+
+ /**
+ * BarrierInfo holds information to specify barrier message sent to switch.
+ *
+ * @author Naoki
+ */
+ private static class BarrierInfo {
+ final long dpid;
+ final int xid;
+
+ static BarrierInfo create(IOFSwitch sw, OFBarrierRequest req) {
+ return new BarrierInfo(sw.getId(), req.getXid());
+ }
+
+ static BarrierInfo create(IOFSwitch sw, OFBarrierReply rpy) {
+ return new BarrierInfo(sw.getId(), rpy.getXid());
+ }
+
+ private BarrierInfo(long dpid, int xid) {
+ this.dpid = dpid;
+ this.xid = xid;
+ }
+
+ // Auto generated code by Eclipse
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (int) (dpid ^ (dpid >>> 32));
+ result = prime * result + xid;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+
+ BarrierInfo other = (BarrierInfo) obj;
+ return (this.dpid == other.dpid) && (this.xid == other.xid);
+ }
+
+
+ }
+
+ private OFMessageDamper messageDamper = null;
+ private IThreadPoolService threadPool = null;
+
+ private FloodlightContext context = null;
+ private BasicFactory factory = null;
+
+ // Map of threads versus dpid
+ private Map<Long, FlowPusherThread> threadMap = null;
+ // Map from (DPID and transaction ID) to Future objects.
+ private Map<BarrierInfo, OFBarrierReplyFuture> barrierFutures
+ = new ConcurrentHashMap<BarrierInfo, OFBarrierReplyFuture>();
+
+ private int number_thread;
+
+ /**
+ * Main thread that reads messages from queues and sends them to switches.
+ *
+ * @author Naoki Shiota
+ */
+ private class FlowPusherThread extends Thread {
+ private Map<IOFSwitch, SwitchQueue> assignedQueues
+ = new ConcurrentHashMap<IOFSwitch, SwitchQueue>();
+
+ final Lock queuingLock = new ReentrantLock();
+ final Condition messagePushed = queuingLock.newCondition();
+
+ @Override
+ public void run() {
+ this.setName("FlowPusherThread " + this.getId());
+ while (true) {
+ while (!queuesHasMessageToSend()) {
+ queuingLock.lock();
+
+ try {
+ // wait for message pushed to queue
+ messagePushed.await();
+ } catch (InterruptedException e) {
+ // Interrupted to be shut down (not an error)
+ log.debug("FlowPusherThread is interrupted");
+ return;
+ } finally {
+ queuingLock.unlock();
+ }
+ }
+
+ // for safety of concurrent access, copy set of key objects
+ Set<IOFSwitch> keys = new HashSet<IOFSwitch>(assignedQueues.size());
+ for (IOFSwitch sw : assignedQueues.keySet()) {
+ keys.add(sw);
+ }
+
+ for (IOFSwitch sw : keys) {
+ SwitchQueue queue = assignedQueues.get(sw);
+
+ if (sw == null || queue == null) {
+ continue;
+ }
+
+ synchronized (queue) {
+ processQueue(sw, queue, MAX_MESSAGE_SEND);
+ if (queue.toBeDeleted && !queue.hasMessageToSend()) {
+ // remove queue if flagged to be.
+ assignedQueues.remove(sw);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Read messages from queue and send them to the switch.
+ * If number of messages excess the limit, stop sending messages.
+ *
+ * @param sw Switch to which messages will be sent.
+ * @param queue Queue of messages.
+ * @param max_msg Limitation of number of messages to be sent. If set to 0,
+ * all messages in queue will be sent.
+ */
+ private void processQueue(IOFSwitch sw, SwitchQueue queue, int max_msg) {
+ // check sending rate and determine it to be sent or not
+ long current_time = System.currentTimeMillis();
+ long size = 0;
+
+ if (queue.isSendable(current_time)) {
+ int i = 0;
+ while (queue.hasMessageToSend()) {
+ // Number of messages excess the limit
+ if (0 < max_msg && max_msg <= i) {
+ break;
+ }
+ ++i;
+
+ SwitchQueueEntry queueEntry;
+ synchronized (queue) {
+ queueEntry = queue.poll();
+ }
+
+ OFMessage msg = queueEntry.getOFMessage();
+ try {
+ messageDamper.write(sw, msg, context);
+ if (log.isTraceEnabled()) {
+ log.trace("Pusher sends message : {}", msg);
+ }
+ size += msg.getLength();
+ } catch (IOException e) {
+ e.printStackTrace();
+ log.error("Exception in sending message ({}) : {}", msg, e);
+ }
+ }
+
+ sw.flush();
+ queue.logSentData(current_time, size);
+ }
+ }
+
+ private boolean queuesHasMessageToSend() {
+ for (SwitchQueue queue : assignedQueues.values()) {
+ if (queue.hasMessageToSend()) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ private void notifyMessagePushed() {
+ queuingLock.lock();
+ try {
+ messagePushed.signal();
+ } finally {
+ queuingLock.unlock();
+ }
+ }
+ }
+
+ /**
+ * Initialize object with one thread.
+ */
+ public FlowPusher() {
+ number_thread = DEFAULT_NUMBER_THREAD;
+ }
+
+ /**
+ * Initialize object with threads of given number.
+ *
+ * @param number_thread Number of threads to handle messages.
+ */
+ public FlowPusher(int number_thread) {
+ if (number_thread > 0) {
+ this.number_thread = number_thread;
+ } else {
+ this.number_thread = DEFAULT_NUMBER_THREAD;
+ }
+ }
+
+ /**
+ * Set parameters needed for sending messages.
+ *
+ * @param context FloodlightContext used for sending messages.
+ * If null, FlowPusher uses default context.
+ * @param modContext FloodlightModuleContext used for acquiring
+ * ThreadPoolService and registering MessageListener.
+ * @param factory Factory object to create OFMessage objects.
+ * @param damper Message damper used for sending messages.
+ * If null, FlowPusher creates its own damper object.
+ */
+ public void init(FloodlightContext context,
+ FloodlightModuleContext modContext,
+ BasicFactory factory,
+ OFMessageDamper damper) {
+ this.context = context;
+ this.factory = factory;
+ this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
+ IFloodlightProviderService flservice
+ = modContext.getServiceImpl(IFloodlightProviderService.class);
+ flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
+
+ if (damper != null) {
+ messageDamper = damper;
+ } else {
+ // use default values
+ messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
+ EnumSet.of(OFType.FLOW_MOD),
+ OFMESSAGE_DAMPER_TIMEOUT);
+ }
+ }
+
+ /**
+ * Begin processing queue.
+ */
+ public void start() {
+ if (factory == null) {
+ log.error("FlowPusher not yet initialized.");
+ return;
+ }
+
+ threadMap = new HashMap<Long, FlowPusherThread>();
+ for (long i = 0; i < number_thread; ++i) {
+ FlowPusherThread thread = new FlowPusherThread();
+
+ threadMap.put(i, thread);
+ thread.start();
+ }
+ }
+
+ @Override
+ public boolean suspend(IOFSwitch sw) {
+ SwitchQueue queue = getQueue(sw);
+
+ if (queue == null) {
+ // create queue in case suspend is called before first message addition
+ queue = createQueueImpl(sw);
+ }
+
+ synchronized (queue) {
+ if (queue.state == QueueState.READY) {
+ queue.state = QueueState.SUSPENDED;
+ return true;
+ }
+ return false;
+ }
+ }
+
+ @Override
+ public boolean resume(IOFSwitch sw) {
+ SwitchQueue queue = getQueue(sw);
+
+ if (queue == null) {
+ log.error("No queue is attached to DPID : {}", sw.getId());
+ return false;
+ }
+
+ synchronized (queue) {
+ if (queue.state == QueueState.SUSPENDED) {
+ queue.state = QueueState.READY;
+
+ // Free the latch if queue has any messages
+ FlowPusherThread thread = getProcessingThread(sw);
+ if (queue.hasMessageToSend()) {
+ thread.notifyMessagePushed();
+ }
+ return true;
+ }
+ return false;
+ }
+ }
+
+ @Override
+ public QueueState getState(IOFSwitch sw) {
+ SwitchQueue queue = getQueue(sw);
+
+ if (queue == null) {
+ return QueueState.UNKNOWN;
+ }
+
+ return queue.state;
+ }
+
+ /**
+ * Stop processing queue and exit thread.
+ */
+ public void stop() {
+ if (threadMap == null) {
+ return;
+ }
+
+ for (FlowPusherThread t : threadMap.values()) {
+ t.interrupt();
+ }
+ }
+
+ @Override
+ public void setRate(IOFSwitch sw, long rate) {
+ SwitchQueue queue = getQueue(sw);
+ if (queue == null) {
+ queue = createQueueImpl(sw);
+ }
+
+ if (rate > 0) {
+ log.debug("rate for {} is set to {}", sw.getId(), rate);
+ synchronized (queue) {
+ queue.max_rate = rate;
+ }
+ }
+ }
+
+ @Override
+ public boolean createQueue(IOFSwitch sw) {
+ SwitchQueue queue = createQueueImpl(sw);
+
+ return (queue != null);
+ }
+
+ protected SwitchQueue createQueueImpl(IOFSwitch sw) {
+ SwitchQueue queue = getQueue(sw);
+ if (queue != null) {
+ return queue;
+ }
+
+ FlowPusherThread proc = getProcessingThread(sw);
+ queue = new SwitchQueue();
+ queue.state = QueueState.READY;
+ proc.assignedQueues.put(sw, queue);
+
+ return queue;
+ }
+
+ @Override
+ public boolean deleteQueue(IOFSwitch sw) {
+ return deleteQueue(sw, false);
+ }
+
+ @Override
+ public boolean deleteQueue(IOFSwitch sw, boolean forceStop) {
+ FlowPusherThread proc = getProcessingThread(sw);
+
+ if (forceStop) {
+ SwitchQueue queue = proc.assignedQueues.remove(sw);
+ if (queue == null) {
+ return false;
+ }
+ return true;
+ } else {
+ SwitchQueue queue = getQueue(sw);
+ if (queue == null) {
+ return false;
+ }
+ synchronized (queue) {
+ queue.toBeDeleted = true;
+ }
+ return true;
+ }
+ }
+
+ @Override
+ public boolean add(IOFSwitch sw, OFMessage msg) {
+ return add(sw, msg, MsgPriority.NORMAL);
+ }
+
+ @Override
+ public boolean add(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
+ return addMessageImpl(sw, msg, priority);
+ }
+
+ @Override
+ public void pushFlowEntries(
+ Collection<Pair<IOFSwitch, FlowEntry>> entries) {
+ pushFlowEntries(entries, MsgPriority.NORMAL);
+ }
+
+ @Override
+ public void pushFlowEntries(
+ Collection<Pair<IOFSwitch, FlowEntry>> entries, MsgPriority priority) {
+
+ for (Pair<IOFSwitch, FlowEntry> entry : entries) {
+ add(entry.first, entry.second, priority);
+ }
+ }
+
+ @Override
+ public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry) {
+ pushFlowEntry(sw, flowEntry, MsgPriority.NORMAL);
+ }
+
+ @Override
+ public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
+ Collection<Pair<IOFSwitch, FlowEntry>> entries =
+ new LinkedList<Pair<IOFSwitch, FlowEntry>>();
+
+ entries.add(new Pair<IOFSwitch, FlowEntry>(sw, flowEntry));
+ pushFlowEntries(entries, priority);
+ }
+
+ /**
+ * Create a message from FlowEntry and add it to the queue of the switch.
+ *
+ * @param sw Switch to which message is pushed.
+ * @param flowEntry FlowEntry object used for creating message.
+ * @return true if message is successfully added to a queue.
+ */
+ private boolean add(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
+ //
+ // Create the OpenFlow Flow Modification Entry to push
+ //
+ OFFlowMod fm = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
+ long cookie = flowEntry.flowEntryId().value();
+
+ short flowModCommand = OFFlowMod.OFPFC_ADD;
+ if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
+ flowModCommand = OFFlowMod.OFPFC_ADD;
+ } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
+ flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
+ } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
+ flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
+ } else {
+ // Unknown user state. Ignore the entry
+ log.debug(
+ "Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
+ flowEntry.flowEntryId(),
+ flowEntry.flowEntryUserState());
+ return false;
+ }
+
+ //
+ // Fetch the match conditions.
+ //
+ // NOTE: The Flow matching conditions common for all Flow Entries are
+ // used ONLY if a Flow Entry does NOT have the corresponding matching
+ // condition set.
+ //
+ OFMatch match = new OFMatch();
+ match.setWildcards(OFMatch.OFPFW_ALL);
+ FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
+
+ // Match the Incoming Port
+ Port matchInPort = flowEntryMatch.inPort();
+ if (matchInPort != null) {
+ match.setInputPort(matchInPort.value());
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
+ }
+
+ // Match the Source MAC address
+ MACAddress matchSrcMac = flowEntryMatch.srcMac();
+ if (matchSrcMac != null) {
+ match.setDataLayerSource(matchSrcMac.toString());
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
+ }
+
+ // Match the Destination MAC address
+ MACAddress matchDstMac = flowEntryMatch.dstMac();
+ if (matchDstMac != null) {
+ match.setDataLayerDestination(matchDstMac.toString());
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
+ }
+
+ // Match the Ethernet Frame Type
+ Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
+ if (matchEthernetFrameType != null) {
+ match.setDataLayerType(matchEthernetFrameType);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
+ }
+
+ // Match the VLAN ID
+ Short matchVlanId = flowEntryMatch.vlanId();
+ if (matchVlanId != null) {
+ match.setDataLayerVirtualLan(matchVlanId);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
+ }
+
+ // Match the VLAN priority
+ Byte matchVlanPriority = flowEntryMatch.vlanPriority();
+ if (matchVlanPriority != null) {
+ match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
+ match.setWildcards(match.getWildcards()
+ & ~OFMatch.OFPFW_DL_VLAN_PCP);
+ }
+
+ // Match the Source IPv4 Network prefix
+ IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
+ if (matchSrcIPv4Net != null) {
+ match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
+ }
+
+ // Natch the Destination IPv4 Network prefix
+ IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
+ if (matchDstIPv4Net != null) {
+ match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
+ }
+
+ // Match the IP protocol
+ Byte matchIpProto = flowEntryMatch.ipProto();
+ if (matchIpProto != null) {
+ match.setNetworkProtocol(matchIpProto);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
+ }
+
+ // Match the IP ToS (DSCP field, 6 bits)
+ Byte matchIpToS = flowEntryMatch.ipToS();
+ if (matchIpToS != null) {
+ match.setNetworkTypeOfService(matchIpToS);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
+ }
+
+ // Match the Source TCP/UDP port
+ Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
+ if (matchSrcTcpUdpPort != null) {
+ match.setTransportSource(matchSrcTcpUdpPort);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
+ }
+
+ // Match the Destination TCP/UDP port
+ Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
+ if (matchDstTcpUdpPort != null) {
+ match.setTransportDestination(matchDstTcpUdpPort);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
+ }
+
+ //
+ // Fetch the actions
+ //
+ Short actionOutputPort = null;
+ List<OFAction> openFlowActions = new ArrayList<OFAction>();
+ int actionsLen = 0;
+ FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
+ //
+ for (FlowEntryAction action : flowEntryActions.actions()) {
+ ActionOutput actionOutput = action.actionOutput();
+ ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
+ ActionSetVlanPriority actionSetVlanPriority = action
+ .actionSetVlanPriority();
+ ActionStripVlan actionStripVlan = action.actionStripVlan();
+ ActionSetEthernetAddr actionSetEthernetSrcAddr = action
+ .actionSetEthernetSrcAddr();
+ ActionSetEthernetAddr actionSetEthernetDstAddr = action
+ .actionSetEthernetDstAddr();
+ ActionSetIPv4Addr actionSetIPv4SrcAddr = action
+ .actionSetIPv4SrcAddr();
+ ActionSetIPv4Addr actionSetIPv4DstAddr = action
+ .actionSetIPv4DstAddr();
+ ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
+ ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action
+ .actionSetTcpUdpSrcPort();
+ ActionSetTcpUdpPort actionSetTcpUdpDstPort = action
+ .actionSetTcpUdpDstPort();
+ ActionEnqueue actionEnqueue = action.actionEnqueue();
+
+ if (actionOutput != null) {
+ actionOutputPort = actionOutput.port().value();
+ // XXX: The max length is hard-coded for now
+ OFActionOutput ofa = new OFActionOutput(actionOutput.port()
+ .value(), (short) 0xffff);
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetVlanId != null) {
+ OFActionVirtualLanIdentifier ofa = new OFActionVirtualLanIdentifier(
+ actionSetVlanId.vlanId());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetVlanPriority != null) {
+ OFActionVirtualLanPriorityCodePoint ofa = new OFActionVirtualLanPriorityCodePoint(
+ actionSetVlanPriority.vlanPriority());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionStripVlan != null) {
+ if (actionStripVlan.stripVlan() == true) {
+ OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+ }
+
+ if (actionSetEthernetSrcAddr != null) {
+ OFActionDataLayerSource ofa = new OFActionDataLayerSource(
+ actionSetEthernetSrcAddr.addr().toBytes());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetEthernetDstAddr != null) {
+ OFActionDataLayerDestination ofa = new OFActionDataLayerDestination(
+ actionSetEthernetDstAddr.addr().toBytes());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetIPv4SrcAddr != null) {
+ OFActionNetworkLayerSource ofa = new OFActionNetworkLayerSource(
+ actionSetIPv4SrcAddr.addr().value());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetIPv4DstAddr != null) {
+ OFActionNetworkLayerDestination ofa = new OFActionNetworkLayerDestination(
+ actionSetIPv4DstAddr.addr().value());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetIpToS != null) {
+ OFActionNetworkTypeOfService ofa = new OFActionNetworkTypeOfService(
+ actionSetIpToS.ipToS());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetTcpUdpSrcPort != null) {
+ OFActionTransportLayerSource ofa = new OFActionTransportLayerSource(
+ actionSetTcpUdpSrcPort.port());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetTcpUdpDstPort != null) {
+ OFActionTransportLayerDestination ofa = new OFActionTransportLayerDestination(
+ actionSetTcpUdpDstPort.port());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionEnqueue != null) {
+ OFActionEnqueue ofa = new OFActionEnqueue(actionEnqueue.port()
+ .value(), actionEnqueue.queueId());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+ }
+
+ fm.setIdleTimeout((short) flowEntry.idleTimeout())
+ .setHardTimeout((short) flowEntry.hardTimeout())
+ .setPriority((short) flowEntry.priority())
+ .setBufferId(OFPacketOut.BUFFER_ID_NONE).setCookie(cookie)
+ .setCommand(flowModCommand).setMatch(match)
+ .setActions(openFlowActions)
+ .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
+ fm.setOutPort(OFPort.OFPP_NONE.getValue());
+ if ((flowModCommand == OFFlowMod.OFPFC_DELETE)
+ || (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
+ if (actionOutputPort != null)
+ fm.setOutPort(actionOutputPort);
+ }
+
+ //
+ // Set the OFPFF_SEND_FLOW_REM flag if the Flow Entry is not
+ // permanent.
+ //
+ if ((flowEntry.idleTimeout() != 0) ||
+ (flowEntry.hardTimeout() != 0)) {
+ fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
+ }
+
+ if (log.isTraceEnabled()) {
+ log.trace("Installing flow entry {} into switch DPID: {} flowEntryId: {} srcMac: {} dstMac: {} inPort: {} outPort: {}"
+ , flowEntry.flowEntryUserState()
+ , sw.getStringId()
+ , flowEntry.flowEntryId()
+ , matchSrcMac
+ , matchDstMac
+ , matchInPort
+ , actionOutputPort
+ );
+ }
+
+ return addMessageImpl(sw, fm, priority);
+ }
+
+ /**
+ * Add message to queue
+ *
+ * @param sw
+ * @param msg
+ * @param flowEntryId
+ * @return
+ */
+ protected boolean addMessageImpl(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
+ FlowPusherThread thread = getProcessingThread(sw);
+
+ SwitchQueue queue = getQueue(sw);
+
+ // create queue at first addition of message
+ if (queue == null) {
+ queue = createQueueImpl(sw);
+ }
+
+ SwitchQueueEntry entry = new SwitchQueueEntry(msg);
+
+ synchronized (queue) {
+ queue.add(entry, priority);
+ if (log.isTraceEnabled()) {
+ log.trace("Message is pushed : {}", entry.getOFMessage());
+ }
+ }
+
+ thread.notifyMessagePushed();
+
+ return true;
+ }
+
+ @Override
+ public OFBarrierReply barrier(IOFSwitch sw) {
+ OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
+ if (future == null) {
+ return null;
+ }
+
+ try {
+ return future.get();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ log.error("InterruptedException: {}", e);
+ return null;
+ } catch (ExecutionException e) {
+ e.printStackTrace();
+ log.error("ExecutionException: {}", e);
+ return null;
+ }
+ }
+
+ @Override
+ public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
+ // TODO creation of message and future should be moved to OFSwitchImpl
+
+ if (sw == null) {
+ return null;
+ }
+
+ OFBarrierRequest msg = createBarrierRequest(sw);
+
+ OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
+ barrierFutures.put(BarrierInfo.create(sw, msg), future);
+
+ addMessageImpl(sw, msg, MsgPriority.NORMAL);
+
+ return future;
+ }
+
+ protected OFBarrierRequest createBarrierRequest(IOFSwitch sw) {
+ OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
+ msg.setXid(sw.getNextTransactionId());
+
+ return msg;
+ }
+
+ /**
+ * Get a queue attached to a switch.
+ *
+ * @param sw Switch object
+ * @return Queue object
+ */
+ protected SwitchQueue getQueue(IOFSwitch sw) {
+ if (sw == null) {
+ return null;
+ }
+
+ FlowPusherThread th = getProcessingThread(sw);
+ if (th == null) {
+ return null;
+ }
+
+ return th.assignedQueues.get(sw);
+ }
+
+ /**
+ * Get a hash value correspondent to a switch.
+ *
+ * @param sw Switch object
+ * @return Hash value
+ */
+ protected long getHash(IOFSwitch sw) {
+ // This code assumes DPID is sequentially assigned.
+ // TODO consider equalization algorithm
+ return sw.getId() % number_thread;
+ }
+
+ /**
+ * Get a Thread object which processes the queue attached to a switch.
+ *
+ * @param sw Switch object
+ * @return Thread object
+ */
+ protected FlowPusherThread getProcessingThread(IOFSwitch sw) {
+ long hash = getHash(sw);
+
+ return threadMap.get(hash);
+ }
+
+ @Override
+ public String getName() {
+ return "flowpusher";
+ }
+
+ @Override
+ public boolean isCallbackOrderingPrereq(OFType type, String name) {
+ return false;
+ }
+
+ @Override
+ public boolean isCallbackOrderingPostreq(OFType type, String name) {
+ return false;
+ }
+
+ @Override
+ public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
+ if (log.isTraceEnabled()) {
+ log.trace("Received BARRIER_REPLY from : {}", sw.getId());
+ }
+
+ if (msg.getType() != OFType.BARRIER_REPLY) {
+ log.error("Unexpected reply message : {}", msg.getType());
+ return Command.CONTINUE;
+ }
+
+ OFBarrierReply reply = (OFBarrierReply) msg;
+ BarrierInfo info = BarrierInfo.create(sw, reply);
+
+ // Deliver future if exists
+ OFBarrierReplyFuture future = barrierFutures.get(info);
+ if (future != null) {
+ future.deliverFuture(sw, msg);
+ barrierFutures.remove(info);
+ }
+
+ return Command.CONTINUE;
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/flowprogrammer/FlowSynchronizer.java b/src/main/java/net/onrc/onos/core/flowprogrammer/FlowSynchronizer.java
new file mode 100644
index 0000000..52eeff2
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/flowprogrammer/FlowSynchronizer.java
@@ -0,0 +1,387 @@
+package net.onrc.onos.core.flowprogrammer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+
+import org.openflow.protocol.OFFlowMod;
+import org.openflow.protocol.OFMatch;
+import org.openflow.protocol.OFPort;
+import org.openflow.protocol.OFStatisticsRequest;
+import org.openflow.protocol.statistics.OFFlowStatisticsReply;
+import org.openflow.protocol.statistics.OFFlowStatisticsRequest;
+import org.openflow.protocol.statistics.OFStatistics;
+import org.openflow.protocol.statistics.OFStatisticsType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.floodlightcontroller.core.IOFSwitch;
+import net.onrc.onos.core.flowprogrammer.IFlowPusherService.MsgPriority;
+import net.onrc.onos.core.util.Dpid;
+import net.onrc.onos.core.util.FlowEntry;
+import net.onrc.onos.core.util.FlowEntryId;
+
+/**
+ * FlowSynchronizer is an implementation of FlowSyncService.
+ * In addition to IFlowSyncService, FlowSynchronizer periodically reads flow
+ * tables from switches and compare them with GraphDB to drop unnecessary
+ * flows and/or to install missing flows.
+ *
+ * @author Brian
+ */
+public class FlowSynchronizer implements IFlowSyncService {
+
+ private static Logger log = LoggerFactory.getLogger(FlowSynchronizer.class);
+
+ // TODO: fix when FlowSynchronizer is refactored
+ // private DBOperation dbHandler;
+ protected IFlowPusherService pusher;
+ private Map<IOFSwitch, FutureTask<SyncResult>> switchThreads;
+
+ public FlowSynchronizer() {
+ // TODO: fix when FlowSynchronizer is refactored
+ // dbHandler = GraphDBManager.getDBOperation();
+ switchThreads = new HashMap<IOFSwitch, FutureTask<SyncResult>>();
+ }
+
+ @Override
+ public Future<SyncResult> synchronize(IOFSwitch sw) {
+ Synchronizer sync = new Synchronizer(sw);
+ FutureTask<SyncResult> task = new FutureTask<SyncResult>(sync);
+ switchThreads.put(sw, task);
+ task.run();
+ return task;
+ }
+
+ @Override
+ public void interrupt(IOFSwitch sw) {
+ FutureTask<SyncResult> t = switchThreads.remove(sw);
+ if (t != null) {
+ t.cancel(true);
+ }
+ }
+
+ /**
+ * Initialize Synchronizer.
+ *
+ * @param pusherService FlowPusherService used for sending messages.
+ */
+ public void init(IFlowPusherService pusherService) {
+ pusher = pusherService;
+ }
+
+ /**
+ * Synchronizer represents main thread of synchronization.
+ *
+ * @author Brian
+ */
+ protected class Synchronizer implements Callable<SyncResult> {
+ IOFSwitch sw;
+ // TODO: fix when FlowSynchronizer is refactored
+ // ISwitchObject swObj;
+
+ public Synchronizer(IOFSwitch sw) {
+ this.sw = sw;
+ Dpid dpid = new Dpid(sw.getId());
+ // TODO: fix when FlowSynchronizer is refactored
+ // this.swObj = dbHandler.searchSwitch(dpid.toString());
+ }
+
+ double graphIDTime, switchTime, compareTime, graphEntryTime, extractTime, pushTime, totalTime;
+
+ @Override
+ public SyncResult call() {
+ pusher.suspend(sw);
+ try {
+ long start = System.nanoTime();
+ Set<FlowEntryWrapper> graphEntries = getFlowEntriesFromGraph();
+ long step1 = System.nanoTime();
+ Set<FlowEntryWrapper> switchEntries = getFlowEntriesFromSwitch();
+ if (switchEntries == null) {
+ log.debug("getFlowEntriesFromSwitch() failed");
+ return null;
+ }
+ long step2 = System.nanoTime();
+ SyncResult result = compare(graphEntries, switchEntries);
+ long step3 = System.nanoTime();
+ graphIDTime = (step1 - start);
+ switchTime = (step2 - step1);
+ compareTime = (step3 - step2);
+ totalTime = (step3 - start);
+ outputTime();
+
+ return result;
+ } finally {
+ pusher.resume(sw);
+ }
+ }
+
+ private void outputTime() {
+ double div = Math.pow(10, 6); //convert nanoseconds to ms
+ graphIDTime /= div;
+ switchTime /= div;
+ compareTime = (compareTime - graphEntryTime - extractTime - pushTime) / div;
+ graphEntryTime /= div;
+ extractTime /= div;
+ pushTime /= div;
+ totalTime /= div;
+ log.debug("Sync time (ms):{},{},{},{},{},{},{}"
+ , graphIDTime
+ , switchTime
+ , compareTime
+ , graphEntryTime
+ , extractTime
+ , pushTime
+ , totalTime);
+ }
+
+ /**
+ * Compare flows entries in GraphDB and switch to pick up necessary
+ * messages.
+ * After picking up, picked messages are added to FlowPusher.
+ *
+ * @param graphEntries Flow entries in GraphDB.
+ * @param switchEntries Flow entries in switch.
+ */
+ private SyncResult compare(Set<FlowEntryWrapper> graphEntries, Set<FlowEntryWrapper> switchEntries) {
+ int added = 0, removed = 0, skipped = 0;
+ for (FlowEntryWrapper entry : switchEntries) {
+ if (graphEntries.contains(entry)) {
+ graphEntries.remove(entry);
+ skipped++;
+ } else {
+ // remove flow entry from the switch
+ entry.removeFromSwitch(sw);
+ removed++;
+ }
+ }
+ for (FlowEntryWrapper entry : graphEntries) {
+ // add flow entry to switch
+ entry.addToSwitch(sw);
+ graphEntryTime += entry.dbTime;
+ extractTime += entry.extractTime;
+ pushTime += entry.pushTime;
+ added++;
+ }
+ log.debug("Flow entries added {}, " +
+ "Flow entries removed {}, " +
+ "Flow entries skipped {}"
+ , added
+ , removed
+ , skipped);
+
+ return new SyncResult(added, removed, skipped);
+ }
+
+ /**
+ * Read GraphDB to get FlowEntries associated with a switch.
+ *
+ * @return set of FlowEntries
+ */
+ private Set<FlowEntryWrapper> getFlowEntriesFromGraph() {
+ Set<FlowEntryWrapper> entries = new HashSet<FlowEntryWrapper>();
+
+ // TODO: fix when FlowSynchronizer is refactored
+ /*
+ for(IFlowEntry entry : swObj.getFlowEntries()) {
+ FlowEntryWrapper fe = new FlowEntryWrapper(entry);
+ entries.add(fe);
+ }
+ */
+ return entries;
+ }
+
+ /**
+ * Read flow table from switch and derive FlowEntries from table.
+ *
+ * @return set of FlowEntries
+ */
+ private Set<FlowEntryWrapper> getFlowEntriesFromSwitch() {
+
+ int lengthU = 0;
+ OFMatch match = new OFMatch();
+ match.setWildcards(OFMatch.OFPFW_ALL);
+
+ OFFlowStatisticsRequest stat = new OFFlowStatisticsRequest();
+ stat.setOutPort((short) 0xffff); //TODO: OFPort.OFPP_NONE
+ stat.setTableId((byte) 0xff); // TODO: fix this with enum (ALL TABLES)
+ stat.setMatch(match);
+ List<OFStatistics> stats = new ArrayList<OFStatistics>();
+ stats.add(stat);
+ lengthU += stat.getLength();
+
+ OFStatisticsRequest req = new OFStatisticsRequest();
+ req.setStatisticType(OFStatisticsType.FLOW);
+ req.setStatistics(stats);
+ lengthU += req.getLengthU();
+ req.setLengthU(lengthU);
+
+ List<OFStatistics> entries = null;
+ try {
+ Future<List<OFStatistics>> dfuture = sw.getStatistics(req);
+ entries = dfuture.get();
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ return null;
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ return null;
+ } catch (ExecutionException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ return null;
+ }
+
+ Set<FlowEntryWrapper> results = new HashSet<FlowEntryWrapper>();
+ for (OFStatistics result : entries) {
+ OFFlowStatisticsReply entry = (OFFlowStatisticsReply) result;
+ FlowEntryWrapper fe = new FlowEntryWrapper(entry);
+ results.add(fe);
+ }
+ return results;
+ }
+
+ }
+
+ /**
+ * FlowEntryWrapper represents abstract FlowEntry which is embodied
+ * by FlowEntryId (from GraphDB) or OFFlowStatisticsReply (from switch).
+ *
+ * @author Brian
+ */
+ class FlowEntryWrapper {
+ FlowEntryId flowEntryId;
+ // TODO: fix when FlowSynchronizer is refactored
+ // IFlowEntry iFlowEntry;
+ OFFlowStatisticsReply statisticsReply;
+
+
+ // TODO: fix when FlowSynchronizer is refactored
+ /*
+ public FlowEntryWrapper(IFlowEntry entry) {
+ flowEntryId = new FlowEntryId(entry.getFlowEntryId());
+ iFlowEntry = entry;
+ }
+ */
+
+ public FlowEntryWrapper(OFFlowStatisticsReply entry) {
+ flowEntryId = new FlowEntryId(entry.getCookie());
+ statisticsReply = entry;
+ }
+
+ /**
+ * Install this FlowEntry to a switch via FlowPusher.
+ *
+ * @param sw Switch to which flow will be installed.
+ */
+ double dbTime, extractTime, pushTime;
+
+ public void addToSwitch(IOFSwitch sw) {
+ if (statisticsReply != null) {
+ log.error("Error adding existing flow entry {} to sw {}",
+ statisticsReply.getCookie(), sw.getId());
+ return;
+ }
+
+ double startDB = System.nanoTime();
+ // Get the Flow Entry state from the Network Graph
+ // TODO: fix when FlowSynchronizer is refactored
+ /*
+ if (iFlowEntry == null) {
+ try {
+ // TODO: fix when FlowSynchronizer is refactored
+ iFlowEntry = dbHandler.searchFlowEntry(flowEntryId);
+ } catch (Exception e) {
+ log.error("Error finding flow entry {} in Network Graph",
+ flowEntryId);
+ return;
+ }
+ }
+ */
+ dbTime = System.nanoTime() - startDB;
+
+ //
+ // TODO: The old FlowDatabaseOperation class is gone, so the code
+ //
+ /*
+ double startExtract = System.nanoTime();
+ FlowEntry flowEntry =
+ FlowDatabaseOperation.extractFlowEntry(iFlowEntry);
+ if (flowEntry == null) {
+ log.error("Cannot add flow entry {} to sw {} : flow entry cannot be extracted",
+ flowEntryId, sw.getId());
+ return;
+ }
+ extractTime = System.nanoTime() - startExtract;
+
+ double startPush = System.nanoTime();
+ pusher.pushFlowEntry(sw, flowEntry, MsgPriority.HIGH);
+ pushTime = System.nanoTime() - startPush;
+ */
+ }
+
+ /**
+ * Remove this FlowEntry from a switch via FlowPusher.
+ *
+ * @param sw Switch from which flow will be removed.
+ */
+ public void removeFromSwitch(IOFSwitch sw) {
+ if (statisticsReply == null) {
+ log.error("Error removing non-existent flow entry {} from sw {}",
+ flowEntryId, sw.getId());
+ return;
+ }
+
+ // Convert Statistics Reply to Flow Mod, then write it
+ OFFlowMod fm = new OFFlowMod();
+ fm.setCookie(statisticsReply.getCookie());
+ fm.setCommand(OFFlowMod.OFPFC_DELETE_STRICT);
+ fm.setLengthU(OFFlowMod.MINIMUM_LENGTH);
+ fm.setMatch(statisticsReply.getMatch());
+ fm.setPriority(statisticsReply.getPriority());
+ fm.setOutPort(OFPort.OFPP_NONE);
+
+ pusher.add(sw, fm, MsgPriority.HIGH);
+ }
+
+ /**
+ * Return the hash code of the Flow Entry ID
+ */
+ @Override
+ public int hashCode() {
+ return flowEntryId.hashCode();
+ }
+
+ /**
+ * Returns true of the object is another Flow Entry ID with
+ * the same value; otherwise, returns false.
+ *
+ * @param Object to compare
+ * @return true if the object has the same Flow Entry ID.
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if (obj != null && obj.getClass() == this.getClass()) {
+ FlowEntryWrapper entry = (FlowEntryWrapper) obj;
+ // TODO: we need to actually compare the match + actions
+ return this.flowEntryId.equals(entry.flowEntryId);
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return flowEntryId.toString();
+ }
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/flowprogrammer/IFlowPusherService.java b/src/main/java/net/onrc/onos/core/flowprogrammer/IFlowPusherService.java
new file mode 100644
index 0000000..c0c7ae7
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/flowprogrammer/IFlowPusherService.java
@@ -0,0 +1,187 @@
+package net.onrc.onos.core.flowprogrammer;
+
+import java.util.Collection;
+
+import org.openflow.protocol.OFBarrierReply;
+import org.openflow.protocol.OFMessage;
+
+import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.internal.OFMessageFuture;
+import net.floodlightcontroller.core.module.IFloodlightService;
+import net.onrc.onos.core.util.FlowEntry;
+import net.onrc.onos.core.util.Pair;
+
+/**
+ * FlowPusherService is a service to send message to switches in proper rate.
+ * Conceptually a queue is attached to each switch, and FlowPusherService
+ * read a message from queue and send it to switch in order.
+ * To guarantee message has been installed, FlowPusherService can add barrier
+ * message to queue and can notify when barrier message is sent to switch.
+ *
+ * @author Naoki Shiota
+ */
+public interface IFlowPusherService extends IFloodlightService {
+ public static enum MsgPriority {
+ HIGH, // High priority: e.g. flow synchronization
+ NORMAL, // Normal priority
+// LOW, // Low priority, not needed for now
+ }
+
+ public static enum QueueState {
+ READY, // Queues with all priority are at work
+ SUSPENDED, // Only prior queue is at work
+ UNKNOWN
+ }
+
+ /**
+ * Create a queue correspondent to the switch.
+ *
+ * @param sw Switch to which new queue is attached.
+ * @return true if new queue is successfully created.
+ */
+ boolean createQueue(IOFSwitch sw);
+
+ /**
+ * Delete a queue correspondent to the switch.
+ * Messages remains in queue will be all sent before queue is deleted.
+ *
+ * @param sw Switch of which queue is deleted.
+ * @return true if queue is successfully deleted.
+ */
+ boolean deleteQueue(IOFSwitch sw);
+
+ /**
+ * Delete a queue correspondent to the switch.
+ * By setting force flag on, queue will be deleted immediately.
+ *
+ * @param sw Switch of which queue is deleted.
+ * @param forceStop If this flag is set to true, queue will be deleted
+ * immediately regardless of any messages in the queue.
+ * If false, all messages will be sent to switch and queue will
+ * be deleted after that.
+ * @return true if queue is successfully deleted or flagged to be deleted.
+ */
+ boolean deleteQueue(IOFSwitch sw, boolean forceStop);
+
+ /**
+ * Add a message to the queue of the switch with normal priority.
+ * <p/>
+ * Note: Notification is NOT delivered for the pushed message.
+ *
+ * @param sw Switch to which message is pushed.
+ * @param msg Message object to be added.
+ * @return true if message is successfully added to a queue.
+ */
+ boolean add(IOFSwitch sw, OFMessage msg);
+
+ /**
+ * Add a message to the queue of the switch with specific priority.
+ *
+ * @param sw Switch to which message is pushed.
+ * @param msg Message object to be added.
+ * @param priority Sending priority of the message.
+ * @return true if message is successfully added to a queue.
+ */
+ boolean add(IOFSwitch sw, OFMessage msg, MsgPriority priority);
+
+ /**
+ * Push a collection of Flow Entries to the corresponding switches
+ * with normal priority.
+ * <p/>
+ * Note: Notification is delivered for the Flow Entries that
+ * are pushed successfully.
+ *
+ * @param entries the collection of <IOFSwitch, FlowEntry> pairs
+ * to push.
+ */
+ void pushFlowEntries(Collection<Pair<IOFSwitch, FlowEntry>> entries);
+
+ /**
+ * Push a collection of Flow Entries to the corresponding switches
+ * with specific priority.
+ * <p/>
+ * Note: Notification is delivered for the Flow Entries that
+ * are pushed successfully.
+ *
+ * @param entries the collection of <IOFSwitch, FlowEntry> pairs
+ * to push.
+ * @param priority Sending priority of flow entries.
+ */
+ void pushFlowEntries(Collection<Pair<IOFSwitch, FlowEntry>> entries,
+ MsgPriority priority);
+
+ /**
+ * Create a message from FlowEntry and add it to the queue of the
+ * switch with normal priority.
+ * <p/>
+ * Note: Notification is delivered for the Flow Entries that
+ * are pushed successfully.
+ *
+ * @param sw Switch to which message is pushed.
+ * @param flowEntry FlowEntry object used for creating message.
+ * @return true if message is successfully added to a queue.
+ */
+ void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry);
+
+ /**
+ * Create a message from FlowEntry and add it to the queue of the
+ * switch with specific priority.
+ * <p/>
+ * Note: Notification is delivered for the Flow Entries that
+ * are pushed successfully.
+ *
+ * @param sw Switch to which message is pushed.
+ * @param flowEntry FlowEntry object used for creating message.
+ * @return true if message is successfully added to a queue.
+ */
+ void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry,
+ MsgPriority priority);
+
+ /**
+ * Set sending rate to a switch.
+ *
+ * @param sw Switch.
+ * @param rate Rate in bytes/ms.
+ */
+ public void setRate(IOFSwitch sw, long rate);
+
+ /**
+ * Add BARRIER message to queue and wait for reply.
+ *
+ * @param sw Switch to which barrier message is pushed.
+ * @return BARRIER_REPLY message sent from switch.
+ */
+ OFBarrierReply barrier(IOFSwitch sw);
+
+ /**
+ * Add BARRIER message to queue asynchronously.
+ *
+ * @param sw Switch to which barrier message is pushed.
+ * @return Future object of BARRIER_REPLY message which will be sent from switch.
+ */
+ OFMessageFuture<OFBarrierReply> barrierAsync(IOFSwitch sw);
+
+ /**
+ * Suspend pushing message to a switch.
+ *
+ * @param sw Switch to be suspended pushing message.
+ * @return true if success
+ */
+ boolean suspend(IOFSwitch sw);
+
+ /**
+ * Resume pushing message to a switch.
+ *
+ * @param sw Switch to be resumed pushing message.
+ * @return true if success
+ */
+ boolean resume(IOFSwitch sw);
+
+ /**
+ * Get state of queue attached to a switch.
+ *
+ * @param sw Switch to be checked.
+ * @return State of queue.
+ */
+ QueueState getState(IOFSwitch sw);
+}
diff --git a/src/main/java/net/onrc/onos/core/flowprogrammer/IFlowSyncService.java b/src/main/java/net/onrc/onos/core/flowprogrammer/IFlowSyncService.java
new file mode 100644
index 0000000..f2d5989
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/flowprogrammer/IFlowSyncService.java
@@ -0,0 +1,30 @@
+package net.onrc.onos.core.flowprogrammer;
+
+import java.util.concurrent.Future;
+
+import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.module.IFloodlightService;
+
+/**
+ * FlowSyncService is a service to synchronize GraphDB and switch's flow table.
+ * FlowSyncService offers APIs to trigger and interrupt synchronization explicitly.
+ *
+ * @author Brian
+ */
+public interface IFlowSyncService extends IFloodlightService {
+ public Future<SyncResult> synchronize(IOFSwitch sw);
+
+ public void interrupt(IOFSwitch sw);
+
+ public class SyncResult {
+ public final int flowAdded;
+ public final int flowRemoved;
+ public final int flowSkipped;
+
+ public SyncResult(int added, int removed, int skipped) {
+ flowAdded = added;
+ flowRemoved = removed;
+ flowSkipped = skipped;
+ }
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/flowprogrammer/OFBarrierReplyFuture.java b/src/main/java/net/onrc/onos/core/flowprogrammer/OFBarrierReplyFuture.java
new file mode 100644
index 0000000..5815137
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/flowprogrammer/OFBarrierReplyFuture.java
@@ -0,0 +1,49 @@
+package net.onrc.onos.core.flowprogrammer;
+
+import java.util.concurrent.TimeUnit;
+
+import org.openflow.protocol.OFBarrierReply;
+import org.openflow.protocol.OFMessage;
+import org.openflow.protocol.OFType;
+
+import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.internal.OFMessageFuture;
+import net.floodlightcontroller.threadpool.IThreadPoolService;
+
+public class OFBarrierReplyFuture extends OFMessageFuture<OFBarrierReply> {
+
+ protected volatile boolean finished;
+
+ public OFBarrierReplyFuture(IThreadPoolService tp,
+ IOFSwitch sw, int transactionId) {
+ super(tp, sw, OFType.FEATURES_REPLY, transactionId);
+ init();
+ }
+
+ public OFBarrierReplyFuture(IThreadPoolService tp,
+ IOFSwitch sw, int transactionId, long timeout, TimeUnit unit) {
+ super(tp, sw, OFType.FEATURES_REPLY, transactionId, timeout, unit);
+ init();
+ }
+
+ private void init() {
+ this.finished = false;
+ this.result = null;
+ }
+
+ @Override
+ protected void handleReply(IOFSwitch sw, OFMessage msg) {
+ this.result = (OFBarrierReply) msg;
+ this.finished = true;
+ }
+
+ @Override
+ protected boolean isFinished() {
+ return finished;
+ }
+
+ @Override
+ protected void unRegister() {
+ super.unRegister();
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/flowprogrammer/web/DoInterruptResource.java b/src/main/java/net/onrc/onos/core/flowprogrammer/web/DoInterruptResource.java
new file mode 100644
index 0000000..f2b4631
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/flowprogrammer/web/DoInterruptResource.java
@@ -0,0 +1,44 @@
+package net.onrc.onos.core.flowprogrammer.web;
+
+import net.floodlightcontroller.core.IOFSwitch;
+
+import org.openflow.util.HexString;
+import org.restlet.resource.Get;
+
+/**
+ * FlowProgrammer REST API implementation: Interrupt synchronization to a switch.
+ * <p/>
+ * GET /wm/fprog/synchronizer/interrupt/{dpid}/json"
+ */
+public class DoInterruptResource extends SynchronizerResource {
+
+ /**
+ * Implement the API.
+ *
+ * @return true if succeeded, false if failed.
+ */
+ @Get("json")
+ public boolean retrieve() {
+ if (!init()) {
+ return false;
+ }
+
+ long dpid;
+ try {
+ dpid = HexString.toLong((String) getRequestAttributes().get("dpid"));
+ } catch (NumberFormatException e) {
+ log.error("Invalid number format");
+ return false;
+ }
+
+ IOFSwitch sw = provider.getSwitches().get(dpid);
+ if (sw == null) {
+ log.error("Invalid dpid");
+ return false;
+ }
+
+ synchronizer.interrupt(sw);
+
+ return true;
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/flowprogrammer/web/DoSynchronizeResource.java b/src/main/java/net/onrc/onos/core/flowprogrammer/web/DoSynchronizeResource.java
new file mode 100644
index 0000000..51d39d8
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/flowprogrammer/web/DoSynchronizeResource.java
@@ -0,0 +1,44 @@
+package net.onrc.onos.core.flowprogrammer.web;
+
+import net.floodlightcontroller.core.IOFSwitch;
+
+import org.openflow.util.HexString;
+import org.restlet.resource.Get;
+
+/**
+ * FlowProgrammer REST API implementation: Begin synchronization to a switch.
+ * <p/>
+ * GET /wm/fprog/synchronizer/sync/{dpid}/json"
+ */
+public class DoSynchronizeResource extends SynchronizerResource {
+ /**
+ * Implement the API.
+ *
+ * @return true if succeeded, false if failed.
+ */
+ @Get("json")
+ public boolean retrieve() {
+ if (!init()) {
+ return false;
+ }
+
+ long dpid;
+ try {
+ dpid = HexString.toLong((String) getRequestAttributes().get("dpid"));
+ } catch (NumberFormatException e) {
+ log.error("Invalid number format");
+ return false;
+ }
+
+ IOFSwitch sw = provider.getSwitches().get(dpid);
+ if (sw == null) {
+ log.error("Invalid dpid");
+ return false;
+ }
+
+ synchronizer.synchronize(sw);
+
+ return true;
+ }
+
+}
diff --git a/src/main/java/net/onrc/onos/core/flowprogrammer/web/FlowProgrammerWebRoutable.java b/src/main/java/net/onrc/onos/core/flowprogrammer/web/FlowProgrammerWebRoutable.java
new file mode 100644
index 0000000..ab3f641
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/flowprogrammer/web/FlowProgrammerWebRoutable.java
@@ -0,0 +1,28 @@
+package net.onrc.onos.core.flowprogrammer.web;
+
+import org.restlet.Context;
+import org.restlet.Restlet;
+import org.restlet.routing.Router;
+
+import net.floodlightcontroller.restserver.RestletRoutable;
+
+public class FlowProgrammerWebRoutable implements RestletRoutable {
+
+ @Override
+ public Restlet getRestlet(Context context) {
+ Router router = new Router(context);
+ router.attach("/pusher/setrate/{dpid}/{rate}/json", SetPushRateResource.class);
+ router.attach("/pusher/suspend/{dpid}/json", SuspendPusherResource.class);
+ router.attach("/pusher/resume/{dpid}/json", ResumePusherResource.class);
+ router.attach("/pusher/barrier/{dpid}/json", SendBarrierResource.class);
+ router.attach("/synchronizer/sync/{dpid}/json", DoSynchronizeResource.class);
+ router.attach("/synchronizer/interrupt/{dpid}/json", DoInterruptResource.class);
+ return router;
+ }
+
+ @Override
+ public String basePath() {
+ return "/wm/onos/flowprogrammer";
+ }
+
+}
diff --git a/src/main/java/net/onrc/onos/core/flowprogrammer/web/PusherResource.java b/src/main/java/net/onrc/onos/core/flowprogrammer/web/PusherResource.java
new file mode 100644
index 0000000..2f7af29
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/flowprogrammer/web/PusherResource.java
@@ -0,0 +1,33 @@
+package net.onrc.onos.core.flowprogrammer.web;
+
+import org.restlet.resource.ServerResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.floodlightcontroller.core.IFloodlightProviderService;
+import net.onrc.onos.core.flowprogrammer.IFlowPusherService;
+
+public class PusherResource extends ServerResource {
+ protected final static Logger log = LoggerFactory.getLogger(PusherResource.class);
+
+ protected IFloodlightProviderService provider;
+ protected IFlowPusherService pusher;
+
+ protected boolean init() {
+ provider = (IFloodlightProviderService)
+ getContext().getAttributes().
+ get(IFloodlightProviderService.class.getCanonicalName());
+ if (provider == null) {
+ log.debug("ONOS FloodlightProvider not found");
+ return false;
+ }
+
+ pusher = (IFlowPusherService) getContext().getAttributes().
+ get(IFlowPusherService.class.getCanonicalName());
+ if (pusher == null) {
+ log.debug("ONOS FlowPusherService not found");
+ return false;
+ }
+ return true;
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/flowprogrammer/web/ResumePusherResource.java b/src/main/java/net/onrc/onos/core/flowprogrammer/web/ResumePusherResource.java
new file mode 100644
index 0000000..a9e6c81
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/flowprogrammer/web/ResumePusherResource.java
@@ -0,0 +1,41 @@
+package net.onrc.onos.core.flowprogrammer.web;
+
+import net.floodlightcontroller.core.IOFSwitch;
+
+import org.openflow.util.HexString;
+import org.restlet.resource.Get;
+
+/**
+ * FlowProgrammer REST API implementation: Resume sending message to switch.
+ * <p/>
+ * GET /wm/fprog/pusher/resume/{dpid}/json"
+ */
+public class ResumePusherResource extends PusherResource {
+ /**
+ * Implement the API.
+ *
+ * @return true if succeeded, false if failed.
+ */
+ @Get("json")
+ public boolean retrieve() {
+ if (!init()) {
+ return false;
+ }
+
+ long dpid;
+ try {
+ dpid = HexString.toLong((String) getRequestAttributes().get("dpid"));
+ } catch (NumberFormatException e) {
+ log.error("Invalid number format");
+ return false;
+ }
+
+ IOFSwitch sw = provider.getSwitches().get(dpid);
+ if (sw == null) {
+ log.error("Invalid dpid");
+ return false;
+ }
+
+ return pusher.resume(sw);
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/flowprogrammer/web/SendBarrierResource.java b/src/main/java/net/onrc/onos/core/flowprogrammer/web/SendBarrierResource.java
new file mode 100644
index 0000000..236cc85
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/flowprogrammer/web/SendBarrierResource.java
@@ -0,0 +1,41 @@
+package net.onrc.onos.core.flowprogrammer.web;
+
+import net.floodlightcontroller.core.IOFSwitch;
+
+import org.openflow.protocol.OFBarrierReply;
+import org.openflow.util.HexString;
+import org.restlet.resource.Get;
+
+/**
+ * FlowProgrammer REST API implementation: Send barrier message to switch.
+ * <p/>
+ * GET /wm/fprog/pusher/barrier/{dpid}/json"
+ */
+public class SendBarrierResource extends PusherResource {
+ /**
+ * Implement the API.
+ *
+ * @return true if succeeded, false if failed.
+ */
+ @Get("json")
+ public OFBarrierReply retrieve() {
+ if (!init()) {
+ return null;
+ }
+ long dpid;
+ try {
+ dpid = HexString.toLong((String) getRequestAttributes().get("dpid"));
+ } catch (NumberFormatException e) {
+ log.error("Invalid number format");
+ return null;
+ }
+
+ IOFSwitch sw = provider.getSwitches().get(dpid);
+ if (sw == null) {
+ log.error("Invalid dpid");
+ return null;
+ }
+
+ return pusher.barrier(sw);
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/flowprogrammer/web/SetPushRateResource.java b/src/main/java/net/onrc/onos/core/flowprogrammer/web/SetPushRateResource.java
new file mode 100644
index 0000000..11bbd28
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/flowprogrammer/web/SetPushRateResource.java
@@ -0,0 +1,47 @@
+package net.onrc.onos.core.flowprogrammer.web;
+
+import net.floodlightcontroller.core.IOFSwitch;
+
+import org.openflow.util.HexString;
+import org.restlet.resource.Get;
+
+/**
+ * FlowProgrammer REST API implementation: Set sending rate to the switch.
+ * <p/>
+ * GET /wm/fprog/pusher/setrate/{dpid}/{rate}/json"
+ */
+public class SetPushRateResource extends PusherResource {
+
+ /**
+ * Implement the API.
+ *
+ * @return true if succeeded, false if failed.
+ */
+ @Get("json")
+ public boolean retrieve() {
+ if (!init()) {
+ return false;
+ }
+
+ long dpid;
+ long rate;
+
+ try {
+ dpid = HexString.toLong((String) getRequestAttributes().get("dpid"));
+ rate = Long.valueOf((String) getRequestAttributes().get("rate"));
+ } catch (NumberFormatException e) {
+ log.error("Invalid number format");
+ return false;
+ }
+
+ IOFSwitch sw = provider.getSwitches().get(dpid);
+ if (sw == null) {
+ log.error("Invalid dpid");
+ return false;
+ }
+
+ pusher.setRate(sw, rate);
+
+ return true;
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/flowprogrammer/web/SuspendPusherResource.java b/src/main/java/net/onrc/onos/core/flowprogrammer/web/SuspendPusherResource.java
new file mode 100644
index 0000000..4d651d3
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/flowprogrammer/web/SuspendPusherResource.java
@@ -0,0 +1,46 @@
+package net.onrc.onos.core.flowprogrammer.web;
+
+import net.floodlightcontroller.core.IOFSwitch;
+
+import org.openflow.util.HexString;
+import org.restlet.resource.Get;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * FlowProgrammer REST API implementation: Suspend sending message to switch.
+ * <p/>
+ * GET /wm/fprog/pusher/suspend/{dpid}/json"
+ */
+public class SuspendPusherResource extends PusherResource {
+
+ protected final static Logger log = LoggerFactory.getLogger(SetPushRateResource.class);
+
+ /**
+ * Implement the API.
+ *
+ * @return true if succeeded, false if failed.
+ */
+ @Get("json")
+ public boolean retrieve() {
+ if (!init()) {
+ return false;
+ }
+
+ long dpid;
+ try {
+ dpid = HexString.toLong((String) getRequestAttributes().get("dpid"));
+ } catch (NumberFormatException e) {
+ log.error("Invalid number format");
+ return false;
+ }
+
+ IOFSwitch sw = provider.getSwitches().get(dpid);
+ if (sw == null) {
+ log.error("Invalid dpid");
+ return false;
+ }
+
+ return pusher.suspend(sw);
+ }
+}
diff --git a/src/main/java/net/onrc/onos/core/flowprogrammer/web/SynchronizerResource.java b/src/main/java/net/onrc/onos/core/flowprogrammer/web/SynchronizerResource.java
new file mode 100644
index 0000000..429d52f
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/flowprogrammer/web/SynchronizerResource.java
@@ -0,0 +1,35 @@
+package net.onrc.onos.core.flowprogrammer.web;
+
+import net.floodlightcontroller.core.IFloodlightProviderService;
+import net.onrc.onos.core.flowprogrammer.IFlowSyncService;
+
+import org.restlet.resource.ServerResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SynchronizerResource extends ServerResource {
+ protected final static Logger log = LoggerFactory.getLogger(SynchronizerResource.class);
+
+ protected IFloodlightProviderService provider;
+ protected IFlowSyncService synchronizer;
+
+ protected boolean init() {
+ provider = (IFloodlightProviderService)
+ getContext().getAttributes().
+ get(IFloodlightProviderService.class.getCanonicalName());
+ if (provider == null) {
+ log.debug("ONOS FloodlightProvider not found");
+ return false;
+ }
+
+ synchronizer = (IFlowSyncService)
+ getContext().getAttributes().
+ get(IFlowSyncService.class.getCanonicalName());
+ if (synchronizer == null) {
+ log.debug("ONOS FlowSyncService not found");
+ return false;
+ }
+
+ return true;
+ }
+}