/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.incubator.store.virtual.impl;
18
19import com.google.common.collect.ImmutableSet;
20import com.google.common.collect.Lists;
21import com.google.common.collect.Maps;
22import com.google.common.collect.Sets;
23import org.apache.felix.scr.annotations.Activate;
24import org.apache.felix.scr.annotations.Component;
25import org.apache.felix.scr.annotations.Deactivate;
26import org.apache.felix.scr.annotations.Modified;
27import org.apache.felix.scr.annotations.Property;
28import org.apache.felix.scr.annotations.Reference;
29import org.apache.felix.scr.annotations.ReferenceCardinality;
30import org.apache.felix.scr.annotations.Service;
31import org.onlab.util.KryoNamespace;
32import org.onosproject.cfg.ComponentConfigService;
33import org.onosproject.cluster.ClusterService;
34import org.onosproject.cluster.NodeId;
35import org.onosproject.incubator.net.virtual.NetworkId;
36import org.onosproject.incubator.net.virtual.VirtualNetworkPacketStore;
37import org.onosproject.mastership.MastershipService;
38import org.onosproject.net.flow.TrafficSelector;
39import org.onosproject.net.packet.OutboundPacket;
40import org.onosproject.net.packet.PacketEvent;
41import org.onosproject.net.packet.PacketPriority;
42import org.onosproject.net.packet.PacketRequest;
43import org.onosproject.net.packet.PacketStoreDelegate;
44import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
45import org.onosproject.store.cluster.messaging.MessageSubject;
46import org.onosproject.store.serializers.KryoNamespaces;
Claudine Chiu1f036b82017-03-09 16:45:56 -050047import org.onosproject.store.service.ConsistentMap;
48import org.onosproject.store.service.Serializer;
49import org.onosproject.store.service.StorageService;
50import org.osgi.service.component.ComponentContext;
51import org.slf4j.Logger;
52
53import java.util.Dictionary;
54import java.util.List;
55import java.util.Map;
56import java.util.Objects;
57import java.util.Properties;
58import java.util.Set;
59import java.util.concurrent.ExecutorService;
60import java.util.concurrent.Executors;
61import java.util.concurrent.atomic.AtomicBoolean;
62
63import static com.google.common.base.Preconditions.checkArgument;
64import static com.google.common.base.Strings.isNullOrEmpty;
65import static java.util.concurrent.Executors.newFixedThreadPool;
66import static org.onlab.util.Tools.get;
67import static org.onlab.util.Tools.groupedThreads;
68import static org.onosproject.net.packet.PacketEvent.Type.EMIT;
69import static org.slf4j.LoggerFactory.getLogger;
70
71/**
72 * Distributed virtual packet store implementation allowing packets to be sent to
73 * remote instances. Implementation is based on DistributedPacketStore class.
74 */
75@Component(immediate = true, enabled = false)
76@Service
77public class DistributedVirtualPacketStore
78 extends AbstractVirtualStore<PacketEvent, PacketStoreDelegate>
79 implements VirtualNetworkPacketStore {
80
81 private final Logger log = getLogger(getClass());
82
83 private static final String FORMAT = "Setting: messageHandlerThreadPoolSize={}";
84
85 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
86 protected MastershipService mastershipService;
87
88 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
89 protected ClusterService clusterService;
90
91 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
92 protected ClusterCommunicationService communicationService;
93
94 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
95 protected StorageService storageService;
96
97 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
98 protected ComponentConfigService cfgService;
99
100 private PacketRequestTracker tracker;
101
102 private static final MessageSubject PACKET_OUT_SUBJECT =
103 new MessageSubject("virtual-packet-out");
104
Jordan Halterman2c83a102017-08-20 17:11:41 -0700105 private static final Serializer SERIALIZER = Serializer.using(KryoNamespaces.API);
Claudine Chiu1f036b82017-03-09 16:45:56 -0500106
107 private ExecutorService messageHandlingExecutor;
108
109 private static final int DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
110 @Property(name = "messageHandlerThreadPoolSize", intValue = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE,
111 label = "Size of thread pool to assign message handler")
112 private static int messageHandlerThreadPoolSize = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE;
113
114 @Activate
115 public void activate(ComponentContext context) {
116 cfgService.registerProperties(getClass());
117
118 modified(context);
119
120 messageHandlingExecutor = Executors.newFixedThreadPool(
121 messageHandlerThreadPoolSize,
122 groupedThreads("onos/store/packet", "message-handlers", log));
123
124 communicationService.<OutboundPacketWrapper>addSubscriber(PACKET_OUT_SUBJECT,
125 SERIALIZER::decode,
126 packetWrapper -> notifyDelegate(packetWrapper.networkId,
127 new PacketEvent(EMIT,
128 packetWrapper.outboundPacket)),
129 messageHandlingExecutor);
130
131 tracker = new PacketRequestTracker();
132
133 log.info("Started");
134 }
135
136 @Deactivate
137 public void deactivate() {
138 cfgService.unregisterProperties(getClass(), false);
139 communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
140 messageHandlingExecutor.shutdown();
141 tracker = null;
142 log.info("Stopped");
143 }
144
145 @Modified
146 public void modified(ComponentContext context) {
147 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
148
149 int newMessageHandlerThreadPoolSize;
150
151 try {
152 String s = get(properties, "messageHandlerThreadPoolSize");
153
154 newMessageHandlerThreadPoolSize =
155 isNullOrEmpty(s) ? messageHandlerThreadPoolSize : Integer.parseInt(s.trim());
156
157 } catch (NumberFormatException e) {
158 log.warn(e.getMessage());
159 newMessageHandlerThreadPoolSize = messageHandlerThreadPoolSize;
160 }
161
162 // Any change in the following parameters implies thread pool restart
163 if (newMessageHandlerThreadPoolSize != messageHandlerThreadPoolSize) {
164 setMessageHandlerThreadPoolSize(newMessageHandlerThreadPoolSize);
165 restartMessageHandlerThreadPool();
166 }
167
168 log.info(FORMAT, messageHandlerThreadPoolSize);
169 }
170
171 @Override
172 public void emit(NetworkId networkId, OutboundPacket packet) {
173 NodeId myId = clusterService.getLocalNode().id();
174 // TODO revive this when there is MastershipService support for virtual devices
175// NodeId master = mastershipService.getMasterFor(packet.sendThrough());
176//
177// if (master == null) {
178// log.warn("No master found for {}", packet.sendThrough());
179// return;
180// }
181//
182// log.debug("master {} found for {}", myId, packet.sendThrough());
183// if (myId.equals(master)) {
184// notifyDelegate(networkId, new PacketEvent(EMIT, packet));
185// return;
186// }
187//
188// communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master)
189// .whenComplete((r, error) -> {
190// if (error != null) {
191// log.warn("Failed to send packet-out to {}", master, error);
192// }
193// });
194 }
195
196 @Override
197 public void requestPackets(NetworkId networkId, PacketRequest request) {
198 tracker.add(networkId, request);
199
200 }
201
202 @Override
203 public void cancelPackets(NetworkId networkId, PacketRequest request) {
204 tracker.remove(networkId, request);
205 }
206
207 @Override
208 public List<PacketRequest> existingRequests(NetworkId networkId) {
209 return tracker.requests(networkId);
210 }
211
212 private final class PacketRequestTracker {
213
214 private ConsistentMap<NetworkId, Map<RequestKey, Set<PacketRequest>>> distRequests;
215 private Map<NetworkId, Map<RequestKey, Set<PacketRequest>>> requests;
216
217 private PacketRequestTracker() {
218 distRequests = storageService.<NetworkId, Map<RequestKey, Set<PacketRequest>>>consistentMapBuilder()
219 .withName("onos-virtual-packet-requests")
220 .withSerializer(Serializer.using(KryoNamespace.newBuilder()
221 .register(KryoNamespaces.API)
222 .register(RequestKey.class)
223 .register(NetworkId.class)
224 .build()))
225 .build();
226 requests = distRequests.asJavaMap();
227 }
228
229 private void add(NetworkId networkId, PacketRequest request) {
230 AtomicBoolean firstRequest = addInternal(networkId, request);
231 PacketStoreDelegate delegate = delegateMap.get(networkId);
232 if (firstRequest.get() && delegate != null) {
233 // The instance that makes the first request will push to all devices
234 delegate.requestPackets(request);
235 }
236 }
237
238 private AtomicBoolean addInternal(NetworkId networkId, PacketRequest request) {
239 AtomicBoolean firstRequest = new AtomicBoolean(false);
240 AtomicBoolean changed = new AtomicBoolean(true);
241 Map<RequestKey, Set<PacketRequest>> requestsForNetwork = getMap(networkId);
242 requestsForNetwork.compute(key(request), (s, existingRequests) -> {
243 // Reset to false just in case this is a retry due to
244 // ConcurrentModificationException
245 firstRequest.set(false);
246 if (existingRequests == null) {
247 firstRequest.set(true);
248 return ImmutableSet.of(request);
249 } else if (!existingRequests.contains(request)) {
250 firstRequest.set(true);
251 return ImmutableSet.<PacketRequest>builder()
252 .addAll(existingRequests)
253 .add(request)
254 .build();
255 } else {
256 changed.set(false);
257 return existingRequests;
258 }
259 });
260 if (changed.get()) {
261 requests.put(networkId, requestsForNetwork);
262 }
263 return firstRequest;
264 }
265
266 private void remove(NetworkId networkId, PacketRequest request) {
267 AtomicBoolean removedLast = removeInternal(networkId, request);
268 PacketStoreDelegate delegate = delegateMap.get(networkId);
269 if (removedLast.get() && delegate != null) {
270 // The instance that removes the last request will remove from all devices
271 delegate.cancelPackets(request);
272 }
273 }
274
275 private AtomicBoolean removeInternal(NetworkId networkId, PacketRequest request) {
276 AtomicBoolean removedLast = new AtomicBoolean(false);
277 AtomicBoolean changed = new AtomicBoolean(true);
278 Map<RequestKey, Set<PacketRequest>> requestsForNetwork = getMap(networkId);
279 requestsForNetwork.computeIfPresent(key(request), (s, existingRequests) -> {
280 // Reset to false just in case this is a retry due to
281 // ConcurrentModificationException
282 removedLast.set(false);
283 if (existingRequests.contains(request)) {
284 Set<PacketRequest> newRequests = Sets.newHashSet(existingRequests);
285 newRequests.remove(request);
286 if (newRequests.size() > 0) {
287 return ImmutableSet.copyOf(newRequests);
288 } else {
289 removedLast.set(true);
290 return null;
291 }
292 } else {
293 changed.set(false);
294 return existingRequests;
295 }
296 });
297 if (changed.get()) {
298 requests.put(networkId, requestsForNetwork);
299 }
300 return removedLast;
301 }
302
303 private List<PacketRequest> requests(NetworkId networkId) {
304 Map<RequestKey, Set<PacketRequest>> requestsForNetwork = getMap(networkId);
305 List<PacketRequest> list = Lists.newArrayList();
306 requestsForNetwork.values().forEach(v -> list.addAll(v));
307 list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());
308 return list;
309 }
310
311 /*
312 * Gets PacketRequests for specified networkId.
313 */
314 private Map<RequestKey, Set<PacketRequest>> getMap(NetworkId networkId) {
315 return requests.computeIfAbsent(networkId, networkId1 -> {
316 log.debug("Creating new map for {}", networkId1);
317 Map newMap = Maps.newHashMap();
318 return newMap;
319 });
320 }
321 }
322
323 /**
324 * Creates a new request key from a packet request.
325 *
326 * @param request packet request
327 * @return request key
328 */
329 private static RequestKey key(PacketRequest request) {
330 return new RequestKey(request.selector(), request.priority());
331 }
332
333 /**
334 * Key of a packet request.
335 */
336 private static final class RequestKey {
337 private final TrafficSelector selector;
338 private final PacketPriority priority;
339
340 private RequestKey(TrafficSelector selector, PacketPriority priority) {
341 this.selector = selector;
342 this.priority = priority;
343 }
344
345 @Override
346 public int hashCode() {
347 return Objects.hash(selector, priority);
348 }
349
350 @Override
351 public boolean equals(Object other) {
352 if (other == this) {
353 return true;
354 }
355
356 if (!(other instanceof RequestKey)) {
357 return false;
358 }
359
360 RequestKey that = (RequestKey) other;
361
362 return Objects.equals(selector, that.selector) &&
363 Objects.equals(priority, that.priority);
364 }
365 }
366
367 private static OutboundPacketWrapper wrapper(NetworkId networkId, OutboundPacket outboundPacket) {
368 return new OutboundPacketWrapper(networkId, outboundPacket);
369 }
370
371 /*
372 * OutboundPacket in
373 */
374 private static final class OutboundPacketWrapper {
375 private NetworkId networkId;
376 private OutboundPacket outboundPacket;
377
378 private OutboundPacketWrapper(NetworkId networkId, OutboundPacket outboundPacket) {
379 this.networkId = networkId;
380 this.outboundPacket = outboundPacket;
381 }
382
383 }
384
385 /**
386 * Sets thread pool size of message handler.
387 *
388 * @param poolSize
389 */
390 private void setMessageHandlerThreadPoolSize(int poolSize) {
391 checkArgument(poolSize >= 0, "Message handler pool size must be 0 or more");
392 messageHandlerThreadPoolSize = poolSize;
393 }
394
395 /**
396 * Restarts thread pool of message handler.
397 */
398 private void restartMessageHandlerThreadPool() {
399 ExecutorService prevExecutor = messageHandlingExecutor;
400 messageHandlingExecutor = newFixedThreadPool(getMessageHandlerThreadPoolSize(),
401 groupedThreads("DistPktStore", "messageHandling-%d", log));
402 prevExecutor.shutdown();
403 }
404
405 /**
406 * Gets current thread pool size of message handler.
407 *
408 * @return messageHandlerThreadPoolSize
409 */
410 private int getMessageHandlerThreadPoolSize() {
411 return messageHandlerThreadPoolSize;
412 }
413}