blob: aa8df947d517e38aa98178a3efb1a1251206e105 [file] [log] [blame]
Brian O'Connor8c685362015-12-05 15:27:27 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.provider.of.flow.impl;
18
19import com.google.common.base.Objects;
20import com.google.common.collect.ImmutableSet;
21import com.google.common.collect.Maps;
22import com.google.common.collect.Sets;
23import org.onosproject.net.flow.DefaultTypedFlowEntry;
24import org.onosproject.net.flow.FlowEntry;
25import org.onosproject.net.flow.FlowId;
26import org.onosproject.net.flow.FlowRule;
27import org.onosproject.net.flow.StoredFlowEntry;
28import org.onosproject.net.flow.TypedStoredFlowEntry;
29import org.onosproject.net.flow.instructions.Instruction;
30import org.onosproject.net.flow.instructions.Instructions;
31import org.onosproject.openflow.controller.OpenFlowSwitch;
32import org.onosproject.openflow.controller.RoleState;
33import org.projectfloodlight.openflow.protocol.OFFlowStatsRequest;
34import org.projectfloodlight.openflow.protocol.match.Match;
35import org.projectfloodlight.openflow.types.OFPort;
36import org.projectfloodlight.openflow.types.TableId;
37import org.slf4j.Logger;
38
39import java.util.HashSet;
40import java.util.List;
41import java.util.Map;
42import java.util.Optional;
43import java.util.Set;
44import java.util.concurrent.Executors;
45import java.util.concurrent.ScheduledExecutorService;
46import java.util.concurrent.ScheduledFuture;
47import java.util.concurrent.TimeUnit;
48
49import static com.google.common.base.Preconditions.checkNotNull;
50import static org.onlab.util.Tools.groupedThreads;
51import static org.onosproject.net.flow.TypedStoredFlowEntry.FlowLiveType;
52import static org.slf4j.LoggerFactory.getLogger;
53
54/**
55 * Efficiently and adaptively collects flow statistics for the specified switch.
56 */
57public class NewAdaptiveFlowStatsCollector {
58
59 private final Logger log = getLogger(getClass());
60
61 private final OpenFlowSwitch sw;
62
63 private ScheduledExecutorService adaptiveFlowStatsScheduler =
64 Executors.newScheduledThreadPool(4, groupedThreads("onos/flow", "device-stats-collector-%d"));
65 private ScheduledFuture<?> calAndShortFlowsThread;
66 private ScheduledFuture<?> midFlowsThread;
67 private ScheduledFuture<?> longFlowsThread;
68
69 // Task that calculates all flowEntries' FlowLiveType and collects stats IMMEDIATE flows every calAndPollInterval
70 private CalAndShortFlowsTask calAndShortFlowsTask;
71 // Task that collects stats MID flows every 2*calAndPollInterval
72 private MidFlowsTask midFlowsTask;
73 // Task that collects stats LONG flows every 3*calAndPollInterval
74 private LongFlowsTask longFlowsTask;
75
76 private static final int CAL_AND_POLL_TIMES = 1; // must be always 0
77 private static final int MID_POLL_TIMES = 2; // variable greater or equal than 1
78 private static final int LONG_POLL_TIMES = 3; // variable greater or equal than MID_POLL_TIMES
79 //TODO: make ENTIRE_POLL_TIMES configurable with enable or disable
80 // must be variable greater or equal than common multiple of MID_POLL_TIMES and LONG_POLL_TIMES
81 private static final int ENTIRE_POLL_TIMES = 6;
82
83 private static final int DEFAULT_CAL_AND_POLL_FREQUENCY = 5;
84 private static final int MIN_CAL_AND_POLL_FREQUENCY = 2;
85 private static final int MAX_CAL_AND_POLL_FREQUENCY = 60;
86
87 private int calAndPollInterval; // CAL_AND_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
88 private int midPollInterval; // MID_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
89 private int longPollInterval; // LONG_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
90 // only used for checking condition at each task if it collects entire flows from a given switch or not
91 private int entirePollInterval; // ENTIRE_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
92
93 // Number of call count of each Task,
94 // for undoing collection except only entire flows collecting task in CalAndShortFlowsTask
95 private int callCountCalAndShortFlowsTask = 0; // increased CAL_AND_POLL_TIMES whenever Task is called
96 private int callCountMidFlowsTask = 0; // increased MID_POLL_TIMES whenever Task is called
97 private int callCountLongFlowsTask = 0; // increased LONG_POLL_TIMES whenever Task is called
98
99 private InternalDeviceFlowTable deviceFlowTable = new InternalDeviceFlowTable();
100
101 private boolean isFirstTimeStart = true;
102
103 public static final long NO_FLOW_MISSING_XID = (-1);
104 private long flowMissingXid = NO_FLOW_MISSING_XID;
105
106 /**
107 * Creates a new adaptive collector for the given switch and default cal_and_poll frequency.
108 *
109 * @param sw switch to pull
110 * @param pollInterval cal and immediate poll frequency in seconds
111 */
112 NewAdaptiveFlowStatsCollector(OpenFlowSwitch sw, int pollInterval) {
113 this.sw = sw;
114
115 initMemberVars(pollInterval);
116 }
117
118 // check calAndPollInterval validity and set all pollInterval values and finally initialize each task call count
119 private void initMemberVars(int pollInterval) {
120 if (pollInterval < MIN_CAL_AND_POLL_FREQUENCY) {
121 this.calAndPollInterval = MIN_CAL_AND_POLL_FREQUENCY;
122 } else if (pollInterval >= MAX_CAL_AND_POLL_FREQUENCY) {
123 this.calAndPollInterval = MAX_CAL_AND_POLL_FREQUENCY;
124 } else {
125 this.calAndPollInterval = pollInterval;
126 }
127
128 calAndPollInterval = CAL_AND_POLL_TIMES * calAndPollInterval;
129 midPollInterval = MID_POLL_TIMES * calAndPollInterval;
130 longPollInterval = LONG_POLL_TIMES * calAndPollInterval;
131 entirePollInterval = ENTIRE_POLL_TIMES * calAndPollInterval;
132
133 callCountCalAndShortFlowsTask = 0;
134 callCountMidFlowsTask = 0;
135 callCountLongFlowsTask = 0;
136
137 flowMissingXid = NO_FLOW_MISSING_XID;
138 }
139
140 /**
141 * Adjusts adaptive poll frequency.
142 *
143 * @param pollInterval poll frequency in seconds
144 */
145 synchronized void adjustCalAndPollInterval(int pollInterval) {
146 initMemberVars(pollInterval);
147
148 if (calAndShortFlowsThread != null) {
149 calAndShortFlowsThread.cancel(false);
150 }
151 if (midFlowsThread != null) {
152 midFlowsThread.cancel(false);
153 }
154 if (longFlowsThread != null) {
155 longFlowsThread.cancel(false);
156 }
157
158 calAndShortFlowsTask = new CalAndShortFlowsTask();
159 calAndShortFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
160 calAndShortFlowsTask,
161 0,
162 calAndPollInterval,
163 TimeUnit.SECONDS);
164
165 midFlowsTask = new MidFlowsTask();
166 midFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
167 midFlowsTask,
168 0,
169 midPollInterval,
170 TimeUnit.SECONDS);
171
172 longFlowsTask = new LongFlowsTask();
173 longFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
174 longFlowsTask,
175 0,
176 longPollInterval,
177 TimeUnit.SECONDS);
178
179 log.debug("calAndPollInterval=" + calAndPollInterval + "is adjusted");
180 }
181
182 private class CalAndShortFlowsTask implements Runnable {
183 @Override
184 public void run() {
185 if (sw.getRole() == RoleState.MASTER) {
186 log.trace("CalAndShortFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());
187
188 if (isFirstTimeStart) {
189 // isFirstTimeStart, get entire flow stats from a given switch sw
190 log.trace("CalAndShortFlowsTask Collecting Entire AdaptiveStats at first time start for {}",
191 sw.getStringId());
192 ofFlowStatsRequestAllSend();
193
194 callCountCalAndShortFlowsTask += CAL_AND_POLL_TIMES;
195 isFirstTimeStart = false;
196 } else if (callCountCalAndShortFlowsTask == ENTIRE_POLL_TIMES) {
197 // entire_poll_times, get entire flow stats from a given switch sw
198 log.trace("CalAndShortFlowsTask Collecting Entire AdaptiveStats for {}", sw.getStringId());
199 ofFlowStatsRequestAllSend();
200
201 callCountCalAndShortFlowsTask = CAL_AND_POLL_TIMES;
202 //TODO: check flows deleted in switch, but exist in controller flow table, then remove them
203 //
204 } else {
205 calAndShortFlowsTaskInternal();
206 callCountCalAndShortFlowsTask += CAL_AND_POLL_TIMES;
207 }
208 }
209 }
210 }
211
212 // send openflow flow stats request message with getting all flow entries to a given switch sw
213 private void ofFlowStatsRequestAllSend() {
214 OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()
215 .setMatch(sw.factory().matchWildcardAll())
216 .setTableId(TableId.ALL)
217 .setOutPort(OFPort.NO_MASK)
218 .build();
219
220 synchronized (this) {
221 // set the request xid to check the reply in OpenFlowRuleProvider
222 // After processing the reply of this request message,
223 // this must be set to NO_FLOW_MISSING_XID(-1) by provider
224 setFlowMissingXid(request.getXid());
225 log.debug("ofFlowStatsRequestAllSend,Request={},for {}", request.toString(), sw.getStringId());
226
227 sw.sendMsg(request);
228 }
229 }
230
231 // send openflow flow stats request message with getting the specific flow entry(fe) to a given switch sw
232 private void ofFlowStatsRequestFlowSend(FlowEntry fe) {
233 // set find match
234 Match match = FlowModBuilder.builder(fe, sw.factory(), Optional.empty(),
235 Optional.empty()).buildMatch();
236 // set find tableId
237 TableId tableId = TableId.of(fe.tableId());
238 // set output port
239 Instruction ins = fe.treatment().allInstructions().stream()
240 .filter(i -> (i.type() == Instruction.Type.OUTPUT))
241 .findFirst()
242 .orElse(null);
243 OFPort ofPort = OFPort.NO_MASK;
244 if (ins != null) {
245 Instructions.OutputInstruction out = (Instructions.OutputInstruction) ins;
246 ofPort = OFPort.of((int) ((out.port().toLong())));
247 }
248
249 OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()
250 .setMatch(match)
251 .setTableId(tableId)
252 .setOutPort(ofPort)
253 .build();
254
255 synchronized (this) {
256 if (getFlowMissingXid() != NO_FLOW_MISSING_XID) {
257 log.debug("ofFlowStatsRequestFlowSend: previous FlowStatsRequestAll does not be processed yet,"
258 + " set no flow missing xid anyway, for {}",
259 sw.getStringId());
260 setFlowMissingXid(NO_FLOW_MISSING_XID);
261 }
262
263 sw.sendMsg(request);
264 }
265 }
266
267 private void calAndShortFlowsTaskInternal() {
268 deviceFlowTable.checkAndMoveLiveFlowAll();
269
270 deviceFlowTable.getShortFlows().forEach(fe -> {
271 ofFlowStatsRequestFlowSend(fe);
272 });
273 }
274
275 private class MidFlowsTask implements Runnable {
276 @Override
277 public void run() {
278 if (sw.getRole() == RoleState.MASTER) {
279 log.trace("MidFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());
280
281 // skip collecting because CalAndShortFlowsTask collects entire flow stats from a given switch sw
282 if (callCountMidFlowsTask == ENTIRE_POLL_TIMES) {
283 callCountMidFlowsTask = MID_POLL_TIMES;
284 } else {
285 midFlowsTaskInternal();
286 callCountMidFlowsTask += MID_POLL_TIMES;
287 }
288 }
289 }
290 }
291
292 private void midFlowsTaskInternal() {
293 deviceFlowTable.getMidFlows().forEach(fe -> {
294 ofFlowStatsRequestFlowSend(fe);
295 });
296 }
297
298 private class LongFlowsTask implements Runnable {
299 @Override
300 public void run() {
301 if (sw.getRole() == RoleState.MASTER) {
302 log.trace("LongFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());
303
304 // skip collecting because CalAndShortFlowsTask collects entire flow stats from a given switch sw
305 if (callCountLongFlowsTask == ENTIRE_POLL_TIMES) {
306 callCountLongFlowsTask = LONG_POLL_TIMES;
307 } else {
308 longFlowsTaskInternal();
309 callCountLongFlowsTask += LONG_POLL_TIMES;
310 }
311 }
312 }
313 }
314
315 private void longFlowsTaskInternal() {
316 deviceFlowTable.getLongFlows().forEach(fe -> {
317 ofFlowStatsRequestFlowSend(fe);
318 });
319 }
320
321 /**
322 * start adaptive flow statistic collection.
323 *
324 */
325 public synchronized void start() {
326 log.debug("Starting AdaptiveStats collection thread for {}", sw.getStringId());
327 callCountCalAndShortFlowsTask = 0;
328 callCountMidFlowsTask = 0;
329 callCountLongFlowsTask = 0;
330
331 isFirstTimeStart = true;
332
333 // Initially start polling quickly. Then drop down to configured value
334 calAndShortFlowsTask = new CalAndShortFlowsTask();
335 calAndShortFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
336 calAndShortFlowsTask,
337 1,
338 calAndPollInterval,
339 TimeUnit.SECONDS);
340
341 midFlowsTask = new MidFlowsTask();
342 midFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
343 midFlowsTask,
344 1,
345 midPollInterval,
346 TimeUnit.SECONDS);
347
348 longFlowsTask = new LongFlowsTask();
349 longFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
350 longFlowsTask,
351 1,
352 longPollInterval,
353 TimeUnit.SECONDS);
354
355 log.info("Started");
356 }
357
358 /**
359 * stop adaptive flow statistic collection.
360 *
361 */
362 public synchronized void stop() {
363 log.debug("Stopping AdaptiveStats collection thread for {}", sw.getStringId());
364 if (calAndShortFlowsThread != null) {
365 calAndShortFlowsThread.cancel(true);
366 }
367 if (midFlowsThread != null) {
368 midFlowsThread.cancel(true);
369 }
370 if (longFlowsThread != null) {
371 longFlowsThread.cancel(true);
372 }
373
374 adaptiveFlowStatsScheduler.shutdownNow();
375
376 isFirstTimeStart = false;
377
378 log.info("Stopped");
379 }
380
381 /**
382 * add typed flow entry from flow rule into the internal flow table.
383 *
384 * @param flowRules the flow rules
385 *
386 */
387 public synchronized void addWithFlowRule(FlowRule... flowRules) {
388 for (FlowRule fr : flowRules) {
389 // First remove old entry unconditionally, if exist
390 deviceFlowTable.remove(fr);
391
392 // add new flow entry, we suppose IMMEDIATE_FLOW
393 TypedStoredFlowEntry newFlowEntry = new DefaultTypedFlowEntry(fr,
394 FlowLiveType.IMMEDIATE_FLOW);
395 deviceFlowTable.addWithCalAndSetFlowLiveType(newFlowEntry);
396 }
397 }
398
399 /**
400 * add or update typed flow entry from flow entry into the internal flow table.
401 *
402 * @param flowEntries the flow entries
403 *
404 */
405 public synchronized void addOrUpdateFlows(FlowEntry... flowEntries) {
406 for (FlowEntry fe : flowEntries) {
407 // check if this new rule is an update to an existing entry
408 TypedStoredFlowEntry stored = deviceFlowTable.getFlowEntry(fe);
409
410 if (stored != null) {
411 // duplicated flow entry is collected!, just skip
412 if (fe.bytes() == stored.bytes() && fe.packets() == stored.packets()
413 && fe.life() == stored.life()) {
414 log.debug("addOrUpdateFlows:, FlowId=" + Long.toHexString(fe.id().value())
415 + ",is DUPLICATED stats collection, just skip."
416 + " AdaptiveStats collection thread for {}",
417 sw.getStringId());
418
419 stored.setLastSeen();
420 continue;
421 } else if (fe.life() < stored.life()) {
422 // Invalid updates the stats values, i.e., bytes, packets, durations ...
423 log.debug("addOrUpdateFlows():" +
424 " Invalid Flow Update! The new life is SMALLER than the previous one, jus skip." +
425 " new flowId=" + Long.toHexString(fe.id().value()) +
426 ", old flowId=" + Long.toHexString(stored.id().value()) +
427 ", new bytes=" + fe.bytes() + ", old bytes=" + stored.bytes() +
428 ", new life=" + fe.life() + ", old life=" + stored.life() +
429 ", new lastSeen=" + fe.lastSeen() + ", old lastSeen=" + stored.lastSeen());
430 // go next
431 stored.setLastSeen();
432 continue;
433 }
434
435 // update now
436 stored.setLife(fe.life());
437 stored.setPackets(fe.packets());
438 stored.setBytes(fe.bytes());
439 stored.setLastSeen();
440 if (stored.state() == FlowEntry.FlowEntryState.PENDING_ADD) {
441 // flow is really RULE_ADDED
442 stored.setState(FlowEntry.FlowEntryState.ADDED);
443 }
444 // flow is RULE_UPDATED, skip adding and just updating flow live table
445 //deviceFlowTable.calAndSetFlowLiveType(stored);
446 continue;
447 }
448
449 // add new flow entry, we suppose IMMEDIATE_FLOW
450 TypedStoredFlowEntry newFlowEntry = new DefaultTypedFlowEntry(fe,
451 FlowLiveType.IMMEDIATE_FLOW);
452 deviceFlowTable.addWithCalAndSetFlowLiveType(newFlowEntry);
453 }
454 }
455
456 /**
457 * remove typed flow entry from the internal flow table.
458 *
459 * @param flowRules the flow entries
460 *
461 */
462 public synchronized void removeFlows(FlowRule... flowRules) {
463 for (FlowRule rule : flowRules) {
464 deviceFlowTable.remove(rule);
465 }
466 }
467
468 // same as removeFlows() function
469 /**
470 * remove typed flow entry from the internal flow table.
471 *
472 * @param flowRules the flow entries
473 *
474 */
475 public void flowRemoved(FlowRule... flowRules) {
476 removeFlows(flowRules);
477 }
478
479 // same as addOrUpdateFlows() function
480 /**
481 * add or update typed flow entry from flow entry into the internal flow table.
482 *
483 * @param flowEntries the flow entry list
484 *
485 */
486 public void pushFlowMetrics(List<FlowEntry> flowEntries) {
487 flowEntries.forEach(fe -> {
488 addOrUpdateFlows(fe);
489 });
490 }
491
492 /**
493 * returns flowMissingXid that indicates the execution of flowMissing process or not(NO_FLOW_MISSING_XID(-1)).
494 *
495 * @return xid of missing flow
496 */
497 public long getFlowMissingXid() {
498 return flowMissingXid;
499 }
500
501 /**
502 * set flowMissingXid, namely OFFlowStatsRequest match any ALL message Id.
503 *
504 * @param flowMissingXid the OFFlowStatsRequest message Id
505 *
506 */
507 public void setFlowMissingXid(long flowMissingXid) {
508 this.flowMissingXid = flowMissingXid;
509 }
510
511 private class InternalDeviceFlowTable {
512
513 private final Map<FlowId, Set<TypedStoredFlowEntry>>
514 flowEntries = Maps.newConcurrentMap();
515
516 private final Set<StoredFlowEntry> shortFlows = new HashSet<>();
517 private final Set<StoredFlowEntry> midFlows = new HashSet<>();
518 private final Set<StoredFlowEntry> longFlows = new HashSet<>();
519
520 // Assumed latency adjustment(default=500 millisecond) between FlowStatsRequest and Reply
521 private final long latencyFlowStatsRequestAndReplyMillis = 500;
522
523
524 // Statistics for table operation
525 private long addCount = 0, addWithSetFlowLiveTypeCount = 0;
526 private long removeCount = 0;
527
528 /**
529 * Resets all count values with zero.
530 *
531 */
532 public void resetAllCount() {
533 addCount = 0;
534 addWithSetFlowLiveTypeCount = 0;
535 removeCount = 0;
536 }
537
538 // get set of flow entries for the given flowId
539 private Set<TypedStoredFlowEntry> getFlowEntriesInternal(FlowId flowId) {
540 return flowEntries.computeIfAbsent(flowId, id -> Sets.newCopyOnWriteArraySet());
541 }
542
543 // get flow entry for the given flow rule
544 private TypedStoredFlowEntry getFlowEntryInternal(FlowRule rule) {
545 Set<TypedStoredFlowEntry> flowEntries = getFlowEntriesInternal(rule.id());
546 return flowEntries.stream()
547 .filter(entry -> Objects.equal(entry, rule))
548 .findAny()
549 .orElse(null);
550 }
551
552 // get the flow entries for all flows in flow table
553 private Set<TypedStoredFlowEntry> getFlowEntriesInternal() {
554 Set<TypedStoredFlowEntry> result = Sets.newHashSet();
555
556 flowEntries.values().forEach(result::addAll);
557 return result;
558 }
559
560 /**
561 * Gets the number of flow entry in flow table.
562 *
563 * @return the number of flow entry.
564 *
565 */
566 public long getFlowCount() {
567 return flowEntries.values().stream().mapToLong(Set::size).sum();
568 }
569
570 /**
571 * Gets the number of flow entry in flow table.
572 *
573 * @param rule the flow rule
574 * @return the typed flow entry.
575 *
576 */
577 public TypedStoredFlowEntry getFlowEntry(FlowRule rule) {
578 checkNotNull(rule);
579
580 return getFlowEntryInternal(rule);
581 }
582
583 /**
584 * Gets the all typed flow entries in flow table.
585 *
586 * @return the set of typed flow entry.
587 *
588 */
589 public Set<TypedStoredFlowEntry> getFlowEntries() {
590 return getFlowEntriesInternal();
591 }
592
593 /**
594 * Gets the short typed flow entries in flow table.
595 *
596 * @return the set of typed flow entry.
597 *
598 */
599 public Set<StoredFlowEntry> getShortFlows() {
600 return ImmutableSet.copyOf(shortFlows); //Sets.newHashSet(shortFlows);
601 }
602
603 /**
604 * Gets the mid typed flow entries in flow table.
605 *
606 * @return the set of typed flow entry.
607 *
608 */
609 public Set<StoredFlowEntry> getMidFlows() {
610 return ImmutableSet.copyOf(midFlows); //Sets.newHashSet(midFlows);
611 }
612
613 /**
614 * Gets the long typed flow entries in flow table.
615 *
616 * @return the set of typed flow entry.
617 *
618 */
619 public Set<StoredFlowEntry> getLongFlows() {
620 return ImmutableSet.copyOf(longFlows); //Sets.newHashSet(longFlows);
621 }
622
623 /**
624 * Add typed flow entry into table only.
625 *
626 * @param rule the flow rule
627 *
628 */
629 public synchronized void add(TypedStoredFlowEntry rule) {
630 checkNotNull(rule);
631
632 //rule have to be new DefaultTypedFlowEntry
633 boolean result = getFlowEntriesInternal(rule.id()).add(rule);
634
635 if (result) {
636 addCount++;
637 }
638 }
639
640 /**
641 * Calculates and set the flow live type at the first time,
642 * and then add it into a corresponding typed flow table.
643 *
644 * @param rule the flow rule
645 *
646 */
647 public void calAndSetFlowLiveType(TypedStoredFlowEntry rule) {
648 checkNotNull(rule);
649
650 calAndSetFlowLiveTypeInternal(rule);
651 }
652
653 /**
654 * Add the typed flow entry into table, and calculates and set the flow live type,
655 * and then add it into a corresponding typed flow table.
656 *
657 * @param rule the flow rule
658 *
659 */
660 public synchronized void addWithCalAndSetFlowLiveType(TypedStoredFlowEntry rule) {
661 checkNotNull(rule);
662
663 //rule have to be new DefaultTypedFlowEntry
664 boolean result = getFlowEntriesInternal(rule.id()).add(rule);
665 if (result) {
666 calAndSetFlowLiveTypeInternal(rule);
667 addWithSetFlowLiveTypeCount++;
668 } else {
669 log.debug("addWithCalAndSetFlowLiveType, FlowId=" + Long.toHexString(rule.id().value())
670 + " ADD Failed, cause it may already exists in table !!!,"
671 + " AdaptiveStats collection thread for {}",
672 sw.getStringId());
673 }
674 }
675
676 // In real, calculates and set the flow live type at the first time,
677 // and then add it into a corresponding typed flow table
678 private void calAndSetFlowLiveTypeInternal(TypedStoredFlowEntry rule) {
679 long life = rule.life();
680 FlowLiveType prevFlowLiveType = rule.flowLiveType();
681
682 if (life >= longPollInterval) {
683 rule.setFlowLiveType(FlowLiveType.LONG_FLOW);
684 longFlows.add(rule);
685 } else if (life >= midPollInterval) {
686 rule.setFlowLiveType(FlowLiveType.MID_FLOW);
687 midFlows.add(rule);
688 } else if (life >= calAndPollInterval) {
689 rule.setFlowLiveType(FlowLiveType.SHORT_FLOW);
690 shortFlows.add(rule);
691 } else if (life >= 0) {
692 rule.setFlowLiveType(FlowLiveType.IMMEDIATE_FLOW);
693 } else { // life < 0
694 rule.setFlowLiveType(FlowLiveType.UNKNOWN_FLOW);
695 }
696
697 if (rule.flowLiveType() != prevFlowLiveType) {
698 switch (prevFlowLiveType) {
699 // delete it from previous flow table
700 case SHORT_FLOW:
701 shortFlows.remove(rule);
702 break;
703 case MID_FLOW:
704 midFlows.remove(rule);
705 break;
706 case LONG_FLOW:
707 longFlows.remove(rule);
708 break;
709 default:
710 break;
711 }
712 }
713 }
714
715
716 // check the flow live type based on current time, then set and add it into corresponding table
717 private boolean checkAndMoveLiveFlowInternal(TypedStoredFlowEntry fe, long cTime) {
718 long curTime = (cTime > 0 ? cTime : System.currentTimeMillis());
719 // For latency adjustment(default=500 millisecond) between FlowStatsRequest and Reply
720 long fromLastSeen = ((curTime - fe.lastSeen() + latencyFlowStatsRequestAndReplyMillis) / 1000);
721 // fe.life() unit is SECOND!
722 long liveTime = fe.life() + fromLastSeen;
723
724
725 switch (fe.flowLiveType()) {
726 case IMMEDIATE_FLOW:
727 if (liveTime >= longPollInterval) {
728 fe.setFlowLiveType(FlowLiveType.LONG_FLOW);
729 longFlows.add(fe);
730 } else if (liveTime >= midPollInterval) {
731 fe.setFlowLiveType(FlowLiveType.MID_FLOW);
732 midFlows.add(fe);
733 } else if (liveTime >= calAndPollInterval) {
734 fe.setFlowLiveType(FlowLiveType.SHORT_FLOW);
735 shortFlows.add(fe);
736 }
737 break;
738 case SHORT_FLOW:
739 if (liveTime >= longPollInterval) {
740 fe.setFlowLiveType(FlowLiveType.LONG_FLOW);
741 shortFlows.remove(fe);
742 longFlows.add(fe);
743 } else if (liveTime >= midPollInterval) {
744 fe.setFlowLiveType(FlowLiveType.MID_FLOW);
745 shortFlows.remove(fe);
746 midFlows.add(fe);
747 }
748 break;
749 case MID_FLOW:
750 if (liveTime >= longPollInterval) {
751 fe.setFlowLiveType(FlowLiveType.LONG_FLOW);
752 midFlows.remove(fe);
753 longFlows.add(fe);
754 }
755 break;
756 case LONG_FLOW:
757 if (fromLastSeen > entirePollInterval) {
758 log.trace("checkAndMoveLiveFlowInternal, flow is already removed at switch.");
759 return false;
760 }
761 break;
762 case UNKNOWN_FLOW: // Unknown flow is an internal error flow type, just fall through
763 default :
764 // Error Unknown Live Type
765 log.error("checkAndMoveLiveFlowInternal, Unknown Live Type error!"
766 + "AdaptiveStats collection thread for {}",
767 sw.getStringId());
768 return false;
769 }
770
771 log.debug("checkAndMoveLiveFlowInternal, FlowId=" + Long.toHexString(fe.id().value())
772 + ", state=" + fe.state()
773 + ", After liveType=" + fe.flowLiveType()
774 + ", liveTime=" + liveTime
775 + ", life=" + fe.life()
776 + ", bytes=" + fe.bytes()
777 + ", packets=" + fe.packets()
778 + ", fromLastSeen=" + fromLastSeen
779 + ", priority=" + fe.priority()
780 + ", selector=" + fe.selector().criteria()
781 + ", treatment=" + fe.treatment()
782 + " AdaptiveStats collection thread for {}",
783 sw.getStringId());
784
785 return true;
786 }
787
788 /**
789 * Check and move live type for all type flow entries in table at every calAndPollInterval time.
790 *
791 */
792 public void checkAndMoveLiveFlowAll() {
793 Set<TypedStoredFlowEntry> typedFlowEntries = getFlowEntriesInternal();
794
795 long calCurTime = System.currentTimeMillis();
796 typedFlowEntries.forEach(fe -> {
797 if (!checkAndMoveLiveFlowInternal(fe, calCurTime)) {
798 remove(fe);
799 }
800 });
801
802 // print table counts for debug
803 if (log.isDebugEnabled()) {
804 synchronized (this) {
805 long totalFlowCount = getFlowCount();
806 long shortFlowCount = shortFlows.size();
807 long midFlowCount = midFlows.size();
808 long longFlowCount = longFlows.size();
809 long immediateFlowCount = totalFlowCount - shortFlowCount - midFlowCount - longFlowCount;
810 long calTotalCount = addCount + addWithSetFlowLiveTypeCount - removeCount;
811
812 log.debug("--------------------------------------------------------------------------- for {}",
813 sw.getStringId());
814 log.debug("checkAndMoveLiveFlowAll, Total Flow_Count=" + totalFlowCount
815 + ", add - remove_Count=" + calTotalCount
816 + ", IMMEDIATE_FLOW_Count=" + immediateFlowCount
817 + ", SHORT_FLOW_Count=" + shortFlowCount
818 + ", MID_FLOW_Count=" + midFlowCount
819 + ", LONG_FLOW_Count=" + longFlowCount
820 + ", add_Count=" + addCount
821 + ", addWithSetFlowLiveType_Count=" + addWithSetFlowLiveTypeCount
822 + ", remove_Count=" + removeCount
823 + " AdaptiveStats collection thread for {}", sw.getStringId());
824 log.debug("--------------------------------------------------------------------------- for {}",
825 sw.getStringId());
826 if (totalFlowCount != calTotalCount) {
827 log.error("checkAndMoveLiveFlowAll, Real total flow count and "
828 + "calculated total flow count do NOT match, something is wrong internally "
829 + "or check counter value bound is over!");
830 }
831 if (immediateFlowCount < 0) {
832 log.error("checkAndMoveLiveFlowAll, IMMEDIATE_FLOW count is negative, "
833 + "something is wrong internally "
834 + "or check counter value bound is over!");
835 }
836 }
837 }
838 log.trace("checkAndMoveLiveFlowAll, AdaptiveStats for {}", sw.getStringId());
839 }
840
841 /**
842 * Remove the typed flow entry from table.
843 *
844 * @param rule the flow rule
845 *
846 */
847 public synchronized void remove(FlowRule rule) {
848 checkNotNull(rule);
849
850 TypedStoredFlowEntry removeStore = getFlowEntryInternal(rule);
851 if (removeStore != null) {
852 removeLiveFlowsInternal((TypedStoredFlowEntry) removeStore);
853 boolean result = getFlowEntriesInternal(rule.id()).remove(removeStore);
854
855 if (result) {
856 removeCount++;
857 }
858 }
859 }
860
861 // Remove the typed flow entry from corresponding table
862 private void removeLiveFlowsInternal(TypedStoredFlowEntry fe) {
863 switch (fe.flowLiveType()) {
864 case IMMEDIATE_FLOW:
865 // do nothing
866 break;
867 case SHORT_FLOW:
868 shortFlows.remove(fe);
869 break;
870 case MID_FLOW:
871 midFlows.remove(fe);
872 break;
873 case LONG_FLOW:
874 longFlows.remove(fe);
875 break;
876 default: // error in Flow Live Type
877 log.error("removeLiveFlowsInternal, Unknown Live Type error!");
878 break;
879 }
880 }
881 }
882}