Initial implementation of XMPP Publish/Subscribe
Change-Id: I9930056b83004fa7a085f72f63ba973f9ec2d95b
diff --git a/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppController.java b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppController.java
index 8b500a3..a16216d 100644
--- a/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppController.java
+++ b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppController.java
@@ -46,18 +46,20 @@
void removeXmppDeviceListener(XmppDeviceListener deviceListener);
/**
- * Register a listener for IQ stanza of XMPP protocol.
+ * Register a listener for IQ stanzas containing specific XML namespace.
*
* @param iqListener the listener to notify
+ * @param namespace the XML namespace to observe
*/
- void addXmppIqListener(XmppIqListener iqListener);
+ void addXmppIqListener(XmppIqListener iqListener, String namespace);
/**
- * Unregister a listener for IQ stanza of XMPP protocol.
+ * Unregister a listener for IQ stanzas containing specific XML namespace.
*
* @param iqListener the listener to unregister
+ * @param namespace the XML namespace to observe
*/
- void removeXmppIqListener(XmppIqListener iqListener);
+ void removeXmppIqListener(XmppIqListener iqListener, String namespace);
/**
* Register a listener for Message stanza of XMPP protocol.
diff --git a/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/XmppControllerImpl.java b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/XmppControllerImpl.java
index 228aa47..44db54d 100644
--- a/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/XmppControllerImpl.java
+++ b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/XmppControllerImpl.java
@@ -16,11 +16,14 @@
package org.onosproject.xmpp.core.ctl;
+import com.google.common.collect.HashMultimap;
import com.google.common.collect.Maps;
-
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
@@ -83,7 +86,7 @@
// listener declaration
protected Set<XmppDeviceListener> xmppDeviceListeners = new CopyOnWriteArraySet<XmppDeviceListener>();
- protected Set<XmppIqListener> xmppIqListeners = new CopyOnWriteArraySet<XmppIqListener>();
+ Multimap<String, XmppIqListener> xmppIqListeners = Multimaps.synchronizedSetMultimap(HashMultimap.create());
protected Set<XmppMessageListener> xmppMessageListeners = new CopyOnWriteArraySet<XmppMessageListener>();
protected Set<XmppPresenceListener> xmppPresenceListeners = new CopyOnWriteArraySet<XmppPresenceListener>();
@@ -97,12 +100,19 @@
@Activate
public void activate(ComponentContext context) {
- log.info("XmppControllerImpl started.");
coreService.registerApplication(APP_ID, this::cleanup);
cfgService.registerProperties(getClass());
deviceFactory.init(agent);
xmppServer.setConfiguration(context.getProperties());
xmppServer.start(deviceFactory);
+ log.info("XmppControllerImpl started.");
+ }
+
+ @Modified
+ public void modified(ComponentContext context) {
+ xmppServer.stop();
+ xmppServer.setConfiguration(context.getProperties());
+ xmppServer.start(deviceFactory);
}
@Deactivate
@@ -135,13 +145,13 @@
}
@Override
- public void addXmppIqListener(XmppIqListener iqListener) {
- xmppIqListeners.add(iqListener);
+ public void addXmppIqListener(XmppIqListener iqListener, String namespace) {
+ xmppIqListeners.put(namespace, iqListener);
}
@Override
- public void removeXmppIqListener(XmppIqListener iqListener) {
- xmppIqListeners.remove(iqListener);
+ public void removeXmppIqListener(XmppIqListener iqListener, String namespace) {
+ xmppIqListeners.remove(namespace, iqListener);
}
@Override
@@ -199,19 +209,33 @@
@Override
public void processUpstreamEvent(XmppDeviceId deviceId, Packet packet) {
if (packet instanceof IQ) {
- for (XmppIqListener iqListener : xmppIqListeners) {
- iqListener.handleIqStanza((IQ) packet);
- }
+ IQ iq = (IQ) packet;
+ String namespace = iq.getChildElement().getNamespace().getURI();
+ notifyIqListeners(iq, namespace);
}
if (packet instanceof Message) {
- for (XmppMessageListener messageListener : xmppMessageListeners) {
- messageListener.handleMessageStanza((Message) packet);
- }
+ notifyMessageListeners((Message) packet);
}
if (packet instanceof Presence) {
- for (XmppPresenceListener presenceListener : xmppPresenceListeners) {
- presenceListener.handlePresenceStanza((Presence) packet);
- }
+ notifyPresenceListeners((Presence) packet);
+ }
+ }
+
+ private void notifyPresenceListeners(Presence packet) {
+ for (XmppPresenceListener presenceListener : xmppPresenceListeners) {
+ presenceListener.handlePresenceStanza((Presence) packet);
+ }
+ }
+
+ private void notifyMessageListeners(Message message) {
+ for (XmppMessageListener messageListener : xmppMessageListeners) {
+ messageListener.handleMessageStanza(message);
+ }
+ }
+
+ private void notifyIqListeners(IQ iq, String namespace) {
+ for (XmppIqListener iqListener : xmppIqListeners.get(namespace)) {
+ iqListener.handleIqStanza(iq);
}
}
diff --git a/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/XmppServer.java b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/XmppServer.java
index 3913d64..2bc96fd 100644
--- a/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/XmppServer.java
+++ b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/XmppServer.java
@@ -108,7 +108,6 @@
}
private void configureBootstrap(ServerBootstrap bootstrap) {
- bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.option(ChannelOption.SO_RCVBUF, 2048);
}
diff --git a/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/XmppControllerImplTest.java b/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/XmppControllerImplTest.java
index fb19ee1..75780e5 100644
--- a/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/XmppControllerImplTest.java
+++ b/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/XmppControllerImplTest.java
@@ -22,6 +22,9 @@
import java.util.Hashtable;
import com.google.common.collect.ImmutableSet;
+import org.dom4j.Element;
+import org.dom4j.Namespace;
+import org.dom4j.tree.DefaultElement;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
@@ -69,6 +72,8 @@
XmppDevice device3;
XmppDeviceId jid3;
+ final String testNamespace = "testns";
+
/**
* Test harness for a device listener.
*/
@@ -133,7 +138,7 @@
testXmppDeviceListener = new TestXmppDeviceListener();
controller.addXmppDeviceListener(testXmppDeviceListener);
testXmppIqListener = new TestXmppIqListener();
- controller.addXmppIqListener(testXmppIqListener);
+ controller.addXmppIqListener(testXmppIqListener, testNamespace);
testXmppMessageListener = new TestXmppMessageListener();
controller.addXmppMessageListener(testXmppMessageListener);
testXmppPresenceListener = new TestXmppPresenceListener();
@@ -166,7 +171,7 @@
@After
public void tearDown() {
controller.removeXmppDeviceListener(testXmppDeviceListener);
- controller.removeXmppIqListener(testXmppIqListener);
+ controller.removeXmppIqListener(testXmppIqListener, testNamespace);
controller.removeXmppMessageListener(testXmppMessageListener);
controller.removeXmppPresenceListener(testXmppPresenceListener);
controller.deactivate();
@@ -215,7 +220,9 @@
@Test
public void handlePackets() {
// IQ packets
- Packet iq = new IQ();
+ IQ iq = new IQ();
+ Element element = new DefaultElement("pubsub", Namespace.get(testNamespace));
+ iq.setChildElement(element);
agent.processUpstreamEvent(jid1, iq);
assertThat(testXmppIqListener.handledIqs, hasSize(1));
agent.processUpstreamEvent(jid2, iq);
diff --git a/protocols/xmpp/pom.xml b/protocols/xmpp/pom.xml
index 6a16076..1675916 100644
--- a/protocols/xmpp/pom.xml
+++ b/protocols/xmpp/pom.xml
@@ -10,14 +10,14 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>onos-protocols-xmpp</artifactId>
- <description>ONOS XMPP Protocol subsystem</description>
<packaging>pom</packaging>
+ <description>ONOS XMPP Protocol subsystem</description>
<modules>
<module>core</module>
+ <module>pubsub</module>
</modules>
-
<build>
<plugins>
<plugin>
diff --git a/protocols/xmpp/pubsub/BUCK b/protocols/xmpp/pubsub/BUCK
new file mode 100644
index 0000000..9a69e4c
--- /dev/null
+++ b/protocols/xmpp/pubsub/BUCK
@@ -0,0 +1,20 @@
+BUNDLES = [
+ '//protocols/xmpp/core/api:onos-protocols-xmpp-core-api',
+ '//protocols/xmpp/pubsub/api:onos-protocols-xmpp-pubsub-api',
+ '//protocols/xmpp/pubsub/ctl:onos-protocols-xmpp-pubsub-ctl',
+ '//lib:tinder-xmpp',
+ '//lib:concurrent-hashmap',
+ '//lib:gnu-idn',
+]
+
+onos_app(
+ app_name = 'org.onosproject.protocols.xmpp.pubsub',
+ title = 'XMPP Publish/Subscribe protocol extension subsystem',
+ category = 'Protocol',
+ url = 'http://onosproject.org',
+ description = 'XMPP Publish/Subscribe protocol extension subsystem',
+ included_bundles = BUNDLES,
+ required_apps = [
+ 'org.onosproject.protocols.xmpp',
+ ]
+)
\ No newline at end of file
diff --git a/protocols/xmpp/pubsub/api/BUCK b/protocols/xmpp/pubsub/api/BUCK
new file mode 100644
index 0000000..d856754
--- /dev/null
+++ b/protocols/xmpp/pubsub/api/BUCK
@@ -0,0 +1,9 @@
+COMPILE_DEPS = [
+ '//lib:CORE_DEPS',
+ '//core/api:onos-api',
+ '//lib:tinder-xmpp',
+]
+
+osgi_jar_with_tests (
+ deps = COMPILE_DEPS,
+)
\ No newline at end of file
diff --git a/protocols/xmpp/pubsub/api/pom.xml b/protocols/xmpp/pubsub/api/pom.xml
new file mode 100644
index 0000000..5357060
--- /dev/null
+++ b/protocols/xmpp/pubsub/api/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright 2018-present Open Networking Foundation
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>onos-protocols-xmpp-pubsub</artifactId>
+ <groupId>org.onosproject</groupId>
+ <version>1.13.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>onos-protocols-xmpp-pubsub-api</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-api</artifactId>
+ </dependency>
+ </dependencies>
+
+
+</project>
\ No newline at end of file
diff --git a/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/XmppPubSubConstants.java b/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/XmppPubSubConstants.java
new file mode 100644
index 0000000..a705263
--- /dev/null
+++ b/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/XmppPubSubConstants.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.pubsub;
+
+import com.google.common.collect.ImmutableMap;
+import org.xmpp.packet.PacketError;
+
+/**
+ * Constant values used across PubSub extension.
+ */
+public final class XmppPubSubConstants {
+
+ public static final String PUBSUB_NAMESPACE = "http://jabber.org/protocol/pubsub";
+ public static final String PUBSUB_EVENT_NS = "http://jabber.org/protocol/pubsub#event";
+ public static final String PUBSUB_ERROR_NS = "http://jabber.org/protocol/pubsub#errors";
+ public static final String PUBSUB_ELEMENT = "pubsub";
+ public static final ImmutableMap<PubSubApplicationCondition, PacketError.Condition>
+ APP_BASE_CONDITION_MAP = new ImmutableMap.Builder<PubSubApplicationCondition, PacketError.Condition>()
+ .put(PubSubApplicationCondition.ITEM_NOT_FOUND, PacketError.Condition.item_not_found)
+ .put(PubSubApplicationCondition.NOT_SUBSCRIBED, PacketError.Condition.unexpected_request)
+ .build();
+
+ public enum PubSubApplicationCondition {
+ NOT_SUBSCRIBED,
+ ITEM_NOT_FOUND
+ }
+
+ private XmppPubSubConstants() {
+ }
+
+ public enum Method {
+ SUBSCRIBE,
+ UNSUBSCRIBE,
+ PUBLISH,
+ RETRACT
+ }
+
+}
diff --git a/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/XmppPubSubController.java b/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/XmppPubSubController.java
new file mode 100644
index 0000000..ee6e340
--- /dev/null
+++ b/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/XmppPubSubController.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.pubsub;
+
+import org.onosproject.net.DeviceId;
+import org.onosproject.xmpp.pubsub.model.XmppEventNotification;
+import org.onosproject.xmpp.pubsub.model.XmppPubSubError;
+
+/**
+ * Responsible for controlling Publish/Subscribe functionality of XMPP.
+ */
+public interface XmppPubSubController {
+
+ /**
+ * Method allows to send event notification to XMPP device.
+ *
+ * @param deviceId identifier of underlaying device
+ * @param eventNotification payload of event notification
+ */
+ void notify(DeviceId deviceId, XmppEventNotification eventNotification);
+
+ /**
+ * Method allows to send error notification to XMPP device.
+ *
+ * @param deviceId identifier of underlaying device
+ * @param error payload of error notification
+ */
+ void notifyError(DeviceId deviceId, XmppPubSubError error);
+
+ /**
+ * Register listener for Publish/Retract events.
+ *
+ * @param xmppPublishEventsListener listener to notify
+ */
+ void addXmppPublishEventsListener(XmppPublishEventsListener xmppPublishEventsListener);
+
+ /**
+ * Unregister listener for Publish/Retract events.
+ *
+ * @param xmppPublishEventsListener listener to unregister
+ */
+ void removeXmppPublishEventsListener(XmppPublishEventsListener xmppPublishEventsListener);
+
+ /**
+ * Register listener for Subscribe/Unsubscribe events.
+ *
+ * @param xmppSubscribeEventsListener listener to notify
+ */
+ void addXmppSubscribeEventsListener(XmppSubscribeEventsListener xmppSubscribeEventsListener);
+
+ /**
+ * Unregister listener for Subscribe/Unsubscribe events.
+ *
+ * @param xmppSubscribeEventsListener listener to unregister
+ */
+ void removeXmppSubscribeEventsListener(XmppSubscribeEventsListener xmppSubscribeEventsListener);
+
+}
diff --git a/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/XmppPubSubEvent.java b/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/XmppPubSubEvent.java
new file mode 100644
index 0000000..d4c3b76
--- /dev/null
+++ b/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/XmppPubSubEvent.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.pubsub;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+/**
+ * Abstracts event of XMPP Publish/Subscribe.
+ */
+public final class XmppPubSubEvent<S> {
+
+ /**
+ * Types of XMPP Publish/Subscribe messages.
+ */
+ public enum Type {
+ SUBSCRIBE,
+ UNSUBSCRIBE,
+ PUBLISH,
+ RETRACT
+ }
+
+ private final Type type;
+ private final S subject;
+
+ public XmppPubSubEvent(Type type, S subject) {
+ this.type = type;
+ this.subject = subject;
+ }
+
+ /**
+ * Returns the type of event.
+ *
+ * @return event type
+ */
+ public Type type() {
+ return type;
+ }
+
+ /**
+ * Returns the subject of event.
+ *
+ * @return subject to which this event pertains
+ */
+ public S subject() {
+ return subject;
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(this).add("type", type())
+ .add("subject", subject()).toString();
+ }
+
+}
diff --git a/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/XmppPublishEventsListener.java b/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/XmppPublishEventsListener.java
new file mode 100644
index 0000000..406b1d6
--- /dev/null
+++ b/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/XmppPublishEventsListener.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.pubsub;
+
+import org.onosproject.xmpp.pubsub.model.XmppPublish;
+import org.onosproject.xmpp.pubsub.model.XmppRetract;
+
+/**
+ * Allows for providers interested in XMPP Publish/Retract events to be notified.
+ */
+public interface XmppPublishEventsListener {
+
+ /**
+ * Method for handling incoming XMPP Publish message.
+ *
+ * @param publishEvent event related to incoming XMPP Publish message
+ */
+ void handlePublish(XmppPublish publishEvent);
+
+ /**
+ * Method for handling incoming XMPP Retract message.
+ *
+ * @param retractEvent event related to incoming XMPP Retract message
+ */
+ void handleRetract(XmppRetract retractEvent);
+
+}
diff --git a/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/XmppSubscribeEventsListener.java b/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/XmppSubscribeEventsListener.java
new file mode 100644
index 0000000..4a5d42d
--- /dev/null
+++ b/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/XmppSubscribeEventsListener.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.pubsub;
+
+import org.onosproject.xmpp.pubsub.model.XmppSubscribe;
+import org.onosproject.xmpp.pubsub.model.XmppUnsubscribe;
+
+/**
+ * Allows for providers interested in XMPP Subscribe/Unsubscribe events to be notified.
+ */
+public interface XmppSubscribeEventsListener {
+
+ /**
+ * Method for handling incoming XMPP Subscribe message.
+ *
+ * @param subscribeEvent event related to incoming XMPP Subscribe message
+ */
+ void handleSubscribe(XmppSubscribe subscribeEvent);
+
+ /**
+ * Method for handling incoming XMPP Unsubscribe message.
+ *
+ * @param unsubscribeEvent event related to incoming XMPP Unsubscribe message
+ */
+ void handleUnsubscribe(XmppUnsubscribe unsubscribeEvent);
+
+}
diff --git a/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/model/XmppEventNotification.java b/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/model/XmppEventNotification.java
new file mode 100644
index 0000000..a4e64b7
--- /dev/null
+++ b/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/model/XmppEventNotification.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.pubsub.model;
+
+import org.dom4j.Element;
+import org.xmpp.packet.Message;
+
+import static org.onosproject.xmpp.pubsub.XmppPubSubConstants.PUBSUB_EVENT_NS;
+
+/**
+ * Abstracts Event Notification message of XMPP PubSub protocol.
+ */
+public class XmppEventNotification extends Message {
+
+ /**
+ * Constructor for XmppEventNotification class. It generates representation
+ * of XMPP NOTIFY message.
+ *
+ * @param node node attribute of PubSub extension
+ * @param payload XML payload for XMPP NOTIFY message
+ */
+ public XmppEventNotification(String node, Element payload) {
+ super(docFactory.createDocument().addElement("message"));
+ this.addChildElement("event", PUBSUB_EVENT_NS);
+ Element items = docFactory.createElement("items");
+ items.addAttribute("node", node);
+ items.add(payload);
+ this.getElement().element("event").add(items);
+ }
+
+}
diff --git a/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/model/XmppPubSubError.java b/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/model/XmppPubSubError.java
new file mode 100644
index 0000000..dcf60d2
--- /dev/null
+++ b/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/model/XmppPubSubError.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.pubsub.model;
+
+import org.xmpp.packet.PacketError;
+import static org.onosproject.xmpp.pubsub.XmppPubSubConstants.APP_BASE_CONDITION_MAP;
+import static org.onosproject.xmpp.pubsub.XmppPubSubConstants.PUBSUB_ERROR_NS;
+import static org.onosproject.xmpp.pubsub.XmppPubSubConstants.PubSubApplicationCondition;
+
+/**
+ * Abstracts Publish/Subscribe error message of XMPP PubSub protocol.
+ */
+public class XmppPubSubError {
+
+ private PacketError.Condition baseCondition;
+ private PubSubApplicationCondition applicationCondition;
+
+ public XmppPubSubError(PubSubApplicationCondition applicationCondition) {
+ this.applicationCondition = applicationCondition;
+ this.baseCondition = setBasedOnAppCondition();
+ }
+
+ private PacketError.Condition setBasedOnAppCondition() {
+ return APP_BASE_CONDITION_MAP.getOrDefault(this.applicationCondition,
+ PacketError.Condition.undefined_condition);
+ }
+
+ public PacketError asPacketError() {
+ PacketError packetError = new PacketError(this.baseCondition);
+ if (applicationCondition != null) {
+ packetError.setApplicationCondition(applicationCondition.toString().toLowerCase(),
+ PUBSUB_ERROR_NS);
+ }
+ return packetError;
+ }
+
+}
diff --git a/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/model/XmppPublish.java b/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/model/XmppPublish.java
new file mode 100644
index 0000000..97dc2a7
--- /dev/null
+++ b/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/model/XmppPublish.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.pubsub.model;
+
+import org.dom4j.Element;
+import org.xmpp.packet.IQ;
+
+/**
+ * Abstracts Publish message of XMPP PubSub protocol.
+ */
+public class XmppPublish extends IQ {
+
+ private String jabberId;
+ private String nodeID;
+ private Element item;
+ private String itemID;
+ private Element itemEntry;
+ private String itemEntryNamespace;
+
+ /**
+ * Constructor for XmppPublish class.
+ * @param iq XMPP IQ stanza, which XmppPublish is based on.
+ */
+ public XmppPublish(IQ iq) {
+ super(iq.getElement());
+ this.jabberId = this.fromJID.toString();
+ this.nodeID = this.getChildElement().element("publish").attribute("node").getValue();
+ this.item = this.getChildElement().element("publish").element("item");
+ this.itemID = this.item.attribute("id").getValue();
+ this.itemEntry = this.item.element("entry");
+ this.itemEntryNamespace = this.itemEntry.getNamespaceURI();
+ }
+
+ public String getJabberId() {
+ return this.jabberId;
+ }
+
+ public String getNodeID() {
+ return this.nodeID;
+ }
+
+ public Element getItem() {
+ return this.item;
+ }
+
+ public String getItemID() {
+ return this.itemID;
+ }
+
+ public Element getItemEntry() {
+ return this.itemEntry;
+ }
+
+ public String getItemEntryNamespace() {
+ return this.itemEntryNamespace;
+ }
+
+ @Override
+ public String toString() {
+ return "Publish{" +
+ "JID=" + fromJID +
+ "NodeID=" + this.getNodeID() +
+ "Item=\n" + this.getItem().asXML() +
+ '}';
+ }
+
+}
diff --git a/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/model/XmppRetract.java b/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/model/XmppRetract.java
new file mode 100644
index 0000000..83308dd
--- /dev/null
+++ b/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/model/XmppRetract.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.pubsub.model;
+
+import org.xmpp.packet.IQ;
+
+/**
+ * Abstracts Retract message of XMPP PubSub protocol.
+ */
+public class XmppRetract extends IQ {
+
+ private String jabberId;
+ private String nodeID;
+ private String itemID;
+
+ /**
+ * Constructor for XmppRetract class.
+ *
+ * @param iq XMPP IQ stanza, which XmppRetract is based on.
+ */
+ public XmppRetract(IQ iq) {
+ super(iq.getElement());
+ this.jabberId = this.fromJID.toString();
+ this.nodeID = this.getChildElement().element("retract").attribute("node").getValue();
+ this.itemID = this.getChildElement().element("retract").element("item").attribute("id").getValue();
+ }
+
+ public String getJabberId() {
+ return this.jabberId;
+ }
+
+ public String getNodeID() {
+ return this.nodeID;
+ }
+
+ public String getItemID() {
+ return this.itemID;
+ }
+
+}
diff --git a/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/model/XmppSubscribe.java b/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/model/XmppSubscribe.java
new file mode 100644
index 0000000..c3040d5
--- /dev/null
+++ b/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/model/XmppSubscribe.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.pubsub.model;
+
+import org.xmpp.packet.IQ;
+
+/**
+ * Abstracts Subscribe message of XMPP PubSub protocol.
+ */
+public class XmppSubscribe extends IQ {
+
+ private String jabberId;
+ private String nodeID;
+
+ /**
+ * Constructor for XmppSubscribe class.
+ *
+ * @param iq XMPP IQ stanza, which XmppSubscribe is based on.
+ */
+ public XmppSubscribe(IQ iq) {
+ super(iq.getElement());
+ this.jabberId = this.fromJID.toString();
+ this.nodeID = this.getChildElement().element("subscribe").attribute("node").getValue();
+ }
+
+ public String getJabberId() {
+ return this.jabberId;
+ }
+
+ public String getNodeID() {
+ return this.nodeID;
+ }
+
+}
diff --git a/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/model/XmppUnsubscribe.java b/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/model/XmppUnsubscribe.java
new file mode 100644
index 0000000..a256c61
--- /dev/null
+++ b/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/model/XmppUnsubscribe.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.pubsub.model;
+
+import org.xmpp.packet.IQ;
+
+/**
+ * Abstracts Unsubscribe message of XMPP PubSub protocol.
+ */
+public class XmppUnsubscribe extends IQ {
+
+ private String jabberId;
+ private String nodeID;
+
+ /**
+ * Constructor for XmppUnsubscribe class.
+ *
+ * @param iq XMPP IQ stanza, which XmppUnsubscribe is based on.
+ */
+ public XmppUnsubscribe(IQ iq) {
+ super(iq.getElement());
+ this.jabberId = this.fromJID.toString();
+ this.nodeID = this.getChildElement().element("unsubscribe").attribute("node").getValue();
+ }
+
+ public String getJabberId() {
+ return this.jabberId;
+ }
+
+ public String getNodeID() {
+ return this.nodeID;
+ }
+
+}
diff --git a/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/model/package-info.java b/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/model/package-info.java
new file mode 100644
index 0000000..4196048
--- /dev/null
+++ b/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/model/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package for implementation of XMPP Publish/Subscribe abstractions.
+ */
+package org.onosproject.xmpp.pubsub.model;
\ No newline at end of file
diff --git a/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/package-info.java b/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/package-info.java
new file mode 100644
index 0000000..4021f65
--- /dev/null
+++ b/protocols/xmpp/pubsub/api/src/main/java/org/onosproject/xmpp/pubsub/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package for implementation of XMPP Publish/Subscribe APIs.
+ */
+package org.onosproject.xmpp.pubsub;
\ No newline at end of file
diff --git a/protocols/xmpp/pubsub/ctl/BUCK b/protocols/xmpp/pubsub/ctl/BUCK
new file mode 100644
index 0000000..e28ec52
--- /dev/null
+++ b/protocols/xmpp/pubsub/ctl/BUCK
@@ -0,0 +1,19 @@
+COMPILE_DEPS = [
+ '//lib:CORE_DEPS',
+ '//core/api:onos-api',
+ '//lib:tinder-xmpp',
+ '//lib:concurrent-hashmap',
+ '//lib:gnu-idn',
+ '//protocols/xmpp/pubsub/api:onos-protocols-xmpp-pubsub-api',
+ '//protocols/xmpp/core/api:onos-protocols-xmpp-core-api',
+]
+
+TEST_DEPS = [
+ '//lib:TEST',
+ '//core/api:onos-api-tests',
+]
+
+osgi_jar_with_tests (
+ deps = COMPILE_DEPS,
+ test_deps = TEST_DEPS,
+)
\ No newline at end of file
diff --git a/protocols/xmpp/pubsub/ctl/pom.xml b/protocols/xmpp/pubsub/ctl/pom.xml
new file mode 100644
index 0000000..d3d23fa
--- /dev/null
+++ b/protocols/xmpp/pubsub/ctl/pom.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright 2018-present Open Networking Foundation
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>onos-protocols-xmpp-pubsub</artifactId>
+ <groupId>org.onosproject</groupId>
+ <version>1.13.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>onos-protocols-xmpp-pubsub-ctl</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-protocols-xmpp-pubsub-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-protocols-xmpp-core-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+
+</project>
\ No newline at end of file
diff --git a/protocols/xmpp/pubsub/ctl/src/main/java/org/onosproject/xmpp/pubsub/ctl/XmppPubSubControllerImpl.java b/protocols/xmpp/pubsub/ctl/src/main/java/org/onosproject/xmpp/pubsub/ctl/XmppPubSubControllerImpl.java
new file mode 100644
index 0000000..cb9900d
--- /dev/null
+++ b/protocols/xmpp/pubsub/ctl/src/main/java/org/onosproject/xmpp/pubsub/ctl/XmppPubSubControllerImpl.java
@@ -0,0 +1,213 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.pubsub.ctl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.dom4j.Element;
+import org.onosproject.net.DeviceId;
+import org.onosproject.xmpp.core.XmppController;
+import org.onosproject.xmpp.core.XmppDeviceId;
+import org.onosproject.xmpp.core.XmppIqListener;
+import org.onosproject.xmpp.pubsub.XmppPubSubConstants;
+import org.onosproject.xmpp.pubsub.XmppPubSubController;
+import org.onosproject.xmpp.pubsub.XmppPublishEventsListener;
+import org.onosproject.xmpp.pubsub.XmppSubscribeEventsListener;
+import org.onosproject.xmpp.pubsub.model.XmppEventNotification;
+import org.onosproject.xmpp.pubsub.model.XmppPubSubError;
+import org.onosproject.xmpp.pubsub.model.XmppPublish;
+import org.onosproject.xmpp.pubsub.model.XmppRetract;
+import org.onosproject.xmpp.pubsub.model.XmppSubscribe;
+import org.onosproject.xmpp.pubsub.model.XmppUnsubscribe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xmpp.packet.IQ;
+import org.xmpp.packet.JID;
+
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.xmpp.pubsub.XmppPubSubConstants.PUBSUB_ELEMENT;
+import static org.onosproject.xmpp.pubsub.XmppPubSubConstants.PUBSUB_NAMESPACE;
+
+/**
+ * The main class implementing XMPP Publish/Subscribe extension.
+ * It listens to IQ stanzas and generates PubSub events based on the payload.
+ */
+@Component(immediate = true)
+@Service
+public class XmppPubSubControllerImpl implements XmppPubSubController {
+
+ private static final Logger log =
+ LoggerFactory.getLogger(XmppPubSubControllerImpl.class);
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected XmppController xmppController;
+
+ protected Set<XmppPublishEventsListener> xmppPublishEventsListeners =
+ new CopyOnWriteArraySet<XmppPublishEventsListener>();
+ protected Set<XmppSubscribeEventsListener> xmppSubscribeEventsListeners =
+ new CopyOnWriteArraySet<XmppSubscribeEventsListener>();
+
+ protected XmppIqListener iqListener = new InternalXmppIqListener();
+
+ @Activate
+ public void activate() {
+ xmppController.addXmppIqListener(iqListener, PUBSUB_NAMESPACE);
+ log.info("Started.");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ xmppController.removeXmppIqListener(iqListener, PUBSUB_NAMESPACE);
+ log.info("Stopped");
+ }
+
+ @Override
+ public void notify(DeviceId deviceId, XmppEventNotification eventNotification) {
+ XmppDeviceId xmppDeviceId = asXmppDeviceId(deviceId);
+ xmppController.getDevice(xmppDeviceId).sendPacket(eventNotification);
+ }
+
+ @Override
+ public void notifyError(DeviceId deviceId, XmppPubSubError error) {
+ XmppDeviceId xmppDeviceId = asXmppDeviceId(deviceId);
+ xmppController.getDevice(xmppDeviceId).sendError(error.asPacketError());
+ }
+
+ private XmppDeviceId asXmppDeviceId(DeviceId deviceId) {
+ String[] parts = deviceId.toString().split(":");
+ JID jid = new JID(parts[1]);
+ return new XmppDeviceId(jid);
+ }
+
+ @Override
+ public void addXmppPublishEventsListener(XmppPublishEventsListener xmppPublishEventsListener) {
+ xmppPublishEventsListeners.add(xmppPublishEventsListener);
+ }
+
+ @Override
+ public void removeXmppPublishEventsListener(XmppPublishEventsListener xmppPublishEventsListener) {
+ xmppPublishEventsListeners.remove(xmppPublishEventsListener);
+ }
+
+ @Override
+ public void addXmppSubscribeEventsListener(XmppSubscribeEventsListener xmppSubscribeEventsListener) {
+ xmppSubscribeEventsListeners.add(xmppSubscribeEventsListener);
+ }
+
+ @Override
+ public void removeXmppSubscribeEventsListener(XmppSubscribeEventsListener xmppSubscribeEventsListener) {
+ xmppSubscribeEventsListeners.remove(xmppSubscribeEventsListener);
+ }
+
+ private class InternalXmppIqListener implements XmppIqListener {
+ @Override
+ public void handleIqStanza(IQ iq) {
+ if (isPubSub(iq)) {
+ notifyListeners(iq);
+ }
+ }
+ }
+
+ private void notifyListeners(IQ iq) {
+ XmppPubSubConstants.Method method = getMethod(iq);
+ checkNotNull(method);
+ switch (method) {
+ case SUBSCRIBE:
+ XmppSubscribe subscribe = new XmppSubscribe(iq);
+ notifyXmppSubscribe(subscribe);
+ break;
+ case UNSUBSCRIBE:
+ XmppUnsubscribe unsubscribe = new XmppUnsubscribe(iq);
+ notifyXmppUnsubscribe(unsubscribe);
+ break;
+ case PUBLISH:
+ XmppPublish publish = new XmppPublish(iq);
+ notifyXmppPublish(publish);
+ break;
+ case RETRACT:
+ XmppRetract retract = new XmppRetract(iq);
+ notifyXmppRetract(retract);
+ break;
+ default:
+ break;
+ }
+ }
+
+ private void notifyXmppRetract(XmppRetract retractEvent) {
+ for (XmppPublishEventsListener listener : xmppPublishEventsListeners) {
+ listener.handleRetract(retractEvent);
+ }
+ }
+
+ private void notifyXmppPublish(XmppPublish publishEvent) {
+ for (XmppPublishEventsListener listener : xmppPublishEventsListeners) {
+ listener.handlePublish(publishEvent);
+ }
+ }
+
+ private void notifyXmppUnsubscribe(XmppUnsubscribe unsubscribeEvent) {
+ for (XmppSubscribeEventsListener listener : xmppSubscribeEventsListeners) {
+ listener.handleUnsubscribe(unsubscribeEvent);
+ }
+ }
+
+ private void notifyXmppSubscribe(XmppSubscribe subscribeEvent) {
+ for (XmppSubscribeEventsListener listener : xmppSubscribeEventsListeners) {
+ listener.handleSubscribe(subscribeEvent);
+ }
+ }
+
+ private boolean isPubSub(IQ iq) {
+ Element pubsub = iq.getElement().element(PUBSUB_ELEMENT);
+ if (pubsub != null && pubsub.getNamespaceURI().equals(PUBSUB_NAMESPACE)) {
+ return true;
+ }
+ return false;
+ }
+
+ public static XmppPubSubConstants.Method getMethod(IQ iq) {
+ Element pubsubElement = iq.getChildElement();
+ Element methodElement = getChildElement(pubsubElement);
+ String name = methodElement.getName();
+ switch (name) {
+ case "subscribe":
+ return XmppPubSubConstants.Method.SUBSCRIBE;
+ case "unsubscribe":
+ return XmppPubSubConstants.Method.UNSUBSCRIBE;
+ case "publish":
+ return XmppPubSubConstants.Method.PUBLISH;
+ case "retract":
+ return XmppPubSubConstants.Method.RETRACT;
+ default:
+ break;
+ }
+ return null;
+ }
+
+ public static Element getChildElement(Element element) {
+ Element child = (Element) element.elements().get(0); // the first element is related to pubsub operation
+ return child;
+ }
+
+}
diff --git a/protocols/xmpp/pubsub/ctl/src/main/java/org/onosproject/xmpp/pubsub/ctl/package-info.java b/protocols/xmpp/pubsub/ctl/src/main/java/org/onosproject/xmpp/pubsub/ctl/package-info.java
new file mode 100644
index 0000000..ad13be7
--- /dev/null
+++ b/protocols/xmpp/pubsub/ctl/src/main/java/org/onosproject/xmpp/pubsub/ctl/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package for implementation of XMPP Publish/Subscribe elements.
+ */
+package org.onosproject.xmpp.pubsub.ctl;
\ No newline at end of file
diff --git a/protocols/xmpp/pubsub/ctl/src/test/java/org/onosproject/xmpp/pubsub/ctl/XmppPubSubControllerTest.java b/protocols/xmpp/pubsub/ctl/src/test/java/org/onosproject/xmpp/pubsub/ctl/XmppPubSubControllerTest.java
new file mode 100644
index 0000000..28f799d
--- /dev/null
+++ b/protocols/xmpp/pubsub/ctl/src/test/java/org/onosproject/xmpp/pubsub/ctl/XmppPubSubControllerTest.java
@@ -0,0 +1,341 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.pubsub.ctl;
+
+import com.google.common.collect.Lists;
+import org.dom4j.Document;
+import org.dom4j.Element;
+import org.dom4j.Namespace;
+import org.dom4j.tree.DefaultElement;
+import org.junit.Before;
+import org.junit.Test;
+import org.onosproject.net.DeviceId;
+import org.onosproject.xmpp.core.XmppController;
+import org.onosproject.xmpp.core.XmppDevice;
+import org.onosproject.xmpp.core.XmppDeviceId;
+import org.onosproject.xmpp.core.XmppDeviceListener;
+import org.onosproject.xmpp.core.XmppIqListener;
+import org.onosproject.xmpp.core.XmppMessageListener;
+import org.onosproject.xmpp.core.XmppPresenceListener;
+import org.onosproject.xmpp.core.XmppSession;
+import org.onosproject.xmpp.pubsub.XmppPubSubConstants;
+import org.onosproject.xmpp.pubsub.XmppPublishEventsListener;
+import org.onosproject.xmpp.pubsub.XmppSubscribeEventsListener;
+import org.onosproject.xmpp.pubsub.model.XmppEventNotification;
+import org.onosproject.xmpp.pubsub.model.XmppPubSubError;
+import org.onosproject.xmpp.pubsub.model.XmppPublish;
+import org.onosproject.xmpp.pubsub.model.XmppRetract;
+import org.onosproject.xmpp.pubsub.model.XmppSubscribe;
+import org.onosproject.xmpp.pubsub.model.XmppUnsubscribe;
+import org.xmpp.packet.IQ;
+import org.xmpp.packet.Packet;
+import org.xmpp.packet.PacketError;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.onosproject.xmpp.pubsub.XmppPubSubConstants.PubSubApplicationCondition.ITEM_NOT_FOUND;
+
+/**
+ * Test class for XmppPubSubController class.
+ */
+public class XmppPubSubControllerTest {
+
+ private String nodeAttribute = "test";
+ private String node = "node";
+ private String toJid = "xmpp@onosproject.org";
+ private String fromJid = "test@xmpp.org";
+ private String pubSub = "pubsub";
+ private String publish = "publish";
+ private String retract = "retract";
+ private String subscribe = "subscribe";
+ private String unsubscribe = "unsubscribe";
+ private String item = "item";
+ private String id = "id";
+ private String itemId = "id-000";
+ private String entry = "entry";
+ private String testNamespace = "jabber:test:item";
+
+
+ XmppPubSubControllerImpl pubSubController;
+ XmppControllerAdapter xmppControllerAdapter;
+ XmppDeviceAdapter testDevice;
+
+ TestXmppPublishEventsListener testXmppPublishEventsListener;
+ TestXmppSubscribeEventsListener testXmppSubscribeEventsListener;
+
+ static class TestXmppPublishEventsListener implements XmppPublishEventsListener {
+
+ final List<XmppPublish> handledPublishMsgs = Lists.newArrayList();
+ final List<XmppRetract> handledRetractMsgs = Lists.newArrayList();
+
+ @Override
+ public void handlePublish(XmppPublish publishEvent) {
+ handledPublishMsgs.add(publishEvent);
+ }
+
+ @Override
+ public void handleRetract(XmppRetract retractEvent) {
+ handledRetractMsgs.add(retractEvent);
+ }
+ }
+
+ static class TestXmppSubscribeEventsListener implements XmppSubscribeEventsListener {
+
+ final List<XmppSubscribe> handledSubscribeMsgs = Lists.newArrayList();
+ final List<XmppUnsubscribe> handledUnsubscribeMsgs = Lists.newArrayList();
+
+
+ @Override
+ public void handleSubscribe(XmppSubscribe subscribeEvent) {
+ handledSubscribeMsgs.add(subscribeEvent);
+ }
+
+ @Override
+ public void handleUnsubscribe(XmppUnsubscribe unsubscribeEvent) {
+ handledUnsubscribeMsgs.add(unsubscribeEvent);
+ }
+ }
+
+ @Before
+ public void setUp() {
+ testDevice = new XmppDeviceAdapter();
+ xmppControllerAdapter = new XmppControllerAdapter();
+ pubSubController = new XmppPubSubControllerImpl();
+ pubSubController.xmppController = xmppControllerAdapter;
+ testXmppPublishEventsListener = new TestXmppPublishEventsListener();
+ testXmppSubscribeEventsListener = new TestXmppSubscribeEventsListener();
+ pubSubController.activate();
+ }
+
+ @Test
+ public void testActivate() {
+ assertThat(xmppControllerAdapter.iqListener, is(notNullValue()));
+ }
+
+ @Test
+ public void testDeactivate() {
+ pubSubController.deactivate();
+ assertThat(xmppControllerAdapter.iqListener, is(nullValue()));
+ }
+
+ @Test
+ public void testAddRemoveListeners() {
+ pubSubController.addXmppPublishEventsListener(testXmppPublishEventsListener);
+ assertThat(pubSubController.xmppPublishEventsListeners.size(), is(1));
+ pubSubController.addXmppSubscribeEventsListener(testXmppSubscribeEventsListener);
+ assertThat(pubSubController.xmppSubscribeEventsListeners.size(), is(1));
+ pubSubController.removeXmppPublishEventsListener(testXmppPublishEventsListener);
+ assertThat(pubSubController.xmppPublishEventsListeners.size(), is(0));
+ pubSubController.removeXmppSubscribeEventsListener(testXmppSubscribeEventsListener);
+ assertThat(pubSubController.xmppSubscribeEventsListeners.size(), is(0));
+ }
+
+ @Test
+ public void testNotifyEvent() {
+ XmppEventNotification eventNotification = new XmppEventNotification(nodeAttribute,
+ new DefaultElement(nodeAttribute));
+ pubSubController.notify(DeviceId.NONE, eventNotification);
+ assertThat(testDevice.sentPackets.size(), is(1));
+ assertThat(testDevice.sentPackets.get(0), is(eventNotification));
+ }
+
+ @Test
+ public void testNotifyError() {
+ XmppPubSubError xmppPubSubError =
+ new XmppPubSubError(ITEM_NOT_FOUND);
+ pubSubController.notifyError(DeviceId.NONE, xmppPubSubError);
+ assertThat(testDevice.sentErrors.size(), is(1));
+ }
+
+ @Test
+ public void testHandlePubSubMessages() {
+ pubSubController.addXmppPublishEventsListener(testXmppPublishEventsListener);
+ pubSubController.addXmppSubscribeEventsListener(testXmppSubscribeEventsListener);
+ XmppSubscribe xmppSubscribe = buildXmppSubscribe();
+ xmppControllerAdapter.iqListener.handleIqStanza(xmppSubscribe);
+ assertThat(testXmppSubscribeEventsListener.handledSubscribeMsgs.size(), is(1));
+ XmppUnsubscribe xmppUnsubscribe = buildXmppUnsubscribe();
+ xmppControllerAdapter.iqListener.handleIqStanza(xmppUnsubscribe);
+ assertThat(testXmppSubscribeEventsListener.handledUnsubscribeMsgs.size(), is(1));
+ XmppPublish xmppPublish = buildXmppPublish();
+ xmppControllerAdapter.iqListener.handleIqStanza(xmppPublish);
+ assertThat(testXmppPublishEventsListener.handledPublishMsgs.size(), is(1));
+ XmppRetract xmppRetract = buildXmppRetract();
+ xmppControllerAdapter.iqListener.handleIqStanza(xmppRetract);
+ assertThat(testXmppPublishEventsListener.handledRetractMsgs.size(), is(1));
+ }
+
+ private XmppSubscribe buildXmppSubscribe() {
+ IQ iq = new IQ(IQ.Type.set);
+ iq.setTo(toJid);
+ iq.setFrom(fromJid);
+ Element element = new DefaultElement(pubSub, Namespace.get(XmppPubSubConstants.PUBSUB_NAMESPACE));
+ Element childElement = new DefaultElement(subscribe);
+ childElement.addAttribute(node, nodeAttribute);
+ element.add(childElement);
+ iq.setChildElement(element);
+ XmppSubscribe xmppSubscribe = new XmppSubscribe(iq);
+ return xmppSubscribe;
+ }
+
+ private XmppUnsubscribe buildXmppUnsubscribe() {
+ IQ iq = new IQ(IQ.Type.set);
+ iq.setTo(toJid);
+ iq.setFrom(fromJid);
+ Element element = new DefaultElement(pubSub, Namespace.get(XmppPubSubConstants.PUBSUB_NAMESPACE));
+ Element childElement = new DefaultElement(unsubscribe);
+ childElement.addAttribute(node, nodeAttribute);
+ element.add(childElement);
+ iq.setChildElement(element);
+ XmppUnsubscribe xmppUnsubscribe = new XmppUnsubscribe(iq);
+ return xmppUnsubscribe;
+ }
+
+ private XmppPublish buildXmppPublish() {
+ IQ iq = new IQ(IQ.Type.set);
+ iq.setTo(toJid);
+ iq.setFrom(fromJid);
+ Element element = new DefaultElement(pubSub, Namespace.get(XmppPubSubConstants.PUBSUB_NAMESPACE));
+ Element publishElement = new DefaultElement(publish).addAttribute(node, nodeAttribute);
+ Element itemElement = new DefaultElement(item).addAttribute(id, itemId);
+ Element entryElement = new DefaultElement(entry, Namespace.get(testNamespace));
+ itemElement.add(entryElement);
+ publishElement.add(itemElement);
+ element.add(publishElement);
+ iq.setChildElement(element);
+ XmppPublish xmppPublish = new XmppPublish(iq);
+ return xmppPublish;
+ }
+
+ private XmppRetract buildXmppRetract() {
+ IQ iq = new IQ(IQ.Type.set);
+ iq.setTo(toJid);
+ iq.setFrom(fromJid);
+ Element element = new DefaultElement(pubSub, Namespace.get(XmppPubSubConstants.PUBSUB_NAMESPACE));
+ Element retractElement = new DefaultElement(retract).addAttribute(node, nodeAttribute);
+ Element itemElement = new DefaultElement(item).addAttribute(id, itemId);
+ retractElement.add(itemElement);
+ element.add(retractElement);
+ iq.setChildElement(element);
+ XmppRetract xmppRetract = new XmppRetract(iq);
+ return xmppRetract;
+ }
+
+
+ private class XmppControllerAdapter implements XmppController {
+
+ XmppIqListener iqListener;
+
+ @Override
+ public XmppDevice getDevice(XmppDeviceId xmppDeviceId) {
+ return testDevice;
+ }
+
+ @Override
+ public void addXmppDeviceListener(XmppDeviceListener deviceListener) {
+
+ }
+
+ @Override
+ public void removeXmppDeviceListener(XmppDeviceListener deviceListener) {
+
+ }
+
+ @Override
+ public void addXmppIqListener(XmppIqListener iqListener, String namespace) {
+ this.iqListener = iqListener;
+ }
+
+ @Override
+ public void removeXmppIqListener(XmppIqListener iqListener, String namespace) {
+ this.iqListener = null;
+ }
+
+ @Override
+ public void addXmppMessageListener(XmppMessageListener messageListener) {
+
+ }
+
+ @Override
+ public void removeXmppMessageListener(XmppMessageListener messageListener) {
+
+ }
+
+ @Override
+ public void addXmppPresenceListener(XmppPresenceListener presenceListener) {
+
+ }
+
+ @Override
+ public void removeXmppPresenceListener(XmppPresenceListener presenceListener) {
+
+ }
+ }
+
+ private class XmppDeviceAdapter implements XmppDevice {
+
+ final List<Packet> sentPackets = Lists.newArrayList();
+ final List<PacketError> sentErrors = Lists.newArrayList();
+
+ @Override
+ public XmppSession getSession() {
+ return null;
+ }
+
+ @Override
+ public InetSocketAddress getIpAddress() {
+ return null;
+ }
+
+ @Override
+ public void registerConnectedDevice() {
+
+ }
+
+ @Override
+ public void disconnectDevice() {
+
+ }
+
+ @Override
+ public void sendPacket(Packet packet) {
+ sentPackets.add(packet);
+ }
+
+ @Override
+ public void writeRawXml(Document document) {
+
+ }
+
+ @Override
+ public void handlePacket(Packet packet) {
+
+ }
+
+ @Override
+ public void sendError(PacketError packetError) {
+ sentErrors.add(packetError);
+ }
+
+ }
+
+}
diff --git a/protocols/xmpp/pubsub/pom.xml b/protocols/xmpp/pubsub/pom.xml
new file mode 100644
index 0000000..4024009
--- /dev/null
+++ b/protocols/xmpp/pubsub/pom.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright 2018-present Open Networking Foundation
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>onos-protocols-xmpp</artifactId>
+ <groupId>org.onosproject</groupId>
+ <version>1.13.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>onos-protocols-xmpp-pubsub</artifactId>
+ <packaging>pom</packaging>
+ <modules>
+ <module>api</module>
+ <module>ctl</module>
+ </modules>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.igniterealtime</groupId>
+ <artifactId>tinder</artifactId>
+ <version>1.3.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-api</artifactId>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file