[ONOS-7566] Implementation of NetconfProxySession

Change-Id: I01cbe0b10ac36cb6db53127555b551f405acdeb1
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/DefaultNetconfDevice.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/DefaultNetconfDevice.java
index c952027..dfbe760 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/DefaultNetconfDevice.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/DefaultNetconfDevice.java
@@ -16,6 +16,7 @@
 
 package org.onosproject.netconf.ctl.impl;
 
+import org.onosproject.netconf.NetconfController;
 import org.onosproject.netconf.NetconfDevice;
 import org.onosproject.netconf.NetconfDeviceInfo;
 import org.onosproject.netconf.NetconfException;
@@ -34,24 +35,37 @@
 
     private NetconfDeviceInfo netconfDeviceInfo;
     private boolean deviceState = true;
-    private final NetconfSessionFactory sessionFactory;
     private NetconfSession netconfSession;
+    private boolean isMasterSession = false;
+    private NetconfSession netconfProxySession;
 
     // will block until hello RPC handshake completes
     /**
      * Creates a new default NETCONF device with the information provided.
      * The device gets created only if no exception is thrown while connecting to
      * it and establishing the NETCONF session.
+     * The secure transport session will only be created if isMaster is true.
      * @param deviceInfo information about the device to be created.
+     * @param isMaster if true create secure transport session, otherwise create proxy session.
+     * @param netconfController netconf controller object
      * @throws NetconfException if there are problems in creating or establishing
      * the underlying NETCONF connection and session.
      */
-    public DefaultNetconfDevice(NetconfDeviceInfo deviceInfo)
+    public DefaultNetconfDevice(NetconfDeviceInfo deviceInfo,
+                                boolean isMaster,
+                                NetconfController netconfController)
             throws NetconfException {
         netconfDeviceInfo = deviceInfo;
-        sessionFactory = (ncDevInfo) -> new NetconfSessionMinaImpl(ncDevInfo);
         try {
-            netconfSession = sessionFactory.createNetconfSession(deviceInfo);
+            if (isMaster) {
+                netconfSession = new NetconfSessionMinaImpl(deviceInfo);
+                isMasterSession = true;
+                netconfProxySession = netconfSession;
+            } else {
+                netconfProxySession = new NetconfSessionProxyImpl
+                        .ProxyNetconfSessionFactory()
+                        .createNetconfSession(deviceInfo, netconfController);
+            }
         } catch (NetconfException e) {
             deviceState = false;
             throw new NetconfException("Cannot create connection and session for device " +
@@ -64,18 +78,30 @@
      * Creates a new default NETCONF device with the information provided.
      * The device gets created only if no exception is thrown while connecting to
      * it and establishing the NETCONF session.
-     *
+     * The secure transport session will only be created if isMaster is true.
      * @param deviceInfo information about the device to be created.
      * @param factory the factory used to create the session
+     * @param isMaster if true create secure transport session, otherwise create proxy session.
+     * @param netconfController netconf controller object
      * @throws NetconfException if there are problems in creating or establishing
      * the underlying NETCONF connection and session.
      */
-    public DefaultNetconfDevice(NetconfDeviceInfo deviceInfo, NetconfSessionFactory factory)
+    public DefaultNetconfDevice(NetconfDeviceInfo deviceInfo,
+                                NetconfSessionFactory factory,
+                                boolean isMaster,
+                                NetconfController netconfController)
             throws NetconfException {
         netconfDeviceInfo = deviceInfo;
-        sessionFactory = factory;
         try {
-            netconfSession = sessionFactory.createNetconfSession(deviceInfo);
+            if (isMaster) {
+                netconfSession = factory.createNetconfSession(deviceInfo, netconfController);
+                isMasterSession = true;
+                netconfProxySession = netconfSession;
+            } else {
+                netconfProxySession = new NetconfSessionProxyImpl
+                        .ProxyNetconfSessionFactory()
+                        .createNetconfSession(deviceInfo, netconfController);
+            }
         } catch (NetconfException e) {
             deviceState = false;
             throw new NetconfException("Cannot create connection and session for device " +
@@ -90,23 +116,29 @@
 
     @Override
     public NetconfSession getSession() {
-        return netconfSession;
+        return netconfProxySession;
     }
 
     @Override
     public void disconnect() {
         deviceState = false;
         try {
-            netconfSession.close();
+            if (isMasterSession) {
+                netconfSession.close();
+            }
+            netconfProxySession.close();
         } catch (NetconfException e) {
             log.warn("Cannot communicate with the device {} session already closed", netconfDeviceInfo);
         }
     }
 
     @Override
+    public boolean isMasterSession() {
+        return isMasterSession;
+    }
+
+    @Override
     public NetconfDeviceInfo getDeviceInfo() {
         return netconfDeviceInfo;
     }
-
-
 }
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/DefaultNetconfProxyMessage.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/DefaultNetconfProxyMessage.java
new file mode 100644
index 0000000..1a57d4d
--- /dev/null
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/DefaultNetconfProxyMessage.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.netconf.ctl.impl;
+
+import com.google.common.collect.ImmutableList;
+import org.onosproject.net.DeviceId;
+import org.onosproject.netconf.NetconfProxyMessage;
+
+import java.util.List;
+
+/**
+ * Default implementation of Netconf Proxy Message.
+ */
+public class DefaultNetconfProxyMessage implements NetconfProxyMessage {
+
+    private final SubjectType subjectType;
+    private final DeviceId deviceId;
+    private final List<String> arguments;
+
+    /**
+     * Create new NetconfProxyMessage with provided informations.
+     * @param subType Message subject type.
+     * @param devId Device information that recieve message.
+     * @param args Messages arguments.
+     */
+    public DefaultNetconfProxyMessage(SubjectType subType,
+                                      DeviceId devId,
+                                      List<String> args) {
+        subjectType = subType;
+        deviceId = devId;
+        arguments = args;
+    }
+
+    @Override
+    public SubjectType subjectType() {
+        return subjectType;
+    }
+
+    @Override
+    public DeviceId deviceId() {
+        return deviceId;
+    }
+
+    @Override
+    public List<String> arguments() {
+        return ImmutableList.copyOf(arguments);
+    }
+}
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java
index ece7ca2..b793c13 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java
@@ -16,15 +16,28 @@
 
 package org.onosproject.netconf.ctl.impl;
 
+
 import org.apache.commons.lang3.tuple.Triple;
 
+import com.google.common.annotations.Beta;
+import com.google.common.collect.Lists;
+
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+
 import org.bouncycastle.jce.provider.BouncyCastleProvider;
 import org.onlab.packet.IpAddress;
+import org.onlab.util.KryoNamespace;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.AnnotationKeys;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.MastershipRole;
 import org.onosproject.net.config.NetworkConfigRegistry;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.key.DeviceKey;
@@ -39,29 +52,37 @@
 import org.onosproject.netconf.NetconfDeviceOutputEvent;
 import org.onosproject.netconf.NetconfDeviceOutputEventListener;
 import org.onosproject.netconf.NetconfException;
+import org.onosproject.netconf.NetconfProxyMessage;
+import org.onosproject.netconf.NetconfProxyMessageHandler;
+import org.onosproject.netconf.NetconfSession;
 import org.onosproject.netconf.config.NetconfDeviceConfig;
 import org.onosproject.netconf.config.NetconfSshClientLib;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.Serializer;
 import org.osgi.service.component.ComponentContext;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Modified;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.component.annotations.ReferenceCardinality;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.security.Security;
+
+import java.util.ArrayList;
 import java.util.Dictionary;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.onlab.util.Tools.get;
 import static org.onlab.util.Tools.getIntegerProperty;
@@ -95,6 +116,16 @@
 
     protected NetconfSshClientLib sshClientLib = NetconfSshClientLib.APACHE_MINA;
 
+
+    private static final String APACHE_MINA_STR = "apache-mina";
+
+
+    private static final MessageSubject SEND_REQUEST_SUBJECT_STRING =
+            new MessageSubject("netconf-session-master-send-message-string");
+
+    private static final MessageSubject SEND_REQUEST_SUBJECT_SET_STRING =
+            new MessageSubject("netconf-session-master-send-message-set-string");
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected ComponentConfigService cfgService;
 
@@ -110,6 +141,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected MastershipService mastershipService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterCommunicationService clusterCommunicator;
+
     public static final Logger log = LoggerFactory
             .getLogger(NetconfControllerImpl.class);
 
@@ -120,26 +154,56 @@
     private final NetconfDeviceOutputEventListener downListener = new DeviceDownEventListener();
 
     protected Set<NetconfDeviceListener> netconfDeviceListeners = new CopyOnWriteArraySet<>();
-    protected NetconfDeviceFactory deviceFactory = DefaultNetconfDevice::new;
+    protected NetconfDeviceFactory deviceFactory = new DefaultNetconfDeviceFactory();
+
+    protected NetconfProxyMessageHandler netconfProxyMessageHandler = new NetconfProxyMessageHandlerImpl();
+
 
     protected final ExecutorService executor =
             Executors.newCachedThreadPool(groupedThreads("onos/netconfdevicecontroller",
                                                          "connection-reopen-%d", log));
 
+    public static final Serializer SERIALIZER = Serializer.using(
+            KryoNamespace.newBuilder()
+                    .register(KryoNamespaces.API)
+                    .register(NetconfProxyMessage.class)
+                    .register(NetconfProxyMessage.SubjectType.class)
+                    .register(DefaultNetconfProxyMessage.class)
+                    .register(String.class)
+                    .build("NetconfProxySession"));
+
     @Activate
     public void activate(ComponentContext context) {
         cfgService.registerProperties(getClass());
         modified(context);
         Security.addProvider(new BouncyCastleProvider());
+        clusterCommunicator.<NetconfProxyMessage, String>addSubscriber(
+                SEND_REQUEST_SUBJECT_STRING,
+                SERIALIZER::decode,
+                this::handleProxyMessage,
+                SERIALIZER::encode);
+        clusterCommunicator.<NetconfProxyMessage, Set<String>>addSubscriber(
+                SEND_REQUEST_SUBJECT_SET_STRING,
+                SERIALIZER::decode,
+                this::handleProxyMessage,
+                SERIALIZER::encode);
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
         netconfDeviceMap.values().forEach(device -> {
-            device.getSession().removeDeviceOutputListener(downListener);
+            if (device.isMasterSession()) {
+                try {
+                    device.getSession().removeDeviceOutputListener(downListener);
+                } catch (NetconfException e) {
+                    log.error("removeDeviceOutputListener Failed {}", e.getMessage());
+                }
+            }
             device.disconnect();
         });
+        clusterCommunicator.removeSubscriber(SEND_REQUEST_SUBJECT_STRING);
+        clusterCommunicator.removeSubscriber(SEND_REQUEST_SUBJECT_SET_STRING);
         cfgService.unregisterProperties(getClass(), false);
         netconfDeviceListeners.clear();
         netconfDeviceMap.clear();
@@ -228,6 +292,11 @@
 
     @Override
     public NetconfDevice connectDevice(DeviceId deviceId) throws NetconfException {
+        return connectDevice(deviceId, true);
+    }
+
+    @Override
+    public NetconfDevice connectDevice(DeviceId deviceId, boolean isMaster) throws NetconfException {
         NetconfDeviceConfig netCfg  = netCfgService.getConfig(
                 deviceId, NetconfDeviceConfig.class);
         NetconfDeviceInfo deviceInfo = null;
@@ -256,65 +325,77 @@
         mutex.lock();
         try {
             if (netconfDeviceMap.containsKey(deviceId)) {
-                log.debug("Device {} is already present", deviceId);
-                return netconfDeviceMap.get(deviceId);
-            } else if (netCfg != null) {
+                //If not master or already has session: return, otherwise create device again.
+                if (!isMaster || netconfDeviceMap.get(deviceId).isMasterSession()) {
+                    log.debug("Device {} is already present", deviceId);
+                    return netconfDeviceMap.get(deviceId);
+                }
+            }
+
+            if (netCfg != null) {
                 log.debug("Device {} is present in NetworkConfig", deviceId);
                 deviceInfo = new NetconfDeviceInfo(netCfg);
             } else {
                 log.debug("Creating NETCONF device {}", deviceId);
-                Device device = deviceService.getDevice(deviceId);
-                String ip, path = null;
-                int port;
-                if (device != null) {
-                    ip = device.annotations().value("ipaddress");
-                    port = Integer.parseInt(device.annotations().value("port"));
-                    path = device.annotations().value("path");
-                } else {
-                    Triple<String, Integer, Optional<String>> info = extractIpPortPath(deviceId);
-                    ip = info.getLeft();
-                    port = info.getMiddle();
-                    path = (info.getRight().isPresent() ? info.getRight().get() : null);
-                }
-                try {
-                    DeviceKey deviceKey = deviceKeyService.getDeviceKey(
-                            DeviceKeyId.deviceKeyId(deviceId.toString()));
-                    if (deviceKey.type() == DeviceKey.Type.USERNAME_PASSWORD) {
-                        UsernamePassword usernamepasswd = deviceKey.asUsernamePassword();
-
-                        deviceInfo = new NetconfDeviceInfo(usernamepasswd.username(),
-                                                           usernamepasswd.password(),
-                                                           IpAddress.valueOf(ip),
-                                                           port,
-                                                           path);
-
-                    } else if (deviceKey.type() == DeviceKey.Type.SSL_KEY) {
-                        String username = deviceKey.annotations().value(AnnotationKeys.USERNAME);
-                        String password = deviceKey.annotations().value(AnnotationKeys.PASSWORD);
-                        String sshkey = deviceKey.annotations().value(AnnotationKeys.SSHKEY);
-
-                        deviceInfo = new NetconfDeviceInfo(username,
-                                                           password,
-                                                           IpAddress.valueOf(ip),
-                                                           port,
-                                                           path,
-                                                           sshkey);
-                    } else {
-                        log.error("Unknown device key for device {}", deviceId);
-                    }
-                } catch (NullPointerException e) {
-                    throw new NetconfException("No Device Key for device " + deviceId, e);
-                }
+                deviceInfo = createDeviceInfo(deviceId);
             }
-            NetconfDevice netconfDevicedevice = createDevice(deviceInfo);
-            netconfDevicedevice.getSession().addDeviceOutputListener(downListener);
-            return netconfDevicedevice;
+            NetconfDevice netconfDevice = createDevice(deviceInfo, isMaster);
+            if (isMaster) {
+                netconfDevice.getSession().addDeviceOutputListener(downListener);
+            }
+            return netconfDevice;
         } finally {
 
             mutex.unlock();
         }
     }
 
+    private NetconfDeviceInfo createDeviceInfo(DeviceId deviceId) throws NetconfException {
+            Device device = deviceService.getDevice(deviceId);
+            String ip, path = null;
+            int port;
+            if (device != null) {
+                ip = device.annotations().value("ipaddress");
+                port = Integer.parseInt(device.annotations().value("port"));
+            } else {
+                Triple<String, Integer, Optional<String>> info = extractIpPortPath(deviceId);
+                ip = info.getLeft();
+                port = info.getMiddle();
+                path = (info.getRight().isPresent() ? info.getRight().get() : null);
+            }
+            try {
+                DeviceKey deviceKey = deviceKeyService.getDeviceKey(
+                        DeviceKeyId.deviceKeyId(deviceId.toString()));
+                if (deviceKey.type() == DeviceKey.Type.USERNAME_PASSWORD) {
+                    UsernamePassword usernamepasswd = deviceKey.asUsernamePassword();
+
+                    return new NetconfDeviceInfo(usernamepasswd.username(),
+                                                       usernamepasswd.password(),
+                                                       IpAddress.valueOf(ip),
+                                                       port,
+                                                       path);
+
+                } else if (deviceKey.type() == DeviceKey.Type.SSL_KEY) {
+                    String username = deviceKey.annotations().value(AnnotationKeys.USERNAME);
+                    String password = deviceKey.annotations().value(AnnotationKeys.PASSWORD);
+                    String sshkey = deviceKey.annotations().value(AnnotationKeys.SSHKEY);
+
+                    return new NetconfDeviceInfo(username,
+                                                       password,
+                                                       IpAddress.valueOf(ip),
+                                                       port,
+                                                       path,
+                                                       sshkey);
+                } else {
+                    log.error("Unknown device key for device {}", deviceId);
+                    throw new NetconfException("Unknown device key for device " + deviceId);
+                }
+            } catch (NullPointerException e) {
+                log.error("No Device Key for device {}, {}", deviceId, e);
+                throw new NetconfException("No Device Key for device " + deviceId, e);
+        }
+    }
+
     @Override
     public void disconnectDevice(DeviceId deviceId, boolean remove) {
         if (!netconfDeviceMap.containsKey(deviceId)) {
@@ -364,7 +445,12 @@
     }
 
     private NetconfDevice createDevice(NetconfDeviceInfo deviceInfo) throws NetconfException {
-        NetconfDevice netconfDevice = deviceFactory.createNetconfDevice(deviceInfo);
+        return createDevice(deviceInfo, true);
+    }
+
+    private NetconfDevice createDevice(NetconfDeviceInfo deviceInfo,
+                                       boolean isMaster) throws NetconfException {
+        NetconfDevice netconfDevice = deviceFactory.createNetconfDevice(deviceInfo, isMaster);
         netconfDeviceMap.put(deviceInfo.getDeviceId(), netconfDevice);
         for (NetconfDeviceListener l : netconfDeviceListeners) {
             l.deviceAdded(deviceInfo.getDeviceId());
@@ -383,6 +469,102 @@
         return netconfDeviceMap.keySet();
     }
 
+    @Override
+    public <T> CompletableFuture<T> executeAtMaster(NetconfProxyMessage proxyMessage) throws NetconfException {
+        DeviceId deviceId = proxyMessage.deviceId();
+        if (deviceService.getRole(deviceId).equals(MastershipRole.MASTER)) {
+            return CompletableFuture.completedFuture(
+                    netconfProxyMessageHandler.handleIncomingMessage(proxyMessage));
+        } else {
+            MessageSubject subject;
+            switch (proxyMessage.subjectType()) {
+                case GET_DEVICE_CAPABILITIES_SET:
+                    subject = SEND_REQUEST_SUBJECT_SET_STRING;
+                    break;
+                default:
+                    subject = SEND_REQUEST_SUBJECT_STRING;
+                    break;
+            }
+
+            return clusterCommunicator
+                    .sendAndReceive(proxyMessage,
+                                    subject,
+                                    SERIALIZER::encode,
+                                    SERIALIZER::decode,
+                                    mastershipService.getMasterFor(deviceId));
+        }
+    }
+
+    private <T> CompletableFuture<T> handleProxyMessage(NetconfProxyMessage proxyMessage) {
+        try {
+            return CompletableFuture.completedFuture(
+                    netconfProxyMessageHandler.handleIncomingMessage(proxyMessage));
+        } catch (NetconfException e) {
+            CompletableFuture<T> errorFuture = new CompletableFuture<>();
+            errorFuture.completeExceptionally(e);
+            return errorFuture;
+        }
+    }
+
+    /**
+     * Netconf Proxy Message Handler Implementation class.
+     */
+    private class NetconfProxyMessageHandlerImpl implements NetconfProxyMessageHandler {
+
+        @Override
+        public <T> T handleIncomingMessage(NetconfProxyMessage proxyMessage) throws NetconfException {
+            //TODO: Should throw Netconf Exception in error cases?
+            DeviceId deviceId = proxyMessage.deviceId();
+            NetconfProxyMessage.SubjectType subjectType = proxyMessage.subjectType();
+            NetconfSession secureTransportSession;
+            if (netconfDeviceMap.get(deviceId).isMasterSession()) {
+                secureTransportSession = netconfDeviceMap.get(deviceId).getSession();
+            } else {
+                throw new NetconfException("Ssh session not present");
+            }
+            T reply = null;
+            ArrayList<String> arguments = Lists.newArrayList(proxyMessage.arguments());
+            try {
+                switch (subjectType) {
+                    case RPC:
+                        reply = (T) secureTransportSession.rpc(arguments.get(0))
+                                .get(netconfReplyTimeout, TimeUnit.SECONDS);
+                        break;
+                    case REQUEST_SYNC:
+                        reply = (T) secureTransportSession.requestSync(arguments.get(0));
+                        break;
+                    case START_SUBSCRIPTION:
+                        secureTransportSession.startSubscription(arguments.get(0));
+                        break;
+                    case END_SUBSCRIPTION:
+                        secureTransportSession.endSubscription();
+                        break;
+                    case REQUEST:
+                        reply = (T) secureTransportSession.request(arguments.get(0))
+                                .get(netconfReplyTimeout, TimeUnit.SECONDS);
+                        break;
+                    case GET_SESSION_ID:
+                        reply = (T) secureTransportSession.getSessionId();
+                        break;
+                    case SET_ONOS_CAPABILITIES:
+                        secureTransportSession.setOnosCapabilities(arguments);
+                        break;
+                    case GET_DEVICE_CAPABILITIES_SET:
+                        reply = (T) secureTransportSession.getDeviceCapabilitiesSet();
+                        break;
+                    default:
+                        log.error("Not yet supported for session method {}", subjectType);
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new NetconfException(e.getMessage(), e.getCause());
+            } catch (ExecutionException | TimeoutException e) {
+                throw new NetconfException(e.getMessage(), e.getCause());
+            }
+
+            return reply;
+        }
+    }
 
     /**
      * Device factory for the specific NetconfDeviceImpl.
@@ -393,9 +575,20 @@
     private class DefaultNetconfDeviceFactory implements NetconfDeviceFactory {
 
         @Override
-        public NetconfDevice createNetconfDevice(NetconfDeviceInfo netconfDeviceInfo)
+        public NetconfDevice createNetconfDevice(NetconfDeviceInfo netconfDeviceInfo) throws NetconfException {
+            return createNetconfDevice(netconfDeviceInfo, true);
+        }
+
+        @Beta
+        @Override
+        public NetconfDevice createNetconfDevice(NetconfDeviceInfo netconfDeviceInfo,
+                                                 boolean isMaster)
                 throws NetconfException {
-            return new DefaultNetconfDevice(netconfDeviceInfo);
+            if (isMaster) {
+                log.info("Creating NETCONF session to {} with {}",
+                         netconfDeviceInfo.getDeviceId(), NetconfSshClientLib.APACHE_MINA);
+            }
+            return new DefaultNetconfDevice(netconfDeviceInfo, isMaster, NetconfControllerImpl.this);
         }
     }
 
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
index 36c9ec1..2d082e0 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
@@ -41,6 +41,7 @@
 import org.onosproject.net.driver.Driver;
 import org.onosproject.net.driver.DriverService;
 import org.onosproject.netconf.AbstractNetconfSession;
+import org.onosproject.netconf.NetconfController;
 import org.onosproject.netconf.NetconfDeviceInfo;
 import org.onosproject.netconf.NetconfDeviceOutputEvent;
 import org.onosproject.netconf.NetconfDeviceOutputEvent.Type;
@@ -505,10 +506,15 @@
 
     @Override
     public String requestSync(String request) throws NetconfException {
-        String reply = sendRequest(request);
+        return requestSync(request, replyTimeout);
+    }
+
+    @Override
+    public String requestSync(String request, int timeout) throws NetconfException {
+        String reply = sendRequest(request, timeout);
         if (!checkReply(reply)) {
             throw new NetconfException("Request not successful with device "
-                    + deviceInfo + " with reply " + reply);
+                                               + deviceInfo + " with reply " + reply);
         }
         return reply;
     }
@@ -621,13 +627,23 @@
         return streamHandler.sendMessage(request, messageId);
     }
 
+    private String sendRequest(String request, boolean isHello) throws NetconfException {
+        return sendRequest(request, isHello, replyTimeout);
+    }
+
     private String sendRequest(String request) throws NetconfException {
         // FIXME probably chunk-encoding too early
         request = formatNetconfMessage(request);
-        return sendRequest(request, false);
+        return sendRequest(request, false, replyTimeout);
     }
 
-    private String sendRequest(String request, boolean isHello) throws NetconfException {
+    private String sendRequest(String request, int timeout) throws NetconfException {
+        // FIXME probably chunk-encoding too early
+        request = formatNetconfMessage(request);
+        return sendRequest(request, false, timeout);
+    }
+
+    private String sendRequest(String request, boolean isHello, int timeout) throws NetconfException {
         checkAndReestablish();
         int messageId = -1;
         if (!isHello) {
@@ -859,7 +875,8 @@
     public static class MinaSshNetconfSessionFactory implements NetconfSessionFactory {
 
         @Override
-        public NetconfSession createNetconfSession(NetconfDeviceInfo netconfDeviceInfo) throws NetconfException {
+        public NetconfSession createNetconfSession(NetconfDeviceInfo netconfDeviceInfo,
+                                                   NetconfController netconfController) throws NetconfException {
             return new NetconfSessionMinaImpl(netconfDeviceInfo);
         }
     }
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionProxyImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionProxyImpl.java
new file mode 100644
index 0000000..5734820
--- /dev/null
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionProxyImpl.java
@@ -0,0 +1,184 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.netconf.ctl.impl;
+
+import org.onosproject.netconf.AbstractNetconfSession;
+import org.onosproject.netconf.NetconfController;
+import org.onosproject.netconf.NetconfDeviceInfo;
+import org.onosproject.netconf.NetconfException;
+import org.onosproject.netconf.NetconfProxyMessage;
+import org.onosproject.netconf.NetconfSession;
+import org.onosproject.netconf.NetconfSessionFactory;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+public class NetconfSessionProxyImpl extends AbstractNetconfSession {
+    protected final NetconfDeviceInfo deviceInfo;
+    protected final NetconfController netconfController;
+    private static final Logger log = getLogger(NetconfSessionMinaImpl.class);
+
+    private static final int CONFIGURE_REPLY_TIMEOUT_SEC = 5;
+
+    public NetconfSessionProxyImpl(NetconfDeviceInfo deviceInfo,
+                                   NetconfController controller) {
+        this.deviceInfo = deviceInfo;
+        this.netconfController = controller;
+    }
+
+    private <T> CompletableFuture<T> executeAtMasterCompletableFuture(
+            NetconfProxyMessage proxyMessage)
+            throws NetconfException {
+        CompletableFuture<T> reply = netconfController.executeAtMaster(proxyMessage);
+        return reply;
+    }
+
+    private <T> T executeAtMaster(NetconfProxyMessage proxyMessage) throws NetconfException {
+        return executeAtMaster(proxyMessage, CONFIGURE_REPLY_TIMEOUT_SEC);
+    }
+
+    private <T> T executeAtMaster(NetconfProxyMessage proxyMessage, int timeout) throws NetconfException {
+        CompletableFuture<T> reply = executeAtMasterCompletableFuture(proxyMessage);
+        try {
+            return reply.get(timeout, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new NetconfException(e.getMessage(), e.getCause());
+        } catch (ExecutionException e) {
+            throw new NetconfException(e.getMessage(), e.getCause());
+        } catch (TimeoutException e) {
+            throw new NetconfException(e.getMessage(), e.getCause());
+        }
+
+    }
+
+    protected NetconfProxyMessage makeProxyMessage(NetconfProxyMessage.SubjectType subjectType, String request) {
+        return new DefaultNetconfProxyMessage(subjectType,
+                                              deviceInfo.getDeviceId(),
+                                              new ArrayList<>(Arrays.asList(request)));
+    }
+
+    @Override
+    public CompletableFuture<String> rpc(String request)
+            throws NetconfException {
+        NetconfProxyMessage proxyMessage = makeProxyMessage(NetconfProxyMessage.SubjectType.RPC,
+                                                            request);
+
+        return executeAtMasterCompletableFuture(proxyMessage);
+    }
+
+    @Override
+    public CompletableFuture<String> request(String request)
+            throws NetconfException {
+        NetconfProxyMessage proxyMessage = makeProxyMessage(NetconfProxyMessage.SubjectType.REQUEST,
+                                                            request);
+
+        return executeAtMasterCompletableFuture(proxyMessage);
+    }
+
+    @Override
+    public String requestSync(String request, int timeout)
+            throws NetconfException {
+        NetconfProxyMessage proxyMessage = makeProxyMessage(NetconfProxyMessage.SubjectType.REQUEST_SYNC,
+                                                            request);
+
+        return executeAtMaster(proxyMessage, timeout);
+    }
+
+    @Override
+    public String requestSync(String request)
+            throws NetconfException {
+
+        return requestSync(request, CONFIGURE_REPLY_TIMEOUT_SEC);
+    }
+
+    @Override
+    public void startSubscription(String filterSchema) throws NetconfException {
+        NetconfProxyMessage proxyMessage = makeProxyMessage(NetconfProxyMessage.SubjectType.START_SUBSCRIPTION,
+                                                            filterSchema);
+
+        executeAtMaster(proxyMessage);
+    }
+
+    @Override
+    public void endSubscription() throws NetconfException {
+        NetconfProxyMessage proxyMessage = makeProxyMessage(NetconfProxyMessage.SubjectType.END_SUBSCRIPTION,
+                                                            "");
+
+        executeAtMaster(proxyMessage);
+    }
+
+    @Override
+    public String getSessionId() {
+        NetconfProxyMessage proxyMessage = makeProxyMessage(NetconfProxyMessage.SubjectType.GET_SESSION_ID,
+                                                            "");
+        try {
+            return executeAtMaster(proxyMessage);
+        } catch (NetconfException e) {
+            log.error("Cannot get session id : {}", e);
+            return String.valueOf(-1);
+        }
+
+    }
+
+    @Override
+    public Set<String> getDeviceCapabilitiesSet() {
+        NetconfProxyMessage proxyMessage = makeProxyMessage(NetconfProxyMessage.SubjectType.GET_DEVICE_CAPABILITIES_SET,
+                                                            "");
+        try {
+            return executeAtMaster(proxyMessage);
+        } catch (NetconfException e) {
+            log.error("Could not get device capabilities : {}", e);
+            return null;
+        }
+    }
+
+
+    @Override
+    public void setOnosCapabilities(Iterable<String> capabilities) {
+        ArrayList<String> capabilitiesList = new ArrayList<>();
+        capabilities.spliterator().forEachRemaining(c -> capabilitiesList.add(c));
+
+        NetconfProxyMessage proxyMessage =
+                new DefaultNetconfProxyMessage(
+                        NetconfProxyMessage.SubjectType.SET_ONOS_CAPABILITIES,
+                        deviceInfo.getDeviceId(), capabilitiesList);
+        try {
+            executeAtMaster(proxyMessage);
+        } catch (NetconfException e) {
+            log.error("Could not set onos capabilities : {}", e);
+        }
+    }
+
+
+    public static class ProxyNetconfSessionFactory implements NetconfSessionFactory {
+
+        @Override
+        public NetconfSession createNetconfSession(NetconfDeviceInfo netconfDeviceInfo,
+                                                   NetconfController netconfController) {
+            return new NetconfSessionProxyImpl(netconfDeviceInfo, netconfController);
+        }
+    }
+}