Indentation cleanups - WHITESPACE ONLY
Change-Id: I58d5d4da6f703d07a2c31ecb109497b46be0d5d8
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
index eb65669..8e5619d 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
@@ -35,12 +35,12 @@
* immediately kicks synchronization to keep switch's flow table latest state.
* Adversely, when a switch is removed from network, FlowProgrammer immediately
* stops synchronization.
- * @author Brian
*
+ * @author Brian
*/
-public class FlowProgrammer implements IFloodlightModule,
- IOFMessageListener,
- IOFSwitchListener {
+public class FlowProgrammer implements IFloodlightModule,
+ IOFMessageListener,
+ IOFSwitchListener {
// flag to enable FlowSynchronizer
private static final boolean enableFlowSync = false;
protected static final Logger log = LoggerFactory.getLogger(FlowProgrammer.class);
@@ -52,124 +52,124 @@
private static final int NUM_PUSHER_THREAD = 1;
protected FlowSynchronizer synchronizer;
-
+
public FlowProgrammer() {
- pusher = new FlowPusher(NUM_PUSHER_THREAD);
- if (enableFlowSync) {
- synchronizer = new FlowSynchronizer();
- }
+ pusher = new FlowPusher(NUM_PUSHER_THREAD);
+ if (enableFlowSync) {
+ synchronizer = new FlowSynchronizer();
+ }
}
-
+
@Override
public void init(FloodlightModuleContext context)
- throws FloodlightModuleException {
- floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
- registryService = context.getServiceImpl(IControllerRegistryService.class);
- restApi = context.getServiceImpl(IRestApiService.class);
- pusher.init(null, context, floodlightProvider.getOFMessageFactory(), null);
- if (enableFlowSync) {
- synchronizer.init(pusher);
- }
+ throws FloodlightModuleException {
+ floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
+ registryService = context.getServiceImpl(IControllerRegistryService.class);
+ restApi = context.getServiceImpl(IRestApiService.class);
+ pusher.init(null, context, floodlightProvider.getOFMessageFactory(), null);
+ if (enableFlowSync) {
+ synchronizer.init(pusher);
+ }
}
@Override
public void startUp(FloodlightModuleContext context) {
- restApi.addRestletRoutable(new FlowProgrammerWebRoutable());
- pusher.start();
- floodlightProvider.addOFMessageListener(OFType.FLOW_REMOVED, this);
- floodlightProvider.addOFSwitchListener(this);
+ restApi.addRestletRoutable(new FlowProgrammerWebRoutable());
+ pusher.start();
+ floodlightProvider.addOFMessageListener(OFType.FLOW_REMOVED, this);
+ floodlightProvider.addOFSwitchListener(this);
}
@Override
public Collection<Class<? extends IFloodlightService>> getModuleServices() {
- Collection<Class<? extends IFloodlightService>> l =
- new ArrayList<Class<? extends IFloodlightService>>();
- l.add(IFlowPusherService.class);
- if (enableFlowSync) {
- l.add(IFlowSyncService.class);
- }
- return l;
+ Collection<Class<? extends IFloodlightService>> l =
+ new ArrayList<Class<? extends IFloodlightService>>();
+ l.add(IFlowPusherService.class);
+ if (enableFlowSync) {
+ l.add(IFlowSyncService.class);
+ }
+ return l;
}
@Override
public Map<Class<? extends IFloodlightService>, IFloodlightService> getServiceImpls() {
- Map<Class<? extends IFloodlightService>,
- IFloodlightService> m =
- new HashMap<Class<? extends IFloodlightService>,
- IFloodlightService>();
- m.put(IFlowPusherService.class, pusher);
- if (enableFlowSync) {
- m.put(IFlowSyncService.class, synchronizer);
- }
- return m;
+ Map<Class<? extends IFloodlightService>,
+ IFloodlightService> m =
+ new HashMap<Class<? extends IFloodlightService>,
+ IFloodlightService>();
+ m.put(IFlowPusherService.class, pusher);
+ if (enableFlowSync) {
+ m.put(IFlowSyncService.class, synchronizer);
+ }
+ return m;
}
@Override
public Collection<Class<? extends IFloodlightService>> getModuleDependencies() {
- Collection<Class<? extends IFloodlightService>> l =
- new ArrayList<Class<? extends IFloodlightService>>();
- l.add(IFloodlightProviderService.class);
- l.add(IRestApiService.class);
- return l;
+ Collection<Class<? extends IFloodlightService>> l =
+ new ArrayList<Class<? extends IFloodlightService>>();
+ l.add(IFloodlightProviderService.class);
+ l.add(IRestApiService.class);
+ return l;
}
@Override
public String getName() {
- // TODO Auto-generated method stub
- return "FlowProgrammer";
+ // TODO Auto-generated method stub
+ return "FlowProgrammer";
}
@Override
public boolean isCallbackOrderingPrereq(OFType type, String name) {
- // TODO Auto-generated method stub
- return false;
+ // TODO Auto-generated method stub
+ return false;
}
@Override
public boolean isCallbackOrderingPostreq(OFType type, String name) {
- // TODO Auto-generated method stub
- return false;
+ // TODO Auto-generated method stub
+ return false;
}
@Override
public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
- switch (msg.getType()) {
- case FLOW_REMOVED:
- OFFlowRemoved flowMsg = (OFFlowRemoved) msg;
- FlowEntryId id = new FlowEntryId(flowMsg.getCookie());
- log.debug("Got flow entry removed from {}: {}",sw.getId(), id);
- // TODO: Inform the Forwarding module that a flow has expired
- break;
- default:
- break;
- }
+ switch (msg.getType()) {
+ case FLOW_REMOVED:
+ OFFlowRemoved flowMsg = (OFFlowRemoved) msg;
+ FlowEntryId id = new FlowEntryId(flowMsg.getCookie());
+ log.debug("Got flow entry removed from {}: {}", sw.getId(), id);
+ // TODO: Inform the Forwarding module that a flow has expired
+ break;
+ default:
+ break;
+ }
- return Command.CONTINUE;
+ return Command.CONTINUE;
}
@Override
public void addedSwitch(IOFSwitch sw) {
- log.debug("Switch added: {}", sw.getId());
+ log.debug("Switch added: {}", sw.getId());
- if (enableFlowSync) {
- if (registryService.hasControl(sw.getId())) {
- synchronizer.synchronize(sw);
- }
- }
+ if (enableFlowSync) {
+ if (registryService.hasControl(sw.getId())) {
+ synchronizer.synchronize(sw);
+ }
+ }
}
@Override
public void removedSwitch(IOFSwitch sw) {
- log.debug("Switch removed: {}", sw.getId());
-
- if (enableFlowSync) {
- synchronizer.interrupt(sw);
- }
+ log.debug("Switch removed: {}", sw.getId());
+
+ if (enableFlowSync) {
+ synchronizer.interrupt(sw);
+ }
}
@Override
public void switchPortChanged(Long switchId) {
- // TODO Auto-generated method stub
+ // TODO Auto-generated method stub
}
-
+
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
index b52bafc..1c0802a 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
@@ -47,13 +47,13 @@
* FlowPusher is a implementation of FlowPusherService.
* FlowPusher assigns one message queue instance for each one switch.
* Number of message processing threads is configurable by constructor, and
- * one thread can handle multiple message queues. Each queue will be assigned to
+ * one thread can handle multiple message queues. Each queue will be assigned to
* a thread according to hash function defined by getHash().
* Each processing thread reads messages from queues and sends it to switches
* in round-robin. Processing thread also calculates rate of sending to suppress
* excessive message sending.
- * @author Naoki Shiota
*
+ * @author Naoki Shiota
*/
public class FlowPusher implements IFlowPusherService, IOFMessageListener {
private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
@@ -63,995 +63,1006 @@
// The local copy should go away!
//
protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
- protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
-
+ protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
+
// Number of messages sent to switch at once
protected static final int MAX_MESSAGE_SEND = 100;
- private static class SwitchQueueEntry {
- OFMessage msg;
-
- public SwitchQueueEntry(OFMessage msg) {
- this.msg = msg;
- }
-
- public OFMessage getOFMessage() {
- return msg;
- }
- }
-
- /**
- * SwitchQueue represents message queue attached to a switch.
- * This consists of queue itself and variables used for limiting sending rate.
- * @author Naoki Shiota
- *
- */
- private class SwitchQueue {
- List<Queue<SwitchQueueEntry>> raw_queues;
- QueueState state;
-
- // Max rate of sending message (bytes/ms). 0 implies no limitation.
- long max_rate = 0; // 0 indicates no limitation
- long last_sent_time = 0;
- long last_sent_size = 0;
-
- // "To be deleted" flag
- boolean toBeDeleted = false;
-
- SwitchQueue() {
- raw_queues = new ArrayList<Queue<SwitchQueueEntry>>(
- MsgPriority.values().length);
- for (int i = 0; i < MsgPriority.values().length; ++i) {
- raw_queues.add(i, new ArrayDeque<SwitchQueueEntry>());
- }
-
- state = QueueState.READY;
- }
-
- /**
- * Check if sending rate is within the rate
- * @param current Current time
- * @return true if within the rate
- */
- boolean isSendable(long current) {
- if (max_rate == 0) {
- // no limitation
- return true;
- }
-
- if (current == last_sent_time) {
- return false;
- }
-
- // Check if sufficient time (from aspect of rate) elapsed or not.
- long rate = last_sent_size / (current - last_sent_time);
- return (rate < max_rate);
- }
-
- /**
- * Log time and size of last sent data.
- * @param current Time to be sent.
- * @param size Size of sent data (in bytes).
- */
- void logSentData(long current, long size) {
- last_sent_time = current;
- last_sent_size = size;
- }
-
- boolean add(SwitchQueueEntry entry, MsgPriority priority) {
- Queue<SwitchQueueEntry> queue = getQueue(priority);
- if (queue == null) {
- log.error("Unexpected priority : ", priority);
- return false;
- }
- return queue.add(entry);
- }
-
- /**
- * Poll single appropriate entry object according to QueueState.
- * @return Entry object.
- */
- SwitchQueueEntry poll() {
- switch (state) {
- case READY:
- {
- for (int i = 0; i < raw_queues.size(); ++i) {
- SwitchQueueEntry entry = raw_queues.get(i).poll();
- if (entry != null) {
- return entry;
- }
- }
-
- return null;
- }
- case SUSPENDED:
- {
- // Only polling from high priority queue
- SwitchQueueEntry entry = getQueue(MsgPriority.HIGH).poll();
- return entry;
- }
- default:
- log.error("Unexpected QueueState : ", state);
- return null;
- }
- }
-
- /**
- * Check if this object has any messages in the queues to be sent
- * @return True if there are some messages to be sent.
- */
- boolean hasMessageToSend() {
- switch (state) {
- case READY:
- for (Queue<SwitchQueueEntry> queue : raw_queues) {
- if (! queue.isEmpty()) {
- return true;
- }
- }
- break;
- case SUSPENDED:
- // Only checking high priority queue
- return (! getQueue(MsgPriority.HIGH).isEmpty());
- default:
- log.error("Unexpected QueueState : ", state);
- return false;
- }
-
- return false;
- }
-
- Queue<SwitchQueueEntry> getQueue(MsgPriority priority) {
- return raw_queues.get(priority.ordinal());
- }
- }
-
- /**
- * BarrierInfo holds information to specify barrier message sent to switch.
- * @author Naoki
- */
- private static class BarrierInfo {
- final long dpid;
- final int xid;
-
- static BarrierInfo create(IOFSwitch sw, OFBarrierRequest req) {
- return new BarrierInfo(sw.getId(), req.getXid());
- }
-
- static BarrierInfo create(IOFSwitch sw, OFBarrierReply rpy) {
- return new BarrierInfo(sw.getId(), rpy.getXid());
- }
-
- private BarrierInfo(long dpid, int xid) {
- this.dpid = dpid;
- this.xid = xid;
- }
+ private static class SwitchQueueEntry {
+ OFMessage msg;
- // Auto generated code by Eclipse
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + (int) (dpid ^ (dpid >>> 32));
- result = prime * result + xid;
- return result;
- }
+ public SwitchQueueEntry(OFMessage msg) {
+ this.msg = msg;
+ }
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
-
- BarrierInfo other = (BarrierInfo) obj;
- return (this.dpid == other.dpid) && (this.xid == other.xid);
- }
-
-
- }
-
- private OFMessageDamper messageDamper = null;
- private IThreadPoolService threadPool = null;
+ public OFMessage getOFMessage() {
+ return msg;
+ }
+ }
- private FloodlightContext context = null;
- private BasicFactory factory = null;
-
- // Map of threads versus dpid
- private Map<Long, FlowPusherThread> threadMap = null;
- // Map from (DPID and transaction ID) to Future objects.
- private Map<BarrierInfo, OFBarrierReplyFuture> barrierFutures
- = new ConcurrentHashMap<BarrierInfo,OFBarrierReplyFuture>();
-
- private int number_thread;
-
- /**
- * Main thread that reads messages from queues and sends them to switches.
- * @author Naoki Shiota
- *
- */
- private class FlowPusherThread extends Thread {
- private Map<IOFSwitch,SwitchQueue> assignedQueues
- = new ConcurrentHashMap<IOFSwitch,SwitchQueue>();
-
- final Lock queuingLock = new ReentrantLock();
- final Condition messagePushed = queuingLock.newCondition();
-
- @Override
- public void run() {
- this.setName("FlowPusherThread " + this.getId() );
- while (true) {
- while (! queuesHasMessageToSend()) {
- queuingLock.lock();
-
- try {
- // wait for message pushed to queue
- messagePushed.await();
- } catch (InterruptedException e) {
- // Interrupted to be shut down (not an error)
- log.debug("FlowPusherThread is interrupted");
- return;
- } finally {
- queuingLock.unlock();
- }
- }
-
- // for safety of concurrent access, copy set of key objects
- Set<IOFSwitch> keys = new HashSet<IOFSwitch>(assignedQueues.size());
- for (IOFSwitch sw : assignedQueues.keySet()) {
- keys.add(sw);
- }
-
- for (IOFSwitch sw : keys) {
- SwitchQueue queue = assignedQueues.get(sw);
+ /**
+ * SwitchQueue represents message queue attached to a switch.
+ * This consists of queue itself and variables used for limiting sending rate.
+ *
+ * @author Naoki Shiota
+ */
+ private class SwitchQueue {
+ List<Queue<SwitchQueueEntry>> raw_queues;
+ QueueState state;
- if (sw == null || queue == null) {
- continue;
- }
-
- synchronized (queue) {
- processQueue(sw, queue, MAX_MESSAGE_SEND);
- if (queue.toBeDeleted && ! queue.hasMessageToSend()) {
- // remove queue if flagged to be.
- assignedQueues.remove(sw);
- }
- }
- }
- }
- }
-
- /**
- * Read messages from queue and send them to the switch.
- * If number of messages excess the limit, stop sending messages.
- * @param sw Switch to which messages will be sent.
- * @param queue Queue of messages.
- * @param max_msg Limitation of number of messages to be sent. If set to 0,
- * all messages in queue will be sent.
- */
- private void processQueue(IOFSwitch sw, SwitchQueue queue, int max_msg) {
- // check sending rate and determine it to be sent or not
- long current_time = System.currentTimeMillis();
- long size = 0;
-
- if (queue.isSendable(current_time)) {
- int i = 0;
- while (queue.hasMessageToSend()) {
- // Number of messages excess the limit
- if (0 < max_msg && max_msg <= i) {
- break;
- }
- ++i;
-
- SwitchQueueEntry queueEntry;
- synchronized (queue) {
- queueEntry = queue.poll();
- }
-
- OFMessage msg = queueEntry.getOFMessage();
- try {
- messageDamper.write(sw, msg, context);
- if (log.isTraceEnabled()) {
- log.trace("Pusher sends message : {}", msg);
- }
- size += msg.getLength();
- } catch (IOException e) {
- e.printStackTrace();
- log.error("Exception in sending message ({}) : {}", msg, e);
- }
- }
-
- sw.flush();
- queue.logSentData(current_time, size);
- }
- }
-
- private boolean queuesHasMessageToSend() {
- for (SwitchQueue queue : assignedQueues.values()) {
- if (queue.hasMessageToSend()) {
- return true;
- }
- }
+ // Max rate of sending message (bytes/ms). 0 implies no limitation.
+ long max_rate = 0; // 0 indicates no limitation
+ long last_sent_time = 0;
+ long last_sent_size = 0;
- return false;
- }
-
- private void notifyMessagePushed() {
- queuingLock.lock();
- try {
- messagePushed.signal();
- } finally {
- queuingLock.unlock();
- }
- }
- }
-
- /**
- * Initialize object with one thread.
- */
- public FlowPusher() {
- number_thread = DEFAULT_NUMBER_THREAD;
- }
-
- /**
- * Initialize object with threads of given number.
- * @param number_thread Number of threads to handle messages.
- */
- public FlowPusher(int number_thread) {
- if (number_thread > 0) {
- this.number_thread = number_thread;
- } else {
- this.number_thread = DEFAULT_NUMBER_THREAD;
- }
- }
-
- /**
- * Set parameters needed for sending messages.
- * @param context FloodlightContext used for sending messages.
- * If null, FlowPusher uses default context.
- * @param modContext FloodlightModuleContext used for acquiring
- * ThreadPoolService and registering MessageListener.
- * @param factory Factory object to create OFMessage objects.
- * @param damper Message damper used for sending messages.
- * If null, FlowPusher creates its own damper object.
- */
- public void init(FloodlightContext context,
- FloodlightModuleContext modContext,
- BasicFactory factory,
- OFMessageDamper damper) {
- this.context = context;
- this.factory = factory;
- this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
- IFloodlightProviderService flservice
- = modContext.getServiceImpl(IFloodlightProviderService.class);
- flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
-
- if (damper != null) {
- messageDamper = damper;
- } else {
- // use default values
- messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
- EnumSet.of(OFType.FLOW_MOD),
- OFMESSAGE_DAMPER_TIMEOUT);
- }
- }
-
- /**
- * Begin processing queue.
- */
- public void start() {
- if (factory == null) {
- log.error("FlowPusher not yet initialized.");
- return;
- }
-
- threadMap = new HashMap<Long,FlowPusherThread>();
- for (long i = 0; i < number_thread; ++i) {
- FlowPusherThread thread = new FlowPusherThread();
-
- threadMap.put(i, thread);
- thread.start();
- }
- }
-
- @Override
- public boolean suspend(IOFSwitch sw) {
- SwitchQueue queue = getQueue(sw);
-
- if (queue == null) {
- // create queue in case suspend is called before first message addition
- queue = createQueueImpl(sw);
- }
-
- synchronized (queue) {
- if (queue.state == QueueState.READY) {
- queue.state = QueueState.SUSPENDED;
- return true;
- }
- return false;
- }
- }
+ // "To be deleted" flag
+ boolean toBeDeleted = false;
- @Override
- public boolean resume(IOFSwitch sw) {
- SwitchQueue queue = getQueue(sw);
-
- if (queue == null) {
- log.error("No queue is attached to DPID : {}", sw.getId());
- return false;
- }
-
- synchronized (queue) {
- if (queue.state == QueueState.SUSPENDED) {
- queue.state = QueueState.READY;
-
- // Free the latch if queue has any messages
- FlowPusherThread thread = getProcessingThread(sw);
- if (queue.hasMessageToSend()) {
- thread.notifyMessagePushed();
- }
- return true;
- }
- return false;
- }
- }
-
- @Override
- public QueueState getState(IOFSwitch sw) {
- SwitchQueue queue = getQueue(sw);
-
- if (queue == null) {
- return QueueState.UNKNOWN;
- }
-
- return queue.state;
- }
+ SwitchQueue() {
+ raw_queues = new ArrayList<Queue<SwitchQueueEntry>>(
+ MsgPriority.values().length);
+ for (int i = 0; i < MsgPriority.values().length; ++i) {
+ raw_queues.add(i, new ArrayDeque<SwitchQueueEntry>());
+ }
- /**
- * Stop processing queue and exit thread.
- */
- public void stop() {
- if (threadMap == null) {
- return;
- }
-
- for (FlowPusherThread t : threadMap.values()) {
- t.interrupt();
- }
- }
-
- @Override
- public void setRate(IOFSwitch sw, long rate) {
- SwitchQueue queue = getQueue(sw);
- if (queue == null) {
- queue = createQueueImpl(sw);
- }
-
- if (rate > 0) {
- log.debug("rate for {} is set to {}", sw.getId(), rate);
- synchronized (queue) {
- queue.max_rate = rate;
- }
- }
- }
+ state = QueueState.READY;
+ }
- @Override
- public boolean createQueue(IOFSwitch sw) {
- SwitchQueue queue = createQueueImpl(sw);
-
- return (queue != null);
- }
-
- protected SwitchQueue createQueueImpl(IOFSwitch sw) {
- SwitchQueue queue = getQueue(sw);
- if (queue != null) {
- return queue;
- }
-
- FlowPusherThread proc = getProcessingThread(sw);
- queue = new SwitchQueue();
- queue.state = QueueState.READY;
- proc.assignedQueues.put(sw, queue);
-
- return queue;
- }
+ /**
+ * Check if sending rate is within the rate
+ *
+ * @param current Current time
+ * @return true if within the rate
+ */
+ boolean isSendable(long current) {
+ if (max_rate == 0) {
+ // no limitation
+ return true;
+ }
- @Override
- public boolean deleteQueue(IOFSwitch sw) {
- return deleteQueue(sw, false);
- }
-
- @Override
- public boolean deleteQueue(IOFSwitch sw, boolean forceStop) {
- FlowPusherThread proc = getProcessingThread(sw);
-
- if (forceStop) {
- SwitchQueue queue = proc.assignedQueues.remove(sw);
- if (queue == null) {
- return false;
- }
- return true;
- } else {
- SwitchQueue queue = getQueue(sw);
- if (queue == null) {
- return false;
- }
- synchronized (queue) {
- queue.toBeDeleted = true;
- }
- return true;
- }
- }
-
- @Override
- public boolean add(IOFSwitch sw, OFMessage msg) {
- return add(sw, msg, MsgPriority.NORMAL);
- }
-
- @Override
- public boolean add(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
- return addMessageImpl(sw, msg, priority);
- }
-
- @Override
- public void pushFlowEntries(
- Collection<Pair<IOFSwitch, FlowEntry>> entries) {
- pushFlowEntries(entries, MsgPriority.NORMAL);
- }
-
- @Override
- public void pushFlowEntries(
- Collection<Pair<IOFSwitch, FlowEntry>> entries, MsgPriority priority) {
+ if (current == last_sent_time) {
+ return false;
+ }
- for (Pair<IOFSwitch, FlowEntry> entry : entries) {
- add(entry.first, entry.second, priority);
- }
- }
+ // Check if sufficient time (from aspect of rate) elapsed or not.
+ long rate = last_sent_size / (current - last_sent_time);
+ return (rate < max_rate);
+ }
- @Override
- public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry) {
- pushFlowEntry(sw, flowEntry, MsgPriority.NORMAL);
- }
+ /**
+ * Log time and size of last sent data.
+ *
+ * @param current Time to be sent.
+ * @param size Size of sent data (in bytes).
+ */
+ void logSentData(long current, long size) {
+ last_sent_time = current;
+ last_sent_size = size;
+ }
- @Override
- public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
- Collection<Pair<IOFSwitch, FlowEntry>> entries =
- new LinkedList<Pair<IOFSwitch, FlowEntry>>();
+ boolean add(SwitchQueueEntry entry, MsgPriority priority) {
+ Queue<SwitchQueueEntry> queue = getQueue(priority);
+ if (queue == null) {
+ log.error("Unexpected priority : ", priority);
+ return false;
+ }
+ return queue.add(entry);
+ }
- entries.add(new Pair<IOFSwitch, FlowEntry>(sw, flowEntry));
- pushFlowEntries(entries, priority);
- }
-
- /**
- * Create a message from FlowEntry and add it to the queue of the switch.
- * @param sw Switch to which message is pushed.
- * @param flowEntry FlowEntry object used for creating message.
- * @return true if message is successfully added to a queue.
- */
- private boolean add(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
- //
- // Create the OpenFlow Flow Modification Entry to push
- //
- OFFlowMod fm = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
- long cookie = flowEntry.flowEntryId().value();
+ /**
+ * Poll single appropriate entry object according to QueueState.
+ *
+ * @return Entry object.
+ */
+ SwitchQueueEntry poll() {
+ switch (state) {
+ case READY: {
+ for (int i = 0; i < raw_queues.size(); ++i) {
+ SwitchQueueEntry entry = raw_queues.get(i).poll();
+ if (entry != null) {
+ return entry;
+ }
+ }
- short flowModCommand = OFFlowMod.OFPFC_ADD;
- if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
- flowModCommand = OFFlowMod.OFPFC_ADD;
- } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
- flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
- } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
- flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
- } else {
- // Unknown user state. Ignore the entry
- log.debug(
- "Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
- flowEntry.flowEntryId(),
- flowEntry.flowEntryUserState());
- return false;
- }
+ return null;
+ }
+ case SUSPENDED: {
+ // Only polling from high priority queue
+ SwitchQueueEntry entry = getQueue(MsgPriority.HIGH).poll();
+ return entry;
+ }
+ default:
+ log.error("Unexpected QueueState : ", state);
+ return null;
+ }
+ }
- //
- // Fetch the match conditions.
- //
- // NOTE: The Flow matching conditions common for all Flow Entries are
- // used ONLY if a Flow Entry does NOT have the corresponding matching
- // condition set.
- //
- OFMatch match = new OFMatch();
- match.setWildcards(OFMatch.OFPFW_ALL);
- FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
+ /**
+ * Check if this object has any messages in the queues to be sent
+ *
+ * @return True if there are some messages to be sent.
+ */
+ boolean hasMessageToSend() {
+ switch (state) {
+ case READY:
+ for (Queue<SwitchQueueEntry> queue : raw_queues) {
+ if (!queue.isEmpty()) {
+ return true;
+ }
+ }
+ break;
+ case SUSPENDED:
+ // Only checking high priority queue
+ return (!getQueue(MsgPriority.HIGH).isEmpty());
+ default:
+ log.error("Unexpected QueueState : ", state);
+ return false;
+ }
- // Match the Incoming Port
- Port matchInPort = flowEntryMatch.inPort();
- if (matchInPort != null) {
- match.setInputPort(matchInPort.value());
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
- }
+ return false;
+ }
- // Match the Source MAC address
- MACAddress matchSrcMac = flowEntryMatch.srcMac();
- if (matchSrcMac != null) {
- match.setDataLayerSource(matchSrcMac.toString());
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
- }
+ Queue<SwitchQueueEntry> getQueue(MsgPriority priority) {
+ return raw_queues.get(priority.ordinal());
+ }
+ }
- // Match the Destination MAC address
- MACAddress matchDstMac = flowEntryMatch.dstMac();
- if (matchDstMac != null) {
- match.setDataLayerDestination(matchDstMac.toString());
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
- }
+ /**
+ * BarrierInfo holds information to specify barrier message sent to switch.
+ *
+ * @author Naoki
+ */
+ private static class BarrierInfo {
+ final long dpid;
+ final int xid;
- // Match the Ethernet Frame Type
- Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
- if (matchEthernetFrameType != null) {
- match.setDataLayerType(matchEthernetFrameType);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
- }
+ static BarrierInfo create(IOFSwitch sw, OFBarrierRequest req) {
+ return new BarrierInfo(sw.getId(), req.getXid());
+ }
- // Match the VLAN ID
- Short matchVlanId = flowEntryMatch.vlanId();
- if (matchVlanId != null) {
- match.setDataLayerVirtualLan(matchVlanId);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
- }
+ static BarrierInfo create(IOFSwitch sw, OFBarrierReply rpy) {
+ return new BarrierInfo(sw.getId(), rpy.getXid());
+ }
- // Match the VLAN priority
- Byte matchVlanPriority = flowEntryMatch.vlanPriority();
- if (matchVlanPriority != null) {
- match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
- match.setWildcards(match.getWildcards()
- & ~OFMatch.OFPFW_DL_VLAN_PCP);
- }
+ private BarrierInfo(long dpid, int xid) {
+ this.dpid = dpid;
+ this.xid = xid;
+ }
- // Match the Source IPv4 Network prefix
- IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
- if (matchSrcIPv4Net != null) {
- match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
- }
+ // Auto generated code by Eclipse
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (int) (dpid ^ (dpid >>> 32));
+ result = prime * result + xid;
+ return result;
+ }
- // Natch the Destination IPv4 Network prefix
- IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
- if (matchDstIPv4Net != null) {
- match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
- }
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
- // Match the IP protocol
- Byte matchIpProto = flowEntryMatch.ipProto();
- if (matchIpProto != null) {
- match.setNetworkProtocol(matchIpProto);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
- }
+ BarrierInfo other = (BarrierInfo) obj;
+ return (this.dpid == other.dpid) && (this.xid == other.xid);
+ }
- // Match the IP ToS (DSCP field, 6 bits)
- Byte matchIpToS = flowEntryMatch.ipToS();
- if (matchIpToS != null) {
- match.setNetworkTypeOfService(matchIpToS);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
- }
- // Match the Source TCP/UDP port
- Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
- if (matchSrcTcpUdpPort != null) {
- match.setTransportSource(matchSrcTcpUdpPort);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
- }
+ }
- // Match the Destination TCP/UDP port
- Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
- if (matchDstTcpUdpPort != null) {
- match.setTransportDestination(matchDstTcpUdpPort);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
- }
+ private OFMessageDamper messageDamper = null;
+ private IThreadPoolService threadPool = null;
- //
- // Fetch the actions
- //
- Short actionOutputPort = null;
- List<OFAction> openFlowActions = new ArrayList<OFAction>();
- int actionsLen = 0;
- FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
- //
- for (FlowEntryAction action : flowEntryActions.actions()) {
- ActionOutput actionOutput = action.actionOutput();
- ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
- ActionSetVlanPriority actionSetVlanPriority = action
- .actionSetVlanPriority();
- ActionStripVlan actionStripVlan = action.actionStripVlan();
- ActionSetEthernetAddr actionSetEthernetSrcAddr = action
- .actionSetEthernetSrcAddr();
- ActionSetEthernetAddr actionSetEthernetDstAddr = action
- .actionSetEthernetDstAddr();
- ActionSetIPv4Addr actionSetIPv4SrcAddr = action
- .actionSetIPv4SrcAddr();
- ActionSetIPv4Addr actionSetIPv4DstAddr = action
- .actionSetIPv4DstAddr();
- ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
- ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action
- .actionSetTcpUdpSrcPort();
- ActionSetTcpUdpPort actionSetTcpUdpDstPort = action
- .actionSetTcpUdpDstPort();
- ActionEnqueue actionEnqueue = action.actionEnqueue();
+ private FloodlightContext context = null;
+ private BasicFactory factory = null;
- if (actionOutput != null) {
- actionOutputPort = actionOutput.port().value();
- // XXX: The max length is hard-coded for now
- OFActionOutput ofa = new OFActionOutput(actionOutput.port()
- .value(), (short) 0xffff);
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
+ // Map of threads versus dpid
+ private Map<Long, FlowPusherThread> threadMap = null;
+ // Map from (DPID and transaction ID) to Future objects.
+ private Map<BarrierInfo, OFBarrierReplyFuture> barrierFutures
+ = new ConcurrentHashMap<BarrierInfo, OFBarrierReplyFuture>();
- if (actionSetVlanId != null) {
- OFActionVirtualLanIdentifier ofa = new OFActionVirtualLanIdentifier(
- actionSetVlanId.vlanId());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
+ private int number_thread;
- if (actionSetVlanPriority != null) {
- OFActionVirtualLanPriorityCodePoint ofa = new OFActionVirtualLanPriorityCodePoint(
- actionSetVlanPriority.vlanPriority());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
+ /**
+ * Main thread that reads messages from queues and sends them to switches.
+ *
+ * @author Naoki Shiota
+ */
+ private class FlowPusherThread extends Thread {
+ private Map<IOFSwitch, SwitchQueue> assignedQueues
+ = new ConcurrentHashMap<IOFSwitch, SwitchQueue>();
- if (actionStripVlan != null) {
- if (actionStripVlan.stripVlan() == true) {
- OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
- }
+ final Lock queuingLock = new ReentrantLock();
+ final Condition messagePushed = queuingLock.newCondition();
- if (actionSetEthernetSrcAddr != null) {
- OFActionDataLayerSource ofa = new OFActionDataLayerSource(
- actionSetEthernetSrcAddr.addr().toBytes());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
+ @Override
+ public void run() {
+ this.setName("FlowPusherThread " + this.getId());
+ while (true) {
+ while (!queuesHasMessageToSend()) {
+ queuingLock.lock();
- if (actionSetEthernetDstAddr != null) {
- OFActionDataLayerDestination ofa = new OFActionDataLayerDestination(
- actionSetEthernetDstAddr.addr().toBytes());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
+ try {
+ // wait for message pushed to queue
+ messagePushed.await();
+ } catch (InterruptedException e) {
+ // Interrupted to be shut down (not an error)
+ log.debug("FlowPusherThread is interrupted");
+ return;
+ } finally {
+ queuingLock.unlock();
+ }
+ }
- if (actionSetIPv4SrcAddr != null) {
- OFActionNetworkLayerSource ofa = new OFActionNetworkLayerSource(
- actionSetIPv4SrcAddr.addr().value());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
+ // for safety of concurrent access, copy set of key objects
+ Set<IOFSwitch> keys = new HashSet<IOFSwitch>(assignedQueues.size());
+ for (IOFSwitch sw : assignedQueues.keySet()) {
+ keys.add(sw);
+ }
- if (actionSetIPv4DstAddr != null) {
- OFActionNetworkLayerDestination ofa = new OFActionNetworkLayerDestination(
- actionSetIPv4DstAddr.addr().value());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
+ for (IOFSwitch sw : keys) {
+ SwitchQueue queue = assignedQueues.get(sw);
- if (actionSetIpToS != null) {
- OFActionNetworkTypeOfService ofa = new OFActionNetworkTypeOfService(
- actionSetIpToS.ipToS());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
+ if (sw == null || queue == null) {
+ continue;
+ }
- if (actionSetTcpUdpSrcPort != null) {
- OFActionTransportLayerSource ofa = new OFActionTransportLayerSource(
- actionSetTcpUdpSrcPort.port());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
+ synchronized (queue) {
+ processQueue(sw, queue, MAX_MESSAGE_SEND);
+ if (queue.toBeDeleted && !queue.hasMessageToSend()) {
+ // remove queue if flagged to be.
+ assignedQueues.remove(sw);
+ }
+ }
+ }
+ }
+ }
- if (actionSetTcpUdpDstPort != null) {
- OFActionTransportLayerDestination ofa = new OFActionTransportLayerDestination(
- actionSetTcpUdpDstPort.port());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
+ /**
+ * Read messages from queue and send them to the switch.
+ * If the number of messages exceeds the limit, stop sending messages.
+ *
+ * @param sw Switch to which messages will be sent.
+ * @param queue Queue of messages.
+ * @param max_msg Maximum number of messages to be sent. If set to 0,
+ * all messages in the queue will be sent.
+ */
+ private void processQueue(IOFSwitch sw, SwitchQueue queue, int max_msg) {
+ // check sending rate and determine it to be sent or not
+ long current_time = System.currentTimeMillis();
+ long size = 0;
- if (actionEnqueue != null) {
- OFActionEnqueue ofa = new OFActionEnqueue(actionEnqueue.port()
- .value(), actionEnqueue.queueId());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
- }
+ if (queue.isSendable(current_time)) {
+ int i = 0;
+ while (queue.hasMessageToSend()) {
+ // Number of messages exceeds the limit
+ if (0 < max_msg && max_msg <= i) {
+ break;
+ }
+ ++i;
- fm.setIdleTimeout((short)flowEntry.idleTimeout())
- .setHardTimeout((short)flowEntry.hardTimeout())
- .setPriority((short)flowEntry.priority())
- .setBufferId(OFPacketOut.BUFFER_ID_NONE).setCookie(cookie)
- .setCommand(flowModCommand).setMatch(match)
- .setActions(openFlowActions)
- .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
- fm.setOutPort(OFPort.OFPP_NONE.getValue());
- if ((flowModCommand == OFFlowMod.OFPFC_DELETE)
- || (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
- if (actionOutputPort != null)
- fm.setOutPort(actionOutputPort);
- }
+ SwitchQueueEntry queueEntry;
+ synchronized (queue) {
+ queueEntry = queue.poll();
+ }
- //
- // Set the OFPFF_SEND_FLOW_REM flag if the Flow Entry is not
- // permanent.
- //
- if ((flowEntry.idleTimeout() != 0) ||
- (flowEntry.hardTimeout() != 0)) {
- fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
- }
+ OFMessage msg = queueEntry.getOFMessage();
+ try {
+ messageDamper.write(sw, msg, context);
+ if (log.isTraceEnabled()) {
+ log.trace("Pusher sends message : {}", msg);
+ }
+ size += msg.getLength();
+ } catch (IOException e) {
+ e.printStackTrace();
+ log.error("Exception in sending message ({}) : {}", msg, e);
+ }
+ }
- if (log.isTraceEnabled()) {
- log.trace("Installing flow entry {} into switch DPID: {} flowEntryId: {} srcMac: {} dstMac: {} inPort: {} outPort: {}"
- , flowEntry.flowEntryUserState()
- , sw.getStringId()
- , flowEntry.flowEntryId()
- , matchSrcMac
- , matchDstMac
- , matchInPort
- , actionOutputPort
- );
- }
+ sw.flush();
+ queue.logSentData(current_time, size);
+ }
+ }
- return addMessageImpl(sw, fm, priority);
- }
-
- /**
- * Add message to queue
- * @param sw
- * @param msg
- * @param flowEntryId
- * @return
- */
- protected boolean addMessageImpl(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
- FlowPusherThread thread = getProcessingThread(sw);
-
- SwitchQueue queue = getQueue(sw);
+ private boolean queuesHasMessageToSend() {
+ for (SwitchQueue queue : assignedQueues.values()) {
+ if (queue.hasMessageToSend()) {
+ return true;
+ }
+ }
- // create queue at first addition of message
- if (queue == null) {
- queue = createQueueImpl(sw);
- }
-
- SwitchQueueEntry entry = new SwitchQueueEntry(msg);
+ return false;
+ }
- synchronized (queue) {
- queue.add(entry,priority);
- if (log.isTraceEnabled()) {
- log.trace("Message is pushed : {}", entry.getOFMessage());
- }
- }
-
- thread.notifyMessagePushed();
+ private void notifyMessagePushed() {
+ queuingLock.lock();
+ try {
+ messagePushed.signal();
+ } finally {
+ queuingLock.unlock();
+ }
+ }
+ }
- return true;
- }
-
- @Override
- public OFBarrierReply barrier(IOFSwitch sw) {
- OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
- if (future == null) {
- return null;
- }
-
- try {
- return future.get();
- } catch (InterruptedException e) {
- e.printStackTrace();
- log.error("InterruptedException: {}", e);
- return null;
- } catch (ExecutionException e) {
- e.printStackTrace();
- log.error("ExecutionException: {}", e);
- return null;
- }
- }
+ /**
+ * Initialize object with one thread.
+ */
+ public FlowPusher() {
+ number_thread = DEFAULT_NUMBER_THREAD;
+ }
- @Override
- public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
- // TODO creation of message and future should be moved to OFSwitchImpl
+ /**
+ * Initialize object with threads of given number.
+ *
+ * @param number_thread Number of threads to handle messages.
+ */
+ public FlowPusher(int number_thread) {
+ if (number_thread > 0) {
+ this.number_thread = number_thread;
+ } else {
+ this.number_thread = DEFAULT_NUMBER_THREAD;
+ }
+ }
- if (sw == null) {
- return null;
- }
-
- OFBarrierRequest msg = createBarrierRequest(sw);
-
- OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
- barrierFutures.put(BarrierInfo.create(sw,msg), future);
-
- addMessageImpl(sw, msg, MsgPriority.NORMAL);
-
- return future;
- }
+ /**
+ * Set parameters needed for sending messages.
+ *
+ * @param context FloodlightContext used for sending messages.
+ * If null, FlowPusher uses default context.
+ * @param modContext FloodlightModuleContext used for acquiring
+ * ThreadPoolService and registering MessageListener.
+ * @param factory Factory object to create OFMessage objects.
+ * @param damper Message damper used for sending messages.
+ * If null, FlowPusher creates its own damper object.
+ */
+ public void init(FloodlightContext context,
+ FloodlightModuleContext modContext,
+ BasicFactory factory,
+ OFMessageDamper damper) {
+ this.context = context;
+ this.factory = factory;
+ this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
+ IFloodlightProviderService flservice
+ = modContext.getServiceImpl(IFloodlightProviderService.class);
+ flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
- protected OFBarrierRequest createBarrierRequest(IOFSwitch sw) {
- OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
- msg.setXid(sw.getNextTransactionId());
-
- return msg;
- }
-
- /**
- * Get a queue attached to a switch.
- * @param sw Switch object
- * @return Queue object
- */
- protected SwitchQueue getQueue(IOFSwitch sw) {
- if (sw == null) {
- return null;
- }
-
- FlowPusherThread th = getProcessingThread(sw);
- if (th == null) {
- return null;
- }
-
- return th.assignedQueues.get(sw);
- }
-
- /**
- * Get a hash value correspondent to a switch.
- * @param sw Switch object
- * @return Hash value
- */
- protected long getHash(IOFSwitch sw) {
- // This code assumes DPID is sequentially assigned.
- // TODO consider equalization algorithm
- return sw.getId() % number_thread;
- }
+ if (damper != null) {
+ messageDamper = damper;
+ } else {
+ // use default values
+ messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
+ EnumSet.of(OFType.FLOW_MOD),
+ OFMESSAGE_DAMPER_TIMEOUT);
+ }
+ }
- /**
- * Get a Thread object which processes the queue attached to a switch.
- * @param sw Switch object
- * @return Thread object
- */
- protected FlowPusherThread getProcessingThread(IOFSwitch sw) {
- long hash = getHash(sw);
-
- return threadMap.get(hash);
- }
-
- @Override
- public String getName() {
- return "flowpusher";
- }
+ /**
+ * Begin processing queue.
+ */
+ public void start() {
+ if (factory == null) {
+ log.error("FlowPusher not yet initialized.");
+ return;
+ }
- @Override
- public boolean isCallbackOrderingPrereq(OFType type, String name) {
- return false;
- }
+ threadMap = new HashMap<Long, FlowPusherThread>();
+ for (long i = 0; i < number_thread; ++i) {
+ FlowPusherThread thread = new FlowPusherThread();
- @Override
- public boolean isCallbackOrderingPostreq(OFType type, String name) {
- return false;
- }
+ threadMap.put(i, thread);
+ thread.start();
+ }
+ }
- @Override
- public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
- if (log.isTraceEnabled()) {
- log.trace("Received BARRIER_REPLY from : {}", sw.getId());
- }
+ @Override
+ public boolean suspend(IOFSwitch sw) {
+ SwitchQueue queue = getQueue(sw);
- if (msg.getType() != OFType.BARRIER_REPLY) {
- log.error("Unexpected reply message : {}", msg.getType());
- return Command.CONTINUE;
- }
-
- OFBarrierReply reply = (OFBarrierReply) msg;
- BarrierInfo info = BarrierInfo.create(sw,reply);
-
- // Deliver future if exists
- OFBarrierReplyFuture future = barrierFutures.get(info);
- if (future != null) {
- future.deliverFuture(sw, msg);
- barrierFutures.remove(info);
- }
-
- return Command.CONTINUE;
- }
+ if (queue == null) {
+ // create queue in case suspend is called before first message addition
+ queue = createQueueImpl(sw);
+ }
+
+ synchronized (queue) {
+ if (queue.state == QueueState.READY) {
+ queue.state = QueueState.SUSPENDED;
+ return true;
+ }
+ return false;
+ }
+ }
+
+ @Override
+ public boolean resume(IOFSwitch sw) {
+ SwitchQueue queue = getQueue(sw);
+
+ if (queue == null) {
+ log.error("No queue is attached to DPID : {}", sw.getId());
+ return false;
+ }
+
+ synchronized (queue) {
+ if (queue.state == QueueState.SUSPENDED) {
+ queue.state = QueueState.READY;
+
+ // Free the latch if queue has any messages
+ FlowPusherThread thread = getProcessingThread(sw);
+ if (queue.hasMessageToSend()) {
+ thread.notifyMessagePushed();
+ }
+ return true;
+ }
+ return false;
+ }
+ }
+
+ @Override
+ public QueueState getState(IOFSwitch sw) {
+ SwitchQueue queue = getQueue(sw);
+
+ if (queue == null) {
+ return QueueState.UNKNOWN;
+ }
+
+ return queue.state;
+ }
+
+ /**
+ * Stop processing queue and exit thread.
+ */
+ public void stop() {
+ if (threadMap == null) {
+ return;
+ }
+
+ for (FlowPusherThread t : threadMap.values()) {
+ t.interrupt();
+ }
+ }
+
+ @Override
+ public void setRate(IOFSwitch sw, long rate) {
+ SwitchQueue queue = getQueue(sw);
+ if (queue == null) {
+ queue = createQueueImpl(sw);
+ }
+
+ if (rate > 0) {
+ log.debug("rate for {} is set to {}", sw.getId(), rate);
+ synchronized (queue) {
+ queue.max_rate = rate;
+ }
+ }
+ }
+
+ @Override
+ public boolean createQueue(IOFSwitch sw) {
+ SwitchQueue queue = createQueueImpl(sw);
+
+ return (queue != null);
+ }
+
+ protected SwitchQueue createQueueImpl(IOFSwitch sw) {
+ SwitchQueue queue = getQueue(sw);
+ if (queue != null) {
+ return queue;
+ }
+
+ FlowPusherThread proc = getProcessingThread(sw);
+ queue = new SwitchQueue();
+ queue.state = QueueState.READY;
+ proc.assignedQueues.put(sw, queue);
+
+ return queue;
+ }
+
+ @Override
+ public boolean deleteQueue(IOFSwitch sw) {
+ return deleteQueue(sw, false);
+ }
+
+ @Override
+ public boolean deleteQueue(IOFSwitch sw, boolean forceStop) {
+ FlowPusherThread proc = getProcessingThread(sw);
+
+ if (forceStop) {
+ SwitchQueue queue = proc.assignedQueues.remove(sw);
+ if (queue == null) {
+ return false;
+ }
+ return true;
+ } else {
+ SwitchQueue queue = getQueue(sw);
+ if (queue == null) {
+ return false;
+ }
+ synchronized (queue) {
+ queue.toBeDeleted = true;
+ }
+ return true;
+ }
+ }
+
+ @Override
+ public boolean add(IOFSwitch sw, OFMessage msg) {
+ return add(sw, msg, MsgPriority.NORMAL);
+ }
+
+ @Override
+ public boolean add(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
+ return addMessageImpl(sw, msg, priority);
+ }
+
+ @Override
+ public void pushFlowEntries(
+ Collection<Pair<IOFSwitch, FlowEntry>> entries) {
+ pushFlowEntries(entries, MsgPriority.NORMAL);
+ }
+
+ @Override
+ public void pushFlowEntries(
+ Collection<Pair<IOFSwitch, FlowEntry>> entries, MsgPriority priority) {
+
+ for (Pair<IOFSwitch, FlowEntry> entry : entries) {
+ add(entry.first, entry.second, priority);
+ }
+ }
+
+ @Override
+ public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry) {
+ pushFlowEntry(sw, flowEntry, MsgPriority.NORMAL);
+ }
+
+ @Override
+ public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
+ Collection<Pair<IOFSwitch, FlowEntry>> entries =
+ new LinkedList<Pair<IOFSwitch, FlowEntry>>();
+
+ entries.add(new Pair<IOFSwitch, FlowEntry>(sw, flowEntry));
+ pushFlowEntries(entries, priority);
+ }
+
+ /**
+ * Create a message from FlowEntry and add it to the queue of the switch.
+ *
+ * @param sw Switch to which message is pushed.
+ * @param flowEntry FlowEntry object used for creating message.
+ * @return true if message is successfully added to a queue.
+ */
+ private boolean add(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
+ //
+ // Create the OpenFlow Flow Modification Entry to push
+ //
+ OFFlowMod fm = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
+ long cookie = flowEntry.flowEntryId().value();
+
+ short flowModCommand = OFFlowMod.OFPFC_ADD;
+ if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
+ flowModCommand = OFFlowMod.OFPFC_ADD;
+ } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
+ flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
+ } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
+ flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
+ } else {
+ // Unknown user state. Ignore the entry
+ log.debug(
+ "Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
+ flowEntry.flowEntryId(),
+ flowEntry.flowEntryUserState());
+ return false;
+ }
+
+ //
+ // Fetch the match conditions.
+ //
+ // NOTE: The Flow matching conditions common for all Flow Entries are
+ // used ONLY if a Flow Entry does NOT have the corresponding matching
+ // condition set.
+ //
+ OFMatch match = new OFMatch();
+ match.setWildcards(OFMatch.OFPFW_ALL);
+ FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
+
+ // Match the Incoming Port
+ Port matchInPort = flowEntryMatch.inPort();
+ if (matchInPort != null) {
+ match.setInputPort(matchInPort.value());
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
+ }
+
+ // Match the Source MAC address
+ MACAddress matchSrcMac = flowEntryMatch.srcMac();
+ if (matchSrcMac != null) {
+ match.setDataLayerSource(matchSrcMac.toString());
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
+ }
+
+ // Match the Destination MAC address
+ MACAddress matchDstMac = flowEntryMatch.dstMac();
+ if (matchDstMac != null) {
+ match.setDataLayerDestination(matchDstMac.toString());
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
+ }
+
+ // Match the Ethernet Frame Type
+ Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
+ if (matchEthernetFrameType != null) {
+ match.setDataLayerType(matchEthernetFrameType);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
+ }
+
+ // Match the VLAN ID
+ Short matchVlanId = flowEntryMatch.vlanId();
+ if (matchVlanId != null) {
+ match.setDataLayerVirtualLan(matchVlanId);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
+ }
+
+ // Match the VLAN priority
+ Byte matchVlanPriority = flowEntryMatch.vlanPriority();
+ if (matchVlanPriority != null) {
+ match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
+ match.setWildcards(match.getWildcards()
+ & ~OFMatch.OFPFW_DL_VLAN_PCP);
+ }
+
+ // Match the Source IPv4 Network prefix
+ IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
+ if (matchSrcIPv4Net != null) {
+ match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
+ }
+
+ // Match the Destination IPv4 Network prefix
+ IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
+ if (matchDstIPv4Net != null) {
+ match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
+ }
+
+ // Match the IP protocol
+ Byte matchIpProto = flowEntryMatch.ipProto();
+ if (matchIpProto != null) {
+ match.setNetworkProtocol(matchIpProto);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
+ }
+
+ // Match the IP ToS (DSCP field, 6 bits)
+ Byte matchIpToS = flowEntryMatch.ipToS();
+ if (matchIpToS != null) {
+ match.setNetworkTypeOfService(matchIpToS);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
+ }
+
+ // Match the Source TCP/UDP port
+ Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
+ if (matchSrcTcpUdpPort != null) {
+ match.setTransportSource(matchSrcTcpUdpPort);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
+ }
+
+ // Match the Destination TCP/UDP port
+ Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
+ if (matchDstTcpUdpPort != null) {
+ match.setTransportDestination(matchDstTcpUdpPort);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
+ }
+
+ //
+ // Fetch the actions
+ //
+ Short actionOutputPort = null;
+ List<OFAction> openFlowActions = new ArrayList<OFAction>();
+ int actionsLen = 0;
+ FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
+ //
+ for (FlowEntryAction action : flowEntryActions.actions()) {
+ ActionOutput actionOutput = action.actionOutput();
+ ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
+ ActionSetVlanPriority actionSetVlanPriority = action
+ .actionSetVlanPriority();
+ ActionStripVlan actionStripVlan = action.actionStripVlan();
+ ActionSetEthernetAddr actionSetEthernetSrcAddr = action
+ .actionSetEthernetSrcAddr();
+ ActionSetEthernetAddr actionSetEthernetDstAddr = action
+ .actionSetEthernetDstAddr();
+ ActionSetIPv4Addr actionSetIPv4SrcAddr = action
+ .actionSetIPv4SrcAddr();
+ ActionSetIPv4Addr actionSetIPv4DstAddr = action
+ .actionSetIPv4DstAddr();
+ ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
+ ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action
+ .actionSetTcpUdpSrcPort();
+ ActionSetTcpUdpPort actionSetTcpUdpDstPort = action
+ .actionSetTcpUdpDstPort();
+ ActionEnqueue actionEnqueue = action.actionEnqueue();
+
+ if (actionOutput != null) {
+ actionOutputPort = actionOutput.port().value();
+ // XXX: The max length is hard-coded for now
+ OFActionOutput ofa = new OFActionOutput(actionOutput.port()
+ .value(), (short) 0xffff);
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetVlanId != null) {
+ OFActionVirtualLanIdentifier ofa = new OFActionVirtualLanIdentifier(
+ actionSetVlanId.vlanId());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetVlanPriority != null) {
+ OFActionVirtualLanPriorityCodePoint ofa = new OFActionVirtualLanPriorityCodePoint(
+ actionSetVlanPriority.vlanPriority());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionStripVlan != null) {
+ if (actionStripVlan.stripVlan() == true) {
+ OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+ }
+
+ if (actionSetEthernetSrcAddr != null) {
+ OFActionDataLayerSource ofa = new OFActionDataLayerSource(
+ actionSetEthernetSrcAddr.addr().toBytes());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetEthernetDstAddr != null) {
+ OFActionDataLayerDestination ofa = new OFActionDataLayerDestination(
+ actionSetEthernetDstAddr.addr().toBytes());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetIPv4SrcAddr != null) {
+ OFActionNetworkLayerSource ofa = new OFActionNetworkLayerSource(
+ actionSetIPv4SrcAddr.addr().value());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetIPv4DstAddr != null) {
+ OFActionNetworkLayerDestination ofa = new OFActionNetworkLayerDestination(
+ actionSetIPv4DstAddr.addr().value());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetIpToS != null) {
+ OFActionNetworkTypeOfService ofa = new OFActionNetworkTypeOfService(
+ actionSetIpToS.ipToS());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetTcpUdpSrcPort != null) {
+ OFActionTransportLayerSource ofa = new OFActionTransportLayerSource(
+ actionSetTcpUdpSrcPort.port());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetTcpUdpDstPort != null) {
+ OFActionTransportLayerDestination ofa = new OFActionTransportLayerDestination(
+ actionSetTcpUdpDstPort.port());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionEnqueue != null) {
+ OFActionEnqueue ofa = new OFActionEnqueue(actionEnqueue.port()
+ .value(), actionEnqueue.queueId());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+ }
+
+ fm.setIdleTimeout((short) flowEntry.idleTimeout())
+ .setHardTimeout((short) flowEntry.hardTimeout())
+ .setPriority((short) flowEntry.priority())
+ .setBufferId(OFPacketOut.BUFFER_ID_NONE).setCookie(cookie)
+ .setCommand(flowModCommand).setMatch(match)
+ .setActions(openFlowActions)
+ .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
+ fm.setOutPort(OFPort.OFPP_NONE.getValue());
+ if ((flowModCommand == OFFlowMod.OFPFC_DELETE)
+ || (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
+ if (actionOutputPort != null)
+ fm.setOutPort(actionOutputPort);
+ }
+
+ //
+ // Set the OFPFF_SEND_FLOW_REM flag if the Flow Entry is not
+ // permanent.
+ //
+ if ((flowEntry.idleTimeout() != 0) ||
+ (flowEntry.hardTimeout() != 0)) {
+ fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
+ }
+
+ if (log.isTraceEnabled()) {
+ log.trace("Installing flow entry {} into switch DPID: {} flowEntryId: {} srcMac: {} dstMac: {} inPort: {} outPort: {}"
+ , flowEntry.flowEntryUserState()
+ , sw.getStringId()
+ , flowEntry.flowEntryId()
+ , matchSrcMac
+ , matchDstMac
+ , matchInPort
+ , actionOutputPort
+ );
+ }
+
+ return addMessageImpl(sw, fm, priority);
+ }
+
+ /**
+ * Add message to queue
+ *
+ * @param sw
+ * @param msg
+ * @param priority Priority with which the message is added to the queue.
+ * @return
+ */
+ protected boolean addMessageImpl(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
+ FlowPusherThread thread = getProcessingThread(sw);
+
+ SwitchQueue queue = getQueue(sw);
+
+ // create queue at first addition of message
+ if (queue == null) {
+ queue = createQueueImpl(sw);
+ }
+
+ SwitchQueueEntry entry = new SwitchQueueEntry(msg);
+
+ synchronized (queue) {
+ queue.add(entry, priority);
+ if (log.isTraceEnabled()) {
+ log.trace("Message is pushed : {}", entry.getOFMessage());
+ }
+ }
+
+ thread.notifyMessagePushed();
+
+ return true;
+ }
+
+ @Override
+ public OFBarrierReply barrier(IOFSwitch sw) {
+ OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
+ if (future == null) {
+ return null;
+ }
+
+ try {
+ return future.get();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ log.error("InterruptedException: {}", e);
+ return null;
+ } catch (ExecutionException e) {
+ e.printStackTrace();
+ log.error("ExecutionException: {}", e);
+ return null;
+ }
+ }
+
+ @Override
+ public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
+ // TODO creation of message and future should be moved to OFSwitchImpl
+
+ if (sw == null) {
+ return null;
+ }
+
+ OFBarrierRequest msg = createBarrierRequest(sw);
+
+ OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
+ barrierFutures.put(BarrierInfo.create(sw, msg), future);
+
+ addMessageImpl(sw, msg, MsgPriority.NORMAL);
+
+ return future;
+ }
+
+ protected OFBarrierRequest createBarrierRequest(IOFSwitch sw) {
+ OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
+ msg.setXid(sw.getNextTransactionId());
+
+ return msg;
+ }
+
+ /**
+ * Get a queue attached to a switch.
+ *
+ * @param sw Switch object
+ * @return Queue object
+ */
+ protected SwitchQueue getQueue(IOFSwitch sw) {
+ if (sw == null) {
+ return null;
+ }
+
+ FlowPusherThread th = getProcessingThread(sw);
+ if (th == null) {
+ return null;
+ }
+
+ return th.assignedQueues.get(sw);
+ }
+
+ /**
+ * Get a hash value corresponding to a switch.
+ *
+ * @param sw Switch object
+ * @return Hash value
+ */
+ protected long getHash(IOFSwitch sw) {
+ // This code assumes DPID is sequentially assigned.
+ // TODO consider equalization algorithm
+ return sw.getId() % number_thread;
+ }
+
+ /**
+ * Get a Thread object which processes the queue attached to a switch.
+ *
+ * @param sw Switch object
+ * @return Thread object
+ */
+ protected FlowPusherThread getProcessingThread(IOFSwitch sw) {
+ long hash = getHash(sw);
+
+ return threadMap.get(hash);
+ }
+
+ @Override
+ public String getName() {
+ return "flowpusher";
+ }
+
+ @Override
+ public boolean isCallbackOrderingPrereq(OFType type, String name) {
+ return false;
+ }
+
+ @Override
+ public boolean isCallbackOrderingPostreq(OFType type, String name) {
+ return false;
+ }
+
+ @Override
+ public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
+ if (log.isTraceEnabled()) {
+ log.trace("Received BARRIER_REPLY from : {}", sw.getId());
+ }
+
+ if (msg.getType() != OFType.BARRIER_REPLY) {
+ log.error("Unexpected reply message : {}", msg.getType());
+ return Command.CONTINUE;
+ }
+
+ OFBarrierReply reply = (OFBarrierReply) msg;
+ BarrierInfo info = BarrierInfo.create(sw, reply);
+
+ // Deliver future if exists
+ OFBarrierReplyFuture future = barrierFutures.get(info);
+ if (future != null) {
+ future.deliverFuture(sw, msg);
+ barrierFutures.remove(info);
+ }
+
+ return Command.CONTINUE;
+ }
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
index 1f426b3..12df976 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
@@ -34,8 +34,8 @@
* In addition to IFlowSyncService, FlowSynchronizer periodically reads flow
* tables from switches and compare them with GraphDB to drop unnecessary
* flows and/or to install missing flows.
- * @author Brian
*
+ * @author Brian
*/
public class FlowSynchronizer implements IFlowSyncService {
@@ -44,337 +44,344 @@
// TODO: fix when FlowSynchronizer is refactored
// private DBOperation dbHandler;
protected IFlowPusherService pusher;
- private Map<IOFSwitch, FutureTask<SyncResult>> switchThreads;
+ private Map<IOFSwitch, FutureTask<SyncResult>> switchThreads;
public FlowSynchronizer() {
- // TODO: fix when FlowSynchronizer is refactored
- // dbHandler = GraphDBManager.getDBOperation();
- switchThreads = new HashMap<IOFSwitch, FutureTask<SyncResult>>();
+ // TODO: fix when FlowSynchronizer is refactored
+ // dbHandler = GraphDBManager.getDBOperation();
+ switchThreads = new HashMap<IOFSwitch, FutureTask<SyncResult>>();
}
@Override
public Future<SyncResult> synchronize(IOFSwitch sw) {
- Synchronizer sync = new Synchronizer(sw);
- FutureTask<SyncResult> task = new FutureTask<SyncResult>(sync);
- switchThreads.put(sw, task);
- task.run();
- return task;
+ Synchronizer sync = new Synchronizer(sw);
+ FutureTask<SyncResult> task = new FutureTask<SyncResult>(sync);
+ switchThreads.put(sw, task);
+ task.run();
+ return task;
}
-
+
@Override
public void interrupt(IOFSwitch sw) {
- FutureTask<SyncResult> t = switchThreads.remove(sw);
- if(t != null) {
- t.cancel(true);
- }
+ FutureTask<SyncResult> t = switchThreads.remove(sw);
+ if (t != null) {
+ t.cancel(true);
+ }
}
/**
* Initialize Synchronizer.
+ *
* @param pusherService FlowPusherService used for sending messages.
*/
public void init(IFlowPusherService pusherService) {
- pusher = pusherService;
+ pusher = pusherService;
}
/**
* Synchronizer represents main thread of synchronization.
- * @author Brian
*
+ * @author Brian
*/
- protected class Synchronizer implements Callable<SyncResult> {
- IOFSwitch sw;
- // TODO: fix when FlowSynchronizer is refactored
- // ISwitchObject swObj;
+ protected class Synchronizer implements Callable<SyncResult> {
+ IOFSwitch sw;
+ // TODO: fix when FlowSynchronizer is refactored
+ // ISwitchObject swObj;
- public Synchronizer(IOFSwitch sw) {
- this.sw = sw;
- Dpid dpid = new Dpid(sw.getId());
- // TODO: fix when FlowSynchronizer is refactored
- // this.swObj = dbHandler.searchSwitch(dpid.toString());
- }
+ public Synchronizer(IOFSwitch sw) {
+ this.sw = sw;
+ Dpid dpid = new Dpid(sw.getId());
+ // TODO: fix when FlowSynchronizer is refactored
+ // this.swObj = dbHandler.searchSwitch(dpid.toString());
+ }
- double graphIDTime, switchTime, compareTime, graphEntryTime, extractTime, pushTime, totalTime;
- @Override
- public SyncResult call() {
- pusher.suspend(sw);
- try {
- long start = System.nanoTime();
- Set<FlowEntryWrapper> graphEntries = getFlowEntriesFromGraph();
- long step1 = System.nanoTime();
- Set<FlowEntryWrapper> switchEntries = getFlowEntriesFromSwitch();
- if (switchEntries == null) {
- log.debug("getFlowEntriesFromSwitch() failed");
- return null;
- }
- long step2 = System.nanoTime();
- SyncResult result = compare(graphEntries, switchEntries);
- long step3 = System.nanoTime();
- graphIDTime = (step1 - start);
- switchTime = (step2 - step1);
- compareTime = (step3 - step2);
- totalTime = (step3 - start);
- outputTime();
-
- return result;
- } finally {
- pusher.resume(sw);
- }
- }
-
- private void outputTime() {
- double div = Math.pow(10, 6); //convert nanoseconds to ms
- graphIDTime /= div;
- switchTime /= div;
- compareTime = (compareTime - graphEntryTime - extractTime - pushTime) / div;
- graphEntryTime /= div;
- extractTime /= div;
- pushTime /= div;
- totalTime /= div;
- log.debug("Sync time (ms):{},{},{},{},{},{},{}"
- , graphIDTime
- , switchTime
- , compareTime
- , graphEntryTime
- , extractTime
- , pushTime
- , totalTime);
- }
+ double graphIDTime, switchTime, compareTime, graphEntryTime, extractTime, pushTime, totalTime;
- /**
- * Compare flows entries in GraphDB and switch to pick up necessary
- * messages.
- * After picking up, picked messages are added to FlowPusher.
- * @param graphEntries Flow entries in GraphDB.
- * @param switchEntries Flow entries in switch.
- */
- private SyncResult compare(Set<FlowEntryWrapper> graphEntries, Set<FlowEntryWrapper> switchEntries) {
- int added = 0, removed = 0, skipped = 0;
- for(FlowEntryWrapper entry : switchEntries) {
- if(graphEntries.contains(entry)) {
- graphEntries.remove(entry);
- skipped++;
- }
- else {
- // remove flow entry from the switch
- entry.removeFromSwitch(sw);
- removed++;
- }
- }
- for(FlowEntryWrapper entry : graphEntries) {
- // add flow entry to switch
- entry.addToSwitch(sw);
- graphEntryTime += entry.dbTime;
- extractTime += entry.extractTime;
- pushTime += entry.pushTime;
- added++;
- }
- log.debug("Flow entries added {}, " +
- "Flow entries removed {}, " +
- "Flow entries skipped {}"
- , added
- , removed
- , skipped );
+ @Override
+ public SyncResult call() {
+ pusher.suspend(sw);
+ try {
+ long start = System.nanoTime();
+ Set<FlowEntryWrapper> graphEntries = getFlowEntriesFromGraph();
+ long step1 = System.nanoTime();
+ Set<FlowEntryWrapper> switchEntries = getFlowEntriesFromSwitch();
+ if (switchEntries == null) {
+ log.debug("getFlowEntriesFromSwitch() failed");
+ return null;
+ }
+ long step2 = System.nanoTime();
+ SyncResult result = compare(graphEntries, switchEntries);
+ long step3 = System.nanoTime();
+ graphIDTime = (step1 - start);
+ switchTime = (step2 - step1);
+ compareTime = (step3 - step2);
+ totalTime = (step3 - start);
+ outputTime();
- return new SyncResult(added, removed, skipped);
- }
+ return result;
+ } finally {
+ pusher.resume(sw);
+ }
+ }
- /**
- * Read GraphDB to get FlowEntries associated with a switch.
- * @return set of FlowEntries
- */
- private Set<FlowEntryWrapper> getFlowEntriesFromGraph() {
- Set<FlowEntryWrapper> entries = new HashSet<FlowEntryWrapper>();
+ private void outputTime() {
+ double div = Math.pow(10, 6); //convert nanoseconds to ms
+ graphIDTime /= div;
+ switchTime /= div;
+ compareTime = (compareTime - graphEntryTime - extractTime - pushTime) / div;
+ graphEntryTime /= div;
+ extractTime /= div;
+ pushTime /= div;
+ totalTime /= div;
+ log.debug("Sync time (ms):{},{},{},{},{},{},{}"
+ , graphIDTime
+ , switchTime
+ , compareTime
+ , graphEntryTime
+ , extractTime
+ , pushTime
+ , totalTime);
+ }
- // TODO: fix when FlowSynchronizer is refactored
- /*
- for(IFlowEntry entry : swObj.getFlowEntries()) {
- FlowEntryWrapper fe = new FlowEntryWrapper(entry);
- entries.add(fe);
- }
- */
- return entries;
- }
+ /**
+ * Compare flow entries in GraphDB and switch to pick up necessary
+ * messages.
+ * After picking up, picked messages are added to FlowPusher.
+ *
+ * @param graphEntries Flow entries in GraphDB.
+ * @param switchEntries Flow entries in switch.
+ */
+ private SyncResult compare(Set<FlowEntryWrapper> graphEntries, Set<FlowEntryWrapper> switchEntries) {
+ int added = 0, removed = 0, skipped = 0;
+ for (FlowEntryWrapper entry : switchEntries) {
+ if (graphEntries.contains(entry)) {
+ graphEntries.remove(entry);
+ skipped++;
+ } else {
+ // remove flow entry from the switch
+ entry.removeFromSwitch(sw);
+ removed++;
+ }
+ }
+ for (FlowEntryWrapper entry : graphEntries) {
+ // add flow entry to switch
+ entry.addToSwitch(sw);
+ graphEntryTime += entry.dbTime;
+ extractTime += entry.extractTime;
+ pushTime += entry.pushTime;
+ added++;
+ }
+ log.debug("Flow entries added {}, " +
+ "Flow entries removed {}, " +
+ "Flow entries skipped {}"
+ , added
+ , removed
+ , skipped);
- /**
- * Read flow table from switch and derive FlowEntries from table.
- * @return set of FlowEntries
- */
- private Set<FlowEntryWrapper> getFlowEntriesFromSwitch() {
+ return new SyncResult(added, removed, skipped);
+ }
- int lengthU = 0;
- OFMatch match = new OFMatch();
- match.setWildcards(OFMatch.OFPFW_ALL);
+ /**
+ * Read GraphDB to get FlowEntries associated with a switch.
+ *
+ * @return set of FlowEntries
+ */
+ private Set<FlowEntryWrapper> getFlowEntriesFromGraph() {
+ Set<FlowEntryWrapper> entries = new HashSet<FlowEntryWrapper>();
- OFFlowStatisticsRequest stat = new OFFlowStatisticsRequest();
- stat.setOutPort((short) 0xffff); //TODO: OFPort.OFPP_NONE
- stat.setTableId((byte) 0xff); // TODO: fix this with enum (ALL TABLES)
- stat.setMatch(match);
- List<OFStatistics> stats = new ArrayList<OFStatistics>();
- stats.add(stat);
- lengthU += stat.getLength();
+ // TODO: fix when FlowSynchronizer is refactored
+ /*
+ for(IFlowEntry entry : swObj.getFlowEntries()) {
+ FlowEntryWrapper fe = new FlowEntryWrapper(entry);
+ entries.add(fe);
+ }
+ */
+ return entries;
+ }
- OFStatisticsRequest req = new OFStatisticsRequest();
- req.setStatisticType(OFStatisticsType.FLOW);
- req.setStatistics(stats);
- lengthU += req.getLengthU();
- req.setLengthU(lengthU);
+ /**
+ * Read flow table from switch and derive FlowEntries from table.
+ *
+ * @return set of FlowEntries
+ */
+ private Set<FlowEntryWrapper> getFlowEntriesFromSwitch() {
- List<OFStatistics> entries = null;
- try {
- Future<List<OFStatistics>> dfuture = sw.getStatistics(req);
- entries = dfuture.get();
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- return null;
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- return null;
- } catch (ExecutionException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- return null;
- }
+ int lengthU = 0;
+ OFMatch match = new OFMatch();
+ match.setWildcards(OFMatch.OFPFW_ALL);
- Set<FlowEntryWrapper> results = new HashSet<FlowEntryWrapper>();
- for(OFStatistics result : entries){
- OFFlowStatisticsReply entry = (OFFlowStatisticsReply) result;
- FlowEntryWrapper fe = new FlowEntryWrapper(entry);
- results.add(fe);
- }
- return results;
- }
+ OFFlowStatisticsRequest stat = new OFFlowStatisticsRequest();
+ stat.setOutPort((short) 0xffff); //TODO: OFPort.OFPP_NONE
+ stat.setTableId((byte) 0xff); // TODO: fix this with enum (ALL TABLES)
+ stat.setMatch(match);
+ List<OFStatistics> stats = new ArrayList<OFStatistics>();
+ stats.add(stat);
+ lengthU += stat.getLength();
+
+ OFStatisticsRequest req = new OFStatisticsRequest();
+ req.setStatisticType(OFStatisticsType.FLOW);
+ req.setStatistics(stats);
+ lengthU += req.getLengthU();
+ req.setLengthU(lengthU);
+
+ List<OFStatistics> entries = null;
+ try {
+ Future<List<OFStatistics>> dfuture = sw.getStatistics(req);
+ entries = dfuture.get();
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ return null;
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ return null;
+ } catch (ExecutionException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ return null;
+ }
+
+ Set<FlowEntryWrapper> results = new HashSet<FlowEntryWrapper>();
+ for (OFStatistics result : entries) {
+ OFFlowStatisticsReply entry = (OFFlowStatisticsReply) result;
+ FlowEntryWrapper fe = new FlowEntryWrapper(entry);
+ results.add(fe);
+ }
+ return results;
+ }
}
/**
* FlowEntryWrapper represents abstract FlowEntry which is embodied
* by FlowEntryId (from GraphDB) or OFFlowStatisticsReply (from switch).
- * @author Brian
*
+ * @author Brian
*/
class FlowEntryWrapper {
- FlowEntryId flowEntryId;
- // TODO: fix when FlowSynchronizer is refactored
- // IFlowEntry iFlowEntry;
- OFFlowStatisticsReply statisticsReply;
+ FlowEntryId flowEntryId;
+ // TODO: fix when FlowSynchronizer is refactored
+ // IFlowEntry iFlowEntry;
+ OFFlowStatisticsReply statisticsReply;
- // TODO: fix when FlowSynchronizer is refactored
- /*
- public FlowEntryWrapper(IFlowEntry entry) {
- flowEntryId = new FlowEntryId(entry.getFlowEntryId());
- iFlowEntry = entry;
- }
- */
+ // TODO: fix when FlowSynchronizer is refactored
+ /*
+ public FlowEntryWrapper(IFlowEntry entry) {
+ flowEntryId = new FlowEntryId(entry.getFlowEntryId());
+ iFlowEntry = entry;
+ }
+ */
- public FlowEntryWrapper(OFFlowStatisticsReply entry) {
- flowEntryId = new FlowEntryId(entry.getCookie());
- statisticsReply = entry;
- }
+ public FlowEntryWrapper(OFFlowStatisticsReply entry) {
+ flowEntryId = new FlowEntryId(entry.getCookie());
+ statisticsReply = entry;
+ }
- /**
- * Install this FlowEntry to a switch via FlowPusher.
- * @param sw Switch to which flow will be installed.
- */
- double dbTime, extractTime, pushTime;
- public void addToSwitch(IOFSwitch sw) {
- if (statisticsReply != null) {
- log.error("Error adding existing flow entry {} to sw {}",
- statisticsReply.getCookie(), sw.getId());
- return;
- }
+ /**
+ * Install this FlowEntry to a switch via FlowPusher.
+ *
+ * @param sw Switch to which flow will be installed.
+ */
+ double dbTime, extractTime, pushTime;
- double startDB = System.nanoTime();
- // Get the Flow Entry state from the Network Graph
- // TODO: fix when FlowSynchronizer is refactored
- /*
- if (iFlowEntry == null) {
- try {
- // TODO: fix when FlowSynchronizer is refactored
- iFlowEntry = dbHandler.searchFlowEntry(flowEntryId);
- } catch (Exception e) {
- log.error("Error finding flow entry {} in Network Graph",
- flowEntryId);
- return;
+ public void addToSwitch(IOFSwitch sw) {
+ if (statisticsReply != null) {
+ log.error("Error adding existing flow entry {} to sw {}",
+ statisticsReply.getCookie(), sw.getId());
+ return;
}
- }
- */
- dbTime = System.nanoTime() - startDB;
- //
- // TODO: The old FlowDatabaseOperation class is gone, so the code
- //
- /*
- double startExtract = System.nanoTime();
- FlowEntry flowEntry =
- FlowDatabaseOperation.extractFlowEntry(iFlowEntry);
- if (flowEntry == null) {
- log.error("Cannot add flow entry {} to sw {} : flow entry cannot be extracted",
- flowEntryId, sw.getId());
- return;
- }
- extractTime = System.nanoTime() - startExtract;
+ double startDB = System.nanoTime();
+ // Get the Flow Entry state from the Network Graph
+ // TODO: fix when FlowSynchronizer is refactored
+ /*
+ if (iFlowEntry == null) {
+ try {
+ // TODO: fix when FlowSynchronizer is refactored
+ iFlowEntry = dbHandler.searchFlowEntry(flowEntryId);
+ } catch (Exception e) {
+ log.error("Error finding flow entry {} in Network Graph",
+ flowEntryId);
+ return;
+ }
+ }
+ */
+ dbTime = System.nanoTime() - startDB;
- double startPush = System.nanoTime();
- pusher.pushFlowEntry(sw, flowEntry, MsgPriority.HIGH);
- pushTime = System.nanoTime() - startPush;
- */
- }
+ //
+ // TODO: The old FlowDatabaseOperation class is gone, so the code below is commented out
+ //
+ /*
+ double startExtract = System.nanoTime();
+ FlowEntry flowEntry =
+ FlowDatabaseOperation.extractFlowEntry(iFlowEntry);
+ if (flowEntry == null) {
+ log.error("Cannot add flow entry {} to sw {} : flow entry cannot be extracted",
+ flowEntryId, sw.getId());
+ return;
+ }
+ extractTime = System.nanoTime() - startExtract;
- /**
- * Remove this FlowEntry from a switch via FlowPusher.
- * @param sw Switch from which flow will be removed.
- */
- public void removeFromSwitch(IOFSwitch sw) {
- if (statisticsReply == null) {
- log.error("Error removing non-existent flow entry {} from sw {}",
- flowEntryId, sw.getId());
- return;
- }
+ double startPush = System.nanoTime();
+ pusher.pushFlowEntry(sw, flowEntry, MsgPriority.HIGH);
+ pushTime = System.nanoTime() - startPush;
+ */
+ }
- // Convert Statistics Reply to Flow Mod, then write it
- OFFlowMod fm = new OFFlowMod();
- fm.setCookie(statisticsReply.getCookie());
- fm.setCommand(OFFlowMod.OFPFC_DELETE_STRICT);
- fm.setLengthU(OFFlowMod.MINIMUM_LENGTH);
- fm.setMatch(statisticsReply.getMatch());
- fm.setPriority(statisticsReply.getPriority());
- fm.setOutPort(OFPort.OFPP_NONE);
+ /**
+ * Remove this FlowEntry from a switch via FlowPusher.
+ *
+ * @param sw Switch from which flow will be removed.
+ */
+ public void removeFromSwitch(IOFSwitch sw) {
+ if (statisticsReply == null) {
+ log.error("Error removing non-existent flow entry {} from sw {}",
+ flowEntryId, sw.getId());
+ return;
+ }
- pusher.add(sw, fm, MsgPriority.HIGH);
- }
+ // Convert Statistics Reply to Flow Mod, then write it
+ OFFlowMod fm = new OFFlowMod();
+ fm.setCookie(statisticsReply.getCookie());
+ fm.setCommand(OFFlowMod.OFPFC_DELETE_STRICT);
+ fm.setLengthU(OFFlowMod.MINIMUM_LENGTH);
+ fm.setMatch(statisticsReply.getMatch());
+ fm.setPriority(statisticsReply.getPriority());
+ fm.setOutPort(OFPort.OFPP_NONE);
- /**
- * Return the hash code of the Flow Entry ID
- */
- @Override
- public int hashCode() {
- return flowEntryId.hashCode();
- }
+ pusher.add(sw, fm, MsgPriority.HIGH);
+ }
- /**
- * Returns true of the object is another Flow Entry ID with
- * the same value; otherwise, returns false.
- *
- * @param Object to compare
- * @return true if the object has the same Flow Entry ID.
- */
- @Override
- public boolean equals(Object obj){
- if(obj != null && obj.getClass() == this.getClass()) {
- FlowEntryWrapper entry = (FlowEntryWrapper) obj;
- // TODO: we need to actually compare the match + actions
- return this.flowEntryId.equals(entry.flowEntryId);
- }
- return false;
- }
+ /**
+ * Return the hash code of the Flow Entry ID
+ */
+ @Override
+ public int hashCode() {
+ return flowEntryId.hashCode();
+ }
- @Override
- public String toString() {
- return flowEntryId.toString();
- }
+ /**
+ * Returns true if the object is another Flow Entry ID with
+ * the same value; otherwise, returns false.
+ *
+ * @param obj Object to compare
+ * @return true if the object has the same Flow Entry ID.
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if (obj != null && obj.getClass() == this.getClass()) {
+ FlowEntryWrapper entry = (FlowEntryWrapper) obj;
+ // TODO: we need to actually compare the match + actions
+ return this.flowEntryId.equals(entry.flowEntryId);
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return flowEntryId.toString();
+ }
}
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
index 29a579e..6658495 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
@@ -17,162 +17,171 @@
* read a message from queue and send it to switch in order.
* To guarantee message has been installed, FlowPusherService can add barrier
* message to queue and can notify when barrier message is sent to switch.
- * @author Naoki Shiota
*
+ * @author Naoki Shiota
*/
public interface IFlowPusherService extends IFloodlightService {
- public static enum MsgPriority {
- HIGH, // High priority: e.g. flow synchronization
- NORMAL, // Normal priority
+ public static enum MsgPriority {
+ HIGH, // High priority: e.g. flow synchronization
+ NORMAL, // Normal priority
// LOW, // Low priority, not needed for now
- }
-
+ }
+
public static enum QueueState {
- READY, // Queues with all priority are at work
- SUSPENDED, // Only prior queue is at work
- UNKNOWN
- }
+ READY, // Queues with all priority are at work
+ SUSPENDED, // Only prior queue is at work
+ UNKNOWN
+ }
- /**
- * Create a queue correspondent to the switch.
- * @param sw Switch to which new queue is attached.
- * @return true if new queue is successfully created.
- */
- boolean createQueue(IOFSwitch sw);
+ /**
+ * Create a queue correspondent to the switch.
+ *
+ * @param sw Switch to which new queue is attached.
+ * @return true if new queue is successfully created.
+ */
+ boolean createQueue(IOFSwitch sw);
- /**
- * Delete a queue correspondent to the switch.
- * Messages remains in queue will be all sent before queue is deleted.
- * @param sw Switch of which queue is deleted.
- * @return true if queue is successfully deleted.
- */
- boolean deleteQueue(IOFSwitch sw);
-
- /**
- * Delete a queue correspondent to the switch.
- * By setting force flag on, queue will be deleted immediately.
- * @param sw Switch of which queue is deleted.
- * @param forceStop If this flag is set to true, queue will be deleted
- * immediately regardless of any messages in the queue.
- * If false, all messages will be sent to switch and queue will
- * be deleted after that.
- * @return true if queue is successfully deleted or flagged to be deleted.
- */
- boolean deleteQueue(IOFSwitch sw, boolean forceStop);
-
- /**
- * Add a message to the queue of the switch with normal priority.
- *
- * Note: Notification is NOT delivered for the pushed message.
- *
- * @param sw Switch to which message is pushed.
- * @param msg Message object to be added.
- * @return true if message is successfully added to a queue.
- */
- boolean add(IOFSwitch sw, OFMessage msg);
+ /**
+ * Delete a queue correspondent to the switch.
+ * Messages remaining in the queue will all be sent before the queue is deleted.
+ *
+ * @param sw Switch of which queue is deleted.
+ * @return true if queue is successfully deleted.
+ */
+ boolean deleteQueue(IOFSwitch sw);
- /**
- * Add a message to the queue of the switch with specific priority.
- *
- * @param sw Switch to which message is pushed.
- * @param msg Message object to be added.
- * @param priority Sending priority of the message.
- * @return true if message is successfully added to a queue.
- */
- boolean add(IOFSwitch sw, OFMessage msg, MsgPriority priority);
-
- /**
- * Push a collection of Flow Entries to the corresponding switches
- * with normal priority.
- *
- * Note: Notification is delivered for the Flow Entries that
- * are pushed successfully.
- *
- * @param entries the collection of <IOFSwitch, FlowEntry> pairs
- * to push.
- */
- void pushFlowEntries(Collection<Pair<IOFSwitch, FlowEntry>> entries);
+ /**
+ * Delete a queue correspondent to the switch.
+ * By setting force flag on, queue will be deleted immediately.
+ *
+ * @param sw Switch of which queue is deleted.
+ * @param forceStop If this flag is set to true, queue will be deleted
+ * immediately regardless of any messages in the queue.
+ * If false, all messages will be sent to switch and queue will
+ * be deleted after that.
+ * @return true if queue is successfully deleted or flagged to be deleted.
+ */
+ boolean deleteQueue(IOFSwitch sw, boolean forceStop);
- /**
- * Push a collection of Flow Entries to the corresponding switches
- * with specific priority.
- *
- * Note: Notification is delivered for the Flow Entries that
- * are pushed successfully.
- *
- * @param entries the collection of <IOFSwitch, FlowEntry> pairs
- * to push.
- * @param priority Sending priority of flow entries.
- */
- void pushFlowEntries(Collection<Pair<IOFSwitch, FlowEntry>> entries,
- MsgPriority priority);
-
- /**
- * Create a message from FlowEntry and add it to the queue of the
- * switch with normal priority.
- *
- * Note: Notification is delivered for the Flow Entries that
- * are pushed successfully.
- *
- * @param sw Switch to which message is pushed.
- * @param flowEntry FlowEntry object used for creating message.
- * @return true if message is successfully added to a queue.
- */
- void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry);
+ /**
+ * Add a message to the queue of the switch with normal priority.
+ * <p/>
+ * Note: Notification is NOT delivered for the pushed message.
+ *
+ * @param sw Switch to which message is pushed.
+ * @param msg Message object to be added.
+ * @return true if message is successfully added to a queue.
+ */
+ boolean add(IOFSwitch sw, OFMessage msg);
- /**
- * Create a message from FlowEntry and add it to the queue of the
- * switch with specific priority.
- *
- * Note: Notification is delivered for the Flow Entries that
- * are pushed successfully.
- *
- * @param sw Switch to which message is pushed.
- * @param flowEntry FlowEntry object used for creating message.
- * @return true if message is successfully added to a queue.
- */
- void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry,
- MsgPriority priority);
-
- /**
- * Set sending rate to a switch.
- * @param sw Switch.
- * @param rate Rate in bytes/ms.
- */
- public void setRate(IOFSwitch sw, long rate);
-
- /**
- * Add BARRIER message to queue and wait for reply.
- * @param sw Switch to which barrier message is pushed.
- * @return BARRIER_REPLY message sent from switch.
- */
- OFBarrierReply barrier(IOFSwitch sw);
-
- /**
- * Add BARRIER message to queue asynchronously.
- * @param sw Switch to which barrier message is pushed.
- * @return Future object of BARRIER_REPLY message which will be sent from switch.
- */
- OFMessageFuture<OFBarrierReply> barrierAsync(IOFSwitch sw);
-
- /**
- * Suspend pushing message to a switch.
- * @param sw Switch to be suspended pushing message.
- * @return true if success
- */
- boolean suspend(IOFSwitch sw);
-
- /**
- * Resume pushing message to a switch.
- * @param sw Switch to be resumed pushing message.
- * @return true if success
- */
- boolean resume(IOFSwitch sw);
-
- /**
- * Get state of queue attached to a switch.
- * @param sw Switch to be checked.
- * @return State of queue.
- */
- QueueState getState(IOFSwitch sw);
+ /**
+ * Add a message to the queue of the switch with specific priority.
+ *
+ * @param sw Switch to which message is pushed.
+ * @param msg Message object to be added.
+ * @param priority Sending priority of the message.
+ * @return true if message is successfully added to a queue.
+ */
+ boolean add(IOFSwitch sw, OFMessage msg, MsgPriority priority);
+
+ /**
+ * Push a collection of Flow Entries to the corresponding switches
+ * with normal priority.
+ * <p/>
+ * Note: Notification is delivered for the Flow Entries that
+ * are pushed successfully.
+ *
+ * @param entries the collection of <IOFSwitch, FlowEntry> pairs
+ * to push.
+ */
+ void pushFlowEntries(Collection<Pair<IOFSwitch, FlowEntry>> entries);
+
+ /**
+ * Push a collection of Flow Entries to the corresponding switches
+ * with specific priority.
+ * <p/>
+ * Note: Notification is delivered for the Flow Entries that
+ * are pushed successfully.
+ *
+ * @param entries the collection of <IOFSwitch, FlowEntry> pairs
+ * to push.
+ * @param priority Sending priority of flow entries.
+ */
+ void pushFlowEntries(Collection<Pair<IOFSwitch, FlowEntry>> entries,
+ MsgPriority priority);
+
+ /**
+ * Create a message from FlowEntry and add it to the queue of the
+ * switch with normal priority.
+ * <p/>
+ * Note: Notification is delivered for the Flow Entries that
+ * are pushed successfully.
+ *
+ * @param sw Switch to which message is pushed.
+ * @param flowEntry FlowEntry object used for creating message.
+ * @return true if message is successfully added to a queue.
+ */
+ void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry);
+
+ /**
+ * Create a message from FlowEntry and add it to the queue of the
+ * switch with specific priority.
+ * <p/>
+ * Note: Notification is delivered for the Flow Entries that
+ * are pushed successfully.
+ *
+ * @param sw Switch to which message is pushed.
+ * @param flowEntry FlowEntry object used for creating message.
+ * @return true if message is successfully added to a queue.
+ */
+ void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry,
+ MsgPriority priority);
+
+ /**
+ * Set sending rate to a switch.
+ *
+ * @param sw Switch.
+ * @param rate Rate in bytes/ms.
+ */
+ public void setRate(IOFSwitch sw, long rate);
+
+ /**
+ * Add BARRIER message to queue and wait for reply.
+ *
+ * @param sw Switch to which barrier message is pushed.
+ * @return BARRIER_REPLY message sent from switch.
+ */
+ OFBarrierReply barrier(IOFSwitch sw);
+
+ /**
+ * Add BARRIER message to queue asynchronously.
+ *
+ * @param sw Switch to which barrier message is pushed.
+ * @return Future object of BARRIER_REPLY message which will be sent from switch.
+ */
+ OFMessageFuture<OFBarrierReply> barrierAsync(IOFSwitch sw);
+
+ /**
+ * Suspend pushing message to a switch.
+ *
+ * @param sw Switch to be suspended pushing message.
+ * @return true if success
+ */
+ boolean suspend(IOFSwitch sw);
+
+ /**
+ * Resume pushing message to a switch.
+ *
+ * @param sw Switch to be resumed pushing message.
+ * @return true if success
+ */
+ boolean resume(IOFSwitch sw);
+
+ /**
+ * Get state of queue attached to a switch.
+ *
+ * @param sw Switch to be checked.
+ * @return State of queue.
+ */
+ QueueState getState(IOFSwitch sw);
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowSyncService.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowSyncService.java
index 4fe0857..63d395f 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowSyncService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowSyncService.java
@@ -8,23 +8,23 @@
/**
* FlowSyncService is a service to synchronize GraphDB and switch's flow table.
* FlowSyncService offers APIs to trigger and interrupt synchronization explicitly.
- * @author Brian
*
+ * @author Brian
*/
public interface IFlowSyncService extends IFloodlightService {
public Future<SyncResult> synchronize(IOFSwitch sw);
-
+
public void interrupt(IOFSwitch sw);
-
+
public class SyncResult {
- public final int flowAdded;
- public final int flowRemoved;
- public final int flowSkipped;
-
- public SyncResult(int added, int removed, int skipped) {
- flowAdded = added;
- flowRemoved = removed;
- flowSkipped = skipped;
- }
+ public final int flowAdded;
+ public final int flowRemoved;
+ public final int flowSkipped;
+
+ public SyncResult(int added, int removed, int skipped) {
+ flowAdded = added;
+ flowRemoved = removed;
+ flowSkipped = skipped;
+ }
}
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/OFBarrierReplyFuture.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/OFBarrierReplyFuture.java
index 3013f5a..501eaa6 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/OFBarrierReplyFuture.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/OFBarrierReplyFuture.java
@@ -15,13 +15,13 @@
protected volatile boolean finished;
public OFBarrierReplyFuture(IThreadPoolService tp,
- IOFSwitch sw, int transactionId) {
+ IOFSwitch sw, int transactionId) {
super(tp, sw, OFType.FEATURES_REPLY, transactionId);
init();
}
public OFBarrierReplyFuture(IThreadPoolService tp,
- IOFSwitch sw, int transactionId, long timeout, TimeUnit unit) {
+ IOFSwitch sw, int transactionId, long timeout, TimeUnit unit) {
super(tp, sw, OFType.FEATURES_REPLY, transactionId, timeout, unit);
init();
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/DoInterruptResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/DoInterruptResource.java
index 3c2920d..66704bf 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/DoInterruptResource.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/DoInterruptResource.java
@@ -7,8 +7,8 @@
/**
* FlowProgrammer REST API implementation: Interrupt synchronization to a switch.
- *
- * GET /wm/fprog/synchronizer/interrupt/{dpid}/json"
+ * <p/>
+ * GET /wm/onos/flowprogrammer/synchronizer/interrupt/{dpid}/json
*/
public class DoInterruptResource extends SynchronizerResource {
@@ -19,26 +19,26 @@
*/
@Get("json")
public boolean retrieve() {
- if (! init()) {
- return false;
- }
-
- long dpid;
- try {
- dpid = HexString.toLong((String)getRequestAttributes().get("dpid"));
- } catch (NumberFormatException e) {
- log.error("Invalid number format");
- return false;
- }
+ if (!init()) {
+ return false;
+ }
- IOFSwitch sw = provider.getSwitches().get(dpid);
- if (sw == null) {
- log.error("Invalid dpid");
- return false;
- }
-
- synchronizer.interrupt(sw);
-
- return true;
+ long dpid;
+ try {
+ dpid = HexString.toLong((String) getRequestAttributes().get("dpid"));
+ } catch (NumberFormatException e) {
+ log.error("Invalid number format");
+ return false;
+ }
+
+ IOFSwitch sw = provider.getSwitches().get(dpid);
+ if (sw == null) {
+ log.error("Invalid dpid");
+ return false;
+ }
+
+ synchronizer.interrupt(sw);
+
+ return true;
}
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/DoSynchronizeResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/DoSynchronizeResource.java
index dc8d431..c7deb11 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/DoSynchronizeResource.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/DoSynchronizeResource.java
@@ -7,8 +7,8 @@
/**
* FlowProgrammer REST API implementation: Begin synchronization to a switch.
- *
- * GET /wm/fprog/synchronizer/sync/{dpid}/json"
+ * <p/>
+ * GET /wm/onos/flowprogrammer/synchronizer/sync/{dpid}/json
*/
public class DoSynchronizeResource extends SynchronizerResource {
/**
@@ -18,27 +18,27 @@
*/
@Get("json")
public boolean retrieve() {
- if (! init()) {
- return false;
- }
-
- long dpid;
- try {
- dpid = HexString.toLong((String)getRequestAttributes().get("dpid"));
- } catch (NumberFormatException e) {
- log.error("Invalid number format");
- return false;
- }
+ if (!init()) {
+ return false;
+ }
- IOFSwitch sw = provider.getSwitches().get(dpid);
- if (sw == null) {
- log.error("Invalid dpid");
- return false;
- }
-
- synchronizer.synchronize(sw);
-
- return true;
+ long dpid;
+ try {
+ dpid = HexString.toLong((String) getRequestAttributes().get("dpid"));
+ } catch (NumberFormatException e) {
+ log.error("Invalid number format");
+ return false;
+ }
+
+ IOFSwitch sw = provider.getSwitches().get(dpid);
+ if (sw == null) {
+ log.error("Invalid dpid");
+ return false;
+ }
+
+ synchronizer.synchronize(sw);
+
+ return true;
}
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/FlowProgrammerWebRoutable.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/FlowProgrammerWebRoutable.java
index 9325a00..26ea765 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/FlowProgrammerWebRoutable.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/FlowProgrammerWebRoutable.java
@@ -8,21 +8,21 @@
public class FlowProgrammerWebRoutable implements RestletRoutable {
- @Override
- public Restlet getRestlet(Context context) {
- Router router = new Router(context);
- router.attach("/pusher/setrate/{dpid}/{rate}/json", SetPushRateResource.class);
- router.attach("/pusher/suspend/{dpid}/json", SuspendPusherResource.class);
- router.attach("/pusher/resume/{dpid}/json", ResumePusherResource.class);
- router.attach("/pusher/barrier/{dpid}/json", SendBarrierResource.class);
- router.attach("/synchronizer/sync/{dpid}/json", DoSynchronizeResource.class);
- router.attach("/synchronizer/interrupt/{dpid}/json", DoInterruptResource.class);
- return router;
- }
+ @Override
+ public Restlet getRestlet(Context context) {
+ Router router = new Router(context);
+ router.attach("/pusher/setrate/{dpid}/{rate}/json", SetPushRateResource.class);
+ router.attach("/pusher/suspend/{dpid}/json", SuspendPusherResource.class);
+ router.attach("/pusher/resume/{dpid}/json", ResumePusherResource.class);
+ router.attach("/pusher/barrier/{dpid}/json", SendBarrierResource.class);
+ router.attach("/synchronizer/sync/{dpid}/json", DoSynchronizeResource.class);
+ router.attach("/synchronizer/interrupt/{dpid}/json", DoInterruptResource.class);
+ return router;
+ }
- @Override
- public String basePath() {
- return "/wm/onos/flowprogrammer";
- }
+ @Override
+ public String basePath() {
+ return "/wm/onos/flowprogrammer";
+ }
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/PusherResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/PusherResource.java
index 4e1c0fc..5257646 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/PusherResource.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/PusherResource.java
@@ -11,23 +11,23 @@
protected final static Logger log = LoggerFactory.getLogger(PusherResource.class);
protected IFloodlightProviderService provider;
- protected IFlowPusherService pusher;
-
- protected boolean init() {
- provider = (IFloodlightProviderService)
- getContext().getAttributes().
- get(IFloodlightProviderService.class.getCanonicalName());
- if (provider == null) {
- log.debug("ONOS FloodlightProvider not found");
- return false;
- }
-
- pusher = (IFlowPusherService)getContext().getAttributes().
- get(IFlowPusherService.class.getCanonicalName());
- if (pusher == null) {
- log.debug("ONOS FlowPusherService not found");
- return false;
- }
- return true;
- }
+ protected IFlowPusherService pusher;
+
+ protected boolean init() {
+ provider = (IFloodlightProviderService)
+ getContext().getAttributes().
+ get(IFloodlightProviderService.class.getCanonicalName());
+ if (provider == null) {
+ log.debug("ONOS FloodlightProvider not found");
+ return false;
+ }
+
+ pusher = (IFlowPusherService) getContext().getAttributes().
+ get(IFlowPusherService.class.getCanonicalName());
+ if (pusher == null) {
+ log.debug("ONOS FlowPusherService not found");
+ return false;
+ }
+ return true;
+ }
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/ResumePusherResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/ResumePusherResource.java
index dcbe3e9..80f04f5 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/ResumePusherResource.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/ResumePusherResource.java
@@ -7,8 +7,8 @@
/**
* FlowProgrammer REST API implementation: Resume sending message to switch.
- *
- * GET /wm/fprog/pusher/resume/{dpid}/json"
+ * <p/>
+ * GET /wm/onos/flowprogrammer/pusher/resume/{dpid}/json
*/
public class ResumePusherResource extends PusherResource {
/**
@@ -18,24 +18,24 @@
*/
@Get("json")
public boolean retrieve() {
- if (! init()) {
- return false;
- }
-
- long dpid;
- try {
- dpid = HexString.toLong((String)getRequestAttributes().get("dpid"));
- } catch (NumberFormatException e) {
- log.error("Invalid number format");
- return false;
- }
+ if (!init()) {
+ return false;
+ }
- IOFSwitch sw = provider.getSwitches().get(dpid);
- if (sw == null) {
- log.error("Invalid dpid");
- return false;
- }
-
- return pusher.resume(sw);
+ long dpid;
+ try {
+ dpid = HexString.toLong((String) getRequestAttributes().get("dpid"));
+ } catch (NumberFormatException e) {
+ log.error("Invalid number format");
+ return false;
+ }
+
+ IOFSwitch sw = provider.getSwitches().get(dpid);
+ if (sw == null) {
+ log.error("Invalid dpid");
+ return false;
+ }
+
+ return pusher.resume(sw);
}
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SendBarrierResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SendBarrierResource.java
index 33b666a..53fd14f 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SendBarrierResource.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SendBarrierResource.java
@@ -8,8 +8,8 @@
/**
* FlowProgrammer REST API implementation: Send barrier message to switch.
- *
- * GET /wm/fprog/pusher/barrier/{dpid}/json"
+ * <p/>
+ * GET /wm/onos/flowprogrammer/pusher/barrier/{dpid}/json
*/
public class SendBarrierResource extends PusherResource {
/**
@@ -19,23 +19,23 @@
*/
@Get("json")
public OFBarrierReply retrieve() {
- if (! init()) {
- return null;
- }
- long dpid;
- try {
- dpid = HexString.toLong((String)getRequestAttributes().get("dpid"));
- } catch (NumberFormatException e) {
- log.error("Invalid number format");
- return null;
- }
+ if (!init()) {
+ return null;
+ }
+ long dpid;
+ try {
+ dpid = HexString.toLong((String) getRequestAttributes().get("dpid"));
+ } catch (NumberFormatException e) {
+ log.error("Invalid number format");
+ return null;
+ }
- IOFSwitch sw = provider.getSwitches().get(dpid);
- if (sw == null) {
- log.error("Invalid dpid");
- return null;
- }
-
- return pusher.barrier(sw);
+ IOFSwitch sw = provider.getSwitches().get(dpid);
+ if (sw == null) {
+ log.error("Invalid dpid");
+ return null;
+ }
+
+ return pusher.barrier(sw);
}
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SetPushRateResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SetPushRateResource.java
index 9431d65..5879069 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SetPushRateResource.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SetPushRateResource.java
@@ -7,8 +7,8 @@
/**
* FlowProgrammer REST API implementation: Set sending rate to the switch.
- *
- * GET /wm/fprog/pusher/setrate/{dpid}/{rate}/json"
+ * <p/>
+ * GET /wm/onos/flowprogrammer/pusher/setrate/{dpid}/{rate}/json
*/
public class SetPushRateResource extends PusherResource {
@@ -19,29 +19,29 @@
*/
@Get("json")
public boolean retrieve() {
- if (! init()) {
- return false;
- }
-
- long dpid;
- long rate;
-
- try {
- dpid = HexString.toLong((String)getRequestAttributes().get("dpid"));
- rate = Long.valueOf((String)getRequestAttributes().get("rate"));
- } catch (NumberFormatException e) {
- log.error("Invalid number format");
- return false;
- }
+ if (!init()) {
+ return false;
+ }
- IOFSwitch sw = provider.getSwitches().get(dpid);
- if (sw == null) {
- log.error("Invalid dpid");
- return false;
- }
-
- pusher.setRate(sw, rate);
-
- return true;
+ long dpid;
+ long rate;
+
+ try {
+ dpid = HexString.toLong((String) getRequestAttributes().get("dpid"));
+ rate = Long.valueOf((String) getRequestAttributes().get("rate"));
+ } catch (NumberFormatException e) {
+ log.error("Invalid number format");
+ return false;
+ }
+
+ IOFSwitch sw = provider.getSwitches().get(dpid);
+ if (sw == null) {
+ log.error("Invalid dpid");
+ return false;
+ }
+
+ pusher.setRate(sw, rate);
+
+ return true;
}
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SuspendPusherResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SuspendPusherResource.java
index 1a5266b..d01e259 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SuspendPusherResource.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SuspendPusherResource.java
@@ -9,8 +9,8 @@
/**
* FlowProgrammer REST API implementation: Suspend sending message to switch.
- *
- * GET /wm/fprog/pusher/suspend/{dpid}/json"
+ * <p/>
+ * GET /wm/onos/flowprogrammer/pusher/suspend/{dpid}/json
*/
public class SuspendPusherResource extends PusherResource {
@@ -23,24 +23,24 @@
*/
@Get("json")
public boolean retrieve() {
- if (! init()) {
- return false;
- }
-
- long dpid;
- try {
- dpid = HexString.toLong((String)getRequestAttributes().get("dpid"));
- } catch (NumberFormatException e) {
- log.error("Invalid number format");
- return false;
- }
+ if (!init()) {
+ return false;
+ }
- IOFSwitch sw = provider.getSwitches().get(dpid);
- if (sw == null) {
- log.error("Invalid dpid");
- return false;
- }
-
- return pusher.suspend(sw);
+ long dpid;
+ try {
+ dpid = HexString.toLong((String) getRequestAttributes().get("dpid"));
+ } catch (NumberFormatException e) {
+ log.error("Invalid number format");
+ return false;
+ }
+
+ IOFSwitch sw = provider.getSwitches().get(dpid);
+ if (sw == null) {
+ log.error("Invalid dpid");
+ return false;
+ }
+
+ return pusher.suspend(sw);
}
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SynchronizerResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SynchronizerResource.java
index 12bf8f3..b155fd3 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SynchronizerResource.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/web/SynchronizerResource.java
@@ -9,27 +9,27 @@
public class SynchronizerResource extends ServerResource {
protected final static Logger log = LoggerFactory.getLogger(SynchronizerResource.class);
-
+
protected IFloodlightProviderService provider;
protected IFlowSyncService synchronizer;
protected boolean init() {
- provider = (IFloodlightProviderService)
- getContext().getAttributes().
- get(IFloodlightProviderService.class.getCanonicalName());
- if (provider == null) {
- log.debug("ONOS FloodlightProvider not found");
- return false;
- }
-
- synchronizer = (IFlowSyncService)
- getContext().getAttributes().
- get(IFlowSyncService.class.getCanonicalName());
- if (synchronizer == null) {
- log.debug("ONOS FlowSyncService not found");
- return false;
- }
-
- return true;
+ provider = (IFloodlightProviderService)
+ getContext().getAttributes().
+ get(IFloodlightProviderService.class.getCanonicalName());
+ if (provider == null) {
+ log.debug("ONOS FloodlightProvider not found");
+ return false;
+ }
+
+ synchronizer = (IFlowSyncService)
+ getContext().getAttributes().
+ get(IFlowSyncService.class.getCanonicalName());
+ if (synchronizer == null) {
+ log.debug("ONOS FlowSyncService not found");
+ return false;
+ }
+
+ return true;
}
}