blob: 18a8cc3e26df4c3313a192c0a5491dfd3205364f [file] [log] [blame]
Claudine Chiu1f036b82017-03-09 16:45:56 -05001/*
2 * Copyright 2017-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.incubator.store.virtual.impl;
18
19import com.google.common.collect.ImmutableSet;
20import com.google.common.collect.Lists;
21import com.google.common.collect.Maps;
22import com.google.common.collect.Sets;
23import org.apache.felix.scr.annotations.Activate;
24import org.apache.felix.scr.annotations.Component;
25import org.apache.felix.scr.annotations.Deactivate;
26import org.apache.felix.scr.annotations.Modified;
27import org.apache.felix.scr.annotations.Property;
28import org.apache.felix.scr.annotations.Reference;
29import org.apache.felix.scr.annotations.ReferenceCardinality;
30import org.apache.felix.scr.annotations.Service;
31import org.onlab.util.KryoNamespace;
32import org.onosproject.cfg.ComponentConfigService;
33import org.onosproject.cluster.ClusterService;
34import org.onosproject.cluster.NodeId;
35import org.onosproject.incubator.net.virtual.NetworkId;
36import org.onosproject.incubator.net.virtual.VirtualNetworkPacketStore;
37import org.onosproject.mastership.MastershipService;
38import org.onosproject.net.flow.TrafficSelector;
39import org.onosproject.net.packet.OutboundPacket;
40import org.onosproject.net.packet.PacketEvent;
41import org.onosproject.net.packet.PacketPriority;
42import org.onosproject.net.packet.PacketRequest;
43import org.onosproject.net.packet.PacketStoreDelegate;
44import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
45import org.onosproject.store.cluster.messaging.MessageSubject;
46import org.onosproject.store.serializers.KryoNamespaces;
47import org.onosproject.store.serializers.StoreSerializer;
48import org.onosproject.store.service.ConsistentMap;
49import org.onosproject.store.service.Serializer;
50import org.onosproject.store.service.StorageService;
51import org.osgi.service.component.ComponentContext;
52import org.slf4j.Logger;
53
54import java.util.Dictionary;
55import java.util.List;
56import java.util.Map;
57import java.util.Objects;
58import java.util.Properties;
59import java.util.Set;
60import java.util.concurrent.ExecutorService;
61import java.util.concurrent.Executors;
62import java.util.concurrent.atomic.AtomicBoolean;
63
64import static com.google.common.base.Preconditions.checkArgument;
65import static com.google.common.base.Strings.isNullOrEmpty;
66import static java.util.concurrent.Executors.newFixedThreadPool;
67import static org.onlab.util.Tools.get;
68import static org.onlab.util.Tools.groupedThreads;
69import static org.onosproject.net.packet.PacketEvent.Type.EMIT;
70import static org.slf4j.LoggerFactory.getLogger;
71
72/**
73 * Distributed virtual packet store implementation allowing packets to be sent to
74 * remote instances. Implementation is based on DistributedPacketStore class.
75 */
76@Component(immediate = true, enabled = false)
77@Service
78public class DistributedVirtualPacketStore
79 extends AbstractVirtualStore<PacketEvent, PacketStoreDelegate>
80 implements VirtualNetworkPacketStore {
81
82 private final Logger log = getLogger(getClass());
83
84 private static final String FORMAT = "Setting: messageHandlerThreadPoolSize={}";
85
86 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
87 protected MastershipService mastershipService;
88
89 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
90 protected ClusterService clusterService;
91
92 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
93 protected ClusterCommunicationService communicationService;
94
95 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
96 protected StorageService storageService;
97
98 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
99 protected ComponentConfigService cfgService;
100
101 private PacketRequestTracker tracker;
102
103 private static final MessageSubject PACKET_OUT_SUBJECT =
104 new MessageSubject("virtual-packet-out");
105
106 private static final StoreSerializer SERIALIZER = StoreSerializer.using(KryoNamespaces.API);
107
108 private ExecutorService messageHandlingExecutor;
109
110 private static final int DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
111 @Property(name = "messageHandlerThreadPoolSize", intValue = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE,
112 label = "Size of thread pool to assign message handler")
113 private static int messageHandlerThreadPoolSize = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE;
114
115 @Activate
116 public void activate(ComponentContext context) {
117 cfgService.registerProperties(getClass());
118
119 modified(context);
120
121 messageHandlingExecutor = Executors.newFixedThreadPool(
122 messageHandlerThreadPoolSize,
123 groupedThreads("onos/store/packet", "message-handlers", log));
124
125 communicationService.<OutboundPacketWrapper>addSubscriber(PACKET_OUT_SUBJECT,
126 SERIALIZER::decode,
127 packetWrapper -> notifyDelegate(packetWrapper.networkId,
128 new PacketEvent(EMIT,
129 packetWrapper.outboundPacket)),
130 messageHandlingExecutor);
131
132 tracker = new PacketRequestTracker();
133
134 log.info("Started");
135 }
136
137 @Deactivate
138 public void deactivate() {
139 cfgService.unregisterProperties(getClass(), false);
140 communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
141 messageHandlingExecutor.shutdown();
142 tracker = null;
143 log.info("Stopped");
144 }
145
146 @Modified
147 public void modified(ComponentContext context) {
148 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
149
150 int newMessageHandlerThreadPoolSize;
151
152 try {
153 String s = get(properties, "messageHandlerThreadPoolSize");
154
155 newMessageHandlerThreadPoolSize =
156 isNullOrEmpty(s) ? messageHandlerThreadPoolSize : Integer.parseInt(s.trim());
157
158 } catch (NumberFormatException e) {
159 log.warn(e.getMessage());
160 newMessageHandlerThreadPoolSize = messageHandlerThreadPoolSize;
161 }
162
163 // Any change in the following parameters implies thread pool restart
164 if (newMessageHandlerThreadPoolSize != messageHandlerThreadPoolSize) {
165 setMessageHandlerThreadPoolSize(newMessageHandlerThreadPoolSize);
166 restartMessageHandlerThreadPool();
167 }
168
169 log.info(FORMAT, messageHandlerThreadPoolSize);
170 }
171
172 @Override
173 public void emit(NetworkId networkId, OutboundPacket packet) {
174 NodeId myId = clusterService.getLocalNode().id();
175 // TODO revive this when there is MastershipService support for virtual devices
176// NodeId master = mastershipService.getMasterFor(packet.sendThrough());
177//
178// if (master == null) {
179// log.warn("No master found for {}", packet.sendThrough());
180// return;
181// }
182//
183// log.debug("master {} found for {}", myId, packet.sendThrough());
184// if (myId.equals(master)) {
185// notifyDelegate(networkId, new PacketEvent(EMIT, packet));
186// return;
187// }
188//
189// communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master)
190// .whenComplete((r, error) -> {
191// if (error != null) {
192// log.warn("Failed to send packet-out to {}", master, error);
193// }
194// });
195 }
196
197 @Override
198 public void requestPackets(NetworkId networkId, PacketRequest request) {
199 tracker.add(networkId, request);
200
201 }
202
203 @Override
204 public void cancelPackets(NetworkId networkId, PacketRequest request) {
205 tracker.remove(networkId, request);
206 }
207
208 @Override
209 public List<PacketRequest> existingRequests(NetworkId networkId) {
210 return tracker.requests(networkId);
211 }
212
213 private final class PacketRequestTracker {
214
215 private ConsistentMap<NetworkId, Map<RequestKey, Set<PacketRequest>>> distRequests;
216 private Map<NetworkId, Map<RequestKey, Set<PacketRequest>>> requests;
217
218 private PacketRequestTracker() {
219 distRequests = storageService.<NetworkId, Map<RequestKey, Set<PacketRequest>>>consistentMapBuilder()
220 .withName("onos-virtual-packet-requests")
221 .withSerializer(Serializer.using(KryoNamespace.newBuilder()
222 .register(KryoNamespaces.API)
223 .register(RequestKey.class)
224 .register(NetworkId.class)
225 .build()))
226 .build();
227 requests = distRequests.asJavaMap();
228 }
229
230 private void add(NetworkId networkId, PacketRequest request) {
231 AtomicBoolean firstRequest = addInternal(networkId, request);
232 PacketStoreDelegate delegate = delegateMap.get(networkId);
233 if (firstRequest.get() && delegate != null) {
234 // The instance that makes the first request will push to all devices
235 delegate.requestPackets(request);
236 }
237 }
238
239 private AtomicBoolean addInternal(NetworkId networkId, PacketRequest request) {
240 AtomicBoolean firstRequest = new AtomicBoolean(false);
241 AtomicBoolean changed = new AtomicBoolean(true);
242 Map<RequestKey, Set<PacketRequest>> requestsForNetwork = getMap(networkId);
243 requestsForNetwork.compute(key(request), (s, existingRequests) -> {
244 // Reset to false just in case this is a retry due to
245 // ConcurrentModificationException
246 firstRequest.set(false);
247 if (existingRequests == null) {
248 firstRequest.set(true);
249 return ImmutableSet.of(request);
250 } else if (!existingRequests.contains(request)) {
251 firstRequest.set(true);
252 return ImmutableSet.<PacketRequest>builder()
253 .addAll(existingRequests)
254 .add(request)
255 .build();
256 } else {
257 changed.set(false);
258 return existingRequests;
259 }
260 });
261 if (changed.get()) {
262 requests.put(networkId, requestsForNetwork);
263 }
264 return firstRequest;
265 }
266
267 private void remove(NetworkId networkId, PacketRequest request) {
268 AtomicBoolean removedLast = removeInternal(networkId, request);
269 PacketStoreDelegate delegate = delegateMap.get(networkId);
270 if (removedLast.get() && delegate != null) {
271 // The instance that removes the last request will remove from all devices
272 delegate.cancelPackets(request);
273 }
274 }
275
276 private AtomicBoolean removeInternal(NetworkId networkId, PacketRequest request) {
277 AtomicBoolean removedLast = new AtomicBoolean(false);
278 AtomicBoolean changed = new AtomicBoolean(true);
279 Map<RequestKey, Set<PacketRequest>> requestsForNetwork = getMap(networkId);
280 requestsForNetwork.computeIfPresent(key(request), (s, existingRequests) -> {
281 // Reset to false just in case this is a retry due to
282 // ConcurrentModificationException
283 removedLast.set(false);
284 if (existingRequests.contains(request)) {
285 Set<PacketRequest> newRequests = Sets.newHashSet(existingRequests);
286 newRequests.remove(request);
287 if (newRequests.size() > 0) {
288 return ImmutableSet.copyOf(newRequests);
289 } else {
290 removedLast.set(true);
291 return null;
292 }
293 } else {
294 changed.set(false);
295 return existingRequests;
296 }
297 });
298 if (changed.get()) {
299 requests.put(networkId, requestsForNetwork);
300 }
301 return removedLast;
302 }
303
304 private List<PacketRequest> requests(NetworkId networkId) {
305 Map<RequestKey, Set<PacketRequest>> requestsForNetwork = getMap(networkId);
306 List<PacketRequest> list = Lists.newArrayList();
307 requestsForNetwork.values().forEach(v -> list.addAll(v));
308 list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());
309 return list;
310 }
311
312 /*
313 * Gets PacketRequests for specified networkId.
314 */
315 private Map<RequestKey, Set<PacketRequest>> getMap(NetworkId networkId) {
316 return requests.computeIfAbsent(networkId, networkId1 -> {
317 log.debug("Creating new map for {}", networkId1);
318 Map newMap = Maps.newHashMap();
319 return newMap;
320 });
321 }
322 }
323
324 /**
325 * Creates a new request key from a packet request.
326 *
327 * @param request packet request
328 * @return request key
329 */
330 private static RequestKey key(PacketRequest request) {
331 return new RequestKey(request.selector(), request.priority());
332 }
333
334 /**
335 * Key of a packet request.
336 */
337 private static final class RequestKey {
338 private final TrafficSelector selector;
339 private final PacketPriority priority;
340
341 private RequestKey(TrafficSelector selector, PacketPriority priority) {
342 this.selector = selector;
343 this.priority = priority;
344 }
345
346 @Override
347 public int hashCode() {
348 return Objects.hash(selector, priority);
349 }
350
351 @Override
352 public boolean equals(Object other) {
353 if (other == this) {
354 return true;
355 }
356
357 if (!(other instanceof RequestKey)) {
358 return false;
359 }
360
361 RequestKey that = (RequestKey) other;
362
363 return Objects.equals(selector, that.selector) &&
364 Objects.equals(priority, that.priority);
365 }
366 }
367
368 private static OutboundPacketWrapper wrapper(NetworkId networkId, OutboundPacket outboundPacket) {
369 return new OutboundPacketWrapper(networkId, outboundPacket);
370 }
371
372 /*
373 * OutboundPacket in
374 */
375 private static final class OutboundPacketWrapper {
376 private NetworkId networkId;
377 private OutboundPacket outboundPacket;
378
379 private OutboundPacketWrapper(NetworkId networkId, OutboundPacket outboundPacket) {
380 this.networkId = networkId;
381 this.outboundPacket = outboundPacket;
382 }
383
384 }
385
386 /**
387 * Sets thread pool size of message handler.
388 *
389 * @param poolSize
390 */
391 private void setMessageHandlerThreadPoolSize(int poolSize) {
392 checkArgument(poolSize >= 0, "Message handler pool size must be 0 or more");
393 messageHandlerThreadPoolSize = poolSize;
394 }
395
396 /**
397 * Restarts thread pool of message handler.
398 */
399 private void restartMessageHandlerThreadPool() {
400 ExecutorService prevExecutor = messageHandlingExecutor;
401 messageHandlingExecutor = newFixedThreadPool(getMessageHandlerThreadPoolSize(),
402 groupedThreads("DistPktStore", "messageHandling-%d", log));
403 prevExecutor.shutdown();
404 }
405
406 /**
407 * Gets current thread pool size of message handler.
408 *
409 * @return messageHandlerThreadPoolSize
410 */
411 private int getMessageHandlerThreadPoolSize() {
412 return messageHandlerThreadPoolSize;
413 }
414}