blob: 78c6d789e7ccf9fa4bb938adb481d4492280f231 [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2014-present Open Networking Laboratory
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.statistic.impl;
alshabib3d643ec2014-10-22 18:33:00 -070017
alshabibf6c2ede2014-10-22 23:31:50 -070018import com.google.common.collect.Sets;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070019
alshabib3d643ec2014-10-22 18:33:00 -070020import org.apache.felix.scr.annotations.Activate;
21import org.apache.felix.scr.annotations.Component;
22import org.apache.felix.scr.annotations.Deactivate;
sangyun-hanad84e0c2016-02-19 18:30:03 +090023import org.apache.felix.scr.annotations.Modified;
24import org.apache.felix.scr.annotations.Property;
alshabib3d643ec2014-10-22 18:33:00 -070025import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
27import org.apache.felix.scr.annotations.Service;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070028import org.onlab.util.Tools;
Brian O'Connorabafb502014-12-02 22:26:20 -080029import org.onosproject.cluster.ClusterService;
Madan Jampanic156dd02015-08-12 15:57:46 -070030import org.onosproject.cluster.NodeId;
31import org.onosproject.mastership.MastershipService;
Brian O'Connorabafb502014-12-02 22:26:20 -080032import org.onosproject.net.ConnectPoint;
33import org.onosproject.net.DeviceId;
34import org.onosproject.net.PortNumber;
35import org.onosproject.net.flow.FlowEntry;
36import org.onosproject.net.flow.FlowRule;
37import org.onosproject.net.flow.instructions.Instruction;
38import org.onosproject.net.flow.instructions.Instructions;
39import org.onosproject.net.statistic.StatisticStore;
40import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Madan Jampani78be2492016-06-03 23:27:07 -070041import org.onosproject.store.cluster.messaging.MessageSubject;
Brian O'Connorabafb502014-12-02 22:26:20 -080042import org.onosproject.store.serializers.KryoNamespaces;
HIGUCHI Yutae7290652016-05-18 11:29:01 -070043import org.onosproject.store.serializers.StoreSerializer;
sangyun-hanad84e0c2016-02-19 18:30:03 +090044import org.osgi.service.component.ComponentContext;
alshabib3d643ec2014-10-22 18:33:00 -070045import org.slf4j.Logger;
46
alshabib9c57bdd2014-11-28 19:14:06 -050047import java.util.Collections;
sangyun-hanad84e0c2016-02-19 18:30:03 +090048import java.util.Dictionary;
alshabib3d643ec2014-10-22 18:33:00 -070049import java.util.HashSet;
50import java.util.Map;
sangyun-hanad84e0c2016-02-19 18:30:03 +090051import java.util.Properties;
alshabib3d643ec2014-10-22 18:33:00 -070052import java.util.Set;
53import java.util.concurrent.ConcurrentHashMap;
Madan Jampani2af244a2015-02-22 13:12:01 -080054import java.util.concurrent.ExecutorService;
55import java.util.concurrent.Executors;
alshabib3d643ec2014-10-22 18:33:00 -070056import java.util.concurrent.TimeUnit;
alshabib3d643ec2014-10-22 18:33:00 -070057import java.util.concurrent.atomic.AtomicInteger;
58
sangyun-hanad84e0c2016-02-19 18:30:03 +090059import static com.google.common.base.Preconditions.checkArgument;
60import static com.google.common.base.Strings.isNullOrEmpty;
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070061import static java.util.concurrent.Executors.newFixedThreadPool;
sangyun-hanad84e0c2016-02-19 18:30:03 +090062import static org.onlab.util.Tools.get;
Madan Jampani2af244a2015-02-22 13:12:01 -080063import static org.onlab.util.Tools.groupedThreads;
Thomas Vachuska82041f52014-11-30 22:14:02 -080064import static org.slf4j.LoggerFactory.getLogger;
65
alshabib3d643ec2014-10-22 18:33:00 -070066
67/**
68 * Maintains statistics using RPC calls to collect stats from remote instances
69 * on demand.
70 */
71@Component(immediate = true)
72@Service
73public class DistributedStatisticStore implements StatisticStore {
74
75 private final Logger log = getLogger(getClass());
76
sangyun-hanad84e0c2016-02-19 18:30:03 +090077 private static final String FORMAT = "Setting: messageHandlerThreadPoolSize={}";
Madan Jampani2af244a2015-02-22 13:12:01 -080078
alshabib3d643ec2014-10-22 18:33:00 -070079 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampanic156dd02015-08-12 15:57:46 -070080 protected MastershipService mastershipService;
alshabib3d643ec2014-10-22 18:33:00 -070081
82 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuska82041f52014-11-30 22:14:02 -080083 protected ClusterCommunicationService clusterCommunicator;
alshabib3d643ec2014-10-22 18:33:00 -070084
85 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Thomas Vachuska82041f52014-11-30 22:14:02 -080086 protected ClusterService clusterService;
alshabib3d643ec2014-10-22 18:33:00 -070087
Madan Jampani78be2492016-06-03 23:27:07 -070088 public static final MessageSubject GET_CURRENT = new MessageSubject("peer-return-current");
89 public static final MessageSubject GET_PREVIOUS = new MessageSubject("peer-return-previous");
90
alshabib3d643ec2014-10-22 18:33:00 -070091 private Map<ConnectPoint, InternalStatisticRepresentation> representations =
92 new ConcurrentHashMap<>();
93
94 private Map<ConnectPoint, Set<FlowEntry>> previous =
95 new ConcurrentHashMap<>();
96
97 private Map<ConnectPoint, Set<FlowEntry>> current =
98 new ConcurrentHashMap<>();
99
HIGUCHI Yutae7290652016-05-18 11:29:01 -0700100 protected static final StoreSerializer SERIALIZER = StoreSerializer.using(KryoNamespaces.API);
alshabib3d643ec2014-10-22 18:33:00 -0700101
Madan Jampani2af244a2015-02-22 13:12:01 -0800102 private ExecutorService messageHandlingExecutor;
103
sangyun-hanad84e0c2016-02-19 18:30:03 +0900104 private static final int DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
105 @Property(name = "messageHandlerThreadPoolSize", intValue = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE,
106 label = "Size of thread pool to assign message handler")
107 private static int messageHandlerThreadPoolSize = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE;
108
alshabib3d643ec2014-10-22 18:33:00 -0700109 private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
110
111 @Activate
112 public void activate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800113
114 messageHandlingExecutor = Executors.newFixedThreadPool(
sangyun-hanad84e0c2016-02-19 18:30:03 +0900115 messageHandlerThreadPoolSize,
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700116 groupedThreads("onos/store/statistic", "message-handlers", log));
Madan Jampani2af244a2015-02-22 13:12:01 -0800117
Madan Jampani1151b552015-08-12 16:11:27 -0700118 clusterCommunicator.<ConnectPoint, Set<FlowEntry>>addSubscriber(GET_CURRENT,
119 SERIALIZER::decode,
120 this::getCurrentStatisticInternal,
121 SERIALIZER::encode,
122 messageHandlingExecutor);
alshabib3d643ec2014-10-22 18:33:00 -0700123
Madan Jampani1151b552015-08-12 16:11:27 -0700124 clusterCommunicator.<ConnectPoint, Set<FlowEntry>>addSubscriber(GET_PREVIOUS,
125 SERIALIZER::decode,
126 this::getPreviousStatisticInternal,
127 SERIALIZER::encode,
128 messageHandlingExecutor);
alshabib3d643ec2014-10-22 18:33:00 -0700129
alshabib3d643ec2014-10-22 18:33:00 -0700130 log.info("Started");
131 }
132
133 @Deactivate
134 public void deactivate() {
Madan Jampani2af244a2015-02-22 13:12:01 -0800135 clusterCommunicator.removeSubscriber(GET_PREVIOUS);
136 clusterCommunicator.removeSubscriber(GET_CURRENT);
137 messageHandlingExecutor.shutdown();
alshabib3d643ec2014-10-22 18:33:00 -0700138 log.info("Stopped");
139 }
140
sangyun-hanad84e0c2016-02-19 18:30:03 +0900141 @Modified
142 public void modified(ComponentContext context) {
143 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
144
145 int newMessageHandlerThreadPoolSize;
146
147 try {
148 String s = get(properties, "messageHandlerThreadPoolSize");
149
150 newMessageHandlerThreadPoolSize =
151 isNullOrEmpty(s) ? messageHandlerThreadPoolSize : Integer.parseInt(s.trim());
152
153 } catch (NumberFormatException e) {
154 log.warn(e.getMessage());
155 newMessageHandlerThreadPoolSize = messageHandlerThreadPoolSize;
156 }
157
158 // Any change in the following parameters implies thread pool restart
159 if (newMessageHandlerThreadPoolSize != messageHandlerThreadPoolSize) {
160 setMessageHandlerThreadPoolSize(newMessageHandlerThreadPoolSize);
161 restartMessageHandlerThreadPool();
162 }
163
164 log.info(FORMAT, messageHandlerThreadPoolSize);
165 }
166
167
alshabib3d643ec2014-10-22 18:33:00 -0700168 @Override
169 public void prepareForStatistics(FlowRule rule) {
170 ConnectPoint cp = buildConnectPoint(rule);
171 if (cp == null) {
172 return;
173 }
174 InternalStatisticRepresentation rep;
175 synchronized (representations) {
176 rep = getOrCreateRepresentation(cp);
177 }
178 rep.prepare();
179 }
180
181 @Override
alshabibf6c2ede2014-10-22 23:31:50 -0700182 public synchronized void removeFromStatistics(FlowRule rule) {
alshabib3d643ec2014-10-22 18:33:00 -0700183 ConnectPoint cp = buildConnectPoint(rule);
184 if (cp == null) {
185 return;
186 }
187 InternalStatisticRepresentation rep = representations.get(cp);
alshabib9c57bdd2014-11-28 19:14:06 -0500188 if (rep != null && rep.remove(rule)) {
189 updatePublishedStats(cp, Collections.emptySet());
alshabib3d643ec2014-10-22 18:33:00 -0700190 }
alshabibf6c2ede2014-10-22 23:31:50 -0700191 Set<FlowEntry> values = current.get(cp);
192 if (values != null) {
193 values.remove(rule);
194 }
195 values = previous.get(cp);
196 if (values != null) {
197 values.remove(rule);
198 }
199
alshabib3d643ec2014-10-22 18:33:00 -0700200 }
201
202 @Override
203 public void addOrUpdateStatistic(FlowEntry rule) {
204 ConnectPoint cp = buildConnectPoint(rule);
205 if (cp == null) {
206 return;
207 }
208 InternalStatisticRepresentation rep = representations.get(cp);
209 if (rep != null && rep.submit(rule)) {
210 updatePublishedStats(cp, rep.get());
211 }
212 }
213
214 private synchronized void updatePublishedStats(ConnectPoint cp,
215 Set<FlowEntry> flowEntries) {
216 Set<FlowEntry> curr = current.get(cp);
217 if (curr == null) {
218 curr = new HashSet<>();
219 }
220 previous.put(cp, curr);
221 current.put(cp, flowEntries);
222
223 }
224
225 @Override
226 public Set<FlowEntry> getCurrentStatistic(ConnectPoint connectPoint) {
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700227 final DeviceId deviceId = connectPoint.deviceId();
Madan Jampanic156dd02015-08-12 15:57:46 -0700228 NodeId master = mastershipService.getMasterFor(deviceId);
229 if (master == null) {
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700230 log.warn("No master for {}", deviceId);
Thomas Vachuska82041f52014-11-30 22:14:02 -0800231 return Collections.emptySet();
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700232 }
Madan Jampanic156dd02015-08-12 15:57:46 -0700233 if (master.equals(clusterService.getLocalNode().id())) {
alshabib3d643ec2014-10-22 18:33:00 -0700234 return getCurrentStatisticInternal(connectPoint);
235 } else {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700236 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
237 connectPoint,
238 GET_CURRENT,
239 SERIALIZER::encode,
240 SERIALIZER::decode,
Madan Jampanic156dd02015-08-12 15:57:46 -0700241 master),
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700242 STATISTIC_STORE_TIMEOUT_MILLIS,
243 TimeUnit.MILLISECONDS,
244 Collections.emptySet());
alshabib3d643ec2014-10-22 18:33:00 -0700245 }
246
247 }
248
249 private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
250 return current.get(connectPoint);
251 }
252
253 @Override
254 public Set<FlowEntry> getPreviousStatistic(ConnectPoint connectPoint) {
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700255 final DeviceId deviceId = connectPoint.deviceId();
Madan Jampanic156dd02015-08-12 15:57:46 -0700256 NodeId master = mastershipService.getMasterFor(deviceId);
257 if (master == null) {
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700258 log.warn("No master for {}", deviceId);
Thomas Vachuska82041f52014-11-30 22:14:02 -0800259 return Collections.emptySet();
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700260 }
Madan Jampanic156dd02015-08-12 15:57:46 -0700261 if (master.equals(clusterService.getLocalNode().id())) {
alshabib3d643ec2014-10-22 18:33:00 -0700262 return getPreviousStatisticInternal(connectPoint);
263 } else {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700264 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
265 connectPoint,
266 GET_PREVIOUS,
267 SERIALIZER::encode,
268 SERIALIZER::decode,
Madan Jampanic156dd02015-08-12 15:57:46 -0700269 master),
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700270 STATISTIC_STORE_TIMEOUT_MILLIS,
271 TimeUnit.MILLISECONDS,
272 Collections.emptySet());
alshabib3d643ec2014-10-22 18:33:00 -0700273 }
alshabib3d643ec2014-10-22 18:33:00 -0700274 }
275
276 private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
277 return previous.get(connectPoint);
278 }
279
280 private InternalStatisticRepresentation getOrCreateRepresentation(ConnectPoint cp) {
281
282 if (representations.containsKey(cp)) {
283 return representations.get(cp);
284 } else {
285 InternalStatisticRepresentation rep = new InternalStatisticRepresentation();
286 representations.put(cp, rep);
287 return rep;
288 }
289
290 }
291
292 private ConnectPoint buildConnectPoint(FlowRule rule) {
293 PortNumber port = getOutput(rule);
Jonathan Hart7baba072015-02-23 14:27:59 -0800294
alshabib3d643ec2014-10-22 18:33:00 -0700295 if (port == null) {
alshabib3d643ec2014-10-22 18:33:00 -0700296 return null;
297 }
298 ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
299 return cp;
300 }
301
302 private PortNumber getOutput(FlowRule rule) {
Jonathan Hart8ef6d3b2015-03-08 21:21:27 -0700303 for (Instruction i : rule.treatment().allInstructions()) {
alshabib3d643ec2014-10-22 18:33:00 -0700304 if (i.type() == Instruction.Type.OUTPUT) {
305 Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
306 return out.port();
307 }
alshabib3d643ec2014-10-22 18:33:00 -0700308 }
309 return null;
310 }
311
312 private class InternalStatisticRepresentation {
313
314 private final AtomicInteger counter = new AtomicInteger(0);
315 private final Set<FlowEntry> rules = new HashSet<>();
316
317 public void prepare() {
318 counter.incrementAndGet();
319 }
320
alshabib9c57bdd2014-11-28 19:14:06 -0500321 public synchronized boolean remove(FlowRule rule) {
alshabib3d643ec2014-10-22 18:33:00 -0700322 rules.remove(rule);
alshabib9c57bdd2014-11-28 19:14:06 -0500323 return counter.decrementAndGet() == 0;
alshabib3d643ec2014-10-22 18:33:00 -0700324 }
325
326 public synchronized boolean submit(FlowEntry rule) {
327 if (rules.contains(rule)) {
328 rules.remove(rule);
329 }
330 rules.add(rule);
331 if (counter.get() == 0) {
332 return true;
333 } else {
334 return counter.decrementAndGet() == 0;
335 }
336 }
337
338 public synchronized Set<FlowEntry> get() {
339 counter.set(rules.size());
alshabibf6c2ede2014-10-22 23:31:50 -0700340 return Sets.newHashSet(rules);
alshabib3d643ec2014-10-22 18:33:00 -0700341 }
342
343
344 }
345
sangyun-hanad84e0c2016-02-19 18:30:03 +0900346 /**
347 * Sets thread pool size of message handler.
348 *
349 * @param poolSize
350 */
351 private void setMessageHandlerThreadPoolSize(int poolSize) {
352 checkArgument(poolSize >= 0, "Message handler pool size must be 0 or more");
353 messageHandlerThreadPoolSize = poolSize;
354 }
355
356 /**
357 * Restarts thread pool of message handler.
358 */
359 private void restartMessageHandlerThreadPool() {
360 ExecutorService prevExecutor = messageHandlingExecutor;
Yuta HIGUCHI1624df12016-07-21 16:54:33 -0700361 messageHandlingExecutor = newFixedThreadPool(getMessageHandlerThreadPoolSize(),
362 groupedThreads("DistStatsStore", "messageHandling-%d", log));
sangyun-hanad84e0c2016-02-19 18:30:03 +0900363 prevExecutor.shutdown();
364 }
365
366 /**
367 * Gets current thread pool size of message handler.
368 *
369 * @return messageHandlerThreadPoolSize
370 */
371 private int getMessageHandlerThreadPoolSize() {
372 return messageHandlerThreadPoolSize;
373 }
374
alshabib3d643ec2014-10-22 18:33:00 -0700375}