blob: 997f62704a7799b77f5b71d124c14b4d0b4fec9b [file] [log] [blame]
Claudine Chiu1f036b82017-03-09 16:45:56 -05001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Claudine Chiu1f036b82017-03-09 16:45:56 -05003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.incubator.store.virtual.impl;
18
19import com.google.common.collect.ImmutableSet;
20import com.google.common.collect.Lists;
21import com.google.common.collect.Maps;
22import com.google.common.collect.Sets;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070023import org.osgi.service.component.annotations.Activate;
24import org.osgi.service.component.annotations.Component;
25import org.osgi.service.component.annotations.Deactivate;
26import org.osgi.service.component.annotations.Modified;
27import org.osgi.service.component.annotations.Reference;
28import org.osgi.service.component.annotations.ReferenceCardinality;
Claudine Chiu1f036b82017-03-09 16:45:56 -050029import org.onlab.util.KryoNamespace;
30import org.onosproject.cfg.ComponentConfigService;
31import org.onosproject.cluster.ClusterService;
32import org.onosproject.cluster.NodeId;
33import org.onosproject.incubator.net.virtual.NetworkId;
34import org.onosproject.incubator.net.virtual.VirtualNetworkPacketStore;
35import org.onosproject.mastership.MastershipService;
36import org.onosproject.net.flow.TrafficSelector;
37import org.onosproject.net.packet.OutboundPacket;
38import org.onosproject.net.packet.PacketEvent;
39import org.onosproject.net.packet.PacketPriority;
40import org.onosproject.net.packet.PacketRequest;
41import org.onosproject.net.packet.PacketStoreDelegate;
42import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
43import org.onosproject.store.cluster.messaging.MessageSubject;
44import org.onosproject.store.serializers.KryoNamespaces;
Claudine Chiu1f036b82017-03-09 16:45:56 -050045import org.onosproject.store.service.ConsistentMap;
46import org.onosproject.store.service.Serializer;
47import org.onosproject.store.service.StorageService;
48import org.osgi.service.component.ComponentContext;
49import org.slf4j.Logger;
50
51import java.util.Dictionary;
52import java.util.List;
53import java.util.Map;
54import java.util.Objects;
55import java.util.Properties;
56import java.util.Set;
57import java.util.concurrent.ExecutorService;
58import java.util.concurrent.Executors;
59import java.util.concurrent.atomic.AtomicBoolean;
60
61import static com.google.common.base.Preconditions.checkArgument;
62import static com.google.common.base.Strings.isNullOrEmpty;
63import static java.util.concurrent.Executors.newFixedThreadPool;
64import static org.onlab.util.Tools.get;
65import static org.onlab.util.Tools.groupedThreads;
66import static org.onosproject.net.packet.PacketEvent.Type.EMIT;
67import static org.slf4j.LoggerFactory.getLogger;
68
69/**
70 * Distributed virtual packet store implementation allowing packets to be sent to
71 * remote instances. Implementation is based on DistributedPacketStore class.
72 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070073@Component(immediate = true, enabled = false, service = VirtualNetworkPacketStore.class)
Claudine Chiu1f036b82017-03-09 16:45:56 -050074public class DistributedVirtualPacketStore
75 extends AbstractVirtualStore<PacketEvent, PacketStoreDelegate>
76 implements VirtualNetworkPacketStore {
77
78 private final Logger log = getLogger(getClass());
79
80 private static final String FORMAT = "Setting: messageHandlerThreadPoolSize={}";
81
Ray Milkeyd84f89b2018-08-17 14:54:17 -070082 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Claudine Chiu1f036b82017-03-09 16:45:56 -050083 protected MastershipService mastershipService;
84
Ray Milkeyd84f89b2018-08-17 14:54:17 -070085 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Claudine Chiu1f036b82017-03-09 16:45:56 -050086 protected ClusterService clusterService;
87
Ray Milkeyd84f89b2018-08-17 14:54:17 -070088 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Claudine Chiu1f036b82017-03-09 16:45:56 -050089 protected ClusterCommunicationService communicationService;
90
Ray Milkeyd84f89b2018-08-17 14:54:17 -070091 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Claudine Chiu1f036b82017-03-09 16:45:56 -050092 protected StorageService storageService;
93
Ray Milkeyd84f89b2018-08-17 14:54:17 -070094 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Claudine Chiu1f036b82017-03-09 16:45:56 -050095 protected ComponentConfigService cfgService;
96
97 private PacketRequestTracker tracker;
98
99 private static final MessageSubject PACKET_OUT_SUBJECT =
100 new MessageSubject("virtual-packet-out");
101
Jordan Halterman2c83a102017-08-20 17:11:41 -0700102 private static final Serializer SERIALIZER = Serializer.using(KryoNamespaces.API);
Claudine Chiu1f036b82017-03-09 16:45:56 -0500103
104 private ExecutorService messageHandlingExecutor;
105
106 private static final int DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700107 //@Property(name = "messageHandlerThreadPoolSize", intValue = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE,
108 // label = "Size of thread pool to assign message handler")
Claudine Chiu1f036b82017-03-09 16:45:56 -0500109 private static int messageHandlerThreadPoolSize = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE;
110
111 @Activate
112 public void activate(ComponentContext context) {
113 cfgService.registerProperties(getClass());
114
115 modified(context);
116
117 messageHandlingExecutor = Executors.newFixedThreadPool(
118 messageHandlerThreadPoolSize,
119 groupedThreads("onos/store/packet", "message-handlers", log));
120
121 communicationService.<OutboundPacketWrapper>addSubscriber(PACKET_OUT_SUBJECT,
122 SERIALIZER::decode,
123 packetWrapper -> notifyDelegate(packetWrapper.networkId,
124 new PacketEvent(EMIT,
125 packetWrapper.outboundPacket)),
126 messageHandlingExecutor);
127
128 tracker = new PacketRequestTracker();
129
130 log.info("Started");
131 }
132
133 @Deactivate
134 public void deactivate() {
135 cfgService.unregisterProperties(getClass(), false);
136 communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
137 messageHandlingExecutor.shutdown();
138 tracker = null;
139 log.info("Stopped");
140 }
141
142 @Modified
143 public void modified(ComponentContext context) {
144 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
145
146 int newMessageHandlerThreadPoolSize;
147
148 try {
149 String s = get(properties, "messageHandlerThreadPoolSize");
150
151 newMessageHandlerThreadPoolSize =
152 isNullOrEmpty(s) ? messageHandlerThreadPoolSize : Integer.parseInt(s.trim());
153
154 } catch (NumberFormatException e) {
155 log.warn(e.getMessage());
156 newMessageHandlerThreadPoolSize = messageHandlerThreadPoolSize;
157 }
158
159 // Any change in the following parameters implies thread pool restart
160 if (newMessageHandlerThreadPoolSize != messageHandlerThreadPoolSize) {
161 setMessageHandlerThreadPoolSize(newMessageHandlerThreadPoolSize);
162 restartMessageHandlerThreadPool();
163 }
164
165 log.info(FORMAT, messageHandlerThreadPoolSize);
166 }
167
168 @Override
169 public void emit(NetworkId networkId, OutboundPacket packet) {
170 NodeId myId = clusterService.getLocalNode().id();
171 // TODO revive this when there is MastershipService support for virtual devices
172// NodeId master = mastershipService.getMasterFor(packet.sendThrough());
173//
174// if (master == null) {
175// log.warn("No master found for {}", packet.sendThrough());
176// return;
177// }
178//
179// log.debug("master {} found for {}", myId, packet.sendThrough());
180// if (myId.equals(master)) {
181// notifyDelegate(networkId, new PacketEvent(EMIT, packet));
182// return;
183// }
184//
185// communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master)
186// .whenComplete((r, error) -> {
187// if (error != null) {
188// log.warn("Failed to send packet-out to {}", master, error);
189// }
190// });
191 }
192
193 @Override
194 public void requestPackets(NetworkId networkId, PacketRequest request) {
195 tracker.add(networkId, request);
196
197 }
198
199 @Override
200 public void cancelPackets(NetworkId networkId, PacketRequest request) {
201 tracker.remove(networkId, request);
202 }
203
204 @Override
205 public List<PacketRequest> existingRequests(NetworkId networkId) {
206 return tracker.requests(networkId);
207 }
208
209 private final class PacketRequestTracker {
210
211 private ConsistentMap<NetworkId, Map<RequestKey, Set<PacketRequest>>> distRequests;
212 private Map<NetworkId, Map<RequestKey, Set<PacketRequest>>> requests;
213
214 private PacketRequestTracker() {
215 distRequests = storageService.<NetworkId, Map<RequestKey, Set<PacketRequest>>>consistentMapBuilder()
216 .withName("onos-virtual-packet-requests")
217 .withSerializer(Serializer.using(KryoNamespace.newBuilder()
218 .register(KryoNamespaces.API)
219 .register(RequestKey.class)
220 .register(NetworkId.class)
221 .build()))
222 .build();
223 requests = distRequests.asJavaMap();
224 }
225
226 private void add(NetworkId networkId, PacketRequest request) {
227 AtomicBoolean firstRequest = addInternal(networkId, request);
228 PacketStoreDelegate delegate = delegateMap.get(networkId);
229 if (firstRequest.get() && delegate != null) {
230 // The instance that makes the first request will push to all devices
231 delegate.requestPackets(request);
232 }
233 }
234
235 private AtomicBoolean addInternal(NetworkId networkId, PacketRequest request) {
236 AtomicBoolean firstRequest = new AtomicBoolean(false);
237 AtomicBoolean changed = new AtomicBoolean(true);
238 Map<RequestKey, Set<PacketRequest>> requestsForNetwork = getMap(networkId);
239 requestsForNetwork.compute(key(request), (s, existingRequests) -> {
240 // Reset to false just in case this is a retry due to
241 // ConcurrentModificationException
242 firstRequest.set(false);
243 if (existingRequests == null) {
244 firstRequest.set(true);
245 return ImmutableSet.of(request);
246 } else if (!existingRequests.contains(request)) {
247 firstRequest.set(true);
248 return ImmutableSet.<PacketRequest>builder()
249 .addAll(existingRequests)
250 .add(request)
251 .build();
252 } else {
253 changed.set(false);
254 return existingRequests;
255 }
256 });
257 if (changed.get()) {
258 requests.put(networkId, requestsForNetwork);
259 }
260 return firstRequest;
261 }
262
263 private void remove(NetworkId networkId, PacketRequest request) {
264 AtomicBoolean removedLast = removeInternal(networkId, request);
265 PacketStoreDelegate delegate = delegateMap.get(networkId);
266 if (removedLast.get() && delegate != null) {
267 // The instance that removes the last request will remove from all devices
268 delegate.cancelPackets(request);
269 }
270 }
271
272 private AtomicBoolean removeInternal(NetworkId networkId, PacketRequest request) {
273 AtomicBoolean removedLast = new AtomicBoolean(false);
274 AtomicBoolean changed = new AtomicBoolean(true);
275 Map<RequestKey, Set<PacketRequest>> requestsForNetwork = getMap(networkId);
276 requestsForNetwork.computeIfPresent(key(request), (s, existingRequests) -> {
277 // Reset to false just in case this is a retry due to
278 // ConcurrentModificationException
279 removedLast.set(false);
280 if (existingRequests.contains(request)) {
281 Set<PacketRequest> newRequests = Sets.newHashSet(existingRequests);
282 newRequests.remove(request);
283 if (newRequests.size() > 0) {
284 return ImmutableSet.copyOf(newRequests);
285 } else {
286 removedLast.set(true);
287 return null;
288 }
289 } else {
290 changed.set(false);
291 return existingRequests;
292 }
293 });
294 if (changed.get()) {
295 requests.put(networkId, requestsForNetwork);
296 }
297 return removedLast;
298 }
299
300 private List<PacketRequest> requests(NetworkId networkId) {
301 Map<RequestKey, Set<PacketRequest>> requestsForNetwork = getMap(networkId);
302 List<PacketRequest> list = Lists.newArrayList();
303 requestsForNetwork.values().forEach(v -> list.addAll(v));
304 list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());
305 return list;
306 }
307
308 /*
309 * Gets PacketRequests for specified networkId.
310 */
311 private Map<RequestKey, Set<PacketRequest>> getMap(NetworkId networkId) {
312 return requests.computeIfAbsent(networkId, networkId1 -> {
313 log.debug("Creating new map for {}", networkId1);
314 Map newMap = Maps.newHashMap();
315 return newMap;
316 });
317 }
318 }
319
320 /**
321 * Creates a new request key from a packet request.
322 *
323 * @param request packet request
324 * @return request key
325 */
326 private static RequestKey key(PacketRequest request) {
327 return new RequestKey(request.selector(), request.priority());
328 }
329
330 /**
331 * Key of a packet request.
332 */
333 private static final class RequestKey {
334 private final TrafficSelector selector;
335 private final PacketPriority priority;
336
337 private RequestKey(TrafficSelector selector, PacketPriority priority) {
338 this.selector = selector;
339 this.priority = priority;
340 }
341
342 @Override
343 public int hashCode() {
344 return Objects.hash(selector, priority);
345 }
346
347 @Override
348 public boolean equals(Object other) {
349 if (other == this) {
350 return true;
351 }
352
353 if (!(other instanceof RequestKey)) {
354 return false;
355 }
356
357 RequestKey that = (RequestKey) other;
358
359 return Objects.equals(selector, that.selector) &&
360 Objects.equals(priority, that.priority);
361 }
362 }
363
364 private static OutboundPacketWrapper wrapper(NetworkId networkId, OutboundPacket outboundPacket) {
365 return new OutboundPacketWrapper(networkId, outboundPacket);
366 }
367
368 /*
369 * OutboundPacket in
370 */
371 private static final class OutboundPacketWrapper {
372 private NetworkId networkId;
373 private OutboundPacket outboundPacket;
374
375 private OutboundPacketWrapper(NetworkId networkId, OutboundPacket outboundPacket) {
376 this.networkId = networkId;
377 this.outboundPacket = outboundPacket;
378 }
379
380 }
381
382 /**
383 * Sets thread pool size of message handler.
384 *
385 * @param poolSize
386 */
387 private void setMessageHandlerThreadPoolSize(int poolSize) {
388 checkArgument(poolSize >= 0, "Message handler pool size must be 0 or more");
389 messageHandlerThreadPoolSize = poolSize;
390 }
391
392 /**
393 * Restarts thread pool of message handler.
394 */
395 private void restartMessageHandlerThreadPool() {
396 ExecutorService prevExecutor = messageHandlingExecutor;
397 messageHandlingExecutor = newFixedThreadPool(getMessageHandlerThreadPoolSize(),
398 groupedThreads("DistPktStore", "messageHandling-%d", log));
399 prevExecutor.shutdown();
400 }
401
402 /**
403 * Gets current thread pool size of message handler.
404 *
405 * @return messageHandlerThreadPoolSize
406 */
407 private int getMessageHandlerThreadPoolSize() {
408 return messageHandlerThreadPoolSize;
409 }
410}