blob: 49b5f64be3785da5465c3af207154c873ae8ca96 [file] [log] [blame]
yoonseon322c9c32016-12-07 16:47:02 -08001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
yoonseon322c9c32016-12-07 16:47:02 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.incubator.net.virtual.impl;
18
19import com.google.common.collect.ImmutableList;
20import com.google.common.collect.Lists;
21import org.onosproject.cluster.ClusterService;
22import org.onosproject.cluster.NodeId;
23import org.onosproject.core.ApplicationId;
24import org.onosproject.incubator.net.virtual.AbstractVnetService;
25import org.onosproject.incubator.net.virtual.NetworkId;
26import org.onosproject.incubator.net.virtual.VirtualNetworkPacketStore;
27import org.onosproject.incubator.net.virtual.VirtualNetworkService;
28import org.onosproject.incubator.net.virtual.provider.AbstractVirtualProviderService;
29import org.onosproject.incubator.net.virtual.provider.VirtualPacketProvider;
30import org.onosproject.incubator.net.virtual.provider.VirtualPacketProviderService;
31import org.onosproject.incubator.net.virtual.provider.VirtualProviderRegistryService;
32import org.onosproject.net.Device;
33import org.onosproject.net.DeviceId;
34import org.onosproject.net.device.DeviceService;
35import org.onosproject.net.flow.DefaultTrafficTreatment;
36import org.onosproject.net.flow.TrafficSelector;
37import org.onosproject.net.flowobjective.DefaultForwardingObjective;
38import org.onosproject.net.flowobjective.FlowObjectiveService;
39import org.onosproject.net.flowobjective.ForwardingObjective;
40import org.onosproject.net.flowobjective.Objective;
41import org.onosproject.net.flowobjective.ObjectiveContext;
42import org.onosproject.net.flowobjective.ObjectiveError;
43import org.onosproject.net.packet.DefaultPacketRequest;
44import org.onosproject.net.packet.OutboundPacket;
45import org.onosproject.net.packet.PacketContext;
46import org.onosproject.net.packet.PacketEvent;
47import org.onosproject.net.packet.PacketPriority;
48import org.onosproject.net.packet.PacketProcessor;
49import org.onosproject.net.packet.PacketProcessorEntry;
50import org.onosproject.net.packet.PacketRequest;
51import org.onosproject.net.packet.PacketService;
52import org.onosproject.net.packet.PacketStoreDelegate;
53import org.onosproject.net.provider.ProviderId;
54import org.slf4j.Logger;
55import org.slf4j.LoggerFactory;
56
57import java.util.List;
58import java.util.Optional;
59import java.util.Set;
60
61public class VirtualNetworkPacketManager extends AbstractVnetService
62 implements PacketService {
63
64 private final Logger log = LoggerFactory.getLogger(getClass());
65
66 private final VirtualNetworkService manager;
67
68 protected VirtualNetworkPacketStore store;
69 private final List<ProcessorEntry> processors = Lists.newCopyOnWriteArrayList();
70
71 private NodeId localNodeId;
72
73 private DeviceService deviceService;
74 private FlowObjectiveService objectiveService;
75
76 private VirtualProviderRegistryService providerRegistryService = null;
77
78 private InternalPacketProviderService providerService = null;
79
80 public VirtualNetworkPacketManager(VirtualNetworkService virtualNetworkManager,
81 NetworkId networkId) {
82 super(virtualNetworkManager, networkId);
83 this.manager = virtualNetworkManager;
84
85 //Set node id as same as the node hosting virtual manager
86 ClusterService clusterService = serviceDirectory.get(ClusterService.class);
87 this.localNodeId = clusterService.getLocalNode().id();
88
89 this.store = serviceDirectory.get(VirtualNetworkPacketStore.class);
90 this.store.setDelegate(networkId(), new InternalStoreDelegate());
91
92 this.deviceService = manager.get(networkId(), DeviceService.class);
93 this.objectiveService = manager.get(networkId(), FlowObjectiveService.class);
94
95 providerRegistryService =
96 serviceDirectory.get(VirtualProviderRegistryService.class);
97 providerService = new InternalPacketProviderService();
98 providerRegistryService.registerProviderService(networkId(), providerService);
99 }
100
101 @Override
102 public void addProcessor(PacketProcessor processor, int priority) {
103 ProcessorEntry entry = new ProcessorEntry(processor, priority);
104
105 // Insert the new processor according to its priority.
106 int i = 0;
107 for (; i < processors.size(); i++) {
108 if (priority < processors.get(i).priority()) {
109 break;
110 }
111 }
112 processors.add(i, entry);
113 }
114
115 @Override
116 public void removeProcessor(PacketProcessor processor) {
117 // Remove the processor entry.
118 for (int i = 0; i < processors.size(); i++) {
119 if (processors.get(i).processor() == processor) {
120 processors.remove(i);
121 break;
122 }
123 }
124 }
125
126 @Override
127 public List<PacketProcessorEntry> getProcessors() {
128 return ImmutableList.copyOf(processors);
129 }
130
131 @Override
132 public void requestPackets(TrafficSelector selector, PacketPriority priority, ApplicationId appId) {
133 PacketRequest request = new DefaultPacketRequest(selector, priority, appId,
134 localNodeId, Optional.empty());
135 store.requestPackets(networkId(), request);
136 }
137
138 @Override
139 public void requestPackets(TrafficSelector selector, PacketPriority priority,
140 ApplicationId appId, Optional<DeviceId> deviceId) {
141 PacketRequest request =
142 new DefaultPacketRequest(selector, priority, appId,
143 localNodeId, deviceId);
144
145 store.requestPackets(networkId(), request);
146 }
147
148 @Override
149 public void cancelPackets(TrafficSelector selector, PacketPriority priority, ApplicationId appId) {
150 PacketRequest request = new DefaultPacketRequest(selector, priority, appId,
151 localNodeId, Optional.empty());
152 store.cancelPackets(networkId(), request);
153 }
154
155 @Override
156 public void cancelPackets(TrafficSelector selector, PacketPriority priority,
157 ApplicationId appId, Optional<DeviceId> deviceId) {
158 PacketRequest request = new DefaultPacketRequest(selector, priority,
159 appId, localNodeId,
160 deviceId);
161 store.cancelPackets(networkId(), request);
162 }
163
164 @Override
165 public List<PacketRequest> getRequests() {
166 return store.existingRequests(networkId());
167 }
168
169 @Override
170 public void emit(OutboundPacket packet) {
171 store.emit(networkId(), packet);
172 }
173
174 /**
175 * Personalized packet provider service issued to the supplied provider.
176 */
177 private class InternalPacketProviderService
178 extends AbstractVirtualProviderService<VirtualPacketProvider>
179 implements VirtualPacketProviderService {
180
181 protected InternalPacketProviderService() {
182 super();
183
184 Set<ProviderId> providerIds =
185 providerRegistryService.getProvidersByService(this);
186 ProviderId providerId = providerIds.stream().findFirst().get();
187 VirtualPacketProvider provider = (VirtualPacketProvider)
188 providerRegistryService.getProvider(providerId);
189 setProvider(provider);
190 }
191
192 @Override
193 public void processPacket(PacketContext context) {
194 // TODO filter packets sent to processors based on registrations
195 for (ProcessorEntry entry : processors) {
196 try {
197 long start = System.nanoTime();
198 entry.processor().process(context);
199 entry.addNanos(System.nanoTime() - start);
200 } catch (Exception e) {
201 log.warn("Packet processor {} threw an exception", entry.processor(), e);
202 }
203 }
204 }
205
206 }
207
208 /**
209 * Entity for tracking stats for a packet processor.
210 */
211 private class ProcessorEntry implements PacketProcessorEntry {
212 private final PacketProcessor processor;
213 private final int priority;
214 private long invocations = 0;
215 private long nanos = 0;
216
217 public ProcessorEntry(PacketProcessor processor, int priority) {
218 this.processor = processor;
219 this.priority = priority;
220 }
221
222 @Override
223 public PacketProcessor processor() {
224 return processor;
225 }
226
227 @Override
228 public int priority() {
229 return priority;
230 }
231
232 @Override
233 public long invocations() {
234 return invocations;
235 }
236
237 @Override
238 public long totalNanos() {
239 return nanos;
240 }
241
242 @Override
243 public long averageNanos() {
244 return invocations > 0 ? nanos / invocations : 0;
245 }
246
247 void addNanos(long nanos) {
248 this.nanos += nanos;
249 this.invocations++;
250 }
251 }
252
253 private void localEmit(NetworkId networkId, OutboundPacket packet) {
254 Device device = deviceService.getDevice(packet.sendThrough());
255 if (device == null) {
256 return;
257 }
Yoonseon Hanc8089db2017-03-22 20:22:12 +0900258 VirtualPacketProvider packetProvider = providerService.provider();
yoonseon322c9c32016-12-07 16:47:02 -0800259
260 if (packetProvider != null) {
261 packetProvider.emit(networkId, packet);
262 }
263 }
264
265 /**
266 * Internal callback from the packet store.
267 */
268 protected class InternalStoreDelegate implements PacketStoreDelegate {
269 @Override
270 public void notify(PacketEvent event) {
271 localEmit(networkId(), event.subject());
272 }
273
274 @Override
275 public void requestPackets(PacketRequest request) {
276 DeviceId deviceid = request.deviceId().orElse(null);
277
278 if (deviceid != null) {
279 pushRule(deviceService.getDevice(deviceid), request);
280 } else {
281 pushToAllDevices(request);
282 }
283 }
284
285 @Override
286 public void cancelPackets(PacketRequest request) {
287 DeviceId deviceid = request.deviceId().orElse(null);
288
289 if (deviceid != null) {
290 removeRule(deviceService.getDevice(deviceid), request);
291 } else {
292 removeFromAllDevices(request);
293 }
294 }
295 }
296
297 /**
298 * Pushes packet intercept flow rules to the device.
299 *
300 * @param device the device to push the rules to
301 * @param request the packet request
302 */
303 private void pushRule(Device device, PacketRequest request) {
Claudine Chiu93ce3e82017-02-18 14:28:22 -0500304 if (!device.type().equals(Device.Type.VIRTUAL)) {
yoonseon322c9c32016-12-07 16:47:02 -0800305 return;
306 }
307
308 ForwardingObjective forwarding = createBuilder(request)
309 .add(new ObjectiveContext() {
310 @Override
311 public void onError(Objective objective, ObjectiveError error) {
312 log.warn("Failed to install packet request {} to {}: {}",
313 request, device.id(), error);
314 }
315 });
316
317 objectiveService.forward(device.id(), forwarding);
318 }
319
320 /**
321 * Removes packet intercept flow rules from the device.
322 *
323 * @param device the device to remove the rules deom
324 * @param request the packet request
325 */
326 private void removeRule(Device device, PacketRequest request) {
Claudine Chiu93ce3e82017-02-18 14:28:22 -0500327 if (!device.type().equals(Device.Type.VIRTUAL)) {
yoonseon322c9c32016-12-07 16:47:02 -0800328 return;
329 }
330 ForwardingObjective forwarding = createBuilder(request)
331 .remove(new ObjectiveContext() {
332 @Override
333 public void onError(Objective objective, ObjectiveError error) {
334 log.warn("Failed to withdraw packet request {} from {}: {}",
335 request, device.id(), error);
336 }
337 });
338 objectiveService.forward(device.id(), forwarding);
339 }
340
341 /**
342 * Pushes a packet request flow rule to all devices.
343 *
344 * @param request the packet request
345 */
346 private void pushToAllDevices(PacketRequest request) {
347 log.debug("Pushing packet request {} to all devices", request);
348 for (Device device : deviceService.getDevices()) {
349 pushRule(device, request);
350 }
351 }
352
353 /**
354 * Removes packet request flow rule from all devices.
355 *
356 * @param request the packet request
357 */
358 private void removeFromAllDevices(PacketRequest request) {
359 deviceService.getAvailableDevices().forEach(d -> removeRule(d, request));
360 }
361
362 private DefaultForwardingObjective.Builder createBuilder(PacketRequest request) {
363 return DefaultForwardingObjective.builder()
364 .withPriority(request.priority().priorityValue())
365 .withSelector(request.selector())
Yoonseon Hanc8089db2017-03-22 20:22:12 +0900366 .fromApp(request.appId())
yoonseon322c9c32016-12-07 16:47:02 -0800367 .withFlag(ForwardingObjective.Flag.VERSATILE)
368 .withTreatment(DefaultTrafficTreatment.builder().punt().build())
369 .makePermanent();
370 }
371}