blob: d5d323d75f3c27da63a4d452cbee78c82aafac84 [file] [log] [blame]
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001package net.floodlightcontroller.flowcache;
2
3import java.util.ArrayList;
4import java.util.Collection;
5import java.util.Date;
6import java.util.HashMap;
7import java.util.List;
8import java.util.Map;
9import java.util.Queue;
10import java.util.concurrent.ConcurrentLinkedQueue;
11import java.util.concurrent.ScheduledExecutorService;
12import java.util.concurrent.TimeUnit;
13
14import net.floodlightcontroller.core.module.FloodlightModuleContext;
15import net.floodlightcontroller.core.module.FloodlightModuleException;
16import net.floodlightcontroller.core.module.IFloodlightModule;
17import net.floodlightcontroller.core.module.IFloodlightService;
18import net.floodlightcontroller.core.util.ListenerDispatcher;
19import net.floodlightcontroller.core.util.SingletonTask;
20import net.floodlightcontroller.counter.CounterStore;
21import net.floodlightcontroller.counter.ICounter;
22import net.floodlightcontroller.counter.ICounterStoreService;
23import net.floodlightcontroller.counter.SimpleCounter;
24import net.floodlightcontroller.devicemanager.IDevice;
25import net.floodlightcontroller.flowcache.IFlowCacheService.FCQueryEvType;
26import net.floodlightcontroller.flowcache.IFlowReconcileListener;
27import net.floodlightcontroller.flowcache.OFMatchReconcile;
28import net.floodlightcontroller.threadpool.IThreadPoolService;
29
30import org.openflow.protocol.OFType;
31import org.slf4j.Logger;
32import org.slf4j.LoggerFactory;
33
34public class FlowReconcileManager
35 implements IFloodlightModule, IFlowReconcileService {
36
37 /** The logger. */
38 private static Logger logger =
39 LoggerFactory.getLogger(FlowReconcileManager.class);
40
41 /** Reference to dependent modules */
42 protected IThreadPoolService threadPool;
43 protected ICounterStoreService counterStore;
44
45 /**
46 * The list of flow reconcile listeners that have registered to get
47 * flow reconcile callbacks. Such callbacks are invoked, for example, when
48 * a switch with existing flow-mods joins this controller and those flows
49 * need to be reconciled with the current configuration of the controller.
50 */
51 protected ListenerDispatcher<OFType, IFlowReconcileListener>
52 flowReconcileListeners;
53
54 /** A FIFO queue to keep all outstanding flows for reconciliation */
55 Queue<OFMatchReconcile> flowQueue;
56
57 /** Asynchronous task to feed the flowReconcile pipeline */
58 protected SingletonTask flowReconcileTask;
59
60 String controllerPktInCounterName;
61 protected SimpleCounter lastPacketInCounter;
62
63 protected static int MAX_SYSTEM_LOAD_PER_SECOND = 50000;
64 /** a minimum flow reconcile rate so that it won't stave */
65 protected static int MIN_FLOW_RECONCILE_PER_SECOND = 1000;
66
67 /** once per second */
68 protected static int FLOW_RECONCILE_DELAY_MILLISEC = 10;
69 protected Date lastReconcileTime;
70
71 /** Config to enable or disable flowReconcile */
72 protected static final String EnableConfigKey = "enable";
73 protected boolean flowReconcileEnabled;
74
75 public int flowReconcileThreadRunCount;
76
77 @Override
78 public synchronized void addFlowReconcileListener(
79 IFlowReconcileListener listener) {
80 flowReconcileListeners.addListener(OFType.FLOW_MOD, listener);
81
82 if (logger.isTraceEnabled()) {
83 StringBuffer sb = new StringBuffer();
84 sb.append("FlowMod listeners: ");
85 for (IFlowReconcileListener l :
86 flowReconcileListeners.getOrderedListeners()) {
87 sb.append(l.getName());
88 sb.append(",");
89 }
90 logger.trace(sb.toString());
91 }
92 }
93
94 @Override
95 public synchronized void removeFlowReconcileListener(
96 IFlowReconcileListener listener) {
97 flowReconcileListeners.removeListener(listener);
98 }
99
100 @Override
101 public synchronized void clearFlowReconcileListeners() {
102 flowReconcileListeners.clearListeners();
103 }
104
105 /**
106 * Add to-be-reconciled flow to the queue.
107 *
108 * @param ofmRcIn the ofm rc in
109 */
110 public void reconcileFlow(OFMatchReconcile ofmRcIn) {
111 if (ofmRcIn == null) return;
112
113 // Make a copy before putting on the queue.
114 OFMatchReconcile myOfmRc = new OFMatchReconcile(ofmRcIn);
115
116 flowQueue.add(myOfmRc);
117
118 Date currTime = new Date();
119 long delay = 0;
120
121 /** schedule reconcile task immidiately if it has been more than 1 sec
122 * since the last run. Otherwise, schedule the reconcile task in
123 * DELAY_MILLISEC.
124 */
125 if (currTime.after(new Date(lastReconcileTime.getTime() + 1000))) {
126 delay = 0;
127 } else {
128 delay = FLOW_RECONCILE_DELAY_MILLISEC;
129 }
130 flowReconcileTask.reschedule(delay, TimeUnit.MILLISECONDS);
131
132 if (logger.isTraceEnabled()) {
133 logger.trace("Reconciling flow: {}, total: {}",
134 myOfmRc.toString(), flowQueue.size());
135 }
136 }
137
138 @Override
139 public void updateFlowForDestinationDevice(IDevice device,
140 IFlowQueryHandler handler,
141 FCQueryEvType fcEvType) {
142 // NO-OP
143 }
144
145 @Override
146 public void updateFlowForSourceDevice(IDevice device,
147 IFlowQueryHandler handler,
148 FCQueryEvType fcEvType) {
149 // NO-OP
150 }
151
152 @Override
153 public void flowQueryGenericHandler(FlowCacheQueryResp flowResp) {
154 if (flowResp.queryObj.evType != FCQueryEvType.GET) {
155 OFMatchReconcile ofmRc = new OFMatchReconcile();;
156 /* Re-provision these flows */
157 for (QRFlowCacheObj entry : flowResp.qrFlowCacheObjList) {
158 /* reconcile the flows in entry */
159 entry.toOFMatchReconcile(ofmRc,
160 flowResp.queryObj.applInstName,
161 OFMatchReconcile.ReconcileAction.UPDATE_PATH);
162 reconcileFlow(ofmRc);
163 }
164 }
165 return;
166 }
167
168 // IFloodlightModule
169
170 @Override
171 public Collection<Class<? extends IFloodlightService>> getModuleServices() {
172 Collection<Class<? extends IFloodlightService>> l =
173 new ArrayList<Class<? extends IFloodlightService>>();
174 l.add(IFlowReconcileService.class);
175 return l;
176 }
177
178 @Override
179 public Map<Class<? extends IFloodlightService>, IFloodlightService>
180 getServiceImpls() {
181 Map<Class<? extends IFloodlightService>,
182 IFloodlightService> m =
183 new HashMap<Class<? extends IFloodlightService>,
184 IFloodlightService>();
185 m.put(IFlowReconcileService.class, this);
186 return m;
187 }
188
189 @Override
190 public Collection<Class<? extends IFloodlightService>>
191 getModuleDependencies() {
192 Collection<Class<? extends IFloodlightService>> l =
193 new ArrayList<Class<? extends IFloodlightService>>();
194 l.add(IThreadPoolService.class);
195 l.add(ICounterStoreService.class);
196 return null;
197 }
198
199 @Override
200 public void init(FloodlightModuleContext context)
201 throws FloodlightModuleException {
202 threadPool = context.getServiceImpl(IThreadPoolService.class);
203 counterStore = context.getServiceImpl(ICounterStoreService.class);
204
205 flowQueue = new ConcurrentLinkedQueue<OFMatchReconcile>();
206 flowReconcileListeners =
207 new ListenerDispatcher<OFType, IFlowReconcileListener>();
208
209 Map<String, String> configParam = context.getConfigParams(this);
210 String enableValue = configParam.get(EnableConfigKey);
211 // Set flowReconcile default to true
212 flowReconcileEnabled = true;
213 if (enableValue != null &&
214 enableValue.equalsIgnoreCase("false")) {
215 flowReconcileEnabled = false;
216 }
217
218 flowReconcileThreadRunCount = 0;
219 lastReconcileTime = new Date(0);
220 logger.debug("FlowReconcile is {}", flowReconcileEnabled);
221 }
222
223 @Override
224 public void startUp(FloodlightModuleContext context) {
225 // thread to do flow reconcile
226 ScheduledExecutorService ses = threadPool.getScheduledExecutor();
227 flowReconcileTask = new SingletonTask(ses, new Runnable() {
228 @Override
229 public void run() {
230 try {
231 if (doReconcile()) {
232 flowReconcileTask.reschedule(
233 FLOW_RECONCILE_DELAY_MILLISEC,
234 TimeUnit.MILLISECONDS);
235 }
236 } catch (Exception e) {
237 logger.warn("Exception in doReconcile(): {}",
238 e.getMessage());
239 e.printStackTrace();
240 }
241 }
242 });
243
244 String packetInName = OFType.PACKET_IN.toClass().getName();
245 packetInName = packetInName.substring(packetInName.lastIndexOf('.')+1);
246
247 // Construct controller counter for the packet_in
248 controllerPktInCounterName =
249 CounterStore.createCounterName(ICounterStoreService.CONTROLLER_NAME,
250 -1,
251 packetInName);
252 }
253
254 /**
255 * Feed the flows into the flow reconciliation pipeline.
256 * @return true if more flows to be reconciled
257 * false if no more flows to be reconciled.
258 */
259 protected boolean doReconcile() {
260 if (!flowReconcileEnabled) {
261 return false;
262 }
263
264 // Record the execution time.
265 lastReconcileTime = new Date();
266
267 ArrayList<OFMatchReconcile> ofmRcList =
268 new ArrayList<OFMatchReconcile>();
269
270 // Get the maximum number of flows that can be reconciled.
271 int reconcileCapacity = getCurrentCapacity();
272 if (logger.isTraceEnabled()) {
273 logger.trace("Reconcile capacity {} flows", reconcileCapacity);
274 }
275 while (!flowQueue.isEmpty() && reconcileCapacity > 0) {
276 OFMatchReconcile ofmRc = flowQueue.poll();
277 reconcileCapacity--;
278 if (ofmRc != null) {
279 ofmRcList.add(ofmRc);
280 if (logger.isTraceEnabled()) {
281 logger.trace("Add flow {} to be the reconcileList", ofmRc.cookie);
282 }
283 } else {
284 break;
285 }
286 }
287
288 // Run the flow through all the flow reconcile listeners
289 IFlowReconcileListener.Command retCmd;
290 if (ofmRcList.size() > 0) {
291 List<IFlowReconcileListener> listeners =
292 flowReconcileListeners.getOrderedListeners();
293 if (listeners == null) {
294 if (logger.isTraceEnabled()) {
295 logger.trace("No flowReconcile listener");
296 }
297 return false;
298 }
299
300 for (IFlowReconcileListener flowReconciler :
301 flowReconcileListeners.getOrderedListeners()) {
302 if (logger.isTraceEnabled()) {
303 logger.trace("Reconciling flow: call listener {}",
304 flowReconciler.getName());
305 }
306 retCmd = flowReconciler.reconcileFlows(ofmRcList);
307 if (retCmd == IFlowReconcileListener.Command.STOP) {
308 break;
309 }
310 }
311 flowReconcileThreadRunCount++;
312 } else {
313 if (logger.isTraceEnabled()) {
314 logger.trace("No flow to be reconciled.");
315 }
316 }
317
318 // Return true if there are more flows to be reconciled
319 if (flowQueue.isEmpty()) {
320 return false;
321 } else {
322 if (logger.isTraceEnabled()) {
323 logger.trace("{} more flows to be reconciled.",
324 flowQueue.size());
325 }
326 return true;
327 }
328 }
329
330 /**
331 * Compute the maximum number of flows to be reconciled.
332 *
333 * It computes the packetIn increment from the counter values in
334 * the counter store;
335 * Then computes the rate based on the elapsed time
336 * from the last query;
337 * Then compute the max flow reconcile rate by subtracting the packetIn
338 * rate from the hard-coded max system rate.
339 * If the system rate is reached or less than MIN_FLOW_RECONCILE_PER_SECOND,
340 * set the maximum flow reconcile rate to the MIN_FLOW_RECONCILE_PER_SECOND
341 * to prevent starvation.
342 * Then convert the rate to an absolute number for the
343 * FLOW_RECONCILE_PERIOD.
344 * @return
345 */
346 protected int getCurrentCapacity() {
347 ICounter pktInCounter =
348 counterStore.getCounter(controllerPktInCounterName);
349 int minFlows = MIN_FLOW_RECONCILE_PER_SECOND *
350 FLOW_RECONCILE_DELAY_MILLISEC / 1000;
351
352 // If no packetInCounter, then there shouldn't be any flow.
353 if (pktInCounter == null ||
354 pktInCounter.getCounterDate() == null ||
355 pktInCounter.getCounterValue() == null) {
356 logger.debug("counter {} doesn't exist",
357 controllerPktInCounterName);
358 return minFlows;
359 }
360
361 // Haven't get any counter yet.
362 if (lastPacketInCounter == null) {
363 logger.debug("First time get the count for {}",
364 controllerPktInCounterName);
365 lastPacketInCounter = (SimpleCounter)
366 SimpleCounter.createCounter(pktInCounter);
367 return minFlows;
368 }
369
370 int pktInRate = getPktInRate(pktInCounter, new Date());
371
372 // Update the last packetInCounter
373 lastPacketInCounter = (SimpleCounter)
374 SimpleCounter.createCounter(pktInCounter);
375 int capacity = minFlows;
376 if ((pktInRate + MIN_FLOW_RECONCILE_PER_SECOND) <=
377 MAX_SYSTEM_LOAD_PER_SECOND) {
378 capacity = (MAX_SYSTEM_LOAD_PER_SECOND - pktInRate)
379 * FLOW_RECONCILE_DELAY_MILLISEC / 1000;
380 }
381
382 if (logger.isTraceEnabled()) {
383 logger.trace("Capacity is {}", capacity);
384 }
385 return capacity;
386 }
387
388 protected int getPktInRate(ICounter newCnt, Date currentTime) {
389 if (newCnt == null ||
390 newCnt.getCounterDate() == null ||
391 newCnt.getCounterValue() == null) {
392 return 0;
393 }
394
395 // Somehow the system time is messed up. return max packetIn rate
396 // to reduce the system load.
397 if (newCnt.getCounterDate().before(
398 lastPacketInCounter.getCounterDate())) {
399 logger.debug("Time is going backward. new {}, old {}",
400 newCnt.getCounterDate(),
401 lastPacketInCounter.getCounterDate());
402 return MAX_SYSTEM_LOAD_PER_SECOND;
403 }
404
405 long elapsedTimeInSecond = (currentTime.getTime() -
406 lastPacketInCounter.getCounterDate().getTime()) / 1000;
407 if (elapsedTimeInSecond == 0) {
408 // This should never happen. Check to avoid division by zero.
409 return 0;
410 }
411
412 long diff = 0;
413 switch (newCnt.getCounterValue().getType()) {
414 case LONG:
415 long newLong = newCnt.getCounterValue().getLong();
416 long oldLong = lastPacketInCounter.getCounterValue().getLong();
417 if (newLong < oldLong) {
418 // Roll over event
419 diff = Long.MAX_VALUE - oldLong + newLong;
420 } else {
421 diff = newLong - oldLong;
422 }
423 break;
424
425 case DOUBLE:
426 double newDouble = newCnt.getCounterValue().getDouble();
427 double oldDouble = lastPacketInCounter.getCounterValue().getDouble();
428 if (newDouble < oldDouble) {
429 // Roll over event
430 diff = (long)(Double.MAX_VALUE - oldDouble + newDouble);
431 } else {
432 diff = (long)(newDouble - oldDouble);
433 }
434 break;
435 }
436
437 return (int)(diff/elapsedTimeInSecond);
438 }
439}
440