RabbitMQ Integration - Updates changeset 11110 - Review comments incorporated

Change-Id: I0bfd7838b87d55769165b21dc735e1ba4468b611
diff --git a/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/api/MQConstants.java b/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/api/MQConstants.java
new file mode 100644
index 0000000..1635489
--- /dev/null
+++ b/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/api/MQConstants.java
@@ -0,0 +1,325 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.rabbitmq.api;
+
+/**
+ * Declares the constants used in this module.
+ */
+public final class MQConstants {
+    // No instantiation
+    private MQConstants() {
+    }
+
+    /**
+     * Message property key under which the MQ correlation id is stored.
+     */
+    public static final String CORRELATION_ID = "correlation_id";
+
+    /**
+     * Channel configuration key for the MQ exchange name.
+     */
+    public static final String EXCHANGE_NAME_PROPERTY = "EXCHANGE_NAME_PROPERTY";
+
+    /**
+     * Channel configuration key for the MQ routing key.
+     */
+    public static final String ROUTING_KEY_PROPERTY = "ROUTING_KEY_PROPERTY";
+
+    /**
+     * Channel configuration key for the MQ queue name.
+     */
+    public static final String QUEUE_NAME_PROPERTY = "QUEUE_NAME_PROPERTY";
+
+    /**
+     * Switch id connected to onos controller published via json.
+     */
+    public static final String SWITCH_ID = "switch_id";
+
+    /**
+     * Switch's infrastructure device name published via json.
+     */
+    public static final String INFRA_DEVICE_NAME = "infra_device_name";
+
+    /**
+     * Captured event type published via json.
+     */
+    public static final String EVENT_TYPE = "event_type";
+
+    /**
+     * Signifies device event in json.
+     */
+    public static final String DEVICE_EVENT = "DEVICE_EVENT";
+
+    /**
+     * Port connected via the switch.
+     */
+    public static final String PORT_NUMBER = "port_number";
+
+    /**
+     * Describes port status enabled or disabled.
+     */
+    public static final String PORT_ENABLED = "port_enabled";
+
+    /**
+     * Specifies port speed.
+     */
+    public static final String PORT_SPEED = "port_speed";
+
+    /**
+     * Specifies sub event types like device added, device updated etc.
+     */
+    public static final String SUB_EVENT_TYPE = "sub_event_type";
+
+    /**
+     * Specifies hardware version of the switch.
+     */
+    public static final String HW_VERSION = "hw_version";
+
+    /**
+     * Specifies switch's manufacturer.
+     */
+    public static final String MFR = "mfr";
+
+    /**
+     * Specifies the serial number of the connected switch.
+     */
+    public static final String SERIAL = "serial";
+
+    /**
+     * Specifies software version of the switch.
+     */
+    public static final String SW_VERSION = "sw_version";
+
+    /**
+     * Specifies chassis id of the switch.
+     */
+    public static final String CHASIS_ID = "chassis_id";
+
+    /**
+     * Specifies event occurrence time.
+     */
+    public static final String OCC_TIME = "occurrence_time";
+
+    /**
+     * Specifies switch's available time.
+     */
+    public static final String AVAILABLE = "available_time";
+
+    /**
+     * Specifies packet_in port details.
+     */
+    public static final String IN_PORT = "in_port";
+
+    /**
+     * Specifies whether the port is logical or not.
+     */
+    public static final String LOGICAL = "logical";
+
+    /**
+     * Specifies packet received time.
+     */
+    public static final String RECIEVED = "received";
+
+    /**
+     * Specifies message type.
+     */
+    public static final String MSG_TYPE = "msg_type";
+
+    /**
+     * Specifies packet type.
+     */
+    public static final String PKT_TYPE = "PACKET_IN";
+
+    /**
+     * Specifies sub message type under msg_type.
+     */
+    public static final String SUB_MSG_TYPE = "sub_msg_type";
+
+    /**
+     * Specifies Ethernet type of the packet.
+     */
+    public static final String ETH_TYPE = "eth_type";
+
+    /**
+     * Source MAC address of the packet.
+     */
+    public static final String SRC_MAC_ADDR = "src_mac_address";
+
+    /**
+     * Destination MAC address of the packet.
+     */
+    public static final String DEST_MAC_ADDR = "dest_mac_address";
+
+    /**
+     * Specifies VLAN ID of the packet.
+     */
+    public static final String VLAN_ID = "vlan_id";
+
+    /**
+     * Specifies if the packet is a Broadcast or not.
+     */
+    public static final String B_CAST = "is_bcast";
+
+    /**
+     * Specifies if the packet is a Multicast or not.
+     */
+    public static final String M_CAST = "is_mcast";
+
+    /**
+     * Specifies if the packet is padded or not.
+     */
+    public static final String PAD = "pad";
+
+    /**
+     * Specifies priority of the packet.
+     */
+    public static final String PRIORITY_CODE = "priority_code";
+
+    /**
+     * Specifies length of the payload.
+     */
+    public static final String DATA_LEN = "data_length";
+
+    /**
+     * Packet payload(raw) in unicode format.
+     */
+    public static final String PAYLOAD = "payload";
+
+    /**
+     * Network topology type TopologyEvent.Type.
+     */
+    public static final String TOPO_TYPE = "topology_type";
+
+    /**
+     * Represents number of strongly connected components in the topology.
+     */
+    public static final String CLUSTER_COUNT = "cluster_count";
+
+    /**
+     * Cost for doing topology computation.
+     */
+    public static final String COMPUTE_COST = "compute_cost";
+
+    /**
+     * Represents topology creation time.
+     */
+    public static final String CREATE_TIME = "creation_time";
+
+    /**
+     * Represents number of infrastructure devices in the topology.
+     */
+    public static final String DEVICE_COUNT = "device_count";
+
+    /**
+     * Represents number of links in the topology.
+     */
+    public static final String LINK_COUNT = "link_count";
+
+    /**
+     * Represents the link's destination DeviceId.
+     */
+    public static final String DEST = "dst";
+
+    /**
+     * Represents the link's source DeviceId.
+     */
+    public static final String SRC = "src";
+
+    /**
+     * True if the link is expected, false otherwise.
+     */
+    public static final String EXPECTED = "expected";
+
+    /**
+     * Represents link state ACTIVE or INACTIVE.
+     */
+    public static final String STATE = "state";
+
+    /**
+     * Represents link type like LINK_ADDED, LINK_UPDATE, LINK_REMOVED.
+     */
+    public static final String LINK_TYPE = "link_type";
+
+    /**
+     * Represents the rabbit mq server properties stored in resources directory.
+     */
+    public static final String MQ_PROP_NAME = "rabbitmq.properties";
+
+    /**
+     * Represents rabbit mq module name for app initialization.
+     */
+    public static final String ONOS_APP_NAME = "org.onosproject.rabbitmq";
+
+    /**
+     * Property name of the rabbit mq publisher correlation identifier.
+     */
+    public static final String SENDER_COR_ID = "rmq.sender.correlation.id";
+
+    /**
+     * Property name of the rabbit mq server protocol.
+     */
+    public static final String SERVER_PROTO = "rmq.server.protocol";
+
+    /**
+     * Property name of the rabbit mq server user name.
+     */
+    public static final String SERVER_UNAME = "rmq.server.username";
+
+    /**
+     * Property name of the rabbit mq server password.
+     */
+    public static final String SERVER_PWD = "rmq.server.password";
+
+    /**
+     * Property name of the rabbit mq server address.
+     */
+    public static final String SERVER_ADDR = "rmq.server.ip.address";
+
+    /**
+     * Property name of the rabbit mq server port.
+     */
+    public static final String SERVER_PORT = "rmq.server.port";
+
+    /**
+     * Property name of the rabbit mq server vhost.
+     */
+    public static final String SERVER_VHOST = "rmq.server.vhost";
+
+    /**
+     * Property name of the rabbit mq exchange the sender publishes to.
+     */
+    public static final String SENDER_EXCHG = "rmq.sender.exchange";
+
+    /**
+     * Property name of the routing key that binds the exchange and the queue.
+     */
+    public static final String ROUTE_KEY = "rmq.sender.routing.key";
+
+    /**
+     * Property name of the rabbit mq queue used for message delivery.
+     */
+    public static final String SENDER_QUEUE = "rmq.sender.queue";
+
+    /**
+     * Exchange type passed when the sender declares the rabbit mq exchange.
+     */
+    public static final String TOPIC = "topic";
+
+    /**
+     * Correlation ID the sender sets on each message it publishes.
+     */
+    public static final String COR_ID = "onos->rmqserver";
+}
diff --git a/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/api/MQService.java b/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/api/MQService.java
new file mode 100644
index 0000000..a49d8f1
--- /dev/null
+++ b/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/api/MQService.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.rabbitmq.api;
+
+import org.onosproject.event.Event;
+import org.onosproject.net.packet.PacketContext;
+
+/**
+ * Service APIs for publishing events and packet contexts to the MQ server.
+ */
+public interface MQService {
+
+    /**
+     * Publishes device/link/topology events to MQ server.
+     *
+     * @param event the event to publish
+     */
+    void publish(Event<? extends Enum, ?> event);
+
+    /**
+     * Publishes packet context message to MQ server.
+     *
+     * @param context the packet context of the inbound packet to publish
+     */
+    void publish(PacketContext context);
+
+}
diff --git a/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/api/MQTransport.java b/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/api/MQTransport.java
new file mode 100644
index 0000000..805f5cd
--- /dev/null
+++ b/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/api/MQTransport.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.rabbitmq.api;
+
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+import org.onosproject.rabbitmq.impl.BrokerHost;
+import org.onosproject.rabbitmq.impl.MessageContext;
+
+/**
+ * API for registering a producer with the MQ server.
+ */
+public interface MQTransport {
+    /**
+     * Registers MQ client with the server.
+     *
+     * @param host the broker host
+     * @param channelConf the mq channel configurations
+     * @param queue the queue of message contexts handed to the producer
+     * @return the sender handle
+     */
+    Manageable registerProducer(BrokerHost host, Map<String, String> channelConf,
+            BlockingQueue<MessageContext> queue);
+
+}
diff --git a/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/api/Manageable.java b/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/api/Manageable.java
new file mode 100644
index 0000000..a0de39d
--- /dev/null
+++ b/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/api/Manageable.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.rabbitmq.api;
+
+/**
+ * Interface declaring the start, publish and stop APIs for MQ transactions.
+ */
+public interface Manageable {
+    /**
+     * Establishes connection with MQ server.
+     */
+    void start();
+
+    /**
+     * Publishes onos events to the MQ server.
+     */
+    void publish();
+
+    /**
+     * Releases connection and channels.
+     */
+    void stop();
+
+}
diff --git a/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/api/package-info.java b/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/api/package-info.java
new file mode 100644
index 0000000..98cb9b4
--- /dev/null
+++ b/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/api/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package for API declarations.
+ */
+package org.onosproject.rabbitmq.api;
diff --git a/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/impl/BrokerHost.java b/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/impl/BrokerHost.java
new file mode 100644
index 0000000..186841c
--- /dev/null
+++ b/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/impl/BrokerHost.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.rabbitmq.impl;
+
+/**
+ * Holder for the URL of the MQ Server; the URL is what clients connect with.
+ */
+public class BrokerHost {
+
+    private final String url;
+
+    /**
+     * Creates a broker host for the given MQ Server URL.
+     *
+     * @param url url of the MQ Server
+     */
+    public BrokerHost(String url) {
+        this.url = url;
+    }
+
+    /**
+     * Gets the MQ Server URL this host was created with.
+     *
+     * @return url of the MQ Server
+     */
+    public String getUrl() {
+        return url;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        // Identity short-circuit, then require the exact same runtime class.
+        if (this == o) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+        // Null-safe comparison: two hosts without a url are equal.
+        BrokerHost other = (BrokerHost) o;
+        return java.util.Objects.equals(url, other.url);
+    }
+
+    @Override
+    public int hashCode() {
+        // Yields 0 for a null url, the url's own hash otherwise.
+        return java.util.Objects.hashCode(url);
+    }
+
+    @Override
+    public String toString() {
+        return url;
+    }
+}
diff --git a/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/impl/MQSender.java b/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/impl/MQSender.java
new file mode 100644
index 0000000..47a6e8d
--- /dev/null
+++ b/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/impl/MQSender.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.rabbitmq.impl;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeoutException;
+
+import org.onosproject.rabbitmq.api.Manageable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+
+import static org.onosproject.rabbitmq.api.MQConstants.*;
+
+/**
+ * Connects client with server using start API, publishes one queued message
+ * per publish call and disconnects the client from server using stop API.
+ */
+public class MQSender implements Manageable {
+
+    private static final String E_CREATE_CHAN = "Error creating the RabbitMQ channel";
+    private static final String E_PUBLISH_CHAN = "Error in publishing to the RabbitMQ channel";
+    private static final Logger log = LoggerFactory.getLogger(MQSender.class);
+    private static final int RECOVERY_INTERVAL = 15000;
+
+    private final BlockingQueue<MessageContext> outQueue;
+    private final String exchangeName;
+    private final String routingKey;
+    private final String queueName;
+    private final String url;
+
+    private ExecutorService executorService;
+    private Connection conn;
+    private Channel channel;
+
+    /**
+     * Creates a MQSender initialized with the specified parameters.
+     *
+     * @param outQueue     queue of message contexts waiting to be published
+     * @param exchangeName represents mq exchange name
+     * @param routingKey   represents bound routing key
+     * @param queueName    represents mq queue name
+     * @param url          represents the mq server url
+     */
+    public MQSender(BlockingQueue<MessageContext> outQueue, String exchangeName,
+            String routingKey, String queueName, String url) {
+        this.outQueue = outQueue;
+        this.exchangeName = exchangeName;
+        this.routingKey = routingKey;
+        this.queueName = queueName;
+        this.url = url;
+    }
+
+    /**
+     * Sets the executor service.
+     *
+     * @param executorService the executor service to use
+     */
+    public void setExecutorService(ExecutorService executorService) {
+        this.executorService = executorService;
+    }
+
+    @Override
+    public void start() {
+        ConnectionFactory factory = new ConnectionFactory();
+        factory.setAutomaticRecoveryEnabled(true);
+        factory.setNetworkRecoveryInterval(RECOVERY_INTERVAL);
+        try {
+            factory.setUri(url);
+            conn = executorService != null
+                    ? factory.newConnection(executorService) : factory.newConnection();
+            channel = conn.createChannel();
+            channel.exchangeDeclare(exchangeName, TOPIC, true);
+            // Queue: durable = true, exclusive = false, autoDelete = false, arguments = null.
+            channel.queueDeclare(this.queueName, true, false, false, null);
+            channel.queueBind(queueName, exchangeName, routingKey);
+        } catch (Exception e) {
+            log.error(E_CREATE_CHAN, e);
+        }
+    }
+
+    @Override
+    public void publish() {
+        MessageContext input = outQueue.poll();
+        if (input == null) {
+            // poll() returns null on an empty queue: nothing to send.
+            return;
+        }
+        if (channel == null) {
+            log.error("{}: channel is not open, message dropped", E_PUBLISH_CHAN);
+            return;
+        }
+        try {
+            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
+                    .correlationId(COR_ID).build();
+            channel.basicPublish(exchangeName, routingKey, props, input.getBody());
+            log.debug(" [x] Sent: '{}'", new String(input.getBody(), "UTF-8"));
+        } catch (Exception e) {
+            log.error(E_PUBLISH_CHAN, e);
+        }
+    }
+
+    @Override
+    public void stop() {
+        try {
+            try {
+                if (channel != null) {
+                    channel.close();
+                }
+            } finally {
+                // Release the connection even if start() failed half-way or the channel close threw.
+                if (conn != null) {
+                    conn.close();
+                }
+            }
+        } catch (IOException e) {
+            log.error("Error closing the rabbit MQ connection", e);
+        } catch (TimeoutException e) {
+            log.error("Timeout exception in closing the rabbit MQ connection", e);
+        }
+    }
+}
diff --git a/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/impl/MQServiceImpl.java b/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/impl/MQServiceImpl.java
new file mode 100644
index 0000000..db02806
--- /dev/null
+++ b/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/impl/MQServiceImpl.java
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.rabbitmq.impl;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.commons.lang.exception.ExceptionUtils;
+
+import static org.onosproject.rabbitmq.api.MQConstants.*;
+
+import org.onosproject.event.Event;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.link.LinkEvent;
+import org.onosproject.net.packet.PacketContext;
+import org.onosproject.net.topology.TopologyEvent;
+import org.onosproject.rabbitmq.api.MQService;
+import org.onosproject.rabbitmq.api.Manageable;
+import org.onosproject.rabbitmq.util.MQUtil;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.gson.JsonObject;
+
+
+import com.google.common.collect.Maps;
+
+/**
+ * Default implementation of {@link MQService}.
+ */
+public class MQServiceImpl implements MQService {
+    private static final Logger log = LoggerFactory.getLogger(MQServiceImpl.class);
+
+    private final BlockingQueue<MessageContext> msgOutQueue = new LinkedBlockingQueue<>(10);
+
+    private Manageable manageSender;
+    private String correlationId;
+
+    /**
+     * Initializes using ComponentContext.
+     *
+     * @param context ComponentContext from OSGI
+     */
+    public MQServiceImpl(ComponentContext context) {
+        initializeProducers(context);
+    }
+
+    /**
+     * Creates and starts the MQ sender; none is created if the config is missing.
+     *
+     * @param context ComponentContext from OSGI
+     */
+    private void initializeProducers(ComponentContext context) {
+        Properties prop = MQUtil.getProp(context);
+        if (prop == null) {
+            log.error("RabbitMQ configuration file not found...");
+            return;
+        }
+        try {
+            correlationId = prop.getProperty(SENDER_COR_ID);
+            BrokerHost rfHost = new BrokerHost(MQUtil.getMqUrl(
+                    prop.getProperty(SERVER_PROTO), prop.getProperty(SERVER_UNAME),
+                    prop.getProperty(SERVER_PWD), prop.getProperty(SERVER_ADDR),
+                    prop.getProperty(SERVER_PORT), prop.getProperty(SERVER_VHOST)));
+            manageSender = registerProducer(rfHost, MQUtil.rfProducerChannelConf(
+                    prop.getProperty(SENDER_EXCHG), prop.getProperty(ROUTE_KEY),
+                    prop.getProperty(SENDER_QUEUE)), msgOutQueue);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        manageSender.start();
+    }
+
+    /**
+     * Returns the handle to call an api for publishing messages to RMQ server.
+     */
+    private Manageable registerProducer(BrokerHost host, Map<String, String> channelConf,
+            BlockingQueue<MessageContext> msgOutQueue) {
+        return new MQTransportImpl().registerProducer(host, channelConf, msgOutQueue);
+    }
+
+    // Encodes explicitly as UTF-8, the charset the bytes are later decoded with.
+    private byte[] bytesOf(JsonObject jo) {
+        return jo.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
+    }
+
+    /**
+     * Publishes Device, Topology &amp; Link event message to MQ server.
+     *
+     * @param event Event received from the corresponding service like topology, device etc
+     */
+    @Override
+    public void publish(Event<? extends Enum, ?> event) {
+        if (event == null) {
+            log.error("Captured event is null...");
+            return;
+        }
+        byte[] body;
+        if (event instanceof DeviceEvent) {
+            body = bytesOf(MQUtil.json((DeviceEvent) event));
+        } else if (event instanceof TopologyEvent) {
+            body = bytesOf(MQUtil.json((TopologyEvent) event));
+        } else if (event instanceof LinkEvent) {
+            body = bytesOf(MQUtil.json((LinkEvent) event));
+        } else {
+            // No JSON form for this event; MessageContext rejects a null body, so stop here.
+            log.error("Invalid event: '{}'", event);
+            return;
+        }
+        processAndPublishMessage(body);
+    }
+
+    /**
+     * Publishes packet message to MQ server.
+     *
+     * @param context Context of the packet received including details like mac, length etc
+     */
+    @Override
+    public void publish(PacketContext context) {
+        byte[] body = bytesOf(MQUtil.json(context));
+        processAndPublishMessage(body);
+    }
+
+    /**
+     * Constructs message context and publish it to rabbit mq server.
+     *
+     * @param body Byte stream of the event's JSON data
+     */
+    private void processAndPublishMessage(byte[] body) {
+        if (manageSender == null) {
+            log.error("RabbitMQ sender is not initialized, dropping message");
+            return;
+        }
+        Map<String, Object> props = Maps.newHashMap();
+        props.put(CORRELATION_ID, correlationId);
+        MessageContext mc = new MessageContext(body, props);
+        try {
+            msgOutQueue.put(mc);
+            log.debug(" [x] Sent '{}'", new String(body, "UTF-8"));
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // restore the flag; the message was not queued
+            log.error(ExceptionUtils.getFullStackTrace(e));
+            return;
+        } catch (UnsupportedEncodingException e) {
+            log.error(ExceptionUtils.getFullStackTrace(e));
+        }
+        manageSender.publish();
+    }
+}
diff --git a/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/impl/MQTransportImpl.java b/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/impl/MQTransportImpl.java
new file mode 100644
index 0000000..bb71a37
--- /dev/null
+++ b/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/impl/MQTransportImpl.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.rabbitmq.impl;
+
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+import org.onosproject.rabbitmq.api.MQTransport;
+import org.onosproject.rabbitmq.api.Manageable;
+
+import static org.onosproject.rabbitmq.api.MQConstants.*;
+
+/**
+ * Builds the MQSender handle used for message delivery.
+ */
+public class MQTransportImpl implements MQTransport {
+
+    @Override
+    public Manageable registerProducer(BrokerHost host,
+                                       Map<String, String> channelConf,
+                                       BlockingQueue<MessageContext> queue) {
+        // Exchange, routing key and queue name are read from the channel configuration.
+        return new MQSender(queue,
+                            channelConf.get(EXCHANGE_NAME_PROPERTY),
+                            channelConf.get(ROUTING_KEY_PROPERTY),
+                            channelConf.get(QUEUE_NAME_PROPERTY), host.getUrl());
+    }
+
+}
diff --git a/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/impl/MessageContext.java b/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/impl/MessageContext.java
new file mode 100644
index 0000000..b91b66e
--- /dev/null
+++ b/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/impl/MessageContext.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.rabbitmq.impl;
+
+import java.io.Serializable;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Carries one message: its data as a byte stream together with the mq
+ * properties used for message delivery.
+ */
+public class MessageContext implements Serializable {
+    private static final long serialVersionUID = -4174900539976805047L;
+    private static final String NULL_ERR = "The body and properties should be present";
+
+    private final Map<String, Object> properties;
+    private final byte[] body;
+
+    /**
+     * Creates a message context; neither argument may be null.
+     *
+     * @param body       Byte stream of the event's JSON data
+     * @param properties Map of the Message Queue properties
+     */
+    public MessageContext(byte[] body, Map<String, Object> properties) {
+        checkNotNull(body, NULL_ERR);
+        checkNotNull(properties, NULL_ERR);
+        this.body = body;
+        this.properties = properties;
+    }
+
+    /**
+     * Returns the message properties map.
+     *
+     * @return Map of the Message Queue properties
+     */
+    public Map<String, Object> getProperties() {
+        return properties;
+    }
+
+    /**
+     * Returns the message body.
+     *
+     * @return Byte stream of the event's JSON data
+     */
+    public byte[] getBody() {
+        return body;
+    }
+}
diff --git a/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/impl/package-info.java b/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/impl/package-info.java
new file mode 100644
index 0000000..2ed2f43
--- /dev/null
+++ b/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/impl/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package for mq implementation.
+ */
+package org.onosproject.rabbitmq.impl;
diff --git a/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/listener/MQEventHandler.java b/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/listener/MQEventHandler.java
new file mode 100644
index 0000000..c34bb1a
--- /dev/null
+++ b/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/listener/MQEventHandler.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.rabbitmq.listener;
+
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.link.LinkEvent;
+import org.onosproject.net.link.LinkListener;
+import org.onosproject.net.link.LinkService;
+import org.onosproject.net.link.ProbedLinkProvider;
+import org.onosproject.net.packet.PacketContext;
+import org.onosproject.net.packet.PacketProcessor;
+import org.onosproject.net.packet.PacketService;
+import org.onosproject.net.provider.AbstractProvider;
+import org.onosproject.net.provider.ProviderId;
+import org.onosproject.net.topology.TopologyEvent;
+import org.onosproject.net.topology.TopologyListener;
+import org.onosproject.net.topology.TopologyService;
+import org.onosproject.rabbitmq.api.MQConstants;
+import org.onosproject.rabbitmq.api.MQService;
+import org.onosproject.rabbitmq.impl.MQServiceImpl;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Listens to events generated from Device Event/PKT_IN/Topology/Link.
+ * Then publishes events to rabbitmq server via publish() api.
+ */
+
+@Component(immediate = true)
+public class MQEventHandler extends AbstractProvider
+                            implements ProbedLinkProvider {
+
+    private static final Logger log = LoggerFactory.getLogger(
+                                                    MQEventHandler.class);
+    private static final String PROVIDER_NAME = MQConstants.ONOS_APP_NAME;
+    private static final int PKT_PROC_PRIO = 1;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DeviceService deviceService;
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected PacketService packetService;
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LinkService linkService;
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected TopologyService topologyService;
+
+    private MQService mqService;
+    private DeviceListener deviceListener;
+    protected ExecutorService eventExecutor;
+
+    private final InternalPacketProcessor packetProcessor =
+                                          new InternalPacketProcessor();
+    private final LinkListener linkListener = new InternalLinkListener();
+    private final TopologyListener topologyListener =
+                                           new InternalTopologyListener();
+
+    /**
+     * Initializes the parent class with ProviderId("rabbitmq", PROVIDER_NAME).
+     */
+    public MQEventHandler() {
+        super(new ProviderId("rabbitmq", PROVIDER_NAME));
+    }
+
+    @Activate
+    protected void activate(ComponentContext context) {
+        mqService = new MQServiceImpl(context); // hooks below publish via it
+        eventExecutor = newSingleThreadScheduledExecutor( // NOTE(review): unused?
+                groupedThreads("onos/deviceevents", "events-%d", log));
+        deviceListener = new InternalDeviceListener();
+        deviceService.addListener(deviceListener);
+        packetService.addProcessor(packetProcessor,
+                                   PacketProcessor.advisor(PKT_PROC_PRIO));
+        linkService.addListener(linkListener);
+        topologyService.addListener(topologyListener);
+        log.info("Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        deviceService.removeListener(deviceListener);
+        packetService.removeProcessor(packetProcessor);
+        eventExecutor.shutdownNow(); // does not wait for running tasks
+        eventExecutor = null;
+        linkService.removeListener(linkListener);
+        topologyService.removeListener(topologyListener);
+        log.info("Stopped");
+    }
+
+    /**
+     * Forwards every non-null device event to the MQ publisher.
+     */
+    private class InternalDeviceListener implements DeviceListener {
+
+        @Override
+        public void event(DeviceEvent event) {
+            if (event != null) {
+                mqService.publish(event);
+            } else {
+                log.error("Device event is null.");
+            }
+        }
+    }
+
+    /**
+     * Forwards every non-null inbound packet context, received from the
+     * switches connected to the ONOS controller, to the MQ publisher.
+     */
+    private class InternalPacketProcessor implements PacketProcessor {
+        @Override
+        public void process(PacketContext context) {
+            if (context != null) {
+                mqService.publish(context);
+            } else {
+                log.error("Packet context is null.");
+            }
+        }
+    }
+
+    /**
+     * Forwards every non-null link event to the MQ publisher.
+     */
+    private class InternalLinkListener implements LinkListener {
+        @Override
+        public void event(LinkEvent event) {
+            if (event != null) {
+                mqService.publish(event);
+            } else {
+                log.error("Link event is null.");
+            }
+        }
+    }
+
+    /**
+     * Forwards every non-null topology event to the MQ publisher.
+     */
+    private class InternalTopologyListener implements TopologyListener {
+
+        @Override
+        public void event(TopologyEvent event) {
+            if (event != null) {
+                mqService.publish(event);
+            } else {
+                log.error("Topology event is null.");
+            }
+        }
+    }
+}
diff --git a/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/listener/package-info.java b/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/listener/package-info.java
new file mode 100644
index 0000000..afd3106
--- /dev/null
+++ b/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/listener/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Publishes device, packet, link and topology events to the MQ server.
+ */
+package org.onosproject.rabbitmq.listener;
diff --git a/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/util/MQUtil.java b/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/util/MQUtil.java
new file mode 100644
index 0000000..9b4069e
--- /dev/null
+++ b/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/util/MQUtil.java
@@ -0,0 +1,240 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.rabbitmq.util;
+
+import java.io.File;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.commons.lang.exception.ExceptionUtils;
+
+import org.onlab.packet.EthType;
+import org.onosproject.net.Link;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.link.LinkEvent;
+import org.onosproject.net.packet.PacketContext;
+import org.onosproject.net.topology.Topology;
+import org.onosproject.net.topology.TopologyEvent;
+import org.onosproject.net.packet.InboundPacket;
+import org.osgi.service.component.ComponentContext;
+import com.google.gson.JsonObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.onosproject.rabbitmq.api.MQConstants.*;
+
+/**
+ * MQ utility class for constructing server url, packet message, device message,
+ * topology message and link message.
+ */
+public final class MQUtil {
+
+    private static final String COLON = ":";
+    private static final String AT = "@";
+    private static final String CDFS = "://";
+    private static final String FS = "/";
+    private static final String UTF8 = "UTF-8";
+    private static final Logger log = LoggerFactory.getLogger(MQUtil.class);
+
+    private MQUtil() {
+    }
+
+    /**
+     * Builds the MQ server url: proto://userName:password@ipAddr:port/vhost.
+     * Only the vhost is URL-encoded; all whitespace is stripped from the url.
+     *
+     * @param proto    mq server protocol
+     * @param userName mq server username
+     * @param password mq server password
+     * @param ipAddr   server ip address
+     * @param port     server port
+     * @param vhost    server vhost
+     * @return         server url
+     */
+    public static String getMqUrl(String proto, String userName,
+            String password, String ipAddr, String port,
+            String vhost) {
+        String encodedVhost = "";
+        try {
+            encodedVhost = URLEncoder.encode(vhost, UTF8);
+        } catch (UnsupportedEncodingException e) {
+            log.error(ExceptionUtils.getFullStackTrace(e));
+        }
+        String url = proto + CDFS + userName + COLON + password + AT
+                + ipAddr + COLON + port + FS + encodedVhost;
+        return url.replaceAll("\\s+", "");
+    }
+
+    /**
+     * Builds the publisher channel configuration from the given names.
+     *
+     * @param  exchange   the configured mq exchange name
+     * @param  routingKey the configured mq routing key
+     * @param  queueName  the configured mq queue name
+     * @return            map holding the exchange, routing key and queue name
+     */
+    public static Map<String, String> rfProducerChannelConf(String exchange,
+            String routingKey, String queueName) {
+        Map<String, String> conf = new HashMap<>();
+        conf.put(EXCHANGE_NAME_PROPERTY, exchange);
+        conf.put(ROUTING_KEY_PROPERTY, routingKey);
+        conf.put(QUEUE_NAME_PROPERTY, queueName);
+        return conf;
+    }
+
+    /**
+     * Returns a JSON representation of the given device event.
+     * The port fields are included only when the event carries a port.
+     *
+     * @param  event the device event
+     * @return       the device event json message
+     */
+    public static JsonObject json(DeviceEvent event) {
+        JsonObject jo = new JsonObject();
+        jo.addProperty(SWITCH_ID, event.subject().id().toString());
+        jo.addProperty(INFRA_DEVICE_NAME, event.subject().type().name());
+        jo.addProperty(EVENT_TYPE, DEVICE_EVENT);
+        // Port details are optional; device-level events carry no port.
+        if (event.port() != null) {
+            jo.addProperty(PORT_NUMBER, event.port().number().toLong());
+            jo.addProperty(PORT_ENABLED, event.port().isEnabled());
+            jo.addProperty(PORT_SPEED, event.port().portSpeed());
+        }
+        // Emitted once, after the optional port fields; an enum constant's
+        // name() is never null, so no null guard is needed.
+        jo.addProperty(SUB_EVENT_TYPE, event.type().name());
+        jo.addProperty(HW_VERSION, event.subject().hwVersion());
+        jo.addProperty(MFR, event.subject().manufacturer());
+        jo.addProperty(SERIAL, event.subject().serialNumber());
+        jo.addProperty(SW_VERSION, event.subject().swVersion());
+        jo.addProperty(CHASIS_ID, event.subject().chassisId().id());
+        jo.addProperty(OCC_TIME, new Date(event.time()).toString());
+        return jo;
+    }
+
+    /**
+     * Returns a JSON representation of the given packet context.
+     *
+     * @param  context the packet context
+     * @return         the inbound packet json message
+     */
+    public static JsonObject json(PacketContext context) {
+        JsonObject jo = new JsonObject();
+        InboundPacket pkt = context.inPacket();
+        // ingress connect point (device and port) and the receive time
+        jo.addProperty(SWITCH_ID, pkt.receivedFrom().deviceId().toString());
+        jo.addProperty(IN_PORT, pkt.receivedFrom().port().name());
+        jo.addProperty(LOGICAL, pkt.receivedFrom().port().isLogical());
+        jo.addProperty(RECIEVED, new Date(context.time()).toString());
+        jo.addProperty(MSG_TYPE, PKT_TYPE);
+        // fields of the parsed ethernet frame
+        jo.addProperty(SUB_MSG_TYPE,
+                EthType.EtherType.lookup(pkt.parsed().getEtherType()).name());
+        jo.addProperty(ETH_TYPE, pkt.parsed().getEtherType());
+        jo.addProperty(SRC_MAC_ADDR, pkt.parsed().getSourceMAC().toString());
+        jo.addProperty(DEST_MAC_ADDR, pkt.parsed().getDestinationMAC().toString());
+        jo.addProperty(VLAN_ID, pkt.parsed().getVlanID());
+        jo.addProperty(B_CAST, pkt.parsed().isBroadcast());
+        jo.addProperty(M_CAST, pkt.parsed().isMulticast());
+        jo.addProperty(PAD, pkt.parsed().isPad());
+        jo.addProperty(PRIORITY_CODE, pkt.parsed().getPriorityCode());
+        // raw buffer; NOTE(review): payload is read as 16-bit chars - confirm
+        jo.addProperty(DATA_LEN, pkt.unparsed().array().length);
+        jo.addProperty(PAYLOAD, pkt.unparsed().asCharBuffer().toString());
+        return jo;
+    }
+
+    /**
+     * Returns a JSON representation of the given topology event.
+     * The topology type reported is the event's own type.
+     * @param  event the topology event
+     * @return       the topology event json message
+     */
+    public static JsonObject json(TopologyEvent event) {
+        Topology topology = event.subject();
+        JsonObject jo = new JsonObject();
+        jo.addProperty(TOPO_TYPE, event.type().name());
+        jo.addProperty(CLUSTER_COUNT, topology.clusterCount());
+        jo.addProperty(COMPUTE_COST, topology.computeCost());
+        jo.addProperty(CREATE_TIME, new Date(topology.creationTime()).toString());
+        jo.addProperty(DEVICE_COUNT, topology.deviceCount());
+        jo.addProperty(LINK_COUNT, topology.linkCount());
+        jo.addProperty(AVAILABLE, new Date(topology.time()).toString());
+        return jo;
+    }
+
+    /**
+     * Converts the given link event into its JSON message form.
+     *
+     * @param  event the link event
+     * @return       the link event json message
+     */
+    public static JsonObject json(LinkEvent event) {
+        JsonObject json = new JsonObject();
+        Link subject = event.subject();
+        json.addProperty(EVENT_TYPE, event.type().name());
+        json.addProperty(DEST, subject.dst().deviceId().toString());
+        json.addProperty(SRC, subject.src().deviceId().toString());
+        json.addProperty(EXPECTED, subject.isExpected());
+        json.addProperty(STATE, subject.state().name());
+        json.addProperty(LINK_TYPE, subject.type().name());
+        return json;
+    }
+
+    /**
+     * Handles load mq property file from resources and returns Properties.
+     * Falls back to the class loader when the bundle lookup throws.
+     *
+     * @param  context          the component context
+     * @return                  the mq server properties
+     * @throws RuntimeException if property file cannot be located or read.
+     */
+    public static Properties getProp(ComponentContext context) {
+        URL configUrl;
+        try {
+            configUrl = context.getBundleContext().getBundle()
+                                                  .getResource(MQ_PROP_NAME);
+        } catch (Exception ex) {
+            // This will be used only during junit test case since bundle
+            // context will be available during runtime only.
+            File file = new File(
+                    MQUtil.class.getClassLoader().getResource(MQ_PROP_NAME)
+                                                 .getFile());
+            try {
+                configUrl = file.toURL();
+            } catch (MalformedURLException e) {
+                log.error(ExceptionUtils.getFullStackTrace(e));
+                throw new RuntimeException(e);
+            }
+        }
+
+        Properties properties = new Properties();
+        // try-with-resources closes the stream on success and on failure.
+        try (InputStream is = configUrl.openStream()) {
+            properties.load(is);
+        } catch (Exception e) {
+            log.error(ExceptionUtils.getFullStackTrace(e));
+            throw new RuntimeException(e);
+        }
+        return properties;
+    }
+}
diff --git a/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/util/package-info.java b/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/util/package-info.java
new file mode 100644
index 0000000..e3e92cc
--- /dev/null
+++ b/apps/rabbitmq/src/main/java/org/onosproject/rabbitmq/util/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package for mq utility.
+ */
+package org.onosproject.rabbitmq.util;
diff --git a/apps/rabbitmq/src/main/resources/rabbitmq.properties b/apps/rabbitmq/src/main/resources/rabbitmq.properties
new file mode 100644
index 0000000..82c878d
--- /dev/null
+++ b/apps/rabbitmq/src/main/resources/rabbitmq.properties
@@ -0,0 +1,12 @@
+# Modify the properties below as per your requirements.
+rmq.server.protocol = amqp
+rmq.server.username = onosrmq
+rmq.server.password = onosrocks
+rmq.server.port = 5672
+rmq.server.ip.address = 127.0.0.1
+rmq.server.vhost = /
+rmq.sender.type = topic
+rmq.sender.correlation.id = onos->rmqserver
+rmq.sender.exchange = onos_exchg_wr_to_rmqs
+rmq.sender.routing.key = onos.rkey.rmqs
+rmq.sender.queue = onos_send_queue
diff --git a/apps/rabbitmq/src/test/java/org/onosproject/rabbitmq/listener/MQEventHandlerTest.java b/apps/rabbitmq/src/test/java/org/onosproject/rabbitmq/listener/MQEventHandlerTest.java
new file mode 100644
index 0000000..e50f825
--- /dev/null
+++ b/apps/rabbitmq/src/test/java/org/onosproject/rabbitmq/listener/MQEventHandlerTest.java
@@ -0,0 +1,450 @@
+package org.onosproject.rabbitmq.listener;
+
+import static com.google.common.collect.ImmutableSet.of;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.onosproject.net.DeviceId.deviceId;
+import static org.onosproject.net.NetTestTools.device;
+import static org.onosproject.net.NetTestTools.link;
+import static org.onosproject.net.PortNumber.portNumber;
+import static org.onosproject.net.topology.TopologyEvent.Type.TOPOLOGY_CHANGED;
+import static org.onosproject.net.device.DeviceEvent.Type.*;
+import static org.onosproject.net.link.LinkEvent.Type.*;
+import static org.onosproject.net.Device.Type.*;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.onlab.packet.ChassisId;
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.ONOSLLDP;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.core.DefaultApplicationId;
+import org.onosproject.event.AbstractEventTest;
+import org.onosproject.event.Event;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DefaultDevice;
+import org.onosproject.net.DefaultLink;
+import org.onosproject.net.DefaultPort;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Link;
+import org.onosproject.net.link.LinkEvent;
+import org.onosproject.net.MastershipRole;
+import org.onosproject.net.Port;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.device.DeviceServiceAdapter;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.link.LinkListener;
+import org.onosproject.net.link.LinkService;
+import org.onosproject.net.packet.DefaultInboundPacket;
+import org.onosproject.net.packet.InboundPacket;
+import org.onosproject.net.packet.OutboundPacket;
+import org.onosproject.net.packet.PacketContext;
+import org.onosproject.net.packet.PacketProcessor;
+import org.onosproject.net.packet.PacketServiceAdapter;
+import org.onosproject.net.provider.AbstractProvider;
+import org.onosproject.net.provider.ProviderId;
+import org.onosproject.net.topology.DefaultGraphDescription;
+import org.onosproject.net.topology.GraphDescription;
+import org.onosproject.net.topology.Topology;
+import org.onosproject.net.topology.TopologyEvent;
+import org.onosproject.net.topology.TopologyListener;
+import org.onosproject.net.topology.TopologyProvider;
+import org.onosproject.net.topology.TopologyProviderRegistry;
+import org.onosproject.net.topology.TopologyProviderService;
+import org.onosproject.net.topology.TopologyService;
+import org.onosproject.rabbitmq.api.MQService;
+import org.onosproject.rabbitmq.api.Manageable;
+import org.osgi.service.component.ComponentContext;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.MoreExecutors;
+/**
+ * Junit tests for packet in, device, topology and link events.
+ */
+@RunWith(EasyMockRunner.class)
+public class MQEventHandlerTest extends AbstractEventTest {
+
+    private final MQEventHandler mqEventHandler = new MQEventHandler();
+    private static final DeviceId DID1 = deviceId("of:0000000000000001");
+    private static final DeviceId DID2 = deviceId("of:0000000000000002");
+    private static final DeviceId DID3 = deviceId("of:0000000000000003");
+
+    private ApplicationId appId = new DefaultApplicationId(100,
+                                      "org.onosproject.rabbitmq");
+    private final TestPacketService packetService = new TestPacketService();
+    private final TestDeviceService deviceService = new TestDeviceService();
+    private PacketProcessor testProcessor;
+    private DeviceListener deviceListener;
+    private static Port pd1;
+    private static Port pd2;
+    private static Port pd3;
+    private static Port pd4;
+    private CoreService coreService;
+    @Mock
+    ComponentContext context;
+    @Mock
+    private Manageable manageSender;
+
+    private static final ProviderId PID = new ProviderId("of", "foo");
+    private TestLinkListener testLinkListener = new TestLinkListener();
+    @Mock
+    private LinkService linkService;
+    @Mock
+    Topology topology;
+    @Mock
+    protected TopologyService service;
+    protected TopologyProviderRegistry registry;
+    @Mock
+    protected TopologyProviderService providerService;
+    protected TestProvider provider;
+    protected TestTopologyListener listener = new TestTopologyListener();
+    @Mock
+    MQService mqService;
+
+    @Before
+    public void setUp() {
+        coreService = createMock(CoreService.class);
+        expect(coreService.registerApplication(appId.name()))
+                          .andReturn(appId).anyTimes();
+        replay(coreService);
+        mqEventHandler.deviceService = deviceService;
+        mqEventHandler.packetService = packetService;
+        mqEventHandler.eventExecutor = MoreExecutors.newDirectExecutorService();
+        linkService.addListener(testLinkListener);
+        mqEventHandler.linkService = linkService;
+        mqEventHandler.topologyService = service;
+        mqEventHandler.activate(context);
+    }
+
+    @After
+    public void tearDown() {
+        mqEventHandler.deactivate();
+        mqEventHandler.deviceService = null;
+        mqEventHandler.packetService = null;
+    }
+
+    private DeviceEvent deviceEvent(DeviceEvent.Type type, DeviceId did) {
+        return new DeviceEvent(type, deviceService.getDevice(did));
+
+    }
+
+    private Port port(DeviceId did, long port, boolean enabled) {
+        return new DefaultPort(deviceService.getDevice(did),
+                               portNumber(port), enabled);
+    }
+
+    private DeviceEvent portEvent(DeviceEvent.Type type, DeviceId did,
+                                  Port port) {
+        return new DeviceEvent(type, deviceService.getDevice(did), port);
+    }
+
+    @Test
+    public void switchAdd() {
+        DeviceEvent de = deviceEvent(DEVICE_ADDED, DID1);
+        deviceListener.event(de);
+
+    }
+
+    @Test
+    public void switchRemove() {
+        deviceListener.event(deviceEvent(DEVICE_ADDED, DID1));
+        deviceListener.event(deviceEvent(DEVICE_REMOVED, DID1));
+    }
+
+    @Test
+    public void switchUpdate() {
+        deviceListener.event(deviceEvent(DEVICE_UPDATED, DID1));
+        deviceListener.event(deviceEvent(DEVICE_REMOVED, DID1));
+    }
+
+    @Test
+    public void switchSuspend() {
+        deviceListener.event(deviceEvent(DEVICE_SUSPENDED, DID1));
+        deviceListener.event(deviceEvent(DEVICE_REMOVED, DID1));
+    }
+
+    @Test
+    public void portUp() {
+        deviceListener.event(deviceEvent(DEVICE_ADDED, DID1));
+        deviceListener.event(portEvent(PORT_ADDED, DID1, port(DID1, 3, true)));
+    }
+
+    @Test
+    public void portDown() {
+        deviceListener.event(deviceEvent(DEVICE_ADDED, DID1));
+        deviceListener.event(portEvent(PORT_ADDED, DID1, port(DID1, 1, false)));
+    }
+
+    @Test
+    public void portRemoved() {
+        deviceListener.event(deviceEvent(DEVICE_ADDED, DID1));
+        deviceListener.event(portEvent(PORT_ADDED, DID1, port(DID1, 3, true)));
+        deviceListener.event(portEvent(PORT_REMOVED, DID1,
+                                       port(DID1, 3, true)));
+    }
+
+    @Test
+    public void unknownPktCtx() {
+        // Note: DID3 hasn't been added to TestDeviceService
+        PacketContext pktCtx = new TestPacketContext(device1(DID3));
+        testProcessor.process(pktCtx);
+        assertFalse("Context should still be free", pktCtx.isHandled());
+    }
+
+    private DefaultDevice device1(DeviceId did) {
+        return new DefaultDevice(ProviderId.NONE, did, SWITCH,
+                                 "TESTMF", "TESTHW", "TESTSW", "TESTSN",
+                                 new ChassisId());
+    }
+
+    @Test
+    public void knownPktCtx() {
+        deviceListener.event(deviceEvent(DEVICE_ADDED, DID1));
+        deviceListener.event(deviceEvent(DEVICE_ADDED, DID2));
+        PacketContext pktCtx = new TestPacketContext(
+                                       deviceService.getDevice(DID2));
+        /*
+         * No assertion follows: the test passes if process() does not throw.
+         */
+        testProcessor.process(pktCtx);
+    }
+
+    private class TestDeviceService extends DeviceServiceAdapter {
+
+        private final Map<DeviceId, Device> devices = new HashMap<>();
+        private final ArrayListMultimap<DeviceId, Port> ports =
+                                                  ArrayListMultimap.create();
+
+        public TestDeviceService() {
+            Device d1 = new DefaultDevice(ProviderId.NONE, DID1,
+                                          SWITCH, "TESTMF", "TESTHW",
+                                          "TESTSW", "TESTSN", new ChassisId());
+            Device d2 = new DefaultDevice(ProviderId.NONE, DID2, SWITCH,
+                                          "TESTMF", "TESTHW", "TESTSW",
+                                          "TESTSN", new ChassisId());
+            devices.put(DID1, d1);
+            devices.put(DID2, d2);
+            pd1 = new DefaultPort(d1, portNumber(1), true);
+            pd2 = new DefaultPort(d1, portNumber(2), true);
+            pd3 = new DefaultPort(d2, portNumber(1), true);
+            pd4 = new DefaultPort(d2, portNumber(2), true);
+            ports.putAll(DID1, Lists.newArrayList(pd1, pd2));
+            ports.putAll(DID2, Lists.newArrayList(pd3, pd4));
+        }
+
+        @Override
+        public int getDeviceCount() {
+            return devices.values().size();
+        }
+
+        @Override
+        public Iterable<Device> getDevices() {
+            return ImmutableList.copyOf(devices.values());
+        }
+
+        @Override
+        public Device getDevice(DeviceId deviceId) {
+            return devices.get(deviceId);
+        }
+
+        @Override
+        public MastershipRole getRole(DeviceId deviceId) {
+            return MastershipRole.MASTER;
+        }
+
+        @Override
+        public boolean isAvailable(DeviceId deviceId) {
+            return true;
+        }
+
+        @Override
+        public void addListener(DeviceListener listener) {
+            deviceListener = listener;
+
+        }
+
+        @Override
+        public void removeListener(DeviceListener listener) {
+
+        }
+    }
+
+    /**
+     * Packet service stub that captures the registered packet processor in
+     * testProcessor.
+     */
+    private class TestPacketService extends PacketServiceAdapter {
+        /**
+         * Remembers the processor; the priority is not used.
+         */
+        @Override
+        public void addProcessor(PacketProcessor processor, int priority) {
+            testProcessor = processor;
+        }
+    }
+
+    private class TestPacketContext implements PacketContext {
+
+        protected Device device;
+        protected boolean blocked = false;
+
+        public TestPacketContext(Device dev) {
+            device = dev;
+        }
+
+        @Override
+        public long time() {
+            return 0;
+        }
+
+        @Override
+        public InboundPacket inPacket() {
+            ONOSLLDP lldp = ONOSLLDP.onosLLDP(deviceService.getDevice(DID1)
+                                              .id().toString(),
+                                              device.chassisId(),
+                                              (int) pd1.number().toLong());
+
+            Ethernet ethPacket = new Ethernet();
+            ethPacket.setEtherType(Ethernet.TYPE_LLDP);
+            ethPacket.setDestinationMACAddress(ONOSLLDP.LLDP_ONLAB);
+            ethPacket.setPayload(lldp);
+            ethPacket.setPad(true);
+
+            ethPacket.setSourceMACAddress("DE:AD:BE:EF:BA:11");
+
+            ConnectPoint cp = new ConnectPoint(device.id(), pd3.number());
+
+            return new DefaultInboundPacket(cp, ethPacket,
+                                            ByteBuffer.wrap(ethPacket
+                                            .serialize()));
+
+        }
+
+        @Override
+        public OutboundPacket outPacket() {
+            return null;
+        }
+
+        @Override
+        public TrafficTreatment.Builder treatmentBuilder() {
+            return null;
+        }
+
+        @Override
+        public void send() {
+
+        }
+
+        @Override
+        public boolean block() {
+            blocked = true;
+            return blocked;
+        }
+
+        @Override
+        public boolean isHandled() {
+            return blocked;
+        }
+    }
+
+    /**
+     * Hands the provider service a sample graph of six devices ("a".."f"):
+     * a bidirectional ring a-b-c-d-a plus a separate bidirectional pair
+     * e-f. The graph is submitted with no reasons attached.
+     */
+    private void submitTopologyGraph() {
+        Set<Device> nodes = of(device("a"), device("b"), device("c"),
+                               device("d"), device("e"), device("f"));
+        // One line per pair of devices, both directions.
+        Set<Link> edges = of(link("a", 1, "b", 1), link("b", 1, "a", 1),
+                             link("b", 2, "c", 1), link("c", 1, "b", 2),
+                             link("c", 2, "d", 1), link("d", 1, "c", 2),
+                             link("d", 2, "a", 2), link("a", 2, "d", 2),
+                             link("e", 1, "f", 1), link("f", 1, "e", 1));
+        long now = System.currentTimeMillis();
+        GraphDescription graph =
+                new DefaultGraphDescription(4321L, now, nodes, edges);
+        providerService.topologyChanged(graph, null);
+    }
+
+    /**
+     * Checks that the events recorded in listener.events have the given
+     * types, in order, and then clears the recorded events.
+     * <p>
+     * Only recorded events are compared: an expected type with no recorded
+     * event is not reported, while a recorded event with no expected type
+     * fails the check.
+     *
+     * @param types expected event types, in the order the events arrived
+     */
+    protected void validateEvents(Enum... types) {
+        int index = 0;
+        for (Event event : listener.events) {
+            // Fail with a readable message instead of letting an
+            // ArrayIndexOutOfBoundsException escape on surplus events.
+            if (index >= types.length) {
+                throw new AssertionError("unexpected extra event: " + event);
+            }
+            assertEquals("incorrect event type", types[index], event.type());
+            index++;
+        }
+        listener.events.clear();
+    }
+
+    /**
+     * Submits the sample topology graph and validates the recorded events
+     * against a single expected TOPOLOGY_CHANGED type.
+     */
+    @Test
+    public void testCreateTopology() {
+        submitTopologyGraph();
+        validateEvents(TOPOLOGY_CHANGED);
+    }
+
+    /**
+     * Minimal topology provider registered under PID.
+     */
+    private class TestProvider extends AbstractProvider
+                               implements TopologyProvider {
+        public TestProvider() {
+            super(PID);
+        }
+
+        /**
+         * Does nothing; this stub never recomputes a topology.
+         */
+        @Override
+        public void triggerRecompute() {
+        }
+    }
+
+    /**
+     * Topology listener that records every event it receives and forwards
+     * it to the MQ service.
+     */
+    private class TestTopologyListener implements TopologyListener {
+        /** Events received so far, in arrival order. */
+        final List<TopologyEvent> events = new ArrayList<>();
+
+        /**
+         * Records the event and then publishes it through mqService.
+         *
+         * @param event received topology event
+         */
+        @Override
+        public void event(TopologyEvent event) {
+            // Record first so the list is never left empty for code that
+            // inspects it, even if publishing fails.
+            events.add(event);
+            mqService.publish(event);
+        }
+    }
+
+    /**
+     * Builds an INDIRECT link from port 1 of device "of:foo" to port 2 of
+     * device "of:bar", attributed to provider ("of", "foo").
+     *
+     * @return the newly built link
+     */
+    private Link createLink() {
+        ProviderId provider = new ProviderId("of", "foo");
+        ConnectPoint source = new ConnectPoint(deviceId("of:foo"), portNumber(1));
+        ConnectPoint target = new ConnectPoint(deviceId("of:bar"), portNumber(2));
+        return DefaultLink.builder()
+                .providerId(provider)
+                .src(source)
+                .dst(target)
+                .type(Link.Type.INDIRECT)
+                .build();
+    }
+
+    /**
+     * Validates a LINK_ADDED event built for a sample link.
+     */
+    @Test
+    public void testAddLink() throws Exception {
+        final long time = 123L;
+        Link sample = createLink();
+        LinkEvent added = new LinkEvent(LINK_ADDED, sample, time);
+        validateEvent(added, LINK_ADDED, sample, time);
+    }
+
+    /**
+     * Validates a LINK_UPDATED event built for a sample link.
+     */
+    @Test
+    public void testUpdateLink() throws Exception {
+        final long time = 123L;
+        Link sample = createLink();
+        LinkEvent updated = new LinkEvent(LINK_UPDATED, sample, time);
+        validateEvent(updated, LINK_UPDATED, sample, time);
+    }
+
+    /**
+     * Validates a LINK_ADDED event followed by a LINK_REMOVED event, both
+     * built for the same sample link.
+     */
+    @Test
+    public void testRemoveLink() throws Exception {
+        final long time = 123L;
+        Link sample = createLink();
+
+        LinkEvent added = new LinkEvent(LINK_ADDED, sample, time);
+        validateEvent(added, LINK_ADDED, sample, time);
+
+        LinkEvent removed = new LinkEvent(LINK_REMOVED, sample, time);
+        validateEvent(removed, LINK_REMOVED, sample, time);
+    }
+
+    /**
+     * Link listener that forwards every received link event to the MQ
+     * service.
+     */
+    private class TestLinkListener implements LinkListener {
+
+        /**
+         * Publishes the event through mqService.
+         *
+         * @param event received link event
+         */
+        @Override
+        public void event(LinkEvent event) {
+            mqService.publish(event);
+        }
+
+    }
+
+}