blob: c54d9836a41198b1ed7feefa9b34b49659e285b5 [file] [log] [blame]
Jonathan Hart4f60f982014-10-27 08:11:17 -07001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2014-present Open Networking Laboratory
Jonathan Hart4f60f982014-10-27 08:11:17 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.packet.impl;
Jonathan Hart4f60f982014-10-27 08:11:17 -070017
Brian O'Connor21b028e2015-10-08 22:50:02 -070018import com.google.common.collect.ImmutableSet;
Thomas Vachuska7f171b22015-08-21 12:49:08 -070019import com.google.common.collect.Lists;
Brian O'Connor21b028e2015-10-08 22:50:02 -070020import com.google.common.collect.Sets;
Jonathan Hart4f60f982014-10-27 08:11:17 -070021import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
sangyun-hanad84e0c2016-02-19 18:30:03 +090024import org.apache.felix.scr.annotations.Modified;
25import org.apache.felix.scr.annotations.Property;
Jonathan Hart4f60f982014-10-27 08:11:17 -070026import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
28import org.apache.felix.scr.annotations.Service;
Jonathan Hartfc3f6de2016-09-19 17:55:15 -070029import org.onlab.util.KryoNamespace;
sangyun-han9f0af2d2016-08-04 13:04:59 +090030import org.onosproject.cfg.ComponentConfigService;
Brian O'Connorabafb502014-12-02 22:26:20 -080031import org.onosproject.cluster.ClusterService;
32import org.onosproject.cluster.NodeId;
33import org.onosproject.mastership.MastershipService;
Thomas Vachuska27bee092015-06-23 19:03:10 -070034import org.onosproject.net.flow.TrafficSelector;
Brian O'Connorabafb502014-12-02 22:26:20 -080035import org.onosproject.net.packet.OutboundPacket;
36import org.onosproject.net.packet.PacketEvent;
37import org.onosproject.net.packet.PacketEvent.Type;
Jonathan Hartfc3f6de2016-09-19 17:55:15 -070038import org.onosproject.net.packet.PacketPriority;
alshabib42947782015-03-31 14:59:06 -070039import org.onosproject.net.packet.PacketRequest;
Brian O'Connorabafb502014-12-02 22:26:20 -080040import org.onosproject.net.packet.PacketStore;
41import org.onosproject.net.packet.PacketStoreDelegate;
42import org.onosproject.store.AbstractStore;
43import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Brian O'Connorabafb502014-12-02 22:26:20 -080044import org.onosproject.store.cluster.messaging.MessageSubject;
45import org.onosproject.store.serializers.KryoNamespaces;
HIGUCHI Yutae7290652016-05-18 11:29:01 -070046import org.onosproject.store.serializers.StoreSerializer;
alshabib42947782015-03-31 14:59:06 -070047import org.onosproject.store.service.ConsistentMap;
48import org.onosproject.store.service.Serializer;
49import org.onosproject.store.service.StorageService;
sangyun-hanad84e0c2016-02-19 18:30:03 +090050import org.osgi.service.component.ComponentContext;
Jonathan Hart4f60f982014-10-27 08:11:17 -070051import org.slf4j.Logger;
52
sangyun-hanad84e0c2016-02-19 18:30:03 +090053import java.util.Dictionary;
Thomas Vachuska7f171b22015-08-21 12:49:08 -070054import java.util.List;
Jonathan Hartfc3f6de2016-09-19 17:55:15 -070055import java.util.Objects;
sangyun-hanad84e0c2016-02-19 18:30:03 +090056import java.util.Properties;
alshabib42947782015-03-31 14:59:06 -070057import java.util.Set;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080058import java.util.concurrent.ExecutorService;
59import java.util.concurrent.Executors;
Brian O'Connor21b028e2015-10-08 22:50:02 -070060import java.util.concurrent.atomic.AtomicBoolean;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080061
sangyun-hanad84e0c2016-02-19 18:30:03 +090062import static com.google.common.base.Preconditions.checkArgument;
63import static com.google.common.base.Strings.isNullOrEmpty;
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070064import static java.util.concurrent.Executors.newFixedThreadPool;
sangyun-hanad84e0c2016-02-19 18:30:03 +090065import static org.onlab.util.Tools.get;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080066import static org.onlab.util.Tools.groupedThreads;
67import static org.slf4j.LoggerFactory.getLogger;
68
Jonathan Hart4f60f982014-10-27 08:11:17 -070069/**
70 * Distributed packet store implementation allowing packets to be sent to
71 * remote instances.
72 */
73@Component(immediate = true)
74@Service
75public class DistributedPacketStore
76 extends AbstractStore<PacketEvent, PacketStoreDelegate>
77 implements PacketStore {
78
79 private final Logger log = getLogger(getClass());
80
sangyun-hanad84e0c2016-02-19 18:30:03 +090081 private static final String FORMAT = "Setting: messageHandlerThreadPoolSize={}";
Madan Jampani2af244a2015-02-22 13:12:01 -080082
Jonathan Hart4f60f982014-10-27 08:11:17 -070083 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070084 protected MastershipService mastershipService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070085
86 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070087 protected ClusterService clusterService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070088
89 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070090 protected ClusterCommunicationService communicationService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070091
alshabib42947782015-03-31 14:59:06 -070092 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
93 protected StorageService storageService;
94
sangyun-han9f0af2d2016-08-04 13:04:59 +090095 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
96 protected ComponentConfigService cfgService;
97
alshabib42947782015-03-31 14:59:06 -070098 private PacketRequestTracker tracker;
99
Jonathan Hart4f60f982014-10-27 08:11:17 -0700100 private static final MessageSubject PACKET_OUT_SUBJECT =
101 new MessageSubject("packet-out");
102
HIGUCHI Yutae7290652016-05-18 11:29:01 -0700103 private static final StoreSerializer SERIALIZER = StoreSerializer.using(KryoNamespaces.API);
Jonathan Hart4f60f982014-10-27 08:11:17 -0700104
Madan Jampani2af244a2015-02-22 13:12:01 -0800105 private ExecutorService messageHandlingExecutor;
106
sangyun-hanad84e0c2016-02-19 18:30:03 +0900107 private static final int DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
108 @Property(name = "messageHandlerThreadPoolSize", intValue = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE,
109 label = "Size of thread pool to assign message handler")
110 private static int messageHandlerThreadPoolSize = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE;
111
112 private static final int MAX_BACKOFF = 50;
113
Jonathan Hart4f60f982014-10-27 08:11:17 -0700114 @Activate
sangyun-han9f0af2d2016-08-04 13:04:59 +0900115 public void activate(ComponentContext context) {
116 cfgService.registerProperties(getClass());
117
118 modified(context);
119
Thomas Vachuska27bee092015-06-23 19:03:10 -0700120 messageHandlingExecutor = Executors.newFixedThreadPool(
sangyun-hanad84e0c2016-02-19 18:30:03 +0900121 messageHandlerThreadPoolSize,
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700122 groupedThreads("onos/store/packet", "message-handlers", log));
Jonathan Hart4f60f982014-10-27 08:11:17 -0700123
Madan Jampani01e05fb2015-08-13 13:29:36 -0700124 communicationService.<OutboundPacket>addSubscriber(PACKET_OUT_SUBJECT,
125 SERIALIZER::decode,
126 packet -> notifyDelegate(new PacketEvent(Type.EMIT, packet)),
127 messageHandlingExecutor);
Madan Jampani2af244a2015-02-22 13:12:01 -0800128
Thomas Vachuska27bee092015-06-23 19:03:10 -0700129 tracker = new PacketRequestTracker();
alshabib42947782015-03-31 14:59:06 -0700130
Madan Jampani2af244a2015-02-22 13:12:01 -0800131 log.info("Started");
Jonathan Hart4f60f982014-10-27 08:11:17 -0700132 }
133
134 @Deactivate
135 public void deactivate() {
sangyun-han9f0af2d2016-08-04 13:04:59 +0900136 cfgService.unregisterProperties(getClass(), false);
Madan Jampani2af244a2015-02-22 13:12:01 -0800137 communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
138 messageHandlingExecutor.shutdown();
Brian O'Connor21b028e2015-10-08 22:50:02 -0700139 tracker = null;
Jonathan Hart4f60f982014-10-27 08:11:17 -0700140 log.info("Stopped");
141 }
142
sangyun-hanad84e0c2016-02-19 18:30:03 +0900143 @Modified
144 public void modified(ComponentContext context) {
145 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
146
147 int newMessageHandlerThreadPoolSize;
148
149 try {
150 String s = get(properties, "messageHandlerThreadPoolSize");
151
152 newMessageHandlerThreadPoolSize =
153 isNullOrEmpty(s) ? messageHandlerThreadPoolSize : Integer.parseInt(s.trim());
154
155 } catch (NumberFormatException e) {
156 log.warn(e.getMessage());
157 newMessageHandlerThreadPoolSize = messageHandlerThreadPoolSize;
158 }
159
160 // Any change in the following parameters implies thread pool restart
161 if (newMessageHandlerThreadPoolSize != messageHandlerThreadPoolSize) {
162 setMessageHandlerThreadPoolSize(newMessageHandlerThreadPoolSize);
163 restartMessageHandlerThreadPool();
164 }
165
166 log.info(FORMAT, messageHandlerThreadPoolSize);
167 }
168
169
Jonathan Hart4f60f982014-10-27 08:11:17 -0700170 @Override
171 public void emit(OutboundPacket packet) {
172 NodeId myId = clusterService.getLocalNode().id();
173 NodeId master = mastershipService.getMasterFor(packet.sendThrough());
174
Jonathan Hart7466d612014-11-24 17:09:53 -0800175 if (master == null) {
176 return;
177 }
178
Jonathan Hart4f60f982014-10-27 08:11:17 -0700179 if (myId.equals(master)) {
180 notifyDelegate(new PacketEvent(Type.EMIT, packet));
181 return;
182 }
183
Madan Jampani01e05fb2015-08-13 13:29:36 -0700184 communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master)
185 .whenComplete((r, error) -> {
186 if (error != null) {
187 log.warn("Failed to send packet-out to {}", master, error);
188 }
189 });
Jonathan Hart4f60f982014-10-27 08:11:17 -0700190 }
191
alshabib42947782015-03-31 14:59:06 -0700192 @Override
Brian O'Connor21b028e2015-10-08 22:50:02 -0700193 public void requestPackets(PacketRequest request) {
194 tracker.add(request);
alshabib42947782015-03-31 14:59:06 -0700195 }
196
197 @Override
Brian O'Connor21b028e2015-10-08 22:50:02 -0700198 public void cancelPackets(PacketRequest request) {
199 tracker.remove(request);
Thomas Vachuska27bee092015-06-23 19:03:10 -0700200 }
201
202 @Override
Thomas Vachuska7f171b22015-08-21 12:49:08 -0700203 public List<PacketRequest> existingRequests() {
alshabib42947782015-03-31 14:59:06 -0700204 return tracker.requests();
205 }
206
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700207 private final class PacketRequestTracker {
alshabib42947782015-03-31 14:59:06 -0700208
Jonathan Hartfc3f6de2016-09-19 17:55:15 -0700209 private ConsistentMap<RequestKey, Set<PacketRequest>> requests;
alshabib42947782015-03-31 14:59:06 -0700210
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700211 private PacketRequestTracker() {
Jonathan Hartfc3f6de2016-09-19 17:55:15 -0700212 requests = storageService.<RequestKey, Set<PacketRequest>>consistentMapBuilder()
Thomas Vachuska27bee092015-06-23 19:03:10 -0700213 .withName("onos-packet-requests")
Jonathan Hartfc3f6de2016-09-19 17:55:15 -0700214 .withSerializer(Serializer.using(KryoNamespace.newBuilder()
215 .register(KryoNamespaces.API)
216 .register(RequestKey.class)
217 .build()))
Thomas Vachuska27bee092015-06-23 19:03:10 -0700218 .build();
alshabib42947782015-03-31 14:59:06 -0700219 }
220
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700221 private void add(PacketRequest request) {
Madan Jampanic6371882016-06-03 21:30:17 -0700222 AtomicBoolean firstRequest = addInternal(request);
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700223 if (firstRequest.get() && delegate != null) {
224 // The instance that makes the first request will push to all devices
225 delegate.requestPackets(request);
226 }
227 }
228
229 private AtomicBoolean addInternal(PacketRequest request) {
Brian O'Connor21b028e2015-10-08 22:50:02 -0700230 AtomicBoolean firstRequest = new AtomicBoolean(false);
Jonathan Hartfc3f6de2016-09-19 17:55:15 -0700231 requests.compute(key(request), (s, existingRequests) -> {
Madan Jampanied969ba2016-07-14 10:17:49 -0700232 // Reset to false just in case this is a retry due to
233 // ConcurrentModificationException
234 firstRequest.set(false);
Brian O'Connor21b028e2015-10-08 22:50:02 -0700235 if (existingRequests == null) {
236 firstRequest.set(true);
237 return ImmutableSet.of(request);
238 } else if (!existingRequests.contains(request)) {
Hyunsun Moonb5f4f272016-08-03 14:53:46 -0700239 firstRequest.set(true);
Brian O'Connor21b028e2015-10-08 22:50:02 -0700240 return ImmutableSet.<PacketRequest>builder()
241 .addAll(existingRequests)
242 .add(request)
243 .build();
244 } else {
245 return existingRequests;
246 }
247 });
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700248 return firstRequest;
249 }
Brian O'Connor21b028e2015-10-08 22:50:02 -0700250
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700251 private void remove(PacketRequest request) {
Madan Jampanic6371882016-06-03 21:30:17 -0700252 AtomicBoolean removedLast = removeInternal(request);
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700253 if (removedLast.get() && delegate != null) {
254 // The instance that removes the last request will remove from all devices
255 delegate.cancelPackets(request);
alshabib42947782015-03-31 14:59:06 -0700256 }
alshabib42947782015-03-31 14:59:06 -0700257 }
258
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700259 private AtomicBoolean removeInternal(PacketRequest request) {
Brian O'Connor21b028e2015-10-08 22:50:02 -0700260 AtomicBoolean removedLast = new AtomicBoolean(false);
Jonathan Hartfc3f6de2016-09-19 17:55:15 -0700261 requests.computeIfPresent(key(request), (s, existingRequests) -> {
Madan Jampanied969ba2016-07-14 10:17:49 -0700262 // Reset to false just in case this is a retry due to
263 // ConcurrentModificationException
264 removedLast.set(false);
Brian O'Connor21b028e2015-10-08 22:50:02 -0700265 if (existingRequests.contains(request)) {
266 Set<PacketRequest> newRequests = Sets.newHashSet(existingRequests);
267 newRequests.remove(request);
268 if (newRequests.size() > 0) {
269 return ImmutableSet.copyOf(newRequests);
270 } else {
271 removedLast.set(true);
272 return null;
273 }
274 } else {
275 return existingRequests;
276 }
277 });
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700278 return removedLast;
alshabib42947782015-03-31 14:59:06 -0700279 }
280
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700281 private List<PacketRequest> requests() {
Thomas Vachuska7f171b22015-08-21 12:49:08 -0700282 List<PacketRequest> list = Lists.newArrayList();
283 requests.values().forEach(v -> list.addAll(v.value()));
284 list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());
285 return list;
alshabib42947782015-03-31 14:59:06 -0700286 }
alshabib42947782015-03-31 14:59:06 -0700287 }
sangyun-hanad84e0c2016-02-19 18:30:03 +0900288
289 /**
Jonathan Hartfc3f6de2016-09-19 17:55:15 -0700290 * Creates a new request key from a packet request.
291 *
292 * @param request packet request
293 * @return request key
294 */
295 private static RequestKey key(PacketRequest request) {
296 return new RequestKey(request.selector(), request.priority());
297 }
298
299 /**
300 * Key of a packet request.
301 */
302 private static final class RequestKey {
303 private final TrafficSelector selector;
304 private final PacketPriority priority;
305
306 private RequestKey(TrafficSelector selector, PacketPriority priority) {
307 this.selector = selector;
308 this.priority = priority;
309 }
310
311 @Override
312 public int hashCode() {
313 return Objects.hash(selector, priority);
314 }
315
316 @Override
317 public boolean equals(Object other) {
318 if (other == this) {
319 return true;
320 }
321
322 if (!(other instanceof RequestKey)) {
323 return false;
324 }
325
326 RequestKey that = (RequestKey) other;
327
328 return Objects.equals(selector, that.selector) &&
329 Objects.equals(priority, that.priority);
330 }
331 }
332
333 /**
sangyun-hanad84e0c2016-02-19 18:30:03 +0900334 * Sets thread pool size of message handler.
335 *
336 * @param poolSize
337 */
338 private void setMessageHandlerThreadPoolSize(int poolSize) {
339 checkArgument(poolSize >= 0, "Message handler pool size must be 0 or more");
340 messageHandlerThreadPoolSize = poolSize;
341 }
342
343 /**
344 * Restarts thread pool of message handler.
345 */
346 private void restartMessageHandlerThreadPool() {
347 ExecutorService prevExecutor = messageHandlingExecutor;
Yuta HIGUCHI1624df12016-07-21 16:54:33 -0700348 messageHandlingExecutor = newFixedThreadPool(getMessageHandlerThreadPoolSize(),
349 groupedThreads("DistPktStore", "messageHandling-%d", log));
sangyun-hanad84e0c2016-02-19 18:30:03 +0900350 prevExecutor.shutdown();
351 }
352
353 /**
354 * Gets current thread pool size of message handler.
355 *
356 * @return messageHandlerThreadPoolSize
357 */
358 private int getMessageHandlerThreadPoolSize() {
359 return messageHandlerThreadPoolSize;
360 }
Jonathan Hart4f60f982014-10-27 08:11:17 -0700361}