blob: 1043c4500730a66342a43613d11bf3aaac16f6f1 [file] [log] [blame]
Jonathan Hart4f60f982014-10-27 08:11:17 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2014-present Open Networking Foundation
Jonathan Hart4f60f982014-10-27 08:11:17 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.packet.impl;
Jonathan Hart4f60f982014-10-27 08:11:17 -070017
Thomas Vachuska7f171b22015-08-21 12:49:08 -070018import com.google.common.collect.Lists;
Jonathan Hart4f60f982014-10-27 08:11:17 -070019import org.apache.felix.scr.annotations.Activate;
20import org.apache.felix.scr.annotations.Component;
21import org.apache.felix.scr.annotations.Deactivate;
sangyun-hanad84e0c2016-02-19 18:30:03 +090022import org.apache.felix.scr.annotations.Modified;
23import org.apache.felix.scr.annotations.Property;
Jonathan Hart4f60f982014-10-27 08:11:17 -070024import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.apache.felix.scr.annotations.Service;
Jonathan Hartfc3f6de2016-09-19 17:55:15 -070027import org.onlab.util.KryoNamespace;
sangyun-han9f0af2d2016-08-04 13:04:59 +090028import org.onosproject.cfg.ComponentConfigService;
Brian O'Connorabafb502014-12-02 22:26:20 -080029import org.onosproject.cluster.ClusterService;
30import org.onosproject.cluster.NodeId;
31import org.onosproject.mastership.MastershipService;
Thomas Vachuska27bee092015-06-23 19:03:10 -070032import org.onosproject.net.flow.TrafficSelector;
Brian O'Connorabafb502014-12-02 22:26:20 -080033import org.onosproject.net.packet.OutboundPacket;
34import org.onosproject.net.packet.PacketEvent;
35import org.onosproject.net.packet.PacketEvent.Type;
Jonathan Hartfc3f6de2016-09-19 17:55:15 -070036import org.onosproject.net.packet.PacketPriority;
alshabib42947782015-03-31 14:59:06 -070037import org.onosproject.net.packet.PacketRequest;
Brian O'Connorabafb502014-12-02 22:26:20 -080038import org.onosproject.net.packet.PacketStore;
39import org.onosproject.net.packet.PacketStoreDelegate;
40import org.onosproject.store.AbstractStore;
41import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Brian O'Connorabafb502014-12-02 22:26:20 -080042import org.onosproject.store.cluster.messaging.MessageSubject;
43import org.onosproject.store.serializers.KryoNamespaces;
Jordan Halterman8c57a092018-06-04 14:53:06 -070044import org.onosproject.store.service.ConsistentMultimap;
alshabib42947782015-03-31 14:59:06 -070045import org.onosproject.store.service.Serializer;
46import org.onosproject.store.service.StorageService;
Jordan Halterman8c57a092018-06-04 14:53:06 -070047import org.onosproject.store.service.Versioned;
sangyun-hanad84e0c2016-02-19 18:30:03 +090048import org.osgi.service.component.ComponentContext;
Jonathan Hart4f60f982014-10-27 08:11:17 -070049import org.slf4j.Logger;
50
Jordan Halterman8c57a092018-06-04 14:53:06 -070051import java.util.Collection;
sangyun-hanad84e0c2016-02-19 18:30:03 +090052import java.util.Dictionary;
Thomas Vachuska7f171b22015-08-21 12:49:08 -070053import java.util.List;
Jonathan Hartfc3f6de2016-09-19 17:55:15 -070054import java.util.Objects;
sangyun-hanad84e0c2016-02-19 18:30:03 +090055import java.util.Properties;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080056import java.util.concurrent.ExecutorService;
57import java.util.concurrent.Executors;
58
sangyun-hanad84e0c2016-02-19 18:30:03 +090059import static com.google.common.base.Preconditions.checkArgument;
60import static com.google.common.base.Strings.isNullOrEmpty;
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070061import static java.util.concurrent.Executors.newFixedThreadPool;
sangyun-hanad84e0c2016-02-19 18:30:03 +090062import static org.onlab.util.Tools.get;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080063import static org.onlab.util.Tools.groupedThreads;
64import static org.slf4j.LoggerFactory.getLogger;
65
Jonathan Hart4f60f982014-10-27 08:11:17 -070066/**
67 * Distributed packet store implementation allowing packets to be sent to
68 * remote instances.
69 */
70@Component(immediate = true)
71@Service
72public class DistributedPacketStore
73 extends AbstractStore<PacketEvent, PacketStoreDelegate>
74 implements PacketStore {
75
76 private final Logger log = getLogger(getClass());
77
sangyun-hanad84e0c2016-02-19 18:30:03 +090078 private static final String FORMAT = "Setting: messageHandlerThreadPoolSize={}";
Madan Jampani2af244a2015-02-22 13:12:01 -080079
Jonathan Hart4f60f982014-10-27 08:11:17 -070080 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070081 protected MastershipService mastershipService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070082
83 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070084 protected ClusterService clusterService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070085
86 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070087 protected ClusterCommunicationService communicationService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070088
alshabib42947782015-03-31 14:59:06 -070089 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
90 protected StorageService storageService;
91
sangyun-han9f0af2d2016-08-04 13:04:59 +090092 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
93 protected ComponentConfigService cfgService;
94
alshabib42947782015-03-31 14:59:06 -070095 private PacketRequestTracker tracker;
96
Jonathan Hart4f60f982014-10-27 08:11:17 -070097 private static final MessageSubject PACKET_OUT_SUBJECT =
98 new MessageSubject("packet-out");
99
Jordan Halterman2c83a102017-08-20 17:11:41 -0700100 private static final Serializer SERIALIZER = Serializer.using(KryoNamespaces.API);
Jonathan Hart4f60f982014-10-27 08:11:17 -0700101
Madan Jampani2af244a2015-02-22 13:12:01 -0800102 private ExecutorService messageHandlingExecutor;
103
sangyun-hanad84e0c2016-02-19 18:30:03 +0900104 private static final int DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
105 @Property(name = "messageHandlerThreadPoolSize", intValue = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE,
106 label = "Size of thread pool to assign message handler")
107 private static int messageHandlerThreadPoolSize = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE;
108
109 private static final int MAX_BACKOFF = 50;
110
Jonathan Hart4f60f982014-10-27 08:11:17 -0700111 @Activate
sangyun-han9f0af2d2016-08-04 13:04:59 +0900112 public void activate(ComponentContext context) {
113 cfgService.registerProperties(getClass());
114
115 modified(context);
116
Thomas Vachuska27bee092015-06-23 19:03:10 -0700117 messageHandlingExecutor = Executors.newFixedThreadPool(
sangyun-hanad84e0c2016-02-19 18:30:03 +0900118 messageHandlerThreadPoolSize,
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700119 groupedThreads("onos/store/packet", "message-handlers", log));
Jonathan Hart4f60f982014-10-27 08:11:17 -0700120
Madan Jampani01e05fb2015-08-13 13:29:36 -0700121 communicationService.<OutboundPacket>addSubscriber(PACKET_OUT_SUBJECT,
122 SERIALIZER::decode,
123 packet -> notifyDelegate(new PacketEvent(Type.EMIT, packet)),
124 messageHandlingExecutor);
Madan Jampani2af244a2015-02-22 13:12:01 -0800125
Thomas Vachuska27bee092015-06-23 19:03:10 -0700126 tracker = new PacketRequestTracker();
alshabib42947782015-03-31 14:59:06 -0700127
Madan Jampani2af244a2015-02-22 13:12:01 -0800128 log.info("Started");
Jonathan Hart4f60f982014-10-27 08:11:17 -0700129 }
130
131 @Deactivate
132 public void deactivate() {
sangyun-han9f0af2d2016-08-04 13:04:59 +0900133 cfgService.unregisterProperties(getClass(), false);
Madan Jampani2af244a2015-02-22 13:12:01 -0800134 communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
135 messageHandlingExecutor.shutdown();
Brian O'Connor21b028e2015-10-08 22:50:02 -0700136 tracker = null;
Jonathan Hart4f60f982014-10-27 08:11:17 -0700137 log.info("Stopped");
138 }
139
sangyun-hanad84e0c2016-02-19 18:30:03 +0900140 @Modified
141 public void modified(ComponentContext context) {
142 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
143
144 int newMessageHandlerThreadPoolSize;
145
146 try {
147 String s = get(properties, "messageHandlerThreadPoolSize");
148
149 newMessageHandlerThreadPoolSize =
150 isNullOrEmpty(s) ? messageHandlerThreadPoolSize : Integer.parseInt(s.trim());
151
152 } catch (NumberFormatException e) {
153 log.warn(e.getMessage());
154 newMessageHandlerThreadPoolSize = messageHandlerThreadPoolSize;
155 }
156
157 // Any change in the following parameters implies thread pool restart
158 if (newMessageHandlerThreadPoolSize != messageHandlerThreadPoolSize) {
159 setMessageHandlerThreadPoolSize(newMessageHandlerThreadPoolSize);
160 restartMessageHandlerThreadPool();
161 }
162
163 log.info(FORMAT, messageHandlerThreadPoolSize);
164 }
165
166
Jonathan Hart4f60f982014-10-27 08:11:17 -0700167 @Override
168 public void emit(OutboundPacket packet) {
169 NodeId myId = clusterService.getLocalNode().id();
170 NodeId master = mastershipService.getMasterFor(packet.sendThrough());
171
Jonathan Hart7466d612014-11-24 17:09:53 -0800172 if (master == null) {
173 return;
174 }
175
Jonathan Hart4f60f982014-10-27 08:11:17 -0700176 if (myId.equals(master)) {
177 notifyDelegate(new PacketEvent(Type.EMIT, packet));
178 return;
179 }
180
Madan Jampani01e05fb2015-08-13 13:29:36 -0700181 communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master)
182 .whenComplete((r, error) -> {
183 if (error != null) {
184 log.warn("Failed to send packet-out to {}", master, error);
185 }
186 });
Jonathan Hart4f60f982014-10-27 08:11:17 -0700187 }
188
alshabib42947782015-03-31 14:59:06 -0700189 @Override
Brian O'Connor21b028e2015-10-08 22:50:02 -0700190 public void requestPackets(PacketRequest request) {
191 tracker.add(request);
alshabib42947782015-03-31 14:59:06 -0700192 }
193
194 @Override
Brian O'Connor21b028e2015-10-08 22:50:02 -0700195 public void cancelPackets(PacketRequest request) {
196 tracker.remove(request);
Thomas Vachuska27bee092015-06-23 19:03:10 -0700197 }
198
199 @Override
Thomas Vachuska7f171b22015-08-21 12:49:08 -0700200 public List<PacketRequest> existingRequests() {
alshabib42947782015-03-31 14:59:06 -0700201 return tracker.requests();
202 }
203
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700204 private final class PacketRequestTracker {
alshabib42947782015-03-31 14:59:06 -0700205
Jordan Halterman8c57a092018-06-04 14:53:06 -0700206 private ConsistentMultimap<RequestKey, PacketRequest> requests;
alshabib42947782015-03-31 14:59:06 -0700207
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700208 private PacketRequestTracker() {
Jordan Halterman8c57a092018-06-04 14:53:06 -0700209 requests = storageService.<RequestKey, PacketRequest>consistentMultimapBuilder()
Thomas Vachuska27bee092015-06-23 19:03:10 -0700210 .withName("onos-packet-requests")
Jonathan Hartfc3f6de2016-09-19 17:55:15 -0700211 .withSerializer(Serializer.using(KryoNamespace.newBuilder()
212 .register(KryoNamespaces.API)
213 .register(RequestKey.class)
214 .build()))
Thomas Vachuska27bee092015-06-23 19:03:10 -0700215 .build();
alshabib42947782015-03-31 14:59:06 -0700216 }
217
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700218 private void add(PacketRequest request) {
Jordan Halterman8c57a092018-06-04 14:53:06 -0700219 boolean firstRequest = addInternal(request);
220 if (firstRequest && delegate != null) {
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700221 // The instance that makes the first request will push to all devices
222 delegate.requestPackets(request);
223 }
224 }
225
Jordan Halterman8c57a092018-06-04 14:53:06 -0700226 private boolean addInternal(PacketRequest request) {
Charles Chan4e7de422018-07-05 16:46:11 -0700227 return requests.put(key(request), request);
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700228 }
Brian O'Connor21b028e2015-10-08 22:50:02 -0700229
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700230 private void remove(PacketRequest request) {
Jordan Halterman8c57a092018-06-04 14:53:06 -0700231 boolean removedLast = removeInternal(request);
232 if (removedLast && delegate != null) {
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700233 // The instance that removes the last request will remove from all devices
234 delegate.cancelPackets(request);
alshabib42947782015-03-31 14:59:06 -0700235 }
alshabib42947782015-03-31 14:59:06 -0700236 }
237
Jordan Halterman8c57a092018-06-04 14:53:06 -0700238 private boolean removeInternal(PacketRequest request) {
239 Collection<? extends PacketRequest> values =
240 Versioned.valueOrNull(requests.removeAndGet(key(request), request));
241 return values == null || values.isEmpty();
alshabib42947782015-03-31 14:59:06 -0700242 }
243
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700244 private List<PacketRequest> requests() {
Thomas Vachuska7f171b22015-08-21 12:49:08 -0700245 List<PacketRequest> list = Lists.newArrayList();
Jordan Halterman8c57a092018-06-04 14:53:06 -0700246 requests.values().forEach(v -> list.add(v));
Thomas Vachuska7f171b22015-08-21 12:49:08 -0700247 list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());
248 return list;
alshabib42947782015-03-31 14:59:06 -0700249 }
alshabib42947782015-03-31 14:59:06 -0700250 }
sangyun-hanad84e0c2016-02-19 18:30:03 +0900251
252 /**
Jonathan Hartfc3f6de2016-09-19 17:55:15 -0700253 * Creates a new request key from a packet request.
254 *
255 * @param request packet request
256 * @return request key
257 */
258 private static RequestKey key(PacketRequest request) {
259 return new RequestKey(request.selector(), request.priority());
260 }
261
262 /**
263 * Key of a packet request.
264 */
265 private static final class RequestKey {
266 private final TrafficSelector selector;
267 private final PacketPriority priority;
268
269 private RequestKey(TrafficSelector selector, PacketPriority priority) {
270 this.selector = selector;
271 this.priority = priority;
272 }
273
274 @Override
275 public int hashCode() {
276 return Objects.hash(selector, priority);
277 }
278
279 @Override
280 public boolean equals(Object other) {
281 if (other == this) {
282 return true;
283 }
284
285 if (!(other instanceof RequestKey)) {
286 return false;
287 }
288
289 RequestKey that = (RequestKey) other;
290
291 return Objects.equals(selector, that.selector) &&
292 Objects.equals(priority, that.priority);
293 }
294 }
295
296 /**
sangyun-hanad84e0c2016-02-19 18:30:03 +0900297 * Sets thread pool size of message handler.
298 *
299 * @param poolSize
300 */
301 private void setMessageHandlerThreadPoolSize(int poolSize) {
302 checkArgument(poolSize >= 0, "Message handler pool size must be 0 or more");
303 messageHandlerThreadPoolSize = poolSize;
304 }
305
306 /**
307 * Restarts thread pool of message handler.
308 */
309 private void restartMessageHandlerThreadPool() {
310 ExecutorService prevExecutor = messageHandlingExecutor;
Yuta HIGUCHI1624df12016-07-21 16:54:33 -0700311 messageHandlingExecutor = newFixedThreadPool(getMessageHandlerThreadPoolSize(),
312 groupedThreads("DistPktStore", "messageHandling-%d", log));
sangyun-hanad84e0c2016-02-19 18:30:03 +0900313 prevExecutor.shutdown();
314 }
315
316 /**
317 * Gets current thread pool size of message handler.
318 *
319 * @return messageHandlerThreadPoolSize
320 */
321 private int getMessageHandlerThreadPoolSize() {
322 return messageHandlerThreadPoolSize;
323 }
Jonathan Hart4f60f982014-10-27 08:11:17 -0700324}