Merge master branch into syncdev17
Conflicts:
src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
diff --git a/cluster-mgmt/template/onsdemo_edge_template.py b/cluster-mgmt/template/onsdemo_edge_template.py
index e340f38..c3d0287 100755
--- a/cluster-mgmt/template/onsdemo_edge_template.py
+++ b/cluster-mgmt/template/onsdemo_edge_template.py
@@ -65,7 +65,7 @@
switch.append(sw)
for i in range (NR_NODES):
- host.append(self.addHost( 'host%d' % (int(i)+1) ))
+ host.append(self.addHost( 'host%d.%d' % (NWID, int(i)+1) ))
for i in range (NR_NODES):
self.addLink(host[i], switch[i])
@@ -117,7 +117,7 @@
host = []
for i in range (NR_NODES):
- host.append(net.get( 'host%d' % (int(i)+1) ))
+ host.append(net.get( 'host%d.%d' % (NWID, (int(i)+1)) ))
net.start()
diff --git a/kryo2/pom.xml b/kryo2/pom.xml
index 1beb87d..788f952 100644
--- a/kryo2/pom.xml
+++ b/kryo2/pom.xml
@@ -7,7 +7,7 @@
<version>2.22</version>
<packaging>jar</packaging>
- <name>kyro2</name>
+ <name>kryo2</name>
<url>http://maven.apache.org</url>
<properties>
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
index 180cbe9..775f952 100644
--- a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
@@ -607,6 +607,29 @@
}
/**
+ * Get a Flow for a given Flow ID.
+ *
+ * @param flowId the Flow ID of the Flow to get.
+ * @return the Flow if found, otherwise null.
+ */
+ @Override
+ public FlowPath getFlow(FlowId flowId) {
+ byte[] valueBytes = mapFlow.get(flowId.value());
+ if (valueBytes == null)
+ return null;
+
+ Kryo kryo = kryoFactory.newKryo();
+ //
+ // Decode the value
+ //
+ Input input = new Input(valueBytes);
+ FlowPath flowPath = kryo.readObject(input, FlowPath.class);
+ kryoFactory.deleteKryo(kryo);
+
+ return flowPath;
+ }
+
+ /**
* Send a notification that a Flow is added.
*
* @param flowPath the Flow that is added.
@@ -702,6 +725,29 @@
}
/**
+ * Get a Flow Entry for a given Flow Entry ID.
+ *
+ * @param flowEntryId the Flow Entry ID of the Flow Entry to get.
+ * @return the Flow Entry if found, otherwise null.
+ */
+ @Override
+ public FlowEntry getFlowEntry(FlowEntryId flowEntryId) {
+ byte[] valueBytes = mapFlowEntry.get(flowEntryId.value());
+ if (valueBytes == null)
+ return null;
+
+ Kryo kryo = kryoFactory.newKryo();
+ //
+ // Decode the value
+ //
+ Input input = new Input(valueBytes);
+ FlowEntry flowEntry = kryo.readObject(input, FlowEntry.class);
+ kryoFactory.deleteKryo(kryo);
+
+ return flowEntry;
+ }
+
+ /**
* Send a notification that a FlowEntry is added.
*
* @param flowEntry the FlowEntry that is added.
diff --git a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
index 9361341..034fe25 100644
--- a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
+++ b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
@@ -50,7 +50,7 @@
* @param arpEventHandler The ARP event handler to de-register.
*/
public void deregisterArpEventHandler(IArpEventHandler arpEventHandler);
-
+
/**
* Get all Flows that are currently in the datagrid.
*
@@ -59,6 +59,14 @@
Collection<FlowPath> getAllFlows();
/**
+ * Get a Flow for a given Flow ID.
+ *
+ * @param flowId the Flow ID of the Flow to get.
+ * @return the Flow if found, otherwise null.
+ */
+ FlowPath getFlow(FlowId flowId);
+
+ /**
* Send a notification that a Flow is added.
*
* @param flowPath the Flow that is added.
@@ -92,6 +100,14 @@
Collection<FlowEntry> getAllFlowEntries();
/**
+ * Get a Flow Entry for a given Flow Entry ID.
+ *
+ * @param flowEntryId the Flow Entry ID of the Flow Entry to get.
+ * @return the Flow Entry if found, otherwise null.
+ */
+ FlowEntry getFlowEntry(FlowEntryId flowEntryId);
+
+ /**
* Send a notification that a FlowEntry is added.
*
* @param flowEntry the FlowEntry that is added.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
index 04cde1d..8f72261 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
@@ -449,6 +449,16 @@
}
/**
+ * Inform the Flow Manager that a Flow Entry on switch expired.
+ *
+ * @param sw the switch the Flow Entry expired on.
+ * @param flowEntryId the Flow Entry ID of the expired Flow Entry.
+ */
+ public void flowEntryOnSwitchExpired(IOFSwitch sw, FlowEntryId flowEntryId) {
+ // TODO: Not implemented yet
+ }
+
+ /**
* Push modified Flow Entries to switches.
*
* NOTE: Only the Flow Entries to switches controlled by this instance
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java
index ba3a6e7..8d2b797 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java
@@ -2,10 +2,12 @@
import java.util.ArrayList;
+import net.floodlightcontroller.core.IOFSwitch;
import net.floodlightcontroller.core.module.IFloodlightService;
import net.onrc.onos.ofcontroller.topology.Topology;
import net.onrc.onos.ofcontroller.util.CallerId;
import net.onrc.onos.ofcontroller.util.DataPathEndpoints;
+import net.onrc.onos.ofcontroller.util.FlowEntryId;
import net.onrc.onos.ofcontroller.util.FlowId;
import net.onrc.onos.ofcontroller.util.FlowPath;
@@ -112,4 +114,12 @@
* @return unique flow ID
*/
public long getNextFlowEntryId();
+
+ /**
+ * Inform the Flow Manager that a Flow Entry on switch expired.
+ *
+ * @param sw the switch the Flow Entry expired on.
+ * @param flowEntryId the Flow Entry ID of the expired Flow Entry.
+ */
+ public void flowEntryOnSwitchExpired(IOFSwitch sw, FlowEntryId flowEntryId);
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
index 69afc3d..461d231 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
@@ -22,6 +22,8 @@
import net.floodlightcontroller.core.module.IFloodlightService;
import net.floodlightcontroller.restserver.IRestApiService;
import net.onrc.onos.ofcontroller.flowprogrammer.web.FlowProgrammerWebRoutable;
+import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
+import net.onrc.onos.ofcontroller.util.FlowEntryId;
import net.onrc.onos.registry.controller.IControllerRegistryService;
/**
@@ -47,6 +49,7 @@
protected volatile IFloodlightProviderService floodlightProvider;
protected volatile IControllerRegistryService registryService;
protected volatile IRestApiService restApi;
+ protected volatile IFlowService flowManager;
protected FlowPusher pusher;
private static final int NUM_PUSHER_THREAD = 1;
@@ -66,7 +69,7 @@
floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
registryService = context.getServiceImpl(IControllerRegistryService.class);
restApi = context.getServiceImpl(IRestApiService.class);
-
+ flowManager = context.getServiceImpl(IFlowService.class);
pusher.init(null, context, floodlightProvider.getOFMessageFactory(), null);
if (enableFlowSync) {
synchronizer.init(pusher);
@@ -138,6 +141,8 @@
case FLOW_REMOVED:
OFFlowRemoved flowMsg = (OFFlowRemoved) msg;
log.debug("Got flow removed from "+ sw.getId() +": "+ flowMsg.getCookie());
+ FlowEntryId id = new FlowEntryId(flowMsg.getCookie());
+ flowManager.flowEntryOnSwitchExpired(sw, id);
break;
default:
break;
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
index 526c727..e4aa319 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
@@ -5,6 +5,7 @@
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -146,6 +147,7 @@
private Map<IOFSwitch,SwitchQueue> queues
= new HashMap<IOFSwitch,SwitchQueue>();
+ // reusable latch used for waiting for arrival of message
private Semaphore mutex = new Semaphore(0);
@Override
@@ -160,14 +162,16 @@
return;
}
- Set< Map.Entry<IOFSwitch,SwitchQueue> > entries;
+ // for safety of concurrent access, copy all key objects
+ Set<IOFSwitch> keys = new HashSet<IOFSwitch>(queues.size());
synchronized (queues) {
- entries = queues.entrySet();
+ for (IOFSwitch sw : queues.keySet()) {
+ keys.add(sw);
+ }
}
- for (Map.Entry<IOFSwitch,SwitchQueue> entry : entries) {
- IOFSwitch sw = entry.getKey();
- SwitchQueue queue = entry.getValue();
+ for (IOFSwitch sw : keys) {
+ SwitchQueue queue = queues.get(sw);
// Skip if queue is suspended
if (sw == null || queue == null ||
@@ -175,48 +179,62 @@
continue;
}
- // check sending rate and determine it to be sent or not
- long current_time = System.currentTimeMillis();
- long size = 0;
-
synchronized (queue) {
- if (queue.isSendable(current_time)) {
- int i = 0;
- while (! queue.isEmpty()) {
- // Number of messages excess the limit
- if (i >= MAX_MESSAGE_SEND) {
- // Messages remains in queue
- mutex.release();
- break;
- }
- ++i;
-
- OFMessage msg = queue.poll();
- try {
- messageDamper.write(sw, msg, context);
- log.debug("Pusher sends message : {}", msg);
- size += msg.getLength();
- } catch (IOException e) {
- e.printStackTrace();
- log.error("Exception in sending message ({}) : {}", msg, e);
+ processQueue(sw, queue, MAX_MESSAGE_SEND);
+ if (queue.isEmpty()) {
+ // remove queue if flagged to be.
+ if (queue.toBeDeleted) {
+ synchronized (queues) {
+ queues.remove(sw);
}
}
- sw.flush();
- queue.logSentData(current_time, size);
-
- if (queue.isEmpty()) {
- // remove queue if flagged to be.
- if (queue.toBeDeleted) {
- synchronized (queues) {
- queues.remove(sw);
- }
- }
+ } else {
+ // if some messages remains in queue, latch down
+ if (mutex.availablePermits() == 0) {
+ mutex.release();
}
}
}
}
}
}
+
+ /**
+ * Read messages from queue and send them to the switch.
+ * If number of messages excess the limit, stop sending messages.
+ * @param sw Switch to which messages will be sent.
+ * @param queue Queue of messages.
+ * @param max_msg Limitation of number of messages to be sent. If set to 0,
+ * all messages in queue will be sent.
+ */
+ private void processQueue(IOFSwitch sw, SwitchQueue queue, long max_msg) {
+ // check sending rate and determine it to be sent or not
+ long current_time = System.currentTimeMillis();
+ long size = 0;
+
+ if (queue.isSendable(current_time)) {
+ int i = 0;
+ while (! queue.isEmpty()) {
+ // Number of messages excess the limit
+ if (0 < max_msg && max_msg <= i) {
+ break;
+ }
+ ++i;
+
+ OFMessage msg = queue.poll();
+ try {
+ messageDamper.write(sw, msg, context);
+ log.debug("Pusher sends message : {}", msg);
+ size += msg.getLength();
+ } catch (IOException e) {
+ e.printStackTrace();
+ log.error("Exception in sending message ({}) : {}", msg, e);
+ }
+ }
+ sw.flush();
+ queue.logSentData(current_time, size);
+ }
+ }
}
/**
@@ -363,7 +381,7 @@
FlowPusherThread proc = getProcess(sw);
queue = new SwitchQueue();
queue.state = QueueState.READY;
- synchronized (proc) {
+ synchronized (proc.queues) {
proc.queues.put(sw, queue);
}
diff --git a/web/ons-demo/js/app.js b/web/ons-demo/js/app.js
index 94c41e2..d869de7 100644
--- a/web/ons-demo/js/app.js
+++ b/web/ons-demo/js/app.js
@@ -1,6 +1,17 @@
/*global d3, document∆*/
+function updateFlow(model) {
+ model.flows.forEach(function (flow) {
+ flow.flowId = flow.flowId.value;
+ flow.installerId = flow.installerId.value;
+ flow.dstDpid = flow.dataPath.dstPort.dpid.value;
+ flow.srcDpid = flow.dataPath.srcPort.dpid.value;
+ flow.dstPort = flow.dataPath.dstPort.port.value;
+ flow.srcPort = flow.dataPath.srcPort.port.value;
+ });
+}
+
function sync() {
var d = Date.now();
@@ -8,6 +19,7 @@
// console.log('Update time: ' + (Date.now() - d)/1000 + 's');
if (newModel) {
+ updateFlow(newModel);
var modelChanged = false;
var newModelString = JSON.stringify(newModel);
if (!modelString || newModelString != modelString) {
diff --git a/web/topology_rest.py b/web/topology_rest.py
index bac3113..b3a415e 100755
--- a/web/topology_rest.py
+++ b/web/topology_rest.py
@@ -720,8 +720,8 @@
# print "Debug: Controller command %s called %s" % (cmd, controller_name)
else:
# No longer use -i to specify keys (use .ssh/config to specify it)
- start_onos="ssh %s ONOS/start-onos.sh start" % (controller_name)
- stop_onos="ssh %s ONOS/start-onos.sh stop" % (controller_name)
+ start_onos="ssh %s \"cd ONOS; ./start-onos.sh start\"" % (controller_name)
+ stop_onos="ssh %s \"cd ONOS; ./start-onos.sh stop\"" % (controller_name)
# start_onos="ssh -i ~/.ssh/onlabkey.pem %s ONOS/start-onos.sh start" % (controller_name)
# stop_onos="ssh -i ~/.ssh/onlabkey.pem %s ONOS/start-onos.sh stop" % (controller_name)
@@ -960,7 +960,7 @@
parsedResult = json.loads(result)
if len(parsedResult) > 0:
if parsedResult[-1].has_key('flowId'):
- flow_nr = int(parsedResult[-1]['flowId'], 16)
+ flow_nr = int(parsedResult[-1]['flowId']['value'], 16)
else:
flow_nr = -1 # first flow
print "first flow"