blob: db0280618a01d714ac9abac19076bb47b92ccfc3 [file] [log] [blame]
ADARA Networks1fb1eb12016-09-01 12:04:07 -07001/*
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.rabbitmq.impl;
17
18import java.io.UnsupportedEncodingException;
19import java.util.Map;
20import java.util.Properties;
21import java.util.concurrent.BlockingQueue;
22import java.util.concurrent.LinkedBlockingQueue;
23import org.apache.commons.lang.exception.ExceptionUtils;
24
25import static org.onosproject.rabbitmq.api.MQConstants.*;
26
27import org.onosproject.event.Event;
28import org.onosproject.net.device.DeviceEvent;
29import org.onosproject.net.link.LinkEvent;
30import org.onosproject.net.packet.PacketContext;
31import org.onosproject.net.topology.TopologyEvent;
32import org.onosproject.rabbitmq.api.MQService;
33import org.onosproject.rabbitmq.api.Manageable;
34import org.onosproject.rabbitmq.util.MQUtil;
35import org.osgi.service.component.ComponentContext;
36import org.slf4j.Logger;
37import org.slf4j.LoggerFactory;
38import com.google.gson.JsonObject;
39
40
41import com.google.common.collect.Maps;
42
43/**
44 * Default implementation of {@link MQService}.
45 */
46public class MQServiceImpl implements MQService {
47 private static final Logger log = LoggerFactory.getLogger(
48 MQServiceImpl.class);
49
50 private final BlockingQueue<MessageContext> msgOutQueue =
51 new LinkedBlockingQueue<>(10);
52
53 private Manageable manageSender;
54 private String correlationId;
55
56 /**
57 * Initializes using ComponentContext.
58 *
59 * @param context ComponentContext from OSGI
60 */
61 public MQServiceImpl(ComponentContext context) {
62 initializeProducers(context);
63 }
64
65 /**
66 * Initializes MQ sender and receiver with RMQ server.
67 *
68 * @param context ComponentContext from OSGI
69 */
70 private void initializeProducers(ComponentContext context) {
71 BrokerHost rfHost;
72 Properties prop = MQUtil.getProp(context);
73 if (prop == null) {
74 log.error("RabbitMQ configuration file not found...");
75 return;
76 }
77 try {
78 correlationId = prop.getProperty(SENDER_COR_ID);
79 rfHost = new BrokerHost(MQUtil.getMqUrl(
80 prop.getProperty(SERVER_PROTO),
81 prop.getProperty(SERVER_UNAME),
82 prop.getProperty(SERVER_PWD),
83 prop.getProperty(SERVER_ADDR),
84 prop.getProperty(SERVER_PORT),
85 prop.getProperty(SERVER_VHOST)));
86
87 manageSender = registerProducer(rfHost,
88 MQUtil.rfProducerChannelConf(
89 prop.getProperty(SENDER_EXCHG),
90 prop.getProperty(ROUTE_KEY),
91 prop.getProperty(SENDER_QUEUE)),
92 msgOutQueue);
93 } catch (Exception e) {
94 throw new RuntimeException(e);
95 }
96 manageSender.start();
97 }
98
99 /**
100 * Returns the handle to call an api for publishing messages to RMQ server.
101 */
102 private Manageable registerProducer(BrokerHost host, Map<String, String> channelConf,
103 BlockingQueue<MessageContext> msgOutQueue) {
104 return new MQTransportImpl().registerProducer(host, channelConf, msgOutQueue);
105 }
106
107 private byte[] bytesOf(JsonObject jo) {
108 return jo.toString().getBytes();
109 }
110
111 /**
112 * Publishes Device, Topology &amp; Link event message to MQ server.
113 *
114 * @param event Event received from the corresponding service like topology, device etc
115 */
116 @Override
117 public void publish(Event<? extends Enum, ?> event) {
118 byte[] body = null;
119 if (null == event) {
120 log.error("Captured event is null...");
121 return;
122 }
123 if (event instanceof DeviceEvent) {
124 body = bytesOf(MQUtil.json((DeviceEvent) event));
125 } else if (event instanceof TopologyEvent) {
126 body = bytesOf(MQUtil.json((TopologyEvent) event));
127 } else if (event instanceof LinkEvent) {
128 body = bytesOf(MQUtil.json((LinkEvent) event));
129 } else {
130 log.error("Invalid event: '{}'", event);
131 }
132 processAndPublishMessage(body);
133 }
134
135 /**
136 * Publishes packet message to MQ server.
137 *
138 * @param context Context of the packet recieved including details like mac, length etc
139 */
140 @Override
141 public void publish(PacketContext context) {
142 byte[] body = bytesOf(MQUtil.json(context));
143 processAndPublishMessage(body);
144 }
145
146 /*
147 * Constructs message context and publish it to rabbit mq server.
148 *
149 * @param body Byte stream of the event's JSON data
150 */
151 private void processAndPublishMessage(byte[] body) {
152 Map<String, Object> props = Maps.newHashMap();
153 props.put(CORRELATION_ID, correlationId);
154 MessageContext mc = new MessageContext(body, props);
155 try {
156 msgOutQueue.put(mc);
157 String message = new String(body, "UTF-8");
158 log.debug(" [x] Sent '{}'", message);
159 } catch (InterruptedException | UnsupportedEncodingException e) {
160 log.error(ExceptionUtils.getFullStackTrace(e));
161 }
162 manageSender.publish();
163 }
164}