blob: a81367cd0670ceb10c2b8e35a34d6c7b62aa92cb [file] [log] [blame]
ssyoon9030fbcd92015-08-17 10:42:07 +09001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.provider.of.flow.impl;
18
19import java.util.HashSet;
20import java.util.List;
21import java.util.Map;
22import java.util.Optional;
23import java.util.Set;
24import java.util.concurrent.TimeUnit;
25import java.util.concurrent.ScheduledFuture;
26import java.util.concurrent.Executors;
27import java.util.concurrent.ScheduledExecutorService;
28
29import com.google.common.base.Objects;
30import com.google.common.collect.ImmutableSet;
31import com.google.common.collect.Maps;
32import com.google.common.collect.Sets;
33
34import org.onosproject.net.flow.DefaultTypedFlowEntry;
35import org.onosproject.net.flow.FlowEntry;
36import org.onosproject.net.flow.FlowId;
37import org.onosproject.net.flow.FlowRule;
38import org.onosproject.net.flow.StoredFlowEntry;
39import org.onosproject.net.flow.TypedStoredFlowEntry;
40import org.onosproject.net.flow.instructions.Instruction;
41import org.onosproject.net.flow.instructions.Instructions;
42import org.onosproject.openflow.controller.OpenFlowSwitch;
43import org.onosproject.openflow.controller.RoleState;
44import org.projectfloodlight.openflow.protocol.OFFlowStatsRequest;
45import org.projectfloodlight.openflow.protocol.match.Match;
46import org.projectfloodlight.openflow.types.OFPort;
47import org.projectfloodlight.openflow.types.TableId;
48import org.slf4j.Logger;
49
50import static com.google.common.base.Preconditions.checkNotNull;
51import static org.onlab.util.Tools.groupedThreads;
52import static org.onosproject.net.flow.TypedStoredFlowEntry.*;
53import static org.slf4j.LoggerFactory.getLogger;
54
55/**
56 * Efficiently and adaptively collects flow statistics for the specified switch.
57 */
58public class NewAdaptiveFlowStatsCollector {
59
60 private final Logger log = getLogger(getClass());
61
62 private final OpenFlowSwitch sw;
63
64 private ScheduledExecutorService adaptiveFlowStatsScheduler =
65 Executors.newScheduledThreadPool(4, groupedThreads("onos/flow", "device-stats-collector-%d"));
66 private ScheduledFuture<?> calAndShortFlowsThread;
67 private ScheduledFuture<?> midFlowsThread;
68 private ScheduledFuture<?> longFlowsThread;
69
70 // Task that calculates all flowEntries' FlowLiveType and collects stats IMMEDIATE flows every calAndPollInterval
71 private CalAndShortFlowsTask calAndShortFlowsTask;
72 // Task that collects stats MID flows every 2*calAndPollInterval
73 private MidFlowsTask midFlowsTask;
74 // Task that collects stats LONG flows every 3*calAndPollInterval
75 private LongFlowsTask longFlowsTask;
76
77 private static final int CAL_AND_POLL_TIMES = 1; // must be always 0
78 private static final int MID_POLL_TIMES = 2; // variable greater or equal than 1
79 private static final int LONG_POLL_TIMES = 3; // variable greater or equal than MID_POLL_TIMES
80 //TODO: make ENTIRE_POLL_TIMES configurable with enable or disable
81 // must be variable greater or equal than common multiple of MID_POLL_TIMES and LONG_POLL_TIMES
82 private static final int ENTIRE_POLL_TIMES = 6;
83
84 private static final int DEFAULT_CAL_AND_POLL_FREQUENCY = 5;
85 private static final int MIN_CAL_AND_POLL_FREQUENCY = 2;
86 private static final int MAX_CAL_AND_POLL_FREQUENCY = 60;
87
88 private int calAndPollInterval; // CAL_AND_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
89 private int midPollInterval; // MID_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
90 private int longPollInterval; // LONG_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
91 // only used for checking condition at each task if it collects entire flows from a given switch or not
92 private int entirePollInterval; // ENTIRE_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
93
94 // Number of call count of each Task,
95 // for undoing collection except only entire flows collecting task in CalAndShortFlowsTask
96 private int callCountCalAndShortFlowsTask = 0; // increased CAL_AND_POLL_TIMES whenever Task is called
97 private int callCountMidFlowsTask = 0; // increased MID_POLL_TIMES whenever Task is called
98 private int callCountLongFlowsTask = 0; // increased LONG_POLL_TIMES whenever Task is called
99
100 private InternalDeviceFlowTable deviceFlowTable = new InternalDeviceFlowTable();
101
102 private boolean isFirstTimeStart = true;
103
104 public static final long NO_FLOW_MISSING_XID = (-1);
105 private long flowMissingXid = NO_FLOW_MISSING_XID;
106
107 /**
108 * Creates a new adaptive collector for the given switch and default cal_and_poll frequency.
109 *
110 * @param sw switch to pull
111 * @param pollInterval cal and immediate poll frequency in seconds
112 */
113 NewAdaptiveFlowStatsCollector(OpenFlowSwitch sw, int pollInterval) {
114 this.sw = sw;
115
116 initMemberVars(pollInterval);
117 }
118
119 // check calAndPollInterval validity and set all pollInterval values and finally initialize each task call count
120 private void initMemberVars(int pollInterval) {
121 if (pollInterval < MIN_CAL_AND_POLL_FREQUENCY) {
122 this.calAndPollInterval = MIN_CAL_AND_POLL_FREQUENCY;
123 } else if (pollInterval >= MAX_CAL_AND_POLL_FREQUENCY) {
124 this.calAndPollInterval = MAX_CAL_AND_POLL_FREQUENCY;
125 } else {
126 this.calAndPollInterval = pollInterval;
127 }
128
129 calAndPollInterval = CAL_AND_POLL_TIMES * calAndPollInterval;
130 midPollInterval = MID_POLL_TIMES * calAndPollInterval;
131 longPollInterval = LONG_POLL_TIMES * calAndPollInterval;
132 entirePollInterval = ENTIRE_POLL_TIMES * calAndPollInterval;
133
134 callCountCalAndShortFlowsTask = 0;
135 callCountMidFlowsTask = 0;
136 callCountLongFlowsTask = 0;
137
138 flowMissingXid = NO_FLOW_MISSING_XID;
139 }
140
141 /**
142 * Adjusts adaptive poll frequency.
143 *
144 * @param pollInterval poll frequency in seconds
145 */
146 synchronized void adjustCalAndPollInterval(int pollInterval) {
147 initMemberVars(pollInterval);
148
149 if (calAndShortFlowsThread != null) {
150 calAndShortFlowsThread.cancel(false);
151 }
152 if (midFlowsThread != null) {
153 midFlowsThread.cancel(false);
154 }
155 if (longFlowsThread != null) {
156 longFlowsThread.cancel(false);
157 }
158
159 calAndShortFlowsTask = new CalAndShortFlowsTask();
160 calAndShortFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
161 calAndShortFlowsTask,
162 0,
163 calAndPollInterval,
164 TimeUnit.SECONDS);
165
166 midFlowsTask = new MidFlowsTask();
167 midFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
168 midFlowsTask,
169 0,
170 midPollInterval,
171 TimeUnit.SECONDS);
172
173 longFlowsTask = new LongFlowsTask();
174 longFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
175 longFlowsTask,
176 0,
177 longPollInterval,
178 TimeUnit.SECONDS);
179
180 log.debug("calAndPollInterval=" + calAndPollInterval + "is adjusted");
181 }
182
183 private class CalAndShortFlowsTask implements Runnable {
184 @Override
185 public void run() {
186 if (sw.getRole() == RoleState.MASTER) {
187 log.trace("CalAndShortFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());
188
189 if (isFirstTimeStart) {
190 // isFirstTimeStart, get entire flow stats from a given switch sw
191 log.trace("CalAndShortFlowsTask Collecting Entire AdaptiveStats at first time start for {}",
192 sw.getStringId());
193 ofFlowStatsRequestAllSend();
194
195 callCountCalAndShortFlowsTask += CAL_AND_POLL_TIMES;
196 isFirstTimeStart = false;
197 } else if (callCountCalAndShortFlowsTask == ENTIRE_POLL_TIMES) {
198 // entire_poll_times, get entire flow stats from a given switch sw
199 log.trace("CalAndShortFlowsTask Collecting Entire AdaptiveStats for {}", sw.getStringId());
200 ofFlowStatsRequestAllSend();
201
202 callCountCalAndShortFlowsTask = CAL_AND_POLL_TIMES;
203 //TODO: check flows deleted in switch, but exist in controller flow table, then remove them
204 //
205 } else {
206 calAndShortFlowsTaskInternal();
207 callCountCalAndShortFlowsTask += CAL_AND_POLL_TIMES;
208 }
209 }
210 }
211 }
212
213 // send openflow flow stats request message with getting all flow entries to a given switch sw
214 private void ofFlowStatsRequestAllSend() {
215 OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()
216 .setMatch(sw.factory().matchWildcardAll())
217 .setTableId(TableId.ALL)
218 .setOutPort(OFPort.NO_MASK)
219 .build();
220
221 synchronized (this) {
222 // set the request xid to check the reply in OpenFlowRuleProvider
223 // After processing the reply of this request message,
224 // this must be set to NO_FLOW_MISSING_XID(-1) by provider
225 setFlowMissingXid(request.getXid());
226 log.debug("ofFlowStatsRequestAllSend,Request={},for {}", request.toString(), sw.getStringId());
227
228 sw.sendMsg(request);
229 }
230 }
231
232 // send openflow flow stats request message with getting the specific flow entry(fe) to a given switch sw
233 private void ofFlowStatsRequestFlowSend(FlowEntry fe) {
234 // set find match
235 Match match = FlowModBuilder.builder(fe, sw.factory(), Optional.empty()).buildMatch();
236 // set find tableId
237 TableId tableId = TableId.of(fe.tableId());
238 // set output port
239 Instruction ins = fe.treatment().allInstructions().stream()
240 .filter(i -> (i.type() == Instruction.Type.OUTPUT))
241 .findFirst()
242 .orElse(null);
243 OFPort ofPort = OFPort.NO_MASK;
244 if (ins != null) {
245 Instructions.OutputInstruction out = (Instructions.OutputInstruction) ins;
246 ofPort = OFPort.of((int) ((out.port().toLong())));
247 }
248
249 OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()
250 .setMatch(match)
251 .setTableId(tableId)
252 .setOutPort(ofPort)
253 .build();
254
255 synchronized (this) {
256 if (getFlowMissingXid() != NO_FLOW_MISSING_XID) {
257 log.debug("ofFlowStatsRequestFlowSend: previous FlowStatsRequestAll does not be processed yet,"
258 + " set no flow missing xid anyway, for {}",
259 sw.getStringId());
260 setFlowMissingXid(NO_FLOW_MISSING_XID);
261 }
262
263 sw.sendMsg(request);
264 }
265 }
266
267 private void calAndShortFlowsTaskInternal() {
268 deviceFlowTable.checkAndMoveLiveFlowAll();
269
270 deviceFlowTable.getShortFlows().forEach(fe -> {
271 ofFlowStatsRequestFlowSend(fe);
272 });
273 }
274
275 private class MidFlowsTask implements Runnable {
276 @Override
277 public void run() {
278 if (sw.getRole() == RoleState.MASTER) {
279 log.trace("MidFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());
280
281 // skip collecting because CalAndShortFlowsTask collects entire flow stats from a given switch sw
282 if (callCountMidFlowsTask == ENTIRE_POLL_TIMES) {
283 callCountMidFlowsTask = MID_POLL_TIMES;
284 } else {
285 midFlowsTaskInternal();
286 callCountMidFlowsTask += MID_POLL_TIMES;
287 }
288 }
289 }
290 }
291
292 private void midFlowsTaskInternal() {
293 deviceFlowTable.getMidFlows().forEach(fe -> {
294 ofFlowStatsRequestFlowSend(fe);
295 });
296 }
297
298 private class LongFlowsTask implements Runnable {
299 @Override
300 public void run() {
301 if (sw.getRole() == RoleState.MASTER) {
302 log.trace("LongFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());
303
304 // skip collecting because CalAndShortFlowsTask collects entire flow stats from a given switch sw
305 if (callCountLongFlowsTask == ENTIRE_POLL_TIMES) {
306 callCountLongFlowsTask = LONG_POLL_TIMES;
307 } else {
308 longFlowsTaskInternal();
309 callCountLongFlowsTask += LONG_POLL_TIMES;
310 }
311 }
312 }
313 }
314
315 private void longFlowsTaskInternal() {
316 deviceFlowTable.getLongFlows().forEach(fe -> {
317 ofFlowStatsRequestFlowSend(fe);
318 });
319 }
320
321 /**
322 * start adaptive flow statistic collection.
323 *
324 */
325 public synchronized void start() {
326 log.debug("Starting AdaptiveStats collection thread for {}", sw.getStringId());
327 callCountCalAndShortFlowsTask = 0;
328 callCountMidFlowsTask = 0;
329 callCountLongFlowsTask = 0;
330
331 isFirstTimeStart = true;
332
333 // Initially start polling quickly. Then drop down to configured value
334 calAndShortFlowsTask = new CalAndShortFlowsTask();
335 calAndShortFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
336 calAndShortFlowsTask,
337 1,
338 calAndPollInterval,
339 TimeUnit.SECONDS);
340
341 midFlowsTask = new MidFlowsTask();
342 midFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
343 midFlowsTask,
344 1,
345 midPollInterval,
346 TimeUnit.SECONDS);
347
348 longFlowsTask = new LongFlowsTask();
349 longFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
350 longFlowsTask,
351 1,
352 longPollInterval,
353 TimeUnit.SECONDS);
354
355 log.info("Started");
356 }
357
358 /**
359 * stop adaptive flow statistic collection.
360 *
361 */
362 public synchronized void stop() {
363 log.debug("Stopping AdaptiveStats collection thread for {}", sw.getStringId());
364 if (calAndShortFlowsThread != null) {
365 calAndShortFlowsThread.cancel(true);
366 }
367 if (midFlowsThread != null) {
368 midFlowsThread.cancel(true);
369 }
370 if (longFlowsThread != null) {
371 longFlowsThread.cancel(true);
372 }
373
374 adaptiveFlowStatsScheduler.shutdownNow();
375
376 isFirstTimeStart = false;
377
378 log.info("Stopped");
379 }
380
381 /**
382 * add typed flow entry from flow rule into the internal flow table.
383 *
384 * @param flowRules the flow rules
385 *
386 */
387 public synchronized void addWithFlowRule(FlowRule... flowRules) {
388 for (FlowRule fr : flowRules) {
389 // First remove old entry unconditionally, if exist
390 deviceFlowTable.remove(fr);
391
392 // add new flow entry, we suppose IMMEDIATE_FLOW
393 TypedStoredFlowEntry newFlowEntry = new DefaultTypedFlowEntry(fr,
394 FlowLiveType.IMMEDIATE_FLOW);
395 deviceFlowTable.addWithCalAndSetFlowLiveType(newFlowEntry);
396 }
397 }
398
399 /**
400 * add or update typed flow entry from flow entry into the internal flow table.
401 *
402 * @param flowEntries the flow entries
403 *
404 */
405 public synchronized void addOrUpdateFlows(FlowEntry... flowEntries) {
406 for (FlowEntry fe : flowEntries) {
407 // check if this new rule is an update to an existing entry
408 TypedStoredFlowEntry stored = deviceFlowTable.getFlowEntry(fe);
409
410 if (stored != null) {
411 // duplicated flow entry is collected!, just skip
412 if (fe.bytes() == stored.bytes() && fe.packets() == stored.packets()
413 && fe.life() == stored.life()) {
414 log.debug("addOrUpdateFlows:, FlowId=" + Long.toHexString(fe.id().value())
415 + ",is DUPLICATED stats collection, just skip."
416 + " AdaptiveStats collection thread for {}",
417 sw.getStringId());
418
419 stored.setLastSeen();
420 continue;
421 } else if (fe.life() < stored.life()) {
422 // Invalid updates the stats values, i.e., bytes, packets, durations ...
423 log.debug("addOrUpdateFlows():" +
424 " Invalid Flow Update! The new life is SMALLER than the previous one, jus skip." +
425 " new flowId=" + Long.toHexString(fe.id().value()) +
426 ", old flowId=" + Long.toHexString(stored.id().value()) +
427 ", new bytes=" + fe.bytes() + ", old bytes=" + stored.bytes() +
428 ", new life=" + fe.life() + ", old life=" + stored.life() +
429 ", new lastSeen=" + fe.lastSeen() + ", old lastSeen=" + stored.lastSeen());
430 // go next
431 stored.setLastSeen();
432 continue;
433 }
434
435 // update now
436 stored.setLife(fe.life());
437 stored.setPackets(fe.packets());
438 stored.setBytes(fe.bytes());
439 stored.setLastSeen();
440 if (stored.state() == FlowEntry.FlowEntryState.PENDING_ADD) {
441 // flow is really RULE_ADDED
442 stored.setState(FlowEntry.FlowEntryState.ADDED);
443 }
444 // flow is RULE_UPDATED, skip adding and just updating flow live table
445 //deviceFlowTable.calAndSetFlowLiveType(stored);
446 continue;
447 }
448
449 // add new flow entry, we suppose IMMEDIATE_FLOW
450 TypedStoredFlowEntry newFlowEntry = new DefaultTypedFlowEntry(fe,
451 FlowLiveType.IMMEDIATE_FLOW);
452 deviceFlowTable.addWithCalAndSetFlowLiveType(newFlowEntry);
453 }
454 }
455
456 /**
457 * remove typed flow entry from the internal flow table.
458 *
459 * @param flowRules the flow entries
460 *
461 */
462 public synchronized void removeFlows(FlowRule... flowRules) {
463 for (FlowRule rule : flowRules) {
464 deviceFlowTable.remove(rule);
465 }
466 }
467
468 // same as removeFlows() function
469 /**
470 * remove typed flow entry from the internal flow table.
471 *
472 * @param flowRules the flow entries
473 *
474 */
475 public void flowRemoved(FlowRule... flowRules) {
476 removeFlows(flowRules);
477 }
478
479 // same as addOrUpdateFlows() function
480 /**
481 * add or update typed flow entry from flow entry into the internal flow table.
482 *
483 * @param flowEntries the flow entry list
484 *
485 */
486 public void pushFlowMetrics(List<FlowEntry> flowEntries) {
487 flowEntries.forEach(fe -> {
488 addOrUpdateFlows(fe);
489 });
490 }
491
492 /**
493 * returns flowMissingXid that indicates the execution of flowMissing process or not(NO_FLOW_MISSING_XID(-1)).
494 *
495 */
496 public long getFlowMissingXid() {
497 return flowMissingXid;
498 }
499
500 /**
501 * set flowMissingXid, namely OFFlowStatsRequest match any ALL message Id.
502 *
503 * @param flowMissingXid the OFFlowStatsRequest message Id
504 *
505 */
506 public void setFlowMissingXid(long flowMissingXid) {
507 this.flowMissingXid = flowMissingXid;
508 }
509
510 private class InternalDeviceFlowTable {
511
512 private final Map<FlowId, Set<TypedStoredFlowEntry>>
513 flowEntries = Maps.newConcurrentMap();
514
515 private final Set<StoredFlowEntry> shortFlows = new HashSet<>();
516 private final Set<StoredFlowEntry> midFlows = new HashSet<>();
517 private final Set<StoredFlowEntry> longFlows = new HashSet<>();
518
519 // Assumed latency adjustment(default=500 millisecond) between FlowStatsRequest and Reply
520 private final long latencyFlowStatsRequestAndReplyMillis = 500;
521
522
523 // Statistics for table operation
524 private long addCount = 0, addWithSetFlowLiveTypeCount = 0;
525 private long removeCount = 0;
526
527 /**
528 * Resets all count values with zero.
529 *
530 */
531 public void resetAllCount() {
532 addCount = 0;
533 addWithSetFlowLiveTypeCount = 0;
534 removeCount = 0;
535 }
536
537 // get set of flow entries for the given flowId
538 private Set<TypedStoredFlowEntry> getFlowEntriesInternal(FlowId flowId) {
539 return flowEntries.computeIfAbsent(flowId, id -> Sets.newCopyOnWriteArraySet());
540 }
541
542 // get flow entry for the given flow rule
543 private TypedStoredFlowEntry getFlowEntryInternal(FlowRule rule) {
544 Set<TypedStoredFlowEntry> flowEntries = getFlowEntriesInternal(rule.id());
545 return flowEntries.stream()
546 .filter(entry -> Objects.equal(entry, rule))
547 .findAny()
548 .orElse(null);
549 }
550
551 // get the flow entries for all flows in flow table
552 private Set<TypedStoredFlowEntry> getFlowEntriesInternal() {
553 Set<TypedStoredFlowEntry> result = Sets.newHashSet();
554
555 flowEntries.values().forEach(result::addAll);
556 return result;
557 }
558
559 /**
560 * Gets the number of flow entry in flow table.
561 *
562 * @return the number of flow entry.
563 *
564 */
565 public long getFlowCount() {
566 return flowEntries.values().stream().mapToLong(Set::size).sum();
567 }
568
569 /**
570 * Gets the number of flow entry in flow table.
571 *
572 * @param rule the flow rule
573 * @return the typed flow entry.
574 *
575 */
576 public TypedStoredFlowEntry getFlowEntry(FlowRule rule) {
577 checkNotNull(rule);
578
579 return getFlowEntryInternal(rule);
580 }
581
582 /**
583 * Gets the all typed flow entries in flow table.
584 *
585 * @return the set of typed flow entry.
586 *
587 */
588 public Set<TypedStoredFlowEntry> getFlowEntries() {
589 return getFlowEntriesInternal();
590 }
591
592 /**
593 * Gets the short typed flow entries in flow table.
594 *
595 * @return the set of typed flow entry.
596 *
597 */
598 public Set<StoredFlowEntry> getShortFlows() {
599 return ImmutableSet.copyOf(shortFlows); //Sets.newHashSet(shortFlows);
600 }
601
602 /**
603 * Gets the mid typed flow entries in flow table.
604 *
605 * @return the set of typed flow entry.
606 *
607 */
608 public Set<StoredFlowEntry> getMidFlows() {
609 return ImmutableSet.copyOf(midFlows); //Sets.newHashSet(midFlows);
610 }
611
612 /**
613 * Gets the long typed flow entries in flow table.
614 *
615 * @return the set of typed flow entry.
616 *
617 */
618 public Set<StoredFlowEntry> getLongFlows() {
619 return ImmutableSet.copyOf(longFlows); //Sets.newHashSet(longFlows);
620 }
621
622 /**
623 * Add typed flow entry into table only.
624 *
625 * @param rule the flow rule
626 *
627 */
628 public synchronized void add(TypedStoredFlowEntry rule) {
629 checkNotNull(rule);
630
631 //rule have to be new DefaultTypedFlowEntry
632 boolean result = getFlowEntriesInternal(rule.id()).add(rule);
633
634 if (result) {
635 addCount++;
636 }
637 }
638
639 /**
640 * Calculates and set the flow live type at the first time,
641 * and then add it into a corresponding typed flow table.
642 *
643 * @param rule the flow rule
644 *
645 */
646 public void calAndSetFlowLiveType(TypedStoredFlowEntry rule) {
647 checkNotNull(rule);
648
649 calAndSetFlowLiveTypeInternal(rule);
650 }
651
652 /**
653 * Add the typed flow entry into table, and calculates and set the flow live type,
654 * and then add it into a corresponding typed flow table.
655 *
656 * @param rule the flow rule
657 *
658 */
659 public synchronized void addWithCalAndSetFlowLiveType(TypedStoredFlowEntry rule) {
660 checkNotNull(rule);
661
662 //rule have to be new DefaultTypedFlowEntry
663 boolean result = getFlowEntriesInternal(rule.id()).add(rule);
664 if (result) {
665 calAndSetFlowLiveTypeInternal(rule);
666 addWithSetFlowLiveTypeCount++;
667 } else {
668 log.debug("addWithCalAndSetFlowLiveType, FlowId=" + Long.toHexString(rule.id().value())
669 + " ADD Failed, cause it may already exists in table !!!,"
670 + " AdaptiveStats collection thread for {}",
671 sw.getStringId());
672 }
673 }
674
675 // In real, calculates and set the flow live type at the first time,
676 // and then add it into a corresponding typed flow table
677 private void calAndSetFlowLiveTypeInternal(TypedStoredFlowEntry rule) {
678 long life = rule.life();
679 FlowLiveType prevFlowLiveType = rule.flowLiveType();
680
681 if (life >= longPollInterval) {
682 rule.setFlowLiveType(FlowLiveType.LONG_FLOW);
683 longFlows.add(rule);
684 } else if (life >= midPollInterval) {
685 rule.setFlowLiveType(FlowLiveType.MID_FLOW);
686 midFlows.add(rule);
687 } else if (life >= calAndPollInterval) {
688 rule.setFlowLiveType(FlowLiveType.SHORT_FLOW);
689 shortFlows.add(rule);
690 } else if (life >= 0) {
691 rule.setFlowLiveType(FlowLiveType.IMMEDIATE_FLOW);
692 } else { // life < 0
693 rule.setFlowLiveType(FlowLiveType.UNKNOWN_FLOW);
694 }
695
696 if (rule.flowLiveType() != prevFlowLiveType) {
697 switch (prevFlowLiveType) {
698 // delete it from previous flow table
699 case SHORT_FLOW:
700 shortFlows.remove(rule);
701 break;
702 case MID_FLOW:
703 midFlows.remove(rule);
704 break;
705 case LONG_FLOW:
706 longFlows.remove(rule);
707 break;
708 default:
709 break;
710 }
711 }
712 }
713
714
715 // check the flow live type based on current time, then set and add it into corresponding table
716 private boolean checkAndMoveLiveFlowInternal(TypedStoredFlowEntry fe, long cTime) {
717 long curTime = (cTime > 0 ? cTime : System.currentTimeMillis());
718 // For latency adjustment(default=500 millisecond) between FlowStatsRequest and Reply
719 long fromLastSeen = ((curTime - fe.lastSeen() + latencyFlowStatsRequestAndReplyMillis) / 1000);
ssyoon9030fbcd92015-08-17 10:42:07 +0900720 // fe.life() unit is SECOND!
721 long liveTime = fe.life() + fromLastSeen;
722
ssyoon9030fbcd92015-08-17 10:42:07 +0900723
724 switch (fe.flowLiveType()) {
725 case IMMEDIATE_FLOW:
726 if (liveTime >= longPollInterval) {
727 fe.setFlowLiveType(FlowLiveType.LONG_FLOW);
728 longFlows.add(fe);
729 } else if (liveTime >= midPollInterval) {
730 fe.setFlowLiveType(FlowLiveType.MID_FLOW);
731 midFlows.add(fe);
732 } else if (liveTime >= calAndPollInterval) {
733 fe.setFlowLiveType(FlowLiveType.SHORT_FLOW);
734 shortFlows.add(fe);
735 }
736 break;
737 case SHORT_FLOW:
738 if (liveTime >= longPollInterval) {
739 fe.setFlowLiveType(FlowLiveType.LONG_FLOW);
740 shortFlows.remove(fe);
741 longFlows.add(fe);
742 } else if (liveTime >= midPollInterval) {
743 fe.setFlowLiveType(FlowLiveType.MID_FLOW);
744 shortFlows.remove(fe);
745 midFlows.add(fe);
746 }
747 break;
748 case MID_FLOW:
749 if (liveTime >= longPollInterval) {
750 fe.setFlowLiveType(FlowLiveType.LONG_FLOW);
751 midFlows.remove(fe);
752 longFlows.add(fe);
753 }
754 break;
755 case LONG_FLOW:
756 if (fromLastSeen > entirePollInterval) {
757 log.trace("checkAndMoveLiveFlowInternal, flow is already removed at switch.");
758 return false;
759 }
760 break;
761 case UNKNOWN_FLOW: // Unknown flow is an internal error flow type, just fall through
762 default :
763 // Error Unknown Live Type
764 log.error("checkAndMoveLiveFlowInternal, Unknown Live Type error!"
765 + "AdaptiveStats collection thread for {}",
766 sw.getStringId());
767 return false;
768 }
769
770 log.debug("checkAndMoveLiveFlowInternal, FlowId=" + Long.toHexString(fe.id().value())
771 + ", state=" + fe.state()
772 + ", After liveType=" + fe.flowLiveType()
773 + ", liveTime=" + liveTime
774 + ", life=" + fe.life()
775 + ", bytes=" + fe.bytes()
776 + ", packets=" + fe.packets()
777 + ", fromLastSeen=" + fromLastSeen
778 + ", priority=" + fe.priority()
779 + ", selector=" + fe.selector().criteria()
780 + ", treatment=" + fe.treatment()
781 + " AdaptiveStats collection thread for {}",
782 sw.getStringId());
783
784 return true;
785 }
786
787 /**
788 * Check and move live type for all type flow entries in table at every calAndPollInterval time.
789 *
790 */
791 public void checkAndMoveLiveFlowAll() {
792 Set<TypedStoredFlowEntry> typedFlowEntries = getFlowEntriesInternal();
793
794 long calCurTime = System.currentTimeMillis();
795 typedFlowEntries.forEach(fe -> {
796 if (!checkAndMoveLiveFlowInternal(fe, calCurTime)) {
797 remove(fe);
798 }
799 });
800
801 // print table counts for debug
802 if (log.isDebugEnabled()) {
803 synchronized (this) {
804 long totalFlowCount = getFlowCount();
805 long shortFlowCount = shortFlows.size();
806 long midFlowCount = midFlows.size();
807 long longFlowCount = longFlows.size();
808 long immediateFlowCount = totalFlowCount - shortFlowCount - midFlowCount - longFlowCount;
809 long calTotalCount = addCount + addWithSetFlowLiveTypeCount - removeCount;
810
811 log.debug("--------------------------------------------------------------------------- for {}",
812 sw.getStringId());
813 log.debug("checkAndMoveLiveFlowAll, Total Flow_Count=" + totalFlowCount
814 + ", add - remove_Count=" + calTotalCount
815 + ", IMMEDIATE_FLOW_Count=" + immediateFlowCount
816 + ", SHORT_FLOW_Count=" + shortFlowCount
817 + ", MID_FLOW_Count=" + midFlowCount
818 + ", LONG_FLOW_Count=" + longFlowCount
819 + ", add_Count=" + addCount
820 + ", addWithSetFlowLiveType_Count=" + addWithSetFlowLiveTypeCount
821 + ", remove_Count=" + removeCount
822 + " AdaptiveStats collection thread for {}", sw.getStringId());
823 log.debug("--------------------------------------------------------------------------- for {}",
824 sw.getStringId());
825 if (totalFlowCount != calTotalCount) {
826 log.error("checkAndMoveLiveFlowAll, Real total flow count and "
827 + "calculated total flow count do NOT match, something is wrong internally "
828 + "or check counter value bound is over!");
829 }
830 if (immediateFlowCount < 0) {
831 log.error("checkAndMoveLiveFlowAll, IMMEDIATE_FLOW count is negative, "
832 + "something is wrong internally "
833 + "or check counter value bound is over!");
834 }
835 }
836 }
837 log.trace("checkAndMoveLiveFlowAll, AdaptiveStats for {}", sw.getStringId());
838 }
839
840 /**
841 * Remove the typed flow entry from table.
842 *
843 * @param rule the flow rule
844 *
845 */
846 public synchronized void remove(FlowRule rule) {
847 checkNotNull(rule);
848
849 TypedStoredFlowEntry removeStore = getFlowEntryInternal(rule);
850 if (removeStore != null) {
851 removeLiveFlowsInternal((TypedStoredFlowEntry) removeStore);
852 boolean result = getFlowEntriesInternal(rule.id()).remove(removeStore);
853
854 if (result) {
855 removeCount++;
856 }
857 }
858 }
859
860 // Remove the typed flow entry from corresponding table
861 private void removeLiveFlowsInternal(TypedStoredFlowEntry fe) {
862 switch (fe.flowLiveType()) {
863 case IMMEDIATE_FLOW:
864 // do nothing
865 break;
866 case SHORT_FLOW:
867 shortFlows.remove(fe);
868 break;
869 case MID_FLOW:
870 midFlows.remove(fe);
871 break;
872 case LONG_FLOW:
873 longFlows.remove(fe);
874 break;
875 default: // error in Flow Live Type
876 log.error("removeLiveFlowsInternal, Unknown Live Type error!");
877 break;
878 }
879 }
880 }
881}