CORD-160 Add connect and disconnect methods to ovsdb

Change-Id: I66e777f8ec9c5834e27b1dc685fdeb197e30ce0d
diff --git a/ovsdb/api/src/main/java/org/onosproject/ovsdb/controller/OvsdbClientService.java b/ovsdb/api/src/main/java/org/onosproject/ovsdb/controller/OvsdbClientService.java
index ab88a24..c5f2092 100644
--- a/ovsdb/api/src/main/java/org/onosproject/ovsdb/controller/OvsdbClientService.java
+++ b/ovsdb/api/src/main/java/org/onosproject/ovsdb/controller/OvsdbClientService.java
@@ -228,4 +228,9 @@
      * @return ovsdb ports
      */
     Set<OvsdbPort> getLocalPorts(Iterable<String> ifaceids);
+
+    /**
+     * Disconnects the ovsdb server.
+     */
+    void disconnect();
 }
diff --git a/ovsdb/api/src/main/java/org/onosproject/ovsdb/controller/OvsdbController.java b/ovsdb/api/src/main/java/org/onosproject/ovsdb/controller/OvsdbController.java
index 9e24524..24bfeae 100644
--- a/ovsdb/api/src/main/java/org/onosproject/ovsdb/controller/OvsdbController.java
+++ b/ovsdb/api/src/main/java/org/onosproject/ovsdb/controller/OvsdbController.java
@@ -15,6 +15,9 @@
  */
 package org.onosproject.ovsdb.controller;
 
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.TpPort;
+
 import java.util.List;
 
 /**
@@ -65,4 +68,12 @@
      * @return OvsdbClient ovsdb node information
      */
     OvsdbClientService getOvsdbClient(OvsdbNodeId nodeId);
+
+    /**
+     * Connect to the ovsdb server with given ip address and port number.
+     *
+     * @param ip ip address
+     * @param port port number
+     */
+    void connect(IpAddress ip, TpPort port);
 }
diff --git a/ovsdb/api/src/main/java/org/onosproject/ovsdb/controller/driver/DefaultOvsdbClient.java b/ovsdb/api/src/main/java/org/onosproject/ovsdb/controller/driver/DefaultOvsdbClient.java
index 0c64cc0..1b41be9 100644
--- a/ovsdb/api/src/main/java/org/onosproject/ovsdb/controller/driver/DefaultOvsdbClient.java
+++ b/ovsdb/api/src/main/java/org/onosproject/ovsdb/controller/driver/DefaultOvsdbClient.java
@@ -821,7 +821,7 @@
             Function<JsonNode, DatabaseSchema> rowFunction = new Function<JsonNode, DatabaseSchema>() {
                 @Override
                 public DatabaseSchema apply(JsonNode input) {
-                    log.info("Get ovsdb database schema", dbName);
+                    log.info("Get ovsdb database schema {}", dbName);
                     DatabaseSchema dbSchema = FromJsonUtil
                             .jsonNodeToDbSchema(dbName, input);
                     if (dbSchema == null) {
@@ -1185,4 +1185,10 @@
         }
         return ifaceid;
     }
+
+    @Override
+    public void disconnect() {
+        channel.disconnect();
+        this.agent.removeConnectedNode(nodeId);
+    }
 }
diff --git a/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/Controller.java b/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/Controller.java
index 0758232..2e84a16 100644
--- a/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/Controller.java
+++ b/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/Controller.java
@@ -15,25 +15,38 @@
  */
 package org.onosproject.ovsdb.controller.impl;
 
+import io.netty.bootstrap.Bootstrap;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoop;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.ServerChannel;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.handler.codec.string.StringEncoder;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.util.CharsetUtil;
 
 import java.net.InetSocketAddress;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.onlab.packet.IpAddress;
+import org.onlab.packet.TpPort;
 import org.onosproject.ovsdb.controller.OvsdbConstant;
 import org.onosproject.ovsdb.controller.OvsdbNodeId;
 import org.onosproject.ovsdb.controller.driver.DefaultOvsdbClient;
@@ -63,6 +76,9 @@
     private EventLoopGroup workerGroup;
     private Class<? extends ServerChannel> serverChannelClass;
 
+    private static final int MAX_RETRY = 5;
+    private static final int IDLE_TIMEOUT_SEC = 10;
+
     /**
      * Initialization.
      */
@@ -198,4 +214,86 @@
         workerGroup.shutdownGracefully();
         bossGroup.shutdownGracefully();
     }
+
+    /**
+     * Connect to the ovsdb server with given ip address and port number.
+     *
+     * @param ip ip address
+     * @param port port number
+     */
+    public void connect(IpAddress ip, TpPort port) {
+        ChannelFutureListener listener = new ConnectionListener(this, ip, port);
+        connectRetry(ip, port, listener);
+    }
+
+    private void connectRetry(IpAddress ip, TpPort port, ChannelFutureListener listener) {
+        try {
+            Bootstrap b = new Bootstrap();
+            b.group(workerGroup)
+                    .channel(NioSocketChannel.class)
+                    .option(ChannelOption.TCP_NODELAY, true)
+                    .handler(new ChannelInitializer<SocketChannel>() {
+
+                        @Override
+                        protected void initChannel(SocketChannel channel) throws Exception {
+                            ChannelPipeline p = channel.pipeline();
+                            p.addLast(new MessageDecoder(),
+                                      new StringEncoder(CharsetUtil.UTF_8),
+                                      new IdleStateHandler(IDLE_TIMEOUT_SEC, 0, 0),
+                                      new ConnectionHandler());
+                        }
+                    });
+            b.remoteAddress(ip.toString(), port.toInt());
+            b.connect().addListener(listener);
+        } catch (Exception e) {
+            log.warn("Connection to the ovsdb server {}:{} failed", ip.toString(), port.toString());
+        }
+    }
+
+    private class ConnectionListener implements ChannelFutureListener {
+        private Controller controller;
+        private IpAddress ip;
+        private TpPort port;
+        private AtomicInteger count = new AtomicInteger();
+
+        public ConnectionListener(Controller controller,
+                                  IpAddress ip,
+                                  TpPort port) {
+            this.controller = controller;
+            this.ip = ip;
+            this.port = port;
+        }
+
+        @Override
+        public void operationComplete(ChannelFuture channelFuture) throws Exception {
+            if (!channelFuture.isSuccess()) {
+                channelFuture.channel().close();
+
+                if (count.incrementAndGet() < MAX_RETRY) {
+                    final EventLoop loop = channelFuture.channel().eventLoop();
+
+                    loop.schedule(() -> {
+                        controller.connectRetry(this.ip, this.port, this);
+                    }, 1L, TimeUnit.SECONDS);
+                } else {
+                    log.info("Connection to the ovsdb {}:{} failed",
+                             this.ip.toString(), this.port.toString());
+                }
+            } else {
+                handleNewNodeConnection(channelFuture.channel());
+            }
+        }
+    }
+
+    private class ConnectionHandler extends ChannelDuplexHandler {
+
+        @Override
+        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+            IdleStateEvent e = (IdleStateEvent) evt;
+
+            if (e.state() == IdleState.READER_IDLE) {
+                ctx.close();
+            }
+        }
+    }
 }
diff --git a/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/OvsdbControllerImpl.java b/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/OvsdbControllerImpl.java
index 9b48296..beeaa9c 100644
--- a/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/OvsdbControllerImpl.java
+++ b/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/OvsdbControllerImpl.java
@@ -33,6 +33,7 @@
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.MacAddress;
+import org.onlab.packet.TpPort;
 import org.onosproject.ovsdb.controller.DefaultEventSubject;
 import org.onosproject.ovsdb.controller.EventSubject;
 import org.onosproject.ovsdb.controller.OvsdbClientService;
@@ -142,6 +143,11 @@
         return ovsdbClients.get(nodeId);
     }
 
+    @Override
+    public void connect(IpAddress ip, TpPort port) {
+        controller.connect(ip, port);
+    }
+
     /**
      * Implementation of an Ovsdb Agent which is responsible for keeping track
      * of connected node and the state in which they are.
diff --git a/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/OvsdbJsonRpcHandler.java b/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/OvsdbJsonRpcHandler.java
index 37942c2..1956a1e 100644
--- a/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/OvsdbJsonRpcHandler.java
+++ b/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/OvsdbJsonRpcHandler.java
@@ -89,7 +89,7 @@
      */
     private void processOvsdbMessage(JsonNode jsonNode) {
 
-        log.info("Handle ovsdb message");
+        log.debug("Handle ovsdb message");
 
         if (jsonNode.has("result")) {
 
diff --git a/providers/ovsdb/device/src/test/java/org/onosproject/ovsdb/providers/device/OvsdbDeviceProviderTest.java b/providers/ovsdb/device/src/test/java/org/onosproject/ovsdb/providers/device/OvsdbDeviceProviderTest.java
index 5e4c567..7663a64 100644
--- a/providers/ovsdb/device/src/test/java/org/onosproject/ovsdb/providers/device/OvsdbDeviceProviderTest.java
+++ b/providers/ovsdb/device/src/test/java/org/onosproject/ovsdb/providers/device/OvsdbDeviceProviderTest.java
@@ -27,6 +27,7 @@
 import org.junit.Before;
 import org.junit.Test;
 import org.onlab.packet.IpAddress;
+import org.onlab.packet.TpPort;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.MastershipRole;
 import org.onosproject.net.device.DeviceDescription;
@@ -193,6 +194,10 @@
             return null;
         }
 
+        @Override
+        public void connect(IpAddress ip, TpPort port) {
+
+        }
     }
 
 }
diff --git a/providers/ovsdb/host/src/test/java/org/onosproject/ovsdb/provider/host/OvsdbHostProviderTest.java b/providers/ovsdb/host/src/test/java/org/onosproject/ovsdb/provider/host/OvsdbHostProviderTest.java
index 019cfa1..01e07dd 100644
--- a/providers/ovsdb/host/src/test/java/org/onosproject/ovsdb/provider/host/OvsdbHostProviderTest.java
+++ b/providers/ovsdb/host/src/test/java/org/onosproject/ovsdb/provider/host/OvsdbHostProviderTest.java
@@ -26,6 +26,7 @@
 import org.junit.Test;
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.MacAddress;
+import org.onlab.packet.TpPort;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.HostId;
 import org.onosproject.net.host.HostDescription;
@@ -201,5 +202,10 @@
         public OvsdbClientService getOvsdbClient(OvsdbNodeId nodeId) {
             return null;
         }
+
+        @Override
+        public void connect(IpAddress ip, TpPort port) {
+
+        }
     }
 }