blob: 52d6099b78f4ae951ef8525990c20cee45d83114 [file] [log] [blame]
/*
 * Copyright 2015-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.provider.of.flow.impl;
18
19import com.google.common.base.Objects;
20import com.google.common.collect.ImmutableSet;
21import com.google.common.collect.Maps;
22import com.google.common.collect.Sets;
Charles Chan14967c22015-12-07 11:11:50 -080023import org.onosproject.net.driver.DriverService;
Brian O'Connor8c685362015-12-05 15:27:27 -080024import org.onosproject.net.flow.DefaultTypedFlowEntry;
25import org.onosproject.net.flow.FlowEntry;
26import org.onosproject.net.flow.FlowId;
27import org.onosproject.net.flow.FlowRule;
28import org.onosproject.net.flow.StoredFlowEntry;
29import org.onosproject.net.flow.TypedStoredFlowEntry;
30import org.onosproject.net.flow.instructions.Instruction;
31import org.onosproject.net.flow.instructions.Instructions;
32import org.onosproject.openflow.controller.OpenFlowSwitch;
33import org.onosproject.openflow.controller.RoleState;
34import org.projectfloodlight.openflow.protocol.OFFlowStatsRequest;
35import org.projectfloodlight.openflow.protocol.match.Match;
36import org.projectfloodlight.openflow.types.OFPort;
37import org.projectfloodlight.openflow.types.TableId;
38import org.slf4j.Logger;
39
40import java.util.HashSet;
41import java.util.List;
42import java.util.Map;
43import java.util.Optional;
44import java.util.Set;
45import java.util.concurrent.Executors;
46import java.util.concurrent.ScheduledExecutorService;
47import java.util.concurrent.ScheduledFuture;
48import java.util.concurrent.TimeUnit;
49
50import static com.google.common.base.Preconditions.checkNotNull;
51import static org.onlab.util.Tools.groupedThreads;
52import static org.onosproject.net.flow.TypedStoredFlowEntry.FlowLiveType;
53import static org.slf4j.LoggerFactory.getLogger;
54
/**
 * Efficiently and adaptively collects flow statistics for the specified switch.
 */
Thomas Vachuskaa394b952016-06-14 15:02:09 -070058public class NewAdaptiveFlowStatsCollector implements SwitchDataCollector {
Brian O'Connor8c685362015-12-05 15:27:27 -080059 private final Logger log = getLogger(getClass());
60
Jonathan Hart84f4f312016-03-03 08:17:11 -080061 private static final String CHECK_AND_MOVE_LOG =
62 "checkAndMoveLiveFlowInternal: flowId={}, state={}, afterLiveType={}"
63 + ", liveTime={}, life={}, bytes={}, packets={}, fromLastSeen={}"
64 + ", priority={}, selector={}, treatment={} dpid={}";
65
66 private static final String CHECK_AND_MOVE_COUNT_LOG =
67 "checkAndMoveLiveFlowAll: Total Flow_Count={}, add-remove_Count={}"
68 + ", IMMEDIATE_FLOW_Count={}, SHORT_FLOW_Count={}"
69 + ", MID_FLOW_Count={}, LONG_FLOW_Count={}, add_Count={}"
70 + ", addWithSetFlowLiveType_Count={}, remove_Count={}, dpid={}";
71
72 private static final String ADD_INVALID_LOG =
73 "addOrUpdateFlows: invalid flow update! The new life is SMALLER than the previous one"
74 + ", new flowId={}, old flowId={}, new bytes={}, old bytes={}"
75 + ", new life={}, old life={}, new lastSeen={}, old lastSeen={}";
76
Charles Chan14967c22015-12-07 11:11:50 -080077 private final DriverService driverService;
Brian O'Connor8c685362015-12-05 15:27:27 -080078 private final OpenFlowSwitch sw;
79
80 private ScheduledExecutorService adaptiveFlowStatsScheduler =
HIGUCHI Yutad9e01052016-04-14 09:31:42 -070081 Executors.newScheduledThreadPool(4, groupedThreads("onos/flow", "device-stats-collector-%d", log));
Brian O'Connor8c685362015-12-05 15:27:27 -080082 private ScheduledFuture<?> calAndShortFlowsThread;
83 private ScheduledFuture<?> midFlowsThread;
84 private ScheduledFuture<?> longFlowsThread;
85
86 // Task that calculates all flowEntries' FlowLiveType and collects stats IMMEDIATE flows every calAndPollInterval
87 private CalAndShortFlowsTask calAndShortFlowsTask;
88 // Task that collects stats MID flows every 2*calAndPollInterval
89 private MidFlowsTask midFlowsTask;
90 // Task that collects stats LONG flows every 3*calAndPollInterval
91 private LongFlowsTask longFlowsTask;
92
93 private static final int CAL_AND_POLL_TIMES = 1; // must be always 0
94 private static final int MID_POLL_TIMES = 2; // variable greater or equal than 1
95 private static final int LONG_POLL_TIMES = 3; // variable greater or equal than MID_POLL_TIMES
96 //TODO: make ENTIRE_POLL_TIMES configurable with enable or disable
97 // must be variable greater or equal than common multiple of MID_POLL_TIMES and LONG_POLL_TIMES
98 private static final int ENTIRE_POLL_TIMES = 6;
99
100 private static final int DEFAULT_CAL_AND_POLL_FREQUENCY = 5;
101 private static final int MIN_CAL_AND_POLL_FREQUENCY = 2;
102 private static final int MAX_CAL_AND_POLL_FREQUENCY = 60;
103
104 private int calAndPollInterval; // CAL_AND_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
105 private int midPollInterval; // MID_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
106 private int longPollInterval; // LONG_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
107 // only used for checking condition at each task if it collects entire flows from a given switch or not
108 private int entirePollInterval; // ENTIRE_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
109
110 // Number of call count of each Task,
111 // for undoing collection except only entire flows collecting task in CalAndShortFlowsTask
112 private int callCountCalAndShortFlowsTask = 0; // increased CAL_AND_POLL_TIMES whenever Task is called
113 private int callCountMidFlowsTask = 0; // increased MID_POLL_TIMES whenever Task is called
114 private int callCountLongFlowsTask = 0; // increased LONG_POLL_TIMES whenever Task is called
115
116 private InternalDeviceFlowTable deviceFlowTable = new InternalDeviceFlowTable();
117
118 private boolean isFirstTimeStart = true;
119
120 public static final long NO_FLOW_MISSING_XID = (-1);
121 private long flowMissingXid = NO_FLOW_MISSING_XID;
122
/**
 * Creates a new adaptive collector bound to the given switch.
 *
 * @param driverService driver service reference
 * @param sw            switch to pull
 * @param pollInterval  cal and immediate poll frequency in seconds
 */
NewAdaptiveFlowStatsCollector(DriverService driverService, OpenFlowSwitch sw, int pollInterval) {
    this.sw = sw;
    this.driverService = driverService;
    // derives every polling interval and resets counters from the requested value
    initMemberVars(pollInterval);
}
135
136 // check calAndPollInterval validity and set all pollInterval values and finally initialize each task call count
137 private void initMemberVars(int pollInterval) {
138 if (pollInterval < MIN_CAL_AND_POLL_FREQUENCY) {
139 this.calAndPollInterval = MIN_CAL_AND_POLL_FREQUENCY;
140 } else if (pollInterval >= MAX_CAL_AND_POLL_FREQUENCY) {
141 this.calAndPollInterval = MAX_CAL_AND_POLL_FREQUENCY;
142 } else {
143 this.calAndPollInterval = pollInterval;
144 }
145
146 calAndPollInterval = CAL_AND_POLL_TIMES * calAndPollInterval;
147 midPollInterval = MID_POLL_TIMES * calAndPollInterval;
148 longPollInterval = LONG_POLL_TIMES * calAndPollInterval;
149 entirePollInterval = ENTIRE_POLL_TIMES * calAndPollInterval;
150
151 callCountCalAndShortFlowsTask = 0;
152 callCountMidFlowsTask = 0;
153 callCountLongFlowsTask = 0;
154
155 flowMissingXid = NO_FLOW_MISSING_XID;
156 }
157
158 /**
159 * Adjusts adaptive poll frequency.
160 *
161 * @param pollInterval poll frequency in seconds
162 */
163 synchronized void adjustCalAndPollInterval(int pollInterval) {
164 initMemberVars(pollInterval);
165
166 if (calAndShortFlowsThread != null) {
167 calAndShortFlowsThread.cancel(false);
168 }
169 if (midFlowsThread != null) {
170 midFlowsThread.cancel(false);
171 }
172 if (longFlowsThread != null) {
173 longFlowsThread.cancel(false);
174 }
175
176 calAndShortFlowsTask = new CalAndShortFlowsTask();
177 calAndShortFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
178 calAndShortFlowsTask,
179 0,
180 calAndPollInterval,
181 TimeUnit.SECONDS);
182
183 midFlowsTask = new MidFlowsTask();
184 midFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
185 midFlowsTask,
186 0,
187 midPollInterval,
188 TimeUnit.SECONDS);
189
190 longFlowsTask = new LongFlowsTask();
191 longFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
192 longFlowsTask,
193 0,
194 longPollInterval,
195 TimeUnit.SECONDS);
196
Jonathan Hart84f4f312016-03-03 08:17:11 -0800197 log.debug("calAndPollInterval={} is adjusted", calAndPollInterval);
Brian O'Connor8c685362015-12-05 15:27:27 -0800198 }
199
200 private class CalAndShortFlowsTask implements Runnable {
201 @Override
202 public void run() {
203 if (sw.getRole() == RoleState.MASTER) {
204 log.trace("CalAndShortFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());
205
206 if (isFirstTimeStart) {
207 // isFirstTimeStart, get entire flow stats from a given switch sw
208 log.trace("CalAndShortFlowsTask Collecting Entire AdaptiveStats at first time start for {}",
209 sw.getStringId());
210 ofFlowStatsRequestAllSend();
211
212 callCountCalAndShortFlowsTask += CAL_AND_POLL_TIMES;
213 isFirstTimeStart = false;
214 } else if (callCountCalAndShortFlowsTask == ENTIRE_POLL_TIMES) {
215 // entire_poll_times, get entire flow stats from a given switch sw
216 log.trace("CalAndShortFlowsTask Collecting Entire AdaptiveStats for {}", sw.getStringId());
217 ofFlowStatsRequestAllSend();
218
219 callCountCalAndShortFlowsTask = CAL_AND_POLL_TIMES;
220 //TODO: check flows deleted in switch, but exist in controller flow table, then remove them
221 //
222 } else {
223 calAndShortFlowsTaskInternal();
224 callCountCalAndShortFlowsTask += CAL_AND_POLL_TIMES;
225 }
226 }
227 }
228 }
229
230 // send openflow flow stats request message with getting all flow entries to a given switch sw
231 private void ofFlowStatsRequestAllSend() {
232 OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()
233 .setMatch(sw.factory().matchWildcardAll())
234 .setTableId(TableId.ALL)
235 .setOutPort(OFPort.NO_MASK)
236 .build();
237
238 synchronized (this) {
239 // set the request xid to check the reply in OpenFlowRuleProvider
240 // After processing the reply of this request message,
241 // this must be set to NO_FLOW_MISSING_XID(-1) by provider
242 setFlowMissingXid(request.getXid());
Jonathan Hart84f4f312016-03-03 08:17:11 -0800243 log.debug("ofFlowStatsRequestAllSend: request={}, dpid={}",
244 request.toString(), sw.getStringId());
Brian O'Connor8c685362015-12-05 15:27:27 -0800245
246 sw.sendMsg(request);
247 }
248 }
249
250 // send openflow flow stats request message with getting the specific flow entry(fe) to a given switch sw
251 private void ofFlowStatsRequestFlowSend(FlowEntry fe) {
252 // set find match
253 Match match = FlowModBuilder.builder(fe, sw.factory(), Optional.empty(),
Charles Chan14967c22015-12-07 11:11:50 -0800254 Optional.of(driverService)).buildMatch();
Brian O'Connor8c685362015-12-05 15:27:27 -0800255 // set find tableId
256 TableId tableId = TableId.of(fe.tableId());
257 // set output port
258 Instruction ins = fe.treatment().allInstructions().stream()
259 .filter(i -> (i.type() == Instruction.Type.OUTPUT))
260 .findFirst()
261 .orElse(null);
262 OFPort ofPort = OFPort.NO_MASK;
263 if (ins != null) {
264 Instructions.OutputInstruction out = (Instructions.OutputInstruction) ins;
265 ofPort = OFPort.of((int) ((out.port().toLong())));
266 }
267
268 OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()
269 .setMatch(match)
270 .setTableId(tableId)
271 .setOutPort(ofPort)
272 .build();
273
274 synchronized (this) {
275 if (getFlowMissingXid() != NO_FLOW_MISSING_XID) {
276 log.debug("ofFlowStatsRequestFlowSend: previous FlowStatsRequestAll does not be processed yet,"
277 + " set no flow missing xid anyway, for {}",
278 sw.getStringId());
279 setFlowMissingXid(NO_FLOW_MISSING_XID);
280 }
281
282 sw.sendMsg(request);
283 }
284 }
285
286 private void calAndShortFlowsTaskInternal() {
287 deviceFlowTable.checkAndMoveLiveFlowAll();
288
289 deviceFlowTable.getShortFlows().forEach(fe -> {
290 ofFlowStatsRequestFlowSend(fe);
291 });
292 }
293
294 private class MidFlowsTask implements Runnable {
295 @Override
296 public void run() {
297 if (sw.getRole() == RoleState.MASTER) {
298 log.trace("MidFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());
299
300 // skip collecting because CalAndShortFlowsTask collects entire flow stats from a given switch sw
301 if (callCountMidFlowsTask == ENTIRE_POLL_TIMES) {
302 callCountMidFlowsTask = MID_POLL_TIMES;
303 } else {
304 midFlowsTaskInternal();
305 callCountMidFlowsTask += MID_POLL_TIMES;
306 }
307 }
308 }
309 }
310
311 private void midFlowsTaskInternal() {
312 deviceFlowTable.getMidFlows().forEach(fe -> {
313 ofFlowStatsRequestFlowSend(fe);
314 });
315 }
316
317 private class LongFlowsTask implements Runnable {
318 @Override
319 public void run() {
320 if (sw.getRole() == RoleState.MASTER) {
321 log.trace("LongFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());
322
323 // skip collecting because CalAndShortFlowsTask collects entire flow stats from a given switch sw
324 if (callCountLongFlowsTask == ENTIRE_POLL_TIMES) {
325 callCountLongFlowsTask = LONG_POLL_TIMES;
326 } else {
327 longFlowsTaskInternal();
328 callCountLongFlowsTask += LONG_POLL_TIMES;
329 }
330 }
331 }
332 }
333
334 private void longFlowsTaskInternal() {
335 deviceFlowTable.getLongFlows().forEach(fe -> {
336 ofFlowStatsRequestFlowSend(fe);
337 });
338 }
339
340 /**
Jonathan Hart84f4f312016-03-03 08:17:11 -0800341 * Starts adaptive flow statistic collection.
Brian O'Connor8c685362015-12-05 15:27:27 -0800342 */
343 public synchronized void start() {
344 log.debug("Starting AdaptiveStats collection thread for {}", sw.getStringId());
345 callCountCalAndShortFlowsTask = 0;
346 callCountMidFlowsTask = 0;
347 callCountLongFlowsTask = 0;
348
349 isFirstTimeStart = true;
350
351 // Initially start polling quickly. Then drop down to configured value
352 calAndShortFlowsTask = new CalAndShortFlowsTask();
353 calAndShortFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
354 calAndShortFlowsTask,
355 1,
356 calAndPollInterval,
357 TimeUnit.SECONDS);
358
359 midFlowsTask = new MidFlowsTask();
360 midFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
361 midFlowsTask,
362 1,
363 midPollInterval,
364 TimeUnit.SECONDS);
365
366 longFlowsTask = new LongFlowsTask();
367 longFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
368 longFlowsTask,
369 1,
370 longPollInterval,
371 TimeUnit.SECONDS);
372
373 log.info("Started");
374 }
375
376 /**
Jonathan Hart84f4f312016-03-03 08:17:11 -0800377 * Stops adaptive flow statistic collection.
Brian O'Connor8c685362015-12-05 15:27:27 -0800378 */
379 public synchronized void stop() {
380 log.debug("Stopping AdaptiveStats collection thread for {}", sw.getStringId());
381 if (calAndShortFlowsThread != null) {
382 calAndShortFlowsThread.cancel(true);
383 }
384 if (midFlowsThread != null) {
385 midFlowsThread.cancel(true);
386 }
387 if (longFlowsThread != null) {
388 longFlowsThread.cancel(true);
389 }
390
391 adaptiveFlowStatsScheduler.shutdownNow();
392
393 isFirstTimeStart = false;
394
395 log.info("Stopped");
396 }
397
398 /**
Jonathan Hart84f4f312016-03-03 08:17:11 -0800399 * Adds typed flow entry from flow rule into the internal flow table.
Brian O'Connor8c685362015-12-05 15:27:27 -0800400 *
401 * @param flowRules the flow rules
Brian O'Connor8c685362015-12-05 15:27:27 -0800402 */
403 public synchronized void addWithFlowRule(FlowRule... flowRules) {
404 for (FlowRule fr : flowRules) {
405 // First remove old entry unconditionally, if exist
406 deviceFlowTable.remove(fr);
407
408 // add new flow entry, we suppose IMMEDIATE_FLOW
409 TypedStoredFlowEntry newFlowEntry = new DefaultTypedFlowEntry(fr,
410 FlowLiveType.IMMEDIATE_FLOW);
411 deviceFlowTable.addWithCalAndSetFlowLiveType(newFlowEntry);
412 }
413 }
414
415 /**
Jonathan Hart84f4f312016-03-03 08:17:11 -0800416 * Adds or updates typed flow entry from flow entry into the internal flow table.
Brian O'Connor8c685362015-12-05 15:27:27 -0800417 *
418 * @param flowEntries the flow entries
Brian O'Connor8c685362015-12-05 15:27:27 -0800419 */
420 public synchronized void addOrUpdateFlows(FlowEntry... flowEntries) {
Jonathan Hart84f4f312016-03-03 08:17:11 -0800421 for (FlowEntry fe : flowEntries) {
422 // check if this new rule is an update to an existing entry
423 TypedStoredFlowEntry stored = deviceFlowTable.getFlowEntry(fe);
Brian O'Connor8c685362015-12-05 15:27:27 -0800424
Jonathan Hart84f4f312016-03-03 08:17:11 -0800425 if (stored != null) {
426 // duplicated flow entry is collected!, just skip
427 if (fe.bytes() == stored.bytes() && fe.packets() == stored.packets()
428 && fe.life() == stored.life()) {
429 if (log.isTraceEnabled()) {
430 log.trace("addOrUpdateFlows({}): flowId={},is DUPLICATED stats collection, just skip.",
431 sw.getStringId(), fe.id());
432 }
Brian O'Connor8c685362015-12-05 15:27:27 -0800433
Jonathan Hart84f4f312016-03-03 08:17:11 -0800434 //FIXME modification of "stored" flow entry outside of store
435 stored.setLastSeen();
436 continue;
437 } else if (fe.life() < stored.life()) {
438 // Invalid updates the stats values, i.e., bytes, packets, durations ...
439 if (log.isDebugEnabled()) {
440 log.debug(ADD_INVALID_LOG, fe.id(), stored.id(), fe.bytes(),
441 stored.bytes(), fe.life(), stored.life(),
442 fe.lastSeen(), stored.lastSeen());
443 }
444 // go next
445 //FIXME modification of "stored" flow entry outside of store
446 stored.setLastSeen();
447 continue;
448 }
Brian O'Connor8c685362015-12-05 15:27:27 -0800449
Jonathan Hart84f4f312016-03-03 08:17:11 -0800450 // update now
451 //FIXME modification of "stored" flow entry outside of store
Thiago Santos877914d2016-07-20 18:29:29 -0300452 stored.setLife(fe.life(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
Jonathan Hart84f4f312016-03-03 08:17:11 -0800453 stored.setPackets(fe.packets());
454 stored.setBytes(fe.bytes());
455 stored.setLastSeen();
456 if (stored.state() == FlowEntry.FlowEntryState.PENDING_ADD) {
457 // flow is really RULE_ADDED
458 stored.setState(FlowEntry.FlowEntryState.ADDED);
459 }
460 // flow is RULE_UPDATED, skip adding and just updating flow live table
461 //deviceFlowTable.calAndSetFlowLiveType(stored);
462 continue;
463 }
Brian O'Connor8c685362015-12-05 15:27:27 -0800464
Jonathan Hart84f4f312016-03-03 08:17:11 -0800465 // add new flow entry, we suppose IMMEDIATE_FLOW
466 TypedStoredFlowEntry newFlowEntry = new DefaultTypedFlowEntry(fe,
Brian O'Connor8c685362015-12-05 15:27:27 -0800467 FlowLiveType.IMMEDIATE_FLOW);
Jonathan Hart84f4f312016-03-03 08:17:11 -0800468 deviceFlowTable.addWithCalAndSetFlowLiveType(newFlowEntry);
Brian O'Connor8c685362015-12-05 15:27:27 -0800469 }
470 }
471
472 /**
Jonathan Hart84f4f312016-03-03 08:17:11 -0800473 * Removes typed flow entry from the internal flow table.
Brian O'Connor8c685362015-12-05 15:27:27 -0800474 *
475 * @param flowRules the flow entries
Brian O'Connor8c685362015-12-05 15:27:27 -0800476 */
477 public synchronized void removeFlows(FlowRule... flowRules) {
478 for (FlowRule rule : flowRules) {
479 deviceFlowTable.remove(rule);
480 }
481 }
482
483 // same as removeFlows() function
484 /**
Jonathan Hart84f4f312016-03-03 08:17:11 -0800485 * Removes typed flow entry from the internal flow table.
Brian O'Connor8c685362015-12-05 15:27:27 -0800486 *
487 * @param flowRules the flow entries
Brian O'Connor8c685362015-12-05 15:27:27 -0800488 */
489 public void flowRemoved(FlowRule... flowRules) {
490 removeFlows(flowRules);
491 }
492
493 // same as addOrUpdateFlows() function
494 /**
Jonathan Hart84f4f312016-03-03 08:17:11 -0800495 * Adds or updates typed flow entry from flow entry into the internal flow table.
Brian O'Connor8c685362015-12-05 15:27:27 -0800496 *
497 * @param flowEntries the flow entry list
Brian O'Connor8c685362015-12-05 15:27:27 -0800498 */
499 public void pushFlowMetrics(List<FlowEntry> flowEntries) {
Jonathan Hart84f4f312016-03-03 08:17:11 -0800500 flowEntries.forEach(this::addOrUpdateFlows);
Brian O'Connor8c685362015-12-05 15:27:27 -0800501 }
502
503 /**
Jonathan Hart84f4f312016-03-03 08:17:11 -0800504 * Returns flowMissingXid that indicates the execution of flowMissing process or not(NO_FLOW_MISSING_XID(-1)).
Brian O'Connor8c685362015-12-05 15:27:27 -0800505 *
506 * @return xid of missing flow
507 */
508 public long getFlowMissingXid() {
509 return flowMissingXid;
510 }
511
512 /**
Jonathan Hart84f4f312016-03-03 08:17:11 -0800513 * Sets flowMissingXid, namely OFFlowStatsRequest match any ALL message Id.
Brian O'Connor8c685362015-12-05 15:27:27 -0800514 *
515 * @param flowMissingXid the OFFlowStatsRequest message Id
Brian O'Connor8c685362015-12-05 15:27:27 -0800516 */
517 public void setFlowMissingXid(long flowMissingXid) {
518 this.flowMissingXid = flowMissingXid;
519 }
520
521 private class InternalDeviceFlowTable {
522
523 private final Map<FlowId, Set<TypedStoredFlowEntry>>
524 flowEntries = Maps.newConcurrentMap();
525
526 private final Set<StoredFlowEntry> shortFlows = new HashSet<>();
527 private final Set<StoredFlowEntry> midFlows = new HashSet<>();
528 private final Set<StoredFlowEntry> longFlows = new HashSet<>();
529
530 // Assumed latency adjustment(default=500 millisecond) between FlowStatsRequest and Reply
531 private final long latencyFlowStatsRequestAndReplyMillis = 500;
532
533
534 // Statistics for table operation
535 private long addCount = 0, addWithSetFlowLiveTypeCount = 0;
536 private long removeCount = 0;
537
538 /**
Jonathan Hart84f4f312016-03-03 08:17:11 -0800539 * Resets all count values to zero.
Brian O'Connor8c685362015-12-05 15:27:27 -0800540 */
541 public void resetAllCount() {
542 addCount = 0;
543 addWithSetFlowLiveTypeCount = 0;
544 removeCount = 0;
545 }
546
547 // get set of flow entries for the given flowId
548 private Set<TypedStoredFlowEntry> getFlowEntriesInternal(FlowId flowId) {
549 return flowEntries.computeIfAbsent(flowId, id -> Sets.newCopyOnWriteArraySet());
550 }
551
552 // get flow entry for the given flow rule
553 private TypedStoredFlowEntry getFlowEntryInternal(FlowRule rule) {
554 Set<TypedStoredFlowEntry> flowEntries = getFlowEntriesInternal(rule.id());
555 return flowEntries.stream()
556 .filter(entry -> Objects.equal(entry, rule))
557 .findAny()
558 .orElse(null);
559 }
560
561 // get the flow entries for all flows in flow table
562 private Set<TypedStoredFlowEntry> getFlowEntriesInternal() {
563 Set<TypedStoredFlowEntry> result = Sets.newHashSet();
564
565 flowEntries.values().forEach(result::addAll);
566 return result;
567 }
568
569 /**
570 * Gets the number of flow entry in flow table.
571 *
Jonathan Hart84f4f312016-03-03 08:17:11 -0800572 * @return the number of flow entry
Brian O'Connor8c685362015-12-05 15:27:27 -0800573 */
574 public long getFlowCount() {
575 return flowEntries.values().stream().mapToLong(Set::size).sum();
576 }
577
578 /**
579 * Gets the number of flow entry in flow table.
580 *
581 * @param rule the flow rule
Jonathan Hart84f4f312016-03-03 08:17:11 -0800582 * @return the typed flow entry
Brian O'Connor8c685362015-12-05 15:27:27 -0800583 */
584 public TypedStoredFlowEntry getFlowEntry(FlowRule rule) {
585 checkNotNull(rule);
586
587 return getFlowEntryInternal(rule);
588 }
589
590 /**
591 * Gets the all typed flow entries in flow table.
592 *
Jonathan Hart84f4f312016-03-03 08:17:11 -0800593 * @return the set of typed flow entry
Brian O'Connor8c685362015-12-05 15:27:27 -0800594 */
595 public Set<TypedStoredFlowEntry> getFlowEntries() {
596 return getFlowEntriesInternal();
597 }
598
599 /**
600 * Gets the short typed flow entries in flow table.
601 *
Jonathan Hart84f4f312016-03-03 08:17:11 -0800602 * @return the set of typed flow entry
Brian O'Connor8c685362015-12-05 15:27:27 -0800603 */
604 public Set<StoredFlowEntry> getShortFlows() {
Jonathan Hart84f4f312016-03-03 08:17:11 -0800605 return ImmutableSet.copyOf(shortFlows);
Brian O'Connor8c685362015-12-05 15:27:27 -0800606 }
607
608 /**
609 * Gets the mid typed flow entries in flow table.
610 *
Jonathan Hart84f4f312016-03-03 08:17:11 -0800611 * @return the set of typed flow entry
Brian O'Connor8c685362015-12-05 15:27:27 -0800612 */
613 public Set<StoredFlowEntry> getMidFlows() {
Jonathan Hart84f4f312016-03-03 08:17:11 -0800614 return ImmutableSet.copyOf(midFlows);
Brian O'Connor8c685362015-12-05 15:27:27 -0800615 }
616
617 /**
618 * Gets the long typed flow entries in flow table.
619 *
Jonathan Hart84f4f312016-03-03 08:17:11 -0800620 * @return the set of typed flow entry
Brian O'Connor8c685362015-12-05 15:27:27 -0800621 */
622 public Set<StoredFlowEntry> getLongFlows() {
Jonathan Hart84f4f312016-03-03 08:17:11 -0800623 return ImmutableSet.copyOf(longFlows);
Brian O'Connor8c685362015-12-05 15:27:27 -0800624 }
625
626 /**
627 * Add typed flow entry into table only.
628 *
629 * @param rule the flow rule
Brian O'Connor8c685362015-12-05 15:27:27 -0800630 */
631 public synchronized void add(TypedStoredFlowEntry rule) {
632 checkNotNull(rule);
633
634 //rule have to be new DefaultTypedFlowEntry
635 boolean result = getFlowEntriesInternal(rule.id()).add(rule);
636
637 if (result) {
638 addCount++;
639 }
640 }
641
642 /**
Jonathan Hart84f4f312016-03-03 08:17:11 -0800643 * Calculates and sets the flow live type at the first time,
Brian O'Connor8c685362015-12-05 15:27:27 -0800644 * and then add it into a corresponding typed flow table.
645 *
646 * @param rule the flow rule
Brian O'Connor8c685362015-12-05 15:27:27 -0800647 */
648 public void calAndSetFlowLiveType(TypedStoredFlowEntry rule) {
649 checkNotNull(rule);
650
651 calAndSetFlowLiveTypeInternal(rule);
652 }
653
654 /**
Jonathan Hart84f4f312016-03-03 08:17:11 -0800655 * Adds the typed flow entry into table, and calculates and set the flow live type,
Brian O'Connor8c685362015-12-05 15:27:27 -0800656 * and then add it into a corresponding typed flow table.
657 *
658 * @param rule the flow rule
Brian O'Connor8c685362015-12-05 15:27:27 -0800659 */
Jonathan Hart84f4f312016-03-03 08:17:11 -0800660 public synchronized void addWithCalAndSetFlowLiveType(TypedStoredFlowEntry rule) {
Brian O'Connor8c685362015-12-05 15:27:27 -0800661 checkNotNull(rule);
662
663 //rule have to be new DefaultTypedFlowEntry
664 boolean result = getFlowEntriesInternal(rule.id()).add(rule);
665 if (result) {
666 calAndSetFlowLiveTypeInternal(rule);
667 addWithSetFlowLiveTypeCount++;
668 } else {
Jonathan Hart84f4f312016-03-03 08:17:11 -0800669 if (log.isDebugEnabled()) {
670 log.debug("FlowId {} ADD failed, it may already exist in table - {}",
671 rule.id(), sw.getStringId());
672 }
Brian O'Connor8c685362015-12-05 15:27:27 -0800673 }
674 }
675
676 // In real, calculates and set the flow live type at the first time,
677 // and then add it into a corresponding typed flow table
678 private void calAndSetFlowLiveTypeInternal(TypedStoredFlowEntry rule) {
679 long life = rule.life();
680 FlowLiveType prevFlowLiveType = rule.flowLiveType();
681
682 if (life >= longPollInterval) {
683 rule.setFlowLiveType(FlowLiveType.LONG_FLOW);
684 longFlows.add(rule);
685 } else if (life >= midPollInterval) {
686 rule.setFlowLiveType(FlowLiveType.MID_FLOW);
687 midFlows.add(rule);
688 } else if (life >= calAndPollInterval) {
689 rule.setFlowLiveType(FlowLiveType.SHORT_FLOW);
690 shortFlows.add(rule);
691 } else if (life >= 0) {
692 rule.setFlowLiveType(FlowLiveType.IMMEDIATE_FLOW);
693 } else { // life < 0
694 rule.setFlowLiveType(FlowLiveType.UNKNOWN_FLOW);
695 }
696
697 if (rule.flowLiveType() != prevFlowLiveType) {
698 switch (prevFlowLiveType) {
Jonathan Hart84f4f312016-03-03 08:17:11 -0800699 // delete it from previous flow table
700 case SHORT_FLOW:
701 shortFlows.remove(rule);
702 break;
703 case MID_FLOW:
704 midFlows.remove(rule);
705 break;
706 case LONG_FLOW:
707 longFlows.remove(rule);
708 break;
709 default:
710 break;
Brian O'Connor8c685362015-12-05 15:27:27 -0800711 }
712 }
713 }
714
715
716 // check the flow live type based on current time, then set and add it into corresponding table
717 private boolean checkAndMoveLiveFlowInternal(TypedStoredFlowEntry fe, long cTime) {
718 long curTime = (cTime > 0 ? cTime : System.currentTimeMillis());
719 // For latency adjustment(default=500 millisecond) between FlowStatsRequest and Reply
720 long fromLastSeen = ((curTime - fe.lastSeen() + latencyFlowStatsRequestAndReplyMillis) / 1000);
721 // fe.life() unit is SECOND!
722 long liveTime = fe.life() + fromLastSeen;
723
724
725 switch (fe.flowLiveType()) {
Jonathan Hart84f4f312016-03-03 08:17:11 -0800726 case IMMEDIATE_FLOW:
727 if (liveTime >= longPollInterval) {
728 fe.setFlowLiveType(FlowLiveType.LONG_FLOW);
729 longFlows.add(fe);
730 } else if (liveTime >= midPollInterval) {
731 fe.setFlowLiveType(FlowLiveType.MID_FLOW);
732 midFlows.add(fe);
733 } else if (liveTime >= calAndPollInterval) {
734 fe.setFlowLiveType(FlowLiveType.SHORT_FLOW);
735 shortFlows.add(fe);
736 }
737 break;
738 case SHORT_FLOW:
739 if (liveTime >= longPollInterval) {
740 fe.setFlowLiveType(FlowLiveType.LONG_FLOW);
741 shortFlows.remove(fe);
742 longFlows.add(fe);
743 } else if (liveTime >= midPollInterval) {
744 fe.setFlowLiveType(FlowLiveType.MID_FLOW);
745 shortFlows.remove(fe);
746 midFlows.add(fe);
747 }
748 break;
749 case MID_FLOW:
750 if (liveTime >= longPollInterval) {
751 fe.setFlowLiveType(FlowLiveType.LONG_FLOW);
752 midFlows.remove(fe);
753 longFlows.add(fe);
754 }
755 break;
756 case LONG_FLOW:
757 if (fromLastSeen > entirePollInterval) {
758 log.trace("checkAndMoveLiveFlowInternal: flow is already removed at switch.");
Brian O'Connor8c685362015-12-05 15:27:27 -0800759 return false;
Jonathan Hart84f4f312016-03-03 08:17:11 -0800760 }
761 break;
762 case UNKNOWN_FLOW: // Unknown flow is an internal error flow type, just fall through
763 default:
764 log.error("Unknown live type error for {}", sw.getStringId());
765 return false;
Brian O'Connor8c685362015-12-05 15:27:27 -0800766 }
767
Jonathan Hart84f4f312016-03-03 08:17:11 -0800768 if (log.isTraceEnabled()) {
769 log.trace(CHECK_AND_MOVE_LOG, fe.id(), fe.state(), fe.flowLiveType(),
770 liveTime, fe.life(), fe.bytes(), fe.packets(), fromLastSeen,
771 fe.priority(), fe.selector().criteria(), fe.treatment(),
772 sw.getStringId());
773 }
Brian O'Connor8c685362015-12-05 15:27:27 -0800774
775 return true;
776 }
777
778 /**
Jonathan Hart84f4f312016-03-03 08:17:11 -0800779 * Checks and moves live type for all type flow entries in table at every calAndPollInterval time.
Brian O'Connor8c685362015-12-05 15:27:27 -0800780 */
781 public void checkAndMoveLiveFlowAll() {
782 Set<TypedStoredFlowEntry> typedFlowEntries = getFlowEntriesInternal();
783
784 long calCurTime = System.currentTimeMillis();
785 typedFlowEntries.forEach(fe -> {
786 if (!checkAndMoveLiveFlowInternal(fe, calCurTime)) {
787 remove(fe);
788 }
789 });
790
791 // print table counts for debug
Jonathan Hart84f4f312016-03-03 08:17:11 -0800792 if (log.isTraceEnabled()) {
Brian O'Connor8c685362015-12-05 15:27:27 -0800793 synchronized (this) {
794 long totalFlowCount = getFlowCount();
795 long shortFlowCount = shortFlows.size();
796 long midFlowCount = midFlows.size();
797 long longFlowCount = longFlows.size();
798 long immediateFlowCount = totalFlowCount - shortFlowCount - midFlowCount - longFlowCount;
799 long calTotalCount = addCount + addWithSetFlowLiveTypeCount - removeCount;
800
Jonathan Hart84f4f312016-03-03 08:17:11 -0800801 log.trace(CHECK_AND_MOVE_COUNT_LOG, totalFlowCount, calTotalCount,
802 immediateFlowCount, shortFlowCount, midFlowCount, longFlowCount,
803 addCount, addWithSetFlowLiveTypeCount, removeCount, sw.getStringId());
804
Brian O'Connor8c685362015-12-05 15:27:27 -0800805 if (totalFlowCount != calTotalCount) {
Jonathan Hart84f4f312016-03-03 08:17:11 -0800806 log.error("Real total flow count and calculated total flow count do NOT match");
Brian O'Connor8c685362015-12-05 15:27:27 -0800807 }
808 if (immediateFlowCount < 0) {
Jonathan Hart84f4f312016-03-03 08:17:11 -0800809 log.error("Immediate flow count is negative");
Brian O'Connor8c685362015-12-05 15:27:27 -0800810 }
811 }
812 }
Jonathan Hart84f4f312016-03-03 08:17:11 -0800813 log.trace("checkAndMoveLiveFlowAll: adaptiveStats for {}", sw.getStringId());
Brian O'Connor8c685362015-12-05 15:27:27 -0800814 }
815
816 /**
Jonathan Hart84f4f312016-03-03 08:17:11 -0800817 * Removes the typed flow entry from table.
Brian O'Connor8c685362015-12-05 15:27:27 -0800818 *
819 * @param rule the flow rule
Brian O'Connor8c685362015-12-05 15:27:27 -0800820 */
821 public synchronized void remove(FlowRule rule) {
822 checkNotNull(rule);
823
824 TypedStoredFlowEntry removeStore = getFlowEntryInternal(rule);
825 if (removeStore != null) {
Jonathan Hart84f4f312016-03-03 08:17:11 -0800826 removeLiveFlowsInternal(removeStore);
Brian O'Connor8c685362015-12-05 15:27:27 -0800827 boolean result = getFlowEntriesInternal(rule.id()).remove(removeStore);
828
829 if (result) {
830 removeCount++;
831 }
832 }
Jonathan Hart84f4f312016-03-03 08:17:11 -0800833 }
Brian O'Connor8c685362015-12-05 15:27:27 -0800834
835 // Remove the typed flow entry from corresponding table
836 private void removeLiveFlowsInternal(TypedStoredFlowEntry fe) {
837 switch (fe.flowLiveType()) {
Jonathan Hart84f4f312016-03-03 08:17:11 -0800838 case IMMEDIATE_FLOW:
839 // do nothing
840 break;
841 case SHORT_FLOW:
842 shortFlows.remove(fe);
843 break;
844 case MID_FLOW:
845 midFlows.remove(fe);
846 break;
847 case LONG_FLOW:
848 longFlows.remove(fe);
849 break;
850 default: // error in Flow Live Type
851 log.error("removeLiveFlowsInternal: unknown live type error");
852 break;
Brian O'Connor8c685362015-12-05 15:27:27 -0800853 }
854 }
855 }
856}