blob: 450875e54c36338443658cf524e787d7efb51d36 [file] [log] [blame]
/*
 * Copyright 2014-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.packet.impl;
Jonathan Hart4f60f982014-10-27 08:11:17 -070017
Brian O'Connor21b028e2015-10-08 22:50:02 -070018import com.google.common.collect.ImmutableSet;
Thomas Vachuska7f171b22015-08-21 12:49:08 -070019import com.google.common.collect.Lists;
Brian O'Connor21b028e2015-10-08 22:50:02 -070020import com.google.common.collect.Sets;
Jonathan Hart4f60f982014-10-27 08:11:17 -070021import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
sangyun-hanad84e0c2016-02-19 18:30:03 +090024import org.apache.felix.scr.annotations.Modified;
25import org.apache.felix.scr.annotations.Property;
Jonathan Hart4f60f982014-10-27 08:11:17 -070026import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
28import org.apache.felix.scr.annotations.Service;
Brian O'Connorabafb502014-12-02 22:26:20 -080029import org.onosproject.cluster.ClusterService;
30import org.onosproject.cluster.NodeId;
31import org.onosproject.mastership.MastershipService;
Thomas Vachuska27bee092015-06-23 19:03:10 -070032import org.onosproject.net.flow.TrafficSelector;
Brian O'Connorabafb502014-12-02 22:26:20 -080033import org.onosproject.net.packet.OutboundPacket;
34import org.onosproject.net.packet.PacketEvent;
35import org.onosproject.net.packet.PacketEvent.Type;
alshabib42947782015-03-31 14:59:06 -070036import org.onosproject.net.packet.PacketRequest;
Brian O'Connorabafb502014-12-02 22:26:20 -080037import org.onosproject.net.packet.PacketStore;
38import org.onosproject.net.packet.PacketStoreDelegate;
39import org.onosproject.store.AbstractStore;
40import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Brian O'Connorabafb502014-12-02 22:26:20 -080041import org.onosproject.store.cluster.messaging.MessageSubject;
42import org.onosproject.store.serializers.KryoNamespaces;
HIGUCHI Yutae7290652016-05-18 11:29:01 -070043import org.onosproject.store.serializers.StoreSerializer;
alshabib42947782015-03-31 14:59:06 -070044import org.onosproject.store.service.ConsistentMap;
45import org.onosproject.store.service.Serializer;
46import org.onosproject.store.service.StorageService;
sangyun-hanad84e0c2016-02-19 18:30:03 +090047import org.osgi.service.component.ComponentContext;
Jonathan Hart4f60f982014-10-27 08:11:17 -070048import org.slf4j.Logger;
49
sangyun-hanad84e0c2016-02-19 18:30:03 +090050import java.util.Dictionary;
Thomas Vachuska7f171b22015-08-21 12:49:08 -070051import java.util.List;
sangyun-hanad84e0c2016-02-19 18:30:03 +090052import java.util.Properties;
alshabib42947782015-03-31 14:59:06 -070053import java.util.Set;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080054import java.util.concurrent.ExecutorService;
55import java.util.concurrent.Executors;
Brian O'Connor21b028e2015-10-08 22:50:02 -070056import java.util.concurrent.atomic.AtomicBoolean;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080057
sangyun-hanad84e0c2016-02-19 18:30:03 +090058import static com.google.common.base.Preconditions.checkArgument;
59import static com.google.common.base.Strings.isNullOrEmpty;
60import static org.onlab.util.Tools.get;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080061import static org.onlab.util.Tools.groupedThreads;
62import static org.slf4j.LoggerFactory.getLogger;
63
Jonathan Hart4f60f982014-10-27 08:11:17 -070064/**
65 * Distributed packet store implementation allowing packets to be sent to
66 * remote instances.
67 */
68@Component(immediate = true)
69@Service
70public class DistributedPacketStore
71 extends AbstractStore<PacketEvent, PacketStoreDelegate>
72 implements PacketStore {
73
74 private final Logger log = getLogger(getClass());
75
sangyun-hanad84e0c2016-02-19 18:30:03 +090076 private static final String FORMAT = "Setting: messageHandlerThreadPoolSize={}";
Madan Jampani2af244a2015-02-22 13:12:01 -080077
Jonathan Hart4f60f982014-10-27 08:11:17 -070078 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070079 protected MastershipService mastershipService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070080
81 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070082 protected ClusterService clusterService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070083
84 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuskaff965232015-03-17 14:10:52 -070085 protected ClusterCommunicationService communicationService;
Jonathan Hart4f60f982014-10-27 08:11:17 -070086
alshabib42947782015-03-31 14:59:06 -070087 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
88 protected StorageService storageService;
89
90 private PacketRequestTracker tracker;
91
Jonathan Hart4f60f982014-10-27 08:11:17 -070092 private static final MessageSubject PACKET_OUT_SUBJECT =
93 new MessageSubject("packet-out");
94
HIGUCHI Yutae7290652016-05-18 11:29:01 -070095 private static final StoreSerializer SERIALIZER = StoreSerializer.using(KryoNamespaces.API);
Jonathan Hart4f60f982014-10-27 08:11:17 -070096
Madan Jampani2af244a2015-02-22 13:12:01 -080097 private ExecutorService messageHandlingExecutor;
98
sangyun-hanad84e0c2016-02-19 18:30:03 +090099 private static final int DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
100 @Property(name = "messageHandlerThreadPoolSize", intValue = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE,
101 label = "Size of thread pool to assign message handler")
102 private static int messageHandlerThreadPoolSize = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE;
103
104 private static final int MAX_BACKOFF = 50;
105
Jonathan Hart4f60f982014-10-27 08:11:17 -0700106 @Activate
107 public void activate() {
Thomas Vachuska27bee092015-06-23 19:03:10 -0700108 messageHandlingExecutor = Executors.newFixedThreadPool(
sangyun-hanad84e0c2016-02-19 18:30:03 +0900109 messageHandlerThreadPoolSize,
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700110 groupedThreads("onos/store/packet", "message-handlers", log));
Jonathan Hart4f60f982014-10-27 08:11:17 -0700111
Madan Jampani01e05fb2015-08-13 13:29:36 -0700112 communicationService.<OutboundPacket>addSubscriber(PACKET_OUT_SUBJECT,
113 SERIALIZER::decode,
114 packet -> notifyDelegate(new PacketEvent(Type.EMIT, packet)),
115 messageHandlingExecutor);
Madan Jampani2af244a2015-02-22 13:12:01 -0800116
Thomas Vachuska27bee092015-06-23 19:03:10 -0700117 tracker = new PacketRequestTracker();
alshabib42947782015-03-31 14:59:06 -0700118
Madan Jampani2af244a2015-02-22 13:12:01 -0800119 log.info("Started");
Jonathan Hart4f60f982014-10-27 08:11:17 -0700120 }
121
122 @Deactivate
123 public void deactivate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800124 communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
125 messageHandlingExecutor.shutdown();
Brian O'Connor21b028e2015-10-08 22:50:02 -0700126 tracker = null;
Jonathan Hart4f60f982014-10-27 08:11:17 -0700127 log.info("Stopped");
128 }
129
sangyun-hanad84e0c2016-02-19 18:30:03 +0900130 @Modified
131 public void modified(ComponentContext context) {
132 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
133
134 int newMessageHandlerThreadPoolSize;
135
136 try {
137 String s = get(properties, "messageHandlerThreadPoolSize");
138
139 newMessageHandlerThreadPoolSize =
140 isNullOrEmpty(s) ? messageHandlerThreadPoolSize : Integer.parseInt(s.trim());
141
142 } catch (NumberFormatException e) {
143 log.warn(e.getMessage());
144 newMessageHandlerThreadPoolSize = messageHandlerThreadPoolSize;
145 }
146
147 // Any change in the following parameters implies thread pool restart
148 if (newMessageHandlerThreadPoolSize != messageHandlerThreadPoolSize) {
149 setMessageHandlerThreadPoolSize(newMessageHandlerThreadPoolSize);
150 restartMessageHandlerThreadPool();
151 }
152
153 log.info(FORMAT, messageHandlerThreadPoolSize);
154 }
155
156
Jonathan Hart4f60f982014-10-27 08:11:17 -0700157 @Override
158 public void emit(OutboundPacket packet) {
159 NodeId myId = clusterService.getLocalNode().id();
160 NodeId master = mastershipService.getMasterFor(packet.sendThrough());
161
Jonathan Hart7466d612014-11-24 17:09:53 -0800162 if (master == null) {
163 return;
164 }
165
Jonathan Hart4f60f982014-10-27 08:11:17 -0700166 if (myId.equals(master)) {
167 notifyDelegate(new PacketEvent(Type.EMIT, packet));
168 return;
169 }
170
Madan Jampani01e05fb2015-08-13 13:29:36 -0700171 communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master)
172 .whenComplete((r, error) -> {
173 if (error != null) {
174 log.warn("Failed to send packet-out to {}", master, error);
175 }
176 });
Jonathan Hart4f60f982014-10-27 08:11:17 -0700177 }
178
alshabib42947782015-03-31 14:59:06 -0700179 @Override
Brian O'Connor21b028e2015-10-08 22:50:02 -0700180 public void requestPackets(PacketRequest request) {
181 tracker.add(request);
alshabib42947782015-03-31 14:59:06 -0700182 }
183
184 @Override
Brian O'Connor21b028e2015-10-08 22:50:02 -0700185 public void cancelPackets(PacketRequest request) {
186 tracker.remove(request);
Thomas Vachuska27bee092015-06-23 19:03:10 -0700187 }
188
189 @Override
Thomas Vachuska7f171b22015-08-21 12:49:08 -0700190 public List<PacketRequest> existingRequests() {
alshabib42947782015-03-31 14:59:06 -0700191 return tracker.requests();
192 }
193
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700194 private final class PacketRequestTracker {
alshabib42947782015-03-31 14:59:06 -0700195
Thomas Vachuska27bee092015-06-23 19:03:10 -0700196 private ConsistentMap<TrafficSelector, Set<PacketRequest>> requests;
alshabib42947782015-03-31 14:59:06 -0700197
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700198 private PacketRequestTracker() {
Thomas Vachuska27bee092015-06-23 19:03:10 -0700199 requests = storageService.<TrafficSelector, Set<PacketRequest>>consistentMapBuilder()
200 .withName("onos-packet-requests")
Thomas Vachuska27bee092015-06-23 19:03:10 -0700201 .withSerializer(Serializer.using(KryoNamespaces.API))
202 .build();
alshabib42947782015-03-31 14:59:06 -0700203 }
204
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700205 private void add(PacketRequest request) {
Madan Jampanic6371882016-06-03 21:30:17 -0700206 AtomicBoolean firstRequest = addInternal(request);
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700207 if (firstRequest.get() && delegate != null) {
208 // The instance that makes the first request will push to all devices
209 delegate.requestPackets(request);
210 }
211 }
212
213 private AtomicBoolean addInternal(PacketRequest request) {
Brian O'Connor21b028e2015-10-08 22:50:02 -0700214 AtomicBoolean firstRequest = new AtomicBoolean(false);
215 requests.compute(request.selector(), (s, existingRequests) -> {
216 if (existingRequests == null) {
217 firstRequest.set(true);
218 return ImmutableSet.of(request);
219 } else if (!existingRequests.contains(request)) {
220 return ImmutableSet.<PacketRequest>builder()
221 .addAll(existingRequests)
222 .add(request)
223 .build();
224 } else {
225 return existingRequests;
226 }
227 });
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700228 return firstRequest;
229 }
Brian O'Connor21b028e2015-10-08 22:50:02 -0700230
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700231 private void remove(PacketRequest request) {
Madan Jampanic6371882016-06-03 21:30:17 -0700232 AtomicBoolean removedLast = removeInternal(request);
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700233 if (removedLast.get() && delegate != null) {
234 // The instance that removes the last request will remove from all devices
235 delegate.cancelPackets(request);
alshabib42947782015-03-31 14:59:06 -0700236 }
alshabib42947782015-03-31 14:59:06 -0700237 }
238
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700239 private AtomicBoolean removeInternal(PacketRequest request) {
Brian O'Connor21b028e2015-10-08 22:50:02 -0700240 AtomicBoolean removedLast = new AtomicBoolean(false);
241 requests.computeIfPresent(request.selector(), (s, existingRequests) -> {
242 if (existingRequests.contains(request)) {
243 Set<PacketRequest> newRequests = Sets.newHashSet(existingRequests);
244 newRequests.remove(request);
245 if (newRequests.size() > 0) {
246 return ImmutableSet.copyOf(newRequests);
247 } else {
248 removedLast.set(true);
249 return null;
250 }
251 } else {
252 return existingRequests;
253 }
254 });
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700255 return removedLast;
alshabib42947782015-03-31 14:59:06 -0700256 }
257
Thomas Vachuska40e63e62015-10-13 16:16:20 -0700258 private List<PacketRequest> requests() {
Thomas Vachuska7f171b22015-08-21 12:49:08 -0700259 List<PacketRequest> list = Lists.newArrayList();
260 requests.values().forEach(v -> list.addAll(v.value()));
261 list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());
262 return list;
alshabib42947782015-03-31 14:59:06 -0700263 }
alshabib42947782015-03-31 14:59:06 -0700264 }
sangyun-hanad84e0c2016-02-19 18:30:03 +0900265
266 /**
267 * Sets thread pool size of message handler.
268 *
269 * @param poolSize
270 */
271 private void setMessageHandlerThreadPoolSize(int poolSize) {
272 checkArgument(poolSize >= 0, "Message handler pool size must be 0 or more");
273 messageHandlerThreadPoolSize = poolSize;
274 }
275
276 /**
277 * Restarts thread pool of message handler.
278 */
279 private void restartMessageHandlerThreadPool() {
280 ExecutorService prevExecutor = messageHandlingExecutor;
281 messageHandlingExecutor = Executors.newFixedThreadPool(getMessageHandlerThreadPoolSize());
282 prevExecutor.shutdown();
283 }
284
285 /**
286 * Gets current thread pool size of message handler.
287 *
288 * @return messageHandlerThreadPoolSize
289 */
290 private int getMessageHandlerThreadPoolSize() {
291 return messageHandlerThreadPoolSize;
292 }
Jonathan Hart4f60f982014-10-27 08:11:17 -0700293}