Initial implementation of XMPP Publish/Subscribe

Change-Id: I9930056b83004fa7a085f72f63ba973f9ec2d95b
diff --git a/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppController.java b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppController.java
index 8b500a3..a16216d 100644
--- a/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppController.java
+++ b/protocols/xmpp/core/api/src/main/java/org/onosproject/xmpp/core/XmppController.java
@@ -46,18 +46,20 @@
     void removeXmppDeviceListener(XmppDeviceListener deviceListener);
 
     /**
-     * Register a listener for IQ stanza of XMPP protocol.
+     * Register a listener for IQ stanzas containing specific XML namespace.
      *
      * @param iqListener the listener to notify
+     * @param namespace the XML namespace to observe
      */
-    void addXmppIqListener(XmppIqListener iqListener);
+    void addXmppIqListener(XmppIqListener iqListener, String namespace);
 
     /**
-     * Unregister a listener for IQ stanza of XMPP protocol.
+     * Unregister a listener for IQ stanzas containing specific XML namespace.
      *
      * @param iqListener the listener to unregister
+     * @param namespace the XML namespace to observe
      */
-    void removeXmppIqListener(XmppIqListener iqListener);
+    void removeXmppIqListener(XmppIqListener iqListener, String namespace);
 
     /**
      * Register a listener for Message stanza of XMPP protocol.
diff --git a/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/XmppControllerImpl.java b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/XmppControllerImpl.java
index 228aa47..44db54d 100644
--- a/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/XmppControllerImpl.java
+++ b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/XmppControllerImpl.java
@@ -16,11 +16,14 @@
 
 package org.onosproject.xmpp.core.ctl;
 
+import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Maps;
-
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
 import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
@@ -83,7 +86,7 @@
     // listener declaration
     protected Set<XmppDeviceListener> xmppDeviceListeners = new CopyOnWriteArraySet<XmppDeviceListener>();
 
-    protected Set<XmppIqListener> xmppIqListeners = new CopyOnWriteArraySet<XmppIqListener>();
+    Multimap<String, XmppIqListener> xmppIqListeners = Multimaps.synchronizedSetMultimap(HashMultimap.create());
     protected Set<XmppMessageListener> xmppMessageListeners = new CopyOnWriteArraySet<XmppMessageListener>();
     protected Set<XmppPresenceListener> xmppPresenceListeners = new CopyOnWriteArraySet<XmppPresenceListener>();
 
@@ -97,12 +100,19 @@
 
     @Activate
     public void activate(ComponentContext context) {
-        log.info("XmppControllerImpl started.");
         coreService.registerApplication(APP_ID, this::cleanup);
         cfgService.registerProperties(getClass());
         deviceFactory.init(agent);
         xmppServer.setConfiguration(context.getProperties());
         xmppServer.start(deviceFactory);
+        log.info("XmppControllerImpl started.");
+    }
+
+    @Modified
+    public void modified(ComponentContext context) {
+        xmppServer.stop();
+        xmppServer.setConfiguration(context.getProperties());
+        xmppServer.start(deviceFactory);
     }
 
     @Deactivate
@@ -135,13 +145,13 @@
     }
 
     @Override
-    public void addXmppIqListener(XmppIqListener iqListener) {
-        xmppIqListeners.add(iqListener);
+    public void addXmppIqListener(XmppIqListener iqListener, String namespace) {
+        xmppIqListeners.put(namespace, iqListener);
     }
 
     @Override
-    public void removeXmppIqListener(XmppIqListener iqListener) {
-        xmppIqListeners.remove(iqListener);
+    public void removeXmppIqListener(XmppIqListener iqListener, String namespace) {
+        xmppIqListeners.remove(namespace, iqListener);
     }
 
     @Override
@@ -199,19 +209,33 @@
         @Override
         public void processUpstreamEvent(XmppDeviceId deviceId, Packet packet) {
             if (packet instanceof IQ) {
-                for (XmppIqListener iqListener : xmppIqListeners) {
-                    iqListener.handleIqStanza((IQ) packet);
-                }
+                IQ iq = (IQ) packet;
+                String namespace = iq.getChildElement().getNamespace().getURI();
+                notifyIqListeners(iq, namespace);
             }
             if (packet instanceof Message) {
-                for (XmppMessageListener messageListener : xmppMessageListeners) {
-                    messageListener.handleMessageStanza((Message) packet);
-                }
+                notifyMessageListeners((Message) packet);
             }
             if (packet instanceof Presence) {
-                for (XmppPresenceListener presenceListener : xmppPresenceListeners) {
-                    presenceListener.handlePresenceStanza((Presence) packet);
-                }
+                notifyPresenceListeners((Presence) packet);
+            }
+        }
+
+        private void notifyPresenceListeners(Presence packet) {
+            for (XmppPresenceListener presenceListener : xmppPresenceListeners) {
+                presenceListener.handlePresenceStanza((Presence) packet);
+            }
+        }
+
+        private void notifyMessageListeners(Message message) {
+            for (XmppMessageListener messageListener : xmppMessageListeners) {
+                messageListener.handleMessageStanza(message);
+            }
+        }
+
+        private void notifyIqListeners(IQ iq, String namespace) {
+            for (XmppIqListener iqListener : xmppIqListeners.get(namespace)) {
+                iqListener.handleIqStanza(iq);
             }
         }
 
diff --git a/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/XmppServer.java b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/XmppServer.java
index 3913d64..2bc96fd 100644
--- a/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/XmppServer.java
+++ b/protocols/xmpp/core/ctl/src/main/java/org/onosproject/xmpp/core/ctl/XmppServer.java
@@ -108,7 +108,6 @@
     }
 
     private void configureBootstrap(ServerBootstrap bootstrap) {
-        bootstrap.option(ChannelOption.TCP_NODELAY, true);
         bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
         bootstrap.option(ChannelOption.SO_RCVBUF, 2048);
     }
diff --git a/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/XmppControllerImplTest.java b/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/XmppControllerImplTest.java
index fb19ee1..75780e5 100644
--- a/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/XmppControllerImplTest.java
+++ b/protocols/xmpp/core/ctl/src/test/java/org/onosproject/xmpp/core/ctl/XmppControllerImplTest.java
@@ -22,6 +22,9 @@
 import java.util.Hashtable;
 
 import com.google.common.collect.ImmutableSet;
+import org.dom4j.Element;
+import org.dom4j.Namespace;
+import org.dom4j.tree.DefaultElement;
 import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Before;
@@ -69,6 +72,8 @@
     XmppDevice device3;
     XmppDeviceId jid3;
 
+    final String testNamespace = "testns";
+
     /**
      * Test harness for a device listener.
      */
@@ -133,7 +138,7 @@
         testXmppDeviceListener = new TestXmppDeviceListener();
         controller.addXmppDeviceListener(testXmppDeviceListener);
         testXmppIqListener = new TestXmppIqListener();
-        controller.addXmppIqListener(testXmppIqListener);
+        controller.addXmppIqListener(testXmppIqListener, testNamespace);
         testXmppMessageListener = new TestXmppMessageListener();
         controller.addXmppMessageListener(testXmppMessageListener);
         testXmppPresenceListener = new TestXmppPresenceListener();
@@ -166,7 +171,7 @@
     @After
     public void tearDown() {
         controller.removeXmppDeviceListener(testXmppDeviceListener);
-        controller.removeXmppIqListener(testXmppIqListener);
+        controller.removeXmppIqListener(testXmppIqListener, testNamespace);
         controller.removeXmppMessageListener(testXmppMessageListener);
         controller.removeXmppPresenceListener(testXmppPresenceListener);
         controller.deactivate();
@@ -215,7 +220,9 @@
     @Test
     public void handlePackets() {
         // IQ packets
-        Packet iq = new IQ();
+        IQ iq = new IQ();
+        Element element = new DefaultElement("pubsub", Namespace.get(testNamespace));
+        iq.setChildElement(element);
         agent.processUpstreamEvent(jid1, iq);
         assertThat(testXmppIqListener.handledIqs, hasSize(1));
         agent.processUpstreamEvent(jid2, iq);