/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.xmpp.pubsub.ctl;
18
Tomek Osiński7a1db182018-02-03 17:15:06 +010019import org.dom4j.Element;
20import org.onosproject.net.DeviceId;
21import org.onosproject.xmpp.core.XmppController;
22import org.onosproject.xmpp.core.XmppDeviceId;
23import org.onosproject.xmpp.core.XmppIqListener;
24import org.onosproject.xmpp.pubsub.XmppPubSubConstants;
25import org.onosproject.xmpp.pubsub.XmppPubSubController;
26import org.onosproject.xmpp.pubsub.XmppPublishEventsListener;
27import org.onosproject.xmpp.pubsub.XmppSubscribeEventsListener;
28import org.onosproject.xmpp.pubsub.model.XmppEventNotification;
29import org.onosproject.xmpp.pubsub.model.XmppPubSubError;
30import org.onosproject.xmpp.pubsub.model.XmppPublish;
31import org.onosproject.xmpp.pubsub.model.XmppRetract;
32import org.onosproject.xmpp.pubsub.model.XmppSubscribe;
33import org.onosproject.xmpp.pubsub.model.XmppUnsubscribe;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070034import org.osgi.service.component.annotations.Activate;
35import org.osgi.service.component.annotations.Component;
36import org.osgi.service.component.annotations.Deactivate;
37import org.osgi.service.component.annotations.Reference;
38import org.osgi.service.component.annotations.ReferenceCardinality;
Tomek Osiński7a1db182018-02-03 17:15:06 +010039import org.slf4j.Logger;
40import org.slf4j.LoggerFactory;
41import org.xmpp.packet.IQ;
42import org.xmpp.packet.JID;
43
44import java.util.Set;
45import java.util.concurrent.CopyOnWriteArraySet;
46
47import static com.google.common.base.Preconditions.checkNotNull;
48import static org.onosproject.xmpp.pubsub.XmppPubSubConstants.PUBSUB_ELEMENT;
49import static org.onosproject.xmpp.pubsub.XmppPubSubConstants.PUBSUB_NAMESPACE;
50
51/**
52 * The main class implementing XMPP Publish/Subscribe extension.
53 * It listens to IQ stanzas and generates PubSub events based on the payload.
54 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070055@Component(immediate = true, service = XmppPubSubController.class)
Tomek Osiński7a1db182018-02-03 17:15:06 +010056public class XmppPubSubControllerImpl implements XmppPubSubController {
57
58 private static final Logger log =
59 LoggerFactory.getLogger(XmppPubSubControllerImpl.class);
60
Ray Milkeyd84f89b2018-08-17 14:54:17 -070061 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Tomek Osiński7a1db182018-02-03 17:15:06 +010062 protected XmppController xmppController;
63
64 protected Set<XmppPublishEventsListener> xmppPublishEventsListeners =
65 new CopyOnWriteArraySet<XmppPublishEventsListener>();
66 protected Set<XmppSubscribeEventsListener> xmppSubscribeEventsListeners =
67 new CopyOnWriteArraySet<XmppSubscribeEventsListener>();
68
69 protected XmppIqListener iqListener = new InternalXmppIqListener();
70
71 @Activate
72 public void activate() {
73 xmppController.addXmppIqListener(iqListener, PUBSUB_NAMESPACE);
74 log.info("Started.");
75 }
76
77 @Deactivate
78 public void deactivate() {
79 xmppController.removeXmppIqListener(iqListener, PUBSUB_NAMESPACE);
80 log.info("Stopped");
81 }
82
83 @Override
84 public void notify(DeviceId deviceId, XmppEventNotification eventNotification) {
85 XmppDeviceId xmppDeviceId = asXmppDeviceId(deviceId);
86 xmppController.getDevice(xmppDeviceId).sendPacket(eventNotification);
87 }
88
89 @Override
90 public void notifyError(DeviceId deviceId, XmppPubSubError error) {
91 XmppDeviceId xmppDeviceId = asXmppDeviceId(deviceId);
92 xmppController.getDevice(xmppDeviceId).sendError(error.asPacketError());
93 }
94
95 private XmppDeviceId asXmppDeviceId(DeviceId deviceId) {
96 String[] parts = deviceId.toString().split(":");
97 JID jid = new JID(parts[1]);
98 return new XmppDeviceId(jid);
99 }
100
101 @Override
102 public void addXmppPublishEventsListener(XmppPublishEventsListener xmppPublishEventsListener) {
103 xmppPublishEventsListeners.add(xmppPublishEventsListener);
104 }
105
106 @Override
107 public void removeXmppPublishEventsListener(XmppPublishEventsListener xmppPublishEventsListener) {
108 xmppPublishEventsListeners.remove(xmppPublishEventsListener);
109 }
110
111 @Override
112 public void addXmppSubscribeEventsListener(XmppSubscribeEventsListener xmppSubscribeEventsListener) {
113 xmppSubscribeEventsListeners.add(xmppSubscribeEventsListener);
114 }
115
116 @Override
117 public void removeXmppSubscribeEventsListener(XmppSubscribeEventsListener xmppSubscribeEventsListener) {
118 xmppSubscribeEventsListeners.remove(xmppSubscribeEventsListener);
119 }
120
121 private class InternalXmppIqListener implements XmppIqListener {
122 @Override
123 public void handleIqStanza(IQ iq) {
124 if (isPubSub(iq)) {
125 notifyListeners(iq);
126 }
127 }
128 }
129
130 private void notifyListeners(IQ iq) {
131 XmppPubSubConstants.Method method = getMethod(iq);
132 checkNotNull(method);
133 switch (method) {
134 case SUBSCRIBE:
135 XmppSubscribe subscribe = new XmppSubscribe(iq);
136 notifyXmppSubscribe(subscribe);
137 break;
138 case UNSUBSCRIBE:
139 XmppUnsubscribe unsubscribe = new XmppUnsubscribe(iq);
140 notifyXmppUnsubscribe(unsubscribe);
141 break;
142 case PUBLISH:
143 XmppPublish publish = new XmppPublish(iq);
144 notifyXmppPublish(publish);
145 break;
146 case RETRACT:
147 XmppRetract retract = new XmppRetract(iq);
148 notifyXmppRetract(retract);
149 break;
150 default:
151 break;
152 }
153 }
154
155 private void notifyXmppRetract(XmppRetract retractEvent) {
156 for (XmppPublishEventsListener listener : xmppPublishEventsListeners) {
157 listener.handleRetract(retractEvent);
158 }
159 }
160
161 private void notifyXmppPublish(XmppPublish publishEvent) {
162 for (XmppPublishEventsListener listener : xmppPublishEventsListeners) {
163 listener.handlePublish(publishEvent);
164 }
165 }
166
167 private void notifyXmppUnsubscribe(XmppUnsubscribe unsubscribeEvent) {
168 for (XmppSubscribeEventsListener listener : xmppSubscribeEventsListeners) {
169 listener.handleUnsubscribe(unsubscribeEvent);
170 }
171 }
172
173 private void notifyXmppSubscribe(XmppSubscribe subscribeEvent) {
174 for (XmppSubscribeEventsListener listener : xmppSubscribeEventsListeners) {
175 listener.handleSubscribe(subscribeEvent);
176 }
177 }
178
179 private boolean isPubSub(IQ iq) {
180 Element pubsub = iq.getElement().element(PUBSUB_ELEMENT);
181 if (pubsub != null && pubsub.getNamespaceURI().equals(PUBSUB_NAMESPACE)) {
182 return true;
183 }
184 return false;
185 }
186
187 public static XmppPubSubConstants.Method getMethod(IQ iq) {
188 Element pubsubElement = iq.getChildElement();
189 Element methodElement = getChildElement(pubsubElement);
190 String name = methodElement.getName();
191 switch (name) {
192 case "subscribe":
193 return XmppPubSubConstants.Method.SUBSCRIBE;
194 case "unsubscribe":
195 return XmppPubSubConstants.Method.UNSUBSCRIBE;
196 case "publish":
197 return XmppPubSubConstants.Method.PUBLISH;
198 case "retract":
199 return XmppPubSubConstants.Method.RETRACT;
200 default:
201 break;
202 }
203 return null;
204 }
205
206 public static Element getChildElement(Element element) {
207 Element child = (Element) element.elements().get(0); // the first element is related to pubsub operation
208 return child;
209 }
210
211}