RabbitMQ Integration - Updates changeset 11110 - Review comments incorporated
Change-Id: I0bfd7838b87d55769165b21dc735e1ba4468b611
diff --git a/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/impl/MQServiceImpl.java b/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/impl/MQServiceImpl.java
new file mode 100644
index 0000000..db02806
--- /dev/null
+++ b/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/impl/MQServiceImpl.java
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.rabbitmq.impl;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.commons.lang.exception.ExceptionUtils;
+
+import static org.onosproject.rabbitmq.api.MQConstants.*;
+
+import org.onosproject.event.Event;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.link.LinkEvent;
+import org.onosproject.net.packet.PacketContext;
+import org.onosproject.net.topology.TopologyEvent;
+import org.onosproject.rabbitmq.api.MQService;
+import org.onosproject.rabbitmq.api.Manageable;
+import org.onosproject.rabbitmq.util.MQUtil;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.gson.JsonObject;
+
+
+import com.google.common.collect.Maps;
+
+/**
+ * Default implementation of {@link MQService}.
+ */
+public class MQServiceImpl implements MQService {
+ private static final Logger log = LoggerFactory.getLogger(
+ MQServiceImpl.class);
+
+ private final BlockingQueue<MessageContext> msgOutQueue =
+ new LinkedBlockingQueue<>(10);
+
+ private Manageable manageSender;
+ private String correlationId;
+
+ /**
+ * Initializes using ComponentContext.
+ *
+ * @param context ComponentContext from OSGI
+ */
+ public MQServiceImpl(ComponentContext context) {
+ initializeProducers(context);
+ }
+
+ /**
+ * Initializes MQ sender and receiver with RMQ server.
+ *
+ * @param context ComponentContext from OSGI
+ */
+ private void initializeProducers(ComponentContext context) {
+ BrokerHost rfHost;
+ Properties prop = MQUtil.getProp(context);
+ if (prop == null) {
+ log.error("RabbitMQ configuration file not found...");
+ return;
+ }
+ try {
+ correlationId = prop.getProperty(SENDER_COR_ID);
+ rfHost = new BrokerHost(MQUtil.getMqUrl(
+ prop.getProperty(SERVER_PROTO),
+ prop.getProperty(SERVER_UNAME),
+ prop.getProperty(SERVER_PWD),
+ prop.getProperty(SERVER_ADDR),
+ prop.getProperty(SERVER_PORT),
+ prop.getProperty(SERVER_VHOST)));
+
+ manageSender = registerProducer(rfHost,
+ MQUtil.rfProducerChannelConf(
+ prop.getProperty(SENDER_EXCHG),
+ prop.getProperty(ROUTE_KEY),
+ prop.getProperty(SENDER_QUEUE)),
+ msgOutQueue);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ manageSender.start();
+ }
+
+ /**
+ * Returns the handle to call an api for publishing messages to RMQ server.
+ */
+ private Manageable registerProducer(BrokerHost host, Map<String, String> channelConf,
+ BlockingQueue<MessageContext> msgOutQueue) {
+ return new MQTransportImpl().registerProducer(host, channelConf, msgOutQueue);
+ }
+
+ private byte[] bytesOf(JsonObject jo) {
+ return jo.toString().getBytes();
+ }
+
+ /**
+ * Publishes Device, Topology & Link event message to MQ server.
+ *
+ * @param event Event received from the corresponding service like topology, device etc
+ */
+ @Override
+ public void publish(Event<? extends Enum, ?> event) {
+ byte[] body = null;
+ if (null == event) {
+ log.error("Captured event is null...");
+ return;
+ }
+ if (event instanceof DeviceEvent) {
+ body = bytesOf(MQUtil.json((DeviceEvent) event));
+ } else if (event instanceof TopologyEvent) {
+ body = bytesOf(MQUtil.json((TopologyEvent) event));
+ } else if (event instanceof LinkEvent) {
+ body = bytesOf(MQUtil.json((LinkEvent) event));
+ } else {
+ log.error("Invalid event: '{}'", event);
+ }
+ processAndPublishMessage(body);
+ }
+
+ /**
+ * Publishes packet message to MQ server.
+ *
+ * @param context Context of the packet recieved including details like mac, length etc
+ */
+ @Override
+ public void publish(PacketContext context) {
+ byte[] body = bytesOf(MQUtil.json(context));
+ processAndPublishMessage(body);
+ }
+
+ /*
+ * Constructs message context and publish it to rabbit mq server.
+ *
+ * @param body Byte stream of the event's JSON data
+ */
+ private void processAndPublishMessage(byte[] body) {
+ Map<String, Object> props = Maps.newHashMap();
+ props.put(CORRELATION_ID, correlationId);
+ MessageContext mc = new MessageContext(body, props);
+ try {
+ msgOutQueue.put(mc);
+ String message = new String(body, "UTF-8");
+ log.debug(" [x] Sent '{}'", message);
+ } catch (InterruptedException | UnsupportedEncodingException e) {
+ log.error(ExceptionUtils.getFullStackTrace(e));
+ }
+ manageSender.publish();
+ }
+}