Merge pull request #421 from pberde/master
Single updates queue changes to solve jira:155
diff --git a/src/main/java/net/floodlightcontroller/core/IFloodlightProviderService.java b/src/main/java/net/floodlightcontroller/core/IFloodlightProviderService.java
index 1e3ec6f..22ff029 100644
--- a/src/main/java/net/floodlightcontroller/core/IFloodlightProviderService.java
+++ b/src/main/java/net/floodlightcontroller/core/IFloodlightProviderService.java
@@ -206,5 +206,11 @@
* switch over to ACTIVE role
*/
public void setAlwaysClearFlowsOnSwAdd(boolean value);
+
+ /**
+ * Publish updates to Controller updates queue
+ * @param update the update to add to the Controller updates queue
+ */
+ public void publishUpdate(IUpdate update);
}
diff --git a/src/main/java/net/floodlightcontroller/core/IUpdate.java b/src/main/java/net/floodlightcontroller/core/IUpdate.java
new file mode 100644
index 0000000..950bc0f
--- /dev/null
+++ b/src/main/java/net/floodlightcontroller/core/IUpdate.java
@@ -0,0 +1,10 @@
+package net.floodlightcontroller.core;
+
+/**
+ * An update that can be handed to
+ * IFloodlightProviderService#publishUpdate(IUpdate) to be placed on the
+ * Controller updates queue.
+ * NOTE(review): presumably the controller calls dispatch() when it takes
+ * the update off that queue -- confirm against Controller.
+ */
+public interface IUpdate {
+
+ /**
+ * Calls the appropriate listeners
+ */
+ public void dispatch();
+
+}
diff --git a/src/main/java/net/floodlightcontroller/core/internal/Controller.java b/src/main/java/net/floodlightcontroller/core/internal/Controller.java
index ac29983..21eceb3 100644
--- a/src/main/java/net/floodlightcontroller/core/internal/Controller.java
+++ b/src/main/java/net/floodlightcontroller/core/internal/Controller.java
@@ -58,6 +58,7 @@
import net.floodlightcontroller.core.IOFSwitch;
import net.floodlightcontroller.core.IOFSwitchFilter;
import net.floodlightcontroller.core.IOFSwitchListener;
+import net.floodlightcontroller.core.IUpdate;
import net.floodlightcontroller.core.annotations.LogMessageDoc;
import net.floodlightcontroller.core.annotations.LogMessageDocs;
import net.floodlightcontroller.core.internal.OFChannelState.HandshakeState;
@@ -253,17 +254,8 @@
// Perf. related configuration
protected static final int SEND_BUFFER_SIZE = 4 * 1024 * 1024;
protected static final int BATCH_MAX_SIZE = 100;
- protected static final boolean ALWAYS_DECODE_ETH = true;
-
- /**
- * Updates handled by the main loop
- */
- protected interface IUpdate {
- /**
- * Calls the appropriate listeners
- */
- public void dispatch();
- }
+ protected static final boolean ALWAYS_DECODE_ETH = true;
+
public enum SwitchUpdateType {
ADDED,
REMOVED,
@@ -271,6 +263,7 @@
PORTADDED,
PORTREMOVED
}
+
/**
* Update message indicating a switch was added or removed
* ONOS: This message extended to indicate Port add or removed event.
@@ -461,7 +454,13 @@
}
}
-
+ /**
+ * Adds an update to this controller's updates queue.
+ *
+ * If the calling thread is interrupted while waiting in put(), the update
+ * is not enqueued: the failure is logged and the thread's interrupt status
+ * is restored so code further up the call stack can still observe it.
+ *
+ * @param update the update to enqueue
+ */
+ public void publishUpdate(IUpdate update) {
+     try {
+         this.updates.put(update);
+     } catch (InterruptedException e) {
+         log.error("Failure adding update to queue", e);
+         // The interrupt flag is cleared once InterruptedException is
+         // thrown; re-assert it instead of silently dropping the interrupt.
+         Thread.currentThread().interrupt();
+     }
+ }
// **********************
// ChannelUpstreamHandler
diff --git a/src/main/java/net/onrc/onos/ofcontroller/linkdiscovery/ILinkDiscovery.java b/src/main/java/net/onrc/onos/ofcontroller/linkdiscovery/ILinkDiscovery.java
index 6113ea8..cdb71be 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/linkdiscovery/ILinkDiscovery.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/linkdiscovery/ILinkDiscovery.java
@@ -1,5 +1,7 @@
package net.onrc.onos.ofcontroller.linkdiscovery;
+import net.floodlightcontroller.core.IUpdate;
+
import org.codehaus.jackson.map.annotate.JsonSerialize;
import org.codehaus.jackson.map.ser.std.ToStringSerializer;
import org.openflow.util.HexString;
@@ -27,7 +29,7 @@
}
}
- public class LDUpdate {
+ public class LDUpdate implements IUpdate{
protected long src;
protected short srcPort;
protected long dst;
@@ -129,6 +131,12 @@
return "LDUpdate: Unknown update.";
}
}
+
+ /**
+ * Empty in this class: dispatching a plain LDUpdate notifies no one.
+ * The LinkUpdate subclass in LinkDiscoveryManager overrides this method
+ * to deliver the update to the registered ILinkDiscoveryListeners.
+ */
+ @Override
+ public void dispatch() {
+ // Still the auto-generated stub; nothing is called from here.
+
+ }
}
public enum SwitchType {
diff --git a/src/main/java/net/onrc/onos/ofcontroller/linkdiscovery/internal/LinkDiscoveryManager.java b/src/main/java/net/onrc/onos/ofcontroller/linkdiscovery/internal/LinkDiscoveryManager.java
index f4e4223..7736723 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/linkdiscovery/internal/LinkDiscoveryManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/linkdiscovery/internal/LinkDiscoveryManager.java
@@ -18,6 +18,7 @@
package net.onrc.onos.ofcontroller.linkdiscovery.internal;
import java.io.IOException;
+
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
@@ -126,6 +127,7 @@
implements IOFMessageListener, IOFSwitchListener,
IStorageSourceListener, ILinkDiscoveryService,
IFloodlightModule, IInfoProvider, IHAListener {
+ protected IFloodlightProviderService controller;
protected static Logger log = LoggerFactory.getLogger(LinkDiscoveryManager.class);
// Names of table/fields for links in the storage API
@@ -232,8 +234,40 @@
/* topology aware components are called in the order they were added to the
* the array */
protected ArrayList<ILinkDiscoveryListener> linkDiscoveryAware;
- protected BlockingQueue<LDUpdate> updates;
- protected Thread updatesThread;
+
+ /**
+ * An LDUpdate whose dispatch() delivers the update to every
+ * ILinkDiscoveryListener held in the enclosing manager's
+ * linkDiscoveryAware list.
+ */
+ protected class LinkUpdate extends LDUpdate {
+
+ /**
+ * Builds a LinkUpdate from an existing LDUpdate by passing it to the
+ * LDUpdate(LDUpdate) constructor.
+ * NOTE(review): presumably a copy constructor -- confirm in ILinkDiscovery.
+ *
+ * @param old the update whose contents this LinkUpdate carries
+ */
+ public LinkUpdate(LDUpdate old) {
+ super(old);
+ }
+ /**
+ * Notifies each listener in linkDiscoveryAware, in list order, of this
+ * update. Does nothing when linkDiscoveryAware is null. An exception
+ * thrown by a listener is logged and is not propagated to the caller.
+ */
+ @LogMessageDoc(level="ERROR",
+ message="Error in link discovery updates loop",
+ explanation="An unknown error occured while dispatching " +
+ "link update notifications",
+ recommendation=LogMessageDoc.GENERIC_ACTION)
+ @Override
+ public void dispatch() {
+ if (linkDiscoveryAware != null) {
+ if (log.isTraceEnabled()) {
+ log.trace("Dispatching link discovery update {} {} {} {} {} for {}",
+ new Object[]{this.getOperation(),
+ HexString.toHexString(this.getSrc()), this.getSrcPort(),
+ HexString.toHexString(this.getDst()), this.getDstPort(),
+ linkDiscoveryAware});
+ }
+ // The try wraps the whole loop, so the first listener that throws
+ // ends the iteration and later listeners are not notified.
+ try {
+ for (ILinkDiscoveryListener lda : linkDiscoveryAware) { // order maintained
+ lda.linkDiscoveryUpdate(this);
+ }
+ }
+ catch (Exception e) {
+ log.error("Error in link discovery updates loop", e);
+ }
+ }
+
+ }
+
+ }
/**
* List of ports through which LLDP/BDDPs are not sent.
@@ -322,34 +356,7 @@
return ILinkDiscovery.LinkType.INVALID_LINK;
}
- @LogMessageDoc(level="ERROR",
- message="Error in link discovery updates loop",
- explanation="An unknown error occured while dispatching " +
- "link update notifications",
- recommendation=LogMessageDoc.GENERIC_ACTION)
- private void doUpdatesThread() throws InterruptedException {
- do {
- LDUpdate update = updates.take();
-
- if (linkDiscoveryAware != null) {
- if (log.isTraceEnabled()) {
- log.trace("Dispatching link discovery update {} {} {} {} {} for {}",
- new Object[]{update.getOperation(),
- HexString.toHexString(update.getSrc()), update.getSrcPort(),
- HexString.toHexString(update.getDst()), update.getDstPort(),
- linkDiscoveryAware});
- }
- try {
- for (ILinkDiscoveryListener lda : linkDiscoveryAware) { // order maintained
- lda.linkDiscoveryUpdate(update);
- }
- }
- catch (Exception e) {
- log.error("Error in link discovery updates loop", e);
- }
- }
- } while (updates.peek() != null);
- }
+
private boolean isLinkDiscoverySuppressed(long sw, short portNumber) {
return this.suppressLinkDiscovery.contains(new NodePortTuple(sw, portNumber));
}
@@ -490,7 +497,10 @@
if (portUp) operation = UpdateOperation.PORT_UP;
else operation = UpdateOperation.PORT_DOWN;
- updates.add(new LDUpdate(sw, port, operation));
+ LinkUpdate update = new LinkUpdate(new LDUpdate(sw, port, operation));
+
+
+ controller.publishUpdate(update);
}
/**
@@ -1134,10 +1144,11 @@
if (linkChanged) {
// find out if the link was added or removed here.
- updates.add(new LDUpdate(lt.getSrc(), lt.getSrcPort(),
+ LinkUpdate update = new LinkUpdate(new LDUpdate(lt.getSrc(), lt.getSrcPort(),
lt.getDst(), lt.getDstPort(),
getLinkType(lt, newInfo),
updateOperation));
+ controller.publishUpdate(update);
}
} finally {
lock.writeLock().unlock();
@@ -1184,10 +1195,11 @@
}
LinkInfo info = this.links.remove(lt);
- updates.add(new LDUpdate(lt.getSrc(), lt.getSrcPort(),
+ LinkUpdate update = new LinkUpdate(new LDUpdate(lt.getSrc(), lt.getSrcPort(),
lt.getDst(), lt.getDstPort(),
getLinkType(lt, info),
UpdateOperation.LINK_REMOVED));
+ controller.publishUpdate(update);
// Update Event History
evHistTopoLink(lt.getSrc(),
@@ -1251,8 +1263,8 @@
((byte)OFPortReason.OFPPR_MODIFY.ordinal() ==
ps.getReason() && !portEnabled(ps.getDesc()))) {
deleteLinksOnPort(npt, "Port Status Changed");
- LDUpdate update = new LDUpdate(sw, port, UpdateOperation.PORT_DOWN);
- updates.add(update);
+ LinkUpdate update = new LinkUpdate(new LDUpdate(sw, port, UpdateOperation.PORT_DOWN));
+ controller.publishUpdate(update);
linkDeleted = true;
}
else if (ps.getReason() ==
@@ -1284,14 +1296,15 @@
(updatedDstPortState != null)) {
// The link is already known to link discovery
// manager and the status has changed, therefore
- // send an LDUpdate.
+ // send a LinkUpdate.
UpdateOperation operation =
getUpdateOperation(linkInfo.getSrcPortState(),
linkInfo.getDstPortState());
- updates.add(new LDUpdate(lt.getSrc(), lt.getSrcPort(),
+ LinkUpdate update = new LinkUpdate(new LDUpdate(lt.getSrc(), lt.getSrcPort(),
lt.getDst(), lt.getDstPort(),
getLinkType(lt, linkInfo),
operation));
+ controller.publishUpdate(update);
writeLinkToStorage(lt, linkInfo);
linkInfoChanged = true;
}
@@ -1300,7 +1313,8 @@
UpdateOperation operation =
getUpdateOperation(ps.getDesc().getState());
- updates.add(new LDUpdate(sw, port, operation));
+ LinkUpdate update = new LinkUpdate(new LDUpdate(sw, port, operation));
+ controller.publishUpdate(update);
}
if (!linkDeleted && !linkInfoChanged){
@@ -1372,9 +1386,9 @@
}
// Update event history
evHistTopoSwitch(sw, EvAction.SWITCH_CONNECTED, "None");
- LDUpdate update = new LDUpdate(sw.getId(), null,
- UpdateOperation.SWITCH_UPDATED);
- updates.add(update);
+ LinkUpdate update = new LinkUpdate(new LDUpdate(sw.getId(), null,
+ UpdateOperation.SWITCH_UPDATED));
+ controller.publishUpdate(update);
}
/**
@@ -1399,8 +1413,8 @@
deleteLinks(eraseList, "Switch Removed");
// Send a switch removed update
- LDUpdate update = new LDUpdate(sw, null, UpdateOperation.SWITCH_REMOVED);
- updates.add(update);
+ LinkUpdate update = new LinkUpdate(new LDUpdate(sw, null, UpdateOperation.SWITCH_REMOVED));
+ controller.publishUpdate(update);
}
} finally {
lock.writeLock().unlock();
@@ -1486,10 +1500,11 @@
UpdateOperation operation;
operation = getUpdateOperation(info.getSrcPortState(),
info.getDstPortState());
- updates.add(new LDUpdate(lt.getSrc(), lt.getSrcPort(),
+ LinkUpdate update = new LinkUpdate(new LDUpdate(lt.getSrc(), lt.getSrcPort(),
lt.getDst(), lt.getDstPort(),
getLinkType(lt, info),
operation));
+ controller.publishUpdate(update);
}
}
@@ -1776,16 +1791,18 @@
if (log.isTraceEnabled()) {
log.trace("SWITCH_IS_CORE_SWITCH set to False for {}", sw);
}
- updates.add(new LDUpdate(sw.getId(), SwitchType.BASIC_SWITCH,
+ LinkUpdate update = new LinkUpdate(new LDUpdate(sw.getId(), SwitchType.BASIC_SWITCH,
UpdateOperation.SWITCH_UPDATED));
+ controller.publishUpdate(update);
}
else {
sw.setAttribute(IOFSwitch.SWITCH_IS_CORE_SWITCH, new Boolean(true));
if (log.isTraceEnabled()) {
log.trace("SWITCH_IS_CORE_SWITCH set to True for {}", sw);
}
- updates.add(new LDUpdate(sw.getId(), SwitchType.CORE_SWITCH,
+ LinkUpdate update = new LinkUpdate(new LDUpdate(sw.getId(), SwitchType.CORE_SWITCH,
UpdateOperation.SWITCH_UPDATED));
+ controller.publishUpdate(update);
}
}
}
@@ -1847,7 +1864,6 @@
// We create this here because there is no ordering guarantee
this.linkDiscoveryAware = new ArrayList<ILinkDiscoveryListener>();
this.lock = new ReentrantReadWriteLock();
- this.updates = new LinkedBlockingQueue<LDUpdate>();
this.links = new HashMap<Link, LinkInfo>();
this.portLinks = new HashMap<NodePortTuple, Set<Link>>();
this.suppressLinkDiscovery =
@@ -1910,6 +1926,8 @@
}
ScheduledExecutorService ses = threadPool.getScheduledExecutor();
+ controller =
+ context.getServiceImpl(IFloodlightProviderService.class);
// To be started by the first switch connection
discoveryTask = new SingletonTask(ses, new Runnable() {
@@ -1953,20 +1971,6 @@
bddpTask = new SingletonTask(ses, new QuarantineWorker());
bddpTask.reschedule(BDDP_TASK_INTERVAL, TimeUnit.MILLISECONDS);
- updatesThread = new Thread(new Runnable () {
- @Override
- public void run() {
- while (true) {
- try {
- doUpdatesThread();
- } catch (InterruptedException e) {
- return;
- }
- }
- }}, "Topology Updates");
- updatesThread.start();
-
-
// Register for the OpenFlow messages we want to receive
floodlightProvider.addOFMessageListener(OFType.PACKET_IN, this);