netty4 OpenFlow southbound
- separate I/O thread and message dispatch threads
Change-Id: I11a10a47de451a9e3063b62f9450be19c3a9dae7
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/DefaultOpenFlowPacketContext.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/DefaultOpenFlowPacketContext.java
index 0dbdef4..3dbc5e3 100644
--- a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/DefaultOpenFlowPacketContext.java
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/DefaultOpenFlowPacketContext.java
@@ -114,7 +114,7 @@
public Dpid dpid() {
checkPermission(PACKET_READ);
- return new Dpid(sw.getId());
+ return sw.getDpid();
}
/**
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/Dpid.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/Dpid.java
index fdc1248..9eac361 100644
--- a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/Dpid.java
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/Dpid.java
@@ -123,7 +123,7 @@
*/
public static URI uri(long value) {
try {
- return new URI(SCHEME, toHex(value), null);
+ return new URI(SCHEME + ":" + toHex(value));
} catch (URISyntaxException e) {
return null;
}
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowSession.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowSession.java
new file mode 100644
index 0000000..744e3d1
--- /dev/null
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowSession.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openflow.controller;
+
+import java.util.Collections;
+
+import org.projectfloodlight.openflow.protocol.OFMessage;
+
+/**
+ * Represents to OpenFlow session.
+ */
+public interface OpenFlowSession {
+
+ /**
+ * Returns session state.
+ *
+ * @return true if active.
+ */
+ boolean isActive();
+
+ /**
+ * Requests to close this OpenFlow session.
+ */
+ void closeSession();
+
+ /**
+ * Sends messages over this session.
+ * @param msgs to send
+ * @return true is messages were sent
+ */
+ boolean sendMsg(Iterable<OFMessage> msgs);
+
+ /**
+ * Sends message over this session.
+ *
+ * @param msg to send
+ * @return true is messages were sent
+ */
+ default boolean sendMsg(OFMessage msg) {
+ return sendMsg(Collections.singletonList(msg));
+ }
+
+ /**
+ * Returns debug information about this session.
+ *
+ * @return debug information
+ */
+ CharSequence sessionInfo();
+
+
+}
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowSwitch.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowSwitch.java
index 2ac0e02..8782236 100644
--- a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowSwitch.java
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowSwitch.java
@@ -101,6 +101,15 @@
long getId();
/**
+ * Gets the datapathId of the switch.
+ *
+ * @return the switch dpid
+ */
+ default Dpid getDpid() {
+ return new Dpid(getId());
+ }
+
+ /**
* fetch the manufacturer description.
* @return the description
*/
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java
index eb6571e..0943ed1 100644
--- a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java
@@ -16,12 +16,13 @@
package org.onosproject.openflow.controller.driver;
+import com.google.common.base.MoreObjects;
import com.google.common.collect.Lists;
-import org.jboss.netty.channel.Channel;
-import org.onlab.packet.IpAddress;
+
import org.onosproject.net.Device;
import org.onosproject.net.driver.AbstractHandlerBehaviour;
import org.onosproject.openflow.controller.Dpid;
+import org.onosproject.openflow.controller.OpenFlowSession;
import org.onosproject.openflow.controller.RoleState;
import org.projectfloodlight.openflow.protocol.OFDescStatsReply;
import org.projectfloodlight.openflow.protocol.OFErrorMsg;
@@ -43,8 +44,6 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -61,7 +60,7 @@
protected final Logger log = LoggerFactory.getLogger(getClass());
- private Channel channel;
+ private OpenFlowSession channel;
protected String channelId;
private boolean connected;
@@ -71,6 +70,7 @@
private final AtomicInteger xidCounter = new AtomicInteger(0);
private OFVersion ofVersion;
+ private OFFactory ofFactory;
protected List<OFPortDescStatsReply> ports = new ArrayList<>();
@@ -106,7 +106,7 @@
@Override
public final void disconnectSwitch() {
setConnected(false);
- this.channel.close();
+ this.channel.closeSession();
}
@Override
@@ -151,15 +151,14 @@
dpid, messages.size());
} else {
// not transitioning to MASTER
- log.warn("Dropping message for switch {} (role: {}, connected: {}): {}",
- dpid, role, channel.isConnected(), msgs);
+ log.warn("Dropping message for switch {} (role: {}, active: {}): {}",
+ dpid, role, channel.isActive(), msgs);
}
}
}
private void sendMsgsOnChannel(List<OFMessage> msgs) {
- if (channel.isConnected()) {
- channel.write(msgs);
+ if (channel.sendMsg(msgs)) {
agent.processDownstreamMessage(dpid, msgs);
} else {
log.warn("Dropping messages for switch {} because channel is not connected: {}",
@@ -197,18 +196,9 @@
}
@Override
- public final void setChannel(Channel channel) {
+ public final void setChannel(OpenFlowSession channel) {
this.channel = channel;
- final SocketAddress address = channel.getRemoteAddress();
- if (address instanceof InetSocketAddress) {
- final InetSocketAddress inetAddress = (InetSocketAddress) address;
- final IpAddress ipAddress = IpAddress.valueOf(inetAddress.getAddress());
- if (ipAddress.isIp4()) {
- channelId = ipAddress.toString() + ':' + inetAddress.getPort();
- } else {
- channelId = '[' + ipAddress.toString() + "]:" + inetAddress.getPort();
- }
- }
+ channelId = channel.sessionInfo().toString();
}
@Override
@@ -226,6 +216,11 @@
}
@Override
+ public Dpid getDpid() {
+ return this.dpid;
+ }
+
+ @Override
public final String getStringId() {
return this.dpid.toString();
}
@@ -233,6 +228,7 @@
@Override
public final void setOFVersion(OFVersion ofV) {
this.ofVersion = ofV;
+ this.ofFactory = OFFactories.getFactory(ofV);
}
@Override
@@ -264,7 +260,11 @@
@Override
public final void handleMessage(OFMessage m) {
if (this.role == RoleState.MASTER || m instanceof OFPortStatus) {
- this.agent.processMessage(dpid, m);
+ try {
+ this.agent.processMessage(dpid, m);
+ } catch (Exception e) {
+ log.warn("Unhandled exception processing {}@{}", m, dpid, e);
+ }
} else {
log.trace("Dropping received message {}, was not MASTER", m);
}
@@ -319,7 +319,7 @@
@Override
public OFFactory factory() {
- return OFFactories.getFactory(ofVersion);
+ return ofFactory;
}
@Override
@@ -513,8 +513,9 @@
@Override
public String toString() {
- return this.getClass().getName() + " [" + ((channel != null)
- ? channel.getRemoteAddress() : "?")
- + " DPID[" + ((getStringId() != null) ? getStringId() : "?") + "]]";
+ return MoreObjects.toStringHelper(getClass())
+ .add("session", channel.sessionInfo())
+ .add("dpid", dpid)
+ .toString();
}
}
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/OpenFlowSwitchDriver.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/OpenFlowSwitchDriver.java
index e6e6fca..095944e 100644
--- a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/OpenFlowSwitchDriver.java
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/OpenFlowSwitchDriver.java
@@ -15,9 +15,9 @@
*/
package org.onosproject.openflow.controller.driver;
-import org.jboss.netty.channel.Channel;
import org.onosproject.net.driver.HandlerBehaviour;
import org.onosproject.openflow.controller.Dpid;
+import org.onosproject.openflow.controller.OpenFlowSession;
import org.onosproject.openflow.controller.OpenFlowSwitch;
import org.projectfloodlight.openflow.protocol.OFDescStatsReply;
import org.projectfloodlight.openflow.protocol.OFErrorMsg;
@@ -166,10 +166,11 @@
void setTableFull(boolean full);
/**
- * Sets the associated Netty channel for this switch.
- * @param channel the Netty channel
+ * Sets the associated OpenFlow session for this switch.
+ *
+ * @param session the OpenFlow session
*/
- void setChannel(Channel channel);
+ void setChannel(OpenFlowSession session);
/**
* Sets whether the switch is connected.
@@ -227,4 +228,5 @@
* @param message an OpenFlow message
*/
void sendHandshakeMessage(OFMessage message);
+
}