1/**
2* Copyright 2011, Big Switch Networks, Inc.
3* Originally created by David Erickson, Stanford University
4*
5* Licensed under the Apache License, Version 2.0 (the "License"); you may
6* not use this file except in compliance with the License. You may obtain
7* a copy of the License at
8*
9* http://www.apache.org/licenses/LICENSE-2.0
10*
11* Unless required by applicable law or agreed to in writing, software
12* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14* License for the specific language governing permissions and limitations
15* under the License.
16**/
17
18package net.floodlightcontroller.core.internal;
19
20import java.io.FileInputStream;
21import java.io.IOException;
22import java.net.InetAddress;
23import java.net.InetSocketAddress;
24import java.net.SocketAddress;
25import java.nio.channels.ClosedChannelException;
26import java.util.ArrayList;
27import java.util.Collection;
28import java.util.Collections;
29import java.util.Date;
30import java.util.HashMap;
31import java.util.HashSet;
32import java.util.Iterator;
33import java.util.LinkedHashMap;
34import java.util.List;
35import java.util.Map;
36import java.util.Map.Entry;
37import java.util.Properties;
38import java.util.Set;
39import java.util.Stack;
40import java.util.concurrent.BlockingQueue;
41import java.util.concurrent.ConcurrentHashMap;
42import java.util.concurrent.ConcurrentMap;
43import java.util.concurrent.CopyOnWriteArraySet;
44import java.util.concurrent.Executors;
45import java.util.concurrent.Future;
46import java.util.concurrent.LinkedBlockingQueue;
47import java.util.concurrent.RejectedExecutionException;
48import java.util.concurrent.TimeUnit;
49import java.util.concurrent.TimeoutException;
50
51import net.floodlightcontroller.core.FloodlightContext;
52import net.floodlightcontroller.core.IFloodlightProviderService;
53import net.floodlightcontroller.core.IHAListener;
54import net.floodlightcontroller.core.IInfoProvider;
55import net.floodlightcontroller.core.INetMapStorage.DM_OPERATION;
56import net.floodlightcontroller.core.IOFMessageListener;
57import net.floodlightcontroller.core.IListener.Command;
58import net.floodlightcontroller.core.IOFSwitch;
59import net.floodlightcontroller.core.IOFSwitchFilter;
60import net.floodlightcontroller.core.IOFSwitchListener;
61import net.floodlightcontroller.core.ISwitchStorage.SwitchState;
62import net.floodlightcontroller.core.annotations.LogMessageDoc;
63import net.floodlightcontroller.core.annotations.LogMessageDocs;
64import net.floodlightcontroller.core.internal.OFChannelState.HandshakeState;
65import net.floodlightcontroller.core.util.ListenerDispatcher;
66import net.floodlightcontroller.core.web.CoreWebRoutable;
67import net.floodlightcontroller.counter.ICounterStoreService;
68import net.floodlightcontroller.packet.Ethernet;
69import net.floodlightcontroller.perfmon.IPktInProcessingTimeService;
70import net.floodlightcontroller.restserver.IRestApiService;
71import net.floodlightcontroller.storage.IResultSet;
72import net.floodlightcontroller.storage.IStorageSourceListener;
73import net.floodlightcontroller.storage.IStorageSourceService;
74import net.floodlightcontroller.storage.OperatorPredicate;
75import net.floodlightcontroller.storage.StorageException;
76import net.floodlightcontroller.threadpool.IThreadPoolService;
77
78import org.jboss.netty.bootstrap.ServerBootstrap;
79import org.jboss.netty.buffer.ChannelBuffer;
80import org.jboss.netty.buffer.ChannelBuffers;
81import org.jboss.netty.channel.Channel;
82import org.jboss.netty.channel.ChannelHandlerContext;
83import org.jboss.netty.channel.ChannelPipelineFactory;
84import org.jboss.netty.channel.ChannelStateEvent;
85import org.jboss.netty.channel.ChannelUpstreamHandler;
86import org.jboss.netty.channel.Channels;
87import org.jboss.netty.channel.ExceptionEvent;
88import org.jboss.netty.channel.MessageEvent;
89import org.jboss.netty.channel.group.ChannelGroup;
90import org.jboss.netty.channel.group.DefaultChannelGroup;
91import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
92import org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler;
93import org.jboss.netty.handler.timeout.IdleStateEvent;
94import org.jboss.netty.handler.timeout.ReadTimeoutException;
95import org.openflow.protocol.OFEchoReply;
96import org.openflow.protocol.OFError;
97import org.openflow.protocol.OFError.OFBadActionCode;
98import org.openflow.protocol.OFError.OFBadRequestCode;
99import org.openflow.protocol.OFError.OFErrorType;
100import org.openflow.protocol.OFError.OFFlowModFailedCode;
101import org.openflow.protocol.OFError.OFHelloFailedCode;
102import org.openflow.protocol.OFError.OFPortModFailedCode;
103import org.openflow.protocol.OFError.OFQueueOpFailedCode;
104import org.openflow.protocol.OFFeaturesReply;
105import org.openflow.protocol.OFGetConfigReply;
106import org.openflow.protocol.OFMessage;
107import org.openflow.protocol.OFPacketIn;
108import org.openflow.protocol.OFPhysicalPort;
109import org.openflow.protocol.OFPortStatus;
110import org.openflow.protocol.OFPortStatus.OFPortReason;
111import org.openflow.protocol.OFSetConfig;
112import org.openflow.protocol.OFStatisticsRequest;
113import org.openflow.protocol.OFSwitchConfig;
114import org.openflow.protocol.OFType;
115import org.openflow.protocol.OFVendor;
116import org.openflow.protocol.factory.BasicFactory;
117import org.openflow.protocol.factory.MessageParseException;
118import org.openflow.protocol.statistics.OFDescriptionStatistics;
119import org.openflow.protocol.statistics.OFStatistics;
120import org.openflow.protocol.statistics.OFStatisticsType;
121import org.openflow.protocol.vendor.OFBasicVendorDataType;
122import org.openflow.protocol.vendor.OFBasicVendorId;
123import org.openflow.protocol.vendor.OFVendorId;
124import org.openflow.util.HexString;
125import org.openflow.util.U16;
126import org.openflow.util.U32;
127import org.openflow.vendor.nicira.OFNiciraVendorData;
128import org.openflow.vendor.nicira.OFRoleReplyVendorData;
129import org.openflow.vendor.nicira.OFRoleRequestVendorData;
130import org.openflow.vendor.nicira.OFRoleVendorData;
131import org.slf4j.Logger;
132import org.slf4j.LoggerFactory;
133
134
135/**
136 * The main controller class. Handles all setup and network listeners
137 */
138public class Controller implements IFloodlightProviderService,
139 IStorageSourceListener {
140
141 protected SwitchStorageImpl swStore;
142
143 protected static Logger log = LoggerFactory.getLogger(Controller.class);
144
145 private static final String ERROR_DATABASE =
146 "The controller could not communicate with the system database.";
147
148 protected BasicFactory factory;
149 protected ConcurrentMap<OFType,
150 ListenerDispatcher<OFType,IOFMessageListener>>
151 messageListeners;
152 // The activeSwitches map contains only those switches that are actively
153 // being controlled by us -- it doesn't contain switches that are
154 // in the slave role
155 protected ConcurrentHashMap<Long, IOFSwitch> activeSwitches;
156 // connectedSwitches contains all connected switches, including ones where
157 // we're a slave controller. We need to keep track of them so that we can
158 // send role request messages to switches when our role changes to master
159 // We add a switch to this set after it successfully completes the
160 // handshake. Access to this Set needs to be synchronized with roleChanger
161 protected HashSet<OFSwitchImpl> connectedSwitches;
162
163 // The controllerNodeIPsCache maps Controller IDs to their IP address.
164 // It's only used by handleControllerNodeIPsChanged
165 protected HashMap<String, String> controllerNodeIPsCache;
166
167 protected Set<IOFSwitchListener> switchListeners;
168 protected Set<IHAListener> haListeners;
169 protected Map<String, List<IInfoProvider>> providerMap;
170 protected BlockingQueue<IUpdate> updates;
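 // Updates are produced by calling updates.put(...) from the netty channel
 // handler and from setRole(), and are drained by the controller's main
 // loop, which invokes IUpdate.dispatch() so listener callbacks run on the
 // main loop rather than on the I/O threads.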
171
172 // Module dependencies
173 protected IRestApiService restApi;
174 protected ICounterStoreService counterStore = null;
175 protected IStorageSourceService storageSource;
176 protected IPktInProcessingTimeService pktinProcTime;
177 protected IThreadPoolService threadPool;
178
179 // Configuration options
180 protected int openFlowPort = 6633;
181 protected int workerThreads = 0;
182 // The id for this controller node. Should be unique for each controller
183 // node in a controller cluster.
184 protected String controllerId = "localhost";
185
186 // The current role of the controller.
187 // If the controller isn't configured to support roles, then this is null.
188 protected Role role;
189 // A helper that handles sending role requests and their timeouts
190 protected RoleChanger roleChanger;
191
192 // Start time of the controller
193 protected long systemStartTime;
194
195 // Flag to always flush flow table on switch reconnect (HA or otherwise)
196 protected boolean alwaysClearFlowsOnSwAdd = false;
197
198 // Storage table names
199 protected static final String CONTROLLER_TABLE_NAME = "controller_controller";
200 protected static final String CONTROLLER_ID = "id";
201
202 protected static final String SWITCH_TABLE_NAME = "controller_switch";
203 protected static final String SWITCH_DATAPATH_ID = "dpid";
204 protected static final String SWITCH_SOCKET_ADDRESS = "socket_address";
205 protected static final String SWITCH_IP = "ip";
206 protected static final String SWITCH_CONTROLLER_ID = "controller_id";
207 protected static final String SWITCH_ACTIVE = "active";
208 protected static final String SWITCH_CONNECTED_SINCE = "connected_since";
209 protected static final String SWITCH_CAPABILITIES = "capabilities";
210 protected static final String SWITCH_BUFFERS = "buffers";
211 protected static final String SWITCH_TABLES = "tables";
212 protected static final String SWITCH_ACTIONS = "actions";
213
214 protected static final String SWITCH_CONFIG_TABLE_NAME = "controller_switchconfig";
215 protected static final String SWITCH_CONFIG_CORE_SWITCH = "core_switch";
216
217 protected static final String PORT_TABLE_NAME = "controller_port";
218 protected static final String PORT_ID = "id";
219 protected static final String PORT_SWITCH = "switch_id";
220 protected static final String PORT_NUMBER = "number";
221 protected static final String PORT_HARDWARE_ADDRESS = "hardware_address";
222 protected static final String PORT_NAME = "name";
223 protected static final String PORT_CONFIG = "config";
224 protected static final String PORT_STATE = "state";
225 protected static final String PORT_CURRENT_FEATURES = "current_features";
226 protected static final String PORT_ADVERTISED_FEATURES = "advertised_features";
227 protected static final String PORT_SUPPORTED_FEATURES = "supported_features";
228 protected static final String PORT_PEER_FEATURES = "peer_features";
229
230 protected static final String CONTROLLER_INTERFACE_TABLE_NAME = "controller_controllerinterface";
231 protected static final String CONTROLLER_INTERFACE_ID = "id";
232 protected static final String CONTROLLER_INTERFACE_CONTROLLER_ID = "controller_id";
233 protected static final String CONTROLLER_INTERFACE_TYPE = "type";
234 protected static final String CONTROLLER_INTERFACE_NUMBER = "number";
235 protected static final String CONTROLLER_INTERFACE_DISCOVERED_IP = "discovered_ip";
236
237
238
239 // Perf. related configuration
240 protected static final int SEND_BUFFER_SIZE = 4 * 1024 * 1024;
241 protected static final int BATCH_MAX_SIZE = 100;
242 protected static final boolean ALWAYS_DECODE_ETH = true;
243
244 /**
245 * Updates handled by the main loop
246 */
247 protected interface IUpdate {
248 /**
249 * Calls the appropriate listeners
250 */
251 public void dispatch();
252 }
253 public enum SwitchUpdateType {
254 ADDED,
255 REMOVED,
256 PORTCHANGED
257 }
258 /**
259 * Update message indicating a switch was added, removed, or had a port status change
260 */
261 protected class SwitchUpdate implements IUpdate {
262 public IOFSwitch sw;
263 public SwitchUpdateType switchUpdateType;
264 public SwitchUpdate(IOFSwitch sw, SwitchUpdateType switchUpdateType) {
265 this.sw = sw;
266 this.switchUpdateType = switchUpdateType;
267 }
268 public void dispatch() {
269 if (log.isTraceEnabled()) {
270 log.trace("Dispatching switch update {} {}",
271 sw, switchUpdateType);
272 }
273 if (switchListeners != null) {
274 for (IOFSwitchListener listener : switchListeners) {
275 switch(switchUpdateType) {
276 case ADDED:
277 listener.addedSwitch(sw);
278 break;
279 case REMOVED:
280 listener.removedSwitch(sw);
281 break;
282 case PORTCHANGED:
283 listener.switchPortChanged(sw.getId());
284 break;
285 }
286 }
287 }
288 }
289 }
290
291 /**
292 * Update message indicating controller's role has changed
293 */
294 protected class HARoleUpdate implements IUpdate {
295 public Role oldRole;
296 public Role newRole;
297 public HARoleUpdate(Role newRole, Role oldRole) {
298 this.oldRole = oldRole;
299 this.newRole = newRole;
300 }
301 public void dispatch() {
302 // Make sure that old and new roles are different.
303 if (oldRole == newRole) {
304 if (log.isTraceEnabled()) {
305 log.trace("HA role update ignored as the old and " +
306 "new roles are the same. newRole = {}" +
307 "oldRole = {}", newRole, oldRole);
308 }
309 return;
310 }
311 if (log.isTraceEnabled()) {
312 log.trace("Dispatching HA Role update newRole = {}, oldRole = {}",
313 newRole, oldRole);
314 }
315 if (haListeners != null) {
316 for (IHAListener listener : haListeners) {
317 listener.roleChanged(oldRole, newRole);
318 }
319 }
320 }
321 }
322
323 /**
324 * Update message indicating
325 * IPs of controllers in controller cluster have changed.
326 */
327 protected class HAControllerNodeIPUpdate implements IUpdate {
328 public Map<String,String> curControllerNodeIPs;
329 public Map<String,String> addedControllerNodeIPs;
330 public Map<String,String> removedControllerNodeIPs;
331 public HAControllerNodeIPUpdate(
332 HashMap<String,String> curControllerNodeIPs,
333 HashMap<String,String> addedControllerNodeIPs,
334 HashMap<String,String> removedControllerNodeIPs) {
335 this.curControllerNodeIPs = curControllerNodeIPs;
336 this.addedControllerNodeIPs = addedControllerNodeIPs;
337 this.removedControllerNodeIPs = removedControllerNodeIPs;
338 }
339 public void dispatch() {
340 if (log.isTraceEnabled()) {
341 log.trace("Dispatching HA Controller Node IP update "
342 + "curIPs = {}, addedIPs = {}, removedIPs = {}",
343 new Object[] { curControllerNodeIPs, addedControllerNodeIPs,
344 removedControllerNodeIPs }
345 );
346 }
347 if (haListeners != null) {
348 for (IHAListener listener: haListeners) {
349 listener.controllerNodeIPsChanged(curControllerNodeIPs,
350 addedControllerNodeIPs, removedControllerNodeIPs);
351 }
352 }
353 }
354 }
355
356 // ***************
357 // Getters/Setters
358 // ***************
359
360 public void setStorageSourceService(IStorageSourceService storageSource) {
361 this.storageSource = storageSource;
362 }
363
364 public void setCounterStore(ICounterStoreService counterStore) {
365 this.counterStore = counterStore;
366 }
367
368 public void setPktInProcessingService(IPktInProcessingTimeService pits) {
369 this.pktinProcTime = pits;
370 }
371
372 public void setRestApiService(IRestApiService restApi) {
373 this.restApi = restApi;
374 }
375
376 public void setThreadPoolService(IThreadPoolService tp) {
377 this.threadPool = tp;
378 }
379
380 @Override
381 public Role getRole() {
382 synchronized(roleChanger) {
383 return role;
384 }
385 }
386
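 // setRole() summary: when moving from SLAVE to MASTER the switch state in
 // storage is first reset to inactive, then the new role is submitted to
 // every connected switch through roleChanger, and finally an HARoleUpdate
 // is queued so HA listeners are notified from the main loop.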
387 @Override
388 public void setRole(Role role) {
389 if (role == null) throw new NullPointerException("Role can not be null.");
390 if (role == Role.MASTER && this.role == Role.SLAVE) {
391 // Reset db state to Inactive for all switches.
392 updateAllInactiveSwitchInfo();
393 }
394
395 // Need to synchronize to ensure a reliable ordering on role request
396 // messages sent and to ensure the list of connected switches is stable
397 // RoleChanger will handle the actual sending of the message and
398 // timeout handling
399 // @see RoleChanger
400 synchronized(roleChanger) {
401 if (role.equals(this.role)) {
402 log.debug("Ignoring role change: role is already {}", role);
403 return;
404 }
405
406 Role oldRole = this.role;
407 this.role = role;
408
409 log.debug("Submitting role change request to role {}", role);
410 roleChanger.submitRequest(connectedSwitches, role);
411
412 // Enqueue an update for our listeners.
413 try {
414 this.updates.put(new HARoleUpdate(role, oldRole));
415 } catch (InterruptedException e) {
416 log.error("Failure adding update to queue", e);
417 }
418 }
419 }
420
421
422
423 // **********************
424 // ChannelUpstreamHandler
425 // **********************
426
427 /**
428 * Return a new channel handler for processing a switch connection
429 * @param state The channel state object for the connection
430 * @return the new channel handler
431 */
432 protected ChannelUpstreamHandler getChannelHandler(OFChannelState state) {
433 return new OFChannelHandler(state);
434 }
435
436 /**
437 * Channel handler deals with the switch connection and dispatches
438 * switch messages to the appropriate locations.
439 * @author readams
440 */
441 protected class OFChannelHandler
442 extends IdleStateAwareChannelUpstreamHandler {
443 protected OFSwitchImpl sw;
444 protected OFChannelState state;
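 // Handshake progression driven by processOFMessage() below:
 // START -> HELLO (FEATURES_REQUEST sent) -> FEATURES_REPLY (SET_CONFIG,
 // GET_CONFIG_REQUEST and a DESC statistics request sent) -> READY once
 // both the GET_CONFIG_REPLY and the description statistics reply have
 // arrived (see checkSwitchReady()).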
445
446 public OFChannelHandler(OFChannelState state) {
447 this.state = state;
448 }
449
450 @Override
451 @LogMessageDoc(message="New switch connection from {ip address}",
452 explanation="A new switch has connected from the " +
453 "specified IP address")
454 public void channelConnected(ChannelHandlerContext ctx,
455 ChannelStateEvent e) throws Exception {
456 log.info("New switch connection from {}",
457 e.getChannel().getRemoteAddress());
458
459 sw = new OFSwitchImpl();
460 sw.setChannel(e.getChannel());
461 sw.setFloodlightProvider(Controller.this);
462 sw.setThreadPoolService(threadPool);
463
464 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
465 msglist.add(factory.getMessage(OFType.HELLO));
466 e.getChannel().write(msglist);
467
468 }
469
470 @Override
471 @LogMessageDoc(message="Disconnected switch {switch information}",
472 explanation="The specified switch has disconnected.")
473 public void channelDisconnected(ChannelHandlerContext ctx,
474 ChannelStateEvent e) throws Exception {
475 if (sw != null && state.hsState == HandshakeState.READY) {
476 if (activeSwitches.containsKey(sw.getId())) {
477 // It's safe to call removeSwitch even though the map might
478 // not contain this particular switch but another with the
479 // same DPID
480 removeSwitch(sw);
481 }
482 synchronized(roleChanger) {
483 connectedSwitches.remove(sw);
484 }
485 sw.setConnected(false);
486 }
487 log.info("Disconnected switch {}", sw);
488 }
489
490 @Override
491 @LogMessageDocs({
492 @LogMessageDoc(level="ERROR",
493 message="Disconnecting switch {switch} due to read timeout",
494 explanation="The connected switch has failed to send any " +
495 "messages or respond to echo requests",
496 recommendation=LogMessageDoc.CHECK_SWITCH),
497 @LogMessageDoc(level="ERROR",
498 message="Disconnecting switch {switch}: failed to " +
499 "complete handshake",
500 explanation="The switch did not respond correctly " +
501 "to handshake messages",
502 recommendation=LogMessageDoc.CHECK_SWITCH),
503 @LogMessageDoc(level="ERROR",
504 message="Disconnecting switch {switch} due to IO Error: {}",
505 explanation="There was an error communicating with the switch",
506 recommendation=LogMessageDoc.CHECK_SWITCH),
507 @LogMessageDoc(level="ERROR",
508 message="Disconnecting switch {switch} due to switch " +
509 "state error: {error}",
510 explanation="The switch sent an unexpected message",
511 recommendation=LogMessageDoc.CHECK_SWITCH),
512 @LogMessageDoc(level="ERROR",
513 message="Disconnecting switch {switch} due to " +
514 "message parse failure",
515 explanation="Could not parse a message from the switch",
516 recommendation=LogMessageDoc.CHECK_SWITCH),
517 @LogMessageDoc(level="ERROR",
518 message="Terminating controller due to storage exception",
519 explanation=ERROR_DATABASE,
520 recommendation=LogMessageDoc.CHECK_CONTROLLER),
521 @LogMessageDoc(level="ERROR",
522 message="Could not process message: queue full",
523 explanation="OpenFlow messages are arriving faster than " +
524 " the controller can process them.",
525 recommendation=LogMessageDoc.CHECK_CONTROLLER),
526 @LogMessageDoc(level="ERROR",
527 message="Error while processing message " +
528 "from switch {switch} {cause}",
529 explanation="An error occurred processing the switch message",
530 recommendation=LogMessageDoc.GENERIC_ACTION)
531 })
532 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
533 throws Exception {
534 if (e.getCause() instanceof ReadTimeoutException) {
535 // switch timeout
536 log.error("Disconnecting switch {} due to read timeout", sw);
537 ctx.getChannel().close();
538 } else if (e.getCause() instanceof HandshakeTimeoutException) {
539 log.error("Disconnecting switch {}: failed to complete handshake",
540 sw);
541 ctx.getChannel().close();
542 } else if (e.getCause() instanceof ClosedChannelException) {
543 //log.warn("Channel for sw {} already closed", sw);
544 } else if (e.getCause() instanceof IOException) {
545 log.error("Disconnecting switch {} due to IO Error: {}",
546 sw, e.getCause().getMessage());
547 ctx.getChannel().close();
548 } else if (e.getCause() instanceof SwitchStateException) {
549 log.error("Disconnecting switch {} due to switch state error: {}",
550 sw, e.getCause().getMessage());
551 ctx.getChannel().close();
552 } else if (e.getCause() instanceof MessageParseException) {
553 log.error("Disconnecting switch " + sw +
554 " due to message parse failure",
555 e.getCause());
556 ctx.getChannel().close();
557 } else if (e.getCause() instanceof StorageException) {
558 log.error("Terminating controller due to storage exception",
559 e.getCause());
560 terminate();
561 } else if (e.getCause() instanceof RejectedExecutionException) {
562 log.warn("Could not process message: queue full");
563 } else {
564 log.error("Error while processing message from switch " + sw,
565 e.getCause());
566 ctx.getChannel().close();
567 }
568 }
569
570 @Override
571 public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
572 throws Exception {
573 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
574 msglist.add(factory.getMessage(OFType.ECHO_REQUEST));
575 e.getChannel().write(msglist);
576 }
577
578 @Override
579 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
580 throws Exception {
581 if (e.getMessage() instanceof List) {
582 @SuppressWarnings("unchecked")
583 List<OFMessage> msglist = (List<OFMessage>)e.getMessage();
584
585 for (OFMessage ofm : msglist) {
586 try {
587 processOFMessage(ofm);
588 }
589 catch (Exception ex) {
590 // We are the last handler in the stream, so run the
591 // exception through the channel again by passing in
592 // ctx.getChannel().
593 Channels.fireExceptionCaught(ctx.getChannel(), ex);
594 }
595 }
596
597 // Flush all flow-mods/packet-out generated from this "train"
598 OFSwitchImpl.flush_all();
599 }
600 }
601
602 /**
603 * Process the request for the switch description
604 */
605 @LogMessageDoc(level="ERROR",
606 message="Exception in reading description " +
607 " during handshake {exception}",
608 explanation="Could not process the switch description string",
609 recommendation=LogMessageDoc.CHECK_SWITCH)
610 void processSwitchDescReply() {
611 try {
612 // Read description, if it has been updated
613 @SuppressWarnings("unchecked")
614 Future<List<OFStatistics>> desc_future =
615 (Future<List<OFStatistics>>)sw.
616 getAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
617 List<OFStatistics> values =
618 desc_future.get(0, TimeUnit.MILLISECONDS);
619 if (values != null) {
620 OFDescriptionStatistics description =
621 new OFDescriptionStatistics();
622 ChannelBuffer data =
623 ChannelBuffers.buffer(description.getLength());
624 for (OFStatistics f : values) {
625 f.writeTo(data);
626 description.readFrom(data);
627 break; // SHOULD be a list of length 1
628 }
629 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_DATA,
630 description);
631 sw.setSwitchProperties(description);
632 data = null;
633
634 // At this time, also set other switch properties from storage
635 boolean is_core_switch = false;
636 IResultSet resultSet = null;
637 try {
638 String swid = sw.getStringId();
639 resultSet =
640 storageSource.getRow(SWITCH_CONFIG_TABLE_NAME, swid);
641 for (Iterator<IResultSet> it =
642 resultSet.iterator(); it.hasNext();) {
643 // In case of multiple rows, use the status
644 // in last row?
645 Map<String, Object> row = it.next().getRow();
646 if (row.containsKey(SWITCH_CONFIG_CORE_SWITCH)) {
647 if (log.isDebugEnabled()) {
648 log.debug("Reading SWITCH_IS_CORE_SWITCH " +
649 "config for switch={}, is-core={}",
650 sw, row.get(SWITCH_CONFIG_CORE_SWITCH));
651 }
652 String ics =
653 (String)row.get(SWITCH_CONFIG_CORE_SWITCH);
654 is_core_switch = ics.equals("true");
655 }
656 }
657 }
658 finally {
659 if (resultSet != null)
660 resultSet.close();
661 }
662 if (is_core_switch) {
663 sw.setAttribute(IOFSwitch.SWITCH_IS_CORE_SWITCH,
664 Boolean.TRUE);
665 }
666 }
667 sw.removeAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
668 state.hasDescription = true;
669 checkSwitchReady();
670 }
671 catch (InterruptedException ex) {
672 // Ignore
673 }
674 catch (TimeoutException ex) {
675 // Ignore
676 } catch (Exception ex) {
677 log.error("Exception in reading description " +
678 " during handshake", ex);
679 }
680 }
681
682 /**
683 * Send initial switch setup information that we need before adding
684 * the switch
685 * @throws IOException
686 */
687 void sendHelloConfiguration() throws IOException {
688 // Send initial Features Request
689 sw.write(factory.getMessage(OFType.FEATURES_REQUEST), null);
690 }
691
692 /**
693 * Send the configuration requests we can only do after we have
694 * the features reply
695 * @throws IOException
696 */
697 void sendFeatureReplyConfiguration() throws IOException {
698 // Ensure we receive the full packet via PacketIn
699 OFSetConfig config = (OFSetConfig) factory
700 .getMessage(OFType.SET_CONFIG);
701 config.setMissSendLength((short) 0xffff)
702 .setLengthU(OFSwitchConfig.MINIMUM_LENGTH);
703 sw.write(config, null);
704 sw.write(factory.getMessage(OFType.GET_CONFIG_REQUEST),
705 null);
706
707 // Get Description to set switch-specific flags
708 OFStatisticsRequest req = new OFStatisticsRequest();
709 req.setStatisticType(OFStatisticsType.DESC);
710 req.setLengthU(req.getLengthU());
711 Future<List<OFStatistics>> dfuture =
712 sw.getStatistics(req);
713 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE,
714 dfuture);
715
716 }
717
718 protected void checkSwitchReady() {
719 if (state.hsState == HandshakeState.FEATURES_REPLY &&
720 state.hasDescription && state.hasGetConfigReply) {
721
722 state.hsState = HandshakeState.READY;
723
724 synchronized(roleChanger) {
725 // We need to keep track of all of the switches that are connected
726 // to the controller, in any role, so that we can later send the
727 // role request messages when the controller role changes.
728 // We need to be synchronized while doing this: we must not
729 // send another role request to the connectedSwitches until
730 // we have been able to add this new switch to connectedSwitches
731 // *and* send the current role to the new switch.
732 connectedSwitches.add(sw);
733
734 if (role != null) {
735 // Send a role request if role support is enabled for the controller
736 // This is a probe that we'll use to determine if the switch
737 // actually supports the role request message. If it does we'll
738 // get back a role reply message. If it doesn't we'll get back an
739 // OFError message.
740 // If role is MASTER we will promote switch to active
741 // list when we receive the switch's role reply messages
742 log.debug("This controller's role is {}, " +
743 "sending initial role request msg to {}",
744 role, sw);
745 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
746 swList.add(sw);
747 roleChanger.submitRequest(swList, role);
748 }
749 else {
750 // Role support is not enabled on the controller (for now), so
751 // automatically promote the switch to the active state.
752 log.debug("This controller's role is null, " +
753 "not sending role request msg to {}",
754 sw);
755 // Need to clear FlowMods before we add the switch
756 // and dispatch updates otherwise we have a race condition.
757 sw.clearAllFlowMods();
758 addSwitch(sw);
759 state.firstRoleReplyReceived = true;
760 }
761 }
762 }
763 }
764
765 /* Handle a role reply message we received from the switch. Since
766 * netty serializes message dispatch we don't need to synchronize
767 * against other receive operations from the same switch, so no need
768 * to synchronize addSwitch(), removeSwitch() operations from the same
769 * connection.
770 * FIXME: However, when a switch with the same DPID connects we do
771 * need some synchronization. Handling switches with the same
772 * DPID needs to be revisited anyway (get rid of the r/w-lock and the
773 * synchronous removedSwitch notification).
774 *
775 */
776 @LogMessageDoc(level="ERROR",
777 message="Invalid role value in role reply message",
778 explanation="Was unable to set the HA role (master or slave) " +
779 "for the controller.",
780 recommendation=LogMessageDoc.CHECK_CONTROLLER)
781 protected void handleRoleReplyMessage(OFVendor vendorMessage,
782 OFRoleReplyVendorData roleReplyVendorData) {
783 // Map from the role code in the message to our role enum
784 int nxRole = roleReplyVendorData.getRole();
785 Role role = null;
786 switch (nxRole) {
787 case OFRoleVendorData.NX_ROLE_OTHER:
788 role = Role.EQUAL;
789 break;
790 case OFRoleVendorData.NX_ROLE_MASTER:
791 role = Role.MASTER;
792 break;
793 case OFRoleVendorData.NX_ROLE_SLAVE:
794 role = Role.SLAVE;
795 break;
796 default:
797 log.error("Invalid role value in role reply message");
798 sw.getChannel().close();
799 return;
800 }
801
802 log.debug("Handling role reply for role {} from {}. " +
803 "Controller's role is {} ",
804 new Object[] { role, sw, Controller.this.role}
805 );
806
807 sw.deliverRoleReply(vendorMessage.getXid(), role);
808
809 boolean isActive = activeSwitches.containsKey(sw.getId());
810 if (!isActive && sw.isActive()) {
811 // Transition from SLAVE to MASTER.
812
813 if (!state.firstRoleReplyReceived ||
814 getAlwaysClearFlowsOnSwAdd()) {
815 // This is the first role-reply message we receive from
816 // this switch or roles were disabled when the switch
817 // connected:
818 // Delete all pre-existing flows for new connections to
819 // the master
820 //
821 // FIXME: Need to think more about what the test should
822 // be for when we flush the flow-table? For example,
823 // if all the controllers are temporarily in the backup
824 // role (e.g. right after a failure of the master
825 // controller) at the point the switch connects, then
826 // all of the controllers will initially connect as
827 // backup controllers and not flush the flow-table.
828 // Then when one of them is promoted to master following
829 // the master controller election the flow-table
830 // will still not be flushed because that's treated as
831 // a failover event where we don't want to flush the
832 // flow-table. The end result would be that the flow
833 // table for a newly connected switch is never
834 // flushed. Not sure how to handle that case though...
835 sw.clearAllFlowMods();
836 log.debug("First role reply from master switch {}, " +
837 "clear FlowTable to active switch list",
838 HexString.toHexString(sw.getId()));
839 }
840
841 // Some switches don't seem to update us with port
842 // status messages while in slave role.
843 readSwitchPortStateFromStorage(sw);
844
845 // Only add the switch to the active switch list if
846 // we're not in the slave role. Note that if the role
847 // attribute is null, then that means that the switch
848 // doesn't support the role request messages, so in that
849 // case we're effectively in the EQUAL role and the
850 // switch should be included in the active switch list.
851 addSwitch(sw);
852 log.debug("Added master switch {} to active switch list",
853 HexString.toHexString(sw.getId()));
854
855 }
856 else if (isActive && !sw.isActive()) {
857 // Transition from MASTER to SLAVE: remove switch
858 // from active switch list.
859 log.debug("Removed slave switch {} from active switch" +
860 " list", HexString.toHexString(sw.getId()));
861 removeSwitch(sw);
862 }
863
864 // Indicate that we have received a role reply message.
865 state.firstRoleReplyReceived = true;
866 }
867
868 protected boolean handleVendorMessage(OFVendor vendorMessage) {
869 boolean shouldHandleMessage = false;
870 int vendor = vendorMessage.getVendor();
871 switch (vendor) {
872 case OFNiciraVendorData.NX_VENDOR_ID:
873 OFNiciraVendorData niciraVendorData =
874 (OFNiciraVendorData)vendorMessage.getVendorData();
875 int dataType = niciraVendorData.getDataType();
876 switch (dataType) {
877 case OFRoleReplyVendorData.NXT_ROLE_REPLY:
878 OFRoleReplyVendorData roleReplyVendorData =
879 (OFRoleReplyVendorData) niciraVendorData;
880 handleRoleReplyMessage(vendorMessage,
881 roleReplyVendorData);
882 break;
883 default:
884 log.warn("Unhandled Nicira VENDOR message; " +
885 "data type = {}", dataType);
886 break;
887 }
888 break;
889 default:
890 log.warn("Unhandled VENDOR message; vendor id = {}", vendor);
891 break;
892 }
893
894 return shouldHandleMessage;
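 // Note: shouldHandleMessage is never set to true above, so VENDOR
 // messages (including Nicira role replies) are fully handled here and
 // are not dispatched to IOFMessageListeners by processOFMessage().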
895 }
896
897 /**
898 * Dispatch an Openflow message from a switch to the appropriate
899 * handler.
900 * @param m The message to process
901 * @throws IOException
902 * @throws SwitchStateException
903 */
904 @LogMessageDocs({
905 @LogMessageDoc(level="WARN",
906 message="Config Reply from {switch} has " +
907 "miss length set to {length}",
908 explanation="The controller requires that the switch " +
909 "use a miss length of 0xffff for correct " +
910 "function",
911 recommendation="Use a different switch to ensure " +
912 "correct function"),
913 @LogMessageDoc(level="WARN",
914 message="Received ERROR from sw {switch} that "
915 +"indicates roles are not supported "
916 +"but we have received a valid "
917 +"role reply earlier",
918 explanation="The switch sent a confusing message to the" +
919 "controller")
920 })
921 protected void processOFMessage(OFMessage m)
922 throws IOException, SwitchStateException {
923 boolean shouldHandleMessage = false;
924
925 switch (m.getType()) {
926 case HELLO:
927 if (log.isTraceEnabled())
928 log.trace("HELLO from {}", sw);
929
930 if (state.hsState.equals(HandshakeState.START)) {
931 state.hsState = HandshakeState.HELLO;
932 sendHelloConfiguration();
933 } else {
934 throw new SwitchStateException("Unexpected HELLO from "
935 + sw);
936 }
937 break;
938 case ECHO_REQUEST:
939 OFEchoReply reply =
940 (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY);
941 reply.setXid(m.getXid());
942 sw.write(reply, null);
943 break;
944 case ECHO_REPLY:
945 break;
946 case FEATURES_REPLY:
947 if (log.isTraceEnabled())
948 log.trace("Features Reply from {}", sw);
949
950 sw.setFeaturesReply((OFFeaturesReply) m);
951 if (state.hsState.equals(HandshakeState.HELLO)) {
952 sendFeatureReplyConfiguration();
953 state.hsState = HandshakeState.FEATURES_REPLY;
954 // uncomment to enable "dumb" switches like cbench
955 // state.hsState = HandshakeState.READY;
956 // addSwitch(sw);
957 } else {
958 // return results to rest api caller
959 sw.deliverOFFeaturesReply(m);
960 // update database
961 updateActiveSwitchInfo(sw);
962 }
963 break;
964 case GET_CONFIG_REPLY:
965 if (log.isTraceEnabled())
966 log.trace("Get config reply from {}", sw);
967
968 if (!state.hsState.equals(HandshakeState.FEATURES_REPLY)) {
969 String em = "Unexpected GET_CONFIG_REPLY from " + sw;
970 throw new SwitchStateException(em);
971 }
972 OFGetConfigReply cr = (OFGetConfigReply) m;
973 if (cr.getMissSendLength() == (short)0xffff) {
974 log.trace("Config Reply from {} confirms " +
975 "miss length set to 0xffff", sw);
976 } else {
977 log.warn("Config Reply from {} has " +
978 "miss length set to {}",
979 sw, cr.getMissSendLength() & 0xffff);
980 }
981 state.hasGetConfigReply = true;
982 checkSwitchReady();
983 break;
984 case VENDOR:
985 shouldHandleMessage = handleVendorMessage((OFVendor)m);
986 break;
987 case ERROR:
988 // TODO: we need better error handling. Especially for
989 // request/reply style message (stats, roles) we should have
990 // a unified way to lookup the xid in the error message.
991 // This will probably involve rewriting the way we handle
992 // request/reply style messages.
993 OFError error = (OFError) m;
994 boolean shouldLogError = true;
995 // TODO: should we check that firstRoleReplyReceived is false,
996 // i.e., check only whether the first request fails?
997 if (sw.checkFirstPendingRoleRequestXid(error.getXid())) {
998 boolean isBadVendorError =
999 (error.getErrorType() == OFError.OFErrorType.
1000 OFPET_BAD_REQUEST.getValue());
1001 // We expect to receive a bad vendor error when
1002 // we're connected to a switch that doesn't support
1003 // the Nicira vendor extensions (i.e. not OVS or
1004 // derived from OVS). By protocol, it should also be
1005 // BAD_VENDOR, but too many switch implementations
1006 // get it wrong and we can already check the xid()
1007 // so we can ignore the type with confidence that this
1008 // is not a spurious error
1009 shouldLogError = !isBadVendorError;
1010 if (isBadVendorError) {
1011 if (state.firstRoleReplyReceived && (role != null)) {
1012 log.warn("Received ERROR from sw {} that "
1013 +"indicates roles are not supported "
1014 +"but we have received a valid "
1015 +"role reply earlier", sw);
1016 }
1017 state.firstRoleReplyReceived = true;
1018 sw.deliverRoleRequestNotSupported(error.getXid());
1019 synchronized(roleChanger) {
1020 if (sw.role == null && Controller.this.role==Role.SLAVE) {
1021 // the switch doesn't understand role request
1022 // messages and the current controller role is
1023 // slave. We need to disconnect the switch.
1024 // @see RoleChanger for rationale
1025 sw.getChannel().close();
1026 }
1027 else if (sw.role == null) {
1028 // Controller's role is master: add to
1029 // active
1030 // TODO: check if clearing flow table is
1031 // right choice here.
1032 // Need to clear FlowMods before we add the switch
1033 // and dispatch updates otherwise we have a race condition.
1034 // TODO: switch update is async. Won't we still have a potential
1035 // race condition?
1036 sw.clearAllFlowMods();
1037 addSwitch(sw);
1038 }
1039 }
1040 }
1041 else {
1042 // TODO: Is this the right thing to do if we receive
1043 // some other error besides a bad vendor error?
1044 // Presumably that means the switch did actually
1045 // understand the role request message, but there
1046 // was some other error from processing the message.
1047 // OF 1.2 specifies a OFPET_ROLE_REQUEST_FAILED
1048 // error code, but it doesn't look like the Nicira
1049 // role request has that. Should check OVS source
1050 // code to see if it's possible for any other errors
1051 // to be returned.
1052 // If we received an error the switch is not
1053 // in the correct role, so we need to disconnect it.
1054 // We could also resend the request but then we need to
1055 // check if there are other pending request in which
1056 // case we shouldn't resend. If we do resend we need
1057 // to make sure that the switch eventually accepts one
1058 // of our requests or disconnect the switch. This feels
1059 // cumbersome.
1060 sw.getChannel().close();
1061 }
1062 }
1063 // Once we support OF 1.2, we'd add code to handle it here.
1064 //if (error.getXid() == state.ofRoleRequestXid) {
1065 //}
1066 if (shouldLogError)
1067 logError(sw, error);
1068 break;
1069 case STATS_REPLY:
1070 if (state.hsState.ordinal() <
1071 HandshakeState.FEATURES_REPLY.ordinal()) {
1072 String em = "Unexpected STATS_REPLY from " + sw;
1073 throw new SwitchStateException(em);
1074 }
1075 sw.deliverStatisticsReply(m);
1076 if (sw.hasAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE)) {
1077 processSwitchDescReply();
1078 }
1079 break;
1080 case PORT_STATUS:
1081 // We want to update our port state info even if we're in
1082 // the slave role, but we only want to update storage if
1083 // we're the master (or equal).
1084 boolean updateStorage = state.hsState.
1085 equals(HandshakeState.READY) &&
1086 (sw.getRole() != Role.SLAVE);
1087 handlePortStatusMessage(sw, (OFPortStatus)m, updateStorage);
1088 shouldHandleMessage = true;
1089 break;
1090
1091 default:
1092 shouldHandleMessage = true;
1093 break;
1094 }
1095
1096 if (shouldHandleMessage) {
1097 sw.getListenerReadLock().lock();
1098 try {
1099 if (sw.isConnected()) {
1100 if (!state.hsState.equals(HandshakeState.READY)) {
1101 log.debug("Ignoring message type {} received " +
1102 "from switch {} before switch is " +
1103 "fully configured.", m.getType(), sw);
1104 }
1105 // Check if the controller is in the slave role for the
1106 // switch. If it is, then don't dispatch the message to
1107 // the listeners.
1108 // TODO: Should we dispatch messages that we expect to
1109 // receive when we're in the slave role, e.g. port
1110 // status messages? Since we're "hiding" switches from
1111 // the listeners when we're in the slave role, then it
1112 // seems a little weird to dispatch port status messages
1113 // to them. On the other hand there might be special
1114 // modules that care about all of the connected switches
1115 // and would like to receive port status notifications.
1116 else if (sw.getRole() == Role.SLAVE) {
1117 // Don't log message if it's a port status message
1118 // since we expect to receive those from the switch
1119 // and don't want to emit spurious messages.
1120 if (m.getType() != OFType.PORT_STATUS) {
1121 log.debug("Ignoring message type {} received " +
1122 "from switch {} while in the slave role.",
1123 m.getType(), sw);
1124 }
1125 } else {
1126 handleMessage(sw, m, null);
1127 }
1128 }
1129 }
1130 finally {
1131 sw.getListenerReadLock().unlock();
1132 }
1133 }
1134 }
1135 }
1136
1137 // ****************
1138 // Message handlers
1139 // ****************
1140
1141 protected void handlePortStatusMessage(IOFSwitch sw,
1142 OFPortStatus m,
1143 boolean updateStorage) {
1144 short portNumber = m.getDesc().getPortNumber();
1145 OFPhysicalPort port = m.getDesc();
1146 if (m.getReason() == (byte)OFPortReason.OFPPR_MODIFY.ordinal()) {
1147 sw.setPort(port);
1148 swStore.addPort(sw.getStringId(), port);
1149 if (updateStorage)
1150 updatePortInfo(sw, port);
1151 log.debug("Port #{} modified for {}", portNumber, sw);
1152 } else if (m.getReason() == (byte)OFPortReason.OFPPR_ADD.ordinal()) {
1153 sw.setPort(port);
1154 swStore.addPort(sw.getStringId(), port);
1155 if (updateStorage)
1156 updatePortInfo(sw, port);
1157 log.debug("Port #{} added for {}", portNumber, sw);
1158 } else if (m.getReason() ==
1159 (byte)OFPortReason.OFPPR_DELETE.ordinal()) {
1160 sw.deletePort(portNumber);
1161 swStore.deletePort(sw.getStringId(), portNumber);
1162 if (updateStorage)
1163 removePortInfo(sw, portNumber);
1164 log.debug("Port #{} deleted for {}", portNumber, sw);
1165 }
1166 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
1167 try {
1168 this.updates.put(update);
1169 } catch (InterruptedException e) {
1170 log.error("Failure adding update to queue", e);
1171 }
1172 }
1173
1174 /**
1175 * flcontext_cache - Keep a thread local stack of contexts
1176 */
1177 protected static final ThreadLocal<Stack<FloodlightContext>> flcontext_cache =
1178 new ThreadLocal <Stack<FloodlightContext>> () {
1179 @Override
1180 protected Stack<FloodlightContext> initialValue() {
1181 return new Stack<FloodlightContext>();
1182 }
1183 };
1184
1185 /**
1186 * flcontext_alloc - pop a context off the stack, if required create a new one
1187 * @return FloodlightContext
1188 */
1189 protected static FloodlightContext flcontext_alloc() {
1190 FloodlightContext flcontext = null;
1191
1192 if (flcontext_cache.get().empty()) {
1193 flcontext = new FloodlightContext();
1194 }
1195 else {
1196 flcontext = flcontext_cache.get().pop();
1197 }
1198
1199 return flcontext;
1200 }
1201
1202 /**
1203 * flcontext_free - Return the context to the current thread's cache
1204 * @param flcontext
1205 */
1206 protected void flcontext_free(FloodlightContext flcontext) {
1207 flcontext.getStorage().clear();
1208 flcontext_cache.get().push(flcontext);
1209 }
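 // Context lifecycle as used in handleMessage() below: a context is popped
 // (or created) via flcontext_alloc() only when listeners exist and the
 // caller did not pass one in, it is handed to each listener in the chain,
 // and it is then returned to this thread's stack via flcontext_free().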
1210
1211 /**
1212 * Handle replies to certain OFMessages, and pass others off to listeners
1213 * @param sw The switch for the message
1214 * @param m The message
1215 * @param bContext The floodlight context. If null then floodlight context would
1216 * be allocated in this function
1217 * @throws IOException
1218 */
1219 @LogMessageDocs({
1220 @LogMessageDoc(level="ERROR",
1221 message="Ignoring PacketIn (Xid = {xid}) because the data" +
1222 " field is empty.",
1223 explanation="The switch sent an improperly-formatted PacketIn" +
1224 " message",
1225 recommendation=LogMessageDoc.CHECK_SWITCH),
1226 @LogMessageDoc(level="WARN",
1227 message="Unhandled OF Message: {} from {}",
1228 explanation="The switch sent a message not handled by " +
1229 "the controller")
1230 })
1231 protected void handleMessage(IOFSwitch sw, OFMessage m,
1232 FloodlightContext bContext)
1233 throws IOException {
1234 Ethernet eth = null;
1235
1236 switch (m.getType()) {
1237 case PACKET_IN:
1238 OFPacketIn pi = (OFPacketIn)m;
1239
1240 if (pi.getPacketData().length <= 0) {
1241 log.error("Ignoring PacketIn (Xid = " + pi.getXid() +
1242 ") because the data field is empty.");
1243 return;
1244 }
1245
1246 if (Controller.ALWAYS_DECODE_ETH) {
1247 eth = new Ethernet();
1248 eth.deserialize(pi.getPacketData(), 0,
1249 pi.getPacketData().length);
1250 counterStore.updatePacketInCounters(sw, m, eth);
1251 }
1252 // fall through to default case...
1253
1254 default:
1255
1256 List<IOFMessageListener> listeners = null;
1257 if (messageListeners.containsKey(m.getType())) {
1258 listeners = messageListeners.get(m.getType()).
1259 getOrderedListeners();
1260 }
1261
1262 FloodlightContext bc = null;
1263 if (listeners != null) {
1264 // Check if floodlight context is passed from the calling
1265 // function, if so use that floodlight context, otherwise
1266 // allocate one
1267 if (bContext == null) {
1268 bc = flcontext_alloc();
1269 } else {
1270 bc = bContext;
1271 }
1272 if (eth != null) {
1273 IFloodlightProviderService.bcStore.put(bc,
1274 IFloodlightProviderService.CONTEXT_PI_PAYLOAD,
1275 eth);
1276 }
1277
1278 // Get the starting time (overall and per-component) of
1279 // the processing chain for this packet if performance
1280 // monitoring is turned on
1281 pktinProcTime.bootstrap(listeners);
1282 pktinProcTime.recordStartTimePktIn();
1283 Command cmd;
1284 for (IOFMessageListener listener : listeners) {
1285 if (listener instanceof IOFSwitchFilter) {
1286 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1287 continue;
1288 }
1289 }
1290
1291 pktinProcTime.recordStartTimeComp(listener);
1292 cmd = listener.receive(sw, m, bc);
1293 pktinProcTime.recordEndTimeComp(listener);
1294
1295 if (Command.STOP.equals(cmd)) {
1296 break;
1297 }
1298 }
1299 pktinProcTime.recordEndTimePktIn(sw, m, bc);
1300 } else {
1301 log.warn("Unhandled OF Message: {} from {}", m, sw);
1302 }
1303
1304 if ((bContext == null) && (bc != null)) flcontext_free(bc);
1305 }
1306 }
1307
1308 /**
1309 * Log an OpenFlow error message from a switch
1310 * @param sw The switch that sent the error
1311 * @param error The error message
1312 */
1313 @LogMessageDoc(level="ERROR",
1314 message="Error {error type} {error code} from {switch}",
1315 explanation="The switch responded with an unexpected error" +
1316 "to an OpenFlow message from the controller",
1317 recommendation="This could indicate improper network operation. " +
1318 "If the problem persists restarting the switch and " +
1319 "controller may help."
1320 )
1321 protected void logError(IOFSwitch sw, OFError error) {
1322 int etint = 0xffff & error.getErrorType();
1323 if (etint < 0 || etint >= OFErrorType.values().length) {
1324 log.error("Unknown error code {} from sw {}", etint, sw);
1325 }
1326 OFErrorType et = OFErrorType.values()[etint];
1327 switch (et) {
1328 case OFPET_HELLO_FAILED:
1329 OFHelloFailedCode hfc =
1330 OFHelloFailedCode.values()[0xffff & error.getErrorCode()];
1331 log.error("Error {} {} from {}", new Object[] {et, hfc, sw});
1332 break;
1333 case OFPET_BAD_REQUEST:
1334 OFBadRequestCode brc =
1335 OFBadRequestCode.values()[0xffff & error.getErrorCode()];
1336 log.error("Error {} {} from {}", new Object[] {et, brc, sw});
1337 break;
1338 case OFPET_BAD_ACTION:
1339 OFBadActionCode bac =
1340 OFBadActionCode.values()[0xffff & error.getErrorCode()];
1341 log.error("Error {} {} from {}", new Object[] {et, bac, sw});
1342 break;
1343 case OFPET_FLOW_MOD_FAILED:
1344 OFFlowModFailedCode fmfc =
1345 OFFlowModFailedCode.values()[0xffff & error.getErrorCode()];
1346 log.error("Error {} {} from {}", new Object[] {et, fmfc, sw});
1347 break;
1348 case OFPET_PORT_MOD_FAILED:
1349 OFPortModFailedCode pmfc =
1350 OFPortModFailedCode.values()[0xffff & error.getErrorCode()];
1351 log.error("Error {} {} from {}", new Object[] {et, pmfc, sw});
1352 break;
1353 case OFPET_QUEUE_OP_FAILED:
1354 OFQueueOpFailedCode qofc =
1355 OFQueueOpFailedCode.values()[0xffff & error.getErrorCode()];
1356 log.error("Error {} {} from {}", new Object[] {et, qofc, sw});
1357 break;
1358 default:
1359 break;
1360 }
1361 }
1362
1363 /**
1364 * Add a switch to the active switch list and call the switch listeners.
1365 * This happens either when a switch first connects (and the controller is
1366 * not in the slave role) or when the role of the controller changes from
1367 * slave to master.
1368 * @param sw the switch that has been added
1369 */
1370 // TODO: need to rethink locking and the synchronous switch update.
1371 // We can / should also handle duplicate DPIDs in connectedSwitches
1372 @LogMessageDoc(level="ERROR",
1373 message="New switch added {switch} for already-added switch {switch}",
1374 explanation="A switch with the same DPID as another switch " +
1375 "connected to the controller. This can be caused by " +
1376 "multiple switches configured with the same DPID, or " +
1377 "by a switch reconnected very quickly after " +
1378 "disconnecting.",
1379 recommendation="If this happens repeatedly, it is likely there " +
1380 "are switches with duplicate DPIDs on the network. " +
1381 "Reconfigure the appropriate switches. If it happens " +
1382 "very rarely, then it is likely this is a transient " +
1383 "network problem that can be ignored."
1384 )
1385 protected void addSwitch(IOFSwitch sw) {
1386 // TODO: is it safe to modify the HashMap without holding
1387 // the old switch's lock?
1388 OFSwitchImpl oldSw = (OFSwitchImpl) this.activeSwitches.put(sw.getId(), sw);
1389 if (sw == oldSw) {
1390 // Note == for object equality, not .equals for value
1391 log.info("New add switch for pre-existing switch {}", sw);
1392 return;
1393 }
1394
1395 if (oldSw != null) {
1396 oldSw.getListenerWriteLock().lock();
1397 try {
1398 log.error("New switch added {} for already-added switch {}",
1399 sw, oldSw);
1400 // Set the connected flag to false to suppress calling
1401 // the listeners for this switch in processOFMessage
1402 oldSw.setConnected(false);
1403
1404 oldSw.cancelAllStatisticsReplies();
1405
1406 updateInactiveSwitchInfo(oldSw);
1407
1408 // we need to clean out old switch state definitively
1409 // before adding the new switch
1410 // FIXME: It seems not completely kosher to call the
1411 // switch listeners here. I thought one of the points of
1412 // having the asynchronous switch update mechanism was so
1413 // the addedSwitch and removedSwitch were always called
1414 // from a single thread to simplify concurrency issues
1415 // for the listener.
1416 if (switchListeners != null) {
1417 for (IOFSwitchListener listener : switchListeners) {
1418 listener.removedSwitch(oldSw);
1419 }
1420 }
1421 // will eventually trigger a removeSwitch(), which will cause
1422 // a "Not removing Switch ... already removed debug message.
1423 // TODO: Figure out a way to handle this that avoids the
1424 // spurious debug message.
1425 oldSw.getChannel().close();
1426 }
1427 finally {
1428 oldSw.getListenerWriteLock().unlock();
1429 }
1430 }
1431
1432 updateActiveSwitchInfo(sw);
1433 swStore.update(sw.getStringId(), SwitchState.ACTIVE, DM_OPERATION.UPDATE);
1434 for (OFPhysicalPort port: sw.getPorts()) {
1435 swStore.addPort(sw.getStringId(), port);
1436 }
1437 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.ADDED);
1438 try {
1439 this.updates.put(update);
1440 } catch (InterruptedException e) {
1441 log.error("Failure adding update to queue", e);
1442 }
1443 }
1444
1445 /**
1446 * Remove a switch from the active switch list and call the switch listeners.
1447 * This happens either when the switch is disconnected or when the
1448 * controller's role for the switch changes from master to slave.
1449 * @param sw the switch that has been removed
1450 */
1451 protected void removeSwitch(IOFSwitch sw) {
1452 // No need to acquire the listener lock, since
1453 // this method is only called after netty has processed all
1454 // pending messages
1455 log.debug("removeSwitch: {}", sw);
1456 swStore.update(sw.getStringId(), SwitchState.INACTIVE, DM_OPERATION.UPDATE);
1457 if (!this.activeSwitches.remove(sw.getId(), sw) || !sw.isConnected()) {
1458 log.debug("Not removing switch {}; already removed", sw);
1459 return;
1460 }
1461 // We cancel all outstanding statistics replies when the switch transitions
1462 // out of the active state. In the future we might allow statistics requests
1463 // from slave controllers; then we would need to move this cancellation
1464 // to switch disconnect.
1465 sw.cancelAllStatisticsReplies();
1466
1467
1468 // FIXME: I think there's a race condition if we call updateInactiveSwitchInfo
1469 // here if role support is enabled. In that case if the switch is being
1470 // removed because we've been switched to being in the slave role, then I think
1471 // it's possible that the new master may have already been promoted to master
1472 // and written out the active switch state to storage. If we now execute
1473 // updateInactiveSwitchInfo we may wipe out all of the state that was
1474 // written out by the new master. Maybe need to revisit how we handle all
1475 // of the switch state that's written to storage.
1476
1477 updateInactiveSwitchInfo(sw);
1478
1479 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.REMOVED);
1480 try {
1481 this.updates.put(update);
1482 } catch (InterruptedException e) {
1483 log.error("Failure adding update to queue", e);
1484 }
1485 }
1486
1487 // ***************
1488 // IFloodlightProvider
1489 // ***************
1490
1491 @Override
1492 public synchronized void addOFMessageListener(OFType type,
1493 IOFMessageListener listener) {
1494 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1495 messageListeners.get(type);
1496 if (ldd == null) {
1497 ldd = new ListenerDispatcher<OFType, IOFMessageListener>();
1498 messageListeners.put(type, ldd);
1499 }
1500 ldd.addListener(type, listener);
1501 }
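 // Minimal registration sketch; MyPacketInHandler is a hypothetical
 // IOFMessageListener that implements receive() and getName() as used by
 // the dispatch code in handleMessage():
 //
 //     floodlightProvider.addOFMessageListener(OFType.PACKET_IN,
 //                                             new MyPacketInHandler());
 //
 // Listeners registered for the same OFType run in the order returned by
 // getOrderedListeners(), and a listener returning Command.STOP ends the
 // chain for that message.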
1502
1503 @Override
1504 public synchronized void removeOFMessageListener(OFType type,
1505 IOFMessageListener listener) {
1506 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1507 messageListeners.get(type);
1508 if (ldd != null) {
1509 ldd.removeListener(listener);
1510 }
1511 }
1512
1513 private void logListeners() {
1514 for (Map.Entry<OFType,
1515 ListenerDispatcher<OFType,
1516 IOFMessageListener>> entry
1517 : messageListeners.entrySet()) {
1518
1519 OFType type = entry.getKey();
1520 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1521 entry.getValue();
1522
1523 StringBuffer sb = new StringBuffer();
1524 sb.append("OFListeners for ");
1525 sb.append(type);
1526 sb.append(": ");
1527 for (IOFMessageListener l : ldd.getOrderedListeners()) {
1528 sb.append(l.getName());
1529 sb.append(",");
1530 }
1531 log.debug(sb.toString());
1532 }
1533 }
1534
1535 public void removeOFMessageListeners(OFType type) {
1536 messageListeners.remove(type);
1537 }
1538
1539 @Override
1540 public Map<Long, IOFSwitch> getSwitches() {
1541 return Collections.unmodifiableMap(this.activeSwitches);
1542 }
1543
1544 @Override
1545 public void addOFSwitchListener(IOFSwitchListener listener) {
1546 this.switchListeners.add(listener);
1547 }
1548
1549 @Override
1550 public void removeOFSwitchListener(IOFSwitchListener listener) {
1551 this.switchListeners.remove(listener);
1552 }
1553
1554 @Override
1555 public Map<OFType, List<IOFMessageListener>> getListeners() {
1556 Map<OFType, List<IOFMessageListener>> lers =
1557 new HashMap<OFType, List<IOFMessageListener>>();
1558 for(Entry<OFType, ListenerDispatcher<OFType, IOFMessageListener>> e :
1559 messageListeners.entrySet()) {
1560 lers.put(e.getKey(), e.getValue().getOrderedListeners());
1561 }
1562 return Collections.unmodifiableMap(lers);
1563 }
1564
1565 @Override
1566 @LogMessageDocs({
1567 @LogMessageDoc(message="Failed to inject OFMessage {message} onto " +
1568 "a null switch",
1569 explanation="Failed to process a message because the switch " +
1570 "is no longer connected."),
1571 @LogMessageDoc(level="ERROR",
1572 message="Error reinjecting OFMessage on switch {switch}",
1573 explanation="An I/O error occurred while attempting to " +
1574 "process an OpenFlow message",
1575 recommendation=LogMessageDoc.CHECK_SWITCH)
1576 })
1577 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg,
1578 FloodlightContext bc) {
1579 if (sw == null) {
1580 log.info("Failed to inject OFMessage {} onto a null switch", msg);
1581 return false;
1582 }
1583
1584 // FIXME: Do we need to be able to inject messages to switches
1585 // where we're the slave controller (i.e. they're connected but
1586 // not active)?
1587 // FIXME: Don't we need synchronization logic here so we're holding
1588 // the listener read lock when we call handleMessage? After some
1589 // discussions it sounds like the right thing to do here would be to
1590 // inject the message as a netty upstream channel event so it goes
1591 // through the normal netty event processing, including being
1592 // handled
1593 if (!activeSwitches.containsKey(sw.getId())) return false;
1594
1595 try {
1596 // Pass the Floodlight context to handleMessage()
1597 handleMessage(sw, msg, bc);
1598 } catch (IOException e) {
1599 log.error("Error reinjecting OFMessage on switch {}",
1600 HexString.toHexString(sw.getId()));
1601 return false;
1602 }
1603 return true;
1604 }
1605
1606 @Override
1607 @LogMessageDoc(message="Calling System.exit",
1608 explanation="The controller is terminating")
1609 public synchronized void terminate() {
1610 log.info("Calling System.exit");
1611 System.exit(1);
1612 }
1613
1614 @Override
1615 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg) {
1616 // call the overloaded version with floodlight context set to null
1617 return injectOfMessage(sw, msg, null);
1618 }
1619
1620 @Override
1621 public void handleOutgoingMessage(IOFSwitch sw, OFMessage m,
1622 FloodlightContext bc) {
1623 if (log.isTraceEnabled()) {
1624 String str = OFMessage.getDataAsString(sw, m, bc);
1625 log.trace("{}", str);
1626 }
1627
1628 List<IOFMessageListener> listeners = null;
1629 if (messageListeners.containsKey(m.getType())) {
1630 listeners =
1631 messageListeners.get(m.getType()).getOrderedListeners();
1632 }
1633
1634 if (listeners != null) {
1635 for (IOFMessageListener listener : listeners) {
1636 if (listener instanceof IOFSwitchFilter) {
1637 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1638 continue;
1639 }
1640 }
1641 if (Command.STOP.equals(listener.receive(sw, m, bc))) {
1642 break;
1643 }
1644 }
1645 }
1646 }
1647
1648 @Override
1649 public BasicFactory getOFMessageFactory() {
1650 return factory;
1651 }
1652
1653 @Override
1654 public String getControllerId() {
1655 return controllerId;
1656 }
1657
1658 // **************
1659 // Initialization
1660 // **************
1661
1662 protected void updateAllInactiveSwitchInfo() {
1663 if (role == Role.SLAVE) {
1664 return;
1665 }
1666 String controllerId = getControllerId();
1667 String[] switchColumns = { SWITCH_DATAPATH_ID,
1668 SWITCH_CONTROLLER_ID,
1669 SWITCH_ACTIVE };
1670 String[] portColumns = { PORT_ID, PORT_SWITCH };
1671 IResultSet switchResultSet = null;
1672 try {
1673 OperatorPredicate op =
1674 new OperatorPredicate(SWITCH_CONTROLLER_ID,
1675 OperatorPredicate.Operator.EQ,
1676 controllerId);
1677 switchResultSet =
1678 storageSource.executeQuery(SWITCH_TABLE_NAME,
1679 switchColumns,
1680 op, null);
1681 while (switchResultSet.next()) {
1682 IResultSet portResultSet = null;
1683 try {
1684 String datapathId =
1685 switchResultSet.getString(SWITCH_DATAPATH_ID);
1686 switchResultSet.setBoolean(SWITCH_ACTIVE, Boolean.FALSE);
1687 op = new OperatorPredicate(PORT_SWITCH,
1688 OperatorPredicate.Operator.EQ,
1689 datapathId);
1690 portResultSet =
1691 storageSource.executeQuery(PORT_TABLE_NAME,
1692 portColumns,
1693 op, null);
1694 while (portResultSet.next()) {
1695 portResultSet.deleteRow();
1696 }
1697 portResultSet.save();
1698 }
1699 finally {
1700 if (portResultSet != null)
1701 portResultSet.close();
1702 }
1703 }
1704 switchResultSet.save();
1705 }
1706 finally {
1707 if (switchResultSet != null)
1708 switchResultSet.close();
1709 }
1710 }
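// In short: the query above marks every switch row previously claimed by this
// controller as inactive and deletes the associated port rows; it runs from
// updateControllerInfo() below during startup.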
1711
1712 protected void updateControllerInfo() {
1713 updateAllInactiveSwitchInfo();
1714
1715 // Write out the controller info to the storage source
1716 Map<String, Object> controllerInfo = new HashMap<String, Object>();
1717 String id = getControllerId();
1718 controllerInfo.put(CONTROLLER_ID, id);
1719 storageSource.updateRow(CONTROLLER_TABLE_NAME, controllerInfo);
1720 }
1721
1722 protected void updateActiveSwitchInfo(IOFSwitch sw) {
1723 if (role == Role.SLAVE) {
1724 return;
1725 }
1726 // Obtain the row info for the switch
1727 Map<String, Object> switchInfo = new HashMap<String, Object>();
1728 String datapathIdString = sw.getStringId();
1729 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1730 String controllerId = getControllerId();
1731 switchInfo.put(SWITCH_CONTROLLER_ID, controllerId);
1732 Date connectedSince = sw.getConnectedSince();
1733 switchInfo.put(SWITCH_CONNECTED_SINCE, connectedSince);
1734 Channel channel = sw.getChannel();
1735 SocketAddress socketAddress = channel.getRemoteAddress();
1736 if (socketAddress != null) {
1737 String socketAddressString = socketAddress.toString();
1738 switchInfo.put(SWITCH_SOCKET_ADDRESS, socketAddressString);
1739 if (socketAddress instanceof InetSocketAddress) {
1740 InetSocketAddress inetSocketAddress =
1741 (InetSocketAddress)socketAddress;
1742 InetAddress inetAddress = inetSocketAddress.getAddress();
1743 String ip = inetAddress.getHostAddress();
1744 switchInfo.put(SWITCH_IP, ip);
1745 }
1746 }
1747
1748 // Write out the switch features info
1749 long capabilities = U32.f(sw.getCapabilities());
1750 switchInfo.put(SWITCH_CAPABILITIES, capabilities);
1751 long buffers = U32.f(sw.getBuffers());
1752 switchInfo.put(SWITCH_BUFFERS, buffers);
1753 long tables = U32.f(sw.getTables());
1754 switchInfo.put(SWITCH_TABLES, tables);
1755 long actions = U32.f(sw.getActions());
1756 switchInfo.put(SWITCH_ACTIONS, actions);
1757 switchInfo.put(SWITCH_ACTIVE, Boolean.TRUE);
1758
1759 // Update the switch
1760 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1761
1762 // Update the ports
1763 for (OFPhysicalPort port: sw.getPorts()) {
1764 updatePortInfo(sw, port);
1765 }
1766 }
1767
1768 protected void updateInactiveSwitchInfo(IOFSwitch sw) {
1769 if (role == Role.SLAVE) {
1770 return;
1771 }
1772 log.debug("Update DB with inactiveSW {}", sw);
1773 // Update the controller info in the storage source to be inactive
1774 Map<String, Object> switchInfo = new HashMap<String, Object>();
1775 String datapathIdString = sw.getStringId();
1776 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1777 //switchInfo.put(SWITCH_CONNECTED_SINCE, null);
1778 switchInfo.put(SWITCH_ACTIVE, Boolean.FALSE);
1779 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1780 }
1781
1782 protected void updatePortInfo(IOFSwitch sw, OFPhysicalPort port) {
1783 if (role == Role.SLAVE) {
1784 return;
1785 }
1786 String datapathIdString = sw.getStringId();
1787 Map<String, Object> portInfo = new HashMap<String, Object>();
1788 int portNumber = U16.f(port.getPortNumber());
1789 String id = datapathIdString + "|" + portNumber;
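// e.g. "00:00:00:00:00:00:00:01|3" -- datapath id and port number joined by
// '|' (illustrative value)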
1790 portInfo.put(PORT_ID, id);
1791 portInfo.put(PORT_SWITCH, datapathIdString);
1792 portInfo.put(PORT_NUMBER, portNumber);
1793 byte[] hardwareAddress = port.getHardwareAddress();
1794 String hardwareAddressString = HexString.toHexString(hardwareAddress);
1795 portInfo.put(PORT_HARDWARE_ADDRESS, hardwareAddressString);
1796 String name = port.getName();
1797 portInfo.put(PORT_NAME, name);
1798 long config = U32.f(port.getConfig());
1799 portInfo.put(PORT_CONFIG, config);
1800 long state = U32.f(port.getState());
1801 portInfo.put(PORT_STATE, state);
1802 long currentFeatures = U32.f(port.getCurrentFeatures());
1803 portInfo.put(PORT_CURRENT_FEATURES, currentFeatures);
1804 long advertisedFeatures = U32.f(port.getAdvertisedFeatures());
1805 portInfo.put(PORT_ADVERTISED_FEATURES, advertisedFeatures);
1806 long supportedFeatures = U32.f(port.getSupportedFeatures());
1807 portInfo.put(PORT_SUPPORTED_FEATURES, supportedFeatures);
1808 long peerFeatures = U32.f(port.getPeerFeatures());
1809 portInfo.put(PORT_PEER_FEATURES, peerFeatures);
1810 storageSource.updateRowAsync(PORT_TABLE_NAME, portInfo);
1811 }
1812
1813 /**
1814 * Read switch port data from storage and write it into a switch object
1815 * @param sw the switch to update
1816 */
1817 protected void readSwitchPortStateFromStorage(OFSwitchImpl sw) {
1818 OperatorPredicate op =
1819 new OperatorPredicate(PORT_SWITCH,
1820 OperatorPredicate.Operator.EQ,
1821 sw.getStringId());
1822 IResultSet portResultSet =
1823 storageSource.executeQuery(PORT_TABLE_NAME,
1824 null, op, null);
1825 //Map<Short, OFPhysicalPort> oldports =
1826 // new HashMap<Short, OFPhysicalPort>();
1827 //oldports.putAll(sw.getPorts());
1828
1829 while (portResultSet.next()) {
1830 try {
1831 OFPhysicalPort p = new OFPhysicalPort();
1832 p.setPortNumber((short)portResultSet.getInt(PORT_NUMBER));
1833 p.setName(portResultSet.getString(PORT_NAME));
1834 p.setConfig((int)portResultSet.getLong(PORT_CONFIG));
1835 p.setState((int)portResultSet.getLong(PORT_STATE));
1836 String portMac = portResultSet.getString(PORT_HARDWARE_ADDRESS);
1837 p.setHardwareAddress(HexString.fromHexString(portMac));
1838 p.setCurrentFeatures((int)portResultSet.
1839 getLong(PORT_CURRENT_FEATURES));
1840 p.setAdvertisedFeatures((int)portResultSet.
1841 getLong(PORT_ADVERTISED_FEATURES));
1842 p.setSupportedFeatures((int)portResultSet.
1843 getLong(PORT_SUPPORTED_FEATURES));
1844 p.setPeerFeatures((int)portResultSet.
1845 getLong(PORT_PEER_FEATURES));
1846 //oldports.remove(Short.valueOf(p.getPortNumber()));
1847 sw.setPort(p);
1848 } catch (NullPointerException e) {
1849 // ignore
1850 }
1851 }
1852 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
1853 try {
1854 this.updates.put(update);
1855 } catch (InterruptedException e) {
1856 log.error("Failure adding update to queue", e);
1857 }
1858 }
1859
1860 protected void removePortInfo(IOFSwitch sw, short portNumber) {
1861 if (role == Role.SLAVE) {
1862 return;
1863 }
1864 String datapathIdString = sw.getStringId();
1865 String id = datapathIdString + "|" + portNumber;
1866 storageSource.deleteRowAsync(PORT_TABLE_NAME, id);
1867 }
1868
1869 /**
1870 * Sets the initial role based on properties in the config params.
1871 * It looks for two different properties.
1872 * If the "role" property is specified then the value should be
1873 * either "EQUAL", "MASTER", or "SLAVE" and the role of the
1874 * controller is set to the specified value. If the "role" property
1875 * is not specified then it looks next for the "rolepath" property.
1876 * In this case the value should be the path to a property file in
1877 * the file system that contains a property called "floodlight.role",
1878 * which can be one of the values listed above for the "role" property.
1879 * The idea behind the "rolepath" mechanism is that some separate
1880 * heartbeat and master-election algorithm determines the role of the
1881 * controller. When a role transition happens, that algorithm updates
1882 * the current role in the file specified by the "rolepath" property.
1883 * Then if floodlight restarts for some reason it can read the correct
1884 * current role of the controller from that file.
1885 * @param configParams The config params for the FloodlightProvider service
1886 * @return A valid role if role information is specified in the
1887 * config params, otherwise null
1888 */
1889 @LogMessageDocs({
1890 @LogMessageDoc(message="Controller role set to {role}",
1891 explanation="Setting the initial HA role of the controller"),
1892 @LogMessageDoc(level="ERROR",
1893 message="Invalid current role value: {role}",
1894 explanation="An invalid HA role value was read from the " +
1895 "properties file",
1896 recommendation=LogMessageDoc.CHECK_CONTROLLER)
1897 })
1898 protected Role getInitialRole(Map<String, String> configParams) {
1899 Role role = null;
1900 String roleString = configParams.get("role");
1901 if (roleString == null) {
1902 String rolePath = configParams.get("rolepath");
1903 if (rolePath != null) {
1904 Properties properties = new Properties();
1905 try {
1906 properties.load(new FileInputStream(rolePath));
1907 roleString = properties.getProperty("floodlight.role");
1908 }
1909 catch (IOException exc) {
1910 // Don't treat it as an error if the file specified by the
1911 // rolepath property doesn't exist. This lets us enable the
1912 // HA mechanism by just creating/setting the floodlight.role
1913 // property in that file without having to modify the
1914 // floodlight properties.
1915 }
1916 }
1917 }
1918
1919 if (roleString != null) {
1920 // Canonicalize the string to the form used for the enum constants
1921 roleString = roleString.trim().toUpperCase();
1922 try {
1923 role = Role.valueOf(roleString);
1924 }
1925 catch (IllegalArgumentException exc) {
1926 log.error("Invalid current role value: {}", roleString);
1927 }
1928 }
1929
1930 log.info("Controller role set to {}", role);
1931
1932 return role;
1933 }
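// Illustrative configuration for the two mechanisms described above (the
// property names match this method; the file location is an assumption):
//
//   # directly in the FloodlightProvider config params
//   role = MASTER
//
//   # or point at an external role file
//   rolepath = /var/run/floodlight/role.properties
//
//   # where /var/run/floodlight/role.properties contains
//   floodlight.role = SLAVE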
1934
1935 /**
1936 * Main run loop: start listening for switch connections and dispatch queued controller updates.
1937 * @throws IOException
1938 */
1939 @LogMessageDocs({
1940 @LogMessageDoc(message="Listening for switch connections on {address}",
1941 explanation="The controller is ready and listening for new" +
1942 " switch connections"),
1943 @LogMessageDoc(message="Storage exception in controller " +
1944 "updates loop; terminating process",
1945 explanation=ERROR_DATABASE,
1946 recommendation=LogMessageDoc.CHECK_CONTROLLER),
1947 @LogMessageDoc(level="ERROR",
1948 message="Exception in controller updates loop",
1949 explanation="Failed to dispatch controller event",
1950 recommendation=LogMessageDoc.GENERIC_ACTION)
1951 })
1952 public void run() {
1953 if (log.isDebugEnabled()) {
1954 logListeners();
1955 }
1956
1957 try {
1958 final ServerBootstrap bootstrap = createServerBootStrap();
1959
1960 bootstrap.setOption("reuseAddress", true);
1961 bootstrap.setOption("child.keepAlive", true);
1962 bootstrap.setOption("child.tcpNoDelay", true);
1963 bootstrap.setOption("child.sendBufferSize", Controller.SEND_BUFFER_SIZE);
1964
1965 ChannelPipelineFactory pfact =
1966 new OpenflowPipelineFactory(this, null);
1967 bootstrap.setPipelineFactory(pfact);
1968 InetSocketAddress sa = new InetSocketAddress(openFlowPort);
1969 final ChannelGroup cg = new DefaultChannelGroup();
1970 cg.add(bootstrap.bind(sa));
1971
1972 log.info("Listening for switch connections on {}", sa);
1973 } catch (Exception e) {
1974 throw new RuntimeException(e);
1975 }
1976
1977 // main loop
1978 while (true) {
1979 try {
1980 IUpdate update = updates.take();
1981 update.dispatch();
1982 } catch (InterruptedException e) {
1983 return;
1984 } catch (StorageException e) {
1985 log.error("Storage exception in controller " +
1986 "updates loop; terminating process", e);
1987 return;
1988 } catch (Exception e) {
1989 log.error("Exception in controller updates loop", e);
1990 }
1991 }
1992 }
1993
1994 private ServerBootstrap createServerBootStrap() {
1995 if (workerThreads == 0) {
1996 return new ServerBootstrap(
1997 new NioServerSocketChannelFactory(
1998 Executors.newCachedThreadPool(),
1999 Executors.newCachedThreadPool()));
2000 } else {
2001 return new ServerBootstrap(
2002 new NioServerSocketChannelFactory(
2003 Executors.newCachedThreadPool(),
2004 Executors.newCachedThreadPool(), workerThreads));
2005 }
2006 }
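// The two cached thread pools above are the netty boss and worker executors;
// a non-zero workerThreads caps the number of I/O worker threads, otherwise
// netty falls back to its own default (typically twice the number of cores).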
2007
2008 public void setConfigParams(Map<String, String> configParams) {
2009 String ofPort = configParams.get("openflowport");
2010 if (ofPort != null) {
2011 this.openFlowPort = Integer.parseInt(ofPort);
2012 }
2013 log.debug("OpenFlow port set to {}", this.openFlowPort);
2014 String threads = configParams.get("workerthreads");
2015 if (threads != null) {
2016 this.workerThreads = Integer.parseInt(threads);
2017 }
2018 log.debug("Number of worker threads set to {}", this.workerThreads);
2019 String controllerId = configParams.get("controllerid");
2020 if (controllerId != null) {
2021 this.controllerId = controllerId;
2022 }
2023 log.debug("ControllerId set to {}", this.controllerId);
2024 }
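// Illustrative config params as seen by this method (the key names are the
// ones read above; how they are qualified in the properties file depends on
// the module loader and is not shown here):
//
//   openflowport  = 6633
//   workerthreads = 16
//   controllerid  = controller-1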
2025
2026 private void initVendorMessages() {
2027 // Configure openflowj to be able to parse the role request/reply
2028 // vendor messages.
2029 OFBasicVendorId niciraVendorId = new OFBasicVendorId(
2030 OFNiciraVendorData.NX_VENDOR_ID, 4);
2031 OFVendorId.registerVendorId(niciraVendorId);
2032 OFBasicVendorDataType roleRequestVendorData =
2033 new OFBasicVendorDataType(
2034 OFRoleRequestVendorData.NXT_ROLE_REQUEST,
2035 OFRoleRequestVendorData.getInstantiable());
2036 niciraVendorId.registerVendorDataType(roleRequestVendorData);
2037 OFBasicVendorDataType roleReplyVendorData =
2038 new OFBasicVendorDataType(
2039 OFRoleReplyVendorData.NXT_ROLE_REPLY,
2040 OFRoleReplyVendorData.getInstantiable());
2041 niciraVendorId.registerVendorDataType(roleReplyVendorData);
2042 }
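// Hedged sketch of how the registered role types are used (the exact calls
// are assumptions about the openflowj vendor API, not taken from this file):
//
//     OFVendor msg = (OFVendor) factory.getMessage(OFType.VENDOR);
//     msg.setVendor(OFNiciraVendorData.NX_VENDOR_ID);
//     OFRoleRequestVendorData data = new OFRoleRequestVendorData();
//     data.setRole(OFRoleVendorData.NX_ROLE_MASTER);
//     msg.setVendorData(data);
//
// RoleChanger sends a message of this shape to each connected switch when the
// controller's HA role changes.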
2043
2044 /**
2045 * Initialize internal data structures
2046 */
2047 public void init(Map<String, String> configParams) {
2048 // These data structures are initialized here because other
2049 // modules' startUp() might be called before ours
2050 this.messageListeners =
2051 new ConcurrentHashMap<OFType,
2052 ListenerDispatcher<OFType,
2053 IOFMessageListener>>();
2054 this.switchListeners = new CopyOnWriteArraySet<IOFSwitchListener>();
2055 this.haListeners = new CopyOnWriteArraySet<IHAListener>();
2056 this.activeSwitches = new ConcurrentHashMap<Long, IOFSwitch>();
2057 this.connectedSwitches = new HashSet<OFSwitchImpl>();
2058 this.controllerNodeIPsCache = new HashMap<String, String>();
2059 this.updates = new LinkedBlockingQueue<IUpdate>();
2060 this.factory = new BasicFactory();
2061 this.providerMap = new HashMap<String, List<IInfoProvider>>();
2062 setConfigParams(configParams);
2063 this.role = getInitialRole(configParams);
2064 this.roleChanger = new RoleChanger();
2065 initVendorMessages();
2066 this.systemStartTime = System.currentTimeMillis();
2067 this.swStore = new SwitchStorageImpl();
2068 this.swStore.init("");
2069 }
2070
2071 /**
2072 * Startup all of the controller's components
2073 */
2074 @LogMessageDoc(message="Waiting for storage source",
2075 explanation="The system database is not yet ready",
2076 recommendation="If this message persists, this indicates " +
2077 "that the system database has failed to start. " +
2078 LogMessageDoc.CHECK_CONTROLLER)
2079 public void startupComponents() {
2080 // Create the table names we use
2081 storageSource.createTable(CONTROLLER_TABLE_NAME, null);
2082 storageSource.createTable(SWITCH_TABLE_NAME, null);
2083 storageSource.createTable(PORT_TABLE_NAME, null);
2084 storageSource.createTable(CONTROLLER_INTERFACE_TABLE_NAME, null);
2085 storageSource.createTable(SWITCH_CONFIG_TABLE_NAME, null);
2086 storageSource.setTablePrimaryKeyName(CONTROLLER_TABLE_NAME,
2087 CONTROLLER_ID);
2088 storageSource.setTablePrimaryKeyName(SWITCH_TABLE_NAME,
2089 SWITCH_DATAPATH_ID);
2090 storageSource.setTablePrimaryKeyName(PORT_TABLE_NAME, PORT_ID);
2091 storageSource.setTablePrimaryKeyName(CONTROLLER_INTERFACE_TABLE_NAME,
2092 CONTROLLER_INTERFACE_ID);
2093 storageSource.addListener(CONTROLLER_INTERFACE_TABLE_NAME, this);
2094
2095 while (true) {
2096 try {
2097 updateControllerInfo();
2098 break;
2099 }
2100 catch (StorageException e) {
2101 log.info("Waiting for storage source");
2102 try {
2103 Thread.sleep(1000);
2104 } catch (InterruptedException e1) { // interrupted while waiting; retry the storage update
2105 }
2106 }
2107 }
2108
2109 // Add our REST API
2110 restApi.addRestletRoutable(new CoreWebRoutable());
2111 }
2112
2113 @Override
2114 public void addInfoProvider(String type, IInfoProvider provider) {
2115 if (!providerMap.containsKey(type)) {
2116 providerMap.put(type, new ArrayList<IInfoProvider>());
2117 }
2118 providerMap.get(type).add(provider);
2119 }
2120
2121 @Override
2122 public void removeInfoProvider(String type, IInfoProvider provider) {
2123 if (!providerMap.containsKey(type)) {
2124 log.debug("Provider type {} doesn't exist.", type);
2125 return;
2126 }
2127
2128 providerMap.get(type).remove(provider);
2129 }
2130
2131 public Map<String, Object> getControllerInfo(String type) {
2132 if (!providerMap.containsKey(type)) return null;
2133
2134 Map<String, Object> result = new LinkedHashMap<String, Object>();
2135 for (IInfoProvider provider : providerMap.get(type)) {
2136 result.putAll(provider.getInfo(type));
2137 }
2138
2139 return result;
2140 }
2141
2142 @Override
2143 public void addHAListener(IHAListener listener) {
2144 this.haListeners.add(listener);
2145 }
2146
2147 @Override
2148 public void removeHAListener(IHAListener listener) {
2149 this.haListeners.remove(listener);
2150 }
2151
2152
2153 /**
2154 * Handle changes to the controller nodes IPs and dispatch update.
2155 */
2156 @SuppressWarnings("unchecked")
2157 protected void handleControllerNodeIPChanges() {
2158 HashMap<String,String> curControllerNodeIPs = new HashMap<String,String>();
2159 HashMap<String,String> addedControllerNodeIPs = new HashMap<String,String>();
2160 HashMap<String,String> removedControllerNodeIPs = new HashMap<String,String>();
2161 String[] colNames = { CONTROLLER_INTERFACE_CONTROLLER_ID,
2162 CONTROLLER_INTERFACE_TYPE,
2163 CONTROLLER_INTERFACE_NUMBER,
2164 CONTROLLER_INTERFACE_DISCOVERED_IP };
2165 synchronized(controllerNodeIPsCache) {
2166 // We currently assume that interface Ethernet0 is the relevant
2167 // controller interface. Might change.
2168 // We could (should?) implement this using
2169 // predicates, but creating the individual and compound predicate
2170 // seems like more overhead than just checking every row, particularly
2171 // since the number of rows is small and changes are infrequent.
2172 IResultSet res = storageSource.executeQuery(CONTROLLER_INTERFACE_TABLE_NAME,
2173 colNames,null, null);
2174 while (res.next()) {
2175 if (res.getString(CONTROLLER_INTERFACE_TYPE).equals("Ethernet") &&
2176 res.getInt(CONTROLLER_INTERFACE_NUMBER) == 0) {
2177 String controllerID = res.getString(CONTROLLER_INTERFACE_CONTROLLER_ID);
2178 String discoveredIP = res.getString(CONTROLLER_INTERFACE_DISCOVERED_IP);
2179 String curIP = controllerNodeIPsCache.get(controllerID);
2180
2181 curControllerNodeIPs.put(controllerID, discoveredIP);
2182 if (curIP == null) {
2183 // new controller node IP
2184 addedControllerNodeIPs.put(controllerID, discoveredIP);
2185 }
2186 else if (!curIP.equals(discoveredIP)) { // compare values, not references
2187 // IP changed
2188 removedControllerNodeIPs.put(controllerID, curIP);
2189 addedControllerNodeIPs.put(controllerID, discoveredIP);
2190 }
2191 }
2192 }
2193 // Now figure out if rows have been deleted. We can't use the
2194 // rowKeys from rowsDeleted directly, since the table's primary
2195 // key is a compound key that we can't disassemble.
2196 Set<String> curEntries = curControllerNodeIPs.keySet();
2197 Set<String> removedEntries = controllerNodeIPsCache.keySet();
2198 removedEntries.removeAll(curEntries);
2199 for (String removedControllerID : removedEntries)
2200 removedControllerNodeIPs.put(removedControllerID, controllerNodeIPsCache.get(removedControllerID));
2201 controllerNodeIPsCache = (HashMap<String, String>) curControllerNodeIPs.clone();
2202 HAControllerNodeIPUpdate update = new HAControllerNodeIPUpdate(
2203 curControllerNodeIPs, addedControllerNodeIPs,
2204 removedControllerNodeIPs);
2205 if (!removedControllerNodeIPs.isEmpty() || !addedControllerNodeIPs.isEmpty()) {
2206 try {
2207 this.updates.put(update);
2208 } catch (InterruptedException e) {
2209 log.error("Failure adding update to queue", e);
2210 }
2211 }
2212 }
2213 }
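// Worked example (illustrative): if the cache held {c1=10.0.0.1} and the
// table now reports c1 on 10.0.0.2 plus a new node c2 on 10.0.0.3, then
// curControllerNodeIPs = {c1=10.0.0.2, c2=10.0.0.3},
// addedControllerNodeIPs = {c1=10.0.0.2, c2=10.0.0.3},
// removedControllerNodeIPs = {c1=10.0.0.1}, and a single
// HAControllerNodeIPUpdate is queued.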
2214
2215 @Override
2216 public Map<String, String> getControllerNodeIPs() {
2217 // We return a copy of the mapping so we can guarantee that
2218 // the mapping return is the same as one that will be (or was)
2219 // dispatched to IHAListeners
2220 HashMap<String,String> retval = new HashMap<String,String>();
2221 synchronized(controllerNodeIPsCache) {
2222 retval.putAll(controllerNodeIPsCache);
2223 }
2224 return retval;
2225 }
2226
2227 @Override
2228 public void rowsModified(String tableName, Set<Object> rowKeys) {
2229 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2230 handleControllerNodeIPChanges();
2231 }
2232
2233 }
2234
2235 @Override
2236 public void rowsDeleted(String tableName, Set<Object> rowKeys) {
2237 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2238 handleControllerNodeIPChanges();
2239 }
2240 }
2241
2242 @Override
2243 public long getSystemStartTime() {
2244 return (this.systemStartTime);
2245 }
2246
2247 @Override
2248 public void setAlwaysClearFlowsOnSwAdd(boolean value) {
2249 this.alwaysClearFlowsOnSwAdd = value;
2250 }
2251
2252 public boolean getAlwaysClearFlowsOnSwAdd() {
2253 return this.alwaysClearFlowsOnSwAdd;
2254 }
2255}