[ONOS-5451] Implement virtual switch and controller connection handler

- ONOS-5453 that implement OF message encoder/decoder is also covered
- When OFConnectionHandler.connect() is called, it connects to the tenant controller with given vSwitch
- OpenFlow sesstion establishment will covered with ONOS-5452

Change-Id: I0c69d0ceac5aa04590d41f5b26170939ef6f5268
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitch.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitch.java
index 0252049..8ee6f64 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitch.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitch.java
@@ -42,6 +42,4 @@
      * @return true if the switch is connected, false otherwise
      */
     boolean isConnected();
-
-    // TODO add builder interface
 }
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitchCapabilities.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitchCapabilities.java
index a385e8b..c9d35e9 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitchCapabilities.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitchCapabilities.java
@@ -15,10 +15,79 @@
  */
 package org.onosproject.ofagent.api;
 
+import org.projectfloodlight.openflow.protocol.OFCapabilities;
+
+import java.util.Set;
+
 /**
  * Representation of capabilities of a virtual OpenFlow switch.
  */
 public interface OFSwitchCapabilities {
 
-    // TODO implement
+    /**
+     * Returns the capabilities of the switch.
+     *
+     * @return capabilities
+     */
+    Set<OFCapabilities> ofSwitchCapabilities();
+
+    interface Builder {
+
+        /**
+         * Builds a OFSwitchCapabilities object.
+         *
+         * @return OFSwitchCapabilities
+         */
+        OFSwitchCapabilities build();
+
+        /**
+         * Enable OFPC_FLOW_STATS capability.
+         *
+         * @return Builder object
+         */
+        Builder flowStats();
+
+        /**
+         * Enable OFPC_TABLE_STATS capability.
+         *
+         * @return Builder object
+         */
+        Builder tableStats();
+
+        /**
+         * Enable OFPC_PORT_STATS capability.
+         *
+         * @return Builder object
+         */
+        Builder portStats();
+
+        /**
+         * Enable OFPC_GROUP_STATS capability.
+         *
+         * @return Builder object
+         */
+        Builder groupStats();
+
+        /**
+         * Enable OFPC_IP_REASM capability.
+         *
+         * @return Builder object
+         */
+        Builder ipReasm();
+
+        /**
+         * Enable OFPC_QUEUE_STATS capability.
+         *
+         * @return Builder object
+         */
+        Builder queueStats();
+
+        /**
+         * Enable OFPC_PORT_BLOCKED capability.
+         *
+         * @return Builder object
+         */
+        Builder portBlocked();
+    }
+
 }
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitchService.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitchService.java
index de925b5..4f33b51 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitchService.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/api/OFSwitchService.java
@@ -121,4 +121,19 @@
      * @param msg     packet out message with lldp
      */
     void processLldp(Channel channel, OFMessage msg);
+
+    /**
+     * Sends hello to the controller.
+     *
+     * @param channel received channel
+     */
+    void sendOfHello(Channel channel);
+
+    /**
+     * Processes echo request from the controllers.
+     *
+     * @param channel received channel
+     * @param msg     echo request message
+     */
+    void processEchoRequest(Channel channel, OFMessage msg);
 }
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFController.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFController.java
new file mode 100644
index 0000000..32956c4
--- /dev/null
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFController.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.ofagent.impl;
+
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.TpPort;
+import org.onosproject.ofagent.api.OFController;
+
+/**
+ * Implementation of tenant openflow controller.
+ */
+public class DefaultOFController implements OFController {
+    private IpAddress ip;
+    private TpPort port;
+
+    public DefaultOFController(IpAddress ip, TpPort port) {
+        this.ip = ip;
+        this.port = port;
+    }
+
+    @Override
+    public IpAddress ip() {
+        return ip;
+    }
+
+    @Override
+    public TpPort port() {
+        return port;
+    }
+}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFSwitch.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFSwitch.java
index 01068c9..4258321 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFSwitch.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFSwitch.java
@@ -15,16 +15,27 @@
  */
 package org.onosproject.ofagent.impl;
 
+import com.google.common.collect.Lists;
 import io.netty.channel.Channel;
 import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
 import org.onosproject.net.Port;
 import org.onosproject.net.flow.FlowRule;
 import org.onosproject.net.packet.InboundPacket;
 import org.onosproject.ofagent.api.OFSwitch;
 import org.onosproject.ofagent.api.OFSwitchCapabilities;
 import org.projectfloodlight.openflow.protocol.OFControllerRole;
+import org.projectfloodlight.openflow.protocol.OFEchoReply;
+import org.projectfloodlight.openflow.protocol.OFEchoRequest;
+import org.projectfloodlight.openflow.protocol.OFFactories;
+import org.projectfloodlight.openflow.protocol.OFFactory;
+import org.projectfloodlight.openflow.protocol.OFFeaturesReply;
+import org.projectfloodlight.openflow.protocol.OFHello;
 import org.projectfloodlight.openflow.protocol.OFMessage;
+import org.projectfloodlight.openflow.protocol.OFVersion;
+import org.projectfloodlight.openflow.types.DatapathId;
 
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -39,33 +50,42 @@
 
     private static final String ERR_CH_DUPLICATE = "Channel already exists: ";
     private static final String ERR_CH_NOT_FOUND = "Channel not found: ";
+    private static final long NUM_BUFFERS = 1024;
+    private static final short NUM_TABLES = 3;
 
     private final Device device;
     private final OFSwitchCapabilities capabilities;
+    private final DatapathId datapathId;
 
     private final ConcurrentHashMap<Channel, OFControllerRole> controllerRoleMap
             = new ConcurrentHashMap<>();
 
-    private DefaultOFSwitch(Device device, OFSwitchCapabilities capabilities) {
+    protected static final OFFactory FACTORY = OFFactories.getFactory(OFVersion.OF_13);
+    private int handshakeTransactionIds;
+
+    public DefaultOFSwitch(Device device, OFSwitchCapabilities capabilities) {
         this.device = device;
         this.capabilities = capabilities;
+        datapathId = getDpidFromDeviceId(device.id());
+        handshakeTransactionIds = -1;
+
     }
 
     // TODO add builder
 
     @Override
     public Device device() {
-        return null;
+        return device;
     }
 
     @Override
     public OFSwitchCapabilities capabilities() {
-        return null;
+        return capabilities;
     }
 
     @Override
     public boolean isConnected() {
-        return false;
+        return !controllerChannels().isEmpty();
     }
 
     @Override
@@ -162,10 +182,59 @@
     @Override
     public void processFeaturesRequest(Channel channel, OFMessage msg) {
         // TODO process features request and send reply
+        List<OFMessage> ofMessageList = Lists.newArrayList();
+
+        OFFeaturesReply.Builder frBuilder = FACTORY.buildFeaturesReply()
+                .setDatapathId(datapathId)
+                .setNBuffers(NUM_BUFFERS)
+                .setNTables(NUM_TABLES)
+                .setCapabilities(capabilities.ofSwitchCapabilities())
+                .setXid(msg.getXid());
+
+        ofMessageList.add(frBuilder.build());
+        channel.write(ofMessageList);
+
     }
 
     @Override
     public void processLldp(Channel channel, OFMessage msg) {
         // TODO process lldp
     }
+
+    @Override
+    public void sendOfHello(Channel channel) {
+        List<OFMessage> ofMessageList = Lists.newArrayList();
+        OFHello.Builder ofHello = FACTORY.buildHello()
+                .setXid(this.handshakeTransactionIds--);
+
+        ofMessageList.add(ofHello.build());
+        channel.write(ofMessageList);
+    }
+
+    @Override
+    public void processEchoRequest(Channel channel, OFMessage msg) {
+        List<OFMessage> ofMessageList = Lists.newArrayList();
+        OFEchoReply.Builder echoBuilder = FACTORY.buildEchoReply()
+                .setXid(msg.getXid())
+                .setData(((OFEchoRequest) msg).getData());
+
+        ofMessageList.add(echoBuilder.build());
+        channel.write(ofMessageList);
+    }
+
+    private DatapathId getDpidFromDeviceId(DeviceId deviceId) {
+        String deviceIdToString = deviceId.toString().split(":")[1];
+
+        assert (deviceIdToString.length() == 16);
+
+        String resultedHexString = new String();
+        for (int i = 0; i < 8; i++) {
+            resultedHexString = resultedHexString + deviceIdToString.charAt(2 * i)
+                    + deviceIdToString.charAt(2 * i + 1);
+            if (i != 7) {
+                resultedHexString += ":";
+            }
+        }
+        return DatapathId.of(resultedHexString);
+    }
 }
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFSwitchCapabilities.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFSwitchCapabilities.java
new file mode 100644
index 0000000..3ac0f11
--- /dev/null
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/DefaultOFSwitchCapabilities.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.ofagent.impl;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.onosproject.ofagent.api.OFSwitchCapabilities;
+import org.projectfloodlight.openflow.protocol.OFCapabilities;
+
+import java.util.Set;
+
+/**
+ * Implementation of openflow switch capabilities.
+ */
+public final class DefaultOFSwitchCapabilities implements OFSwitchCapabilities {
+
+    private final Set<OFCapabilities> ofCapabilities;
+
+    private DefaultOFSwitchCapabilities(Set<OFCapabilities> ofSwitchCapabilities) {
+        this.ofCapabilities = ImmutableSet.copyOf(ofSwitchCapabilities);
+    }
+
+    @Override
+    public Set<OFCapabilities> ofSwitchCapabilities() {
+        return ofCapabilities;
+    }
+
+    /**
+     * Returns DefaultOFSwitchCapabilities builder object.
+     *
+     * @return DefaultOFSwitchCapabilities builder
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static final class Builder implements OFSwitchCapabilities.Builder {
+        private Set<OFCapabilities> ofCapabilities;
+
+        private Builder() {
+            ofCapabilities = Sets.newHashSet();
+        }
+
+        @Override
+        public Builder flowStats() {
+            ofCapabilities.add(OFCapabilities.FLOW_STATS);
+            return this;
+        }
+
+        @Override
+        public Builder tableStats() {
+            ofCapabilities.add(OFCapabilities.TABLE_STATS);
+            return this;
+        }
+
+        @Override
+        public Builder portStats() {
+            ofCapabilities.add(OFCapabilities.PORT_STATS);
+            return this;
+        }
+
+        @Override
+        public Builder groupStats() {
+            ofCapabilities.add(OFCapabilities.GROUP_STATS);
+            return this;
+        }
+
+        @Override
+        public Builder ipReasm() {
+            ofCapabilities.add(OFCapabilities.IP_REASM);
+            return this;
+        }
+
+        @Override
+        public Builder queueStats() {
+            ofCapabilities.add(OFCapabilities.QUEUE_STATS);
+            return this;
+        }
+
+        @Override
+        public Builder portBlocked() {
+            ofCapabilities.add(OFCapabilities.PORT_BLOCKED);
+            return this;
+        }
+
+        @Override
+        public OFSwitchCapabilities build() {
+            return new DefaultOFSwitchCapabilities(ofCapabilities);
+        }
+    }
+}
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelHandler.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelHandler.java
index b077af4..38530737 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelHandler.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelHandler.java
@@ -15,15 +15,26 @@
  */
 package org.onosproject.ofagent.impl;
 
-import io.netty.channel.Channel;
 import io.netty.channel.ChannelDuplexHandler;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.util.ReferenceCountUtil;
+import io.netty.handler.timeout.ReadTimeoutException;
+import org.onlab.osgi.DefaultServiceDirectory;
+import org.onlab.osgi.ServiceDirectory;
+import org.onosproject.incubator.net.virtual.VirtualNetworkService;
 import org.onosproject.ofagent.api.OFSwitch;
+import org.projectfloodlight.openflow.protocol.OFErrorMsg;
+import org.projectfloodlight.openflow.protocol.OFFactories;
+import org.projectfloodlight.openflow.protocol.OFFactory;
 import org.projectfloodlight.openflow.protocol.OFMessage;
+import org.projectfloodlight.openflow.protocol.OFVersion;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
+import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+
 /**
  * Implementation of OpenFlow channel handler.
  * It processes OpenFlow message according to the channel state.
@@ -33,10 +44,12 @@
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final OFSwitch ofSwitch;
 
-    private Channel channel;
+    private ChannelHandlerContext ctx;
     private ChannelState state;
+    protected static final OFFactory FACTORY = OFFactories.getFactory(OFVersion.OF_13);
+    protected VirtualNetworkService vNetService;
 
-    private enum ChannelState {
+    enum ChannelState {
 
         INIT {
             @Override
@@ -49,14 +62,38 @@
             @Override
             void processOFMessage(final OFChannelHandler handler,
                                   final OFMessage msg) {
-                // TODO implement
+
+                switch (msg.getType()) {
+                    case HELLO:
+                        handler.setState(ChannelState.WAIT_FEATURE_REQUEST);
+                        break;
+                    default:
+                        handler.illegalMessageReceived(msg);
+                        break;
+                }
             }
         },
         WAIT_FEATURE_REQUEST {
             @Override
             void processOFMessage(final OFChannelHandler handler,
                                   final OFMessage msg) {
-                // TODO implement
+
+                switch (msg.getType()) {
+                    case FEATURES_REQUEST:
+                        handler.ofSwitch.processFeaturesRequest(handler.ctx.channel(), msg);
+                        handler.setState(ChannelState.ESTABLISHED);
+                        break;
+                    case ECHO_REQUEST:
+                        handler.ofSwitch.processEchoRequest(handler.ctx.channel(), msg);
+                        break;
+                    case ERROR:
+                        handler.logErrorClose(handler.ctx, (OFErrorMsg) msg);
+                        break;
+                    default:
+                        handler.illegalMessageReceived(msg);
+                        break;
+
+                }
             }
         },
         ESTABLISHED {
@@ -65,9 +102,32 @@
                                   final OFMessage msg) {
                 // TODO implement
                 // TODO add this channel to ofSwitch role service
+                switch (msg.getType()) {
+                    case STATS_REQUEST:
+                        //TODO implement
+                        //TODO: use vNetService to build OFPortDesc.
+                        break;
+                    case SET_CONFIG:
+                        //TODO implement
+                        break;
+                    case GET_CONFIG_REQUEST:
+                        //TODO implement
+                        break;
+                    case BARRIER_REQUEST:
+                        //TODO implement
+                        break;
+                    case ECHO_REQUEST:
+                        handler.ofSwitch.processEchoRequest(handler.ctx.channel(), msg);
+                        break;
+                    case ERROR:
+                        handler.logErrorClose(handler.ctx, (OFErrorMsg) msg);
+                        break;
+                    default:
+                        handler.unhandledMessageReceived(msg);
+                        break;
+                }
             }
         };
-
         abstract void processOFMessage(final OFChannelHandler handler,
                                        final OFMessage msg);
     }
@@ -78,39 +138,91 @@
      * @param ofSwitch openflow switch that owns this channel
      */
     public OFChannelHandler(OFSwitch ofSwitch) {
+        super();
         this.ofSwitch = ofSwitch;
+
+        setState(ChannelState.INIT);
+
+        ServiceDirectory services = new DefaultServiceDirectory();
+        vNetService = services.get(VirtualNetworkService.class);
     }
 
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
-        super.channelActive(ctx);
-        this.channel = ctx.channel();
+        this.ctx = ctx;
+        log.debug("Channel Active. Send OF_13 Hello to {}", ctx.channel().remoteAddress());
+
+        try {
+            ofSwitch.sendOfHello(ctx.channel());
+            setState(ChannelState.WAIT_HELLO);
+        } catch (Throwable cause) {
+            log.error("Exception occured because of{}", cause.getMessage());
+        }
     }
 
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg)
             throws Exception {
-        try {
-            OFMessage ofMsg = (OFMessage) msg;
-            // TODO process OF message
 
-        } finally {
-            ReferenceCountUtil.release(msg);
+        try {
+            if (msg instanceof List) {
+                ((List) msg).forEach(ofm -> {
+                    state.processOFMessage(this, (OFMessage) ofm);
+                });
+            } else {
+                state.processOFMessage(this, (OFMessage) msg);
+            }
+        } catch (Throwable cause) {
+            log.error("Exception occured {}", cause.getMessage());
         }
+
     }
 
     @Override
     public void channelReadComplete(ChannelHandlerContext ctx)
             throws Exception {
-        ctx.flush();
     }
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        if (cause instanceof ReadTimeoutException) {
+            log.error("Connection closed because of ReadTimeoutException {}", cause.getMessage());
+        } else if (cause instanceof ClosedChannelException) {
+            log.error("ClosedChannelException occured");
+            return;
+        } else if (cause instanceof RejectedExecutionException) {
+            log.error("Could not process message: queue full");
+        } else if (cause instanceof IOException) {
+            log.error("IOException occured");
+        } else {
+            log.error("Error while processing message from switch {}", cause.getMessage());
+        }
         ctx.close();
     }
 
     private void setState(ChannelState state) {
         this.state = state;
     }
+
+    private void logErrorClose(ChannelHandlerContext ctx, OFErrorMsg errorMsg) {
+        log.error("{} from switch {} in state {}",
+                errorMsg,
+                ofSwitch.device().id().toString(),
+                state);
+
+        log.error("Disconnecting...");
+        ctx.close();
+    }
+
+    private void illegalMessageReceived(OFMessage ofMessage) {
+        log.warn("Controller should never send this message {} in current state {}",
+                ofMessage.getType().toString(),
+                state);
+    }
+
+    private void unhandledMessageReceived(OFMessage ofMessage) {
+        log.warn("Unhandled message {} received in state {}. Ignored",
+                ofMessage.getType().toString(),
+                state);
+    }
 }
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelInitializer.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelInitializer.java
index a3be413..e93b0c9 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelInitializer.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFChannelInitializer.java
@@ -17,6 +17,7 @@
 
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.ReadTimeoutHandler;
 import org.onosproject.ofagent.api.OFSwitch;
 
 /**
@@ -25,6 +26,7 @@
 public final class OFChannelInitializer extends ChannelInitializer<SocketChannel> {
 
     private final OFSwitch ofSwitch;
+    private static final int READ_TIMEOUT = 30;
 
     /**
      * Default constructor.
@@ -38,6 +40,9 @@
     @Override
     protected void initChannel(SocketChannel ch) throws Exception {
 
-        // TODO configure OF channel pipeline
+        ch.pipeline().addLast(new OFMessageDecoder())
+                .addLast(new OFMessageEncoder())
+                .addLast(new ReadTimeoutHandler(READ_TIMEOUT))
+                .addLast(new OFChannelHandler(ofSwitch));
     }
 }
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFConnectionHandler.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFConnectionHandler.java
index 27a6bc5..7f59cf5 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFConnectionHandler.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFConnectionHandler.java
@@ -15,14 +15,19 @@
  */
 package org.onosproject.ofagent.impl;
 
+import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
 import org.onosproject.ofagent.api.OFController;
 import org.onosproject.ofagent.api.OFSwitch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -33,12 +38,11 @@
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    private static final int MAX_RETRY = 10;
-
-    private final AtomicInteger retryCount = new AtomicInteger();
+    private final AtomicInteger retryCount;
     private final OFSwitch ofSwitch;
     private final OFController controller;
-    private final NioEventLoopGroup workGroup;
+    private final EventLoopGroup workGroup;
+    private static final int MAX_RETRY = 3;
 
     /**
      * Default constructor.
@@ -48,28 +52,43 @@
      * @param workGroup  work group for connection
      */
     public OFConnectionHandler(OFSwitch ofSwitch, OFController controller,
-                               NioEventLoopGroup workGroup) {
+                               EventLoopGroup workGroup) {
         this.ofSwitch = ofSwitch;
         this.controller = controller;
         this.workGroup = workGroup;
+        this.retryCount = new AtomicInteger();
     }
 
     /**
      * Creates a connection to the supplied controller.
+     *
      */
     public void connect() {
-        // TODO initiates a connection to the controller
+
+        SocketAddress remoteAddr = new InetSocketAddress(controller.ip().toInetAddress(), controller.port().toInt());
+
+        log.debug("Connecting to controller {}:{}", controller.ip(), controller.port());
+        Bootstrap bootstrap = new Bootstrap();
+        bootstrap.group(workGroup)
+                .channel(NioSocketChannel.class)
+                .option(ChannelOption.SO_KEEPALIVE, true)
+                .handler(new OFChannelInitializer(ofSwitch));
+
+        bootstrap.connect(remoteAddr).addListener(this);
     }
 
     @Override
     public void operationComplete(ChannelFuture future) throws Exception {
 
         if (future.isSuccess()) {
-            log.debug("{} is connected to controller {}", ofSwitch.device().id(), controller);
-            // TODO do something for a new connection if there's any
+            ofSwitch.addControllerChannel(future.channel());
+            log.debug("Connected to controller {}:{} for device {}",
+                    controller.ip(), controller.port(), ofSwitch.device().id());
         } else {
-            log.debug("{} failed to connect {}, retry..", ofSwitch.device().id(), controller);
-            // TODO retry connect if retry count is less than MAX
+            log.info("Failed to connect controller {}:{}. Retry...", controller.ip(), controller.port());
+            if (retryCount.getAndIncrement() < MAX_RETRY) {
+                this.connect();
+            }
         }
     }
 }
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageDecoder.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageDecoder.java
index 79d0f4e..7e3d1d4 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageDecoder.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageDecoder.java
@@ -18,6 +18,9 @@
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.ByteToMessageDecoder;
+import org.projectfloodlight.openflow.protocol.OFFactories;
+import org.projectfloodlight.openflow.protocol.OFMessage;
+import org.projectfloodlight.openflow.protocol.OFMessageReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,9 +36,18 @@
     @Override
     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
             throws Exception {
-        log.trace("Received message from {}: {}", ctx.channel().remoteAddress(),
-                  in.readByte());
 
-        // TODO decode byte message to OFMessage
+        if (!ctx.channel().isActive()) {
+            return;
+        }
+
+        try {
+            OFMessageReader<OFMessage> reader = OFFactories.getGenericReader();
+            OFMessage message = reader.readFrom(in);
+            out.add(message);
+        } catch (Throwable cause) {
+            log.error("Exception occured while processing decoding because of {}", cause.getMessage());
+        }
+
     }
 }
diff --git a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageEncoder.java b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageEncoder.java
index 1a67602..3d9f8ee 100644
--- a/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageEncoder.java
+++ b/apps/ofagent/src/main/java/org/onosproject/ofagent/impl/OFMessageEncoder.java
@@ -16,19 +16,50 @@
 package org.onosproject.ofagent.impl;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.EncoderException;
 import io.netty.handler.codec.MessageToByteEncoder;
 import org.projectfloodlight.openflow.protocol.OFMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Encodes OFMessage to a byte buffer.
  */
 public final class OFMessageEncoder extends MessageToByteEncoder<Iterable<OFMessage>> {
+    private final Logger log = LoggerFactory.getLogger(getClass());
 
     @Override
     protected void encode(ChannelHandlerContext ctx, Iterable<OFMessage> msgList, ByteBuf out)
             throws Exception {
 
-        // TODO encode OFMessage to ByteBuf
+        if (!ctx.channel().isActive()) {
+            return;
+        }
+
+        if (msgList instanceof Iterable) {
+            msgList.forEach(msg -> {
+                try {
+                    ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer();
+                    msg.writeTo(byteBuf);
+
+                    ctx.writeAndFlush(byteBuf);
+                } catch (Exception e) {
+                    log.error("error occured because of {}", e.getMessage());
+                }
+            });
+        }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        if (cause instanceof EncoderException) {
+            log.error("Connection closed because of EncoderException {}", cause.getMessage());
+            ctx.close();
+        } else {
+            log.error("Exception occured while processing encoding because of {}", cause.getMessage());
+            ctx.close();
+        }
     }
 }