blob: 41b7dceb9c65b105968b4f6ae82662f4a3cea431 [file] [log] [blame]
yoonseon322c9c32016-12-07 16:47:02 -08001/*
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.incubator.net.virtual.impl;
18
19import com.google.common.collect.ImmutableList;
20import com.google.common.collect.Lists;
21import org.onosproject.cluster.ClusterService;
22import org.onosproject.cluster.NodeId;
23import org.onosproject.core.ApplicationId;
24import org.onosproject.incubator.net.virtual.AbstractVnetService;
25import org.onosproject.incubator.net.virtual.NetworkId;
26import org.onosproject.incubator.net.virtual.VirtualNetworkPacketStore;
27import org.onosproject.incubator.net.virtual.VirtualNetworkService;
28import org.onosproject.incubator.net.virtual.provider.AbstractVirtualProviderService;
29import org.onosproject.incubator.net.virtual.provider.VirtualPacketProvider;
30import org.onosproject.incubator.net.virtual.provider.VirtualPacketProviderService;
31import org.onosproject.incubator.net.virtual.provider.VirtualProviderRegistryService;
32import org.onosproject.net.Device;
33import org.onosproject.net.DeviceId;
34import org.onosproject.net.device.DeviceService;
35import org.onosproject.net.flow.DefaultTrafficTreatment;
36import org.onosproject.net.flow.TrafficSelector;
37import org.onosproject.net.flowobjective.DefaultForwardingObjective;
38import org.onosproject.net.flowobjective.FlowObjectiveService;
39import org.onosproject.net.flowobjective.ForwardingObjective;
40import org.onosproject.net.flowobjective.Objective;
41import org.onosproject.net.flowobjective.ObjectiveContext;
42import org.onosproject.net.flowobjective.ObjectiveError;
43import org.onosproject.net.packet.DefaultPacketRequest;
44import org.onosproject.net.packet.OutboundPacket;
45import org.onosproject.net.packet.PacketContext;
46import org.onosproject.net.packet.PacketEvent;
47import org.onosproject.net.packet.PacketPriority;
48import org.onosproject.net.packet.PacketProcessor;
49import org.onosproject.net.packet.PacketProcessorEntry;
50import org.onosproject.net.packet.PacketRequest;
51import org.onosproject.net.packet.PacketService;
52import org.onosproject.net.packet.PacketStoreDelegate;
53import org.onosproject.net.provider.ProviderId;
54import org.slf4j.Logger;
55import org.slf4j.LoggerFactory;
56
57import java.util.List;
58import java.util.Optional;
59import java.util.Set;
60
61public class VirtualNetworkPacketManager extends AbstractVnetService
62 implements PacketService {
63
64 private final Logger log = LoggerFactory.getLogger(getClass());
65
66 private final VirtualNetworkService manager;
67
68 protected VirtualNetworkPacketStore store;
69 private final List<ProcessorEntry> processors = Lists.newCopyOnWriteArrayList();
70
71 private NodeId localNodeId;
72
73 private DeviceService deviceService;
74 private FlowObjectiveService objectiveService;
75
76 private VirtualProviderRegistryService providerRegistryService = null;
77
78 private InternalPacketProviderService providerService = null;
79
80 public VirtualNetworkPacketManager(VirtualNetworkService virtualNetworkManager,
81 NetworkId networkId) {
82 super(virtualNetworkManager, networkId);
83 this.manager = virtualNetworkManager;
84
85 //Set node id as same as the node hosting virtual manager
86 ClusterService clusterService = serviceDirectory.get(ClusterService.class);
87 this.localNodeId = clusterService.getLocalNode().id();
88
89 this.store = serviceDirectory.get(VirtualNetworkPacketStore.class);
90 this.store.setDelegate(networkId(), new InternalStoreDelegate());
91
92 this.deviceService = manager.get(networkId(), DeviceService.class);
93 this.objectiveService = manager.get(networkId(), FlowObjectiveService.class);
94
95 providerRegistryService =
96 serviceDirectory.get(VirtualProviderRegistryService.class);
97 providerService = new InternalPacketProviderService();
98 providerRegistryService.registerProviderService(networkId(), providerService);
99 }
100
101 @Override
102 public void addProcessor(PacketProcessor processor, int priority) {
103 ProcessorEntry entry = new ProcessorEntry(processor, priority);
104
105 // Insert the new processor according to its priority.
106 int i = 0;
107 for (; i < processors.size(); i++) {
108 if (priority < processors.get(i).priority()) {
109 break;
110 }
111 }
112 processors.add(i, entry);
113 }
114
115 @Override
116 public void removeProcessor(PacketProcessor processor) {
117 // Remove the processor entry.
118 for (int i = 0; i < processors.size(); i++) {
119 if (processors.get(i).processor() == processor) {
120 processors.remove(i);
121 break;
122 }
123 }
124 }
125
126 @Override
127 public List<PacketProcessorEntry> getProcessors() {
128 return ImmutableList.copyOf(processors);
129 }
130
131 @Override
132 public void requestPackets(TrafficSelector selector, PacketPriority priority, ApplicationId appId) {
133 PacketRequest request = new DefaultPacketRequest(selector, priority, appId,
134 localNodeId, Optional.empty());
135 store.requestPackets(networkId(), request);
136 }
137
138 @Override
139 public void requestPackets(TrafficSelector selector, PacketPriority priority,
140 ApplicationId appId, Optional<DeviceId> deviceId) {
141 PacketRequest request =
142 new DefaultPacketRequest(selector, priority, appId,
143 localNodeId, deviceId);
144
145 store.requestPackets(networkId(), request);
146 }
147
148 @Override
149 public void cancelPackets(TrafficSelector selector, PacketPriority priority, ApplicationId appId) {
150 PacketRequest request = new DefaultPacketRequest(selector, priority, appId,
151 localNodeId, Optional.empty());
152 store.cancelPackets(networkId(), request);
153 }
154
155 @Override
156 public void cancelPackets(TrafficSelector selector, PacketPriority priority,
157 ApplicationId appId, Optional<DeviceId> deviceId) {
158 PacketRequest request = new DefaultPacketRequest(selector, priority,
159 appId, localNodeId,
160 deviceId);
161 store.cancelPackets(networkId(), request);
162 }
163
164 @Override
165 public List<PacketRequest> getRequests() {
166 return store.existingRequests(networkId());
167 }
168
169 @Override
170 public void emit(OutboundPacket packet) {
171 store.emit(networkId(), packet);
172 }
173
174 /**
175 * Personalized packet provider service issued to the supplied provider.
176 */
177 private class InternalPacketProviderService
178 extends AbstractVirtualProviderService<VirtualPacketProvider>
179 implements VirtualPacketProviderService {
180
181 protected InternalPacketProviderService() {
182 super();
183
184 Set<ProviderId> providerIds =
185 providerRegistryService.getProvidersByService(this);
186 ProviderId providerId = providerIds.stream().findFirst().get();
187 VirtualPacketProvider provider = (VirtualPacketProvider)
188 providerRegistryService.getProvider(providerId);
189 setProvider(provider);
190 }
191
192 @Override
193 public void processPacket(PacketContext context) {
194 // TODO filter packets sent to processors based on registrations
195 for (ProcessorEntry entry : processors) {
196 try {
197 long start = System.nanoTime();
198 entry.processor().process(context);
199 entry.addNanos(System.nanoTime() - start);
200 } catch (Exception e) {
201 log.warn("Packet processor {} threw an exception", entry.processor(), e);
202 }
203 }
204 }
205
206 }
207
208 /**
209 * Entity for tracking stats for a packet processor.
210 */
211 private class ProcessorEntry implements PacketProcessorEntry {
212 private final PacketProcessor processor;
213 private final int priority;
214 private long invocations = 0;
215 private long nanos = 0;
216
217 public ProcessorEntry(PacketProcessor processor, int priority) {
218 this.processor = processor;
219 this.priority = priority;
220 }
221
222 @Override
223 public PacketProcessor processor() {
224 return processor;
225 }
226
227 @Override
228 public int priority() {
229 return priority;
230 }
231
232 @Override
233 public long invocations() {
234 return invocations;
235 }
236
237 @Override
238 public long totalNanos() {
239 return nanos;
240 }
241
242 @Override
243 public long averageNanos() {
244 return invocations > 0 ? nanos / invocations : 0;
245 }
246
247 void addNanos(long nanos) {
248 this.nanos += nanos;
249 this.invocations++;
250 }
251 }
252
253 private void localEmit(NetworkId networkId, OutboundPacket packet) {
254 Device device = deviceService.getDevice(packet.sendThrough());
255 if (device == null) {
256 return;
257 }
258 VirtualPacketProvider packetProvider =
259 (VirtualPacketProvider) providerService.provider();
260
261 if (packetProvider != null) {
262 packetProvider.emit(networkId, packet);
263 }
264 }
265
266 /**
267 * Internal callback from the packet store.
268 */
269 protected class InternalStoreDelegate implements PacketStoreDelegate {
270 @Override
271 public void notify(PacketEvent event) {
272 localEmit(networkId(), event.subject());
273 }
274
275 @Override
276 public void requestPackets(PacketRequest request) {
277 DeviceId deviceid = request.deviceId().orElse(null);
278
279 if (deviceid != null) {
280 pushRule(deviceService.getDevice(deviceid), request);
281 } else {
282 pushToAllDevices(request);
283 }
284 }
285
286 @Override
287 public void cancelPackets(PacketRequest request) {
288 DeviceId deviceid = request.deviceId().orElse(null);
289
290 if (deviceid != null) {
291 removeRule(deviceService.getDevice(deviceid), request);
292 } else {
293 removeFromAllDevices(request);
294 }
295 }
296 }
297
298 /**
299 * Pushes packet intercept flow rules to the device.
300 *
301 * @param device the device to push the rules to
302 * @param request the packet request
303 */
304 private void pushRule(Device device, PacketRequest request) {
305 if (!device.type().equals(Device.Type.SWITCH)) {
306 return;
307 }
308
309 ForwardingObjective forwarding = createBuilder(request)
310 .add(new ObjectiveContext() {
311 @Override
312 public void onError(Objective objective, ObjectiveError error) {
313 log.warn("Failed to install packet request {} to {}: {}",
314 request, device.id(), error);
315 }
316 });
317
318 objectiveService.forward(device.id(), forwarding);
319 }
320
321 /**
322 * Removes packet intercept flow rules from the device.
323 *
324 * @param device the device to remove the rules deom
325 * @param request the packet request
326 */
327 private void removeRule(Device device, PacketRequest request) {
328 if (!device.type().equals(Device.Type.SWITCH)) {
329 return;
330 }
331 ForwardingObjective forwarding = createBuilder(request)
332 .remove(new ObjectiveContext() {
333 @Override
334 public void onError(Objective objective, ObjectiveError error) {
335 log.warn("Failed to withdraw packet request {} from {}: {}",
336 request, device.id(), error);
337 }
338 });
339 objectiveService.forward(device.id(), forwarding);
340 }
341
342 /**
343 * Pushes a packet request flow rule to all devices.
344 *
345 * @param request the packet request
346 */
347 private void pushToAllDevices(PacketRequest request) {
348 log.debug("Pushing packet request {} to all devices", request);
349 for (Device device : deviceService.getDevices()) {
350 pushRule(device, request);
351 }
352 }
353
354 /**
355 * Removes packet request flow rule from all devices.
356 *
357 * @param request the packet request
358 */
359 private void removeFromAllDevices(PacketRequest request) {
360 deviceService.getAvailableDevices().forEach(d -> removeRule(d, request));
361 }
362
363 private DefaultForwardingObjective.Builder createBuilder(PacketRequest request) {
364 return DefaultForwardingObjective.builder()
365 .withPriority(request.priority().priorityValue())
366 .withSelector(request.selector())
367 .fromApp(manager.getVirtualNetworkApplicationId(networkId()))
368 .withFlag(ForwardingObjective.Flag.VERSATILE)
369 .withTreatment(DefaultTrafficTreatment.builder().punt().build())
370 .makePermanent();
371 }
372}