blob: 5fdea0540f00254a7a817ae484b4a899eeef534d [file] [log] [blame]
Jonathan Hart4f60f982014-10-27 08:11:17 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2014-present Open Networking Foundation
Jonathan Hart4f60f982014-10-27 08:11:17 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.packet.impl;
Jonathan Hart4f60f982014-10-27 08:11:17 -070017
Brian O'Connor21b028e2015-10-08 22:50:02 -070018import com.google.common.collect.ImmutableSet;
Thomas Vachuska7f171b22015-08-21 12:49:08 -070019import com.google.common.collect.Lists;
Brian O'Connor21b028e2015-10-08 22:50:02 -070020import com.google.common.collect.Sets;
Jonathan Hart4f60f982014-10-27 08:11:17 -070021import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
sangyun-hanad84e0c2016-02-19 18:30:03 +090024import org.apache.felix.scr.annotations.Modified;
25import org.apache.felix.scr.annotations.Property;
Jonathan Hart4f60f982014-10-27 08:11:17 -070026import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
28import org.apache.felix.scr.annotations.Service;
Jonathan Hartfc3f6de2016-09-19 17:55:15 -070029import org.onlab.util.KryoNamespace;
sangyun-han9f0af2d2016-08-04 13:04:59 +090030import org.onosproject.cfg.ComponentConfigService;
Brian O'Connorabafb502014-12-02 22:26:20 -080031import org.onosproject.cluster.ClusterService;
32import org.onosproject.cluster.NodeId;
33import org.onosproject.mastership.MastershipService;
Thomas Vachuska27bee092015-06-23 19:03:10 -070034import org.onosproject.net.flow.TrafficSelector;
Brian O'Connorabafb502014-12-02 22:26:20 -080035import org.onosproject.net.packet.OutboundPacket;
36import org.onosproject.net.packet.PacketEvent;
37import org.onosproject.net.packet.PacketEvent.Type;
Jonathan Hartfc3f6de2016-09-19 17:55:15 -070038import org.onosproject.net.packet.PacketPriority;
alshabib42947782015-03-31 14:59:06 -070039import org.onosproject.net.packet.PacketRequest;
Brian O'Connorabafb502014-12-02 22:26:20 -080040import org.onosproject.net.packet.PacketStore;
41import org.onosproject.net.packet.PacketStoreDelegate;
42import org.onosproject.store.AbstractStore;
43import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Brian O'Connorabafb502014-12-02 22:26:20 -080044import org.onosproject.store.cluster.messaging.MessageSubject;
45import org.onosproject.store.serializers.KryoNamespaces;
alshabib42947782015-03-31 14:59:06 -070046import org.onosproject.store.service.ConsistentMap;
47import org.onosproject.store.service.Serializer;
48import org.onosproject.store.service.StorageService;
sangyun-hanad84e0c2016-02-19 18:30:03 +090049import org.osgi.service.component.ComponentContext;
Jonathan Hart4f60f982014-10-27 08:11:17 -070050import org.slf4j.Logger;
51
sangyun-hanad84e0c2016-02-19 18:30:03 +090052import java.util.Dictionary;
Thomas Vachuska7f171b22015-08-21 12:49:08 -070053import java.util.List;
Jonathan Hartfc3f6de2016-09-19 17:55:15 -070054import java.util.Objects;
sangyun-hanad84e0c2016-02-19 18:30:03 +090055import java.util.Properties;
alshabib42947782015-03-31 14:59:06 -070056import java.util.Set;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080057import java.util.concurrent.ExecutorService;
58import java.util.concurrent.Executors;
Brian O'Connor21b028e2015-10-08 22:50:02 -070059import java.util.concurrent.atomic.AtomicBoolean;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080060
sangyun-hanad84e0c2016-02-19 18:30:03 +090061import static com.google.common.base.Preconditions.checkArgument;
62import static com.google.common.base.Strings.isNullOrEmpty;
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070063import static java.util.concurrent.Executors.newFixedThreadPool;
sangyun-hanad84e0c2016-02-19 18:30:03 +090064import static org.onlab.util.Tools.get;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080065import static org.onlab.util.Tools.groupedThreads;
66import static org.slf4j.LoggerFactory.getLogger;
67
Jonathan Hart4f60f982014-10-27 08:11:17 -070068/**
69 * Distributed packet store implementation allowing packets to be sent to
70 * remote instances.
71 */
72@Component(immediate = true)
73@Service
74public class DistributedPacketStore
75 extends AbstractStore<PacketEvent, PacketStoreDelegate>
76 implements PacketStore {
77
78 private final Logger log = getLogger(getClass());
79
sangyun-hanad84e0c2016-02-19 18:30:03 +090080 private static final String FORMAT = "Setting: messageHandlerThreadPoolSize={}";
Madan Jampani2af244a2015-02-22 13:12:01 -080081
Jonathan Hart4f60f982014-10-27 08:11:17 -070082 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070083 protected MastershipService mastershipService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070084
85 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070086 protected ClusterService clusterService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070087
88 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070089 protected ClusterCommunicationService communicationService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070090
alshabib42947782015-03-31 14:59:06 -070091 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
92 protected StorageService storageService;
93
sangyun-han9f0af2d2016-08-04 13:04:59 +090094 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
95 protected ComponentConfigService cfgService;
96
alshabib42947782015-03-31 14:59:06 -070097 private PacketRequestTracker tracker;
98
Jonathan Hart4f60f982014-10-27 08:11:17 -070099 private static final MessageSubject PACKET_OUT_SUBJECT =
100 new MessageSubject("packet-out");
101
Jordan Halterman2c83a102017-08-20 17:11:41 -0700102 private static final Serializer SERIALIZER = Serializer.using(KryoNamespaces.API);
Jonathan Hart4f60f982014-10-27 08:11:17 -0700103
Madan Jampani2af244a2015-02-22 13:12:01 -0800104 private ExecutorService messageHandlingExecutor;
105
sangyun-hanad84e0c2016-02-19 18:30:03 +0900106 private static final int DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
107 @Property(name = "messageHandlerThreadPoolSize", intValue = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE,
108 label = "Size of thread pool to assign message handler")
109 private static int messageHandlerThreadPoolSize = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE;
110
111 private static final int MAX_BACKOFF = 50;
112
Jonathan Hart4f60f982014-10-27 08:11:17 -0700113 @Activate
sangyun-han9f0af2d2016-08-04 13:04:59 +0900114 public void activate(ComponentContext context) {
115 cfgService.registerProperties(getClass());
116
117 modified(context);
118
Thomas Vachuska27bee092015-06-23 19:03:10 -0700119 messageHandlingExecutor = Executors.newFixedThreadPool(
sangyun-hanad84e0c2016-02-19 18:30:03 +0900120 messageHandlerThreadPoolSize,
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700121 groupedThreads("onos/store/packet", "message-handlers", log));
Jonathan Hart4f60f982014-10-27 08:11:17 -0700122
Madan Jampani01e05fb2015-08-13 13:29:36 -0700123 communicationService.<OutboundPacket>addSubscriber(PACKET_OUT_SUBJECT,
124 SERIALIZER::decode,
125 packet -> notifyDelegate(new PacketEvent(Type.EMIT, packet)),
126 messageHandlingExecutor);
Madan Jampani2af244a2015-02-22 13:12:01 -0800127
Thomas Vachuska27bee092015-06-23 19:03:10 -0700128 tracker = new PacketRequestTracker();
alshabib42947782015-03-31 14:59:06 -0700129
Madan Jampani2af244a2015-02-22 13:12:01 -0800130 log.info("Started");
Jonathan Hart4f60f982014-10-27 08:11:17 -0700131 }
132
133 @Deactivate
134 public void deactivate() {
sangyun-han9f0af2d2016-08-04 13:04:59 +0900135 cfgService.unregisterProperties(getClass(), false);
Madan Jampani2af244a2015-02-22 13:12:01 -0800136 communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
137 messageHandlingExecutor.shutdown();
Brian O'Connor21b028e2015-10-08 22:50:02 -0700138 tracker = null;
Jonathan Hart4f60f982014-10-27 08:11:17 -0700139 log.info("Stopped");
140 }
141
sangyun-hanad84e0c2016-02-19 18:30:03 +0900142 @Modified
143 public void modified(ComponentContext context) {
144 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
145
146 int newMessageHandlerThreadPoolSize;
147
148 try {
149 String s = get(properties, "messageHandlerThreadPoolSize");
150
151 newMessageHandlerThreadPoolSize =
152 isNullOrEmpty(s) ? messageHandlerThreadPoolSize : Integer.parseInt(s.trim());
153
154 } catch (NumberFormatException e) {
155 log.warn(e.getMessage());
156 newMessageHandlerThreadPoolSize = messageHandlerThreadPoolSize;
157 }
158
159 // Any change in the following parameters implies thread pool restart
160 if (newMessageHandlerThreadPoolSize != messageHandlerThreadPoolSize) {
161 setMessageHandlerThreadPoolSize(newMessageHandlerThreadPoolSize);
162 restartMessageHandlerThreadPool();
163 }
164
165 log.info(FORMAT, messageHandlerThreadPoolSize);
166 }
167
168
Jonathan Hart4f60f982014-10-27 08:11:17 -0700169 @Override
170 public void emit(OutboundPacket packet) {
171 NodeId myId = clusterService.getLocalNode().id();
172 NodeId master = mastershipService.getMasterFor(packet.sendThrough());
173
Jonathan Hart7466d612014-11-24 17:09:53 -0800174 if (master == null) {
175 return;
176 }
177
Jonathan Hart4f60f982014-10-27 08:11:17 -0700178 if (myId.equals(master)) {
179 notifyDelegate(new PacketEvent(Type.EMIT, packet));
180 return;
181 }
182
Madan Jampani01e05fb2015-08-13 13:29:36 -0700183 communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master)
184 .whenComplete((r, error) -> {
185 if (error != null) {
186 log.warn("Failed to send packet-out to {}", master, error);
187 }
188 });
Jonathan Hart4f60f982014-10-27 08:11:17 -0700189 }
190
alshabib42947782015-03-31 14:59:06 -0700191 @Override
Brian O'Connor21b028e2015-10-08 22:50:02 -0700192 public void requestPackets(PacketRequest request) {
193 tracker.add(request);
alshabib42947782015-03-31 14:59:06 -0700194 }
195
196 @Override
Brian O'Connor21b028e2015-10-08 22:50:02 -0700197 public void cancelPackets(PacketRequest request) {
198 tracker.remove(request);
Thomas Vachuska27bee092015-06-23 19:03:10 -0700199 }
200
201 @Override
Thomas Vachuska7f171b22015-08-21 12:49:08 -0700202 public List<PacketRequest> existingRequests() {
alshabib42947782015-03-31 14:59:06 -0700203 return tracker.requests();
204 }
205
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700206 private final class PacketRequestTracker {
alshabib42947782015-03-31 14:59:06 -0700207
Jonathan Hartfc3f6de2016-09-19 17:55:15 -0700208 private ConsistentMap<RequestKey, Set<PacketRequest>> requests;
alshabib42947782015-03-31 14:59:06 -0700209
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700210 private PacketRequestTracker() {
Jonathan Hartfc3f6de2016-09-19 17:55:15 -0700211 requests = storageService.<RequestKey, Set<PacketRequest>>consistentMapBuilder()
Thomas Vachuska27bee092015-06-23 19:03:10 -0700212 .withName("onos-packet-requests")
Jonathan Hartfc3f6de2016-09-19 17:55:15 -0700213 .withSerializer(Serializer.using(KryoNamespace.newBuilder()
214 .register(KryoNamespaces.API)
215 .register(RequestKey.class)
216 .build()))
Thomas Vachuska27bee092015-06-23 19:03:10 -0700217 .build();
alshabib42947782015-03-31 14:59:06 -0700218 }
219
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700220 private void add(PacketRequest request) {
Madan Jampanic6371882016-06-03 21:30:17 -0700221 AtomicBoolean firstRequest = addInternal(request);
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700222 if (firstRequest.get() && delegate != null) {
223 // The instance that makes the first request will push to all devices
224 delegate.requestPackets(request);
225 }
226 }
227
228 private AtomicBoolean addInternal(PacketRequest request) {
Brian O'Connor21b028e2015-10-08 22:50:02 -0700229 AtomicBoolean firstRequest = new AtomicBoolean(false);
Jonathan Hartfc3f6de2016-09-19 17:55:15 -0700230 requests.compute(key(request), (s, existingRequests) -> {
Madan Jampanied969ba2016-07-14 10:17:49 -0700231 // Reset to false just in case this is a retry due to
232 // ConcurrentModificationException
233 firstRequest.set(false);
Brian O'Connor21b028e2015-10-08 22:50:02 -0700234 if (existingRequests == null) {
235 firstRequest.set(true);
236 return ImmutableSet.of(request);
237 } else if (!existingRequests.contains(request)) {
Hyunsun Moonb5f4f272016-08-03 14:53:46 -0700238 firstRequest.set(true);
Brian O'Connor21b028e2015-10-08 22:50:02 -0700239 return ImmutableSet.<PacketRequest>builder()
240 .addAll(existingRequests)
241 .add(request)
242 .build();
243 } else {
244 return existingRequests;
245 }
246 });
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700247 return firstRequest;
248 }
Brian O'Connor21b028e2015-10-08 22:50:02 -0700249
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700250 private void remove(PacketRequest request) {
Madan Jampanic6371882016-06-03 21:30:17 -0700251 AtomicBoolean removedLast = removeInternal(request);
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700252 if (removedLast.get() && delegate != null) {
253 // The instance that removes the last request will remove from all devices
254 delegate.cancelPackets(request);
alshabib42947782015-03-31 14:59:06 -0700255 }
alshabib42947782015-03-31 14:59:06 -0700256 }
257
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700258 private AtomicBoolean removeInternal(PacketRequest request) {
Brian O'Connor21b028e2015-10-08 22:50:02 -0700259 AtomicBoolean removedLast = new AtomicBoolean(false);
Jonathan Hartfc3f6de2016-09-19 17:55:15 -0700260 requests.computeIfPresent(key(request), (s, existingRequests) -> {
Madan Jampanied969ba2016-07-14 10:17:49 -0700261 // Reset to false just in case this is a retry due to
262 // ConcurrentModificationException
263 removedLast.set(false);
Brian O'Connor21b028e2015-10-08 22:50:02 -0700264 if (existingRequests.contains(request)) {
265 Set<PacketRequest> newRequests = Sets.newHashSet(existingRequests);
266 newRequests.remove(request);
267 if (newRequests.size() > 0) {
268 return ImmutableSet.copyOf(newRequests);
269 } else {
270 removedLast.set(true);
271 return null;
272 }
273 } else {
274 return existingRequests;
275 }
276 });
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700277 return removedLast;
alshabib42947782015-03-31 14:59:06 -0700278 }
279
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700280 private List<PacketRequest> requests() {
Thomas Vachuska7f171b22015-08-21 12:49:08 -0700281 List<PacketRequest> list = Lists.newArrayList();
282 requests.values().forEach(v -> list.addAll(v.value()));
283 list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());
284 return list;
alshabib42947782015-03-31 14:59:06 -0700285 }
alshabib42947782015-03-31 14:59:06 -0700286 }
sangyun-hanad84e0c2016-02-19 18:30:03 +0900287
288 /**
Jonathan Hartfc3f6de2016-09-19 17:55:15 -0700289 * Creates a new request key from a packet request.
290 *
291 * @param request packet request
292 * @return request key
293 */
294 private static RequestKey key(PacketRequest request) {
295 return new RequestKey(request.selector(), request.priority());
296 }
297
298 /**
299 * Key of a packet request.
300 */
301 private static final class RequestKey {
302 private final TrafficSelector selector;
303 private final PacketPriority priority;
304
305 private RequestKey(TrafficSelector selector, PacketPriority priority) {
306 this.selector = selector;
307 this.priority = priority;
308 }
309
310 @Override
311 public int hashCode() {
312 return Objects.hash(selector, priority);
313 }
314
315 @Override
316 public boolean equals(Object other) {
317 if (other == this) {
318 return true;
319 }
320
321 if (!(other instanceof RequestKey)) {
322 return false;
323 }
324
325 RequestKey that = (RequestKey) other;
326
327 return Objects.equals(selector, that.selector) &&
328 Objects.equals(priority, that.priority);
329 }
330 }
331
332 /**
sangyun-hanad84e0c2016-02-19 18:30:03 +0900333 * Sets thread pool size of message handler.
334 *
335 * @param poolSize
336 */
337 private void setMessageHandlerThreadPoolSize(int poolSize) {
338 checkArgument(poolSize >= 0, "Message handler pool size must be 0 or more");
339 messageHandlerThreadPoolSize = poolSize;
340 }
341
342 /**
343 * Restarts thread pool of message handler.
344 */
345 private void restartMessageHandlerThreadPool() {
346 ExecutorService prevExecutor = messageHandlingExecutor;
Yuta HIGUCHI1624df12016-07-21 16:54:33 -0700347 messageHandlingExecutor = newFixedThreadPool(getMessageHandlerThreadPoolSize(),
348 groupedThreads("DistPktStore", "messageHandling-%d", log));
sangyun-hanad84e0c2016-02-19 18:30:03 +0900349 prevExecutor.shutdown();
350 }
351
352 /**
353 * Gets current thread pool size of message handler.
354 *
355 * @return messageHandlerThreadPoolSize
356 */
357 private int getMessageHandlerThreadPoolSize() {
358 return messageHandlerThreadPoolSize;
359 }
Jonathan Hart4f60f982014-10-27 08:11:17 -0700360}