Changed the how to relay proxy message within cluster

*changed the commnunication way to 'unicast' from 'sendAndReceived'

Change-Id: I0207c074fd2ab7b8378d9b4ce0cf877fc9aeab29
diff --git a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfController.java b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfController.java
index 45d81d2..daf92fa 100644
--- a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfController.java
+++ b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfController.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2015-present Open Networking Foundation
+ * Copyright 2019-present Open Networking Foundation
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
 package org.onosproject.netconf;
 
 import org.onlab.packet.IpAddress;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.net.DeviceId;
 
 import java.util.Map;
@@ -138,4 +139,13 @@
         errorFuture.completeExceptionally(new NetconfException("Method executeAtMaster not implemented"));
         return errorFuture;
     }
+
+    /**
+     * Get a contoller node Id .
+     *
+     * @return controller node Id
+     */
+    default NodeId getLocalNodeId() {
+        return null;
+    }
 }
diff --git a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfProxyMessage.java b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfProxyMessage.java
index 8ebfbe3..66a73b2 100644
--- a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfProxyMessage.java
+++ b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfProxyMessage.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2018-present Open Networking Foundation
+ * Copyright 2019-present Open Networking Foundation
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
 
 package org.onosproject.netconf;
 
+import org.onosproject.cluster.NodeId;
 import org.onosproject.net.DeviceId;
 
 import java.util.List;
@@ -57,4 +58,10 @@
      * @return arguments
      */
     List<String> arguments();
+
+    /**
+     * Returns the node id of proxymessage sender.
+     * @return NodeId
+     */
+    NodeId senderId();
 }
diff --git a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfProxyMessageHandler.java b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfProxyMessageHandler.java
index a1d4d74..69d29f5 100644
--- a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfProxyMessageHandler.java
+++ b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfProxyMessageHandler.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2018-present Open Networking Foundation
+ * Copyright 2019-present Open Networking Foundation
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -15,6 +15,8 @@
  */
 package org.onosproject.netconf;
 
+import java.util.Set;
+
 /**
  * Abstract interface for the implementation of proxy message handler.
  */
@@ -28,4 +30,32 @@
      * @throws NetconfException netconf exception
      */
     <T> T handleIncomingMessage(NetconfProxyMessage proxyMessage) throws NetconfException;
+
+    /**
+     * Will decode the message on case basis and
+     * call the actual method in Netconf Session implementation bound to secure transport.
+     * @param replyMessage incoming reply message
+     * @param <T> return type
+     * @return the value returned by session call
+     * @throws NetconfException netconf exception
+     */
+    <T> T handleReplyMessage(NetconfProxyMessage replyMessage) throws NetconfException;
+
+    /**
+     * Will decode the message on case basis and
+     * call the actual method in Netconf Session implementation bound to secure transport.
+     * @param proxyMessage incoming proxy message
+     * @return the set value returned by session call
+     * @throws NetconfException netconf exception
+     */
+    Set<String> handleIncomingSetMessage(NetconfProxyMessage proxyMessage) throws NetconfException;
+
+    /**
+     * Will decode the message on case basis and
+     * call the actual method in Netconf Session implementation bound to secure transport.
+     * @param replyMessage incoming proxy message
+     * @return the set value returned by session call
+     * @throws NetconfException netconf exception
+     */
+    Set<String> handleReplySetMessage(NetconfProxyMessage replyMessage) throws NetconfException;
 }
\ No newline at end of file
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/DefaultNetconfProxyMessage.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/DefaultNetconfProxyMessage.java
index 1a57d4d..da9db37 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/DefaultNetconfProxyMessage.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/DefaultNetconfProxyMessage.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2018-present Open Networking Foundation
+ * Copyright 2019-present Open Networking Foundation
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
 package org.onosproject.netconf.ctl.impl;
 
 import com.google.common.collect.ImmutableList;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.net.DeviceId;
 import org.onosproject.netconf.NetconfProxyMessage;
 
@@ -30,21 +31,26 @@
     private final SubjectType subjectType;
     private final DeviceId deviceId;
     private final List<String> arguments;
+    private final NodeId senderId;
 
     /**
      * Create new NetconfProxyMessage with provided informations.
      * @param subType Message subject type.
      * @param devId Device information that recieve message.
      * @param args Messages arguments.
+     * @param nodeId nodeId of sender
      */
     public DefaultNetconfProxyMessage(SubjectType subType,
                                       DeviceId devId,
-                                      List<String> args) {
+                                      List<String> args,
+                                      NodeId nodeId) {
         subjectType = subType;
         deviceId = devId;
         arguments = args;
+        senderId = nodeId;
     }
 
+
     @Override
     public SubjectType subjectType() {
         return subjectType;
@@ -59,4 +65,9 @@
     public List<String> arguments() {
         return ImmutableList.copyOf(arguments);
     }
+
+    @Override
+    public NodeId senderId() {
+        return senderId;
+    }
 }
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java
index b793c13..d0ee1f0 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2015-present Open Networking Foundation
+ * Copyright 2019-present Open Networking Foundation
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -22,6 +22,9 @@
 import com.google.common.annotations.Beta;
 import com.google.common.collect.Lists;
 
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
@@ -70,12 +73,14 @@
 
 import java.util.ArrayList;
 import java.util.Dictionary;
+import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -116,16 +121,19 @@
 
     protected NetconfSshClientLib sshClientLib = NetconfSshClientLib.APACHE_MINA;
 
-
-    private static final String APACHE_MINA_STR = "apache-mina";
-
-
     private static final MessageSubject SEND_REQUEST_SUBJECT_STRING =
             new MessageSubject("netconf-session-master-send-message-string");
 
+    private static final MessageSubject SEND_REPLY_SUBJECT_STRING =
+            new MessageSubject("netconf-session-master-send-reply-message-string");
+
     private static final MessageSubject SEND_REQUEST_SUBJECT_SET_STRING =
             new MessageSubject("netconf-session-master-send-message-set-string");
 
+    private static final MessageSubject SEND_REPLY_SUBJECT_SET_STRING =
+            new MessageSubject("netconf-session-master-send-reply-message-set-string");
+
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected ComponentConfigService cfgService;
 
@@ -144,6 +152,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected ClusterCommunicationService clusterCommunicator;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterService clusterService;
+
     public static final Logger log = LoggerFactory
             .getLogger(NetconfControllerImpl.class);
 
@@ -163,6 +174,15 @@
             Executors.newCachedThreadPool(groupedThreads("onos/netconfdevicecontroller",
                                                          "connection-reopen-%d", log));
 
+    private final ExecutorService remoteRequestExecutor =
+            Executors.newCachedThreadPool();
+
+    protected NodeId localNodeId;
+
+    private CountDownLatch countDownLatch;
+
+    private ArrayList<String> replyArguments = new ArrayList<>();
+
     public static final Serializer SERIALIZER = Serializer.using(
             KryoNamespace.newBuilder()
                     .register(KryoNamespaces.API)
@@ -177,16 +197,30 @@
         cfgService.registerProperties(getClass());
         modified(context);
         Security.addProvider(new BouncyCastleProvider());
-        clusterCommunicator.<NetconfProxyMessage, String>addSubscriber(
+        clusterCommunicator.<NetconfProxyMessage>addSubscriber(
                 SEND_REQUEST_SUBJECT_STRING,
                 SERIALIZER::decode,
                 this::handleProxyMessage,
-                SERIALIZER::encode);
-        clusterCommunicator.<NetconfProxyMessage, Set<String>>addSubscriber(
+                remoteRequestExecutor);
+        clusterCommunicator.<NetconfProxyMessage>addSubscriber(
                 SEND_REQUEST_SUBJECT_SET_STRING,
                 SERIALIZER::decode,
                 this::handleProxyMessage,
-                SERIALIZER::encode);
+                remoteRequestExecutor);
+        clusterCommunicator.<NetconfProxyMessage>addSubscriber(
+                SEND_REPLY_SUBJECT_STRING,
+                SERIALIZER::decode,
+                this::handleProxyReplyMessage,
+                remoteRequestExecutor);
+        clusterCommunicator.<NetconfProxyMessage>addSubscriber(
+                SEND_REPLY_SUBJECT_SET_STRING,
+                SERIALIZER::decode,
+                this::handleProxyReplyMessage,
+                remoteRequestExecutor);
+
+        localNodeId = Optional.ofNullable(clusterService.getLocalNode())
+                .map(ControllerNode::id)
+                .orElseGet(() -> new NodeId("nullNodeId"));
         log.info("Started");
     }
 
@@ -204,6 +238,8 @@
         });
         clusterCommunicator.removeSubscriber(SEND_REQUEST_SUBJECT_STRING);
         clusterCommunicator.removeSubscriber(SEND_REQUEST_SUBJECT_SET_STRING);
+        clusterCommunicator.removeSubscriber(SEND_REPLY_SUBJECT_STRING);
+        clusterCommunicator.removeSubscriber(SEND_REPLY_SUBJECT_SET_STRING);
         cfgService.unregisterProperties(getClass(), false);
         netconfDeviceListeners.clear();
         netconfDeviceMap.clear();
@@ -350,6 +386,11 @@
         }
     }
 
+    @Override
+    public NodeId getLocalNodeId() {
+        return localNodeId;
+    }
+
     private NetconfDeviceInfo createDeviceInfo(DeviceId deviceId) throws NetconfException {
             Device device = deviceService.getDevice(deviceId);
             String ip, path = null;
@@ -469,36 +510,94 @@
         return netconfDeviceMap.keySet();
     }
 
+    private void unicastRpcToMaster(NetconfProxyMessage proxyMessage, NodeId receiverId) {
+        MessageSubject messageSubject;
+
+        switch (proxyMessage.subjectType()) {
+            case GET_DEVICE_CAPABILITIES_SET:
+                messageSubject = SEND_REQUEST_SUBJECT_SET_STRING;
+                break;
+            default:
+                messageSubject = SEND_REQUEST_SUBJECT_STRING;
+                break;
+        }
+
+        clusterCommunicator
+                .unicast(proxyMessage,
+                         messageSubject,
+                         SERIALIZER::encode,
+                         receiverId);
+    }
+
+    private void unicastReplyToSender(NetconfProxyMessage proxyMessage, NodeId receiverId) {
+        MessageSubject messageSubject;
+
+        switch (proxyMessage.subjectType()) {
+            case GET_DEVICE_CAPABILITIES_SET:
+                messageSubject = SEND_REPLY_SUBJECT_SET_STRING;
+                break;
+            default:
+                messageSubject = SEND_REPLY_SUBJECT_STRING;
+                break;
+        }
+
+        clusterCommunicator
+                .unicast(proxyMessage,
+                         messageSubject,
+                         SERIALIZER::encode,
+                         receiverId);
+    }
+
     @Override
     public <T> CompletableFuture<T> executeAtMaster(NetconfProxyMessage proxyMessage) throws NetconfException {
         DeviceId deviceId = proxyMessage.deviceId();
         if (deviceService.getRole(deviceId).equals(MastershipRole.MASTER)) {
-            return CompletableFuture.completedFuture(
-                    netconfProxyMessageHandler.handleIncomingMessage(proxyMessage));
+            return handleProxyMessage(proxyMessage);
         } else {
-            MessageSubject subject;
+            return relayMessageToMaster(proxyMessage);
+        }
+    }
+
+    public <T> CompletableFuture<T> relayMessageToMaster(NetconfProxyMessage proxyMessage) {
+        DeviceId deviceId = proxyMessage.deviceId();
+
+        countDownLatch = new CountDownLatch(1);
+        unicastRpcToMaster(proxyMessage, mastershipService.getMasterFor(deviceId));
+
+        try {
+            countDownLatch.await(netconfReplyTimeout, TimeUnit.SECONDS);
+
             switch (proxyMessage.subjectType()) {
                 case GET_DEVICE_CAPABILITIES_SET:
-                    subject = SEND_REQUEST_SUBJECT_SET_STRING;
-                    break;
+                    Set<String> forReturnValue = new LinkedHashSet<>(replyArguments);
+                    return CompletableFuture.completedFuture((T) forReturnValue);
                 default:
-                    subject = SEND_REQUEST_SUBJECT_STRING;
-                    break;
+                    String returnValue = Optional.ofNullable(replyArguments.get(0)).orElse(null);
+                    return CompletableFuture.completedFuture((T) returnValue);
             }
-
-            return clusterCommunicator
-                    .sendAndReceive(proxyMessage,
-                                    subject,
-                                    SERIALIZER::encode,
-                                    SERIALIZER::decode,
-                                    mastershipService.getMasterFor(deviceId));
+        } catch (InterruptedException e) {
+            log.error("InterruptedOccured while awaiting because of {}", e);
+            CompletableFuture<T> errorFuture = new CompletableFuture<>();
+            errorFuture.completeExceptionally(e);
+            return errorFuture;
+        } catch (Exception e) {
+            log.error("Exception occured because of {}", e);
+            CompletableFuture<T> errorFuture = new CompletableFuture<>();
+            errorFuture.completeExceptionally(e);
+            return errorFuture;
         }
     }
 
     private <T> CompletableFuture<T> handleProxyMessage(NetconfProxyMessage proxyMessage) {
         try {
-            return CompletableFuture.completedFuture(
-                    netconfProxyMessageHandler.handleIncomingMessage(proxyMessage));
+            switch (proxyMessage.subjectType()) {
+                case GET_DEVICE_CAPABILITIES_SET:
+                    return CompletableFuture.completedFuture(
+                            (T) netconfProxyMessageHandler.handleIncomingSetMessage(proxyMessage));
+                default:
+                    return CompletableFuture.completedFuture(
+                            netconfProxyMessageHandler.handleIncomingMessage(proxyMessage));
+            }
         } catch (NetconfException e) {
             CompletableFuture<T> errorFuture = new CompletableFuture<>();
             errorFuture.completeExceptionally(e);
@@ -506,6 +605,25 @@
         }
     }
 
+    private <T> CompletableFuture<T> handleProxyReplyMessage(NetconfProxyMessage replyMessage) {
+        try {
+            switch (replyMessage.subjectType()) {
+                case GET_DEVICE_CAPABILITIES_SET:
+                    return CompletableFuture.completedFuture(
+                            (T) netconfProxyMessageHandler.handleReplySetMessage(replyMessage));
+                default:
+                    return CompletableFuture.completedFuture(
+                            netconfProxyMessageHandler.handleReplyMessage(replyMessage));
+
+            }
+        } catch (NetconfException e) {
+            CompletableFuture<T> errorFuture = new CompletableFuture<>();
+            errorFuture.completeExceptionally(e);
+            return errorFuture;
+        }
+    }
+
+
     /**
      * Netconf Proxy Message Handler Implementation class.
      */
@@ -517,6 +635,7 @@
             DeviceId deviceId = proxyMessage.deviceId();
             NetconfProxyMessage.SubjectType subjectType = proxyMessage.subjectType();
             NetconfSession secureTransportSession;
+
             if (netconfDeviceMap.get(deviceId).isMasterSession()) {
                 secureTransportSession = netconfDeviceMap.get(deviceId).getSession();
             } else {
@@ -546,9 +665,6 @@
                     case GET_SESSION_ID:
                         reply = (T) secureTransportSession.getSessionId();
                         break;
-                    case SET_ONOS_CAPABILITIES:
-                        secureTransportSession.setOnosCapabilities(arguments);
-                        break;
                     case GET_DEVICE_CAPABILITIES_SET:
                         reply = (T) secureTransportSession.getDeviceCapabilitiesSet();
                         break;
@@ -562,8 +678,61 @@
                 throw new NetconfException(e.getMessage(), e.getCause());
             }
 
+            ArrayList<String> returnArgument = new ArrayList<String>();
+            Optional.ofNullable(reply).ifPresent(r -> returnArgument.add((String) r));
+
+            DefaultNetconfProxyMessage replyMessage = new DefaultNetconfProxyMessage(
+                    subjectType,
+                    deviceId,
+                    returnArgument,
+                    localNodeId);
+
+            unicastReplyToSender(replyMessage, proxyMessage.senderId());
+
+
             return reply;
         }
+
+        @Override
+        public <T> T handleReplyMessage(NetconfProxyMessage replyMessage) {
+            replyArguments = new ArrayList<>(replyMessage.arguments());
+            countDownLatch.countDown();
+            return (T) Optional.ofNullable(replyArguments.get(0)).orElse(null);
+        }
+
+        @Override
+        public Set<String> handleIncomingSetMessage(NetconfProxyMessage proxyMessage) throws NetconfException {
+            DeviceId deviceId = proxyMessage.deviceId();
+            NetconfProxyMessage.SubjectType subjectType = proxyMessage.subjectType();
+            NetconfSession secureTransportSession;
+
+            if (netconfDeviceMap.get(deviceId).isMasterSession()) {
+                secureTransportSession = netconfDeviceMap.get(deviceId).getSession();
+            } else {
+                throw new NetconfException("SSH session not present");
+            }
+
+            Set<String> reply = secureTransportSession.getDeviceCapabilitiesSet();
+            ArrayList<String> returnArgument = new ArrayList<String>(reply);
+
+            DefaultNetconfProxyMessage replyMessage = new DefaultNetconfProxyMessage(
+                    subjectType,
+                    deviceId,
+                    returnArgument,
+                    localNodeId);
+
+            unicastReplyToSender(replyMessage, proxyMessage.senderId());
+            return reply;
+        }
+
+        @Override
+        public Set<String> handleReplySetMessage(NetconfProxyMessage replyMessage) {
+            replyArguments = new ArrayList<>(replyMessage.arguments());
+            countDownLatch.countDown();
+
+            return new LinkedHashSet<>(replyArguments);
+
+        }
     }
 
     /**
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionProxyImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionProxyImpl.java
index 5734820..6e6a996 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionProxyImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionProxyImpl.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2018-present Open Networking Foundation
+ * Copyright 2019-present Open Networking Foundation
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
 
 package org.onosproject.netconf.ctl.impl;
 
+import org.onosproject.cluster.NodeId;
 import org.onosproject.netconf.AbstractNetconfSession;
 import org.onosproject.netconf.NetconfController;
 import org.onosproject.netconf.NetconfDeviceInfo;
@@ -38,14 +39,17 @@
 public class NetconfSessionProxyImpl extends AbstractNetconfSession {
     protected final NetconfDeviceInfo deviceInfo;
     protected final NetconfController netconfController;
+    protected final NodeId sessionNodeId;
     private static final Logger log = getLogger(NetconfSessionMinaImpl.class);
 
     private static final int CONFIGURE_REPLY_TIMEOUT_SEC = 5;
 
     public NetconfSessionProxyImpl(NetconfDeviceInfo deviceInfo,
-                                   NetconfController controller) {
+                                   NetconfController controller,
+                                   NodeId nodeId) {
         this.deviceInfo = deviceInfo;
         this.netconfController = controller;
+        this.sessionNodeId = nodeId;
     }
 
     private <T> CompletableFuture<T> executeAtMasterCompletableFuture(
@@ -77,7 +81,8 @@
     protected NetconfProxyMessage makeProxyMessage(NetconfProxyMessage.SubjectType subjectType, String request) {
         return new DefaultNetconfProxyMessage(subjectType,
                                               deviceInfo.getDeviceId(),
-                                              new ArrayList<>(Arrays.asList(request)));
+                                              new ArrayList<>(Arrays.asList(request)),
+                                              sessionNodeId);
     }
 
     @Override
@@ -164,7 +169,8 @@
         NetconfProxyMessage proxyMessage =
                 new DefaultNetconfProxyMessage(
                         NetconfProxyMessage.SubjectType.SET_ONOS_CAPABILITIES,
-                        deviceInfo.getDeviceId(), capabilitiesList);
+                        deviceInfo.getDeviceId(), capabilitiesList,
+                        sessionNodeId);
         try {
             executeAtMaster(proxyMessage);
         } catch (NetconfException e) {
@@ -178,7 +184,9 @@
         @Override
         public NetconfSession createNetconfSession(NetconfDeviceInfo netconfDeviceInfo,
                                                    NetconfController netconfController) {
-            return new NetconfSessionProxyImpl(netconfDeviceInfo, netconfController);
+            return new NetconfSessionProxyImpl(netconfDeviceInfo,
+                                               netconfController,
+                                               netconfController.getLocalNodeId());
         }
     }
 }
diff --git a/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfControllerImplTest.java b/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfControllerImplTest.java
index 5e54559..d420db8 100644
--- a/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfControllerImplTest.java
+++ b/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfControllerImplTest.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-present Open Networking Foundation
+ * Copyright 2019-present Open Networking Foundation
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -18,7 +18,6 @@
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
-import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -26,6 +25,7 @@
 import org.onlab.packet.IpAddress;
 import org.onosproject.cfg.ComponentConfigAdapter;
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.mastership.MastershipServiceAdapter;
 import org.onosproject.net.DeviceId;
@@ -60,6 +60,7 @@
 import java.util.Optional;
 import java.util.Set;
 
+import static org.easymock.EasyMock.createMock;
 import static org.hamcrest.Matchers.*;
 import static org.junit.Assert.*;
 import static org.onosproject.netconf.ctl.impl.OsgiPropertyConstants.NETCONF_CONNECT_TIMEOUT_DEFAULT;
@@ -124,6 +125,7 @@
     private final MastershipService mastershipService = new MockmastershipService();
     private final ClusterCommunicationService clusterCommunicationService =
             new ClusterCommunicationServiceMock();
+    private final ClusterService mockClusterService = createMock(ClusterService.class);
 
     private final ComponentContext context = new MockComponentContext();
 
@@ -140,6 +142,7 @@
         NetconfControllerImpl.netconfIdleTimeout = NETCONF_IDLE_TIMEOUT_DEFAULT;
         NetconfControllerImpl.netconfReplyTimeout = NETCONF_REPLY_TIMEOUT_DEFAULT;
         ctrl.clusterCommunicator = clusterCommunicationService;
+        ctrl.clusterService = mockClusterService;
 
         //Creating mock devices
         deviceInfo1 = new NetconfDeviceInfo("device1", "001", IpAddress.valueOf(DEVICE_1_IP), DEVICE_1_PORT);
@@ -240,9 +243,9 @@
      */
     @Test
     public void testAddRemoveDeviceListener() {
-        NetconfDeviceListener deviceListener1 = EasyMock.createMock(NetconfDeviceListener.class);
-        NetconfDeviceListener deviceListener2 = EasyMock.createMock(NetconfDeviceListener.class);
-        NetconfDeviceListener deviceListener3 = EasyMock.createMock(NetconfDeviceListener.class);
+        NetconfDeviceListener deviceListener1 = createMock(NetconfDeviceListener.class);
+        NetconfDeviceListener deviceListener2 = createMock(NetconfDeviceListener.class);
+        NetconfDeviceListener deviceListener3 = createMock(NetconfDeviceListener.class);
 
         ctrl.addDeviceListener(deviceListener1);
         ctrl.addDeviceListener(deviceListener2);
@@ -422,7 +425,7 @@
         public TestNetconfDevice(NetconfDeviceInfo deviceInfo) throws NetconfException {
             netconfDeviceInfo = deviceInfo;
             if (!badDeviceInfo3.getDeviceId().equals(deviceInfo.getDeviceId())) {
-                netconfSession = EasyMock.createMock(NetconfSession.class);
+                netconfSession = createMock(NetconfSession.class);
                 deviceState = true;
             } else {
                 throw new NetconfException("Cannot create Connection and Session");