blob: c34bb1a87edf9c0985f12fc28483171695d06ded [file] [log] [blame]
ADARA Networks1fb1eb12016-09-01 12:04:07 -07001/*
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.rabbitmq.listener;
18
19import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
20import static org.onlab.util.Tools.groupedThreads;
21
22import java.util.concurrent.ExecutorService;
23
24import org.apache.felix.scr.annotations.Activate;
25import org.apache.felix.scr.annotations.Component;
26import org.apache.felix.scr.annotations.Deactivate;
27import org.apache.felix.scr.annotations.Reference;
28import org.apache.felix.scr.annotations.ReferenceCardinality;
29import org.onosproject.net.device.DeviceEvent;
30import org.onosproject.net.device.DeviceListener;
31import org.onosproject.net.device.DeviceService;
32import org.onosproject.net.link.LinkEvent;
33import org.onosproject.net.link.LinkListener;
34import org.onosproject.net.link.LinkService;
35import org.onosproject.net.link.ProbedLinkProvider;
36import org.onosproject.net.packet.PacketContext;
37import org.onosproject.net.packet.PacketProcessor;
38import org.onosproject.net.packet.PacketService;
39import org.onosproject.net.provider.AbstractProvider;
40import org.onosproject.net.provider.ProviderId;
41import org.onosproject.net.topology.TopologyEvent;
42import org.onosproject.net.topology.TopologyListener;
43import org.onosproject.net.topology.TopologyService;
44import org.onosproject.rabbitmq.api.MQConstants;
45import org.onosproject.rabbitmq.api.MQService;
46import org.onosproject.rabbitmq.impl.MQServiceImpl;
47import org.osgi.service.component.ComponentContext;
48import org.slf4j.Logger;
49import org.slf4j.LoggerFactory;
50
51/**
52 * Listens to events generated from Device Event/PKT_IN/Topology/Link.
53 * Then publishes events to rabbitmq server via publish() api.
54 */
55
56@Component(immediate = true)
57public class MQEventHandler extends AbstractProvider
58 implements ProbedLinkProvider {
59
60 private static final Logger log = LoggerFactory.getLogger(
61 MQEventHandler.class);
62 private static final String PROVIDER_NAME = MQConstants.ONOS_APP_NAME;
63 private static final int PKT_PROC_PRIO = 1;
64
65 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
66 protected DeviceService deviceService;
67 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
68 protected PacketService packetService;
69 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
70 protected LinkService linkService;
71 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
72 protected TopologyService topologyService;
73
74 private MQService mqService;
75 private DeviceListener deviceListener;
76 protected ExecutorService eventExecutor;
77
78 private final InternalPacketProcessor packetProcessor =
79 new InternalPacketProcessor();
80 private final LinkListener linkListener = new InternalLinkListener();
81 private final TopologyListener topologyListener =
82 new InternalTopologyListener();
83
84 /**
85 * Initialize parent class with provider.
86 */
87 public MQEventHandler() {
88 super(new ProviderId("rabbitmq", PROVIDER_NAME));
89 }
90
91 @Activate
92 protected void activate(ComponentContext context) {
93 mqService = new MQServiceImpl(context);
94 eventExecutor = newSingleThreadScheduledExecutor(
95 groupedThreads("onos/deviceevents", "events-%d", log));
96 deviceListener = new InternalDeviceListener();
97 deviceService.addListener(deviceListener);
98 packetService.addProcessor(packetProcessor,
99 PacketProcessor.advisor(PKT_PROC_PRIO));
100 linkService.addListener(linkListener);
101 topologyService.addListener(topologyListener);
102 log.info("Started");
103 }
104
105 @Deactivate
106 protected void deactivate() {
107 deviceService.removeListener(deviceListener);
108 packetService.removeProcessor(packetProcessor);
109 eventExecutor.shutdownNow();
110 eventExecutor = null;
111 linkService.removeListener(linkListener);
112 topologyService.removeListener(topologyListener);
113 log.info("Stopped");
114 }
115
116 /**
117 * Captures incoming device events.
118 */
119 private class InternalDeviceListener implements DeviceListener {
120
121 @Override
122 public void event(DeviceEvent event) {
123 if (event == null) {
124 log.error("Device event is null.");
125 return;
126 }
127 mqService.publish(event);
128 }
129 }
130
131 /**
132 * Captures incoming packets from switches connected to ONOS
133 * controller..
134 */
135 private class InternalPacketProcessor implements PacketProcessor {
136 @Override
137 public void process(PacketContext context) {
138 if (context == null) {
139 log.error("Packet context is null.");
140 return;
141 }
142 mqService.publish(context);
143 }
144 }
145
146 /**
147 * Listens to link events and processes the link additions.
148 */
149 private class InternalLinkListener implements LinkListener {
150 @Override
151 public void event(LinkEvent event) {
152 if (event == null) {
153 log.error("Link event is null.");
154 return;
155 }
156 mqService.publish(event);
157 }
158 }
159
160 /**
161 * Listens to topology events and processes the topology changes.
162 */
163 private class InternalTopologyListener implements TopologyListener {
164
165 @Override
166 public void event(TopologyEvent event) {
167 if (event == null) {
168 log.error("Topology event is null.");
169 return;
170 }
171 mqService.publish(event);
172 }
173 }
174}