blob: c8539c588c0780b03f59713d61316b2951760980 [file] [log] [blame]
Brian O'Connor8c685362015-12-05 15:27:27 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.provider.of.flow.impl;
18
19import com.google.common.base.Objects;
20import com.google.common.collect.ImmutableSet;
21import com.google.common.collect.Maps;
22import com.google.common.collect.Sets;
Charles Chan14967c22015-12-07 11:11:50 -080023import org.onosproject.net.driver.DriverService;
Brian O'Connor8c685362015-12-05 15:27:27 -080024import org.onosproject.net.flow.DefaultTypedFlowEntry;
25import org.onosproject.net.flow.FlowEntry;
26import org.onosproject.net.flow.FlowId;
27import org.onosproject.net.flow.FlowRule;
28import org.onosproject.net.flow.StoredFlowEntry;
29import org.onosproject.net.flow.TypedStoredFlowEntry;
30import org.onosproject.net.flow.instructions.Instruction;
31import org.onosproject.net.flow.instructions.Instructions;
32import org.onosproject.openflow.controller.OpenFlowSwitch;
33import org.onosproject.openflow.controller.RoleState;
34import org.projectfloodlight.openflow.protocol.OFFlowStatsRequest;
35import org.projectfloodlight.openflow.protocol.match.Match;
36import org.projectfloodlight.openflow.types.OFPort;
37import org.projectfloodlight.openflow.types.TableId;
38import org.slf4j.Logger;
39
40import java.util.HashSet;
41import java.util.List;
42import java.util.Map;
43import java.util.Optional;
44import java.util.Set;
45import java.util.concurrent.Executors;
46import java.util.concurrent.ScheduledExecutorService;
47import java.util.concurrent.ScheduledFuture;
48import java.util.concurrent.TimeUnit;
49
50import static com.google.common.base.Preconditions.checkNotNull;
51import static org.onlab.util.Tools.groupedThreads;
52import static org.onosproject.net.flow.TypedStoredFlowEntry.FlowLiveType;
53import static org.slf4j.LoggerFactory.getLogger;
54
55/**
56 * Efficiently and adaptively collects flow statistics for the specified switch.
57 */
58public class NewAdaptiveFlowStatsCollector {
Brian O'Connor8c685362015-12-05 15:27:27 -080059 private final Logger log = getLogger(getClass());
60
Charles Chan14967c22015-12-07 11:11:50 -080061 private final DriverService driverService;
Brian O'Connor8c685362015-12-05 15:27:27 -080062 private final OpenFlowSwitch sw;
63
64 private ScheduledExecutorService adaptiveFlowStatsScheduler =
65 Executors.newScheduledThreadPool(4, groupedThreads("onos/flow", "device-stats-collector-%d"));
66 private ScheduledFuture<?> calAndShortFlowsThread;
67 private ScheduledFuture<?> midFlowsThread;
68 private ScheduledFuture<?> longFlowsThread;
69
70 // Task that calculates all flowEntries' FlowLiveType and collects stats IMMEDIATE flows every calAndPollInterval
71 private CalAndShortFlowsTask calAndShortFlowsTask;
72 // Task that collects stats MID flows every 2*calAndPollInterval
73 private MidFlowsTask midFlowsTask;
74 // Task that collects stats LONG flows every 3*calAndPollInterval
75 private LongFlowsTask longFlowsTask;
76
77 private static final int CAL_AND_POLL_TIMES = 1; // must be always 0
78 private static final int MID_POLL_TIMES = 2; // variable greater or equal than 1
79 private static final int LONG_POLL_TIMES = 3; // variable greater or equal than MID_POLL_TIMES
80 //TODO: make ENTIRE_POLL_TIMES configurable with enable or disable
81 // must be variable greater or equal than common multiple of MID_POLL_TIMES and LONG_POLL_TIMES
82 private static final int ENTIRE_POLL_TIMES = 6;
83
84 private static final int DEFAULT_CAL_AND_POLL_FREQUENCY = 5;
85 private static final int MIN_CAL_AND_POLL_FREQUENCY = 2;
86 private static final int MAX_CAL_AND_POLL_FREQUENCY = 60;
87
88 private int calAndPollInterval; // CAL_AND_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
89 private int midPollInterval; // MID_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
90 private int longPollInterval; // LONG_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
91 // only used for checking condition at each task if it collects entire flows from a given switch or not
92 private int entirePollInterval; // ENTIRE_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
93
94 // Number of call count of each Task,
95 // for undoing collection except only entire flows collecting task in CalAndShortFlowsTask
96 private int callCountCalAndShortFlowsTask = 0; // increased CAL_AND_POLL_TIMES whenever Task is called
97 private int callCountMidFlowsTask = 0; // increased MID_POLL_TIMES whenever Task is called
98 private int callCountLongFlowsTask = 0; // increased LONG_POLL_TIMES whenever Task is called
99
100 private InternalDeviceFlowTable deviceFlowTable = new InternalDeviceFlowTable();
101
102 private boolean isFirstTimeStart = true;
103
104 public static final long NO_FLOW_MISSING_XID = (-1);
105 private long flowMissingXid = NO_FLOW_MISSING_XID;
106
107 /**
108 * Creates a new adaptive collector for the given switch and default cal_and_poll frequency.
109 *
110 * @param sw switch to pull
111 * @param pollInterval cal and immediate poll frequency in seconds
112 */
Charles Chan14967c22015-12-07 11:11:50 -0800113 NewAdaptiveFlowStatsCollector(
114 DriverService driverService, OpenFlowSwitch sw, int pollInterval) {
115 this.driverService = driverService;
Brian O'Connor8c685362015-12-05 15:27:27 -0800116 this.sw = sw;
Brian O'Connor8c685362015-12-05 15:27:27 -0800117 initMemberVars(pollInterval);
118 }
119
120 // check calAndPollInterval validity and set all pollInterval values and finally initialize each task call count
121 private void initMemberVars(int pollInterval) {
122 if (pollInterval < MIN_CAL_AND_POLL_FREQUENCY) {
123 this.calAndPollInterval = MIN_CAL_AND_POLL_FREQUENCY;
124 } else if (pollInterval >= MAX_CAL_AND_POLL_FREQUENCY) {
125 this.calAndPollInterval = MAX_CAL_AND_POLL_FREQUENCY;
126 } else {
127 this.calAndPollInterval = pollInterval;
128 }
129
130 calAndPollInterval = CAL_AND_POLL_TIMES * calAndPollInterval;
131 midPollInterval = MID_POLL_TIMES * calAndPollInterval;
132 longPollInterval = LONG_POLL_TIMES * calAndPollInterval;
133 entirePollInterval = ENTIRE_POLL_TIMES * calAndPollInterval;
134
135 callCountCalAndShortFlowsTask = 0;
136 callCountMidFlowsTask = 0;
137 callCountLongFlowsTask = 0;
138
139 flowMissingXid = NO_FLOW_MISSING_XID;
140 }
141
142 /**
143 * Adjusts adaptive poll frequency.
144 *
145 * @param pollInterval poll frequency in seconds
146 */
147 synchronized void adjustCalAndPollInterval(int pollInterval) {
148 initMemberVars(pollInterval);
149
150 if (calAndShortFlowsThread != null) {
151 calAndShortFlowsThread.cancel(false);
152 }
153 if (midFlowsThread != null) {
154 midFlowsThread.cancel(false);
155 }
156 if (longFlowsThread != null) {
157 longFlowsThread.cancel(false);
158 }
159
160 calAndShortFlowsTask = new CalAndShortFlowsTask();
161 calAndShortFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
162 calAndShortFlowsTask,
163 0,
164 calAndPollInterval,
165 TimeUnit.SECONDS);
166
167 midFlowsTask = new MidFlowsTask();
168 midFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
169 midFlowsTask,
170 0,
171 midPollInterval,
172 TimeUnit.SECONDS);
173
174 longFlowsTask = new LongFlowsTask();
175 longFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
176 longFlowsTask,
177 0,
178 longPollInterval,
179 TimeUnit.SECONDS);
180
181 log.debug("calAndPollInterval=" + calAndPollInterval + "is adjusted");
182 }
183
184 private class CalAndShortFlowsTask implements Runnable {
185 @Override
186 public void run() {
187 if (sw.getRole() == RoleState.MASTER) {
188 log.trace("CalAndShortFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());
189
190 if (isFirstTimeStart) {
191 // isFirstTimeStart, get entire flow stats from a given switch sw
192 log.trace("CalAndShortFlowsTask Collecting Entire AdaptiveStats at first time start for {}",
193 sw.getStringId());
194 ofFlowStatsRequestAllSend();
195
196 callCountCalAndShortFlowsTask += CAL_AND_POLL_TIMES;
197 isFirstTimeStart = false;
198 } else if (callCountCalAndShortFlowsTask == ENTIRE_POLL_TIMES) {
199 // entire_poll_times, get entire flow stats from a given switch sw
200 log.trace("CalAndShortFlowsTask Collecting Entire AdaptiveStats for {}", sw.getStringId());
201 ofFlowStatsRequestAllSend();
202
203 callCountCalAndShortFlowsTask = CAL_AND_POLL_TIMES;
204 //TODO: check flows deleted in switch, but exist in controller flow table, then remove them
205 //
206 } else {
207 calAndShortFlowsTaskInternal();
208 callCountCalAndShortFlowsTask += CAL_AND_POLL_TIMES;
209 }
210 }
211 }
212 }
213
214 // send openflow flow stats request message with getting all flow entries to a given switch sw
215 private void ofFlowStatsRequestAllSend() {
216 OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()
217 .setMatch(sw.factory().matchWildcardAll())
218 .setTableId(TableId.ALL)
219 .setOutPort(OFPort.NO_MASK)
220 .build();
221
222 synchronized (this) {
223 // set the request xid to check the reply in OpenFlowRuleProvider
224 // After processing the reply of this request message,
225 // this must be set to NO_FLOW_MISSING_XID(-1) by provider
226 setFlowMissingXid(request.getXid());
227 log.debug("ofFlowStatsRequestAllSend,Request={},for {}", request.toString(), sw.getStringId());
228
229 sw.sendMsg(request);
230 }
231 }
232
233 // send openflow flow stats request message with getting the specific flow entry(fe) to a given switch sw
234 private void ofFlowStatsRequestFlowSend(FlowEntry fe) {
235 // set find match
236 Match match = FlowModBuilder.builder(fe, sw.factory(), Optional.empty(),
Charles Chan14967c22015-12-07 11:11:50 -0800237 Optional.of(driverService)).buildMatch();
Brian O'Connor8c685362015-12-05 15:27:27 -0800238 // set find tableId
239 TableId tableId = TableId.of(fe.tableId());
240 // set output port
241 Instruction ins = fe.treatment().allInstructions().stream()
242 .filter(i -> (i.type() == Instruction.Type.OUTPUT))
243 .findFirst()
244 .orElse(null);
245 OFPort ofPort = OFPort.NO_MASK;
246 if (ins != null) {
247 Instructions.OutputInstruction out = (Instructions.OutputInstruction) ins;
248 ofPort = OFPort.of((int) ((out.port().toLong())));
249 }
250
251 OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()
252 .setMatch(match)
253 .setTableId(tableId)
254 .setOutPort(ofPort)
255 .build();
256
257 synchronized (this) {
258 if (getFlowMissingXid() != NO_FLOW_MISSING_XID) {
259 log.debug("ofFlowStatsRequestFlowSend: previous FlowStatsRequestAll does not be processed yet,"
260 + " set no flow missing xid anyway, for {}",
261 sw.getStringId());
262 setFlowMissingXid(NO_FLOW_MISSING_XID);
263 }
264
265 sw.sendMsg(request);
266 }
267 }
268
269 private void calAndShortFlowsTaskInternal() {
270 deviceFlowTable.checkAndMoveLiveFlowAll();
271
272 deviceFlowTable.getShortFlows().forEach(fe -> {
273 ofFlowStatsRequestFlowSend(fe);
274 });
275 }
276
277 private class MidFlowsTask implements Runnable {
278 @Override
279 public void run() {
280 if (sw.getRole() == RoleState.MASTER) {
281 log.trace("MidFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());
282
283 // skip collecting because CalAndShortFlowsTask collects entire flow stats from a given switch sw
284 if (callCountMidFlowsTask == ENTIRE_POLL_TIMES) {
285 callCountMidFlowsTask = MID_POLL_TIMES;
286 } else {
287 midFlowsTaskInternal();
288 callCountMidFlowsTask += MID_POLL_TIMES;
289 }
290 }
291 }
292 }
293
294 private void midFlowsTaskInternal() {
295 deviceFlowTable.getMidFlows().forEach(fe -> {
296 ofFlowStatsRequestFlowSend(fe);
297 });
298 }
299
300 private class LongFlowsTask implements Runnable {
301 @Override
302 public void run() {
303 if (sw.getRole() == RoleState.MASTER) {
304 log.trace("LongFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());
305
306 // skip collecting because CalAndShortFlowsTask collects entire flow stats from a given switch sw
307 if (callCountLongFlowsTask == ENTIRE_POLL_TIMES) {
308 callCountLongFlowsTask = LONG_POLL_TIMES;
309 } else {
310 longFlowsTaskInternal();
311 callCountLongFlowsTask += LONG_POLL_TIMES;
312 }
313 }
314 }
315 }
316
317 private void longFlowsTaskInternal() {
318 deviceFlowTable.getLongFlows().forEach(fe -> {
319 ofFlowStatsRequestFlowSend(fe);
320 });
321 }
322
323 /**
324 * start adaptive flow statistic collection.
325 *
326 */
327 public synchronized void start() {
328 log.debug("Starting AdaptiveStats collection thread for {}", sw.getStringId());
329 callCountCalAndShortFlowsTask = 0;
330 callCountMidFlowsTask = 0;
331 callCountLongFlowsTask = 0;
332
333 isFirstTimeStart = true;
334
335 // Initially start polling quickly. Then drop down to configured value
336 calAndShortFlowsTask = new CalAndShortFlowsTask();
337 calAndShortFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
338 calAndShortFlowsTask,
339 1,
340 calAndPollInterval,
341 TimeUnit.SECONDS);
342
343 midFlowsTask = new MidFlowsTask();
344 midFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
345 midFlowsTask,
346 1,
347 midPollInterval,
348 TimeUnit.SECONDS);
349
350 longFlowsTask = new LongFlowsTask();
351 longFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
352 longFlowsTask,
353 1,
354 longPollInterval,
355 TimeUnit.SECONDS);
356
357 log.info("Started");
358 }
359
360 /**
361 * stop adaptive flow statistic collection.
362 *
363 */
364 public synchronized void stop() {
365 log.debug("Stopping AdaptiveStats collection thread for {}", sw.getStringId());
366 if (calAndShortFlowsThread != null) {
367 calAndShortFlowsThread.cancel(true);
368 }
369 if (midFlowsThread != null) {
370 midFlowsThread.cancel(true);
371 }
372 if (longFlowsThread != null) {
373 longFlowsThread.cancel(true);
374 }
375
376 adaptiveFlowStatsScheduler.shutdownNow();
377
378 isFirstTimeStart = false;
379
380 log.info("Stopped");
381 }
382
383 /**
384 * add typed flow entry from flow rule into the internal flow table.
385 *
386 * @param flowRules the flow rules
387 *
388 */
389 public synchronized void addWithFlowRule(FlowRule... flowRules) {
390 for (FlowRule fr : flowRules) {
391 // First remove old entry unconditionally, if exist
392 deviceFlowTable.remove(fr);
393
394 // add new flow entry, we suppose IMMEDIATE_FLOW
395 TypedStoredFlowEntry newFlowEntry = new DefaultTypedFlowEntry(fr,
396 FlowLiveType.IMMEDIATE_FLOW);
397 deviceFlowTable.addWithCalAndSetFlowLiveType(newFlowEntry);
398 }
399 }
400
401 /**
402 * add or update typed flow entry from flow entry into the internal flow table.
403 *
404 * @param flowEntries the flow entries
405 *
406 */
407 public synchronized void addOrUpdateFlows(FlowEntry... flowEntries) {
408 for (FlowEntry fe : flowEntries) {
409 // check if this new rule is an update to an existing entry
410 TypedStoredFlowEntry stored = deviceFlowTable.getFlowEntry(fe);
411
412 if (stored != null) {
413 // duplicated flow entry is collected!, just skip
414 if (fe.bytes() == stored.bytes() && fe.packets() == stored.packets()
415 && fe.life() == stored.life()) {
416 log.debug("addOrUpdateFlows:, FlowId=" + Long.toHexString(fe.id().value())
417 + ",is DUPLICATED stats collection, just skip."
418 + " AdaptiveStats collection thread for {}",
419 sw.getStringId());
420
Brian O'Connora3e5cd52015-12-05 15:59:19 -0800421 //FIXME modification of "stored" flow entry outside of store
Brian O'Connor8c685362015-12-05 15:27:27 -0800422 stored.setLastSeen();
423 continue;
424 } else if (fe.life() < stored.life()) {
425 // Invalid updates the stats values, i.e., bytes, packets, durations ...
426 log.debug("addOrUpdateFlows():" +
427 " Invalid Flow Update! The new life is SMALLER than the previous one, jus skip." +
428 " new flowId=" + Long.toHexString(fe.id().value()) +
429 ", old flowId=" + Long.toHexString(stored.id().value()) +
430 ", new bytes=" + fe.bytes() + ", old bytes=" + stored.bytes() +
431 ", new life=" + fe.life() + ", old life=" + stored.life() +
432 ", new lastSeen=" + fe.lastSeen() + ", old lastSeen=" + stored.lastSeen());
433 // go next
Brian O'Connora3e5cd52015-12-05 15:59:19 -0800434 //FIXME modification of "stored" flow entry outside of store
Brian O'Connor8c685362015-12-05 15:27:27 -0800435 stored.setLastSeen();
436 continue;
437 }
438
439 // update now
Brian O'Connora3e5cd52015-12-05 15:59:19 -0800440 //FIXME modification of "stored" flow entry outside of store
Brian O'Connor8c685362015-12-05 15:27:27 -0800441 stored.setLife(fe.life());
442 stored.setPackets(fe.packets());
443 stored.setBytes(fe.bytes());
444 stored.setLastSeen();
445 if (stored.state() == FlowEntry.FlowEntryState.PENDING_ADD) {
446 // flow is really RULE_ADDED
447 stored.setState(FlowEntry.FlowEntryState.ADDED);
448 }
449 // flow is RULE_UPDATED, skip adding and just updating flow live table
450 //deviceFlowTable.calAndSetFlowLiveType(stored);
451 continue;
452 }
453
454 // add new flow entry, we suppose IMMEDIATE_FLOW
455 TypedStoredFlowEntry newFlowEntry = new DefaultTypedFlowEntry(fe,
456 FlowLiveType.IMMEDIATE_FLOW);
457 deviceFlowTable.addWithCalAndSetFlowLiveType(newFlowEntry);
458 }
459 }
460
461 /**
462 * remove typed flow entry from the internal flow table.
463 *
464 * @param flowRules the flow entries
465 *
466 */
467 public synchronized void removeFlows(FlowRule... flowRules) {
468 for (FlowRule rule : flowRules) {
469 deviceFlowTable.remove(rule);
470 }
471 }
472
473 // same as removeFlows() function
474 /**
475 * remove typed flow entry from the internal flow table.
476 *
477 * @param flowRules the flow entries
478 *
479 */
480 public void flowRemoved(FlowRule... flowRules) {
481 removeFlows(flowRules);
482 }
483
484 // same as addOrUpdateFlows() function
485 /**
486 * add or update typed flow entry from flow entry into the internal flow table.
487 *
488 * @param flowEntries the flow entry list
489 *
490 */
491 public void pushFlowMetrics(List<FlowEntry> flowEntries) {
492 flowEntries.forEach(fe -> {
493 addOrUpdateFlows(fe);
494 });
495 }
496
497 /**
498 * returns flowMissingXid that indicates the execution of flowMissing process or not(NO_FLOW_MISSING_XID(-1)).
499 *
500 * @return xid of missing flow
501 */
502 public long getFlowMissingXid() {
503 return flowMissingXid;
504 }
505
506 /**
507 * set flowMissingXid, namely OFFlowStatsRequest match any ALL message Id.
508 *
509 * @param flowMissingXid the OFFlowStatsRequest message Id
510 *
511 */
512 public void setFlowMissingXid(long flowMissingXid) {
513 this.flowMissingXid = flowMissingXid;
514 }
515
516 private class InternalDeviceFlowTable {
517
518 private final Map<FlowId, Set<TypedStoredFlowEntry>>
519 flowEntries = Maps.newConcurrentMap();
520
521 private final Set<StoredFlowEntry> shortFlows = new HashSet<>();
522 private final Set<StoredFlowEntry> midFlows = new HashSet<>();
523 private final Set<StoredFlowEntry> longFlows = new HashSet<>();
524
525 // Assumed latency adjustment(default=500 millisecond) between FlowStatsRequest and Reply
526 private final long latencyFlowStatsRequestAndReplyMillis = 500;
527
528
529 // Statistics for table operation
530 private long addCount = 0, addWithSetFlowLiveTypeCount = 0;
531 private long removeCount = 0;
532
533 /**
534 * Resets all count values with zero.
535 *
536 */
537 public void resetAllCount() {
538 addCount = 0;
539 addWithSetFlowLiveTypeCount = 0;
540 removeCount = 0;
541 }
542
543 // get set of flow entries for the given flowId
544 private Set<TypedStoredFlowEntry> getFlowEntriesInternal(FlowId flowId) {
545 return flowEntries.computeIfAbsent(flowId, id -> Sets.newCopyOnWriteArraySet());
546 }
547
548 // get flow entry for the given flow rule
549 private TypedStoredFlowEntry getFlowEntryInternal(FlowRule rule) {
550 Set<TypedStoredFlowEntry> flowEntries = getFlowEntriesInternal(rule.id());
551 return flowEntries.stream()
552 .filter(entry -> Objects.equal(entry, rule))
553 .findAny()
554 .orElse(null);
555 }
556
557 // get the flow entries for all flows in flow table
558 private Set<TypedStoredFlowEntry> getFlowEntriesInternal() {
559 Set<TypedStoredFlowEntry> result = Sets.newHashSet();
560
561 flowEntries.values().forEach(result::addAll);
562 return result;
563 }
564
565 /**
566 * Gets the number of flow entry in flow table.
567 *
568 * @return the number of flow entry.
569 *
570 */
571 public long getFlowCount() {
572 return flowEntries.values().stream().mapToLong(Set::size).sum();
573 }
574
575 /**
576 * Gets the number of flow entry in flow table.
577 *
578 * @param rule the flow rule
579 * @return the typed flow entry.
580 *
581 */
582 public TypedStoredFlowEntry getFlowEntry(FlowRule rule) {
583 checkNotNull(rule);
584
585 return getFlowEntryInternal(rule);
586 }
587
588 /**
589 * Gets the all typed flow entries in flow table.
590 *
591 * @return the set of typed flow entry.
592 *
593 */
594 public Set<TypedStoredFlowEntry> getFlowEntries() {
595 return getFlowEntriesInternal();
596 }
597
598 /**
599 * Gets the short typed flow entries in flow table.
600 *
601 * @return the set of typed flow entry.
602 *
603 */
604 public Set<StoredFlowEntry> getShortFlows() {
605 return ImmutableSet.copyOf(shortFlows); //Sets.newHashSet(shortFlows);
606 }
607
608 /**
609 * Gets the mid typed flow entries in flow table.
610 *
611 * @return the set of typed flow entry.
612 *
613 */
614 public Set<StoredFlowEntry> getMidFlows() {
615 return ImmutableSet.copyOf(midFlows); //Sets.newHashSet(midFlows);
616 }
617
618 /**
619 * Gets the long typed flow entries in flow table.
620 *
621 * @return the set of typed flow entry.
622 *
623 */
624 public Set<StoredFlowEntry> getLongFlows() {
625 return ImmutableSet.copyOf(longFlows); //Sets.newHashSet(longFlows);
626 }
627
628 /**
629 * Add typed flow entry into table only.
630 *
631 * @param rule the flow rule
632 *
633 */
634 public synchronized void add(TypedStoredFlowEntry rule) {
635 checkNotNull(rule);
636
637 //rule have to be new DefaultTypedFlowEntry
638 boolean result = getFlowEntriesInternal(rule.id()).add(rule);
639
640 if (result) {
641 addCount++;
642 }
643 }
644
645 /**
646 * Calculates and set the flow live type at the first time,
647 * and then add it into a corresponding typed flow table.
648 *
649 * @param rule the flow rule
650 *
651 */
652 public void calAndSetFlowLiveType(TypedStoredFlowEntry rule) {
653 checkNotNull(rule);
654
655 calAndSetFlowLiveTypeInternal(rule);
656 }
657
658 /**
659 * Add the typed flow entry into table, and calculates and set the flow live type,
660 * and then add it into a corresponding typed flow table.
661 *
662 * @param rule the flow rule
663 *
664 */
665 public synchronized void addWithCalAndSetFlowLiveType(TypedStoredFlowEntry rule) {
666 checkNotNull(rule);
667
668 //rule have to be new DefaultTypedFlowEntry
669 boolean result = getFlowEntriesInternal(rule.id()).add(rule);
670 if (result) {
671 calAndSetFlowLiveTypeInternal(rule);
672 addWithSetFlowLiveTypeCount++;
673 } else {
674 log.debug("addWithCalAndSetFlowLiveType, FlowId=" + Long.toHexString(rule.id().value())
675 + " ADD Failed, cause it may already exists in table !!!,"
676 + " AdaptiveStats collection thread for {}",
677 sw.getStringId());
678 }
679 }
680
681 // In real, calculates and set the flow live type at the first time,
682 // and then add it into a corresponding typed flow table
683 private void calAndSetFlowLiveTypeInternal(TypedStoredFlowEntry rule) {
684 long life = rule.life();
685 FlowLiveType prevFlowLiveType = rule.flowLiveType();
686
687 if (life >= longPollInterval) {
688 rule.setFlowLiveType(FlowLiveType.LONG_FLOW);
689 longFlows.add(rule);
690 } else if (life >= midPollInterval) {
691 rule.setFlowLiveType(FlowLiveType.MID_FLOW);
692 midFlows.add(rule);
693 } else if (life >= calAndPollInterval) {
694 rule.setFlowLiveType(FlowLiveType.SHORT_FLOW);
695 shortFlows.add(rule);
696 } else if (life >= 0) {
697 rule.setFlowLiveType(FlowLiveType.IMMEDIATE_FLOW);
698 } else { // life < 0
699 rule.setFlowLiveType(FlowLiveType.UNKNOWN_FLOW);
700 }
701
702 if (rule.flowLiveType() != prevFlowLiveType) {
703 switch (prevFlowLiveType) {
704 // delete it from previous flow table
705 case SHORT_FLOW:
706 shortFlows.remove(rule);
707 break;
708 case MID_FLOW:
709 midFlows.remove(rule);
710 break;
711 case LONG_FLOW:
712 longFlows.remove(rule);
713 break;
714 default:
715 break;
716 }
717 }
718 }
719
720
721 // check the flow live type based on current time, then set and add it into corresponding table
722 private boolean checkAndMoveLiveFlowInternal(TypedStoredFlowEntry fe, long cTime) {
723 long curTime = (cTime > 0 ? cTime : System.currentTimeMillis());
724 // For latency adjustment(default=500 millisecond) between FlowStatsRequest and Reply
725 long fromLastSeen = ((curTime - fe.lastSeen() + latencyFlowStatsRequestAndReplyMillis) / 1000);
726 // fe.life() unit is SECOND!
727 long liveTime = fe.life() + fromLastSeen;
728
729
730 switch (fe.flowLiveType()) {
731 case IMMEDIATE_FLOW:
732 if (liveTime >= longPollInterval) {
733 fe.setFlowLiveType(FlowLiveType.LONG_FLOW);
734 longFlows.add(fe);
735 } else if (liveTime >= midPollInterval) {
736 fe.setFlowLiveType(FlowLiveType.MID_FLOW);
737 midFlows.add(fe);
738 } else if (liveTime >= calAndPollInterval) {
739 fe.setFlowLiveType(FlowLiveType.SHORT_FLOW);
740 shortFlows.add(fe);
741 }
742 break;
743 case SHORT_FLOW:
744 if (liveTime >= longPollInterval) {
745 fe.setFlowLiveType(FlowLiveType.LONG_FLOW);
746 shortFlows.remove(fe);
747 longFlows.add(fe);
748 } else if (liveTime >= midPollInterval) {
749 fe.setFlowLiveType(FlowLiveType.MID_FLOW);
750 shortFlows.remove(fe);
751 midFlows.add(fe);
752 }
753 break;
754 case MID_FLOW:
755 if (liveTime >= longPollInterval) {
756 fe.setFlowLiveType(FlowLiveType.LONG_FLOW);
757 midFlows.remove(fe);
758 longFlows.add(fe);
759 }
760 break;
761 case LONG_FLOW:
762 if (fromLastSeen > entirePollInterval) {
763 log.trace("checkAndMoveLiveFlowInternal, flow is already removed at switch.");
764 return false;
765 }
766 break;
767 case UNKNOWN_FLOW: // Unknown flow is an internal error flow type, just fall through
768 default :
769 // Error Unknown Live Type
770 log.error("checkAndMoveLiveFlowInternal, Unknown Live Type error!"
771 + "AdaptiveStats collection thread for {}",
772 sw.getStringId());
773 return false;
774 }
775
776 log.debug("checkAndMoveLiveFlowInternal, FlowId=" + Long.toHexString(fe.id().value())
777 + ", state=" + fe.state()
778 + ", After liveType=" + fe.flowLiveType()
779 + ", liveTime=" + liveTime
780 + ", life=" + fe.life()
781 + ", bytes=" + fe.bytes()
782 + ", packets=" + fe.packets()
783 + ", fromLastSeen=" + fromLastSeen
784 + ", priority=" + fe.priority()
785 + ", selector=" + fe.selector().criteria()
786 + ", treatment=" + fe.treatment()
787 + " AdaptiveStats collection thread for {}",
788 sw.getStringId());
789
790 return true;
791 }
792
793 /**
794 * Check and move live type for all type flow entries in table at every calAndPollInterval time.
795 *
796 */
797 public void checkAndMoveLiveFlowAll() {
798 Set<TypedStoredFlowEntry> typedFlowEntries = getFlowEntriesInternal();
799
800 long calCurTime = System.currentTimeMillis();
801 typedFlowEntries.forEach(fe -> {
802 if (!checkAndMoveLiveFlowInternal(fe, calCurTime)) {
803 remove(fe);
804 }
805 });
806
807 // print table counts for debug
808 if (log.isDebugEnabled()) {
809 synchronized (this) {
810 long totalFlowCount = getFlowCount();
811 long shortFlowCount = shortFlows.size();
812 long midFlowCount = midFlows.size();
813 long longFlowCount = longFlows.size();
814 long immediateFlowCount = totalFlowCount - shortFlowCount - midFlowCount - longFlowCount;
815 long calTotalCount = addCount + addWithSetFlowLiveTypeCount - removeCount;
816
817 log.debug("--------------------------------------------------------------------------- for {}",
818 sw.getStringId());
819 log.debug("checkAndMoveLiveFlowAll, Total Flow_Count=" + totalFlowCount
820 + ", add - remove_Count=" + calTotalCount
821 + ", IMMEDIATE_FLOW_Count=" + immediateFlowCount
822 + ", SHORT_FLOW_Count=" + shortFlowCount
823 + ", MID_FLOW_Count=" + midFlowCount
824 + ", LONG_FLOW_Count=" + longFlowCount
825 + ", add_Count=" + addCount
826 + ", addWithSetFlowLiveType_Count=" + addWithSetFlowLiveTypeCount
827 + ", remove_Count=" + removeCount
828 + " AdaptiveStats collection thread for {}", sw.getStringId());
829 log.debug("--------------------------------------------------------------------------- for {}",
830 sw.getStringId());
831 if (totalFlowCount != calTotalCount) {
832 log.error("checkAndMoveLiveFlowAll, Real total flow count and "
833 + "calculated total flow count do NOT match, something is wrong internally "
834 + "or check counter value bound is over!");
835 }
836 if (immediateFlowCount < 0) {
837 log.error("checkAndMoveLiveFlowAll, IMMEDIATE_FLOW count is negative, "
838 + "something is wrong internally "
839 + "or check counter value bound is over!");
840 }
841 }
842 }
843 log.trace("checkAndMoveLiveFlowAll, AdaptiveStats for {}", sw.getStringId());
844 }
845
846 /**
847 * Remove the typed flow entry from table.
848 *
849 * @param rule the flow rule
850 *
851 */
852 public synchronized void remove(FlowRule rule) {
853 checkNotNull(rule);
854
855 TypedStoredFlowEntry removeStore = getFlowEntryInternal(rule);
856 if (removeStore != null) {
857 removeLiveFlowsInternal((TypedStoredFlowEntry) removeStore);
858 boolean result = getFlowEntriesInternal(rule.id()).remove(removeStore);
859
860 if (result) {
861 removeCount++;
862 }
863 }
864 }
865
866 // Remove the typed flow entry from corresponding table
867 private void removeLiveFlowsInternal(TypedStoredFlowEntry fe) {
868 switch (fe.flowLiveType()) {
869 case IMMEDIATE_FLOW:
870 // do nothing
871 break;
872 case SHORT_FLOW:
873 shortFlows.remove(fe);
874 break;
875 case MID_FLOW:
876 midFlows.remove(fe);
877 break;
878 case LONG_FLOW:
879 longFlows.remove(fe);
880 break;
881 default: // error in Flow Live Type
882 log.error("removeLiveFlowsInternal, Unknown Live Type error!");
883 break;
884 }
885 }
886 }
887}