blob: d9cc5e13f8a7e8f594697482bd614d37ef16f8eb [file] [log] [blame]
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001/**
2* Copyright 2011, Big Switch Networks, Inc.
3* Originally created by David Erickson, Stanford University
4*
5* Licensed under the Apache License, Version 2.0 (the "License"); you may
6* not use this file except in compliance with the License. You may obtain
7* a copy of the License at
8*
9* http://www.apache.org/licenses/LICENSE-2.0
10*
11* Unless required by applicable law or agreed to in writing, software
12* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14* License for the specific language governing permissions and limitations
15* under the License.
16**/
17
18package net.floodlightcontroller.core.internal;
19
20import java.io.FileInputStream;
21import java.io.IOException;
22import java.net.InetAddress;
23import java.net.InetSocketAddress;
24import java.net.SocketAddress;
25import java.util.ArrayList;
26import java.nio.channels.ClosedChannelException;
27import java.util.Collection;
28import java.util.Collections;
29import java.util.Date;
30import java.util.HashMap;
31import java.util.HashSet;
32import java.util.Iterator;
33import java.util.LinkedHashMap;
34import java.util.List;
35import java.util.Map;
36import java.util.Map.Entry;
37import java.util.Properties;
38import java.util.Set;
39import java.util.Stack;
40import java.util.concurrent.BlockingQueue;
41import java.util.concurrent.ConcurrentHashMap;
42import java.util.concurrent.ConcurrentMap;
43import java.util.concurrent.CopyOnWriteArraySet;
44import java.util.concurrent.Executors;
45import java.util.concurrent.Future;
46import java.util.concurrent.LinkedBlockingQueue;
47import java.util.concurrent.RejectedExecutionException;
48import java.util.concurrent.TimeUnit;
49import java.util.concurrent.TimeoutException;
50
51import net.floodlightcontroller.core.FloodlightContext;
52import net.floodlightcontroller.core.IFloodlightProviderService;
53import net.floodlightcontroller.core.IHAListener;
54import net.floodlightcontroller.core.IInfoProvider;
Pankaj Berde8557a462013-01-07 08:59:31 -080055import net.floodlightcontroller.core.INetMapStorage.DM_OPERATION;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080056import net.floodlightcontroller.core.IOFMessageListener;
57import net.floodlightcontroller.core.IListener.Command;
58import net.floodlightcontroller.core.IOFSwitch;
59import net.floodlightcontroller.core.IOFSwitchFilter;
60import net.floodlightcontroller.core.IOFSwitchListener;
Pankaj Berde8557a462013-01-07 08:59:31 -080061import net.floodlightcontroller.core.ISwitchStorage.SwitchState;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080062import net.floodlightcontroller.core.annotations.LogMessageDoc;
63import net.floodlightcontroller.core.annotations.LogMessageDocs;
64import net.floodlightcontroller.core.internal.OFChannelState.HandshakeState;
65import net.floodlightcontroller.core.util.ListenerDispatcher;
66import net.floodlightcontroller.core.web.CoreWebRoutable;
67import net.floodlightcontroller.counter.ICounterStoreService;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -080068import net.floodlightcontroller.mastership.IMastershipService;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080069import net.floodlightcontroller.packet.Ethernet;
70import net.floodlightcontroller.perfmon.IPktInProcessingTimeService;
71import net.floodlightcontroller.restserver.IRestApiService;
72import net.floodlightcontroller.storage.IResultSet;
73import net.floodlightcontroller.storage.IStorageSourceListener;
74import net.floodlightcontroller.storage.IStorageSourceService;
75import net.floodlightcontroller.storage.OperatorPredicate;
76import net.floodlightcontroller.storage.StorageException;
77import net.floodlightcontroller.threadpool.IThreadPoolService;
78
79import org.jboss.netty.bootstrap.ServerBootstrap;
80import org.jboss.netty.buffer.ChannelBuffer;
81import org.jboss.netty.buffer.ChannelBuffers;
82import org.jboss.netty.channel.Channel;
83import org.jboss.netty.channel.ChannelHandlerContext;
84import org.jboss.netty.channel.ChannelPipelineFactory;
85import org.jboss.netty.channel.ChannelStateEvent;
86import org.jboss.netty.channel.ChannelUpstreamHandler;
87import org.jboss.netty.channel.Channels;
88import org.jboss.netty.channel.ExceptionEvent;
89import org.jboss.netty.channel.MessageEvent;
90import org.jboss.netty.channel.group.ChannelGroup;
91import org.jboss.netty.channel.group.DefaultChannelGroup;
92import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
93import org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler;
94import org.jboss.netty.handler.timeout.IdleStateEvent;
95import org.jboss.netty.handler.timeout.ReadTimeoutException;
96import org.openflow.protocol.OFEchoReply;
97import org.openflow.protocol.OFError;
98import org.openflow.protocol.OFError.OFBadActionCode;
99import org.openflow.protocol.OFError.OFBadRequestCode;
100import org.openflow.protocol.OFError.OFErrorType;
101import org.openflow.protocol.OFError.OFFlowModFailedCode;
102import org.openflow.protocol.OFError.OFHelloFailedCode;
103import org.openflow.protocol.OFError.OFPortModFailedCode;
104import org.openflow.protocol.OFError.OFQueueOpFailedCode;
105import org.openflow.protocol.OFFeaturesReply;
106import org.openflow.protocol.OFGetConfigReply;
107import org.openflow.protocol.OFMessage;
108import org.openflow.protocol.OFPacketIn;
109import org.openflow.protocol.OFPhysicalPort;
110import org.openflow.protocol.OFPortStatus;
Pankaj Berde6a4075d2013-01-22 16:42:54 -0800111import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
Pankaj Berde6debb042013-01-16 18:04:32 -0800112import org.openflow.protocol.OFPhysicalPort.OFPortState;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800113import org.openflow.protocol.OFPortStatus.OFPortReason;
114import org.openflow.protocol.OFSetConfig;
115import org.openflow.protocol.OFStatisticsRequest;
116import org.openflow.protocol.OFSwitchConfig;
117import org.openflow.protocol.OFType;
118import org.openflow.protocol.OFVendor;
119import org.openflow.protocol.factory.BasicFactory;
120import org.openflow.protocol.factory.MessageParseException;
121import org.openflow.protocol.statistics.OFDescriptionStatistics;
122import org.openflow.protocol.statistics.OFStatistics;
123import org.openflow.protocol.statistics.OFStatisticsType;
124import org.openflow.protocol.vendor.OFBasicVendorDataType;
125import org.openflow.protocol.vendor.OFBasicVendorId;
126import org.openflow.protocol.vendor.OFVendorId;
127import org.openflow.util.HexString;
128import org.openflow.util.U16;
129import org.openflow.util.U32;
130import org.openflow.vendor.nicira.OFNiciraVendorData;
131import org.openflow.vendor.nicira.OFRoleReplyVendorData;
132import org.openflow.vendor.nicira.OFRoleRequestVendorData;
133import org.openflow.vendor.nicira.OFRoleVendorData;
134import org.slf4j.Logger;
135import org.slf4j.LoggerFactory;
136
137
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800138
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800139/**
140 * The main controller class. Handles all setup and network listeners
141 */
142public class Controller implements IFloodlightProviderService,
143 IStorageSourceListener {
Pankaj Berde29ab7fc2013-01-25 06:17:52 -0800144
145 ThreadLocal<SwitchStorageImpl> store = new ThreadLocal<SwitchStorageImpl>() {
146 @Override
147 protected SwitchStorageImpl initialValue() {
148 SwitchStorageImpl swStore = new SwitchStorageImpl();
149 //TODO: Get the file path from global properties
150 swStore.init("/tmp/cassandra.titan");
151 return swStore;
152 }
153 };
154
155 protected SwitchStorageImpl swStore = store.get();
Pankaj Berde8557a462013-01-07 08:59:31 -0800156
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800157 protected static Logger log = LoggerFactory.getLogger(Controller.class);
158
159 private static final String ERROR_DATABASE =
160 "The controller could not communicate with the system database.";
161
162 protected BasicFactory factory;
163 protected ConcurrentMap<OFType,
164 ListenerDispatcher<OFType,IOFMessageListener>>
165 messageListeners;
166 // The activeSwitches map contains only those switches that are actively
167 // being controlled by us -- it doesn't contain switches that are
168 // in the slave role
169 protected ConcurrentHashMap<Long, IOFSwitch> activeSwitches;
170 // connectedSwitches contains all connected switches, including ones where
171 // we're a slave controller. We need to keep track of them so that we can
172 // send role request messages to switches when our role changes to master
173 // We add a switch to this set after it successfully completes the
174 // handshake. Access to this Set needs to be synchronized with roleChanger
175 protected HashSet<OFSwitchImpl> connectedSwitches;
176
177 // The controllerNodeIPsCache maps Controller IDs to their IP address.
178 // It's only used by handleControllerNodeIPsChanged
179 protected HashMap<String, String> controllerNodeIPsCache;
180
181 protected Set<IOFSwitchListener> switchListeners;
182 protected Set<IHAListener> haListeners;
183 protected Map<String, List<IInfoProvider>> providerMap;
184 protected BlockingQueue<IUpdate> updates;
185
186 // Module dependencies
187 protected IRestApiService restApi;
188 protected ICounterStoreService counterStore = null;
189 protected IStorageSourceService storageSource;
190 protected IPktInProcessingTimeService pktinProcTime;
191 protected IThreadPoolService threadPool;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800192 protected IMastershipService masterHelper;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800193
194 // Configuration options
195 protected int openFlowPort = 6633;
196 protected int workerThreads = 0;
197 // The id for this controller node. Should be unique for each controller
198 // node in a controller cluster.
199 protected String controllerId = "localhost";
200
201 // The current role of the controller.
202 // If the controller isn't configured to support roles, then this is null.
203 protected Role role;
204 // A helper that handles sending and timeout handling for role requests
205 protected RoleChanger roleChanger;
206
207 // Start time of the controller
208 protected long systemStartTime;
209
210 // Flag to always flush flow table on switch reconnect (HA or otherwise)
211 protected boolean alwaysClearFlowsOnSwAdd = false;
212
213 // Storage table names
214 protected static final String CONTROLLER_TABLE_NAME = "controller_controller";
215 protected static final String CONTROLLER_ID = "id";
216
217 protected static final String SWITCH_TABLE_NAME = "controller_switch";
218 protected static final String SWITCH_DATAPATH_ID = "dpid";
219 protected static final String SWITCH_SOCKET_ADDRESS = "socket_address";
220 protected static final String SWITCH_IP = "ip";
221 protected static final String SWITCH_CONTROLLER_ID = "controller_id";
222 protected static final String SWITCH_ACTIVE = "active";
223 protected static final String SWITCH_CONNECTED_SINCE = "connected_since";
224 protected static final String SWITCH_CAPABILITIES = "capabilities";
225 protected static final String SWITCH_BUFFERS = "buffers";
226 protected static final String SWITCH_TABLES = "tables";
227 protected static final String SWITCH_ACTIONS = "actions";
228
229 protected static final String SWITCH_CONFIG_TABLE_NAME = "controller_switchconfig";
230 protected static final String SWITCH_CONFIG_CORE_SWITCH = "core_switch";
231
232 protected static final String PORT_TABLE_NAME = "controller_port";
233 protected static final String PORT_ID = "id";
234 protected static final String PORT_SWITCH = "switch_id";
235 protected static final String PORT_NUMBER = "number";
236 protected static final String PORT_HARDWARE_ADDRESS = "hardware_address";
237 protected static final String PORT_NAME = "name";
238 protected static final String PORT_CONFIG = "config";
239 protected static final String PORT_STATE = "state";
240 protected static final String PORT_CURRENT_FEATURES = "current_features";
241 protected static final String PORT_ADVERTISED_FEATURES = "advertised_features";
242 protected static final String PORT_SUPPORTED_FEATURES = "supported_features";
243 protected static final String PORT_PEER_FEATURES = "peer_features";
244
245 protected static final String CONTROLLER_INTERFACE_TABLE_NAME = "controller_controllerinterface";
246 protected static final String CONTROLLER_INTERFACE_ID = "id";
247 protected static final String CONTROLLER_INTERFACE_CONTROLLER_ID = "controller_id";
248 protected static final String CONTROLLER_INTERFACE_TYPE = "type";
249 protected static final String CONTROLLER_INTERFACE_NUMBER = "number";
250 protected static final String CONTROLLER_INTERFACE_DISCOVERED_IP = "discovered_ip";
251
252
253
254 // Perf. related configuration
255 protected static final int SEND_BUFFER_SIZE = 4 * 1024 * 1024;
256 protected static final int BATCH_MAX_SIZE = 100;
257 protected static final boolean ALWAYS_DECODE_ETH = true;
258
259 /**
260 * Updates handled by the main loop
261 */
262 protected interface IUpdate {
263 /**
264 * Calls the appropriate listeners
265 */
266 public void dispatch();
267 }
/** The kind of change carried by a switch update. */
public enum SwitchUpdateType {
    /** A switch was added; listeners get addedSwitch. */
    ADDED,
    /** A switch was removed; listeners get removedSwitch. */
    REMOVED,
    /** A switch's ports changed; listeners get switchPortChanged. */
    PORTCHANGED
}
273 /**
274 * Update message indicating a switch was added or removed
275 */
276 protected class SwitchUpdate implements IUpdate {
277 public IOFSwitch sw;
278 public SwitchUpdateType switchUpdateType;
279 public SwitchUpdate(IOFSwitch sw, SwitchUpdateType switchUpdateType) {
280 this.sw = sw;
281 this.switchUpdateType = switchUpdateType;
282 }
283 public void dispatch() {
284 if (log.isTraceEnabled()) {
285 log.trace("Dispatching switch update {} {}",
286 sw, switchUpdateType);
287 }
288 if (switchListeners != null) {
289 for (IOFSwitchListener listener : switchListeners) {
290 switch(switchUpdateType) {
291 case ADDED:
292 listener.addedSwitch(sw);
293 break;
294 case REMOVED:
295 listener.removedSwitch(sw);
296 break;
297 case PORTCHANGED:
298 listener.switchPortChanged(sw.getId());
299 break;
300 }
301 }
302 }
303 }
304 }
305
306 /**
307 * Update message indicating controller's role has changed
308 */
309 protected class HARoleUpdate implements IUpdate {
310 public Role oldRole;
311 public Role newRole;
312 public HARoleUpdate(Role newRole, Role oldRole) {
313 this.oldRole = oldRole;
314 this.newRole = newRole;
315 }
316 public void dispatch() {
317 // Make sure that old and new roles are different.
318 if (oldRole == newRole) {
319 if (log.isTraceEnabled()) {
320 log.trace("HA role update ignored as the old and " +
321 "new roles are the same. newRole = {}" +
322 "oldRole = {}", newRole, oldRole);
323 }
324 return;
325 }
326 if (log.isTraceEnabled()) {
327 log.trace("Dispatching HA Role update newRole = {}, oldRole = {}",
328 newRole, oldRole);
329 }
330 if (haListeners != null) {
331 for (IHAListener listener : haListeners) {
332 listener.roleChanged(oldRole, newRole);
333 }
334 }
335 }
336 }
337
338 /**
339 * Update message indicating
340 * IPs of controllers in controller cluster have changed.
341 */
342 protected class HAControllerNodeIPUpdate implements IUpdate {
343 public Map<String,String> curControllerNodeIPs;
344 public Map<String,String> addedControllerNodeIPs;
345 public Map<String,String> removedControllerNodeIPs;
346 public HAControllerNodeIPUpdate(
347 HashMap<String,String> curControllerNodeIPs,
348 HashMap<String,String> addedControllerNodeIPs,
349 HashMap<String,String> removedControllerNodeIPs) {
350 this.curControllerNodeIPs = curControllerNodeIPs;
351 this.addedControllerNodeIPs = addedControllerNodeIPs;
352 this.removedControllerNodeIPs = removedControllerNodeIPs;
353 }
354 public void dispatch() {
355 if (log.isTraceEnabled()) {
356 log.trace("Dispatching HA Controller Node IP update "
357 + "curIPs = {}, addedIPs = {}, removedIPs = {}",
358 new Object[] { curControllerNodeIPs, addedControllerNodeIPs,
359 removedControllerNodeIPs }
360 );
361 }
362 if (haListeners != null) {
363 for (IHAListener listener: haListeners) {
364 listener.controllerNodeIPsChanged(curControllerNodeIPs,
365 addedControllerNodeIPs, removedControllerNodeIPs);
366 }
367 }
368 }
369 }
370
371 // ***************
372 // Getters/Setters
373 // ***************
374
375 public void setStorageSourceService(IStorageSourceService storageSource) {
376 this.storageSource = storageSource;
377 }
378
379 public void setCounterStore(ICounterStoreService counterStore) {
380 this.counterStore = counterStore;
381 }
382
383 public void setPktInProcessingService(IPktInProcessingTimeService pits) {
384 this.pktinProcTime = pits;
385 }
386
387 public void setRestApiService(IRestApiService restApi) {
388 this.restApi = restApi;
389 }
390
391 public void setThreadPoolService(IThreadPoolService tp) {
392 this.threadPool = tp;
393 }
394
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800395 public void setMastershipService(IMastershipService serviceImpl) {
396 this.masterHelper = serviceImpl;
397 }
398
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800399 @Override
400 public Role getRole() {
401 synchronized(roleChanger) {
402 return role;
403 }
404 }
405
406 @Override
407 public void setRole(Role role) {
408 if (role == null) throw new NullPointerException("Role can not be null.");
409 if (role == Role.MASTER && this.role == Role.SLAVE) {
410 // Reset db state to Inactive for all switches.
411 updateAllInactiveSwitchInfo();
412 }
413
414 // Need to synchronize to ensure a reliable ordering on role request
415 // messages send and to ensure the list of connected switches is stable
416 // RoleChanger will handle the actual sending of the message and
417 // timeout handling
418 // @see RoleChanger
419 synchronized(roleChanger) {
420 if (role.equals(this.role)) {
421 log.debug("Ignoring role change: role is already {}", role);
422 return;
423 }
424
425 Role oldRole = this.role;
426 this.role = role;
427
428 log.debug("Submitting role change request to role {}", role);
429 roleChanger.submitRequest(connectedSwitches, role);
430
431 // Enqueue an update for our listeners.
432 try {
433 this.updates.put(new HARoleUpdate(role, oldRole));
434 } catch (InterruptedException e) {
435 log.error("Failure adding update to queue", e);
436 }
437 }
438 }
439
440
441
442 // **********************
443 // ChannelUpstreamHandler
444 // **********************
445
446 /**
447 * Return a new channel handler for processing a switch connections
448 * @param state The channel state object for the connection
449 * @return the new channel handler
450 */
451 protected ChannelUpstreamHandler getChannelHandler(OFChannelState state) {
452 return new OFChannelHandler(state);
453 }
454
455 /**
456 * Channel handler deals with the switch connection and dispatches
457 * switch messages to the appropriate locations.
458 * @author readams
459 */
460 protected class OFChannelHandler
461 extends IdleStateAwareChannelUpstreamHandler {
462 protected OFSwitchImpl sw;
463 protected OFChannelState state;
464
465 public OFChannelHandler(OFChannelState state) {
466 this.state = state;
467 }
468
469 @Override
470 @LogMessageDoc(message="New switch connection from {ip address}",
471 explanation="A new switch has connected from the " +
472 "specified IP address")
473 public void channelConnected(ChannelHandlerContext ctx,
474 ChannelStateEvent e) throws Exception {
475 log.info("New switch connection from {}",
476 e.getChannel().getRemoteAddress());
477
478 sw = new OFSwitchImpl();
479 sw.setChannel(e.getChannel());
480 sw.setFloodlightProvider(Controller.this);
481 sw.setThreadPoolService(threadPool);
482
483 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
484 msglist.add(factory.getMessage(OFType.HELLO));
485 e.getChannel().write(msglist);
486
487 }
488
489 @Override
490 @LogMessageDoc(message="Disconnected switch {switch information}",
491 explanation="The specified switch has disconnected.")
492 public void channelDisconnected(ChannelHandlerContext ctx,
493 ChannelStateEvent e) throws Exception {
494 if (sw != null && state.hsState == HandshakeState.READY) {
495 if (activeSwitches.containsKey(sw.getId())) {
496 // It's safe to call removeSwitch even though the map might
497 // not contain this particular switch but another with the
498 // same DPID
499 removeSwitch(sw);
500 }
501 synchronized(roleChanger) {
502 connectedSwitches.remove(sw);
503 }
504 sw.setConnected(false);
505 }
506 log.info("Disconnected switch {}", sw);
507 }
508
/**
 * Handles exceptions raised on the switch channel. Most causes close the
 * channel; a StorageException terminates the controller; a closed channel
 * is ignored; a rejected execution is only logged.
 */
@Override
@LogMessageDocs({
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to read timeout",
            explanation="The connected switch has failed to send any " +
                        "messages or respond to echo requests",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch}: failed to " +
                    "complete handshake",
            explanation="The switch did not respond correctly " +
                        "to handshake messages",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to IO Error: {}",
            explanation="There was an error communicating with the switch",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to switch " +
                    "state error: {error}",
            explanation="The switch sent an unexpected message",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to " +
                    "message parse failure",
            explanation="Could not parse a message from the switch",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Terminating controller due to storage exception",
            explanation=ERROR_DATABASE,
            recommendation=LogMessageDoc.CHECK_CONTROLLER),
    @LogMessageDoc(level="ERROR",
            message="Could not process message: queue full",
            explanation="OpenFlow messages are arriving faster than " +
                        " the controller can process them.",
            recommendation=LogMessageDoc.CHECK_CONTROLLER),
    @LogMessageDoc(level="ERROR",
            message="Error while processing message " +
                    "from switch {switch} {cause}",
            explanation="An error occurred processing the switch message",
            recommendation=LogMessageDoc.GENERIC_ACTION)
})
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
        throws Exception {
    Throwable cause = e.getCause();

    if (cause instanceof ReadTimeoutException) {
        // switch timeout
        log.error("Disconnecting switch {} due to read timeout", sw);
        ctx.getChannel().close();
    } else if (cause instanceof HandshakeTimeoutException) {
        log.error("Disconnecting switch {}: failed to complete handshake",
                  sw);
        ctx.getChannel().close();
    } else if (cause instanceof ClosedChannelException) {
        // Channel already closed: nothing to do. This test must come before
        // the IOException one because ClosedChannelException is an IOException.
    } else if (cause instanceof IOException) {
        log.error("Disconnecting switch {} due to IO Error: {}",
                  sw, cause.getMessage());
        ctx.getChannel().close();
    } else if (cause instanceof SwitchStateException) {
        log.error("Disconnecting switch {} due to switch state error: {}",
                  sw, cause.getMessage());
        ctx.getChannel().close();
    } else if (cause instanceof MessageParseException) {
        log.error("Disconnecting switch " + sw +
                  " due to message parse failure",
                  cause);
        ctx.getChannel().close();
    } else if (cause instanceof StorageException) {
        log.error("Terminating controller due to storage exception",
                  cause);
        terminate();
    } else if (cause instanceof RejectedExecutionException) {
        // NOTE(review): logged at WARN although the @LogMessageDoc above
        // declares level="ERROR" for this message -- reconcile the two.
        log.warn("Could not process message: queue full");
    } else {
        log.error("Error while processing message from switch " + sw,
                  cause);
        ctx.getChannel().close();
    }
}
588
589 @Override
590 public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
591 throws Exception {
592 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
593 msglist.add(factory.getMessage(OFType.ECHO_REQUEST));
594 e.getChannel().write(msglist);
595 }
596
597 @Override
598 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
599 throws Exception {
600 if (e.getMessage() instanceof List) {
601 @SuppressWarnings("unchecked")
602 List<OFMessage> msglist = (List<OFMessage>)e.getMessage();
603
604 for (OFMessage ofm : msglist) {
605 try {
606 processOFMessage(ofm);
607 }
608 catch (Exception ex) {
609 // We are the last handler in the stream, so run the
610 // exception through the channel again by passing in
611 // ctx.getChannel().
612 Channels.fireExceptionCaught(ctx.getChannel(), ex);
613 }
614 }
615
616 // Flush all flow-mods/packet-out generated from this "train"
617 OFSwitchImpl.flush_all();
618 }
619 }
620
621 /**
622 * Process the request for the switch description
623 */
624 @LogMessageDoc(level="ERROR",
625 message="Exception in reading description " +
626 " during handshake {exception}",
627 explanation="Could not process the switch description string",
628 recommendation=LogMessageDoc.CHECK_SWITCH)
629 void processSwitchDescReply() {
630 try {
631 // Read description, if it has been updated
632 @SuppressWarnings("unchecked")
633 Future<List<OFStatistics>> desc_future =
634 (Future<List<OFStatistics>>)sw.
635 getAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
636 List<OFStatistics> values =
637 desc_future.get(0, TimeUnit.MILLISECONDS);
638 if (values != null) {
639 OFDescriptionStatistics description =
640 new OFDescriptionStatistics();
641 ChannelBuffer data =
642 ChannelBuffers.buffer(description.getLength());
643 for (OFStatistics f : values) {
644 f.writeTo(data);
645 description.readFrom(data);
646 break; // SHOULD be a list of length 1
647 }
648 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_DATA,
649 description);
650 sw.setSwitchProperties(description);
651 data = null;
652
653 // At this time, also set other switch properties from storage
654 boolean is_core_switch = false;
655 IResultSet resultSet = null;
656 try {
657 String swid = sw.getStringId();
658 resultSet =
659 storageSource.getRow(SWITCH_CONFIG_TABLE_NAME, swid);
660 for (Iterator<IResultSet> it =
661 resultSet.iterator(); it.hasNext();) {
662 // In case of multiple rows, use the status
663 // in last row?
664 Map<String, Object> row = it.next().getRow();
665 if (row.containsKey(SWITCH_CONFIG_CORE_SWITCH)) {
666 if (log.isDebugEnabled()) {
667 log.debug("Reading SWITCH_IS_CORE_SWITCH " +
668 "config for switch={}, is-core={}",
669 sw, row.get(SWITCH_CONFIG_CORE_SWITCH));
670 }
671 String ics =
672 (String)row.get(SWITCH_CONFIG_CORE_SWITCH);
673 is_core_switch = ics.equals("true");
674 }
675 }
676 }
677 finally {
678 if (resultSet != null)
679 resultSet.close();
680 }
681 if (is_core_switch) {
682 sw.setAttribute(IOFSwitch.SWITCH_IS_CORE_SWITCH,
683 new Boolean(true));
684 }
685 }
686 sw.removeAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
687 state.hasDescription = true;
688 checkSwitchReady();
689 }
690 catch (InterruptedException ex) {
691 // Ignore
692 }
693 catch (TimeoutException ex) {
694 // Ignore
695 } catch (Exception ex) {
696 log.error("Exception in reading description " +
697 " during handshake", ex);
698 }
699 }
700
701 /**
702 * Send initial switch setup information that we need before adding
703 * the switch
704 * @throws IOException
705 */
706 void sendHelloConfiguration() throws IOException {
707 // Send initial Features Request
708 sw.write(factory.getMessage(OFType.FEATURES_REQUEST), null);
709 }
710
711 /**
712 * Send the configuration requests we can only do after we have
713 * the features reply
714 * @throws IOException
715 */
716 void sendFeatureReplyConfiguration() throws IOException {
717 // Ensure we receive the full packet via PacketIn
718 OFSetConfig config = (OFSetConfig) factory
719 .getMessage(OFType.SET_CONFIG);
720 config.setMissSendLength((short) 0xffff)
721 .setLengthU(OFSwitchConfig.MINIMUM_LENGTH);
722 sw.write(config, null);
723 sw.write(factory.getMessage(OFType.GET_CONFIG_REQUEST),
724 null);
725
726 // Get Description to set switch-specific flags
727 OFStatisticsRequest req = new OFStatisticsRequest();
728 req.setStatisticType(OFStatisticsType.DESC);
729 req.setLengthU(req.getLengthU());
730 Future<List<OFStatistics>> dfuture =
731 sw.getStatistics(req);
732 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE,
733 dfuture);
734
735 }
736
737 protected void checkSwitchReady() {
738 if (state.hsState == HandshakeState.FEATURES_REPLY &&
739 state.hasDescription && state.hasGetConfigReply) {
740
741 state.hsState = HandshakeState.READY;
742
743 synchronized(roleChanger) {
744 // We need to keep track of all of the switches that are connected
745 // to the controller, in any role, so that we can later send the
746 // role request messages when the controller role changes.
747 // We need to be synchronized while doing this: we must not
748 // send a another role request to the connectedSwitches until
749 // we were able to add this new switch to connectedSwitches
750 // *and* send the current role to the new switch.
751 connectedSwitches.add(sw);
752
753 if (role != null) {
754 // Send a role request if role support is enabled for the controller
755 // This is a probe that we'll use to determine if the switch
756 // actually supports the role request message. If it does we'll
757 // get back a role reply message. If it doesn't we'll get back an
758 // OFError message.
759 // If role is MASTER we will promote switch to active
760 // list when we receive the switch's role reply messages
761 log.debug("This controller's role is {}, " +
762 "sending initial role request msg to {}",
763 role, sw);
764 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
765 swList.add(sw);
766 roleChanger.submitRequest(swList, role);
767 }
768 else {
769 // Role supported not enabled on controller (for now)
770 // automatically promote switch to active state.
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800771 log.debug("This controller's role is {}, " +
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800772 "not sending role request msg to {}",
773 role, sw);
774 // Need to clear FlowMods before we add the switch
775 // and dispatch updates otherwise we have a race condition.
776 sw.clearAllFlowMods();
777 addSwitch(sw);
778 state.firstRoleReplyReceived = true;
779 }
780 }
781 }
782 }
783
784 /* Handle a role reply message we received from the switch. Since
785 * netty serializes message dispatch we don't need to synchronize
786 * against other receive operations from the same switch, so no need
787 * to synchronize addSwitch(), removeSwitch() operations from the same
788 * connection.
789 * FIXME: However, when a switch with the same DPID connects we do
790 * need some synchronization. However, handling switches with same
791 * DPID needs to be revisited anyways (get rid of r/w-lock and synchronous
792 * removedSwitch notification):1
793 *
794 */
795 @LogMessageDoc(level="ERROR",
796 message="Invalid role value in role reply message",
797 explanation="Was unable to set the HA role (master or slave) " +
798 "for the controller.",
799 recommendation=LogMessageDoc.CHECK_CONTROLLER)
800 protected void handleRoleReplyMessage(OFVendor vendorMessage,
801 OFRoleReplyVendorData roleReplyVendorData) {
802 // Map from the role code in the message to our role enum
803 int nxRole = roleReplyVendorData.getRole();
804 Role role = null;
805 switch (nxRole) {
806 case OFRoleVendorData.NX_ROLE_OTHER:
807 role = Role.EQUAL;
808 break;
809 case OFRoleVendorData.NX_ROLE_MASTER:
810 role = Role.MASTER;
811 break;
812 case OFRoleVendorData.NX_ROLE_SLAVE:
813 role = Role.SLAVE;
814 break;
815 default:
816 log.error("Invalid role value in role reply message");
817 sw.getChannel().close();
818 return;
819 }
820
821 log.debug("Handling role reply for role {} from {}. " +
822 "Controller's role is {} ",
823 new Object[] { role, sw, Controller.this.role}
824 );
825
826 sw.deliverRoleReply(vendorMessage.getXid(), role);
827
828 boolean isActive = activeSwitches.containsKey(sw.getId());
829 if (!isActive && sw.isActive()) {
830 // Transition from SLAVE to MASTER.
831
832 if (!state.firstRoleReplyReceived ||
833 getAlwaysClearFlowsOnSwAdd()) {
834 // This is the first role-reply message we receive from
835 // this switch or roles were disabled when the switch
836 // connected:
837 // Delete all pre-existing flows for new connections to
838 // the master
839 //
840 // FIXME: Need to think more about what the test should
841 // be for when we flush the flow-table? For example,
842 // if all the controllers are temporarily in the backup
843 // role (e.g. right after a failure of the master
844 // controller) at the point the switch connects, then
845 // all of the controllers will initially connect as
846 // backup controllers and not flush the flow-table.
847 // Then when one of them is promoted to master following
848 // the master controller election the flow-table
849 // will still not be flushed because that's treated as
850 // a failover event where we don't want to flush the
851 // flow-table. The end result would be that the flow
852 // table for a newly connected switch is never
853 // flushed. Not sure how to handle that case though...
854 sw.clearAllFlowMods();
855 log.debug("First role reply from master switch {}, " +
856 "clear FlowTable to active switch list",
857 HexString.toHexString(sw.getId()));
858 }
859
860 // Some switches don't seem to update us with port
861 // status messages while in slave role.
862 readSwitchPortStateFromStorage(sw);
863
864 // Only add the switch to the active switch list if
865 // we're not in the slave role. Note that if the role
866 // attribute is null, then that means that the switch
867 // doesn't support the role request messages, so in that
868 // case we're effectively in the EQUAL role and the
869 // switch should be included in the active switch list.
870 addSwitch(sw);
871 log.debug("Added master switch {} to active switch list",
872 HexString.toHexString(sw.getId()));
873
874 }
875 else if (isActive && !sw.isActive()) {
876 // Transition from MASTER to SLAVE: remove switch
877 // from active switch list.
878 log.debug("Removed slave switch {} from active switch" +
879 " list", HexString.toHexString(sw.getId()));
880 removeSwitch(sw);
881 }
882
883 // Indicate that we have received a role reply message.
884 state.firstRoleReplyReceived = true;
885 }
886
887 protected boolean handleVendorMessage(OFVendor vendorMessage) {
888 boolean shouldHandleMessage = false;
889 int vendor = vendorMessage.getVendor();
890 switch (vendor) {
891 case OFNiciraVendorData.NX_VENDOR_ID:
892 OFNiciraVendorData niciraVendorData =
893 (OFNiciraVendorData)vendorMessage.getVendorData();
894 int dataType = niciraVendorData.getDataType();
895 switch (dataType) {
896 case OFRoleReplyVendorData.NXT_ROLE_REPLY:
897 OFRoleReplyVendorData roleReplyVendorData =
898 (OFRoleReplyVendorData) niciraVendorData;
899 handleRoleReplyMessage(vendorMessage,
900 roleReplyVendorData);
901 break;
902 default:
903 log.warn("Unhandled Nicira VENDOR message; " +
904 "data type = {}", dataType);
905 break;
906 }
907 break;
908 default:
909 log.warn("Unhandled VENDOR message; vendor id = {}", vendor);
910 break;
911 }
912
913 return shouldHandleMessage;
914 }
915
916 /**
917 * Dispatch an Openflow message from a switch to the appropriate
918 * handler.
919 * @param m The message to process
920 * @throws IOException
921 * @throws SwitchStateException
922 */
923 @LogMessageDocs({
924 @LogMessageDoc(level="WARN",
925 message="Config Reply from {switch} has " +
926 "miss length set to {length}",
927 explanation="The controller requires that the switch " +
928 "use a miss length of 0xffff for correct " +
929 "function",
930 recommendation="Use a different switch to ensure " +
931 "correct function"),
932 @LogMessageDoc(level="WARN",
933 message="Received ERROR from sw {switch} that "
934 +"indicates roles are not supported "
935 +"but we have received a valid "
936 +"role reply earlier",
937 explanation="The switch sent a confusing message to the" +
938 "controller")
939 })
940 protected void processOFMessage(OFMessage m)
941 throws IOException, SwitchStateException {
942 boolean shouldHandleMessage = false;
943
944 switch (m.getType()) {
945 case HELLO:
946 if (log.isTraceEnabled())
947 log.trace("HELLO from {}", sw);
948
949 if (state.hsState.equals(HandshakeState.START)) {
950 state.hsState = HandshakeState.HELLO;
951 sendHelloConfiguration();
952 } else {
953 throw new SwitchStateException("Unexpected HELLO from "
954 + sw);
955 }
956 break;
957 case ECHO_REQUEST:
958 OFEchoReply reply =
959 (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY);
960 reply.setXid(m.getXid());
961 sw.write(reply, null);
962 break;
963 case ECHO_REPLY:
964 break;
965 case FEATURES_REPLY:
966 if (log.isTraceEnabled())
967 log.trace("Features Reply from {}", sw);
968
969 sw.setFeaturesReply((OFFeaturesReply) m);
970 if (state.hsState.equals(HandshakeState.HELLO)) {
971 sendFeatureReplyConfiguration();
972 state.hsState = HandshakeState.FEATURES_REPLY;
973 // uncomment to enable "dumb" switches like cbench
974 // state.hsState = HandshakeState.READY;
975 // addSwitch(sw);
976 } else {
977 // return results to rest api caller
978 sw.deliverOFFeaturesReply(m);
979 // update database */
980 updateActiveSwitchInfo(sw);
981 }
982 break;
983 case GET_CONFIG_REPLY:
984 if (log.isTraceEnabled())
985 log.trace("Get config reply from {}", sw);
986
987 if (!state.hsState.equals(HandshakeState.FEATURES_REPLY)) {
988 String em = "Unexpected GET_CONFIG_REPLY from " + sw;
989 throw new SwitchStateException(em);
990 }
991 OFGetConfigReply cr = (OFGetConfigReply) m;
992 if (cr.getMissSendLength() == (short)0xffff) {
993 log.trace("Config Reply from {} confirms " +
994 "miss length set to 0xffff", sw);
995 } else {
996 log.warn("Config Reply from {} has " +
997 "miss length set to {}",
998 sw, cr.getMissSendLength() & 0xffff);
999 }
1000 state.hasGetConfigReply = true;
1001 checkSwitchReady();
1002 break;
1003 case VENDOR:
1004 shouldHandleMessage = handleVendorMessage((OFVendor)m);
1005 break;
1006 case ERROR:
1007 // TODO: we need better error handling. Especially for
1008 // request/reply style message (stats, roles) we should have
1009 // a unified way to lookup the xid in the error message.
1010 // This will probable involve rewriting the way we handle
1011 // request/reply style messages.
1012 OFError error = (OFError) m;
1013 boolean shouldLogError = true;
1014 // TODO: should we check that firstRoleReplyReceived is false,
1015 // i.e., check only whether the first request fails?
1016 if (sw.checkFirstPendingRoleRequestXid(error.getXid())) {
1017 boolean isBadVendorError =
1018 (error.getErrorType() == OFError.OFErrorType.
1019 OFPET_BAD_REQUEST.getValue());
1020 // We expect to receive a bad vendor error when
1021 // we're connected to a switch that doesn't support
1022 // the Nicira vendor extensions (i.e. not OVS or
1023 // derived from OVS). By protocol, it should also be
1024 // BAD_VENDOR, but too many switch implementations
1025 // get it wrong and we can already check the xid()
1026 // so we can ignore the type with confidence that this
1027 // is not a spurious error
1028 shouldLogError = !isBadVendorError;
1029 if (isBadVendorError) {
1030 if (state.firstRoleReplyReceived && (role != null)) {
1031 log.warn("Received ERROR from sw {} that "
1032 +"indicates roles are not supported "
1033 +"but we have received a valid "
1034 +"role reply earlier", sw);
1035 }
1036 state.firstRoleReplyReceived = true;
1037 sw.deliverRoleRequestNotSupported(error.getXid());
1038 synchronized(roleChanger) {
1039 if (sw.role == null && Controller.this.role==Role.SLAVE) {
1040 // the switch doesn't understand role request
1041 // messages and the current controller role is
1042 // slave. We need to disconnect the switch.
1043 // @see RoleChanger for rationale
1044 sw.getChannel().close();
1045 }
1046 else if (sw.role == null) {
1047 // Controller's role is master: add to
1048 // active
1049 // TODO: check if clearing flow table is
1050 // right choice here.
1051 // Need to clear FlowMods before we add the switch
1052 // and dispatch updates otherwise we have a race condition.
1053 // TODO: switch update is async. Won't we still have a potential
1054 // race condition?
1055 sw.clearAllFlowMods();
1056 addSwitch(sw);
1057 }
1058 }
1059 }
1060 else {
1061 // TODO: Is this the right thing to do if we receive
1062 // some other error besides a bad vendor error?
1063 // Presumably that means the switch did actually
1064 // understand the role request message, but there
1065 // was some other error from processing the message.
1066 // OF 1.2 specifies a OFPET_ROLE_REQUEST_FAILED
1067 // error code, but it doesn't look like the Nicira
1068 // role request has that. Should check OVS source
1069 // code to see if it's possible for any other errors
1070 // to be returned.
1071 // If we received an error the switch is not
1072 // in the correct role, so we need to disconnect it.
1073 // We could also resend the request but then we need to
1074 // check if there are other pending request in which
1075 // case we shouldn't resend. If we do resend we need
1076 // to make sure that the switch eventually accepts one
1077 // of our requests or disconnect the switch. This feels
1078 // cumbersome.
1079 sw.getChannel().close();
1080 }
1081 }
1082 // Once we support OF 1.2, we'd add code to handle it here.
1083 //if (error.getXid() == state.ofRoleRequestXid) {
1084 //}
1085 if (shouldLogError)
1086 logError(sw, error);
1087 break;
1088 case STATS_REPLY:
1089 if (state.hsState.ordinal() <
1090 HandshakeState.FEATURES_REPLY.ordinal()) {
1091 String em = "Unexpected STATS_REPLY from " + sw;
1092 throw new SwitchStateException(em);
1093 }
1094 sw.deliverStatisticsReply(m);
1095 if (sw.hasAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE)) {
1096 processSwitchDescReply();
1097 }
1098 break;
1099 case PORT_STATUS:
1100 // We want to update our port state info even if we're in
1101 // the slave role, but we only want to update storage if
1102 // we're the master (or equal).
1103 boolean updateStorage = state.hsState.
1104 equals(HandshakeState.READY) &&
1105 (sw.getRole() != Role.SLAVE);
1106 handlePortStatusMessage(sw, (OFPortStatus)m, updateStorage);
1107 shouldHandleMessage = true;
1108 break;
1109
1110 default:
1111 shouldHandleMessage = true;
1112 break;
1113 }
1114
1115 if (shouldHandleMessage) {
1116 sw.getListenerReadLock().lock();
1117 try {
1118 if (sw.isConnected()) {
1119 if (!state.hsState.equals(HandshakeState.READY)) {
1120 log.debug("Ignoring message type {} received " +
1121 "from switch {} before switch is " +
1122 "fully configured.", m.getType(), sw);
1123 }
1124 // Check if the controller is in the slave role for the
1125 // switch. If it is, then don't dispatch the message to
1126 // the listeners.
1127 // TODO: Should we dispatch messages that we expect to
1128 // receive when we're in the slave role, e.g. port
1129 // status messages? Since we're "hiding" switches from
1130 // the listeners when we're in the slave role, then it
1131 // seems a little weird to dispatch port status messages
1132 // to them. On the other hand there might be special
1133 // modules that care about all of the connected switches
1134 // and would like to receive port status notifications.
1135 else if (sw.getRole() == Role.SLAVE) {
1136 // Don't log message if it's a port status message
1137 // since we expect to receive those from the switch
1138 // and don't want to emit spurious messages.
1139 if (m.getType() != OFType.PORT_STATUS) {
1140 log.debug("Ignoring message type {} received " +
1141 "from switch {} while in the slave role.",
1142 m.getType(), sw);
1143 }
1144 } else {
1145 handleMessage(sw, m, null);
1146 }
1147 }
1148 }
1149 finally {
1150 sw.getListenerReadLock().unlock();
1151 }
1152 }
1153 }
1154 }
1155
1156 // ****************
1157 // Message handlers
1158 // ****************
1159
1160 protected void handlePortStatusMessage(IOFSwitch sw,
1161 OFPortStatus m,
1162 boolean updateStorage) {
1163 short portNumber = m.getDesc().getPortNumber();
1164 OFPhysicalPort port = m.getDesc();
1165 if (m.getReason() == (byte)OFPortReason.OFPPR_MODIFY.ordinal()) {
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001166 boolean portDown = ((OFPortConfig.OFPPC_PORT_DOWN.getValue() & port.getConfig()) > 0) ||
1167 ((OFPortState.OFPPS_LINK_DOWN.getValue() & port.getState()) > 0);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001168 sw.setPort(port);
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001169 if (!portDown) {
Pankaj Berde6debb042013-01-16 18:04:32 -08001170 swStore.addPort(sw.getStringId(), port);
1171 } else {
1172 swStore.deletePort(sw.getStringId(), port.getPortNumber());
1173 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001174 if (updateStorage)
1175 updatePortInfo(sw, port);
1176 log.debug("Port #{} modified for {}", portNumber, sw);
1177 } else if (m.getReason() == (byte)OFPortReason.OFPPR_ADD.ordinal()) {
1178 sw.setPort(port);
Pankaj Berde8557a462013-01-07 08:59:31 -08001179 swStore.addPort(sw.getStringId(), port);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001180 if (updateStorage)
1181 updatePortInfo(sw, port);
1182 log.debug("Port #{} added for {}", portNumber, sw);
1183 } else if (m.getReason() ==
1184 (byte)OFPortReason.OFPPR_DELETE.ordinal()) {
1185 sw.deletePort(portNumber);
Pankaj Berde8557a462013-01-07 08:59:31 -08001186 swStore.deletePort(sw.getStringId(), portNumber);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001187 if (updateStorage)
1188 removePortInfo(sw, portNumber);
1189 log.debug("Port #{} deleted for {}", portNumber, sw);
1190 }
1191 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
1192 try {
1193 this.updates.put(update);
1194 } catch (InterruptedException e) {
1195 log.error("Failure adding update to queue", e);
1196 }
1197 }
1198
1199 /**
1200 * flcontext_cache - Keep a thread local stack of contexts
1201 */
1202 protected static final ThreadLocal<Stack<FloodlightContext>> flcontext_cache =
1203 new ThreadLocal <Stack<FloodlightContext>> () {
1204 @Override
1205 protected Stack<FloodlightContext> initialValue() {
1206 return new Stack<FloodlightContext>();
1207 }
1208 };
1209
1210 /**
1211 * flcontext_alloc - pop a context off the stack, if required create a new one
1212 * @return FloodlightContext
1213 */
1214 protected static FloodlightContext flcontext_alloc() {
1215 FloodlightContext flcontext = null;
1216
1217 if (flcontext_cache.get().empty()) {
1218 flcontext = new FloodlightContext();
1219 }
1220 else {
1221 flcontext = flcontext_cache.get().pop();
1222 }
1223
1224 return flcontext;
1225 }
1226
1227 /**
1228 * flcontext_free - Free the context to the current thread
1229 * @param flcontext
1230 */
1231 protected void flcontext_free(FloodlightContext flcontext) {
1232 flcontext.getStorage().clear();
1233 flcontext_cache.get().push(flcontext);
1234 }
1235
1236 /**
1237 * Handle replies to certain OFMessages, and pass others off to listeners
1238 * @param sw The switch for the message
1239 * @param m The message
1240 * @param bContext The floodlight context. If null then floodlight context would
1241 * be allocated in this function
1242 * @throws IOException
1243 */
1244 @LogMessageDocs({
1245 @LogMessageDoc(level="ERROR",
1246 message="Ignoring PacketIn (Xid = {xid}) because the data" +
1247 " field is empty.",
1248 explanation="The switch sent an improperly-formatted PacketIn" +
1249 " message",
1250 recommendation=LogMessageDoc.CHECK_SWITCH),
1251 @LogMessageDoc(level="WARN",
1252 message="Unhandled OF Message: {} from {}",
1253 explanation="The switch sent a message not handled by " +
1254 "the controller")
1255 })
1256 protected void handleMessage(IOFSwitch sw, OFMessage m,
1257 FloodlightContext bContext)
1258 throws IOException {
1259 Ethernet eth = null;
1260
1261 switch (m.getType()) {
1262 case PACKET_IN:
1263 OFPacketIn pi = (OFPacketIn)m;
1264
1265 if (pi.getPacketData().length <= 0) {
1266 log.error("Ignoring PacketIn (Xid = " + pi.getXid() +
1267 ") because the data field is empty.");
1268 return;
1269 }
1270
1271 if (Controller.ALWAYS_DECODE_ETH) {
1272 eth = new Ethernet();
1273 eth.deserialize(pi.getPacketData(), 0,
1274 pi.getPacketData().length);
1275 counterStore.updatePacketInCounters(sw, m, eth);
1276 }
1277 // fall through to default case...
1278
1279 default:
1280
1281 List<IOFMessageListener> listeners = null;
1282 if (messageListeners.containsKey(m.getType())) {
1283 listeners = messageListeners.get(m.getType()).
1284 getOrderedListeners();
1285 }
1286
1287 FloodlightContext bc = null;
1288 if (listeners != null) {
1289 // Check if floodlight context is passed from the calling
1290 // function, if so use that floodlight context, otherwise
1291 // allocate one
1292 if (bContext == null) {
1293 bc = flcontext_alloc();
1294 } else {
1295 bc = bContext;
1296 }
1297 if (eth != null) {
1298 IFloodlightProviderService.bcStore.put(bc,
1299 IFloodlightProviderService.CONTEXT_PI_PAYLOAD,
1300 eth);
1301 }
1302
1303 // Get the starting time (overall and per-component) of
1304 // the processing chain for this packet if performance
1305 // monitoring is turned on
1306 pktinProcTime.bootstrap(listeners);
1307 pktinProcTime.recordStartTimePktIn();
1308 Command cmd;
1309 for (IOFMessageListener listener : listeners) {
1310 if (listener instanceof IOFSwitchFilter) {
1311 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1312 continue;
1313 }
1314 }
1315
1316 pktinProcTime.recordStartTimeComp(listener);
1317 cmd = listener.receive(sw, m, bc);
1318 pktinProcTime.recordEndTimeComp(listener);
1319
1320 if (Command.STOP.equals(cmd)) {
1321 break;
1322 }
1323 }
1324 pktinProcTime.recordEndTimePktIn(sw, m, bc);
1325 } else {
1326 log.warn("Unhandled OF Message: {} from {}", m, sw);
1327 }
1328
1329 if ((bContext == null) && (bc != null)) flcontext_free(bc);
1330 }
1331 }
1332
1333 /**
1334 * Log an OpenFlow error message from a switch
1335 * @param sw The switch that sent the error
1336 * @param error The error message
1337 */
1338 @LogMessageDoc(level="ERROR",
1339 message="Error {error type} {error code} from {switch}",
1340 explanation="The switch responded with an unexpected error" +
1341 "to an OpenFlow message from the controller",
1342 recommendation="This could indicate improper network operation. " +
1343 "If the problem persists restarting the switch and " +
1344 "controller may help."
1345 )
1346 protected void logError(IOFSwitch sw, OFError error) {
1347 int etint = 0xffff & error.getErrorType();
1348 if (etint < 0 || etint >= OFErrorType.values().length) {
1349 log.error("Unknown error code {} from sw {}", etint, sw);
1350 }
1351 OFErrorType et = OFErrorType.values()[etint];
1352 switch (et) {
1353 case OFPET_HELLO_FAILED:
1354 OFHelloFailedCode hfc =
1355 OFHelloFailedCode.values()[0xffff & error.getErrorCode()];
1356 log.error("Error {} {} from {}", new Object[] {et, hfc, sw});
1357 break;
1358 case OFPET_BAD_REQUEST:
1359 OFBadRequestCode brc =
1360 OFBadRequestCode.values()[0xffff & error.getErrorCode()];
1361 log.error("Error {} {} from {}", new Object[] {et, brc, sw});
1362 break;
1363 case OFPET_BAD_ACTION:
1364 OFBadActionCode bac =
1365 OFBadActionCode.values()[0xffff & error.getErrorCode()];
1366 log.error("Error {} {} from {}", new Object[] {et, bac, sw});
1367 break;
1368 case OFPET_FLOW_MOD_FAILED:
1369 OFFlowModFailedCode fmfc =
1370 OFFlowModFailedCode.values()[0xffff & error.getErrorCode()];
1371 log.error("Error {} {} from {}", new Object[] {et, fmfc, sw});
1372 break;
1373 case OFPET_PORT_MOD_FAILED:
1374 OFPortModFailedCode pmfc =
1375 OFPortModFailedCode.values()[0xffff & error.getErrorCode()];
1376 log.error("Error {} {} from {}", new Object[] {et, pmfc, sw});
1377 break;
1378 case OFPET_QUEUE_OP_FAILED:
1379 OFQueueOpFailedCode qofc =
1380 OFQueueOpFailedCode.values()[0xffff & error.getErrorCode()];
1381 log.error("Error {} {} from {}", new Object[] {et, qofc, sw});
1382 break;
1383 default:
1384 break;
1385 }
1386 }
1387
1388 /**
1389 * Add a switch to the active switch list and call the switch listeners.
1390 * This happens either when a switch first connects (and the controller is
1391 * not in the slave role) or when the role of the controller changes from
1392 * slave to master.
1393 * @param sw the switch that has been added
1394 */
1395 // TODO: need to rethink locking and the synchronous switch update.
1396 // We can / should also handle duplicate DPIDs in connectedSwitches
1397 @LogMessageDoc(level="ERROR",
1398 message="New switch added {switch} for already-added switch {switch}",
1399 explanation="A switch with the same DPID as another switch " +
1400 "connected to the controller. This can be caused by " +
1401 "multiple switches configured with the same DPID, or " +
1402 "by a switch reconnected very quickly after " +
1403 "disconnecting.",
1404 recommendation="If this happens repeatedly, it is likely there " +
1405 "are switches with duplicate DPIDs on the network. " +
1406 "Reconfigure the appropriate switches. If it happens " +
1407 "very rarely, then it is likely this is a transient " +
1408 "network problem that can be ignored."
1409 )
1410 protected void addSwitch(IOFSwitch sw) {
1411 // TODO: is it safe to modify the HashMap without holding
1412 // the old switch's lock?
1413 OFSwitchImpl oldSw = (OFSwitchImpl) this.activeSwitches.put(sw.getId(), sw);
1414 if (sw == oldSw) {
1415 // Note == for object equality, not .equals for value
1416 log.info("New add switch for pre-existing switch {}", sw);
1417 return;
1418 }
1419
1420 if (oldSw != null) {
1421 oldSw.getListenerWriteLock().lock();
1422 try {
1423 log.error("New switch added {} for already-added switch {}",
1424 sw, oldSw);
1425 // Set the connected flag to false to suppress calling
1426 // the listeners for this switch in processOFMessage
1427 oldSw.setConnected(false);
1428
1429 oldSw.cancelAllStatisticsReplies();
1430
1431 updateInactiveSwitchInfo(oldSw);
1432
1433 // we need to clean out old switch state definitively
1434 // before adding the new switch
1435 // FIXME: It seems not completely kosher to call the
1436 // switch listeners here. I thought one of the points of
1437 // having the asynchronous switch update mechanism was so
1438 // the addedSwitch and removedSwitch were always called
1439 // from a single thread to simplify concurrency issues
1440 // for the listener.
1441 if (switchListeners != null) {
1442 for (IOFSwitchListener listener : switchListeners) {
1443 listener.removedSwitch(oldSw);
1444 }
1445 }
1446 // will eventually trigger a removeSwitch(), which will cause
1447 // a "Not removing Switch ... already removed debug message.
1448 // TODO: Figure out a way to handle this that avoids the
1449 // spurious debug message.
1450 oldSw.getChannel().close();
1451 }
1452 finally {
1453 oldSw.getListenerWriteLock().unlock();
1454 }
1455 }
1456
1457 updateActiveSwitchInfo(sw);
Pankaj Berde8557a462013-01-07 08:59:31 -08001458 swStore.update(sw.getStringId(), SwitchState.ACTIVE, DM_OPERATION.UPDATE);
Pankaj Berde0fc4e432013-01-12 09:47:22 -08001459 for (OFPhysicalPort port: sw.getPorts()) {
1460 swStore.addPort(sw.getStringId(), port);
1461 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001462 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.ADDED);
1463 try {
1464 this.updates.put(update);
1465 } catch (InterruptedException e) {
1466 log.error("Failure adding update to queue", e);
1467 }
1468 }
1469
1470 /**
1471 * Remove a switch from the active switch list and call the switch listeners.
1472 * This happens either when the switch is disconnected or when the
1473 * controller's role for the switch changes from master to slave.
1474 * @param sw the switch that has been removed
1475 */
1476 protected void removeSwitch(IOFSwitch sw) {
1477 // No need to acquire the listener lock, since
1478 // this method is only called after netty has processed all
1479 // pending messages
1480 log.debug("removeSwitch: {}", sw);
Pankaj Berdeafb20532013-01-08 15:05:24 -08001481 swStore.update(sw.getStringId(), SwitchState.INACTIVE, DM_OPERATION.UPDATE);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001482 if (!this.activeSwitches.remove(sw.getId(), sw) || !sw.isConnected()) {
1483 log.debug("Not removing switch {}; already removed", sw);
1484 return;
1485 }
1486 // We cancel all outstanding statistics replies if the switch transition
1487 // from active. In the future we might allow statistics requests
1488 // from slave controllers. Then we need to move this cancelation
1489 // to switch disconnect
1490 sw.cancelAllStatisticsReplies();
Pankaj Berdeafb20532013-01-08 15:05:24 -08001491
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001492
1493 // FIXME: I think there's a race condition if we call updateInactiveSwitchInfo
1494 // here if role support is enabled. In that case if the switch is being
1495 // removed because we've been switched to being in the slave role, then I think
1496 // it's possible that the new master may have already been promoted to master
1497 // and written out the active switch state to storage. If we now execute
1498 // updateInactiveSwitchInfo we may wipe out all of the state that was
1499 // written out by the new master. Maybe need to revisit how we handle all
1500 // of the switch state that's written to storage.
1501
1502 updateInactiveSwitchInfo(sw);
Pankaj Berdeafb20532013-01-08 15:05:24 -08001503
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001504 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.REMOVED);
1505 try {
1506 this.updates.put(update);
1507 } catch (InterruptedException e) {
1508 log.error("Failure adding update to queue", e);
1509 }
1510 }
1511
1512 // ***************
1513 // IFloodlightProvider
1514 // ***************
1515
1516 @Override
1517 public synchronized void addOFMessageListener(OFType type,
1518 IOFMessageListener listener) {
1519 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1520 messageListeners.get(type);
1521 if (ldd == null) {
1522 ldd = new ListenerDispatcher<OFType, IOFMessageListener>();
1523 messageListeners.put(type, ldd);
1524 }
1525 ldd.addListener(type, listener);
1526 }
1527
1528 @Override
1529 public synchronized void removeOFMessageListener(OFType type,
1530 IOFMessageListener listener) {
1531 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1532 messageListeners.get(type);
1533 if (ldd != null) {
1534 ldd.removeListener(listener);
1535 }
1536 }
1537
1538 private void logListeners() {
1539 for (Map.Entry<OFType,
1540 ListenerDispatcher<OFType,
1541 IOFMessageListener>> entry
1542 : messageListeners.entrySet()) {
1543
1544 OFType type = entry.getKey();
1545 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1546 entry.getValue();
1547
1548 StringBuffer sb = new StringBuffer();
1549 sb.append("OFListeners for ");
1550 sb.append(type);
1551 sb.append(": ");
1552 for (IOFMessageListener l : ldd.getOrderedListeners()) {
1553 sb.append(l.getName());
1554 sb.append(",");
1555 }
1556 log.debug(sb.toString());
1557 }
1558 }
1559
1560 public void removeOFMessageListeners(OFType type) {
1561 messageListeners.remove(type);
1562 }
1563
1564 @Override
1565 public Map<Long, IOFSwitch> getSwitches() {
1566 return Collections.unmodifiableMap(this.activeSwitches);
1567 }
1568
1569 @Override
1570 public void addOFSwitchListener(IOFSwitchListener listener) {
1571 this.switchListeners.add(listener);
1572 }
1573
1574 @Override
1575 public void removeOFSwitchListener(IOFSwitchListener listener) {
1576 this.switchListeners.remove(listener);
1577 }
1578
1579 @Override
1580 public Map<OFType, List<IOFMessageListener>> getListeners() {
1581 Map<OFType, List<IOFMessageListener>> lers =
1582 new HashMap<OFType, List<IOFMessageListener>>();
1583 for(Entry<OFType, ListenerDispatcher<OFType, IOFMessageListener>> e :
1584 messageListeners.entrySet()) {
1585 lers.put(e.getKey(), e.getValue().getOrderedListeners());
1586 }
1587 return Collections.unmodifiableMap(lers);
1588 }
1589
1590 @Override
1591 @LogMessageDocs({
1592 @LogMessageDoc(message="Failed to inject OFMessage {message} onto " +
1593 "a null switch",
1594 explanation="Failed to process a message because the switch " +
1595 " is no longer connected."),
1596 @LogMessageDoc(level="ERROR",
1597 message="Error reinjecting OFMessage on switch {switch}",
1598 explanation="An I/O error occured while attempting to " +
1599 "process an OpenFlow message",
1600 recommendation=LogMessageDoc.CHECK_SWITCH)
1601 })
1602 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg,
1603 FloodlightContext bc) {
1604 if (sw == null) {
1605 log.info("Failed to inject OFMessage {} onto a null switch", msg);
1606 return false;
1607 }
1608
1609 // FIXME: Do we need to be able to inject messages to switches
1610 // where we're the slave controller (i.e. they're connected but
1611 // not active)?
1612 // FIXME: Don't we need synchronization logic here so we're holding
1613 // the listener read lock when we call handleMessage? After some
1614 // discussions it sounds like the right thing to do here would be to
1615 // inject the message as a netty upstream channel event so it goes
1616 // through the normal netty event processing, including being
1617 // handled
1618 if (!activeSwitches.containsKey(sw.getId())) return false;
1619
1620 try {
1621 // Pass Floodlight context to the handleMessages()
1622 handleMessage(sw, msg, bc);
1623 } catch (IOException e) {
1624 log.error("Error reinjecting OFMessage on switch {}",
1625 HexString.toHexString(sw.getId()));
1626 return false;
1627 }
1628 return true;
1629 }
1630
1631 @Override
1632 @LogMessageDoc(message="Calling System.exit",
1633 explanation="The controller is terminating")
1634 public synchronized void terminate() {
1635 log.info("Calling System.exit");
1636 System.exit(1);
1637 }
1638
1639 @Override
1640 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg) {
1641 // call the overloaded version with floodlight context set to null
1642 return injectOfMessage(sw, msg, null);
1643 }
1644
1645 @Override
1646 public void handleOutgoingMessage(IOFSwitch sw, OFMessage m,
1647 FloodlightContext bc) {
1648 if (log.isTraceEnabled()) {
1649 String str = OFMessage.getDataAsString(sw, m, bc);
1650 log.trace("{}", str);
1651 }
1652
1653 List<IOFMessageListener> listeners = null;
1654 if (messageListeners.containsKey(m.getType())) {
1655 listeners =
1656 messageListeners.get(m.getType()).getOrderedListeners();
1657 }
1658
1659 if (listeners != null) {
1660 for (IOFMessageListener listener : listeners) {
1661 if (listener instanceof IOFSwitchFilter) {
1662 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1663 continue;
1664 }
1665 }
1666 if (Command.STOP.equals(listener.receive(sw, m, bc))) {
1667 break;
1668 }
1669 }
1670 }
1671 }
1672
1673 @Override
1674 public BasicFactory getOFMessageFactory() {
1675 return factory;
1676 }
1677
1678 @Override
1679 public String getControllerId() {
1680 return controllerId;
1681 }
1682
1683 // **************
1684 // Initialization
1685 // **************
1686
1687 protected void updateAllInactiveSwitchInfo() {
1688 if (role == Role.SLAVE) {
1689 return;
1690 }
1691 String controllerId = getControllerId();
1692 String[] switchColumns = { SWITCH_DATAPATH_ID,
1693 SWITCH_CONTROLLER_ID,
1694 SWITCH_ACTIVE };
1695 String[] portColumns = { PORT_ID, PORT_SWITCH };
1696 IResultSet switchResultSet = null;
1697 try {
1698 OperatorPredicate op =
1699 new OperatorPredicate(SWITCH_CONTROLLER_ID,
1700 OperatorPredicate.Operator.EQ,
1701 controllerId);
1702 switchResultSet =
1703 storageSource.executeQuery(SWITCH_TABLE_NAME,
1704 switchColumns,
1705 op, null);
1706 while (switchResultSet.next()) {
1707 IResultSet portResultSet = null;
1708 try {
1709 String datapathId =
1710 switchResultSet.getString(SWITCH_DATAPATH_ID);
1711 switchResultSet.setBoolean(SWITCH_ACTIVE, Boolean.FALSE);
1712 op = new OperatorPredicate(PORT_SWITCH,
1713 OperatorPredicate.Operator.EQ,
1714 datapathId);
1715 portResultSet =
1716 storageSource.executeQuery(PORT_TABLE_NAME,
1717 portColumns,
1718 op, null);
1719 while (portResultSet.next()) {
1720 portResultSet.deleteRow();
1721 }
1722 portResultSet.save();
1723 }
1724 finally {
1725 if (portResultSet != null)
1726 portResultSet.close();
1727 }
1728 }
1729 switchResultSet.save();
1730 }
1731 finally {
1732 if (switchResultSet != null)
1733 switchResultSet.close();
1734 }
1735 }
1736
1737 protected void updateControllerInfo() {
1738 updateAllInactiveSwitchInfo();
1739
1740 // Write out the controller info to the storage source
1741 Map<String, Object> controllerInfo = new HashMap<String, Object>();
1742 String id = getControllerId();
1743 controllerInfo.put(CONTROLLER_ID, id);
1744 storageSource.updateRow(CONTROLLER_TABLE_NAME, controllerInfo);
1745 }
1746
1747 protected void updateActiveSwitchInfo(IOFSwitch sw) {
1748 if (role == Role.SLAVE) {
1749 return;
1750 }
1751 // Obtain the row info for the switch
1752 Map<String, Object> switchInfo = new HashMap<String, Object>();
1753 String datapathIdString = sw.getStringId();
1754 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1755 String controllerId = getControllerId();
1756 switchInfo.put(SWITCH_CONTROLLER_ID, controllerId);
1757 Date connectedSince = sw.getConnectedSince();
1758 switchInfo.put(SWITCH_CONNECTED_SINCE, connectedSince);
1759 Channel channel = sw.getChannel();
1760 SocketAddress socketAddress = channel.getRemoteAddress();
1761 if (socketAddress != null) {
1762 String socketAddressString = socketAddress.toString();
1763 switchInfo.put(SWITCH_SOCKET_ADDRESS, socketAddressString);
1764 if (socketAddress instanceof InetSocketAddress) {
1765 InetSocketAddress inetSocketAddress =
1766 (InetSocketAddress)socketAddress;
1767 InetAddress inetAddress = inetSocketAddress.getAddress();
1768 String ip = inetAddress.getHostAddress();
1769 switchInfo.put(SWITCH_IP, ip);
1770 }
1771 }
1772
1773 // Write out the switch features info
1774 long capabilities = U32.f(sw.getCapabilities());
1775 switchInfo.put(SWITCH_CAPABILITIES, capabilities);
1776 long buffers = U32.f(sw.getBuffers());
1777 switchInfo.put(SWITCH_BUFFERS, buffers);
1778 long tables = U32.f(sw.getTables());
1779 switchInfo.put(SWITCH_TABLES, tables);
1780 long actions = U32.f(sw.getActions());
1781 switchInfo.put(SWITCH_ACTIONS, actions);
1782 switchInfo.put(SWITCH_ACTIVE, Boolean.TRUE);
1783
1784 // Update the switch
1785 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1786
1787 // Update the ports
1788 for (OFPhysicalPort port: sw.getPorts()) {
1789 updatePortInfo(sw, port);
1790 }
1791 }
1792
1793 protected void updateInactiveSwitchInfo(IOFSwitch sw) {
1794 if (role == Role.SLAVE) {
1795 return;
1796 }
1797 log.debug("Update DB with inactiveSW {}", sw);
1798 // Update the controller info in the storage source to be inactive
1799 Map<String, Object> switchInfo = new HashMap<String, Object>();
1800 String datapathIdString = sw.getStringId();
1801 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1802 //switchInfo.put(SWITCH_CONNECTED_SINCE, null);
1803 switchInfo.put(SWITCH_ACTIVE, Boolean.FALSE);
1804 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1805 }
1806
1807 protected void updatePortInfo(IOFSwitch sw, OFPhysicalPort port) {
1808 if (role == Role.SLAVE) {
1809 return;
1810 }
1811 String datapathIdString = sw.getStringId();
1812 Map<String, Object> portInfo = new HashMap<String, Object>();
1813 int portNumber = U16.f(port.getPortNumber());
1814 String id = datapathIdString + "|" + portNumber;
1815 portInfo.put(PORT_ID, id);
1816 portInfo.put(PORT_SWITCH, datapathIdString);
1817 portInfo.put(PORT_NUMBER, portNumber);
1818 byte[] hardwareAddress = port.getHardwareAddress();
1819 String hardwareAddressString = HexString.toHexString(hardwareAddress);
1820 portInfo.put(PORT_HARDWARE_ADDRESS, hardwareAddressString);
1821 String name = port.getName();
1822 portInfo.put(PORT_NAME, name);
1823 long config = U32.f(port.getConfig());
1824 portInfo.put(PORT_CONFIG, config);
1825 long state = U32.f(port.getState());
1826 portInfo.put(PORT_STATE, state);
1827 long currentFeatures = U32.f(port.getCurrentFeatures());
1828 portInfo.put(PORT_CURRENT_FEATURES, currentFeatures);
1829 long advertisedFeatures = U32.f(port.getAdvertisedFeatures());
1830 portInfo.put(PORT_ADVERTISED_FEATURES, advertisedFeatures);
1831 long supportedFeatures = U32.f(port.getSupportedFeatures());
1832 portInfo.put(PORT_SUPPORTED_FEATURES, supportedFeatures);
1833 long peerFeatures = U32.f(port.getPeerFeatures());
1834 portInfo.put(PORT_PEER_FEATURES, peerFeatures);
1835 storageSource.updateRowAsync(PORT_TABLE_NAME, portInfo);
1836 }
1837
1838 /**
1839 * Read switch port data from storage and write it into a switch object
1840 * @param sw the switch to update
1841 */
1842 protected void readSwitchPortStateFromStorage(OFSwitchImpl sw) {
1843 OperatorPredicate op =
1844 new OperatorPredicate(PORT_SWITCH,
1845 OperatorPredicate.Operator.EQ,
1846 sw.getStringId());
1847 IResultSet portResultSet =
1848 storageSource.executeQuery(PORT_TABLE_NAME,
1849 null, op, null);
1850 //Map<Short, OFPhysicalPort> oldports =
1851 // new HashMap<Short, OFPhysicalPort>();
1852 //oldports.putAll(sw.getPorts());
1853
1854 while (portResultSet.next()) {
1855 try {
1856 OFPhysicalPort p = new OFPhysicalPort();
1857 p.setPortNumber((short)portResultSet.getInt(PORT_NUMBER));
1858 p.setName(portResultSet.getString(PORT_NAME));
1859 p.setConfig((int)portResultSet.getLong(PORT_CONFIG));
1860 p.setState((int)portResultSet.getLong(PORT_STATE));
1861 String portMac = portResultSet.getString(PORT_HARDWARE_ADDRESS);
1862 p.setHardwareAddress(HexString.fromHexString(portMac));
1863 p.setCurrentFeatures((int)portResultSet.
1864 getLong(PORT_CURRENT_FEATURES));
1865 p.setAdvertisedFeatures((int)portResultSet.
1866 getLong(PORT_ADVERTISED_FEATURES));
1867 p.setSupportedFeatures((int)portResultSet.
1868 getLong(PORT_SUPPORTED_FEATURES));
1869 p.setPeerFeatures((int)portResultSet.
1870 getLong(PORT_PEER_FEATURES));
1871 //oldports.remove(Short.valueOf(p.getPortNumber()));
1872 sw.setPort(p);
1873 } catch (NullPointerException e) {
1874 // ignore
1875 }
1876 }
1877 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
1878 try {
1879 this.updates.put(update);
1880 } catch (InterruptedException e) {
1881 log.error("Failure adding update to queue", e);
1882 }
1883 }
1884
1885 protected void removePortInfo(IOFSwitch sw, short portNumber) {
1886 if (role == Role.SLAVE) {
1887 return;
1888 }
1889 String datapathIdString = sw.getStringId();
1890 String id = datapathIdString + "|" + portNumber;
1891 storageSource.deleteRowAsync(PORT_TABLE_NAME, id);
1892 }
1893
1894 /**
1895 * Sets the initial role based on properties in the config params.
1896 * It looks for two different properties.
1897 * If the "role" property is specified then the value should be
1898 * either "EQUAL", "MASTER", or "SLAVE" and the role of the
1899 * controller is set to the specified value. If the "role" property
1900 * is not specified then it looks next for the "role.path" property.
1901 * In this case the value should be the path to a property file in
1902 * the file system that contains a property called "floodlight.role"
1903 * which can be one of the values listed above for the "role" property.
1904 * The idea behind the "role.path" mechanism is that you have some
1905 * separate heartbeat and master controller election algorithm that
1906 * determines the role of the controller. When a role transition happens,
1907 * it updates the current role in the file specified by the "role.path"
1908 * file. Then if floodlight restarts for some reason it can get the
1909 * correct current role of the controller from the file.
1910 * @param configParams The config params for the FloodlightProvider service
1911 * @return A valid role if role information is specified in the
1912 * config params, otherwise null
1913 */
1914 @LogMessageDocs({
1915 @LogMessageDoc(message="Controller role set to {role}",
1916 explanation="Setting the initial HA role to "),
1917 @LogMessageDoc(level="ERROR",
1918 message="Invalid current role value: {role}",
1919 explanation="An invalid HA role value was read from the " +
1920 "properties file",
1921 recommendation=LogMessageDoc.CHECK_CONTROLLER)
1922 })
1923 protected Role getInitialRole(Map<String, String> configParams) {
1924 Role role = null;
1925 String roleString = configParams.get("role");
1926 if (roleString == null) {
1927 String rolePath = configParams.get("rolepath");
1928 if (rolePath != null) {
1929 Properties properties = new Properties();
1930 try {
1931 properties.load(new FileInputStream(rolePath));
1932 roleString = properties.getProperty("floodlight.role");
1933 }
1934 catch (IOException exc) {
1935 // Don't treat it as an error if the file specified by the
1936 // rolepath property doesn't exist. This lets us enable the
1937 // HA mechanism by just creating/setting the floodlight.role
1938 // property in that file without having to modify the
1939 // floodlight properties.
1940 }
1941 }
1942 }
1943
1944 if (roleString != null) {
1945 // Canonicalize the string to the form used for the enum constants
1946 roleString = roleString.trim().toUpperCase();
1947 try {
1948 role = Role.valueOf(roleString);
1949 }
1950 catch (IllegalArgumentException exc) {
1951 log.error("Invalid current role value: {}", roleString);
1952 }
1953 }
1954
1955 log.info("Controller role set to {}", role);
1956
1957 return role;
1958 }
1959
1960 /**
1961 * Tell controller that we're ready to accept switches loop
1962 * @throws IOException
1963 */
1964 @LogMessageDocs({
1965 @LogMessageDoc(message="Listening for switch connections on {address}",
1966 explanation="The controller is ready and listening for new" +
1967 " switch connections"),
1968 @LogMessageDoc(message="Storage exception in controller " +
1969 "updates loop; terminating process",
1970 explanation=ERROR_DATABASE,
1971 recommendation=LogMessageDoc.CHECK_CONTROLLER),
1972 @LogMessageDoc(level="ERROR",
1973 message="Exception in controller updates loop",
1974 explanation="Failed to dispatch controller event",
1975 recommendation=LogMessageDoc.GENERIC_ACTION)
1976 })
1977 public void run() {
1978 if (log.isDebugEnabled()) {
1979 logListeners();
1980 }
1981
1982 try {
1983 final ServerBootstrap bootstrap = createServerBootStrap();
1984
1985 bootstrap.setOption("reuseAddr", true);
1986 bootstrap.setOption("child.keepAlive", true);
1987 bootstrap.setOption("child.tcpNoDelay", true);
1988 bootstrap.setOption("child.sendBufferSize", Controller.SEND_BUFFER_SIZE);
1989
1990 ChannelPipelineFactory pfact =
1991 new OpenflowPipelineFactory(this, null);
1992 bootstrap.setPipelineFactory(pfact);
1993 InetSocketAddress sa = new InetSocketAddress(openFlowPort);
1994 final ChannelGroup cg = new DefaultChannelGroup();
1995 cg.add(bootstrap.bind(sa));
1996
1997 log.info("Listening for switch connections on {}", sa);
1998 } catch (Exception e) {
1999 throw new RuntimeException(e);
2000 }
2001
2002 // main loop
2003 while (true) {
2004 try {
2005 IUpdate update = updates.take();
2006 update.dispatch();
2007 } catch (InterruptedException e) {
2008 return;
2009 } catch (StorageException e) {
2010 log.error("Storage exception in controller " +
2011 "updates loop; terminating process", e);
2012 return;
2013 } catch (Exception e) {
2014 log.error("Exception in controller updates loop", e);
2015 }
2016 }
2017 }
2018
2019 private ServerBootstrap createServerBootStrap() {
2020 if (workerThreads == 0) {
2021 return new ServerBootstrap(
2022 new NioServerSocketChannelFactory(
2023 Executors.newCachedThreadPool(),
2024 Executors.newCachedThreadPool()));
2025 } else {
2026 return new ServerBootstrap(
2027 new NioServerSocketChannelFactory(
2028 Executors.newCachedThreadPool(),
2029 Executors.newCachedThreadPool(), workerThreads));
2030 }
2031 }
2032
2033 public void setConfigParams(Map<String, String> configParams) {
2034 String ofPort = configParams.get("openflowport");
2035 if (ofPort != null) {
2036 this.openFlowPort = Integer.parseInt(ofPort);
2037 }
2038 log.debug("OpenFlow port set to {}", this.openFlowPort);
2039 String threads = configParams.get("workerthreads");
2040 if (threads != null) {
2041 this.workerThreads = Integer.parseInt(threads);
2042 }
2043 log.debug("Number of worker threads set to {}", this.workerThreads);
2044 String controllerId = configParams.get("controllerid");
2045 if (controllerId != null) {
2046 this.controllerId = controllerId;
2047 }
2048 log.debug("ControllerId set to {}", this.controllerId);
2049 }
2050
2051 private void initVendorMessages() {
2052 // Configure openflowj to be able to parse the role request/reply
2053 // vendor messages.
2054 OFBasicVendorId niciraVendorId = new OFBasicVendorId(
2055 OFNiciraVendorData.NX_VENDOR_ID, 4);
2056 OFVendorId.registerVendorId(niciraVendorId);
2057 OFBasicVendorDataType roleRequestVendorData =
2058 new OFBasicVendorDataType(
2059 OFRoleRequestVendorData.NXT_ROLE_REQUEST,
2060 OFRoleRequestVendorData.getInstantiable());
2061 niciraVendorId.registerVendorDataType(roleRequestVendorData);
2062 OFBasicVendorDataType roleReplyVendorData =
2063 new OFBasicVendorDataType(
2064 OFRoleReplyVendorData.NXT_ROLE_REPLY,
2065 OFRoleReplyVendorData.getInstantiable());
2066 niciraVendorId.registerVendorDataType(roleReplyVendorData);
2067 }
2068
2069 /**
2070 * Initialize internal data structures
2071 */
2072 public void init(Map<String, String> configParams) {
2073 // These data structures are initialized here because other
2074 // module's startUp() might be called before ours
2075 this.messageListeners =
2076 new ConcurrentHashMap<OFType,
2077 ListenerDispatcher<OFType,
2078 IOFMessageListener>>();
2079 this.switchListeners = new CopyOnWriteArraySet<IOFSwitchListener>();
2080 this.haListeners = new CopyOnWriteArraySet<IHAListener>();
2081 this.activeSwitches = new ConcurrentHashMap<Long, IOFSwitch>();
2082 this.connectedSwitches = new HashSet<OFSwitchImpl>();
2083 this.controllerNodeIPsCache = new HashMap<String, String>();
2084 this.updates = new LinkedBlockingQueue<IUpdate>();
2085 this.factory = new BasicFactory();
2086 this.providerMap = new HashMap<String, List<IInfoProvider>>();
2087 setConfigParams(configParams);
2088 this.role = getInitialRole(configParams);
2089 this.roleChanger = new RoleChanger();
2090 initVendorMessages();
2091 this.systemStartTime = System.currentTimeMillis();
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002092 }
2093
2094 /**
2095 * Startup all of the controller's components
2096 */
2097 @LogMessageDoc(message="Waiting for storage source",
2098 explanation="The system database is not yet ready",
2099 recommendation="If this message persists, this indicates " +
2100 "that the system database has failed to start. " +
2101 LogMessageDoc.CHECK_CONTROLLER)
2102 public void startupComponents() {
2103 // Create the table names we use
2104 storageSource.createTable(CONTROLLER_TABLE_NAME, null);
2105 storageSource.createTable(SWITCH_TABLE_NAME, null);
2106 storageSource.createTable(PORT_TABLE_NAME, null);
2107 storageSource.createTable(CONTROLLER_INTERFACE_TABLE_NAME, null);
2108 storageSource.createTable(SWITCH_CONFIG_TABLE_NAME, null);
2109 storageSource.setTablePrimaryKeyName(CONTROLLER_TABLE_NAME,
2110 CONTROLLER_ID);
2111 storageSource.setTablePrimaryKeyName(SWITCH_TABLE_NAME,
2112 SWITCH_DATAPATH_ID);
2113 storageSource.setTablePrimaryKeyName(PORT_TABLE_NAME, PORT_ID);
2114 storageSource.setTablePrimaryKeyName(CONTROLLER_INTERFACE_TABLE_NAME,
2115 CONTROLLER_INTERFACE_ID);
2116 storageSource.addListener(CONTROLLER_INTERFACE_TABLE_NAME, this);
2117
2118 while (true) {
2119 try {
2120 updateControllerInfo();
2121 break;
2122 }
2123 catch (StorageException e) {
2124 log.info("Waiting for storage source");
2125 try {
2126 Thread.sleep(1000);
2127 } catch (InterruptedException e1) {
2128 }
2129 }
2130 }
2131
2132 // Add our REST API
2133 restApi.addRestletRoutable(new CoreWebRoutable());
2134 }
2135
2136 @Override
2137 public void addInfoProvider(String type, IInfoProvider provider) {
2138 if (!providerMap.containsKey(type)) {
2139 providerMap.put(type, new ArrayList<IInfoProvider>());
2140 }
2141 providerMap.get(type).add(provider);
2142 }
2143
2144 @Override
2145 public void removeInfoProvider(String type, IInfoProvider provider) {
2146 if (!providerMap.containsKey(type)) {
2147 log.debug("Provider type {} doesn't exist.", type);
2148 return;
2149 }
2150
2151 providerMap.get(type).remove(provider);
2152 }
2153
2154 public Map<String, Object> getControllerInfo(String type) {
2155 if (!providerMap.containsKey(type)) return null;
2156
2157 Map<String, Object> result = new LinkedHashMap<String, Object>();
2158 for (IInfoProvider provider : providerMap.get(type)) {
2159 result.putAll(provider.getInfo(type));
2160 }
2161
2162 return result;
2163 }
2164
2165 @Override
2166 public void addHAListener(IHAListener listener) {
2167 this.haListeners.add(listener);
2168 }
2169
2170 @Override
2171 public void removeHAListener(IHAListener listener) {
2172 this.haListeners.remove(listener);
2173 }
2174
2175
2176 /**
2177 * Handle changes to the controller nodes IPs and dispatch update.
2178 */
2179 @SuppressWarnings("unchecked")
2180 protected void handleControllerNodeIPChanges() {
2181 HashMap<String,String> curControllerNodeIPs = new HashMap<String,String>();
2182 HashMap<String,String> addedControllerNodeIPs = new HashMap<String,String>();
2183 HashMap<String,String> removedControllerNodeIPs =new HashMap<String,String>();
2184 String[] colNames = { CONTROLLER_INTERFACE_CONTROLLER_ID,
2185 CONTROLLER_INTERFACE_TYPE,
2186 CONTROLLER_INTERFACE_NUMBER,
2187 CONTROLLER_INTERFACE_DISCOVERED_IP };
2188 synchronized(controllerNodeIPsCache) {
2189 // We currently assume that interface Ethernet0 is the relevant
2190 // controller interface. Might change.
2191 // We could (should?) implement this using
2192 // predicates, but creating the individual and compound predicate
2193 // seems more overhead then just checking every row. Particularly,
2194 // since the number of rows is small and changes infrequent
2195 IResultSet res = storageSource.executeQuery(CONTROLLER_INTERFACE_TABLE_NAME,
2196 colNames,null, null);
2197 while (res.next()) {
2198 if (res.getString(CONTROLLER_INTERFACE_TYPE).equals("Ethernet") &&
2199 res.getInt(CONTROLLER_INTERFACE_NUMBER) == 0) {
2200 String controllerID = res.getString(CONTROLLER_INTERFACE_CONTROLLER_ID);
2201 String discoveredIP = res.getString(CONTROLLER_INTERFACE_DISCOVERED_IP);
2202 String curIP = controllerNodeIPsCache.get(controllerID);
2203
2204 curControllerNodeIPs.put(controllerID, discoveredIP);
2205 if (curIP == null) {
2206 // new controller node IP
2207 addedControllerNodeIPs.put(controllerID, discoveredIP);
2208 }
2209 else if (curIP != discoveredIP) {
2210 // IP changed
2211 removedControllerNodeIPs.put(controllerID, curIP);
2212 addedControllerNodeIPs.put(controllerID, discoveredIP);
2213 }
2214 }
2215 }
2216 // Now figure out if rows have been deleted. We can't use the
2217 // rowKeys from rowsDeleted directly, since the tables primary
2218 // key is a compound that we can't disassemble
2219 Set<String> curEntries = curControllerNodeIPs.keySet();
2220 Set<String> removedEntries = controllerNodeIPsCache.keySet();
2221 removedEntries.removeAll(curEntries);
2222 for (String removedControllerID : removedEntries)
2223 removedControllerNodeIPs.put(removedControllerID, controllerNodeIPsCache.get(removedControllerID));
2224 controllerNodeIPsCache = (HashMap<String, String>) curControllerNodeIPs.clone();
2225 HAControllerNodeIPUpdate update = new HAControllerNodeIPUpdate(
2226 curControllerNodeIPs, addedControllerNodeIPs,
2227 removedControllerNodeIPs);
2228 if (!removedControllerNodeIPs.isEmpty() || !addedControllerNodeIPs.isEmpty()) {
2229 try {
2230 this.updates.put(update);
2231 } catch (InterruptedException e) {
2232 log.error("Failure adding update to queue", e);
2233 }
2234 }
2235 }
2236 }
2237
2238 @Override
2239 public Map<String, String> getControllerNodeIPs() {
2240 // We return a copy of the mapping so we can guarantee that
2241 // the mapping return is the same as one that will be (or was)
2242 // dispatched to IHAListeners
2243 HashMap<String,String> retval = new HashMap<String,String>();
2244 synchronized(controllerNodeIPsCache) {
2245 retval.putAll(controllerNodeIPsCache);
2246 }
2247 return retval;
2248 }
2249
2250 @Override
2251 public void rowsModified(String tableName, Set<Object> rowKeys) {
2252 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2253 handleControllerNodeIPChanges();
2254 }
2255
2256 }
2257
2258 @Override
2259 public void rowsDeleted(String tableName, Set<Object> rowKeys) {
2260 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2261 handleControllerNodeIPChanges();
2262 }
2263 }
2264
2265 @Override
2266 public long getSystemStartTime() {
2267 return (this.systemStartTime);
2268 }
2269
2270 @Override
2271 public void setAlwaysClearFlowsOnSwAdd(boolean value) {
2272 this.alwaysClearFlowsOnSwAdd = value;
2273 }
2274
2275 public boolean getAlwaysClearFlowsOnSwAdd() {
2276 return this.alwaysClearFlowsOnSwAdd;
2277 }
2278}