blob: a1159954a50892f30d3469ee4b3b3131cdc3ab80 [file] [log] [blame]
/*
 * Copyright 2015-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.store.statistic.impl;
18
19import com.google.common.base.Objects;
Madan Jampanic27b6b22016-02-05 11:36:31 -080020import org.onlab.util.Tools;
sangyun-han9f0af2d2016-08-04 13:04:59 +090021import org.onosproject.cfg.ComponentConfigService;
Madan Jampanic27b6b22016-02-05 11:36:31 -080022import org.onosproject.cluster.ClusterService;
23import org.onosproject.cluster.NodeId;
24import org.onosproject.mastership.MastershipService;
25import org.onosproject.net.ConnectPoint;
26import org.onosproject.net.DeviceId;
27import org.onosproject.net.PortNumber;
28import org.onosproject.net.flow.FlowEntry;
29import org.onosproject.net.flow.FlowRule;
30import org.onosproject.net.flow.instructions.Instruction;
31import org.onosproject.net.flow.instructions.Instructions;
32import org.onosproject.net.statistic.FlowStatisticStore;
33import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
Madan Jampani78be2492016-06-03 23:27:07 -070034import org.onosproject.store.cluster.messaging.MessageSubject;
Madan Jampanic27b6b22016-02-05 11:36:31 -080035import org.onosproject.store.serializers.KryoNamespaces;
Jordan Halterman2c83a102017-08-20 17:11:41 -070036import org.onosproject.store.service.Serializer;
sangyun-hanad84e0c2016-02-19 18:30:03 +090037import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070038import org.osgi.service.component.annotations.Activate;
39import org.osgi.service.component.annotations.Component;
40import org.osgi.service.component.annotations.Deactivate;
41import org.osgi.service.component.annotations.Modified;
42import org.osgi.service.component.annotations.Reference;
43import org.osgi.service.component.annotations.ReferenceCardinality;
Madan Jampanic27b6b22016-02-05 11:36:31 -080044import org.slf4j.Logger;
45
46import java.util.Collections;
sangyun-hanad84e0c2016-02-19 18:30:03 +090047import java.util.Dictionary;
Madan Jampanic27b6b22016-02-05 11:36:31 -080048import java.util.HashSet;
49import java.util.Map;
50import java.util.Optional;
sangyun-hanad84e0c2016-02-19 18:30:03 +090051import java.util.Properties;
Madan Jampanic27b6b22016-02-05 11:36:31 -080052import java.util.Set;
53import java.util.concurrent.ConcurrentHashMap;
54import java.util.concurrent.ExecutorService;
55import java.util.concurrent.Executors;
56import java.util.concurrent.TimeUnit;
57
sangyun-hanad84e0c2016-02-19 18:30:03 +090058import static com.google.common.base.Preconditions.checkArgument;
59import static com.google.common.base.Strings.isNullOrEmpty;
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070060import static java.util.concurrent.Executors.newFixedThreadPool;
sangyun-hanad84e0c2016-02-19 18:30:03 +090061import static org.onlab.util.Tools.get;
Madan Jampanic27b6b22016-02-05 11:36:31 -080062import static org.onlab.util.Tools.groupedThreads;
Ray Milkeyb5646e62018-10-16 11:42:18 -070063import static org.onosproject.store.OsgiPropertyConstants.DFS_MESSAGE_HANDLER_THREAD_POOL_SIZE;
64import static org.onosproject.store.OsgiPropertyConstants.DFS_MESSAGE_HANDLER_THREAD_POOL_SIZE_DEFAULT;
Madan Jampanic27b6b22016-02-05 11:36:31 -080065import static org.slf4j.LoggerFactory.getLogger;
66
67/**
68 * Maintains flow statistics using RPC calls to collect stats from remote instances
69 * on demand.
70 */
Ray Milkeyb5646e62018-10-16 11:42:18 -070071@Component(
72 immediate = true,
73 service = FlowStatisticStore.class,
74 property = {
Ray Milkey2d7bca12018-10-17 14:51:52 -070075 DFS_MESSAGE_HANDLER_THREAD_POOL_SIZE + ":Integer=" + DFS_MESSAGE_HANDLER_THREAD_POOL_SIZE_DEFAULT
Ray Milkeyb5646e62018-10-16 11:42:18 -070076 }
77)
Madan Jampanic27b6b22016-02-05 11:36:31 -080078public class DistributedFlowStatisticStore implements FlowStatisticStore {
79 private final Logger log = getLogger(getClass());
80
sangyun-hanad84e0c2016-02-19 18:30:03 +090081 private static final String FORMAT = "Setting: messageHandlerThreadPoolSize={}";
Madan Jampanic27b6b22016-02-05 11:36:31 -080082
Ray Milkeyd84f89b2018-08-17 14:54:17 -070083 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Madan Jampanic27b6b22016-02-05 11:36:31 -080084 protected MastershipService mastershipService;
85
Ray Milkeyd84f89b2018-08-17 14:54:17 -070086 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Madan Jampanic27b6b22016-02-05 11:36:31 -080087 protected ClusterCommunicationService clusterCommunicator;
88
Ray Milkeyd84f89b2018-08-17 14:54:17 -070089 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Madan Jampanic27b6b22016-02-05 11:36:31 -080090 protected ClusterService clusterService;
91
Ray Milkeyd84f89b2018-08-17 14:54:17 -070092 @Reference(cardinality = ReferenceCardinality.MANDATORY)
sangyun-han9f0af2d2016-08-04 13:04:59 +090093 protected ComponentConfigService cfgService;
94
Madan Jampanic27b6b22016-02-05 11:36:31 -080095 private Map<ConnectPoint, Set<FlowEntry>> previous =
96 new ConcurrentHashMap<>();
97
98 private Map<ConnectPoint, Set<FlowEntry>> current =
99 new ConcurrentHashMap<>();
100
Madan Jampani78be2492016-06-03 23:27:07 -0700101 public static final MessageSubject GET_CURRENT = new MessageSubject("peer-return-current");
102 public static final MessageSubject GET_PREVIOUS = new MessageSubject("peer-return-previous");
103
Jordan Halterman2c83a102017-08-20 17:11:41 -0700104 protected static final Serializer SERIALIZER = Serializer.using(KryoNamespaces.API);
Madan Jampanic27b6b22016-02-05 11:36:31 -0800105
106 private NodeId local;
107 private ExecutorService messageHandlingExecutor;
108
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700109 /** Size of thread pool to assign message handler. */
Ray Milkeyb5646e62018-10-16 11:42:18 -0700110 private static int messageHandlerThreadPoolSize = DFS_MESSAGE_HANDLER_THREAD_POOL_SIZE_DEFAULT;
sangyun-hanad84e0c2016-02-19 18:30:03 +0900111
112
Madan Jampanic27b6b22016-02-05 11:36:31 -0800113 private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
114
115 @Activate
sangyun-han9f0af2d2016-08-04 13:04:59 +0900116 public void activate(ComponentContext context) {
117 cfgService.registerProperties(getClass());
118
119 modified(context);
120
Madan Jampanic27b6b22016-02-05 11:36:31 -0800121 local = clusterService.getLocalNode().id();
122
123 messageHandlingExecutor = Executors.newFixedThreadPool(
sangyun-hanad84e0c2016-02-19 18:30:03 +0900124 messageHandlerThreadPoolSize,
HIGUCHI Yuta060da9a2016-03-11 19:16:35 -0800125 groupedThreads("onos/store/statistic", "message-handlers", log));
Madan Jampanic27b6b22016-02-05 11:36:31 -0800126
127 clusterCommunicator.addSubscriber(
128 GET_CURRENT, SERIALIZER::decode, this::getCurrentStatisticInternal, SERIALIZER::encode,
129 messageHandlingExecutor);
130
131 clusterCommunicator.addSubscriber(
132 GET_CURRENT, SERIALIZER::decode, this::getPreviousStatisticInternal, SERIALIZER::encode,
133 messageHandlingExecutor);
134
135 log.info("Started");
136 }
137
138 @Deactivate
139 public void deactivate() {
sangyun-han9f0af2d2016-08-04 13:04:59 +0900140 cfgService.unregisterProperties(getClass(), false);
Madan Jampanic27b6b22016-02-05 11:36:31 -0800141 clusterCommunicator.removeSubscriber(GET_PREVIOUS);
142 clusterCommunicator.removeSubscriber(GET_CURRENT);
143 messageHandlingExecutor.shutdown();
144 log.info("Stopped");
145 }
146
sangyun-hanad84e0c2016-02-19 18:30:03 +0900147 @Modified
148 public void modified(ComponentContext context) {
149 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
150
151 int newMessageHandlerThreadPoolSize;
152
153 try {
Thomas Vachuskaf566fa22018-10-30 14:03:36 -0700154 String s = get(properties, DFS_MESSAGE_HANDLER_THREAD_POOL_SIZE);
sangyun-hanad84e0c2016-02-19 18:30:03 +0900155
156 newMessageHandlerThreadPoolSize =
157 isNullOrEmpty(s) ? messageHandlerThreadPoolSize : Integer.parseInt(s.trim());
158
159 } catch (NumberFormatException e) {
160 log.warn(e.getMessage());
161 newMessageHandlerThreadPoolSize = messageHandlerThreadPoolSize;
162 }
163
164 // Any change in the following parameters implies thread pool restart
165 if (newMessageHandlerThreadPoolSize != messageHandlerThreadPoolSize) {
166 setMessageHandlerThreadPoolSize(newMessageHandlerThreadPoolSize);
167 restartMessageHandlerThreadPool();
168 }
169
170 log.info(FORMAT, messageHandlerThreadPoolSize);
171 }
172
Madan Jampanic27b6b22016-02-05 11:36:31 -0800173 @Override
174 public synchronized void removeFlowStatistic(FlowRule rule) {
175 ConnectPoint cp = buildConnectPoint(rule);
176 if (cp == null) {
177 return;
178 }
179
180 // remove this rule if present from current map
sangyun-hanad84e0c2016-02-19 18:30:03 +0900181 current.computeIfPresent(cp, (c, e) -> {
182 e.remove(rule);
183 return e;
184 });
Madan Jampanic27b6b22016-02-05 11:36:31 -0800185
186 // remove this on if present from previous map
sangyun-hanad84e0c2016-02-19 18:30:03 +0900187 previous.computeIfPresent(cp, (c, e) -> {
188 e.remove(rule);
189 return e;
190 });
Madan Jampanic27b6b22016-02-05 11:36:31 -0800191 }
192
193 @Override
194 public synchronized void addFlowStatistic(FlowEntry rule) {
195 ConnectPoint cp = buildConnectPoint(rule);
196 if (cp == null) {
197 return;
198 }
199
200 // create one if absent and add this rule
201 current.putIfAbsent(cp, new HashSet<>());
Ray Milkey88cc3432017-03-30 17:19:08 -0700202 current.computeIfPresent(cp, (c, e) -> {
203 e.add(rule); return e;
204 });
Madan Jampanic27b6b22016-02-05 11:36:31 -0800205
206 // remove previous one if present
Ray Milkey88cc3432017-03-30 17:19:08 -0700207 previous.computeIfPresent(cp, (c, e) -> {
208 e.remove(rule); return e;
209 });
Madan Jampanic27b6b22016-02-05 11:36:31 -0800210 }
211
HIGUCHI Yuta060da9a2016-03-11 19:16:35 -0800212 @Override
Madan Jampanic27b6b22016-02-05 11:36:31 -0800213 public synchronized void updateFlowStatistic(FlowEntry rule) {
214 ConnectPoint cp = buildConnectPoint(rule);
215 if (cp == null) {
216 return;
217 }
218
219 Set<FlowEntry> curr = current.get(cp);
220 if (curr == null) {
221 addFlowStatistic(rule);
222 } else {
223 Optional<FlowEntry> f = curr.stream().filter(c -> rule.equals(c)).
224 findAny();
225 if (f.isPresent() && rule.bytes() < f.get().bytes()) {
226 log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
227 " Invalid Flow Update! Will be removed!!" +
228 " curr flowId=" + Long.toHexString(rule.id().value()) +
229 ", prev flowId=" + Long.toHexString(f.get().id().value()) +
230 ", curr bytes=" + rule.bytes() + ", prev bytes=" + f.get().bytes() +
231 ", curr life=" + rule.life() + ", prev life=" + f.get().life() +
232 ", curr lastSeen=" + rule.lastSeen() + ", prev lastSeen=" + f.get().lastSeen());
233 // something is wrong! invalid flow entry, so delete it
234 removeFlowStatistic(rule);
235 return;
236 }
237 Set<FlowEntry> prev = previous.get(cp);
238 if (prev == null) {
239 prev = new HashSet<>();
240 previous.put(cp, prev);
241 }
242
243 // previous one is exist
244 if (f.isPresent()) {
245 // remove old one and add new one
246 prev.remove(rule);
247 if (!prev.add(f.get())) {
248 log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
249 " flowId={}, add failed into previous.",
250 Long.toHexString(rule.id().value()));
251 }
252 }
253
254 // remove old one and add new one
255 curr.remove(rule);
256 if (!curr.add(rule)) {
257 log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
258 " flowId={}, add failed into current.",
259 Long.toHexString(rule.id().value()));
260 }
261 }
262 }
263
264 @Override
265 public Set<FlowEntry> getCurrentFlowStatistic(ConnectPoint connectPoint) {
266 final DeviceId deviceId = connectPoint.deviceId();
267
268 NodeId master = mastershipService.getMasterFor(deviceId);
269 if (master == null) {
270 log.warn("No master for {}", deviceId);
271 return Collections.emptySet();
272 }
273
274 if (Objects.equal(local, master)) {
275 return getCurrentStatisticInternal(connectPoint);
276 } else {
277 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
278 connectPoint,
279 GET_CURRENT,
280 SERIALIZER::encode,
281 SERIALIZER::decode,
282 master),
283 STATISTIC_STORE_TIMEOUT_MILLIS,
284 TimeUnit.MILLISECONDS,
285 Collections.emptySet());
286 }
287 }
288
289 private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
290 return current.get(connectPoint);
291 }
292
293 @Override
294 public Set<FlowEntry> getPreviousFlowStatistic(ConnectPoint connectPoint) {
295 final DeviceId deviceId = connectPoint.deviceId();
296
297 NodeId master = mastershipService.getMasterFor(deviceId);
298 if (master == null) {
299 log.warn("No master for {}", deviceId);
300 return Collections.emptySet();
301 }
302
303 if (Objects.equal(local, master)) {
304 return getPreviousStatisticInternal(connectPoint);
305 } else {
306 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
307 connectPoint,
308 GET_PREVIOUS,
309 SERIALIZER::encode,
310 SERIALIZER::decode,
311 master),
312 STATISTIC_STORE_TIMEOUT_MILLIS,
313 TimeUnit.MILLISECONDS,
314 Collections.emptySet());
315 }
316 }
317
318 private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
319 return previous.get(connectPoint);
320 }
321
322 private ConnectPoint buildConnectPoint(FlowRule rule) {
323 PortNumber port = getOutput(rule);
324
325 if (port == null) {
326 return null;
327 }
328 ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
329 return cp;
330 }
331
332 private PortNumber getOutput(FlowRule rule) {
333 for (Instruction i : rule.treatment().allInstructions()) {
334 if (i.type() == Instruction.Type.OUTPUT) {
335 Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
336 return out.port();
337 }
Madan Jampanic27b6b22016-02-05 11:36:31 -0800338 }
339 return null;
340 }
sangyun-hanad84e0c2016-02-19 18:30:03 +0900341
342 /**
343 * Sets thread pool size of message handler.
344 *
345 * @param poolSize
346 */
347 private void setMessageHandlerThreadPoolSize(int poolSize) {
348 checkArgument(poolSize >= 0, "Message handler pool size must be 0 or more");
349 messageHandlerThreadPoolSize = poolSize;
350 }
351
352 /**
353 * Restarts thread pool of message handler.
354 */
355 private void restartMessageHandlerThreadPool() {
356 ExecutorService prevExecutor = messageHandlingExecutor;
Yuta HIGUCHI1624df12016-07-21 16:54:33 -0700357 messageHandlingExecutor = newFixedThreadPool(getMessageHandlerThreadPoolSize(),
358 groupedThreads("DistFlowStats", "messageHandling-%d", log));
sangyun-hanad84e0c2016-02-19 18:30:03 +0900359 prevExecutor.shutdown();
360 }
361
362 /**
363 * Gets current thread pool size of message handler.
364 *
365 * @return messageHandlerThreadPoolSize
366 */
367 private int getMessageHandlerThreadPoolSize() {
368 return messageHandlerThreadPoolSize;
369 }
Sho SHIMIZU57f2efd2016-02-24 12:20:05 -0800370}