[ONOS-7566] Implementation of NetconfProxySession
Change-Id: I01cbe0b10ac36cb6db53127555b551f405acdeb1
diff --git a/protocols/netconf/api/src/main/java/org/onosproject/netconf/AbstractNetconfSession.java b/protocols/netconf/api/src/main/java/org/onosproject/netconf/AbstractNetconfSession.java
index 2254572..7843f7d 100644
--- a/protocols/netconf/api/src/main/java/org/onosproject/netconf/AbstractNetconfSession.java
+++ b/protocols/netconf/api/src/main/java/org/onosproject/netconf/AbstractNetconfSession.java
@@ -80,6 +80,21 @@
@Override
public abstract CompletableFuture<String> rpc(String request) throws NetconfException;
+ protected CompletableFuture<CharSequence> executeRpc(String rpcString) throws NetconfException {
+ return rpc(rpcString)
+ .thenApply(msg -> {
+ // crude way of removing rpc-reply envelope
+ int begin = msg.indexOf("<data>");
+ int end = msg.lastIndexOf("</data>");
+ if (begin != -1 && end != -1) {
+ return msg.subSequence(begin, end + "</data>".length());
+ } else {
+ // FIXME probably should exceptionally fail here.
+ return msg;
+ }
+ });
+ }
+
@Override
public CompletableFuture<CharSequence> asyncGetConfig(DatastoreId datastore) throws NetconfException {
StringBuilder rpc = new StringBuilder();
@@ -102,7 +117,6 @@
@Override
public CompletableFuture<CharSequence> asyncGet() throws NetconfException {
StringBuilder rpc = new StringBuilder();
-
rpc.append(RPC_OPEN);
rpc.append(NETCONF_BASE_NAMESPACE).append(">\n");
rpc.append(GET_OPEN).append(NEW_LINE);
@@ -113,22 +127,6 @@
return executeRpc(rpc.toString());
}
- protected CompletableFuture<CharSequence> executeRpc(String rpcString) throws NetconfException {
- return rpc(rpcString)
- .thenApply(msg -> {
- // crude way of removing rpc-reply envelope
- int begin = msg.indexOf("<data>");
- int end = msg.lastIndexOf("</data>");
- if (begin != -1 && end != -1) {
- return msg.subSequence(begin, end + "</data>".length());
- } else {
- // FIXME probably should exceptionally fail here.
- return msg;
- }
- });
-
- }
-
@Override
public String get(String request) throws NetconfException {
return requestSync(request);
@@ -362,10 +360,14 @@
public abstract Set<String> getDeviceCapabilitiesSet();
@Override
- public abstract void addDeviceOutputListener(NetconfDeviceOutputEventListener listener);
+ public void addDeviceOutputListener(NetconfDeviceOutputEventListener listener) throws NetconfException {
+ throw new NetconfException("Only master session can call addDeviceOutputListener");
+ }
@Override
- public abstract void removeDeviceOutputListener(NetconfDeviceOutputEventListener listener);
+ public void removeDeviceOutputListener(NetconfDeviceOutputEventListener listener) throws NetconfException {
+ throw new NetconfException("Only master session can call removeDeviceOutputListener");
+ }
/**
* Checks errors in reply from the session.
diff --git a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfController.java b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfController.java
index 041c379..45d81d2 100644
--- a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfController.java
+++ b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfController.java
@@ -21,6 +21,7 @@
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
/**
* Abstraction of an NETCONF controller. Serves as a one stop shop for obtaining
@@ -53,6 +54,20 @@
NetconfDevice connectDevice(DeviceId deviceId) throws NetconfException;
/**
+ * Tries to connect to a specific NETCONF device, if the connection is succesful
+ * it creates and adds the device to the ONOS core as a NetconfDevice.
+ * If isMaster true: Will create two sessions for a device : secure transport session and proxy session.
+ * If isMaster false: Will create only proxy session.
+ * @param deviceId deviceId of the device to connect
+ * @param isMaster if the controller is master for the device
+ * @return NetconfDevice Netconf device
+ * @throws NetconfException when device is not available
+ */
+ default NetconfDevice connectDevice(DeviceId deviceId, boolean isMaster) throws NetconfException {
+ return connectDevice(deviceId);
+ }
+
+ /**
* Disconnects a Netconf device and removes it from the core.
*
* @param deviceId id of the device to remove
@@ -107,4 +122,20 @@
* @return NetconfDevice Netconf device
*/
NetconfDevice getNetconfDevice(IpAddress ip, int port, String path);
+
+ /**
+ * If master, will execute the call locally else will use
+ * clusterCommunicationManager to execute at master controller.
+ * Meant only for internal synchronization and not to be used by applications.
+ *
+ * @param proxyMessage proxy message
+ * @param <T> return type
+ * @return Completable future of class T
+ * @throws NetconfException netconf exception
+ */
+ default <T> CompletableFuture<T> executeAtMaster(NetconfProxyMessage proxyMessage) throws NetconfException {
+ CompletableFuture<T> errorFuture = new CompletableFuture<>();
+ errorFuture.completeExceptionally(new NetconfException("Method executeAtMaster not implemented"));
+ return errorFuture;
+ }
}
diff --git a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDevice.java b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDevice.java
index 639f0a8..8c756e5 100644
--- a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDevice.java
+++ b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDevice.java
@@ -24,13 +24,22 @@
/**
* Returns whether a device is a NETCONF device with a capabilities list
- * and is accessible.
+ * and is accessible, through a secure transport session or a proxy session.
*
* @return true if device is accessible, false otherwise
*/
boolean isActive();
/**
+ * Returns whether the device has secure transport session.
+ *
+ * @return true if secure transport session exists, false otherwise
+ */
+ default boolean isMasterSession() {
+ return isActive();
+ }
+
+ /**
* Returns a NETCONF session context for this device.
*
* @return netconf session
diff --git a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDeviceFactory.java b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDeviceFactory.java
index 312c7c3..53e5db7 100644
--- a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDeviceFactory.java
+++ b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfDeviceFactory.java
@@ -16,6 +16,8 @@
package org.onosproject.netconf;
+import com.google.common.annotations.Beta;
+
/**
* Abstract interface for the creation of a NETCONF device.
*/
@@ -31,4 +33,19 @@
*/
NetconfDevice createNetconfDevice(NetconfDeviceInfo netconfDeviceInfo)
throws NetconfException;
+
+ /**
+ * Creates a new NETCONF device based on the supplied information.
+ * @param netconfDeviceInfo information of the device to create.
+ * @param isMaster if true create secure transport session with the device,
+ * else just create a proxy session
+ * @return Instance of NetconfDevice.
+ * @throws NetconfException when problems arise creating the device and establishing
+ * the connection.
+ */
+ @Beta
+ default NetconfDevice createNetconfDevice(NetconfDeviceInfo netconfDeviceInfo,
+ boolean isMaster) throws NetconfException {
+ return createNetconfDevice(netconfDeviceInfo);
+ }
}
diff --git a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfProxyMessage.java b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfProxyMessage.java
new file mode 100644
index 0000000..8ebfbe3
--- /dev/null
+++ b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfProxyMessage.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.netconf;
+
+import org.onosproject.net.DeviceId;
+
+import java.util.List;
+
+/**
+ * Interface representing a NETCONF proxy message.
+ */
+public interface NetconfProxyMessage {
+
+ enum SubjectType {
+ RPC,
+ // FIXME in the final form there should only be (async) RPC
+ // and REQUEST, REQUEST_SYNC should go away
+ // once NetconfSession methods got cleaned up.
+ REQUEST,
+ REQUEST_SYNC,
+ START_SUBSCRIPTION,
+ END_SUBSCRIPTION,
+ GET_SESSION_ID,
+ GET_DEVICE_CAPABILITIES_SET,
+ SET_ONOS_CAPABILITIES
+ }
+
+ /**
+ * Returns the subject of the message.
+ *
+ * @return subject in enum subject type
+ */
+ NetconfProxyMessage.SubjectType subjectType();
+
+ /**
+ * Returns the device id of the device to which the message is intended.
+ * @return device id
+ */
+ DeviceId deviceId();
+
+ /**
+ * Returns the arguments of the intended method call in order.
+ * @return arguments
+ */
+ List<String> arguments();
+}
diff --git a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfProxyMessageHandler.java b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfProxyMessageHandler.java
new file mode 100644
index 0000000..a1d4d74
--- /dev/null
+++ b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfProxyMessageHandler.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netconf;
+
+/**
+ * Abstract interface for the implementation of proxy message handler.
+ */
+public interface NetconfProxyMessageHandler {
+ /**
+ * Will decode the message on case basis and
+ * call the actual method in Netconf Session implementation bound to secure transport.
+ * @param proxyMessage incoming proxy message
+ * @param <T> return type
+ * @return the value returned by session call
+ * @throws NetconfException netconf exception
+ */
+ <T> T handleIncomingMessage(NetconfProxyMessage proxyMessage) throws NetconfException;
+}
\ No newline at end of file
diff --git a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSession.java b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSession.java
index bc04118..d0c80de 100644
--- a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSession.java
+++ b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSession.java
@@ -122,6 +122,19 @@
String requestSync(String request) throws NetconfException;
/**
+ * Executes an synchronous RPC to the server with specific reply TIMEOUT.
+ *
+ * @param request the XML containing the RPC for the server.
+ * @param timeout the reply timeout.
+ * @return Server response or ERROR
+ * @throws NetconfException when there is a problem in the communication process on
+ * the underlying connection
+ */
+ default String requestSync(String request, int timeout) throws NetconfException {
+ return "";
+ }
+
+ /**
* Retrieves the specified configuration.
*
* @param netconfTargetConfig the type of configuration to retrieve.
@@ -341,15 +354,17 @@
* Add a listener to the underlying stream handler implementation.
*
* @param listener event listener.
+ * @throws NetconfException when this method will be called by STANDBY or NONE node.
*/
- void addDeviceOutputListener(NetconfDeviceOutputEventListener listener);
+ void addDeviceOutputListener(NetconfDeviceOutputEventListener listener) throws NetconfException;
/**
* Remove a listener from the underlying stream handler implementation.
*
* @param listener event listener.
+ * @throws NetconfException when this method will be called by STANDBY or NONE node.
*/
- void removeDeviceOutputListener(NetconfDeviceOutputEventListener listener);
+ void removeDeviceOutputListener(NetconfDeviceOutputEventListener listener) throws NetconfException;
/**
* Read the connect timeout that this session was created with.
diff --git a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSessionFactory.java b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSessionFactory.java
index 806eccb..95f32a4 100644
--- a/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSessionFactory.java
+++ b/protocols/netconf/api/src/main/java/org/onosproject/netconf/NetconfSessionFactory.java
@@ -25,9 +25,11 @@
/**
* Creates a new NETCONF session for the specified device.
* @param netconfDeviceInfo information of the device to create the session for.
+ * @param netconfController netconf controller object
* @return Instance of NetconfSession.
* @throws NetconfException when problems arise establishing the connection.
*/
- NetconfSession createNetconfSession(NetconfDeviceInfo netconfDeviceInfo)
+ NetconfSession createNetconfSession(NetconfDeviceInfo netconfDeviceInfo,
+ NetconfController netconfController)
throws NetconfException;
}
diff --git a/protocols/netconf/ctl/BUILD b/protocols/netconf/ctl/BUILD
index 0e9a912..7a2b9b6 100644
--- a/protocols/netconf/ctl/BUILD
+++ b/protocols/netconf/ctl/BUILD
@@ -1,8 +1,10 @@
-COMPILE_DEPS = CORE_DEPS + JACKSON + CLI + [
+COMPILE_DEPS = CORE_DEPS + JACKSON + CLI + KRYO + [
"@ganymed_ssh2//jar",
"@sshd_core//jar",
"@bcpkix_jdk15on//jar",
"@bcprov_jdk15on//jar",
+ "@org_osgi_service_cm//jar",
+ "//core/store/serializers:onos-core-serializers",
"//protocols/netconf/api:onos-protocols-netconf-api",
]
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/DefaultNetconfDevice.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/DefaultNetconfDevice.java
index c952027..dfbe760 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/DefaultNetconfDevice.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/DefaultNetconfDevice.java
@@ -16,6 +16,7 @@
package org.onosproject.netconf.ctl.impl;
+import org.onosproject.netconf.NetconfController;
import org.onosproject.netconf.NetconfDevice;
import org.onosproject.netconf.NetconfDeviceInfo;
import org.onosproject.netconf.NetconfException;
@@ -34,24 +35,37 @@
private NetconfDeviceInfo netconfDeviceInfo;
private boolean deviceState = true;
- private final NetconfSessionFactory sessionFactory;
private NetconfSession netconfSession;
+ private boolean isMasterSession = false;
+ private NetconfSession netconfProxySession;
// will block until hello RPC handshake completes
/**
* Creates a new default NETCONF device with the information provided.
* The device gets created only if no exception is thrown while connecting to
* it and establishing the NETCONF session.
+ * The secure transport session will only be created if isMaster is true.
* @param deviceInfo information about the device to be created.
+ * @param isMaster if true create secure transport session, otherwise create proxy session.
+ * @param netconfController netconf controller object
* @throws NetconfException if there are problems in creating or establishing
* the underlying NETCONF connection and session.
*/
- public DefaultNetconfDevice(NetconfDeviceInfo deviceInfo)
+ public DefaultNetconfDevice(NetconfDeviceInfo deviceInfo,
+ boolean isMaster,
+ NetconfController netconfController)
throws NetconfException {
netconfDeviceInfo = deviceInfo;
- sessionFactory = (ncDevInfo) -> new NetconfSessionMinaImpl(ncDevInfo);
try {
- netconfSession = sessionFactory.createNetconfSession(deviceInfo);
+ if (isMaster) {
+ netconfSession = new NetconfSessionMinaImpl(deviceInfo);
+ isMasterSession = true;
+ netconfProxySession = netconfSession;
+ } else {
+ netconfProxySession = new NetconfSessionProxyImpl
+ .ProxyNetconfSessionFactory()
+ .createNetconfSession(deviceInfo, netconfController);
+ }
} catch (NetconfException e) {
deviceState = false;
throw new NetconfException("Cannot create connection and session for device " +
@@ -64,18 +78,30 @@
* Creates a new default NETCONF device with the information provided.
* The device gets created only if no exception is thrown while connecting to
* it and establishing the NETCONF session.
- *
+ * The secure transport session will only be created if isMaster is true.
* @param deviceInfo information about the device to be created.
* @param factory the factory used to create the session
+ * @param isMaster if true create secure transport session, otherwise create proxy session.
+ * @param netconfController netconf controller object
* @throws NetconfException if there are problems in creating or establishing
* the underlying NETCONF connection and session.
*/
- public DefaultNetconfDevice(NetconfDeviceInfo deviceInfo, NetconfSessionFactory factory)
+ public DefaultNetconfDevice(NetconfDeviceInfo deviceInfo,
+ NetconfSessionFactory factory,
+ boolean isMaster,
+ NetconfController netconfController)
throws NetconfException {
netconfDeviceInfo = deviceInfo;
- sessionFactory = factory;
try {
- netconfSession = sessionFactory.createNetconfSession(deviceInfo);
+ if (isMaster) {
+ netconfSession = factory.createNetconfSession(deviceInfo, netconfController);
+ isMasterSession = true;
+ netconfProxySession = netconfSession;
+ } else {
+ netconfProxySession = new NetconfSessionProxyImpl
+ .ProxyNetconfSessionFactory()
+ .createNetconfSession(deviceInfo, netconfController);
+ }
} catch (NetconfException e) {
deviceState = false;
throw new NetconfException("Cannot create connection and session for device " +
@@ -90,23 +116,29 @@
@Override
public NetconfSession getSession() {
- return netconfSession;
+ return netconfProxySession;
}
@Override
public void disconnect() {
deviceState = false;
try {
- netconfSession.close();
+ if (isMasterSession) {
+ netconfSession.close();
+ }
+ netconfProxySession.close();
} catch (NetconfException e) {
log.warn("Cannot communicate with the device {} session already closed", netconfDeviceInfo);
}
}
@Override
+ public boolean isMasterSession() {
+ return isMasterSession;
+ }
+
+ @Override
public NetconfDeviceInfo getDeviceInfo() {
return netconfDeviceInfo;
}
-
-
}
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/DefaultNetconfProxyMessage.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/DefaultNetconfProxyMessage.java
new file mode 100644
index 0000000..1a57d4d
--- /dev/null
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/DefaultNetconfProxyMessage.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.netconf.ctl.impl;
+
+import com.google.common.collect.ImmutableList;
+import org.onosproject.net.DeviceId;
+import org.onosproject.netconf.NetconfProxyMessage;
+
+import java.util.List;
+
+/**
+ * Default implementation of Netconf Proxy Message.
+ */
+public class DefaultNetconfProxyMessage implements NetconfProxyMessage {
+
+ private final SubjectType subjectType;
+ private final DeviceId deviceId;
+ private final List<String> arguments;
+
+ /**
+ * Create new NetconfProxyMessage with provided informations.
+ * @param subType Message subject type.
+ * @param devId Device information that recieve message.
+ * @param args Messages arguments.
+ */
+ public DefaultNetconfProxyMessage(SubjectType subType,
+ DeviceId devId,
+ List<String> args) {
+ subjectType = subType;
+ deviceId = devId;
+ arguments = args;
+ }
+
+ @Override
+ public SubjectType subjectType() {
+ return subjectType;
+ }
+
+ @Override
+ public DeviceId deviceId() {
+ return deviceId;
+ }
+
+ @Override
+ public List<String> arguments() {
+ return ImmutableList.copyOf(arguments);
+ }
+}
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java
index ece7ca2..b793c13 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java
@@ -16,15 +16,28 @@
package org.onosproject.netconf.ctl.impl;
+
import org.apache.commons.lang3.tuple.Triple;
+import com.google.common.annotations.Beta;
+import com.google.common.collect.Lists;
+
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.onlab.packet.IpAddress;
+import org.onlab.util.KryoNamespace;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.AnnotationKeys;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.MastershipRole;
import org.onosproject.net.config.NetworkConfigRegistry;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.key.DeviceKey;
@@ -39,29 +52,37 @@
import org.onosproject.netconf.NetconfDeviceOutputEvent;
import org.onosproject.netconf.NetconfDeviceOutputEventListener;
import org.onosproject.netconf.NetconfException;
+import org.onosproject.netconf.NetconfProxyMessage;
+import org.onosproject.netconf.NetconfProxyMessageHandler;
+import org.onosproject.netconf.NetconfSession;
import org.onosproject.netconf.config.NetconfDeviceConfig;
import org.onosproject.netconf.config.NetconfSshClientLib;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.Serializer;
import org.osgi.service.component.ComponentContext;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Modified;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.component.annotations.ReferenceCardinality;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.security.Security;
+
+import java.util.ArrayList;
import java.util.Dictionary;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.getIntegerProperty;
@@ -95,6 +116,16 @@
protected NetconfSshClientLib sshClientLib = NetconfSshClientLib.APACHE_MINA;
+
+ private static final String APACHE_MINA_STR = "apache-mina";
+
+
+ private static final MessageSubject SEND_REQUEST_SUBJECT_STRING =
+ new MessageSubject("netconf-session-master-send-message-string");
+
+ private static final MessageSubject SEND_REQUEST_SUBJECT_SET_STRING =
+ new MessageSubject("netconf-session-master-send-message-set-string");
+
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected ComponentConfigService cfgService;
@@ -110,6 +141,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected MastershipService mastershipService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ClusterCommunicationService clusterCommunicator;
+
public static final Logger log = LoggerFactory
.getLogger(NetconfControllerImpl.class);
@@ -120,26 +154,56 @@
private final NetconfDeviceOutputEventListener downListener = new DeviceDownEventListener();
protected Set<NetconfDeviceListener> netconfDeviceListeners = new CopyOnWriteArraySet<>();
- protected NetconfDeviceFactory deviceFactory = DefaultNetconfDevice::new;
+ protected NetconfDeviceFactory deviceFactory = new DefaultNetconfDeviceFactory();
+
+ protected NetconfProxyMessageHandler netconfProxyMessageHandler = new NetconfProxyMessageHandlerImpl();
+
protected final ExecutorService executor =
Executors.newCachedThreadPool(groupedThreads("onos/netconfdevicecontroller",
"connection-reopen-%d", log));
+ public static final Serializer SERIALIZER = Serializer.using(
+ KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(NetconfProxyMessage.class)
+ .register(NetconfProxyMessage.SubjectType.class)
+ .register(DefaultNetconfProxyMessage.class)
+ .register(String.class)
+ .build("NetconfProxySession"));
+
@Activate
public void activate(ComponentContext context) {
cfgService.registerProperties(getClass());
modified(context);
Security.addProvider(new BouncyCastleProvider());
+ clusterCommunicator.<NetconfProxyMessage, String>addSubscriber(
+ SEND_REQUEST_SUBJECT_STRING,
+ SERIALIZER::decode,
+ this::handleProxyMessage,
+ SERIALIZER::encode);
+ clusterCommunicator.<NetconfProxyMessage, Set<String>>addSubscriber(
+ SEND_REQUEST_SUBJECT_SET_STRING,
+ SERIALIZER::decode,
+ this::handleProxyMessage,
+ SERIALIZER::encode);
log.info("Started");
}
@Deactivate
public void deactivate() {
netconfDeviceMap.values().forEach(device -> {
- device.getSession().removeDeviceOutputListener(downListener);
+ if (device.isMasterSession()) {
+ try {
+ device.getSession().removeDeviceOutputListener(downListener);
+ } catch (NetconfException e) {
+ log.error("removeDeviceOutputListener Failed {}", e.getMessage());
+ }
+ }
device.disconnect();
});
+ clusterCommunicator.removeSubscriber(SEND_REQUEST_SUBJECT_STRING);
+ clusterCommunicator.removeSubscriber(SEND_REQUEST_SUBJECT_SET_STRING);
cfgService.unregisterProperties(getClass(), false);
netconfDeviceListeners.clear();
netconfDeviceMap.clear();
@@ -228,6 +292,11 @@
@Override
public NetconfDevice connectDevice(DeviceId deviceId) throws NetconfException {
+ return connectDevice(deviceId, true);
+ }
+
+ @Override
+ public NetconfDevice connectDevice(DeviceId deviceId, boolean isMaster) throws NetconfException {
NetconfDeviceConfig netCfg = netCfgService.getConfig(
deviceId, NetconfDeviceConfig.class);
NetconfDeviceInfo deviceInfo = null;
@@ -256,65 +325,77 @@
mutex.lock();
try {
if (netconfDeviceMap.containsKey(deviceId)) {
- log.debug("Device {} is already present", deviceId);
- return netconfDeviceMap.get(deviceId);
- } else if (netCfg != null) {
+ //If not master or already has session: return, otherwise create device again.
+ if (!isMaster || netconfDeviceMap.get(deviceId).isMasterSession()) {
+ log.debug("Device {} is already present", deviceId);
+ return netconfDeviceMap.get(deviceId);
+ }
+ }
+
+ if (netCfg != null) {
log.debug("Device {} is present in NetworkConfig", deviceId);
deviceInfo = new NetconfDeviceInfo(netCfg);
} else {
log.debug("Creating NETCONF device {}", deviceId);
- Device device = deviceService.getDevice(deviceId);
- String ip, path = null;
- int port;
- if (device != null) {
- ip = device.annotations().value("ipaddress");
- port = Integer.parseInt(device.annotations().value("port"));
- path = device.annotations().value("path");
- } else {
- Triple<String, Integer, Optional<String>> info = extractIpPortPath(deviceId);
- ip = info.getLeft();
- port = info.getMiddle();
- path = (info.getRight().isPresent() ? info.getRight().get() : null);
- }
- try {
- DeviceKey deviceKey = deviceKeyService.getDeviceKey(
- DeviceKeyId.deviceKeyId(deviceId.toString()));
- if (deviceKey.type() == DeviceKey.Type.USERNAME_PASSWORD) {
- UsernamePassword usernamepasswd = deviceKey.asUsernamePassword();
-
- deviceInfo = new NetconfDeviceInfo(usernamepasswd.username(),
- usernamepasswd.password(),
- IpAddress.valueOf(ip),
- port,
- path);
-
- } else if (deviceKey.type() == DeviceKey.Type.SSL_KEY) {
- String username = deviceKey.annotations().value(AnnotationKeys.USERNAME);
- String password = deviceKey.annotations().value(AnnotationKeys.PASSWORD);
- String sshkey = deviceKey.annotations().value(AnnotationKeys.SSHKEY);
-
- deviceInfo = new NetconfDeviceInfo(username,
- password,
- IpAddress.valueOf(ip),
- port,
- path,
- sshkey);
- } else {
- log.error("Unknown device key for device {}", deviceId);
- }
- } catch (NullPointerException e) {
- throw new NetconfException("No Device Key for device " + deviceId, e);
- }
+ deviceInfo = createDeviceInfo(deviceId);
}
- NetconfDevice netconfDevicedevice = createDevice(deviceInfo);
- netconfDevicedevice.getSession().addDeviceOutputListener(downListener);
- return netconfDevicedevice;
+ NetconfDevice netconfDevice = createDevice(deviceInfo, isMaster);
+ if (isMaster) {
+ netconfDevice.getSession().addDeviceOutputListener(downListener);
+ }
+ return netconfDevice;
} finally {
mutex.unlock();
}
}
+ private NetconfDeviceInfo createDeviceInfo(DeviceId deviceId) throws NetconfException {
+ Device device = deviceService.getDevice(deviceId);
+ String ip, path = null;
+ int port;
+ if (device != null) {
+ ip = device.annotations().value("ipaddress");
+ port = Integer.parseInt(device.annotations().value("port"));
+ } else {
+ Triple<String, Integer, Optional<String>> info = extractIpPortPath(deviceId);
+ ip = info.getLeft();
+ port = info.getMiddle();
+ path = (info.getRight().isPresent() ? info.getRight().get() : null);
+ }
+ try {
+ DeviceKey deviceKey = deviceKeyService.getDeviceKey(
+ DeviceKeyId.deviceKeyId(deviceId.toString()));
+ if (deviceKey.type() == DeviceKey.Type.USERNAME_PASSWORD) {
+ UsernamePassword usernamepasswd = deviceKey.asUsernamePassword();
+
+ return new NetconfDeviceInfo(usernamepasswd.username(),
+ usernamepasswd.password(),
+ IpAddress.valueOf(ip),
+ port,
+ path);
+
+ } else if (deviceKey.type() == DeviceKey.Type.SSL_KEY) {
+ String username = deviceKey.annotations().value(AnnotationKeys.USERNAME);
+ String password = deviceKey.annotations().value(AnnotationKeys.PASSWORD);
+ String sshkey = deviceKey.annotations().value(AnnotationKeys.SSHKEY);
+
+ return new NetconfDeviceInfo(username,
+ password,
+ IpAddress.valueOf(ip),
+ port,
+ path,
+ sshkey);
+ } else {
+ log.error("Unknown device key for device {}", deviceId);
+ throw new NetconfException("Unknown device key for device " + deviceId);
+ }
+ } catch (NullPointerException e) {
+ log.error("No Device Key for device {}, {}", deviceId, e);
+ throw new NetconfException("No Device Key for device " + deviceId, e);
+ }
+ }
+
@Override
public void disconnectDevice(DeviceId deviceId, boolean remove) {
if (!netconfDeviceMap.containsKey(deviceId)) {
@@ -364,7 +445,12 @@
}
private NetconfDevice createDevice(NetconfDeviceInfo deviceInfo) throws NetconfException {
- NetconfDevice netconfDevice = deviceFactory.createNetconfDevice(deviceInfo);
+ return createDevice(deviceInfo, true);
+ }
+
+ private NetconfDevice createDevice(NetconfDeviceInfo deviceInfo,
+ boolean isMaster) throws NetconfException {
+ NetconfDevice netconfDevice = deviceFactory.createNetconfDevice(deviceInfo, isMaster);
netconfDeviceMap.put(deviceInfo.getDeviceId(), netconfDevice);
for (NetconfDeviceListener l : netconfDeviceListeners) {
l.deviceAdded(deviceInfo.getDeviceId());
@@ -383,6 +469,102 @@
return netconfDeviceMap.keySet();
}
+ @Override
+ public <T> CompletableFuture<T> executeAtMaster(NetconfProxyMessage proxyMessage) throws NetconfException {
+ DeviceId deviceId = proxyMessage.deviceId();
+ if (deviceService.getRole(deviceId).equals(MastershipRole.MASTER)) {
+ return CompletableFuture.completedFuture(
+ netconfProxyMessageHandler.handleIncomingMessage(proxyMessage));
+ } else {
+ MessageSubject subject;
+ switch (proxyMessage.subjectType()) {
+ case GET_DEVICE_CAPABILITIES_SET:
+ subject = SEND_REQUEST_SUBJECT_SET_STRING;
+ break;
+ default:
+ subject = SEND_REQUEST_SUBJECT_STRING;
+ break;
+ }
+
+ return clusterCommunicator
+ .sendAndReceive(proxyMessage,
+ subject,
+ SERIALIZER::encode,
+ SERIALIZER::decode,
+ mastershipService.getMasterFor(deviceId));
+ }
+ }
+
+ private <T> CompletableFuture<T> handleProxyMessage(NetconfProxyMessage proxyMessage) {
+ try {
+ return CompletableFuture.completedFuture(
+ netconfProxyMessageHandler.handleIncomingMessage(proxyMessage));
+ } catch (NetconfException e) {
+ CompletableFuture<T> errorFuture = new CompletableFuture<>();
+ errorFuture.completeExceptionally(e);
+ return errorFuture;
+ }
+ }
+
+ /**
+ * Netconf Proxy Message Handler Implementation class.
+ */
+ private class NetconfProxyMessageHandlerImpl implements NetconfProxyMessageHandler {
+
+ @Override
+ public <T> T handleIncomingMessage(NetconfProxyMessage proxyMessage) throws NetconfException {
+ //TODO: Should throw Netconf Exception in error cases?
+ DeviceId deviceId = proxyMessage.deviceId();
+ NetconfProxyMessage.SubjectType subjectType = proxyMessage.subjectType();
+ NetconfSession secureTransportSession;
+ if (netconfDeviceMap.get(deviceId).isMasterSession()) {
+ secureTransportSession = netconfDeviceMap.get(deviceId).getSession();
+ } else {
+ throw new NetconfException("Ssh session not present");
+ }
+ T reply = null;
+ ArrayList<String> arguments = Lists.newArrayList(proxyMessage.arguments());
+ try {
+ switch (subjectType) {
+ case RPC:
+ reply = (T) secureTransportSession.rpc(arguments.get(0))
+ .get(netconfReplyTimeout, TimeUnit.SECONDS);
+ break;
+ case REQUEST_SYNC:
+ reply = (T) secureTransportSession.requestSync(arguments.get(0));
+ break;
+ case START_SUBSCRIPTION:
+ secureTransportSession.startSubscription(arguments.get(0));
+ break;
+ case END_SUBSCRIPTION:
+ secureTransportSession.endSubscription();
+ break;
+ case REQUEST:
+ reply = (T) secureTransportSession.request(arguments.get(0))
+ .get(netconfReplyTimeout, TimeUnit.SECONDS);
+ break;
+ case GET_SESSION_ID:
+ reply = (T) secureTransportSession.getSessionId();
+ break;
+ case SET_ONOS_CAPABILITIES:
+ secureTransportSession.setOnosCapabilities(arguments);
+ break;
+ case GET_DEVICE_CAPABILITIES_SET:
+ reply = (T) secureTransportSession.getDeviceCapabilitiesSet();
+ break;
+ default:
+ log.error("Not yet supported for session method {}", subjectType);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new NetconfException(e.getMessage(), e.getCause());
+ } catch (ExecutionException | TimeoutException e) {
+ throw new NetconfException(e.getMessage(), e.getCause());
+ }
+
+ return reply;
+ }
+ }
/**
* Device factory for the specific NetconfDeviceImpl.
@@ -393,9 +575,20 @@
private class DefaultNetconfDeviceFactory implements NetconfDeviceFactory {
@Override
- public NetconfDevice createNetconfDevice(NetconfDeviceInfo netconfDeviceInfo)
+ public NetconfDevice createNetconfDevice(NetconfDeviceInfo netconfDeviceInfo) throws NetconfException {
+ return createNetconfDevice(netconfDeviceInfo, true);
+ }
+
+ @Beta
+ @Override
+ public NetconfDevice createNetconfDevice(NetconfDeviceInfo netconfDeviceInfo,
+ boolean isMaster)
throws NetconfException {
- return new DefaultNetconfDevice(netconfDeviceInfo);
+ if (isMaster) {
+ log.info("Creating NETCONF session to {} with {}",
+ netconfDeviceInfo.getDeviceId(), NetconfSshClientLib.APACHE_MINA);
+ }
+ return new DefaultNetconfDevice(netconfDeviceInfo, isMaster, NetconfControllerImpl.this);
}
}
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
index 36c9ec1..2d082e0 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
@@ -41,6 +41,7 @@
import org.onosproject.net.driver.Driver;
import org.onosproject.net.driver.DriverService;
import org.onosproject.netconf.AbstractNetconfSession;
+import org.onosproject.netconf.NetconfController;
import org.onosproject.netconf.NetconfDeviceInfo;
import org.onosproject.netconf.NetconfDeviceOutputEvent;
import org.onosproject.netconf.NetconfDeviceOutputEvent.Type;
@@ -505,10 +506,15 @@
@Override
public String requestSync(String request) throws NetconfException {
- String reply = sendRequest(request);
+ return requestSync(request, replyTimeout);
+ }
+
+ @Override
+ public String requestSync(String request, int timeout) throws NetconfException {
+ String reply = sendRequest(request, timeout);
if (!checkReply(reply)) {
throw new NetconfException("Request not successful with device "
- + deviceInfo + " with reply " + reply);
+ + deviceInfo + " with reply " + reply);
}
return reply;
}
@@ -621,13 +627,23 @@
return streamHandler.sendMessage(request, messageId);
}
+ private String sendRequest(String request, boolean isHello) throws NetconfException {
+ return sendRequest(request, isHello, replyTimeout);
+ }
+
private String sendRequest(String request) throws NetconfException {
// FIXME probably chunk-encoding too early
request = formatNetconfMessage(request);
- return sendRequest(request, false);
+ return sendRequest(request, false, replyTimeout);
}
- private String sendRequest(String request, boolean isHello) throws NetconfException {
+ private String sendRequest(String request, int timeout) throws NetconfException {
+ // FIXME probably chunk-encoding too early
+ request = formatNetconfMessage(request);
+ return sendRequest(request, false, timeout);
+ }
+
+ private String sendRequest(String request, boolean isHello, int timeout) throws NetconfException {
checkAndReestablish();
int messageId = -1;
if (!isHello) {
@@ -859,7 +875,8 @@
public static class MinaSshNetconfSessionFactory implements NetconfSessionFactory {
@Override
- public NetconfSession createNetconfSession(NetconfDeviceInfo netconfDeviceInfo) throws NetconfException {
+ public NetconfSession createNetconfSession(NetconfDeviceInfo netconfDeviceInfo,
+ NetconfController netconfController) throws NetconfException {
return new NetconfSessionMinaImpl(netconfDeviceInfo);
}
}
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionProxyImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionProxyImpl.java
new file mode 100644
index 0000000..5734820
--- /dev/null
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionProxyImpl.java
@@ -0,0 +1,184 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.netconf.ctl.impl;
+
+import org.onosproject.netconf.AbstractNetconfSession;
+import org.onosproject.netconf.NetconfController;
+import org.onosproject.netconf.NetconfDeviceInfo;
+import org.onosproject.netconf.NetconfException;
+import org.onosproject.netconf.NetconfProxyMessage;
+import org.onosproject.netconf.NetconfSession;
+import org.onosproject.netconf.NetconfSessionFactory;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+public class NetconfSessionProxyImpl extends AbstractNetconfSession {
+ protected final NetconfDeviceInfo deviceInfo;
+ protected final NetconfController netconfController;
+ private static final Logger log = getLogger(NetconfSessionMinaImpl.class);
+
+ private static final int CONFIGURE_REPLY_TIMEOUT_SEC = 5;
+
+ public NetconfSessionProxyImpl(NetconfDeviceInfo deviceInfo,
+ NetconfController controller) {
+ this.deviceInfo = deviceInfo;
+ this.netconfController = controller;
+ }
+
+ private <T> CompletableFuture<T> executeAtMasterCompletableFuture(
+ NetconfProxyMessage proxyMessage)
+ throws NetconfException {
+ CompletableFuture<T> reply = netconfController.executeAtMaster(proxyMessage);
+ return reply;
+ }
+
+ private <T> T executeAtMaster(NetconfProxyMessage proxyMessage) throws NetconfException {
+ return executeAtMaster(proxyMessage, CONFIGURE_REPLY_TIMEOUT_SEC);
+ }
+
+ private <T> T executeAtMaster(NetconfProxyMessage proxyMessage, int timeout) throws NetconfException {
+ CompletableFuture<T> reply = executeAtMasterCompletableFuture(proxyMessage);
+ try {
+ return reply.get(timeout, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new NetconfException(e.getMessage(), e.getCause());
+ } catch (ExecutionException e) {
+ throw new NetconfException(e.getMessage(), e.getCause());
+ } catch (TimeoutException e) {
+ throw new NetconfException(e.getMessage(), e.getCause());
+ }
+
+ }
+
+ protected NetconfProxyMessage makeProxyMessage(NetconfProxyMessage.SubjectType subjectType, String request) {
+ return new DefaultNetconfProxyMessage(subjectType,
+ deviceInfo.getDeviceId(),
+ new ArrayList<>(Arrays.asList(request)));
+ }
+
+ @Override
+ public CompletableFuture<String> rpc(String request)
+ throws NetconfException {
+ NetconfProxyMessage proxyMessage = makeProxyMessage(NetconfProxyMessage.SubjectType.RPC,
+ request);
+
+ return executeAtMasterCompletableFuture(proxyMessage);
+ }
+
+ @Override
+ public CompletableFuture<String> request(String request)
+ throws NetconfException {
+ NetconfProxyMessage proxyMessage = makeProxyMessage(NetconfProxyMessage.SubjectType.REQUEST,
+ request);
+
+ return executeAtMasterCompletableFuture(proxyMessage);
+ }
+
+ @Override
+ public String requestSync(String request, int timeout)
+ throws NetconfException {
+ NetconfProxyMessage proxyMessage = makeProxyMessage(NetconfProxyMessage.SubjectType.REQUEST_SYNC,
+ request);
+
+ return executeAtMaster(proxyMessage, timeout);
+ }
+
+ @Override
+ public String requestSync(String request)
+ throws NetconfException {
+
+ return requestSync(request, CONFIGURE_REPLY_TIMEOUT_SEC);
+ }
+
+ @Override
+ public void startSubscription(String filterSchema) throws NetconfException {
+ NetconfProxyMessage proxyMessage = makeProxyMessage(NetconfProxyMessage.SubjectType.START_SUBSCRIPTION,
+ filterSchema);
+
+ executeAtMaster(proxyMessage);
+ }
+
+ @Override
+ public void endSubscription() throws NetconfException {
+ NetconfProxyMessage proxyMessage = makeProxyMessage(NetconfProxyMessage.SubjectType.END_SUBSCRIPTION,
+ "");
+
+ executeAtMaster(proxyMessage);
+ }
+
+ @Override
+ public String getSessionId() {
+ NetconfProxyMessage proxyMessage = makeProxyMessage(NetconfProxyMessage.SubjectType.GET_SESSION_ID,
+ "");
+ try {
+ return executeAtMaster(proxyMessage);
+ } catch (NetconfException e) {
+ log.error("Cannot get session id : {}", e);
+ return String.valueOf(-1);
+ }
+
+ }
+
+ @Override
+ public Set<String> getDeviceCapabilitiesSet() {
+ NetconfProxyMessage proxyMessage = makeProxyMessage(NetconfProxyMessage.SubjectType.GET_DEVICE_CAPABILITIES_SET,
+ "");
+ try {
+ return executeAtMaster(proxyMessage);
+ } catch (NetconfException e) {
+ log.error("Could not get device capabilities : {}", e);
+ return null;
+ }
+ }
+
+
+ @Override
+ public void setOnosCapabilities(Iterable<String> capabilities) {
+ ArrayList<String> capabilitiesList = new ArrayList<>();
+ capabilities.spliterator().forEachRemaining(c -> capabilitiesList.add(c));
+
+ NetconfProxyMessage proxyMessage =
+ new DefaultNetconfProxyMessage(
+ NetconfProxyMessage.SubjectType.SET_ONOS_CAPABILITIES,
+ deviceInfo.getDeviceId(), capabilitiesList);
+ try {
+ executeAtMaster(proxyMessage);
+ } catch (NetconfException e) {
+ log.error("Could not set onos capabilities : {}", e);
+ }
+ }
+
+
+ public static class ProxyNetconfSessionFactory implements NetconfSessionFactory {
+
+ @Override
+ public NetconfSession createNetconfSession(NetconfDeviceInfo netconfDeviceInfo,
+ NetconfController netconfController) {
+ return new NetconfSessionProxyImpl(netconfDeviceInfo, netconfController);
+ }
+ }
+}
diff --git a/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfControllerImplTest.java b/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfControllerImplTest.java
index bbac123..5e54559 100644
--- a/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfControllerImplTest.java
+++ b/protocols/netconf/ctl/src/test/java/org/onosproject/netconf/ctl/impl/NetconfControllerImplTest.java
@@ -46,6 +46,8 @@
import org.onosproject.netconf.NetconfSession;
import org.onosproject.netconf.config.NetconfDeviceConfig;
import org.onosproject.netconf.config.NetconfSshClientLib;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
import org.osgi.service.component.ComponentContext;
import java.io.ByteArrayInputStream;
@@ -120,6 +122,8 @@
private static DeviceKeyService deviceKeyService = new NetconfDeviceKeyServiceMock();
private final NetworkConfigRegistry netCfgService = new MockNetworkConfigRegistry();
private final MastershipService mastershipService = new MockmastershipService();
+ private final ClusterCommunicationService clusterCommunicationService =
+ new ClusterCommunicationServiceMock();
private final ComponentContext context = new MockComponentContext();
@@ -135,6 +139,7 @@
NetconfControllerImpl.netconfConnectTimeout = NETCONF_CONNECT_TIMEOUT_DEFAULT;
NetconfControllerImpl.netconfIdleTimeout = NETCONF_IDLE_TIMEOUT_DEFAULT;
NetconfControllerImpl.netconfReplyTimeout = NETCONF_REPLY_TIMEOUT_DEFAULT;
+ ctrl.clusterCommunicator = clusterCommunicationService;
//Creating mock devices
deviceInfo1 = new NetconfDeviceInfo("device1", "001", IpAddress.valueOf(DEVICE_1_IP), DEVICE_1_PORT);
@@ -547,4 +552,6 @@
return true;
}
}
+ private class ClusterCommunicationServiceMock extends ClusterCommunicationServiceAdapter {
+ }
}
diff --git a/providers/netconf/alarm/src/main/java/org/onosproject/provider/netconf/alarm/NetconfAlarmProvider.java b/providers/netconf/alarm/src/main/java/org/onosproject/provider/netconf/alarm/NetconfAlarmProvider.java
index d8bb0c7..9b65f25 100644
--- a/providers/netconf/alarm/src/main/java/org/onosproject/provider/netconf/alarm/NetconfAlarmProvider.java
+++ b/providers/netconf/alarm/src/main/java/org/onosproject/provider/netconf/alarm/NetconfAlarmProvider.java
@@ -18,6 +18,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
+import org.onosproject.netconf.NetconfException;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
@@ -95,10 +96,16 @@
providerService = providerRegistry.register(this);
controller.getNetconfDevices().forEach(id -> {
NetconfDevice device = controller.getNetconfDevice(id);
- NetconfSession session = device.getSession();
- InternalNotificationListener listener = new InternalNotificationListener(device.getDeviceInfo());
- session.addDeviceOutputListener(listener);
- idNotificationListenerMap.put(id, listener);
+ if (device.isMasterSession()) {
+ NetconfSession session = device.getSession();
+ InternalNotificationListener listener = new InternalNotificationListener(device.getDeviceInfo());
+ try {
+ session.addDeviceOutputListener(listener);
+ } catch (NetconfException e) {
+ log.error("addDeviceOutputListener Error {} ", e.getMessage());
+ }
+ idNotificationListenerMap.put(id, listener);
+ }
});
controller.addDeviceListener(deviceListener);
log.info("NetconfAlarmProvider Started");
@@ -108,9 +115,14 @@
public void deactivate() {
providerRegistry.unregister(this);
idNotificationListenerMap.forEach((id, listener) -> {
- controller.getNetconfDevice(id)
- .getSession()
- .removeDeviceOutputListener(listener);
+ NetconfDevice device = controller.getNetconfDevice(id);
+ if (device.isMasterSession()) {
+ try {
+ device.getSession().removeDeviceOutputListener(listener);
+ } catch (NetconfException e) {
+ log.error("RemoveDeviceOutputListener Error {}", e.getMessage());
+ }
+ }
});
controller.removeDeviceListener(deviceListener);
providerService = null;
@@ -161,11 +173,15 @@
@Override
public void deviceAdded(DeviceId deviceId) {
- NetconfDevice device = controller.getNetconfDevice(deviceId);
- NetconfSession session = device.getSession();
- InternalNotificationListener listener = new InternalNotificationListener(device.getDeviceInfo());
- session.addDeviceOutputListener(listener);
- idNotificationListenerMap.put(deviceId, listener);
+ try {
+ NetconfDevice device = controller.getNetconfDevice(deviceId);
+ NetconfSession session = device.getSession();
+ InternalNotificationListener listener = new InternalNotificationListener(device.getDeviceInfo());
+ session.addDeviceOutputListener(listener);
+ idNotificationListenerMap.put(deviceId, listener);
+ } catch (NetconfException e) {
+ log.error("Device add fail {}", e.getMessage());
+ }
}
@Override
diff --git a/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java b/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java
index 98d135f..80b9108 100644
--- a/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java
+++ b/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java
@@ -274,6 +274,8 @@
case STANDBY:
//TODO this issue a warning on the first election/connection
controller.disconnectDevice(deviceId, false);
+ withDeviceLock(
+ () -> initiateConnection(deviceId, newRole), deviceId).run();
providerService.receivedRoleReply(deviceId, newRole, MastershipRole.STANDBY);
//else no-op
break;
@@ -531,6 +533,42 @@
UNKNOWN, UNKNOWN, UNKNOWN, UNKNOWN, cid, true, annotations.build());
}
+ /**
+ * Will appropriately create connections with the device.
+ * For Master role: will create secure transport and proxy sessions.
+ * For Standby role: will create only proxy session and disconnect secure transport session.
+ * For none role: will disconnect all sessions.
+ *
+ * @param deviceId device id
+ * @param newRole new role
+ */
+ private void initiateConnection(DeviceId deviceId, MastershipRole newRole) {
+ try {
+ if (isReachable(deviceId)) {
+ NetconfDevice device = null;
+ if (newRole.equals(MastershipRole.MASTER)) {
+ device = controller.connectDevice(deviceId, true);
+ } else if (newRole.equals(MastershipRole.STANDBY)) {
+ device = controller.connectDevice(deviceId, false);
+ }
+
+ if (device != null) {
+ providerService.receivedRoleReply(deviceId, newRole, newRole);
+ } else {
+ providerService.receivedRoleReply(deviceId, newRole, MastershipRole.NONE);
+ }
+
+ }
+ } catch (Exception e) {
+ if (deviceService.getDevice(deviceId) != null) {
+ providerService.deviceDisconnected(deviceId);
+ }
+ deviceKeyAdminService.removeKey(DeviceKeyId.deviceKeyId(deviceId.toString()));
+ throw new IllegalStateException(new NetconfException(
+ "Can't connect to NETCONF device " + deviceId, e));
+ }
+ }
+
private void discoverOrUpdatePorts(DeviceId deviceId) {
retriedPortDiscoveryMap.put(deviceId, new AtomicInteger(0));