/*
 * Copyright 2014-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.store.packet.impl;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.packet.OutboundPacket;
import org.onosproject.net.packet.PacketEvent;
import org.onosproject.net.packet.PacketEvent.Type;
import org.onosproject.net.packet.PacketRequest;
import org.onosproject.net.packet.PacketStore;
import org.onosproject.net.packet.PacketStoreDelegate;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;

import java.util.Comparator;
import java.util.Dictionary;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
import static org.onlab.util.Tools.retryable;
import static org.slf4j.LoggerFactory.getLogger;
66
Jonathan Hart4f60f982014-10-27 08:11:17 -070067/**
68 * Distributed packet store implementation allowing packets to be sent to
69 * remote instances.
70 */
71@Component(immediate = true)
72@Service
73public class DistributedPacketStore
74 extends AbstractStore<PacketEvent, PacketStoreDelegate>
75 implements PacketStore {
76
77 private final Logger log = getLogger(getClass());
78
sangyun-hanad84e0c2016-02-19 18:30:03 +090079 private static final String FORMAT = "Setting: messageHandlerThreadPoolSize={}";
Madan Jampani2af244a2015-02-22 13:12:01 -080080
Jonathan Hart4f60f982014-10-27 08:11:17 -070081 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070082 protected MastershipService mastershipService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070083
84 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070085 protected ClusterService clusterService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070086
87 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070088 protected ClusterCommunicationService communicationService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070089
alshabib42947782015-03-31 14:59:06 -070090 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
91 protected StorageService storageService;
92
93 private PacketRequestTracker tracker;
94
Jonathan Hart4f60f982014-10-27 08:11:17 -070095 private static final MessageSubject PACKET_OUT_SUBJECT =
96 new MessageSubject("packet-out");
97
98 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
99 @Override
100 protected void setupKryoPool() {
101 serializerPool = KryoNamespace.newBuilder()
102 .register(KryoNamespaces.API)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800103 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
104 .build();
Jonathan Hart4f60f982014-10-27 08:11:17 -0700105 }
106 };
107
Madan Jampani2af244a2015-02-22 13:12:01 -0800108 private ExecutorService messageHandlingExecutor;
109
sangyun-hanad84e0c2016-02-19 18:30:03 +0900110 private static final int DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
111 @Property(name = "messageHandlerThreadPoolSize", intValue = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE,
112 label = "Size of thread pool to assign message handler")
113 private static int messageHandlerThreadPoolSize = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE;
114
115 private static final int MAX_BACKOFF = 50;
116
Jonathan Hart4f60f982014-10-27 08:11:17 -0700117 @Activate
118 public void activate() {
Thomas Vachuska27bee092015-06-23 19:03:10 -0700119 messageHandlingExecutor = Executors.newFixedThreadPool(
sangyun-hanad84e0c2016-02-19 18:30:03 +0900120 messageHandlerThreadPoolSize,
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700121 groupedThreads("onos/store/packet", "message-handlers", log));
Jonathan Hart4f60f982014-10-27 08:11:17 -0700122
Madan Jampani01e05fb2015-08-13 13:29:36 -0700123 communicationService.<OutboundPacket>addSubscriber(PACKET_OUT_SUBJECT,
124 SERIALIZER::decode,
125 packet -> notifyDelegate(new PacketEvent(Type.EMIT, packet)),
126 messageHandlingExecutor);
Madan Jampani2af244a2015-02-22 13:12:01 -0800127
Thomas Vachuska27bee092015-06-23 19:03:10 -0700128 tracker = new PacketRequestTracker();
alshabib42947782015-03-31 14:59:06 -0700129
Madan Jampani2af244a2015-02-22 13:12:01 -0800130 log.info("Started");
Jonathan Hart4f60f982014-10-27 08:11:17 -0700131 }
132
133 @Deactivate
134 public void deactivate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800135 communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
136 messageHandlingExecutor.shutdown();
Brian O'Connor21b028e2015-10-08 22:50:02 -0700137 tracker = null;
Jonathan Hart4f60f982014-10-27 08:11:17 -0700138 log.info("Stopped");
139 }
140
sangyun-hanad84e0c2016-02-19 18:30:03 +0900141 @Modified
142 public void modified(ComponentContext context) {
143 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
144
145 int newMessageHandlerThreadPoolSize;
146
147 try {
148 String s = get(properties, "messageHandlerThreadPoolSize");
149
150 newMessageHandlerThreadPoolSize =
151 isNullOrEmpty(s) ? messageHandlerThreadPoolSize : Integer.parseInt(s.trim());
152
153 } catch (NumberFormatException e) {
154 log.warn(e.getMessage());
155 newMessageHandlerThreadPoolSize = messageHandlerThreadPoolSize;
156 }
157
158 // Any change in the following parameters implies thread pool restart
159 if (newMessageHandlerThreadPoolSize != messageHandlerThreadPoolSize) {
160 setMessageHandlerThreadPoolSize(newMessageHandlerThreadPoolSize);
161 restartMessageHandlerThreadPool();
162 }
163
164 log.info(FORMAT, messageHandlerThreadPoolSize);
165 }
166
167
Jonathan Hart4f60f982014-10-27 08:11:17 -0700168 @Override
169 public void emit(OutboundPacket packet) {
170 NodeId myId = clusterService.getLocalNode().id();
171 NodeId master = mastershipService.getMasterFor(packet.sendThrough());
172
Jonathan Hart7466d612014-11-24 17:09:53 -0800173 if (master == null) {
174 return;
175 }
176
Jonathan Hart4f60f982014-10-27 08:11:17 -0700177 if (myId.equals(master)) {
178 notifyDelegate(new PacketEvent(Type.EMIT, packet));
179 return;
180 }
181
Madan Jampani01e05fb2015-08-13 13:29:36 -0700182 communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master)
183 .whenComplete((r, error) -> {
184 if (error != null) {
185 log.warn("Failed to send packet-out to {}", master, error);
186 }
187 });
Jonathan Hart4f60f982014-10-27 08:11:17 -0700188 }
189
alshabib42947782015-03-31 14:59:06 -0700190 @Override
Brian O'Connor21b028e2015-10-08 22:50:02 -0700191 public void requestPackets(PacketRequest request) {
192 tracker.add(request);
alshabib42947782015-03-31 14:59:06 -0700193 }
194
195 @Override
Brian O'Connor21b028e2015-10-08 22:50:02 -0700196 public void cancelPackets(PacketRequest request) {
197 tracker.remove(request);
Thomas Vachuska27bee092015-06-23 19:03:10 -0700198 }
199
200 @Override
Thomas Vachuska7f171b22015-08-21 12:49:08 -0700201 public List<PacketRequest> existingRequests() {
alshabib42947782015-03-31 14:59:06 -0700202 return tracker.requests();
203 }
204
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700205 private final class PacketRequestTracker {
alshabib42947782015-03-31 14:59:06 -0700206
Thomas Vachuska27bee092015-06-23 19:03:10 -0700207 private ConsistentMap<TrafficSelector, Set<PacketRequest>> requests;
alshabib42947782015-03-31 14:59:06 -0700208
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700209 private PacketRequestTracker() {
Thomas Vachuska27bee092015-06-23 19:03:10 -0700210 requests = storageService.<TrafficSelector, Set<PacketRequest>>consistentMapBuilder()
211 .withName("onos-packet-requests")
Thomas Vachuska19f12292015-04-20 16:29:15 -0700212 .withPartitionsDisabled()
Thomas Vachuska27bee092015-06-23 19:03:10 -0700213 .withSerializer(Serializer.using(KryoNamespaces.API))
214 .build();
alshabib42947782015-03-31 14:59:06 -0700215 }
216
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700217 private void add(PacketRequest request) {
218 AtomicBoolean firstRequest =
Madan Jampanid61f2b12015-12-10 17:01:52 -0800219 retryable(this::addInternal, ConsistentMapException.ConcurrentModification.class,
220 Integer.MAX_VALUE, MAX_BACKOFF).apply(request);
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700221 if (firstRequest.get() && delegate != null) {
222 // The instance that makes the first request will push to all devices
223 delegate.requestPackets(request);
224 }
225 }
226
227 private AtomicBoolean addInternal(PacketRequest request) {
Brian O'Connor21b028e2015-10-08 22:50:02 -0700228 AtomicBoolean firstRequest = new AtomicBoolean(false);
229 requests.compute(request.selector(), (s, existingRequests) -> {
230 if (existingRequests == null) {
231 firstRequest.set(true);
232 return ImmutableSet.of(request);
233 } else if (!existingRequests.contains(request)) {
234 return ImmutableSet.<PacketRequest>builder()
235 .addAll(existingRequests)
236 .add(request)
237 .build();
238 } else {
239 return existingRequests;
240 }
241 });
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700242 return firstRequest;
243 }
Brian O'Connor21b028e2015-10-08 22:50:02 -0700244
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700245 private void remove(PacketRequest request) {
246 AtomicBoolean removedLast =
Madan Jampanid61f2b12015-12-10 17:01:52 -0800247 retryable(this::removeInternal, ConsistentMapException.ConcurrentModification.class,
248 Integer.MAX_VALUE, MAX_BACKOFF).apply(request);
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700249 if (removedLast.get() && delegate != null) {
250 // The instance that removes the last request will remove from all devices
251 delegate.cancelPackets(request);
alshabib42947782015-03-31 14:59:06 -0700252 }
alshabib42947782015-03-31 14:59:06 -0700253 }
254
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700255 private AtomicBoolean removeInternal(PacketRequest request) {
Brian O'Connor21b028e2015-10-08 22:50:02 -0700256 AtomicBoolean removedLast = new AtomicBoolean(false);
257 requests.computeIfPresent(request.selector(), (s, existingRequests) -> {
258 if (existingRequests.contains(request)) {
259 Set<PacketRequest> newRequests = Sets.newHashSet(existingRequests);
260 newRequests.remove(request);
261 if (newRequests.size() > 0) {
262 return ImmutableSet.copyOf(newRequests);
263 } else {
264 removedLast.set(true);
265 return null;
266 }
267 } else {
268 return existingRequests;
269 }
270 });
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700271 return removedLast;
alshabib42947782015-03-31 14:59:06 -0700272 }
273
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700274 private List<PacketRequest> requests() {
Thomas Vachuska7f171b22015-08-21 12:49:08 -0700275 List<PacketRequest> list = Lists.newArrayList();
276 requests.values().forEach(v -> list.addAll(v.value()));
277 list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());
278 return list;
alshabib42947782015-03-31 14:59:06 -0700279 }
alshabib42947782015-03-31 14:59:06 -0700280 }
sangyun-hanad84e0c2016-02-19 18:30:03 +0900281
282 /**
283 * Sets thread pool size of message handler.
284 *
285 * @param poolSize
286 */
287 private void setMessageHandlerThreadPoolSize(int poolSize) {
288 checkArgument(poolSize >= 0, "Message handler pool size must be 0 or more");
289 messageHandlerThreadPoolSize = poolSize;
290 }
291
292 /**
293 * Restarts thread pool of message handler.
294 */
295 private void restartMessageHandlerThreadPool() {
296 ExecutorService prevExecutor = messageHandlingExecutor;
297 messageHandlingExecutor = Executors.newFixedThreadPool(getMessageHandlerThreadPoolSize());
298 prevExecutor.shutdown();
299 }
300
301 /**
302 * Gets current thread pool size of message handler.
303 *
304 * @return messageHandlerThreadPoolSize
305 */
306 private int getMessageHandlerThreadPoolSize() {
307 return messageHandlerThreadPoolSize;
308 }
Jonathan Hart4f60f982014-10-27 08:11:17 -0700309}