Initial implementation of XMPP Publish/Subscribe
Change-Id: I9930056b83004fa7a085f72f63ba973f9ec2d95b
diff --git a/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppController.java b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppController.java
index 8b500a3..a16216d 100644
--- a/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppController.java
+++ b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppController.java
@@ -46,18 +46,20 @@
void removeXmppDeviceListener(XmppDeviceListener deviceListener);
/**
- * Register a listener for IQ stanza of XMPP protocol.
+ * Register a listener for IQ stanzas containing a specific XML namespace.
*
* @param iqListener the listener to notify
+ * @param namespace the XML namespace to observe
*/
- void addXmppIqListener(XmppIqListener iqListener);
+ void addXmppIqListener(XmppIqListener iqListener, String namespace);
/**
- * Unregister a listener for IQ stanza of XMPP protocol.
+ * Unregister a listener for IQ stanzas containing a specific XML namespace.
*
* @param iqListener the listener to unregister
+ * @param namespace the XML namespace the listener was registered for
*/
- void removeXmppIqListener(XmppIqListener iqListener);
+ void removeXmppIqListener(XmppIqListener iqListener, String namespace);
/**
* Register a listener for Message stanza of XMPP protocol.
diff --git a/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/XmppControllerImpl.java b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/XmppControllerImpl.java
index 228aa47..44db54d 100644
--- a/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/XmppControllerImpl.java
+++ b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/XmppControllerImpl.java
@@ -16,11 +16,14 @@
package org.onosproject.xmpp.core.ctl;
+import com.google.common.collect.HashMultimap;
import com.google.common.collect.Maps;
-
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
@@ -83,7 +86,7 @@
// listener declaration
protected Set<XmppDeviceListener> xmppDeviceListeners = new CopyOnWriteArraySet<XmppDeviceListener>();
- protected Set<XmppIqListener> xmppIqListeners = new CopyOnWriteArraySet<XmppIqListener>();
+ Multimap<String, XmppIqListener> xmppIqListeners = Multimaps.synchronizedSetMultimap(HashMultimap.create());
protected Set<XmppMessageListener> xmppMessageListeners = new CopyOnWriteArraySet<XmppMessageListener>();
protected Set<XmppPresenceListener> xmppPresenceListeners = new CopyOnWriteArraySet<XmppPresenceListener>();
@@ -97,12 +100,19 @@
@Activate
public void activate(ComponentContext context) {
- log.info("XmppControllerImpl started.");
coreService.registerApplication(APP_ID, this::cleanup);
cfgService.registerProperties(getClass());
deviceFactory.init(agent);
xmppServer.setConfiguration(context.getProperties());
xmppServer.start(deviceFactory);
+ log.info("XmppControllerImpl started.");
+ }
+
+ @Modified
+ public void modified(ComponentContext context) {
+ xmppServer.stop();
+ xmppServer.setConfiguration(context.getProperties());
+ xmppServer.start(deviceFactory);
}
@Deactivate
@@ -135,13 +145,13 @@
}
@Override
- public void addXmppIqListener(XmppIqListener iqListener) {
- xmppIqListeners.add(iqListener);
+ public void addXmppIqListener(XmppIqListener iqListener, String namespace) {
+ xmppIqListeners.put(namespace, iqListener);
}
@Override
- public void removeXmppIqListener(XmppIqListener iqListener) {
- xmppIqListeners.remove(iqListener);
+ public void removeXmppIqListener(XmppIqListener iqListener, String namespace) {
+ xmppIqListeners.remove(namespace, iqListener);
}
@Override
@@ -199,19 +209,44 @@
@Override
public void processUpstreamEvent(XmppDeviceId deviceId, Packet packet) {
if (packet instanceof IQ) {
- for (XmppIqListener iqListener : xmppIqListeners) {
- iqListener.handleIqStanza((IQ) packet);
- }
+ IQ iq = (IQ) packet;
+ // An IQ may carry no payload (e.g. an empty result stanza); without a
+ // child element there is no namespace to dispatch on, so skip it.
+ if (iq.getChildElement() != null) {
+ String namespace = iq.getChildElement().getNamespace().getURI();
+ notifyIqListeners(iq, namespace);
+ }
}
if (packet instanceof Message) {
- for (XmppMessageListener messageListener : xmppMessageListeners) {
- messageListener.handleMessageStanza((Message) packet);
- }
+ notifyMessageListeners((Message) packet);
}
if (packet instanceof Presence) {
- for (XmppPresenceListener presenceListener : xmppPresenceListeners) {
- presenceListener.handlePresenceStanza((Presence) packet);
- }
+ notifyPresenceListeners((Presence) packet);
+ }
+ }
+
+ private void notifyPresenceListeners(Presence packet) {
+ for (XmppPresenceListener presenceListener : xmppPresenceListeners) {
+ presenceListener.handlePresenceStanza(packet);
+ }
+ }
+
+ private void notifyMessageListeners(Message message) {
+ for (XmppMessageListener messageListener : xmppMessageListeners) {
+ messageListener.handleMessageStanza(message);
+ }
+ }
+
+ private void notifyIqListeners(IQ iq, String namespace) {
+ XmppIqListener[] snapshot;
+ // Guava's synchronized multimap requires holding its lock while a view
+ // collection is traversed, so copy the listeners under that lock.
+ synchronized (xmppIqListeners) {
+ snapshot = xmppIqListeners.get(namespace).toArray(new XmppIqListener[0]);
+ }
+ // Invoke callbacks outside the lock so a listener may (un)register itself.
+ for (XmppIqListener iqListener : snapshot) {
+ iqListener.handleIqStanza(iq);
}
}
diff --git a/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/XmppServer.java b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/XmppServer.java
index 3913d64..2bc96fd 100644
--- a/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/XmppServer.java
+++ b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/XmppServer.java
@@ -108,7 +108,6 @@
}
private void configureBootstrap(ServerBootstrap bootstrap) {
- bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.option(ChannelOption.SO_RCVBUF, 2048);
}
diff --git a/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/XmppControllerImplTest.java b/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/XmppControllerImplTest.java
index fb19ee1..75780e5 100644
--- a/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/XmppControllerImplTest.java
+++ b/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/XmppControllerImplTest.java
@@ -22,6 +22,9 @@
import java.util.Hashtable;
import com.google.common.collect.ImmutableSet;
+import org.dom4j.Element;
+import org.dom4j.Namespace;
+import org.dom4j.tree.DefaultElement;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
@@ -69,6 +72,8 @@
XmppDevice device3;
XmppDeviceId jid3;
+ final String testNamespace = "testns";
+
/**
* Test harness for a device listener.
*/
@@ -133,7 +138,7 @@
testXmppDeviceListener = new TestXmppDeviceListener();
controller.addXmppDeviceListener(testXmppDeviceListener);
testXmppIqListener = new TestXmppIqListener();
- controller.addXmppIqListener(testXmppIqListener);
+ controller.addXmppIqListener(testXmppIqListener, testNamespace);
testXmppMessageListener = new TestXmppMessageListener();
controller.addXmppMessageListener(testXmppMessageListener);
testXmppPresenceListener = new TestXmppPresenceListener();
@@ -166,7 +171,7 @@
@After
public void tearDown() {
controller.removeXmppDeviceListener(testXmppDeviceListener);
- controller.removeXmppIqListener(testXmppIqListener);
+ controller.removeXmppIqListener(testXmppIqListener, testNamespace);
controller.removeXmppMessageListener(testXmppMessageListener);
controller.removeXmppPresenceListener(testXmppPresenceListener);
controller.deactivate();
@@ -215,7 +220,9 @@
@Test
public void handlePackets() {
// IQ packets
- Packet iq = new IQ();
+ IQ iq = new IQ();
+ Element element = new DefaultElement("pubsub", Namespace.get(testNamespace));
+ iq.setChildElement(element);
agent.processUpstreamEvent(jid1, iq);
assertThat(testXmppIqListener.handledIqs, hasSize(1));
agent.processUpstreamEvent(jid2, iq);