blob: f18c56dcd543a69a9a91208c054ed833ada9b17f [file] [log] [blame]
ssyoon90a98825a2015-08-26 00:48:15 +09001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.net.statistic.impl;
18
19import com.google.common.base.MoreObjects;
20import com.google.common.base.Predicate;
21import com.google.common.collect.ImmutableSet;
22import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
25import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
27import org.apache.felix.scr.annotations.Service;
28import org.onosproject.cli.Comparators;
29import org.onosproject.net.ConnectPoint;
30import org.onosproject.net.Device;
31import org.onosproject.net.Port;
32import org.onosproject.net.PortNumber;
33import org.onosproject.net.device.DeviceService;
34import org.onosproject.net.flow.DefaultTypedFlowEntry;
35import org.onosproject.net.flow.FlowEntry;
36import org.onosproject.net.flow.FlowRule;
37import org.onosproject.net.flow.FlowRuleEvent;
38import org.onosproject.net.flow.FlowRuleListener;
39import org.onosproject.net.flow.FlowRuleService;
40import org.onosproject.net.flow.TypedStoredFlowEntry;
41import org.onosproject.net.flow.instructions.Instruction;
42import org.onosproject.net.statistic.DefaultLoad;
43import org.onosproject.net.statistic.FlowStatisticService;
44import org.onosproject.net.statistic.Load;
45import org.onosproject.net.statistic.FlowStatisticStore;
46import org.onosproject.net.statistic.SummaryFlowEntryWithLoad;
47import org.onosproject.net.statistic.TypedFlowEntryWithLoad;
48
49import org.slf4j.Logger;
50
51import java.util.ArrayList;
52import java.util.HashMap;
53import java.util.List;
54import java.util.Map;
55import java.util.Objects;
56import java.util.Set;
57import java.util.TreeMap;
58import java.util.stream.Collectors;
59
60import static com.google.common.base.Preconditions.checkNotNull;
61import static org.onosproject.security.AppGuard.checkPermission;
62import static org.slf4j.LoggerFactory.getLogger;
63import static org.onosproject.security.AppPermission.Type.*;
64
65/**
66 * Provides an implementation of the Flow Statistic Service.
67 */
68@Component(immediate = true, enabled = true)
69@Service
70public class FlowStatisticManager implements FlowStatisticService {
71 private final Logger log = getLogger(getClass());
72
73 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
74 protected FlowRuleService flowRuleService;
75
76 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
77 protected FlowStatisticStore flowStatisticStore;
78
79 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
80 protected DeviceService deviceService;
81
82 private final InternalFlowRuleStatsListener frListener = new InternalFlowRuleStatsListener();
83
84 @Activate
85 public void activate() {
86 flowRuleService.addListener(frListener);
87 log.info("Started");
88 }
89
90 @Deactivate
91 public void deactivate() {
92 flowRuleService.removeListener(frListener);
93 log.info("Stopped");
94 }
95
96 @Override
97 public Map<ConnectPoint, SummaryFlowEntryWithLoad> loadSummary(Device device) {
98 checkPermission(STATISTIC_READ);
99
100 Map<ConnectPoint, SummaryFlowEntryWithLoad> summaryLoad = new TreeMap<>(Comparators.CONNECT_POINT_COMPARATOR);
101
102 if (device == null) {
103 return summaryLoad;
104 }
105
106 List<Port> ports = new ArrayList<>(deviceService.getPorts(device.id()));
107
108 for (Port port : ports) {
109 ConnectPoint cp = new ConnectPoint(device.id(), port.number());
110 SummaryFlowEntryWithLoad sfe = loadSummaryPortInternal(cp);
111 summaryLoad.put(cp, sfe);
112 }
113
114 return summaryLoad;
115 }
116
117 @Override
118 public SummaryFlowEntryWithLoad loadSummary(Device device, PortNumber pNumber) {
119 checkPermission(STATISTIC_READ);
120
121 ConnectPoint cp = new ConnectPoint(device.id(), pNumber);
122 return loadSummaryPortInternal(cp);
123 }
124
125 @Override
126 public Map<ConnectPoint, List<TypedFlowEntryWithLoad>> loadAllByType(Device device,
127 TypedStoredFlowEntry.FlowLiveType liveType,
128 Instruction.Type instType) {
129 checkPermission(STATISTIC_READ);
130
131 Map<ConnectPoint, List<TypedFlowEntryWithLoad>> allLoad = new TreeMap<>(Comparators.CONNECT_POINT_COMPARATOR);
132
133 if (device == null) {
134 return allLoad;
135 }
136
137 List<Port> ports = new ArrayList<>(deviceService.getPorts(device.id()));
138
139 for (Port port : ports) {
140 ConnectPoint cp = new ConnectPoint(device.id(), port.number());
141 List<TypedFlowEntryWithLoad> tfel = loadAllPortInternal(cp, liveType, instType);
142 allLoad.put(cp, tfel);
143 }
144
145 return allLoad;
146 }
147
148 @Override
149 public List<TypedFlowEntryWithLoad> loadAllByType(Device device, PortNumber pNumber,
150 TypedStoredFlowEntry.FlowLiveType liveType,
151 Instruction.Type instType) {
152 checkPermission(STATISTIC_READ);
153
154 ConnectPoint cp = new ConnectPoint(device.id(), pNumber);
155 return loadAllPortInternal(cp, liveType, instType);
156 }
157
158 @Override
159 public Map<ConnectPoint, List<TypedFlowEntryWithLoad>> loadTopnByType(Device device,
160 TypedStoredFlowEntry.FlowLiveType liveType,
161 Instruction.Type instType,
162 int topn) {
163 checkPermission(STATISTIC_READ);
164
165 Map<ConnectPoint, List<TypedFlowEntryWithLoad>> allLoad = new TreeMap<>(Comparators.CONNECT_POINT_COMPARATOR);
166
167 if (device == null) {
168 return allLoad;
169 }
170
171 List<Port> ports = new ArrayList<>(deviceService.getPorts(device.id()));
172
173 for (Port port : ports) {
174 ConnectPoint cp = new ConnectPoint(device.id(), port.number());
175 List<TypedFlowEntryWithLoad> tfel = loadTopnPortInternal(cp, liveType, instType, topn);
176 allLoad.put(cp, tfel);
177 }
178
179 return allLoad;
180 }
181
182 @Override
183 public List<TypedFlowEntryWithLoad> loadTopnByType(Device device, PortNumber pNumber,
184 TypedStoredFlowEntry.FlowLiveType liveType,
185 Instruction.Type instType,
186 int topn) {
187 checkPermission(STATISTIC_READ);
188
189 ConnectPoint cp = new ConnectPoint(device.id(), pNumber);
190 return loadTopnPortInternal(cp, liveType, instType, topn);
191 }
192
193 private SummaryFlowEntryWithLoad loadSummaryPortInternal(ConnectPoint cp) {
194 checkPermission(STATISTIC_READ);
195
196 Set<FlowEntry> currentStats;
197 Set<FlowEntry> previousStats;
198
199 TypedStatistics typedStatistics;
200 synchronized (flowStatisticStore) {
201 currentStats = flowStatisticStore.getCurrentFlowStatistic(cp);
202 if (currentStats == null) {
203 return new SummaryFlowEntryWithLoad(cp, new DefaultLoad());
204 }
205 previousStats = flowStatisticStore.getPreviousFlowStatistic(cp);
206 if (previousStats == null) {
207 return new SummaryFlowEntryWithLoad(cp, new DefaultLoad());
208 }
209 // copy to local flow entry
210 typedStatistics = new TypedStatistics(currentStats, previousStats);
211
212 // Check for validity of this stats data
213 checkLoadValidity(currentStats, previousStats);
214 }
215
216 // current and previous set is not empty!
217 Set<FlowEntry> currentSet = typedStatistics.current();
218 Set<FlowEntry> previousSet = typedStatistics.previous();
219 Load totalLoad = new DefaultLoad(aggregateBytesSet(currentSet), aggregateBytesSet(previousSet),
220 TypedFlowEntryWithLoad.avgPollInterval());
221
222 Map<FlowRule, TypedStoredFlowEntry> currentMap;
223 Map<FlowRule, TypedStoredFlowEntry> previousMap;
224
225 currentMap = typedStatistics.currentImmediate();
226 previousMap = typedStatistics.previousImmediate();
227 Load immediateLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),
228 TypedFlowEntryWithLoad.shortPollInterval());
229
230 currentMap = typedStatistics.currentShort();
231 previousMap = typedStatistics.previousShort();
232 Load shortLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),
233 TypedFlowEntryWithLoad.shortPollInterval());
234
235 currentMap = typedStatistics.currentMid();
236 previousMap = typedStatistics.previousMid();
237 Load midLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),
238 TypedFlowEntryWithLoad.midPollInterval());
239
240 currentMap = typedStatistics.currentLong();
241 previousMap = typedStatistics.previousLong();
242 Load longLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),
243 TypedFlowEntryWithLoad.longPollInterval());
244
245 currentMap = typedStatistics.currentUnknown();
246 previousMap = typedStatistics.previousUnknown();
247 Load unknownLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),
248 TypedFlowEntryWithLoad.avgPollInterval());
249
250 return new SummaryFlowEntryWithLoad(cp, totalLoad, immediateLoad, shortLoad, midLoad, longLoad, unknownLoad);
251 }
252
253 private List<TypedFlowEntryWithLoad> loadAllPortInternal(ConnectPoint cp,
254 TypedStoredFlowEntry.FlowLiveType liveType,
255 Instruction.Type instType) {
256 checkPermission(STATISTIC_READ);
257
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800258 List<TypedFlowEntryWithLoad> retTfel = new ArrayList<>();
ssyoon90a98825a2015-08-26 00:48:15 +0900259
260 Set<FlowEntry> currentStats;
261 Set<FlowEntry> previousStats;
262
263 TypedStatistics typedStatistics;
264 synchronized (flowStatisticStore) {
265 currentStats = flowStatisticStore.getCurrentFlowStatistic(cp);
266 if (currentStats == null) {
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800267 return retTfel;
ssyoon90a98825a2015-08-26 00:48:15 +0900268 }
269 previousStats = flowStatisticStore.getPreviousFlowStatistic(cp);
270 if (previousStats == null) {
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800271 return retTfel;
ssyoon90a98825a2015-08-26 00:48:15 +0900272 }
273 // copy to local flow entry set
274 typedStatistics = new TypedStatistics(currentStats, previousStats);
275
276 // Check for validity of this stats data
277 checkLoadValidity(currentStats, previousStats);
278 }
279
280 // current and previous set is not empty!
281 boolean isAllLiveType = (liveType == null ? true : false); // null is all live type
282 boolean isAllInstType = (instType == null ? true : false); // null is all inst type
283
284 Map<FlowRule, TypedStoredFlowEntry> currentMap;
285 Map<FlowRule, TypedStoredFlowEntry> previousMap;
286
287 if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.IMMEDIATE_FLOW) {
288 currentMap = typedStatistics.currentImmediate();
289 previousMap = typedStatistics.previousImmediate();
290
291 List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,
292 isAllInstType, instType, TypedFlowEntryWithLoad.shortPollInterval());
293 if (fel.size() > 0) {
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800294 retTfel.addAll(fel);
ssyoon90a98825a2015-08-26 00:48:15 +0900295 }
296 }
297
298 if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.SHORT_FLOW) {
299 currentMap = typedStatistics.currentShort();
300 previousMap = typedStatistics.previousShort();
301
302 List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,
303 isAllInstType, instType, TypedFlowEntryWithLoad.shortPollInterval());
304 if (fel.size() > 0) {
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800305 retTfel.addAll(fel);
ssyoon90a98825a2015-08-26 00:48:15 +0900306 }
307 }
308
309 if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.MID_FLOW) {
310 currentMap = typedStatistics.currentMid();
311 previousMap = typedStatistics.previousMid();
312
313 List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,
314 isAllInstType, instType, TypedFlowEntryWithLoad.midPollInterval());
315 if (fel.size() > 0) {
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800316 retTfel.addAll(fel);
ssyoon90a98825a2015-08-26 00:48:15 +0900317 }
318 }
319
320 if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.LONG_FLOW) {
321 currentMap = typedStatistics.currentLong();
322 previousMap = typedStatistics.previousLong();
323
324 List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,
325 isAllInstType, instType, TypedFlowEntryWithLoad.longPollInterval());
326 if (fel.size() > 0) {
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800327 retTfel.addAll(fel);
ssyoon90a98825a2015-08-26 00:48:15 +0900328 }
329 }
330
331 if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.UNKNOWN_FLOW) {
332 currentMap = typedStatistics.currentUnknown();
333 previousMap = typedStatistics.previousUnknown();
334
335 List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,
336 isAllInstType, instType, TypedFlowEntryWithLoad.avgPollInterval());
337 if (fel.size() > 0) {
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800338 retTfel.addAll(fel);
ssyoon90a98825a2015-08-26 00:48:15 +0900339 }
340 }
341
Jonathan Hartd9df7bd2015-11-10 17:10:25 -0800342 return retTfel;
ssyoon90a98825a2015-08-26 00:48:15 +0900343 }
344
345 private List<TypedFlowEntryWithLoad> typedFlowEntryLoadByInstInternal(ConnectPoint cp,
346 Map<FlowRule, TypedStoredFlowEntry> currentMap,
347 Map<FlowRule, TypedStoredFlowEntry> previousMap,
348 boolean isAllInstType,
349 Instruction.Type instType,
350 int liveTypePollInterval) {
351 List<TypedFlowEntryWithLoad> fel = new ArrayList<>();
352
353 for (TypedStoredFlowEntry tfe : currentMap.values()) {
354 if (isAllInstType ||
355 tfe.treatment().allInstructions().stream().
356 filter(i -> i.type() == instType).
357 findAny().isPresent()) {
358 long currentBytes = tfe.bytes();
359 long previousBytes = previousMap.getOrDefault(tfe, new DefaultTypedFlowEntry((FlowRule) tfe)).bytes();
360 Load fLoad = new DefaultLoad(currentBytes, previousBytes, liveTypePollInterval);
361 fel.add(new TypedFlowEntryWithLoad(cp, tfe, fLoad));
362 }
363 }
364
365 return fel;
366 }
367
368 private List<TypedFlowEntryWithLoad> loadTopnPortInternal(ConnectPoint cp,
369 TypedStoredFlowEntry.FlowLiveType liveType,
370 Instruction.Type instType,
371 int topn) {
372 List<TypedFlowEntryWithLoad> fel = loadAllPortInternal(cp, liveType, instType);
373
374 // Sort with descending order of load
375 List<TypedFlowEntryWithLoad> tfel =
376 fel.stream().sorted(Comparators.TYPEFLOWENTRY_WITHLOAD_COMPARATOR).
377 limit(topn).collect(Collectors.toList());
378
379 return tfel;
380 }
381
382 private long aggregateBytesSet(Set<FlowEntry> setFE) {
383 return setFE.stream().mapToLong(FlowEntry::bytes).sum();
384 }
385
386 private long aggregateBytesMap(Map<FlowRule, TypedStoredFlowEntry> mapFE) {
387 return mapFE.values().stream().mapToLong(FlowEntry::bytes).sum();
388 }
389
390 /**
391 * Internal data class holding two set of typed flow entries.
392 */
393 private static class TypedStatistics {
394 private final ImmutableSet<FlowEntry> currentAll;
395 private final ImmutableSet<FlowEntry> previousAll;
396
397 private final Map<FlowRule, TypedStoredFlowEntry> currentImmediate = new HashMap<>();
398 private final Map<FlowRule, TypedStoredFlowEntry> previousImmediate = new HashMap<>();
399
400 private final Map<FlowRule, TypedStoredFlowEntry> currentShort = new HashMap<>();
401 private final Map<FlowRule, TypedStoredFlowEntry> previousShort = new HashMap<>();
402
403 private final Map<FlowRule, TypedStoredFlowEntry> currentMid = new HashMap<>();
404 private final Map<FlowRule, TypedStoredFlowEntry> previousMid = new HashMap<>();
405
406 private final Map<FlowRule, TypedStoredFlowEntry> currentLong = new HashMap<>();
407 private final Map<FlowRule, TypedStoredFlowEntry> previousLong = new HashMap<>();
408
409 private final Map<FlowRule, TypedStoredFlowEntry> currentUnknown = new HashMap<>();
410 private final Map<FlowRule, TypedStoredFlowEntry> previousUnknown = new HashMap<>();
411
412 public TypedStatistics(Set<FlowEntry> current, Set<FlowEntry> previous) {
413 this.currentAll = ImmutableSet.copyOf(checkNotNull(current));
414 this.previousAll = ImmutableSet.copyOf(checkNotNull(previous));
415
416 currentAll.forEach(fe -> {
417 TypedStoredFlowEntry tfe = TypedFlowEntryWithLoad.newTypedStoredFlowEntry(fe);
418
419 switch (tfe.flowLiveType()) {
420 case IMMEDIATE_FLOW:
421 currentImmediate.put(fe, tfe);
422 break;
423 case SHORT_FLOW:
424 currentShort.put(fe, tfe);
425 break;
426 case MID_FLOW:
427 currentMid.put(fe, tfe);
428 break;
429 case LONG_FLOW:
430 currentLong.put(fe, tfe);
431 break;
432 default:
433 currentUnknown.put(fe, tfe);
434 break;
435 }
436 });
437
438 previousAll.forEach(fe -> {
439 TypedStoredFlowEntry tfe = TypedFlowEntryWithLoad.newTypedStoredFlowEntry(fe);
440
441 switch (tfe.flowLiveType()) {
442 case IMMEDIATE_FLOW:
443 if (currentImmediate.containsKey(fe)) {
444 previousImmediate.put(fe, tfe);
445 } else if (currentShort.containsKey(fe)) {
446 previousShort.put(fe, tfe);
447 } else if (currentMid.containsKey(fe)) {
448 previousMid.put(fe, tfe);
449 } else if (currentLong.containsKey(fe)) {
450 previousLong.put(fe, tfe);
451 } else {
452 previousUnknown.put(fe, tfe);
453 }
454 break;
455 case SHORT_FLOW:
456 if (currentShort.containsKey(fe)) {
457 previousShort.put(fe, tfe);
458 } else if (currentMid.containsKey(fe)) {
459 previousMid.put(fe, tfe);
460 } else if (currentLong.containsKey(fe)) {
461 previousLong.put(fe, tfe);
462 } else {
463 previousUnknown.put(fe, tfe);
464 }
465 break;
466 case MID_FLOW:
467 if (currentMid.containsKey(fe)) {
468 previousMid.put(fe, tfe);
469 } else if (currentLong.containsKey(fe)) {
470 previousLong.put(fe, tfe);
471 } else {
472 previousUnknown.put(fe, tfe);
473 }
474 break;
475 case LONG_FLOW:
476 if (currentLong.containsKey(fe)) {
477 previousLong.put(fe, tfe);
478 } else {
479 previousUnknown.put(fe, tfe);
480 }
481 break;
482 default:
483 previousUnknown.put(fe, tfe);
484 break;
485 }
486 });
487 }
488
489 /**
490 * Returns flow entries as the current value.
491 *
492 * @return flow entries as the current value
493 */
494 public ImmutableSet<FlowEntry> current() {
495 return currentAll;
496 }
497
498 /**
499 * Returns flow entries as the previous value.
500 *
501 * @return flow entries as the previous value
502 */
503 public ImmutableSet<FlowEntry> previous() {
504 return previousAll;
505 }
506
507 public Map<FlowRule, TypedStoredFlowEntry> currentImmediate() {
508 return currentImmediate;
509 }
510 public Map<FlowRule, TypedStoredFlowEntry> previousImmediate() {
511 return previousImmediate;
512 }
513 public Map<FlowRule, TypedStoredFlowEntry> currentShort() {
514 return currentShort;
515 }
516 public Map<FlowRule, TypedStoredFlowEntry> previousShort() {
517 return previousShort;
518 }
519 public Map<FlowRule, TypedStoredFlowEntry> currentMid() {
520 return currentMid;
521 }
522 public Map<FlowRule, TypedStoredFlowEntry> previousMid() {
523 return previousMid;
524 }
525 public Map<FlowRule, TypedStoredFlowEntry> currentLong() {
526 return currentLong;
527 }
528 public Map<FlowRule, TypedStoredFlowEntry> previousLong() {
529 return previousLong;
530 }
531 public Map<FlowRule, TypedStoredFlowEntry> currentUnknown() {
532 return currentUnknown;
533 }
534 public Map<FlowRule, TypedStoredFlowEntry> previousUnknown() {
535 return previousUnknown;
536 }
537
538 /**
539 * Validates values are not empty.
540 *
541 * @return false if either of the sets is empty. Otherwise, true.
542 */
543 public boolean isValid() {
544 return !(currentAll.isEmpty() || previousAll.isEmpty());
545 }
546
547 @Override
548 public int hashCode() {
549 return Objects.hash(currentAll, previousAll);
550 }
551
552 @Override
553 public boolean equals(Object obj) {
554 if (this == obj) {
555 return true;
556 }
557 if (!(obj instanceof TypedStatistics)) {
558 return false;
559 }
560 final TypedStatistics other = (TypedStatistics) obj;
561 return Objects.equals(this.currentAll, other.currentAll) &&
562 Objects.equals(this.previousAll, other.previousAll);
563 }
564
565 @Override
566 public String toString() {
567 return MoreObjects.toStringHelper(this)
568 .add("current", currentAll)
569 .add("previous", previousAll)
570 .toString();
571 }
572 }
573
574 private void checkLoadValidity(Set<FlowEntry> current, Set<FlowEntry> previous) {
575 current.stream().forEach(c -> {
576 FlowEntry f = previous.stream().filter(p -> c.equals(p)).
577 findAny().orElse(null);
578 if (f != null && c.bytes() < f.bytes()) {
579 log.debug("FlowStatisticManager:checkLoadValidity():" +
580 "Error: " + c + " :Previous bytes=" + f.bytes() +
581 " is larger than current bytes=" + c.bytes() + " !!!");
582 }
583 });
584
585 }
586
587 /**
588 * Creates a predicate that checks the instruction type of a flow entry is the same as
589 * the specified instruction type.
590 *
591 * @param instType instruction type to be checked
592 * @return predicate
593 */
594 private static Predicate<FlowEntry> hasInstructionType(Instruction.Type instType) {
595 return new Predicate<FlowEntry>() {
596 @Override
597 public boolean apply(FlowEntry flowEntry) {
598 List<Instruction> allInstructions = flowEntry.treatment().allInstructions();
599
600 return allInstructions.stream().filter(i -> i.type() == instType).findAny().isPresent();
601 }
602 };
603 }
604
605 /**
606 * Internal flow rule event listener for FlowStatisticManager.
607 */
608 private class InternalFlowRuleStatsListener implements FlowRuleListener {
609
610 @Override
611 public void event(FlowRuleEvent event) {
612 FlowRule rule = event.subject();
613 switch (event.type()) {
614 case RULE_ADDED:
615 if (rule instanceof FlowEntry) {
616 flowStatisticStore.addFlowStatistic((FlowEntry) rule);
617 }
618 break;
619 case RULE_UPDATED:
620 flowStatisticStore.updateFlowStatistic((FlowEntry) rule);
621 break;
622 case RULE_ADD_REQUESTED:
623 break;
624 case RULE_REMOVE_REQUESTED:
625 break;
626 case RULE_REMOVED:
627 flowStatisticStore.removeFlowStatistic(rule);
628 break;
629 default:
630 log.warn("Unknown flow rule event {}", event);
631 }
632 }
633 }
634}