[ONOS-7566] Implementation of NetconfProxySession

Change-Id: I01cbe0b10ac36cb6db53127555b551f405acdeb1
diff --git a/protocols/netconf/api/src/main/java/org/onosproject/netconf/AbstractNetconfSession.java b/protocols/netconf/api/src/main/java/org/onosproject/netconf/AbstractNetconfSession.java
index 2254572..7843f7d 100644
--- a/protocols/netconf/api/src/main/java/org/onosproject/netconf/AbstractNetconfSession.java
+++ b/protocols/netconf/api/src/main/java/org/onosproject/netconf/AbstractNetconfSession.java
@@ -80,6 +80,21 @@
     @Override
     public abstract CompletableFuture<String> rpc(String request) throws NetconfException;
 
+    protected CompletableFuture<CharSequence> executeRpc(String rpcString) throws NetconfException {
+        return rpc(rpcString)
+                .thenApply(msg -> {
+                    // crude way of removing rpc-reply envelope
+                    int begin = msg.indexOf("<data>");
+                    int end = msg.lastIndexOf("</data>");
+                    if (begin != -1 && end != -1) {
+                        return msg.subSequence(begin, end + "</data>".length());
+                    } else {
+                        // FIXME probably should exceptionally fail here.
+                        return msg;
+                    }
+                });
+    }
+
     @Override
     public CompletableFuture<CharSequence> asyncGetConfig(DatastoreId datastore) throws NetconfException {
         StringBuilder rpc = new StringBuilder();
@@ -102,7 +117,6 @@
     @Override
     public CompletableFuture<CharSequence> asyncGet() throws NetconfException {
         StringBuilder rpc = new StringBuilder();
-
         rpc.append(RPC_OPEN);
         rpc.append(NETCONF_BASE_NAMESPACE).append(">\n");
         rpc.append(GET_OPEN).append(NEW_LINE);
@@ -113,22 +127,6 @@
         return executeRpc(rpc.toString());
     }
 
-    protected CompletableFuture<CharSequence> executeRpc(String rpcString) throws NetconfException {
-        return rpc(rpcString)
-                .thenApply(msg -> {
-                    // crude way of removing rpc-reply envelope
-                    int begin = msg.indexOf("<data>");
-                    int end = msg.lastIndexOf("</data>");
-                    if (begin != -1 && end != -1) {
-                        return msg.subSequence(begin, end + "</data>".length());
-                    } else {
-                        // FIXME probably should exceptionally fail here.
-                        return msg;
-                    }
-                });
-
-    }
-
     @Override
     public String get(String request) throws NetconfException {
         return requestSync(request);
@@ -362,10 +360,14 @@
     public abstract Set<String> getDeviceCapabilitiesSet();
 
     @Override
-    public abstract void addDeviceOutputListener(NetconfDeviceOutputEventListener listener);
+    public void addDeviceOutputListener(NetconfDeviceOutputEventListener listener) throws NetconfException {
+        throw new NetconfException("Only master session can call addDeviceOutputListener");
+    }
 
     @Override
-    public abstract void removeDeviceOutputListener(NetconfDeviceOutputEventListener listener);
+    public void removeDeviceOutputListener(NetconfDeviceOutputEventListener listener) throws NetconfException {
+        throw new NetconfException("Only master session can call removeDeviceOutputListener");
+    }
 
     /**
      * Checks errors in reply from the session.
diff --git a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfController.java b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfController.java
index 041c379..45d81d2 100644
--- a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfController.java
+++ b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfController.java
@@ -21,6 +21,7 @@
 
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * Abstraction of an NETCONF controller. Serves as a one stop shop for obtaining
@@ -53,6 +54,20 @@
     NetconfDevice connectDevice(DeviceId deviceId) throws NetconfException;
 
     /**
+     * Tries to connect to a specific NETCONF device, if the connection is succesful
+     * it creates and adds the device to the ONOS core as a NetconfDevice.
+     * If isMaster true: Will create two sessions for a device : secure transport session and proxy session.
+     * If isMaster false: Will create only proxy session.
+     * @param deviceId deviceId of the device to connect
+     * @param isMaster if the controller is master for the device
+     * @return NetconfDevice Netconf device
+     * @throws NetconfException when device is not available
+     */
+    default NetconfDevice connectDevice(DeviceId deviceId, boolean isMaster) throws NetconfException {
+        return connectDevice(deviceId);
+    }
+
+    /**
      * Disconnects a Netconf device and removes it from the core.
      *
      * @param deviceId id of the device to remove
@@ -107,4 +122,20 @@
      * @return NetconfDevice Netconf device
      */
     NetconfDevice getNetconfDevice(IpAddress ip, int port, String path);
+
+    /**
+     * If master, will execute the call locally else will use
+     * clusterCommunicationManager to execute at master controller.
+     * Meant only for internal synchronization and not to be used by applications.
+     *
+     * @param proxyMessage proxy message
+     * @param <T> return type
+     * @return Completable future of class T
+     * @throws NetconfException netconf exception
+     */
+    default <T> CompletableFuture<T> executeAtMaster(NetconfProxyMessage proxyMessage) throws NetconfException {
+        CompletableFuture<T> errorFuture = new CompletableFuture<>();
+        errorFuture.completeExceptionally(new NetconfException("Method executeAtMaster not implemented"));
+        return errorFuture;
+    }
 }
diff --git a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDevice.java b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDevice.java
index 639f0a8..8c756e5 100644
--- a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDevice.java
+++ b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDevice.java
@@ -24,13 +24,22 @@
 
     /**
      * Returns whether a device is a NETCONF device with a capabilities list
-     * and is accessible.
+     * and is accessible, through a secure transport session or a proxy session.
      *
      * @return true if device is accessible, false otherwise
      */
     boolean isActive();
 
     /**
+     * Returns whether the device has secure transport session.
+     *
+     * @return true if secure transport session exists, false otherwise
+     */
+    default boolean isMasterSession() {
+        return isActive();
+    }
+
+    /**
      * Returns a NETCONF session context for this device.
      *
      * @return netconf session
diff --git a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDeviceFactory.java b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDeviceFactory.java
index 312c7c3..53e5db7 100644
--- a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDeviceFactory.java
+++ b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDeviceFactory.java
@@ -16,6 +16,8 @@
 
 package org.onosproject.netconf;
 
+import com.google.common.annotations.Beta;
+
 /**
  * Abstract interface for the creation of a NETCONF device.
  */
@@ -31,4 +33,19 @@
      */
     NetconfDevice createNetconfDevice(NetconfDeviceInfo netconfDeviceInfo)
             throws NetconfException;
+
+    /**
+     * Creates a new NETCONF device based on the supplied information.
+     * @param netconfDeviceInfo information of the device to create.
+     * @param isMaster if true create secure transport session with the device,
+     *                 else just create a proxy session
+     * @return Instance of NetconfDevice.
+     * @throws NetconfException when problems arise creating the device and establishing
+     * the connection.
+     */
+    @Beta
+    default NetconfDevice createNetconfDevice(NetconfDeviceInfo netconfDeviceInfo,
+                                              boolean isMaster) throws NetconfException {
+        return createNetconfDevice(netconfDeviceInfo);
+    }
 }
diff --git a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfProxyMessage.java b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfProxyMessage.java
new file mode 100644
index 0000000..8ebfbe3
--- /dev/null
+++ b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfProxyMessage.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.netconf;
+
+import org.onosproject.net.DeviceId;
+
+import java.util.List;
+
+/**
+ * Interface representing a NETCONF proxy message.
+ */
+public interface NetconfProxyMessage {
+
+    enum SubjectType {
+        RPC,
+        // FIXME in the final form there should only be (async) RPC
+        // and REQUEST, REQUEST_SYNC should go away
+        // once NetconfSession methods got cleaned up.
+        REQUEST,
+        REQUEST_SYNC,
+        START_SUBSCRIPTION,
+        END_SUBSCRIPTION,
+        GET_SESSION_ID,
+        GET_DEVICE_CAPABILITIES_SET,
+        SET_ONOS_CAPABILITIES
+    }
+
+    /**
+     * Returns the subject of the message.
+     *
+     * @return subject in enum subject type
+     */
+    NetconfProxyMessage.SubjectType subjectType();
+
+    /**
+     * Returns the device id of the device to which the message is intended.
+     * @return device id
+     */
+    DeviceId deviceId();
+
+    /**
+     * Returns the arguments of the intended method call in order.
+     * @return arguments
+     */
+    List<String> arguments();
+}
diff --git a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfProxyMessageHandler.java b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfProxyMessageHandler.java
new file mode 100644
index 0000000..a1d4d74
--- /dev/null
+++ b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfProxyMessageHandler.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netconf;
+
+/**
+ * Abstract interface for the implementation of proxy message handler.
+ */
+public interface NetconfProxyMessageHandler {
+    /**
+     * Will decode the message on case basis and
+     * call the actual method in Netconf Session implementation bound to secure transport.
+     * @param proxyMessage incoming proxy message
+     * @param <T> return type
+     * @return the value returned by session call
+     * @throws NetconfException netconf exception
+     */
+    <T> T handleIncomingMessage(NetconfProxyMessage proxyMessage) throws NetconfException;
+}
\ No newline at end of file
diff --git a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSession.java b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSession.java
index bc04118..d0c80de 100644
--- a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSession.java
+++ b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSession.java
@@ -122,6 +122,19 @@
     String requestSync(String request) throws NetconfException;
 
     /**
+     * Executes an synchronous RPC to the server with specific reply TIMEOUT.
+     *
+     * @param request the XML containing the RPC for the server.
+     * @param timeout the reply timeout.
+     * @return Server response or ERROR
+     * @throws NetconfException when there is a problem in the communication process on
+     * the underlying connection
+     */
+    default String requestSync(String request, int timeout) throws NetconfException {
+        return "";
+    }
+
+    /**
      * Retrieves the specified configuration.
      *
      * @param netconfTargetConfig the type of configuration to retrieve.
@@ -341,15 +354,17 @@
      * Add a listener to the underlying stream handler implementation.
      *
      * @param listener event listener.
+     * @throws NetconfException when this method will be called by STANDBY or NONE node.
      */
-    void addDeviceOutputListener(NetconfDeviceOutputEventListener listener);
+    void addDeviceOutputListener(NetconfDeviceOutputEventListener listener) throws NetconfException;
 
     /**
      * Remove a listener from the underlying stream handler implementation.
      *
      * @param listener event listener.
+     * @throws NetconfException when this method will be called by STANDBY or NONE node.
      */
-    void removeDeviceOutputListener(NetconfDeviceOutputEventListener listener);
+    void removeDeviceOutputListener(NetconfDeviceOutputEventListener listener) throws NetconfException;
 
     /**
      * Read the connect timeout that this session was created with.
diff --git a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSessionFactory.java b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSessionFactory.java
index 806eccb..95f32a4 100644
--- a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSessionFactory.java
+++ b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSessionFactory.java
@@ -25,9 +25,11 @@
     /**
      * Creates a new NETCONF session for the specified device.
      * @param netconfDeviceInfo information of the device to create the session for.
+     * @param netconfController netconf controller object
      * @return Instance of NetconfSession.
      * @throws NetconfException when problems arise establishing the connection.
      */
-    NetconfSession createNetconfSession(NetconfDeviceInfo netconfDeviceInfo)
+    NetconfSession createNetconfSession(NetconfDeviceInfo netconfDeviceInfo,
+                                        NetconfController netconfController)
             throws NetconfException;
 }
diff --git a/protocols/netconf/ctl/BUILD b/protocols/netconf/ctl/BUILD
index 0e9a912..7a2b9b6 100644
--- a/protocols/netconf/ctl/BUILD
+++ b/protocols/netconf/ctl/BUILD
@@ -1,8 +1,10 @@
-COMPILE_DEPS = CORE_DEPS + JACKSON + CLI + [
+COMPILE_DEPS = CORE_DEPS + JACKSON + CLI + KRYO + [
     "@ganymed_ssh2//jar",
     "@sshd_core//jar",
     "@bcpkix_jdk15on//jar",
     "@bcprov_jdk15on//jar",
+    "@org_osgi_service_cm//jar",
+    "//core/store/serializers:onos-core-serializers",
     "//protocols/netconf/api:onos-protocols-netconf-api",
 ]
 
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/DefaultNetconfDevice.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/DefaultNetconfDevice.java
index c952027..dfbe760 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/DefaultNetconfDevice.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/DefaultNetconfDevice.java
@@ -16,6 +16,7 @@
 
 package org.onosproject.netconf.ctl.impl;
 
+import org.onosproject.netconf.NetconfController;
 import org.onosproject.netconf.NetconfDevice;
 import org.onosproject.netconf.NetconfDeviceInfo;
 import org.onosproject.netconf.NetconfException;
@@ -34,24 +35,37 @@
 
     private NetconfDeviceInfo netconfDeviceInfo;
     private boolean deviceState = true;
-    private final NetconfSessionFactory sessionFactory;
     private NetconfSession netconfSession;
+    private boolean isMasterSession = false;
+    private NetconfSession netconfProxySession;
 
     // will block until hello RPC handshake completes
     /**
      * Creates a new default NETCONF device with the information provided.
      * The device gets created only if no exception is thrown while connecting to
      * it and establishing the NETCONF session.
+     * The secure transport session will only be created if isMaster is true.
      * @param deviceInfo information about the device to be created.
+     * @param isMaster if true create secure transport session, otherwise create proxy session.
+     * @param netconfController netconf controller object
      * @throws NetconfException if there are problems in creating or establishing
      * the underlying NETCONF connection and session.
      */
-    public DefaultNetconfDevice(NetconfDeviceInfo deviceInfo)
+    public DefaultNetconfDevice(NetconfDeviceInfo deviceInfo,
+                                boolean isMaster,
+                                NetconfController netconfController)
             throws NetconfException {
         netconfDeviceInfo = deviceInfo;
-        sessionFactory = (ncDevInfo) -> new NetconfSessionMinaImpl(ncDevInfo);
         try {
-            netconfSession = sessionFactory.createNetconfSession(deviceInfo);
+            if (isMaster) {
+                netconfSession = new NetconfSessionMinaImpl(deviceInfo);
+                isMasterSession = true;
+                netconfProxySession = netconfSession;
+            } else {
+                netconfProxySession = new NetconfSessionProxyImpl
+                        .ProxyNetconfSessionFactory()
+                        .createNetconfSession(deviceInfo, netconfController);
+            }
         } catch (NetconfException e) {
             deviceState = false;
             throw new NetconfException("Cannot create connection and session for device " +
@@ -64,18 +78,30 @@
      * Creates a new default NETCONF device with the information provided.
      * The device gets created only if no exception is thrown while connecting to
      * it and establishing the NETCONF session.
-     *
+     * The secure transport session will only be created if isMaster is true.
      * @param deviceInfo information about the device to be created.
      * @param factory the factory used to create the session
+     * @param isMaster if true create secure transport session, otherwise create proxy session.
+     * @param netconfController netconf controller object
      * @throws NetconfException if there are problems in creating or establishing
      * the underlying NETCONF connection and session.
      */
-    public DefaultNetconfDevice(NetconfDeviceInfo deviceInfo, NetconfSessionFactory factory)
+    public DefaultNetconfDevice(NetconfDeviceInfo deviceInfo,
+                                NetconfSessionFactory factory,
+                                boolean isMaster,
+                                NetconfController netconfController)
             throws NetconfException {
         netconfDeviceInfo = deviceInfo;
-        sessionFactory = factory;
         try {
-            netconfSession = sessionFactory.createNetconfSession(deviceInfo);
+            if (isMaster) {
+                netconfSession = factory.createNetconfSession(deviceInfo, netconfController);
+                isMasterSession = true;
+                netconfProxySession = netconfSession;
+            } else {
+                netconfProxySession = new NetconfSessionProxyImpl
+                        .ProxyNetconfSessionFactory()
+                        .createNetconfSession(deviceInfo, netconfController);
+            }
         } catch (NetconfException e) {
             deviceState = false;
             throw new NetconfException("Cannot create connection and session for device " +
@@ -90,23 +116,29 @@
 
     @Override
     public NetconfSession getSession() {
-        return netconfSession;
+        return netconfProxySession;
     }
 
     @Override
     public void disconnect() {
         deviceState = false;
         try {
-            netconfSession.close();
+            if (isMasterSession) {
+                netconfSession.close();
+            }
+            netconfProxySession.close();
         } catch (NetconfException e) {
             log.warn("Cannot communicate with the device {} session already closed", netconfDeviceInfo);
         }
     }
 
     @Override
+    public boolean isMasterSession() {
+        return isMasterSession;
+    }
+
+    @Override
     public NetconfDeviceInfo getDeviceInfo() {
         return netconfDeviceInfo;
     }
-
-
 }
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/DefaultNetconfProxyMessage.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/DefaultNetconfProxyMessage.java
new file mode 100644
index 0000000..1a57d4d
--- /dev/null
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/DefaultNetconfProxyMessage.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.netconf.ctl.impl;
+
+import com.google.common.collect.ImmutableList;
+import org.onosproject.net.DeviceId;
+import org.onosproject.netconf.NetconfProxyMessage;
+
+import java.util.List;
+
+/**
+ * Default implementation of Netconf Proxy Message.
+ */
+public class DefaultNetconfProxyMessage implements NetconfProxyMessage {
+
+    private final SubjectType subjectType;
+    private final DeviceId deviceId;
+    private final List<String> arguments;
+
+    /**
+     * Create new NetconfProxyMessage with provided informations.
+     * @param subType Message subject type.
+     * @param devId Device information that recieve message.
+     * @param args Messages arguments.
+     */
+    public DefaultNetconfProxyMessage(SubjectType subType,
+                                      DeviceId devId,
+                                      List<String> args) {
+        subjectType = subType;
+        deviceId = devId;
+        arguments = args;
+    }
+
+    @Override
+    public SubjectType subjectType() {
+        return subjectType;
+    }
+
+    @Override
+    public DeviceId deviceId() {
+        return deviceId;
+    }
+
+    @Override
+    public List<String> arguments() {
+        return ImmutableList.copyOf(arguments);
+    }
+}
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java
index ece7ca2..b793c13 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java
@@ -16,15 +16,28 @@
 
 package org.onosproject.netconf.ctl.impl;
 
+
 import org.apache.commons.lang3.tuple.Triple;
 
+import com.google.common.annotations.Beta;
+import com.google.common.collect.Lists;
+
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+
 import org.bouncycastle.jce.provider.BouncyCastleProvider;
 import org.onlab.packet.IpAddress;
+import org.onlab.util.KryoNamespace;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.AnnotationKeys;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.MastershipRole;
 import org.onosproject.net.config.NetworkConfigRegistry;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.key.DeviceKey;
@@ -39,29 +52,37 @@
 import org.onosproject.netconf.NetconfDeviceOutputEvent;
 import org.onosproject.netconf.NetconfDeviceOutputEventListener;
 import org.onosproject.netconf.NetconfException;
+import org.onosproject.netconf.NetconfProxyMessage;
+import org.onosproject.netconf.NetconfProxyMessageHandler;
+import org.onosproject.netconf.NetconfSession;
 import org.onosproject.netconf.config.NetconfDeviceConfig;
 import org.onosproject.netconf.config.NetconfSshClientLib;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.Serializer;
 import org.osgi.service.component.ComponentContext;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Modified;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.component.annotations.ReferenceCardinality;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.security.Security;
+
+import java.util.ArrayList;
 import java.util.Dictionary;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.onlab.util.Tools.get;
 import static org.onlab.util.Tools.getIntegerProperty;
@@ -95,6 +116,16 @@
 
     protected NetconfSshClientLib sshClientLib = NetconfSshClientLib.APACHE_MINA;
 
+
+    private static final String APACHE_MINA_STR = "apache-mina";
+
+
+    private static final MessageSubject SEND_REQUEST_SUBJECT_STRING =
+            new MessageSubject("netconf-session-master-send-message-string");
+
+    private static final MessageSubject SEND_REQUEST_SUBJECT_SET_STRING =
+            new MessageSubject("netconf-session-master-send-message-set-string");
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected ComponentConfigService cfgService;
 
@@ -110,6 +141,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected MastershipService mastershipService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterCommunicationService clusterCommunicator;
+
     public static final Logger log = LoggerFactory
             .getLogger(NetconfControllerImpl.class);
 
@@ -120,26 +154,56 @@
     private final NetconfDeviceOutputEventListener downListener = new DeviceDownEventListener();
 
     protected Set<NetconfDeviceListener> netconfDeviceListeners = new CopyOnWriteArraySet<>();
-    protected NetconfDeviceFactory deviceFactory = DefaultNetconfDevice::new;
+    protected NetconfDeviceFactory deviceFactory = new DefaultNetconfDeviceFactory();
+
+    protected NetconfProxyMessageHandler netconfProxyMessageHandler = new NetconfProxyMessageHandlerImpl();
+
 
     protected final ExecutorService executor =
             Executors.newCachedThreadPool(groupedThreads("onos/netconfdevicecontroller",
                                                          "connection-reopen-%d", log));
 
+    public static final Serializer SERIALIZER = Serializer.using(
+            KryoNamespace.newBuilder()
+                    .register(KryoNamespaces.API)
+                    .register(NetconfProxyMessage.class)
+                    .register(NetconfProxyMessage.SubjectType.class)
+                    .register(DefaultNetconfProxyMessage.class)
+                    .register(String.class)
+                    .build("NetconfProxySession"));
+
     @Activate
     public void activate(ComponentContext context) {
         cfgService.registerProperties(getClass());
         modified(context);
         Security.addProvider(new BouncyCastleProvider());
+        clusterCommunicator.<NetconfProxyMessage, String>addSubscriber(
+                SEND_REQUEST_SUBJECT_STRING,
+                SERIALIZER::decode,
+                this::handleProxyMessage,
+                SERIALIZER::encode);
+        clusterCommunicator.<NetconfProxyMessage, Set<String>>addSubscriber(
+                SEND_REQUEST_SUBJECT_SET_STRING,
+                SERIALIZER::decode,
+                this::handleProxyMessage,
+                SERIALIZER::encode);
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
         netconfDeviceMap.values().forEach(device -> {
-            device.getSession().removeDeviceOutputListener(downListener);
+            if (device.isMasterSession()) {
+                try {
+                    device.getSession().removeDeviceOutputListener(downListener);
+                } catch (NetconfException e) {
+                    log.error("removeDeviceOutputListener Failed {}", e.getMessage());
+                }
+            }
             device.disconnect();
         });
+        clusterCommunicator.removeSubscriber(SEND_REQUEST_SUBJECT_STRING);
+        clusterCommunicator.removeSubscriber(SEND_REQUEST_SUBJECT_SET_STRING);
         cfgService.unregisterProperties(getClass(), false);
         netconfDeviceListeners.clear();
         netconfDeviceMap.clear();
@@ -228,6 +292,11 @@
 
     @Override
     public NetconfDevice connectDevice(DeviceId deviceId) throws NetconfException {
+        return connectDevice(deviceId, true);
+    }
+
+    @Override
+    public NetconfDevice connectDevice(DeviceId deviceId, boolean isMaster) throws NetconfException {
         NetconfDeviceConfig netCfg  = netCfgService.getConfig(
                 deviceId, NetconfDeviceConfig.class);
         NetconfDeviceInfo deviceInfo = null;
@@ -256,65 +325,77 @@
         mutex.lock();
         try {
             if (netconfDeviceMap.containsKey(deviceId)) {
-                log.debug("Device {} is already present", deviceId);
-                return netconfDeviceMap.get(deviceId);
-            } else if (netCfg != null) {
+                //If not master or already has session: return, otherwise create device again.
+                if (!isMaster || netconfDeviceMap.get(deviceId).isMasterSession()) {
+                    log.debug("Device {} is already present", deviceId);
+                    return netconfDeviceMap.get(deviceId);
+                }
+            }
+
+            if (netCfg != null) {
                 log.debug("Device {} is present in NetworkConfig", deviceId);
                 deviceInfo = new NetconfDeviceInfo(netCfg);
             } else {
                 log.debug("Creating NETCONF device {}", deviceId);
-                Device device = deviceService.getDevice(deviceId);
-                String ip, path = null;
-                int port;
-                if (device != null) {
-                    ip = device.annotations().value("ipaddress");
-                    port = Integer.parseInt(device.annotations().value("port"));
-                    path = device.annotations().value("path");
-                } else {
-                    Triple<String, Integer, Optional<String>> info = extractIpPortPath(deviceId);
-                    ip = info.getLeft();
-                    port = info.getMiddle();
-                    path = (info.getRight().isPresent() ? info.getRight().get() : null);
-                }
-                try {
-                    DeviceKey deviceKey = deviceKeyService.getDeviceKey(
-                            DeviceKeyId.deviceKeyId(deviceId.toString()));
-                    if (deviceKey.type() == DeviceKey.Type.USERNAME_PASSWORD) {
-                        UsernamePassword usernamepasswd = deviceKey.asUsernamePassword();
-
-                        deviceInfo = new NetconfDeviceInfo(usernamepasswd.username(),
-                                                           usernamepasswd.password(),
-                                                           IpAddress.valueOf(ip),
-                                                           port,
-                                                           path);
-
-                    } else if (deviceKey.type() == DeviceKey.Type.SSL_KEY) {
-                        String username = deviceKey.annotations().value(AnnotationKeys.USERNAME);
-                        String password = deviceKey.annotations().value(AnnotationKeys.PASSWORD);
-                        String sshkey = deviceKey.annotations().value(AnnotationKeys.SSHKEY);
-
-                        deviceInfo = new NetconfDeviceInfo(username,
-                                                           password,
-                                                           IpAddress.valueOf(ip),
-                                                           port,
-                                                           path,
-                                                           sshkey);
-                    } else {
-                        log.error("Unknown device key for device {}", deviceId);
-                    }
-                } catch (NullPointerException e) {
-                    throw new NetconfException("No Device Key for device " + deviceId, e);
-                }
+                deviceInfo = createDeviceInfo(deviceId);
             }
-            NetconfDevice netconfDevicedevice = createDevice(deviceInfo);
-            netconfDevicedevice.getSession().addDeviceOutputListener(downListener);
-            return netconfDevicedevice;
+            NetconfDevice netconfDevice = createDevice(deviceInfo, isMaster);
+            if (isMaster) {
+                netconfDevice.getSession().addDeviceOutputListener(downListener);
+            }
+            return netconfDevice;
         } finally {
 
             mutex.unlock();
         }
     }
 
+    private NetconfDeviceInfo createDeviceInfo(DeviceId deviceId) throws NetconfException {
+            Device device = deviceService.getDevice(deviceId);
+            String ip, path = null;
+            int port;
+            if (device != null) {
+                ip = device.annotations().value("ipaddress");
+                port = Integer.parseInt(device.annotations().value("port"));
+            } else {
+                Triple<String, Integer, Optional<String>> info = extractIpPortPath(deviceId);
+                ip = info.getLeft();
+                port = info.getMiddle();
+                path = (info.getRight().isPresent() ? info.getRight().get() : null);
+            }
+            try {
+                DeviceKey deviceKey = deviceKeyService.getDeviceKey(
+                        DeviceKeyId.deviceKeyId(deviceId.toString()));
+                if (deviceKey.type() == DeviceKey.Type.USERNAME_PASSWORD) {
+                    UsernamePassword usernamepasswd = deviceKey.asUsernamePassword();
+
+                    return new NetconfDeviceInfo(usernamepasswd.username(),
+                                                       usernamepasswd.password(),
+                                                       IpAddress.valueOf(ip),
+                                                       port,
+                                                       path);
+
+                } else if (deviceKey.type() == DeviceKey.Type.SSL_KEY) {
+                    String username = deviceKey.annotations().value(AnnotationKeys.USERNAME);
+                    String password = deviceKey.annotations().value(AnnotationKeys.PASSWORD);
+                    String sshkey = deviceKey.annotations().value(AnnotationKeys.SSHKEY);
+
+                    return new NetconfDeviceInfo(username,
+                                                       password,
+                                                       IpAddress.valueOf(ip),
+                                                       port,
+                                                       path,
+                                                       sshkey);
+                } else {
+                    log.error("Unknown device key for device {}", deviceId);
+                    throw new NetconfException("Unknown device key for device " + deviceId);
+                }
+            } catch (NullPointerException e) {
+                log.error("No Device Key for device {}, {}", deviceId, e);
+                throw new NetconfException("No Device Key for device " + deviceId, e);
+        }
+    }
+
     @Override
     public void disconnectDevice(DeviceId deviceId, boolean remove) {
         if (!netconfDeviceMap.containsKey(deviceId)) {
@@ -364,7 +445,12 @@
     }
 
     private NetconfDevice createDevice(NetconfDeviceInfo deviceInfo) throws NetconfException {
-        NetconfDevice netconfDevice = deviceFactory.createNetconfDevice(deviceInfo);
+        return createDevice(deviceInfo, true);
+    }
+
+    private NetconfDevice createDevice(NetconfDeviceInfo deviceInfo,
+                                       boolean isMaster) throws NetconfException {
+        NetconfDevice netconfDevice = deviceFactory.createNetconfDevice(deviceInfo, isMaster);
         netconfDeviceMap.put(deviceInfo.getDeviceId(), netconfDevice);
         for (NetconfDeviceListener l : netconfDeviceListeners) {
             l.deviceAdded(deviceInfo.getDeviceId());
@@ -383,6 +469,102 @@
         return netconfDeviceMap.keySet();
     }
 
+    @Override
+    public <T> CompletableFuture<T> executeAtMaster(NetconfProxyMessage proxyMessage) throws NetconfException {
+        DeviceId deviceId = proxyMessage.deviceId();
+        if (deviceService.getRole(deviceId).equals(MastershipRole.MASTER)) {
+            return CompletableFuture.completedFuture(
+                    netconfProxyMessageHandler.handleIncomingMessage(proxyMessage));
+        } else {
+            MessageSubject subject;
+            switch (proxyMessage.subjectType()) {
+                case GET_DEVICE_CAPABILITIES_SET:
+                    subject = SEND_REQUEST_SUBJECT_SET_STRING;
+                    break;
+                default:
+                    subject = SEND_REQUEST_SUBJECT_STRING;
+                    break;
+            }
+
+            return clusterCommunicator
+                    .sendAndReceive(proxyMessage,
+                                    subject,
+                                    SERIALIZER::encode,
+                                    SERIALIZER::decode,
+                                    mastershipService.getMasterFor(deviceId));
+        }
+    }
+
+    private <T> CompletableFuture<T> handleProxyMessage(NetconfProxyMessage proxyMessage) {
+        try {
+            return CompletableFuture.completedFuture(
+                    netconfProxyMessageHandler.handleIncomingMessage(proxyMessage));
+        } catch (NetconfException e) {
+            CompletableFuture<T> errorFuture = new CompletableFuture<>();
+            errorFuture.completeExceptionally(e);
+            return errorFuture;
+        }
+    }
+
+    /**
+     * Netconf Proxy Message Handler Implementation class.
+     */
+    private class NetconfProxyMessageHandlerImpl implements NetconfProxyMessageHandler {
+
+        @Override
+        public <T> T handleIncomingMessage(NetconfProxyMessage proxyMessage) throws NetconfException {
+            //TODO: Should throw Netconf Exception in error cases?
+            DeviceId deviceId = proxyMessage.deviceId();
+            NetconfProxyMessage.SubjectType subjectType = proxyMessage.subjectType();
+            NetconfSession secureTransportSession;
+            if (netconfDeviceMap.get(deviceId).isMasterSession()) {
+                secureTransportSession = netconfDeviceMap.get(deviceId).getSession();
+            } else {
+                throw new NetconfException("Ssh session not present");
+            }
+            T reply = null;
+            ArrayList<String> arguments = Lists.newArrayList(proxyMessage.arguments());
+            try {
+                switch (subjectType) {
+                    case RPC:
+                        reply = (T) secureTransportSession.rpc(arguments.get(0))
+                                .get(netconfReplyTimeout, TimeUnit.SECONDS);
+                        break;
+                    case REQUEST_SYNC:
+                        reply = (T) secureTransportSession.requestSync(arguments.get(0));
+                        break;
+                    case START_SUBSCRIPTION:
+                        secureTransportSession.startSubscription(arguments.get(0));
+                        break;
+                    case END_SUBSCRIPTION:
+                        secureTransportSession.endSubscription();
+                        break;
+                    case REQUEST:
+                        reply = (T) secureTransportSession.request(arguments.get(0))
+                                .get(netconfReplyTimeout, TimeUnit.SECONDS);
+                        break;
+                    case GET_SESSION_ID:
+                        reply = (T) secureTransportSession.getSessionId();
+                        break;
+                    case SET_ONOS_CAPABILITIES:
+                        secureTransportSession.setOnosCapabilities(arguments);
+                        break;
+                    case GET_DEVICE_CAPABILITIES_SET:
+                        reply = (T) secureTransportSession.getDeviceCapabilitiesSet();
+                        break;
+                    default:
+                        log.error("Not yet supported for session method {}", subjectType);
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new NetconfException(e.getMessage(), e.getCause());
+            } catch (ExecutionException | TimeoutException e) {
+                throw new NetconfException(e.getMessage(), e.getCause());
+            }
+
+            return reply;
+        }
+    }
 
     /**
      * Device factory for the specific NetconfDeviceImpl.
@@ -393,9 +575,20 @@
     private class DefaultNetconfDeviceFactory implements NetconfDeviceFactory {
 
         @Override
-        public NetconfDevice createNetconfDevice(NetconfDeviceInfo netconfDeviceInfo)
+        public NetconfDevice createNetconfDevice(NetconfDeviceInfo netconfDeviceInfo) throws NetconfException {
+            return createNetconfDevice(netconfDeviceInfo, true);
+        }
+
+        @Beta
+        @Override
+        public NetconfDevice createNetconfDevice(NetconfDeviceInfo netconfDeviceInfo,
+                                                 boolean isMaster)
                 throws NetconfException {
-            return new DefaultNetconfDevice(netconfDeviceInfo);
+            if (isMaster) {
+                log.info("Creating NETCONF session to {} with {}",
+                         netconfDeviceInfo.getDeviceId(), NetconfSshClientLib.APACHE_MINA);
+            }
+            return new DefaultNetconfDevice(netconfDeviceInfo, isMaster, NetconfControllerImpl.this);
         }
     }
 
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
index 36c9ec1..2d082e0 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
@@ -41,6 +41,7 @@
 import org.onosproject.net.driver.Driver;
 import org.onosproject.net.driver.DriverService;
 import org.onosproject.netconf.AbstractNetconfSession;
+import org.onosproject.netconf.NetconfController;
 import org.onosproject.netconf.NetconfDeviceInfo;
 import org.onosproject.netconf.NetconfDeviceOutputEvent;
 import org.onosproject.netconf.NetconfDeviceOutputEvent.Type;
@@ -505,10 +506,15 @@
 
     @Override
     public String requestSync(String request) throws NetconfException {
-        String reply = sendRequest(request);
+        return requestSync(request, replyTimeout);
+    }
+
+    @Override
+    public String requestSync(String request, int timeout) throws NetconfException {
+        String reply = sendRequest(request, timeout);
         if (!checkReply(reply)) {
             throw new NetconfException("Request not successful with device "
-                    + deviceInfo + " with reply " + reply);
+                                               + deviceInfo + " with reply " + reply);
         }
         return reply;
     }
@@ -621,13 +627,23 @@
         return streamHandler.sendMessage(request, messageId);
     }
 
+    private String sendRequest(String request, boolean isHello) throws NetconfException {
+        return sendRequest(request, isHello, replyTimeout);
+    }
+
     private String sendRequest(String request) throws NetconfException {
         // FIXME probably chunk-encoding too early
         request = formatNetconfMessage(request);
-        return sendRequest(request, false);
+        return sendRequest(request, false, replyTimeout);
     }
 
-    private String sendRequest(String request, boolean isHello) throws NetconfException {
+    private String sendRequest(String request, int timeout) throws NetconfException {
+        // FIXME probably chunk-encoding too early
+        request = formatNetconfMessage(request);
+        return sendRequest(request, false, timeout);
+    }
+
+    private String sendRequest(String request, boolean isHello, int timeout) throws NetconfException {
         checkAndReestablish();
         int messageId = -1;
         if (!isHello) {
@@ -859,7 +875,8 @@
     public static class MinaSshNetconfSessionFactory implements NetconfSessionFactory {
 
         @Override
-        public NetconfSession createNetconfSession(NetconfDeviceInfo netconfDeviceInfo) throws NetconfException {
+        public NetconfSession createNetconfSession(NetconfDeviceInfo netconfDeviceInfo,
+                                                   NetconfController netconfController) throws NetconfException {
             return new NetconfSessionMinaImpl(netconfDeviceInfo);
         }
     }
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionProxyImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionProxyImpl.java
new file mode 100644
index 0000000..5734820
--- /dev/null
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionProxyImpl.java
@@ -0,0 +1,184 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.netconf.ctl.impl;
+
+import org.onosproject.netconf.AbstractNetconfSession;
+import org.onosproject.netconf.NetconfController;
+import org.onosproject.netconf.NetconfDeviceInfo;
+import org.onosproject.netconf.NetconfException;
+import org.onosproject.netconf.NetconfProxyMessage;
+import org.onosproject.netconf.NetconfSession;
+import org.onosproject.netconf.NetconfSessionFactory;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+public class NetconfSessionProxyImpl extends AbstractNetconfSession {
+    protected final NetconfDeviceInfo deviceInfo;
+    protected final NetconfController netconfController;
+    private static final Logger log = getLogger(NetconfSessionMinaImpl.class);
+
+    private static final int CONFIGURE_REPLY_TIMEOUT_SEC = 5;
+
+    public NetconfSessionProxyImpl(NetconfDeviceInfo deviceInfo,
+                                   NetconfController controller) {
+        this.deviceInfo = deviceInfo;
+        this.netconfController = controller;
+    }
+
+    private <T> CompletableFuture<T> executeAtMasterCompletableFuture(
+            NetconfProxyMessage proxyMessage)
+            throws NetconfException {
+        CompletableFuture<T> reply = netconfController.executeAtMaster(proxyMessage);
+        return reply;
+    }
+
+    private <T> T executeAtMaster(NetconfProxyMessage proxyMessage) throws NetconfException {
+        return executeAtMaster(proxyMessage, CONFIGURE_REPLY_TIMEOUT_SEC);
+    }
+
+    private <T> T executeAtMaster(NetconfProxyMessage proxyMessage, int timeout) throws NetconfException {
+        CompletableFuture<T> reply = executeAtMasterCompletableFuture(proxyMessage);
+        try {
+            return reply.get(timeout, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new NetconfException(e.getMessage(), e.getCause());
+        } catch (ExecutionException e) {
+            throw new NetconfException(e.getMessage(), e.getCause());
+        } catch (TimeoutException e) {
+            throw new NetconfException(e.getMessage(), e.getCause());
+        }
+
+    }
+
+    protected NetconfProxyMessage makeProxyMessage(NetconfProxyMessage.SubjectType subjectType, String request) {
+        return new DefaultNetconfProxyMessage(subjectType,
+                                              deviceInfo.getDeviceId(),
+                                              new ArrayList<>(Arrays.asList(request)));
+    }
+
+    @Override
+    public CompletableFuture<String> rpc(String request)
+            throws NetconfException {
+        NetconfProxyMessage proxyMessage = makeProxyMessage(NetconfProxyMessage.SubjectType.RPC,
+                                                            request);
+
+        return executeAtMasterCompletableFuture(proxyMessage);
+    }
+
+    @Override
+    public CompletableFuture<String> request(String request)
+            throws NetconfException {
+        NetconfProxyMessage proxyMessage = makeProxyMessage(NetconfProxyMessage.SubjectType.REQUEST,
+                                                            request);
+
+        return executeAtMasterCompletableFuture(proxyMessage);
+    }
+
+    @Override
+    public String requestSync(String request, int timeout)
+            throws NetconfException {
+        NetconfProxyMessage proxyMessage = makeProxyMessage(NetconfProxyMessage.SubjectType.REQUEST_SYNC,
+                                                            request);
+
+        return executeAtMaster(proxyMessage, timeout);
+    }
+
+    @Override
+    public String requestSync(String request)
+            throws NetconfException {
+
+        return requestSync(request, CONFIGURE_REPLY_TIMEOUT_SEC);
+    }
+
+    @Override
+    public void startSubscription(String filterSchema) throws NetconfException {
+        NetconfProxyMessage proxyMessage = makeProxyMessage(NetconfProxyMessage.SubjectType.START_SUBSCRIPTION,
+                                                            filterSchema);
+
+        executeAtMaster(proxyMessage);
+    }
+
+    @Override
+    public void endSubscription() throws NetconfException {
+        NetconfProxyMessage proxyMessage = makeProxyMessage(NetconfProxyMessage.SubjectType.END_SUBSCRIPTION,
+                                                            "");
+
+        executeAtMaster(proxyMessage);
+    }
+
+    @Override
+    public String getSessionId() {
+        NetconfProxyMessage proxyMessage = makeProxyMessage(NetconfProxyMessage.SubjectType.GET_SESSION_ID,
+                                                            "");
+        try {
+            return executeAtMaster(proxyMessage);
+        } catch (NetconfException e) {
+            log.error("Cannot get session id : {}", e);
+            return String.valueOf(-1);
+        }
+
+    }
+
+    @Override
+    public Set<String> getDeviceCapabilitiesSet() {
+        NetconfProxyMessage proxyMessage = makeProxyMessage(NetconfProxyMessage.SubjectType.GET_DEVICE_CAPABILITIES_SET,
+                                                            "");
+        try {
+            return executeAtMaster(proxyMessage);
+        } catch (NetconfException e) {
+            log.error("Could not get device capabilities : {}", e);
+            return null;
+        }
+    }
+
+
+    @Override
+    public void setOnosCapabilities(Iterable<String> capabilities) {
+        ArrayList<String> capabilitiesList = new ArrayList<>();
+        capabilities.spliterator().forEachRemaining(c -> capabilitiesList.add(c));
+
+        NetconfProxyMessage proxyMessage =
+                new DefaultNetconfProxyMessage(
+                        NetconfProxyMessage.SubjectType.SET_ONOS_CAPABILITIES,
+                        deviceInfo.getDeviceId(), capabilitiesList);
+        try {
+            executeAtMaster(proxyMessage);
+        } catch (NetconfException e) {
+            log.error("Could not set onos capabilities : {}", e);
+        }
+    }
+
+
+    public static class ProxyNetconfSessionFactory implements NetconfSessionFactory {
+
+        @Override
+        public NetconfSession createNetconfSession(NetconfDeviceInfo netconfDeviceInfo,
+                                                   NetconfController netconfController) {
+            return new NetconfSessionProxyImpl(netconfDeviceInfo, netconfController);
+        }
+    }
+}
diff --git a/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfControllerImplTest.java b/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfControllerImplTest.java
index bbac123..5e54559 100644
--- a/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfControllerImplTest.java
+++ b/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfControllerImplTest.java
@@ -46,6 +46,8 @@
 import org.onosproject.netconf.NetconfSession;
 import org.onosproject.netconf.config.NetconfDeviceConfig;
 import org.onosproject.netconf.config.NetconfSshClientLib;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
 import org.osgi.service.component.ComponentContext;
 
 import java.io.ByteArrayInputStream;
@@ -120,6 +122,8 @@
     private static DeviceKeyService deviceKeyService = new NetconfDeviceKeyServiceMock();
     private final NetworkConfigRegistry netCfgService = new MockNetworkConfigRegistry();
     private final MastershipService mastershipService = new MockmastershipService();
+    private final ClusterCommunicationService clusterCommunicationService =
+            new ClusterCommunicationServiceMock();
 
     private final ComponentContext context = new MockComponentContext();
 
@@ -135,6 +139,7 @@
         NetconfControllerImpl.netconfConnectTimeout = NETCONF_CONNECT_TIMEOUT_DEFAULT;
         NetconfControllerImpl.netconfIdleTimeout = NETCONF_IDLE_TIMEOUT_DEFAULT;
         NetconfControllerImpl.netconfReplyTimeout = NETCONF_REPLY_TIMEOUT_DEFAULT;
+        ctrl.clusterCommunicator = clusterCommunicationService;
 
         //Creating mock devices
         deviceInfo1 = new NetconfDeviceInfo("device1", "001", IpAddress.valueOf(DEVICE_1_IP), DEVICE_1_PORT);
@@ -547,4 +552,6 @@
             return true;
         }
     }
+    private class ClusterCommunicationServiceMock extends ClusterCommunicationServiceAdapter {
+    }
 }
diff --git a/providers/netconf/alarm/src/main/java/org/onosproject/provider/netconf/alarm/NetconfAlarmProvider.java b/providers/netconf/alarm/src/main/java/org/onosproject/provider/netconf/alarm/NetconfAlarmProvider.java
index d8bb0c7..9b65f25 100644
--- a/providers/netconf/alarm/src/main/java/org/onosproject/provider/netconf/alarm/NetconfAlarmProvider.java
+++ b/providers/netconf/alarm/src/main/java/org/onosproject/provider/netconf/alarm/NetconfAlarmProvider.java
@@ -18,6 +18,7 @@
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
+import org.onosproject.netconf.NetconfException;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
@@ -95,10 +96,16 @@
         providerService = providerRegistry.register(this);
         controller.getNetconfDevices().forEach(id -> {
             NetconfDevice device = controller.getNetconfDevice(id);
-            NetconfSession session = device.getSession();
-            InternalNotificationListener listener = new InternalNotificationListener(device.getDeviceInfo());
-            session.addDeviceOutputListener(listener);
-            idNotificationListenerMap.put(id, listener);
+            if (device.isMasterSession()) {
+                NetconfSession session = device.getSession();
+                InternalNotificationListener listener = new InternalNotificationListener(device.getDeviceInfo());
+                try {
+                    session.addDeviceOutputListener(listener);
+                } catch (NetconfException e) {
+                    log.error("addDeviceOutputListener Error {} ", e.getMessage());
+                }
+                idNotificationListenerMap.put(id, listener);
+            }
         });
         controller.addDeviceListener(deviceListener);
         log.info("NetconfAlarmProvider Started");
@@ -108,9 +115,14 @@
     public void deactivate() {
         providerRegistry.unregister(this);
         idNotificationListenerMap.forEach((id, listener) -> {
-            controller.getNetconfDevice(id)
-                    .getSession()
-                    .removeDeviceOutputListener(listener);
+            NetconfDevice device = controller.getNetconfDevice(id);
+            if (device.isMasterSession()) {
+                try {
+                    device.getSession().removeDeviceOutputListener(listener);
+                } catch (NetconfException e) {
+                    log.error("RemoveDeviceOutputListener Error {}", e.getMessage());
+                }
+            }
         });
         controller.removeDeviceListener(deviceListener);
         providerService = null;
@@ -161,11 +173,15 @@
 
         @Override
         public void deviceAdded(DeviceId deviceId) {
-            NetconfDevice device = controller.getNetconfDevice(deviceId);
-            NetconfSession session = device.getSession();
-            InternalNotificationListener listener = new InternalNotificationListener(device.getDeviceInfo());
-            session.addDeviceOutputListener(listener);
-            idNotificationListenerMap.put(deviceId, listener);
+            try {
+                NetconfDevice device = controller.getNetconfDevice(deviceId);
+                NetconfSession session = device.getSession();
+                InternalNotificationListener listener = new InternalNotificationListener(device.getDeviceInfo());
+                session.addDeviceOutputListener(listener);
+                idNotificationListenerMap.put(deviceId, listener);
+            } catch (NetconfException e) {
+                log.error("Device add fail {}", e.getMessage());
+            }
         }
 
         @Override
diff --git a/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java b/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java
index 98d135f..80b9108 100644
--- a/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java
+++ b/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java
@@ -274,6 +274,8 @@
                 case STANDBY:
                     //TODO this issue a warning on the first election/connection
                     controller.disconnectDevice(deviceId, false);
+                    withDeviceLock(
+                            () -> initiateConnection(deviceId, newRole), deviceId).run();
                     providerService.receivedRoleReply(deviceId, newRole, MastershipRole.STANDBY);
                     //else no-op
                     break;
@@ -531,6 +533,42 @@
                 UNKNOWN, UNKNOWN, UNKNOWN, UNKNOWN, cid, true, annotations.build());
     }
 
+    /**
+     * Will appropriately create connections with the device.
+     * For Master role: will create secure transport and proxy sessions.
+     * For Standby role: will create only proxy session and disconnect secure transport session.
+     * For none role: will disconnect all sessions.
+     *
+     * @param deviceId device id
+     * @param newRole new role
+     */
+    private void initiateConnection(DeviceId deviceId, MastershipRole newRole) {
+        try {
+            if (isReachable(deviceId)) {
+                NetconfDevice device = null;
+                if (newRole.equals(MastershipRole.MASTER)) {
+                    device = controller.connectDevice(deviceId, true);
+                } else if (newRole.equals(MastershipRole.STANDBY)) {
+                    device = controller.connectDevice(deviceId, false);
+                }
+
+                if (device != null) {
+                    providerService.receivedRoleReply(deviceId, newRole, newRole);
+                } else {
+                    providerService.receivedRoleReply(deviceId, newRole, MastershipRole.NONE);
+                }
+
+            }
+        } catch (Exception e) {
+            if (deviceService.getDevice(deviceId) != null) {
+                providerService.deviceDisconnected(deviceId);
+            }
+            deviceKeyAdminService.removeKey(DeviceKeyId.deviceKeyId(deviceId.toString()));
+            throw new IllegalStateException(new NetconfException(
+                    "Can't connect to NETCONF device " + deviceId, e));
+        }
+    }
+
 
     private void discoverOrUpdatePorts(DeviceId deviceId) {
         retriedPortDiscoveryMap.put(deviceId, new AtomicInteger(0));