blob: a5ee71ae378244712bd890523c6fcae6cdb63509 [file] [log] [blame]
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001/**
2* Copyright 2011, Big Switch Networks, Inc.
3* Originally created by David Erickson, Stanford University
4*
5* Licensed under the Apache License, Version 2.0 (the "License"); you may
6* not use this file except in compliance with the License. You may obtain
7* a copy of the License at
8*
9* http://www.apache.org/licenses/LICENSE-2.0
10*
11* Unless required by applicable law or agreed to in writing, software
12* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14* License for the specific language governing permissions and limitations
15* under the License.
16**/
17
18package net.floodlightcontroller.core.internal;
19
20import java.io.FileInputStream;
21import java.io.IOException;
22import java.net.InetAddress;
23import java.net.InetSocketAddress;
24import java.net.SocketAddress;
25import java.util.ArrayList;
26import java.nio.channels.ClosedChannelException;
27import java.util.Collection;
28import java.util.Collections;
29import java.util.Date;
30import java.util.HashMap;
31import java.util.HashSet;
32import java.util.Iterator;
33import java.util.LinkedHashMap;
34import java.util.List;
35import java.util.Map;
36import java.util.Map.Entry;
37import java.util.Properties;
38import java.util.Set;
39import java.util.Stack;
40import java.util.concurrent.BlockingQueue;
41import java.util.concurrent.ConcurrentHashMap;
42import java.util.concurrent.ConcurrentMap;
43import java.util.concurrent.CopyOnWriteArraySet;
44import java.util.concurrent.Executors;
45import java.util.concurrent.Future;
46import java.util.concurrent.LinkedBlockingQueue;
47import java.util.concurrent.RejectedExecutionException;
48import java.util.concurrent.TimeUnit;
49import java.util.concurrent.TimeoutException;
50
51import net.floodlightcontroller.core.FloodlightContext;
52import net.floodlightcontroller.core.IFloodlightProviderService;
53import net.floodlightcontroller.core.IHAListener;
54import net.floodlightcontroller.core.IInfoProvider;
Pankaj Berde8557a462013-01-07 08:59:31 -080055import net.floodlightcontroller.core.INetMapStorage.DM_OPERATION;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080056import net.floodlightcontroller.core.IOFMessageListener;
57import net.floodlightcontroller.core.IListener.Command;
58import net.floodlightcontroller.core.IOFSwitch;
59import net.floodlightcontroller.core.IOFSwitchFilter;
60import net.floodlightcontroller.core.IOFSwitchListener;
Pankaj Berde8557a462013-01-07 08:59:31 -080061import net.floodlightcontroller.core.ISwitchStorage.SwitchState;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080062import net.floodlightcontroller.core.annotations.LogMessageDoc;
63import net.floodlightcontroller.core.annotations.LogMessageDocs;
64import net.floodlightcontroller.core.internal.OFChannelState.HandshakeState;
65import net.floodlightcontroller.core.util.ListenerDispatcher;
66import net.floodlightcontroller.core.web.CoreWebRoutable;
67import net.floodlightcontroller.counter.ICounterStoreService;
68import net.floodlightcontroller.packet.Ethernet;
69import net.floodlightcontroller.perfmon.IPktInProcessingTimeService;
70import net.floodlightcontroller.restserver.IRestApiService;
71import net.floodlightcontroller.storage.IResultSet;
72import net.floodlightcontroller.storage.IStorageSourceListener;
73import net.floodlightcontroller.storage.IStorageSourceService;
74import net.floodlightcontroller.storage.OperatorPredicate;
75import net.floodlightcontroller.storage.StorageException;
76import net.floodlightcontroller.threadpool.IThreadPoolService;
77
78import org.jboss.netty.bootstrap.ServerBootstrap;
79import org.jboss.netty.buffer.ChannelBuffer;
80import org.jboss.netty.buffer.ChannelBuffers;
81import org.jboss.netty.channel.Channel;
82import org.jboss.netty.channel.ChannelHandlerContext;
83import org.jboss.netty.channel.ChannelPipelineFactory;
84import org.jboss.netty.channel.ChannelStateEvent;
85import org.jboss.netty.channel.ChannelUpstreamHandler;
86import org.jboss.netty.channel.Channels;
87import org.jboss.netty.channel.ExceptionEvent;
88import org.jboss.netty.channel.MessageEvent;
89import org.jboss.netty.channel.group.ChannelGroup;
90import org.jboss.netty.channel.group.DefaultChannelGroup;
91import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
92import org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler;
93import org.jboss.netty.handler.timeout.IdleStateEvent;
94import org.jboss.netty.handler.timeout.ReadTimeoutException;
95import org.openflow.protocol.OFEchoReply;
96import org.openflow.protocol.OFError;
97import org.openflow.protocol.OFError.OFBadActionCode;
98import org.openflow.protocol.OFError.OFBadRequestCode;
99import org.openflow.protocol.OFError.OFErrorType;
100import org.openflow.protocol.OFError.OFFlowModFailedCode;
101import org.openflow.protocol.OFError.OFHelloFailedCode;
102import org.openflow.protocol.OFError.OFPortModFailedCode;
103import org.openflow.protocol.OFError.OFQueueOpFailedCode;
104import org.openflow.protocol.OFFeaturesReply;
105import org.openflow.protocol.OFGetConfigReply;
106import org.openflow.protocol.OFMessage;
107import org.openflow.protocol.OFPacketIn;
108import org.openflow.protocol.OFPhysicalPort;
109import org.openflow.protocol.OFPortStatus;
Pankaj Berde6a4075d2013-01-22 16:42:54 -0800110import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
Pankaj Berde6debb042013-01-16 18:04:32 -0800111import org.openflow.protocol.OFPhysicalPort.OFPortState;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800112import org.openflow.protocol.OFPortStatus.OFPortReason;
113import org.openflow.protocol.OFSetConfig;
114import org.openflow.protocol.OFStatisticsRequest;
115import org.openflow.protocol.OFSwitchConfig;
116import org.openflow.protocol.OFType;
117import org.openflow.protocol.OFVendor;
118import org.openflow.protocol.factory.BasicFactory;
119import org.openflow.protocol.factory.MessageParseException;
120import org.openflow.protocol.statistics.OFDescriptionStatistics;
121import org.openflow.protocol.statistics.OFStatistics;
122import org.openflow.protocol.statistics.OFStatisticsType;
123import org.openflow.protocol.vendor.OFBasicVendorDataType;
124import org.openflow.protocol.vendor.OFBasicVendorId;
125import org.openflow.protocol.vendor.OFVendorId;
126import org.openflow.util.HexString;
127import org.openflow.util.U16;
128import org.openflow.util.U32;
129import org.openflow.vendor.nicira.OFNiciraVendorData;
130import org.openflow.vendor.nicira.OFRoleReplyVendorData;
131import org.openflow.vendor.nicira.OFRoleRequestVendorData;
132import org.openflow.vendor.nicira.OFRoleVendorData;
133import org.slf4j.Logger;
134import org.slf4j.LoggerFactory;
135
136
137/**
138 * The main controller class. Handles all setup and network listeners
139 */
140public class Controller implements IFloodlightProviderService,
141 IStorageSourceListener {
Pankaj Berde29ab7fc2013-01-25 06:17:52 -0800142
143 ThreadLocal<SwitchStorageImpl> store = new ThreadLocal<SwitchStorageImpl>() {
144 @Override
145 protected SwitchStorageImpl initialValue() {
146 SwitchStorageImpl swStore = new SwitchStorageImpl();
147 //TODO: Get the file path from global properties
148 swStore.init("/tmp/cassandra.titan");
149 return swStore;
150 }
151 };
152
153 protected SwitchStorageImpl swStore = store.get();
Pankaj Berde8557a462013-01-07 08:59:31 -0800154
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800155 protected static Logger log = LoggerFactory.getLogger(Controller.class);
156
157 private static final String ERROR_DATABASE =
158 "The controller could not communicate with the system database.";
159
160 protected BasicFactory factory;
161 protected ConcurrentMap<OFType,
162 ListenerDispatcher<OFType,IOFMessageListener>>
163 messageListeners;
164 // The activeSwitches map contains only those switches that are actively
165 // being controlled by us -- it doesn't contain switches that are
166 // in the slave role
167 protected ConcurrentHashMap<Long, IOFSwitch> activeSwitches;
168 // connectedSwitches contains all connected switches, including ones where
169 // we're a slave controller. We need to keep track of them so that we can
170 // send role request messages to switches when our role changes to master
171 // We add a switch to this set after it successfully completes the
172 // handshake. Access to this Set needs to be synchronized with roleChanger
173 protected HashSet<OFSwitchImpl> connectedSwitches;
174
175 // The controllerNodeIPsCache maps Controller IDs to their IP address.
176 // It's only used by handleControllerNodeIPsChanged
177 protected HashMap<String, String> controllerNodeIPsCache;
178
179 protected Set<IOFSwitchListener> switchListeners;
180 protected Set<IHAListener> haListeners;
181 protected Map<String, List<IInfoProvider>> providerMap;
182 protected BlockingQueue<IUpdate> updates;
183
184 // Module dependencies
185 protected IRestApiService restApi;
186 protected ICounterStoreService counterStore = null;
187 protected IStorageSourceService storageSource;
188 protected IPktInProcessingTimeService pktinProcTime;
189 protected IThreadPoolService threadPool;
190
191 // Configuration options
192 protected int openFlowPort = 6633;
193 protected int workerThreads = 0;
194 // The id for this controller node. Should be unique for each controller
195 // node in a controller cluster.
196 protected String controllerId = "localhost";
197
198 // The current role of the controller.
199 // If the controller isn't configured to support roles, then this is null.
200 protected Role role;
201 // A helper that handles sending and timeout handling for role requests
202 protected RoleChanger roleChanger;
203
204 // Start time of the controller
205 protected long systemStartTime;
206
207 // Flag to always flush flow table on switch reconnect (HA or otherwise)
208 protected boolean alwaysClearFlowsOnSwAdd = false;
209
210 // Storage table names
211 protected static final String CONTROLLER_TABLE_NAME = "controller_controller";
212 protected static final String CONTROLLER_ID = "id";
213
214 protected static final String SWITCH_TABLE_NAME = "controller_switch";
215 protected static final String SWITCH_DATAPATH_ID = "dpid";
216 protected static final String SWITCH_SOCKET_ADDRESS = "socket_address";
217 protected static final String SWITCH_IP = "ip";
218 protected static final String SWITCH_CONTROLLER_ID = "controller_id";
219 protected static final String SWITCH_ACTIVE = "active";
220 protected static final String SWITCH_CONNECTED_SINCE = "connected_since";
221 protected static final String SWITCH_CAPABILITIES = "capabilities";
222 protected static final String SWITCH_BUFFERS = "buffers";
223 protected static final String SWITCH_TABLES = "tables";
224 protected static final String SWITCH_ACTIONS = "actions";
225
226 protected static final String SWITCH_CONFIG_TABLE_NAME = "controller_switchconfig";
227 protected static final String SWITCH_CONFIG_CORE_SWITCH = "core_switch";
228
229 protected static final String PORT_TABLE_NAME = "controller_port";
230 protected static final String PORT_ID = "id";
231 protected static final String PORT_SWITCH = "switch_id";
232 protected static final String PORT_NUMBER = "number";
233 protected static final String PORT_HARDWARE_ADDRESS = "hardware_address";
234 protected static final String PORT_NAME = "name";
235 protected static final String PORT_CONFIG = "config";
236 protected static final String PORT_STATE = "state";
237 protected static final String PORT_CURRENT_FEATURES = "current_features";
238 protected static final String PORT_ADVERTISED_FEATURES = "advertised_features";
239 protected static final String PORT_SUPPORTED_FEATURES = "supported_features";
240 protected static final String PORT_PEER_FEATURES = "peer_features";
241
242 protected static final String CONTROLLER_INTERFACE_TABLE_NAME = "controller_controllerinterface";
243 protected static final String CONTROLLER_INTERFACE_ID = "id";
244 protected static final String CONTROLLER_INTERFACE_CONTROLLER_ID = "controller_id";
245 protected static final String CONTROLLER_INTERFACE_TYPE = "type";
246 protected static final String CONTROLLER_INTERFACE_NUMBER = "number";
247 protected static final String CONTROLLER_INTERFACE_DISCOVERED_IP = "discovered_ip";
248
249
250
251 // Perf. related configuration
252 protected static final int SEND_BUFFER_SIZE = 4 * 1024 * 1024;
253 protected static final int BATCH_MAX_SIZE = 100;
254 protected static final boolean ALWAYS_DECODE_ETH = true;
255
256 /**
257 * Updates handled by the main loop
258 */
259 protected interface IUpdate {
260 /**
261 * Calls the appropriate listeners
262 */
263 public void dispatch();
264 }
/**
 * Kinds of switch-related updates carried by a SwitchUpdate.
 */
public enum SwitchUpdateType {
    ADDED,
    REMOVED,
    PORTCHANGED
}
270 /**
271 * Update message indicating a switch was added or removed
272 */
273 protected class SwitchUpdate implements IUpdate {
274 public IOFSwitch sw;
275 public SwitchUpdateType switchUpdateType;
276 public SwitchUpdate(IOFSwitch sw, SwitchUpdateType switchUpdateType) {
277 this.sw = sw;
278 this.switchUpdateType = switchUpdateType;
279 }
280 public void dispatch() {
281 if (log.isTraceEnabled()) {
282 log.trace("Dispatching switch update {} {}",
283 sw, switchUpdateType);
284 }
285 if (switchListeners != null) {
286 for (IOFSwitchListener listener : switchListeners) {
287 switch(switchUpdateType) {
288 case ADDED:
289 listener.addedSwitch(sw);
290 break;
291 case REMOVED:
292 listener.removedSwitch(sw);
293 break;
294 case PORTCHANGED:
295 listener.switchPortChanged(sw.getId());
296 break;
297 }
298 }
299 }
300 }
301 }
302
303 /**
304 * Update message indicating controller's role has changed
305 */
306 protected class HARoleUpdate implements IUpdate {
307 public Role oldRole;
308 public Role newRole;
309 public HARoleUpdate(Role newRole, Role oldRole) {
310 this.oldRole = oldRole;
311 this.newRole = newRole;
312 }
313 public void dispatch() {
314 // Make sure that old and new roles are different.
315 if (oldRole == newRole) {
316 if (log.isTraceEnabled()) {
317 log.trace("HA role update ignored as the old and " +
318 "new roles are the same. newRole = {}" +
319 "oldRole = {}", newRole, oldRole);
320 }
321 return;
322 }
323 if (log.isTraceEnabled()) {
324 log.trace("Dispatching HA Role update newRole = {}, oldRole = {}",
325 newRole, oldRole);
326 }
327 if (haListeners != null) {
328 for (IHAListener listener : haListeners) {
329 listener.roleChanged(oldRole, newRole);
330 }
331 }
332 }
333 }
334
335 /**
336 * Update message indicating
337 * IPs of controllers in controller cluster have changed.
338 */
339 protected class HAControllerNodeIPUpdate implements IUpdate {
340 public Map<String,String> curControllerNodeIPs;
341 public Map<String,String> addedControllerNodeIPs;
342 public Map<String,String> removedControllerNodeIPs;
343 public HAControllerNodeIPUpdate(
344 HashMap<String,String> curControllerNodeIPs,
345 HashMap<String,String> addedControllerNodeIPs,
346 HashMap<String,String> removedControllerNodeIPs) {
347 this.curControllerNodeIPs = curControllerNodeIPs;
348 this.addedControllerNodeIPs = addedControllerNodeIPs;
349 this.removedControllerNodeIPs = removedControllerNodeIPs;
350 }
351 public void dispatch() {
352 if (log.isTraceEnabled()) {
353 log.trace("Dispatching HA Controller Node IP update "
354 + "curIPs = {}, addedIPs = {}, removedIPs = {}",
355 new Object[] { curControllerNodeIPs, addedControllerNodeIPs,
356 removedControllerNodeIPs }
357 );
358 }
359 if (haListeners != null) {
360 for (IHAListener listener: haListeners) {
361 listener.controllerNodeIPsChanged(curControllerNodeIPs,
362 addedControllerNodeIPs, removedControllerNodeIPs);
363 }
364 }
365 }
366 }
367
368 // ***************
369 // Getters/Setters
370 // ***************
371
372 public void setStorageSourceService(IStorageSourceService storageSource) {
373 this.storageSource = storageSource;
374 }
375
376 public void setCounterStore(ICounterStoreService counterStore) {
377 this.counterStore = counterStore;
378 }
379
380 public void setPktInProcessingService(IPktInProcessingTimeService pits) {
381 this.pktinProcTime = pits;
382 }
383
384 public void setRestApiService(IRestApiService restApi) {
385 this.restApi = restApi;
386 }
387
388 public void setThreadPoolService(IThreadPoolService tp) {
389 this.threadPool = tp;
390 }
391
392 @Override
393 public Role getRole() {
394 synchronized(roleChanger) {
395 return role;
396 }
397 }
398
399 @Override
400 public void setRole(Role role) {
401 if (role == null) throw new NullPointerException("Role can not be null.");
402 if (role == Role.MASTER && this.role == Role.SLAVE) {
403 // Reset db state to Inactive for all switches.
404 updateAllInactiveSwitchInfo();
405 }
406
407 // Need to synchronize to ensure a reliable ordering on role request
408 // messages send and to ensure the list of connected switches is stable
409 // RoleChanger will handle the actual sending of the message and
410 // timeout handling
411 // @see RoleChanger
412 synchronized(roleChanger) {
413 if (role.equals(this.role)) {
414 log.debug("Ignoring role change: role is already {}", role);
415 return;
416 }
417
418 Role oldRole = this.role;
419 this.role = role;
420
421 log.debug("Submitting role change request to role {}", role);
422 roleChanger.submitRequest(connectedSwitches, role);
423
424 // Enqueue an update for our listeners.
425 try {
426 this.updates.put(new HARoleUpdate(role, oldRole));
427 } catch (InterruptedException e) {
428 log.error("Failure adding update to queue", e);
429 }
430 }
431 }
432
433
434
435 // **********************
436 // ChannelUpstreamHandler
437 // **********************
438
439 /**
440 * Return a new channel handler for processing a switch connections
441 * @param state The channel state object for the connection
442 * @return the new channel handler
443 */
444 protected ChannelUpstreamHandler getChannelHandler(OFChannelState state) {
445 return new OFChannelHandler(state);
446 }
447
448 /**
449 * Channel handler deals with the switch connection and dispatches
450 * switch messages to the appropriate locations.
451 * @author readams
452 */
453 protected class OFChannelHandler
454 extends IdleStateAwareChannelUpstreamHandler {
455 protected OFSwitchImpl sw;
456 protected OFChannelState state;
457
458 public OFChannelHandler(OFChannelState state) {
459 this.state = state;
460 }
461
462 @Override
463 @LogMessageDoc(message="New switch connection from {ip address}",
464 explanation="A new switch has connected from the " +
465 "specified IP address")
466 public void channelConnected(ChannelHandlerContext ctx,
467 ChannelStateEvent e) throws Exception {
468 log.info("New switch connection from {}",
469 e.getChannel().getRemoteAddress());
470
471 sw = new OFSwitchImpl();
472 sw.setChannel(e.getChannel());
473 sw.setFloodlightProvider(Controller.this);
474 sw.setThreadPoolService(threadPool);
475
476 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
477 msglist.add(factory.getMessage(OFType.HELLO));
478 e.getChannel().write(msglist);
479
480 }
481
482 @Override
483 @LogMessageDoc(message="Disconnected switch {switch information}",
484 explanation="The specified switch has disconnected.")
485 public void channelDisconnected(ChannelHandlerContext ctx,
486 ChannelStateEvent e) throws Exception {
487 if (sw != null && state.hsState == HandshakeState.READY) {
488 if (activeSwitches.containsKey(sw.getId())) {
489 // It's safe to call removeSwitch even though the map might
490 // not contain this particular switch but another with the
491 // same DPID
492 removeSwitch(sw);
493 }
494 synchronized(roleChanger) {
495 connectedSwitches.remove(sw);
496 }
497 sw.setConnected(false);
498 }
499 log.info("Disconnected switch {}", sw);
500 }
501
502 @Override
503 @LogMessageDocs({
504 @LogMessageDoc(level="ERROR",
505 message="Disconnecting switch {switch} due to read timeout",
506 explanation="The connected switch has failed to send any " +
507 "messages or respond to echo requests",
508 recommendation=LogMessageDoc.CHECK_SWITCH),
509 @LogMessageDoc(level="ERROR",
510 message="Disconnecting switch {switch}: failed to " +
511 "complete handshake",
512 explanation="The switch did not respond correctly " +
513 "to handshake messages",
514 recommendation=LogMessageDoc.CHECK_SWITCH),
515 @LogMessageDoc(level="ERROR",
516 message="Disconnecting switch {switch} due to IO Error: {}",
517 explanation="There was an error communicating with the switch",
518 recommendation=LogMessageDoc.CHECK_SWITCH),
519 @LogMessageDoc(level="ERROR",
520 message="Disconnecting switch {switch} due to switch " +
521 "state error: {error}",
522 explanation="The switch sent an unexpected message",
523 recommendation=LogMessageDoc.CHECK_SWITCH),
524 @LogMessageDoc(level="ERROR",
525 message="Disconnecting switch {switch} due to " +
526 "message parse failure",
527 explanation="Could not parse a message from the switch",
528 recommendation=LogMessageDoc.CHECK_SWITCH),
529 @LogMessageDoc(level="ERROR",
530 message="Terminating controller due to storage exception",
531 explanation=ERROR_DATABASE,
532 recommendation=LogMessageDoc.CHECK_CONTROLLER),
533 @LogMessageDoc(level="ERROR",
534 message="Could not process message: queue full",
535 explanation="OpenFlow messages are arriving faster than " +
536 " the controller can process them.",
537 recommendation=LogMessageDoc.CHECK_CONTROLLER),
538 @LogMessageDoc(level="ERROR",
539 message="Error while processing message " +
540 "from switch {switch} {cause}",
541 explanation="An error occurred processing the switch message",
542 recommendation=LogMessageDoc.GENERIC_ACTION)
543 })
544 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
545 throws Exception {
546 if (e.getCause() instanceof ReadTimeoutException) {
547 // switch timeout
548 log.error("Disconnecting switch {} due to read timeout", sw);
549 ctx.getChannel().close();
550 } else if (e.getCause() instanceof HandshakeTimeoutException) {
551 log.error("Disconnecting switch {}: failed to complete handshake",
552 sw);
553 ctx.getChannel().close();
554 } else if (e.getCause() instanceof ClosedChannelException) {
555 //log.warn("Channel for sw {} already closed", sw);
556 } else if (e.getCause() instanceof IOException) {
557 log.error("Disconnecting switch {} due to IO Error: {}",
558 sw, e.getCause().getMessage());
559 ctx.getChannel().close();
560 } else if (e.getCause() instanceof SwitchStateException) {
561 log.error("Disconnecting switch {} due to switch state error: {}",
562 sw, e.getCause().getMessage());
563 ctx.getChannel().close();
564 } else if (e.getCause() instanceof MessageParseException) {
565 log.error("Disconnecting switch " + sw +
566 " due to message parse failure",
567 e.getCause());
568 ctx.getChannel().close();
569 } else if (e.getCause() instanceof StorageException) {
570 log.error("Terminating controller due to storage exception",
571 e.getCause());
572 terminate();
573 } else if (e.getCause() instanceof RejectedExecutionException) {
574 log.warn("Could not process message: queue full");
575 } else {
576 log.error("Error while processing message from switch " + sw,
577 e.getCause());
578 ctx.getChannel().close();
579 }
580 }
581
582 @Override
583 public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
584 throws Exception {
585 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
586 msglist.add(factory.getMessage(OFType.ECHO_REQUEST));
587 e.getChannel().write(msglist);
588 }
589
590 @Override
591 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
592 throws Exception {
593 if (e.getMessage() instanceof List) {
594 @SuppressWarnings("unchecked")
595 List<OFMessage> msglist = (List<OFMessage>)e.getMessage();
596
597 for (OFMessage ofm : msglist) {
598 try {
599 processOFMessage(ofm);
600 }
601 catch (Exception ex) {
602 // We are the last handler in the stream, so run the
603 // exception through the channel again by passing in
604 // ctx.getChannel().
605 Channels.fireExceptionCaught(ctx.getChannel(), ex);
606 }
607 }
608
609 // Flush all flow-mods/packet-out generated from this "train"
610 OFSwitchImpl.flush_all();
611 }
612 }
613
614 /**
615 * Process the request for the switch description
616 */
617 @LogMessageDoc(level="ERROR",
618 message="Exception in reading description " +
619 " during handshake {exception}",
620 explanation="Could not process the switch description string",
621 recommendation=LogMessageDoc.CHECK_SWITCH)
622 void processSwitchDescReply() {
623 try {
624 // Read description, if it has been updated
625 @SuppressWarnings("unchecked")
626 Future<List<OFStatistics>> desc_future =
627 (Future<List<OFStatistics>>)sw.
628 getAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
629 List<OFStatistics> values =
630 desc_future.get(0, TimeUnit.MILLISECONDS);
631 if (values != null) {
632 OFDescriptionStatistics description =
633 new OFDescriptionStatistics();
634 ChannelBuffer data =
635 ChannelBuffers.buffer(description.getLength());
636 for (OFStatistics f : values) {
637 f.writeTo(data);
638 description.readFrom(data);
639 break; // SHOULD be a list of length 1
640 }
641 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_DATA,
642 description);
643 sw.setSwitchProperties(description);
644 data = null;
645
646 // At this time, also set other switch properties from storage
647 boolean is_core_switch = false;
648 IResultSet resultSet = null;
649 try {
650 String swid = sw.getStringId();
651 resultSet =
652 storageSource.getRow(SWITCH_CONFIG_TABLE_NAME, swid);
653 for (Iterator<IResultSet> it =
654 resultSet.iterator(); it.hasNext();) {
655 // In case of multiple rows, use the status
656 // in last row?
657 Map<String, Object> row = it.next().getRow();
658 if (row.containsKey(SWITCH_CONFIG_CORE_SWITCH)) {
659 if (log.isDebugEnabled()) {
660 log.debug("Reading SWITCH_IS_CORE_SWITCH " +
661 "config for switch={}, is-core={}",
662 sw, row.get(SWITCH_CONFIG_CORE_SWITCH));
663 }
664 String ics =
665 (String)row.get(SWITCH_CONFIG_CORE_SWITCH);
666 is_core_switch = ics.equals("true");
667 }
668 }
669 }
670 finally {
671 if (resultSet != null)
672 resultSet.close();
673 }
674 if (is_core_switch) {
675 sw.setAttribute(IOFSwitch.SWITCH_IS_CORE_SWITCH,
676 new Boolean(true));
677 }
678 }
679 sw.removeAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
680 state.hasDescription = true;
681 checkSwitchReady();
682 }
683 catch (InterruptedException ex) {
684 // Ignore
685 }
686 catch (TimeoutException ex) {
687 // Ignore
688 } catch (Exception ex) {
689 log.error("Exception in reading description " +
690 " during handshake", ex);
691 }
692 }
693
694 /**
695 * Send initial switch setup information that we need before adding
696 * the switch
697 * @throws IOException
698 */
699 void sendHelloConfiguration() throws IOException {
700 // Send initial Features Request
701 sw.write(factory.getMessage(OFType.FEATURES_REQUEST), null);
702 }
703
704 /**
705 * Send the configuration requests we can only do after we have
706 * the features reply
707 * @throws IOException
708 */
709 void sendFeatureReplyConfiguration() throws IOException {
710 // Ensure we receive the full packet via PacketIn
711 OFSetConfig config = (OFSetConfig) factory
712 .getMessage(OFType.SET_CONFIG);
713 config.setMissSendLength((short) 0xffff)
714 .setLengthU(OFSwitchConfig.MINIMUM_LENGTH);
715 sw.write(config, null);
716 sw.write(factory.getMessage(OFType.GET_CONFIG_REQUEST),
717 null);
718
719 // Get Description to set switch-specific flags
720 OFStatisticsRequest req = new OFStatisticsRequest();
721 req.setStatisticType(OFStatisticsType.DESC);
722 req.setLengthU(req.getLengthU());
723 Future<List<OFStatistics>> dfuture =
724 sw.getStatistics(req);
725 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE,
726 dfuture);
727
728 }
729
730 protected void checkSwitchReady() {
731 if (state.hsState == HandshakeState.FEATURES_REPLY &&
732 state.hasDescription && state.hasGetConfigReply) {
733
734 state.hsState = HandshakeState.READY;
735
736 synchronized(roleChanger) {
737 // We need to keep track of all of the switches that are connected
738 // to the controller, in any role, so that we can later send the
739 // role request messages when the controller role changes.
740 // We need to be synchronized while doing this: we must not
741 // send a another role request to the connectedSwitches until
742 // we were able to add this new switch to connectedSwitches
743 // *and* send the current role to the new switch.
744 connectedSwitches.add(sw);
745
746 if (role != null) {
747 // Send a role request if role support is enabled for the controller
748 // This is a probe that we'll use to determine if the switch
749 // actually supports the role request message. If it does we'll
750 // get back a role reply message. If it doesn't we'll get back an
751 // OFError message.
752 // If role is MASTER we will promote switch to active
753 // list when we receive the switch's role reply messages
754 log.debug("This controller's role is {}, " +
755 "sending initial role request msg to {}",
756 role, sw);
757 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
758 swList.add(sw);
759 roleChanger.submitRequest(swList, role);
760 }
761 else {
762 // Role supported not enabled on controller (for now)
763 // automatically promote switch to active state.
764 log.debug("This controller's role is null, " +
765 "not sending role request msg to {}",
766 role, sw);
767 // Need to clear FlowMods before we add the switch
768 // and dispatch updates otherwise we have a race condition.
769 sw.clearAllFlowMods();
770 addSwitch(sw);
771 state.firstRoleReplyReceived = true;
772 }
773 }
774 }
775 }
776
777 /* Handle a role reply message we received from the switch. Since
778 * netty serializes message dispatch we don't need to synchronize
779 * against other receive operations from the same switch, so no need
780 * to synchronize addSwitch(), removeSwitch() operations from the same
781 * connection.
782 * FIXME: However, when a switch with the same DPID connects we do
783 * need some synchronization. However, handling switches with same
784 * DPID needs to be revisited anyways (get rid of r/w-lock and synchronous
785 * removedSwitch notification):1
786 *
787 */
788 @LogMessageDoc(level="ERROR",
789 message="Invalid role value in role reply message",
790 explanation="Was unable to set the HA role (master or slave) " +
791 "for the controller.",
792 recommendation=LogMessageDoc.CHECK_CONTROLLER)
793 protected void handleRoleReplyMessage(OFVendor vendorMessage,
794 OFRoleReplyVendorData roleReplyVendorData) {
795 // Map from the role code in the message to our role enum
796 int nxRole = roleReplyVendorData.getRole();
797 Role role = null;
798 switch (nxRole) {
799 case OFRoleVendorData.NX_ROLE_OTHER:
800 role = Role.EQUAL;
801 break;
802 case OFRoleVendorData.NX_ROLE_MASTER:
803 role = Role.MASTER;
804 break;
805 case OFRoleVendorData.NX_ROLE_SLAVE:
806 role = Role.SLAVE;
807 break;
808 default:
809 log.error("Invalid role value in role reply message");
810 sw.getChannel().close();
811 return;
812 }
813
814 log.debug("Handling role reply for role {} from {}. " +
815 "Controller's role is {} ",
816 new Object[] { role, sw, Controller.this.role}
817 );
818
819 sw.deliverRoleReply(vendorMessage.getXid(), role);
820
821 boolean isActive = activeSwitches.containsKey(sw.getId());
822 if (!isActive && sw.isActive()) {
823 // Transition from SLAVE to MASTER.
824
825 if (!state.firstRoleReplyReceived ||
826 getAlwaysClearFlowsOnSwAdd()) {
827 // This is the first role-reply message we receive from
828 // this switch or roles were disabled when the switch
829 // connected:
830 // Delete all pre-existing flows for new connections to
831 // the master
832 //
833 // FIXME: Need to think more about what the test should
834 // be for when we flush the flow-table? For example,
835 // if all the controllers are temporarily in the backup
836 // role (e.g. right after a failure of the master
837 // controller) at the point the switch connects, then
838 // all of the controllers will initially connect as
839 // backup controllers and not flush the flow-table.
840 // Then when one of them is promoted to master following
841 // the master controller election the flow-table
842 // will still not be flushed because that's treated as
843 // a failover event where we don't want to flush the
844 // flow-table. The end result would be that the flow
845 // table for a newly connected switch is never
846 // flushed. Not sure how to handle that case though...
847 sw.clearAllFlowMods();
848 log.debug("First role reply from master switch {}, " +
849 "clear FlowTable to active switch list",
850 HexString.toHexString(sw.getId()));
851 }
852
853 // Some switches don't seem to update us with port
854 // status messages while in slave role.
855 readSwitchPortStateFromStorage(sw);
856
857 // Only add the switch to the active switch list if
858 // we're not in the slave role. Note that if the role
859 // attribute is null, then that means that the switch
860 // doesn't support the role request messages, so in that
861 // case we're effectively in the EQUAL role and the
862 // switch should be included in the active switch list.
863 addSwitch(sw);
864 log.debug("Added master switch {} to active switch list",
865 HexString.toHexString(sw.getId()));
866
867 }
868 else if (isActive && !sw.isActive()) {
869 // Transition from MASTER to SLAVE: remove switch
870 // from active switch list.
871 log.debug("Removed slave switch {} from active switch" +
872 " list", HexString.toHexString(sw.getId()));
873 removeSwitch(sw);
874 }
875
876 // Indicate that we have received a role reply message.
877 state.firstRoleReplyReceived = true;
878 }
879
880 protected boolean handleVendorMessage(OFVendor vendorMessage) {
881 boolean shouldHandleMessage = false;
882 int vendor = vendorMessage.getVendor();
883 switch (vendor) {
884 case OFNiciraVendorData.NX_VENDOR_ID:
885 OFNiciraVendorData niciraVendorData =
886 (OFNiciraVendorData)vendorMessage.getVendorData();
887 int dataType = niciraVendorData.getDataType();
888 switch (dataType) {
889 case OFRoleReplyVendorData.NXT_ROLE_REPLY:
890 OFRoleReplyVendorData roleReplyVendorData =
891 (OFRoleReplyVendorData) niciraVendorData;
892 handleRoleReplyMessage(vendorMessage,
893 roleReplyVendorData);
894 break;
895 default:
896 log.warn("Unhandled Nicira VENDOR message; " +
897 "data type = {}", dataType);
898 break;
899 }
900 break;
901 default:
902 log.warn("Unhandled VENDOR message; vendor id = {}", vendor);
903 break;
904 }
905
906 return shouldHandleMessage;
907 }
908
909 /**
910 * Dispatch an Openflow message from a switch to the appropriate
911 * handler.
 * <p>
 * HELLO, FEATURES_REPLY and GET_CONFIG_REPLY are checked against, and
 * advance, the handshake state kept in state.hsState. ECHO_REQUEST is
 * answered directly, ECHO_REPLY is dropped, VENDOR is passed to
 * handleVendorMessage, ERROR gets special treatment when its xid matches
 * the first pending role request, and STATS_REPLY is delivered to the
 * switch object. PORT_STATUS and every message type without its own case
 * are marked for forwarding to the message listeners via handleMessage;
 * forwarding only happens when the switch is connected, the handshake
 * state is READY and the switch role is not SLAVE.
912 * @param m The message to process
913 * @throws IOException
914 * @throws SwitchStateException if a HELLO arrives outside the START
 *         state, a GET_CONFIG_REPLY arrives outside the FEATURES_REPLY
 *         state, or a STATS_REPLY arrives before the FEATURES_REPLY state
915 */
916 @LogMessageDocs({
917 @LogMessageDoc(level="WARN",
918 message="Config Reply from {switch} has " +
919 "miss length set to {length}",
920 explanation="The controller requires that the switch " +
921 "use a miss length of 0xffff for correct " +
922 "function",
923 recommendation="Use a different switch to ensure " +
924 "correct function"),
925 @LogMessageDoc(level="WARN",
926 message="Received ERROR from sw {switch} that "
927 +"indicates roles are not supported "
928 +"but we have received a valid "
929 +"role reply earlier",
930 explanation="The switch sent a confusing message to the" +
931 "controller")
932 })
933 protected void processOFMessage(OFMessage m)
934 throws IOException, SwitchStateException {
// Set to true by the cases whose message should also be offered to the
// registered message listeners after the switch statement.
935 boolean shouldHandleMessage = false;
936
937 switch (m.getType()) {
// HELLO is only accepted while the handshake is still in START.
938 case HELLO:
939 if (log.isTraceEnabled())
940 log.trace("HELLO from {}", sw);
941
942 if (state.hsState.equals(HandshakeState.START)) {
943 state.hsState = HandshakeState.HELLO;
944 sendHelloConfiguration();
945 } else {
946 throw new SwitchStateException("Unexpected HELLO from "
947 + sw);
948 }
949 break;
// Answer an echo request with an echo reply carrying the same xid.
950 case ECHO_REQUEST:
951 OFEchoReply reply =
952 (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY);
953 reply.setXid(m.getXid());
954 sw.write(reply, null);
955 break;
956 case ECHO_REPLY:
957 break;
958 case FEATURES_REPLY:
959 if (log.isTraceEnabled())
960 log.trace("Features Reply from {}", sw);
961
962 sw.setFeaturesReply((OFFeaturesReply) m);
// In the HELLO state this reply advances the handshake; in any other
// state it is delivered to the switch object and the stored switch
// info is refreshed.
963 if (state.hsState.equals(HandshakeState.HELLO)) {
964 sendFeatureReplyConfiguration();
965 state.hsState = HandshakeState.FEATURES_REPLY;
966 // uncomment to enable "dumb" switches like cbench
967 // state.hsState = HandshakeState.READY;
968 // addSwitch(sw);
969 } else {
970 // return results to rest api caller
971 sw.deliverOFFeaturesReply(m);
972 // update database */
973 updateActiveSwitchInfo(sw);
974 }
975 break;
976 case GET_CONFIG_REPLY:
977 if (log.isTraceEnabled())
978 log.trace("Get config reply from {}", sw);
979
980 if (!state.hsState.equals(HandshakeState.FEATURES_REPLY)) {
981 String em = "Unexpected GET_CONFIG_REPLY from " + sw;
982 throw new SwitchStateException(em);
983 }
984 OFGetConfigReply cr = (OFGetConfigReply) m;
// A miss send length other than 0xffff is only warned about; the
// handshake continues either way.
985 if (cr.getMissSendLength() == (short)0xffff) {
986 log.trace("Config Reply from {} confirms " +
987 "miss length set to 0xffff", sw);
988 } else {
989 log.warn("Config Reply from {} has " +
990 "miss length set to {}",
991 sw, cr.getMissSendLength() & 0xffff);
992 }
993 state.hasGetConfigReply = true;
994 checkSwitchReady();
995 break;
996 case VENDOR:
997 shouldHandleMessage = handleVendorMessage((OFVendor)m);
998 break;
999 case ERROR:
1000 // TODO: we need better error handling. Especially for
1001 // request/reply style message (stats, roles) we should have
1002 // a unified way to lookup the xid in the error message.
1003 // This will probable involve rewriting the way we handle
1004 // request/reply style messages.
1005 OFError error = (OFError) m;
1006 boolean shouldLogError = true;
1007 // TODO: should we check that firstRoleReplyReceived is false,
1008 // i.e., check only whether the first request fails?
1009 if (sw.checkFirstPendingRoleRequestXid(error.getXid())) {
1010 boolean isBadVendorError =
1011 (error.getErrorType() == OFError.OFErrorType.
1012 OFPET_BAD_REQUEST.getValue());
1013 // We expect to receive a bad vendor error when
1014 // we're connected to a switch that doesn't support
1015 // the Nicira vendor extensions (i.e. not OVS or
1016 // derived from OVS). By protocol, it should also be
1017 // BAD_VENDOR, but too many switch implementations
1018 // get it wrong and we can already check the xid()
1019 // so we can ignore the type with confidence that this
1020 // is not a spurious error
1021 shouldLogError = !isBadVendorError;
1022 if (isBadVendorError) {
1023 if (state.firstRoleReplyReceived && (role != null)) {
1024 log.warn("Received ERROR from sw {} that "
1025 +"indicates roles are not supported "
1026 +"but we have received a valid "
1027 +"role reply earlier", sw);
1028 }
1029 state.firstRoleReplyReceived = true;
1030 sw.deliverRoleRequestNotSupported(error.getXid());
1031 synchronized(roleChanger) {
1032 if (sw.role == null && Controller.this.role==Role.SLAVE) {
1033 // the switch doesn't understand role request
1034 // messages and the current controller role is
1035 // slave. We need to disconnect the switch.
1036 // @see RoleChanger for rationale
1037 sw.getChannel().close();
1038 }
1039 else if (sw.role == null) {
1040 // Controller's role is master: add to
1041 // active
1042 // TODO: check if clearing flow table is
1043 // right choice here.
1044 // Need to clear FlowMods before we add the switch
1045 // and dispatch updates otherwise we have a race condition.
1046 // TODO: switch update is async. Won't we still have a potential
1047 // race condition?
1048 sw.clearAllFlowMods();
1049 addSwitch(sw);
1050 }
1051 }
1052 }
1053 else {
1054 // TODO: Is this the right thing to do if we receive
1055 // some other error besides a bad vendor error?
1056 // Presumably that means the switch did actually
1057 // understand the role request message, but there
1058 // was some other error from processing the message.
1059 // OF 1.2 specifies a OFPET_ROLE_REQUEST_FAILED
1060 // error code, but it doesn't look like the Nicira
1061 // role request has that. Should check OVS source
1062 // code to see if it's possible for any other errors
1063 // to be returned.
1064 // If we received an error the switch is not
1065 // in the correct role, so we need to disconnect it.
1066 // We could also resend the request but then we need to
1067 // check if there are other pending request in which
1068 // case we shouldn't resend. If we do resend we need
1069 // to make sure that the switch eventually accepts one
1070 // of our requests or disconnect the switch. This feels
1071 // cumbersome.
1072 sw.getChannel().close();
1073 }
1074 }
1075 // Once we support OF 1.2, we'd add code to handle it here.
1076 //if (error.getXid() == state.ofRoleRequestXid) {
1077 //}
1078 if (shouldLogError)
1079 logError(sw, error);
1080 break;
1081 case STATS_REPLY:
1082 if (state.hsState.ordinal() <
1083 HandshakeState.FEATURES_REPLY.ordinal()) {
1084 String em = "Unexpected STATS_REPLY from " + sw;
1085 throw new SwitchStateException(em);
1086 }
1087 sw.deliverStatisticsReply(m);
1088 if (sw.hasAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE)) {
1089 processSwitchDescReply();
1090 }
1091 break;
1092 case PORT_STATUS:
1093 // We want to update our port state info even if we're in
1094 // the slave role, but we only want to update storage if
1095 // we're the master (or equal).
1096 boolean updateStorage = state.hsState.
1097 equals(HandshakeState.READY) &&
1098 (sw.getRole() != Role.SLAVE);
1099 handlePortStatusMessage(sw, (OFPortStatus)m, updateStorage);
1100 shouldHandleMessage = true;
1101 break;
1102
1103 default:
1104 shouldHandleMessage = true;
1105 break;
1106 }
1107
// Offer the message to the listeners while holding the switch's listener
// read lock, and only if the switch is still marked as connected.
1108 if (shouldHandleMessage) {
1109 sw.getListenerReadLock().lock();
1110 try {
1111 if (sw.isConnected()) {
1112 if (!state.hsState.equals(HandshakeState.READY)) {
1113 log.debug("Ignoring message type {} received " +
1114 "from switch {} before switch is " +
1115 "fully configured.", m.getType(), sw);
1116 }
1117 // Check if the controller is in the slave role for the
1118 // switch. If it is, then don't dispatch the message to
1119 // the listeners.
1120 // TODO: Should we dispatch messages that we expect to
1121 // receive when we're in the slave role, e.g. port
1122 // status messages? Since we're "hiding" switches from
1123 // the listeners when we're in the slave role, then it
1124 // seems a little weird to dispatch port status messages
1125 // to them. On the other hand there might be special
1126 // modules that care about all of the connected switches
1127 // and would like to receive port status notifications.
1128 else if (sw.getRole() == Role.SLAVE) {
1129 // Don't log message if it's a port status message
1130 // since we expect to receive those from the switch
1131 // and don't want to emit spurious messages.
1132 if (m.getType() != OFType.PORT_STATUS) {
1133 log.debug("Ignoring message type {} received " +
1134 "from switch {} while in the slave role.",
1135 m.getType(), sw);
1136 }
1137 } else {
1138 handleMessage(sw, m, null);
1139 }
1140 }
1141 }
1142 finally {
1143 sw.getListenerReadLock().unlock();
1144 }
1145 }
1146 }
1147 }
1148
1149 // ****************
1150 // Message handlers
1151 // ****************
1152
1153 protected void handlePortStatusMessage(IOFSwitch sw,
1154 OFPortStatus m,
1155 boolean updateStorage) {
1156 short portNumber = m.getDesc().getPortNumber();
1157 OFPhysicalPort port = m.getDesc();
1158 if (m.getReason() == (byte)OFPortReason.OFPPR_MODIFY.ordinal()) {
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001159 boolean portDown = ((OFPortConfig.OFPPC_PORT_DOWN.getValue() & port.getConfig()) > 0) ||
1160 ((OFPortState.OFPPS_LINK_DOWN.getValue() & port.getState()) > 0);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001161 sw.setPort(port);
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001162 if (!portDown) {
Pankaj Berde6debb042013-01-16 18:04:32 -08001163 swStore.addPort(sw.getStringId(), port);
1164 } else {
1165 swStore.deletePort(sw.getStringId(), port.getPortNumber());
1166 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001167 if (updateStorage)
1168 updatePortInfo(sw, port);
1169 log.debug("Port #{} modified for {}", portNumber, sw);
1170 } else if (m.getReason() == (byte)OFPortReason.OFPPR_ADD.ordinal()) {
1171 sw.setPort(port);
Pankaj Berde8557a462013-01-07 08:59:31 -08001172 swStore.addPort(sw.getStringId(), port);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001173 if (updateStorage)
1174 updatePortInfo(sw, port);
1175 log.debug("Port #{} added for {}", portNumber, sw);
1176 } else if (m.getReason() ==
1177 (byte)OFPortReason.OFPPR_DELETE.ordinal()) {
1178 sw.deletePort(portNumber);
Pankaj Berde8557a462013-01-07 08:59:31 -08001179 swStore.deletePort(sw.getStringId(), portNumber);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001180 if (updateStorage)
1181 removePortInfo(sw, portNumber);
1182 log.debug("Port #{} deleted for {}", portNumber, sw);
1183 }
1184 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
1185 try {
1186 this.updates.put(update);
1187 } catch (InterruptedException e) {
1188 log.error("Failure adding update to queue", e);
1189 }
1190 }
1191
1192 /**
1193 * flcontext_cache - Keep a thread local stack of contexts
1194 */
1195 protected static final ThreadLocal<Stack<FloodlightContext>> flcontext_cache =
1196 new ThreadLocal <Stack<FloodlightContext>> () {
1197 @Override
1198 protected Stack<FloodlightContext> initialValue() {
1199 return new Stack<FloodlightContext>();
1200 }
1201 };
1202
1203 /**
1204 * flcontext_alloc - pop a context off the stack, if required create a new one
1205 * @return FloodlightContext
1206 */
1207 protected static FloodlightContext flcontext_alloc() {
1208 FloodlightContext flcontext = null;
1209
1210 if (flcontext_cache.get().empty()) {
1211 flcontext = new FloodlightContext();
1212 }
1213 else {
1214 flcontext = flcontext_cache.get().pop();
1215 }
1216
1217 return flcontext;
1218 }
1219
1220 /**
1221 * flcontext_free - Free the context to the current thread
1222 * @param flcontext
1223 */
1224 protected void flcontext_free(FloodlightContext flcontext) {
1225 flcontext.getStorage().clear();
1226 flcontext_cache.get().push(flcontext);
1227 }
1228
1229 /**
1230 * Handle replies to certain OFMessages, and pass others off to listeners
1231 * @param sw The switch for the message
1232 * @param m The message
1233 * @param bContext The floodlight context. If null then floodlight context would
1234 * be allocated in this function
1235 * @throws IOException
1236 */
1237 @LogMessageDocs({
1238 @LogMessageDoc(level="ERROR",
1239 message="Ignoring PacketIn (Xid = {xid}) because the data" +
1240 " field is empty.",
1241 explanation="The switch sent an improperly-formatted PacketIn" +
1242 " message",
1243 recommendation=LogMessageDoc.CHECK_SWITCH),
1244 @LogMessageDoc(level="WARN",
1245 message="Unhandled OF Message: {} from {}",
1246 explanation="The switch sent a message not handled by " +
1247 "the controller")
1248 })
1249 protected void handleMessage(IOFSwitch sw, OFMessage m,
1250 FloodlightContext bContext)
1251 throws IOException {
1252 Ethernet eth = null;
1253
1254 switch (m.getType()) {
1255 case PACKET_IN:
1256 OFPacketIn pi = (OFPacketIn)m;
1257
1258 if (pi.getPacketData().length <= 0) {
1259 log.error("Ignoring PacketIn (Xid = " + pi.getXid() +
1260 ") because the data field is empty.");
1261 return;
1262 }
1263
1264 if (Controller.ALWAYS_DECODE_ETH) {
1265 eth = new Ethernet();
1266 eth.deserialize(pi.getPacketData(), 0,
1267 pi.getPacketData().length);
1268 counterStore.updatePacketInCounters(sw, m, eth);
1269 }
1270 // fall through to default case...
1271
1272 default:
1273
1274 List<IOFMessageListener> listeners = null;
1275 if (messageListeners.containsKey(m.getType())) {
1276 listeners = messageListeners.get(m.getType()).
1277 getOrderedListeners();
1278 }
1279
1280 FloodlightContext bc = null;
1281 if (listeners != null) {
1282 // Check if floodlight context is passed from the calling
1283 // function, if so use that floodlight context, otherwise
1284 // allocate one
1285 if (bContext == null) {
1286 bc = flcontext_alloc();
1287 } else {
1288 bc = bContext;
1289 }
1290 if (eth != null) {
1291 IFloodlightProviderService.bcStore.put(bc,
1292 IFloodlightProviderService.CONTEXT_PI_PAYLOAD,
1293 eth);
1294 }
1295
1296 // Get the starting time (overall and per-component) of
1297 // the processing chain for this packet if performance
1298 // monitoring is turned on
1299 pktinProcTime.bootstrap(listeners);
1300 pktinProcTime.recordStartTimePktIn();
1301 Command cmd;
1302 for (IOFMessageListener listener : listeners) {
1303 if (listener instanceof IOFSwitchFilter) {
1304 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1305 continue;
1306 }
1307 }
1308
1309 pktinProcTime.recordStartTimeComp(listener);
1310 cmd = listener.receive(sw, m, bc);
1311 pktinProcTime.recordEndTimeComp(listener);
1312
1313 if (Command.STOP.equals(cmd)) {
1314 break;
1315 }
1316 }
1317 pktinProcTime.recordEndTimePktIn(sw, m, bc);
1318 } else {
1319 log.warn("Unhandled OF Message: {} from {}", m, sw);
1320 }
1321
1322 if ((bContext == null) && (bc != null)) flcontext_free(bc);
1323 }
1324 }
1325
1326 /**
1327 * Log an OpenFlow error message from a switch
1328 * @param sw The switch that sent the error
1329 * @param error The error message
1330 */
1331 @LogMessageDoc(level="ERROR",
1332 message="Error {error type} {error code} from {switch}",
1333 explanation="The switch responded with an unexpected error" +
1334 "to an OpenFlow message from the controller",
1335 recommendation="This could indicate improper network operation. " +
1336 "If the problem persists restarting the switch and " +
1337 "controller may help."
1338 )
1339 protected void logError(IOFSwitch sw, OFError error) {
1340 int etint = 0xffff & error.getErrorType();
1341 if (etint < 0 || etint >= OFErrorType.values().length) {
1342 log.error("Unknown error code {} from sw {}", etint, sw);
1343 }
1344 OFErrorType et = OFErrorType.values()[etint];
1345 switch (et) {
1346 case OFPET_HELLO_FAILED:
1347 OFHelloFailedCode hfc =
1348 OFHelloFailedCode.values()[0xffff & error.getErrorCode()];
1349 log.error("Error {} {} from {}", new Object[] {et, hfc, sw});
1350 break;
1351 case OFPET_BAD_REQUEST:
1352 OFBadRequestCode brc =
1353 OFBadRequestCode.values()[0xffff & error.getErrorCode()];
1354 log.error("Error {} {} from {}", new Object[] {et, brc, sw});
1355 break;
1356 case OFPET_BAD_ACTION:
1357 OFBadActionCode bac =
1358 OFBadActionCode.values()[0xffff & error.getErrorCode()];
1359 log.error("Error {} {} from {}", new Object[] {et, bac, sw});
1360 break;
1361 case OFPET_FLOW_MOD_FAILED:
1362 OFFlowModFailedCode fmfc =
1363 OFFlowModFailedCode.values()[0xffff & error.getErrorCode()];
1364 log.error("Error {} {} from {}", new Object[] {et, fmfc, sw});
1365 break;
1366 case OFPET_PORT_MOD_FAILED:
1367 OFPortModFailedCode pmfc =
1368 OFPortModFailedCode.values()[0xffff & error.getErrorCode()];
1369 log.error("Error {} {} from {}", new Object[] {et, pmfc, sw});
1370 break;
1371 case OFPET_QUEUE_OP_FAILED:
1372 OFQueueOpFailedCode qofc =
1373 OFQueueOpFailedCode.values()[0xffff & error.getErrorCode()];
1374 log.error("Error {} {} from {}", new Object[] {et, qofc, sw});
1375 break;
1376 default:
1377 break;
1378 }
1379 }
1380
1381 /**
1382 * Add a switch to the active switch list and call the switch listeners.
1383 * This happens either when a switch first connects (and the controller is
1384 * not in the slave role) or when the role of the controller changes from
1385 * slave to master.
1386 * @param sw the switch that has been added
1387 */
1388 // TODO: need to rethink locking and the synchronous switch update.
1389 // We can / should also handle duplicate DPIDs in connectedSwitches
1390 @LogMessageDoc(level="ERROR",
1391 message="New switch added {switch} for already-added switch {switch}",
1392 explanation="A switch with the same DPID as another switch " +
1393 "connected to the controller. This can be caused by " +
1394 "multiple switches configured with the same DPID, or " +
1395 "by a switch reconnected very quickly after " +
1396 "disconnecting.",
1397 recommendation="If this happens repeatedly, it is likely there " +
1398 "are switches with duplicate DPIDs on the network. " +
1399 "Reconfigure the appropriate switches. If it happens " +
1400 "very rarely, then it is likely this is a transient " +
1401 "network problem that can be ignored."
1402 )
1403 protected void addSwitch(IOFSwitch sw) {
1404 // TODO: is it safe to modify the HashMap without holding
1405 // the old switch's lock?
1406 OFSwitchImpl oldSw = (OFSwitchImpl) this.activeSwitches.put(sw.getId(), sw);
1407 if (sw == oldSw) {
1408 // Note == for object equality, not .equals for value
1409 log.info("New add switch for pre-existing switch {}", sw);
1410 return;
1411 }
1412
1413 if (oldSw != null) {
1414 oldSw.getListenerWriteLock().lock();
1415 try {
1416 log.error("New switch added {} for already-added switch {}",
1417 sw, oldSw);
1418 // Set the connected flag to false to suppress calling
1419 // the listeners for this switch in processOFMessage
1420 oldSw.setConnected(false);
1421
1422 oldSw.cancelAllStatisticsReplies();
1423
1424 updateInactiveSwitchInfo(oldSw);
1425
1426 // we need to clean out old switch state definitively
1427 // before adding the new switch
1428 // FIXME: It seems not completely kosher to call the
1429 // switch listeners here. I thought one of the points of
1430 // having the asynchronous switch update mechanism was so
1431 // the addedSwitch and removedSwitch were always called
1432 // from a single thread to simplify concurrency issues
1433 // for the listener.
1434 if (switchListeners != null) {
1435 for (IOFSwitchListener listener : switchListeners) {
1436 listener.removedSwitch(oldSw);
1437 }
1438 }
1439 // will eventually trigger a removeSwitch(), which will cause
1440 // a "Not removing Switch ... already removed debug message.
1441 // TODO: Figure out a way to handle this that avoids the
1442 // spurious debug message.
1443 oldSw.getChannel().close();
1444 }
1445 finally {
1446 oldSw.getListenerWriteLock().unlock();
1447 }
1448 }
1449
1450 updateActiveSwitchInfo(sw);
Pankaj Berde8557a462013-01-07 08:59:31 -08001451 swStore.update(sw.getStringId(), SwitchState.ACTIVE, DM_OPERATION.UPDATE);
Pankaj Berde0fc4e432013-01-12 09:47:22 -08001452 for (OFPhysicalPort port: sw.getPorts()) {
1453 swStore.addPort(sw.getStringId(), port);
1454 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001455 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.ADDED);
1456 try {
1457 this.updates.put(update);
1458 } catch (InterruptedException e) {
1459 log.error("Failure adding update to queue", e);
1460 }
1461 }
1462
1463 /**
1464 * Remove a switch from the active switch list and call the switch listeners.
1465 * This happens either when the switch is disconnected or when the
1466 * controller's role for the switch changes from master to slave.
1467 * @param sw the switch that has been removed
1468 */
1469 protected void removeSwitch(IOFSwitch sw) {
1470 // No need to acquire the listener lock, since
1471 // this method is only called after netty has processed all
1472 // pending messages
1473 log.debug("removeSwitch: {}", sw);
Pankaj Berdeafb20532013-01-08 15:05:24 -08001474 swStore.update(sw.getStringId(), SwitchState.INACTIVE, DM_OPERATION.UPDATE);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001475 if (!this.activeSwitches.remove(sw.getId(), sw) || !sw.isConnected()) {
1476 log.debug("Not removing switch {}; already removed", sw);
1477 return;
1478 }
1479 // We cancel all outstanding statistics replies if the switch transition
1480 // from active. In the future we might allow statistics requests
1481 // from slave controllers. Then we need to move this cancelation
1482 // to switch disconnect
1483 sw.cancelAllStatisticsReplies();
Pankaj Berdeafb20532013-01-08 15:05:24 -08001484
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001485
1486 // FIXME: I think there's a race condition if we call updateInactiveSwitchInfo
1487 // here if role support is enabled. In that case if the switch is being
1488 // removed because we've been switched to being in the slave role, then I think
1489 // it's possible that the new master may have already been promoted to master
1490 // and written out the active switch state to storage. If we now execute
1491 // updateInactiveSwitchInfo we may wipe out all of the state that was
1492 // written out by the new master. Maybe need to revisit how we handle all
1493 // of the switch state that's written to storage.
1494
1495 updateInactiveSwitchInfo(sw);
Pankaj Berdeafb20532013-01-08 15:05:24 -08001496
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001497 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.REMOVED);
1498 try {
1499 this.updates.put(update);
1500 } catch (InterruptedException e) {
1501 log.error("Failure adding update to queue", e);
1502 }
1503 }
1504
1505 // ***************
1506 // IFloodlightProvider
1507 // ***************
1508
1509 @Override
1510 public synchronized void addOFMessageListener(OFType type,
1511 IOFMessageListener listener) {
1512 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1513 messageListeners.get(type);
1514 if (ldd == null) {
1515 ldd = new ListenerDispatcher<OFType, IOFMessageListener>();
1516 messageListeners.put(type, ldd);
1517 }
1518 ldd.addListener(type, listener);
1519 }
1520
1521 @Override
1522 public synchronized void removeOFMessageListener(OFType type,
1523 IOFMessageListener listener) {
1524 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1525 messageListeners.get(type);
1526 if (ldd != null) {
1527 ldd.removeListener(listener);
1528 }
1529 }
1530
1531 private void logListeners() {
1532 for (Map.Entry<OFType,
1533 ListenerDispatcher<OFType,
1534 IOFMessageListener>> entry
1535 : messageListeners.entrySet()) {
1536
1537 OFType type = entry.getKey();
1538 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1539 entry.getValue();
1540
1541 StringBuffer sb = new StringBuffer();
1542 sb.append("OFListeners for ");
1543 sb.append(type);
1544 sb.append(": ");
1545 for (IOFMessageListener l : ldd.getOrderedListeners()) {
1546 sb.append(l.getName());
1547 sb.append(",");
1548 }
1549 log.debug(sb.toString());
1550 }
1551 }
1552
1553 public void removeOFMessageListeners(OFType type) {
1554 messageListeners.remove(type);
1555 }
1556
1557 @Override
1558 public Map<Long, IOFSwitch> getSwitches() {
1559 return Collections.unmodifiableMap(this.activeSwitches);
1560 }
1561
1562 @Override
1563 public void addOFSwitchListener(IOFSwitchListener listener) {
1564 this.switchListeners.add(listener);
1565 }
1566
1567 @Override
1568 public void removeOFSwitchListener(IOFSwitchListener listener) {
1569 this.switchListeners.remove(listener);
1570 }
1571
1572 @Override
1573 public Map<OFType, List<IOFMessageListener>> getListeners() {
1574 Map<OFType, List<IOFMessageListener>> lers =
1575 new HashMap<OFType, List<IOFMessageListener>>();
1576 for(Entry<OFType, ListenerDispatcher<OFType, IOFMessageListener>> e :
1577 messageListeners.entrySet()) {
1578 lers.put(e.getKey(), e.getValue().getOrderedListeners());
1579 }
1580 return Collections.unmodifiableMap(lers);
1581 }
1582
1583 @Override
1584 @LogMessageDocs({
1585 @LogMessageDoc(message="Failed to inject OFMessage {message} onto " +
1586 "a null switch",
1587 explanation="Failed to process a message because the switch " +
1588 " is no longer connected."),
1589 @LogMessageDoc(level="ERROR",
1590 message="Error reinjecting OFMessage on switch {switch}",
1591 explanation="An I/O error occured while attempting to " +
1592 "process an OpenFlow message",
1593 recommendation=LogMessageDoc.CHECK_SWITCH)
1594 })
1595 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg,
1596 FloodlightContext bc) {
1597 if (sw == null) {
1598 log.info("Failed to inject OFMessage {} onto a null switch", msg);
1599 return false;
1600 }
1601
1602 // FIXME: Do we need to be able to inject messages to switches
1603 // where we're the slave controller (i.e. they're connected but
1604 // not active)?
1605 // FIXME: Don't we need synchronization logic here so we're holding
1606 // the listener read lock when we call handleMessage? After some
1607 // discussions it sounds like the right thing to do here would be to
1608 // inject the message as a netty upstream channel event so it goes
1609 // through the normal netty event processing, including being
1610 // handled
1611 if (!activeSwitches.containsKey(sw.getId())) return false;
1612
1613 try {
1614 // Pass Floodlight context to the handleMessages()
1615 handleMessage(sw, msg, bc);
1616 } catch (IOException e) {
1617 log.error("Error reinjecting OFMessage on switch {}",
1618 HexString.toHexString(sw.getId()));
1619 return false;
1620 }
1621 return true;
1622 }
1623
1624 @Override
1625 @LogMessageDoc(message="Calling System.exit",
1626 explanation="The controller is terminating")
1627 public synchronized void terminate() {
1628 log.info("Calling System.exit");
1629 System.exit(1);
1630 }
1631
1632 @Override
1633 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg) {
1634 // call the overloaded version with floodlight context set to null
1635 return injectOfMessage(sw, msg, null);
1636 }
1637
1638 @Override
1639 public void handleOutgoingMessage(IOFSwitch sw, OFMessage m,
1640 FloodlightContext bc) {
1641 if (log.isTraceEnabled()) {
1642 String str = OFMessage.getDataAsString(sw, m, bc);
1643 log.trace("{}", str);
1644 }
1645
1646 List<IOFMessageListener> listeners = null;
1647 if (messageListeners.containsKey(m.getType())) {
1648 listeners =
1649 messageListeners.get(m.getType()).getOrderedListeners();
1650 }
1651
1652 if (listeners != null) {
1653 for (IOFMessageListener listener : listeners) {
1654 if (listener instanceof IOFSwitchFilter) {
1655 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1656 continue;
1657 }
1658 }
1659 if (Command.STOP.equals(listener.receive(sw, m, bc))) {
1660 break;
1661 }
1662 }
1663 }
1664 }
1665
1666 @Override
1667 public BasicFactory getOFMessageFactory() {
1668 return factory;
1669 }
1670
1671 @Override
1672 public String getControllerId() {
1673 return controllerId;
1674 }
1675
1676 // **************
1677 // Initialization
1678 // **************
1679
1680 protected void updateAllInactiveSwitchInfo() {
1681 if (role == Role.SLAVE) {
1682 return;
1683 }
1684 String controllerId = getControllerId();
1685 String[] switchColumns = { SWITCH_DATAPATH_ID,
1686 SWITCH_CONTROLLER_ID,
1687 SWITCH_ACTIVE };
1688 String[] portColumns = { PORT_ID, PORT_SWITCH };
1689 IResultSet switchResultSet = null;
1690 try {
1691 OperatorPredicate op =
1692 new OperatorPredicate(SWITCH_CONTROLLER_ID,
1693 OperatorPredicate.Operator.EQ,
1694 controllerId);
1695 switchResultSet =
1696 storageSource.executeQuery(SWITCH_TABLE_NAME,
1697 switchColumns,
1698 op, null);
1699 while (switchResultSet.next()) {
1700 IResultSet portResultSet = null;
1701 try {
1702 String datapathId =
1703 switchResultSet.getString(SWITCH_DATAPATH_ID);
1704 switchResultSet.setBoolean(SWITCH_ACTIVE, Boolean.FALSE);
1705 op = new OperatorPredicate(PORT_SWITCH,
1706 OperatorPredicate.Operator.EQ,
1707 datapathId);
1708 portResultSet =
1709 storageSource.executeQuery(PORT_TABLE_NAME,
1710 portColumns,
1711 op, null);
1712 while (portResultSet.next()) {
1713 portResultSet.deleteRow();
1714 }
1715 portResultSet.save();
1716 }
1717 finally {
1718 if (portResultSet != null)
1719 portResultSet.close();
1720 }
1721 }
1722 switchResultSet.save();
1723 }
1724 finally {
1725 if (switchResultSet != null)
1726 switchResultSet.close();
1727 }
1728 }
1729
1730 protected void updateControllerInfo() {
1731 updateAllInactiveSwitchInfo();
1732
1733 // Write out the controller info to the storage source
1734 Map<String, Object> controllerInfo = new HashMap<String, Object>();
1735 String id = getControllerId();
1736 controllerInfo.put(CONTROLLER_ID, id);
1737 storageSource.updateRow(CONTROLLER_TABLE_NAME, controllerInfo);
1738 }
1739
1740 protected void updateActiveSwitchInfo(IOFSwitch sw) {
1741 if (role == Role.SLAVE) {
1742 return;
1743 }
1744 // Obtain the row info for the switch
1745 Map<String, Object> switchInfo = new HashMap<String, Object>();
1746 String datapathIdString = sw.getStringId();
1747 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1748 String controllerId = getControllerId();
1749 switchInfo.put(SWITCH_CONTROLLER_ID, controllerId);
1750 Date connectedSince = sw.getConnectedSince();
1751 switchInfo.put(SWITCH_CONNECTED_SINCE, connectedSince);
1752 Channel channel = sw.getChannel();
1753 SocketAddress socketAddress = channel.getRemoteAddress();
1754 if (socketAddress != null) {
1755 String socketAddressString = socketAddress.toString();
1756 switchInfo.put(SWITCH_SOCKET_ADDRESS, socketAddressString);
1757 if (socketAddress instanceof InetSocketAddress) {
1758 InetSocketAddress inetSocketAddress =
1759 (InetSocketAddress)socketAddress;
1760 InetAddress inetAddress = inetSocketAddress.getAddress();
1761 String ip = inetAddress.getHostAddress();
1762 switchInfo.put(SWITCH_IP, ip);
1763 }
1764 }
1765
1766 // Write out the switch features info
1767 long capabilities = U32.f(sw.getCapabilities());
1768 switchInfo.put(SWITCH_CAPABILITIES, capabilities);
1769 long buffers = U32.f(sw.getBuffers());
1770 switchInfo.put(SWITCH_BUFFERS, buffers);
1771 long tables = U32.f(sw.getTables());
1772 switchInfo.put(SWITCH_TABLES, tables);
1773 long actions = U32.f(sw.getActions());
1774 switchInfo.put(SWITCH_ACTIONS, actions);
1775 switchInfo.put(SWITCH_ACTIVE, Boolean.TRUE);
1776
1777 // Update the switch
1778 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1779
1780 // Update the ports
1781 for (OFPhysicalPort port: sw.getPorts()) {
1782 updatePortInfo(sw, port);
1783 }
1784 }
1785
1786 protected void updateInactiveSwitchInfo(IOFSwitch sw) {
1787 if (role == Role.SLAVE) {
1788 return;
1789 }
1790 log.debug("Update DB with inactiveSW {}", sw);
1791 // Update the controller info in the storage source to be inactive
1792 Map<String, Object> switchInfo = new HashMap<String, Object>();
1793 String datapathIdString = sw.getStringId();
1794 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1795 //switchInfo.put(SWITCH_CONNECTED_SINCE, null);
1796 switchInfo.put(SWITCH_ACTIVE, Boolean.FALSE);
1797 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1798 }
1799
1800 protected void updatePortInfo(IOFSwitch sw, OFPhysicalPort port) {
1801 if (role == Role.SLAVE) {
1802 return;
1803 }
1804 String datapathIdString = sw.getStringId();
1805 Map<String, Object> portInfo = new HashMap<String, Object>();
1806 int portNumber = U16.f(port.getPortNumber());
1807 String id = datapathIdString + "|" + portNumber;
1808 portInfo.put(PORT_ID, id);
1809 portInfo.put(PORT_SWITCH, datapathIdString);
1810 portInfo.put(PORT_NUMBER, portNumber);
1811 byte[] hardwareAddress = port.getHardwareAddress();
1812 String hardwareAddressString = HexString.toHexString(hardwareAddress);
1813 portInfo.put(PORT_HARDWARE_ADDRESS, hardwareAddressString);
1814 String name = port.getName();
1815 portInfo.put(PORT_NAME, name);
1816 long config = U32.f(port.getConfig());
1817 portInfo.put(PORT_CONFIG, config);
1818 long state = U32.f(port.getState());
1819 portInfo.put(PORT_STATE, state);
1820 long currentFeatures = U32.f(port.getCurrentFeatures());
1821 portInfo.put(PORT_CURRENT_FEATURES, currentFeatures);
1822 long advertisedFeatures = U32.f(port.getAdvertisedFeatures());
1823 portInfo.put(PORT_ADVERTISED_FEATURES, advertisedFeatures);
1824 long supportedFeatures = U32.f(port.getSupportedFeatures());
1825 portInfo.put(PORT_SUPPORTED_FEATURES, supportedFeatures);
1826 long peerFeatures = U32.f(port.getPeerFeatures());
1827 portInfo.put(PORT_PEER_FEATURES, peerFeatures);
1828 storageSource.updateRowAsync(PORT_TABLE_NAME, portInfo);
1829 }
1830
1831 /**
1832 * Read switch port data from storage and write it into a switch object
1833 * @param sw the switch to update
1834 */
1835 protected void readSwitchPortStateFromStorage(OFSwitchImpl sw) {
1836 OperatorPredicate op =
1837 new OperatorPredicate(PORT_SWITCH,
1838 OperatorPredicate.Operator.EQ,
1839 sw.getStringId());
1840 IResultSet portResultSet =
1841 storageSource.executeQuery(PORT_TABLE_NAME,
1842 null, op, null);
1843 //Map<Short, OFPhysicalPort> oldports =
1844 // new HashMap<Short, OFPhysicalPort>();
1845 //oldports.putAll(sw.getPorts());
1846
1847 while (portResultSet.next()) {
1848 try {
1849 OFPhysicalPort p = new OFPhysicalPort();
1850 p.setPortNumber((short)portResultSet.getInt(PORT_NUMBER));
1851 p.setName(portResultSet.getString(PORT_NAME));
1852 p.setConfig((int)portResultSet.getLong(PORT_CONFIG));
1853 p.setState((int)portResultSet.getLong(PORT_STATE));
1854 String portMac = portResultSet.getString(PORT_HARDWARE_ADDRESS);
1855 p.setHardwareAddress(HexString.fromHexString(portMac));
1856 p.setCurrentFeatures((int)portResultSet.
1857 getLong(PORT_CURRENT_FEATURES));
1858 p.setAdvertisedFeatures((int)portResultSet.
1859 getLong(PORT_ADVERTISED_FEATURES));
1860 p.setSupportedFeatures((int)portResultSet.
1861 getLong(PORT_SUPPORTED_FEATURES));
1862 p.setPeerFeatures((int)portResultSet.
1863 getLong(PORT_PEER_FEATURES));
1864 //oldports.remove(Short.valueOf(p.getPortNumber()));
1865 sw.setPort(p);
1866 } catch (NullPointerException e) {
1867 // ignore
1868 }
1869 }
1870 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
1871 try {
1872 this.updates.put(update);
1873 } catch (InterruptedException e) {
1874 log.error("Failure adding update to queue", e);
1875 }
1876 }
1877
1878 protected void removePortInfo(IOFSwitch sw, short portNumber) {
1879 if (role == Role.SLAVE) {
1880 return;
1881 }
1882 String datapathIdString = sw.getStringId();
1883 String id = datapathIdString + "|" + portNumber;
1884 storageSource.deleteRowAsync(PORT_TABLE_NAME, id);
1885 }
1886
1887 /**
1888 * Sets the initial role based on properties in the config params.
1889 * It looks for two different properties.
1890 * If the "role" property is specified then the value should be
1891 * either "EQUAL", "MASTER", or "SLAVE" and the role of the
1892 * controller is set to the specified value. If the "role" property
1893 * is not specified then it looks next for the "role.path" property.
1894 * In this case the value should be the path to a property file in
1895 * the file system that contains a property called "floodlight.role"
1896 * which can be one of the values listed above for the "role" property.
1897 * The idea behind the "role.path" mechanism is that you have some
1898 * separate heartbeat and master controller election algorithm that
1899 * determines the role of the controller. When a role transition happens,
1900 * it updates the current role in the file specified by the "role.path"
1901 * file. Then if floodlight restarts for some reason it can get the
1902 * correct current role of the controller from the file.
1903 * @param configParams The config params for the FloodlightProvider service
1904 * @return A valid role if role information is specified in the
1905 * config params, otherwise null
1906 */
1907 @LogMessageDocs({
1908 @LogMessageDoc(message="Controller role set to {role}",
1909 explanation="Setting the initial HA role to "),
1910 @LogMessageDoc(level="ERROR",
1911 message="Invalid current role value: {role}",
1912 explanation="An invalid HA role value was read from the " +
1913 "properties file",
1914 recommendation=LogMessageDoc.CHECK_CONTROLLER)
1915 })
1916 protected Role getInitialRole(Map<String, String> configParams) {
1917 Role role = null;
1918 String roleString = configParams.get("role");
1919 if (roleString == null) {
1920 String rolePath = configParams.get("rolepath");
1921 if (rolePath != null) {
1922 Properties properties = new Properties();
1923 try {
1924 properties.load(new FileInputStream(rolePath));
1925 roleString = properties.getProperty("floodlight.role");
1926 }
1927 catch (IOException exc) {
1928 // Don't treat it as an error if the file specified by the
1929 // rolepath property doesn't exist. This lets us enable the
1930 // HA mechanism by just creating/setting the floodlight.role
1931 // property in that file without having to modify the
1932 // floodlight properties.
1933 }
1934 }
1935 }
1936
1937 if (roleString != null) {
1938 // Canonicalize the string to the form used for the enum constants
1939 roleString = roleString.trim().toUpperCase();
1940 try {
1941 role = Role.valueOf(roleString);
1942 }
1943 catch (IllegalArgumentException exc) {
1944 log.error("Invalid current role value: {}", roleString);
1945 }
1946 }
1947
1948 log.info("Controller role set to {}", role);
1949
1950 return role;
1951 }
1952
1953 /**
1954 * Tell controller that we're ready to accept switches loop
1955 * @throws IOException
1956 */
1957 @LogMessageDocs({
1958 @LogMessageDoc(message="Listening for switch connections on {address}",
1959 explanation="The controller is ready and listening for new" +
1960 " switch connections"),
1961 @LogMessageDoc(message="Storage exception in controller " +
1962 "updates loop; terminating process",
1963 explanation=ERROR_DATABASE,
1964 recommendation=LogMessageDoc.CHECK_CONTROLLER),
1965 @LogMessageDoc(level="ERROR",
1966 message="Exception in controller updates loop",
1967 explanation="Failed to dispatch controller event",
1968 recommendation=LogMessageDoc.GENERIC_ACTION)
1969 })
1970 public void run() {
1971 if (log.isDebugEnabled()) {
1972 logListeners();
1973 }
1974
1975 try {
1976 final ServerBootstrap bootstrap = createServerBootStrap();
1977
1978 bootstrap.setOption("reuseAddr", true);
1979 bootstrap.setOption("child.keepAlive", true);
1980 bootstrap.setOption("child.tcpNoDelay", true);
1981 bootstrap.setOption("child.sendBufferSize", Controller.SEND_BUFFER_SIZE);
1982
1983 ChannelPipelineFactory pfact =
1984 new OpenflowPipelineFactory(this, null);
1985 bootstrap.setPipelineFactory(pfact);
1986 InetSocketAddress sa = new InetSocketAddress(openFlowPort);
1987 final ChannelGroup cg = new DefaultChannelGroup();
1988 cg.add(bootstrap.bind(sa));
1989
1990 log.info("Listening for switch connections on {}", sa);
1991 } catch (Exception e) {
1992 throw new RuntimeException(e);
1993 }
1994
1995 // main loop
1996 while (true) {
1997 try {
1998 IUpdate update = updates.take();
1999 update.dispatch();
2000 } catch (InterruptedException e) {
2001 return;
2002 } catch (StorageException e) {
2003 log.error("Storage exception in controller " +
2004 "updates loop; terminating process", e);
2005 return;
2006 } catch (Exception e) {
2007 log.error("Exception in controller updates loop", e);
2008 }
2009 }
2010 }
2011
2012 private ServerBootstrap createServerBootStrap() {
2013 if (workerThreads == 0) {
2014 return new ServerBootstrap(
2015 new NioServerSocketChannelFactory(
2016 Executors.newCachedThreadPool(),
2017 Executors.newCachedThreadPool()));
2018 } else {
2019 return new ServerBootstrap(
2020 new NioServerSocketChannelFactory(
2021 Executors.newCachedThreadPool(),
2022 Executors.newCachedThreadPool(), workerThreads));
2023 }
2024 }
2025
2026 public void setConfigParams(Map<String, String> configParams) {
2027 String ofPort = configParams.get("openflowport");
2028 if (ofPort != null) {
2029 this.openFlowPort = Integer.parseInt(ofPort);
2030 }
2031 log.debug("OpenFlow port set to {}", this.openFlowPort);
2032 String threads = configParams.get("workerthreads");
2033 if (threads != null) {
2034 this.workerThreads = Integer.parseInt(threads);
2035 }
2036 log.debug("Number of worker threads set to {}", this.workerThreads);
2037 String controllerId = configParams.get("controllerid");
2038 if (controllerId != null) {
2039 this.controllerId = controllerId;
2040 }
2041 log.debug("ControllerId set to {}", this.controllerId);
2042 }
2043
2044 private void initVendorMessages() {
2045 // Configure openflowj to be able to parse the role request/reply
2046 // vendor messages.
2047 OFBasicVendorId niciraVendorId = new OFBasicVendorId(
2048 OFNiciraVendorData.NX_VENDOR_ID, 4);
2049 OFVendorId.registerVendorId(niciraVendorId);
2050 OFBasicVendorDataType roleRequestVendorData =
2051 new OFBasicVendorDataType(
2052 OFRoleRequestVendorData.NXT_ROLE_REQUEST,
2053 OFRoleRequestVendorData.getInstantiable());
2054 niciraVendorId.registerVendorDataType(roleRequestVendorData);
2055 OFBasicVendorDataType roleReplyVendorData =
2056 new OFBasicVendorDataType(
2057 OFRoleReplyVendorData.NXT_ROLE_REPLY,
2058 OFRoleReplyVendorData.getInstantiable());
2059 niciraVendorId.registerVendorDataType(roleReplyVendorData);
2060 }
2061
2062 /**
2063 * Initialize internal data structures
2064 */
2065 public void init(Map<String, String> configParams) {
2066 // These data structures are initialized here because other
2067 // module's startUp() might be called before ours
2068 this.messageListeners =
2069 new ConcurrentHashMap<OFType,
2070 ListenerDispatcher<OFType,
2071 IOFMessageListener>>();
2072 this.switchListeners = new CopyOnWriteArraySet<IOFSwitchListener>();
2073 this.haListeners = new CopyOnWriteArraySet<IHAListener>();
2074 this.activeSwitches = new ConcurrentHashMap<Long, IOFSwitch>();
2075 this.connectedSwitches = new HashSet<OFSwitchImpl>();
2076 this.controllerNodeIPsCache = new HashMap<String, String>();
2077 this.updates = new LinkedBlockingQueue<IUpdate>();
2078 this.factory = new BasicFactory();
2079 this.providerMap = new HashMap<String, List<IInfoProvider>>();
2080 setConfigParams(configParams);
2081 this.role = getInitialRole(configParams);
2082 this.roleChanger = new RoleChanger();
2083 initVendorMessages();
2084 this.systemStartTime = System.currentTimeMillis();
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002085 }
2086
2087 /**
2088 * Startup all of the controller's components
2089 */
2090 @LogMessageDoc(message="Waiting for storage source",
2091 explanation="The system database is not yet ready",
2092 recommendation="If this message persists, this indicates " +
2093 "that the system database has failed to start. " +
2094 LogMessageDoc.CHECK_CONTROLLER)
2095 public void startupComponents() {
2096 // Create the table names we use
2097 storageSource.createTable(CONTROLLER_TABLE_NAME, null);
2098 storageSource.createTable(SWITCH_TABLE_NAME, null);
2099 storageSource.createTable(PORT_TABLE_NAME, null);
2100 storageSource.createTable(CONTROLLER_INTERFACE_TABLE_NAME, null);
2101 storageSource.createTable(SWITCH_CONFIG_TABLE_NAME, null);
2102 storageSource.setTablePrimaryKeyName(CONTROLLER_TABLE_NAME,
2103 CONTROLLER_ID);
2104 storageSource.setTablePrimaryKeyName(SWITCH_TABLE_NAME,
2105 SWITCH_DATAPATH_ID);
2106 storageSource.setTablePrimaryKeyName(PORT_TABLE_NAME, PORT_ID);
2107 storageSource.setTablePrimaryKeyName(CONTROLLER_INTERFACE_TABLE_NAME,
2108 CONTROLLER_INTERFACE_ID);
2109 storageSource.addListener(CONTROLLER_INTERFACE_TABLE_NAME, this);
2110
2111 while (true) {
2112 try {
2113 updateControllerInfo();
2114 break;
2115 }
2116 catch (StorageException e) {
2117 log.info("Waiting for storage source");
2118 try {
2119 Thread.sleep(1000);
2120 } catch (InterruptedException e1) {
2121 }
2122 }
2123 }
2124
2125 // Add our REST API
2126 restApi.addRestletRoutable(new CoreWebRoutable());
2127 }
2128
2129 @Override
2130 public void addInfoProvider(String type, IInfoProvider provider) {
2131 if (!providerMap.containsKey(type)) {
2132 providerMap.put(type, new ArrayList<IInfoProvider>());
2133 }
2134 providerMap.get(type).add(provider);
2135 }
2136
2137 @Override
2138 public void removeInfoProvider(String type, IInfoProvider provider) {
2139 if (!providerMap.containsKey(type)) {
2140 log.debug("Provider type {} doesn't exist.", type);
2141 return;
2142 }
2143
2144 providerMap.get(type).remove(provider);
2145 }
2146
2147 public Map<String, Object> getControllerInfo(String type) {
2148 if (!providerMap.containsKey(type)) return null;
2149
2150 Map<String, Object> result = new LinkedHashMap<String, Object>();
2151 for (IInfoProvider provider : providerMap.get(type)) {
2152 result.putAll(provider.getInfo(type));
2153 }
2154
2155 return result;
2156 }
2157
2158 @Override
2159 public void addHAListener(IHAListener listener) {
2160 this.haListeners.add(listener);
2161 }
2162
2163 @Override
2164 public void removeHAListener(IHAListener listener) {
2165 this.haListeners.remove(listener);
2166 }
2167
2168
2169 /**
2170 * Handle changes to the controller nodes IPs and dispatch update.
2171 */
2172 @SuppressWarnings("unchecked")
2173 protected void handleControllerNodeIPChanges() {
2174 HashMap<String,String> curControllerNodeIPs = new HashMap<String,String>();
2175 HashMap<String,String> addedControllerNodeIPs = new HashMap<String,String>();
2176 HashMap<String,String> removedControllerNodeIPs =new HashMap<String,String>();
2177 String[] colNames = { CONTROLLER_INTERFACE_CONTROLLER_ID,
2178 CONTROLLER_INTERFACE_TYPE,
2179 CONTROLLER_INTERFACE_NUMBER,
2180 CONTROLLER_INTERFACE_DISCOVERED_IP };
2181 synchronized(controllerNodeIPsCache) {
2182 // We currently assume that interface Ethernet0 is the relevant
2183 // controller interface. Might change.
2184 // We could (should?) implement this using
2185 // predicates, but creating the individual and compound predicate
2186 // seems more overhead then just checking every row. Particularly,
2187 // since the number of rows is small and changes infrequent
2188 IResultSet res = storageSource.executeQuery(CONTROLLER_INTERFACE_TABLE_NAME,
2189 colNames,null, null);
2190 while (res.next()) {
2191 if (res.getString(CONTROLLER_INTERFACE_TYPE).equals("Ethernet") &&
2192 res.getInt(CONTROLLER_INTERFACE_NUMBER) == 0) {
2193 String controllerID = res.getString(CONTROLLER_INTERFACE_CONTROLLER_ID);
2194 String discoveredIP = res.getString(CONTROLLER_INTERFACE_DISCOVERED_IP);
2195 String curIP = controllerNodeIPsCache.get(controllerID);
2196
2197 curControllerNodeIPs.put(controllerID, discoveredIP);
2198 if (curIP == null) {
2199 // new controller node IP
2200 addedControllerNodeIPs.put(controllerID, discoveredIP);
2201 }
2202 else if (curIP != discoveredIP) {
2203 // IP changed
2204 removedControllerNodeIPs.put(controllerID, curIP);
2205 addedControllerNodeIPs.put(controllerID, discoveredIP);
2206 }
2207 }
2208 }
2209 // Now figure out if rows have been deleted. We can't use the
2210 // rowKeys from rowsDeleted directly, since the tables primary
2211 // key is a compound that we can't disassemble
2212 Set<String> curEntries = curControllerNodeIPs.keySet();
2213 Set<String> removedEntries = controllerNodeIPsCache.keySet();
2214 removedEntries.removeAll(curEntries);
2215 for (String removedControllerID : removedEntries)
2216 removedControllerNodeIPs.put(removedControllerID, controllerNodeIPsCache.get(removedControllerID));
2217 controllerNodeIPsCache = (HashMap<String, String>) curControllerNodeIPs.clone();
2218 HAControllerNodeIPUpdate update = new HAControllerNodeIPUpdate(
2219 curControllerNodeIPs, addedControllerNodeIPs,
2220 removedControllerNodeIPs);
2221 if (!removedControllerNodeIPs.isEmpty() || !addedControllerNodeIPs.isEmpty()) {
2222 try {
2223 this.updates.put(update);
2224 } catch (InterruptedException e) {
2225 log.error("Failure adding update to queue", e);
2226 }
2227 }
2228 }
2229 }
2230
2231 @Override
2232 public Map<String, String> getControllerNodeIPs() {
2233 // We return a copy of the mapping so we can guarantee that
2234 // the mapping return is the same as one that will be (or was)
2235 // dispatched to IHAListeners
2236 HashMap<String,String> retval = new HashMap<String,String>();
2237 synchronized(controllerNodeIPsCache) {
2238 retval.putAll(controllerNodeIPsCache);
2239 }
2240 return retval;
2241 }
2242
2243 @Override
2244 public void rowsModified(String tableName, Set<Object> rowKeys) {
2245 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2246 handleControllerNodeIPChanges();
2247 }
2248
2249 }
2250
2251 @Override
2252 public void rowsDeleted(String tableName, Set<Object> rowKeys) {
2253 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2254 handleControllerNodeIPChanges();
2255 }
2256 }
2257
2258 @Override
2259 public long getSystemStartTime() {
2260 return (this.systemStartTime);
2261 }
2262
2263 @Override
2264 public void setAlwaysClearFlowsOnSwAdd(boolean value) {
2265 this.alwaysClearFlowsOnSwAdd = value;
2266 }
2267
2268 public boolean getAlwaysClearFlowsOnSwAdd() {
2269 return this.alwaysClearFlowsOnSwAdd;
2270 }
2271}