blob: 50e890068b600f7eb05aa74f603c6af865fbe08e [file] [log] [blame]
/*
 * Copyright 2014-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.packet.impl;
Jonathan Hart4f60f982014-10-27 08:11:17 -070017
import com.google.common.collect.Lists;
import org.onlab.util.KryoNamespace;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.packet.OutboundPacket;
import org.onosproject.net.packet.PacketEvent;
import org.onosproject.net.packet.PacketEvent.Type;
import org.onosproject.net.packet.PacketPriority;
import org.onosproject.net.packet.PacketRequest;
import org.onosproject.net.packet.PacketStore;
import org.onosproject.net.packet.PacketStoreDelegate;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMultimap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Modified;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;

import java.util.Collection;
import java.util.Comparator;
import java.util.Dictionary;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.store.OsgiPropertyConstants.DPS_MESSAGE_HANDLER_THREAD_POOL_SIZE;
import static org.onosproject.store.OsgiPropertyConstants.DPS_MESSAGE_HANDLER_THREAD_POOL_SIZE_DEFAULT;
import static org.slf4j.LoggerFactory.getLogger;
65
Jonathan Hart4f60f982014-10-27 08:11:17 -070066/**
67 * Distributed packet store implementation allowing packets to be sent to
68 * remote instances.
69 */
Ray Milkeyb5646e62018-10-16 11:42:18 -070070@Component(
71 immediate = true,
72 service = PacketStore.class,
73 property = {
Ray Milkey2d7bca12018-10-17 14:51:52 -070074 DPS_MESSAGE_HANDLER_THREAD_POOL_SIZE + ":Integer=" + DPS_MESSAGE_HANDLER_THREAD_POOL_SIZE_DEFAULT
Ray Milkeyb5646e62018-10-16 11:42:18 -070075 }
76)
Jonathan Hart4f60f982014-10-27 08:11:17 -070077public class DistributedPacketStore
78 extends AbstractStore<PacketEvent, PacketStoreDelegate>
79 implements PacketStore {
80
81 private final Logger log = getLogger(getClass());
82
sangyun-hanad84e0c2016-02-19 18:30:03 +090083 private static final String FORMAT = "Setting: messageHandlerThreadPoolSize={}";
Madan Jampani2af244a2015-02-22 13:12:01 -080084
Ray Milkeyd84f89b2018-08-17 14:54:17 -070085 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070086 protected MastershipService mastershipService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070087
Ray Milkeyd84f89b2018-08-17 14:54:17 -070088 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070089 protected ClusterService clusterService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070090
Ray Milkeyd84f89b2018-08-17 14:54:17 -070091 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070092 protected ClusterCommunicationService communicationService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070093
Ray Milkeyd84f89b2018-08-17 14:54:17 -070094 @Reference(cardinality = ReferenceCardinality.MANDATORY)
alshabib42947782015-03-31 14:59:06 -070095 protected StorageService storageService;
96
Ray Milkeyd84f89b2018-08-17 14:54:17 -070097 @Reference(cardinality = ReferenceCardinality.MANDATORY)
sangyun-han9f0af2d2016-08-04 13:04:59 +090098 protected ComponentConfigService cfgService;
99
alshabib42947782015-03-31 14:59:06 -0700100 private PacketRequestTracker tracker;
101
Jonathan Hart4f60f982014-10-27 08:11:17 -0700102 private static final MessageSubject PACKET_OUT_SUBJECT =
103 new MessageSubject("packet-out");
104
Jordan Halterman2c83a102017-08-20 17:11:41 -0700105 private static final Serializer SERIALIZER = Serializer.using(KryoNamespaces.API);
Jonathan Hart4f60f982014-10-27 08:11:17 -0700106
Madan Jampani2af244a2015-02-22 13:12:01 -0800107 private ExecutorService messageHandlingExecutor;
108
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700109 /** Size of thread pool to assign message handler. */
Ray Milkeyb5646e62018-10-16 11:42:18 -0700110 private static int messageHandlerThreadPoolSize = DPS_MESSAGE_HANDLER_THREAD_POOL_SIZE_DEFAULT;
sangyun-hanad84e0c2016-02-19 18:30:03 +0900111
112 private static final int MAX_BACKOFF = 50;
113
Jonathan Hart4f60f982014-10-27 08:11:17 -0700114 @Activate
sangyun-han9f0af2d2016-08-04 13:04:59 +0900115 public void activate(ComponentContext context) {
116 cfgService.registerProperties(getClass());
117
118 modified(context);
119
Thomas Vachuska27bee092015-06-23 19:03:10 -0700120 messageHandlingExecutor = Executors.newFixedThreadPool(
sangyun-hanad84e0c2016-02-19 18:30:03 +0900121 messageHandlerThreadPoolSize,
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700122 groupedThreads("onos/store/packet", "message-handlers", log));
Jonathan Hart4f60f982014-10-27 08:11:17 -0700123
Madan Jampani01e05fb2015-08-13 13:29:36 -0700124 communicationService.<OutboundPacket>addSubscriber(PACKET_OUT_SUBJECT,
125 SERIALIZER::decode,
126 packet -> notifyDelegate(new PacketEvent(Type.EMIT, packet)),
127 messageHandlingExecutor);
Madan Jampani2af244a2015-02-22 13:12:01 -0800128
Thomas Vachuska27bee092015-06-23 19:03:10 -0700129 tracker = new PacketRequestTracker();
alshabib42947782015-03-31 14:59:06 -0700130
Madan Jampani2af244a2015-02-22 13:12:01 -0800131 log.info("Started");
Jonathan Hart4f60f982014-10-27 08:11:17 -0700132 }
133
134 @Deactivate
135 public void deactivate() {
sangyun-han9f0af2d2016-08-04 13:04:59 +0900136 cfgService.unregisterProperties(getClass(), false);
Madan Jampani2af244a2015-02-22 13:12:01 -0800137 communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
138 messageHandlingExecutor.shutdown();
Brian O'Connor21b028e2015-10-08 22:50:02 -0700139 tracker = null;
Jonathan Hart4f60f982014-10-27 08:11:17 -0700140 log.info("Stopped");
141 }
142
sangyun-hanad84e0c2016-02-19 18:30:03 +0900143 @Modified
144 public void modified(ComponentContext context) {
145 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
146
147 int newMessageHandlerThreadPoolSize;
148
149 try {
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700150 String s = get(properties, DPS_MESSAGE_HANDLER_THREAD_POOL_SIZE);
sangyun-hanad84e0c2016-02-19 18:30:03 +0900151
152 newMessageHandlerThreadPoolSize =
153 isNullOrEmpty(s) ? messageHandlerThreadPoolSize : Integer.parseInt(s.trim());
154
155 } catch (NumberFormatException e) {
156 log.warn(e.getMessage());
157 newMessageHandlerThreadPoolSize = messageHandlerThreadPoolSize;
158 }
159
160 // Any change in the following parameters implies thread pool restart
161 if (newMessageHandlerThreadPoolSize != messageHandlerThreadPoolSize) {
162 setMessageHandlerThreadPoolSize(newMessageHandlerThreadPoolSize);
163 restartMessageHandlerThreadPool();
164 }
165
166 log.info(FORMAT, messageHandlerThreadPoolSize);
167 }
168
169
Jonathan Hart4f60f982014-10-27 08:11:17 -0700170 @Override
171 public void emit(OutboundPacket packet) {
172 NodeId myId = clusterService.getLocalNode().id();
173 NodeId master = mastershipService.getMasterFor(packet.sendThrough());
174
Jonathan Hart7466d612014-11-24 17:09:53 -0800175 if (master == null) {
176 return;
177 }
178
Jonathan Hart4f60f982014-10-27 08:11:17 -0700179 if (myId.equals(master)) {
180 notifyDelegate(new PacketEvent(Type.EMIT, packet));
181 return;
182 }
183
Madan Jampani01e05fb2015-08-13 13:29:36 -0700184 communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master)
185 .whenComplete((r, error) -> {
186 if (error != null) {
187 log.warn("Failed to send packet-out to {}", master, error);
188 }
189 });
Jonathan Hart4f60f982014-10-27 08:11:17 -0700190 }
191
alshabib42947782015-03-31 14:59:06 -0700192 @Override
Brian O'Connor21b028e2015-10-08 22:50:02 -0700193 public void requestPackets(PacketRequest request) {
194 tracker.add(request);
alshabib42947782015-03-31 14:59:06 -0700195 }
196
197 @Override
Brian O'Connor21b028e2015-10-08 22:50:02 -0700198 public void cancelPackets(PacketRequest request) {
199 tracker.remove(request);
Thomas Vachuska27bee092015-06-23 19:03:10 -0700200 }
201
202 @Override
Thomas Vachuska7f171b22015-08-21 12:49:08 -0700203 public List<PacketRequest> existingRequests() {
alshabib42947782015-03-31 14:59:06 -0700204 return tracker.requests();
205 }
206
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700207 private final class PacketRequestTracker {
alshabib42947782015-03-31 14:59:06 -0700208
Jordan Halterman8c57a092018-06-04 14:53:06 -0700209 private ConsistentMultimap<RequestKey, PacketRequest> requests;
alshabib42947782015-03-31 14:59:06 -0700210
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700211 private PacketRequestTracker() {
Jordan Halterman8c57a092018-06-04 14:53:06 -0700212 requests = storageService.<RequestKey, PacketRequest>consistentMultimapBuilder()
Thomas Vachuska27bee092015-06-23 19:03:10 -0700213 .withName("onos-packet-requests")
Jonathan Hartfc3f6de2016-09-19 17:55:15 -0700214 .withSerializer(Serializer.using(KryoNamespace.newBuilder()
215 .register(KryoNamespaces.API)
216 .register(RequestKey.class)
217 .build()))
Thomas Vachuska27bee092015-06-23 19:03:10 -0700218 .build();
alshabib42947782015-03-31 14:59:06 -0700219 }
220
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700221 private void add(PacketRequest request) {
Jordan Halterman8c57a092018-06-04 14:53:06 -0700222 boolean firstRequest = addInternal(request);
223 if (firstRequest && delegate != null) {
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700224 // The instance that makes the first request will push to all devices
225 delegate.requestPackets(request);
226 }
227 }
228
Jordan Halterman8c57a092018-06-04 14:53:06 -0700229 private boolean addInternal(PacketRequest request) {
Charles Chan4e7de422018-07-05 16:46:11 -0700230 return requests.put(key(request), request);
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700231 }
Brian O'Connor21b028e2015-10-08 22:50:02 -0700232
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700233 private void remove(PacketRequest request) {
Jordan Halterman8c57a092018-06-04 14:53:06 -0700234 boolean removedLast = removeInternal(request);
235 if (removedLast && delegate != null) {
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700236 // The instance that removes the last request will remove from all devices
237 delegate.cancelPackets(request);
alshabib42947782015-03-31 14:59:06 -0700238 }
alshabib42947782015-03-31 14:59:06 -0700239 }
240
Jordan Halterman8c57a092018-06-04 14:53:06 -0700241 private boolean removeInternal(PacketRequest request) {
242 Collection<? extends PacketRequest> values =
243 Versioned.valueOrNull(requests.removeAndGet(key(request), request));
244 return values == null || values.isEmpty();
alshabib42947782015-03-31 14:59:06 -0700245 }
246
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700247 private List<PacketRequest> requests() {
Thomas Vachuska7f171b22015-08-21 12:49:08 -0700248 List<PacketRequest> list = Lists.newArrayList();
Jordan Halterman8c57a092018-06-04 14:53:06 -0700249 requests.values().forEach(v -> list.add(v));
Thomas Vachuska7f171b22015-08-21 12:49:08 -0700250 list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());
251 return list;
alshabib42947782015-03-31 14:59:06 -0700252 }
alshabib42947782015-03-31 14:59:06 -0700253 }
sangyun-hanad84e0c2016-02-19 18:30:03 +0900254
255 /**
Jonathan Hartfc3f6de2016-09-19 17:55:15 -0700256 * Creates a new request key from a packet request.
257 *
258 * @param request packet request
259 * @return request key
260 */
261 private static RequestKey key(PacketRequest request) {
262 return new RequestKey(request.selector(), request.priority());
263 }
264
265 /**
266 * Key of a packet request.
267 */
268 private static final class RequestKey {
269 private final TrafficSelector selector;
270 private final PacketPriority priority;
271
272 private RequestKey(TrafficSelector selector, PacketPriority priority) {
273 this.selector = selector;
274 this.priority = priority;
275 }
276
277 @Override
278 public int hashCode() {
279 return Objects.hash(selector, priority);
280 }
281
282 @Override
283 public boolean equals(Object other) {
284 if (other == this) {
285 return true;
286 }
287
288 if (!(other instanceof RequestKey)) {
289 return false;
290 }
291
292 RequestKey that = (RequestKey) other;
293
294 return Objects.equals(selector, that.selector) &&
295 Objects.equals(priority, that.priority);
296 }
297 }
298
299 /**
sangyun-hanad84e0c2016-02-19 18:30:03 +0900300 * Sets thread pool size of message handler.
301 *
302 * @param poolSize
303 */
304 private void setMessageHandlerThreadPoolSize(int poolSize) {
305 checkArgument(poolSize >= 0, "Message handler pool size must be 0 or more");
306 messageHandlerThreadPoolSize = poolSize;
307 }
308
309 /**
310 * Restarts thread pool of message handler.
311 */
312 private void restartMessageHandlerThreadPool() {
313 ExecutorService prevExecutor = messageHandlingExecutor;
Yuta HIGUCHI1624df12016-07-21 16:54:33 -0700314 messageHandlingExecutor = newFixedThreadPool(getMessageHandlerThreadPoolSize(),
315 groupedThreads("DistPktStore", "messageHandling-%d", log));
sangyun-hanad84e0c2016-02-19 18:30:03 +0900316 prevExecutor.shutdown();
317 }
318
319 /**
320 * Gets current thread pool size of message handler.
321 *
322 * @return messageHandlerThreadPoolSize
323 */
324 private int getMessageHandlerThreadPoolSize() {
325 return messageHandlerThreadPoolSize;
326 }
Jonathan Hart4f60f982014-10-27 08:11:17 -0700327}