Initial implementation of XMPP Publish/Subscribe

Change-Id: I9930056b83004fa7a085f72f63ba973f9ec2d95b
diff --git a/protocols/xmpp/pubsub/ctl/BUCK b/protocols/xmpp/pubsub/ctl/BUCK
new file mode 100644
index 0000000..e28ec52
--- /dev/null
+++ b/protocols/xmpp/pubsub/ctl/BUCK
@@ -0,0 +1,19 @@
+COMPILE_DEPS = [
+    '//lib:CORE_DEPS',
+    '//core/api:onos-api',
+    '//lib:tinder-xmpp',
+    '//lib:concurrent-hashmap',
+    '//lib:gnu-idn',
+    '//protocols/xmpp/pubsub/api:onos-protocols-xmpp-pubsub-api',
+    '//protocols/xmpp/core/api:onos-protocols-xmpp-core-api',
+]
+
+TEST_DEPS = [
+    '//lib:TEST',
+    '//core/api:onos-api-tests',
+]
+
+osgi_jar_with_tests (
+    deps = COMPILE_DEPS,
+    test_deps = TEST_DEPS,
+)
\ No newline at end of file
diff --git a/protocols/xmpp/pubsub/ctl/pom.xml b/protocols/xmpp/pubsub/ctl/pom.xml
new file mode 100644
index 0000000..d3d23fa
--- /dev/null
+++ b/protocols/xmpp/pubsub/ctl/pom.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright 2018-present Open Networking Foundation
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>onos-protocols-xmpp-pubsub</artifactId>
+        <groupId>org.onosproject</groupId>
+        <version>1.13.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>onos-protocols-xmpp-pubsub-ctl</artifactId>
+    <dependencies>
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-protocols-xmpp-pubsub-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-protocols-xmpp-core-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+
+</project>
\ No newline at end of file
diff --git a/protocols/xmpp/pubsub/ctl/src/main/java/org/onosproject/xmpp/pubsub/ctl/XmppPubSubControllerImpl.java b/protocols/xmpp/pubsub/ctl/src/main/java/org/onosproject/xmpp/pubsub/ctl/XmppPubSubControllerImpl.java
new file mode 100644
index 0000000..cb9900d
--- /dev/null
+++ b/protocols/xmpp/pubsub/ctl/src/main/java/org/onosproject/xmpp/pubsub/ctl/XmppPubSubControllerImpl.java
@@ -0,0 +1,213 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.pubsub.ctl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.dom4j.Element;
+import org.onosproject.net.DeviceId;
+import org.onosproject.xmpp.core.XmppController;
+import org.onosproject.xmpp.core.XmppDeviceId;
+import org.onosproject.xmpp.core.XmppIqListener;
+import org.onosproject.xmpp.pubsub.XmppPubSubConstants;
+import org.onosproject.xmpp.pubsub.XmppPubSubController;
+import org.onosproject.xmpp.pubsub.XmppPublishEventsListener;
+import org.onosproject.xmpp.pubsub.XmppSubscribeEventsListener;
+import org.onosproject.xmpp.pubsub.model.XmppEventNotification;
+import org.onosproject.xmpp.pubsub.model.XmppPubSubError;
+import org.onosproject.xmpp.pubsub.model.XmppPublish;
+import org.onosproject.xmpp.pubsub.model.XmppRetract;
+import org.onosproject.xmpp.pubsub.model.XmppSubscribe;
+import org.onosproject.xmpp.pubsub.model.XmppUnsubscribe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xmpp.packet.IQ;
+import org.xmpp.packet.JID;
+
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.xmpp.pubsub.XmppPubSubConstants.PUBSUB_ELEMENT;
+import static org.onosproject.xmpp.pubsub.XmppPubSubConstants.PUBSUB_NAMESPACE;
+
+/**
+ * The main class implementing XMPP Publish/Subscribe extension.
+ * It listens to IQ stanzas and generates PubSub events based on the payload.
+ */
+@Component(immediate = true)
+@Service
+public class XmppPubSubControllerImpl implements XmppPubSubController {
+
+    private static final Logger log = LoggerFactory.getLogger(XmppPubSubControllerImpl.class);
+
+    // Core XMPP controller: used here to register the IQ listener and to look up devices.
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected XmppController xmppController;
+
+    // Application listeners that receive the typed PubSub events built from incoming IQ stanzas.
+    protected Set<XmppPublishEventsListener> xmppPublishEventsListeners = new CopyOnWriteArraySet<>();
+    protected Set<XmppSubscribeEventsListener> xmppSubscribeEventsListeners = new CopyOnWriteArraySet<>();
+
+    // Listener handed to the core controller in activate() and withdrawn in deactivate().
+    protected XmppIqListener iqListener = new InternalXmppIqListener();
+
+    @Activate
+    public void activate() {
+        xmppController.addXmppIqListener(iqListener, PUBSUB_NAMESPACE); // register for the PubSub namespace
+        log.info("Started.");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        xmppController.removeXmppIqListener(iqListener, PUBSUB_NAMESPACE); // undoes activate()'s registration
+        log.info("Stopped");
+    }
+
+    @Override
+    public void notify(DeviceId deviceId, XmppEventNotification eventNotification) {
+        // Translate the ONOS device id to its XMPP id, look the device up and send it the notification.
+        xmppController.getDevice(asXmppDeviceId(deviceId)).sendPacket(eventNotification);
+    }
+
+    @Override
+    public void notifyError(DeviceId deviceId, XmppPubSubError error) {
+        // Translate the ONOS device id to its XMPP id, look the device up and send error.asPacketError().
+        xmppController.getDevice(asXmppDeviceId(deviceId)).sendError(error.asPacketError());
+    }
+
+    private XmppDeviceId asXmppDeviceId(DeviceId deviceId) {
+        // Expected form "<scheme>:<jid>"; limit 2 keeps any ':' that occurs inside the JID itself.
+        String[] parts = deviceId.toString().split(":", 2);
+        return new XmppDeviceId(new JID(parts[1]));
+    }
+
+    @Override
+    public void addXmppPublishEventsListener(XmppPublishEventsListener xmppPublishEventsListener) {
+        xmppPublishEventsListeners.add(xmppPublishEventsListener); // a Set: re-adding is a no-op
+    }
+
+    @Override
+    public void removeXmppPublishEventsListener(XmppPublishEventsListener xmppPublishEventsListener) {
+        xmppPublishEventsListeners.remove(xmppPublishEventsListener); // no-op if not registered
+    }
+
+    @Override
+    public void addXmppSubscribeEventsListener(XmppSubscribeEventsListener xmppSubscribeEventsListener) {
+        xmppSubscribeEventsListeners.add(xmppSubscribeEventsListener); // a Set: re-adding is a no-op
+    }
+
+    @Override
+    public void removeXmppSubscribeEventsListener(XmppSubscribeEventsListener xmppSubscribeEventsListener) {
+        xmppSubscribeEventsListeners.remove(xmppSubscribeEventsListener); // no-op if not registered
+    }
+
+    private class InternalXmppIqListener implements XmppIqListener { // instance registered in activate()
+        @Override
+        public void handleIqStanza(IQ iq) {
+            if (isPubSub(iq)) { // stanzas without a pubsub element in the PubSub namespace are ignored
+                notifyListeners(iq);
+            }
+        }
+    }
+
+    /** Dispatches a PubSub IQ to the matching listeners as a typed event; unsupported operations are dropped. */
+    private void notifyListeners(IQ iq) {
+        XmppPubSubConstants.Method method = getMethod(checkNotNull(iq, "IQ stanza cannot be null"));
+        if (method == null) { // e.g. <items/>: valid PubSub but unhandled here, so skip instead of throwing
+            log.warn("Ignoring IQ {}: unsupported PubSub operation", iq.getID());
+            return;
+        }
+        switch (method) {
+            case SUBSCRIBE:
+                notifyXmppSubscribe(new XmppSubscribe(iq));
+                break;
+            case UNSUBSCRIBE:
+                notifyXmppUnsubscribe(new XmppUnsubscribe(iq));
+                break;
+            case PUBLISH:
+                notifyXmppPublish(new XmppPublish(iq));
+                break;
+            case RETRACT:
+                notifyXmppRetract(new XmppRetract(iq));
+                break;
+            default:
+                break;
+        }
+    }
+
+    // Hands the retract event to each registered publish-events listener in turn;
+    // an exception thrown by a listener propagates and skips the remaining ones.
+    private void notifyXmppRetract(XmppRetract retractEvent) {
+        xmppPublishEventsListeners.forEach(listener -> listener.handleRetract(retractEvent));
+    }
+
+    // Hands the publish event to each registered publish-events listener in turn;
+    // an exception thrown by a listener propagates and skips the remaining ones.
+    private void notifyXmppPublish(XmppPublish publishEvent) {
+        xmppPublishEventsListeners.forEach(listener -> listener.handlePublish(publishEvent));
+    }
+
+    // Hands the unsubscribe event to each registered subscribe-events listener in turn;
+    // an exception thrown by a listener propagates and skips the remaining ones.
+    private void notifyXmppUnsubscribe(XmppUnsubscribe unsubscribeEvent) {
+        xmppSubscribeEventsListeners.forEach(listener -> listener.handleUnsubscribe(unsubscribeEvent));
+    }
+
+    // Hands the subscribe event to each registered subscribe-events listener in turn;
+    // an exception thrown by a listener propagates and skips the remaining ones.
+    private void notifyXmppSubscribe(XmppSubscribe subscribeEvent) {
+        xmppSubscribeEventsListeners.forEach(listener -> listener.handleSubscribe(subscribeEvent));
+    }
+
+    /**
+     * Tells whether the IQ has a child named PUBSUB_ELEMENT whose namespace URI equals PUBSUB_NAMESPACE.
+     */
+    private boolean isPubSub(IQ iq) {
+        Element candidate = iq.getElement().element(PUBSUB_ELEMENT);
+        return candidate != null && candidate.getNamespaceURI().equals(PUBSUB_NAMESPACE);
+    }
+
+    /** Resolves the PubSub method named by the first element of the IQ payload; null if absent or unsupported. */
+    public static XmppPubSubConstants.Method getMethod(IQ iq) {
+        Element pubsub = iq.getChildElement();
+        // A missing or childless payload maps to "" and thus falls through to the default branch.
+        boolean noOperation = pubsub == null || pubsub.elements().isEmpty();
+        switch (noOperation ? "" : getChildElement(pubsub).getName()) {
+            case "subscribe":
+                return XmppPubSubConstants.Method.SUBSCRIBE;
+            case "unsubscribe":
+                return XmppPubSubConstants.Method.UNSUBSCRIBE;
+            case "publish":
+                return XmppPubSubConstants.Method.PUBLISH;
+            case "retract":
+                return XmppPubSubConstants.Method.RETRACT;
+            default:
+                return null;
+        }
+    }
+
+    /** Returns the first child of the given element; for a pubsub element it names the operation. */
+    public static Element getChildElement(Element element) {
+        return (Element) element.elements().get(0);
+    }
+
+}
diff --git a/protocols/xmpp/pubsub/ctl/src/main/java/org/onosproject/xmpp/pubsub/ctl/package-info.java b/protocols/xmpp/pubsub/ctl/src/main/java/org/onosproject/xmpp/pubsub/ctl/package-info.java
new file mode 100644
index 0000000..ad13be7
--- /dev/null
+++ b/protocols/xmpp/pubsub/ctl/src/main/java/org/onosproject/xmpp/pubsub/ctl/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package for implementation of XMPP Publish/Subscribe elements.
+ */
+package org.onosproject.xmpp.pubsub.ctl;
\ No newline at end of file
diff --git a/protocols/xmpp/pubsub/ctl/src/test/java/org/onosproject/xmpp/pubsub/ctl/XmppPubSubControllerTest.java b/protocols/xmpp/pubsub/ctl/src/test/java/org/onosproject/xmpp/pubsub/ctl/XmppPubSubControllerTest.java
new file mode 100644
index 0000000..28f799d
--- /dev/null
+++ b/protocols/xmpp/pubsub/ctl/src/test/java/org/onosproject/xmpp/pubsub/ctl/XmppPubSubControllerTest.java
@@ -0,0 +1,341 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.xmpp.pubsub.ctl;
+
+import com.google.common.collect.Lists;
+import org.dom4j.Document;
+import org.dom4j.Element;
+import org.dom4j.Namespace;
+import org.dom4j.tree.DefaultElement;
+import org.junit.Before;
+import org.junit.Test;
+import org.onosproject.net.DeviceId;
+import org.onosproject.xmpp.core.XmppController;
+import org.onosproject.xmpp.core.XmppDevice;
+import org.onosproject.xmpp.core.XmppDeviceId;
+import org.onosproject.xmpp.core.XmppDeviceListener;
+import org.onosproject.xmpp.core.XmppIqListener;
+import org.onosproject.xmpp.core.XmppMessageListener;
+import org.onosproject.xmpp.core.XmppPresenceListener;
+import org.onosproject.xmpp.core.XmppSession;
+import org.onosproject.xmpp.pubsub.XmppPubSubConstants;
+import org.onosproject.xmpp.pubsub.XmppPublishEventsListener;
+import org.onosproject.xmpp.pubsub.XmppSubscribeEventsListener;
+import org.onosproject.xmpp.pubsub.model.XmppEventNotification;
+import org.onosproject.xmpp.pubsub.model.XmppPubSubError;
+import org.onosproject.xmpp.pubsub.model.XmppPublish;
+import org.onosproject.xmpp.pubsub.model.XmppRetract;
+import org.onosproject.xmpp.pubsub.model.XmppSubscribe;
+import org.onosproject.xmpp.pubsub.model.XmppUnsubscribe;
+import org.xmpp.packet.IQ;
+import org.xmpp.packet.Packet;
+import org.xmpp.packet.PacketError;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.onosproject.xmpp.pubsub.XmppPubSubConstants.PubSubApplicationCondition.ITEM_NOT_FOUND;
+
+/**
+ * Test class for XmppPubSubController class.
+ */
+public class XmppPubSubControllerTest {
+
+    private String nodeAttribute = "test";
+    private String node = "node";
+    private String toJid = "xmpp@onosproject.org";
+    private String fromJid = "test@xmpp.org";
+    private String pubSub = "pubsub";
+    private String publish = "publish";
+    private String retract = "retract";
+    private String subscribe = "subscribe";
+    private String unsubscribe = "unsubscribe";
+    private String item = "item";
+    private String id = "id";
+    private String itemId = "id-000";
+    private String entry = "entry";
+    private String testNamespace = "jabber:test:item";
+
+    // Controller under test plus the stub controller/device and recording listeners created in setUp().
+    XmppPubSubControllerImpl pubSubController;
+    XmppControllerAdapter xmppControllerAdapter;
+    XmppDeviceAdapter testDevice;
+
+    TestXmppPublishEventsListener testXmppPublishEventsListener;
+    TestXmppSubscribeEventsListener testXmppSubscribeEventsListener;
+
+    static class TestXmppPublishEventsListener implements XmppPublishEventsListener {
+        // Every event handed to this listener is recorded so the tests can assert on the counts.
+        final List<XmppPublish> handledPublishMsgs = Lists.newArrayList();
+        final List<XmppRetract> handledRetractMsgs = Lists.newArrayList();
+
+        @Override
+        public void handlePublish(XmppPublish publishEvent) {
+            handledPublishMsgs.add(publishEvent);
+        }
+
+        @Override
+        public void handleRetract(XmppRetract retractEvent) {
+            handledRetractMsgs.add(retractEvent);
+        }
+    }
+
+    static class TestXmppSubscribeEventsListener implements XmppSubscribeEventsListener {
+        // Every event handed to this listener is recorded so the tests can assert on the counts.
+        final List<XmppSubscribe> handledSubscribeMsgs = Lists.newArrayList();
+        final List<XmppUnsubscribe> handledUnsubscribeMsgs = Lists.newArrayList();
+
+
+        @Override
+        public void handleSubscribe(XmppSubscribe subscribeEvent) {
+            handledSubscribeMsgs.add(subscribeEvent);
+        }
+
+        @Override
+        public void handleUnsubscribe(XmppUnsubscribe unsubscribeEvent) {
+            handledUnsubscribeMsgs.add(unsubscribeEvent);
+        }
+    }
+
+    @Before
+    public void setUp() {
+        testDevice = new XmppDeviceAdapter();
+        xmppControllerAdapter = new XmppControllerAdapter();
+        pubSubController = new XmppPubSubControllerImpl();
+        pubSubController.xmppController = xmppControllerAdapter; // stub replaces the @Reference-injected controller
+        testXmppPublishEventsListener = new TestXmppPublishEventsListener();
+        testXmppSubscribeEventsListener = new TestXmppSubscribeEventsListener();
+        pubSubController.activate(); // hands the controller's IQ listener to the stub
+    }
+
+    @Test
+    public void testActivate() {
+        assertThat(xmppControllerAdapter.iqListener, is(notNullValue())); // set by activate() in setUp()
+    }
+
+    @Test
+    public void testDeactivate() {
+        pubSubController.deactivate();
+        assertThat(xmppControllerAdapter.iqListener, is(nullValue())); // stub clears it on listener removal
+    }
+
+    @Test
+    public void testAddRemoveListeners() {
+        pubSubController.addXmppPublishEventsListener(testXmppPublishEventsListener);
+        assertThat(pubSubController.xmppPublishEventsListeners.size(), is(1)); // set starts empty
+        pubSubController.addXmppSubscribeEventsListener(testXmppSubscribeEventsListener);
+        assertThat(pubSubController.xmppSubscribeEventsListeners.size(), is(1));
+        pubSubController.removeXmppPublishEventsListener(testXmppPublishEventsListener);
+        assertThat(pubSubController.xmppPublishEventsListeners.size(), is(0)); // empty again after removal
+        pubSubController.removeXmppSubscribeEventsListener(testXmppSubscribeEventsListener);
+        assertThat(pubSubController.xmppSubscribeEventsListeners.size(), is(0));
+    }
+
+    @Test
+    public void testNotifyEvent() {
+        XmppEventNotification eventNotification = new XmppEventNotification(nodeAttribute,
+                                                                            new DefaultElement(nodeAttribute));
+        pubSubController.notify(DeviceId.NONE, eventNotification); // the stub returns testDevice for any id
+        assertThat(testDevice.sentPackets.size(), is(1));
+        assertThat(testDevice.sentPackets.get(0), is(eventNotification));
+    }
+
+    @Test
+    public void testNotifyError() {
+        // The stub controller returns testDevice for any id, so the converted error
+        // must show up in the device's sentErrors list.
+        pubSubController.notifyError(DeviceId.NONE, new XmppPubSubError(ITEM_NOT_FOUND));
+        assertThat(testDevice.sentErrors.size(), is(1));
+    }
+
+    @Test
+    public void testHandlePubSubMessages() {
+        pubSubController.addXmppPublishEventsListener(testXmppPublishEventsListener);
+        pubSubController.addXmppSubscribeEventsListener(testXmppSubscribeEventsListener);
+        // subscribe stanza -> handleSubscribe
+        xmppControllerAdapter.iqListener.handleIqStanza(buildXmppSubscribe());
+        assertThat(testXmppSubscribeEventsListener.handledSubscribeMsgs.size(), is(1));
+        // unsubscribe stanza -> handleUnsubscribe
+        xmppControllerAdapter.iqListener.handleIqStanza(buildXmppUnsubscribe());
+        assertThat(testXmppSubscribeEventsListener.handledUnsubscribeMsgs.size(), is(1));
+        // publish stanza -> handlePublish
+        xmppControllerAdapter.iqListener.handleIqStanza(buildXmppPublish());
+        assertThat(testXmppPublishEventsListener.handledPublishMsgs.size(), is(1));
+        // retract stanza -> handleRetract
+        xmppControllerAdapter.iqListener.handleIqStanza(buildXmppRetract());
+        assertThat(testXmppPublishEventsListener.handledRetractMsgs.size(), is(1));
+    }
+
+    // Builds a "set" IQ whose pubsub payload holds a subscribe element for node "test".
+    private XmppSubscribe buildXmppSubscribe() {
+        IQ iq = new IQ(IQ.Type.set);
+        iq.setTo(toJid);
+        iq.setFrom(fromJid);
+        Element pubSubElement = new DefaultElement(pubSub, Namespace.get(XmppPubSubConstants.PUBSUB_NAMESPACE));
+        Element subscribeElement = new DefaultElement(subscribe);
+        subscribeElement.addAttribute(node, nodeAttribute);
+        pubSubElement.add(subscribeElement);
+        iq.setChildElement(pubSubElement);
+        return new XmppSubscribe(iq);
+    }
+
+    // Builds a "set" IQ whose pubsub payload holds an unsubscribe element for node "test".
+    private XmppUnsubscribe buildXmppUnsubscribe() {
+        IQ iq = new IQ(IQ.Type.set);
+        iq.setTo(toJid);
+        iq.setFrom(fromJid);
+        Element pubSubElement = new DefaultElement(pubSub, Namespace.get(XmppPubSubConstants.PUBSUB_NAMESPACE));
+        Element unsubscribeElement = new DefaultElement(unsubscribe);
+        unsubscribeElement.addAttribute(node, nodeAttribute);
+        pubSubElement.add(unsubscribeElement);
+        iq.setChildElement(pubSubElement);
+        return new XmppUnsubscribe(iq);
+    }
+
+    // Builds a "set" IQ publishing item "id-000" (carrying an entry element) to node "test".
+    private XmppPublish buildXmppPublish() {
+        IQ iq = new IQ(IQ.Type.set);
+        iq.setTo(toJid);
+        iq.setFrom(fromJid);
+        Element entryElement = new DefaultElement(entry, Namespace.get(testNamespace));
+        Element itemElement = new DefaultElement(item).addAttribute(id, itemId);
+        itemElement.add(entryElement);
+        Element publishElement = new DefaultElement(publish).addAttribute(node, nodeAttribute);
+        publishElement.add(itemElement);
+        Element pubSubElement = new DefaultElement(pubSub, Namespace.get(XmppPubSubConstants.PUBSUB_NAMESPACE));
+        pubSubElement.add(publishElement);
+        iq.setChildElement(pubSubElement);
+        return new XmppPublish(iq);
+    }
+
+    // Builds a "set" IQ retracting item "id-000" from node "test".
+    private XmppRetract buildXmppRetract() {
+        IQ iq = new IQ(IQ.Type.set);
+        iq.setTo(toJid);
+        iq.setFrom(fromJid);
+        Element itemElement = new DefaultElement(item).addAttribute(id, itemId);
+        Element retractElement = new DefaultElement(retract).addAttribute(node, nodeAttribute);
+        retractElement.add(itemElement);
+        Element pubSubElement = new DefaultElement(pubSub, Namespace.get(XmppPubSubConstants.PUBSUB_NAMESPACE));
+        pubSubElement.add(retractElement);
+        iq.setChildElement(pubSubElement);
+        return new XmppRetract(iq);
+    }
+
+
+    private class XmppControllerAdapter implements XmppController {
+        // Last IQ listener registered through addXmppIqListener(); null until then and after removal.
+        XmppIqListener iqListener;
+
+        @Override
+        public XmppDevice getDevice(XmppDeviceId xmppDeviceId) {
+            return testDevice; // same stub device regardless of the requested id
+        }
+
+        @Override
+        public void addXmppDeviceListener(XmppDeviceListener deviceListener) {
+            // not called by the controller under test
+        }
+
+        @Override
+        public void removeXmppDeviceListener(XmppDeviceListener deviceListener) {
+            // not called by the controller under test
+        }
+
+        @Override
+        public void addXmppIqListener(XmppIqListener iqListener, String namespace) {
+            this.iqListener = iqListener;
+        }
+
+        @Override
+        public void removeXmppIqListener(XmppIqListener iqListener, String namespace) {
+            this.iqListener = null;
+        }
+
+        @Override
+        public void addXmppMessageListener(XmppMessageListener messageListener) {
+            // not called by the controller under test
+        }
+
+        @Override
+        public void removeXmppMessageListener(XmppMessageListener messageListener) {
+            // not called by the controller under test
+        }
+
+        @Override
+        public void addXmppPresenceListener(XmppPresenceListener presenceListener) {
+            // not called by the controller under test
+        }
+
+        @Override
+        public void removeXmppPresenceListener(XmppPresenceListener presenceListener) {
+            // not called by the controller under test
+        }
+    }
+
+    private class XmppDeviceAdapter implements XmppDevice {
+        // Everything passed to sendPacket()/sendError() is recorded here for the assertions.
+        final List<Packet> sentPackets = Lists.newArrayList();
+        final List<PacketError> sentErrors = Lists.newArrayList();
+
+        @Override
+        public XmppSession getSession() {
+            return null;
+        }
+
+        @Override
+        public InetSocketAddress getIpAddress() {
+            return null;
+        }
+
+        @Override
+        public void registerConnectedDevice() {
+            // not called by the controller under test
+        }
+
+        @Override
+        public void disconnectDevice() {
+            // not called by the controller under test
+        }
+
+        @Override
+        public void sendPacket(Packet packet) {
+            sentPackets.add(packet);
+        }
+
+        @Override
+        public void writeRawXml(Document document) {
+            // not called by the controller under test
+        }
+
+        @Override
+        public void handlePacket(Packet packet) {
+            // not called by the controller under test
+        }
+
+        @Override
+        public void sendError(PacketError packetError) {
+            sentErrors.add(packetError);
+        }
+
+    }
+
+}