XMPP as SBI support: implementation of core XMPP and Xmpp Device Provider
State machine handled by XmppSession interface, most of tests implemented
XmppDeviceFactory re-designed, tests updated
pom and BUCK files updated
Change-Id: I4c6955e091169c945415084cbb000c61b474c0fc
diff --git a/lib/BUCK b/lib/BUCK
index d088fdd..68e2f65 100644
--- a/lib/BUCK
+++ b/lib/BUCK
@@ -1599,3 +1599,47 @@
visibility = [ 'PUBLIC' ],
)
+remote_jar (
+ name = 'tinder-xmpp',
+ out = 'tinder-1.3.0.jar',
+ url = 'mvn:org.igniterealtime:tinder:jar:1.3.0',
+ sha1 = '46353ded2a1d1a87d17600206d61814eb0b8a711',
+ maven_coords = 'org.igniterealtime:tinder:jar:NON-OSGI:1.3.0',
+ visibility = [ 'PUBLIC' ],
+)
+
+remote_jar (
+ name = 'aalto-xml',
+ out = 'aalto-xml-1.0.0.jar',
+ url = 'mvn:com.fasterxml:aalto-xml:jar:1.0.0',
+ sha1 = 'aeae9e8a71914e7f5efc8a69d2f5cb1f2224f2c6',
+ maven_coords = 'com.fasterxml:aalto-xml:1.0.0',
+ visibility = [ 'PUBLIC' ],
+)
+
+remote_jar (
+ name = 'stax2-api',
+ out = 'stax2-api-4.0.0.jar',
+ url = 'mvn:org.codehaus.woodstox:stax2-api:jar:4.0.0',
+ sha1 = '6fa8b05f7587a3cb819d223ee0b0de0c126e3dd1',
+ maven_coords = 'org.codehaus.woodstox:stax2-api:4.0.0',
+ visibility = [ 'PUBLIC' ],
+)
+
+remote_jar (
+ name = 'concurrent-hashmap',
+ out = 'concurrentlinkedhashmap-lru-1.0.jar',
+ url = 'mvn:com.googlecode.concurrentlinkedhashmap:concurrentlinkedhashmap-lru:jar:1.0',
+ sha1 = 'db7b7a28b835db4717d4aaf31f5d4441887a6d46',
+ maven_coords = 'com.googlecode.concurrentlinkedhashmap:concurrentlinkedhashmap-lru:jar:NON-OSGI:1.0',
+ visibility = [ 'PUBLIC' ],
+)
+
+remote_jar (
+ name = 'gnu-idn',
+ out = 'libidn-1.15.jar',
+ url = 'mvn:org.gnu.inet:libidn:jar:1.15',
+ sha1 = 'b5bede3c1c031a827b604da31768ddaf833495c6',
+ maven_coords = 'org.gnu.inet:libidn:jar:NON-OSGI:1.15',
+ visibility = [ 'PUBLIC' ],
+)
diff --git a/lib/deps.json b/lib/deps.json
index 0b04ba4..8d5783e 100644
--- a/lib/deps.json
+++ b/lib/deps.json
@@ -277,6 +277,11 @@
"bcprov-jdk15on": "mvn:org.bouncycastle:bcprov-jdk15on:1.58",
"hamcrest-optional": "mvn:com.spotify:hamcrest-optional:1.1.0",
"swagger-annotations": "mvn:io.swagger:swagger-annotations:1.5.16",
- "kafka-clients": "mvn:org.apache.servicemix.bundles:org.apache.servicemix.bundles.kafka-clients:0.8.2.2_1"
+ "kafka-clients": "mvn:org.apache.servicemix.bundles:org.apache.servicemix.bundles.kafka-clients:0.8.2.2_1",
+ "tinder-xmpp": "mvn:org.igniterealtime:tinder:1.3.0",
+ "aalto-xml": "mvn:com.fasterxml:aalto-xml:1.0.0",
+ "stax2-api": "mvn:org.codehaus.woodstox:stax2-api:4.0.0",
+ "concurrent-hashmap": "mvn:com.googlecode.concurrentlinkedhashmap:concurrentlinkedhashmap-lru:1.0",
+ "gnu-idn": "mvn:org.gnu.inet:libidn:1.15"
}
}
diff --git a/modules.defs b/modules.defs
index 65bef76..f995044 100644
--- a/modules.defs
+++ b/modules.defs
@@ -56,6 +56,8 @@
'//protocols/tl1/ctl:onos-protocols-tl1-ctl',
'//protocols/restconf/client/api:onos-protocols-restconf-client-api',
'//protocols/restconf/client/ctl:onos-protocols-restconf-client-ctl',
+ '//protocols/xmpp/core/api:onos-protocols-xmpp-core-api',
+ '//protocols/xmpp/core/ctl:onos-protocols-xmpp-core-ctl',
'//drivers/utilities:onos-drivers-utilities',
@@ -139,6 +141,7 @@
'//providers/general:onos-providers-general-oar',
'//providers/p4runtime:onos-providers-p4runtime-oar',
# '//providers/ietfte:onos-providers-ietfte-oar',
+ '//providers/xmpp/device:onos-providers-xmpp-device-oar',
]
ONOS_APPS = [
@@ -242,6 +245,7 @@
'//protocols/grpc:onos-protocols-grpc-oar',
'//protocols/p4runtime:onos-protocols-p4runtime-oar',
'//protocols/gnmi:onos-protocols-gnmi-oar',
+ '//protocols/xmpp/core:onos-protocols-xmpp-core-oar',
]
MODELS = [
diff --git a/protocols/pom.xml b/protocols/pom.xml
index 465b23b..a231ce9 100644
--- a/protocols/pom.xml
+++ b/protocols/pom.xml
@@ -45,6 +45,7 @@
<module>tl1</module>
<module>grpc</module>
<module>p4runtime</module>
+ <module>xmpp</module>
</modules>
<dependencies>
diff --git a/protocols/xmpp/core/BUCK b/protocols/xmpp/core/BUCK
new file mode 100644
index 0000000..97af973
--- /dev/null
+++ b/protocols/xmpp/core/BUCK
@@ -0,0 +1,23 @@
+BUNDLES = [
+ '//protocols/xmpp/core/api:onos-protocols-xmpp-core-api',
+ '//protocols/xmpp/core/ctl:onos-protocols-xmpp-core-ctl',
+ '//lib:tinder-xmpp',
+ '//lib:org.apache.servicemix.bundles.dom4j',
+ '//lib:netty-common',
+ '//lib:netty-transport',
+ '//lib:netty-buffer',
+ '//lib:netty-codec',
+ '//lib:stax2-api',
+ '//lib:aalto-xml',
+ '//lib:concurrent-hashmap',
+ '//lib:gnu-idn',
+]
+
+onos_app(
+ app_name = 'org.onosproject.protocols.xmpp',
+ title = 'XMPP Core Protocol Subsystem',
+ category = 'Protocol',
+ url = 'https://wiki.onosproject.org/display/ONOS/XMPP+as+SBI',
+ description = 'ONOS XMPP core protocol subsystem',
+ included_bundles = BUNDLES,
+)
\ No newline at end of file
diff --git a/protocols/xmpp/core/api/BUCK b/protocols/xmpp/core/api/BUCK
new file mode 100644
index 0000000..8d191ee
--- /dev/null
+++ b/protocols/xmpp/core/api/BUCK
@@ -0,0 +1,12 @@
+COMPILE_DEPS = [
+ '//lib:CORE_DEPS',
+ '//core/api:onos-api',
+ '//lib:tinder-xmpp',
+ '//lib:netty-transport',
+ '//lib:netty-common',
+ '//lib:org.apache.servicemix.bundles.dom4j',
+]
+
+osgi_jar_with_tests (
+ deps = COMPILE_DEPS,
+)
\ No newline at end of file
diff --git a/protocols/xmpp/core/api/pom.xml b/protocols/xmpp/core/api/pom.xml
new file mode 100644
index 0000000..a53c3df
--- /dev/null
+++ b/protocols/xmpp/core/api/pom.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>onos-xmpp-core</artifactId>
+ <groupId>org.onosproject</groupId>
+ <version>1.13.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>onos-xmpp-core-api</artifactId>
+ <packaging>bundle</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.igniterealtime</groupId>
+ <artifactId>tinder</artifactId>
+ <version>RELEASE</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-xmpp-ctl</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>dom4j</groupId>
+ <artifactId>dom4j</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+
+</project>
\ No newline at end of file
diff --git a/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppConstants.java b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppConstants.java
new file mode 100644
index 0000000..0904299
--- /dev/null
+++ b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppConstants.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.core;
+
+/*
+ * Stores XMPP protocol constant parameters.
+ */
+public final class XmppConstants {
+
+ private XmppConstants() {}
+
+ public static final String IQ_QNAME = "iq";
+ public static final String PRESENCE_QNAME = "presence";
+ public static final String MESSAGE_QNAME = "message";
+ public static final String STREAM_QNAME = "stream";
+ public static final String SERVER_JID = "xmpp.onosproject.org";
+
+}
diff --git a/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppController.java b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppController.java
new file mode 100644
index 0000000..8b500a3
--- /dev/null
+++ b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppController.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.core;
+
+/**
+ * Abstraction of a XMPP controller. Serves as a one stop
+ * shop for obtaining XMPP devices and (un)register listeners
+ * on XMPP events
+ */
+public interface XmppController {
+
+ /**
+ * Method allows to retrieve XMPP device for given XmppDeviceId, if one exists.
+ *
+ * @param xmppDeviceId the device to retrieve
+ * @return the interface to XMPP Device
+ */
+ XmppDevice getDevice(XmppDeviceId xmppDeviceId);
+
+ /**
+ * Register a listener for device events.
+ *
+ * @param deviceListener the listener to notify
+ */
+ void addXmppDeviceListener(XmppDeviceListener deviceListener);
+
+ /**
+ * Unregister a listener for device events.
+ *
+ * @param deviceListener the listener to unregister
+ */
+ void removeXmppDeviceListener(XmppDeviceListener deviceListener);
+
+ /**
+ * Register a listener for IQ stanza of XMPP protocol.
+ *
+ * @param iqListener the listener to notify
+ */
+ void addXmppIqListener(XmppIqListener iqListener);
+
+ /**
+ * Unregister a listener for IQ stanza of XMPP protocol.
+ *
+ * @param iqListener the listener to unregister
+ */
+ void removeXmppIqListener(XmppIqListener iqListener);
+
+ /**
+ * Register a listener for Message stanza of XMPP protocol.
+ *
+ * @param messageListener the listener to notify
+ */
+ void addXmppMessageListener(XmppMessageListener messageListener);
+
+ /**
+ * Unregister a listener for Message stanza of XMPP protocol.
+ *
+ * @param messageListener the listener to unregister
+ */
+ void removeXmppMessageListener(XmppMessageListener messageListener);
+
+ /**
+ * Register a listener for Presence stanza of XMPP protocol.
+ *
+ * @param presenceListener the listener to notify
+ */
+ void addXmppPresenceListener(XmppPresenceListener presenceListener);
+
+ /**
+ * Unregister a listener for Presence stanza of XMPP protocol.
+ *
+ * @param presenceListener the listener to unregister
+ */
+ void removeXmppPresenceListener(XmppPresenceListener presenceListener);
+
+}
diff --git a/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppDevice.java b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppDevice.java
new file mode 100644
index 0000000..93a6cd7
--- /dev/null
+++ b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppDevice.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.core;
+
+import org.dom4j.Document;
+import org.xmpp.packet.Packet;
+import org.xmpp.packet.PacketError;
+
+import java.net.InetSocketAddress;
+
+
+/**
+ * Abstracts XMPP device.
+ */
+public interface XmppDevice {
+
+ /**
+ * Returns an associated Netty channel for device.
+ *
+ * @return Netty Socket Channel
+ */
+ XmppSession getSession();
+
+ /**
+ * Returns an IP address of underlaying XMPP device/client.
+ *
+ * @return IP address
+ */
+ InetSocketAddress getIpAddress();
+
+ /**
+ * Register a device that has just connected to the system.
+ *
+ */
+ void registerConnectedDevice();
+
+ /**
+ * Disconnects the device by closing the TCP connection and unregistering device.
+ *
+ */
+ void disconnectDevice();
+
+
+ /**
+ * Sends a XMPP packet to the client.
+ *
+ * @param packet the XMPP packet to send
+ */
+ void sendPacket(Packet packet);
+
+ /**
+ * Method for sending raw XML data as XMPP packet.
+ *
+ * @param document the XML data
+ */
+ void writeRawXml(Document document);
+
+ /**
+ * Handle a XMPP packet from device.
+ *
+ * @param packet the XMPP packet
+ */
+ void handlePacket(Packet packet);
+
+ /**
+ * Sends a XMPP error to client.
+ *
+ * @param packetError XMPP error message
+ */
+ void sendError(PacketError packetError);
+}
diff --git a/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppDeviceAgent.java b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppDeviceAgent.java
new file mode 100644
index 0000000..fa4fcfd
--- /dev/null
+++ b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppDeviceAgent.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.core;
+
+import org.xmpp.packet.Packet;
+
+/**
+ * Responsible for keeping track of the current set of devices
+ * connected to the system. As well as notifying Xmpp Stanza listeners.
+ *
+ */
+public interface XmppDeviceAgent {
+
+ /**
+ * Add a device that has just connected to the system.
+ * @param deviceId the identifier of device to add.
+ * @param device the actual device object.
+ * @return true if added, false otherwise.
+ */
+ boolean addConnectedDevice(XmppDeviceId deviceId, XmppDevice device);
+
+ /**
+ * Remove a device from a local repository that has been disconnected
+ * from the local controller. Notify device listeners.
+ * @param deviceId the identifier of device to remove.
+ */
+ void removeConnectedDevice(XmppDeviceId deviceId);
+
+ /**
+ * Returns XMPP device object that is stored in the local repository
+ * containing already connected devices.
+ * @param deviceId the identifier of device
+ * @return the XMPP device object
+ */
+ XmppDevice getDevice(XmppDeviceId deviceId);
+
+ /**
+ * Process an event (incoming Packet) coming from a device.
+ * @param deviceId the identifier of device the packet came on.
+ * @param packet the packet to process
+ */
+ void processUpstreamEvent(XmppDeviceId deviceId, Packet packet);
+
+}
+
diff --git a/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppDeviceFactory.java b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppDeviceFactory.java
new file mode 100644
index 0000000..42ebd20
--- /dev/null
+++ b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppDeviceFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.core;
+
+import org.xmpp.packet.JID;
+
+/**
+ * Interface for creating new XMPP devices.
+ */
+public interface XmppDeviceFactory {
+
+
+ /**
+ * Creates instance of XMPP device.
+ *
+ * @param jabberId Jabber ID identifying XMPP device
+ * @param session session associated with XMPP device
+ * @return XMPP device
+ */
+ public XmppDevice getXmppDevice(JID jabberId, XmppSession session);
+
+}
diff --git a/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppDeviceId.java b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppDeviceId.java
new file mode 100644
index 0000000..41225ea
--- /dev/null
+++ b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppDeviceId.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.core;
+
+import org.onlab.util.Identifier;
+import org.onosproject.net.DeviceId;
+import org.xmpp.packet.JID;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * The class representing a network device identifier.
+ * This class is immutable.
+ */
+public final class XmppDeviceId extends Identifier<String> {
+
+ private static final String SCHEME = "xmpp";
+
+ private JID jid;
+
+ public XmppDeviceId(JID jid) {
+ super(uri(jid.toString()).toString());
+ this.jid = jid;
+ }
+
+ @Override
+ public String toString() {
+ return identifier.toString();
+ }
+
+ public JID getJid() {
+ return jid;
+ }
+
+ public static URI uri(String string) {
+ try {
+ return new URI(SCHEME, string, null);
+ } catch (URISyntaxException e) {
+ return null;
+ }
+ }
+
+ public static DeviceId asDeviceId(JID jid) {
+ return DeviceId.deviceId(XmppDeviceId.uri(jid));
+ }
+
+ public static URI uri(JID jid) {
+ try {
+ return new URI(SCHEME, jid.toString(), null);
+ } catch (URISyntaxException e) {
+ return null;
+ }
+ }
+
+
+}
diff --git a/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppDeviceListener.java b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppDeviceListener.java
new file mode 100644
index 0000000..7fa6eda
--- /dev/null
+++ b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppDeviceListener.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.core;
+
+/**
+ * Allows for providers interested in XMPP device events to be notified.
+ */
+public interface XmppDeviceListener {
+
+ /**
+ * Notify that the device was added.
+ *
+ * @param deviceId identifier of the device where the event occurred
+ */
+ void deviceConnected(XmppDeviceId deviceId);
+
+ /**
+ * Notify that the device was removed.
+ *
+ * @param deviceId identifier of the device where the event occurred
+ */
+ void deviceDisconnected(XmppDeviceId deviceId);
+
+}
diff --git a/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppIqListener.java b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppIqListener.java
new file mode 100644
index 0000000..046bfb1
--- /dev/null
+++ b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppIqListener.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.core;
+
+import org.xmpp.packet.IQ;
+
+/**
+ * Allows for Providers interested in XMPP IQ stanzas to be notified.
+ */
+public interface XmppIqListener {
+
+ /**
+ * Handles incoming IQ stanzas.
+ *
+ * @param iq IQ stanza to handle
+ */
+ void handleIqStanza(IQ iq);
+
+}
diff --git a/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppMessageListener.java b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppMessageListener.java
new file mode 100644
index 0000000..67aa245
--- /dev/null
+++ b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppMessageListener.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.core;
+
+import org.xmpp.packet.Message;
+
+
+/**
+ * Allows for providers interested in XMPP Message stanzas to be notified.
+ */
+public interface XmppMessageListener {
+
+ /**
+ * Handles incoming Message stanzas.
+ *
+ * @param message Message stanza to handle
+ */
+ void handleMessageStanza(Message message);
+
+}
diff --git a/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppPresenceListener.java b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppPresenceListener.java
new file mode 100644
index 0000000..3c1ae3d
--- /dev/null
+++ b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppPresenceListener.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.core;
+
+import org.xmpp.packet.Presence;
+
+/**
+ * Allows for providers interested in XMPP Presence stanzas to be notified.
+ */
+public interface XmppPresenceListener {
+
+ /**
+ * Handles incoming Presence stanzas.
+ *
+ * @param presence Presence stanza to handle
+ */
+ void handlePresenceStanza(Presence presence);
+
+}
diff --git a/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppSession.java b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppSession.java
new file mode 100644
index 0000000..1d4a714
--- /dev/null
+++ b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppSession.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.core;
+
+import org.xmpp.packet.Packet;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Represents XMPP session.
+ */
+public interface XmppSession {
+
+ /**
+ * Returns session state.
+ *
+ * @return true if active.
+ */
+ boolean isActive();
+
+ /**
+ * Returns address of remote client.
+ *
+ * @return socket address
+ */
+ InetSocketAddress remoteAddress();
+
+ /**
+ * Requests to close this XMPP session.
+ */
+ void closeSession();
+
+ /**
+ * Sends XMPP packet over the session.
+ *
+ * @param xmppPacket to send
+ * @return true if packet was sent
+ */
+ boolean sendPacket(Packet xmppPacket);
+
+}
diff --git a/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/package-info.java b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/package-info.java
new file mode 100644
index 0000000..fcd5910
--- /dev/null
+++ b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package for implementation of XMPP APIs.
+ */
+package org.onosproject.xmpp.core;
\ No newline at end of file
diff --git a/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/stream/XmppStreamClose.java b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/stream/XmppStreamClose.java
new file mode 100644
index 0000000..38fc451
--- /dev/null
+++ b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/stream/XmppStreamClose.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.core.stream;
+
+/**
+ * Abstracts XMPP stream close event.
+ */
+public class XmppStreamClose implements XmppStreamEvent {
+
+ @Override
+ public String toXml() {
+ return "</stream:stream>";
+ }
+}
diff --git a/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/stream/XmppStreamError.java b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/stream/XmppStreamError.java
new file mode 100644
index 0000000..c96e8f1
--- /dev/null
+++ b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/stream/XmppStreamError.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.core.stream;
+
+import org.xmpp.packet.StreamError;
+
+/**
+ * Abstracts XMPP stream error event.
+ */
+public class XmppStreamError extends StreamError implements XmppStreamEvent {
+
+ public XmppStreamError(Condition condition) {
+ super(condition);
+ }
+
+ @Override
+ public String toXml() {
+ return super.toXML();
+ }
+
+}
diff --git a/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/stream/XmppStreamEvent.java b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/stream/XmppStreamEvent.java
new file mode 100644
index 0000000..5fc67d3
--- /dev/null
+++ b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/stream/XmppStreamEvent.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.core.stream;
+
+/**
+ * Abstracts XMPP Stream events such as Close, Open or Error event.
+ */
+public interface XmppStreamEvent {
+
+ /**
+ * Represent XMPP Stream Event as XML element.
+ * @return the XML element.
+ */
+ String toXml();
+
+}
diff --git a/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/stream/XmppStreamOpen.java b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/stream/XmppStreamOpen.java
new file mode 100644
index 0000000..967296b
--- /dev/null
+++ b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/stream/XmppStreamOpen.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.core.stream;
+
+import org.dom4j.Attribute;
+import org.dom4j.Element;
+import org.dom4j.Namespace;
+import org.dom4j.io.OutputFormat;
+import org.dom4j.io.XMLWriter;
+import org.xmpp.packet.JID;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.List;
+
+/**
+ * Abstracts XMPP stream open event.
+ */
+public class XmppStreamOpen implements XmppStreamEvent {
+
+ public static final String QNAME = "stream";
+
+ private Element element;
+
+ public XmppStreamOpen(Element element) {
+ this.element = element;
+ }
+
+ @Override
+ public String toXml() {
+ StringWriter out = new StringWriter();
+ XMLWriter writer = new XMLWriter(out, OutputFormat.createCompactFormat());
+ try {
+ out.write("<");
+ writer.write(element.getQualifiedName());
+ for (Attribute attr : (List<Attribute>) element.attributes()) {
+ writer.write(attr);
+ }
+ writer.write(Namespace.get(this.element.getNamespacePrefix(), this.element.getNamespaceURI()));
+ writer.write(Namespace.get("jabber:client"));
+ out.write(">");
+ } catch (IOException ex) {
+ ex.printStackTrace();
+ }
+ return out.toString();
+ }
+
+ public JID getFromJid() {
+ String jid = this.element.attribute("from").getValue();
+ return new JID(jid);
+ }
+
+ public Element getElement() {
+ return this.element;
+ }
+
+ public JID getToJid() {
+ String jid = this.element.attribute("to").getValue();
+ return new JID(jid);
+ }
+}
diff --git a/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/stream/package-info.java b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/stream/package-info.java
new file mode 100644
index 0000000..38510fa
--- /dev/null
+++ b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/stream/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package for implementation fo XMPP Stream abstractions.
+ */
+package org.onosproject.xmpp.core.stream;
\ No newline at end of file
diff --git a/protocols/xmpp/core/ctl/BUCK b/protocols/xmpp/core/ctl/BUCK
new file mode 100644
index 0000000..a055257
--- /dev/null
+++ b/protocols/xmpp/core/ctl/BUCK
@@ -0,0 +1,25 @@
+COMPILE_DEPS = [
+ '//lib:CORE_DEPS',
+ '//protocols/xmpp/core/api:onos-protocols-xmpp-core-api',
+ '//lib:netty-common',
+ '//lib:netty-transport',
+ '//lib:netty-transport-native-epoll',
+ '//lib:netty-buffer',
+ '//lib:netty-codec',
+ '//lib:org.apache.servicemix.bundles.dom4j',
+ '//lib:tinder-xmpp',
+ '//lib:stax2-api',
+ '//lib:aalto-xml',
+ '//lib:concurrent-hashmap',
+ '//lib:gnu-idn',
+]
+
+TEST_DEPS = [
+ '//lib:TEST',
+ '//core/api:onos-api-tests',
+]
+
+osgi_jar_with_tests (
+ deps = COMPILE_DEPS,
+ test_deps = TEST_DEPS,
+)
\ No newline at end of file
diff --git a/protocols/xmpp/core/ctl/pom.xml b/protocols/xmpp/core/ctl/pom.xml
new file mode 100644
index 0000000..cd8dbcd
--- /dev/null
+++ b/protocols/xmpp/core/ctl/pom.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>onos-xmpp-core</artifactId>
+ <groupId>org.onosproject</groupId>
+ <version>1.13.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>onos-xmpp-core-ctl</artifactId>
+ <packaging>bundle</packaging>
+
+ <description>ONOS XMPP controller subsystem API</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>org.apache.felix.scr.annotations</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.compendium</artifactId>
+ </dependency>
+ <!-- https://mvnrepository.com/artifact/org.igniterealtime/tinder -->
+ <dependency>
+ <groupId>org.igniterealtime</groupId>
+ <artifactId>tinder</artifactId>
+ </dependency>
+ <!-- https://mvnrepository.com/artifact/com.fasterxml/aalto-xml -->
+ <dependency>
+ <groupId>com.fasterxml</groupId>
+ <artifactId>aalto-xml</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-xmpp-core-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport-native-epoll</artifactId>
+ <version>${netty4.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-scr-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
diff --git a/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/DefaultXmppDevice.java b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/DefaultXmppDevice.java
new file mode 100644
index 0000000..38749c7
--- /dev/null
+++ b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/DefaultXmppDevice.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.xmpp.core.ctl;
+
+
+import com.google.common.base.Preconditions;
+import org.dom4j.Document;
+import org.dom4j.Element;
+import org.onosproject.xmpp.core.XmppConstants;
+import org.onosproject.xmpp.core.XmppDevice;
+import org.onosproject.xmpp.core.XmppDeviceId;
+import org.onosproject.xmpp.core.XmppSession;
+import org.onosproject.xmpp.core.XmppDeviceAgent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xmpp.packet.IQ;
+import org.xmpp.packet.Message;
+import org.xmpp.packet.Presence;
+import org.xmpp.packet.JID;
+import org.xmpp.packet.Packet;
+import org.xmpp.packet.PacketError;
+
+import java.net.InetSocketAddress;
+
+
+/**
+ * Abstraction of XMPP client.
+ */
+public class DefaultXmppDevice implements XmppDevice {
+
+ private final Logger logger = LoggerFactory.getLogger(getClass());
+
+ protected XmppSession session;
+ protected XmppDeviceId deviceId;
+ protected XmppDeviceAgent agent;
+
+ public DefaultXmppDevice(XmppDeviceId xmppDeviceId, XmppDeviceAgent agent, XmppSession xmppSession) {
+ this.deviceId = xmppDeviceId;
+ setAgent(agent);
+ setSession(xmppSession);
+ }
+
+ private void setAgent(XmppDeviceAgent agent) {
+ if (this.agent == null) {
+ this.agent = agent;
+ }
+ }
+
+ public void setSession(XmppSession session) {
+ if (this.session == null) {
+ this.session = session;
+ }
+ }
+
+ @Override
+ public XmppSession getSession() {
+ return this.session;
+ }
+
+ @Override
+ public InetSocketAddress getIpAddress() {
+ return this.session.remoteAddress();
+ }
+
+ @Override
+ public void registerConnectedDevice() {
+ this.agent.addConnectedDevice(deviceId, this);
+ }
+
+ @Override
+ public void disconnectDevice() {
+ this.session.closeSession();
+ this.agent.removeConnectedDevice(deviceId);
+ }
+
+ @Override
+ public void writeRawXml(Document document) {
+ Element root = document.getRootElement();
+ Packet packet = null;
+ if (root.getName().equals("iq")) {
+ packet = new IQ(root);
+ } else if (root.getName().equals("message")) {
+ packet = new Message(root);
+ } else if (root.getName().equals("presence")) {
+ packet = new Presence(root);
+ }
+ sendPacket(packet);
+ }
+
+ @Override
+ public void sendPacket(Packet packet) {
+ packet.setTo(this.deviceId.getJid());
+ packet.setFrom(new JID(XmppConstants.SERVER_JID));
+ Preconditions.checkNotNull(packet);
+ if (this.session.isActive()) {
+ this.session.sendPacket(packet);
+ } else {
+ logger.warn("Dropping XMPP packets for switch {} because channel is not connected: {}",
+ this.deviceId, packet);
+ }
+ }
+
+ @Override
+ public void handlePacket(Packet packet) {
+ logger.info("HANDLING PACKET from " + deviceId);
+ this.agent.processUpstreamEvent(deviceId, packet);
+ }
+
+ @Override
+ public void sendError(PacketError packetError) {
+ Packet packet = new IQ();
+ packet.setTo(this.deviceId.getJid());
+ packet.setFrom(new JID(XmppConstants.SERVER_JID));
+ packet.setError(packetError);
+ this.session.sendPacket(packet);
+ }
+}
diff --git a/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/DefaultXmppDeviceFactory.java b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/DefaultXmppDeviceFactory.java
new file mode 100644
index 0000000..ae9f523
--- /dev/null
+++ b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/DefaultXmppDeviceFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.core.ctl;
+
+import org.onosproject.xmpp.core.XmppDevice;
+import org.onosproject.xmpp.core.XmppDeviceFactory;
+import org.onosproject.xmpp.core.XmppDeviceId;
+import org.onosproject.xmpp.core.XmppDeviceAgent;
+import org.onosproject.xmpp.core.XmppSession;
+import org.slf4j.Logger;
+import org.xmpp.packet.JID;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Generates XMPP device objects.
+ */
+public final class DefaultXmppDeviceFactory implements XmppDeviceFactory {
+
+ private final Logger logger = getLogger(getClass());
+
+ protected XmppDeviceAgent agent;
+
+ public void init(XmppDeviceAgent manager) {
+ setAgent(manager);
+ }
+
+ /**
+ * Configures XMPP device manager only if it is not initialized.
+ *
+ * @param agent reference object of XMPP device manager
+ */
+ private void setAgent(XmppDeviceAgent agent) {
+ synchronized (agent) {
+ if (this.agent == null) {
+ this.agent = agent;
+ } else {
+ logger.warn("XMPP device manager has already been set.");
+ }
+ }
+ }
+
+ public void cleanAgent() {
+ synchronized (agent) {
+ if (this.agent != null) {
+ this.agent = null;
+ } else {
+ logger.warn("Manager for XMPP device is not configured");
+ }
+ }
+ }
+
+ public XmppDevice getXmppDevice(JID jid, XmppSession session) {
+ XmppDeviceId xmppDeviceId = new XmppDeviceId(jid);
+
+ return getXmppDeviceInstance(xmppDeviceId, session);
+ }
+
+ private XmppDevice getXmppDeviceInstance(XmppDeviceId xmppDeviceId, XmppSession session) {
+ XmppDevice device = agent.getDevice(xmppDeviceId);
+ if (device != null) {
+ return device;
+ } else {
+ XmppDevice newDevice = createXmppDeviceInstance(xmppDeviceId, session);
+ return newDevice;
+ }
+ }
+
+ private XmppDevice createXmppDeviceInstance(XmppDeviceId xmppDeviceId, XmppSession session) {
+ XmppDevice xmppDevice = new DefaultXmppDevice(xmppDeviceId, this.agent, session);
+ return xmppDevice;
+ }
+
+
+
+}
diff --git a/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/XmppChannelInitializer.java b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/XmppChannelInitializer.java
new file mode 100644
index 0000000..1d44e29
--- /dev/null
+++ b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/XmppChannelInitializer.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.core.ctl;
+
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import org.onosproject.xmpp.core.XmppDeviceFactory;
+import org.onosproject.xmpp.core.ctl.handlers.XmppChannelHandler;
+import org.onosproject.xmpp.core.ctl.handlers.XmlMerger;
+import org.onosproject.xmpp.core.ctl.handlers.XmlStreamDecoder;
+import org.onosproject.xmpp.core.ctl.handlers.XmppDecoder;
+import org.onosproject.xmpp.core.ctl.handlers.XmppEncoder;
+
+
+/**
+ * Creates pipeline for server-side XMPP channel.
+ */
+public class XmppChannelInitializer extends ChannelInitializer<SocketChannel> {
+
+ protected XmppDeviceFactory deviceFactory;
+
+ public XmppChannelInitializer(XmppDeviceFactory deviceFactory) {
+ this.deviceFactory = deviceFactory;
+ }
+
+ /**
+ * Initializes pipeline for XMPP channel.
+ * @throws Exception
+ */
+ @Override
+ protected void initChannel(SocketChannel channel) throws Exception {
+ ChannelPipeline pipeline = channel.pipeline();
+
+ XmppChannelHandler handler = new XmppChannelHandler(deviceFactory);
+
+ pipeline.addLast("xmppencoder", new XmppEncoder());
+ pipeline.addLast("xmlstreamdecoder", new XmlStreamDecoder());
+ pipeline.addLast("xmlmerger", new XmlMerger());
+ pipeline.addLast("xmppdecoder", new XmppDecoder());
+ pipeline.addLast("handler", handler);
+
+ }
+}
diff --git a/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/XmppControllerImpl.java b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/XmppControllerImpl.java
new file mode 100644
index 0000000..228aa47
--- /dev/null
+++ b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/XmppControllerImpl.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.core.ctl;
+
+import com.google.common.collect.Maps;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.core.CoreService;
+import org.onosproject.xmpp.core.XmppController;
+import org.onosproject.xmpp.core.XmppDevice;
+import org.onosproject.xmpp.core.XmppDeviceId;
+import org.onosproject.xmpp.core.XmppDeviceListener;
+import org.onosproject.xmpp.core.XmppIqListener;
+import org.onosproject.xmpp.core.XmppMessageListener;
+import org.onosproject.xmpp.core.XmppPresenceListener;
+import org.onosproject.xmpp.core.XmppDeviceAgent;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xmpp.packet.IQ;
+import org.xmpp.packet.Message;
+import org.xmpp.packet.Packet;
+import org.xmpp.packet.Presence;
+
+import java.net.InetSocketAddress;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+
+/**
+ * The main class (bundle) of XMPP protocol.
+ * Responsible for:
+ * 1. Initialization and starting XMPP server.
+ * 2. Handling XMPP packets from clients and writing to clients.
+ * 3. Configuration parameters initialization.
+ * 4. Notifing listeners about XMPP events/packets.
+ */
+@Component(immediate = true)
+@Service
+public class XmppControllerImpl implements XmppController {
+
+ private static final String APP_ID = "org.onosproject.xmpp";
+ private static final String XMPP_PORT = "5269";
+
+ private static final Logger log =
+ LoggerFactory.getLogger(XmppControllerImpl.class);
+
+ // core services declaration
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ComponentConfigService cfgService;
+
+ // configuration properties definition
+ @Property(name = "xmppPort", value = XMPP_PORT,
+ label = "Port number used by XMPP protocol; default is 5269")
+ private String xmppPort = XMPP_PORT;
+
+
+ // listener declaration
+ protected Set<XmppDeviceListener> xmppDeviceListeners = new CopyOnWriteArraySet<XmppDeviceListener>();
+
+ protected Set<XmppIqListener> xmppIqListeners = new CopyOnWriteArraySet<XmppIqListener>();
+ protected Set<XmppMessageListener> xmppMessageListeners = new CopyOnWriteArraySet<XmppMessageListener>();
+ protected Set<XmppPresenceListener> xmppPresenceListeners = new CopyOnWriteArraySet<XmppPresenceListener>();
+
+ protected XmppDeviceAgent agent = new DefaultXmppDeviceAgent();
+
+ private final XmppServer xmppServer = new XmppServer();
+ private DefaultXmppDeviceFactory deviceFactory = new DefaultXmppDeviceFactory();
+
+ ConcurrentMap<XmppDeviceId, XmppDevice> connectedDevices = Maps.newConcurrentMap();
+ ConcurrentMap<InetSocketAddress, XmppDeviceId> addressDeviceIdMap = Maps.newConcurrentMap();
+
+ @Activate
+ public void activate(ComponentContext context) {
+ log.info("XmppControllerImpl started.");
+ coreService.registerApplication(APP_ID, this::cleanup);
+ cfgService.registerProperties(getClass());
+ deviceFactory.init(agent);
+ xmppServer.setConfiguration(context.getProperties());
+ xmppServer.start(deviceFactory);
+ }
+
+ @Deactivate
+ public void deactivate() {
+ cleanup();
+ cfgService.unregisterProperties(getClass(), false);
+ log.info("Stopped");
+ }
+
+ private void cleanup() {
+ xmppServer.stop();
+ deviceFactory.cleanAgent();
+ connectedDevices.values().forEach(XmppDevice::disconnectDevice);
+ connectedDevices.clear();
+ }
+
+ @Override
+ public XmppDevice getDevice(XmppDeviceId xmppDeviceId) {
+ return connectedDevices.get(xmppDeviceId);
+ }
+
+ @Override
+ public void addXmppDeviceListener(XmppDeviceListener deviceListener) {
+ xmppDeviceListeners.add(deviceListener);
+ }
+
+ @Override
+ public void removeXmppDeviceListener(XmppDeviceListener deviceListener) {
+ xmppDeviceListeners.remove(deviceListener);
+ }
+
+ @Override
+ public void addXmppIqListener(XmppIqListener iqListener) {
+ xmppIqListeners.add(iqListener);
+ }
+
+ @Override
+ public void removeXmppIqListener(XmppIqListener iqListener) {
+ xmppIqListeners.remove(iqListener);
+ }
+
+ @Override
+ public void addXmppMessageListener(XmppMessageListener messageListener) {
+ xmppMessageListeners.add(messageListener);
+ }
+
+ @Override
+ public void removeXmppMessageListener(XmppMessageListener messageListener) {
+ xmppMessageListeners.remove(messageListener);
+ }
+
+ @Override
+ public void addXmppPresenceListener(XmppPresenceListener presenceListener) {
+ xmppPresenceListeners.add(presenceListener);
+ }
+
+ @Override
+ public void removeXmppPresenceListener(XmppPresenceListener presenceListener) {
+ xmppPresenceListeners.remove(presenceListener);
+ }
+
+
+ private class DefaultXmppDeviceAgent implements XmppDeviceAgent {
+
+ @Override
+ public boolean addConnectedDevice(XmppDeviceId deviceId, XmppDevice device) {
+ if (connectedDevices.get(deviceId) != null) {
+ log.warn("Trying to add Xmpp Device but found a previous " +
+ "value for XMPP deviceId: {}", deviceId);
+ return false;
+ } else {
+ log.info("Added XMPP device: {}", deviceId);
+ connectedDevices.put(deviceId, device);
+ for (XmppDeviceListener listener : xmppDeviceListeners) {
+ listener.deviceConnected(deviceId);
+ }
+ return true;
+ }
+ }
+
+ @Override
+ public void removeConnectedDevice(XmppDeviceId deviceId) {
+ connectedDevices.remove(deviceId);
+ for (XmppDeviceListener listener : xmppDeviceListeners) {
+ listener.deviceDisconnected(deviceId);
+ }
+ }
+
+ @Override
+ public XmppDevice getDevice(XmppDeviceId deviceId) {
+ return connectedDevices.get(deviceId);
+ }
+
+ @Override
+ public void processUpstreamEvent(XmppDeviceId deviceId, Packet packet) {
+ if (packet instanceof IQ) {
+ for (XmppIqListener iqListener : xmppIqListeners) {
+ iqListener.handleIqStanza((IQ) packet);
+ }
+ }
+ if (packet instanceof Message) {
+ for (XmppMessageListener messageListener : xmppMessageListeners) {
+ messageListener.handleMessageStanza((Message) packet);
+ }
+ }
+ if (packet instanceof Presence) {
+ for (XmppPresenceListener presenceListener : xmppPresenceListeners) {
+ presenceListener.handlePresenceStanza((Presence) packet);
+ }
+ }
+ }
+
+ }
+
+
+}
diff --git a/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/XmppServer.java b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/XmppServer.java
new file mode 100644
index 0000000..42c9b39
--- /dev/null
+++ b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/XmppServer.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.core.ctl;
+
+import com.google.common.base.Strings;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.AbstractChannel;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.onosproject.xmpp.core.XmppDeviceFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.Dictionary;
+
+import static org.onlab.util.Tools.get;
+
+/**
+ * The XMPP server class. Starts XMPP server and listens to new XMPP device TCP connections.
+ */
+public class XmppServer {
+
+ protected static final Logger log = LoggerFactory.getLogger(XmppServer.class);
+
+ protected Integer port = 5259;
+
+ protected Channel channel;
+ protected EventLoopGroup eventLoopGroup;
+ protected Class<? extends AbstractChannel> channelClass;
+
+
+ /**
+ * Initializes XMPP server.
+ */
+ public void init() {
+
+ }
+
+ /**
+ * Runs XMPP server thread.
+ * @param deviceFactory XMPP devices factory
+ */
+ public void run(XmppDeviceFactory deviceFactory) {
+ try {
+ final ServerBootstrap bootstrap = createServerBootStrap(deviceFactory);
+
+ InetSocketAddress socketAddress = new InetSocketAddress(port);
+ channel = bootstrap.bind(socketAddress).sync().channel().closeFuture().channel();
+
+ log.info("Listening for device connections on {}", socketAddress);
+
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ private ServerBootstrap createServerBootStrap(XmppDeviceFactory deviceFactory) {
+
+ ServerBootstrap bootstrap = new ServerBootstrap();
+ configureBootstrap(bootstrap);
+ initEventLoopGroup();
+
+ bootstrap.group(eventLoopGroup)
+ .channel(NioServerSocketChannel.class)
+ .childHandler(new XmppChannelInitializer(deviceFactory));
+
+ return bootstrap;
+ }
+
+ /**
+ * Initializes event loop group.
+ */
+ private void initEventLoopGroup() {
+
+ // try to use EpollEventLoopGroup if possible,
+ // if OS does not support native Epoll, fallback to use netty NIO
+ try {
+ eventLoopGroup = new EpollEventLoopGroup();
+ channelClass = EpollSocketChannel.class;
+ return;
+ } catch (Error e) {
+ log.debug("Failed to initialize native (epoll) transport. "
+ + "Reason: {}. Proceeding with NIO event group.", e);
+ }
+ eventLoopGroup = new NioEventLoopGroup();
+ channelClass = NioServerSocketChannel.class;
+ }
+
+ private void configureBootstrap(ServerBootstrap bootstrap) {
+ bootstrap.option(ChannelOption.TCP_NODELAY, true);
+ bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+ bootstrap.option(ChannelOption.SO_RCVBUF, 2048);
+ }
+
+ /**
+ * TLS/SSL setup. If needed.
+ */
+ private void initTls() {
+ // TODO: add support for TLS/SSL
+ }
+
+ /**
+ * Sets configuration parameters defined via ComponentConfiguration subsystem.
+ * @param properties properties to be set
+ */
+ public void setConfiguration(Dictionary<?, ?> properties) {
+ String port = get(properties, "xmppPort");
+ if (!Strings.isNullOrEmpty(port)) {
+ this.port = Integer.parseInt(port);
+ }
+ log.debug("XMPP port set to {}", this.port);
+ }
+
+ /**
+ * Starts XMPP server.
+ *
+ * @param deviceFactory XMPP devices factory
+ */
+ public void start(XmppDeviceFactory deviceFactory) {
+ log.info("XMPP Server has started.");
+ this.run(deviceFactory);
+ }
+
+ /**
+ * Stops XMPP server.
+ *
+ */
+ public void stop() {
+ log.info("Stopping XMPP I/O");
+ channel.close();
+ eventLoopGroup.shutdownGracefully();
+ }
+}
diff --git a/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/XmppValidator.java b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/XmppValidator.java
new file mode 100644
index 0000000..5e4c708
--- /dev/null
+++ b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/XmppValidator.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.core.ctl;
+
+import org.dom4j.Element;
+import org.onosproject.xmpp.core.XmppConstants;
+import org.onosproject.xmpp.core.ctl.exception.XmppValidationException;
+import org.onosproject.xmpp.core.stream.XmppStreamOpen;
+import org.xmpp.packet.IQ;
+import org.xmpp.packet.Message;
+import org.xmpp.packet.Presence;
+import org.xmpp.packet.JID;
+import org.xmpp.packet.Packet;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Validates incoming XMPP packets.
+ */
+public class XmppValidator {
+
+ public void validateStream(XmppStreamOpen stream) throws XmppValidationException {
+ try {
+ String jid = stream.getElement().attribute("from").getValue();
+ validateJid(jid);
+ } catch (Exception e) {
+ throw new XmppValidationException(true);
+ }
+ }
+
+ public void validate(Packet packet) throws XmppValidationException {
+ validateBasicXmpp(packet);
+ Element root = packet.getElement();
+ if (root.getName().equals(XmppConstants.IQ_QNAME)) {
+ validateIQ((IQ) packet);
+ } else if (root.getName().equals(XmppConstants.MESSAGE_QNAME)) {
+ validateMessage((Message) packet);
+ } else if (root.getName().equals(XmppConstants.PRESENCE_QNAME)) {
+ validatePresence((Presence) packet);
+ }
+ }
+
+ public void validateIQ(IQ iq) throws XmppValidationException{
+ // TODO: implement IQ validation
+ }
+
+ public void validateMessage(Message message) throws XmppValidationException {
+ // TODO: implement Message validation
+ }
+
+ public void validatePresence(Presence presence) throws XmppValidationException {
+ // TODO: implement Presence validation
+ }
+
+ private void validateBasicXmpp(Packet packet) throws XmppValidationException {
+ try {
+ validateJid(packet.getFrom());
+ validateJid(packet.getTo());
+ } catch (Exception e) {
+ throw new XmppValidationException(false);
+ }
+ }
+
+ public void validateJid(String jid) throws XmppValidationException {
+ try {
+ checkNotNull(jid);
+ JID testJid = new JID(jid);
+ } catch (Exception e) {
+ throw new XmppValidationException(false);
+ }
+ }
+
+ public void validateJid(JID jid) {
+ checkNotNull(jid);
+ }
+}
diff --git a/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/exception/UnsupportedStanzaTypeException.java b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/exception/UnsupportedStanzaTypeException.java
new file mode 100644
index 0000000..ca5f2f0
--- /dev/null
+++ b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/exception/UnsupportedStanzaTypeException.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.core.ctl.exception;
+
+/**
+ * Exception is thrown when the unsupported stanza type is received.
+ */
+public class UnsupportedStanzaTypeException extends Exception {
+
+ public UnsupportedStanzaTypeException(String message) {
+ super(message);
+ }
+
+}
diff --git a/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/exception/XmlRestrictionsException.java b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/exception/XmlRestrictionsException.java
new file mode 100644
index 0000000..0947314
--- /dev/null
+++ b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/exception/XmlRestrictionsException.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.core.ctl.exception;
+
+/**
+ * Exception is thrown when prohibited XML element is received.
+ */
+public class XmlRestrictionsException extends Exception {
+}
diff --git a/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/exception/XmppValidationException.java b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/exception/XmppValidationException.java
new file mode 100644
index 0000000..ccc81dd
--- /dev/null
+++ b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/exception/XmppValidationException.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.core.ctl.exception;
+
+/**
+ * Exception is thrown when the validation of XMPP packet fails.
+ */
+public class XmppValidationException extends Exception {
+
+ private boolean streamValidation;
+
+ public XmppValidationException(boolean streamValidation) {
+ this.streamValidation = streamValidation;
+ }
+
+ public boolean isStreamValidationException() {
+ return streamValidation;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ XmppValidationException that = (XmppValidationException) o;
+
+ return streamValidation == that.streamValidation;
+ }
+
+ @Override
+ public int hashCode() {
+ return (streamValidation ? 1 : 0);
+ }
+}
diff --git a/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/exception/package-info.java b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/exception/package-info.java
new file mode 100644
index 0000000..6db4694
--- /dev/null
+++ b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/exception/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package for implementation of XMPP exceptions.
+ */
+package org.onosproject.xmpp.core.ctl.exception;
\ No newline at end of file
diff --git a/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/handlers/XmlMerger.java b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/handlers/XmlMerger.java
new file mode 100644
index 0000000..8cbea8c
--- /dev/null
+++ b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/handlers/XmlMerger.java
@@ -0,0 +1,136 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.core.ctl.handlers;
+
+import com.fasterxml.aalto.stax.OutputFactoryImpl;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import org.dom4j.io.DOMReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.stream.XMLEventWriter;
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.events.XMLEvent;
+import javax.xml.transform.dom.DOMResult;
+import java.util.List;
+
+/**
+ * Merges incoming XML events (elements) into XML document.
+ */
+public class XmlMerger extends MessageToMessageDecoder {
+
+ private final Logger logger = LoggerFactory.getLogger(getClass());
+
+ protected final XMLOutputFactory xmlOutputFactory = new OutputFactoryImpl();
+ protected DocumentBuilder docBuilder;
+ protected Document document;
+ protected DOMResult result;
+ protected XMLEventWriter writer;
+ protected int depth;
+
+ public XmlMerger() throws ParserConfigurationException {
+ initDocBuilder();
+ this.resetWriter();
+ }
+
+ private void initDocBuilder() throws ParserConfigurationException {
+ try {
+ final DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance();
+ docBuilderFactory.setIgnoringElementContentWhitespace(true);
+ docBuilderFactory.setIgnoringComments(true);
+ this.docBuilder = docBuilderFactory.newDocumentBuilder();
+ } catch (ParserConfigurationException e) {
+ throw e;
+ }
+ }
+
+ @Override
+ protected void decode(ChannelHandlerContext ctx, Object object, List out) throws Exception {
+ try {
+ if (object instanceof XMLEvent) {
+ final XMLEvent event = (XMLEvent) object;
+
+ if (event.isStartDocument() || event.isEndDocument()) {
+ return;
+ }
+
+ if (event.isCharacters() && depth <= 1) {
+ return;
+ }
+
+ if (depth < 1 && event.isStartElement()) {
+ out.add(object);
+ depth++;
+ return;
+ }
+
+ if (depth <= 1 && event.isEndElement()) {
+ out.add(object);
+ depth--;
+ return;
+ }
+
+ writer.add(event);
+
+ if (event.isStartElement()) {
+ depth++;
+ } else if (event.isEndElement()) {
+ depth--;
+
+ if (depth == 1) {
+ writer.flush();
+ org.dom4j.Element xmlElement = transform().getRootElement();
+ out.add(xmlElement);
+ writer.close();
+ resetWriter();
+ }
+ }
+ }
+ } catch (Exception e) {
+ logger.info(e.getCause().getMessage());
+ throw e;
+ }
+ }
+
+ private org.dom4j.Document transform() {
+ org.dom4j.io.DOMReader reader = new DOMReader();
+ return reader.read(document);
+ }
+
+ private void resetWriter() {
+ try {
+ document = newDocument();
+ result = new DOMResult(document);
+ writer = xmlOutputFactory.createXMLEventWriter(result);
+ } catch (XMLStreamException e) {
+ throw new InternalError("Error creating writer");
+ }
+ }
+
+ private Document newDocument() {
+ return docBuilder.newDocument();
+ }
+
+
+}
diff --git a/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/handlers/XmlStreamDecoder.java b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/handlers/XmlStreamDecoder.java
new file mode 100644
index 0000000..21425de
--- /dev/null
+++ b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/handlers/XmlStreamDecoder.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.core.ctl.handlers;
+
+
+import com.fasterxml.aalto.AsyncByteArrayFeeder;
+import com.fasterxml.aalto.AsyncXMLInputFactory;
+import com.fasterxml.aalto.AsyncXMLStreamReader;
+import com.fasterxml.aalto.stax.InputFactoryImpl;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.codehaus.stax2.ri.evt.Stax2EventAllocatorImpl;
+import org.dom4j.DocumentFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.xml.stream.XMLStreamException;
+import java.util.List;
+
+/**
+ * Decodes a incoming data from XML stream.
+ */
+public class XmlStreamDecoder extends ByteToMessageDecoder {
+
+ private final Logger logger = LoggerFactory.getLogger(getClass());
+
+ private static final AsyncXMLInputFactory XML_INPUT_FACTORY = new InputFactoryImpl();
+ private Stax2EventAllocatorImpl allocator = new Stax2EventAllocatorImpl();
+ private AsyncXMLStreamReader<AsyncByteArrayFeeder> streamReader = XML_INPUT_FACTORY.createAsyncForByteArray();
+ private DocumentFactory df = DocumentFactory.getInstance();
+
+ @Override
+ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+
+ AsyncByteArrayFeeder streamFeeder = streamReader.getInputFeeder();
+ logger.info("Decoding XMPP data.. ");
+
+ byte[] buffer = new byte[in.readableBytes()];
+ in.readBytes(buffer);
+ logger.debug("Buffer length: " + buffer.length);
+ try {
+ streamFeeder.feedInput(buffer, 0, buffer.length);
+ } catch (XMLStreamException exception) {
+ logger.info(exception.getMessage());
+ in.skipBytes(in.readableBytes());
+ logger.info("Bytes skipped");
+ throw exception;
+ }
+
+ while (streamReader.hasNext() && streamReader.next() != AsyncXMLStreamReader.EVENT_INCOMPLETE) {
+ out.add(allocator.allocate(streamReader));
+ }
+
+ }
+}
diff --git a/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/handlers/XmppChannelHandler.java b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/handlers/XmppChannelHandler.java
new file mode 100644
index 0000000..548129f
--- /dev/null
+++ b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/handlers/XmppChannelHandler.java
@@ -0,0 +1,311 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.core.ctl.handlers;
+
+import com.fasterxml.aalto.WFCException;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.CombinedChannelDuplexHandler;
+import org.dom4j.Element;
+import org.onosproject.xmpp.core.XmppDevice;
+import org.onosproject.xmpp.core.XmppDeviceFactory;
+import org.onosproject.xmpp.core.XmppSession;
+import org.onosproject.xmpp.core.ctl.exception.UnsupportedStanzaTypeException;
+import org.onosproject.xmpp.core.ctl.exception.XmppValidationException;
+
+import org.onosproject.xmpp.core.stream.XmppStreamClose;
+import org.onosproject.xmpp.core.stream.XmppStreamError;
+import org.onosproject.xmpp.core.stream.XmppStreamOpen;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xmpp.packet.Packet;
+
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.onlab.util.Tools.groupedThreads;
+
+/**
+ * Handles a XMPP channel related events and implements XMPP state machine.
+ */
+public class XmppChannelHandler extends CombinedChannelDuplexHandler implements XmppSession {
+
+ private final Logger logger = LoggerFactory.getLogger(getClass());
+
+ protected ExecutorService executorService =
+ Executors.newFixedThreadPool(32, groupedThreads("onos/xmpp", "message-stats-%d", logger));
+
+ protected volatile ChannelState state;
+ protected Channel channel;
+
+ protected XmppDevice xmppDevice;
+ private XmppDeviceFactory deviceFactory;
+
+ public XmppChannelHandler(XmppDeviceFactory deviceFactory) {
+ ChannelInboundHandlerAdapter inboundHandlerAdapter = new ChannelInboundHandlerAdapter();
+ ChannelOutboundHandlerAdapter outboundHandlerAdapter = new ChannelOutboundHandlerAdapter();
+ this.init(inboundHandlerAdapter, outboundHandlerAdapter);
+ this.state = ChannelState.IDLE;
+ this.deviceFactory = deviceFactory;
+ }
+
+ @Override
+ public boolean isActive() {
+ return channel.isActive();
+ }
+
+ @Override
+ public InetSocketAddress remoteAddress() {
+ return (InetSocketAddress) channel.remoteAddress();
+ }
+
+ @Override
+ public void closeSession() {
+ sendStreamCloseReply();
+ }
+
+ @Override
+ public boolean sendPacket(Packet xmppPacket) {
+ if (channel.isActive()) {
+ channel.writeAndFlush(xmppPacket, channel.voidPromise());
+ return true;
+ } else {
+ logger.warn("Dropping messages for device {} because channel is not connected: {}",
+ xmppDevice.getIpAddress(), xmppPacket);
+ return false;
+ }
+ }
+
+ enum XmppEvent {
+ XmppStreamClose, XmppStreamOpen, XmppStreamError, IQ, Message, Presence
+ }
+
+ enum ChannelState {
+
+ IDLE() {
+ @Override
+ void processStreamClose(XmppChannelHandler handler, ChannelHandlerContext ctx, XmppStreamClose msg) {
+ // ignore
+ }
+
+ @Override
+ void processStreamError(XmppChannelHandler handler, ChannelHandlerContext ctx, XmppStreamError error) {
+ // ignore
+ }
+
+ @Override
+ void processUpstreamXmppPacket(XmppChannelHandler handler, ChannelHandlerContext ctx, Object msg) {
+ // ignore
+ handler.logger.info("XMPP Packet in state IDLE received. Packet ignored..");
+ }
+ },
+
+ WAIT_STREAM_CLOSE() {
+ @Override
+ void processDownstreamXmppEvent(XmppChannelHandler handler, ChannelHandlerContext ctx, Object msg) {
+ /**
+ * Block all downstream events during WAIT_STREAM_CLOSE.
+ *
+ * RFC 6120
+ * 4.4 Closing a Stream
+ * "2. Refrain from sending any further data over its outbound stream to the other entity,
+ * but continue to process data received from the other entity (and, if necessary, process such data)."
+ */
+ }
+
+ @Override
+ void processStreamClose(XmppChannelHandler handler, ChannelHandlerContext ctx, XmppStreamClose msg) {
+ handler.xmppDevice.disconnectDevice();
+ handler.closeChannel();
+ handler.setState(IDLE);
+ }
+
+ @Override
+ void processStreamOpen(XmppChannelHandler handler, ChannelHandlerContext ctx, XmppStreamOpen streamOpen) {
+ // ignore
+ }
+ },
+
+ STREAM_OPEN() {
+ @Override
+ void processStreamOpen(XmppChannelHandler handler, ChannelHandlerContext ctx, XmppStreamOpen streamOpen) {
+ // ignore
+ }
+
+ @Override
+ void processDownstreamXmppEvent(XmppChannelHandler handler, ChannelHandlerContext ctx, Object msg) {
+ if (msg instanceof XmppStreamClose) {
+ handler.setState(ChannelState.WAIT_STREAM_CLOSE);
+ }
+ ctx.writeAndFlush(msg);
+ }
+ };
+
+ void processStreamError(XmppChannelHandler handler, ChannelHandlerContext ctx, XmppStreamError streamError) {
+ handler.handleStreamError(streamError);
+ }
+
+ void processStreamOpen(XmppChannelHandler handler, ChannelHandlerContext ctx, XmppStreamOpen xmppStreamOpen) {
+ handler.xmppDevice = handler.deviceFactory.getXmppDevice(xmppStreamOpen.getFromJid(), handler);
+ handler.sendStreamOpenReply(xmppStreamOpen);
+ handler.xmppDevice.registerConnectedDevice();
+ handler.setState(STREAM_OPEN);
+ }
+
+ void processStreamClose(XmppChannelHandler handler, ChannelHandlerContext ctx, XmppStreamClose msg) {
+ handler.sendStreamCloseReply();
+ handler.xmppDevice.disconnectDevice();
+ }
+
+ void processUpstreamXmppPacket(XmppChannelHandler handler, ChannelHandlerContext ctx, Object msg) {
+ handler.executorService.execute(new XmppPacketHandler(handler.xmppDevice, ctx, (Packet) msg));
+ }
+
+ void processDownstreamXmppEvent(XmppChannelHandler handler, ChannelHandlerContext ctx, Object msg) {
+ ctx.writeAndFlush(msg);
+ }
+
+ void processUpstreamXmppEvent(XmppChannelHandler handler, ChannelHandlerContext ctx, Object msg) {
+ XmppEvent event = XmppEvent.valueOf(msg.getClass().getSimpleName());
+ handler.logger.info("XMPP event {} received in STATE={} for device: {}",
+ event, handler.state, ctx.channel().remoteAddress());
+ switch (event) {
+ case XmppStreamOpen:
+ handler.state.processStreamOpen(handler, ctx, (XmppStreamOpen) msg);
+ break;
+ case XmppStreamClose:
+ handler.state.processStreamClose(handler, ctx, (XmppStreamClose) msg);
+ break;
+ case XmppStreamError:
+ handler.state.processStreamError(handler, ctx, (XmppStreamError) msg);
+ break;
+ case IQ:
+ case Message:
+ case Presence:
+ handler.state.processUpstreamXmppPacket(handler, ctx, msg);
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ private void closeChannel() {
+ if (channel != null) {
+ channel.close();
+ }
+ }
+
+ private void handleStreamError(XmppStreamError streamError) {
+ // TODO: handle stream errors
+ }
+
+ private void sendStreamCloseReply() {
+ XmppStreamClose streamClose = new XmppStreamClose();
+ channel.writeAndFlush(streamClose);
+ }
+
+ private void sendStreamOpenReply(XmppStreamOpen xmppStreamOpen) {
+ Element element = xmppStreamOpen.getElement().createCopy();
+ element.addAttribute("from", xmppStreamOpen.getToJid().toString());
+ element.addAttribute("to", xmppStreamOpen.getFromJid().toString());
+ XmppStreamOpen xmppStreamOpenReply = new XmppStreamOpen(element);
+ channel.writeAndFlush(xmppStreamOpenReply);
+ }
+
+ private void sendStreamError(XmppStreamError.Condition condition) {
+ XmppStreamError error = new XmppStreamError(condition);
+ channel.writeAndFlush(error);
+ }
+
+ private void handleChannelException(Throwable cause) {
+ XmppStreamError.Condition condition = getStreamErrorCondition(cause.getCause());
+ sendStreamError(condition);
+ sendStreamCloseReply();
+ }
+
+ protected void setState(ChannelState state) {
+ logger.info("Transition from state {} to {}", this.state, state);
+ this.state = state;
+ }
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ channel = ctx.channel();
+ logger.info("New device connection from {}",
+ channel.remoteAddress());
+ this.state = ChannelState.IDLE;
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ this.state.processUpstreamXmppEvent(this, ctx, msg);
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+ throws Exception {
+ logger.info("Exception caught: {}", cause.getMessage());
+ handleChannelException(cause.getCause());
+ }
+
+ private XmppStreamError.Condition getStreamErrorCondition(Throwable cause) {
+ //TODO: add error handle mechanisms for each cases
+ if (cause instanceof UnsupportedStanzaTypeException) {
+ return XmppStreamError.Condition.unsupported_stanza_type;
+ } else if (cause instanceof WFCException) {
+ return XmppStreamError.Condition.bad_format;
+ } else if (cause instanceof XmppValidationException) {
+ return XmppStreamError.Condition.bad_format;
+ } else {
+ return XmppStreamError.Condition.internal_server_error;
+ }
+ }
+
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+ this.state.processDownstreamXmppEvent(this, ctx, msg);
+ logger.info("Writing packet... Current State " + this.state.toString());
+ }
+
+ /**
+ * XMPP message handler.
+ */
+ private static final class XmppPacketHandler implements Runnable {
+
+ protected final ChannelHandlerContext ctx;
+ protected final Packet packet;
+ protected final XmppDevice xmppDevice;
+
+ public XmppPacketHandler(XmppDevice xmppDevice, ChannelHandlerContext ctx, Packet packet) {
+ this.ctx = ctx;
+ this.packet = packet;
+ this.xmppDevice = xmppDevice;
+ }
+
+ @Override
+ public void run() {
+ xmppDevice.handlePacket(packet);
+ }
+ }
+}
diff --git a/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/handlers/XmppDecoder.java b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/handlers/XmppDecoder.java
new file mode 100644
index 0000000..9c2a7e9
--- /dev/null
+++ b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/handlers/XmppDecoder.java
@@ -0,0 +1,150 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.core.ctl.handlers;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import org.dom4j.DocumentFactory;
+import org.dom4j.Element;
+import org.dom4j.QName;
+import org.onosproject.xmpp.core.XmppConstants;
+import org.onosproject.xmpp.core.ctl.XmppValidator;
+import org.onosproject.xmpp.core.ctl.exception.UnsupportedStanzaTypeException;
+import org.onosproject.xmpp.core.ctl.exception.XmppValidationException;
+import org.onosproject.xmpp.core.stream.XmppStreamClose;
+import org.onosproject.xmpp.core.stream.XmppStreamError;
+import org.onosproject.xmpp.core.stream.XmppStreamOpen;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xmpp.packet.IQ;
+import org.xmpp.packet.Message;
+import org.xmpp.packet.Packet;
+import org.xmpp.packet.Presence;
+
+import javax.xml.stream.events.Attribute;
+import javax.xml.stream.events.Namespace;
+import javax.xml.stream.events.StartElement;
+import javax.xml.stream.events.XMLEvent;
+import java.util.Iterator;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Translates XML Element to XMPP Packet.
+ */
+public class XmppDecoder extends MessageToMessageDecoder {
+
+ private final Logger logger = LoggerFactory.getLogger(getClass());
+
+ private XmppValidator validator = new XmppValidator();
+
+ @Override
+ protected void decode(ChannelHandlerContext channelHandlerContext, Object object, List out) throws Exception {
+ if (object instanceof Element) {
+ Element root = (Element) object;
+
+ try {
+ Packet packet = recognizeAndReturnXmppPacket(root);
+ validate(packet);
+ out.add(packet);
+ } catch (UnsupportedStanzaTypeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new XmppValidationException(false);
+ }
+
+ } else if (object instanceof XMLEvent) {
+
+ XMLEvent event = (XMLEvent) object;
+ if (event.isStartElement()) {
+ final StartElement element = event.asStartElement();
+
+ if (element.getName().getLocalPart().equals(XmppConstants.STREAM_QNAME)) {
+ DocumentFactory df = DocumentFactory.getInstance();
+ QName qname = (element.getName().getPrefix() == null) ?
+ df.createQName(element.getName().getLocalPart(),
+ element.getName().getNamespaceURI()) :
+ df.createQName(element.getName().getLocalPart(),
+ element.getName().getPrefix(), element.getName().getNamespaceURI());
+
+ Element newElement = df.createElement(qname);
+
+ Iterator nsIt = element.getNamespaces();
+ // add all relevant XML namespaces to Element
+ while (nsIt.hasNext()) {
+ Namespace ns = (Namespace) nsIt.next();
+ newElement.addNamespace(ns.getPrefix(), ns.getNamespaceURI());
+ }
+
+ Iterator attrIt = element.getAttributes();
+ // add all attributes to Element
+ while (attrIt.hasNext()) {
+ Attribute attr = (Attribute) attrIt.next();
+ newElement.addAttribute(attr.getName().getLocalPart(), attr.getValue());
+ }
+ XmppStreamOpen xmppStreamOpen = new XmppStreamOpen(newElement);
+ validator.validateStream(xmppStreamOpen);
+ out.add(xmppStreamOpen);
+ }
+ } else if (event.isEndElement()) {
+ out.add(new XmppStreamClose());
+ }
+ }
+
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ logger.info("Exception caught: {}", cause.getMessage());
+ if (cause.getCause() instanceof XmppValidationException) {
+ if (((XmppValidationException) cause.getCause()).isStreamValidationException()) {
+ XmppStreamError.Condition condition = XmppStreamError.Condition.bad_format;
+ XmppStreamError error = new XmppStreamError(condition);
+ ctx.channel().writeAndFlush(error);
+ ctx.channel().writeAndFlush(new XmppStreamClose());
+ return;
+ }
+ }
+ logger.info("Not a StreamValidationException. Sending exception upstream.");
+ ctx.fireExceptionCaught(cause);
+ }
+
+
+ private void validate(Packet packet) throws UnsupportedStanzaTypeException, XmppValidationException {
+ validator.validate(packet);
+ }
+
+ protected Packet recognizeAndReturnXmppPacket(Element root)
+ throws UnsupportedStanzaTypeException, IllegalArgumentException {
+ checkNotNull(root);
+
+ Packet packet = null;
+ if (root.getName().equals(XmppConstants.IQ_QNAME)) {
+ packet = new IQ(root);
+ } else if (root.getName().equals(XmppConstants.MESSAGE_QNAME)) {
+ packet = new Message(root);
+ } else if (root.getName().equals(XmppConstants.PRESENCE_QNAME)) {
+ packet = new Presence(root);
+ } else {
+ throw new UnsupportedStanzaTypeException("Unrecognized XMPP Packet");
+ }
+ logger.info("XMPP Packet received\n" + root.asXML());
+ return packet;
+ }
+
+}
diff --git a/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/handlers/XmppEncoder.java b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/handlers/XmppEncoder.java
new file mode 100644
index 0000000..00c8e0a
--- /dev/null
+++ b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/handlers/XmppEncoder.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.core.ctl.handlers;
+
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+
+import io.netty.util.CharsetUtil;
+import org.onosproject.xmpp.core.stream.XmppStreamEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xmpp.packet.Packet;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Encodes XMPP message and writes XML data to channel.
+ */
+public class XmppEncoder extends MessageToByteEncoder {
+
+ private final Logger logger = LoggerFactory.getLogger(getClass());
+
+ @Override
+ protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
+ byte[] bytes = null;
+
+ if (msg instanceof XmppStreamEvent) {
+ XmppStreamEvent streamEvent = (XmppStreamEvent) msg;
+ logger.info("SENDING: {}", streamEvent.toXml());
+ bytes = streamEvent.toXml().getBytes(CharsetUtil.UTF_8);
+ }
+
+ if (msg instanceof Packet) {
+ Packet pkt = (Packet) msg;
+ logger.info("SENDING /n, {}", pkt.toString());
+ bytes = pkt.toXML().getBytes(CharsetUtil.UTF_8);
+ }
+
+ out.writeBytes(checkNotNull(bytes));
+ }
+}
diff --git a/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/handlers/package-info.java b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/handlers/package-info.java
new file mode 100644
index 0000000..20526e6
--- /dev/null
+++ b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/handlers/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package for implementation of XMPP channel handlers.
+ */
+package org.onosproject.xmpp.core.ctl.handlers;
\ No newline at end of file
diff --git a/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/package-info.java b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/package-info.java
new file mode 100644
index 0000000..ffbd8dc
--- /dev/null
+++ b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package for implementation of XMPP elements.
+ */
+package org.onosproject.xmpp.core.ctl;
\ No newline at end of file
diff --git a/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/ChannelAdapter.java b/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/ChannelAdapter.java
new file mode 100644
index 0000000..0bf6322
--- /dev/null
+++ b/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/ChannelAdapter.java
@@ -0,0 +1,252 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.core.ctl;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelConfig;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelId;
+import io.netty.channel.ChannelMetadata;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelProgressivePromise;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoop;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
+
+import java.net.SocketAddress;
+
+/**
+ * Adapter for Netty channel.
+ */
+public class ChannelAdapter implements Channel {
+ @Override
+ public ChannelId id() {
+ return null;
+ }
+
+ @Override
+ public EventLoop eventLoop() {
+ return null;
+ }
+
+ @Override
+ public Channel parent() {
+ return null;
+ }
+
+ @Override
+ public ChannelConfig config() {
+ return null;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return false;
+ }
+
+ @Override
+ public boolean isRegistered() {
+ return false;
+ }
+
+ @Override
+ public boolean isActive() {
+ return true;
+ }
+
+ @Override
+ public ChannelMetadata metadata() {
+ return null;
+ }
+
+ @Override
+ public SocketAddress localAddress() {
+ return null;
+ }
+
+ @Override
+ public SocketAddress remoteAddress() {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture closeFuture() {
+ return null;
+ }
+
+ @Override
+ public boolean isWritable() {
+ return false;
+ }
+
+ @Override
+ public long bytesBeforeUnwritable() {
+ return 0;
+ }
+
+ @Override
+ public long bytesBeforeWritable() {
+ return 0;
+ }
+
+ @Override
+ public Unsafe unsafe() {
+ return null;
+ }
+
+ @Override
+ public ChannelPipeline pipeline() {
+ return null;
+ }
+
+ @Override
+ public ByteBufAllocator alloc() {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture bind(SocketAddress localAddress) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture disconnect() {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture close() {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture deregister() {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture disconnect(ChannelPromise promise) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture close(ChannelPromise promise) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture deregister(ChannelPromise promise) {
+ return null;
+ }
+
+ @Override
+ public Channel read() {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture write(Object msg) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture write(Object msg, ChannelPromise promise) {
+ return null;
+ }
+
+ @Override
+ public Channel flush() {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture writeAndFlush(Object msg) {
+ return null;
+ }
+
+ @Override
+ public ChannelPromise newPromise() {
+ return null;
+ }
+
+ @Override
+ public ChannelProgressivePromise newProgressivePromise() {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture newSucceededFuture() {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture newFailedFuture(Throwable cause) {
+ return null;
+ }
+
+ @Override
+ public ChannelPromise voidPromise() {
+ return null;
+ }
+
+ @Override
+ public <T> Attribute<T> attr(AttributeKey<T> key) {
+ return null;
+ }
+
+ @Override
+ public <T> boolean hasAttr(AttributeKey<T> key) {
+ return false;
+ }
+
+ @Override
+ public int compareTo(Channel o) {
+ return 0;
+ }
+}
diff --git a/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/ChannelHandlerContextAdapter.java b/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/ChannelHandlerContextAdapter.java
new file mode 100644
index 0000000..a8ce1eb
--- /dev/null
+++ b/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/ChannelHandlerContextAdapter.java
@@ -0,0 +1,241 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.core.ctl;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelProgressivePromise;
+import io.netty.channel.ChannelPromise;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
+import io.netty.util.concurrent.EventExecutor;
+
+import java.net.SocketAddress;
+
+/**
+ * Adapter for testing against a netty channel handler context.
+ */
+public class ChannelHandlerContextAdapter implements ChannelHandlerContext {
+ @Override
+ public Channel channel() {
+ return new ChannelAdapter();
+ }
+
+ @Override
+ public EventExecutor executor() {
+ return null;
+ }
+
+ @Override
+ public String name() {
+ return null;
+ }
+
+ @Override
+ public ChannelHandler handler() {
+ return null;
+ }
+
+ @Override
+ public boolean isRemoved() {
+ return false;
+ }
+
+ @Override
+ public ChannelHandlerContext fireChannelRegistered() {
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext fireChannelUnregistered() {
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext fireChannelActive() {
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext fireChannelInactive() {
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext fireUserEventTriggered(Object evt) {
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext fireChannelRead(Object msg) {
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext fireChannelReadComplete() {
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext fireChannelWritabilityChanged() {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture bind(SocketAddress localAddress) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture disconnect() {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture close() {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture deregister() {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture disconnect(ChannelPromise promise) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture close(ChannelPromise promise) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture deregister(ChannelPromise promise) {
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext read() {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture write(Object msg) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture write(Object msg, ChannelPromise promise) {
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext flush() {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture writeAndFlush(Object msg) {
+ return null;
+ }
+
+ @Override
+ public ChannelPromise newPromise() {
+ return null;
+ }
+
+ @Override
+ public ChannelProgressivePromise newProgressivePromise() {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture newSucceededFuture() {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture newFailedFuture(Throwable cause) {
+ return null;
+ }
+
+ @Override
+ public ChannelPromise voidPromise() {
+ return null;
+ }
+
+ @Override
+ public ChannelPipeline pipeline() {
+ return null;
+ }
+
+ @Override
+ public ByteBufAllocator alloc() {
+ return null;
+ }
+
+ @Override
+ public <T> Attribute<T> attr(AttributeKey<T> key) {
+ return null;
+ }
+
+ @Override
+ public <T> boolean hasAttr(AttributeKey<T> key) {
+ return false;
+ }
+}
diff --git a/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/XmppControllerImplTest.java b/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/XmppControllerImplTest.java
new file mode 100644
index 0000000..fb19ee1
--- /dev/null
+++ b/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/XmppControllerImplTest.java
@@ -0,0 +1,237 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.core.ctl;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+import com.google.common.collect.ImmutableSet;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.*;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasItems;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.core.CoreService;
+import org.onosproject.xmpp.core.XmppDevice;
+import org.onosproject.xmpp.core.XmppDeviceId;
+import org.onosproject.xmpp.core.XmppDeviceListener;
+import org.onosproject.xmpp.core.XmppIqListener;
+import org.onosproject.xmpp.core.XmppMessageListener;
+import org.onosproject.xmpp.core.XmppPresenceListener;
+import org.onosproject.xmpp.core.XmppDeviceAgent;
+import org.osgi.service.component.ComponentContext;
+import org.xmpp.packet.IQ;
+import org.xmpp.packet.JID;
+import org.xmpp.packet.Message;
+import org.xmpp.packet.Packet;
+import org.xmpp.packet.Presence;
+
+
+/**
+ * Test class for XmppControllerImpl.
+ */
+public class XmppControllerImplTest {
+
+ XmppControllerImpl controller;
+ XmppDeviceAgent agent;
+ TestXmppDeviceListener testXmppDeviceListener;
+ TestXmppIqListener testXmppIqListener;
+ TestXmppMessageListener testXmppMessageListener;
+ TestXmppPresenceListener testXmppPresenceListener;
+
+ XmppDevice device1;
+ XmppDeviceId jid1;
+ XmppDevice device2;
+ XmppDeviceId jid2;
+ XmppDevice device3;
+ XmppDeviceId jid3;
+
+ /**
+ * Test harness for a device listener.
+ */
+ static class TestXmppDeviceListener implements XmppDeviceListener {
+ final List<XmppDeviceId> removedDevices = new ArrayList<>();
+ final List<XmppDeviceId> addedDevices = new ArrayList<>();
+
+ @Override
+ public void deviceConnected(XmppDeviceId deviceId) {
+ addedDevices.add(deviceId);
+ }
+
+ @Override
+ public void deviceDisconnected(XmppDeviceId deviceId) {
+ removedDevices.add(deviceId);
+ }
+ }
+
+ static class TestXmppIqListener implements XmppIqListener {
+ final List<IQ> handledIqs = new ArrayList<>();
+
+ @Override
+ public void handleIqStanza(IQ iq) {
+ handledIqs.add(iq);
+ }
+
+ }
+
+ static class TestXmppMessageListener implements XmppMessageListener {
+ final List<Message> handledMessages = new ArrayList<>();
+
+ @Override
+ public void handleMessageStanza(Message message) {
+ handledMessages.add(message);
+ }
+ }
+
+ static class TestXmppPresenceListener implements XmppPresenceListener {
+ final List<Presence> handledPresenceStanzas = new ArrayList<>();
+
+ @Override
+ public void handlePresenceStanza(Presence presence) {
+ handledPresenceStanzas.add(presence);
+ }
+ }
+
+ /**
+ * Sets up devices to use as data, mocks and launches a controller instance.
+ */
+ @Before
+ public void setUp() {
+ device1 = new XmppDeviceAdapter();
+ jid1 = new XmppDeviceId(new JID("agent1@testxmpp.org"));
+ device2 = new XmppDeviceAdapter();
+ jid2 = new XmppDeviceId(new JID("agent2@testxmpp.org"));
+ device3 = new XmppDeviceAdapter();
+ jid3 = new XmppDeviceId(new JID("agent3@testxmpp.org"));
+
+ controller = new XmppControllerImpl();
+ agent = controller.agent;
+
+ testXmppDeviceListener = new TestXmppDeviceListener();
+ controller.addXmppDeviceListener(testXmppDeviceListener);
+ testXmppIqListener = new TestXmppIqListener();
+ controller.addXmppIqListener(testXmppIqListener);
+ testXmppMessageListener = new TestXmppMessageListener();
+ controller.addXmppMessageListener(testXmppMessageListener);
+ testXmppPresenceListener = new TestXmppPresenceListener();
+ controller.addXmppPresenceListener(testXmppPresenceListener);
+
+ CoreService mockCoreService =
+ EasyMock.createMock(CoreService.class);
+ controller.coreService = mockCoreService;
+
+ ComponentConfigService mockCfgService =
+ EasyMock.createMock(ComponentConfigService.class);
+ expect(mockCfgService.getProperties(anyObject())).andReturn(ImmutableSet.of());
+ mockCfgService.registerProperties(controller.getClass());
+ expectLastCall();
+ mockCfgService.unregisterProperties(controller.getClass(), false);
+ expectLastCall();
+ expect(mockCfgService.getProperties(anyObject())).andReturn(ImmutableSet.of());
+ controller.cfgService = mockCfgService;
+ replay(mockCfgService);
+
+ ComponentContext mockContext = EasyMock.createMock(ComponentContext.class);
+ Dictionary<String, Object> properties = new Hashtable<>();
+ properties.put("xmppPort",
+ "5269");
+ expect(mockContext.getProperties()).andReturn(properties);
+ replay(mockContext);
+ controller.activate(mockContext);
+ }
+
+ @After
+ public void tearDown() {
+ controller.removeXmppDeviceListener(testXmppDeviceListener);
+ controller.removeXmppIqListener(testXmppIqListener);
+ controller.removeXmppMessageListener(testXmppMessageListener);
+ controller.removeXmppPresenceListener(testXmppPresenceListener);
+ controller.deactivate();
+ }
+
+ /**
+ * Tests adding and removing connected devices.
+ */
+ @Test
+ public void testAddRemoveConnectedDevice() {
+ // test adding connected devices
+ boolean add1 = agent.addConnectedDevice(jid1, device1);
+ assertThat(add1, is(true));
+ assertThat(testXmppDeviceListener.addedDevices, hasSize(1));
+ boolean add2 = agent.addConnectedDevice(jid2, device2);
+ assertThat(add2, is(true));
+ assertThat(testXmppDeviceListener.addedDevices, hasSize(2));
+ boolean add3 = agent.addConnectedDevice(jid3, device3);
+ assertThat(add3, is(true));
+ assertThat(testXmppDeviceListener.addedDevices, hasSize(3));
+
+ assertThat(testXmppDeviceListener.addedDevices, hasItems(jid1, jid2, jid3));
+
+ // Test adding a device twice - it should fail
+ boolean addError1 = agent.addConnectedDevice(jid1, device1);
+ assertThat(addError1, is(false));
+
+ assertThat(controller.connectedDevices.size(), is(3));
+
+ // test querying the individual device
+ XmppDevice queriedDevice = controller.getDevice(jid1);
+ assertThat(queriedDevice, is(device1));
+
+ // test removing device
+ agent.removeConnectedDevice(jid3);
+ assertThat(controller.connectedDevices.size(), is(2));
+
+ // Make sure the listener delete callbacks fired
+ assertThat(testXmppDeviceListener.removedDevices, hasSize(1));
+ assertThat(testXmppDeviceListener.removedDevices, hasItems(jid3));
+ }
+
+ /**
+ * Tests adding, removing IQ listeners and handling IQ stanzas.
+ */
+ @Test
+ public void handlePackets() {
+ // IQ packets
+ Packet iq = new IQ();
+ agent.processUpstreamEvent(jid1, iq);
+ assertThat(testXmppIqListener.handledIqs, hasSize(1));
+ agent.processUpstreamEvent(jid2, iq);
+ assertThat(testXmppIqListener.handledIqs, hasSize(2));
+ // Message packets
+ Packet message = new Message();
+ agent.processUpstreamEvent(jid1, message);
+ assertThat(testXmppMessageListener.handledMessages, hasSize(1));
+ agent.processUpstreamEvent(jid2, message);
+ assertThat(testXmppMessageListener.handledMessages, hasSize(2));
+ Packet presence = new Presence();
+ agent.processUpstreamEvent(jid1, presence);
+ assertThat(testXmppPresenceListener.handledPresenceStanzas, hasSize(1));
+ agent.processUpstreamEvent(jid2, presence);
+ assertThat(testXmppPresenceListener.handledPresenceStanzas, hasSize(2));
+ }
+
+
+}
diff --git a/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/XmppDeviceAdapter.java b/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/XmppDeviceAdapter.java
new file mode 100644
index 0000000..cd86acf
--- /dev/null
+++ b/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/XmppDeviceAdapter.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.core.ctl;
+
+import org.dom4j.Document;
+import org.onosproject.xmpp.core.XmppDevice;
+import org.onosproject.xmpp.core.XmppSession;
+import org.xmpp.packet.Packet;
+import org.xmpp.packet.PacketError;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Testing adapter for the XMPP device driver class.
+ */
+public class XmppDeviceAdapter implements XmppDevice {
+
+ @java.lang.Override
+ public XmppSession getSession() {
+ return null;
+ }
+
+ @java.lang.Override
+ public InetSocketAddress getIpAddress() {
+ return null;
+ }
+
+ @java.lang.Override
+ public void registerConnectedDevice() {
+
+ }
+
+ @java.lang.Override
+ public void disconnectDevice() {
+
+ }
+
+ @java.lang.Override
+ public void sendPacket(Packet packet) {
+
+ }
+
+ @java.lang.Override
+ public void writeRawXml(Document document) {
+
+ }
+
+ @java.lang.Override
+ public void handlePacket(Packet packet) {
+
+ }
+
+ @java.lang.Override
+ public void sendError(PacketError packetError) {
+
+ }
+}
diff --git a/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/XmppServerTest.java b/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/XmppServerTest.java
new file mode 100644
index 0000000..d48c2a4
--- /dev/null
+++ b/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/XmppServerTest.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.core.ctl;
+
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.onosproject.xmpp.core.XmppDeviceFactory;
+
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test class for XmppServer class.
+ */
+public class XmppServerTest {
+
+ XmppServer server = new XmppServer();
+
+ @Test
+ public void testStart() {
+ XmppDeviceFactory mockXmppDeviceFactory = EasyMock.createMock(XmppDeviceFactory.class);
+ server.start(mockXmppDeviceFactory);
+ assertNotNull(server.channel);
+ assertNotNull(server.channelClass);
+ assertNotNull(server.eventLoopGroup);
+ }
+
+ @Test
+ public void testSetConfiguration() {
+ Dictionary<String, String> properties = new Hashtable<>();
+ properties.put("xmppPort", "5222");
+ server.setConfiguration(properties);
+ assertThat(server.port, is(5222));
+ }
+
+}
diff --git a/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/handlers/XmlMergerTest.java b/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/handlers/XmlMergerTest.java
new file mode 100644
index 0000000..ab2e53a
--- /dev/null
+++ b/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/handlers/XmlMergerTest.java
@@ -0,0 +1,211 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.core.ctl.handlers;
+
+import com.fasterxml.aalto.AsyncByteArrayFeeder;
+import com.fasterxml.aalto.AsyncXMLStreamReader;
+import com.fasterxml.aalto.stax.InputFactoryImpl;
+import com.google.common.collect.Lists;
+import org.codehaus.stax2.ri.evt.Stax2EventAllocatorImpl;
+import org.dom4j.Element;
+import org.hamcrest.Matchers;
+import org.junit.Before;
+import org.junit.Test;
+import org.onosproject.xmpp.core.ctl.ChannelHandlerContextAdapter;
+
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.events.XMLEvent;
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Test class for XmlMerger class.
+ */
+public class XmlMergerTest {
+
+ List<Object> streamOpenXmlEventList;
+ List<Object> streamCloseXmlEventList;
+ List<Object> subscribeMsgEventList;
+ List<Object> publishMsgEventList;
+
+ XmlMerger xmlMerger;
+
+ private String streamOpenMsg = String.format("<stream:stream to='%s' %s %s %s %s %s>", "xmpp.onosproject.org",
+ "from='test@xmpp.org'",
+ "xmlns:stream='http://etherx.jabber.org/streams'",
+ "xmlns='jabber:client'", "xml:lang='en'", "version='1.0'");
+
+ private String streamCloseMsg = "</stream:stream>";
+
+ private String publishMsg = "<iq type='set'\n" +
+ " from='test@xmpp.org'\n" +
+ " to='xmpp.onosproject.org'\n" +
+ " id='request1'>\n" +
+ " <pubsub xmlns='http://jabber.org/protocol/pubsub'>\n" +
+ " <publish node='test'>\n" +
+ " <item id=\"test\">\n" +
+ " <entry xmlns='http://ietf.org/protocol/bgpvpn'>\n" +
+ " <nlri af='1'>10.0.0.1</nlri>\n" +
+ " <next-hop af='1'>169.1.1.1</next-hop>\n" +
+ " <version id='1'/>\n" +
+ " <label>10000</label>\n" +
+ " </entry> \n" +
+ " </item>\n" +
+ " </publish>\n" +
+ " </pubsub>\n" +
+ "</iq>\n";
+
+ private String subscribeMsg = "<iq type='set'\n" +
+ " from='test@xmpp.org'\n" +
+ " to='xmpp.onosproject.org/other-peer'\n" +
+ " id='sub1'>\n" +
+ " <pubsub xmlns='http://jabber.org/protocol/pubsub'>\n" +
+ " <subscribe node='test'/>\n" +
+ " </pubsub>\n" +
+ "</iq>";
+
+ AsyncXMLStreamReader<AsyncByteArrayFeeder> streamReader =
+ new InputFactoryImpl().createAsyncForByteArray();
+ Stax2EventAllocatorImpl allocator = new Stax2EventAllocatorImpl();
+
+ @Before
+ public void setUp() throws Exception {
+ xmlMerger = new XmlMerger();
+ streamOpenXmlEventList = Lists.newArrayList();
+ streamCloseXmlEventList = Lists.newArrayList();
+ subscribeMsgEventList = Lists.newArrayList();
+ publishMsgEventList = Lists.newArrayList();
+ initXmlEventList(streamOpenXmlEventList, streamOpenMsg);
+ initXmlEventList(subscribeMsgEventList, subscribeMsg);
+ initXmlEventList(publishMsgEventList, publishMsg);
+ initXmlEventList(streamCloseXmlEventList, streamCloseMsg);
+ streamReader.closeCompletely();
+ }
+
+ private void initXmlEventList(List<Object> xmlEventList, String xmlMessage)
+ throws XMLStreamException, UnsupportedEncodingException {
+ AsyncByteArrayFeeder streamFeeder = streamReader.getInputFeeder();
+ byte[] buffer = xmlMessage.getBytes("UTF-8");
+ streamFeeder.feedInput(buffer, 0, buffer.length);
+ while (streamReader.hasNext() && streamReader.next() != AsyncXMLStreamReader.EVENT_INCOMPLETE) {
+ xmlEventList.add(allocator.allocate(streamReader));
+ }
+ }
+
+ @Test
+ public void testInit() throws Exception {
+ assertThat(xmlMerger.docBuilder, is(notNullValue()));
+ assertThat(xmlMerger.document, is(notNullValue()));
+ assertThat(xmlMerger.result, is(notNullValue()));
+ assertThat(xmlMerger.writer, is(notNullValue()));
+ assertThat(xmlMerger.xmlOutputFactory, is(notNullValue()));
+ }
+
+ @Test
+ public void testMergeStreamOpen() throws Exception {
+ List<Object> list = Lists.newArrayList();
+ streamOpenXmlEventList.forEach(xmlEvent -> {
+ try {
+ xmlMerger.decode(new ChannelHandlerContextAdapter(), xmlEvent, list);
+ } catch (Exception e) {
+ fail();
+ }
+ });
+ // StreamOpen should not be merged, should be passed as XMLEvent
+ assertThat(list.size(), Matchers.is(1));
+ assertThat(list.get(0), Matchers.is(instanceOf(XMLEvent.class)));
+ assertThat(((XMLEvent) list.get(0)).isStartElement(), Matchers.is(true));
+ }
+
+ @Test
+ public void testMergeSubscribeMsg() throws Exception {
+ List<Object> list = Lists.newArrayList();
+ xmlMerger.depth = 1;
+ subscribeMsgEventList.forEach(xmlEvent -> {
+ try {
+ xmlMerger.decode(new ChannelHandlerContextAdapter(), xmlEvent, list);
+ } catch (Exception e) {
+ fail();
+ }
+ });
+ assertThat("Output list should have size of 1", list.size(), Matchers.is(1));
+ assertThat("Output object should be of type org.dom4j.Element",
+ list.get(0), Matchers.is(instanceOf(Element.class)));
+ Element root = (Element) list.get(0);
+ assertThat("Top level element should be of type IQ",
+ root.getQName().getName(), Matchers.is("iq"));
+ assertThat(root.attributes().size(), Matchers.is(4));
+ assertThat(root.attribute("type").getValue(), Matchers.is("set"));
+ assertNotNull("<pubsub> element should be accessible", root.element("pubsub"));
+ assertThat(root.element("pubsub").getNamespaceURI(), Matchers.is("http://jabber.org/protocol/pubsub"));
+ assertNotNull("<subscribe> element should be accessible",
+ root.element("pubsub").element("subscribe"));
+
+ }
+
+ @Test
+ public void testMergePublishMsg() throws Exception {
+ List<Object> list = Lists.newArrayList();
+ xmlMerger.depth = 1;
+ publishMsgEventList.forEach(xmlEvent -> {
+ try {
+ xmlMerger.decode(new ChannelHandlerContextAdapter(), xmlEvent, list);
+ } catch (Exception e) {
+ fail();
+ }
+ });
+ Element root = (Element) list.get(0);
+ assertThat("Top level element should be of type IQ",
+ root.getQName().getName(), Matchers.is("iq"));
+ assertThat(root.attributes().size(), Matchers.is(4));
+ assertNotNull("<pubsub> element should be accessible", root.element("pubsub"));
+ assertNotNull("<publish> element should be accessible",
+ root.element("pubsub").element("publish"));
+ assertThat(root.element("pubsub").getNamespaceURI(), Matchers.is("http://jabber.org/protocol/pubsub"));
+ assertThat(root.element("pubsub").element("publish").attribute("node").getValue(),
+ Matchers.is("test"));
+ }
+
+ @Test
+ public void testMergeStreamClose() throws Exception {
+ List<Object> list = Lists.newArrayList();
+ streamCloseXmlEventList.forEach(xmlEvent -> {
+ try {
+ xmlMerger.decode(new ChannelHandlerContextAdapter(), xmlEvent, list);
+ } catch (Exception e) {
+ fail();
+ }
+ });
+ // StreamClose should not be merged, should be passed as XMLEvent
+ assertThat(list.size(), Matchers.is(1));
+ assertThat(list.get(0), Matchers.is(instanceOf(XMLEvent.class)));
+ assertThat(((XMLEvent) list.get(0)).isEndElement(), Matchers.is(true));
+
+ }
+
+
+
+
+
+}
diff --git a/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/handlers/XmlStreamDecoderTest.java b/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/handlers/XmlStreamDecoderTest.java
new file mode 100644
index 0000000..93b17e3
--- /dev/null
+++ b/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/handlers/XmlStreamDecoderTest.java
@@ -0,0 +1,167 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.core.ctl.handlers;
+
+import com.google.common.collect.Lists;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import org.apache.commons.io.Charsets;
+import org.junit.Test;
+import org.onosproject.xmpp.core.ctl.ChannelAdapter;
+import org.onosproject.xmpp.core.ctl.ChannelHandlerContextAdapter;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.events.EndElement;
+import javax.xml.stream.events.StartElement;
+import javax.xml.stream.events.XMLEvent;
+import java.util.List;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+
+/**
+ * Test class for XmlStreamDecoder.
+ */
+public class XmlStreamDecoderTest {
+
+ private String streamOpenMsg = String.format("<stream:stream to='%s' %s %s %s %s %s>", "xmpp.onosproject.org",
+ "from='test@xmpp.org'",
+ "xmlns:stream='http://etherx.jabber.org/streams'",
+ "xmlns='jabber:client'", "xml:lang='en'", "version='1.0'");
+
+ private String streamCloseMsg = "</stream:stream>";
+
+ private String subscribeMsg = "<iq type='set'" +
+ " from='test@xmpp.org'" +
+ " to='xmpp.onosproject.org'" +
+ " id='sub1'>" +
+ " <pubsub xmlns='http://jabber.org/protocol/pubsub'>" +
+ " <subscribe node='test'/>" +
+ " </pubsub>" +
+ "</iq>";
+
+
+ public class ActiveChannelHandlerContextAdapter
+ extends ChannelHandlerContextAdapter {
+ @Override
+ public Channel channel() {
+ return new ChannelAdapter() {
+ @Override
+ public boolean isActive() {
+ return true;
+ }
+ };
+ }
+ }
+
+ @Test
+ public void testDecodeNoChannel() throws Exception {
+ XmlStreamDecoder decoder = new XmlStreamDecoder();
+
+ List<Object> list = Lists.newArrayList();
+ decoder.decode(new ActiveChannelHandlerContextAdapter(),
+ Unpooled.buffer(), list);
+ assertThat(list.size(), is(0));
+ }
+
+ @Test
+ public void testDecodeStreamOpen() throws Exception {
+ XmlStreamDecoder decoder = new XmlStreamDecoder();
+ ByteBuf buffer = Unpooled.buffer();
+ buffer.writeBytes(streamOpenMsg.getBytes(Charsets.UTF_8));
+ List<Object> list = Lists.newArrayList();
+ decoder.decode(new ChannelHandlerContextAdapter(), buffer, list);
+ list.forEach(object -> {
+ assertThat(object, is(instanceOf(XMLEvent.class)));
+ });
+ assertThat(list.size(), is(2));
+ assertThat(((XMLEvent) list.get(0)).isStartDocument(), is(true));
+ ((XMLEvent) list.get(0)).isStartElement();
+ }
+
+ @Test
+ public void testDecodeStreamClose() throws Exception {
+ XmlStreamDecoder decoder = new XmlStreamDecoder();
+ // open stream
+ ByteBuf buffer1 = Unpooled.buffer();
+ buffer1.writeBytes(streamOpenMsg.getBytes(Charsets.UTF_8));
+ List<Object> list1 = Lists.newArrayList();
+ decoder.decode(new ChannelHandlerContextAdapter(), buffer1, list1);
+
+ // close stream
+ ByteBuf buffer2 = Unpooled.buffer();
+ buffer2.writeBytes(streamCloseMsg.getBytes(Charsets.UTF_8));
+ List<Object> list2 = Lists.newArrayList();
+ decoder.decode(new ChannelHandlerContextAdapter(), buffer2, list2);
+ list2.forEach(object -> {
+ assertThat(object, is(instanceOf(XMLEvent.class)));
+ });
+ assertThat(list2.size(), is(1));
+ assertThat(((XMLEvent) list2.get(0)).isEndElement(), is(true));
+ }
+
+ @Test
+ public void testDecodeXmppStanza() throws Exception {
+ XmlStreamDecoder decoder = new XmlStreamDecoder();
+ ByteBuf buffer = Unpooled.buffer();
+ buffer.writeBytes(subscribeMsg.getBytes(Charsets.UTF_8));
+ List<Object> list = Lists.newArrayList();
+ decoder.decode(new ChannelHandlerContextAdapter(), buffer, list);
+ assertThat(list.size(), is(10));
+ list.forEach(object -> {
+ assertThat(object, is(instanceOf(XMLEvent.class)));
+ });
+ assertThat(((XMLEvent) list.get(0)).isStartDocument(), is(true));
+ XMLEvent secondEvent = (XMLEvent) list.get(1);
+ assertThat(secondEvent.isStartElement(), is(true));
+ StartElement secondEventAsStartElement = (StartElement) secondEvent;
+ assertThat(secondEventAsStartElement.getName().getLocalPart(), is("iq"));
+ assertThat(Lists.newArrayList(secondEventAsStartElement.getAttributes()).size(), is(4));
+ assertThat(secondEventAsStartElement.getAttributeByName(QName.valueOf("type")).getValue(), is("set"));
+ assertThat(secondEventAsStartElement.getAttributeByName(QName.valueOf("from")).getValue(),
+ is("test@xmpp.org"));
+ assertThat(secondEventAsStartElement.getAttributeByName(QName.valueOf("to")).getValue(),
+ is("xmpp.onosproject.org"));
+ assertThat(secondEventAsStartElement.getAttributeByName(QName.valueOf("id")).getValue(),
+ is("sub1"));
+ XMLEvent fourthEvent = (XMLEvent) list.get(3);
+ assertThat(fourthEvent.isStartElement(), is(true));
+ StartElement fourthEventAsStartElement = (StartElement) fourthEvent;
+ assertThat(fourthEventAsStartElement.getName().getLocalPart(), is("pubsub"));
+ assertThat(fourthEventAsStartElement.getNamespaceURI(""),
+ is("http://jabber.org/protocol/pubsub"));
+ XMLEvent fifthEvent = (XMLEvent) list.get(5);
+ assertThat(fifthEvent.isStartElement(), is(true));
+ StartElement fifthEventAsStartElement = (StartElement) fifthEvent;
+ assertThat(fifthEventAsStartElement.getName().getLocalPart(), is("subscribe"));
+ assertThat(fifthEventAsStartElement.getAttributeByName(QName.valueOf("node")).getValue(), is("test"));
+ XMLEvent sixthEvent = (XMLEvent) list.get(6);
+ assertThat(sixthEvent.isEndElement(), is(true));
+ EndElement sixthEventAsEndElement = (EndElement) sixthEvent;
+ assertThat(sixthEventAsEndElement.getName().getLocalPart(), is("subscribe"));
+ XMLEvent seventhEvent = (XMLEvent) list.get(8);
+ assertThat(seventhEvent.isEndElement(), is(true));
+ EndElement seventhEventAsEndElement = (EndElement) seventhEvent;
+ assertThat(seventhEventAsEndElement.getName().getLocalPart(), is("pubsub"));
+ XMLEvent eighthEvent = (XMLEvent) list.get(9);
+ assertThat(eighthEvent.isEndElement(), is(true));
+ EndElement eighthEventAsEndElement = (EndElement) eighthEvent;
+ assertThat(eighthEventAsEndElement.getName().getLocalPart(), is("iq"));
+ }
+}
diff --git a/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/handlers/XmppDecoderTest.java b/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/handlers/XmppDecoderTest.java
new file mode 100644
index 0000000..fe355cc
--- /dev/null
+++ b/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/handlers/XmppDecoderTest.java
@@ -0,0 +1,150 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.core.ctl.handlers;
+
+import com.google.common.collect.Lists;
+import io.netty.channel.ChannelHandlerContext;
+import org.codehaus.stax2.ri.evt.AttributeEventImpl;
+import org.codehaus.stax2.ri.evt.NamespaceEventImpl;
+import org.codehaus.stax2.ri.evt.StartElementEventImpl;
+import org.dom4j.Element;
+import org.dom4j.tree.DefaultElement;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+import org.onosproject.xmpp.core.ctl.exception.UnsupportedStanzaTypeException;
+import org.onosproject.xmpp.core.stream.XmppStreamOpen;
+import org.xmpp.packet.IQ;
+import org.xmpp.packet.JID;
+import org.xmpp.packet.Message;
+import org.xmpp.packet.Packet;
+import org.xmpp.packet.Presence;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.Location;
+import javax.xml.stream.events.Attribute;
+import javax.xml.stream.events.Namespace;
+import javax.xml.stream.events.XMLEvent;
+
+import java.util.List;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.notNullValue;
+
+/**
+ * Test class for XmppDecoder testing.
+ */
+public class XmppDecoderTest {
+
+ XmppDecoder xmppDecoder;
+ Element xmppStanzaElement;
+ XMLEvent streamOpen;
+
+ Element iqElement = new IQ().getElement();
+ Element messageElement = new Message().getElement();
+ Element presenceElement = new Presence().getElement();
+
+ ChannelHandlerContext mockChannelHandlerContext;
+ Location mockLocation;
+
+ @Before
+ public void setUp() {
+ xmppDecoder = new XmppDecoder();
+ mockChannelHandlerContext = EasyMock.createMock(ChannelHandlerContext.class);
+ mockLocation = EasyMock.createMock(Location.class);
+ buildXmppStanza();
+ buildXmppStreamOpen();
+
+ }
+
+ private void buildXmppStreamOpen() {
+ QName qName = new QName("http://etherx.jabber.org/streams", "stream", "stream");
+ Attribute attrTo = new AttributeEventImpl(mockLocation, QName.valueOf("to"), "xmpp.onosproject.org", true);
+ Attribute attrFrom = new AttributeEventImpl(mockLocation, QName.valueOf("from"), "test@xmpp.org", true);
+ List<Attribute> attributes = Lists.newArrayList();
+ attributes.add(attrTo);
+ attributes.add(attrFrom);
+ Namespace streamNs = NamespaceEventImpl.constructNamespace(mockLocation, "stream",
+ "http://etherx.jabber.org/streams");
+ Namespace jabberNs = NamespaceEventImpl.constructDefaultNamespace(mockLocation, "jabber:client");
+ List<Namespace> namespaces = Lists.newArrayList();
+ namespaces.add(streamNs);
+ namespaces.add(jabberNs);
+ streamOpen = StartElementEventImpl.construct(mockLocation, qName, attributes.iterator(),
+ namespaces.iterator(), null);
+ }
+
+
+ private void buildXmppStanza() {
+ xmppStanzaElement = new DefaultElement("iq");
+ xmppStanzaElement.addAttribute("type", "set");
+ xmppStanzaElement.addAttribute("from", "test@xmpp.org");
+ xmppStanzaElement.addAttribute("to", "xmpp.onosproject.org");
+ Element pubsub = new DefaultElement("pubsub",
+ new org.dom4j.Namespace("", "http://jabber.org/protocol/pubsub"));
+ Element subscribe = new DefaultElement("subscribe");
+ subscribe.addAttribute("node", "test");
+ pubsub.add(subscribe);
+ xmppStanzaElement.add(pubsub);
+ }
+
+ @Test
+ public void testDecodeStream() throws Exception {
+ List<Object> out = Lists.newArrayList();
+ xmppDecoder.decode(mockChannelHandlerContext, streamOpen, out);
+ assertThat(out.size(), is(1));
+ assertThat(out.get(0), is(instanceOf(XmppStreamOpen.class)));
+ XmppStreamOpen stream = (XmppStreamOpen) out.get(0);
+ assertThat(stream.getElement(), is(notNullValue()));
+ assertThat(stream.getToJid(), is(new JID("xmpp.onosproject.org")));
+ assertThat(stream.getFromJid(), is(new JID("test@xmpp.org")));
+ }
+
+ @Test
+ public void testDecodeXmppStanza() throws Exception {
+ // TODO: complete it
+ List<Object> out = Lists.newArrayList();
+ xmppDecoder.decode(mockChannelHandlerContext, xmppStanzaElement, out);
+ assertThat(out.size(), is(1));
+ assertThat(out.get(0), is(instanceOf(Packet.class)));
+ assertThat(out.get(0), is(instanceOf(IQ.class)));
+ IQ iq = (IQ) out.get(0);
+ assertThat(iq.getElement(), is(notNullValue()));
+ assertThat(iq.getFrom(), is(new JID("test@xmpp.org")));
+ assertThat(iq.getTo(), is(new JID("xmpp.onosproject.org")));
+ assertThat(iq.getType(), is(IQ.Type.set));
+ }
+
+ @Test
+ public void testRecognizePacket() throws Exception {
+ Packet iqPacket = xmppDecoder.recognizeAndReturnXmppPacket(iqElement);
+ assertThat(iqPacket, is(instanceOf(IQ.class)));
+ Packet messagePacket = xmppDecoder.recognizeAndReturnXmppPacket(messageElement);
+ assertThat(messagePacket, is(instanceOf(Message.class)));
+ Packet presencePacket = xmppDecoder.recognizeAndReturnXmppPacket(presenceElement);
+ assertThat(presencePacket, is(instanceOf(Presence.class)));
+ Element wrongElement = new DefaultElement("test");
+ try {
+ xmppDecoder.recognizeAndReturnXmppPacket(wrongElement);
+ } catch (Exception e) {
+ assertThat(e, is(instanceOf(UnsupportedStanzaTypeException.class)));
+ }
+ }
+
+}
diff --git a/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/handlers/XmppEncoderTest.java b/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/handlers/XmppEncoderTest.java
new file mode 100644
index 0000000..4178d38
--- /dev/null
+++ b/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/handlers/XmppEncoderTest.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.core.ctl.handlers;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import org.easymock.EasyMock;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.xmpp.packet.IQ;
+import org.xmpp.packet.Packet;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Test class for XmppEncoder class.
+ */
+public class XmppEncoderTest {
+
+ private XmppEncoder xmppEncoder;
+ private ChannelHandlerContext channelHandlerContext;
+
+ @Before
+ public void setUp() throws Exception {
+ xmppEncoder = new XmppEncoder();
+ channelHandlerContext = EasyMock.createMock(ChannelHandlerContext.class);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ xmppEncoder = null;
+ }
+
+ @Test
+ public void testEncode() throws Exception {
+ Packet iq = new IQ();
+ ByteBuf buffer = Unpooled.buffer();
+ xmppEncoder.encode(channelHandlerContext, iq, buffer);
+ assertThat(buffer.hasArray(), Matchers.is(true));
+ }
+
+}
diff --git a/protocols/xmpp/core/pom.xml b/protocols/xmpp/core/pom.xml
new file mode 100644
index 0000000..a330e96
--- /dev/null
+++ b/protocols/xmpp/core/pom.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>onos-xmpp</artifactId>
+ <groupId>org.onosproject</groupId>
+ <version>1.13.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>onos-xmpp-core</artifactId>
+ <packaging>bundle</packaging>
+
+ <modules>
+ <module>onos-xmpp-core-api</module>
+ <module>onos-xmpp-core-ctl</module>
+ </modules>
+
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
diff --git a/protocols/xmpp/pom.xml b/protocols/xmpp/pom.xml
new file mode 100644
index 0000000..af048c52
--- /dev/null
+++ b/protocols/xmpp/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>onos-protocols</artifactId>
+ <groupId>org.onosproject</groupId>
+ <version>1.13.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>onos-xmpp</artifactId>
+ <description>ONOS XMPP Protocol subsystem</description>
+
+ <modules>
+ <module>onos-xmpp-core</module>
+ </modules>
+
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-scr-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
diff --git a/providers/pom.xml b/providers/pom.xml
index bed5a3d..79cdcb1 100644
--- a/providers/pom.xml
+++ b/providers/pom.xml
@@ -54,6 +54,7 @@
-->
<module>tl1</module>
<module>general</module>
+ <module>xmpp</module>
</modules>
<dependencies>
diff --git a/providers/xmpp/device/BUCK b/providers/xmpp/device/BUCK
new file mode 100644
index 0000000..265d928
--- /dev/null
+++ b/providers/xmpp/device/BUCK
@@ -0,0 +1,26 @@
+COMPILE_DEPS = [
+ '//lib:CORE_DEPS',
+ '//lib:tinder-xmpp',
+ '//protocols/xmpp/core/api:onos-protocols-xmpp-core-api',
+ '//protocols/xmpp/core/ctl:onos-protocols-xmpp-core-ctl',
+]
+
+TEST_DEPS = [
+ '//lib:TEST_ADAPTERS',
+]
+
+osgi_jar_with_tests (
+ deps = COMPILE_DEPS,
+ test_deps = TEST_DEPS,
+)
+
+onos_app (
+ app_name = 'org.onosproject.xmpp.device',
+ title = 'XMPP Device Provider',
+ category = 'Provider',
+ url = 'https://wiki.onosproject.org/display/ONOS/XMPP+as+SBI',
+ description = 'XMPP protocol southbound provider.',
+ required_apps = [
+ 'org.onosproject.protocols.xmpp',
+ ]
+)
\ No newline at end of file
diff --git a/providers/xmpp/device/pom.xml b/providers/xmpp/device/pom.xml
new file mode 100644
index 0000000..ec84565
--- /dev/null
+++ b/providers/xmpp/device/pom.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-xmpp-providers</artifactId>
+ <version>1.13.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>onos-xmpp-provider-device</artifactId>
+ <packaging>bundle</packaging>
+
+ <description>ONOS XMPP protocol Device provider</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-api</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>org.apache.felix.scr</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.compendium</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.igniterealtime</groupId>
+ <artifactId>tinder</artifactId>
+ <version>1.3.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-xmpp-core-api</artifactId>
+ <version>1.13.0-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/providers/xmpp/device/src/main/java/org/onosproject/provider/xmpp/device/impl/XmppDeviceProvider.java b/providers/xmpp/device/src/main/java/org/onosproject/provider/xmpp/device/impl/XmppDeviceProvider.java
new file mode 100644
index 0000000..069fc05
--- /dev/null
+++ b/providers/xmpp/device/src/main/java/org/onosproject/provider/xmpp/device/impl/XmppDeviceProvider.java
@@ -0,0 +1,185 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.provider.xmpp.device.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onlab.packet.ChassisId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.net.AnnotationKeys;
+import org.onosproject.net.DefaultAnnotations;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.MastershipRole;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.SparseAnnotations;
+import org.onosproject.net.device.DefaultDeviceDescription;
+import org.onosproject.net.device.DeviceDescription;
+import org.onosproject.net.device.DeviceProvider;
+import org.onosproject.net.device.DeviceProviderRegistry;
+import org.onosproject.net.device.DeviceProviderService;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.provider.AbstractProvider;
+import org.onosproject.net.provider.ProviderId;
+import org.onosproject.xmpp.core.XmppController;
+import org.onosproject.xmpp.core.XmppDeviceId;
+import org.onosproject.xmpp.core.XmppDeviceListener;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+import org.xmpp.packet.JID;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Provider which will try to fetch the details of XMPP devices from the core and run a capability discovery on each of
+ * the device.
+ */
+@Component(immediate = true)
+public class XmppDeviceProvider extends AbstractProvider implements DeviceProvider {
+
+ private final Logger logger = getLogger(getClass());
+
+ private static final String PROVIDER = "org.onosproject.provider.xmpp.device";
+ private static final String APP_NAME = "org.onosproject.xmpp";
+ private static final String XMPP = "xmpp";
+ private static final String ADDRESS = "address";
+
+ private static final String HARDWARE_VERSION = "XMPP Device";
+ private static final String SOFTWARE_VERSION = "1.0";
+ private static final String SERIAL_NUMBER = "unknown";
+ private static final String IS_NULL_MSG = "XMPP device info is null";
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DeviceProviderRegistry providerRegistry;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DeviceService deviceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected XmppController controller;
+
+ protected DeviceProviderService providerService;
+
+ protected ApplicationId appId;
+
+ private XmppDeviceListener deviceListener = new InternalXmppDeviceListener();
+
+ public XmppDeviceProvider() {
+ super(new ProviderId(XMPP, PROVIDER));
+ }
+
+ @Activate
+ public void activate(ComponentContext context) {
+ providerService = providerRegistry.register(this);
+ appId = coreService.registerApplication(APP_NAME);
+ controller.addXmppDeviceListener(deviceListener);
+ logger.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ controller.removeXmppDeviceListener(deviceListener);
+ providerRegistry.unregister(this);
+ providerService = null;
+ }
+
+ @Override
+ public void triggerProbe(DeviceId deviceId) {
+
+ }
+
+ @Override
+ public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
+
+ }
+
+ @Override
+ public boolean isReachable(DeviceId deviceId) {
+ String id = deviceId.uri().getSchemeSpecificPart();
+ JID jid = new JID(id);
+ XmppDeviceId xmppDeviceId = new XmppDeviceId(jid);
+ return controller.getDevice(xmppDeviceId) != null;
+ }
+
+ @Override
+ public void changePortState(DeviceId deviceId, PortNumber portNumber, boolean enable) {
+
+ }
+
+ private void connectDevice(XmppDeviceId xmppDeviceId) {
+ DeviceId deviceId = DeviceId.deviceId(xmppDeviceId.id());
+ String ipAddress = controller.getDevice(xmppDeviceId).getIpAddress().getAddress().getHostAddress();
+ // Assumption: manufacturer is uniquely identified by domain part of JID
+ String manufacturer = xmppDeviceId.getJid().getDomain();
+
+ ChassisId cid = new ChassisId();
+
+ SparseAnnotations annotations = DefaultAnnotations.builder()
+ .set(AnnotationKeys.PROTOCOL, XMPP.toUpperCase())
+ .set("IpAddress", ipAddress)
+ .build();
+ DeviceDescription deviceDescription = new DefaultDeviceDescription(
+ deviceId.uri(),
+ Device.Type.OTHER,
+ manufacturer, HARDWARE_VERSION,
+ SOFTWARE_VERSION, SERIAL_NUMBER,
+ cid, true,
+ annotations);
+
+ if (deviceService.getDevice(deviceId) == null) {
+ providerService.deviceConnected(deviceId, deviceDescription);
+ }
+ }
+
+ private void disconnectDevice(XmppDeviceId xmppDeviceId) {
+ Preconditions.checkNotNull(xmppDeviceId, IS_NULL_MSG);
+
+ DeviceId deviceId = DeviceId.deviceId(xmppDeviceId.id());
+ if (deviceService.getDevice(deviceId) != null) {
+ providerService.deviceDisconnected(deviceId);
+ logger.info("XMPP device {} removed from XMPP controller", deviceId);
+ } else {
+ logger.warn("XMPP device {} does not exist in the store, " +
+ "or it may already have been removed", deviceId);
+ }
+ }
+
+ private class InternalXmppDeviceListener implements XmppDeviceListener {
+
+ @Override
+ public void deviceConnected(XmppDeviceId deviceId) {
+ logger.info("NOTIFICATION: device {} connected", deviceId);
+ connectDevice(deviceId);
+ }
+
+ @Override
+ public void deviceDisconnected(XmppDeviceId deviceId) {
+ logger.info("NOTIFICATION: device {} disconnected", deviceId);
+ disconnectDevice(deviceId);
+ }
+ }
+
+
+}
diff --git a/providers/xmpp/device/src/main/java/org/onosproject/provider/xmpp/device/impl/package-info.java b/providers/xmpp/device/src/main/java/org/onosproject/provider/xmpp/device/impl/package-info.java
new file mode 100644
index 0000000..d9908ce
--- /dev/null
+++ b/providers/xmpp/device/src/main/java/org/onosproject/provider/xmpp/device/impl/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Provider that uses XMPP as a means of device discovery.
+ */
+package org.onosproject.provider.xmpp.device.impl;
\ No newline at end of file
diff --git a/providers/xmpp/device/src/test/java/org/onosproject/provider/xmpp/device/impl/XmppDeviceProviderTest.java b/providers/xmpp/device/src/test/java/org/onosproject/provider/xmpp/device/impl/XmppDeviceProviderTest.java
new file mode 100644
index 0000000..f5ca6b6
--- /dev/null
+++ b/providers/xmpp/device/src/test/java/org/onosproject/provider/xmpp/device/impl/XmppDeviceProviderTest.java
@@ -0,0 +1,317 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.provider.xmpp.device.impl;
+
+import org.dom4j.Document;
+import org.junit.Before;
+import org.junit.Test;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.core.DefaultApplicationId;
+import org.onosproject.net.DefaultDevice;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceDescription;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceProvider;
+import org.onosproject.net.device.DeviceProviderRegistry;
+import org.onosproject.net.device.DeviceProviderRegistryAdapter;
+import org.onosproject.net.device.DeviceProviderService;
+import org.onosproject.net.device.DeviceProviderServiceAdapter;
+import org.onosproject.net.device.DeviceServiceAdapter;
+import org.onosproject.net.device.DeviceStoreAdapter;
+import org.onosproject.net.provider.ProviderId;
+import org.onosproject.xmpp.core.XmppController;
+import org.onosproject.xmpp.core.XmppDevice;
+import org.onosproject.xmpp.core.XmppDeviceId;
+import org.onosproject.xmpp.core.XmppDeviceListener;
+import org.onosproject.xmpp.core.XmppIqListener;
+import org.onosproject.xmpp.core.XmppMessageListener;
+import org.onosproject.xmpp.core.XmppPresenceListener;
+import org.onosproject.xmpp.core.XmppSession;
+import org.xmpp.packet.JID;
+import org.xmpp.packet.Packet;
+import org.xmpp.packet.PacketError;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.easymock.EasyMock.*;
+import static org.junit.Assert.*;
+
+/**
+ * Testing class for XmppDeviceProvider.
+ */
+public class XmppDeviceProviderTest {
+
+
+ private final XmppDeviceProvider provider = new XmppDeviceProvider();
+ XmppControllerAdapter xmppController = new XmppControllerAdapter();
+
+ //Provider Mock
+ private final DeviceProviderRegistry deviceRegistry = new MockDeviceProviderRegistry();
+ private final DeviceProviderService providerService = new MockDeviceProviderService();
+ private final DeviceServiceAdapter deviceService = new DeviceServiceAdapter();
+ private final MockDeviceStore deviceStore = new MockDeviceStore();
+
+ //Provider related classes
+ private CoreService coreService;
+ private final ApplicationId appId =
+ new DefaultApplicationId(100, APP_NAME);
+ private static final String APP_NAME = "org.onosproject.xmpp";
+
+ private final HashMap<DeviceId, Device> devices = new HashMap<>();
+
+ private final String agentOneId = "agent1@test.org";
+ private final XmppDeviceId agentOneXmppId = new XmppDeviceId(new JID(agentOneId));
+
+ @Before
+ public void setUp() throws IOException {
+ coreService = createMock(CoreService.class);
+ expect(coreService.registerApplication(APP_NAME))
+ .andReturn(appId).anyTimes();
+ replay(coreService);
+ provider.coreService = coreService;
+ provider.providerRegistry = deviceRegistry;
+ provider.deviceService = deviceService;
+ provider.providerService = providerService;
+ provider.controller = xmppController;
+ provider.activate(null);
+ devices.clear();
+ }
+
+ @Test
+ public void activate() throws Exception {
+ assertTrue("Provider should be registered", deviceRegistry.getProviders().contains(provider.id()));
+ assertEquals("Incorrect device service", deviceService, provider.deviceService);
+ assertEquals("Incorrect provider service", providerService, provider.providerService);
+ assertEquals("Incorrent application id", appId, provider.appId);
+ assertNotNull("XMPP device listener should be added", xmppController.listener);
+ }
+
+ @Test
+ public void deactivate() throws Exception {
+ provider.deactivate();
+ assertNull("Device listener should be removed", xmppController.listener);
+ assertFalse("Provider should not be registered", deviceRegistry.getProviders().contains(provider));
+ assertNull("Provider service should be null", provider.providerService);
+ }
+
+ @Test
+ public void testDeviceAdded() {
+ xmppController.listener.deviceConnected(agentOneXmppId);
+ assertEquals("XMPP device added", 1, devices.size());
+ }
+
+ @Test
+ public void testDeviceRemoved() {
+ xmppController.listener.deviceDisconnected(agentOneXmppId);
+ assertEquals("XMPP device removed", 0, devices.size());
+ }
+
+ @Test
+ public void testIsReachable() {
+ assertTrue(provider.isReachable(DeviceId.deviceId("reachable@xmpp.org")));
+ assertFalse(provider.isReachable(DeviceId.deviceId("non-reachable@xmpp.org")));
+ }
+
+
+ private class MockDeviceProviderRegistry extends DeviceProviderRegistryAdapter {
+
+ final Set<ProviderId> providers = new HashSet<>();
+
+ @Override
+ public DeviceProviderService register(DeviceProvider provider) {
+ providers.add(provider.id());
+ return providerService;
+ }
+
+ @Override
+ public void unregister(DeviceProvider provider) {
+ providers.remove(provider.id());
+ }
+
+ @Override
+ public Set<ProviderId> getProviders() {
+ return providers;
+ }
+
+ }
+
+ private class MockDeviceProviderService extends DeviceProviderServiceAdapter {
+
+ @Override
+ public void deviceConnected(DeviceId deviceId, DeviceDescription desc) {
+ assertNotNull("DeviceId should be not null", deviceId);
+ assertNotNull("DeviceDescription should be not null", desc);
+ deviceStore.createOrUpdateDevice(ProviderId.NONE, deviceId, desc);
+ }
+
+
+ @Override
+ public void deviceDisconnected(DeviceId deviceId) {
+ deviceStore.removeDevice(deviceId);
+ }
+
+ }
+
+ private class MockDeviceStore extends DeviceStoreAdapter {
+
+ @Override
+ public DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId,
+ DeviceDescription desc) {
+
+ devices.put(deviceId, new DefaultDevice(providerId, deviceId, desc.type(),
+ desc.manufacturer(), desc.hwVersion(),
+ desc.swVersion(), desc.serialNumber(),
+ desc.chassisId(), desc.annotations()));
+ return null;
+ }
+
+ @Override
+ public DeviceEvent removeDevice(DeviceId deviceId) {
+ devices.remove(deviceId);
+ return null;
+ }
+
+ @Override
+ public Device getDevice(DeviceId deviceId) {
+ return devices.get(deviceId);
+ }
+
+ @Override
+ public int getDeviceCount() {
+ return devices.size();
+ }
+
+ }
+
+ private class XmppControllerAdapter implements XmppController {
+
+ XmppDeviceListener listener = null;
+ Map<XmppDeviceId, XmppDevice> xmppDevices = new HashMap();
+
+ XmppControllerAdapter() {
+ XmppDeviceAdapter reachable = new XmppDeviceAdapter("reachable@xmpp.org", "127.0.0.1", 54333);
+ xmppDevices.put(reachable.xmppDeviceId, reachable);
+ XmppDeviceAdapter testDevice = new XmppDeviceAdapter(agentOneId, "127.0.0.1", 54334);
+ xmppDevices.put(testDevice.xmppDeviceId, testDevice);
+ }
+
+ @java.lang.Override
+ public XmppDevice getDevice(XmppDeviceId xmppDeviceId) {
+ return xmppDevices.get(xmppDeviceId);
+ }
+
+ @java.lang.Override
+ public void addXmppDeviceListener(XmppDeviceListener deviceListener) {
+ this.listener = deviceListener;
+ }
+
+ @java.lang.Override
+ public void removeXmppDeviceListener(XmppDeviceListener deviceListener) {
+ this.listener = null;
+ }
+
+ @java.lang.Override
+ public void addXmppIqListener(XmppIqListener iqListener) {
+
+ }
+
+ @java.lang.Override
+ public void removeXmppIqListener(XmppIqListener iqListener) {
+
+ }
+
+ @java.lang.Override
+ public void addXmppMessageListener(XmppMessageListener messageListener) {
+
+ }
+
+ @java.lang.Override
+ public void removeXmppMessageListener(XmppMessageListener messageListener) {
+
+ }
+
+ @java.lang.Override
+ public void addXmppPresenceListener(XmppPresenceListener presenceListener) {
+
+ }
+
+ @java.lang.Override
+ public void removeXmppPresenceListener(XmppPresenceListener presenceListener) {
+
+ }
+ }
+
+ private class XmppDeviceAdapter implements XmppDevice {
+
+ InetSocketAddress testAddress;
+ XmppDeviceId xmppDeviceId;
+
+ public XmppDeviceAdapter(String jid, String address, int port) {
+ testAddress = new InetSocketAddress(address, port);
+ this.xmppDeviceId = new XmppDeviceId(new JID(jid));
+ }
+
+ @Override
+ public XmppSession getSession() {
+ return null;
+ }
+
+ @Override
+ public InetSocketAddress getIpAddress() {
+ return testAddress;
+ }
+
+ @Override
+ public void registerConnectedDevice() {
+
+ }
+
+ @Override
+ public void disconnectDevice() {
+
+ }
+
+ @Override
+ public void sendPacket(Packet packet) {
+
+ }
+
+ @Override
+ public void writeRawXml(Document document) {
+
+ }
+
+ @Override
+ public void handlePacket(Packet packet) {
+
+ }
+
+ @Override
+ public void sendError(PacketError packetError) {
+
+ }
+
+ }
+
+}
diff --git a/providers/xmpp/pom.xml b/providers/xmpp/pom.xml
new file mode 100644
index 0000000..3ce40d1
--- /dev/null
+++ b/providers/xmpp/pom.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>onos-providers</artifactId>
+ <groupId>org.onosproject</groupId>
+ <version>1.13.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>onos-xmpp-providers</artifactId>
+
+ <modules>
+ <module>onos-xmpp-provider-device</module>
+ </modules>
+
+</project>
\ No newline at end of file