blob: 1f38f9c6fc727137bcb112f3be2eba756b5e9fca [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2014-present Open Networking Foundation
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.statistic.impl;
alshabib3d643ec2014-10-22 18:33:00 -070017
alshabibf6c2ede2014-10-22 23:31:50 -070018import com.google.common.collect.Sets;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070019
alshabib3d643ec2014-10-22 18:33:00 -070020import org.apache.felix.scr.annotations.Activate;
21import org.apache.felix.scr.annotations.Component;
22import org.apache.felix.scr.annotations.Deactivate;
sangyun-hanad84e0c2016-02-19 18:30:03 +090023import org.apache.felix.scr.annotations.Modified;
24import org.apache.felix.scr.annotations.Property;
alshabib3d643ec2014-10-22 18:33:00 -070025import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
27import org.apache.felix.scr.annotations.Service;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070028import org.onlab.util.Tools;
sangyun-han9f0af2d2016-08-04 13:04:59 +090029import org.onosproject.cfg.ComponentConfigService;
Brian O'Connorabafb502014-12-02 22:26:20 -080030import org.onosproject.cluster.ClusterService;
Madan Jampanic156dd02015-08-12 15:57:46 -070031import org.onosproject.cluster.NodeId;
32import org.onosproject.mastership.MastershipService;
Brian O'Connorabafb502014-12-02 22:26:20 -080033import org.onosproject.net.ConnectPoint;
34import org.onosproject.net.DeviceId;
35import org.onosproject.net.PortNumber;
36import org.onosproject.net.flow.FlowEntry;
37import org.onosproject.net.flow.FlowRule;
38import org.onosproject.net.flow.instructions.Instruction;
39import org.onosproject.net.flow.instructions.Instructions;
40import org.onosproject.net.statistic.StatisticStore;
41import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Madan Jampani78be2492016-06-03 23:27:07 -070042import org.onosproject.store.cluster.messaging.MessageSubject;
Brian O'Connorabafb502014-12-02 22:26:20 -080043import org.onosproject.store.serializers.KryoNamespaces;
HIGUCHI Yutae7290652016-05-18 11:29:01 -070044import org.onosproject.store.serializers.StoreSerializer;
sangyun-hanad84e0c2016-02-19 18:30:03 +090045import org.osgi.service.component.ComponentContext;
alshabib3d643ec2014-10-22 18:33:00 -070046import org.slf4j.Logger;
47
alshabib9c57bdd2014-11-28 19:14:06 -050048import java.util.Collections;
sangyun-hanad84e0c2016-02-19 18:30:03 +090049import java.util.Dictionary;
alshabib3d643ec2014-10-22 18:33:00 -070050import java.util.HashSet;
51import java.util.Map;
sangyun-hanad84e0c2016-02-19 18:30:03 +090052import java.util.Properties;
alshabib3d643ec2014-10-22 18:33:00 -070053import java.util.Set;
54import java.util.concurrent.ConcurrentHashMap;
Madan Jampani2af244a2015-02-22 13:12:01 -080055import java.util.concurrent.ExecutorService;
56import java.util.concurrent.Executors;
alshabib3d643ec2014-10-22 18:33:00 -070057import java.util.concurrent.TimeUnit;
alshabib3d643ec2014-10-22 18:33:00 -070058import java.util.concurrent.atomic.AtomicInteger;
59
sangyun-hanad84e0c2016-02-19 18:30:03 +090060import static com.google.common.base.Preconditions.checkArgument;
61import static com.google.common.base.Strings.isNullOrEmpty;
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070062import static java.util.concurrent.Executors.newFixedThreadPool;
sangyun-hanad84e0c2016-02-19 18:30:03 +090063import static org.onlab.util.Tools.get;
Madan Jampani2af244a2015-02-22 13:12:01 -080064import static org.onlab.util.Tools.groupedThreads;
Thomas Vachuska82041f52014-11-30 22:14:02 -080065import static org.slf4j.LoggerFactory.getLogger;
66
alshabib3d643ec2014-10-22 18:33:00 -070067
68/**
69 * Maintains statistics using RPC calls to collect stats from remote instances
70 * on demand.
71 */
72@Component(immediate = true)
73@Service
74public class DistributedStatisticStore implements StatisticStore {
75
76 private final Logger log = getLogger(getClass());
77
sangyun-hanad84e0c2016-02-19 18:30:03 +090078 private static final String FORMAT = "Setting: messageHandlerThreadPoolSize={}";
Madan Jampani2af244a2015-02-22 13:12:01 -080079
alshabib3d643ec2014-10-22 18:33:00 -070080 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampanic156dd02015-08-12 15:57:46 -070081 protected MastershipService mastershipService;
alshabib3d643ec2014-10-22 18:33:00 -070082
83 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuska82041f52014-11-30 22:14:02 -080084 protected ClusterCommunicationService clusterCommunicator;
alshabib3d643ec2014-10-22 18:33:00 -070085
86 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuska82041f52014-11-30 22:14:02 -080087 protected ClusterService clusterService;
alshabib3d643ec2014-10-22 18:33:00 -070088
sangyun-han9f0af2d2016-08-04 13:04:59 +090089 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
90 protected ComponentConfigService cfgService;
91
Madan Jampani78be2492016-06-03 23:27:07 -070092 public static final MessageSubject GET_CURRENT = new MessageSubject("peer-return-current");
93 public static final MessageSubject GET_PREVIOUS = new MessageSubject("peer-return-previous");
94
alshabib3d643ec2014-10-22 18:33:00 -070095 private Map<ConnectPoint, InternalStatisticRepresentation> representations =
96 new ConcurrentHashMap<>();
97
98 private Map<ConnectPoint, Set<FlowEntry>> previous =
99 new ConcurrentHashMap<>();
100
101 private Map<ConnectPoint, Set<FlowEntry>> current =
102 new ConcurrentHashMap<>();
103
HIGUCHI Yutae7290652016-05-18 11:29:01 -0700104 protected static final StoreSerializer SERIALIZER = StoreSerializer.using(KryoNamespaces.API);
alshabib3d643ec2014-10-22 18:33:00 -0700105
Madan Jampani2af244a2015-02-22 13:12:01 -0800106 private ExecutorService messageHandlingExecutor;
107
sangyun-hanad84e0c2016-02-19 18:30:03 +0900108 private static final int DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
109 @Property(name = "messageHandlerThreadPoolSize", intValue = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE,
110 label = "Size of thread pool to assign message handler")
111 private static int messageHandlerThreadPoolSize = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE;
112
alshabib3d643ec2014-10-22 18:33:00 -0700113 private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
114
115 @Activate
sangyun-han9f0af2d2016-08-04 13:04:59 +0900116 public void activate(ComponentContext context) {
117 cfgService.registerProperties(getClass());
118
119 modified(context);
Madan Jampani2af244a2015-02-22 13:12:01 -0800120
121 messageHandlingExecutor = Executors.newFixedThreadPool(
sangyun-hanad84e0c2016-02-19 18:30:03 +0900122 messageHandlerThreadPoolSize,
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700123 groupedThreads("onos/store/statistic", "message-handlers", log));
Madan Jampani2af244a2015-02-22 13:12:01 -0800124
Madan Jampani1151b552015-08-12 16:11:27 -0700125 clusterCommunicator.<ConnectPoint, Set<FlowEntry>>addSubscriber(GET_CURRENT,
126 SERIALIZER::decode,
127 this::getCurrentStatisticInternal,
128 SERIALIZER::encode,
129 messageHandlingExecutor);
alshabib3d643ec2014-10-22 18:33:00 -0700130
Madan Jampani1151b552015-08-12 16:11:27 -0700131 clusterCommunicator.<ConnectPoint, Set<FlowEntry>>addSubscriber(GET_PREVIOUS,
132 SERIALIZER::decode,
133 this::getPreviousStatisticInternal,
134 SERIALIZER::encode,
135 messageHandlingExecutor);
alshabib3d643ec2014-10-22 18:33:00 -0700136
alshabib3d643ec2014-10-22 18:33:00 -0700137 log.info("Started");
138 }
139
140 @Deactivate
141 public void deactivate() {
sangyun-han9f0af2d2016-08-04 13:04:59 +0900142 cfgService.unregisterProperties(getClass(), false);
Madan Jampani2af244a2015-02-22 13:12:01 -0800143 clusterCommunicator.removeSubscriber(GET_PREVIOUS);
144 clusterCommunicator.removeSubscriber(GET_CURRENT);
145 messageHandlingExecutor.shutdown();
alshabib3d643ec2014-10-22 18:33:00 -0700146 log.info("Stopped");
147 }
148
sangyun-hanad84e0c2016-02-19 18:30:03 +0900149 @Modified
150 public void modified(ComponentContext context) {
151 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
152
153 int newMessageHandlerThreadPoolSize;
154
155 try {
156 String s = get(properties, "messageHandlerThreadPoolSize");
157
158 newMessageHandlerThreadPoolSize =
159 isNullOrEmpty(s) ? messageHandlerThreadPoolSize : Integer.parseInt(s.trim());
160
161 } catch (NumberFormatException e) {
162 log.warn(e.getMessage());
163 newMessageHandlerThreadPoolSize = messageHandlerThreadPoolSize;
164 }
165
166 // Any change in the following parameters implies thread pool restart
167 if (newMessageHandlerThreadPoolSize != messageHandlerThreadPoolSize) {
168 setMessageHandlerThreadPoolSize(newMessageHandlerThreadPoolSize);
169 restartMessageHandlerThreadPool();
170 }
171
172 log.info(FORMAT, messageHandlerThreadPoolSize);
173 }
174
175
alshabib3d643ec2014-10-22 18:33:00 -0700176 @Override
177 public void prepareForStatistics(FlowRule rule) {
178 ConnectPoint cp = buildConnectPoint(rule);
179 if (cp == null) {
180 return;
181 }
182 InternalStatisticRepresentation rep;
183 synchronized (representations) {
184 rep = getOrCreateRepresentation(cp);
185 }
186 rep.prepare();
187 }
188
189 @Override
alshabibf6c2ede2014-10-22 23:31:50 -0700190 public synchronized void removeFromStatistics(FlowRule rule) {
alshabib3d643ec2014-10-22 18:33:00 -0700191 ConnectPoint cp = buildConnectPoint(rule);
192 if (cp == null) {
193 return;
194 }
195 InternalStatisticRepresentation rep = representations.get(cp);
alshabib9c57bdd2014-11-28 19:14:06 -0500196 if (rep != null && rep.remove(rule)) {
197 updatePublishedStats(cp, Collections.emptySet());
alshabib3d643ec2014-10-22 18:33:00 -0700198 }
alshabibf6c2ede2014-10-22 23:31:50 -0700199 Set<FlowEntry> values = current.get(cp);
200 if (values != null) {
201 values.remove(rule);
202 }
203 values = previous.get(cp);
204 if (values != null) {
205 values.remove(rule);
206 }
207
alshabib3d643ec2014-10-22 18:33:00 -0700208 }
209
210 @Override
211 public void addOrUpdateStatistic(FlowEntry rule) {
212 ConnectPoint cp = buildConnectPoint(rule);
213 if (cp == null) {
214 return;
215 }
216 InternalStatisticRepresentation rep = representations.get(cp);
217 if (rep != null && rep.submit(rule)) {
218 updatePublishedStats(cp, rep.get());
219 }
220 }
221
222 private synchronized void updatePublishedStats(ConnectPoint cp,
223 Set<FlowEntry> flowEntries) {
224 Set<FlowEntry> curr = current.get(cp);
225 if (curr == null) {
226 curr = new HashSet<>();
227 }
228 previous.put(cp, curr);
229 current.put(cp, flowEntries);
230
231 }
232
233 @Override
234 public Set<FlowEntry> getCurrentStatistic(ConnectPoint connectPoint) {
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700235 final DeviceId deviceId = connectPoint.deviceId();
Madan Jampanic156dd02015-08-12 15:57:46 -0700236 NodeId master = mastershipService.getMasterFor(deviceId);
237 if (master == null) {
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700238 log.warn("No master for {}", deviceId);
Thomas Vachuska82041f52014-11-30 22:14:02 -0800239 return Collections.emptySet();
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700240 }
Madan Jampanic156dd02015-08-12 15:57:46 -0700241 if (master.equals(clusterService.getLocalNode().id())) {
alshabib3d643ec2014-10-22 18:33:00 -0700242 return getCurrentStatisticInternal(connectPoint);
243 } else {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700244 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
245 connectPoint,
246 GET_CURRENT,
247 SERIALIZER::encode,
248 SERIALIZER::decode,
Madan Jampanic156dd02015-08-12 15:57:46 -0700249 master),
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700250 STATISTIC_STORE_TIMEOUT_MILLIS,
251 TimeUnit.MILLISECONDS,
252 Collections.emptySet());
alshabib3d643ec2014-10-22 18:33:00 -0700253 }
254
255 }
256
257 private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
258 return current.get(connectPoint);
259 }
260
261 @Override
262 public Set<FlowEntry> getPreviousStatistic(ConnectPoint connectPoint) {
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700263 final DeviceId deviceId = connectPoint.deviceId();
Madan Jampanic156dd02015-08-12 15:57:46 -0700264 NodeId master = mastershipService.getMasterFor(deviceId);
265 if (master == null) {
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700266 log.warn("No master for {}", deviceId);
Thomas Vachuska82041f52014-11-30 22:14:02 -0800267 return Collections.emptySet();
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700268 }
Madan Jampanic156dd02015-08-12 15:57:46 -0700269 if (master.equals(clusterService.getLocalNode().id())) {
alshabib3d643ec2014-10-22 18:33:00 -0700270 return getPreviousStatisticInternal(connectPoint);
271 } else {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700272 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
273 connectPoint,
274 GET_PREVIOUS,
275 SERIALIZER::encode,
276 SERIALIZER::decode,
Madan Jampanic156dd02015-08-12 15:57:46 -0700277 master),
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700278 STATISTIC_STORE_TIMEOUT_MILLIS,
279 TimeUnit.MILLISECONDS,
280 Collections.emptySet());
alshabib3d643ec2014-10-22 18:33:00 -0700281 }
alshabib3d643ec2014-10-22 18:33:00 -0700282 }
283
284 private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
285 return previous.get(connectPoint);
286 }
287
288 private InternalStatisticRepresentation getOrCreateRepresentation(ConnectPoint cp) {
289
290 if (representations.containsKey(cp)) {
291 return representations.get(cp);
292 } else {
293 InternalStatisticRepresentation rep = new InternalStatisticRepresentation();
294 representations.put(cp, rep);
295 return rep;
296 }
297
298 }
299
300 private ConnectPoint buildConnectPoint(FlowRule rule) {
301 PortNumber port = getOutput(rule);
Jonathan Hart7baba072015-02-23 14:27:59 -0800302
alshabib3d643ec2014-10-22 18:33:00 -0700303 if (port == null) {
alshabib3d643ec2014-10-22 18:33:00 -0700304 return null;
305 }
306 ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
307 return cp;
308 }
309
310 private PortNumber getOutput(FlowRule rule) {
Jonathan Hart8ef6d3b2015-03-08 21:21:27 -0700311 for (Instruction i : rule.treatment().allInstructions()) {
alshabib3d643ec2014-10-22 18:33:00 -0700312 if (i.type() == Instruction.Type.OUTPUT) {
313 Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
314 return out.port();
315 }
alshabib3d643ec2014-10-22 18:33:00 -0700316 }
317 return null;
318 }
319
320 private class InternalStatisticRepresentation {
321
322 private final AtomicInteger counter = new AtomicInteger(0);
323 private final Set<FlowEntry> rules = new HashSet<>();
324
325 public void prepare() {
326 counter.incrementAndGet();
327 }
328
alshabib9c57bdd2014-11-28 19:14:06 -0500329 public synchronized boolean remove(FlowRule rule) {
alshabib3d643ec2014-10-22 18:33:00 -0700330 rules.remove(rule);
alshabib9c57bdd2014-11-28 19:14:06 -0500331 return counter.decrementAndGet() == 0;
alshabib3d643ec2014-10-22 18:33:00 -0700332 }
333
334 public synchronized boolean submit(FlowEntry rule) {
335 if (rules.contains(rule)) {
336 rules.remove(rule);
337 }
338 rules.add(rule);
339 if (counter.get() == 0) {
340 return true;
341 } else {
342 return counter.decrementAndGet() == 0;
343 }
344 }
345
346 public synchronized Set<FlowEntry> get() {
347 counter.set(rules.size());
alshabibf6c2ede2014-10-22 23:31:50 -0700348 return Sets.newHashSet(rules);
alshabib3d643ec2014-10-22 18:33:00 -0700349 }
350
351
352 }
353
sangyun-hanad84e0c2016-02-19 18:30:03 +0900354 /**
355 * Sets thread pool size of message handler.
356 *
357 * @param poolSize
358 */
359 private void setMessageHandlerThreadPoolSize(int poolSize) {
360 checkArgument(poolSize >= 0, "Message handler pool size must be 0 or more");
361 messageHandlerThreadPoolSize = poolSize;
362 }
363
364 /**
365 * Restarts thread pool of message handler.
366 */
367 private void restartMessageHandlerThreadPool() {
368 ExecutorService prevExecutor = messageHandlingExecutor;
Yuta HIGUCHI1624df12016-07-21 16:54:33 -0700369 messageHandlingExecutor = newFixedThreadPool(getMessageHandlerThreadPoolSize(),
370 groupedThreads("DistStatsStore", "messageHandling-%d", log));
sangyun-hanad84e0c2016-02-19 18:30:03 +0900371 prevExecutor.shutdown();
372 }
373
374 /**
375 * Gets current thread pool size of message handler.
376 *
377 * @return messageHandlerThreadPoolSize
378 */
379 private int getMessageHandlerThreadPoolSize() {
380 return messageHandlerThreadPoolSize;
381 }
382
alshabib3d643ec2014-10-22 18:33:00 -0700383}