netty4 OpenFlow southbound

- separate I/O thread and message dispatch threads

Change-Id: I11a10a47de451a9e3063b62f9450be19c3a9dae7
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/DefaultOpenFlowPacketContext.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/DefaultOpenFlowPacketContext.java
index 0dbdef4..3dbc5e3 100644
--- a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/DefaultOpenFlowPacketContext.java
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/DefaultOpenFlowPacketContext.java
@@ -114,7 +114,7 @@
     public Dpid dpid() {
         checkPermission(PACKET_READ);
 
-        return new Dpid(sw.getId());
+        return sw.getDpid();
     }
 
     /**
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/Dpid.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/Dpid.java
index fdc1248..9eac361 100644
--- a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/Dpid.java
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/Dpid.java
@@ -123,7 +123,7 @@
      */
     public static URI uri(long value) {
         try {
-            return new URI(SCHEME, toHex(value), null);
+            return new URI(SCHEME + ":" + toHex(value));
         } catch (URISyntaxException e) {
             return null;
         }
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowSession.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowSession.java
new file mode 100644
index 0000000..744e3d1
--- /dev/null
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowSession.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openflow.controller;
+
+import java.util.Collections;
+
+import org.projectfloodlight.openflow.protocol.OFMessage;
+
+/**
+ * Represents an OpenFlow session.
+ */
+public interface OpenFlowSession {
+
+    /**
+     * Returns session state.
+     *
+     * @return true if active.
+     */
+    boolean isActive();
+
+    /**
+     * Requests to close this OpenFlow session.
+     */
+    void closeSession();
+
+    /**
+     * Sends messages over this session.
+     *
+     * @param msgs messages to send
+     * @return true if messages were sent
+     */
+    boolean sendMsg(Iterable<OFMessage> msgs);
+
+    /**
+     * Sends a single message over this session via {@link #sendMsg(Iterable)}.
+     *
+     * @param msg message to send
+     * @return true if the message was sent
+     */
+    default boolean sendMsg(OFMessage msg) {
+        return sendMsg(Collections.singletonList(msg));
+    }
+
+    /**
+     * Returns debug information about this session.
+     *
+     * @return debug information
+     */
+    CharSequence sessionInfo();
+
+}
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowSwitch.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowSwitch.java
index 2ac0e02..8782236 100644
--- a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowSwitch.java
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/OpenFlowSwitch.java
@@ -101,6 +101,15 @@
     long getId();
 
     /**
+     * Gets the datapathId of the switch.
+     *
+     * @return the switch dpid
+     */
+    default Dpid getDpid() {
+        return new Dpid(getId());
+    }
+
+    /**
      * fetch the manufacturer description.
      * @return the description
      */
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java
index eb6571e..0943ed1 100644
--- a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java
@@ -16,12 +16,13 @@
 
 package org.onosproject.openflow.controller.driver;
 
+import com.google.common.base.MoreObjects;
 import com.google.common.collect.Lists;
-import org.jboss.netty.channel.Channel;
-import org.onlab.packet.IpAddress;
+
 import org.onosproject.net.Device;
 import org.onosproject.net.driver.AbstractHandlerBehaviour;
 import org.onosproject.openflow.controller.Dpid;
+import org.onosproject.openflow.controller.OpenFlowSession;
 import org.onosproject.openflow.controller.RoleState;
 import org.projectfloodlight.openflow.protocol.OFDescStatsReply;
 import org.projectfloodlight.openflow.protocol.OFErrorMsg;
@@ -43,8 +44,6 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -61,7 +60,7 @@
 
     protected final Logger log = LoggerFactory.getLogger(getClass());
 
-    private Channel channel;
+    private OpenFlowSession channel;
     protected String channelId;
 
     private boolean connected;
@@ -71,6 +70,7 @@
     private final AtomicInteger xidCounter = new AtomicInteger(0);
 
     private OFVersion ofVersion;
+    private OFFactory ofFactory;
 
     protected List<OFPortDescStatsReply> ports = new ArrayList<>();
 
@@ -106,7 +106,7 @@
     @Override
     public final void disconnectSwitch() {
         setConnected(false);
-        this.channel.close();
+        this.channel.closeSession();
     }
 
     @Override
@@ -151,15 +151,14 @@
                           dpid, messages.size());
             } else {
                 // not transitioning to MASTER
-                log.warn("Dropping message for switch {} (role: {}, connected: {}): {}",
-                         dpid, role, channel.isConnected(), msgs);
+                log.warn("Dropping message for switch {} (role: {}, active: {}): {}",
+                         dpid, role, channel.isActive(), msgs);
             }
         }
     }
 
     private void sendMsgsOnChannel(List<OFMessage> msgs) {
-        if (channel.isConnected()) {
-            channel.write(msgs);
+        if (channel.sendMsg(msgs)) {
             agent.processDownstreamMessage(dpid, msgs);
         } else {
             log.warn("Dropping messages for switch {} because channel is not connected: {}",
@@ -197,18 +196,9 @@
     }
 
     @Override
-    public final void setChannel(Channel channel) {
+    public final void setChannel(OpenFlowSession channel) {
         this.channel = channel;
-        final SocketAddress address = channel.getRemoteAddress();
-        if (address instanceof InetSocketAddress) {
-            final InetSocketAddress inetAddress = (InetSocketAddress) address;
-            final IpAddress ipAddress = IpAddress.valueOf(inetAddress.getAddress());
-            if (ipAddress.isIp4()) {
-                channelId = ipAddress.toString() + ':' + inetAddress.getPort();
-            } else {
-                channelId = '[' + ipAddress.toString() + "]:" + inetAddress.getPort();
-            }
-        }
+        channelId = channel.sessionInfo().toString();
     }
 
     @Override
@@ -226,6 +216,11 @@
     }
 
     @Override
+    public Dpid getDpid() {
+        return this.dpid;
+    }
+
+    @Override
     public final String getStringId() {
         return this.dpid.toString();
     }
@@ -233,6 +228,7 @@
     @Override
     public final void setOFVersion(OFVersion ofV) {
         this.ofVersion = ofV;
+        this.ofFactory = OFFactories.getFactory(ofV);
     }
 
     @Override
@@ -264,7 +260,11 @@
     @Override
     public final void handleMessage(OFMessage m) {
         if (this.role == RoleState.MASTER || m instanceof OFPortStatus) {
-            this.agent.processMessage(dpid, m);
+            try {
+                this.agent.processMessage(dpid, m);
+            } catch (Exception e) {
+                log.warn("Unhandled exception processing {}@{}", m, dpid, e);
+            }
         } else {
             log.trace("Dropping received message {}, was not MASTER", m);
         }
@@ -319,7 +319,7 @@
 
     @Override
     public OFFactory factory() {
-        return OFFactories.getFactory(ofVersion);
+        return ofFactory;
     }
 
     @Override
@@ -513,8 +513,9 @@
 
     @Override
     public String toString() {
-        return this.getClass().getName() + " [" + ((channel != null)
-                ? channel.getRemoteAddress() : "?")
-                + " DPID[" + ((getStringId() != null) ? getStringId() : "?") + "]]";
+        // channel may still be null before setChannel(); keep the old "?" fallback to avoid an NPE
+        return MoreObjects.toStringHelper(getClass())
+                .add("session", (channel != null) ? channel.sessionInfo() : "?")
+                .add("dpid", dpid).toString();
     }
 }
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/OpenFlowSwitchDriver.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/OpenFlowSwitchDriver.java
index e6e6fca..095944e 100644
--- a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/OpenFlowSwitchDriver.java
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/OpenFlowSwitchDriver.java
@@ -15,9 +15,9 @@
  */
 package org.onosproject.openflow.controller.driver;
 
-import org.jboss.netty.channel.Channel;
 import org.onosproject.net.driver.HandlerBehaviour;
 import org.onosproject.openflow.controller.Dpid;
+import org.onosproject.openflow.controller.OpenFlowSession;
 import org.onosproject.openflow.controller.OpenFlowSwitch;
 import org.projectfloodlight.openflow.protocol.OFDescStatsReply;
 import org.projectfloodlight.openflow.protocol.OFErrorMsg;
@@ -166,10 +166,11 @@
     void setTableFull(boolean full);
 
     /**
-     * Sets the associated Netty channel for this switch.
-     * @param channel the Netty channel
+     * Sets the associated OpenFlow session for this switch.
+     *
+     * @param session the OpenFlow session
      */
-    void setChannel(Channel channel);
+    void setChannel(OpenFlowSession session);
 
     /**
      * Sets whether the switch is connected.
@@ -227,4 +228,5 @@
      * @param message an OpenFlow message
      */
     void sendHandshakeMessage(OFMessage message);
+
 }