blob: 6e816c756aa3fdf76faea99da6edd94ef698a836 [file] [log] [blame]
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001/**
2* Copyright 2011, Big Switch Networks, Inc.
3* Originally created by David Erickson, Stanford University
4*
5* Licensed under the Apache License, Version 2.0 (the "License"); you may
6* not use this file except in compliance with the License. You may obtain
7* a copy of the License at
8*
9* http://www.apache.org/licenses/LICENSE-2.0
10*
11* Unless required by applicable law or agreed to in writing, software
12* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14* License for the specific language governing permissions and limitations
15* under the License.
16**/
17
18package net.floodlightcontroller.core.internal;
19
20import java.io.FileInputStream;
21import java.io.IOException;
22import java.net.InetAddress;
23import java.net.InetSocketAddress;
24import java.net.SocketAddress;
25import java.util.ArrayList;
26import java.nio.channels.ClosedChannelException;
27import java.util.Collection;
28import java.util.Collections;
29import java.util.Date;
30import java.util.HashMap;
31import java.util.HashSet;
32import java.util.Iterator;
33import java.util.LinkedHashMap;
34import java.util.List;
35import java.util.Map;
36import java.util.Map.Entry;
37import java.util.Properties;
38import java.util.Set;
39import java.util.Stack;
40import java.util.concurrent.BlockingQueue;
41import java.util.concurrent.ConcurrentHashMap;
42import java.util.concurrent.ConcurrentMap;
43import java.util.concurrent.CopyOnWriteArraySet;
44import java.util.concurrent.Executors;
45import java.util.concurrent.Future;
46import java.util.concurrent.LinkedBlockingQueue;
47import java.util.concurrent.RejectedExecutionException;
48import java.util.concurrent.TimeUnit;
49import java.util.concurrent.TimeoutException;
50
51import net.floodlightcontroller.core.FloodlightContext;
52import net.floodlightcontroller.core.IFloodlightProviderService;
53import net.floodlightcontroller.core.IHAListener;
54import net.floodlightcontroller.core.IInfoProvider;
Pankaj Berde8557a462013-01-07 08:59:31 -080055import net.floodlightcontroller.core.INetMapStorage.DM_OPERATION;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080056import net.floodlightcontroller.core.IOFMessageListener;
57import net.floodlightcontroller.core.IListener.Command;
58import net.floodlightcontroller.core.IOFSwitch;
59import net.floodlightcontroller.core.IOFSwitchFilter;
60import net.floodlightcontroller.core.IOFSwitchListener;
Pankaj Berde8557a462013-01-07 08:59:31 -080061import net.floodlightcontroller.core.ISwitchStorage.SwitchState;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080062import net.floodlightcontroller.core.annotations.LogMessageDoc;
63import net.floodlightcontroller.core.annotations.LogMessageDocs;
64import net.floodlightcontroller.core.internal.OFChannelState.HandshakeState;
65import net.floodlightcontroller.core.util.ListenerDispatcher;
66import net.floodlightcontroller.core.web.CoreWebRoutable;
67import net.floodlightcontroller.counter.ICounterStoreService;
68import net.floodlightcontroller.packet.Ethernet;
69import net.floodlightcontroller.perfmon.IPktInProcessingTimeService;
70import net.floodlightcontroller.restserver.IRestApiService;
71import net.floodlightcontroller.storage.IResultSet;
72import net.floodlightcontroller.storage.IStorageSourceListener;
73import net.floodlightcontroller.storage.IStorageSourceService;
74import net.floodlightcontroller.storage.OperatorPredicate;
75import net.floodlightcontroller.storage.StorageException;
76import net.floodlightcontroller.threadpool.IThreadPoolService;
77
78import org.jboss.netty.bootstrap.ServerBootstrap;
79import org.jboss.netty.buffer.ChannelBuffer;
80import org.jboss.netty.buffer.ChannelBuffers;
81import org.jboss.netty.channel.Channel;
82import org.jboss.netty.channel.ChannelHandlerContext;
83import org.jboss.netty.channel.ChannelPipelineFactory;
84import org.jboss.netty.channel.ChannelStateEvent;
85import org.jboss.netty.channel.ChannelUpstreamHandler;
86import org.jboss.netty.channel.Channels;
87import org.jboss.netty.channel.ExceptionEvent;
88import org.jboss.netty.channel.MessageEvent;
89import org.jboss.netty.channel.group.ChannelGroup;
90import org.jboss.netty.channel.group.DefaultChannelGroup;
91import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
92import org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler;
93import org.jboss.netty.handler.timeout.IdleStateEvent;
94import org.jboss.netty.handler.timeout.ReadTimeoutException;
95import org.openflow.protocol.OFEchoReply;
96import org.openflow.protocol.OFError;
97import org.openflow.protocol.OFError.OFBadActionCode;
98import org.openflow.protocol.OFError.OFBadRequestCode;
99import org.openflow.protocol.OFError.OFErrorType;
100import org.openflow.protocol.OFError.OFFlowModFailedCode;
101import org.openflow.protocol.OFError.OFHelloFailedCode;
102import org.openflow.protocol.OFError.OFPortModFailedCode;
103import org.openflow.protocol.OFError.OFQueueOpFailedCode;
104import org.openflow.protocol.OFFeaturesReply;
105import org.openflow.protocol.OFGetConfigReply;
106import org.openflow.protocol.OFMessage;
107import org.openflow.protocol.OFPacketIn;
108import org.openflow.protocol.OFPhysicalPort;
109import org.openflow.protocol.OFPortStatus;
Pankaj Berde6a4075d2013-01-22 16:42:54 -0800110import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
Pankaj Berde6debb042013-01-16 18:04:32 -0800111import org.openflow.protocol.OFPhysicalPort.OFPortState;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800112import org.openflow.protocol.OFPortStatus.OFPortReason;
113import org.openflow.protocol.OFSetConfig;
114import org.openflow.protocol.OFStatisticsRequest;
115import org.openflow.protocol.OFSwitchConfig;
116import org.openflow.protocol.OFType;
117import org.openflow.protocol.OFVendor;
118import org.openflow.protocol.factory.BasicFactory;
119import org.openflow.protocol.factory.MessageParseException;
120import org.openflow.protocol.statistics.OFDescriptionStatistics;
121import org.openflow.protocol.statistics.OFStatistics;
122import org.openflow.protocol.statistics.OFStatisticsType;
123import org.openflow.protocol.vendor.OFBasicVendorDataType;
124import org.openflow.protocol.vendor.OFBasicVendorId;
125import org.openflow.protocol.vendor.OFVendorId;
126import org.openflow.util.HexString;
127import org.openflow.util.U16;
128import org.openflow.util.U32;
129import org.openflow.vendor.nicira.OFNiciraVendorData;
130import org.openflow.vendor.nicira.OFRoleReplyVendorData;
131import org.openflow.vendor.nicira.OFRoleRequestVendorData;
132import org.openflow.vendor.nicira.OFRoleVendorData;
133import org.slf4j.Logger;
134import org.slf4j.LoggerFactory;
135
136
137/**
138 * The main controller class. Handles all setup and network listeners
139 */
140public class Controller implements IFloodlightProviderService,
141 IStorageSourceListener {
142
Pankaj Berde8557a462013-01-07 08:59:31 -0800143 protected SwitchStorageImpl swStore;
144
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800145 protected static Logger log = LoggerFactory.getLogger(Controller.class);
146
147 private static final String ERROR_DATABASE =
148 "The controller could not communicate with the system database.";
149
150 protected BasicFactory factory;
151 protected ConcurrentMap<OFType,
152 ListenerDispatcher<OFType,IOFMessageListener>>
153 messageListeners;
154 // The activeSwitches map contains only those switches that are actively
155 // being controlled by us -- it doesn't contain switches that are
156 // in the slave role
157 protected ConcurrentHashMap<Long, IOFSwitch> activeSwitches;
158 // connectedSwitches contains all connected switches, including ones where
159 // we're a slave controller. We need to keep track of them so that we can
160 // send role request messages to switches when our role changes to master
161 // We add a switch to this set after it successfully completes the
162 // handshake. Access to this Set needs to be synchronized with roleChanger
163 protected HashSet<OFSwitchImpl> connectedSwitches;
164
165 // The controllerNodeIPsCache maps Controller IDs to their IP address.
166 // It's only used by handleControllerNodeIPsChanged
167 protected HashMap<String, String> controllerNodeIPsCache;
168
169 protected Set<IOFSwitchListener> switchListeners;
170 protected Set<IHAListener> haListeners;
171 protected Map<String, List<IInfoProvider>> providerMap;
172 protected BlockingQueue<IUpdate> updates;
173
174 // Module dependencies
175 protected IRestApiService restApi;
176 protected ICounterStoreService counterStore = null;
177 protected IStorageSourceService storageSource;
178 protected IPktInProcessingTimeService pktinProcTime;
179 protected IThreadPoolService threadPool;
180
181 // Configuration options
182 protected int openFlowPort = 6633;
183 protected int workerThreads = 0;
184 // The id for this controller node. Should be unique for each controller
185 // node in a controller cluster.
186 protected String controllerId = "localhost";
187
188 // The current role of the controller.
189 // If the controller isn't configured to support roles, then this is null.
190 protected Role role;
191 // A helper that handles sending and timeout handling for role requests
192 protected RoleChanger roleChanger;
193
194 // Start time of the controller
195 protected long systemStartTime;
196
197 // Flag to always flush flow table on switch reconnect (HA or otherwise)
198 protected boolean alwaysClearFlowsOnSwAdd = false;
199
200 // Storage table names
201 protected static final String CONTROLLER_TABLE_NAME = "controller_controller";
202 protected static final String CONTROLLER_ID = "id";
203
204 protected static final String SWITCH_TABLE_NAME = "controller_switch";
205 protected static final String SWITCH_DATAPATH_ID = "dpid";
206 protected static final String SWITCH_SOCKET_ADDRESS = "socket_address";
207 protected static final String SWITCH_IP = "ip";
208 protected static final String SWITCH_CONTROLLER_ID = "controller_id";
209 protected static final String SWITCH_ACTIVE = "active";
210 protected static final String SWITCH_CONNECTED_SINCE = "connected_since";
211 protected static final String SWITCH_CAPABILITIES = "capabilities";
212 protected static final String SWITCH_BUFFERS = "buffers";
213 protected static final String SWITCH_TABLES = "tables";
214 protected static final String SWITCH_ACTIONS = "actions";
215
216 protected static final String SWITCH_CONFIG_TABLE_NAME = "controller_switchconfig";
217 protected static final String SWITCH_CONFIG_CORE_SWITCH = "core_switch";
218
219 protected static final String PORT_TABLE_NAME = "controller_port";
220 protected static final String PORT_ID = "id";
221 protected static final String PORT_SWITCH = "switch_id";
222 protected static final String PORT_NUMBER = "number";
223 protected static final String PORT_HARDWARE_ADDRESS = "hardware_address";
224 protected static final String PORT_NAME = "name";
225 protected static final String PORT_CONFIG = "config";
226 protected static final String PORT_STATE = "state";
227 protected static final String PORT_CURRENT_FEATURES = "current_features";
228 protected static final String PORT_ADVERTISED_FEATURES = "advertised_features";
229 protected static final String PORT_SUPPORTED_FEATURES = "supported_features";
230 protected static final String PORT_PEER_FEATURES = "peer_features";
231
232 protected static final String CONTROLLER_INTERFACE_TABLE_NAME = "controller_controllerinterface";
233 protected static final String CONTROLLER_INTERFACE_ID = "id";
234 protected static final String CONTROLLER_INTERFACE_CONTROLLER_ID = "controller_id";
235 protected static final String CONTROLLER_INTERFACE_TYPE = "type";
236 protected static final String CONTROLLER_INTERFACE_NUMBER = "number";
237 protected static final String CONTROLLER_INTERFACE_DISCOVERED_IP = "discovered_ip";
238
239
240
241 // Perf. related configuration
242 protected static final int SEND_BUFFER_SIZE = 4 * 1024 * 1024;
243 protected static final int BATCH_MAX_SIZE = 100;
244 protected static final boolean ALWAYS_DECODE_ETH = true;
245
246 /**
247 * Updates handled by the main loop
248 */
249 protected interface IUpdate {
250 /**
251 * Calls the appropriate listeners
252 */
253 public void dispatch();
254 }
/**
 * The kind of switch event carried by a {@code SwitchUpdate}.
 */
public enum SwitchUpdateType {
    /** A switch was added. */
    ADDED,
    /** A switch was removed. */
    REMOVED,
    /** A port of the switch changed. */
    PORTCHANGED
}
260 /**
261 * Update message indicating a switch was added or removed
262 */
263 protected class SwitchUpdate implements IUpdate {
264 public IOFSwitch sw;
265 public SwitchUpdateType switchUpdateType;
266 public SwitchUpdate(IOFSwitch sw, SwitchUpdateType switchUpdateType) {
267 this.sw = sw;
268 this.switchUpdateType = switchUpdateType;
269 }
270 public void dispatch() {
271 if (log.isTraceEnabled()) {
272 log.trace("Dispatching switch update {} {}",
273 sw, switchUpdateType);
274 }
275 if (switchListeners != null) {
276 for (IOFSwitchListener listener : switchListeners) {
277 switch(switchUpdateType) {
278 case ADDED:
279 listener.addedSwitch(sw);
280 break;
281 case REMOVED:
282 listener.removedSwitch(sw);
283 break;
284 case PORTCHANGED:
285 listener.switchPortChanged(sw.getId());
286 break;
287 }
288 }
289 }
290 }
291 }
292
293 /**
294 * Update message indicating controller's role has changed
295 */
296 protected class HARoleUpdate implements IUpdate {
297 public Role oldRole;
298 public Role newRole;
299 public HARoleUpdate(Role newRole, Role oldRole) {
300 this.oldRole = oldRole;
301 this.newRole = newRole;
302 }
303 public void dispatch() {
304 // Make sure that old and new roles are different.
305 if (oldRole == newRole) {
306 if (log.isTraceEnabled()) {
307 log.trace("HA role update ignored as the old and " +
308 "new roles are the same. newRole = {}" +
309 "oldRole = {}", newRole, oldRole);
310 }
311 return;
312 }
313 if (log.isTraceEnabled()) {
314 log.trace("Dispatching HA Role update newRole = {}, oldRole = {}",
315 newRole, oldRole);
316 }
317 if (haListeners != null) {
318 for (IHAListener listener : haListeners) {
319 listener.roleChanged(oldRole, newRole);
320 }
321 }
322 }
323 }
324
325 /**
326 * Update message indicating
327 * IPs of controllers in controller cluster have changed.
328 */
329 protected class HAControllerNodeIPUpdate implements IUpdate {
330 public Map<String,String> curControllerNodeIPs;
331 public Map<String,String> addedControllerNodeIPs;
332 public Map<String,String> removedControllerNodeIPs;
333 public HAControllerNodeIPUpdate(
334 HashMap<String,String> curControllerNodeIPs,
335 HashMap<String,String> addedControllerNodeIPs,
336 HashMap<String,String> removedControllerNodeIPs) {
337 this.curControllerNodeIPs = curControllerNodeIPs;
338 this.addedControllerNodeIPs = addedControllerNodeIPs;
339 this.removedControllerNodeIPs = removedControllerNodeIPs;
340 }
341 public void dispatch() {
342 if (log.isTraceEnabled()) {
343 log.trace("Dispatching HA Controller Node IP update "
344 + "curIPs = {}, addedIPs = {}, removedIPs = {}",
345 new Object[] { curControllerNodeIPs, addedControllerNodeIPs,
346 removedControllerNodeIPs }
347 );
348 }
349 if (haListeners != null) {
350 for (IHAListener listener: haListeners) {
351 listener.controllerNodeIPsChanged(curControllerNodeIPs,
352 addedControllerNodeIPs, removedControllerNodeIPs);
353 }
354 }
355 }
356 }
357
358 // ***************
359 // Getters/Setters
360 // ***************
361
362 public void setStorageSourceService(IStorageSourceService storageSource) {
363 this.storageSource = storageSource;
364 }
365
366 public void setCounterStore(ICounterStoreService counterStore) {
367 this.counterStore = counterStore;
368 }
369
370 public void setPktInProcessingService(IPktInProcessingTimeService pits) {
371 this.pktinProcTime = pits;
372 }
373
374 public void setRestApiService(IRestApiService restApi) {
375 this.restApi = restApi;
376 }
377
378 public void setThreadPoolService(IThreadPoolService tp) {
379 this.threadPool = tp;
380 }
381
382 @Override
383 public Role getRole() {
384 synchronized(roleChanger) {
385 return role;
386 }
387 }
388
389 @Override
390 public void setRole(Role role) {
391 if (role == null) throw new NullPointerException("Role can not be null.");
392 if (role == Role.MASTER && this.role == Role.SLAVE) {
393 // Reset db state to Inactive for all switches.
394 updateAllInactiveSwitchInfo();
395 }
396
397 // Need to synchronize to ensure a reliable ordering on role request
398 // messages send and to ensure the list of connected switches is stable
399 // RoleChanger will handle the actual sending of the message and
400 // timeout handling
401 // @see RoleChanger
402 synchronized(roleChanger) {
403 if (role.equals(this.role)) {
404 log.debug("Ignoring role change: role is already {}", role);
405 return;
406 }
407
408 Role oldRole = this.role;
409 this.role = role;
410
411 log.debug("Submitting role change request to role {}", role);
412 roleChanger.submitRequest(connectedSwitches, role);
413
414 // Enqueue an update for our listeners.
415 try {
416 this.updates.put(new HARoleUpdate(role, oldRole));
417 } catch (InterruptedException e) {
418 log.error("Failure adding update to queue", e);
419 }
420 }
421 }
422
423
424
425 // **********************
426 // ChannelUpstreamHandler
427 // **********************
428
429 /**
430 * Return a new channel handler for processing a switch connections
431 * @param state The channel state object for the connection
432 * @return the new channel handler
433 */
434 protected ChannelUpstreamHandler getChannelHandler(OFChannelState state) {
435 return new OFChannelHandler(state);
436 }
437
438 /**
439 * Channel handler deals with the switch connection and dispatches
440 * switch messages to the appropriate locations.
441 * @author readams
442 */
443 protected class OFChannelHandler
444 extends IdleStateAwareChannelUpstreamHandler {
445 protected OFSwitchImpl sw;
446 protected OFChannelState state;
447
/**
 * @param state the per-connection channel/handshake state, which this
 *              handler reads and updates as the handshake progresses
 */
public OFChannelHandler(OFChannelState state) {
    this.state = state;
}
451
452 @Override
453 @LogMessageDoc(message="New switch connection from {ip address}",
454 explanation="A new switch has connected from the " +
455 "specified IP address")
456 public void channelConnected(ChannelHandlerContext ctx,
457 ChannelStateEvent e) throws Exception {
458 log.info("New switch connection from {}",
459 e.getChannel().getRemoteAddress());
460
461 sw = new OFSwitchImpl();
462 sw.setChannel(e.getChannel());
463 sw.setFloodlightProvider(Controller.this);
464 sw.setThreadPoolService(threadPool);
465
466 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
467 msglist.add(factory.getMessage(OFType.HELLO));
468 e.getChannel().write(msglist);
469
470 }
471
472 @Override
473 @LogMessageDoc(message="Disconnected switch {switch information}",
474 explanation="The specified switch has disconnected.")
475 public void channelDisconnected(ChannelHandlerContext ctx,
476 ChannelStateEvent e) throws Exception {
477 if (sw != null && state.hsState == HandshakeState.READY) {
478 if (activeSwitches.containsKey(sw.getId())) {
479 // It's safe to call removeSwitch even though the map might
480 // not contain this particular switch but another with the
481 // same DPID
482 removeSwitch(sw);
483 }
484 synchronized(roleChanger) {
485 connectedSwitches.remove(sw);
486 }
487 sw.setConnected(false);
488 }
489 log.info("Disconnected switch {}", sw);
490 }
491
/**
 * Handles exceptions raised on the switch channel.
 * <p>
 * Most failures are logged and the channel is closed. There are three
 * exceptions to this:
 * <ul>
 *   <li>a {@link ClosedChannelException} is ignored;</li>
 *   <li>a {@link StorageException} terminates the controller;</li>
 *   <li>a {@link RejectedExecutionException} (queue full) is only
 *       logged.</li>
 * </ul>
 * The instanceof checks are order-sensitive: {@link ClosedChannelException}
 * is an {@link IOException} and must be tested first.
 */
@Override
@LogMessageDocs({
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to read timeout",
            explanation="The connected switch has failed to send any " +
                        "messages or respond to echo requests",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch}: failed to " +
                    "complete handshake",
            explanation="The switch did not respond correctly " +
                        "to handshake messages",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to IO Error: {}",
            explanation="There was an error communicating with the switch",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to switch " +
                    "state error: {error}",
            explanation="The switch sent an unexpected message",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to " +
                    "message parse failure",
            explanation="Could not parse a message from the switch",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Terminating controller due to storage exception",
            explanation=ERROR_DATABASE,
            recommendation=LogMessageDoc.CHECK_CONTROLLER),
    // Level matches the log.warn(...) call below (was documented as ERROR).
    @LogMessageDoc(level="WARN",
            message="Could not process message: queue full",
            explanation="OpenFlow messages are arriving faster than " +
                        "the controller can process them.",
            recommendation=LogMessageDoc.CHECK_CONTROLLER),
    @LogMessageDoc(level="ERROR",
            message="Error while processing message " +
                    "from switch {switch} {cause}",
            explanation="An error occurred processing the switch message",
            recommendation=LogMessageDoc.GENERIC_ACTION)
})
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
        throws Exception {
    Throwable cause = e.getCause();
    if (cause instanceof ReadTimeoutException) {
        // switch timeout
        log.error("Disconnecting switch {} due to read timeout", sw);
        ctx.getChannel().close();
    } else if (cause instanceof HandshakeTimeoutException) {
        log.error("Disconnecting switch {}: failed to complete handshake",
                  sw);
        ctx.getChannel().close();
    } else if (cause instanceof ClosedChannelException) {
        // The channel is already closed; nothing left to clean up here.
        // Intentionally not logged.
    } else if (cause instanceof IOException) {
        log.error("Disconnecting switch {} due to IO Error: {}",
                  sw, cause.getMessage());
        ctx.getChannel().close();
    } else if (cause instanceof SwitchStateException) {
        log.error("Disconnecting switch {} due to switch state error: {}",
                  sw, cause.getMessage());
        ctx.getChannel().close();
    } else if (cause instanceof MessageParseException) {
        log.error("Disconnecting switch " + sw +
                  " due to message parse failure",
                  cause);
        ctx.getChannel().close();
    } else if (cause instanceof StorageException) {
        log.error("Terminating controller due to storage exception",
                  cause);
        terminate();
    } else if (cause instanceof RejectedExecutionException) {
        log.warn("Could not process message: queue full");
    } else {
        log.error("Error while processing message from switch " + sw,
                  cause);
        ctx.getChannel().close();
    }
}
571
572 @Override
573 public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
574 throws Exception {
575 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
576 msglist.add(factory.getMessage(OFType.ECHO_REQUEST));
577 e.getChannel().write(msglist);
578 }
579
580 @Override
581 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
582 throws Exception {
583 if (e.getMessage() instanceof List) {
584 @SuppressWarnings("unchecked")
585 List<OFMessage> msglist = (List<OFMessage>)e.getMessage();
586
587 for (OFMessage ofm : msglist) {
588 try {
589 processOFMessage(ofm);
590 }
591 catch (Exception ex) {
592 // We are the last handler in the stream, so run the
593 // exception through the channel again by passing in
594 // ctx.getChannel().
595 Channels.fireExceptionCaught(ctx.getChannel(), ex);
596 }
597 }
598
599 // Flush all flow-mods/packet-out generated from this "train"
600 OFSwitchImpl.flush_all();
601 }
602 }
603
604 /**
605 * Process the request for the switch description
606 */
607 @LogMessageDoc(level="ERROR",
608 message="Exception in reading description " +
609 " during handshake {exception}",
610 explanation="Could not process the switch description string",
611 recommendation=LogMessageDoc.CHECK_SWITCH)
612 void processSwitchDescReply() {
613 try {
614 // Read description, if it has been updated
615 @SuppressWarnings("unchecked")
616 Future<List<OFStatistics>> desc_future =
617 (Future<List<OFStatistics>>)sw.
618 getAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
619 List<OFStatistics> values =
620 desc_future.get(0, TimeUnit.MILLISECONDS);
621 if (values != null) {
622 OFDescriptionStatistics description =
623 new OFDescriptionStatistics();
624 ChannelBuffer data =
625 ChannelBuffers.buffer(description.getLength());
626 for (OFStatistics f : values) {
627 f.writeTo(data);
628 description.readFrom(data);
629 break; // SHOULD be a list of length 1
630 }
631 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_DATA,
632 description);
633 sw.setSwitchProperties(description);
634 data = null;
635
636 // At this time, also set other switch properties from storage
637 boolean is_core_switch = false;
638 IResultSet resultSet = null;
639 try {
640 String swid = sw.getStringId();
641 resultSet =
642 storageSource.getRow(SWITCH_CONFIG_TABLE_NAME, swid);
643 for (Iterator<IResultSet> it =
644 resultSet.iterator(); it.hasNext();) {
645 // In case of multiple rows, use the status
646 // in last row?
647 Map<String, Object> row = it.next().getRow();
648 if (row.containsKey(SWITCH_CONFIG_CORE_SWITCH)) {
649 if (log.isDebugEnabled()) {
650 log.debug("Reading SWITCH_IS_CORE_SWITCH " +
651 "config for switch={}, is-core={}",
652 sw, row.get(SWITCH_CONFIG_CORE_SWITCH));
653 }
654 String ics =
655 (String)row.get(SWITCH_CONFIG_CORE_SWITCH);
656 is_core_switch = ics.equals("true");
657 }
658 }
659 }
660 finally {
661 if (resultSet != null)
662 resultSet.close();
663 }
664 if (is_core_switch) {
665 sw.setAttribute(IOFSwitch.SWITCH_IS_CORE_SWITCH,
666 new Boolean(true));
667 }
668 }
669 sw.removeAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
670 state.hasDescription = true;
671 checkSwitchReady();
672 }
673 catch (InterruptedException ex) {
674 // Ignore
675 }
676 catch (TimeoutException ex) {
677 // Ignore
678 } catch (Exception ex) {
679 log.error("Exception in reading description " +
680 " during handshake", ex);
681 }
682 }
683
684 /**
685 * Send initial switch setup information that we need before adding
686 * the switch
687 * @throws IOException
688 */
689 void sendHelloConfiguration() throws IOException {
690 // Send initial Features Request
691 sw.write(factory.getMessage(OFType.FEATURES_REQUEST), null);
692 }
693
694 /**
695 * Send the configuration requests we can only do after we have
696 * the features reply
697 * @throws IOException
698 */
699 void sendFeatureReplyConfiguration() throws IOException {
700 // Ensure we receive the full packet via PacketIn
701 OFSetConfig config = (OFSetConfig) factory
702 .getMessage(OFType.SET_CONFIG);
703 config.setMissSendLength((short) 0xffff)
704 .setLengthU(OFSwitchConfig.MINIMUM_LENGTH);
705 sw.write(config, null);
706 sw.write(factory.getMessage(OFType.GET_CONFIG_REQUEST),
707 null);
708
709 // Get Description to set switch-specific flags
710 OFStatisticsRequest req = new OFStatisticsRequest();
711 req.setStatisticType(OFStatisticsType.DESC);
712 req.setLengthU(req.getLengthU());
713 Future<List<OFStatistics>> dfuture =
714 sw.getStatistics(req);
715 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE,
716 dfuture);
717
718 }
719
720 protected void checkSwitchReady() {
721 if (state.hsState == HandshakeState.FEATURES_REPLY &&
722 state.hasDescription && state.hasGetConfigReply) {
723
724 state.hsState = HandshakeState.READY;
725
726 synchronized(roleChanger) {
727 // We need to keep track of all of the switches that are connected
728 // to the controller, in any role, so that we can later send the
729 // role request messages when the controller role changes.
730 // We need to be synchronized while doing this: we must not
731 // send a another role request to the connectedSwitches until
732 // we were able to add this new switch to connectedSwitches
733 // *and* send the current role to the new switch.
734 connectedSwitches.add(sw);
735
736 if (role != null) {
737 // Send a role request if role support is enabled for the controller
738 // This is a probe that we'll use to determine if the switch
739 // actually supports the role request message. If it does we'll
740 // get back a role reply message. If it doesn't we'll get back an
741 // OFError message.
742 // If role is MASTER we will promote switch to active
743 // list when we receive the switch's role reply messages
744 log.debug("This controller's role is {}, " +
745 "sending initial role request msg to {}",
746 role, sw);
747 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
748 swList.add(sw);
749 roleChanger.submitRequest(swList, role);
750 }
751 else {
752 // Role supported not enabled on controller (for now)
753 // automatically promote switch to active state.
754 log.debug("This controller's role is null, " +
755 "not sending role request msg to {}",
756 role, sw);
757 // Need to clear FlowMods before we add the switch
758 // and dispatch updates otherwise we have a race condition.
759 sw.clearAllFlowMods();
760 addSwitch(sw);
761 state.firstRoleReplyReceived = true;
762 }
763 }
764 }
765 }
766
767 /* Handle a role reply message we received from the switch. Since
768 * netty serializes message dispatch we don't need to synchronize
769 * against other receive operations from the same switch, so no need
770 * to synchronize addSwitch(), removeSwitch() operations from the same
771 * connection.
772 * FIXME: However, when a switch with the same DPID connects we do
773 * need some synchronization. However, handling switches with same
774 * DPID needs to be revisited anyways (get rid of r/w-lock and synchronous
775 * removedSwitch notification):1
776 *
777 */
778 @LogMessageDoc(level="ERROR",
779 message="Invalid role value in role reply message",
780 explanation="Was unable to set the HA role (master or slave) " +
781 "for the controller.",
782 recommendation=LogMessageDoc.CHECK_CONTROLLER)
783 protected void handleRoleReplyMessage(OFVendor vendorMessage,
784 OFRoleReplyVendorData roleReplyVendorData) {
785 // Map from the role code in the message to our role enum
786 int nxRole = roleReplyVendorData.getRole();
787 Role role = null;
788 switch (nxRole) {
789 case OFRoleVendorData.NX_ROLE_OTHER:
790 role = Role.EQUAL;
791 break;
792 case OFRoleVendorData.NX_ROLE_MASTER:
793 role = Role.MASTER;
794 break;
795 case OFRoleVendorData.NX_ROLE_SLAVE:
796 role = Role.SLAVE;
797 break;
798 default:
799 log.error("Invalid role value in role reply message");
800 sw.getChannel().close();
801 return;
802 }
803
804 log.debug("Handling role reply for role {} from {}. " +
805 "Controller's role is {} ",
806 new Object[] { role, sw, Controller.this.role}
807 );
808
809 sw.deliverRoleReply(vendorMessage.getXid(), role);
810
811 boolean isActive = activeSwitches.containsKey(sw.getId());
812 if (!isActive && sw.isActive()) {
813 // Transition from SLAVE to MASTER.
814
815 if (!state.firstRoleReplyReceived ||
816 getAlwaysClearFlowsOnSwAdd()) {
817 // This is the first role-reply message we receive from
818 // this switch or roles were disabled when the switch
819 // connected:
820 // Delete all pre-existing flows for new connections to
821 // the master
822 //
823 // FIXME: Need to think more about what the test should
824 // be for when we flush the flow-table? For example,
825 // if all the controllers are temporarily in the backup
826 // role (e.g. right after a failure of the master
827 // controller) at the point the switch connects, then
828 // all of the controllers will initially connect as
829 // backup controllers and not flush the flow-table.
830 // Then when one of them is promoted to master following
831 // the master controller election the flow-table
832 // will still not be flushed because that's treated as
833 // a failover event where we don't want to flush the
834 // flow-table. The end result would be that the flow
835 // table for a newly connected switch is never
836 // flushed. Not sure how to handle that case though...
837 sw.clearAllFlowMods();
838 log.debug("First role reply from master switch {}, " +
839 "clear FlowTable to active switch list",
840 HexString.toHexString(sw.getId()));
841 }
842
843 // Some switches don't seem to update us with port
844 // status messages while in slave role.
845 readSwitchPortStateFromStorage(sw);
846
847 // Only add the switch to the active switch list if
848 // we're not in the slave role. Note that if the role
849 // attribute is null, then that means that the switch
850 // doesn't support the role request messages, so in that
851 // case we're effectively in the EQUAL role and the
852 // switch should be included in the active switch list.
853 addSwitch(sw);
854 log.debug("Added master switch {} to active switch list",
855 HexString.toHexString(sw.getId()));
856
857 }
858 else if (isActive && !sw.isActive()) {
859 // Transition from MASTER to SLAVE: remove switch
860 // from active switch list.
861 log.debug("Removed slave switch {} from active switch" +
862 " list", HexString.toHexString(sw.getId()));
863 removeSwitch(sw);
864 }
865
866 // Indicate that we have received a role reply message.
867 state.firstRoleReplyReceived = true;
868 }
869
870 protected boolean handleVendorMessage(OFVendor vendorMessage) {
871 boolean shouldHandleMessage = false;
872 int vendor = vendorMessage.getVendor();
873 switch (vendor) {
874 case OFNiciraVendorData.NX_VENDOR_ID:
875 OFNiciraVendorData niciraVendorData =
876 (OFNiciraVendorData)vendorMessage.getVendorData();
877 int dataType = niciraVendorData.getDataType();
878 switch (dataType) {
879 case OFRoleReplyVendorData.NXT_ROLE_REPLY:
880 OFRoleReplyVendorData roleReplyVendorData =
881 (OFRoleReplyVendorData) niciraVendorData;
882 handleRoleReplyMessage(vendorMessage,
883 roleReplyVendorData);
884 break;
885 default:
886 log.warn("Unhandled Nicira VENDOR message; " +
887 "data type = {}", dataType);
888 break;
889 }
890 break;
891 default:
892 log.warn("Unhandled VENDOR message; vendor id = {}", vendor);
893 break;
894 }
895
896 return shouldHandleMessage;
897 }
898
899 /**
900 * Dispatch an Openflow message from a switch to the appropriate
901 * handler.
902 * @param m The message to process
903 * @throws IOException
904 * @throws SwitchStateException
905 */
906 @LogMessageDocs({
907 @LogMessageDoc(level="WARN",
908 message="Config Reply from {switch} has " +
909 "miss length set to {length}",
910 explanation="The controller requires that the switch " +
911 "use a miss length of 0xffff for correct " +
912 "function",
913 recommendation="Use a different switch to ensure " +
914 "correct function"),
915 @LogMessageDoc(level="WARN",
916 message="Received ERROR from sw {switch} that "
917 +"indicates roles are not supported "
918 +"but we have received a valid "
919 +"role reply earlier",
920 explanation="The switch sent a confusing message to the" +
921 "controller")
922 })
923 protected void processOFMessage(OFMessage m)
924 throws IOException, SwitchStateException {
925 boolean shouldHandleMessage = false;
926
927 switch (m.getType()) {
928 case HELLO:
929 if (log.isTraceEnabled())
930 log.trace("HELLO from {}", sw);
931
932 if (state.hsState.equals(HandshakeState.START)) {
933 state.hsState = HandshakeState.HELLO;
934 sendHelloConfiguration();
935 } else {
936 throw new SwitchStateException("Unexpected HELLO from "
937 + sw);
938 }
939 break;
940 case ECHO_REQUEST:
941 OFEchoReply reply =
942 (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY);
943 reply.setXid(m.getXid());
944 sw.write(reply, null);
945 break;
946 case ECHO_REPLY:
947 break;
948 case FEATURES_REPLY:
949 if (log.isTraceEnabled())
950 log.trace("Features Reply from {}", sw);
951
952 sw.setFeaturesReply((OFFeaturesReply) m);
953 if (state.hsState.equals(HandshakeState.HELLO)) {
954 sendFeatureReplyConfiguration();
955 state.hsState = HandshakeState.FEATURES_REPLY;
956 // uncomment to enable "dumb" switches like cbench
957 // state.hsState = HandshakeState.READY;
958 // addSwitch(sw);
959 } else {
960 // return results to rest api caller
961 sw.deliverOFFeaturesReply(m);
962 // update database */
963 updateActiveSwitchInfo(sw);
964 }
965 break;
966 case GET_CONFIG_REPLY:
967 if (log.isTraceEnabled())
968 log.trace("Get config reply from {}", sw);
969
970 if (!state.hsState.equals(HandshakeState.FEATURES_REPLY)) {
971 String em = "Unexpected GET_CONFIG_REPLY from " + sw;
972 throw new SwitchStateException(em);
973 }
974 OFGetConfigReply cr = (OFGetConfigReply) m;
975 if (cr.getMissSendLength() == (short)0xffff) {
976 log.trace("Config Reply from {} confirms " +
977 "miss length set to 0xffff", sw);
978 } else {
979 log.warn("Config Reply from {} has " +
980 "miss length set to {}",
981 sw, cr.getMissSendLength() & 0xffff);
982 }
983 state.hasGetConfigReply = true;
984 checkSwitchReady();
985 break;
986 case VENDOR:
987 shouldHandleMessage = handleVendorMessage((OFVendor)m);
988 break;
989 case ERROR:
990 // TODO: we need better error handling. Especially for
991 // request/reply style message (stats, roles) we should have
992 // a unified way to lookup the xid in the error message.
993 // This will probable involve rewriting the way we handle
994 // request/reply style messages.
995 OFError error = (OFError) m;
996 boolean shouldLogError = true;
997 // TODO: should we check that firstRoleReplyReceived is false,
998 // i.e., check only whether the first request fails?
999 if (sw.checkFirstPendingRoleRequestXid(error.getXid())) {
1000 boolean isBadVendorError =
1001 (error.getErrorType() == OFError.OFErrorType.
1002 OFPET_BAD_REQUEST.getValue());
1003 // We expect to receive a bad vendor error when
1004 // we're connected to a switch that doesn't support
1005 // the Nicira vendor extensions (i.e. not OVS or
1006 // derived from OVS). By protocol, it should also be
1007 // BAD_VENDOR, but too many switch implementations
1008 // get it wrong and we can already check the xid()
1009 // so we can ignore the type with confidence that this
1010 // is not a spurious error
1011 shouldLogError = !isBadVendorError;
1012 if (isBadVendorError) {
1013 if (state.firstRoleReplyReceived && (role != null)) {
1014 log.warn("Received ERROR from sw {} that "
1015 +"indicates roles are not supported "
1016 +"but we have received a valid "
1017 +"role reply earlier", sw);
1018 }
1019 state.firstRoleReplyReceived = true;
1020 sw.deliverRoleRequestNotSupported(error.getXid());
1021 synchronized(roleChanger) {
1022 if (sw.role == null && Controller.this.role==Role.SLAVE) {
1023 // the switch doesn't understand role request
1024 // messages and the current controller role is
1025 // slave. We need to disconnect the switch.
1026 // @see RoleChanger for rationale
1027 sw.getChannel().close();
1028 }
1029 else if (sw.role == null) {
1030 // Controller's role is master: add to
1031 // active
1032 // TODO: check if clearing flow table is
1033 // right choice here.
1034 // Need to clear FlowMods before we add the switch
1035 // and dispatch updates otherwise we have a race condition.
1036 // TODO: switch update is async. Won't we still have a potential
1037 // race condition?
1038 sw.clearAllFlowMods();
1039 addSwitch(sw);
1040 }
1041 }
1042 }
1043 else {
1044 // TODO: Is this the right thing to do if we receive
1045 // some other error besides a bad vendor error?
1046 // Presumably that means the switch did actually
1047 // understand the role request message, but there
1048 // was some other error from processing the message.
1049 // OF 1.2 specifies a OFPET_ROLE_REQUEST_FAILED
1050 // error code, but it doesn't look like the Nicira
1051 // role request has that. Should check OVS source
1052 // code to see if it's possible for any other errors
1053 // to be returned.
1054 // If we received an error the switch is not
1055 // in the correct role, so we need to disconnect it.
1056 // We could also resend the request but then we need to
1057 // check if there are other pending request in which
1058 // case we shouldn't resend. If we do resend we need
1059 // to make sure that the switch eventually accepts one
1060 // of our requests or disconnect the switch. This feels
1061 // cumbersome.
1062 sw.getChannel().close();
1063 }
1064 }
1065 // Once we support OF 1.2, we'd add code to handle it here.
1066 //if (error.getXid() == state.ofRoleRequestXid) {
1067 //}
1068 if (shouldLogError)
1069 logError(sw, error);
1070 break;
1071 case STATS_REPLY:
1072 if (state.hsState.ordinal() <
1073 HandshakeState.FEATURES_REPLY.ordinal()) {
1074 String em = "Unexpected STATS_REPLY from " + sw;
1075 throw new SwitchStateException(em);
1076 }
1077 sw.deliverStatisticsReply(m);
1078 if (sw.hasAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE)) {
1079 processSwitchDescReply();
1080 }
1081 break;
1082 case PORT_STATUS:
1083 // We want to update our port state info even if we're in
1084 // the slave role, but we only want to update storage if
1085 // we're the master (or equal).
1086 boolean updateStorage = state.hsState.
1087 equals(HandshakeState.READY) &&
1088 (sw.getRole() != Role.SLAVE);
1089 handlePortStatusMessage(sw, (OFPortStatus)m, updateStorage);
1090 shouldHandleMessage = true;
1091 break;
1092
1093 default:
1094 shouldHandleMessage = true;
1095 break;
1096 }
1097
1098 if (shouldHandleMessage) {
1099 sw.getListenerReadLock().lock();
1100 try {
1101 if (sw.isConnected()) {
1102 if (!state.hsState.equals(HandshakeState.READY)) {
1103 log.debug("Ignoring message type {} received " +
1104 "from switch {} before switch is " +
1105 "fully configured.", m.getType(), sw);
1106 }
1107 // Check if the controller is in the slave role for the
1108 // switch. If it is, then don't dispatch the message to
1109 // the listeners.
1110 // TODO: Should we dispatch messages that we expect to
1111 // receive when we're in the slave role, e.g. port
1112 // status messages? Since we're "hiding" switches from
1113 // the listeners when we're in the slave role, then it
1114 // seems a little weird to dispatch port status messages
1115 // to them. On the other hand there might be special
1116 // modules that care about all of the connected switches
1117 // and would like to receive port status notifications.
1118 else if (sw.getRole() == Role.SLAVE) {
1119 // Don't log message if it's a port status message
1120 // since we expect to receive those from the switch
1121 // and don't want to emit spurious messages.
1122 if (m.getType() != OFType.PORT_STATUS) {
1123 log.debug("Ignoring message type {} received " +
1124 "from switch {} while in the slave role.",
1125 m.getType(), sw);
1126 }
1127 } else {
1128 handleMessage(sw, m, null);
1129 }
1130 }
1131 }
1132 finally {
1133 sw.getListenerReadLock().unlock();
1134 }
1135 }
1136 }
1137 }
1138
1139 // ****************
1140 // Message handlers
1141 // ****************
1142
1143 protected void handlePortStatusMessage(IOFSwitch sw,
1144 OFPortStatus m,
1145 boolean updateStorage) {
1146 short portNumber = m.getDesc().getPortNumber();
1147 OFPhysicalPort port = m.getDesc();
1148 if (m.getReason() == (byte)OFPortReason.OFPPR_MODIFY.ordinal()) {
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001149 boolean portDown = ((OFPortConfig.OFPPC_PORT_DOWN.getValue() & port.getConfig()) > 0) ||
1150 ((OFPortState.OFPPS_LINK_DOWN.getValue() & port.getState()) > 0);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001151 sw.setPort(port);
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001152 if (!portDown) {
Pankaj Berde6debb042013-01-16 18:04:32 -08001153 swStore.addPort(sw.getStringId(), port);
1154 } else {
1155 swStore.deletePort(sw.getStringId(), port.getPortNumber());
1156 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001157 if (updateStorage)
1158 updatePortInfo(sw, port);
1159 log.debug("Port #{} modified for {}", portNumber, sw);
1160 } else if (m.getReason() == (byte)OFPortReason.OFPPR_ADD.ordinal()) {
1161 sw.setPort(port);
Pankaj Berde8557a462013-01-07 08:59:31 -08001162 swStore.addPort(sw.getStringId(), port);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001163 if (updateStorage)
1164 updatePortInfo(sw, port);
1165 log.debug("Port #{} added for {}", portNumber, sw);
1166 } else if (m.getReason() ==
1167 (byte)OFPortReason.OFPPR_DELETE.ordinal()) {
1168 sw.deletePort(portNumber);
Pankaj Berde8557a462013-01-07 08:59:31 -08001169 swStore.deletePort(sw.getStringId(), portNumber);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001170 if (updateStorage)
1171 removePortInfo(sw, portNumber);
1172 log.debug("Port #{} deleted for {}", portNumber, sw);
1173 }
1174 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
1175 try {
1176 this.updates.put(update);
1177 } catch (InterruptedException e) {
1178 log.error("Failure adding update to queue", e);
1179 }
1180 }
1181
1182 /**
1183 * flcontext_cache - Keep a thread local stack of contexts
1184 */
1185 protected static final ThreadLocal<Stack<FloodlightContext>> flcontext_cache =
1186 new ThreadLocal <Stack<FloodlightContext>> () {
1187 @Override
1188 protected Stack<FloodlightContext> initialValue() {
1189 return new Stack<FloodlightContext>();
1190 }
1191 };
1192
1193 /**
1194 * flcontext_alloc - pop a context off the stack, if required create a new one
1195 * @return FloodlightContext
1196 */
1197 protected static FloodlightContext flcontext_alloc() {
1198 FloodlightContext flcontext = null;
1199
1200 if (flcontext_cache.get().empty()) {
1201 flcontext = new FloodlightContext();
1202 }
1203 else {
1204 flcontext = flcontext_cache.get().pop();
1205 }
1206
1207 return flcontext;
1208 }
1209
1210 /**
1211 * flcontext_free - Free the context to the current thread
1212 * @param flcontext
1213 */
1214 protected void flcontext_free(FloodlightContext flcontext) {
1215 flcontext.getStorage().clear();
1216 flcontext_cache.get().push(flcontext);
1217 }
1218
1219 /**
1220 * Handle replies to certain OFMessages, and pass others off to listeners
1221 * @param sw The switch for the message
1222 * @param m The message
1223 * @param bContext The floodlight context. If null then floodlight context would
1224 * be allocated in this function
1225 * @throws IOException
1226 */
1227 @LogMessageDocs({
1228 @LogMessageDoc(level="ERROR",
1229 message="Ignoring PacketIn (Xid = {xid}) because the data" +
1230 " field is empty.",
1231 explanation="The switch sent an improperly-formatted PacketIn" +
1232 " message",
1233 recommendation=LogMessageDoc.CHECK_SWITCH),
1234 @LogMessageDoc(level="WARN",
1235 message="Unhandled OF Message: {} from {}",
1236 explanation="The switch sent a message not handled by " +
1237 "the controller")
1238 })
1239 protected void handleMessage(IOFSwitch sw, OFMessage m,
1240 FloodlightContext bContext)
1241 throws IOException {
1242 Ethernet eth = null;
1243
1244 switch (m.getType()) {
1245 case PACKET_IN:
1246 OFPacketIn pi = (OFPacketIn)m;
1247
1248 if (pi.getPacketData().length <= 0) {
1249 log.error("Ignoring PacketIn (Xid = " + pi.getXid() +
1250 ") because the data field is empty.");
1251 return;
1252 }
1253
1254 if (Controller.ALWAYS_DECODE_ETH) {
1255 eth = new Ethernet();
1256 eth.deserialize(pi.getPacketData(), 0,
1257 pi.getPacketData().length);
1258 counterStore.updatePacketInCounters(sw, m, eth);
1259 }
1260 // fall through to default case...
1261
1262 default:
1263
1264 List<IOFMessageListener> listeners = null;
1265 if (messageListeners.containsKey(m.getType())) {
1266 listeners = messageListeners.get(m.getType()).
1267 getOrderedListeners();
1268 }
1269
1270 FloodlightContext bc = null;
1271 if (listeners != null) {
1272 // Check if floodlight context is passed from the calling
1273 // function, if so use that floodlight context, otherwise
1274 // allocate one
1275 if (bContext == null) {
1276 bc = flcontext_alloc();
1277 } else {
1278 bc = bContext;
1279 }
1280 if (eth != null) {
1281 IFloodlightProviderService.bcStore.put(bc,
1282 IFloodlightProviderService.CONTEXT_PI_PAYLOAD,
1283 eth);
1284 }
1285
1286 // Get the starting time (overall and per-component) of
1287 // the processing chain for this packet if performance
1288 // monitoring is turned on
1289 pktinProcTime.bootstrap(listeners);
1290 pktinProcTime.recordStartTimePktIn();
1291 Command cmd;
1292 for (IOFMessageListener listener : listeners) {
1293 if (listener instanceof IOFSwitchFilter) {
1294 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1295 continue;
1296 }
1297 }
1298
1299 pktinProcTime.recordStartTimeComp(listener);
1300 cmd = listener.receive(sw, m, bc);
1301 pktinProcTime.recordEndTimeComp(listener);
1302
1303 if (Command.STOP.equals(cmd)) {
1304 break;
1305 }
1306 }
1307 pktinProcTime.recordEndTimePktIn(sw, m, bc);
1308 } else {
1309 log.warn("Unhandled OF Message: {} from {}", m, sw);
1310 }
1311
1312 if ((bContext == null) && (bc != null)) flcontext_free(bc);
1313 }
1314 }
1315
1316 /**
1317 * Log an OpenFlow error message from a switch
1318 * @param sw The switch that sent the error
1319 * @param error The error message
1320 */
1321 @LogMessageDoc(level="ERROR",
1322 message="Error {error type} {error code} from {switch}",
1323 explanation="The switch responded with an unexpected error" +
1324 "to an OpenFlow message from the controller",
1325 recommendation="This could indicate improper network operation. " +
1326 "If the problem persists restarting the switch and " +
1327 "controller may help."
1328 )
1329 protected void logError(IOFSwitch sw, OFError error) {
1330 int etint = 0xffff & error.getErrorType();
1331 if (etint < 0 || etint >= OFErrorType.values().length) {
1332 log.error("Unknown error code {} from sw {}", etint, sw);
1333 }
1334 OFErrorType et = OFErrorType.values()[etint];
1335 switch (et) {
1336 case OFPET_HELLO_FAILED:
1337 OFHelloFailedCode hfc =
1338 OFHelloFailedCode.values()[0xffff & error.getErrorCode()];
1339 log.error("Error {} {} from {}", new Object[] {et, hfc, sw});
1340 break;
1341 case OFPET_BAD_REQUEST:
1342 OFBadRequestCode brc =
1343 OFBadRequestCode.values()[0xffff & error.getErrorCode()];
1344 log.error("Error {} {} from {}", new Object[] {et, brc, sw});
1345 break;
1346 case OFPET_BAD_ACTION:
1347 OFBadActionCode bac =
1348 OFBadActionCode.values()[0xffff & error.getErrorCode()];
1349 log.error("Error {} {} from {}", new Object[] {et, bac, sw});
1350 break;
1351 case OFPET_FLOW_MOD_FAILED:
1352 OFFlowModFailedCode fmfc =
1353 OFFlowModFailedCode.values()[0xffff & error.getErrorCode()];
1354 log.error("Error {} {} from {}", new Object[] {et, fmfc, sw});
1355 break;
1356 case OFPET_PORT_MOD_FAILED:
1357 OFPortModFailedCode pmfc =
1358 OFPortModFailedCode.values()[0xffff & error.getErrorCode()];
1359 log.error("Error {} {} from {}", new Object[] {et, pmfc, sw});
1360 break;
1361 case OFPET_QUEUE_OP_FAILED:
1362 OFQueueOpFailedCode qofc =
1363 OFQueueOpFailedCode.values()[0xffff & error.getErrorCode()];
1364 log.error("Error {} {} from {}", new Object[] {et, qofc, sw});
1365 break;
1366 default:
1367 break;
1368 }
1369 }
1370
1371 /**
1372 * Add a switch to the active switch list and call the switch listeners.
1373 * This happens either when a switch first connects (and the controller is
1374 * not in the slave role) or when the role of the controller changes from
1375 * slave to master.
1376 * @param sw the switch that has been added
1377 */
1378 // TODO: need to rethink locking and the synchronous switch update.
1379 // We can / should also handle duplicate DPIDs in connectedSwitches
1380 @LogMessageDoc(level="ERROR",
1381 message="New switch added {switch} for already-added switch {switch}",
1382 explanation="A switch with the same DPID as another switch " +
1383 "connected to the controller. This can be caused by " +
1384 "multiple switches configured with the same DPID, or " +
1385 "by a switch reconnected very quickly after " +
1386 "disconnecting.",
1387 recommendation="If this happens repeatedly, it is likely there " +
1388 "are switches with duplicate DPIDs on the network. " +
1389 "Reconfigure the appropriate switches. If it happens " +
1390 "very rarely, then it is likely this is a transient " +
1391 "network problem that can be ignored."
1392 )
1393 protected void addSwitch(IOFSwitch sw) {
1394 // TODO: is it safe to modify the HashMap without holding
1395 // the old switch's lock?
1396 OFSwitchImpl oldSw = (OFSwitchImpl) this.activeSwitches.put(sw.getId(), sw);
1397 if (sw == oldSw) {
1398 // Note == for object equality, not .equals for value
1399 log.info("New add switch for pre-existing switch {}", sw);
1400 return;
1401 }
1402
1403 if (oldSw != null) {
1404 oldSw.getListenerWriteLock().lock();
1405 try {
1406 log.error("New switch added {} for already-added switch {}",
1407 sw, oldSw);
1408 // Set the connected flag to false to suppress calling
1409 // the listeners for this switch in processOFMessage
1410 oldSw.setConnected(false);
1411
1412 oldSw.cancelAllStatisticsReplies();
1413
1414 updateInactiveSwitchInfo(oldSw);
1415
1416 // we need to clean out old switch state definitively
1417 // before adding the new switch
1418 // FIXME: It seems not completely kosher to call the
1419 // switch listeners here. I thought one of the points of
1420 // having the asynchronous switch update mechanism was so
1421 // the addedSwitch and removedSwitch were always called
1422 // from a single thread to simplify concurrency issues
1423 // for the listener.
1424 if (switchListeners != null) {
1425 for (IOFSwitchListener listener : switchListeners) {
1426 listener.removedSwitch(oldSw);
1427 }
1428 }
1429 // will eventually trigger a removeSwitch(), which will cause
1430 // a "Not removing Switch ... already removed debug message.
1431 // TODO: Figure out a way to handle this that avoids the
1432 // spurious debug message.
1433 oldSw.getChannel().close();
1434 }
1435 finally {
1436 oldSw.getListenerWriteLock().unlock();
1437 }
1438 }
1439
1440 updateActiveSwitchInfo(sw);
Pankaj Berde8557a462013-01-07 08:59:31 -08001441 swStore.update(sw.getStringId(), SwitchState.ACTIVE, DM_OPERATION.UPDATE);
Pankaj Berde0fc4e432013-01-12 09:47:22 -08001442 for (OFPhysicalPort port: sw.getPorts()) {
1443 swStore.addPort(sw.getStringId(), port);
1444 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001445 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.ADDED);
1446 try {
1447 this.updates.put(update);
1448 } catch (InterruptedException e) {
1449 log.error("Failure adding update to queue", e);
1450 }
1451 }
1452
1453 /**
1454 * Remove a switch from the active switch list and call the switch listeners.
1455 * This happens either when the switch is disconnected or when the
1456 * controller's role for the switch changes from master to slave.
1457 * @param sw the switch that has been removed
1458 */
1459 protected void removeSwitch(IOFSwitch sw) {
1460 // No need to acquire the listener lock, since
1461 // this method is only called after netty has processed all
1462 // pending messages
1463 log.debug("removeSwitch: {}", sw);
Pankaj Berdeafb20532013-01-08 15:05:24 -08001464 swStore.update(sw.getStringId(), SwitchState.INACTIVE, DM_OPERATION.UPDATE);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001465 if (!this.activeSwitches.remove(sw.getId(), sw) || !sw.isConnected()) {
1466 log.debug("Not removing switch {}; already removed", sw);
1467 return;
1468 }
1469 // We cancel all outstanding statistics replies if the switch transition
1470 // from active. In the future we might allow statistics requests
1471 // from slave controllers. Then we need to move this cancelation
1472 // to switch disconnect
1473 sw.cancelAllStatisticsReplies();
Pankaj Berdeafb20532013-01-08 15:05:24 -08001474
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001475
1476 // FIXME: I think there's a race condition if we call updateInactiveSwitchInfo
1477 // here if role support is enabled. In that case if the switch is being
1478 // removed because we've been switched to being in the slave role, then I think
1479 // it's possible that the new master may have already been promoted to master
1480 // and written out the active switch state to storage. If we now execute
1481 // updateInactiveSwitchInfo we may wipe out all of the state that was
1482 // written out by the new master. Maybe need to revisit how we handle all
1483 // of the switch state that's written to storage.
1484
1485 updateInactiveSwitchInfo(sw);
Pankaj Berdeafb20532013-01-08 15:05:24 -08001486
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001487 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.REMOVED);
1488 try {
1489 this.updates.put(update);
1490 } catch (InterruptedException e) {
1491 log.error("Failure adding update to queue", e);
1492 }
1493 }
1494
1495 // ***************
1496 // IFloodlightProvider
1497 // ***************
1498
1499 @Override
1500 public synchronized void addOFMessageListener(OFType type,
1501 IOFMessageListener listener) {
1502 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1503 messageListeners.get(type);
1504 if (ldd == null) {
1505 ldd = new ListenerDispatcher<OFType, IOFMessageListener>();
1506 messageListeners.put(type, ldd);
1507 }
1508 ldd.addListener(type, listener);
1509 }
1510
1511 @Override
1512 public synchronized void removeOFMessageListener(OFType type,
1513 IOFMessageListener listener) {
1514 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1515 messageListeners.get(type);
1516 if (ldd != null) {
1517 ldd.removeListener(listener);
1518 }
1519 }
1520
1521 private void logListeners() {
1522 for (Map.Entry<OFType,
1523 ListenerDispatcher<OFType,
1524 IOFMessageListener>> entry
1525 : messageListeners.entrySet()) {
1526
1527 OFType type = entry.getKey();
1528 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1529 entry.getValue();
1530
1531 StringBuffer sb = new StringBuffer();
1532 sb.append("OFListeners for ");
1533 sb.append(type);
1534 sb.append(": ");
1535 for (IOFMessageListener l : ldd.getOrderedListeners()) {
1536 sb.append(l.getName());
1537 sb.append(",");
1538 }
1539 log.debug(sb.toString());
1540 }
1541 }
1542
1543 public void removeOFMessageListeners(OFType type) {
1544 messageListeners.remove(type);
1545 }
1546
1547 @Override
1548 public Map<Long, IOFSwitch> getSwitches() {
1549 return Collections.unmodifiableMap(this.activeSwitches);
1550 }
1551
1552 @Override
1553 public void addOFSwitchListener(IOFSwitchListener listener) {
1554 this.switchListeners.add(listener);
1555 }
1556
1557 @Override
1558 public void removeOFSwitchListener(IOFSwitchListener listener) {
1559 this.switchListeners.remove(listener);
1560 }
1561
1562 @Override
1563 public Map<OFType, List<IOFMessageListener>> getListeners() {
1564 Map<OFType, List<IOFMessageListener>> lers =
1565 new HashMap<OFType, List<IOFMessageListener>>();
1566 for(Entry<OFType, ListenerDispatcher<OFType, IOFMessageListener>> e :
1567 messageListeners.entrySet()) {
1568 lers.put(e.getKey(), e.getValue().getOrderedListeners());
1569 }
1570 return Collections.unmodifiableMap(lers);
1571 }
1572
1573 @Override
1574 @LogMessageDocs({
1575 @LogMessageDoc(message="Failed to inject OFMessage {message} onto " +
1576 "a null switch",
1577 explanation="Failed to process a message because the switch " +
1578 " is no longer connected."),
1579 @LogMessageDoc(level="ERROR",
1580 message="Error reinjecting OFMessage on switch {switch}",
1581 explanation="An I/O error occured while attempting to " +
1582 "process an OpenFlow message",
1583 recommendation=LogMessageDoc.CHECK_SWITCH)
1584 })
1585 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg,
1586 FloodlightContext bc) {
1587 if (sw == null) {
1588 log.info("Failed to inject OFMessage {} onto a null switch", msg);
1589 return false;
1590 }
1591
1592 // FIXME: Do we need to be able to inject messages to switches
1593 // where we're the slave controller (i.e. they're connected but
1594 // not active)?
1595 // FIXME: Don't we need synchronization logic here so we're holding
1596 // the listener read lock when we call handleMessage? After some
1597 // discussions it sounds like the right thing to do here would be to
1598 // inject the message as a netty upstream channel event so it goes
1599 // through the normal netty event processing, including being
1600 // handled
1601 if (!activeSwitches.containsKey(sw.getId())) return false;
1602
1603 try {
1604 // Pass Floodlight context to the handleMessages()
1605 handleMessage(sw, msg, bc);
1606 } catch (IOException e) {
1607 log.error("Error reinjecting OFMessage on switch {}",
1608 HexString.toHexString(sw.getId()));
1609 return false;
1610 }
1611 return true;
1612 }
1613
1614 @Override
1615 @LogMessageDoc(message="Calling System.exit",
1616 explanation="The controller is terminating")
1617 public synchronized void terminate() {
1618 log.info("Calling System.exit");
1619 System.exit(1);
1620 }
1621
1622 @Override
1623 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg) {
1624 // call the overloaded version with floodlight context set to null
1625 return injectOfMessage(sw, msg, null);
1626 }
1627
1628 @Override
1629 public void handleOutgoingMessage(IOFSwitch sw, OFMessage m,
1630 FloodlightContext bc) {
1631 if (log.isTraceEnabled()) {
1632 String str = OFMessage.getDataAsString(sw, m, bc);
1633 log.trace("{}", str);
1634 }
1635
1636 List<IOFMessageListener> listeners = null;
1637 if (messageListeners.containsKey(m.getType())) {
1638 listeners =
1639 messageListeners.get(m.getType()).getOrderedListeners();
1640 }
1641
1642 if (listeners != null) {
1643 for (IOFMessageListener listener : listeners) {
1644 if (listener instanceof IOFSwitchFilter) {
1645 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1646 continue;
1647 }
1648 }
1649 if (Command.STOP.equals(listener.receive(sw, m, bc))) {
1650 break;
1651 }
1652 }
1653 }
1654 }
1655
1656 @Override
1657 public BasicFactory getOFMessageFactory() {
1658 return factory;
1659 }
1660
1661 @Override
1662 public String getControllerId() {
1663 return controllerId;
1664 }
1665
1666 // **************
1667 // Initialization
1668 // **************
1669
1670 protected void updateAllInactiveSwitchInfo() {
1671 if (role == Role.SLAVE) {
1672 return;
1673 }
1674 String controllerId = getControllerId();
1675 String[] switchColumns = { SWITCH_DATAPATH_ID,
1676 SWITCH_CONTROLLER_ID,
1677 SWITCH_ACTIVE };
1678 String[] portColumns = { PORT_ID, PORT_SWITCH };
1679 IResultSet switchResultSet = null;
1680 try {
1681 OperatorPredicate op =
1682 new OperatorPredicate(SWITCH_CONTROLLER_ID,
1683 OperatorPredicate.Operator.EQ,
1684 controllerId);
1685 switchResultSet =
1686 storageSource.executeQuery(SWITCH_TABLE_NAME,
1687 switchColumns,
1688 op, null);
1689 while (switchResultSet.next()) {
1690 IResultSet portResultSet = null;
1691 try {
1692 String datapathId =
1693 switchResultSet.getString(SWITCH_DATAPATH_ID);
1694 switchResultSet.setBoolean(SWITCH_ACTIVE, Boolean.FALSE);
1695 op = new OperatorPredicate(PORT_SWITCH,
1696 OperatorPredicate.Operator.EQ,
1697 datapathId);
1698 portResultSet =
1699 storageSource.executeQuery(PORT_TABLE_NAME,
1700 portColumns,
1701 op, null);
1702 while (portResultSet.next()) {
1703 portResultSet.deleteRow();
1704 }
1705 portResultSet.save();
1706 }
1707 finally {
1708 if (portResultSet != null)
1709 portResultSet.close();
1710 }
1711 }
1712 switchResultSet.save();
1713 }
1714 finally {
1715 if (switchResultSet != null)
1716 switchResultSet.close();
1717 }
1718 }
1719
1720 protected void updateControllerInfo() {
1721 updateAllInactiveSwitchInfo();
1722
1723 // Write out the controller info to the storage source
1724 Map<String, Object> controllerInfo = new HashMap<String, Object>();
1725 String id = getControllerId();
1726 controllerInfo.put(CONTROLLER_ID, id);
1727 storageSource.updateRow(CONTROLLER_TABLE_NAME, controllerInfo);
1728 }
1729
1730 protected void updateActiveSwitchInfo(IOFSwitch sw) {
1731 if (role == Role.SLAVE) {
1732 return;
1733 }
1734 // Obtain the row info for the switch
1735 Map<String, Object> switchInfo = new HashMap<String, Object>();
1736 String datapathIdString = sw.getStringId();
1737 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1738 String controllerId = getControllerId();
1739 switchInfo.put(SWITCH_CONTROLLER_ID, controllerId);
1740 Date connectedSince = sw.getConnectedSince();
1741 switchInfo.put(SWITCH_CONNECTED_SINCE, connectedSince);
1742 Channel channel = sw.getChannel();
1743 SocketAddress socketAddress = channel.getRemoteAddress();
1744 if (socketAddress != null) {
1745 String socketAddressString = socketAddress.toString();
1746 switchInfo.put(SWITCH_SOCKET_ADDRESS, socketAddressString);
1747 if (socketAddress instanceof InetSocketAddress) {
1748 InetSocketAddress inetSocketAddress =
1749 (InetSocketAddress)socketAddress;
1750 InetAddress inetAddress = inetSocketAddress.getAddress();
1751 String ip = inetAddress.getHostAddress();
1752 switchInfo.put(SWITCH_IP, ip);
1753 }
1754 }
1755
1756 // Write out the switch features info
1757 long capabilities = U32.f(sw.getCapabilities());
1758 switchInfo.put(SWITCH_CAPABILITIES, capabilities);
1759 long buffers = U32.f(sw.getBuffers());
1760 switchInfo.put(SWITCH_BUFFERS, buffers);
1761 long tables = U32.f(sw.getTables());
1762 switchInfo.put(SWITCH_TABLES, tables);
1763 long actions = U32.f(sw.getActions());
1764 switchInfo.put(SWITCH_ACTIONS, actions);
1765 switchInfo.put(SWITCH_ACTIVE, Boolean.TRUE);
1766
1767 // Update the switch
1768 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1769
1770 // Update the ports
1771 for (OFPhysicalPort port: sw.getPorts()) {
1772 updatePortInfo(sw, port);
1773 }
1774 }
1775
1776 protected void updateInactiveSwitchInfo(IOFSwitch sw) {
1777 if (role == Role.SLAVE) {
1778 return;
1779 }
1780 log.debug("Update DB with inactiveSW {}", sw);
1781 // Update the controller info in the storage source to be inactive
1782 Map<String, Object> switchInfo = new HashMap<String, Object>();
1783 String datapathIdString = sw.getStringId();
1784 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1785 //switchInfo.put(SWITCH_CONNECTED_SINCE, null);
1786 switchInfo.put(SWITCH_ACTIVE, Boolean.FALSE);
1787 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1788 }
1789
1790 protected void updatePortInfo(IOFSwitch sw, OFPhysicalPort port) {
1791 if (role == Role.SLAVE) {
1792 return;
1793 }
1794 String datapathIdString = sw.getStringId();
1795 Map<String, Object> portInfo = new HashMap<String, Object>();
1796 int portNumber = U16.f(port.getPortNumber());
1797 String id = datapathIdString + "|" + portNumber;
1798 portInfo.put(PORT_ID, id);
1799 portInfo.put(PORT_SWITCH, datapathIdString);
1800 portInfo.put(PORT_NUMBER, portNumber);
1801 byte[] hardwareAddress = port.getHardwareAddress();
1802 String hardwareAddressString = HexString.toHexString(hardwareAddress);
1803 portInfo.put(PORT_HARDWARE_ADDRESS, hardwareAddressString);
1804 String name = port.getName();
1805 portInfo.put(PORT_NAME, name);
1806 long config = U32.f(port.getConfig());
1807 portInfo.put(PORT_CONFIG, config);
1808 long state = U32.f(port.getState());
1809 portInfo.put(PORT_STATE, state);
1810 long currentFeatures = U32.f(port.getCurrentFeatures());
1811 portInfo.put(PORT_CURRENT_FEATURES, currentFeatures);
1812 long advertisedFeatures = U32.f(port.getAdvertisedFeatures());
1813 portInfo.put(PORT_ADVERTISED_FEATURES, advertisedFeatures);
1814 long supportedFeatures = U32.f(port.getSupportedFeatures());
1815 portInfo.put(PORT_SUPPORTED_FEATURES, supportedFeatures);
1816 long peerFeatures = U32.f(port.getPeerFeatures());
1817 portInfo.put(PORT_PEER_FEATURES, peerFeatures);
1818 storageSource.updateRowAsync(PORT_TABLE_NAME, portInfo);
1819 }
1820
1821 /**
1822 * Read switch port data from storage and write it into a switch object
1823 * @param sw the switch to update
1824 */
1825 protected void readSwitchPortStateFromStorage(OFSwitchImpl sw) {
1826 OperatorPredicate op =
1827 new OperatorPredicate(PORT_SWITCH,
1828 OperatorPredicate.Operator.EQ,
1829 sw.getStringId());
1830 IResultSet portResultSet =
1831 storageSource.executeQuery(PORT_TABLE_NAME,
1832 null, op, null);
1833 //Map<Short, OFPhysicalPort> oldports =
1834 // new HashMap<Short, OFPhysicalPort>();
1835 //oldports.putAll(sw.getPorts());
1836
1837 while (portResultSet.next()) {
1838 try {
1839 OFPhysicalPort p = new OFPhysicalPort();
1840 p.setPortNumber((short)portResultSet.getInt(PORT_NUMBER));
1841 p.setName(portResultSet.getString(PORT_NAME));
1842 p.setConfig((int)portResultSet.getLong(PORT_CONFIG));
1843 p.setState((int)portResultSet.getLong(PORT_STATE));
1844 String portMac = portResultSet.getString(PORT_HARDWARE_ADDRESS);
1845 p.setHardwareAddress(HexString.fromHexString(portMac));
1846 p.setCurrentFeatures((int)portResultSet.
1847 getLong(PORT_CURRENT_FEATURES));
1848 p.setAdvertisedFeatures((int)portResultSet.
1849 getLong(PORT_ADVERTISED_FEATURES));
1850 p.setSupportedFeatures((int)portResultSet.
1851 getLong(PORT_SUPPORTED_FEATURES));
1852 p.setPeerFeatures((int)portResultSet.
1853 getLong(PORT_PEER_FEATURES));
1854 //oldports.remove(Short.valueOf(p.getPortNumber()));
1855 sw.setPort(p);
1856 } catch (NullPointerException e) {
1857 // ignore
1858 }
1859 }
1860 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
1861 try {
1862 this.updates.put(update);
1863 } catch (InterruptedException e) {
1864 log.error("Failure adding update to queue", e);
1865 }
1866 }
1867
1868 protected void removePortInfo(IOFSwitch sw, short portNumber) {
1869 if (role == Role.SLAVE) {
1870 return;
1871 }
1872 String datapathIdString = sw.getStringId();
1873 String id = datapathIdString + "|" + portNumber;
1874 storageSource.deleteRowAsync(PORT_TABLE_NAME, id);
1875 }
1876
1877 /**
1878 * Sets the initial role based on properties in the config params.
1879 * It looks for two different properties.
1880 * If the "role" property is specified then the value should be
1881 * either "EQUAL", "MASTER", or "SLAVE" and the role of the
1882 * controller is set to the specified value. If the "role" property
1883 * is not specified then it looks next for the "role.path" property.
1884 * In this case the value should be the path to a property file in
1885 * the file system that contains a property called "floodlight.role"
1886 * which can be one of the values listed above for the "role" property.
1887 * The idea behind the "role.path" mechanism is that you have some
1888 * separate heartbeat and master controller election algorithm that
1889 * determines the role of the controller. When a role transition happens,
1890 * it updates the current role in the file specified by the "role.path"
1891 * file. Then if floodlight restarts for some reason it can get the
1892 * correct current role of the controller from the file.
1893 * @param configParams The config params for the FloodlightProvider service
1894 * @return A valid role if role information is specified in the
1895 * config params, otherwise null
1896 */
1897 @LogMessageDocs({
1898 @LogMessageDoc(message="Controller role set to {role}",
1899 explanation="Setting the initial HA role to "),
1900 @LogMessageDoc(level="ERROR",
1901 message="Invalid current role value: {role}",
1902 explanation="An invalid HA role value was read from the " +
1903 "properties file",
1904 recommendation=LogMessageDoc.CHECK_CONTROLLER)
1905 })
1906 protected Role getInitialRole(Map<String, String> configParams) {
1907 Role role = null;
1908 String roleString = configParams.get("role");
1909 if (roleString == null) {
1910 String rolePath = configParams.get("rolepath");
1911 if (rolePath != null) {
1912 Properties properties = new Properties();
1913 try {
1914 properties.load(new FileInputStream(rolePath));
1915 roleString = properties.getProperty("floodlight.role");
1916 }
1917 catch (IOException exc) {
1918 // Don't treat it as an error if the file specified by the
1919 // rolepath property doesn't exist. This lets us enable the
1920 // HA mechanism by just creating/setting the floodlight.role
1921 // property in that file without having to modify the
1922 // floodlight properties.
1923 }
1924 }
1925 }
1926
1927 if (roleString != null) {
1928 // Canonicalize the string to the form used for the enum constants
1929 roleString = roleString.trim().toUpperCase();
1930 try {
1931 role = Role.valueOf(roleString);
1932 }
1933 catch (IllegalArgumentException exc) {
1934 log.error("Invalid current role value: {}", roleString);
1935 }
1936 }
1937
1938 log.info("Controller role set to {}", role);
1939
1940 return role;
1941 }
1942
1943 /**
1944 * Tell controller that we're ready to accept switches loop
1945 * @throws IOException
1946 */
1947 @LogMessageDocs({
1948 @LogMessageDoc(message="Listening for switch connections on {address}",
1949 explanation="The controller is ready and listening for new" +
1950 " switch connections"),
1951 @LogMessageDoc(message="Storage exception in controller " +
1952 "updates loop; terminating process",
1953 explanation=ERROR_DATABASE,
1954 recommendation=LogMessageDoc.CHECK_CONTROLLER),
1955 @LogMessageDoc(level="ERROR",
1956 message="Exception in controller updates loop",
1957 explanation="Failed to dispatch controller event",
1958 recommendation=LogMessageDoc.GENERIC_ACTION)
1959 })
1960 public void run() {
1961 if (log.isDebugEnabled()) {
1962 logListeners();
1963 }
1964
1965 try {
1966 final ServerBootstrap bootstrap = createServerBootStrap();
1967
1968 bootstrap.setOption("reuseAddr", true);
1969 bootstrap.setOption("child.keepAlive", true);
1970 bootstrap.setOption("child.tcpNoDelay", true);
1971 bootstrap.setOption("child.sendBufferSize", Controller.SEND_BUFFER_SIZE);
1972
1973 ChannelPipelineFactory pfact =
1974 new OpenflowPipelineFactory(this, null);
1975 bootstrap.setPipelineFactory(pfact);
1976 InetSocketAddress sa = new InetSocketAddress(openFlowPort);
1977 final ChannelGroup cg = new DefaultChannelGroup();
1978 cg.add(bootstrap.bind(sa));
1979
1980 log.info("Listening for switch connections on {}", sa);
1981 } catch (Exception e) {
1982 throw new RuntimeException(e);
1983 }
1984
1985 // main loop
1986 while (true) {
1987 try {
1988 IUpdate update = updates.take();
1989 update.dispatch();
1990 } catch (InterruptedException e) {
1991 return;
1992 } catch (StorageException e) {
1993 log.error("Storage exception in controller " +
1994 "updates loop; terminating process", e);
1995 return;
1996 } catch (Exception e) {
1997 log.error("Exception in controller updates loop", e);
1998 }
1999 }
2000 }
2001
2002 private ServerBootstrap createServerBootStrap() {
2003 if (workerThreads == 0) {
2004 return new ServerBootstrap(
2005 new NioServerSocketChannelFactory(
2006 Executors.newCachedThreadPool(),
2007 Executors.newCachedThreadPool()));
2008 } else {
2009 return new ServerBootstrap(
2010 new NioServerSocketChannelFactory(
2011 Executors.newCachedThreadPool(),
2012 Executors.newCachedThreadPool(), workerThreads));
2013 }
2014 }
2015
2016 public void setConfigParams(Map<String, String> configParams) {
2017 String ofPort = configParams.get("openflowport");
2018 if (ofPort != null) {
2019 this.openFlowPort = Integer.parseInt(ofPort);
2020 }
2021 log.debug("OpenFlow port set to {}", this.openFlowPort);
2022 String threads = configParams.get("workerthreads");
2023 if (threads != null) {
2024 this.workerThreads = Integer.parseInt(threads);
2025 }
2026 log.debug("Number of worker threads set to {}", this.workerThreads);
2027 String controllerId = configParams.get("controllerid");
2028 if (controllerId != null) {
2029 this.controllerId = controllerId;
2030 }
2031 log.debug("ControllerId set to {}", this.controllerId);
2032 }
2033
2034 private void initVendorMessages() {
2035 // Configure openflowj to be able to parse the role request/reply
2036 // vendor messages.
2037 OFBasicVendorId niciraVendorId = new OFBasicVendorId(
2038 OFNiciraVendorData.NX_VENDOR_ID, 4);
2039 OFVendorId.registerVendorId(niciraVendorId);
2040 OFBasicVendorDataType roleRequestVendorData =
2041 new OFBasicVendorDataType(
2042 OFRoleRequestVendorData.NXT_ROLE_REQUEST,
2043 OFRoleRequestVendorData.getInstantiable());
2044 niciraVendorId.registerVendorDataType(roleRequestVendorData);
2045 OFBasicVendorDataType roleReplyVendorData =
2046 new OFBasicVendorDataType(
2047 OFRoleReplyVendorData.NXT_ROLE_REPLY,
2048 OFRoleReplyVendorData.getInstantiable());
2049 niciraVendorId.registerVendorDataType(roleReplyVendorData);
2050 }
2051
2052 /**
2053 * Initialize internal data structures
2054 */
2055 public void init(Map<String, String> configParams) {
2056 // These data structures are initialized here because other
2057 // module's startUp() might be called before ours
2058 this.messageListeners =
2059 new ConcurrentHashMap<OFType,
2060 ListenerDispatcher<OFType,
2061 IOFMessageListener>>();
2062 this.switchListeners = new CopyOnWriteArraySet<IOFSwitchListener>();
2063 this.haListeners = new CopyOnWriteArraySet<IHAListener>();
2064 this.activeSwitches = new ConcurrentHashMap<Long, IOFSwitch>();
2065 this.connectedSwitches = new HashSet<OFSwitchImpl>();
2066 this.controllerNodeIPsCache = new HashMap<String, String>();
2067 this.updates = new LinkedBlockingQueue<IUpdate>();
2068 this.factory = new BasicFactory();
2069 this.providerMap = new HashMap<String, List<IInfoProvider>>();
2070 setConfigParams(configParams);
2071 this.role = getInitialRole(configParams);
2072 this.roleChanger = new RoleChanger();
2073 initVendorMessages();
2074 this.systemStartTime = System.currentTimeMillis();
Pankaj Berdeafb20532013-01-08 15:05:24 -08002075 this.swStore = new SwitchStorageImpl();
Pankaj Berde67f88fb2013-01-15 17:16:01 -08002076 this.swStore.init("/tmp/cassandra.titan");
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002077 }
2078
2079 /**
2080 * Startup all of the controller's components
2081 */
2082 @LogMessageDoc(message="Waiting for storage source",
2083 explanation="The system database is not yet ready",
2084 recommendation="If this message persists, this indicates " +
2085 "that the system database has failed to start. " +
2086 LogMessageDoc.CHECK_CONTROLLER)
2087 public void startupComponents() {
2088 // Create the table names we use
2089 storageSource.createTable(CONTROLLER_TABLE_NAME, null);
2090 storageSource.createTable(SWITCH_TABLE_NAME, null);
2091 storageSource.createTable(PORT_TABLE_NAME, null);
2092 storageSource.createTable(CONTROLLER_INTERFACE_TABLE_NAME, null);
2093 storageSource.createTable(SWITCH_CONFIG_TABLE_NAME, null);
2094 storageSource.setTablePrimaryKeyName(CONTROLLER_TABLE_NAME,
2095 CONTROLLER_ID);
2096 storageSource.setTablePrimaryKeyName(SWITCH_TABLE_NAME,
2097 SWITCH_DATAPATH_ID);
2098 storageSource.setTablePrimaryKeyName(PORT_TABLE_NAME, PORT_ID);
2099 storageSource.setTablePrimaryKeyName(CONTROLLER_INTERFACE_TABLE_NAME,
2100 CONTROLLER_INTERFACE_ID);
2101 storageSource.addListener(CONTROLLER_INTERFACE_TABLE_NAME, this);
2102
2103 while (true) {
2104 try {
2105 updateControllerInfo();
2106 break;
2107 }
2108 catch (StorageException e) {
2109 log.info("Waiting for storage source");
2110 try {
2111 Thread.sleep(1000);
2112 } catch (InterruptedException e1) {
2113 }
2114 }
2115 }
2116
2117 // Add our REST API
2118 restApi.addRestletRoutable(new CoreWebRoutable());
2119 }
2120
2121 @Override
2122 public void addInfoProvider(String type, IInfoProvider provider) {
2123 if (!providerMap.containsKey(type)) {
2124 providerMap.put(type, new ArrayList<IInfoProvider>());
2125 }
2126 providerMap.get(type).add(provider);
2127 }
2128
2129 @Override
2130 public void removeInfoProvider(String type, IInfoProvider provider) {
2131 if (!providerMap.containsKey(type)) {
2132 log.debug("Provider type {} doesn't exist.", type);
2133 return;
2134 }
2135
2136 providerMap.get(type).remove(provider);
2137 }
2138
2139 public Map<String, Object> getControllerInfo(String type) {
2140 if (!providerMap.containsKey(type)) return null;
2141
2142 Map<String, Object> result = new LinkedHashMap<String, Object>();
2143 for (IInfoProvider provider : providerMap.get(type)) {
2144 result.putAll(provider.getInfo(type));
2145 }
2146
2147 return result;
2148 }
2149
2150 @Override
2151 public void addHAListener(IHAListener listener) {
2152 this.haListeners.add(listener);
2153 }
2154
2155 @Override
2156 public void removeHAListener(IHAListener listener) {
2157 this.haListeners.remove(listener);
2158 }
2159
2160
2161 /**
2162 * Handle changes to the controller nodes IPs and dispatch update.
2163 */
2164 @SuppressWarnings("unchecked")
2165 protected void handleControllerNodeIPChanges() {
2166 HashMap<String,String> curControllerNodeIPs = new HashMap<String,String>();
2167 HashMap<String,String> addedControllerNodeIPs = new HashMap<String,String>();
2168 HashMap<String,String> removedControllerNodeIPs =new HashMap<String,String>();
2169 String[] colNames = { CONTROLLER_INTERFACE_CONTROLLER_ID,
2170 CONTROLLER_INTERFACE_TYPE,
2171 CONTROLLER_INTERFACE_NUMBER,
2172 CONTROLLER_INTERFACE_DISCOVERED_IP };
2173 synchronized(controllerNodeIPsCache) {
2174 // We currently assume that interface Ethernet0 is the relevant
2175 // controller interface. Might change.
2176 // We could (should?) implement this using
2177 // predicates, but creating the individual and compound predicate
2178 // seems more overhead then just checking every row. Particularly,
2179 // since the number of rows is small and changes infrequent
2180 IResultSet res = storageSource.executeQuery(CONTROLLER_INTERFACE_TABLE_NAME,
2181 colNames,null, null);
2182 while (res.next()) {
2183 if (res.getString(CONTROLLER_INTERFACE_TYPE).equals("Ethernet") &&
2184 res.getInt(CONTROLLER_INTERFACE_NUMBER) == 0) {
2185 String controllerID = res.getString(CONTROLLER_INTERFACE_CONTROLLER_ID);
2186 String discoveredIP = res.getString(CONTROLLER_INTERFACE_DISCOVERED_IP);
2187 String curIP = controllerNodeIPsCache.get(controllerID);
2188
2189 curControllerNodeIPs.put(controllerID, discoveredIP);
2190 if (curIP == null) {
2191 // new controller node IP
2192 addedControllerNodeIPs.put(controllerID, discoveredIP);
2193 }
2194 else if (curIP != discoveredIP) {
2195 // IP changed
2196 removedControllerNodeIPs.put(controllerID, curIP);
2197 addedControllerNodeIPs.put(controllerID, discoveredIP);
2198 }
2199 }
2200 }
2201 // Now figure out if rows have been deleted. We can't use the
2202 // rowKeys from rowsDeleted directly, since the tables primary
2203 // key is a compound that we can't disassemble
2204 Set<String> curEntries = curControllerNodeIPs.keySet();
2205 Set<String> removedEntries = controllerNodeIPsCache.keySet();
2206 removedEntries.removeAll(curEntries);
2207 for (String removedControllerID : removedEntries)
2208 removedControllerNodeIPs.put(removedControllerID, controllerNodeIPsCache.get(removedControllerID));
2209 controllerNodeIPsCache = (HashMap<String, String>) curControllerNodeIPs.clone();
2210 HAControllerNodeIPUpdate update = new HAControllerNodeIPUpdate(
2211 curControllerNodeIPs, addedControllerNodeIPs,
2212 removedControllerNodeIPs);
2213 if (!removedControllerNodeIPs.isEmpty() || !addedControllerNodeIPs.isEmpty()) {
2214 try {
2215 this.updates.put(update);
2216 } catch (InterruptedException e) {
2217 log.error("Failure adding update to queue", e);
2218 }
2219 }
2220 }
2221 }
2222
2223 @Override
2224 public Map<String, String> getControllerNodeIPs() {
2225 // We return a copy of the mapping so we can guarantee that
2226 // the mapping return is the same as one that will be (or was)
2227 // dispatched to IHAListeners
2228 HashMap<String,String> retval = new HashMap<String,String>();
2229 synchronized(controllerNodeIPsCache) {
2230 retval.putAll(controllerNodeIPsCache);
2231 }
2232 return retval;
2233 }
2234
2235 @Override
2236 public void rowsModified(String tableName, Set<Object> rowKeys) {
2237 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2238 handleControllerNodeIPChanges();
2239 }
2240
2241 }
2242
2243 @Override
2244 public void rowsDeleted(String tableName, Set<Object> rowKeys) {
2245 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2246 handleControllerNodeIPChanges();
2247 }
2248 }
2249
2250 @Override
2251 public long getSystemStartTime() {
2252 return (this.systemStartTime);
2253 }
2254
2255 @Override
2256 public void setAlwaysClearFlowsOnSwAdd(boolean value) {
2257 this.alwaysClearFlowsOnSwAdd = value;
2258 }
2259
2260 public boolean getAlwaysClearFlowsOnSwAdd() {
2261 return this.alwaysClearFlowsOnSwAdd;
2262 }
2263}