blob: acf73fc9e7bff6b421057cb9aded8fab09e02b0d [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2014-present Open Networking Foundation
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.statistic.impl;
alshabib3d643ec2014-10-22 18:33:00 -070017
alshabibf6c2ede2014-10-22 23:31:50 -070018import com.google.common.collect.Sets;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070019import org.onlab.util.Tools;
sangyun-han9f0af2d2016-08-04 13:04:59 +090020import org.onosproject.cfg.ComponentConfigService;
Brian O'Connorabafb502014-12-02 22:26:20 -080021import org.onosproject.cluster.ClusterService;
Madan Jampanic156dd02015-08-12 15:57:46 -070022import org.onosproject.cluster.NodeId;
23import org.onosproject.mastership.MastershipService;
Brian O'Connorabafb502014-12-02 22:26:20 -080024import org.onosproject.net.ConnectPoint;
25import org.onosproject.net.DeviceId;
26import org.onosproject.net.PortNumber;
27import org.onosproject.net.flow.FlowEntry;
28import org.onosproject.net.flow.FlowRule;
29import org.onosproject.net.flow.instructions.Instruction;
30import org.onosproject.net.flow.instructions.Instructions;
31import org.onosproject.net.statistic.StatisticStore;
32import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Madan Jampani78be2492016-06-03 23:27:07 -070033import org.onosproject.store.cluster.messaging.MessageSubject;
Brian O'Connorabafb502014-12-02 22:26:20 -080034import org.onosproject.store.serializers.KryoNamespaces;
Jordan Halterman2c83a102017-08-20 17:11:41 -070035import org.onosproject.store.service.Serializer;
sangyun-hanad84e0c2016-02-19 18:30:03 +090036import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070037import org.osgi.service.component.annotations.Activate;
38import org.osgi.service.component.annotations.Component;
39import org.osgi.service.component.annotations.Deactivate;
40import org.osgi.service.component.annotations.Modified;
41import org.osgi.service.component.annotations.Reference;
42import org.osgi.service.component.annotations.ReferenceCardinality;
alshabib3d643ec2014-10-22 18:33:00 -070043import org.slf4j.Logger;
44
alshabib9c57bdd2014-11-28 19:14:06 -050045import java.util.Collections;
sangyun-hanad84e0c2016-02-19 18:30:03 +090046import java.util.Dictionary;
alshabib3d643ec2014-10-22 18:33:00 -070047import java.util.HashSet;
48import java.util.Map;
sangyun-hanad84e0c2016-02-19 18:30:03 +090049import java.util.Properties;
alshabib3d643ec2014-10-22 18:33:00 -070050import java.util.Set;
51import java.util.concurrent.ConcurrentHashMap;
Madan Jampani2af244a2015-02-22 13:12:01 -080052import java.util.concurrent.ExecutorService;
53import java.util.concurrent.Executors;
alshabib3d643ec2014-10-22 18:33:00 -070054import java.util.concurrent.TimeUnit;
alshabib3d643ec2014-10-22 18:33:00 -070055import java.util.concurrent.atomic.AtomicInteger;
56
sangyun-hanad84e0c2016-02-19 18:30:03 +090057import static com.google.common.base.Preconditions.checkArgument;
58import static com.google.common.base.Strings.isNullOrEmpty;
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070059import static java.util.concurrent.Executors.newFixedThreadPool;
sangyun-hanad84e0c2016-02-19 18:30:03 +090060import static org.onlab.util.Tools.get;
Madan Jampani2af244a2015-02-22 13:12:01 -080061import static org.onlab.util.Tools.groupedThreads;
Ray Milkeyb5646e62018-10-16 11:42:18 -070062import static org.onosproject.store.OsgiPropertyConstants.DSS_MESSAGE_HANDLER_THREAD_POOL_SIZE;
63import static org.onosproject.store.OsgiPropertyConstants.DSS_MESSAGE_HANDLER_THREAD_POOL_SIZE_DEFAULT;
Thomas Vachuska82041f52014-11-30 22:14:02 -080064import static org.slf4j.LoggerFactory.getLogger;
65
alshabib3d643ec2014-10-22 18:33:00 -070066
67/**
68 * Maintains statistics using RPC calls to collect stats from remote instances
69 * on demand.
70 */
Ray Milkeyb5646e62018-10-16 11:42:18 -070071@Component(
72 immediate = true,
73 service = StatisticStore.class,
74 property = {
Ray Milkey2d7bca12018-10-17 14:51:52 -070075 DSS_MESSAGE_HANDLER_THREAD_POOL_SIZE + ":Integer=" + DSS_MESSAGE_HANDLER_THREAD_POOL_SIZE_DEFAULT
Ray Milkeyb5646e62018-10-16 11:42:18 -070076 }
77)
alshabib3d643ec2014-10-22 18:33:00 -070078public class DistributedStatisticStore implements StatisticStore {
79
80 private final Logger log = getLogger(getClass());
81
sangyun-hanad84e0c2016-02-19 18:30:03 +090082 private static final String FORMAT = "Setting: messageHandlerThreadPoolSize={}";
Madan Jampani2af244a2015-02-22 13:12:01 -080083
Ray Milkeyd84f89b2018-08-17 14:54:17 -070084 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Madan Jampanic156dd02015-08-12 15:57:46 -070085 protected MastershipService mastershipService;
alshabib3d643ec2014-10-22 18:33:00 -070086
Ray Milkeyd84f89b2018-08-17 14:54:17 -070087 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Thomas Vachuska82041f52014-11-30 22:14:02 -080088 protected ClusterCommunicationService clusterCommunicator;
alshabib3d643ec2014-10-22 18:33:00 -070089
Ray Milkeyd84f89b2018-08-17 14:54:17 -070090 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Thomas Vachuska82041f52014-11-30 22:14:02 -080091 protected ClusterService clusterService;
alshabib3d643ec2014-10-22 18:33:00 -070092
Ray Milkeyd84f89b2018-08-17 14:54:17 -070093 @Reference(cardinality = ReferenceCardinality.MANDATORY)
sangyun-han9f0af2d2016-08-04 13:04:59 +090094 protected ComponentConfigService cfgService;
95
Madan Jampani78be2492016-06-03 23:27:07 -070096 public static final MessageSubject GET_CURRENT = new MessageSubject("peer-return-current");
97 public static final MessageSubject GET_PREVIOUS = new MessageSubject("peer-return-previous");
98
alshabib3d643ec2014-10-22 18:33:00 -070099 private Map<ConnectPoint, InternalStatisticRepresentation> representations =
100 new ConcurrentHashMap<>();
101
102 private Map<ConnectPoint, Set<FlowEntry>> previous =
103 new ConcurrentHashMap<>();
104
105 private Map<ConnectPoint, Set<FlowEntry>> current =
106 new ConcurrentHashMap<>();
107
Jordan Halterman2c83a102017-08-20 17:11:41 -0700108 protected static final Serializer SERIALIZER = Serializer.using(KryoNamespaces.API);
alshabib3d643ec2014-10-22 18:33:00 -0700109
Madan Jampani2af244a2015-02-22 13:12:01 -0800110 private ExecutorService messageHandlingExecutor;
111
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700112 /** Size of thread pool to assign message handler. */
Ray Milkeyb5646e62018-10-16 11:42:18 -0700113 private static int messageHandlerThreadPoolSize = DSS_MESSAGE_HANDLER_THREAD_POOL_SIZE_DEFAULT;
sangyun-hanad84e0c2016-02-19 18:30:03 +0900114
alshabib3d643ec2014-10-22 18:33:00 -0700115 private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
116
117 @Activate
sangyun-han9f0af2d2016-08-04 13:04:59 +0900118 public void activate(ComponentContext context) {
119 cfgService.registerProperties(getClass());
120
121 modified(context);
Madan Jampani2af244a2015-02-22 13:12:01 -0800122
123 messageHandlingExecutor = Executors.newFixedThreadPool(
sangyun-hanad84e0c2016-02-19 18:30:03 +0900124 messageHandlerThreadPoolSize,
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700125 groupedThreads("onos/store/statistic", "message-handlers", log));
Madan Jampani2af244a2015-02-22 13:12:01 -0800126
Madan Jampani1151b552015-08-12 16:11:27 -0700127 clusterCommunicator.<ConnectPoint, Set<FlowEntry>>addSubscriber(GET_CURRENT,
128 SERIALIZER::decode,
129 this::getCurrentStatisticInternal,
130 SERIALIZER::encode,
131 messageHandlingExecutor);
alshabib3d643ec2014-10-22 18:33:00 -0700132
Madan Jampani1151b552015-08-12 16:11:27 -0700133 clusterCommunicator.<ConnectPoint, Set<FlowEntry>>addSubscriber(GET_PREVIOUS,
134 SERIALIZER::decode,
135 this::getPreviousStatisticInternal,
136 SERIALIZER::encode,
137 messageHandlingExecutor);
alshabib3d643ec2014-10-22 18:33:00 -0700138
alshabib3d643ec2014-10-22 18:33:00 -0700139 log.info("Started");
140 }
141
142 @Deactivate
143 public void deactivate() {
sangyun-han9f0af2d2016-08-04 13:04:59 +0900144 cfgService.unregisterProperties(getClass(), false);
Madan Jampani2af244a2015-02-22 13:12:01 -0800145 clusterCommunicator.removeSubscriber(GET_PREVIOUS);
146 clusterCommunicator.removeSubscriber(GET_CURRENT);
147 messageHandlingExecutor.shutdown();
alshabib3d643ec2014-10-22 18:33:00 -0700148 log.info("Stopped");
149 }
150
sangyun-hanad84e0c2016-02-19 18:30:03 +0900151 @Modified
152 public void modified(ComponentContext context) {
153 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
154
155 int newMessageHandlerThreadPoolSize;
156
157 try {
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700158 String s = get(properties, DSS_MESSAGE_HANDLER_THREAD_POOL_SIZE);
sangyun-hanad84e0c2016-02-19 18:30:03 +0900159
160 newMessageHandlerThreadPoolSize =
161 isNullOrEmpty(s) ? messageHandlerThreadPoolSize : Integer.parseInt(s.trim());
162
163 } catch (NumberFormatException e) {
164 log.warn(e.getMessage());
165 newMessageHandlerThreadPoolSize = messageHandlerThreadPoolSize;
166 }
167
168 // Any change in the following parameters implies thread pool restart
169 if (newMessageHandlerThreadPoolSize != messageHandlerThreadPoolSize) {
170 setMessageHandlerThreadPoolSize(newMessageHandlerThreadPoolSize);
171 restartMessageHandlerThreadPool();
172 }
173
174 log.info(FORMAT, messageHandlerThreadPoolSize);
175 }
176
177
alshabib3d643ec2014-10-22 18:33:00 -0700178 @Override
179 public void prepareForStatistics(FlowRule rule) {
180 ConnectPoint cp = buildConnectPoint(rule);
181 if (cp == null) {
182 return;
183 }
184 InternalStatisticRepresentation rep;
185 synchronized (representations) {
186 rep = getOrCreateRepresentation(cp);
187 }
188 rep.prepare();
189 }
190
191 @Override
alshabibf6c2ede2014-10-22 23:31:50 -0700192 public synchronized void removeFromStatistics(FlowRule rule) {
alshabib3d643ec2014-10-22 18:33:00 -0700193 ConnectPoint cp = buildConnectPoint(rule);
194 if (cp == null) {
195 return;
196 }
197 InternalStatisticRepresentation rep = representations.get(cp);
alshabib9c57bdd2014-11-28 19:14:06 -0500198 if (rep != null && rep.remove(rule)) {
199 updatePublishedStats(cp, Collections.emptySet());
alshabib3d643ec2014-10-22 18:33:00 -0700200 }
alshabibf6c2ede2014-10-22 23:31:50 -0700201 Set<FlowEntry> values = current.get(cp);
202 if (values != null) {
203 values.remove(rule);
204 }
205 values = previous.get(cp);
206 if (values != null) {
207 values.remove(rule);
208 }
209
alshabib3d643ec2014-10-22 18:33:00 -0700210 }
211
212 @Override
213 public void addOrUpdateStatistic(FlowEntry rule) {
214 ConnectPoint cp = buildConnectPoint(rule);
215 if (cp == null) {
216 return;
217 }
218 InternalStatisticRepresentation rep = representations.get(cp);
219 if (rep != null && rep.submit(rule)) {
220 updatePublishedStats(cp, rep.get());
221 }
222 }
223
224 private synchronized void updatePublishedStats(ConnectPoint cp,
225 Set<FlowEntry> flowEntries) {
226 Set<FlowEntry> curr = current.get(cp);
227 if (curr == null) {
228 curr = new HashSet<>();
229 }
230 previous.put(cp, curr);
231 current.put(cp, flowEntries);
232
233 }
234
235 @Override
236 public Set<FlowEntry> getCurrentStatistic(ConnectPoint connectPoint) {
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700237 final DeviceId deviceId = connectPoint.deviceId();
Madan Jampanic156dd02015-08-12 15:57:46 -0700238 NodeId master = mastershipService.getMasterFor(deviceId);
239 if (master == null) {
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700240 log.warn("No master for {}", deviceId);
Thomas Vachuska82041f52014-11-30 22:14:02 -0800241 return Collections.emptySet();
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700242 }
Madan Jampanic156dd02015-08-12 15:57:46 -0700243 if (master.equals(clusterService.getLocalNode().id())) {
alshabib3d643ec2014-10-22 18:33:00 -0700244 return getCurrentStatisticInternal(connectPoint);
245 } else {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700246 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
247 connectPoint,
248 GET_CURRENT,
249 SERIALIZER::encode,
250 SERIALIZER::decode,
Madan Jampanic156dd02015-08-12 15:57:46 -0700251 master),
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700252 STATISTIC_STORE_TIMEOUT_MILLIS,
253 TimeUnit.MILLISECONDS,
254 Collections.emptySet());
alshabib3d643ec2014-10-22 18:33:00 -0700255 }
256
257 }
258
259 private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
260 return current.get(connectPoint);
261 }
262
263 @Override
264 public Set<FlowEntry> getPreviousStatistic(ConnectPoint connectPoint) {
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700265 final DeviceId deviceId = connectPoint.deviceId();
Madan Jampanic156dd02015-08-12 15:57:46 -0700266 NodeId master = mastershipService.getMasterFor(deviceId);
267 if (master == null) {
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700268 log.warn("No master for {}", deviceId);
Thomas Vachuska82041f52014-11-30 22:14:02 -0800269 return Collections.emptySet();
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700270 }
Madan Jampanic156dd02015-08-12 15:57:46 -0700271 if (master.equals(clusterService.getLocalNode().id())) {
alshabib3d643ec2014-10-22 18:33:00 -0700272 return getPreviousStatisticInternal(connectPoint);
273 } else {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700274 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
275 connectPoint,
276 GET_PREVIOUS,
277 SERIALIZER::encode,
278 SERIALIZER::decode,
Madan Jampanic156dd02015-08-12 15:57:46 -0700279 master),
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700280 STATISTIC_STORE_TIMEOUT_MILLIS,
281 TimeUnit.MILLISECONDS,
282 Collections.emptySet());
alshabib3d643ec2014-10-22 18:33:00 -0700283 }
alshabib3d643ec2014-10-22 18:33:00 -0700284 }
285
286 private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
287 return previous.get(connectPoint);
288 }
289
290 private InternalStatisticRepresentation getOrCreateRepresentation(ConnectPoint cp) {
291
292 if (representations.containsKey(cp)) {
293 return representations.get(cp);
294 } else {
295 InternalStatisticRepresentation rep = new InternalStatisticRepresentation();
296 representations.put(cp, rep);
297 return rep;
298 }
299
300 }
301
302 private ConnectPoint buildConnectPoint(FlowRule rule) {
303 PortNumber port = getOutput(rule);
Jonathan Hart7baba072015-02-23 14:27:59 -0800304
alshabib3d643ec2014-10-22 18:33:00 -0700305 if (port == null) {
alshabib3d643ec2014-10-22 18:33:00 -0700306 return null;
307 }
308 ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
309 return cp;
310 }
311
312 private PortNumber getOutput(FlowRule rule) {
Jonathan Hart8ef6d3b2015-03-08 21:21:27 -0700313 for (Instruction i : rule.treatment().allInstructions()) {
alshabib3d643ec2014-10-22 18:33:00 -0700314 if (i.type() == Instruction.Type.OUTPUT) {
315 Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
316 return out.port();
317 }
alshabib3d643ec2014-10-22 18:33:00 -0700318 }
319 return null;
320 }
321
322 private class InternalStatisticRepresentation {
323
324 private final AtomicInteger counter = new AtomicInteger(0);
325 private final Set<FlowEntry> rules = new HashSet<>();
326
327 public void prepare() {
328 counter.incrementAndGet();
329 }
330
alshabib9c57bdd2014-11-28 19:14:06 -0500331 public synchronized boolean remove(FlowRule rule) {
alshabib3d643ec2014-10-22 18:33:00 -0700332 rules.remove(rule);
alshabib9c57bdd2014-11-28 19:14:06 -0500333 return counter.decrementAndGet() == 0;
alshabib3d643ec2014-10-22 18:33:00 -0700334 }
335
336 public synchronized boolean submit(FlowEntry rule) {
337 if (rules.contains(rule)) {
338 rules.remove(rule);
339 }
340 rules.add(rule);
341 if (counter.get() == 0) {
342 return true;
343 } else {
344 return counter.decrementAndGet() == 0;
345 }
346 }
347
348 public synchronized Set<FlowEntry> get() {
349 counter.set(rules.size());
alshabibf6c2ede2014-10-22 23:31:50 -0700350 return Sets.newHashSet(rules);
alshabib3d643ec2014-10-22 18:33:00 -0700351 }
352
353
354 }
355
sangyun-hanad84e0c2016-02-19 18:30:03 +0900356 /**
357 * Sets thread pool size of message handler.
358 *
359 * @param poolSize
360 */
361 private void setMessageHandlerThreadPoolSize(int poolSize) {
362 checkArgument(poolSize >= 0, "Message handler pool size must be 0 or more");
363 messageHandlerThreadPoolSize = poolSize;
364 }
365
366 /**
367 * Restarts thread pool of message handler.
368 */
369 private void restartMessageHandlerThreadPool() {
370 ExecutorService prevExecutor = messageHandlingExecutor;
Yuta HIGUCHI1624df12016-07-21 16:54:33 -0700371 messageHandlingExecutor = newFixedThreadPool(getMessageHandlerThreadPoolSize(),
372 groupedThreads("DistStatsStore", "messageHandling-%d", log));
sangyun-hanad84e0c2016-02-19 18:30:03 +0900373 prevExecutor.shutdown();
374 }
375
376 /**
377 * Gets current thread pool size of message handler.
378 *
379 * @return messageHandlerThreadPoolSize
380 */
381 private int getMessageHandlerThreadPoolSize() {
382 return messageHandlerThreadPoolSize;
383 }
384
alshabib3d643ec2014-10-22 18:33:00 -0700385}