blob: f35ee0d3e71efb24b56a4cf3a5acd43a1342d3a6 [file] [log] [blame]
/*
 * Copyright 2015-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.store.statistic.impl;
18
19import com.google.common.base.Objects;
Madan Jampani78be2492016-06-03 23:27:07 -070020
Madan Jampanic27b6b22016-02-05 11:36:31 -080021import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
sangyun-hanad84e0c2016-02-19 18:30:03 +090024import org.apache.felix.scr.annotations.Modified;
25import org.apache.felix.scr.annotations.Property;
Madan Jampanic27b6b22016-02-05 11:36:31 -080026import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
28import org.apache.felix.scr.annotations.Service;
Madan Jampanic27b6b22016-02-05 11:36:31 -080029import org.onlab.util.Tools;
sangyun-han9f0af2d2016-08-04 13:04:59 +090030import org.onosproject.cfg.ComponentConfigService;
Madan Jampanic27b6b22016-02-05 11:36:31 -080031import org.onosproject.cluster.ClusterService;
32import org.onosproject.cluster.NodeId;
33import org.onosproject.mastership.MastershipService;
34import org.onosproject.net.ConnectPoint;
35import org.onosproject.net.DeviceId;
36import org.onosproject.net.PortNumber;
37import org.onosproject.net.flow.FlowEntry;
38import org.onosproject.net.flow.FlowRule;
39import org.onosproject.net.flow.instructions.Instruction;
40import org.onosproject.net.flow.instructions.Instructions;
41import org.onosproject.net.statistic.FlowStatisticStore;
42import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Madan Jampani78be2492016-06-03 23:27:07 -070043import org.onosproject.store.cluster.messaging.MessageSubject;
Madan Jampanic27b6b22016-02-05 11:36:31 -080044import org.onosproject.store.serializers.KryoNamespaces;
Jordan Haltermanc6c6ef22017-08-20 17:11:41 -070045import org.onosproject.store.service.Serializer;
sangyun-hanad84e0c2016-02-19 18:30:03 +090046import org.osgi.service.component.ComponentContext;
Madan Jampanic27b6b22016-02-05 11:36:31 -080047import org.slf4j.Logger;
48
49import java.util.Collections;
sangyun-hanad84e0c2016-02-19 18:30:03 +090050import java.util.Dictionary;
Madan Jampanic27b6b22016-02-05 11:36:31 -080051import java.util.HashSet;
52import java.util.Map;
53import java.util.Optional;
sangyun-hanad84e0c2016-02-19 18:30:03 +090054import java.util.Properties;
Madan Jampanic27b6b22016-02-05 11:36:31 -080055import java.util.Set;
56import java.util.concurrent.ConcurrentHashMap;
57import java.util.concurrent.ExecutorService;
58import java.util.concurrent.Executors;
59import java.util.concurrent.TimeUnit;
60
sangyun-hanad84e0c2016-02-19 18:30:03 +090061import static com.google.common.base.Preconditions.checkArgument;
62import static com.google.common.base.Strings.isNullOrEmpty;
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070063import static java.util.concurrent.Executors.newFixedThreadPool;
sangyun-hanad84e0c2016-02-19 18:30:03 +090064import static org.onlab.util.Tools.get;
Madan Jampanic27b6b22016-02-05 11:36:31 -080065import static org.onlab.util.Tools.groupedThreads;
Madan Jampanic27b6b22016-02-05 11:36:31 -080066import static org.slf4j.LoggerFactory.getLogger;
67
68/**
69 * Maintains flow statistics using RPC calls to collect stats from remote instances
70 * on demand.
71 */
72@Component(immediate = true)
73@Service
74public class DistributedFlowStatisticStore implements FlowStatisticStore {
75 private final Logger log = getLogger(getClass());
76
sangyun-hanad84e0c2016-02-19 18:30:03 +090077 private static final String FORMAT = "Setting: messageHandlerThreadPoolSize={}";
Madan Jampanic27b6b22016-02-05 11:36:31 -080078
79 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
80 protected MastershipService mastershipService;
81
82 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
83 protected ClusterCommunicationService clusterCommunicator;
84
85 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
86 protected ClusterService clusterService;
87
sangyun-han9f0af2d2016-08-04 13:04:59 +090088 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
89 protected ComponentConfigService cfgService;
90
Madan Jampanic27b6b22016-02-05 11:36:31 -080091 private Map<ConnectPoint, Set<FlowEntry>> previous =
92 new ConcurrentHashMap<>();
93
94 private Map<ConnectPoint, Set<FlowEntry>> current =
95 new ConcurrentHashMap<>();
96
Madan Jampani78be2492016-06-03 23:27:07 -070097 public static final MessageSubject GET_CURRENT = new MessageSubject("peer-return-current");
98 public static final MessageSubject GET_PREVIOUS = new MessageSubject("peer-return-previous");
99
Jordan Haltermanc6c6ef22017-08-20 17:11:41 -0700100 protected static final Serializer SERIALIZER = Serializer.using(KryoNamespaces.API);
Madan Jampanic27b6b22016-02-05 11:36:31 -0800101
102 private NodeId local;
103 private ExecutorService messageHandlingExecutor;
104
sangyun-hanad84e0c2016-02-19 18:30:03 +0900105 private static final int DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
106 @Property(name = "messageHandlerThreadPoolSize", intValue = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE,
107 label = "Size of thread pool to assign message handler")
108 private static int messageHandlerThreadPoolSize = DEFAULT_MESSAGE_HANDLER_THREAD_POOL_SIZE;
109
110
Madan Jampanic27b6b22016-02-05 11:36:31 -0800111 private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
112
113 @Activate
sangyun-han9f0af2d2016-08-04 13:04:59 +0900114 public void activate(ComponentContext context) {
115 cfgService.registerProperties(getClass());
116
117 modified(context);
118
Madan Jampanic27b6b22016-02-05 11:36:31 -0800119 local = clusterService.getLocalNode().id();
120
121 messageHandlingExecutor = Executors.newFixedThreadPool(
sangyun-hanad84e0c2016-02-19 18:30:03 +0900122 messageHandlerThreadPoolSize,
HIGUCHI Yuta060da9a2016-03-11 19:16:35 -0800123 groupedThreads("onos/store/statistic", "message-handlers", log));
Madan Jampanic27b6b22016-02-05 11:36:31 -0800124
125 clusterCommunicator.addSubscriber(
126 GET_CURRENT, SERIALIZER::decode, this::getCurrentStatisticInternal, SERIALIZER::encode,
127 messageHandlingExecutor);
128
129 clusterCommunicator.addSubscriber(
130 GET_CURRENT, SERIALIZER::decode, this::getPreviousStatisticInternal, SERIALIZER::encode,
131 messageHandlingExecutor);
132
133 log.info("Started");
134 }
135
136 @Deactivate
137 public void deactivate() {
sangyun-han9f0af2d2016-08-04 13:04:59 +0900138 cfgService.unregisterProperties(getClass(), false);
Madan Jampanic27b6b22016-02-05 11:36:31 -0800139 clusterCommunicator.removeSubscriber(GET_PREVIOUS);
140 clusterCommunicator.removeSubscriber(GET_CURRENT);
141 messageHandlingExecutor.shutdown();
142 log.info("Stopped");
143 }
144
sangyun-hanad84e0c2016-02-19 18:30:03 +0900145 @Modified
146 public void modified(ComponentContext context) {
147 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
148
149 int newMessageHandlerThreadPoolSize;
150
151 try {
152 String s = get(properties, "messageHandlerThreadPoolSize");
153
154 newMessageHandlerThreadPoolSize =
155 isNullOrEmpty(s) ? messageHandlerThreadPoolSize : Integer.parseInt(s.trim());
156
157 } catch (NumberFormatException e) {
158 log.warn(e.getMessage());
159 newMessageHandlerThreadPoolSize = messageHandlerThreadPoolSize;
160 }
161
162 // Any change in the following parameters implies thread pool restart
163 if (newMessageHandlerThreadPoolSize != messageHandlerThreadPoolSize) {
164 setMessageHandlerThreadPoolSize(newMessageHandlerThreadPoolSize);
165 restartMessageHandlerThreadPool();
166 }
167
168 log.info(FORMAT, messageHandlerThreadPoolSize);
169 }
170
Madan Jampanic27b6b22016-02-05 11:36:31 -0800171 @Override
172 public synchronized void removeFlowStatistic(FlowRule rule) {
173 ConnectPoint cp = buildConnectPoint(rule);
174 if (cp == null) {
175 return;
176 }
177
178 // remove this rule if present from current map
sangyun-hanad84e0c2016-02-19 18:30:03 +0900179 current.computeIfPresent(cp, (c, e) -> {
180 e.remove(rule);
181 return e;
182 });
Madan Jampanic27b6b22016-02-05 11:36:31 -0800183
184 // remove this on if present from previous map
sangyun-hanad84e0c2016-02-19 18:30:03 +0900185 previous.computeIfPresent(cp, (c, e) -> {
186 e.remove(rule);
187 return e;
188 });
Madan Jampanic27b6b22016-02-05 11:36:31 -0800189 }
190
191 @Override
192 public synchronized void addFlowStatistic(FlowEntry rule) {
193 ConnectPoint cp = buildConnectPoint(rule);
194 if (cp == null) {
195 return;
196 }
197
198 // create one if absent and add this rule
199 current.putIfAbsent(cp, new HashSet<>());
Ray Milkey88cc3432017-03-30 17:19:08 -0700200 current.computeIfPresent(cp, (c, e) -> {
201 e.add(rule); return e;
202 });
Madan Jampanic27b6b22016-02-05 11:36:31 -0800203
204 // remove previous one if present
Ray Milkey88cc3432017-03-30 17:19:08 -0700205 previous.computeIfPresent(cp, (c, e) -> {
206 e.remove(rule); return e;
207 });
Madan Jampanic27b6b22016-02-05 11:36:31 -0800208 }
209
HIGUCHI Yuta060da9a2016-03-11 19:16:35 -0800210 @Override
Madan Jampanic27b6b22016-02-05 11:36:31 -0800211 public synchronized void updateFlowStatistic(FlowEntry rule) {
212 ConnectPoint cp = buildConnectPoint(rule);
213 if (cp == null) {
214 return;
215 }
216
217 Set<FlowEntry> curr = current.get(cp);
218 if (curr == null) {
219 addFlowStatistic(rule);
220 } else {
221 Optional<FlowEntry> f = curr.stream().filter(c -> rule.equals(c)).
222 findAny();
223 if (f.isPresent() && rule.bytes() < f.get().bytes()) {
224 log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
225 " Invalid Flow Update! Will be removed!!" +
226 " curr flowId=" + Long.toHexString(rule.id().value()) +
227 ", prev flowId=" + Long.toHexString(f.get().id().value()) +
228 ", curr bytes=" + rule.bytes() + ", prev bytes=" + f.get().bytes() +
229 ", curr life=" + rule.life() + ", prev life=" + f.get().life() +
230 ", curr lastSeen=" + rule.lastSeen() + ", prev lastSeen=" + f.get().lastSeen());
231 // something is wrong! invalid flow entry, so delete it
232 removeFlowStatistic(rule);
233 return;
234 }
235 Set<FlowEntry> prev = previous.get(cp);
236 if (prev == null) {
237 prev = new HashSet<>();
238 previous.put(cp, prev);
239 }
240
241 // previous one is exist
242 if (f.isPresent()) {
243 // remove old one and add new one
244 prev.remove(rule);
245 if (!prev.add(f.get())) {
246 log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
247 " flowId={}, add failed into previous.",
248 Long.toHexString(rule.id().value()));
249 }
250 }
251
252 // remove old one and add new one
253 curr.remove(rule);
254 if (!curr.add(rule)) {
255 log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
256 " flowId={}, add failed into current.",
257 Long.toHexString(rule.id().value()));
258 }
259 }
260 }
261
262 @Override
263 public Set<FlowEntry> getCurrentFlowStatistic(ConnectPoint connectPoint) {
264 final DeviceId deviceId = connectPoint.deviceId();
265
266 NodeId master = mastershipService.getMasterFor(deviceId);
267 if (master == null) {
268 log.warn("No master for {}", deviceId);
269 return Collections.emptySet();
270 }
271
272 if (Objects.equal(local, master)) {
273 return getCurrentStatisticInternal(connectPoint);
274 } else {
275 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
276 connectPoint,
277 GET_CURRENT,
278 SERIALIZER::encode,
279 SERIALIZER::decode,
280 master),
281 STATISTIC_STORE_TIMEOUT_MILLIS,
282 TimeUnit.MILLISECONDS,
283 Collections.emptySet());
284 }
285 }
286
287 private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
288 return current.get(connectPoint);
289 }
290
291 @Override
292 public Set<FlowEntry> getPreviousFlowStatistic(ConnectPoint connectPoint) {
293 final DeviceId deviceId = connectPoint.deviceId();
294
295 NodeId master = mastershipService.getMasterFor(deviceId);
296 if (master == null) {
297 log.warn("No master for {}", deviceId);
298 return Collections.emptySet();
299 }
300
301 if (Objects.equal(local, master)) {
302 return getPreviousStatisticInternal(connectPoint);
303 } else {
304 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
305 connectPoint,
306 GET_PREVIOUS,
307 SERIALIZER::encode,
308 SERIALIZER::decode,
309 master),
310 STATISTIC_STORE_TIMEOUT_MILLIS,
311 TimeUnit.MILLISECONDS,
312 Collections.emptySet());
313 }
314 }
315
316 private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
317 return previous.get(connectPoint);
318 }
319
320 private ConnectPoint buildConnectPoint(FlowRule rule) {
321 PortNumber port = getOutput(rule);
322
323 if (port == null) {
324 return null;
325 }
326 ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
327 return cp;
328 }
329
330 private PortNumber getOutput(FlowRule rule) {
331 for (Instruction i : rule.treatment().allInstructions()) {
332 if (i.type() == Instruction.Type.OUTPUT) {
333 Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
334 return out.port();
335 }
Madan Jampanic27b6b22016-02-05 11:36:31 -0800336 }
337 return null;
338 }
sangyun-hanad84e0c2016-02-19 18:30:03 +0900339
340 /**
341 * Sets thread pool size of message handler.
342 *
343 * @param poolSize
344 */
345 private void setMessageHandlerThreadPoolSize(int poolSize) {
346 checkArgument(poolSize >= 0, "Message handler pool size must be 0 or more");
347 messageHandlerThreadPoolSize = poolSize;
348 }
349
350 /**
351 * Restarts thread pool of message handler.
352 */
353 private void restartMessageHandlerThreadPool() {
354 ExecutorService prevExecutor = messageHandlingExecutor;
Yuta HIGUCHI1624df12016-07-21 16:54:33 -0700355 messageHandlingExecutor = newFixedThreadPool(getMessageHandlerThreadPoolSize(),
356 groupedThreads("DistFlowStats", "messageHandling-%d", log));
sangyun-hanad84e0c2016-02-19 18:30:03 +0900357 prevExecutor.shutdown();
358 }
359
360 /**
361 * Gets current thread pool size of message handler.
362 *
363 * @return messageHandlerThreadPoolSize
364 */
365 private int getMessageHandlerThreadPoolSize() {
366 return messageHandlerThreadPoolSize;
367 }
Sho SHIMIZU57f2efd2016-02-24 12:20:05 -0800368}