blob: 0cbcfcbb477c8ab4660f266fd0d7ea6732d75871 [file] [log] [blame]
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001/**
2* Copyright 2011, Big Switch Networks, Inc.
3* Originally created by David Erickson, Stanford University
4*
5* Licensed under the Apache License, Version 2.0 (the "License"); you may
6* not use this file except in compliance with the License. You may obtain
7* a copy of the License at
8*
9* http://www.apache.org/licenses/LICENSE-2.0
10*
11* Unless required by applicable law or agreed to in writing, software
12* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14* License for the specific language governing permissions and limitations
15* under the License.
16**/
17
18package net.floodlightcontroller.core.internal;
19
20import java.io.FileInputStream;
21import java.io.IOException;
22import java.net.InetAddress;
23import java.net.InetSocketAddress;
24import java.net.SocketAddress;
25import java.util.ArrayList;
26import java.nio.channels.ClosedChannelException;
27import java.util.Collection;
28import java.util.Collections;
29import java.util.Date;
30import java.util.HashMap;
31import java.util.HashSet;
32import java.util.Iterator;
33import java.util.LinkedHashMap;
34import java.util.List;
35import java.util.Map;
36import java.util.Map.Entry;
37import java.util.Properties;
38import java.util.Set;
39import java.util.Stack;
40import java.util.concurrent.BlockingQueue;
41import java.util.concurrent.ConcurrentHashMap;
42import java.util.concurrent.ConcurrentMap;
43import java.util.concurrent.CopyOnWriteArraySet;
44import java.util.concurrent.Executors;
45import java.util.concurrent.Future;
46import java.util.concurrent.LinkedBlockingQueue;
47import java.util.concurrent.RejectedExecutionException;
48import java.util.concurrent.TimeUnit;
49import java.util.concurrent.TimeoutException;
50
51import net.floodlightcontroller.core.FloodlightContext;
52import net.floodlightcontroller.core.IFloodlightProviderService;
53import net.floodlightcontroller.core.IHAListener;
54import net.floodlightcontroller.core.IInfoProvider;
Pankaj Berde8557a462013-01-07 08:59:31 -080055import net.floodlightcontroller.core.INetMapStorage.DM_OPERATION;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080056import net.floodlightcontroller.core.IOFMessageListener;
57import net.floodlightcontroller.core.IListener.Command;
58import net.floodlightcontroller.core.IOFSwitch;
59import net.floodlightcontroller.core.IOFSwitchFilter;
60import net.floodlightcontroller.core.IOFSwitchListener;
Pankaj Berde8557a462013-01-07 08:59:31 -080061import net.floodlightcontroller.core.ISwitchStorage.SwitchState;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080062import net.floodlightcontroller.core.annotations.LogMessageDoc;
63import net.floodlightcontroller.core.annotations.LogMessageDocs;
64import net.floodlightcontroller.core.internal.OFChannelState.HandshakeState;
65import net.floodlightcontroller.core.util.ListenerDispatcher;
66import net.floodlightcontroller.core.web.CoreWebRoutable;
67import net.floodlightcontroller.counter.ICounterStoreService;
68import net.floodlightcontroller.packet.Ethernet;
69import net.floodlightcontroller.perfmon.IPktInProcessingTimeService;
70import net.floodlightcontroller.restserver.IRestApiService;
71import net.floodlightcontroller.storage.IResultSet;
72import net.floodlightcontroller.storage.IStorageSourceListener;
73import net.floodlightcontroller.storage.IStorageSourceService;
74import net.floodlightcontroller.storage.OperatorPredicate;
75import net.floodlightcontroller.storage.StorageException;
76import net.floodlightcontroller.threadpool.IThreadPoolService;
77
78import org.jboss.netty.bootstrap.ServerBootstrap;
79import org.jboss.netty.buffer.ChannelBuffer;
80import org.jboss.netty.buffer.ChannelBuffers;
81import org.jboss.netty.channel.Channel;
82import org.jboss.netty.channel.ChannelHandlerContext;
83import org.jboss.netty.channel.ChannelPipelineFactory;
84import org.jboss.netty.channel.ChannelStateEvent;
85import org.jboss.netty.channel.ChannelUpstreamHandler;
86import org.jboss.netty.channel.Channels;
87import org.jboss.netty.channel.ExceptionEvent;
88import org.jboss.netty.channel.MessageEvent;
89import org.jboss.netty.channel.group.ChannelGroup;
90import org.jboss.netty.channel.group.DefaultChannelGroup;
91import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
92import org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler;
93import org.jboss.netty.handler.timeout.IdleStateEvent;
94import org.jboss.netty.handler.timeout.ReadTimeoutException;
95import org.openflow.protocol.OFEchoReply;
96import org.openflow.protocol.OFError;
97import org.openflow.protocol.OFError.OFBadActionCode;
98import org.openflow.protocol.OFError.OFBadRequestCode;
99import org.openflow.protocol.OFError.OFErrorType;
100import org.openflow.protocol.OFError.OFFlowModFailedCode;
101import org.openflow.protocol.OFError.OFHelloFailedCode;
102import org.openflow.protocol.OFError.OFPortModFailedCode;
103import org.openflow.protocol.OFError.OFQueueOpFailedCode;
104import org.openflow.protocol.OFFeaturesReply;
105import org.openflow.protocol.OFGetConfigReply;
106import org.openflow.protocol.OFMessage;
107import org.openflow.protocol.OFPacketIn;
108import org.openflow.protocol.OFPhysicalPort;
109import org.openflow.protocol.OFPortStatus;
Pankaj Berde6debb042013-01-16 18:04:32 -0800110import org.openflow.protocol.OFPhysicalPort.OFPortState;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800111import org.openflow.protocol.OFPortStatus.OFPortReason;
112import org.openflow.protocol.OFSetConfig;
113import org.openflow.protocol.OFStatisticsRequest;
114import org.openflow.protocol.OFSwitchConfig;
115import org.openflow.protocol.OFType;
116import org.openflow.protocol.OFVendor;
117import org.openflow.protocol.factory.BasicFactory;
118import org.openflow.protocol.factory.MessageParseException;
119import org.openflow.protocol.statistics.OFDescriptionStatistics;
120import org.openflow.protocol.statistics.OFStatistics;
121import org.openflow.protocol.statistics.OFStatisticsType;
122import org.openflow.protocol.vendor.OFBasicVendorDataType;
123import org.openflow.protocol.vendor.OFBasicVendorId;
124import org.openflow.protocol.vendor.OFVendorId;
125import org.openflow.util.HexString;
126import org.openflow.util.U16;
127import org.openflow.util.U32;
128import org.openflow.vendor.nicira.OFNiciraVendorData;
129import org.openflow.vendor.nicira.OFRoleReplyVendorData;
130import org.openflow.vendor.nicira.OFRoleRequestVendorData;
131import org.openflow.vendor.nicira.OFRoleVendorData;
132import org.slf4j.Logger;
133import org.slf4j.LoggerFactory;
134
135
136/**
137 * The main controller class. Handles all setup and network listeners
138 */
139public class Controller implements IFloodlightProviderService,
140 IStorageSourceListener {
141
Pankaj Berde8557a462013-01-07 08:59:31 -0800142 protected SwitchStorageImpl swStore;
143
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800144 protected static Logger log = LoggerFactory.getLogger(Controller.class);
145
146 private static final String ERROR_DATABASE =
147 "The controller could not communicate with the system database.";
148
149 protected BasicFactory factory;
150 protected ConcurrentMap<OFType,
151 ListenerDispatcher<OFType,IOFMessageListener>>
152 messageListeners;
153 // The activeSwitches map contains only those switches that are actively
154 // being controlled by us -- it doesn't contain switches that are
155 // in the slave role
156 protected ConcurrentHashMap<Long, IOFSwitch> activeSwitches;
157 // connectedSwitches contains all connected switches, including ones where
158 // we're a slave controller. We need to keep track of them so that we can
159 // send role request messages to switches when our role changes to master
160 // We add a switch to this set after it successfully completes the
161 // handshake. Access to this Set needs to be synchronized with roleChanger
162 protected HashSet<OFSwitchImpl> connectedSwitches;
163
164 // The controllerNodeIPsCache maps Controller IDs to their IP address.
165 // It's only used by handleControllerNodeIPsChanged
166 protected HashMap<String, String> controllerNodeIPsCache;
167
168 protected Set<IOFSwitchListener> switchListeners;
169 protected Set<IHAListener> haListeners;
170 protected Map<String, List<IInfoProvider>> providerMap;
171 protected BlockingQueue<IUpdate> updates;
172
173 // Module dependencies
174 protected IRestApiService restApi;
175 protected ICounterStoreService counterStore = null;
176 protected IStorageSourceService storageSource;
177 protected IPktInProcessingTimeService pktinProcTime;
178 protected IThreadPoolService threadPool;
179
180 // Configuration options
181 protected int openFlowPort = 6633;
182 protected int workerThreads = 0;
183 // The id for this controller node. Should be unique for each controller
184 // node in a controller cluster.
185 protected String controllerId = "localhost";
186
187 // The current role of the controller.
188 // If the controller isn't configured to support roles, then this is null.
189 protected Role role;
190 // A helper that handles sending and timeout handling for role requests
191 protected RoleChanger roleChanger;
192
193 // Start time of the controller
194 protected long systemStartTime;
195
196 // Flag to always flush flow table on switch reconnect (HA or otherwise)
197 protected boolean alwaysClearFlowsOnSwAdd = false;
198
199 // Storage table names
200 protected static final String CONTROLLER_TABLE_NAME = "controller_controller";
201 protected static final String CONTROLLER_ID = "id";
202
203 protected static final String SWITCH_TABLE_NAME = "controller_switch";
204 protected static final String SWITCH_DATAPATH_ID = "dpid";
205 protected static final String SWITCH_SOCKET_ADDRESS = "socket_address";
206 protected static final String SWITCH_IP = "ip";
207 protected static final String SWITCH_CONTROLLER_ID = "controller_id";
208 protected static final String SWITCH_ACTIVE = "active";
209 protected static final String SWITCH_CONNECTED_SINCE = "connected_since";
210 protected static final String SWITCH_CAPABILITIES = "capabilities";
211 protected static final String SWITCH_BUFFERS = "buffers";
212 protected static final String SWITCH_TABLES = "tables";
213 protected static final String SWITCH_ACTIONS = "actions";
214
215 protected static final String SWITCH_CONFIG_TABLE_NAME = "controller_switchconfig";
216 protected static final String SWITCH_CONFIG_CORE_SWITCH = "core_switch";
217
218 protected static final String PORT_TABLE_NAME = "controller_port";
219 protected static final String PORT_ID = "id";
220 protected static final String PORT_SWITCH = "switch_id";
221 protected static final String PORT_NUMBER = "number";
222 protected static final String PORT_HARDWARE_ADDRESS = "hardware_address";
223 protected static final String PORT_NAME = "name";
224 protected static final String PORT_CONFIG = "config";
225 protected static final String PORT_STATE = "state";
226 protected static final String PORT_CURRENT_FEATURES = "current_features";
227 protected static final String PORT_ADVERTISED_FEATURES = "advertised_features";
228 protected static final String PORT_SUPPORTED_FEATURES = "supported_features";
229 protected static final String PORT_PEER_FEATURES = "peer_features";
230
231 protected static final String CONTROLLER_INTERFACE_TABLE_NAME = "controller_controllerinterface";
232 protected static final String CONTROLLER_INTERFACE_ID = "id";
233 protected static final String CONTROLLER_INTERFACE_CONTROLLER_ID = "controller_id";
234 protected static final String CONTROLLER_INTERFACE_TYPE = "type";
235 protected static final String CONTROLLER_INTERFACE_NUMBER = "number";
236 protected static final String CONTROLLER_INTERFACE_DISCOVERED_IP = "discovered_ip";
237
238
239
240 // Perf. related configuration
241 protected static final int SEND_BUFFER_SIZE = 4 * 1024 * 1024;
242 protected static final int BATCH_MAX_SIZE = 100;
243 protected static final boolean ALWAYS_DECODE_ETH = true;
244
245 /**
246 * Updates handled by the main loop
247 */
248 protected interface IUpdate {
249 /**
250 * Calls the appropriate listeners
251 */
252 public void dispatch();
253 }
/** The kinds of switch events that a {@code SwitchUpdate} can carry. */
public enum SwitchUpdateType {
    ADDED, REMOVED, PORTCHANGED
}
259 /**
260 * Update message indicating a switch was added or removed
261 */
262 protected class SwitchUpdate implements IUpdate {
263 public IOFSwitch sw;
264 public SwitchUpdateType switchUpdateType;
265 public SwitchUpdate(IOFSwitch sw, SwitchUpdateType switchUpdateType) {
266 this.sw = sw;
267 this.switchUpdateType = switchUpdateType;
268 }
269 public void dispatch() {
270 if (log.isTraceEnabled()) {
271 log.trace("Dispatching switch update {} {}",
272 sw, switchUpdateType);
273 }
274 if (switchListeners != null) {
275 for (IOFSwitchListener listener : switchListeners) {
276 switch(switchUpdateType) {
277 case ADDED:
278 listener.addedSwitch(sw);
279 break;
280 case REMOVED:
281 listener.removedSwitch(sw);
282 break;
283 case PORTCHANGED:
284 listener.switchPortChanged(sw.getId());
285 break;
286 }
287 }
288 }
289 }
290 }
291
292 /**
293 * Update message indicating controller's role has changed
294 */
295 protected class HARoleUpdate implements IUpdate {
296 public Role oldRole;
297 public Role newRole;
298 public HARoleUpdate(Role newRole, Role oldRole) {
299 this.oldRole = oldRole;
300 this.newRole = newRole;
301 }
302 public void dispatch() {
303 // Make sure that old and new roles are different.
304 if (oldRole == newRole) {
305 if (log.isTraceEnabled()) {
306 log.trace("HA role update ignored as the old and " +
307 "new roles are the same. newRole = {}" +
308 "oldRole = {}", newRole, oldRole);
309 }
310 return;
311 }
312 if (log.isTraceEnabled()) {
313 log.trace("Dispatching HA Role update newRole = {}, oldRole = {}",
314 newRole, oldRole);
315 }
316 if (haListeners != null) {
317 for (IHAListener listener : haListeners) {
318 listener.roleChanged(oldRole, newRole);
319 }
320 }
321 }
322 }
323
324 /**
325 * Update message indicating
326 * IPs of controllers in controller cluster have changed.
327 */
328 protected class HAControllerNodeIPUpdate implements IUpdate {
329 public Map<String,String> curControllerNodeIPs;
330 public Map<String,String> addedControllerNodeIPs;
331 public Map<String,String> removedControllerNodeIPs;
332 public HAControllerNodeIPUpdate(
333 HashMap<String,String> curControllerNodeIPs,
334 HashMap<String,String> addedControllerNodeIPs,
335 HashMap<String,String> removedControllerNodeIPs) {
336 this.curControllerNodeIPs = curControllerNodeIPs;
337 this.addedControllerNodeIPs = addedControllerNodeIPs;
338 this.removedControllerNodeIPs = removedControllerNodeIPs;
339 }
340 public void dispatch() {
341 if (log.isTraceEnabled()) {
342 log.trace("Dispatching HA Controller Node IP update "
343 + "curIPs = {}, addedIPs = {}, removedIPs = {}",
344 new Object[] { curControllerNodeIPs, addedControllerNodeIPs,
345 removedControllerNodeIPs }
346 );
347 }
348 if (haListeners != null) {
349 for (IHAListener listener: haListeners) {
350 listener.controllerNodeIPsChanged(curControllerNodeIPs,
351 addedControllerNodeIPs, removedControllerNodeIPs);
352 }
353 }
354 }
355 }
356
357 // ***************
358 // Getters/Setters
359 // ***************
360
361 public void setStorageSourceService(IStorageSourceService storageSource) {
362 this.storageSource = storageSource;
363 }
364
365 public void setCounterStore(ICounterStoreService counterStore) {
366 this.counterStore = counterStore;
367 }
368
369 public void setPktInProcessingService(IPktInProcessingTimeService pits) {
370 this.pktinProcTime = pits;
371 }
372
373 public void setRestApiService(IRestApiService restApi) {
374 this.restApi = restApi;
375 }
376
377 public void setThreadPoolService(IThreadPoolService tp) {
378 this.threadPool = tp;
379 }
380
381 @Override
382 public Role getRole() {
383 synchronized(roleChanger) {
384 return role;
385 }
386 }
387
388 @Override
389 public void setRole(Role role) {
390 if (role == null) throw new NullPointerException("Role can not be null.");
391 if (role == Role.MASTER && this.role == Role.SLAVE) {
392 // Reset db state to Inactive for all switches.
393 updateAllInactiveSwitchInfo();
394 }
395
396 // Need to synchronize to ensure a reliable ordering on role request
397 // messages send and to ensure the list of connected switches is stable
398 // RoleChanger will handle the actual sending of the message and
399 // timeout handling
400 // @see RoleChanger
401 synchronized(roleChanger) {
402 if (role.equals(this.role)) {
403 log.debug("Ignoring role change: role is already {}", role);
404 return;
405 }
406
407 Role oldRole = this.role;
408 this.role = role;
409
410 log.debug("Submitting role change request to role {}", role);
411 roleChanger.submitRequest(connectedSwitches, role);
412
413 // Enqueue an update for our listeners.
414 try {
415 this.updates.put(new HARoleUpdate(role, oldRole));
416 } catch (InterruptedException e) {
417 log.error("Failure adding update to queue", e);
418 }
419 }
420 }
421
422
423
424 // **********************
425 // ChannelUpstreamHandler
426 // **********************
427
428 /**
429 * Return a new channel handler for processing a switch connections
430 * @param state The channel state object for the connection
431 * @return the new channel handler
432 */
433 protected ChannelUpstreamHandler getChannelHandler(OFChannelState state) {
434 return new OFChannelHandler(state);
435 }
436
437 /**
438 * Channel handler deals with the switch connection and dispatches
439 * switch messages to the appropriate locations.
440 * @author readams
441 */
442 protected class OFChannelHandler
443 extends IdleStateAwareChannelUpstreamHandler {
444 protected OFSwitchImpl sw;
445 protected OFChannelState state;
446
/**
 * Creates a handler bound to the given per-connection state.
 * @param state the channel state object tracked for this connection
 */
public OFChannelHandler(OFChannelState state) {
    this.state = state;
}
450
451 @Override
452 @LogMessageDoc(message="New switch connection from {ip address}",
453 explanation="A new switch has connected from the " +
454 "specified IP address")
455 public void channelConnected(ChannelHandlerContext ctx,
456 ChannelStateEvent e) throws Exception {
457 log.info("New switch connection from {}",
458 e.getChannel().getRemoteAddress());
459
460 sw = new OFSwitchImpl();
461 sw.setChannel(e.getChannel());
462 sw.setFloodlightProvider(Controller.this);
463 sw.setThreadPoolService(threadPool);
464
465 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
466 msglist.add(factory.getMessage(OFType.HELLO));
467 e.getChannel().write(msglist);
468
469 }
470
471 @Override
472 @LogMessageDoc(message="Disconnected switch {switch information}",
473 explanation="The specified switch has disconnected.")
474 public void channelDisconnected(ChannelHandlerContext ctx,
475 ChannelStateEvent e) throws Exception {
476 if (sw != null && state.hsState == HandshakeState.READY) {
477 if (activeSwitches.containsKey(sw.getId())) {
478 // It's safe to call removeSwitch even though the map might
479 // not contain this particular switch but another with the
480 // same DPID
481 removeSwitch(sw);
482 }
483 synchronized(roleChanger) {
484 connectedSwitches.remove(sw);
485 }
486 sw.setConnected(false);
487 }
488 log.info("Disconnected switch {}", sw);
489 }
490
/**
 * Central error handling for the switch connection. Most failures are
 * logged and lead to the channel being closed; an already closed channel
 * and a full processing queue leave the channel open, and a storage
 * failure terminates the controller.
 */
@Override
@LogMessageDocs({
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to read timeout",
            explanation="The connected switch has failed to send any " +
                        "messages or respond to echo requests",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch}: failed to " +
                    "complete handshake",
            explanation="The switch did not respond correctly " +
                        "to handshake messages",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to IO Error: {}",
            explanation="There was an error communicating with the switch",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to switch " +
                    "state error: {error}",
            explanation="The switch sent an unexpected message",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to " +
                    "message parse failure",
            explanation="Could not parse a message from the switch",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Terminating controller due to storage exception",
            explanation=ERROR_DATABASE,
            recommendation=LogMessageDoc.CHECK_CONTROLLER),
    @LogMessageDoc(level="ERROR",
            message="Could not process message: queue full",
            explanation="OpenFlow messages are arriving faster than " +
                        " the controller can process them.",
            recommendation=LogMessageDoc.CHECK_CONTROLLER),
    @LogMessageDoc(level="ERROR",
            message="Error while processing message " +
                    "from switch {switch} {cause}",
            explanation="An error occurred processing the switch message",
            recommendation=LogMessageDoc.GENERIC_ACTION)
})
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
        throws Exception {
    final Throwable cause = e.getCause();
    // Cleared by the branches that must leave the channel open.
    boolean disconnect = true;

    if (cause instanceof ReadTimeoutException) {
        // switch timeout
        log.error("Disconnecting switch {} due to read timeout", sw);
    } else if (cause instanceof HandshakeTimeoutException) {
        log.error("Disconnecting switch {}: failed to complete handshake",
                  sw);
    } else if (cause instanceof ClosedChannelException) {
        // Channel is already closed; nothing to log or close.
        disconnect = false;
    } else if (cause instanceof IOException) {
        log.error("Disconnecting switch {} due to IO Error: {}",
                  sw, cause.getMessage());
    } else if (cause instanceof SwitchStateException) {
        log.error("Disconnecting switch {} due to switch state error: {}",
                  sw, cause.getMessage());
    } else if (cause instanceof MessageParseException) {
        log.error("Disconnecting switch " + sw +
                  " due to message parse failure",
                  cause);
    } else if (cause instanceof StorageException) {
        log.error("Terminating controller due to storage exception",
                  cause);
        terminate();
        disconnect = false;
    } else if (cause instanceof RejectedExecutionException) {
        log.warn("Could not process message: queue full");
        disconnect = false;
    } else {
        log.error("Error while processing message from switch " + sw,
                  cause);
    }

    if (disconnect) {
        ctx.getChannel().close();
    }
}
570
571 @Override
572 public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
573 throws Exception {
574 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
575 msglist.add(factory.getMessage(OFType.ECHO_REQUEST));
576 e.getChannel().write(msglist);
577 }
578
579 @Override
580 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
581 throws Exception {
582 if (e.getMessage() instanceof List) {
583 @SuppressWarnings("unchecked")
584 List<OFMessage> msglist = (List<OFMessage>)e.getMessage();
585
586 for (OFMessage ofm : msglist) {
587 try {
588 processOFMessage(ofm);
589 }
590 catch (Exception ex) {
591 // We are the last handler in the stream, so run the
592 // exception through the channel again by passing in
593 // ctx.getChannel().
594 Channels.fireExceptionCaught(ctx.getChannel(), ex);
595 }
596 }
597
598 // Flush all flow-mods/packet-out generated from this "train"
599 OFSwitchImpl.flush_all();
600 }
601 }
602
603 /**
604 * Process the request for the switch description
605 */
606 @LogMessageDoc(level="ERROR",
607 message="Exception in reading description " +
608 " during handshake {exception}",
609 explanation="Could not process the switch description string",
610 recommendation=LogMessageDoc.CHECK_SWITCH)
611 void processSwitchDescReply() {
612 try {
613 // Read description, if it has been updated
614 @SuppressWarnings("unchecked")
615 Future<List<OFStatistics>> desc_future =
616 (Future<List<OFStatistics>>)sw.
617 getAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
618 List<OFStatistics> values =
619 desc_future.get(0, TimeUnit.MILLISECONDS);
620 if (values != null) {
621 OFDescriptionStatistics description =
622 new OFDescriptionStatistics();
623 ChannelBuffer data =
624 ChannelBuffers.buffer(description.getLength());
625 for (OFStatistics f : values) {
626 f.writeTo(data);
627 description.readFrom(data);
628 break; // SHOULD be a list of length 1
629 }
630 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_DATA,
631 description);
632 sw.setSwitchProperties(description);
633 data = null;
634
635 // At this time, also set other switch properties from storage
636 boolean is_core_switch = false;
637 IResultSet resultSet = null;
638 try {
639 String swid = sw.getStringId();
640 resultSet =
641 storageSource.getRow(SWITCH_CONFIG_TABLE_NAME, swid);
642 for (Iterator<IResultSet> it =
643 resultSet.iterator(); it.hasNext();) {
644 // In case of multiple rows, use the status
645 // in last row?
646 Map<String, Object> row = it.next().getRow();
647 if (row.containsKey(SWITCH_CONFIG_CORE_SWITCH)) {
648 if (log.isDebugEnabled()) {
649 log.debug("Reading SWITCH_IS_CORE_SWITCH " +
650 "config for switch={}, is-core={}",
651 sw, row.get(SWITCH_CONFIG_CORE_SWITCH));
652 }
653 String ics =
654 (String)row.get(SWITCH_CONFIG_CORE_SWITCH);
655 is_core_switch = ics.equals("true");
656 }
657 }
658 }
659 finally {
660 if (resultSet != null)
661 resultSet.close();
662 }
663 if (is_core_switch) {
664 sw.setAttribute(IOFSwitch.SWITCH_IS_CORE_SWITCH,
665 new Boolean(true));
666 }
667 }
668 sw.removeAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
669 state.hasDescription = true;
670 checkSwitchReady();
671 }
672 catch (InterruptedException ex) {
673 // Ignore
674 }
675 catch (TimeoutException ex) {
676 // Ignore
677 } catch (Exception ex) {
678 log.error("Exception in reading description " +
679 " during handshake", ex);
680 }
681 }
682
683 /**
684 * Send initial switch setup information that we need before adding
685 * the switch
686 * @throws IOException
687 */
688 void sendHelloConfiguration() throws IOException {
689 // Send initial Features Request
690 sw.write(factory.getMessage(OFType.FEATURES_REQUEST), null);
691 }
692
693 /**
694 * Send the configuration requests we can only do after we have
695 * the features reply
696 * @throws IOException
697 */
698 void sendFeatureReplyConfiguration() throws IOException {
699 // Ensure we receive the full packet via PacketIn
700 OFSetConfig config = (OFSetConfig) factory
701 .getMessage(OFType.SET_CONFIG);
702 config.setMissSendLength((short) 0xffff)
703 .setLengthU(OFSwitchConfig.MINIMUM_LENGTH);
704 sw.write(config, null);
705 sw.write(factory.getMessage(OFType.GET_CONFIG_REQUEST),
706 null);
707
708 // Get Description to set switch-specific flags
709 OFStatisticsRequest req = new OFStatisticsRequest();
710 req.setStatisticType(OFStatisticsType.DESC);
711 req.setLengthU(req.getLengthU());
712 Future<List<OFStatistics>> dfuture =
713 sw.getStatistics(req);
714 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE,
715 dfuture);
716
717 }
718
719 protected void checkSwitchReady() {
720 if (state.hsState == HandshakeState.FEATURES_REPLY &&
721 state.hasDescription && state.hasGetConfigReply) {
722
723 state.hsState = HandshakeState.READY;
724
725 synchronized(roleChanger) {
726 // We need to keep track of all of the switches that are connected
727 // to the controller, in any role, so that we can later send the
728 // role request messages when the controller role changes.
729 // We need to be synchronized while doing this: we must not
730 // send a another role request to the connectedSwitches until
731 // we were able to add this new switch to connectedSwitches
732 // *and* send the current role to the new switch.
733 connectedSwitches.add(sw);
734
735 if (role != null) {
736 // Send a role request if role support is enabled for the controller
737 // This is a probe that we'll use to determine if the switch
738 // actually supports the role request message. If it does we'll
739 // get back a role reply message. If it doesn't we'll get back an
740 // OFError message.
741 // If role is MASTER we will promote switch to active
742 // list when we receive the switch's role reply messages
743 log.debug("This controller's role is {}, " +
744 "sending initial role request msg to {}",
745 role, sw);
746 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
747 swList.add(sw);
748 roleChanger.submitRequest(swList, role);
749 }
750 else {
751 // Role supported not enabled on controller (for now)
752 // automatically promote switch to active state.
753 log.debug("This controller's role is null, " +
754 "not sending role request msg to {}",
755 role, sw);
756 // Need to clear FlowMods before we add the switch
757 // and dispatch updates otherwise we have a race condition.
758 sw.clearAllFlowMods();
759 addSwitch(sw);
760 state.firstRoleReplyReceived = true;
761 }
762 }
763 }
764 }
765
766 /* Handle a role reply message we received from the switch. Since
767 * netty serializes message dispatch we don't need to synchronize
768 * against other receive operations from the same switch, so no need
769 * to synchronize addSwitch(), removeSwitch() operations from the same
770 * connection.
771 * FIXME: However, when a switch with the same DPID connects we do
772 * need some synchronization. However, handling switches with same
773 * DPID needs to be revisited anyways (get rid of r/w-lock and synchronous
774 * removedSwitch notification):1
775 *
776 */
777 @LogMessageDoc(level="ERROR",
778 message="Invalid role value in role reply message",
779 explanation="Was unable to set the HA role (master or slave) " +
780 "for the controller.",
781 recommendation=LogMessageDoc.CHECK_CONTROLLER)
782 protected void handleRoleReplyMessage(OFVendor vendorMessage,
783 OFRoleReplyVendorData roleReplyVendorData) {
784 // Map from the role code in the message to our role enum
785 int nxRole = roleReplyVendorData.getRole();
786 Role role = null;
787 switch (nxRole) {
788 case OFRoleVendorData.NX_ROLE_OTHER:
789 role = Role.EQUAL;
790 break;
791 case OFRoleVendorData.NX_ROLE_MASTER:
792 role = Role.MASTER;
793 break;
794 case OFRoleVendorData.NX_ROLE_SLAVE:
795 role = Role.SLAVE;
796 break;
797 default:
798 log.error("Invalid role value in role reply message");
799 sw.getChannel().close();
800 return;
801 }
802
803 log.debug("Handling role reply for role {} from {}. " +
804 "Controller's role is {} ",
805 new Object[] { role, sw, Controller.this.role}
806 );
807
808 sw.deliverRoleReply(vendorMessage.getXid(), role);
809
810 boolean isActive = activeSwitches.containsKey(sw.getId());
811 if (!isActive && sw.isActive()) {
812 // Transition from SLAVE to MASTER.
813
814 if (!state.firstRoleReplyReceived ||
815 getAlwaysClearFlowsOnSwAdd()) {
816 // This is the first role-reply message we receive from
817 // this switch or roles were disabled when the switch
818 // connected:
819 // Delete all pre-existing flows for new connections to
820 // the master
821 //
822 // FIXME: Need to think more about what the test should
823 // be for when we flush the flow-table? For example,
824 // if all the controllers are temporarily in the backup
825 // role (e.g. right after a failure of the master
826 // controller) at the point the switch connects, then
827 // all of the controllers will initially connect as
828 // backup controllers and not flush the flow-table.
829 // Then when one of them is promoted to master following
830 // the master controller election the flow-table
831 // will still not be flushed because that's treated as
832 // a failover event where we don't want to flush the
833 // flow-table. The end result would be that the flow
834 // table for a newly connected switch is never
835 // flushed. Not sure how to handle that case though...
836 sw.clearAllFlowMods();
837 log.debug("First role reply from master switch {}, " +
838 "clear FlowTable to active switch list",
839 HexString.toHexString(sw.getId()));
840 }
841
842 // Some switches don't seem to update us with port
843 // status messages while in slave role.
844 readSwitchPortStateFromStorage(sw);
845
846 // Only add the switch to the active switch list if
847 // we're not in the slave role. Note that if the role
848 // attribute is null, then that means that the switch
849 // doesn't support the role request messages, so in that
850 // case we're effectively in the EQUAL role and the
851 // switch should be included in the active switch list.
852 addSwitch(sw);
853 log.debug("Added master switch {} to active switch list",
854 HexString.toHexString(sw.getId()));
855
856 }
857 else if (isActive && !sw.isActive()) {
858 // Transition from MASTER to SLAVE: remove switch
859 // from active switch list.
860 log.debug("Removed slave switch {} from active switch" +
861 " list", HexString.toHexString(sw.getId()));
862 removeSwitch(sw);
863 }
864
865 // Indicate that we have received a role reply message.
866 state.firstRoleReplyReceived = true;
867 }
868
869 protected boolean handleVendorMessage(OFVendor vendorMessage) {
870 boolean shouldHandleMessage = false;
871 int vendor = vendorMessage.getVendor();
872 switch (vendor) {
873 case OFNiciraVendorData.NX_VENDOR_ID:
874 OFNiciraVendorData niciraVendorData =
875 (OFNiciraVendorData)vendorMessage.getVendorData();
876 int dataType = niciraVendorData.getDataType();
877 switch (dataType) {
878 case OFRoleReplyVendorData.NXT_ROLE_REPLY:
879 OFRoleReplyVendorData roleReplyVendorData =
880 (OFRoleReplyVendorData) niciraVendorData;
881 handleRoleReplyMessage(vendorMessage,
882 roleReplyVendorData);
883 break;
884 default:
885 log.warn("Unhandled Nicira VENDOR message; " +
886 "data type = {}", dataType);
887 break;
888 }
889 break;
890 default:
891 log.warn("Unhandled VENDOR message; vendor id = {}", vendor);
892 break;
893 }
894
895 return shouldHandleMessage;
896 }
897
898 /**
899 * Dispatch an Openflow message from a switch to the appropriate
900 * handler.
901 * @param m The message to process
902 * @throws IOException
903 * @throws SwitchStateException
904 */
905 @LogMessageDocs({
906 @LogMessageDoc(level="WARN",
907 message="Config Reply from {switch} has " +
908 "miss length set to {length}",
909 explanation="The controller requires that the switch " +
910 "use a miss length of 0xffff for correct " +
911 "function",
912 recommendation="Use a different switch to ensure " +
913 "correct function"),
914 @LogMessageDoc(level="WARN",
915 message="Received ERROR from sw {switch} that "
916 +"indicates roles are not supported "
917 +"but we have received a valid "
918 +"role reply earlier",
919 explanation="The switch sent a confusing message to the" +
920 "controller")
921 })
922 protected void processOFMessage(OFMessage m)
923 throws IOException, SwitchStateException {
924 boolean shouldHandleMessage = false;
925
926 switch (m.getType()) {
927 case HELLO:
928 if (log.isTraceEnabled())
929 log.trace("HELLO from {}", sw);
930
931 if (state.hsState.equals(HandshakeState.START)) {
932 state.hsState = HandshakeState.HELLO;
933 sendHelloConfiguration();
934 } else {
935 throw new SwitchStateException("Unexpected HELLO from "
936 + sw);
937 }
938 break;
939 case ECHO_REQUEST:
940 OFEchoReply reply =
941 (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY);
942 reply.setXid(m.getXid());
943 sw.write(reply, null);
944 break;
945 case ECHO_REPLY:
946 break;
947 case FEATURES_REPLY:
948 if (log.isTraceEnabled())
949 log.trace("Features Reply from {}", sw);
950
951 sw.setFeaturesReply((OFFeaturesReply) m);
952 if (state.hsState.equals(HandshakeState.HELLO)) {
953 sendFeatureReplyConfiguration();
954 state.hsState = HandshakeState.FEATURES_REPLY;
955 // uncomment to enable "dumb" switches like cbench
956 // state.hsState = HandshakeState.READY;
957 // addSwitch(sw);
958 } else {
959 // return results to rest api caller
960 sw.deliverOFFeaturesReply(m);
961 // update database */
962 updateActiveSwitchInfo(sw);
963 }
964 break;
965 case GET_CONFIG_REPLY:
966 if (log.isTraceEnabled())
967 log.trace("Get config reply from {}", sw);
968
969 if (!state.hsState.equals(HandshakeState.FEATURES_REPLY)) {
970 String em = "Unexpected GET_CONFIG_REPLY from " + sw;
971 throw new SwitchStateException(em);
972 }
973 OFGetConfigReply cr = (OFGetConfigReply) m;
974 if (cr.getMissSendLength() == (short)0xffff) {
975 log.trace("Config Reply from {} confirms " +
976 "miss length set to 0xffff", sw);
977 } else {
978 log.warn("Config Reply from {} has " +
979 "miss length set to {}",
980 sw, cr.getMissSendLength() & 0xffff);
981 }
982 state.hasGetConfigReply = true;
983 checkSwitchReady();
984 break;
985 case VENDOR:
986 shouldHandleMessage = handleVendorMessage((OFVendor)m);
987 break;
988 case ERROR:
989 // TODO: we need better error handling. Especially for
990 // request/reply style message (stats, roles) we should have
991 // a unified way to lookup the xid in the error message.
992 // This will probable involve rewriting the way we handle
993 // request/reply style messages.
994 OFError error = (OFError) m;
995 boolean shouldLogError = true;
996 // TODO: should we check that firstRoleReplyReceived is false,
997 // i.e., check only whether the first request fails?
998 if (sw.checkFirstPendingRoleRequestXid(error.getXid())) {
999 boolean isBadVendorError =
1000 (error.getErrorType() == OFError.OFErrorType.
1001 OFPET_BAD_REQUEST.getValue());
1002 // We expect to receive a bad vendor error when
1003 // we're connected to a switch that doesn't support
1004 // the Nicira vendor extensions (i.e. not OVS or
1005 // derived from OVS). By protocol, it should also be
1006 // BAD_VENDOR, but too many switch implementations
1007 // get it wrong and we can already check the xid()
1008 // so we can ignore the type with confidence that this
1009 // is not a spurious error
1010 shouldLogError = !isBadVendorError;
1011 if (isBadVendorError) {
1012 if (state.firstRoleReplyReceived && (role != null)) {
1013 log.warn("Received ERROR from sw {} that "
1014 +"indicates roles are not supported "
1015 +"but we have received a valid "
1016 +"role reply earlier", sw);
1017 }
1018 state.firstRoleReplyReceived = true;
1019 sw.deliverRoleRequestNotSupported(error.getXid());
1020 synchronized(roleChanger) {
1021 if (sw.role == null && Controller.this.role==Role.SLAVE) {
1022 // the switch doesn't understand role request
1023 // messages and the current controller role is
1024 // slave. We need to disconnect the switch.
1025 // @see RoleChanger for rationale
1026 sw.getChannel().close();
1027 }
1028 else if (sw.role == null) {
1029 // Controller's role is master: add to
1030 // active
1031 // TODO: check if clearing flow table is
1032 // right choice here.
1033 // Need to clear FlowMods before we add the switch
1034 // and dispatch updates otherwise we have a race condition.
1035 // TODO: switch update is async. Won't we still have a potential
1036 // race condition?
1037 sw.clearAllFlowMods();
1038 addSwitch(sw);
1039 }
1040 }
1041 }
1042 else {
1043 // TODO: Is this the right thing to do if we receive
1044 // some other error besides a bad vendor error?
1045 // Presumably that means the switch did actually
1046 // understand the role request message, but there
1047 // was some other error from processing the message.
1048 // OF 1.2 specifies a OFPET_ROLE_REQUEST_FAILED
1049 // error code, but it doesn't look like the Nicira
1050 // role request has that. Should check OVS source
1051 // code to see if it's possible for any other errors
1052 // to be returned.
1053 // If we received an error the switch is not
1054 // in the correct role, so we need to disconnect it.
1055 // We could also resend the request but then we need to
1056 // check if there are other pending request in which
1057 // case we shouldn't resend. If we do resend we need
1058 // to make sure that the switch eventually accepts one
1059 // of our requests or disconnect the switch. This feels
1060 // cumbersome.
1061 sw.getChannel().close();
1062 }
1063 }
1064 // Once we support OF 1.2, we'd add code to handle it here.
1065 //if (error.getXid() == state.ofRoleRequestXid) {
1066 //}
1067 if (shouldLogError)
1068 logError(sw, error);
1069 break;
1070 case STATS_REPLY:
1071 if (state.hsState.ordinal() <
1072 HandshakeState.FEATURES_REPLY.ordinal()) {
1073 String em = "Unexpected STATS_REPLY from " + sw;
1074 throw new SwitchStateException(em);
1075 }
1076 sw.deliverStatisticsReply(m);
1077 if (sw.hasAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE)) {
1078 processSwitchDescReply();
1079 }
1080 break;
1081 case PORT_STATUS:
1082 // We want to update our port state info even if we're in
1083 // the slave role, but we only want to update storage if
1084 // we're the master (or equal).
1085 boolean updateStorage = state.hsState.
1086 equals(HandshakeState.READY) &&
1087 (sw.getRole() != Role.SLAVE);
1088 handlePortStatusMessage(sw, (OFPortStatus)m, updateStorage);
1089 shouldHandleMessage = true;
1090 break;
1091
1092 default:
1093 shouldHandleMessage = true;
1094 break;
1095 }
1096
1097 if (shouldHandleMessage) {
1098 sw.getListenerReadLock().lock();
1099 try {
1100 if (sw.isConnected()) {
1101 if (!state.hsState.equals(HandshakeState.READY)) {
1102 log.debug("Ignoring message type {} received " +
1103 "from switch {} before switch is " +
1104 "fully configured.", m.getType(), sw);
1105 }
1106 // Check if the controller is in the slave role for the
1107 // switch. If it is, then don't dispatch the message to
1108 // the listeners.
1109 // TODO: Should we dispatch messages that we expect to
1110 // receive when we're in the slave role, e.g. port
1111 // status messages? Since we're "hiding" switches from
1112 // the listeners when we're in the slave role, then it
1113 // seems a little weird to dispatch port status messages
1114 // to them. On the other hand there might be special
1115 // modules that care about all of the connected switches
1116 // and would like to receive port status notifications.
1117 else if (sw.getRole() == Role.SLAVE) {
1118 // Don't log message if it's a port status message
1119 // since we expect to receive those from the switch
1120 // and don't want to emit spurious messages.
1121 if (m.getType() != OFType.PORT_STATUS) {
1122 log.debug("Ignoring message type {} received " +
1123 "from switch {} while in the slave role.",
1124 m.getType(), sw);
1125 }
1126 } else {
1127 handleMessage(sw, m, null);
1128 }
1129 }
1130 }
1131 finally {
1132 sw.getListenerReadLock().unlock();
1133 }
1134 }
1135 }
1136 }
1137
1138 // ****************
1139 // Message handlers
1140 // ****************
1141
1142 protected void handlePortStatusMessage(IOFSwitch sw,
1143 OFPortStatus m,
1144 boolean updateStorage) {
1145 short portNumber = m.getDesc().getPortNumber();
1146 OFPhysicalPort port = m.getDesc();
1147 if (m.getReason() == (byte)OFPortReason.OFPPR_MODIFY.ordinal()) {
Pankaj Berde6debb042013-01-16 18:04:32 -08001148 int srcPortState = port.getState();
1149 boolean portUp = ((srcPortState &
1150 OFPortState.OFPPS_STP_MASK.getValue()) !=
1151 OFPortState.OFPPS_STP_BLOCK.getValue());
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001152 sw.setPort(port);
Pankaj Berde6debb042013-01-16 18:04:32 -08001153 if (portUp) {
1154 swStore.addPort(sw.getStringId(), port);
1155 } else {
1156 swStore.deletePort(sw.getStringId(), port.getPortNumber());
1157 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001158 if (updateStorage)
1159 updatePortInfo(sw, port);
1160 log.debug("Port #{} modified for {}", portNumber, sw);
1161 } else if (m.getReason() == (byte)OFPortReason.OFPPR_ADD.ordinal()) {
1162 sw.setPort(port);
Pankaj Berde8557a462013-01-07 08:59:31 -08001163 swStore.addPort(sw.getStringId(), port);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001164 if (updateStorage)
1165 updatePortInfo(sw, port);
1166 log.debug("Port #{} added for {}", portNumber, sw);
1167 } else if (m.getReason() ==
1168 (byte)OFPortReason.OFPPR_DELETE.ordinal()) {
1169 sw.deletePort(portNumber);
Pankaj Berde8557a462013-01-07 08:59:31 -08001170 swStore.deletePort(sw.getStringId(), portNumber);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001171 if (updateStorage)
1172 removePortInfo(sw, portNumber);
1173 log.debug("Port #{} deleted for {}", portNumber, sw);
1174 }
1175 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
1176 try {
1177 this.updates.put(update);
1178 } catch (InterruptedException e) {
1179 log.error("Failure adding update to queue", e);
1180 }
1181 }
1182
1183 /**
1184 * flcontext_cache - Keep a thread local stack of contexts
1185 */
1186 protected static final ThreadLocal<Stack<FloodlightContext>> flcontext_cache =
1187 new ThreadLocal <Stack<FloodlightContext>> () {
1188 @Override
1189 protected Stack<FloodlightContext> initialValue() {
1190 return new Stack<FloodlightContext>();
1191 }
1192 };
1193
1194 /**
1195 * flcontext_alloc - pop a context off the stack, if required create a new one
1196 * @return FloodlightContext
1197 */
1198 protected static FloodlightContext flcontext_alloc() {
1199 FloodlightContext flcontext = null;
1200
1201 if (flcontext_cache.get().empty()) {
1202 flcontext = new FloodlightContext();
1203 }
1204 else {
1205 flcontext = flcontext_cache.get().pop();
1206 }
1207
1208 return flcontext;
1209 }
1210
1211 /**
1212 * flcontext_free - Free the context to the current thread
1213 * @param flcontext
1214 */
1215 protected void flcontext_free(FloodlightContext flcontext) {
1216 flcontext.getStorage().clear();
1217 flcontext_cache.get().push(flcontext);
1218 }
1219
1220 /**
1221 * Handle replies to certain OFMessages, and pass others off to listeners
1222 * @param sw The switch for the message
1223 * @param m The message
1224 * @param bContext The floodlight context. If null then floodlight context would
1225 * be allocated in this function
1226 * @throws IOException
1227 */
1228 @LogMessageDocs({
1229 @LogMessageDoc(level="ERROR",
1230 message="Ignoring PacketIn (Xid = {xid}) because the data" +
1231 " field is empty.",
1232 explanation="The switch sent an improperly-formatted PacketIn" +
1233 " message",
1234 recommendation=LogMessageDoc.CHECK_SWITCH),
1235 @LogMessageDoc(level="WARN",
1236 message="Unhandled OF Message: {} from {}",
1237 explanation="The switch sent a message not handled by " +
1238 "the controller")
1239 })
1240 protected void handleMessage(IOFSwitch sw, OFMessage m,
1241 FloodlightContext bContext)
1242 throws IOException {
1243 Ethernet eth = null;
1244
1245 switch (m.getType()) {
1246 case PACKET_IN:
1247 OFPacketIn pi = (OFPacketIn)m;
1248
1249 if (pi.getPacketData().length <= 0) {
1250 log.error("Ignoring PacketIn (Xid = " + pi.getXid() +
1251 ") because the data field is empty.");
1252 return;
1253 }
1254
1255 if (Controller.ALWAYS_DECODE_ETH) {
1256 eth = new Ethernet();
1257 eth.deserialize(pi.getPacketData(), 0,
1258 pi.getPacketData().length);
1259 counterStore.updatePacketInCounters(sw, m, eth);
1260 }
1261 // fall through to default case...
1262
1263 default:
1264
1265 List<IOFMessageListener> listeners = null;
1266 if (messageListeners.containsKey(m.getType())) {
1267 listeners = messageListeners.get(m.getType()).
1268 getOrderedListeners();
1269 }
1270
1271 FloodlightContext bc = null;
1272 if (listeners != null) {
1273 // Check if floodlight context is passed from the calling
1274 // function, if so use that floodlight context, otherwise
1275 // allocate one
1276 if (bContext == null) {
1277 bc = flcontext_alloc();
1278 } else {
1279 bc = bContext;
1280 }
1281 if (eth != null) {
1282 IFloodlightProviderService.bcStore.put(bc,
1283 IFloodlightProviderService.CONTEXT_PI_PAYLOAD,
1284 eth);
1285 }
1286
1287 // Get the starting time (overall and per-component) of
1288 // the processing chain for this packet if performance
1289 // monitoring is turned on
1290 pktinProcTime.bootstrap(listeners);
1291 pktinProcTime.recordStartTimePktIn();
1292 Command cmd;
1293 for (IOFMessageListener listener : listeners) {
1294 if (listener instanceof IOFSwitchFilter) {
1295 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1296 continue;
1297 }
1298 }
1299
1300 pktinProcTime.recordStartTimeComp(listener);
1301 cmd = listener.receive(sw, m, bc);
1302 pktinProcTime.recordEndTimeComp(listener);
1303
1304 if (Command.STOP.equals(cmd)) {
1305 break;
1306 }
1307 }
1308 pktinProcTime.recordEndTimePktIn(sw, m, bc);
1309 } else {
1310 log.warn("Unhandled OF Message: {} from {}", m, sw);
1311 }
1312
1313 if ((bContext == null) && (bc != null)) flcontext_free(bc);
1314 }
1315 }
1316
1317 /**
1318 * Log an OpenFlow error message from a switch
1319 * @param sw The switch that sent the error
1320 * @param error The error message
1321 */
1322 @LogMessageDoc(level="ERROR",
1323 message="Error {error type} {error code} from {switch}",
1324 explanation="The switch responded with an unexpected error" +
1325 "to an OpenFlow message from the controller",
1326 recommendation="This could indicate improper network operation. " +
1327 "If the problem persists restarting the switch and " +
1328 "controller may help."
1329 )
1330 protected void logError(IOFSwitch sw, OFError error) {
1331 int etint = 0xffff & error.getErrorType();
1332 if (etint < 0 || etint >= OFErrorType.values().length) {
1333 log.error("Unknown error code {} from sw {}", etint, sw);
1334 }
1335 OFErrorType et = OFErrorType.values()[etint];
1336 switch (et) {
1337 case OFPET_HELLO_FAILED:
1338 OFHelloFailedCode hfc =
1339 OFHelloFailedCode.values()[0xffff & error.getErrorCode()];
1340 log.error("Error {} {} from {}", new Object[] {et, hfc, sw});
1341 break;
1342 case OFPET_BAD_REQUEST:
1343 OFBadRequestCode brc =
1344 OFBadRequestCode.values()[0xffff & error.getErrorCode()];
1345 log.error("Error {} {} from {}", new Object[] {et, brc, sw});
1346 break;
1347 case OFPET_BAD_ACTION:
1348 OFBadActionCode bac =
1349 OFBadActionCode.values()[0xffff & error.getErrorCode()];
1350 log.error("Error {} {} from {}", new Object[] {et, bac, sw});
1351 break;
1352 case OFPET_FLOW_MOD_FAILED:
1353 OFFlowModFailedCode fmfc =
1354 OFFlowModFailedCode.values()[0xffff & error.getErrorCode()];
1355 log.error("Error {} {} from {}", new Object[] {et, fmfc, sw});
1356 break;
1357 case OFPET_PORT_MOD_FAILED:
1358 OFPortModFailedCode pmfc =
1359 OFPortModFailedCode.values()[0xffff & error.getErrorCode()];
1360 log.error("Error {} {} from {}", new Object[] {et, pmfc, sw});
1361 break;
1362 case OFPET_QUEUE_OP_FAILED:
1363 OFQueueOpFailedCode qofc =
1364 OFQueueOpFailedCode.values()[0xffff & error.getErrorCode()];
1365 log.error("Error {} {} from {}", new Object[] {et, qofc, sw});
1366 break;
1367 default:
1368 break;
1369 }
1370 }
1371
1372 /**
1373 * Add a switch to the active switch list and call the switch listeners.
1374 * This happens either when a switch first connects (and the controller is
1375 * not in the slave role) or when the role of the controller changes from
1376 * slave to master.
1377 * @param sw the switch that has been added
1378 */
1379 // TODO: need to rethink locking and the synchronous switch update.
1380 // We can / should also handle duplicate DPIDs in connectedSwitches
1381 @LogMessageDoc(level="ERROR",
1382 message="New switch added {switch} for already-added switch {switch}",
1383 explanation="A switch with the same DPID as another switch " +
1384 "connected to the controller. This can be caused by " +
1385 "multiple switches configured with the same DPID, or " +
1386 "by a switch reconnected very quickly after " +
1387 "disconnecting.",
1388 recommendation="If this happens repeatedly, it is likely there " +
1389 "are switches with duplicate DPIDs on the network. " +
1390 "Reconfigure the appropriate switches. If it happens " +
1391 "very rarely, then it is likely this is a transient " +
1392 "network problem that can be ignored."
1393 )
1394 protected void addSwitch(IOFSwitch sw) {
1395 // TODO: is it safe to modify the HashMap without holding
1396 // the old switch's lock?
1397 OFSwitchImpl oldSw = (OFSwitchImpl) this.activeSwitches.put(sw.getId(), sw);
1398 if (sw == oldSw) {
1399 // Note == for object equality, not .equals for value
1400 log.info("New add switch for pre-existing switch {}", sw);
1401 return;
1402 }
1403
1404 if (oldSw != null) {
1405 oldSw.getListenerWriteLock().lock();
1406 try {
1407 log.error("New switch added {} for already-added switch {}",
1408 sw, oldSw);
1409 // Set the connected flag to false to suppress calling
1410 // the listeners for this switch in processOFMessage
1411 oldSw.setConnected(false);
1412
1413 oldSw.cancelAllStatisticsReplies();
1414
1415 updateInactiveSwitchInfo(oldSw);
1416
1417 // we need to clean out old switch state definitively
1418 // before adding the new switch
1419 // FIXME: It seems not completely kosher to call the
1420 // switch listeners here. I thought one of the points of
1421 // having the asynchronous switch update mechanism was so
1422 // the addedSwitch and removedSwitch were always called
1423 // from a single thread to simplify concurrency issues
1424 // for the listener.
1425 if (switchListeners != null) {
1426 for (IOFSwitchListener listener : switchListeners) {
1427 listener.removedSwitch(oldSw);
1428 }
1429 }
1430 // will eventually trigger a removeSwitch(), which will cause
1431 // a "Not removing Switch ... already removed debug message.
1432 // TODO: Figure out a way to handle this that avoids the
1433 // spurious debug message.
1434 oldSw.getChannel().close();
1435 }
1436 finally {
1437 oldSw.getListenerWriteLock().unlock();
1438 }
1439 }
1440
1441 updateActiveSwitchInfo(sw);
Pankaj Berde8557a462013-01-07 08:59:31 -08001442 swStore.update(sw.getStringId(), SwitchState.ACTIVE, DM_OPERATION.UPDATE);
Pankaj Berde0fc4e432013-01-12 09:47:22 -08001443 for (OFPhysicalPort port: sw.getPorts()) {
1444 swStore.addPort(sw.getStringId(), port);
1445 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001446 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.ADDED);
1447 try {
1448 this.updates.put(update);
1449 } catch (InterruptedException e) {
1450 log.error("Failure adding update to queue", e);
1451 }
1452 }
1453
1454 /**
1455 * Remove a switch from the active switch list and call the switch listeners.
1456 * This happens either when the switch is disconnected or when the
1457 * controller's role for the switch changes from master to slave.
1458 * @param sw the switch that has been removed
1459 */
1460 protected void removeSwitch(IOFSwitch sw) {
1461 // No need to acquire the listener lock, since
1462 // this method is only called after netty has processed all
1463 // pending messages
1464 log.debug("removeSwitch: {}", sw);
Pankaj Berdeafb20532013-01-08 15:05:24 -08001465 swStore.update(sw.getStringId(), SwitchState.INACTIVE, DM_OPERATION.UPDATE);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001466 if (!this.activeSwitches.remove(sw.getId(), sw) || !sw.isConnected()) {
1467 log.debug("Not removing switch {}; already removed", sw);
1468 return;
1469 }
1470 // We cancel all outstanding statistics replies if the switch transition
1471 // from active. In the future we might allow statistics requests
1472 // from slave controllers. Then we need to move this cancelation
1473 // to switch disconnect
1474 sw.cancelAllStatisticsReplies();
Pankaj Berdeafb20532013-01-08 15:05:24 -08001475
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001476
1477 // FIXME: I think there's a race condition if we call updateInactiveSwitchInfo
1478 // here if role support is enabled. In that case if the switch is being
1479 // removed because we've been switched to being in the slave role, then I think
1480 // it's possible that the new master may have already been promoted to master
1481 // and written out the active switch state to storage. If we now execute
1482 // updateInactiveSwitchInfo we may wipe out all of the state that was
1483 // written out by the new master. Maybe need to revisit how we handle all
1484 // of the switch state that's written to storage.
1485
1486 updateInactiveSwitchInfo(sw);
Pankaj Berdeafb20532013-01-08 15:05:24 -08001487
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001488 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.REMOVED);
1489 try {
1490 this.updates.put(update);
1491 } catch (InterruptedException e) {
1492 log.error("Failure adding update to queue", e);
1493 }
1494 }
1495
1496 // ***************
1497 // IFloodlightProvider
1498 // ***************
1499
1500 @Override
1501 public synchronized void addOFMessageListener(OFType type,
1502 IOFMessageListener listener) {
1503 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1504 messageListeners.get(type);
1505 if (ldd == null) {
1506 ldd = new ListenerDispatcher<OFType, IOFMessageListener>();
1507 messageListeners.put(type, ldd);
1508 }
1509 ldd.addListener(type, listener);
1510 }
1511
1512 @Override
1513 public synchronized void removeOFMessageListener(OFType type,
1514 IOFMessageListener listener) {
1515 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1516 messageListeners.get(type);
1517 if (ldd != null) {
1518 ldd.removeListener(listener);
1519 }
1520 }
1521
1522 private void logListeners() {
1523 for (Map.Entry<OFType,
1524 ListenerDispatcher<OFType,
1525 IOFMessageListener>> entry
1526 : messageListeners.entrySet()) {
1527
1528 OFType type = entry.getKey();
1529 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1530 entry.getValue();
1531
1532 StringBuffer sb = new StringBuffer();
1533 sb.append("OFListeners for ");
1534 sb.append(type);
1535 sb.append(": ");
1536 for (IOFMessageListener l : ldd.getOrderedListeners()) {
1537 sb.append(l.getName());
1538 sb.append(",");
1539 }
1540 log.debug(sb.toString());
1541 }
1542 }
1543
1544 public void removeOFMessageListeners(OFType type) {
1545 messageListeners.remove(type);
1546 }
1547
1548 @Override
1549 public Map<Long, IOFSwitch> getSwitches() {
1550 return Collections.unmodifiableMap(this.activeSwitches);
1551 }
1552
1553 @Override
1554 public void addOFSwitchListener(IOFSwitchListener listener) {
1555 this.switchListeners.add(listener);
1556 }
1557
1558 @Override
1559 public void removeOFSwitchListener(IOFSwitchListener listener) {
1560 this.switchListeners.remove(listener);
1561 }
1562
1563 @Override
1564 public Map<OFType, List<IOFMessageListener>> getListeners() {
1565 Map<OFType, List<IOFMessageListener>> lers =
1566 new HashMap<OFType, List<IOFMessageListener>>();
1567 for(Entry<OFType, ListenerDispatcher<OFType, IOFMessageListener>> e :
1568 messageListeners.entrySet()) {
1569 lers.put(e.getKey(), e.getValue().getOrderedListeners());
1570 }
1571 return Collections.unmodifiableMap(lers);
1572 }
1573
1574 @Override
1575 @LogMessageDocs({
1576 @LogMessageDoc(message="Failed to inject OFMessage {message} onto " +
1577 "a null switch",
1578 explanation="Failed to process a message because the switch " +
1579 " is no longer connected."),
1580 @LogMessageDoc(level="ERROR",
1581 message="Error reinjecting OFMessage on switch {switch}",
1582 explanation="An I/O error occured while attempting to " +
1583 "process an OpenFlow message",
1584 recommendation=LogMessageDoc.CHECK_SWITCH)
1585 })
1586 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg,
1587 FloodlightContext bc) {
1588 if (sw == null) {
1589 log.info("Failed to inject OFMessage {} onto a null switch", msg);
1590 return false;
1591 }
1592
1593 // FIXME: Do we need to be able to inject messages to switches
1594 // where we're the slave controller (i.e. they're connected but
1595 // not active)?
1596 // FIXME: Don't we need synchronization logic here so we're holding
1597 // the listener read lock when we call handleMessage? After some
1598 // discussions it sounds like the right thing to do here would be to
1599 // inject the message as a netty upstream channel event so it goes
1600 // through the normal netty event processing, including being
1601 // handled
1602 if (!activeSwitches.containsKey(sw.getId())) return false;
1603
1604 try {
1605 // Pass Floodlight context to the handleMessages()
1606 handleMessage(sw, msg, bc);
1607 } catch (IOException e) {
1608 log.error("Error reinjecting OFMessage on switch {}",
1609 HexString.toHexString(sw.getId()));
1610 return false;
1611 }
1612 return true;
1613 }
1614
1615 @Override
1616 @LogMessageDoc(message="Calling System.exit",
1617 explanation="The controller is terminating")
1618 public synchronized void terminate() {
1619 log.info("Calling System.exit");
1620 System.exit(1);
1621 }
1622
1623 @Override
1624 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg) {
1625 // call the overloaded version with floodlight context set to null
1626 return injectOfMessage(sw, msg, null);
1627 }
1628
1629 @Override
1630 public void handleOutgoingMessage(IOFSwitch sw, OFMessage m,
1631 FloodlightContext bc) {
1632 if (log.isTraceEnabled()) {
1633 String str = OFMessage.getDataAsString(sw, m, bc);
1634 log.trace("{}", str);
1635 }
1636
1637 List<IOFMessageListener> listeners = null;
1638 if (messageListeners.containsKey(m.getType())) {
1639 listeners =
1640 messageListeners.get(m.getType()).getOrderedListeners();
1641 }
1642
1643 if (listeners != null) {
1644 for (IOFMessageListener listener : listeners) {
1645 if (listener instanceof IOFSwitchFilter) {
1646 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1647 continue;
1648 }
1649 }
1650 if (Command.STOP.equals(listener.receive(sw, m, bc))) {
1651 break;
1652 }
1653 }
1654 }
1655 }
1656
1657 @Override
1658 public BasicFactory getOFMessageFactory() {
1659 return factory;
1660 }
1661
1662 @Override
1663 public String getControllerId() {
1664 return controllerId;
1665 }
1666
1667 // **************
1668 // Initialization
1669 // **************
1670
1671 protected void updateAllInactiveSwitchInfo() {
1672 if (role == Role.SLAVE) {
1673 return;
1674 }
1675 String controllerId = getControllerId();
1676 String[] switchColumns = { SWITCH_DATAPATH_ID,
1677 SWITCH_CONTROLLER_ID,
1678 SWITCH_ACTIVE };
1679 String[] portColumns = { PORT_ID, PORT_SWITCH };
1680 IResultSet switchResultSet = null;
1681 try {
1682 OperatorPredicate op =
1683 new OperatorPredicate(SWITCH_CONTROLLER_ID,
1684 OperatorPredicate.Operator.EQ,
1685 controllerId);
1686 switchResultSet =
1687 storageSource.executeQuery(SWITCH_TABLE_NAME,
1688 switchColumns,
1689 op, null);
1690 while (switchResultSet.next()) {
1691 IResultSet portResultSet = null;
1692 try {
1693 String datapathId =
1694 switchResultSet.getString(SWITCH_DATAPATH_ID);
1695 switchResultSet.setBoolean(SWITCH_ACTIVE, Boolean.FALSE);
1696 op = new OperatorPredicate(PORT_SWITCH,
1697 OperatorPredicate.Operator.EQ,
1698 datapathId);
1699 portResultSet =
1700 storageSource.executeQuery(PORT_TABLE_NAME,
1701 portColumns,
1702 op, null);
1703 while (portResultSet.next()) {
1704 portResultSet.deleteRow();
1705 }
1706 portResultSet.save();
1707 }
1708 finally {
1709 if (portResultSet != null)
1710 portResultSet.close();
1711 }
1712 }
1713 switchResultSet.save();
1714 }
1715 finally {
1716 if (switchResultSet != null)
1717 switchResultSet.close();
1718 }
1719 }
1720
1721 protected void updateControllerInfo() {
1722 updateAllInactiveSwitchInfo();
1723
1724 // Write out the controller info to the storage source
1725 Map<String, Object> controllerInfo = new HashMap<String, Object>();
1726 String id = getControllerId();
1727 controllerInfo.put(CONTROLLER_ID, id);
1728 storageSource.updateRow(CONTROLLER_TABLE_NAME, controllerInfo);
1729 }
1730
1731 protected void updateActiveSwitchInfo(IOFSwitch sw) {
1732 if (role == Role.SLAVE) {
1733 return;
1734 }
1735 // Obtain the row info for the switch
1736 Map<String, Object> switchInfo = new HashMap<String, Object>();
1737 String datapathIdString = sw.getStringId();
1738 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1739 String controllerId = getControllerId();
1740 switchInfo.put(SWITCH_CONTROLLER_ID, controllerId);
1741 Date connectedSince = sw.getConnectedSince();
1742 switchInfo.put(SWITCH_CONNECTED_SINCE, connectedSince);
1743 Channel channel = sw.getChannel();
1744 SocketAddress socketAddress = channel.getRemoteAddress();
1745 if (socketAddress != null) {
1746 String socketAddressString = socketAddress.toString();
1747 switchInfo.put(SWITCH_SOCKET_ADDRESS, socketAddressString);
1748 if (socketAddress instanceof InetSocketAddress) {
1749 InetSocketAddress inetSocketAddress =
1750 (InetSocketAddress)socketAddress;
1751 InetAddress inetAddress = inetSocketAddress.getAddress();
1752 String ip = inetAddress.getHostAddress();
1753 switchInfo.put(SWITCH_IP, ip);
1754 }
1755 }
1756
1757 // Write out the switch features info
1758 long capabilities = U32.f(sw.getCapabilities());
1759 switchInfo.put(SWITCH_CAPABILITIES, capabilities);
1760 long buffers = U32.f(sw.getBuffers());
1761 switchInfo.put(SWITCH_BUFFERS, buffers);
1762 long tables = U32.f(sw.getTables());
1763 switchInfo.put(SWITCH_TABLES, tables);
1764 long actions = U32.f(sw.getActions());
1765 switchInfo.put(SWITCH_ACTIONS, actions);
1766 switchInfo.put(SWITCH_ACTIVE, Boolean.TRUE);
1767
1768 // Update the switch
1769 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1770
1771 // Update the ports
1772 for (OFPhysicalPort port: sw.getPorts()) {
1773 updatePortInfo(sw, port);
1774 }
1775 }
1776
1777 protected void updateInactiveSwitchInfo(IOFSwitch sw) {
1778 if (role == Role.SLAVE) {
1779 return;
1780 }
1781 log.debug("Update DB with inactiveSW {}", sw);
1782 // Update the controller info in the storage source to be inactive
1783 Map<String, Object> switchInfo = new HashMap<String, Object>();
1784 String datapathIdString = sw.getStringId();
1785 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1786 //switchInfo.put(SWITCH_CONNECTED_SINCE, null);
1787 switchInfo.put(SWITCH_ACTIVE, Boolean.FALSE);
1788 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1789 }
1790
1791 protected void updatePortInfo(IOFSwitch sw, OFPhysicalPort port) {
1792 if (role == Role.SLAVE) {
1793 return;
1794 }
1795 String datapathIdString = sw.getStringId();
1796 Map<String, Object> portInfo = new HashMap<String, Object>();
1797 int portNumber = U16.f(port.getPortNumber());
1798 String id = datapathIdString + "|" + portNumber;
1799 portInfo.put(PORT_ID, id);
1800 portInfo.put(PORT_SWITCH, datapathIdString);
1801 portInfo.put(PORT_NUMBER, portNumber);
1802 byte[] hardwareAddress = port.getHardwareAddress();
1803 String hardwareAddressString = HexString.toHexString(hardwareAddress);
1804 portInfo.put(PORT_HARDWARE_ADDRESS, hardwareAddressString);
1805 String name = port.getName();
1806 portInfo.put(PORT_NAME, name);
1807 long config = U32.f(port.getConfig());
1808 portInfo.put(PORT_CONFIG, config);
1809 long state = U32.f(port.getState());
1810 portInfo.put(PORT_STATE, state);
1811 long currentFeatures = U32.f(port.getCurrentFeatures());
1812 portInfo.put(PORT_CURRENT_FEATURES, currentFeatures);
1813 long advertisedFeatures = U32.f(port.getAdvertisedFeatures());
1814 portInfo.put(PORT_ADVERTISED_FEATURES, advertisedFeatures);
1815 long supportedFeatures = U32.f(port.getSupportedFeatures());
1816 portInfo.put(PORT_SUPPORTED_FEATURES, supportedFeatures);
1817 long peerFeatures = U32.f(port.getPeerFeatures());
1818 portInfo.put(PORT_PEER_FEATURES, peerFeatures);
1819 storageSource.updateRowAsync(PORT_TABLE_NAME, portInfo);
1820 }
1821
1822 /**
1823 * Read switch port data from storage and write it into a switch object
1824 * @param sw the switch to update
1825 */
1826 protected void readSwitchPortStateFromStorage(OFSwitchImpl sw) {
1827 OperatorPredicate op =
1828 new OperatorPredicate(PORT_SWITCH,
1829 OperatorPredicate.Operator.EQ,
1830 sw.getStringId());
1831 IResultSet portResultSet =
1832 storageSource.executeQuery(PORT_TABLE_NAME,
1833 null, op, null);
1834 //Map<Short, OFPhysicalPort> oldports =
1835 // new HashMap<Short, OFPhysicalPort>();
1836 //oldports.putAll(sw.getPorts());
1837
1838 while (portResultSet.next()) {
1839 try {
1840 OFPhysicalPort p = new OFPhysicalPort();
1841 p.setPortNumber((short)portResultSet.getInt(PORT_NUMBER));
1842 p.setName(portResultSet.getString(PORT_NAME));
1843 p.setConfig((int)portResultSet.getLong(PORT_CONFIG));
1844 p.setState((int)portResultSet.getLong(PORT_STATE));
1845 String portMac = portResultSet.getString(PORT_HARDWARE_ADDRESS);
1846 p.setHardwareAddress(HexString.fromHexString(portMac));
1847 p.setCurrentFeatures((int)portResultSet.
1848 getLong(PORT_CURRENT_FEATURES));
1849 p.setAdvertisedFeatures((int)portResultSet.
1850 getLong(PORT_ADVERTISED_FEATURES));
1851 p.setSupportedFeatures((int)portResultSet.
1852 getLong(PORT_SUPPORTED_FEATURES));
1853 p.setPeerFeatures((int)portResultSet.
1854 getLong(PORT_PEER_FEATURES));
1855 //oldports.remove(Short.valueOf(p.getPortNumber()));
1856 sw.setPort(p);
1857 } catch (NullPointerException e) {
1858 // ignore
1859 }
1860 }
1861 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
1862 try {
1863 this.updates.put(update);
1864 } catch (InterruptedException e) {
1865 log.error("Failure adding update to queue", e);
1866 }
1867 }
1868
1869 protected void removePortInfo(IOFSwitch sw, short portNumber) {
1870 if (role == Role.SLAVE) {
1871 return;
1872 }
1873 String datapathIdString = sw.getStringId();
1874 String id = datapathIdString + "|" + portNumber;
1875 storageSource.deleteRowAsync(PORT_TABLE_NAME, id);
1876 }
1877
1878 /**
1879 * Sets the initial role based on properties in the config params.
1880 * It looks for two different properties.
1881 * If the "role" property is specified then the value should be
1882 * either "EQUAL", "MASTER", or "SLAVE" and the role of the
1883 * controller is set to the specified value. If the "role" property
1884 * is not specified then it looks next for the "role.path" property.
1885 * In this case the value should be the path to a property file in
1886 * the file system that contains a property called "floodlight.role"
1887 * which can be one of the values listed above for the "role" property.
1888 * The idea behind the "role.path" mechanism is that you have some
1889 * separate heartbeat and master controller election algorithm that
1890 * determines the role of the controller. When a role transition happens,
1891 * it updates the current role in the file specified by the "role.path"
1892 * file. Then if floodlight restarts for some reason it can get the
1893 * correct current role of the controller from the file.
1894 * @param configParams The config params for the FloodlightProvider service
1895 * @return A valid role if role information is specified in the
1896 * config params, otherwise null
1897 */
1898 @LogMessageDocs({
1899 @LogMessageDoc(message="Controller role set to {role}",
1900 explanation="Setting the initial HA role to "),
1901 @LogMessageDoc(level="ERROR",
1902 message="Invalid current role value: {role}",
1903 explanation="An invalid HA role value was read from the " +
1904 "properties file",
1905 recommendation=LogMessageDoc.CHECK_CONTROLLER)
1906 })
1907 protected Role getInitialRole(Map<String, String> configParams) {
1908 Role role = null;
1909 String roleString = configParams.get("role");
1910 if (roleString == null) {
1911 String rolePath = configParams.get("rolepath");
1912 if (rolePath != null) {
1913 Properties properties = new Properties();
1914 try {
1915 properties.load(new FileInputStream(rolePath));
1916 roleString = properties.getProperty("floodlight.role");
1917 }
1918 catch (IOException exc) {
1919 // Don't treat it as an error if the file specified by the
1920 // rolepath property doesn't exist. This lets us enable the
1921 // HA mechanism by just creating/setting the floodlight.role
1922 // property in that file without having to modify the
1923 // floodlight properties.
1924 }
1925 }
1926 }
1927
1928 if (roleString != null) {
1929 // Canonicalize the string to the form used for the enum constants
1930 roleString = roleString.trim().toUpperCase();
1931 try {
1932 role = Role.valueOf(roleString);
1933 }
1934 catch (IllegalArgumentException exc) {
1935 log.error("Invalid current role value: {}", roleString);
1936 }
1937 }
1938
1939 log.info("Controller role set to {}", role);
1940
1941 return role;
1942 }
1943
1944 /**
1945 * Tell controller that we're ready to accept switches loop
1946 * @throws IOException
1947 */
1948 @LogMessageDocs({
1949 @LogMessageDoc(message="Listening for switch connections on {address}",
1950 explanation="The controller is ready and listening for new" +
1951 " switch connections"),
1952 @LogMessageDoc(message="Storage exception in controller " +
1953 "updates loop; terminating process",
1954 explanation=ERROR_DATABASE,
1955 recommendation=LogMessageDoc.CHECK_CONTROLLER),
1956 @LogMessageDoc(level="ERROR",
1957 message="Exception in controller updates loop",
1958 explanation="Failed to dispatch controller event",
1959 recommendation=LogMessageDoc.GENERIC_ACTION)
1960 })
1961 public void run() {
1962 if (log.isDebugEnabled()) {
1963 logListeners();
1964 }
1965
1966 try {
1967 final ServerBootstrap bootstrap = createServerBootStrap();
1968
1969 bootstrap.setOption("reuseAddr", true);
1970 bootstrap.setOption("child.keepAlive", true);
1971 bootstrap.setOption("child.tcpNoDelay", true);
1972 bootstrap.setOption("child.sendBufferSize", Controller.SEND_BUFFER_SIZE);
1973
1974 ChannelPipelineFactory pfact =
1975 new OpenflowPipelineFactory(this, null);
1976 bootstrap.setPipelineFactory(pfact);
1977 InetSocketAddress sa = new InetSocketAddress(openFlowPort);
1978 final ChannelGroup cg = new DefaultChannelGroup();
1979 cg.add(bootstrap.bind(sa));
1980
1981 log.info("Listening for switch connections on {}", sa);
1982 } catch (Exception e) {
1983 throw new RuntimeException(e);
1984 }
1985
1986 // main loop
1987 while (true) {
1988 try {
1989 IUpdate update = updates.take();
1990 update.dispatch();
1991 } catch (InterruptedException e) {
1992 return;
1993 } catch (StorageException e) {
1994 log.error("Storage exception in controller " +
1995 "updates loop; terminating process", e);
1996 return;
1997 } catch (Exception e) {
1998 log.error("Exception in controller updates loop", e);
1999 }
2000 }
2001 }
2002
2003 private ServerBootstrap createServerBootStrap() {
2004 if (workerThreads == 0) {
2005 return new ServerBootstrap(
2006 new NioServerSocketChannelFactory(
2007 Executors.newCachedThreadPool(),
2008 Executors.newCachedThreadPool()));
2009 } else {
2010 return new ServerBootstrap(
2011 new NioServerSocketChannelFactory(
2012 Executors.newCachedThreadPool(),
2013 Executors.newCachedThreadPool(), workerThreads));
2014 }
2015 }
2016
2017 public void setConfigParams(Map<String, String> configParams) {
2018 String ofPort = configParams.get("openflowport");
2019 if (ofPort != null) {
2020 this.openFlowPort = Integer.parseInt(ofPort);
2021 }
2022 log.debug("OpenFlow port set to {}", this.openFlowPort);
2023 String threads = configParams.get("workerthreads");
2024 if (threads != null) {
2025 this.workerThreads = Integer.parseInt(threads);
2026 }
2027 log.debug("Number of worker threads set to {}", this.workerThreads);
2028 String controllerId = configParams.get("controllerid");
2029 if (controllerId != null) {
2030 this.controllerId = controllerId;
2031 }
2032 log.debug("ControllerId set to {}", this.controllerId);
2033 }
2034
2035 private void initVendorMessages() {
2036 // Configure openflowj to be able to parse the role request/reply
2037 // vendor messages.
2038 OFBasicVendorId niciraVendorId = new OFBasicVendorId(
2039 OFNiciraVendorData.NX_VENDOR_ID, 4);
2040 OFVendorId.registerVendorId(niciraVendorId);
2041 OFBasicVendorDataType roleRequestVendorData =
2042 new OFBasicVendorDataType(
2043 OFRoleRequestVendorData.NXT_ROLE_REQUEST,
2044 OFRoleRequestVendorData.getInstantiable());
2045 niciraVendorId.registerVendorDataType(roleRequestVendorData);
2046 OFBasicVendorDataType roleReplyVendorData =
2047 new OFBasicVendorDataType(
2048 OFRoleReplyVendorData.NXT_ROLE_REPLY,
2049 OFRoleReplyVendorData.getInstantiable());
2050 niciraVendorId.registerVendorDataType(roleReplyVendorData);
2051 }
2052
2053 /**
2054 * Initialize internal data structures
2055 */
2056 public void init(Map<String, String> configParams) {
2057 // These data structures are initialized here because other
2058 // module's startUp() might be called before ours
2059 this.messageListeners =
2060 new ConcurrentHashMap<OFType,
2061 ListenerDispatcher<OFType,
2062 IOFMessageListener>>();
2063 this.switchListeners = new CopyOnWriteArraySet<IOFSwitchListener>();
2064 this.haListeners = new CopyOnWriteArraySet<IHAListener>();
2065 this.activeSwitches = new ConcurrentHashMap<Long, IOFSwitch>();
2066 this.connectedSwitches = new HashSet<OFSwitchImpl>();
2067 this.controllerNodeIPsCache = new HashMap<String, String>();
2068 this.updates = new LinkedBlockingQueue<IUpdate>();
2069 this.factory = new BasicFactory();
2070 this.providerMap = new HashMap<String, List<IInfoProvider>>();
2071 setConfigParams(configParams);
2072 this.role = getInitialRole(configParams);
2073 this.roleChanger = new RoleChanger();
2074 initVendorMessages();
2075 this.systemStartTime = System.currentTimeMillis();
Pankaj Berdeafb20532013-01-08 15:05:24 -08002076 this.swStore = new SwitchStorageImpl();
Pankaj Berde67f88fb2013-01-15 17:16:01 -08002077 this.swStore.init("/tmp/cassandra.titan");
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002078 }
2079
2080 /**
2081 * Startup all of the controller's components
2082 */
2083 @LogMessageDoc(message="Waiting for storage source",
2084 explanation="The system database is not yet ready",
2085 recommendation="If this message persists, this indicates " +
2086 "that the system database has failed to start. " +
2087 LogMessageDoc.CHECK_CONTROLLER)
2088 public void startupComponents() {
2089 // Create the table names we use
2090 storageSource.createTable(CONTROLLER_TABLE_NAME, null);
2091 storageSource.createTable(SWITCH_TABLE_NAME, null);
2092 storageSource.createTable(PORT_TABLE_NAME, null);
2093 storageSource.createTable(CONTROLLER_INTERFACE_TABLE_NAME, null);
2094 storageSource.createTable(SWITCH_CONFIG_TABLE_NAME, null);
2095 storageSource.setTablePrimaryKeyName(CONTROLLER_TABLE_NAME,
2096 CONTROLLER_ID);
2097 storageSource.setTablePrimaryKeyName(SWITCH_TABLE_NAME,
2098 SWITCH_DATAPATH_ID);
2099 storageSource.setTablePrimaryKeyName(PORT_TABLE_NAME, PORT_ID);
2100 storageSource.setTablePrimaryKeyName(CONTROLLER_INTERFACE_TABLE_NAME,
2101 CONTROLLER_INTERFACE_ID);
2102 storageSource.addListener(CONTROLLER_INTERFACE_TABLE_NAME, this);
2103
2104 while (true) {
2105 try {
2106 updateControllerInfo();
2107 break;
2108 }
2109 catch (StorageException e) {
2110 log.info("Waiting for storage source");
2111 try {
2112 Thread.sleep(1000);
2113 } catch (InterruptedException e1) {
2114 }
2115 }
2116 }
2117
2118 // Add our REST API
2119 restApi.addRestletRoutable(new CoreWebRoutable());
2120 }
2121
2122 @Override
2123 public void addInfoProvider(String type, IInfoProvider provider) {
2124 if (!providerMap.containsKey(type)) {
2125 providerMap.put(type, new ArrayList<IInfoProvider>());
2126 }
2127 providerMap.get(type).add(provider);
2128 }
2129
2130 @Override
2131 public void removeInfoProvider(String type, IInfoProvider provider) {
2132 if (!providerMap.containsKey(type)) {
2133 log.debug("Provider type {} doesn't exist.", type);
2134 return;
2135 }
2136
2137 providerMap.get(type).remove(provider);
2138 }
2139
2140 public Map<String, Object> getControllerInfo(String type) {
2141 if (!providerMap.containsKey(type)) return null;
2142
2143 Map<String, Object> result = new LinkedHashMap<String, Object>();
2144 for (IInfoProvider provider : providerMap.get(type)) {
2145 result.putAll(provider.getInfo(type));
2146 }
2147
2148 return result;
2149 }
2150
2151 @Override
2152 public void addHAListener(IHAListener listener) {
2153 this.haListeners.add(listener);
2154 }
2155
2156 @Override
2157 public void removeHAListener(IHAListener listener) {
2158 this.haListeners.remove(listener);
2159 }
2160
2161
2162 /**
2163 * Handle changes to the controller nodes IPs and dispatch update.
2164 */
2165 @SuppressWarnings("unchecked")
2166 protected void handleControllerNodeIPChanges() {
2167 HashMap<String,String> curControllerNodeIPs = new HashMap<String,String>();
2168 HashMap<String,String> addedControllerNodeIPs = new HashMap<String,String>();
2169 HashMap<String,String> removedControllerNodeIPs =new HashMap<String,String>();
2170 String[] colNames = { CONTROLLER_INTERFACE_CONTROLLER_ID,
2171 CONTROLLER_INTERFACE_TYPE,
2172 CONTROLLER_INTERFACE_NUMBER,
2173 CONTROLLER_INTERFACE_DISCOVERED_IP };
2174 synchronized(controllerNodeIPsCache) {
2175 // We currently assume that interface Ethernet0 is the relevant
2176 // controller interface. Might change.
2177 // We could (should?) implement this using
2178 // predicates, but creating the individual and compound predicate
2179 // seems more overhead then just checking every row. Particularly,
2180 // since the number of rows is small and changes infrequent
2181 IResultSet res = storageSource.executeQuery(CONTROLLER_INTERFACE_TABLE_NAME,
2182 colNames,null, null);
2183 while (res.next()) {
2184 if (res.getString(CONTROLLER_INTERFACE_TYPE).equals("Ethernet") &&
2185 res.getInt(CONTROLLER_INTERFACE_NUMBER) == 0) {
2186 String controllerID = res.getString(CONTROLLER_INTERFACE_CONTROLLER_ID);
2187 String discoveredIP = res.getString(CONTROLLER_INTERFACE_DISCOVERED_IP);
2188 String curIP = controllerNodeIPsCache.get(controllerID);
2189
2190 curControllerNodeIPs.put(controllerID, discoveredIP);
2191 if (curIP == null) {
2192 // new controller node IP
2193 addedControllerNodeIPs.put(controllerID, discoveredIP);
2194 }
2195 else if (curIP != discoveredIP) {
2196 // IP changed
2197 removedControllerNodeIPs.put(controllerID, curIP);
2198 addedControllerNodeIPs.put(controllerID, discoveredIP);
2199 }
2200 }
2201 }
2202 // Now figure out if rows have been deleted. We can't use the
2203 // rowKeys from rowsDeleted directly, since the tables primary
2204 // key is a compound that we can't disassemble
2205 Set<String> curEntries = curControllerNodeIPs.keySet();
2206 Set<String> removedEntries = controllerNodeIPsCache.keySet();
2207 removedEntries.removeAll(curEntries);
2208 for (String removedControllerID : removedEntries)
2209 removedControllerNodeIPs.put(removedControllerID, controllerNodeIPsCache.get(removedControllerID));
2210 controllerNodeIPsCache = (HashMap<String, String>) curControllerNodeIPs.clone();
2211 HAControllerNodeIPUpdate update = new HAControllerNodeIPUpdate(
2212 curControllerNodeIPs, addedControllerNodeIPs,
2213 removedControllerNodeIPs);
2214 if (!removedControllerNodeIPs.isEmpty() || !addedControllerNodeIPs.isEmpty()) {
2215 try {
2216 this.updates.put(update);
2217 } catch (InterruptedException e) {
2218 log.error("Failure adding update to queue", e);
2219 }
2220 }
2221 }
2222 }
2223
2224 @Override
2225 public Map<String, String> getControllerNodeIPs() {
2226 // We return a copy of the mapping so we can guarantee that
2227 // the mapping return is the same as one that will be (or was)
2228 // dispatched to IHAListeners
2229 HashMap<String,String> retval = new HashMap<String,String>();
2230 synchronized(controllerNodeIPsCache) {
2231 retval.putAll(controllerNodeIPsCache);
2232 }
2233 return retval;
2234 }
2235
2236 @Override
2237 public void rowsModified(String tableName, Set<Object> rowKeys) {
2238 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2239 handleControllerNodeIPChanges();
2240 }
2241
2242 }
2243
2244 @Override
2245 public void rowsDeleted(String tableName, Set<Object> rowKeys) {
2246 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2247 handleControllerNodeIPChanges();
2248 }
2249 }
2250
2251 @Override
2252 public long getSystemStartTime() {
2253 return (this.systemStartTime);
2254 }
2255
2256 @Override
2257 public void setAlwaysClearFlowsOnSwAdd(boolean value) {
2258 this.alwaysClearFlowsOnSwAdd = value;
2259 }
2260
2261 public boolean getAlwaysClearFlowsOnSwAdd() {
2262 return this.alwaysClearFlowsOnSwAdd;
2263 }
2264}