blob: 0b22178ab1ff0e54df92aad2046c4c38699ad5c8 [file] [log] [blame]
ADARA Networks1fb1eb12016-09-01 12:04:07 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
ADARA Networks1fb1eb12016-09-01 12:04:07 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.rabbitmq.impl;
17
18import java.io.IOException;
19import java.util.concurrent.BlockingQueue;
20import java.util.concurrent.ExecutorService;
21import java.util.concurrent.TimeoutException;
22
23import org.onosproject.rabbitmq.api.Manageable;
24import org.slf4j.Logger;
25import org.slf4j.LoggerFactory;
26
27import com.rabbitmq.client.AMQP;
28import com.rabbitmq.client.Channel;
29import com.rabbitmq.client.Connection;
30import com.rabbitmq.client.ConnectionFactory;
31
32import static org.onosproject.rabbitmq.api.MQConstants.*;
33
34/**
35 * Connects client with server using start API, publish the messages received
36 * from onos events and disconnect the client from server using stop API.
37 */
38public class MQSender implements Manageable {
39
40 private static final String E_CREATE_CHAN =
41 "Error creating the RabbitMQ channel";
42 private static final String E_PUBLISH_CHAN =
43 "Error in publishing to the RabbitMQ channel";
44 private static final Logger log = LoggerFactory.getLogger(MQSender.class);
45 private static final int RECOVERY_INTERVAL = 15000;
46
47 private final BlockingQueue<MessageContext> outQueue;
48 private final String exchangeName;
49 private final String routingKey;
50 private final String queueName;
51 private final String url;
52
53 private ExecutorService executorService;
54 private Connection conn;
55 private Channel channel;
56
57
58 /**
59 * Creates a MQSender initialized with the specified parameters.
60 *
61 * @param outQueue represents message context
62 * @param exchangeName represents mq exchange name
63 * @param routingKey represents bound routing key
64 * @param queueName represents mq queue name
65 * @param url represents the mq server url
66 */
67 public MQSender(BlockingQueue<MessageContext> outQueue, String exchangeName,
68 String routingKey, String queueName, String url) {
69 this.outQueue = outQueue;
70 this.exchangeName = exchangeName;
71 this.routingKey = routingKey;
72 this.queueName = queueName;
73 this.url = url;
74 }
75
76 /**
77 * Sets the executor service.
78 *
79 * @param executorService the executor service to use
80 */
81 public void setExecutorService(ExecutorService executorService) {
82 this.executorService = executorService;
83 }
84
85 @Override
86 public void start() {
87 ConnectionFactory factory = new ConnectionFactory();
88 factory.setAutomaticRecoveryEnabled(true);
89 factory.setNetworkRecoveryInterval(RECOVERY_INTERVAL);
90 try {
91 factory.setUri(url);
92 if (executorService != null) {
93 conn = factory.newConnection(executorService);
94 } else {
95 conn = factory.newConnection();
96 }
97 channel = conn.createChannel();
98 channel.exchangeDeclare(exchangeName, TOPIC, true);
99 /*
100 * Setting the following parameters to queue
101 * durable - true
102 * exclusive - false
103 * autoDelete - false
104 * arguments - null
105 */
106 channel.queueDeclare(this.queueName, true, false, false, null);
107 channel.queueBind(queueName, exchangeName, routingKey);
108 } catch (Exception e) {
109 log.error(E_CREATE_CHAN, e);
110 }
111 }
112
113 @Override
114 public void publish() {
115 try {
116 MessageContext input = outQueue.poll();
117 channel.basicPublish(exchangeName, routingKey,
118 new AMQP.BasicProperties.Builder()
119 .correlationId(COR_ID).build(),
120 input.getBody());
121 String message1 = new String(input.getBody(), "UTF-8");
122 log.debug(" [x] Sent: '{}'", message1);
123 } catch (Exception e) {
124 log.error(E_PUBLISH_CHAN, e);
125 }
126 }
127
128 @Override
129 public void stop() {
130 try {
131 channel.close();
132 conn.close();
133 } catch (IOException e) {
134 log.error("Error closing the rabbit MQ connection", e);
135 } catch (TimeoutException e) {
136 log.error("Timeout exception in closing the rabbit MQ connection",
137 e);
138 }
139 }
140
141}