blob: 74e5fcf76f76769752cc1cfe2b6bf64eaf11b123 [file] [log] [blame]
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001/**
2* Copyright 2011, Big Switch Networks, Inc.
3* Originally created by David Erickson, Stanford University
4*
5* Licensed under the Apache License, Version 2.0 (the "License"); you may
6* not use this file except in compliance with the License. You may obtain
7* a copy of the License at
8*
9* http://www.apache.org/licenses/LICENSE-2.0
10*
11* Unless required by applicable law or agreed to in writing, software
12* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14* License for the specific language governing permissions and limitations
15* under the License.
16**/
17
18package net.floodlightcontroller.core.internal;
19
20import java.io.FileInputStream;
21import java.io.IOException;
22import java.net.InetAddress;
23import java.net.InetSocketAddress;
24import java.net.SocketAddress;
25import java.util.ArrayList;
26import java.nio.channels.ClosedChannelException;
27import java.util.Collection;
28import java.util.Collections;
29import java.util.Date;
30import java.util.HashMap;
31import java.util.HashSet;
32import java.util.Iterator;
33import java.util.LinkedHashMap;
34import java.util.List;
35import java.util.Map;
36import java.util.Map.Entry;
37import java.util.Properties;
38import java.util.Set;
39import java.util.Stack;
40import java.util.concurrent.BlockingQueue;
41import java.util.concurrent.ConcurrentHashMap;
42import java.util.concurrent.ConcurrentMap;
43import java.util.concurrent.CopyOnWriteArraySet;
44import java.util.concurrent.Executors;
45import java.util.concurrent.Future;
46import java.util.concurrent.LinkedBlockingQueue;
47import java.util.concurrent.RejectedExecutionException;
48import java.util.concurrent.TimeUnit;
49import java.util.concurrent.TimeoutException;
50
51import net.floodlightcontroller.core.FloodlightContext;
52import net.floodlightcontroller.core.IFloodlightProviderService;
53import net.floodlightcontroller.core.IHAListener;
54import net.floodlightcontroller.core.IInfoProvider;
Pankaj Berde8557a462013-01-07 08:59:31 -080055import net.floodlightcontroller.core.INetMapStorage.DM_OPERATION;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080056import net.floodlightcontroller.core.IOFMessageListener;
57import net.floodlightcontroller.core.IListener.Command;
58import net.floodlightcontroller.core.IOFSwitch;
59import net.floodlightcontroller.core.IOFSwitchFilter;
60import net.floodlightcontroller.core.IOFSwitchListener;
Pankaj Berde8557a462013-01-07 08:59:31 -080061import net.floodlightcontroller.core.ISwitchStorage.SwitchState;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080062import net.floodlightcontroller.core.annotations.LogMessageDoc;
63import net.floodlightcontroller.core.annotations.LogMessageDocs;
64import net.floodlightcontroller.core.internal.OFChannelState.HandshakeState;
65import net.floodlightcontroller.core.util.ListenerDispatcher;
66import net.floodlightcontroller.core.web.CoreWebRoutable;
67import net.floodlightcontroller.counter.ICounterStoreService;
Pavlin Radoslavov19b0e122013-02-21 18:47:38 -080068import net.floodlightcontroller.flowcache.IFlowService;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -080069import net.floodlightcontroller.mastership.IMastershipService;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080070import net.floodlightcontroller.packet.Ethernet;
71import net.floodlightcontroller.perfmon.IPktInProcessingTimeService;
72import net.floodlightcontroller.restserver.IRestApiService;
73import net.floodlightcontroller.storage.IResultSet;
74import net.floodlightcontroller.storage.IStorageSourceListener;
75import net.floodlightcontroller.storage.IStorageSourceService;
76import net.floodlightcontroller.storage.OperatorPredicate;
77import net.floodlightcontroller.storage.StorageException;
78import net.floodlightcontroller.threadpool.IThreadPoolService;
79
80import org.jboss.netty.bootstrap.ServerBootstrap;
81import org.jboss.netty.buffer.ChannelBuffer;
82import org.jboss.netty.buffer.ChannelBuffers;
83import org.jboss.netty.channel.Channel;
84import org.jboss.netty.channel.ChannelHandlerContext;
85import org.jboss.netty.channel.ChannelPipelineFactory;
86import org.jboss.netty.channel.ChannelStateEvent;
87import org.jboss.netty.channel.ChannelUpstreamHandler;
88import org.jboss.netty.channel.Channels;
89import org.jboss.netty.channel.ExceptionEvent;
90import org.jboss.netty.channel.MessageEvent;
91import org.jboss.netty.channel.group.ChannelGroup;
92import org.jboss.netty.channel.group.DefaultChannelGroup;
93import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
94import org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler;
95import org.jboss.netty.handler.timeout.IdleStateEvent;
96import org.jboss.netty.handler.timeout.ReadTimeoutException;
97import org.openflow.protocol.OFEchoReply;
98import org.openflow.protocol.OFError;
99import org.openflow.protocol.OFError.OFBadActionCode;
100import org.openflow.protocol.OFError.OFBadRequestCode;
101import org.openflow.protocol.OFError.OFErrorType;
102import org.openflow.protocol.OFError.OFFlowModFailedCode;
103import org.openflow.protocol.OFError.OFHelloFailedCode;
104import org.openflow.protocol.OFError.OFPortModFailedCode;
105import org.openflow.protocol.OFError.OFQueueOpFailedCode;
106import org.openflow.protocol.OFFeaturesReply;
107import org.openflow.protocol.OFGetConfigReply;
108import org.openflow.protocol.OFMessage;
109import org.openflow.protocol.OFPacketIn;
110import org.openflow.protocol.OFPhysicalPort;
111import org.openflow.protocol.OFPortStatus;
Pankaj Berde6a4075d2013-01-22 16:42:54 -0800112import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
Pankaj Berde6debb042013-01-16 18:04:32 -0800113import org.openflow.protocol.OFPhysicalPort.OFPortState;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800114import org.openflow.protocol.OFPortStatus.OFPortReason;
115import org.openflow.protocol.OFSetConfig;
116import org.openflow.protocol.OFStatisticsRequest;
117import org.openflow.protocol.OFSwitchConfig;
118import org.openflow.protocol.OFType;
119import org.openflow.protocol.OFVendor;
120import org.openflow.protocol.factory.BasicFactory;
121import org.openflow.protocol.factory.MessageParseException;
122import org.openflow.protocol.statistics.OFDescriptionStatistics;
123import org.openflow.protocol.statistics.OFStatistics;
124import org.openflow.protocol.statistics.OFStatisticsType;
125import org.openflow.protocol.vendor.OFBasicVendorDataType;
126import org.openflow.protocol.vendor.OFBasicVendorId;
127import org.openflow.protocol.vendor.OFVendorId;
128import org.openflow.util.HexString;
129import org.openflow.util.U16;
130import org.openflow.util.U32;
131import org.openflow.vendor.nicira.OFNiciraVendorData;
132import org.openflow.vendor.nicira.OFRoleReplyVendorData;
133import org.openflow.vendor.nicira.OFRoleRequestVendorData;
134import org.openflow.vendor.nicira.OFRoleVendorData;
135import org.slf4j.Logger;
136import org.slf4j.LoggerFactory;
137
138
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800139
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800140/**
141 * The main controller class. Handles all setup and network listeners
142 */
143public class Controller implements IFloodlightProviderService,
144 IStorageSourceListener {
Pankaj Berde29ab7fc2013-01-25 06:17:52 -0800145
146 ThreadLocal<SwitchStorageImpl> store = new ThreadLocal<SwitchStorageImpl>() {
147 @Override
148 protected SwitchStorageImpl initialValue() {
149 SwitchStorageImpl swStore = new SwitchStorageImpl();
150 //TODO: Get the file path from global properties
151 swStore.init("/tmp/cassandra.titan");
152 return swStore;
153 }
154 };
155
156 protected SwitchStorageImpl swStore = store.get();
Pankaj Berde8557a462013-01-07 08:59:31 -0800157
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800158 protected static Logger log = LoggerFactory.getLogger(Controller.class);
159
160 private static final String ERROR_DATABASE =
161 "The controller could not communicate with the system database.";
162
163 protected BasicFactory factory;
164 protected ConcurrentMap<OFType,
165 ListenerDispatcher<OFType,IOFMessageListener>>
166 messageListeners;
167 // The activeSwitches map contains only those switches that are actively
168 // being controlled by us -- it doesn't contain switches that are
169 // in the slave role
170 protected ConcurrentHashMap<Long, IOFSwitch> activeSwitches;
171 // connectedSwitches contains all connected switches, including ones where
172 // we're a slave controller. We need to keep track of them so that we can
173 // send role request messages to switches when our role changes to master
174 // We add a switch to this set after it successfully completes the
175 // handshake. Access to this Set needs to be synchronized with roleChanger
176 protected HashSet<OFSwitchImpl> connectedSwitches;
177
178 // The controllerNodeIPsCache maps Controller IDs to their IP address.
179 // It's only used by handleControllerNodeIPsChanged
180 protected HashMap<String, String> controllerNodeIPsCache;
181
182 protected Set<IOFSwitchListener> switchListeners;
183 protected Set<IHAListener> haListeners;
184 protected Map<String, List<IInfoProvider>> providerMap;
185 protected BlockingQueue<IUpdate> updates;
186
187 // Module dependencies
188 protected IRestApiService restApi;
189 protected ICounterStoreService counterStore = null;
190 protected IStorageSourceService storageSource;
191 protected IPktInProcessingTimeService pktinProcTime;
192 protected IThreadPoolService threadPool;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800193 protected IMastershipService masterHelper;
Pavlin Radoslavov19b0e122013-02-21 18:47:38 -0800194 protected IFlowService flowService;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800195
196 // Configuration options
197 protected int openFlowPort = 6633;
198 protected int workerThreads = 0;
199 // The id for this controller node. Should be unique for each controller
200 // node in a controller cluster.
201 protected String controllerId = "localhost";
202
203 // The current role of the controller.
204 // If the controller isn't configured to support roles, then this is null.
205 protected Role role;
206 // A helper that handles sending and timeout handling for role requests
207 protected RoleChanger roleChanger;
208
209 // Start time of the controller
210 protected long systemStartTime;
211
212 // Flag to always flush flow table on switch reconnect (HA or otherwise)
213 protected boolean alwaysClearFlowsOnSwAdd = false;
214
215 // Storage table names
216 protected static final String CONTROLLER_TABLE_NAME = "controller_controller";
217 protected static final String CONTROLLER_ID = "id";
218
219 protected static final String SWITCH_TABLE_NAME = "controller_switch";
220 protected static final String SWITCH_DATAPATH_ID = "dpid";
221 protected static final String SWITCH_SOCKET_ADDRESS = "socket_address";
222 protected static final String SWITCH_IP = "ip";
223 protected static final String SWITCH_CONTROLLER_ID = "controller_id";
224 protected static final String SWITCH_ACTIVE = "active";
225 protected static final String SWITCH_CONNECTED_SINCE = "connected_since";
226 protected static final String SWITCH_CAPABILITIES = "capabilities";
227 protected static final String SWITCH_BUFFERS = "buffers";
228 protected static final String SWITCH_TABLES = "tables";
229 protected static final String SWITCH_ACTIONS = "actions";
230
231 protected static final String SWITCH_CONFIG_TABLE_NAME = "controller_switchconfig";
232 protected static final String SWITCH_CONFIG_CORE_SWITCH = "core_switch";
233
234 protected static final String PORT_TABLE_NAME = "controller_port";
235 protected static final String PORT_ID = "id";
236 protected static final String PORT_SWITCH = "switch_id";
237 protected static final String PORT_NUMBER = "number";
238 protected static final String PORT_HARDWARE_ADDRESS = "hardware_address";
239 protected static final String PORT_NAME = "name";
240 protected static final String PORT_CONFIG = "config";
241 protected static final String PORT_STATE = "state";
242 protected static final String PORT_CURRENT_FEATURES = "current_features";
243 protected static final String PORT_ADVERTISED_FEATURES = "advertised_features";
244 protected static final String PORT_SUPPORTED_FEATURES = "supported_features";
245 protected static final String PORT_PEER_FEATURES = "peer_features";
246
247 protected static final String CONTROLLER_INTERFACE_TABLE_NAME = "controller_controllerinterface";
248 protected static final String CONTROLLER_INTERFACE_ID = "id";
249 protected static final String CONTROLLER_INTERFACE_CONTROLLER_ID = "controller_id";
250 protected static final String CONTROLLER_INTERFACE_TYPE = "type";
251 protected static final String CONTROLLER_INTERFACE_NUMBER = "number";
252 protected static final String CONTROLLER_INTERFACE_DISCOVERED_IP = "discovered_ip";
253
254
255
256 // Perf. related configuration
257 protected static final int SEND_BUFFER_SIZE = 4 * 1024 * 1024;
258 protected static final int BATCH_MAX_SIZE = 100;
259 protected static final boolean ALWAYS_DECODE_ETH = true;
260
261 /**
262 * Updates handled by the main loop
263 */
264 protected interface IUpdate {
265 /**
266 * Calls the appropriate listeners
267 */
268 public void dispatch();
269 }
/** Kinds of switch event that a switch update can carry. */
public enum SwitchUpdateType { ADDED, REMOVED, PORTCHANGED }
275 /**
276 * Update message indicating a switch was added or removed
277 */
278 protected class SwitchUpdate implements IUpdate {
279 public IOFSwitch sw;
280 public SwitchUpdateType switchUpdateType;
281 public SwitchUpdate(IOFSwitch sw, SwitchUpdateType switchUpdateType) {
282 this.sw = sw;
283 this.switchUpdateType = switchUpdateType;
284 }
285 public void dispatch() {
286 if (log.isTraceEnabled()) {
287 log.trace("Dispatching switch update {} {}",
288 sw, switchUpdateType);
289 }
290 if (switchListeners != null) {
291 for (IOFSwitchListener listener : switchListeners) {
292 switch(switchUpdateType) {
293 case ADDED:
294 listener.addedSwitch(sw);
295 break;
296 case REMOVED:
297 listener.removedSwitch(sw);
298 break;
299 case PORTCHANGED:
300 listener.switchPortChanged(sw.getId());
301 break;
302 }
303 }
304 }
305 }
306 }
307
308 /**
309 * Update message indicating controller's role has changed
310 */
311 protected class HARoleUpdate implements IUpdate {
312 public Role oldRole;
313 public Role newRole;
314 public HARoleUpdate(Role newRole, Role oldRole) {
315 this.oldRole = oldRole;
316 this.newRole = newRole;
317 }
318 public void dispatch() {
319 // Make sure that old and new roles are different.
320 if (oldRole == newRole) {
321 if (log.isTraceEnabled()) {
322 log.trace("HA role update ignored as the old and " +
323 "new roles are the same. newRole = {}" +
324 "oldRole = {}", newRole, oldRole);
325 }
326 return;
327 }
328 if (log.isTraceEnabled()) {
329 log.trace("Dispatching HA Role update newRole = {}, oldRole = {}",
330 newRole, oldRole);
331 }
332 if (haListeners != null) {
333 for (IHAListener listener : haListeners) {
334 listener.roleChanged(oldRole, newRole);
335 }
336 }
337 }
338 }
339
340 /**
341 * Update message indicating
342 * IPs of controllers in controller cluster have changed.
343 */
344 protected class HAControllerNodeIPUpdate implements IUpdate {
345 public Map<String,String> curControllerNodeIPs;
346 public Map<String,String> addedControllerNodeIPs;
347 public Map<String,String> removedControllerNodeIPs;
348 public HAControllerNodeIPUpdate(
349 HashMap<String,String> curControllerNodeIPs,
350 HashMap<String,String> addedControllerNodeIPs,
351 HashMap<String,String> removedControllerNodeIPs) {
352 this.curControllerNodeIPs = curControllerNodeIPs;
353 this.addedControllerNodeIPs = addedControllerNodeIPs;
354 this.removedControllerNodeIPs = removedControllerNodeIPs;
355 }
356 public void dispatch() {
357 if (log.isTraceEnabled()) {
358 log.trace("Dispatching HA Controller Node IP update "
359 + "curIPs = {}, addedIPs = {}, removedIPs = {}",
360 new Object[] { curControllerNodeIPs, addedControllerNodeIPs,
361 removedControllerNodeIPs }
362 );
363 }
364 if (haListeners != null) {
365 for (IHAListener listener: haListeners) {
366 listener.controllerNodeIPsChanged(curControllerNodeIPs,
367 addedControllerNodeIPs, removedControllerNodeIPs);
368 }
369 }
370 }
371 }
372
373 // ***************
374 // Getters/Setters
375 // ***************
376
377 public void setStorageSourceService(IStorageSourceService storageSource) {
378 this.storageSource = storageSource;
379 }
380
381 public void setCounterStore(ICounterStoreService counterStore) {
382 this.counterStore = counterStore;
383 }
384
385 public void setPktInProcessingService(IPktInProcessingTimeService pits) {
386 this.pktinProcTime = pits;
387 }
388
389 public void setRestApiService(IRestApiService restApi) {
390 this.restApi = restApi;
391 }
392
393 public void setThreadPoolService(IThreadPoolService tp) {
394 this.threadPool = tp;
395 }
396
Pavlin Radoslavov19b0e122013-02-21 18:47:38 -0800397 public void setMastershipService(IMastershipService serviceImpl) {
398 this.masterHelper = serviceImpl;
399 }
400
401 public void setFlowService(IFlowService serviceImpl) {
402 this.flowService = serviceImpl;
403 }
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800404
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800405 @Override
406 public Role getRole() {
407 synchronized(roleChanger) {
408 return role;
409 }
410 }
411
412 @Override
413 public void setRole(Role role) {
414 if (role == null) throw new NullPointerException("Role can not be null.");
415 if (role == Role.MASTER && this.role == Role.SLAVE) {
416 // Reset db state to Inactive for all switches.
417 updateAllInactiveSwitchInfo();
418 }
419
420 // Need to synchronize to ensure a reliable ordering on role request
421 // messages send and to ensure the list of connected switches is stable
422 // RoleChanger will handle the actual sending of the message and
423 // timeout handling
424 // @see RoleChanger
425 synchronized(roleChanger) {
426 if (role.equals(this.role)) {
427 log.debug("Ignoring role change: role is already {}", role);
428 return;
429 }
430
431 Role oldRole = this.role;
432 this.role = role;
433
434 log.debug("Submitting role change request to role {}", role);
435 roleChanger.submitRequest(connectedSwitches, role);
436
437 // Enqueue an update for our listeners.
438 try {
439 this.updates.put(new HARoleUpdate(role, oldRole));
440 } catch (InterruptedException e) {
441 log.error("Failure adding update to queue", e);
442 }
443 }
444 }
445
446
447
448 // **********************
449 // ChannelUpstreamHandler
450 // **********************
451
452 /**
453 * Return a new channel handler for processing a switch connections
454 * @param state The channel state object for the connection
455 * @return the new channel handler
456 */
457 protected ChannelUpstreamHandler getChannelHandler(OFChannelState state) {
458 return new OFChannelHandler(state);
459 }
460
461 /**
462 * Channel handler deals with the switch connection and dispatches
463 * switch messages to the appropriate locations.
464 * @author readams
465 */
466 protected class OFChannelHandler
467 extends IdleStateAwareChannelUpstreamHandler {
468 protected OFSwitchImpl sw;
469 protected OFChannelState state;
470
471 public OFChannelHandler(OFChannelState state) {
472 this.state = state;
473 }
474
475 @Override
476 @LogMessageDoc(message="New switch connection from {ip address}",
477 explanation="A new switch has connected from the " +
478 "specified IP address")
479 public void channelConnected(ChannelHandlerContext ctx,
480 ChannelStateEvent e) throws Exception {
481 log.info("New switch connection from {}",
482 e.getChannel().getRemoteAddress());
483
484 sw = new OFSwitchImpl();
485 sw.setChannel(e.getChannel());
486 sw.setFloodlightProvider(Controller.this);
487 sw.setThreadPoolService(threadPool);
488
489 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
490 msglist.add(factory.getMessage(OFType.HELLO));
491 e.getChannel().write(msglist);
492
493 }
494
495 @Override
496 @LogMessageDoc(message="Disconnected switch {switch information}",
497 explanation="The specified switch has disconnected.")
498 public void channelDisconnected(ChannelHandlerContext ctx,
499 ChannelStateEvent e) throws Exception {
500 if (sw != null && state.hsState == HandshakeState.READY) {
501 if (activeSwitches.containsKey(sw.getId())) {
502 // It's safe to call removeSwitch even though the map might
503 // not contain this particular switch but another with the
504 // same DPID
505 removeSwitch(sw);
506 }
507 synchronized(roleChanger) {
508 connectedSwitches.remove(sw);
509 }
510 sw.setConnected(false);
511 }
512 log.info("Disconnected switch {}", sw);
513 }
514
/**
 * Reacts to an exception raised on the channel according to the type
 * of its cause: most causes close the channel, a storage exception
 * terminates the controller, and a closed channel or a full queue
 * leaves the connection untouched.
 */
@Override
@LogMessageDocs({
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to read timeout",
            explanation="The connected switch has failed to send any " +
                        "messages or respond to echo requests",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch}: failed to " +
                    "complete handshake",
            explanation="The switch did not respond correctly " +
                        "to handshake messages",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to IO Error: {}",
            explanation="There was an error communicating with the switch",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to switch " +
                    "state error: {error}",
            explanation="The switch sent an unexpected message",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to " +
                    "message parse failure",
            explanation="Could not parse a message from the switch",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Terminating controller due to storage exception",
            explanation=ERROR_DATABASE,
            recommendation=LogMessageDoc.CHECK_CONTROLLER),
    @LogMessageDoc(level="ERROR",
            message="Could not process message: queue full",
            explanation="OpenFlow messages are arriving faster than " +
                        " the controller can process them.",
            recommendation=LogMessageDoc.CHECK_CONTROLLER),
    @LogMessageDoc(level="ERROR",
            message="Error while processing message " +
                    "from switch {switch} {cause}",
            explanation="An error occurred processing the switch message",
            recommendation=LogMessageDoc.GENERIC_ACTION)
})
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
        throws Exception {
    final Throwable cause = e.getCause();

    // The order of the checks matters: the closed-channel case must be
    // tested before the general IOException case.
    if (cause instanceof ReadTimeoutException) {
        // switch timeout
        log.error("Disconnecting switch {} due to read timeout", sw);
        ctx.getChannel().close();
    } else if (cause instanceof HandshakeTimeoutException) {
        log.error("Disconnecting switch {}: failed to complete handshake",
                  sw);
        ctx.getChannel().close();
    } else if (cause instanceof ClosedChannelException) {
        // Channel is already closed; nothing to do.
    } else if (cause instanceof IOException) {
        log.error("Disconnecting switch {} due to IO Error: {}",
                  sw, cause.getMessage());
        ctx.getChannel().close();
    } else if (cause instanceof SwitchStateException) {
        log.error("Disconnecting switch {} due to switch state error: {}",
                  sw, cause.getMessage());
        ctx.getChannel().close();
    } else if (cause instanceof MessageParseException) {
        log.error("Disconnecting switch " + sw +
                  " due to message parse failure",
                  cause);
        ctx.getChannel().close();
    } else if (cause instanceof StorageException) {
        log.error("Terminating controller due to storage exception",
                  cause);
        terminate();
    } else if (cause instanceof RejectedExecutionException) {
        log.warn("Could not process message: queue full");
    } else {
        log.error("Error while processing message from switch " + sw,
                  cause);
        ctx.getChannel().close();
    }
}
594
595 @Override
596 public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
597 throws Exception {
598 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
599 msglist.add(factory.getMessage(OFType.ECHO_REQUEST));
600 e.getChannel().write(msglist);
601 }
602
603 @Override
604 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
605 throws Exception {
606 if (e.getMessage() instanceof List) {
607 @SuppressWarnings("unchecked")
608 List<OFMessage> msglist = (List<OFMessage>)e.getMessage();
609
610 for (OFMessage ofm : msglist) {
611 try {
612 processOFMessage(ofm);
613 }
614 catch (Exception ex) {
615 // We are the last handler in the stream, so run the
616 // exception through the channel again by passing in
617 // ctx.getChannel().
618 Channels.fireExceptionCaught(ctx.getChannel(), ex);
619 }
620 }
621
622 // Flush all flow-mods/packet-out generated from this "train"
623 OFSwitchImpl.flush_all();
624 }
625 }
626
627 /**
628 * Process the request for the switch description
629 */
630 @LogMessageDoc(level="ERROR",
631 message="Exception in reading description " +
632 " during handshake {exception}",
633 explanation="Could not process the switch description string",
634 recommendation=LogMessageDoc.CHECK_SWITCH)
635 void processSwitchDescReply() {
636 try {
637 // Read description, if it has been updated
638 @SuppressWarnings("unchecked")
639 Future<List<OFStatistics>> desc_future =
640 (Future<List<OFStatistics>>)sw.
641 getAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
642 List<OFStatistics> values =
643 desc_future.get(0, TimeUnit.MILLISECONDS);
644 if (values != null) {
645 OFDescriptionStatistics description =
646 new OFDescriptionStatistics();
647 ChannelBuffer data =
648 ChannelBuffers.buffer(description.getLength());
649 for (OFStatistics f : values) {
650 f.writeTo(data);
651 description.readFrom(data);
652 break; // SHOULD be a list of length 1
653 }
654 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_DATA,
655 description);
656 sw.setSwitchProperties(description);
657 data = null;
658
659 // At this time, also set other switch properties from storage
660 boolean is_core_switch = false;
661 IResultSet resultSet = null;
662 try {
663 String swid = sw.getStringId();
664 resultSet =
665 storageSource.getRow(SWITCH_CONFIG_TABLE_NAME, swid);
666 for (Iterator<IResultSet> it =
667 resultSet.iterator(); it.hasNext();) {
668 // In case of multiple rows, use the status
669 // in last row?
670 Map<String, Object> row = it.next().getRow();
671 if (row.containsKey(SWITCH_CONFIG_CORE_SWITCH)) {
672 if (log.isDebugEnabled()) {
673 log.debug("Reading SWITCH_IS_CORE_SWITCH " +
674 "config for switch={}, is-core={}",
675 sw, row.get(SWITCH_CONFIG_CORE_SWITCH));
676 }
677 String ics =
678 (String)row.get(SWITCH_CONFIG_CORE_SWITCH);
679 is_core_switch = ics.equals("true");
680 }
681 }
682 }
683 finally {
684 if (resultSet != null)
685 resultSet.close();
686 }
687 if (is_core_switch) {
688 sw.setAttribute(IOFSwitch.SWITCH_IS_CORE_SWITCH,
689 new Boolean(true));
690 }
691 }
692 sw.removeAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
693 state.hasDescription = true;
694 checkSwitchReady();
695 }
696 catch (InterruptedException ex) {
697 // Ignore
698 }
699 catch (TimeoutException ex) {
700 // Ignore
701 } catch (Exception ex) {
702 log.error("Exception in reading description " +
703 " during handshake", ex);
704 }
705 }
706
707 /**
708 * Send initial switch setup information that we need before adding
709 * the switch
710 * @throws IOException
711 */
712 void sendHelloConfiguration() throws IOException {
713 // Send initial Features Request
714 sw.write(factory.getMessage(OFType.FEATURES_REQUEST), null);
715 }
716
717 /**
718 * Send the configuration requests we can only do after we have
719 * the features reply
720 * @throws IOException
721 */
722 void sendFeatureReplyConfiguration() throws IOException {
723 // Ensure we receive the full packet via PacketIn
724 OFSetConfig config = (OFSetConfig) factory
725 .getMessage(OFType.SET_CONFIG);
726 config.setMissSendLength((short) 0xffff)
727 .setLengthU(OFSwitchConfig.MINIMUM_LENGTH);
728 sw.write(config, null);
729 sw.write(factory.getMessage(OFType.GET_CONFIG_REQUEST),
730 null);
731
732 // Get Description to set switch-specific flags
733 OFStatisticsRequest req = new OFStatisticsRequest();
734 req.setStatisticType(OFStatisticsType.DESC);
735 req.setLengthU(req.getLengthU());
736 Future<List<OFStatistics>> dfuture =
737 sw.getStatistics(req);
738 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE,
739 dfuture);
740
741 }
742
743 protected void checkSwitchReady() {
744 if (state.hsState == HandshakeState.FEATURES_REPLY &&
745 state.hasDescription && state.hasGetConfigReply) {
746
747 state.hsState = HandshakeState.READY;
748
749 synchronized(roleChanger) {
750 // We need to keep track of all of the switches that are connected
751 // to the controller, in any role, so that we can later send the
752 // role request messages when the controller role changes.
753 // We need to be synchronized while doing this: we must not
754 // send a another role request to the connectedSwitches until
755 // we were able to add this new switch to connectedSwitches
756 // *and* send the current role to the new switch.
757 connectedSwitches.add(sw);
758
759 if (role != null) {
760 // Send a role request if role support is enabled for the controller
761 // This is a probe that we'll use to determine if the switch
762 // actually supports the role request message. If it does we'll
763 // get back a role reply message. If it doesn't we'll get back an
764 // OFError message.
765 // If role is MASTER we will promote switch to active
766 // list when we receive the switch's role reply messages
767 log.debug("This controller's role is {}, " +
768 "sending initial role request msg to {}",
769 role, sw);
770 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
771 swList.add(sw);
772 roleChanger.submitRequest(swList, role);
773 }
774 else {
775 // Role supported not enabled on controller (for now)
776 // automatically promote switch to active state.
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800777 log.debug("This controller's role is {}, " +
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800778 "not sending role request msg to {}",
779 role, sw);
780 // Need to clear FlowMods before we add the switch
781 // and dispatch updates otherwise we have a race condition.
782 sw.clearAllFlowMods();
783 addSwitch(sw);
784 state.firstRoleReplyReceived = true;
785 }
786 }
787 }
788 }
789
790 /* Handle a role reply message we received from the switch. Since
791 * netty serializes message dispatch we don't need to synchronize
792 * against other receive operations from the same switch, so no need
793 * to synchronize addSwitch(), removeSwitch() operations from the same
794 * connection.
795 * FIXME: However, when a switch with the same DPID connects we do
796 * need some synchronization. However, handling switches with same
797 * DPID needs to be revisited anyways (get rid of r/w-lock and synchronous
798 * removedSwitch notification):1
799 *
800 */
801 @LogMessageDoc(level="ERROR",
802 message="Invalid role value in role reply message",
803 explanation="Was unable to set the HA role (master or slave) " +
804 "for the controller.",
805 recommendation=LogMessageDoc.CHECK_CONTROLLER)
806 protected void handleRoleReplyMessage(OFVendor vendorMessage,
807 OFRoleReplyVendorData roleReplyVendorData) {
808 // Map from the role code in the message to our role enum
809 int nxRole = roleReplyVendorData.getRole();
810 Role role = null;
811 switch (nxRole) {
812 case OFRoleVendorData.NX_ROLE_OTHER:
813 role = Role.EQUAL;
814 break;
815 case OFRoleVendorData.NX_ROLE_MASTER:
816 role = Role.MASTER;
817 break;
818 case OFRoleVendorData.NX_ROLE_SLAVE:
819 role = Role.SLAVE;
820 break;
821 default:
822 log.error("Invalid role value in role reply message");
823 sw.getChannel().close();
824 return;
825 }
826
827 log.debug("Handling role reply for role {} from {}. " +
828 "Controller's role is {} ",
829 new Object[] { role, sw, Controller.this.role}
830 );
831
832 sw.deliverRoleReply(vendorMessage.getXid(), role);
833
834 boolean isActive = activeSwitches.containsKey(sw.getId());
835 if (!isActive && sw.isActive()) {
836 // Transition from SLAVE to MASTER.
837
838 if (!state.firstRoleReplyReceived ||
839 getAlwaysClearFlowsOnSwAdd()) {
840 // This is the first role-reply message we receive from
841 // this switch or roles were disabled when the switch
842 // connected:
843 // Delete all pre-existing flows for new connections to
844 // the master
845 //
846 // FIXME: Need to think more about what the test should
847 // be for when we flush the flow-table? For example,
848 // if all the controllers are temporarily in the backup
849 // role (e.g. right after a failure of the master
850 // controller) at the point the switch connects, then
851 // all of the controllers will initially connect as
852 // backup controllers and not flush the flow-table.
853 // Then when one of them is promoted to master following
854 // the master controller election the flow-table
855 // will still not be flushed because that's treated as
856 // a failover event where we don't want to flush the
857 // flow-table. The end result would be that the flow
858 // table for a newly connected switch is never
859 // flushed. Not sure how to handle that case though...
860 sw.clearAllFlowMods();
861 log.debug("First role reply from master switch {}, " +
862 "clear FlowTable to active switch list",
863 HexString.toHexString(sw.getId()));
864 }
865
866 // Some switches don't seem to update us with port
867 // status messages while in slave role.
868 readSwitchPortStateFromStorage(sw);
869
870 // Only add the switch to the active switch list if
871 // we're not in the slave role. Note that if the role
872 // attribute is null, then that means that the switch
873 // doesn't support the role request messages, so in that
874 // case we're effectively in the EQUAL role and the
875 // switch should be included in the active switch list.
876 addSwitch(sw);
877 log.debug("Added master switch {} to active switch list",
878 HexString.toHexString(sw.getId()));
879
880 }
881 else if (isActive && !sw.isActive()) {
882 // Transition from MASTER to SLAVE: remove switch
883 // from active switch list.
884 log.debug("Removed slave switch {} from active switch" +
885 " list", HexString.toHexString(sw.getId()));
886 removeSwitch(sw);
887 }
888
889 // Indicate that we have received a role reply message.
890 state.firstRoleReplyReceived = true;
891 }
892
893 protected boolean handleVendorMessage(OFVendor vendorMessage) {
894 boolean shouldHandleMessage = false;
895 int vendor = vendorMessage.getVendor();
896 switch (vendor) {
897 case OFNiciraVendorData.NX_VENDOR_ID:
898 OFNiciraVendorData niciraVendorData =
899 (OFNiciraVendorData)vendorMessage.getVendorData();
900 int dataType = niciraVendorData.getDataType();
901 switch (dataType) {
902 case OFRoleReplyVendorData.NXT_ROLE_REPLY:
903 OFRoleReplyVendorData roleReplyVendorData =
904 (OFRoleReplyVendorData) niciraVendorData;
905 handleRoleReplyMessage(vendorMessage,
906 roleReplyVendorData);
907 break;
908 default:
909 log.warn("Unhandled Nicira VENDOR message; " +
910 "data type = {}", dataType);
911 break;
912 }
913 break;
914 default:
915 log.warn("Unhandled VENDOR message; vendor id = {}", vendor);
916 break;
917 }
918
919 return shouldHandleMessage;
920 }
921
922 /**
923 * Dispatch an Openflow message from a switch to the appropriate
924 * handler.
925 * @param m The message to process
926 * @throws IOException
927 * @throws SwitchStateException
928 */
929 @LogMessageDocs({
930 @LogMessageDoc(level="WARN",
931 message="Config Reply from {switch} has " +
932 "miss length set to {length}",
933 explanation="The controller requires that the switch " +
934 "use a miss length of 0xffff for correct " +
935 "function",
936 recommendation="Use a different switch to ensure " +
937 "correct function"),
938 @LogMessageDoc(level="WARN",
939 message="Received ERROR from sw {switch} that "
940 +"indicates roles are not supported "
941 +"but we have received a valid "
942 +"role reply earlier",
943 explanation="The switch sent a confusing message to the" +
944 "controller")
945 })
946 protected void processOFMessage(OFMessage m)
947 throws IOException, SwitchStateException {
948 boolean shouldHandleMessage = false;
949
950 switch (m.getType()) {
951 case HELLO:
952 if (log.isTraceEnabled())
953 log.trace("HELLO from {}", sw);
954
955 if (state.hsState.equals(HandshakeState.START)) {
956 state.hsState = HandshakeState.HELLO;
957 sendHelloConfiguration();
958 } else {
959 throw new SwitchStateException("Unexpected HELLO from "
960 + sw);
961 }
962 break;
963 case ECHO_REQUEST:
964 OFEchoReply reply =
965 (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY);
966 reply.setXid(m.getXid());
967 sw.write(reply, null);
968 break;
969 case ECHO_REPLY:
970 break;
971 case FEATURES_REPLY:
972 if (log.isTraceEnabled())
973 log.trace("Features Reply from {}", sw);
974
975 sw.setFeaturesReply((OFFeaturesReply) m);
976 if (state.hsState.equals(HandshakeState.HELLO)) {
977 sendFeatureReplyConfiguration();
978 state.hsState = HandshakeState.FEATURES_REPLY;
979 // uncomment to enable "dumb" switches like cbench
980 // state.hsState = HandshakeState.READY;
981 // addSwitch(sw);
982 } else {
983 // return results to rest api caller
984 sw.deliverOFFeaturesReply(m);
985 // update database */
986 updateActiveSwitchInfo(sw);
987 }
988 break;
989 case GET_CONFIG_REPLY:
990 if (log.isTraceEnabled())
991 log.trace("Get config reply from {}", sw);
992
993 if (!state.hsState.equals(HandshakeState.FEATURES_REPLY)) {
994 String em = "Unexpected GET_CONFIG_REPLY from " + sw;
995 throw new SwitchStateException(em);
996 }
997 OFGetConfigReply cr = (OFGetConfigReply) m;
998 if (cr.getMissSendLength() == (short)0xffff) {
999 log.trace("Config Reply from {} confirms " +
1000 "miss length set to 0xffff", sw);
1001 } else {
1002 log.warn("Config Reply from {} has " +
1003 "miss length set to {}",
1004 sw, cr.getMissSendLength() & 0xffff);
1005 }
1006 state.hasGetConfigReply = true;
1007 checkSwitchReady();
1008 break;
1009 case VENDOR:
1010 shouldHandleMessage = handleVendorMessage((OFVendor)m);
1011 break;
1012 case ERROR:
1013 // TODO: we need better error handling. Especially for
1014 // request/reply style message (stats, roles) we should have
1015 // a unified way to lookup the xid in the error message.
1016 // This will probable involve rewriting the way we handle
1017 // request/reply style messages.
1018 OFError error = (OFError) m;
1019 boolean shouldLogError = true;
1020 // TODO: should we check that firstRoleReplyReceived is false,
1021 // i.e., check only whether the first request fails?
1022 if (sw.checkFirstPendingRoleRequestXid(error.getXid())) {
1023 boolean isBadVendorError =
1024 (error.getErrorType() == OFError.OFErrorType.
1025 OFPET_BAD_REQUEST.getValue());
1026 // We expect to receive a bad vendor error when
1027 // we're connected to a switch that doesn't support
1028 // the Nicira vendor extensions (i.e. not OVS or
1029 // derived from OVS). By protocol, it should also be
1030 // BAD_VENDOR, but too many switch implementations
1031 // get it wrong and we can already check the xid()
1032 // so we can ignore the type with confidence that this
1033 // is not a spurious error
1034 shouldLogError = !isBadVendorError;
1035 if (isBadVendorError) {
1036 if (state.firstRoleReplyReceived && (role != null)) {
1037 log.warn("Received ERROR from sw {} that "
1038 +"indicates roles are not supported "
1039 +"but we have received a valid "
1040 +"role reply earlier", sw);
1041 }
1042 state.firstRoleReplyReceived = true;
1043 sw.deliverRoleRequestNotSupported(error.getXid());
1044 synchronized(roleChanger) {
1045 if (sw.role == null && Controller.this.role==Role.SLAVE) {
1046 // the switch doesn't understand role request
1047 // messages and the current controller role is
1048 // slave. We need to disconnect the switch.
1049 // @see RoleChanger for rationale
1050 sw.getChannel().close();
1051 }
1052 else if (sw.role == null) {
1053 // Controller's role is master: add to
1054 // active
1055 // TODO: check if clearing flow table is
1056 // right choice here.
1057 // Need to clear FlowMods before we add the switch
1058 // and dispatch updates otherwise we have a race condition.
1059 // TODO: switch update is async. Won't we still have a potential
1060 // race condition?
1061 sw.clearAllFlowMods();
1062 addSwitch(sw);
1063 }
1064 }
1065 }
1066 else {
1067 // TODO: Is this the right thing to do if we receive
1068 // some other error besides a bad vendor error?
1069 // Presumably that means the switch did actually
1070 // understand the role request message, but there
1071 // was some other error from processing the message.
1072 // OF 1.2 specifies a OFPET_ROLE_REQUEST_FAILED
1073 // error code, but it doesn't look like the Nicira
1074 // role request has that. Should check OVS source
1075 // code to see if it's possible for any other errors
1076 // to be returned.
1077 // If we received an error the switch is not
1078 // in the correct role, so we need to disconnect it.
1079 // We could also resend the request but then we need to
1080 // check if there are other pending request in which
1081 // case we shouldn't resend. If we do resend we need
1082 // to make sure that the switch eventually accepts one
1083 // of our requests or disconnect the switch. This feels
1084 // cumbersome.
1085 sw.getChannel().close();
1086 }
1087 }
1088 // Once we support OF 1.2, we'd add code to handle it here.
1089 //if (error.getXid() == state.ofRoleRequestXid) {
1090 //}
1091 if (shouldLogError)
1092 logError(sw, error);
1093 break;
1094 case STATS_REPLY:
1095 if (state.hsState.ordinal() <
1096 HandshakeState.FEATURES_REPLY.ordinal()) {
1097 String em = "Unexpected STATS_REPLY from " + sw;
1098 throw new SwitchStateException(em);
1099 }
1100 sw.deliverStatisticsReply(m);
1101 if (sw.hasAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE)) {
1102 processSwitchDescReply();
1103 }
1104 break;
1105 case PORT_STATUS:
1106 // We want to update our port state info even if we're in
1107 // the slave role, but we only want to update storage if
1108 // we're the master (or equal).
1109 boolean updateStorage = state.hsState.
1110 equals(HandshakeState.READY) &&
1111 (sw.getRole() != Role.SLAVE);
1112 handlePortStatusMessage(sw, (OFPortStatus)m, updateStorage);
1113 shouldHandleMessage = true;
1114 break;
1115
1116 default:
1117 shouldHandleMessage = true;
1118 break;
1119 }
1120
1121 if (shouldHandleMessage) {
1122 sw.getListenerReadLock().lock();
1123 try {
1124 if (sw.isConnected()) {
1125 if (!state.hsState.equals(HandshakeState.READY)) {
1126 log.debug("Ignoring message type {} received " +
1127 "from switch {} before switch is " +
1128 "fully configured.", m.getType(), sw);
1129 }
1130 // Check if the controller is in the slave role for the
1131 // switch. If it is, then don't dispatch the message to
1132 // the listeners.
1133 // TODO: Should we dispatch messages that we expect to
1134 // receive when we're in the slave role, e.g. port
1135 // status messages? Since we're "hiding" switches from
1136 // the listeners when we're in the slave role, then it
1137 // seems a little weird to dispatch port status messages
1138 // to them. On the other hand there might be special
1139 // modules that care about all of the connected switches
1140 // and would like to receive port status notifications.
1141 else if (sw.getRole() == Role.SLAVE) {
1142 // Don't log message if it's a port status message
1143 // since we expect to receive those from the switch
1144 // and don't want to emit spurious messages.
1145 if (m.getType() != OFType.PORT_STATUS) {
1146 log.debug("Ignoring message type {} received " +
1147 "from switch {} while in the slave role.",
1148 m.getType(), sw);
1149 }
1150 } else {
1151 handleMessage(sw, m, null);
1152 }
1153 }
1154 }
1155 finally {
1156 sw.getListenerReadLock().unlock();
1157 }
1158 }
1159 }
1160 }
1161
1162 // ****************
1163 // Message handlers
1164 // ****************
1165
1166 protected void handlePortStatusMessage(IOFSwitch sw,
1167 OFPortStatus m,
1168 boolean updateStorage) {
1169 short portNumber = m.getDesc().getPortNumber();
1170 OFPhysicalPort port = m.getDesc();
1171 if (m.getReason() == (byte)OFPortReason.OFPPR_MODIFY.ordinal()) {
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001172 boolean portDown = ((OFPortConfig.OFPPC_PORT_DOWN.getValue() & port.getConfig()) > 0) ||
1173 ((OFPortState.OFPPS_LINK_DOWN.getValue() & port.getState()) > 0);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001174 sw.setPort(port);
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001175 if (!portDown) {
Pankaj Berde6debb042013-01-16 18:04:32 -08001176 swStore.addPort(sw.getStringId(), port);
1177 } else {
1178 swStore.deletePort(sw.getStringId(), port.getPortNumber());
1179 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001180 if (updateStorage)
1181 updatePortInfo(sw, port);
1182 log.debug("Port #{} modified for {}", portNumber, sw);
1183 } else if (m.getReason() == (byte)OFPortReason.OFPPR_ADD.ordinal()) {
1184 sw.setPort(port);
Pankaj Berde8557a462013-01-07 08:59:31 -08001185 swStore.addPort(sw.getStringId(), port);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001186 if (updateStorage)
1187 updatePortInfo(sw, port);
1188 log.debug("Port #{} added for {}", portNumber, sw);
1189 } else if (m.getReason() ==
1190 (byte)OFPortReason.OFPPR_DELETE.ordinal()) {
1191 sw.deletePort(portNumber);
Pankaj Berde8557a462013-01-07 08:59:31 -08001192 swStore.deletePort(sw.getStringId(), portNumber);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001193 if (updateStorage)
1194 removePortInfo(sw, portNumber);
1195 log.debug("Port #{} deleted for {}", portNumber, sw);
1196 }
1197 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
1198 try {
1199 this.updates.put(update);
1200 } catch (InterruptedException e) {
1201 log.error("Failure adding update to queue", e);
1202 }
1203 }
1204
1205 /**
1206 * flcontext_cache - Keep a thread local stack of contexts
1207 */
1208 protected static final ThreadLocal<Stack<FloodlightContext>> flcontext_cache =
1209 new ThreadLocal <Stack<FloodlightContext>> () {
1210 @Override
1211 protected Stack<FloodlightContext> initialValue() {
1212 return new Stack<FloodlightContext>();
1213 }
1214 };
1215
1216 /**
1217 * flcontext_alloc - pop a context off the stack, if required create a new one
1218 * @return FloodlightContext
1219 */
1220 protected static FloodlightContext flcontext_alloc() {
1221 FloodlightContext flcontext = null;
1222
1223 if (flcontext_cache.get().empty()) {
1224 flcontext = new FloodlightContext();
1225 }
1226 else {
1227 flcontext = flcontext_cache.get().pop();
1228 }
1229
1230 return flcontext;
1231 }
1232
1233 /**
1234 * flcontext_free - Free the context to the current thread
1235 * @param flcontext
1236 */
1237 protected void flcontext_free(FloodlightContext flcontext) {
1238 flcontext.getStorage().clear();
1239 flcontext_cache.get().push(flcontext);
1240 }
1241
1242 /**
1243 * Handle replies to certain OFMessages, and pass others off to listeners
1244 * @param sw The switch for the message
1245 * @param m The message
1246 * @param bContext The floodlight context. If null then floodlight context would
1247 * be allocated in this function
1248 * @throws IOException
1249 */
1250 @LogMessageDocs({
1251 @LogMessageDoc(level="ERROR",
1252 message="Ignoring PacketIn (Xid = {xid}) because the data" +
1253 " field is empty.",
1254 explanation="The switch sent an improperly-formatted PacketIn" +
1255 " message",
1256 recommendation=LogMessageDoc.CHECK_SWITCH),
1257 @LogMessageDoc(level="WARN",
1258 message="Unhandled OF Message: {} from {}",
1259 explanation="The switch sent a message not handled by " +
1260 "the controller")
1261 })
1262 protected void handleMessage(IOFSwitch sw, OFMessage m,
1263 FloodlightContext bContext)
1264 throws IOException {
1265 Ethernet eth = null;
1266
1267 switch (m.getType()) {
1268 case PACKET_IN:
1269 OFPacketIn pi = (OFPacketIn)m;
1270
1271 if (pi.getPacketData().length <= 0) {
1272 log.error("Ignoring PacketIn (Xid = " + pi.getXid() +
1273 ") because the data field is empty.");
1274 return;
1275 }
1276
1277 if (Controller.ALWAYS_DECODE_ETH) {
1278 eth = new Ethernet();
1279 eth.deserialize(pi.getPacketData(), 0,
1280 pi.getPacketData().length);
1281 counterStore.updatePacketInCounters(sw, m, eth);
1282 }
1283 // fall through to default case...
1284
1285 default:
1286
1287 List<IOFMessageListener> listeners = null;
1288 if (messageListeners.containsKey(m.getType())) {
1289 listeners = messageListeners.get(m.getType()).
1290 getOrderedListeners();
1291 }
1292
1293 FloodlightContext bc = null;
1294 if (listeners != null) {
1295 // Check if floodlight context is passed from the calling
1296 // function, if so use that floodlight context, otherwise
1297 // allocate one
1298 if (bContext == null) {
1299 bc = flcontext_alloc();
1300 } else {
1301 bc = bContext;
1302 }
1303 if (eth != null) {
1304 IFloodlightProviderService.bcStore.put(bc,
1305 IFloodlightProviderService.CONTEXT_PI_PAYLOAD,
1306 eth);
1307 }
1308
1309 // Get the starting time (overall and per-component) of
1310 // the processing chain for this packet if performance
1311 // monitoring is turned on
1312 pktinProcTime.bootstrap(listeners);
1313 pktinProcTime.recordStartTimePktIn();
1314 Command cmd;
1315 for (IOFMessageListener listener : listeners) {
1316 if (listener instanceof IOFSwitchFilter) {
1317 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1318 continue;
1319 }
1320 }
1321
1322 pktinProcTime.recordStartTimeComp(listener);
1323 cmd = listener.receive(sw, m, bc);
1324 pktinProcTime.recordEndTimeComp(listener);
1325
1326 if (Command.STOP.equals(cmd)) {
1327 break;
1328 }
1329 }
1330 pktinProcTime.recordEndTimePktIn(sw, m, bc);
1331 } else {
1332 log.warn("Unhandled OF Message: {} from {}", m, sw);
1333 }
1334
1335 if ((bContext == null) && (bc != null)) flcontext_free(bc);
1336 }
1337 }
1338
1339 /**
1340 * Log an OpenFlow error message from a switch
1341 * @param sw The switch that sent the error
1342 * @param error The error message
1343 */
1344 @LogMessageDoc(level="ERROR",
1345 message="Error {error type} {error code} from {switch}",
1346 explanation="The switch responded with an unexpected error" +
1347 "to an OpenFlow message from the controller",
1348 recommendation="This could indicate improper network operation. " +
1349 "If the problem persists restarting the switch and " +
1350 "controller may help."
1351 )
1352 protected void logError(IOFSwitch sw, OFError error) {
1353 int etint = 0xffff & error.getErrorType();
1354 if (etint < 0 || etint >= OFErrorType.values().length) {
1355 log.error("Unknown error code {} from sw {}", etint, sw);
1356 }
1357 OFErrorType et = OFErrorType.values()[etint];
1358 switch (et) {
1359 case OFPET_HELLO_FAILED:
1360 OFHelloFailedCode hfc =
1361 OFHelloFailedCode.values()[0xffff & error.getErrorCode()];
1362 log.error("Error {} {} from {}", new Object[] {et, hfc, sw});
1363 break;
1364 case OFPET_BAD_REQUEST:
1365 OFBadRequestCode brc =
1366 OFBadRequestCode.values()[0xffff & error.getErrorCode()];
1367 log.error("Error {} {} from {}", new Object[] {et, brc, sw});
1368 break;
1369 case OFPET_BAD_ACTION:
1370 OFBadActionCode bac =
1371 OFBadActionCode.values()[0xffff & error.getErrorCode()];
1372 log.error("Error {} {} from {}", new Object[] {et, bac, sw});
1373 break;
1374 case OFPET_FLOW_MOD_FAILED:
1375 OFFlowModFailedCode fmfc =
1376 OFFlowModFailedCode.values()[0xffff & error.getErrorCode()];
1377 log.error("Error {} {} from {}", new Object[] {et, fmfc, sw});
1378 break;
1379 case OFPET_PORT_MOD_FAILED:
1380 OFPortModFailedCode pmfc =
1381 OFPortModFailedCode.values()[0xffff & error.getErrorCode()];
1382 log.error("Error {} {} from {}", new Object[] {et, pmfc, sw});
1383 break;
1384 case OFPET_QUEUE_OP_FAILED:
1385 OFQueueOpFailedCode qofc =
1386 OFQueueOpFailedCode.values()[0xffff & error.getErrorCode()];
1387 log.error("Error {} {} from {}", new Object[] {et, qofc, sw});
1388 break;
1389 default:
1390 break;
1391 }
1392 }
1393
1394 /**
1395 * Add a switch to the active switch list and call the switch listeners.
1396 * This happens either when a switch first connects (and the controller is
1397 * not in the slave role) or when the role of the controller changes from
1398 * slave to master.
1399 * @param sw the switch that has been added
1400 */
1401 // TODO: need to rethink locking and the synchronous switch update.
1402 // We can / should also handle duplicate DPIDs in connectedSwitches
1403 @LogMessageDoc(level="ERROR",
1404 message="New switch added {switch} for already-added switch {switch}",
1405 explanation="A switch with the same DPID as another switch " +
1406 "connected to the controller. This can be caused by " +
1407 "multiple switches configured with the same DPID, or " +
1408 "by a switch reconnected very quickly after " +
1409 "disconnecting.",
1410 recommendation="If this happens repeatedly, it is likely there " +
1411 "are switches with duplicate DPIDs on the network. " +
1412 "Reconfigure the appropriate switches. If it happens " +
1413 "very rarely, then it is likely this is a transient " +
1414 "network problem that can be ignored."
1415 )
1416 protected void addSwitch(IOFSwitch sw) {
1417 // TODO: is it safe to modify the HashMap without holding
1418 // the old switch's lock?
1419 OFSwitchImpl oldSw = (OFSwitchImpl) this.activeSwitches.put(sw.getId(), sw);
1420 if (sw == oldSw) {
1421 // Note == for object equality, not .equals for value
1422 log.info("New add switch for pre-existing switch {}", sw);
1423 return;
1424 }
1425
1426 if (oldSw != null) {
1427 oldSw.getListenerWriteLock().lock();
1428 try {
1429 log.error("New switch added {} for already-added switch {}",
1430 sw, oldSw);
1431 // Set the connected flag to false to suppress calling
1432 // the listeners for this switch in processOFMessage
1433 oldSw.setConnected(false);
1434
1435 oldSw.cancelAllStatisticsReplies();
1436
1437 updateInactiveSwitchInfo(oldSw);
1438
1439 // we need to clean out old switch state definitively
1440 // before adding the new switch
1441 // FIXME: It seems not completely kosher to call the
1442 // switch listeners here. I thought one of the points of
1443 // having the asynchronous switch update mechanism was so
1444 // the addedSwitch and removedSwitch were always called
1445 // from a single thread to simplify concurrency issues
1446 // for the listener.
1447 if (switchListeners != null) {
1448 for (IOFSwitchListener listener : switchListeners) {
1449 listener.removedSwitch(oldSw);
1450 }
1451 }
1452 // will eventually trigger a removeSwitch(), which will cause
1453 // a "Not removing Switch ... already removed debug message.
1454 // TODO: Figure out a way to handle this that avoids the
1455 // spurious debug message.
1456 oldSw.getChannel().close();
1457 }
1458 finally {
1459 oldSw.getListenerWriteLock().unlock();
1460 }
1461 }
1462
1463 updateActiveSwitchInfo(sw);
Pankaj Berde8557a462013-01-07 08:59:31 -08001464 swStore.update(sw.getStringId(), SwitchState.ACTIVE, DM_OPERATION.UPDATE);
Pankaj Berde0fc4e432013-01-12 09:47:22 -08001465 for (OFPhysicalPort port: sw.getPorts()) {
1466 swStore.addPort(sw.getStringId(), port);
1467 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001468 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.ADDED);
1469 try {
1470 this.updates.put(update);
1471 } catch (InterruptedException e) {
1472 log.error("Failure adding update to queue", e);
1473 }
1474 }
1475
1476 /**
1477 * Remove a switch from the active switch list and call the switch listeners.
1478 * This happens either when the switch is disconnected or when the
1479 * controller's role for the switch changes from master to slave.
1480 * @param sw the switch that has been removed
1481 */
1482 protected void removeSwitch(IOFSwitch sw) {
1483 // No need to acquire the listener lock, since
1484 // this method is only called after netty has processed all
1485 // pending messages
1486 log.debug("removeSwitch: {}", sw);
Pankaj Berdeafb20532013-01-08 15:05:24 -08001487 swStore.update(sw.getStringId(), SwitchState.INACTIVE, DM_OPERATION.UPDATE);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001488 if (!this.activeSwitches.remove(sw.getId(), sw) || !sw.isConnected()) {
1489 log.debug("Not removing switch {}; already removed", sw);
1490 return;
1491 }
1492 // We cancel all outstanding statistics replies if the switch transition
1493 // from active. In the future we might allow statistics requests
1494 // from slave controllers. Then we need to move this cancelation
1495 // to switch disconnect
1496 sw.cancelAllStatisticsReplies();
Pankaj Berdeafb20532013-01-08 15:05:24 -08001497
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001498
1499 // FIXME: I think there's a race condition if we call updateInactiveSwitchInfo
1500 // here if role support is enabled. In that case if the switch is being
1501 // removed because we've been switched to being in the slave role, then I think
1502 // it's possible that the new master may have already been promoted to master
1503 // and written out the active switch state to storage. If we now execute
1504 // updateInactiveSwitchInfo we may wipe out all of the state that was
1505 // written out by the new master. Maybe need to revisit how we handle all
1506 // of the switch state that's written to storage.
1507
1508 updateInactiveSwitchInfo(sw);
Pankaj Berdeafb20532013-01-08 15:05:24 -08001509
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001510 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.REMOVED);
1511 try {
1512 this.updates.put(update);
1513 } catch (InterruptedException e) {
1514 log.error("Failure adding update to queue", e);
1515 }
1516 }
1517
1518 // ***************
1519 // IFloodlightProvider
1520 // ***************
1521
1522 @Override
1523 public synchronized void addOFMessageListener(OFType type,
1524 IOFMessageListener listener) {
1525 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1526 messageListeners.get(type);
1527 if (ldd == null) {
1528 ldd = new ListenerDispatcher<OFType, IOFMessageListener>();
1529 messageListeners.put(type, ldd);
1530 }
1531 ldd.addListener(type, listener);
1532 }
1533
1534 @Override
1535 public synchronized void removeOFMessageListener(OFType type,
1536 IOFMessageListener listener) {
1537 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1538 messageListeners.get(type);
1539 if (ldd != null) {
1540 ldd.removeListener(listener);
1541 }
1542 }
1543
1544 private void logListeners() {
1545 for (Map.Entry<OFType,
1546 ListenerDispatcher<OFType,
1547 IOFMessageListener>> entry
1548 : messageListeners.entrySet()) {
1549
1550 OFType type = entry.getKey();
1551 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1552 entry.getValue();
1553
1554 StringBuffer sb = new StringBuffer();
1555 sb.append("OFListeners for ");
1556 sb.append(type);
1557 sb.append(": ");
1558 for (IOFMessageListener l : ldd.getOrderedListeners()) {
1559 sb.append(l.getName());
1560 sb.append(",");
1561 }
1562 log.debug(sb.toString());
1563 }
1564 }
1565
1566 public void removeOFMessageListeners(OFType type) {
1567 messageListeners.remove(type);
1568 }
1569
1570 @Override
1571 public Map<Long, IOFSwitch> getSwitches() {
1572 return Collections.unmodifiableMap(this.activeSwitches);
1573 }
1574
1575 @Override
1576 public void addOFSwitchListener(IOFSwitchListener listener) {
1577 this.switchListeners.add(listener);
1578 }
1579
1580 @Override
1581 public void removeOFSwitchListener(IOFSwitchListener listener) {
1582 this.switchListeners.remove(listener);
1583 }
1584
1585 @Override
1586 public Map<OFType, List<IOFMessageListener>> getListeners() {
1587 Map<OFType, List<IOFMessageListener>> lers =
1588 new HashMap<OFType, List<IOFMessageListener>>();
1589 for(Entry<OFType, ListenerDispatcher<OFType, IOFMessageListener>> e :
1590 messageListeners.entrySet()) {
1591 lers.put(e.getKey(), e.getValue().getOrderedListeners());
1592 }
1593 return Collections.unmodifiableMap(lers);
1594 }
1595
1596 @Override
1597 @LogMessageDocs({
1598 @LogMessageDoc(message="Failed to inject OFMessage {message} onto " +
1599 "a null switch",
1600 explanation="Failed to process a message because the switch " +
1601 " is no longer connected."),
1602 @LogMessageDoc(level="ERROR",
1603 message="Error reinjecting OFMessage on switch {switch}",
1604 explanation="An I/O error occured while attempting to " +
1605 "process an OpenFlow message",
1606 recommendation=LogMessageDoc.CHECK_SWITCH)
1607 })
1608 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg,
1609 FloodlightContext bc) {
1610 if (sw == null) {
1611 log.info("Failed to inject OFMessage {} onto a null switch", msg);
1612 return false;
1613 }
1614
1615 // FIXME: Do we need to be able to inject messages to switches
1616 // where we're the slave controller (i.e. they're connected but
1617 // not active)?
1618 // FIXME: Don't we need synchronization logic here so we're holding
1619 // the listener read lock when we call handleMessage? After some
1620 // discussions it sounds like the right thing to do here would be to
1621 // inject the message as a netty upstream channel event so it goes
1622 // through the normal netty event processing, including being
1623 // handled
1624 if (!activeSwitches.containsKey(sw.getId())) return false;
1625
1626 try {
1627 // Pass Floodlight context to the handleMessages()
1628 handleMessage(sw, msg, bc);
1629 } catch (IOException e) {
1630 log.error("Error reinjecting OFMessage on switch {}",
1631 HexString.toHexString(sw.getId()));
1632 return false;
1633 }
1634 return true;
1635 }
1636
1637 @Override
1638 @LogMessageDoc(message="Calling System.exit",
1639 explanation="The controller is terminating")
1640 public synchronized void terminate() {
1641 log.info("Calling System.exit");
1642 System.exit(1);
1643 }
1644
1645 @Override
1646 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg) {
1647 // call the overloaded version with floodlight context set to null
1648 return injectOfMessage(sw, msg, null);
1649 }
1650
1651 @Override
1652 public void handleOutgoingMessage(IOFSwitch sw, OFMessage m,
1653 FloodlightContext bc) {
1654 if (log.isTraceEnabled()) {
1655 String str = OFMessage.getDataAsString(sw, m, bc);
1656 log.trace("{}", str);
1657 }
1658
1659 List<IOFMessageListener> listeners = null;
1660 if (messageListeners.containsKey(m.getType())) {
1661 listeners =
1662 messageListeners.get(m.getType()).getOrderedListeners();
1663 }
1664
1665 if (listeners != null) {
1666 for (IOFMessageListener listener : listeners) {
1667 if (listener instanceof IOFSwitchFilter) {
1668 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1669 continue;
1670 }
1671 }
1672 if (Command.STOP.equals(listener.receive(sw, m, bc))) {
1673 break;
1674 }
1675 }
1676 }
1677 }
1678
1679 @Override
1680 public BasicFactory getOFMessageFactory() {
1681 return factory;
1682 }
1683
1684 @Override
1685 public String getControllerId() {
1686 return controllerId;
1687 }
1688
1689 // **************
1690 // Initialization
1691 // **************
1692
1693 protected void updateAllInactiveSwitchInfo() {
1694 if (role == Role.SLAVE) {
1695 return;
1696 }
1697 String controllerId = getControllerId();
1698 String[] switchColumns = { SWITCH_DATAPATH_ID,
1699 SWITCH_CONTROLLER_ID,
1700 SWITCH_ACTIVE };
1701 String[] portColumns = { PORT_ID, PORT_SWITCH };
1702 IResultSet switchResultSet = null;
1703 try {
1704 OperatorPredicate op =
1705 new OperatorPredicate(SWITCH_CONTROLLER_ID,
1706 OperatorPredicate.Operator.EQ,
1707 controllerId);
1708 switchResultSet =
1709 storageSource.executeQuery(SWITCH_TABLE_NAME,
1710 switchColumns,
1711 op, null);
1712 while (switchResultSet.next()) {
1713 IResultSet portResultSet = null;
1714 try {
1715 String datapathId =
1716 switchResultSet.getString(SWITCH_DATAPATH_ID);
1717 switchResultSet.setBoolean(SWITCH_ACTIVE, Boolean.FALSE);
1718 op = new OperatorPredicate(PORT_SWITCH,
1719 OperatorPredicate.Operator.EQ,
1720 datapathId);
1721 portResultSet =
1722 storageSource.executeQuery(PORT_TABLE_NAME,
1723 portColumns,
1724 op, null);
1725 while (portResultSet.next()) {
1726 portResultSet.deleteRow();
1727 }
1728 portResultSet.save();
1729 }
1730 finally {
1731 if (portResultSet != null)
1732 portResultSet.close();
1733 }
1734 }
1735 switchResultSet.save();
1736 }
1737 finally {
1738 if (switchResultSet != null)
1739 switchResultSet.close();
1740 }
1741 }
1742
1743 protected void updateControllerInfo() {
1744 updateAllInactiveSwitchInfo();
1745
1746 // Write out the controller info to the storage source
1747 Map<String, Object> controllerInfo = new HashMap<String, Object>();
1748 String id = getControllerId();
1749 controllerInfo.put(CONTROLLER_ID, id);
1750 storageSource.updateRow(CONTROLLER_TABLE_NAME, controllerInfo);
1751 }
1752
1753 protected void updateActiveSwitchInfo(IOFSwitch sw) {
1754 if (role == Role.SLAVE) {
1755 return;
1756 }
1757 // Obtain the row info for the switch
1758 Map<String, Object> switchInfo = new HashMap<String, Object>();
1759 String datapathIdString = sw.getStringId();
1760 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1761 String controllerId = getControllerId();
1762 switchInfo.put(SWITCH_CONTROLLER_ID, controllerId);
1763 Date connectedSince = sw.getConnectedSince();
1764 switchInfo.put(SWITCH_CONNECTED_SINCE, connectedSince);
1765 Channel channel = sw.getChannel();
1766 SocketAddress socketAddress = channel.getRemoteAddress();
1767 if (socketAddress != null) {
1768 String socketAddressString = socketAddress.toString();
1769 switchInfo.put(SWITCH_SOCKET_ADDRESS, socketAddressString);
1770 if (socketAddress instanceof InetSocketAddress) {
1771 InetSocketAddress inetSocketAddress =
1772 (InetSocketAddress)socketAddress;
1773 InetAddress inetAddress = inetSocketAddress.getAddress();
1774 String ip = inetAddress.getHostAddress();
1775 switchInfo.put(SWITCH_IP, ip);
1776 }
1777 }
1778
1779 // Write out the switch features info
1780 long capabilities = U32.f(sw.getCapabilities());
1781 switchInfo.put(SWITCH_CAPABILITIES, capabilities);
1782 long buffers = U32.f(sw.getBuffers());
1783 switchInfo.put(SWITCH_BUFFERS, buffers);
1784 long tables = U32.f(sw.getTables());
1785 switchInfo.put(SWITCH_TABLES, tables);
1786 long actions = U32.f(sw.getActions());
1787 switchInfo.put(SWITCH_ACTIONS, actions);
1788 switchInfo.put(SWITCH_ACTIVE, Boolean.TRUE);
1789
1790 // Update the switch
1791 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1792
1793 // Update the ports
1794 for (OFPhysicalPort port: sw.getPorts()) {
1795 updatePortInfo(sw, port);
1796 }
1797 }
1798
1799 protected void updateInactiveSwitchInfo(IOFSwitch sw) {
1800 if (role == Role.SLAVE) {
1801 return;
1802 }
1803 log.debug("Update DB with inactiveSW {}", sw);
1804 // Update the controller info in the storage source to be inactive
1805 Map<String, Object> switchInfo = new HashMap<String, Object>();
1806 String datapathIdString = sw.getStringId();
1807 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1808 //switchInfo.put(SWITCH_CONNECTED_SINCE, null);
1809 switchInfo.put(SWITCH_ACTIVE, Boolean.FALSE);
1810 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1811 }
1812
1813 protected void updatePortInfo(IOFSwitch sw, OFPhysicalPort port) {
1814 if (role == Role.SLAVE) {
1815 return;
1816 }
1817 String datapathIdString = sw.getStringId();
1818 Map<String, Object> portInfo = new HashMap<String, Object>();
1819 int portNumber = U16.f(port.getPortNumber());
1820 String id = datapathIdString + "|" + portNumber;
1821 portInfo.put(PORT_ID, id);
1822 portInfo.put(PORT_SWITCH, datapathIdString);
1823 portInfo.put(PORT_NUMBER, portNumber);
1824 byte[] hardwareAddress = port.getHardwareAddress();
1825 String hardwareAddressString = HexString.toHexString(hardwareAddress);
1826 portInfo.put(PORT_HARDWARE_ADDRESS, hardwareAddressString);
1827 String name = port.getName();
1828 portInfo.put(PORT_NAME, name);
1829 long config = U32.f(port.getConfig());
1830 portInfo.put(PORT_CONFIG, config);
1831 long state = U32.f(port.getState());
1832 portInfo.put(PORT_STATE, state);
1833 long currentFeatures = U32.f(port.getCurrentFeatures());
1834 portInfo.put(PORT_CURRENT_FEATURES, currentFeatures);
1835 long advertisedFeatures = U32.f(port.getAdvertisedFeatures());
1836 portInfo.put(PORT_ADVERTISED_FEATURES, advertisedFeatures);
1837 long supportedFeatures = U32.f(port.getSupportedFeatures());
1838 portInfo.put(PORT_SUPPORTED_FEATURES, supportedFeatures);
1839 long peerFeatures = U32.f(port.getPeerFeatures());
1840 portInfo.put(PORT_PEER_FEATURES, peerFeatures);
1841 storageSource.updateRowAsync(PORT_TABLE_NAME, portInfo);
1842 }
1843
1844 /**
1845 * Read switch port data from storage and write it into a switch object
1846 * @param sw the switch to update
1847 */
1848 protected void readSwitchPortStateFromStorage(OFSwitchImpl sw) {
1849 OperatorPredicate op =
1850 new OperatorPredicate(PORT_SWITCH,
1851 OperatorPredicate.Operator.EQ,
1852 sw.getStringId());
1853 IResultSet portResultSet =
1854 storageSource.executeQuery(PORT_TABLE_NAME,
1855 null, op, null);
1856 //Map<Short, OFPhysicalPort> oldports =
1857 // new HashMap<Short, OFPhysicalPort>();
1858 //oldports.putAll(sw.getPorts());
1859
1860 while (portResultSet.next()) {
1861 try {
1862 OFPhysicalPort p = new OFPhysicalPort();
1863 p.setPortNumber((short)portResultSet.getInt(PORT_NUMBER));
1864 p.setName(portResultSet.getString(PORT_NAME));
1865 p.setConfig((int)portResultSet.getLong(PORT_CONFIG));
1866 p.setState((int)portResultSet.getLong(PORT_STATE));
1867 String portMac = portResultSet.getString(PORT_HARDWARE_ADDRESS);
1868 p.setHardwareAddress(HexString.fromHexString(portMac));
1869 p.setCurrentFeatures((int)portResultSet.
1870 getLong(PORT_CURRENT_FEATURES));
1871 p.setAdvertisedFeatures((int)portResultSet.
1872 getLong(PORT_ADVERTISED_FEATURES));
1873 p.setSupportedFeatures((int)portResultSet.
1874 getLong(PORT_SUPPORTED_FEATURES));
1875 p.setPeerFeatures((int)portResultSet.
1876 getLong(PORT_PEER_FEATURES));
1877 //oldports.remove(Short.valueOf(p.getPortNumber()));
1878 sw.setPort(p);
1879 } catch (NullPointerException e) {
1880 // ignore
1881 }
1882 }
1883 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
1884 try {
1885 this.updates.put(update);
1886 } catch (InterruptedException e) {
1887 log.error("Failure adding update to queue", e);
1888 }
1889 }
1890
1891 protected void removePortInfo(IOFSwitch sw, short portNumber) {
1892 if (role == Role.SLAVE) {
1893 return;
1894 }
1895 String datapathIdString = sw.getStringId();
1896 String id = datapathIdString + "|" + portNumber;
1897 storageSource.deleteRowAsync(PORT_TABLE_NAME, id);
1898 }
1899
1900 /**
1901 * Sets the initial role based on properties in the config params.
1902 * It looks for two different properties.
1903 * If the "role" property is specified then the value should be
1904 * either "EQUAL", "MASTER", or "SLAVE" and the role of the
1905 * controller is set to the specified value. If the "role" property
1906 * is not specified then it looks next for the "role.path" property.
1907 * In this case the value should be the path to a property file in
1908 * the file system that contains a property called "floodlight.role"
1909 * which can be one of the values listed above for the "role" property.
1910 * The idea behind the "role.path" mechanism is that you have some
1911 * separate heartbeat and master controller election algorithm that
1912 * determines the role of the controller. When a role transition happens,
1913 * it updates the current role in the file specified by the "role.path"
1914 * file. Then if floodlight restarts for some reason it can get the
1915 * correct current role of the controller from the file.
1916 * @param configParams The config params for the FloodlightProvider service
1917 * @return A valid role if role information is specified in the
1918 * config params, otherwise null
1919 */
1920 @LogMessageDocs({
1921 @LogMessageDoc(message="Controller role set to {role}",
1922 explanation="Setting the initial HA role to "),
1923 @LogMessageDoc(level="ERROR",
1924 message="Invalid current role value: {role}",
1925 explanation="An invalid HA role value was read from the " +
1926 "properties file",
1927 recommendation=LogMessageDoc.CHECK_CONTROLLER)
1928 })
1929 protected Role getInitialRole(Map<String, String> configParams) {
1930 Role role = null;
1931 String roleString = configParams.get("role");
1932 if (roleString == null) {
1933 String rolePath = configParams.get("rolepath");
1934 if (rolePath != null) {
1935 Properties properties = new Properties();
1936 try {
1937 properties.load(new FileInputStream(rolePath));
1938 roleString = properties.getProperty("floodlight.role");
1939 }
1940 catch (IOException exc) {
1941 // Don't treat it as an error if the file specified by the
1942 // rolepath property doesn't exist. This lets us enable the
1943 // HA mechanism by just creating/setting the floodlight.role
1944 // property in that file without having to modify the
1945 // floodlight properties.
1946 }
1947 }
1948 }
1949
1950 if (roleString != null) {
1951 // Canonicalize the string to the form used for the enum constants
1952 roleString = roleString.trim().toUpperCase();
1953 try {
1954 role = Role.valueOf(roleString);
1955 }
1956 catch (IllegalArgumentException exc) {
1957 log.error("Invalid current role value: {}", roleString);
1958 }
1959 }
1960
1961 log.info("Controller role set to {}", role);
1962
1963 return role;
1964 }
1965
1966 /**
1967 * Tell controller that we're ready to accept switches loop
1968 * @throws IOException
1969 */
1970 @LogMessageDocs({
1971 @LogMessageDoc(message="Listening for switch connections on {address}",
1972 explanation="The controller is ready and listening for new" +
1973 " switch connections"),
1974 @LogMessageDoc(message="Storage exception in controller " +
1975 "updates loop; terminating process",
1976 explanation=ERROR_DATABASE,
1977 recommendation=LogMessageDoc.CHECK_CONTROLLER),
1978 @LogMessageDoc(level="ERROR",
1979 message="Exception in controller updates loop",
1980 explanation="Failed to dispatch controller event",
1981 recommendation=LogMessageDoc.GENERIC_ACTION)
1982 })
1983 public void run() {
1984 if (log.isDebugEnabled()) {
1985 logListeners();
1986 }
1987
1988 try {
1989 final ServerBootstrap bootstrap = createServerBootStrap();
1990
1991 bootstrap.setOption("reuseAddr", true);
1992 bootstrap.setOption("child.keepAlive", true);
1993 bootstrap.setOption("child.tcpNoDelay", true);
1994 bootstrap.setOption("child.sendBufferSize", Controller.SEND_BUFFER_SIZE);
1995
1996 ChannelPipelineFactory pfact =
1997 new OpenflowPipelineFactory(this, null);
1998 bootstrap.setPipelineFactory(pfact);
1999 InetSocketAddress sa = new InetSocketAddress(openFlowPort);
2000 final ChannelGroup cg = new DefaultChannelGroup();
2001 cg.add(bootstrap.bind(sa));
2002
2003 log.info("Listening for switch connections on {}", sa);
2004 } catch (Exception e) {
2005 throw new RuntimeException(e);
2006 }
2007
2008 // main loop
2009 while (true) {
2010 try {
2011 IUpdate update = updates.take();
2012 update.dispatch();
2013 } catch (InterruptedException e) {
2014 return;
2015 } catch (StorageException e) {
2016 log.error("Storage exception in controller " +
2017 "updates loop; terminating process", e);
2018 return;
2019 } catch (Exception e) {
2020 log.error("Exception in controller updates loop", e);
2021 }
2022 }
2023 }
2024
2025 private ServerBootstrap createServerBootStrap() {
2026 if (workerThreads == 0) {
2027 return new ServerBootstrap(
2028 new NioServerSocketChannelFactory(
2029 Executors.newCachedThreadPool(),
2030 Executors.newCachedThreadPool()));
2031 } else {
2032 return new ServerBootstrap(
2033 new NioServerSocketChannelFactory(
2034 Executors.newCachedThreadPool(),
2035 Executors.newCachedThreadPool(), workerThreads));
2036 }
2037 }
2038
2039 public void setConfigParams(Map<String, String> configParams) {
2040 String ofPort = configParams.get("openflowport");
2041 if (ofPort != null) {
2042 this.openFlowPort = Integer.parseInt(ofPort);
2043 }
2044 log.debug("OpenFlow port set to {}", this.openFlowPort);
2045 String threads = configParams.get("workerthreads");
2046 if (threads != null) {
2047 this.workerThreads = Integer.parseInt(threads);
2048 }
2049 log.debug("Number of worker threads set to {}", this.workerThreads);
2050 String controllerId = configParams.get("controllerid");
2051 if (controllerId != null) {
2052 this.controllerId = controllerId;
2053 }
2054 log.debug("ControllerId set to {}", this.controllerId);
2055 }
2056
2057 private void initVendorMessages() {
2058 // Configure openflowj to be able to parse the role request/reply
2059 // vendor messages.
2060 OFBasicVendorId niciraVendorId = new OFBasicVendorId(
2061 OFNiciraVendorData.NX_VENDOR_ID, 4);
2062 OFVendorId.registerVendorId(niciraVendorId);
2063 OFBasicVendorDataType roleRequestVendorData =
2064 new OFBasicVendorDataType(
2065 OFRoleRequestVendorData.NXT_ROLE_REQUEST,
2066 OFRoleRequestVendorData.getInstantiable());
2067 niciraVendorId.registerVendorDataType(roleRequestVendorData);
2068 OFBasicVendorDataType roleReplyVendorData =
2069 new OFBasicVendorDataType(
2070 OFRoleReplyVendorData.NXT_ROLE_REPLY,
2071 OFRoleReplyVendorData.getInstantiable());
2072 niciraVendorId.registerVendorDataType(roleReplyVendorData);
2073 }
2074
2075 /**
2076 * Initialize internal data structures
2077 */
2078 public void init(Map<String, String> configParams) {
2079 // These data structures are initialized here because other
2080 // module's startUp() might be called before ours
2081 this.messageListeners =
2082 new ConcurrentHashMap<OFType,
2083 ListenerDispatcher<OFType,
2084 IOFMessageListener>>();
2085 this.switchListeners = new CopyOnWriteArraySet<IOFSwitchListener>();
2086 this.haListeners = new CopyOnWriteArraySet<IHAListener>();
2087 this.activeSwitches = new ConcurrentHashMap<Long, IOFSwitch>();
2088 this.connectedSwitches = new HashSet<OFSwitchImpl>();
2089 this.controllerNodeIPsCache = new HashMap<String, String>();
2090 this.updates = new LinkedBlockingQueue<IUpdate>();
2091 this.factory = new BasicFactory();
2092 this.providerMap = new HashMap<String, List<IInfoProvider>>();
2093 setConfigParams(configParams);
2094 this.role = getInitialRole(configParams);
2095 this.roleChanger = new RoleChanger();
2096 initVendorMessages();
2097 this.systemStartTime = System.currentTimeMillis();
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002098 }
2099
2100 /**
2101 * Startup all of the controller's components
2102 */
2103 @LogMessageDoc(message="Waiting for storage source",
2104 explanation="The system database is not yet ready",
2105 recommendation="If this message persists, this indicates " +
2106 "that the system database has failed to start. " +
2107 LogMessageDoc.CHECK_CONTROLLER)
2108 public void startupComponents() {
2109 // Create the table names we use
2110 storageSource.createTable(CONTROLLER_TABLE_NAME, null);
2111 storageSource.createTable(SWITCH_TABLE_NAME, null);
2112 storageSource.createTable(PORT_TABLE_NAME, null);
2113 storageSource.createTable(CONTROLLER_INTERFACE_TABLE_NAME, null);
2114 storageSource.createTable(SWITCH_CONFIG_TABLE_NAME, null);
2115 storageSource.setTablePrimaryKeyName(CONTROLLER_TABLE_NAME,
2116 CONTROLLER_ID);
2117 storageSource.setTablePrimaryKeyName(SWITCH_TABLE_NAME,
2118 SWITCH_DATAPATH_ID);
2119 storageSource.setTablePrimaryKeyName(PORT_TABLE_NAME, PORT_ID);
2120 storageSource.setTablePrimaryKeyName(CONTROLLER_INTERFACE_TABLE_NAME,
2121 CONTROLLER_INTERFACE_ID);
2122 storageSource.addListener(CONTROLLER_INTERFACE_TABLE_NAME, this);
2123
2124 while (true) {
2125 try {
2126 updateControllerInfo();
2127 break;
2128 }
2129 catch (StorageException e) {
2130 log.info("Waiting for storage source");
2131 try {
2132 Thread.sleep(1000);
2133 } catch (InterruptedException e1) {
2134 }
2135 }
2136 }
2137
2138 // Add our REST API
2139 restApi.addRestletRoutable(new CoreWebRoutable());
2140 }
2141
2142 @Override
2143 public void addInfoProvider(String type, IInfoProvider provider) {
2144 if (!providerMap.containsKey(type)) {
2145 providerMap.put(type, new ArrayList<IInfoProvider>());
2146 }
2147 providerMap.get(type).add(provider);
2148 }
2149
2150 @Override
2151 public void removeInfoProvider(String type, IInfoProvider provider) {
2152 if (!providerMap.containsKey(type)) {
2153 log.debug("Provider type {} doesn't exist.", type);
2154 return;
2155 }
2156
2157 providerMap.get(type).remove(provider);
2158 }
2159
2160 public Map<String, Object> getControllerInfo(String type) {
2161 if (!providerMap.containsKey(type)) return null;
2162
2163 Map<String, Object> result = new LinkedHashMap<String, Object>();
2164 for (IInfoProvider provider : providerMap.get(type)) {
2165 result.putAll(provider.getInfo(type));
2166 }
2167
2168 return result;
2169 }
2170
2171 @Override
2172 public void addHAListener(IHAListener listener) {
2173 this.haListeners.add(listener);
2174 }
2175
2176 @Override
2177 public void removeHAListener(IHAListener listener) {
2178 this.haListeners.remove(listener);
2179 }
2180
2181
2182 /**
2183 * Handle changes to the controller nodes IPs and dispatch update.
2184 */
2185 @SuppressWarnings("unchecked")
2186 protected void handleControllerNodeIPChanges() {
2187 HashMap<String,String> curControllerNodeIPs = new HashMap<String,String>();
2188 HashMap<String,String> addedControllerNodeIPs = new HashMap<String,String>();
2189 HashMap<String,String> removedControllerNodeIPs =new HashMap<String,String>();
2190 String[] colNames = { CONTROLLER_INTERFACE_CONTROLLER_ID,
2191 CONTROLLER_INTERFACE_TYPE,
2192 CONTROLLER_INTERFACE_NUMBER,
2193 CONTROLLER_INTERFACE_DISCOVERED_IP };
2194 synchronized(controllerNodeIPsCache) {
2195 // We currently assume that interface Ethernet0 is the relevant
2196 // controller interface. Might change.
2197 // We could (should?) implement this using
2198 // predicates, but creating the individual and compound predicate
2199 // seems more overhead then just checking every row. Particularly,
2200 // since the number of rows is small and changes infrequent
2201 IResultSet res = storageSource.executeQuery(CONTROLLER_INTERFACE_TABLE_NAME,
2202 colNames,null, null);
2203 while (res.next()) {
2204 if (res.getString(CONTROLLER_INTERFACE_TYPE).equals("Ethernet") &&
2205 res.getInt(CONTROLLER_INTERFACE_NUMBER) == 0) {
2206 String controllerID = res.getString(CONTROLLER_INTERFACE_CONTROLLER_ID);
2207 String discoveredIP = res.getString(CONTROLLER_INTERFACE_DISCOVERED_IP);
2208 String curIP = controllerNodeIPsCache.get(controllerID);
2209
2210 curControllerNodeIPs.put(controllerID, discoveredIP);
2211 if (curIP == null) {
2212 // new controller node IP
2213 addedControllerNodeIPs.put(controllerID, discoveredIP);
2214 }
2215 else if (curIP != discoveredIP) {
2216 // IP changed
2217 removedControllerNodeIPs.put(controllerID, curIP);
2218 addedControllerNodeIPs.put(controllerID, discoveredIP);
2219 }
2220 }
2221 }
2222 // Now figure out if rows have been deleted. We can't use the
2223 // rowKeys from rowsDeleted directly, since the tables primary
2224 // key is a compound that we can't disassemble
2225 Set<String> curEntries = curControllerNodeIPs.keySet();
2226 Set<String> removedEntries = controllerNodeIPsCache.keySet();
2227 removedEntries.removeAll(curEntries);
2228 for (String removedControllerID : removedEntries)
2229 removedControllerNodeIPs.put(removedControllerID, controllerNodeIPsCache.get(removedControllerID));
2230 controllerNodeIPsCache = (HashMap<String, String>) curControllerNodeIPs.clone();
2231 HAControllerNodeIPUpdate update = new HAControllerNodeIPUpdate(
2232 curControllerNodeIPs, addedControllerNodeIPs,
2233 removedControllerNodeIPs);
2234 if (!removedControllerNodeIPs.isEmpty() || !addedControllerNodeIPs.isEmpty()) {
2235 try {
2236 this.updates.put(update);
2237 } catch (InterruptedException e) {
2238 log.error("Failure adding update to queue", e);
2239 }
2240 }
2241 }
2242 }
2243
2244 @Override
2245 public Map<String, String> getControllerNodeIPs() {
2246 // We return a copy of the mapping so we can guarantee that
2247 // the mapping return is the same as one that will be (or was)
2248 // dispatched to IHAListeners
2249 HashMap<String,String> retval = new HashMap<String,String>();
2250 synchronized(controllerNodeIPsCache) {
2251 retval.putAll(controllerNodeIPsCache);
2252 }
2253 return retval;
2254 }
2255
2256 @Override
2257 public void rowsModified(String tableName, Set<Object> rowKeys) {
2258 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2259 handleControllerNodeIPChanges();
2260 }
2261
2262 }
2263
2264 @Override
2265 public void rowsDeleted(String tableName, Set<Object> rowKeys) {
2266 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2267 handleControllerNodeIPChanges();
2268 }
2269 }
2270
2271 @Override
2272 public long getSystemStartTime() {
2273 return (this.systemStartTime);
2274 }
2275
2276 @Override
2277 public void setAlwaysClearFlowsOnSwAdd(boolean value) {
2278 this.alwaysClearFlowsOnSwAdd = value;
2279 }
2280
2281 public boolean getAlwaysClearFlowsOnSwAdd() {
2282 return this.alwaysClearFlowsOnSwAdd;
2283 }
2284}