blob: e286684204b2ec4f655d9c66f9140e75daaffa90 [file] [log] [blame]
Brian O'Connor8c685362015-12-05 15:27:27 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.provider.of.flow.impl;
18
19import com.google.common.base.Objects;
20import com.google.common.collect.ImmutableSet;
21import com.google.common.collect.Maps;
22import com.google.common.collect.Sets;
23import org.onosproject.net.flow.DefaultTypedFlowEntry;
24import org.onosproject.net.flow.FlowEntry;
25import org.onosproject.net.flow.FlowId;
26import org.onosproject.net.flow.FlowRule;
27import org.onosproject.net.flow.StoredFlowEntry;
28import org.onosproject.net.flow.TypedStoredFlowEntry;
29import org.onosproject.net.flow.instructions.Instruction;
30import org.onosproject.net.flow.instructions.Instructions;
31import org.onosproject.openflow.controller.OpenFlowSwitch;
32import org.onosproject.openflow.controller.RoleState;
33import org.projectfloodlight.openflow.protocol.OFFlowStatsRequest;
34import org.projectfloodlight.openflow.protocol.match.Match;
35import org.projectfloodlight.openflow.types.OFPort;
36import org.projectfloodlight.openflow.types.TableId;
37import org.slf4j.Logger;
38
39import java.util.HashSet;
40import java.util.List;
41import java.util.Map;
42import java.util.Optional;
43import java.util.Set;
44import java.util.concurrent.Executors;
45import java.util.concurrent.ScheduledExecutorService;
46import java.util.concurrent.ScheduledFuture;
47import java.util.concurrent.TimeUnit;
48
49import static com.google.common.base.Preconditions.checkNotNull;
50import static org.onlab.util.Tools.groupedThreads;
51import static org.onosproject.net.flow.TypedStoredFlowEntry.FlowLiveType;
52import static org.slf4j.LoggerFactory.getLogger;
53
54/**
55 * Efficiently and adaptively collects flow statistics for the specified switch.
56 */
57public class NewAdaptiveFlowStatsCollector {
58
59 private final Logger log = getLogger(getClass());
60
61 private final OpenFlowSwitch sw;
62
63 private ScheduledExecutorService adaptiveFlowStatsScheduler =
64 Executors.newScheduledThreadPool(4, groupedThreads("onos/flow", "device-stats-collector-%d"));
65 private ScheduledFuture<?> calAndShortFlowsThread;
66 private ScheduledFuture<?> midFlowsThread;
67 private ScheduledFuture<?> longFlowsThread;
68
69 // Task that calculates all flowEntries' FlowLiveType and collects stats IMMEDIATE flows every calAndPollInterval
70 private CalAndShortFlowsTask calAndShortFlowsTask;
71 // Task that collects stats MID flows every 2*calAndPollInterval
72 private MidFlowsTask midFlowsTask;
73 // Task that collects stats LONG flows every 3*calAndPollInterval
74 private LongFlowsTask longFlowsTask;
75
76 private static final int CAL_AND_POLL_TIMES = 1; // must be always 0
77 private static final int MID_POLL_TIMES = 2; // variable greater or equal than 1
78 private static final int LONG_POLL_TIMES = 3; // variable greater or equal than MID_POLL_TIMES
79 //TODO: make ENTIRE_POLL_TIMES configurable with enable or disable
80 // must be variable greater or equal than common multiple of MID_POLL_TIMES and LONG_POLL_TIMES
81 private static final int ENTIRE_POLL_TIMES = 6;
82
83 private static final int DEFAULT_CAL_AND_POLL_FREQUENCY = 5;
84 private static final int MIN_CAL_AND_POLL_FREQUENCY = 2;
85 private static final int MAX_CAL_AND_POLL_FREQUENCY = 60;
86
87 private int calAndPollInterval; // CAL_AND_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
88 private int midPollInterval; // MID_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
89 private int longPollInterval; // LONG_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
90 // only used for checking condition at each task if it collects entire flows from a given switch or not
91 private int entirePollInterval; // ENTIRE_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
92
93 // Number of call count of each Task,
94 // for undoing collection except only entire flows collecting task in CalAndShortFlowsTask
95 private int callCountCalAndShortFlowsTask = 0; // increased CAL_AND_POLL_TIMES whenever Task is called
96 private int callCountMidFlowsTask = 0; // increased MID_POLL_TIMES whenever Task is called
97 private int callCountLongFlowsTask = 0; // increased LONG_POLL_TIMES whenever Task is called
98
99 private InternalDeviceFlowTable deviceFlowTable = new InternalDeviceFlowTable();
100
101 private boolean isFirstTimeStart = true;
102
103 public static final long NO_FLOW_MISSING_XID = (-1);
104 private long flowMissingXid = NO_FLOW_MISSING_XID;
105
106 /**
107 * Creates a new adaptive collector for the given switch and default cal_and_poll frequency.
108 *
109 * @param sw switch to pull
110 * @param pollInterval cal and immediate poll frequency in seconds
111 */
112 NewAdaptiveFlowStatsCollector(OpenFlowSwitch sw, int pollInterval) {
113 this.sw = sw;
114
115 initMemberVars(pollInterval);
116 }
117
118 // check calAndPollInterval validity and set all pollInterval values and finally initialize each task call count
119 private void initMemberVars(int pollInterval) {
120 if (pollInterval < MIN_CAL_AND_POLL_FREQUENCY) {
121 this.calAndPollInterval = MIN_CAL_AND_POLL_FREQUENCY;
122 } else if (pollInterval >= MAX_CAL_AND_POLL_FREQUENCY) {
123 this.calAndPollInterval = MAX_CAL_AND_POLL_FREQUENCY;
124 } else {
125 this.calAndPollInterval = pollInterval;
126 }
127
128 calAndPollInterval = CAL_AND_POLL_TIMES * calAndPollInterval;
129 midPollInterval = MID_POLL_TIMES * calAndPollInterval;
130 longPollInterval = LONG_POLL_TIMES * calAndPollInterval;
131 entirePollInterval = ENTIRE_POLL_TIMES * calAndPollInterval;
132
133 callCountCalAndShortFlowsTask = 0;
134 callCountMidFlowsTask = 0;
135 callCountLongFlowsTask = 0;
136
137 flowMissingXid = NO_FLOW_MISSING_XID;
138 }
139
140 /**
141 * Adjusts adaptive poll frequency.
142 *
143 * @param pollInterval poll frequency in seconds
144 */
145 synchronized void adjustCalAndPollInterval(int pollInterval) {
146 initMemberVars(pollInterval);
147
148 if (calAndShortFlowsThread != null) {
149 calAndShortFlowsThread.cancel(false);
150 }
151 if (midFlowsThread != null) {
152 midFlowsThread.cancel(false);
153 }
154 if (longFlowsThread != null) {
155 longFlowsThread.cancel(false);
156 }
157
158 calAndShortFlowsTask = new CalAndShortFlowsTask();
159 calAndShortFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
160 calAndShortFlowsTask,
161 0,
162 calAndPollInterval,
163 TimeUnit.SECONDS);
164
165 midFlowsTask = new MidFlowsTask();
166 midFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
167 midFlowsTask,
168 0,
169 midPollInterval,
170 TimeUnit.SECONDS);
171
172 longFlowsTask = new LongFlowsTask();
173 longFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
174 longFlowsTask,
175 0,
176 longPollInterval,
177 TimeUnit.SECONDS);
178
179 log.debug("calAndPollInterval=" + calAndPollInterval + "is adjusted");
180 }
181
182 private class CalAndShortFlowsTask implements Runnable {
183 @Override
184 public void run() {
185 if (sw.getRole() == RoleState.MASTER) {
186 log.trace("CalAndShortFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());
187
188 if (isFirstTimeStart) {
189 // isFirstTimeStart, get entire flow stats from a given switch sw
190 log.trace("CalAndShortFlowsTask Collecting Entire AdaptiveStats at first time start for {}",
191 sw.getStringId());
192 ofFlowStatsRequestAllSend();
193
194 callCountCalAndShortFlowsTask += CAL_AND_POLL_TIMES;
195 isFirstTimeStart = false;
196 } else if (callCountCalAndShortFlowsTask == ENTIRE_POLL_TIMES) {
197 // entire_poll_times, get entire flow stats from a given switch sw
198 log.trace("CalAndShortFlowsTask Collecting Entire AdaptiveStats for {}", sw.getStringId());
199 ofFlowStatsRequestAllSend();
200
201 callCountCalAndShortFlowsTask = CAL_AND_POLL_TIMES;
202 //TODO: check flows deleted in switch, but exist in controller flow table, then remove them
203 //
204 } else {
205 calAndShortFlowsTaskInternal();
206 callCountCalAndShortFlowsTask += CAL_AND_POLL_TIMES;
207 }
208 }
209 }
210 }
211
212 // send openflow flow stats request message with getting all flow entries to a given switch sw
213 private void ofFlowStatsRequestAllSend() {
214 OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()
215 .setMatch(sw.factory().matchWildcardAll())
216 .setTableId(TableId.ALL)
217 .setOutPort(OFPort.NO_MASK)
218 .build();
219
220 synchronized (this) {
221 // set the request xid to check the reply in OpenFlowRuleProvider
222 // After processing the reply of this request message,
223 // this must be set to NO_FLOW_MISSING_XID(-1) by provider
224 setFlowMissingXid(request.getXid());
225 log.debug("ofFlowStatsRequestAllSend,Request={},for {}", request.toString(), sw.getStringId());
226
227 sw.sendMsg(request);
228 }
229 }
230
231 // send openflow flow stats request message with getting the specific flow entry(fe) to a given switch sw
232 private void ofFlowStatsRequestFlowSend(FlowEntry fe) {
233 // set find match
234 Match match = FlowModBuilder.builder(fe, sw.factory(), Optional.empty(),
235 Optional.empty()).buildMatch();
236 // set find tableId
237 TableId tableId = TableId.of(fe.tableId());
238 // set output port
239 Instruction ins = fe.treatment().allInstructions().stream()
240 .filter(i -> (i.type() == Instruction.Type.OUTPUT))
241 .findFirst()
242 .orElse(null);
243 OFPort ofPort = OFPort.NO_MASK;
244 if (ins != null) {
245 Instructions.OutputInstruction out = (Instructions.OutputInstruction) ins;
246 ofPort = OFPort.of((int) ((out.port().toLong())));
247 }
248
249 OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()
250 .setMatch(match)
251 .setTableId(tableId)
252 .setOutPort(ofPort)
253 .build();
254
255 synchronized (this) {
256 if (getFlowMissingXid() != NO_FLOW_MISSING_XID) {
257 log.debug("ofFlowStatsRequestFlowSend: previous FlowStatsRequestAll does not be processed yet,"
258 + " set no flow missing xid anyway, for {}",
259 sw.getStringId());
260 setFlowMissingXid(NO_FLOW_MISSING_XID);
261 }
262
263 sw.sendMsg(request);
264 }
265 }
266
267 private void calAndShortFlowsTaskInternal() {
268 deviceFlowTable.checkAndMoveLiveFlowAll();
269
270 deviceFlowTable.getShortFlows().forEach(fe -> {
271 ofFlowStatsRequestFlowSend(fe);
272 });
273 }
274
275 private class MidFlowsTask implements Runnable {
276 @Override
277 public void run() {
278 if (sw.getRole() == RoleState.MASTER) {
279 log.trace("MidFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());
280
281 // skip collecting because CalAndShortFlowsTask collects entire flow stats from a given switch sw
282 if (callCountMidFlowsTask == ENTIRE_POLL_TIMES) {
283 callCountMidFlowsTask = MID_POLL_TIMES;
284 } else {
285 midFlowsTaskInternal();
286 callCountMidFlowsTask += MID_POLL_TIMES;
287 }
288 }
289 }
290 }
291
292 private void midFlowsTaskInternal() {
293 deviceFlowTable.getMidFlows().forEach(fe -> {
294 ofFlowStatsRequestFlowSend(fe);
295 });
296 }
297
298 private class LongFlowsTask implements Runnable {
299 @Override
300 public void run() {
301 if (sw.getRole() == RoleState.MASTER) {
302 log.trace("LongFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());
303
304 // skip collecting because CalAndShortFlowsTask collects entire flow stats from a given switch sw
305 if (callCountLongFlowsTask == ENTIRE_POLL_TIMES) {
306 callCountLongFlowsTask = LONG_POLL_TIMES;
307 } else {
308 longFlowsTaskInternal();
309 callCountLongFlowsTask += LONG_POLL_TIMES;
310 }
311 }
312 }
313 }
314
315 private void longFlowsTaskInternal() {
316 deviceFlowTable.getLongFlows().forEach(fe -> {
317 ofFlowStatsRequestFlowSend(fe);
318 });
319 }
320
321 /**
322 * start adaptive flow statistic collection.
323 *
324 */
325 public synchronized void start() {
326 log.debug("Starting AdaptiveStats collection thread for {}", sw.getStringId());
327 callCountCalAndShortFlowsTask = 0;
328 callCountMidFlowsTask = 0;
329 callCountLongFlowsTask = 0;
330
331 isFirstTimeStart = true;
332
333 // Initially start polling quickly. Then drop down to configured value
334 calAndShortFlowsTask = new CalAndShortFlowsTask();
335 calAndShortFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
336 calAndShortFlowsTask,
337 1,
338 calAndPollInterval,
339 TimeUnit.SECONDS);
340
341 midFlowsTask = new MidFlowsTask();
342 midFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
343 midFlowsTask,
344 1,
345 midPollInterval,
346 TimeUnit.SECONDS);
347
348 longFlowsTask = new LongFlowsTask();
349 longFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
350 longFlowsTask,
351 1,
352 longPollInterval,
353 TimeUnit.SECONDS);
354
355 log.info("Started");
356 }
357
358 /**
359 * stop adaptive flow statistic collection.
360 *
361 */
362 public synchronized void stop() {
363 log.debug("Stopping AdaptiveStats collection thread for {}", sw.getStringId());
364 if (calAndShortFlowsThread != null) {
365 calAndShortFlowsThread.cancel(true);
366 }
367 if (midFlowsThread != null) {
368 midFlowsThread.cancel(true);
369 }
370 if (longFlowsThread != null) {
371 longFlowsThread.cancel(true);
372 }
373
374 adaptiveFlowStatsScheduler.shutdownNow();
375
376 isFirstTimeStart = false;
377
378 log.info("Stopped");
379 }
380
381 /**
382 * add typed flow entry from flow rule into the internal flow table.
383 *
384 * @param flowRules the flow rules
385 *
386 */
387 public synchronized void addWithFlowRule(FlowRule... flowRules) {
388 for (FlowRule fr : flowRules) {
389 // First remove old entry unconditionally, if exist
390 deviceFlowTable.remove(fr);
391
392 // add new flow entry, we suppose IMMEDIATE_FLOW
393 TypedStoredFlowEntry newFlowEntry = new DefaultTypedFlowEntry(fr,
394 FlowLiveType.IMMEDIATE_FLOW);
395 deviceFlowTable.addWithCalAndSetFlowLiveType(newFlowEntry);
396 }
397 }
398
399 /**
400 * add or update typed flow entry from flow entry into the internal flow table.
401 *
402 * @param flowEntries the flow entries
403 *
404 */
405 public synchronized void addOrUpdateFlows(FlowEntry... flowEntries) {
406 for (FlowEntry fe : flowEntries) {
407 // check if this new rule is an update to an existing entry
408 TypedStoredFlowEntry stored = deviceFlowTable.getFlowEntry(fe);
409
410 if (stored != null) {
411 // duplicated flow entry is collected!, just skip
412 if (fe.bytes() == stored.bytes() && fe.packets() == stored.packets()
413 && fe.life() == stored.life()) {
414 log.debug("addOrUpdateFlows:, FlowId=" + Long.toHexString(fe.id().value())
415 + ",is DUPLICATED stats collection, just skip."
416 + " AdaptiveStats collection thread for {}",
417 sw.getStringId());
418
Brian O'Connora3e5cd52015-12-05 15:59:19 -0800419 //FIXME modification of "stored" flow entry outside of store
Brian O'Connor8c685362015-12-05 15:27:27 -0800420 stored.setLastSeen();
421 continue;
422 } else if (fe.life() < stored.life()) {
423 // Invalid updates the stats values, i.e., bytes, packets, durations ...
424 log.debug("addOrUpdateFlows():" +
425 " Invalid Flow Update! The new life is SMALLER than the previous one, jus skip." +
426 " new flowId=" + Long.toHexString(fe.id().value()) +
427 ", old flowId=" + Long.toHexString(stored.id().value()) +
428 ", new bytes=" + fe.bytes() + ", old bytes=" + stored.bytes() +
429 ", new life=" + fe.life() + ", old life=" + stored.life() +
430 ", new lastSeen=" + fe.lastSeen() + ", old lastSeen=" + stored.lastSeen());
431 // go next
Brian O'Connora3e5cd52015-12-05 15:59:19 -0800432 //FIXME modification of "stored" flow entry outside of store
Brian O'Connor8c685362015-12-05 15:27:27 -0800433 stored.setLastSeen();
434 continue;
435 }
436
437 // update now
Brian O'Connora3e5cd52015-12-05 15:59:19 -0800438 //FIXME modification of "stored" flow entry outside of store
Brian O'Connor8c685362015-12-05 15:27:27 -0800439 stored.setLife(fe.life());
440 stored.setPackets(fe.packets());
441 stored.setBytes(fe.bytes());
442 stored.setLastSeen();
443 if (stored.state() == FlowEntry.FlowEntryState.PENDING_ADD) {
444 // flow is really RULE_ADDED
445 stored.setState(FlowEntry.FlowEntryState.ADDED);
446 }
447 // flow is RULE_UPDATED, skip adding and just updating flow live table
448 //deviceFlowTable.calAndSetFlowLiveType(stored);
449 continue;
450 }
451
452 // add new flow entry, we suppose IMMEDIATE_FLOW
453 TypedStoredFlowEntry newFlowEntry = new DefaultTypedFlowEntry(fe,
454 FlowLiveType.IMMEDIATE_FLOW);
455 deviceFlowTable.addWithCalAndSetFlowLiveType(newFlowEntry);
456 }
457 }
458
459 /**
460 * remove typed flow entry from the internal flow table.
461 *
462 * @param flowRules the flow entries
463 *
464 */
465 public synchronized void removeFlows(FlowRule... flowRules) {
466 for (FlowRule rule : flowRules) {
467 deviceFlowTable.remove(rule);
468 }
469 }
470
471 // same as removeFlows() function
472 /**
473 * remove typed flow entry from the internal flow table.
474 *
475 * @param flowRules the flow entries
476 *
477 */
478 public void flowRemoved(FlowRule... flowRules) {
479 removeFlows(flowRules);
480 }
481
482 // same as addOrUpdateFlows() function
483 /**
484 * add or update typed flow entry from flow entry into the internal flow table.
485 *
486 * @param flowEntries the flow entry list
487 *
488 */
489 public void pushFlowMetrics(List<FlowEntry> flowEntries) {
490 flowEntries.forEach(fe -> {
491 addOrUpdateFlows(fe);
492 });
493 }
494
495 /**
496 * returns flowMissingXid that indicates the execution of flowMissing process or not(NO_FLOW_MISSING_XID(-1)).
497 *
498 * @return xid of missing flow
499 */
500 public long getFlowMissingXid() {
501 return flowMissingXid;
502 }
503
504 /**
505 * set flowMissingXid, namely OFFlowStatsRequest match any ALL message Id.
506 *
507 * @param flowMissingXid the OFFlowStatsRequest message Id
508 *
509 */
510 public void setFlowMissingXid(long flowMissingXid) {
511 this.flowMissingXid = flowMissingXid;
512 }
513
514 private class InternalDeviceFlowTable {
515
516 private final Map<FlowId, Set<TypedStoredFlowEntry>>
517 flowEntries = Maps.newConcurrentMap();
518
519 private final Set<StoredFlowEntry> shortFlows = new HashSet<>();
520 private final Set<StoredFlowEntry> midFlows = new HashSet<>();
521 private final Set<StoredFlowEntry> longFlows = new HashSet<>();
522
523 // Assumed latency adjustment(default=500 millisecond) between FlowStatsRequest and Reply
524 private final long latencyFlowStatsRequestAndReplyMillis = 500;
525
526
527 // Statistics for table operation
528 private long addCount = 0, addWithSetFlowLiveTypeCount = 0;
529 private long removeCount = 0;
530
531 /**
532 * Resets all count values with zero.
533 *
534 */
535 public void resetAllCount() {
536 addCount = 0;
537 addWithSetFlowLiveTypeCount = 0;
538 removeCount = 0;
539 }
540
541 // get set of flow entries for the given flowId
542 private Set<TypedStoredFlowEntry> getFlowEntriesInternal(FlowId flowId) {
543 return flowEntries.computeIfAbsent(flowId, id -> Sets.newCopyOnWriteArraySet());
544 }
545
546 // get flow entry for the given flow rule
547 private TypedStoredFlowEntry getFlowEntryInternal(FlowRule rule) {
548 Set<TypedStoredFlowEntry> flowEntries = getFlowEntriesInternal(rule.id());
549 return flowEntries.stream()
550 .filter(entry -> Objects.equal(entry, rule))
551 .findAny()
552 .orElse(null);
553 }
554
555 // get the flow entries for all flows in flow table
556 private Set<TypedStoredFlowEntry> getFlowEntriesInternal() {
557 Set<TypedStoredFlowEntry> result = Sets.newHashSet();
558
559 flowEntries.values().forEach(result::addAll);
560 return result;
561 }
562
563 /**
564 * Gets the number of flow entry in flow table.
565 *
566 * @return the number of flow entry.
567 *
568 */
569 public long getFlowCount() {
570 return flowEntries.values().stream().mapToLong(Set::size).sum();
571 }
572
573 /**
574 * Gets the number of flow entry in flow table.
575 *
576 * @param rule the flow rule
577 * @return the typed flow entry.
578 *
579 */
580 public TypedStoredFlowEntry getFlowEntry(FlowRule rule) {
581 checkNotNull(rule);
582
583 return getFlowEntryInternal(rule);
584 }
585
586 /**
587 * Gets the all typed flow entries in flow table.
588 *
589 * @return the set of typed flow entry.
590 *
591 */
592 public Set<TypedStoredFlowEntry> getFlowEntries() {
593 return getFlowEntriesInternal();
594 }
595
596 /**
597 * Gets the short typed flow entries in flow table.
598 *
599 * @return the set of typed flow entry.
600 *
601 */
602 public Set<StoredFlowEntry> getShortFlows() {
603 return ImmutableSet.copyOf(shortFlows); //Sets.newHashSet(shortFlows);
604 }
605
606 /**
607 * Gets the mid typed flow entries in flow table.
608 *
609 * @return the set of typed flow entry.
610 *
611 */
612 public Set<StoredFlowEntry> getMidFlows() {
613 return ImmutableSet.copyOf(midFlows); //Sets.newHashSet(midFlows);
614 }
615
616 /**
617 * Gets the long typed flow entries in flow table.
618 *
619 * @return the set of typed flow entry.
620 *
621 */
622 public Set<StoredFlowEntry> getLongFlows() {
623 return ImmutableSet.copyOf(longFlows); //Sets.newHashSet(longFlows);
624 }
625
626 /**
627 * Add typed flow entry into table only.
628 *
629 * @param rule the flow rule
630 *
631 */
632 public synchronized void add(TypedStoredFlowEntry rule) {
633 checkNotNull(rule);
634
635 //rule have to be new DefaultTypedFlowEntry
636 boolean result = getFlowEntriesInternal(rule.id()).add(rule);
637
638 if (result) {
639 addCount++;
640 }
641 }
642
643 /**
644 * Calculates and set the flow live type at the first time,
645 * and then add it into a corresponding typed flow table.
646 *
647 * @param rule the flow rule
648 *
649 */
650 public void calAndSetFlowLiveType(TypedStoredFlowEntry rule) {
651 checkNotNull(rule);
652
653 calAndSetFlowLiveTypeInternal(rule);
654 }
655
656 /**
657 * Add the typed flow entry into table, and calculates and set the flow live type,
658 * and then add it into a corresponding typed flow table.
659 *
660 * @param rule the flow rule
661 *
662 */
663 public synchronized void addWithCalAndSetFlowLiveType(TypedStoredFlowEntry rule) {
664 checkNotNull(rule);
665
666 //rule have to be new DefaultTypedFlowEntry
667 boolean result = getFlowEntriesInternal(rule.id()).add(rule);
668 if (result) {
669 calAndSetFlowLiveTypeInternal(rule);
670 addWithSetFlowLiveTypeCount++;
671 } else {
672 log.debug("addWithCalAndSetFlowLiveType, FlowId=" + Long.toHexString(rule.id().value())
673 + " ADD Failed, cause it may already exists in table !!!,"
674 + " AdaptiveStats collection thread for {}",
675 sw.getStringId());
676 }
677 }
678
679 // In real, calculates and set the flow live type at the first time,
680 // and then add it into a corresponding typed flow table
681 private void calAndSetFlowLiveTypeInternal(TypedStoredFlowEntry rule) {
682 long life = rule.life();
683 FlowLiveType prevFlowLiveType = rule.flowLiveType();
684
685 if (life >= longPollInterval) {
686 rule.setFlowLiveType(FlowLiveType.LONG_FLOW);
687 longFlows.add(rule);
688 } else if (life >= midPollInterval) {
689 rule.setFlowLiveType(FlowLiveType.MID_FLOW);
690 midFlows.add(rule);
691 } else if (life >= calAndPollInterval) {
692 rule.setFlowLiveType(FlowLiveType.SHORT_FLOW);
693 shortFlows.add(rule);
694 } else if (life >= 0) {
695 rule.setFlowLiveType(FlowLiveType.IMMEDIATE_FLOW);
696 } else { // life < 0
697 rule.setFlowLiveType(FlowLiveType.UNKNOWN_FLOW);
698 }
699
700 if (rule.flowLiveType() != prevFlowLiveType) {
701 switch (prevFlowLiveType) {
702 // delete it from previous flow table
703 case SHORT_FLOW:
704 shortFlows.remove(rule);
705 break;
706 case MID_FLOW:
707 midFlows.remove(rule);
708 break;
709 case LONG_FLOW:
710 longFlows.remove(rule);
711 break;
712 default:
713 break;
714 }
715 }
716 }
717
718
719 // check the flow live type based on current time, then set and add it into corresponding table
720 private boolean checkAndMoveLiveFlowInternal(TypedStoredFlowEntry fe, long cTime) {
721 long curTime = (cTime > 0 ? cTime : System.currentTimeMillis());
722 // For latency adjustment(default=500 millisecond) between FlowStatsRequest and Reply
723 long fromLastSeen = ((curTime - fe.lastSeen() + latencyFlowStatsRequestAndReplyMillis) / 1000);
724 // fe.life() unit is SECOND!
725 long liveTime = fe.life() + fromLastSeen;
726
727
728 switch (fe.flowLiveType()) {
729 case IMMEDIATE_FLOW:
730 if (liveTime >= longPollInterval) {
731 fe.setFlowLiveType(FlowLiveType.LONG_FLOW);
732 longFlows.add(fe);
733 } else if (liveTime >= midPollInterval) {
734 fe.setFlowLiveType(FlowLiveType.MID_FLOW);
735 midFlows.add(fe);
736 } else if (liveTime >= calAndPollInterval) {
737 fe.setFlowLiveType(FlowLiveType.SHORT_FLOW);
738 shortFlows.add(fe);
739 }
740 break;
741 case SHORT_FLOW:
742 if (liveTime >= longPollInterval) {
743 fe.setFlowLiveType(FlowLiveType.LONG_FLOW);
744 shortFlows.remove(fe);
745 longFlows.add(fe);
746 } else if (liveTime >= midPollInterval) {
747 fe.setFlowLiveType(FlowLiveType.MID_FLOW);
748 shortFlows.remove(fe);
749 midFlows.add(fe);
750 }
751 break;
752 case MID_FLOW:
753 if (liveTime >= longPollInterval) {
754 fe.setFlowLiveType(FlowLiveType.LONG_FLOW);
755 midFlows.remove(fe);
756 longFlows.add(fe);
757 }
758 break;
759 case LONG_FLOW:
760 if (fromLastSeen > entirePollInterval) {
761 log.trace("checkAndMoveLiveFlowInternal, flow is already removed at switch.");
762 return false;
763 }
764 break;
765 case UNKNOWN_FLOW: // Unknown flow is an internal error flow type, just fall through
766 default :
767 // Error Unknown Live Type
768 log.error("checkAndMoveLiveFlowInternal, Unknown Live Type error!"
769 + "AdaptiveStats collection thread for {}",
770 sw.getStringId());
771 return false;
772 }
773
774 log.debug("checkAndMoveLiveFlowInternal, FlowId=" + Long.toHexString(fe.id().value())
775 + ", state=" + fe.state()
776 + ", After liveType=" + fe.flowLiveType()
777 + ", liveTime=" + liveTime
778 + ", life=" + fe.life()
779 + ", bytes=" + fe.bytes()
780 + ", packets=" + fe.packets()
781 + ", fromLastSeen=" + fromLastSeen
782 + ", priority=" + fe.priority()
783 + ", selector=" + fe.selector().criteria()
784 + ", treatment=" + fe.treatment()
785 + " AdaptiveStats collection thread for {}",
786 sw.getStringId());
787
788 return true;
789 }
790
791 /**
792 * Check and move live type for all type flow entries in table at every calAndPollInterval time.
793 *
794 */
795 public void checkAndMoveLiveFlowAll() {
796 Set<TypedStoredFlowEntry> typedFlowEntries = getFlowEntriesInternal();
797
798 long calCurTime = System.currentTimeMillis();
799 typedFlowEntries.forEach(fe -> {
800 if (!checkAndMoveLiveFlowInternal(fe, calCurTime)) {
801 remove(fe);
802 }
803 });
804
805 // print table counts for debug
806 if (log.isDebugEnabled()) {
807 synchronized (this) {
808 long totalFlowCount = getFlowCount();
809 long shortFlowCount = shortFlows.size();
810 long midFlowCount = midFlows.size();
811 long longFlowCount = longFlows.size();
812 long immediateFlowCount = totalFlowCount - shortFlowCount - midFlowCount - longFlowCount;
813 long calTotalCount = addCount + addWithSetFlowLiveTypeCount - removeCount;
814
815 log.debug("--------------------------------------------------------------------------- for {}",
816 sw.getStringId());
817 log.debug("checkAndMoveLiveFlowAll, Total Flow_Count=" + totalFlowCount
818 + ", add - remove_Count=" + calTotalCount
819 + ", IMMEDIATE_FLOW_Count=" + immediateFlowCount
820 + ", SHORT_FLOW_Count=" + shortFlowCount
821 + ", MID_FLOW_Count=" + midFlowCount
822 + ", LONG_FLOW_Count=" + longFlowCount
823 + ", add_Count=" + addCount
824 + ", addWithSetFlowLiveType_Count=" + addWithSetFlowLiveTypeCount
825 + ", remove_Count=" + removeCount
826 + " AdaptiveStats collection thread for {}", sw.getStringId());
827 log.debug("--------------------------------------------------------------------------- for {}",
828 sw.getStringId());
829 if (totalFlowCount != calTotalCount) {
830 log.error("checkAndMoveLiveFlowAll, Real total flow count and "
831 + "calculated total flow count do NOT match, something is wrong internally "
832 + "or check counter value bound is over!");
833 }
834 if (immediateFlowCount < 0) {
835 log.error("checkAndMoveLiveFlowAll, IMMEDIATE_FLOW count is negative, "
836 + "something is wrong internally "
837 + "or check counter value bound is over!");
838 }
839 }
840 }
841 log.trace("checkAndMoveLiveFlowAll, AdaptiveStats for {}", sw.getStringId());
842 }
843
844 /**
845 * Remove the typed flow entry from table.
846 *
847 * @param rule the flow rule
848 *
849 */
850 public synchronized void remove(FlowRule rule) {
851 checkNotNull(rule);
852
853 TypedStoredFlowEntry removeStore = getFlowEntryInternal(rule);
854 if (removeStore != null) {
855 removeLiveFlowsInternal((TypedStoredFlowEntry) removeStore);
856 boolean result = getFlowEntriesInternal(rule.id()).remove(removeStore);
857
858 if (result) {
859 removeCount++;
860 }
861 }
862 }
863
864 // Remove the typed flow entry from corresponding table
865 private void removeLiveFlowsInternal(TypedStoredFlowEntry fe) {
866 switch (fe.flowLiveType()) {
867 case IMMEDIATE_FLOW:
868 // do nothing
869 break;
870 case SHORT_FLOW:
871 shortFlows.remove(fe);
872 break;
873 case MID_FLOW:
874 midFlows.remove(fe);
875 break;
876 case LONG_FLOW:
877 longFlows.remove(fe);
878 break;
879 default: // error in Flow Live Type
880 log.error("removeLiveFlowsInternal, Unknown Live Type error!");
881 break;
882 }
883 }
884 }
885}