blob: a8c2ef401fb215fa4b740810a10fbd527e2e9dc8 [file] [log] [blame]
Jian Lie0e01c22016-02-09 14:02:49 -08001/*
2 * Copyright 2016 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.provider.of.message.impl;
17
Jian Li1d13c262016-02-09 14:58:28 -080018import com.google.common.collect.Maps;
Jian Lie0e01c22016-02-09 14:02:49 -080019import org.apache.felix.scr.annotations.Activate;
20import org.apache.felix.scr.annotations.Component;
21import org.apache.felix.scr.annotations.Deactivate;
22import org.apache.felix.scr.annotations.Reference;
23import org.apache.felix.scr.annotations.ReferenceCardinality;
Jian Li1d13c262016-02-09 14:58:28 -080024import org.onlab.metrics.MetricsService;
Jian Lie0e01c22016-02-09 14:02:49 -080025import org.onosproject.cpman.message.ControlMessageProvider;
26import org.onosproject.cpman.message.ControlMessageProviderRegistry;
27import org.onosproject.cpman.message.ControlMessageProviderService;
Jian Li1d13c262016-02-09 14:58:28 -080028import org.onosproject.net.DeviceId;
Jian Lie0e01c22016-02-09 14:02:49 -080029import org.onosproject.net.provider.AbstractProvider;
30import org.onosproject.net.provider.ProviderId;
Jian Li1d13c262016-02-09 14:58:28 -080031import org.onosproject.openflow.controller.Dpid;
32import org.onosproject.openflow.controller.OpenFlowController;
33import org.onosproject.openflow.controller.OpenFlowEventListener;
34import org.onosproject.openflow.controller.OpenFlowSwitch;
35import org.onosproject.openflow.controller.OpenFlowSwitchListener;
36import org.onosproject.openflow.controller.RoleState;
37import org.projectfloodlight.openflow.protocol.OFMessage;
38import org.projectfloodlight.openflow.protocol.OFPortStatus;
Jian Lie0e01c22016-02-09 14:02:49 -080039import org.slf4j.Logger;
40
Jian Li1d13c262016-02-09 14:58:28 -080041import java.util.HashMap;
Jian Li1d13c262016-02-09 14:58:28 -080042import java.util.concurrent.ScheduledExecutorService;
43import java.util.concurrent.ScheduledFuture;
44import java.util.concurrent.TimeUnit;
45
Jian Li1b4cb332016-03-02 16:32:51 -080046import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
47import static org.onlab.util.Tools.groupedThreads;
48import static org.onlab.util.Tools.loggableScheduledExecutor;
Jian Li1d13c262016-02-09 14:58:28 -080049import static org.onosproject.net.DeviceId.deviceId;
50import static org.onosproject.openflow.controller.Dpid.uri;
Jian Lie0e01c22016-02-09 14:02:49 -080051import static org.slf4j.LoggerFactory.getLogger;
52
53/**
54 * Provider which uses an OpenFlow controller to collect control message.
55 */
56@Component(immediate = true)
57public class OpenFlowControlMessageProvider extends AbstractProvider
58 implements ControlMessageProvider {
59
Jian Li1d13c262016-02-09 14:58:28 -080060 private final Logger log = getLogger(getClass());
Jian Lie0e01c22016-02-09 14:02:49 -080061
62 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
63 protected ControlMessageProviderRegistry providerRegistry;
64
Jian Li1d13c262016-02-09 14:58:28 -080065 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
66 protected OpenFlowController controller;
67
68 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
69 protected MetricsService metricsService;
70
Jian Lie0e01c22016-02-09 14:02:49 -080071 private ControlMessageProviderService providerService;
72
Jian Li1d13c262016-02-09 14:58:28 -080073 private final InternalDeviceProvider listener = new InternalDeviceProvider();
74
75 private final InternalIncomingMessageProvider inMsgListener =
76 new InternalIncomingMessageProvider();
77
78 private final InternalOutgoingMessageProvider outMsgListener =
79 new InternalOutgoingMessageProvider();
80
81 private HashMap<Dpid, OpenFlowControlMessageAggregator> aggregators = Maps.newHashMap();
82 private ScheduledExecutorService executor;
83 private static final int AGGR_INIT_DELAY = 1;
84 private static final int AGGR_PERIOD = 1;
85 private static final TimeUnit AGGR_TIME_UNIT = TimeUnit.MINUTES;
86 private HashMap<Dpid, ScheduledFuture<?>> executorResults = Maps.newHashMap();
87
Jian Lie0e01c22016-02-09 14:02:49 -080088 /**
89 * Creates a provider with the supplier identifier.
90 */
91 public OpenFlowControlMessageProvider() {
92 super(new ProviderId("of", "org.onosproject.provider.openflow"));
93 }
94
95 @Activate
96 protected void activate() {
97 providerService = providerRegistry.register(this);
Jian Li1d13c262016-02-09 14:58:28 -080098
99 // listens all OpenFlow device related events
100 controller.addListener(listener);
101
102 // listens all OpenFlow incoming message events
103 controller.addEventListener(inMsgListener);
104 controller.monitorAllEvents(true);
105
106 // listens all OpenFlow outgoing message events
107 controller.getSwitches().forEach(sw -> sw.addEventListener(outMsgListener));
108
Jian Li1b4cb332016-03-02 16:32:51 -0800109 executor = loggableScheduledExecutor(
110 newSingleThreadScheduledExecutor(groupedThreads("onos/provider",
111 "aggregator")));
Jian Li1d13c262016-02-09 14:58:28 -0800112
113 connectInitialDevices();
114 log.info("Started");
Jian Lie0e01c22016-02-09 14:02:49 -0800115 }
116
117 @Deactivate
118 protected void deactivate() {
Jian Li1d13c262016-02-09 14:58:28 -0800119 controller.removeListener(listener);
Jian Lie0e01c22016-02-09 14:02:49 -0800120 providerRegistry.unregister(this);
121 providerService = null;
Jian Li1d13c262016-02-09 14:58:28 -0800122
123 // stops listening all OpenFlow incoming message events
124 controller.monitorAllEvents(false);
125 controller.removeEventListener(inMsgListener);
126
127 // stops listening all OpenFlow outgoing message events
128 controller.getSwitches().forEach(sw -> sw.removeEventListener(outMsgListener));
129
130 log.info("Stopped");
131 }
132
133 private void connectInitialDevices() {
134 for (OpenFlowSwitch sw: controller.getSwitches()) {
135 try {
136 listener.switchAdded(new Dpid(sw.getId()));
137 } catch (Exception e) {
138 log.warn("Failed initially adding {} : {}", sw.getStringId(), e.getMessage());
139 log.debug("Error details:", e);
140 }
141 }
142 }
143
144 /**
145 * A listener for OpenFlow switch event.
146 */
147 private class InternalDeviceProvider implements OpenFlowSwitchListener {
148
149 @Override
150 public void switchAdded(Dpid dpid) {
151 if (providerService == null) {
152 return;
153 }
154
155 OpenFlowSwitch sw = controller.getSwitch(dpid);
156 if (sw != null) {
157 // start to monitor the outgoing control messages
158 sw.addEventListener(outMsgListener);
159 }
160
161 DeviceId deviceId = deviceId(uri(dpid));
162 OpenFlowControlMessageAggregator ofcma =
163 new OpenFlowControlMessageAggregator(metricsService,
164 providerService, deviceId);
165 ScheduledFuture result = executor.scheduleAtFixedRate(ofcma,
166 AGGR_INIT_DELAY, AGGR_PERIOD, AGGR_TIME_UNIT);
167 aggregators.put(dpid, ofcma);
168 executorResults.put(dpid, result);
169 }
170
171 @Override
172 public void switchRemoved(Dpid dpid) {
173 if (providerService == null) {
174 return;
175 }
176
177 OpenFlowSwitch sw = controller.getSwitch(dpid);
178 if (sw != null) {
179 // stop monitoring the outgoing control messages
180 sw.removeEventListener(outMsgListener);
181 }
182
183 // removes the aggregator when switch is removed
184 // this also stops the aggregator from running
185 OpenFlowControlMessageAggregator aggregator = aggregators.remove(dpid);
186 if (aggregator != null) {
187 executorResults.get(dpid).cancel(true);
188 executorResults.remove(dpid);
189 }
190 }
191
192 @Override
193 public void switchChanged(Dpid dpid) {
194 }
195
196 @Override
197 public void portChanged(Dpid dpid, OFPortStatus status) {
198 }
199
200 @Override
201 public void receivedRoleReply(Dpid dpid, RoleState requested, RoleState response) {
202 }
203 }
204
205 /**
206 * A listener for incoming OpenFlow messages.
207 */
208 private class InternalIncomingMessageProvider implements OpenFlowEventListener {
209
210 @Override
211 public void handleMessage(Dpid dpid, OFMessage msg) {
212 aggregators.get(dpid).increment(msg);
213 }
214 }
215
216 /**
217 * A listener for outgoing OpenFlow messages.
218 */
219 private class InternalOutgoingMessageProvider implements OpenFlowEventListener {
220
221 @Override
222 public void handleMessage(Dpid dpid, OFMessage msg) {
223 aggregators.get(dpid).increment(msg);
224 }
Jian Lie0e01c22016-02-09 14:02:49 -0800225 }
226}