blob: cbcd9a074207ec091567cdbe4e6ed225087049f7 [file] [log] [blame]
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001/**
2* Copyright 2011, Big Switch Networks, Inc.
3* Originally created by David Erickson, Stanford University
4*
5* Licensed under the Apache License, Version 2.0 (the "License"); you may
6* not use this file except in compliance with the License. You may obtain
7* a copy of the License at
8*
9* http://www.apache.org/licenses/LICENSE-2.0
10*
11* Unless required by applicable law or agreed to in writing, software
12* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14* License for the specific language governing permissions and limitations
15* under the License.
16**/
17
18package net.floodlightcontroller.core.internal;
19
20import java.io.FileInputStream;
21import java.io.IOException;
22import java.net.InetAddress;
23import java.net.InetSocketAddress;
24import java.net.SocketAddress;
25import java.util.ArrayList;
26import java.nio.channels.ClosedChannelException;
27import java.util.Collection;
28import java.util.Collections;
29import java.util.Date;
30import java.util.HashMap;
31import java.util.HashSet;
32import java.util.Iterator;
33import java.util.LinkedHashMap;
34import java.util.List;
35import java.util.Map;
36import java.util.Map.Entry;
37import java.util.Properties;
38import java.util.Set;
39import java.util.Stack;
40import java.util.concurrent.BlockingQueue;
41import java.util.concurrent.ConcurrentHashMap;
42import java.util.concurrent.ConcurrentMap;
43import java.util.concurrent.CopyOnWriteArraySet;
44import java.util.concurrent.Executors;
45import java.util.concurrent.Future;
46import java.util.concurrent.LinkedBlockingQueue;
47import java.util.concurrent.RejectedExecutionException;
48import java.util.concurrent.TimeUnit;
49import java.util.concurrent.TimeoutException;
50
51import net.floodlightcontroller.core.FloodlightContext;
52import net.floodlightcontroller.core.IFloodlightProviderService;
53import net.floodlightcontroller.core.IHAListener;
54import net.floodlightcontroller.core.IInfoProvider;
Pankaj Berde8557a462013-01-07 08:59:31 -080055import net.floodlightcontroller.core.INetMapStorage.DM_OPERATION;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080056import net.floodlightcontroller.core.IOFMessageListener;
57import net.floodlightcontroller.core.IListener.Command;
58import net.floodlightcontroller.core.IOFSwitch;
59import net.floodlightcontroller.core.IOFSwitchFilter;
60import net.floodlightcontroller.core.IOFSwitchListener;
Pavlin Radoslavovd7d8b792013-02-22 10:24:38 -080061import net.floodlightcontroller.core.INetMapTopologyService.ITopoRouteService;
Pankaj Berde8557a462013-01-07 08:59:31 -080062import net.floodlightcontroller.core.ISwitchStorage.SwitchState;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080063import net.floodlightcontroller.core.annotations.LogMessageDoc;
64import net.floodlightcontroller.core.annotations.LogMessageDocs;
65import net.floodlightcontroller.core.internal.OFChannelState.HandshakeState;
66import net.floodlightcontroller.core.util.ListenerDispatcher;
67import net.floodlightcontroller.core.web.CoreWebRoutable;
68import net.floodlightcontroller.counter.ICounterStoreService;
Pavlin Radoslavov19b0e122013-02-21 18:47:38 -080069import net.floodlightcontroller.flowcache.IFlowService;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -080070import net.floodlightcontroller.mastership.IMastershipService;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080071import net.floodlightcontroller.packet.Ethernet;
72import net.floodlightcontroller.perfmon.IPktInProcessingTimeService;
73import net.floodlightcontroller.restserver.IRestApiService;
74import net.floodlightcontroller.storage.IResultSet;
75import net.floodlightcontroller.storage.IStorageSourceListener;
76import net.floodlightcontroller.storage.IStorageSourceService;
77import net.floodlightcontroller.storage.OperatorPredicate;
78import net.floodlightcontroller.storage.StorageException;
79import net.floodlightcontroller.threadpool.IThreadPoolService;
80
81import org.jboss.netty.bootstrap.ServerBootstrap;
82import org.jboss.netty.buffer.ChannelBuffer;
83import org.jboss.netty.buffer.ChannelBuffers;
84import org.jboss.netty.channel.Channel;
85import org.jboss.netty.channel.ChannelHandlerContext;
86import org.jboss.netty.channel.ChannelPipelineFactory;
87import org.jboss.netty.channel.ChannelStateEvent;
88import org.jboss.netty.channel.ChannelUpstreamHandler;
89import org.jboss.netty.channel.Channels;
90import org.jboss.netty.channel.ExceptionEvent;
91import org.jboss.netty.channel.MessageEvent;
92import org.jboss.netty.channel.group.ChannelGroup;
93import org.jboss.netty.channel.group.DefaultChannelGroup;
94import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
95import org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler;
96import org.jboss.netty.handler.timeout.IdleStateEvent;
97import org.jboss.netty.handler.timeout.ReadTimeoutException;
98import org.openflow.protocol.OFEchoReply;
99import org.openflow.protocol.OFError;
100import org.openflow.protocol.OFError.OFBadActionCode;
101import org.openflow.protocol.OFError.OFBadRequestCode;
102import org.openflow.protocol.OFError.OFErrorType;
103import org.openflow.protocol.OFError.OFFlowModFailedCode;
104import org.openflow.protocol.OFError.OFHelloFailedCode;
105import org.openflow.protocol.OFError.OFPortModFailedCode;
106import org.openflow.protocol.OFError.OFQueueOpFailedCode;
107import org.openflow.protocol.OFFeaturesReply;
108import org.openflow.protocol.OFGetConfigReply;
109import org.openflow.protocol.OFMessage;
110import org.openflow.protocol.OFPacketIn;
111import org.openflow.protocol.OFPhysicalPort;
112import org.openflow.protocol.OFPortStatus;
Pankaj Berde6a4075d2013-01-22 16:42:54 -0800113import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
Pankaj Berde6debb042013-01-16 18:04:32 -0800114import org.openflow.protocol.OFPhysicalPort.OFPortState;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800115import org.openflow.protocol.OFPortStatus.OFPortReason;
116import org.openflow.protocol.OFSetConfig;
117import org.openflow.protocol.OFStatisticsRequest;
118import org.openflow.protocol.OFSwitchConfig;
119import org.openflow.protocol.OFType;
120import org.openflow.protocol.OFVendor;
121import org.openflow.protocol.factory.BasicFactory;
122import org.openflow.protocol.factory.MessageParseException;
123import org.openflow.protocol.statistics.OFDescriptionStatistics;
124import org.openflow.protocol.statistics.OFStatistics;
125import org.openflow.protocol.statistics.OFStatisticsType;
126import org.openflow.protocol.vendor.OFBasicVendorDataType;
127import org.openflow.protocol.vendor.OFBasicVendorId;
128import org.openflow.protocol.vendor.OFVendorId;
129import org.openflow.util.HexString;
130import org.openflow.util.U16;
131import org.openflow.util.U32;
132import org.openflow.vendor.nicira.OFNiciraVendorData;
133import org.openflow.vendor.nicira.OFRoleReplyVendorData;
134import org.openflow.vendor.nicira.OFRoleRequestVendorData;
135import org.openflow.vendor.nicira.OFRoleVendorData;
136import org.slf4j.Logger;
137import org.slf4j.LoggerFactory;
138
139
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800140
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800141/**
142 * The main controller class. Handles all setup and network listeners
143 */
144public class Controller implements IFloodlightProviderService,
145 IStorageSourceListener {
Pankaj Berde29ab7fc2013-01-25 06:17:52 -0800146
147 ThreadLocal<SwitchStorageImpl> store = new ThreadLocal<SwitchStorageImpl>() {
148 @Override
149 protected SwitchStorageImpl initialValue() {
150 SwitchStorageImpl swStore = new SwitchStorageImpl();
151 //TODO: Get the file path from global properties
152 swStore.init("/tmp/cassandra.titan");
153 return swStore;
154 }
155 };
156
157 protected SwitchStorageImpl swStore = store.get();
Pankaj Berde8557a462013-01-07 08:59:31 -0800158
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800159 protected static Logger log = LoggerFactory.getLogger(Controller.class);
160
161 private static final String ERROR_DATABASE =
162 "The controller could not communicate with the system database.";
163
164 protected BasicFactory factory;
165 protected ConcurrentMap<OFType,
166 ListenerDispatcher<OFType,IOFMessageListener>>
167 messageListeners;
168 // The activeSwitches map contains only those switches that are actively
169 // being controlled by us -- it doesn't contain switches that are
170 // in the slave role
171 protected ConcurrentHashMap<Long, IOFSwitch> activeSwitches;
172 // connectedSwitches contains all connected switches, including ones where
173 // we're a slave controller. We need to keep track of them so that we can
174 // send role request messages to switches when our role changes to master
175 // We add a switch to this set after it successfully completes the
176 // handshake. Access to this Set needs to be synchronized with roleChanger
177 protected HashSet<OFSwitchImpl> connectedSwitches;
178
179 // The controllerNodeIPsCache maps Controller IDs to their IP address.
180 // It's only used by handleControllerNodeIPsChanged
181 protected HashMap<String, String> controllerNodeIPsCache;
182
183 protected Set<IOFSwitchListener> switchListeners;
184 protected Set<IHAListener> haListeners;
185 protected Map<String, List<IInfoProvider>> providerMap;
186 protected BlockingQueue<IUpdate> updates;
187
188 // Module dependencies
189 protected IRestApiService restApi;
190 protected ICounterStoreService counterStore = null;
191 protected IStorageSourceService storageSource;
192 protected IPktInProcessingTimeService pktinProcTime;
193 protected IThreadPoolService threadPool;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800194 protected IMastershipService masterHelper;
Pavlin Radoslavov19b0e122013-02-21 18:47:38 -0800195 protected IFlowService flowService;
Pavlin Radoslavovd7d8b792013-02-22 10:24:38 -0800196 protected ITopoRouteService topoRouteService;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800197
198 // Configuration options
199 protected int openFlowPort = 6633;
200 protected int workerThreads = 0;
201 // The id for this controller node. Should be unique for each controller
202 // node in a controller cluster.
203 protected String controllerId = "localhost";
204
205 // The current role of the controller.
206 // If the controller isn't configured to support roles, then this is null.
207 protected Role role;
208 // A helper that handles sending and timeout handling for role requests
209 protected RoleChanger roleChanger;
210
211 // Start time of the controller
212 protected long systemStartTime;
213
214 // Flag to always flush flow table on switch reconnect (HA or otherwise)
215 protected boolean alwaysClearFlowsOnSwAdd = false;
216
217 // Storage table names
218 protected static final String CONTROLLER_TABLE_NAME = "controller_controller";
219 protected static final String CONTROLLER_ID = "id";
220
221 protected static final String SWITCH_TABLE_NAME = "controller_switch";
222 protected static final String SWITCH_DATAPATH_ID = "dpid";
223 protected static final String SWITCH_SOCKET_ADDRESS = "socket_address";
224 protected static final String SWITCH_IP = "ip";
225 protected static final String SWITCH_CONTROLLER_ID = "controller_id";
226 protected static final String SWITCH_ACTIVE = "active";
227 protected static final String SWITCH_CONNECTED_SINCE = "connected_since";
228 protected static final String SWITCH_CAPABILITIES = "capabilities";
229 protected static final String SWITCH_BUFFERS = "buffers";
230 protected static final String SWITCH_TABLES = "tables";
231 protected static final String SWITCH_ACTIONS = "actions";
232
233 protected static final String SWITCH_CONFIG_TABLE_NAME = "controller_switchconfig";
234 protected static final String SWITCH_CONFIG_CORE_SWITCH = "core_switch";
235
236 protected static final String PORT_TABLE_NAME = "controller_port";
237 protected static final String PORT_ID = "id";
238 protected static final String PORT_SWITCH = "switch_id";
239 protected static final String PORT_NUMBER = "number";
240 protected static final String PORT_HARDWARE_ADDRESS = "hardware_address";
241 protected static final String PORT_NAME = "name";
242 protected static final String PORT_CONFIG = "config";
243 protected static final String PORT_STATE = "state";
244 protected static final String PORT_CURRENT_FEATURES = "current_features";
245 protected static final String PORT_ADVERTISED_FEATURES = "advertised_features";
246 protected static final String PORT_SUPPORTED_FEATURES = "supported_features";
247 protected static final String PORT_PEER_FEATURES = "peer_features";
248
249 protected static final String CONTROLLER_INTERFACE_TABLE_NAME = "controller_controllerinterface";
250 protected static final String CONTROLLER_INTERFACE_ID = "id";
251 protected static final String CONTROLLER_INTERFACE_CONTROLLER_ID = "controller_id";
252 protected static final String CONTROLLER_INTERFACE_TYPE = "type";
253 protected static final String CONTROLLER_INTERFACE_NUMBER = "number";
254 protected static final String CONTROLLER_INTERFACE_DISCOVERED_IP = "discovered_ip";
255
256
257
258 // Perf. related configuration
259 protected static final int SEND_BUFFER_SIZE = 4 * 1024 * 1024;
260 protected static final int BATCH_MAX_SIZE = 100;
261 protected static final boolean ALWAYS_DECODE_ETH = true;
262
263 /**
264 * Updates handled by the main loop
265 */
266 protected interface IUpdate {
267 /**
268 * Calls the appropriate listeners
269 */
270 public void dispatch();
271 }
/**
 * Kind of switch event carried by a switch update.
 */
public enum SwitchUpdateType {
    /** A switch was added. */
    ADDED,
    /** A switch was removed. */
    REMOVED,
    /** One of a switch's ports changed. */
    PORTCHANGED
}
277 /**
278 * Update message indicating a switch was added or removed
279 */
280 protected class SwitchUpdate implements IUpdate {
281 public IOFSwitch sw;
282 public SwitchUpdateType switchUpdateType;
283 public SwitchUpdate(IOFSwitch sw, SwitchUpdateType switchUpdateType) {
284 this.sw = sw;
285 this.switchUpdateType = switchUpdateType;
286 }
287 public void dispatch() {
288 if (log.isTraceEnabled()) {
289 log.trace("Dispatching switch update {} {}",
290 sw, switchUpdateType);
291 }
292 if (switchListeners != null) {
293 for (IOFSwitchListener listener : switchListeners) {
294 switch(switchUpdateType) {
295 case ADDED:
296 listener.addedSwitch(sw);
297 break;
298 case REMOVED:
299 listener.removedSwitch(sw);
300 break;
301 case PORTCHANGED:
302 listener.switchPortChanged(sw.getId());
303 break;
304 }
305 }
306 }
307 }
308 }
309
310 /**
311 * Update message indicating controller's role has changed
312 */
313 protected class HARoleUpdate implements IUpdate {
314 public Role oldRole;
315 public Role newRole;
316 public HARoleUpdate(Role newRole, Role oldRole) {
317 this.oldRole = oldRole;
318 this.newRole = newRole;
319 }
320 public void dispatch() {
321 // Make sure that old and new roles are different.
322 if (oldRole == newRole) {
323 if (log.isTraceEnabled()) {
324 log.trace("HA role update ignored as the old and " +
325 "new roles are the same. newRole = {}" +
326 "oldRole = {}", newRole, oldRole);
327 }
328 return;
329 }
330 if (log.isTraceEnabled()) {
331 log.trace("Dispatching HA Role update newRole = {}, oldRole = {}",
332 newRole, oldRole);
333 }
334 if (haListeners != null) {
335 for (IHAListener listener : haListeners) {
336 listener.roleChanged(oldRole, newRole);
337 }
338 }
339 }
340 }
341
342 /**
343 * Update message indicating
344 * IPs of controllers in controller cluster have changed.
345 */
346 protected class HAControllerNodeIPUpdate implements IUpdate {
347 public Map<String,String> curControllerNodeIPs;
348 public Map<String,String> addedControllerNodeIPs;
349 public Map<String,String> removedControllerNodeIPs;
350 public HAControllerNodeIPUpdate(
351 HashMap<String,String> curControllerNodeIPs,
352 HashMap<String,String> addedControllerNodeIPs,
353 HashMap<String,String> removedControllerNodeIPs) {
354 this.curControllerNodeIPs = curControllerNodeIPs;
355 this.addedControllerNodeIPs = addedControllerNodeIPs;
356 this.removedControllerNodeIPs = removedControllerNodeIPs;
357 }
358 public void dispatch() {
359 if (log.isTraceEnabled()) {
360 log.trace("Dispatching HA Controller Node IP update "
361 + "curIPs = {}, addedIPs = {}, removedIPs = {}",
362 new Object[] { curControllerNodeIPs, addedControllerNodeIPs,
363 removedControllerNodeIPs }
364 );
365 }
366 if (haListeners != null) {
367 for (IHAListener listener: haListeners) {
368 listener.controllerNodeIPsChanged(curControllerNodeIPs,
369 addedControllerNodeIPs, removedControllerNodeIPs);
370 }
371 }
372 }
373 }
374
375 // ***************
376 // Getters/Setters
377 // ***************
378
379 public void setStorageSourceService(IStorageSourceService storageSource) {
380 this.storageSource = storageSource;
381 }
382
383 public void setCounterStore(ICounterStoreService counterStore) {
384 this.counterStore = counterStore;
385 }
386
387 public void setPktInProcessingService(IPktInProcessingTimeService pits) {
388 this.pktinProcTime = pits;
389 }
390
391 public void setRestApiService(IRestApiService restApi) {
392 this.restApi = restApi;
393 }
394
395 public void setThreadPoolService(IThreadPoolService tp) {
396 this.threadPool = tp;
397 }
398
Pavlin Radoslavov19b0e122013-02-21 18:47:38 -0800399 public void setMastershipService(IMastershipService serviceImpl) {
400 this.masterHelper = serviceImpl;
401 }
402
403 public void setFlowService(IFlowService serviceImpl) {
404 this.flowService = serviceImpl;
405 }
Pavlin Radoslavovd7d8b792013-02-22 10:24:38 -0800406
407 public void setTopoRouteService(ITopoRouteService serviceImpl) {
408 this.topoRouteService = serviceImpl;
409 }
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800410
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800411 @Override
412 public Role getRole() {
413 synchronized(roleChanger) {
414 return role;
415 }
416 }
417
418 @Override
419 public void setRole(Role role) {
420 if (role == null) throw new NullPointerException("Role can not be null.");
421 if (role == Role.MASTER && this.role == Role.SLAVE) {
422 // Reset db state to Inactive for all switches.
423 updateAllInactiveSwitchInfo();
424 }
425
426 // Need to synchronize to ensure a reliable ordering on role request
427 // messages send and to ensure the list of connected switches is stable
428 // RoleChanger will handle the actual sending of the message and
429 // timeout handling
430 // @see RoleChanger
431 synchronized(roleChanger) {
432 if (role.equals(this.role)) {
433 log.debug("Ignoring role change: role is already {}", role);
434 return;
435 }
436
437 Role oldRole = this.role;
438 this.role = role;
439
440 log.debug("Submitting role change request to role {}", role);
441 roleChanger.submitRequest(connectedSwitches, role);
442
443 // Enqueue an update for our listeners.
444 try {
445 this.updates.put(new HARoleUpdate(role, oldRole));
446 } catch (InterruptedException e) {
447 log.error("Failure adding update to queue", e);
448 }
449 }
450 }
451
452
453
454 // **********************
455 // ChannelUpstreamHandler
456 // **********************
457
458 /**
459 * Return a new channel handler for processing a switch connections
460 * @param state The channel state object for the connection
461 * @return the new channel handler
462 */
463 protected ChannelUpstreamHandler getChannelHandler(OFChannelState state) {
464 return new OFChannelHandler(state);
465 }
466
467 /**
468 * Channel handler deals with the switch connection and dispatches
469 * switch messages to the appropriate locations.
470 * @author readams
471 */
472 protected class OFChannelHandler
473 extends IdleStateAwareChannelUpstreamHandler {
474 protected OFSwitchImpl sw;
475 protected OFChannelState state;
476
477 public OFChannelHandler(OFChannelState state) {
478 this.state = state;
479 }
480
481 @Override
482 @LogMessageDoc(message="New switch connection from {ip address}",
483 explanation="A new switch has connected from the " +
484 "specified IP address")
485 public void channelConnected(ChannelHandlerContext ctx,
486 ChannelStateEvent e) throws Exception {
487 log.info("New switch connection from {}",
488 e.getChannel().getRemoteAddress());
489
490 sw = new OFSwitchImpl();
491 sw.setChannel(e.getChannel());
492 sw.setFloodlightProvider(Controller.this);
493 sw.setThreadPoolService(threadPool);
494
495 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
496 msglist.add(factory.getMessage(OFType.HELLO));
497 e.getChannel().write(msglist);
498
499 }
500
501 @Override
502 @LogMessageDoc(message="Disconnected switch {switch information}",
503 explanation="The specified switch has disconnected.")
504 public void channelDisconnected(ChannelHandlerContext ctx,
505 ChannelStateEvent e) throws Exception {
506 if (sw != null && state.hsState == HandshakeState.READY) {
507 if (activeSwitches.containsKey(sw.getId())) {
508 // It's safe to call removeSwitch even though the map might
509 // not contain this particular switch but another with the
510 // same DPID
511 removeSwitch(sw);
512 }
513 synchronized(roleChanger) {
514 connectedSwitches.remove(sw);
515 }
516 sw.setConnected(false);
517 }
518 log.info("Disconnected switch {}", sw);
519 }
520
/**
 * Handle an exception raised on this switch's channel.
 * <ul>
 *   <li>Most causes are logged and the channel is then closed.</li>
 *   <li>An already-closed channel is ignored.</li>
 *   <li>A full processing queue is logged and the channel is kept open.</li>
 *   <li>A storage failure terminates the controller.</li>
 * </ul>
 */
@Override
@LogMessageDocs({
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to read timeout",
            explanation="The connected switch has failed to send any " +
                        "messages or respond to echo requests",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch}: failed to " +
                    "complete handshake",
            explanation="The switch did not respond correctly " +
                        "to handshake messages",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to IO Error: {}",
            explanation="There was an error communicating with the switch",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to switch " +
                    "state error: {error}",
            explanation="The switch sent an unexpected message",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to " +
                    "message parse failure",
            explanation="Could not parse a message from the switch",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Terminating controller due to storage exception",
            explanation=ERROR_DATABASE,
            recommendation=LogMessageDoc.CHECK_CONTROLLER),
    @LogMessageDoc(level="ERROR",
            message="Could not process message: queue full",
            explanation="OpenFlow messages are arriving faster than " +
                        " the controller can process them.",
            recommendation=LogMessageDoc.CHECK_CONTROLLER),
    @LogMessageDoc(level="ERROR",
            message="Error while processing message " +
                    "from switch {switch} {cause}",
            explanation="An error occurred processing the switch message",
            recommendation=LogMessageDoc.GENERIC_ACTION)
})
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
        throws Exception {
    Throwable cause = e.getCause();
    // Most causes end the connection; the branches that must not close it
    // clear this flag.
    boolean closeChannel = true;

    if (cause instanceof ReadTimeoutException) {
        // switch timeout
        log.error("Disconnecting switch {} due to read timeout", sw);
    } else if (cause instanceof HandshakeTimeoutException) {
        log.error("Disconnecting switch {}: failed to complete handshake",
                  sw);
    } else if (cause instanceof ClosedChannelException) {
        // Already closed: nothing to log or close. This test must stay
        // ahead of the IOException branch, since it is a subclass.
        closeChannel = false;
    } else if (cause instanceof IOException) {
        log.error("Disconnecting switch {} due to IO Error: {}",
                  sw, cause.getMessage());
    } else if (cause instanceof SwitchStateException) {
        log.error("Disconnecting switch {} due to switch state error: {}",
                  sw, cause.getMessage());
    } else if (cause instanceof MessageParseException) {
        log.error("Disconnecting switch " + sw +
                  " due to message parse failure",
                  cause);
    } else if (cause instanceof StorageException) {
        closeChannel = false;
        log.error("Terminating controller due to storage exception",
                  cause);
        terminate();
    } else if (cause instanceof RejectedExecutionException) {
        closeChannel = false;
        log.warn("Could not process message: queue full");
    } else {
        log.error("Error while processing message from switch " + sw,
                  cause);
    }

    if (closeChannel) {
        ctx.getChannel().close();
    }
}
600
601 @Override
602 public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
603 throws Exception {
604 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
605 msglist.add(factory.getMessage(OFType.ECHO_REQUEST));
606 e.getChannel().write(msglist);
607 }
608
609 @Override
610 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
611 throws Exception {
612 if (e.getMessage() instanceof List) {
613 @SuppressWarnings("unchecked")
614 List<OFMessage> msglist = (List<OFMessage>)e.getMessage();
615
616 for (OFMessage ofm : msglist) {
617 try {
618 processOFMessage(ofm);
619 }
620 catch (Exception ex) {
621 // We are the last handler in the stream, so run the
622 // exception through the channel again by passing in
623 // ctx.getChannel().
624 Channels.fireExceptionCaught(ctx.getChannel(), ex);
625 }
626 }
627
628 // Flush all flow-mods/packet-out generated from this "train"
629 OFSwitchImpl.flush_all();
630 }
631 }
632
633 /**
634 * Process the request for the switch description
635 */
636 @LogMessageDoc(level="ERROR",
637 message="Exception in reading description " +
638 " during handshake {exception}",
639 explanation="Could not process the switch description string",
640 recommendation=LogMessageDoc.CHECK_SWITCH)
641 void processSwitchDescReply() {
642 try {
643 // Read description, if it has been updated
644 @SuppressWarnings("unchecked")
645 Future<List<OFStatistics>> desc_future =
646 (Future<List<OFStatistics>>)sw.
647 getAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
648 List<OFStatistics> values =
649 desc_future.get(0, TimeUnit.MILLISECONDS);
650 if (values != null) {
651 OFDescriptionStatistics description =
652 new OFDescriptionStatistics();
653 ChannelBuffer data =
654 ChannelBuffers.buffer(description.getLength());
655 for (OFStatistics f : values) {
656 f.writeTo(data);
657 description.readFrom(data);
658 break; // SHOULD be a list of length 1
659 }
660 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_DATA,
661 description);
662 sw.setSwitchProperties(description);
663 data = null;
664
665 // At this time, also set other switch properties from storage
666 boolean is_core_switch = false;
667 IResultSet resultSet = null;
668 try {
669 String swid = sw.getStringId();
670 resultSet =
671 storageSource.getRow(SWITCH_CONFIG_TABLE_NAME, swid);
672 for (Iterator<IResultSet> it =
673 resultSet.iterator(); it.hasNext();) {
674 // In case of multiple rows, use the status
675 // in last row?
676 Map<String, Object> row = it.next().getRow();
677 if (row.containsKey(SWITCH_CONFIG_CORE_SWITCH)) {
678 if (log.isDebugEnabled()) {
679 log.debug("Reading SWITCH_IS_CORE_SWITCH " +
680 "config for switch={}, is-core={}",
681 sw, row.get(SWITCH_CONFIG_CORE_SWITCH));
682 }
683 String ics =
684 (String)row.get(SWITCH_CONFIG_CORE_SWITCH);
685 is_core_switch = ics.equals("true");
686 }
687 }
688 }
689 finally {
690 if (resultSet != null)
691 resultSet.close();
692 }
693 if (is_core_switch) {
694 sw.setAttribute(IOFSwitch.SWITCH_IS_CORE_SWITCH,
695 new Boolean(true));
696 }
697 }
698 sw.removeAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
699 state.hasDescription = true;
700 checkSwitchReady();
701 }
702 catch (InterruptedException ex) {
703 // Ignore
704 }
705 catch (TimeoutException ex) {
706 // Ignore
707 } catch (Exception ex) {
708 log.error("Exception in reading description " +
709 " during handshake", ex);
710 }
711 }
712
713 /**
714 * Send initial switch setup information that we need before adding
715 * the switch
716 * @throws IOException
717 */
718 void sendHelloConfiguration() throws IOException {
719 // Send initial Features Request
720 sw.write(factory.getMessage(OFType.FEATURES_REQUEST), null);
721 }
722
723 /**
724 * Send the configuration requests we can only do after we have
725 * the features reply
726 * @throws IOException
727 */
728 void sendFeatureReplyConfiguration() throws IOException {
729 // Ensure we receive the full packet via PacketIn
730 OFSetConfig config = (OFSetConfig) factory
731 .getMessage(OFType.SET_CONFIG);
732 config.setMissSendLength((short) 0xffff)
733 .setLengthU(OFSwitchConfig.MINIMUM_LENGTH);
734 sw.write(config, null);
735 sw.write(factory.getMessage(OFType.GET_CONFIG_REQUEST),
736 null);
737
738 // Get Description to set switch-specific flags
739 OFStatisticsRequest req = new OFStatisticsRequest();
740 req.setStatisticType(OFStatisticsType.DESC);
741 req.setLengthU(req.getLengthU());
742 Future<List<OFStatistics>> dfuture =
743 sw.getStatistics(req);
744 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE,
745 dfuture);
746
747 }
748
749 protected void checkSwitchReady() {
750 if (state.hsState == HandshakeState.FEATURES_REPLY &&
751 state.hasDescription && state.hasGetConfigReply) {
752
753 state.hsState = HandshakeState.READY;
754
755 synchronized(roleChanger) {
756 // We need to keep track of all of the switches that are connected
757 // to the controller, in any role, so that we can later send the
758 // role request messages when the controller role changes.
759 // We need to be synchronized while doing this: we must not
760 // send a another role request to the connectedSwitches until
761 // we were able to add this new switch to connectedSwitches
762 // *and* send the current role to the new switch.
763 connectedSwitches.add(sw);
764
765 if (role != null) {
766 // Send a role request if role support is enabled for the controller
767 // This is a probe that we'll use to determine if the switch
768 // actually supports the role request message. If it does we'll
769 // get back a role reply message. If it doesn't we'll get back an
770 // OFError message.
771 // If role is MASTER we will promote switch to active
772 // list when we receive the switch's role reply messages
773 log.debug("This controller's role is {}, " +
774 "sending initial role request msg to {}",
775 role, sw);
776 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
777 swList.add(sw);
778 roleChanger.submitRequest(swList, role);
779 }
780 else {
781 // Role supported not enabled on controller (for now)
782 // automatically promote switch to active state.
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800783 log.debug("This controller's role is {}, " +
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800784 "not sending role request msg to {}",
785 role, sw);
786 // Need to clear FlowMods before we add the switch
787 // and dispatch updates otherwise we have a race condition.
788 sw.clearAllFlowMods();
789 addSwitch(sw);
790 state.firstRoleReplyReceived = true;
791 }
792 }
793 }
794 }
795
/**
 * Handle a role reply message received from the switch.
 * <p>
 * Since netty serializes message dispatch we don't need to synchronize
 * against other receive operations from the same switch, so there is no
 * need to synchronize addSwitch() and removeSwitch() operations issued
 * from the same connection.
 * <p>
 * FIXME: However, when a switch with the same DPID connects we do need
 * some synchronization. Handling switches with the same DPID needs to be
 * revisited anyway (get rid of the r/w-lock and the synchronous
 * removedSwitch notification).
 * <p>
 * Behavior: decodes the Nicira role code, hands the reply to the switch,
 * and then adds the switch to (or removes it from) the active switch list
 * depending on whether the switch's activity changed. An unknown role code
 * closes the switch's channel.
 *
 * @param vendorMessage the enclosing OpenFlow VENDOR message; its xid is
 *        passed to sw.deliverRoleReply() together with the decoded role
 * @param roleReplyVendorData the decoded Nicira role-reply payload
 */
@LogMessageDoc(level="ERROR",
        message="Invalid role value in role reply message",
        explanation="Was unable to set the HA role (master or slave) " +
                "for the controller.",
        recommendation=LogMessageDoc.CHECK_CONTROLLER)
protected void handleRoleReplyMessage(OFVendor vendorMessage,
                            OFRoleReplyVendorData roleReplyVendorData) {
    // Map from the role code in the message to our role enum
    int nxRole = roleReplyVendorData.getRole();
    Role role = null;
    switch (nxRole) {
        case OFRoleVendorData.NX_ROLE_OTHER:
            role = Role.EQUAL;
            break;
        case OFRoleVendorData.NX_ROLE_MASTER:
            role = Role.MASTER;
            break;
        case OFRoleVendorData.NX_ROLE_SLAVE:
            role = Role.SLAVE;
            break;
        default:
            // Unknown role code: treat as a protocol error and drop the
            // connection to this switch.
            log.error("Invalid role value in role reply message");
            sw.getChannel().close();
            return;
    }

    log.debug("Handling role reply for role {} from {}. " +
              "Controller's role is {} ",
              new Object[] { role, sw, Controller.this.role}
              );

    // Let the switch record the reply. NOTE(review): presumably it matches
    // the xid against its pending role requests -- confirm in OFSwitchImpl.
    sw.deliverRoleReply(vendorMessage.getXid(), role);

    // isActive: whether this DPID is currently in the controller's active
    // switch list; sw.isActive(): the switch's own view after the reply.
    boolean isActive = activeSwitches.containsKey(sw.getId());
    if (!isActive && sw.isActive()) {
        // Transition from SLAVE to MASTER.

        if (!state.firstRoleReplyReceived ||
            getAlwaysClearFlowsOnSwAdd()) {
            // This is the first role-reply message we receive from
            // this switch or roles were disabled when the switch
            // connected:
            // Delete all pre-existing flows for new connections to
            // the master
            //
            // FIXME: Need to think more about what the test should
            // be for when we flush the flow-table? For example,
            // if all the controllers are temporarily in the backup
            // role (e.g. right after a failure of the master
            // controller) at the point the switch connects, then
            // all of the controllers will initially connect as
            // backup controllers and not flush the flow-table.
            // Then when one of them is promoted to master following
            // the master controller election the flow-table
            // will still not be flushed because that's treated as
            // a failover event where we don't want to flush the
            // flow-table. The end result would be that the flow
            // table for a newly connected switch is never
            // flushed. Not sure how to handle that case though...
            sw.clearAllFlowMods();
            log.debug("First role reply from master switch {}, " +
                      "clear FlowTable to active switch list",
                      HexString.toHexString(sw.getId()));
        }

        // Some switches don't seem to update us with port
        // status messages while in slave role.
        readSwitchPortStateFromStorage(sw);

        // Only add the switch to the active switch list if
        // we're not in the slave role. Note that if the role
        // attribute is null, then that means that the switch
        // doesn't support the role request messages, so in that
        // case we're effectively in the EQUAL role and the
        // switch should be included in the active switch list.
        addSwitch(sw);
        log.debug("Added master switch {} to active switch list",
                 HexString.toHexString(sw.getId()));

    }
    else if (isActive && !sw.isActive()) {
        // Transition from MASTER to SLAVE: remove switch
        // from active switch list.
        log.debug("Removed slave switch {} from active switch" +
                  " list", HexString.toHexString(sw.getId()));
        removeSwitch(sw);
    }

    // Indicate that we have received a role reply message.
    state.firstRoleReplyReceived = true;
}
898
899 protected boolean handleVendorMessage(OFVendor vendorMessage) {
900 boolean shouldHandleMessage = false;
901 int vendor = vendorMessage.getVendor();
902 switch (vendor) {
903 case OFNiciraVendorData.NX_VENDOR_ID:
904 OFNiciraVendorData niciraVendorData =
905 (OFNiciraVendorData)vendorMessage.getVendorData();
906 int dataType = niciraVendorData.getDataType();
907 switch (dataType) {
908 case OFRoleReplyVendorData.NXT_ROLE_REPLY:
909 OFRoleReplyVendorData roleReplyVendorData =
910 (OFRoleReplyVendorData) niciraVendorData;
911 handleRoleReplyMessage(vendorMessage,
912 roleReplyVendorData);
913 break;
914 default:
915 log.warn("Unhandled Nicira VENDOR message; " +
916 "data type = {}", dataType);
917 break;
918 }
919 break;
920 default:
921 log.warn("Unhandled VENDOR message; vendor id = {}", vendor);
922 break;
923 }
924
925 return shouldHandleMessage;
926 }
927
/**
 * Dispatch an OpenFlow message from a switch to the appropriate
 * handler.
 * <p>
 * Handshake messages (HELLO, FEATURES_REPLY, GET_CONFIG_REPLY), echo
 * requests/replies, VENDOR messages, ERROR messages and STATS_REPLY
 * messages are handled inline. PORT_STATUS messages, all message types
 * without a dedicated case, and any message for which a handler sets
 * {@code shouldHandleMessage} are then passed to handleMessage() -- but
 * only while the switch is connected, the handshake is READY and the
 * switch's role is not SLAVE.
 *
 * @param m The message to process
 * @throws IOException if one of the helpers called here fails with an
 *         I/O error (e.g. handleMessage())
 * @throws SwitchStateException if a HELLO arrives outside the START
 *         state, a GET_CONFIG_REPLY arrives outside the FEATURES_REPLY
 *         state, or a STATS_REPLY arrives before FEATURES_REPLY
 */
@LogMessageDocs({
    @LogMessageDoc(level="WARN",
            message="Config Reply from {switch} has " +
                    "miss length set to {length}",
            explanation="The controller requires that the switch " +
                    "use a miss length of 0xffff for correct " +
                    "function",
            recommendation="Use a different switch to ensure " +
                    "correct function"),
    @LogMessageDoc(level="WARN",
            message="Received ERROR from sw {switch} that "
                    +"indicates roles are not supported "
                    +"but we have received a valid "
                    +"role reply earlier",
            explanation="The switch sent a confusing message to the" +
                    "controller")
})
protected void processOFMessage(OFMessage m)
        throws IOException, SwitchStateException {
    // Set by the cases below when the message should also be handed to
    // the registered message listeners via handleMessage().
    boolean shouldHandleMessage = false;

    switch (m.getType()) {
        case HELLO:
            if (log.isTraceEnabled())
                log.trace("HELLO from {}", sw);

            // HELLO is only accepted as the first handshake message.
            if (state.hsState.equals(HandshakeState.START)) {
                state.hsState = HandshakeState.HELLO;
                sendHelloConfiguration();
            } else {
                throw new SwitchStateException("Unexpected HELLO from "
                                               + sw);
            }
            break;
        case ECHO_REQUEST:
            // Answer with an ECHO_REPLY that carries the request's xid.
            OFEchoReply reply =
                (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY);
            reply.setXid(m.getXid());
            sw.write(reply, null);
            break;
        case ECHO_REPLY:
            break;
        case FEATURES_REPLY:
            if (log.isTraceEnabled())
                log.trace("Features Reply from {}", sw);

            sw.setFeaturesReply((OFFeaturesReply) m);
            if (state.hsState.equals(HandshakeState.HELLO)) {
                // Handshake step: HELLO -> FEATURES_REPLY.
                sendFeatureReplyConfiguration();
                state.hsState = HandshakeState.FEATURES_REPLY;
                // uncomment to enable "dumb" switches like cbench
                // state.hsState = HandshakeState.READY;
                // addSwitch(sw);
            } else {
                // Features reply received after the handshake.
                // return results to rest api caller
                sw.deliverOFFeaturesReply(m);
                // update database */
                updateActiveSwitchInfo(sw);
            }
            break;
        case GET_CONFIG_REPLY:
            if (log.isTraceEnabled())
                log.trace("Get config reply from {}", sw);

            if (!state.hsState.equals(HandshakeState.FEATURES_REPLY)) {
                String em = "Unexpected GET_CONFIG_REPLY from " + sw;
                throw new SwitchStateException(em);
            }
            OFGetConfigReply cr = (OFGetConfigReply) m;
            // The miss-send length is an unsigned 16-bit field; compare
            // against 0xffff and mask when logging.
            if (cr.getMissSendLength() == (short)0xffff) {
                log.trace("Config Reply from {} confirms " +
                        "miss length set to 0xffff", sw);
            } else {
                log.warn("Config Reply from {} has " +
                        "miss length set to {}",
                        sw, cr.getMissSendLength() & 0xffff);
            }
            state.hasGetConfigReply = true;
            checkSwitchReady();
            break;
        case VENDOR:
            shouldHandleMessage = handleVendorMessage((OFVendor)m);
            break;
        case ERROR:
            // TODO: we need better error handling. Especially for
            // request/reply style message (stats, roles) we should have
            // a unified way to lookup the xid in the error message.
            // This will probably involve rewriting the way we handle
            // request/reply style messages.
            OFError error = (OFError) m;
            boolean shouldLogError = true;
            // TODO: should we check that firstRoleReplyReceived is false,
            // i.e., check only whether the first request fails?
            if (sw.checkFirstPendingRoleRequestXid(error.getXid())) {
                boolean isBadVendorError =
                    (error.getErrorType() == OFError.OFErrorType.
                            OFPET_BAD_REQUEST.getValue());
                // We expect to receive a bad vendor error when
                // we're connected to a switch that doesn't support
                // the Nicira vendor extensions (i.e. not OVS or
                // derived from OVS).  By protocol, it should also be
                // BAD_VENDOR, but too many switch implementations
                // get it wrong and we can already check the xid()
                // so we can ignore the type with confidence that this
                // is not a spurious error
                shouldLogError = !isBadVendorError;
                if (isBadVendorError) {
                    if (state.firstRoleReplyReceived && (role != null)) {
                        log.warn("Received ERROR from sw {} that "
                                  +"indicates roles are not supported "
                                  +"but we have received a valid "
                                  +"role reply earlier", sw);
                    }
                    state.firstRoleReplyReceived = true;
                    sw.deliverRoleRequestNotSupported(error.getXid());
                    synchronized(roleChanger) {
                        if (sw.role == null && Controller.this.role==Role.SLAVE) {
                            // the switch doesn't understand role request
                            // messages and the current controller role is
                            // slave. We need to disconnect the switch.
                            // @see RoleChanger for rationale
                            sw.getChannel().close();
                        }
                        else if (sw.role == null) {
                            // Controller's role is master: add to
                            // active
                            // TODO: check if clearing flow table is
                            // right choice here.
                            // Need to clear FlowMods before we add the switch
                            // and dispatch updates otherwise we have a race condition.
                            // TODO: switch update is async. Won't we still have a potential
                            //       race condition?
                            sw.clearAllFlowMods();
                            addSwitch(sw);
                        }
                    }
                }
                else {
                    // TODO: Is this the right thing to do if we receive
                    // some other error besides a bad vendor error?
                    // Presumably that means the switch did actually
                    // understand the role request message, but there
                    // was some other error from processing the message.
                    // OF 1.2 specifies a OFPET_ROLE_REQUEST_FAILED
                    // error code, but it doesn't look like the Nicira
                    // role request has that. Should check OVS source
                    // code to see if it's possible for any other errors
                    // to be returned.
                    // If we received an error the switch is not
                    // in the correct role, so we need to disconnect it.
                    // We could also resend the request but then we need to
                    // check if there are other pending request in which
                    // case we shouldn't resend. If we do resend we need
                    // to make sure that the switch eventually accepts one
                    // of our requests or disconnect the switch. This feels
                    // cumbersome.
                    sw.getChannel().close();
                }
            }
            // Once we support OF 1.2, we'd add code to handle it here.
            //if (error.getXid() == state.ofRoleRequestXid) {
            //}
            if (shouldLogError)
                logError(sw, error);
            break;
        case STATS_REPLY:
            // Statistics replies are only valid once FEATURES_REPLY has
            // been reached in the handshake.
            if (state.hsState.ordinal() <
                HandshakeState.FEATURES_REPLY.ordinal()) {
                String em = "Unexpected STATS_REPLY from " + sw;
                throw new SwitchStateException(em);
            }
            sw.deliverStatisticsReply(m);
            if (sw.hasAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE)) {
                processSwitchDescReply();
            }
            break;
        case PORT_STATUS:
            // We want to update our port state info even if we're in
            // the slave role, but we only want to update storage if
            // we're the master (or equal).
            boolean updateStorage = state.hsState.
                                    equals(HandshakeState.READY) &&
                                    (sw.getRole() != Role.SLAVE);
            handlePortStatusMessage(sw, (OFPortStatus)m, updateStorage);
            shouldHandleMessage = true;
            break;

        default:
            shouldHandleMessage = true;
            break;
    }

    if (shouldHandleMessage) {
        // Hold the listener read lock so the switch cannot be swapped
        // out (addSwitch takes the write lock) while listeners run.
        sw.getListenerReadLock().lock();
        try {
            if (sw.isConnected()) {
                if (!state.hsState.equals(HandshakeState.READY)) {
                    log.debug("Ignoring message type {} received " +
                              "from switch {} before switch is " +
                              "fully configured.", m.getType(), sw);
                }
                // Check if the controller is in the slave role for the
                // switch. If it is, then don't dispatch the message to
                // the listeners.
                // TODO: Should we dispatch messages that we expect to
                // receive when we're in the slave role, e.g. port
                // status messages? Since we're "hiding" switches from
                // the listeners when we're in the slave role, then it
                // seems a little weird to dispatch port status messages
                // to them. On the other hand there might be special
                // modules that care about all of the connected switches
                // and would like to receive port status notifications.
                else if (sw.getRole() == Role.SLAVE) {
                    // Don't log message if it's a port status message
                    // since we expect to receive those from the switch
                    // and don't want to emit spurious messages.
                    if (m.getType() != OFType.PORT_STATUS) {
                        log.debug("Ignoring message type {} received " +
                                "from switch {} while in the slave role.",
                                m.getType(), sw);
                    }
                } else {
                    handleMessage(sw, m, null);
                }
            }
        }
        finally {
            sw.getListenerReadLock().unlock();
        }
    }
}
1166 }
1167
1168 // ****************
1169 // Message handlers
1170 // ****************
1171
1172 protected void handlePortStatusMessage(IOFSwitch sw,
1173 OFPortStatus m,
1174 boolean updateStorage) {
1175 short portNumber = m.getDesc().getPortNumber();
1176 OFPhysicalPort port = m.getDesc();
1177 if (m.getReason() == (byte)OFPortReason.OFPPR_MODIFY.ordinal()) {
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001178 boolean portDown = ((OFPortConfig.OFPPC_PORT_DOWN.getValue() & port.getConfig()) > 0) ||
1179 ((OFPortState.OFPPS_LINK_DOWN.getValue() & port.getState()) > 0);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001180 sw.setPort(port);
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001181 if (!portDown) {
Pankaj Berde6debb042013-01-16 18:04:32 -08001182 swStore.addPort(sw.getStringId(), port);
1183 } else {
1184 swStore.deletePort(sw.getStringId(), port.getPortNumber());
1185 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001186 if (updateStorage)
1187 updatePortInfo(sw, port);
1188 log.debug("Port #{} modified for {}", portNumber, sw);
1189 } else if (m.getReason() == (byte)OFPortReason.OFPPR_ADD.ordinal()) {
1190 sw.setPort(port);
Pankaj Berde8557a462013-01-07 08:59:31 -08001191 swStore.addPort(sw.getStringId(), port);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001192 if (updateStorage)
1193 updatePortInfo(sw, port);
1194 log.debug("Port #{} added for {}", portNumber, sw);
1195 } else if (m.getReason() ==
1196 (byte)OFPortReason.OFPPR_DELETE.ordinal()) {
1197 sw.deletePort(portNumber);
Pankaj Berde8557a462013-01-07 08:59:31 -08001198 swStore.deletePort(sw.getStringId(), portNumber);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001199 if (updateStorage)
1200 removePortInfo(sw, portNumber);
1201 log.debug("Port #{} deleted for {}", portNumber, sw);
1202 }
1203 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
1204 try {
1205 this.updates.put(update);
1206 } catch (InterruptedException e) {
1207 log.error("Failure adding update to queue", e);
1208 }
1209 }
1210
1211 /**
1212 * flcontext_cache - Keep a thread local stack of contexts
1213 */
1214 protected static final ThreadLocal<Stack<FloodlightContext>> flcontext_cache =
1215 new ThreadLocal <Stack<FloodlightContext>> () {
1216 @Override
1217 protected Stack<FloodlightContext> initialValue() {
1218 return new Stack<FloodlightContext>();
1219 }
1220 };
1221
1222 /**
1223 * flcontext_alloc - pop a context off the stack, if required create a new one
1224 * @return FloodlightContext
1225 */
1226 protected static FloodlightContext flcontext_alloc() {
1227 FloodlightContext flcontext = null;
1228
1229 if (flcontext_cache.get().empty()) {
1230 flcontext = new FloodlightContext();
1231 }
1232 else {
1233 flcontext = flcontext_cache.get().pop();
1234 }
1235
1236 return flcontext;
1237 }
1238
1239 /**
1240 * flcontext_free - Free the context to the current thread
1241 * @param flcontext
1242 */
1243 protected void flcontext_free(FloodlightContext flcontext) {
1244 flcontext.getStorage().clear();
1245 flcontext_cache.get().push(flcontext);
1246 }
1247
1248 /**
1249 * Handle replies to certain OFMessages, and pass others off to listeners
1250 * @param sw The switch for the message
1251 * @param m The message
1252 * @param bContext The floodlight context. If null then floodlight context would
1253 * be allocated in this function
1254 * @throws IOException
1255 */
1256 @LogMessageDocs({
1257 @LogMessageDoc(level="ERROR",
1258 message="Ignoring PacketIn (Xid = {xid}) because the data" +
1259 " field is empty.",
1260 explanation="The switch sent an improperly-formatted PacketIn" +
1261 " message",
1262 recommendation=LogMessageDoc.CHECK_SWITCH),
1263 @LogMessageDoc(level="WARN",
1264 message="Unhandled OF Message: {} from {}",
1265 explanation="The switch sent a message not handled by " +
1266 "the controller")
1267 })
1268 protected void handleMessage(IOFSwitch sw, OFMessage m,
1269 FloodlightContext bContext)
1270 throws IOException {
1271 Ethernet eth = null;
1272
1273 switch (m.getType()) {
1274 case PACKET_IN:
1275 OFPacketIn pi = (OFPacketIn)m;
1276
1277 if (pi.getPacketData().length <= 0) {
1278 log.error("Ignoring PacketIn (Xid = " + pi.getXid() +
1279 ") because the data field is empty.");
1280 return;
1281 }
1282
1283 if (Controller.ALWAYS_DECODE_ETH) {
1284 eth = new Ethernet();
1285 eth.deserialize(pi.getPacketData(), 0,
1286 pi.getPacketData().length);
1287 counterStore.updatePacketInCounters(sw, m, eth);
1288 }
1289 // fall through to default case...
1290
1291 default:
1292
1293 List<IOFMessageListener> listeners = null;
1294 if (messageListeners.containsKey(m.getType())) {
1295 listeners = messageListeners.get(m.getType()).
1296 getOrderedListeners();
1297 }
1298
1299 FloodlightContext bc = null;
1300 if (listeners != null) {
1301 // Check if floodlight context is passed from the calling
1302 // function, if so use that floodlight context, otherwise
1303 // allocate one
1304 if (bContext == null) {
1305 bc = flcontext_alloc();
1306 } else {
1307 bc = bContext;
1308 }
1309 if (eth != null) {
1310 IFloodlightProviderService.bcStore.put(bc,
1311 IFloodlightProviderService.CONTEXT_PI_PAYLOAD,
1312 eth);
1313 }
1314
1315 // Get the starting time (overall and per-component) of
1316 // the processing chain for this packet if performance
1317 // monitoring is turned on
1318 pktinProcTime.bootstrap(listeners);
1319 pktinProcTime.recordStartTimePktIn();
1320 Command cmd;
1321 for (IOFMessageListener listener : listeners) {
1322 if (listener instanceof IOFSwitchFilter) {
1323 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1324 continue;
1325 }
1326 }
1327
1328 pktinProcTime.recordStartTimeComp(listener);
1329 cmd = listener.receive(sw, m, bc);
1330 pktinProcTime.recordEndTimeComp(listener);
1331
1332 if (Command.STOP.equals(cmd)) {
1333 break;
1334 }
1335 }
1336 pktinProcTime.recordEndTimePktIn(sw, m, bc);
1337 } else {
1338 log.warn("Unhandled OF Message: {} from {}", m, sw);
1339 }
1340
1341 if ((bContext == null) && (bc != null)) flcontext_free(bc);
1342 }
1343 }
1344
1345 /**
1346 * Log an OpenFlow error message from a switch
1347 * @param sw The switch that sent the error
1348 * @param error The error message
1349 */
1350 @LogMessageDoc(level="ERROR",
1351 message="Error {error type} {error code} from {switch}",
1352 explanation="The switch responded with an unexpected error" +
1353 "to an OpenFlow message from the controller",
1354 recommendation="This could indicate improper network operation. " +
1355 "If the problem persists restarting the switch and " +
1356 "controller may help."
1357 )
1358 protected void logError(IOFSwitch sw, OFError error) {
1359 int etint = 0xffff & error.getErrorType();
1360 if (etint < 0 || etint >= OFErrorType.values().length) {
1361 log.error("Unknown error code {} from sw {}", etint, sw);
1362 }
1363 OFErrorType et = OFErrorType.values()[etint];
1364 switch (et) {
1365 case OFPET_HELLO_FAILED:
1366 OFHelloFailedCode hfc =
1367 OFHelloFailedCode.values()[0xffff & error.getErrorCode()];
1368 log.error("Error {} {} from {}", new Object[] {et, hfc, sw});
1369 break;
1370 case OFPET_BAD_REQUEST:
1371 OFBadRequestCode brc =
1372 OFBadRequestCode.values()[0xffff & error.getErrorCode()];
1373 log.error("Error {} {} from {}", new Object[] {et, brc, sw});
1374 break;
1375 case OFPET_BAD_ACTION:
1376 OFBadActionCode bac =
1377 OFBadActionCode.values()[0xffff & error.getErrorCode()];
1378 log.error("Error {} {} from {}", new Object[] {et, bac, sw});
1379 break;
1380 case OFPET_FLOW_MOD_FAILED:
1381 OFFlowModFailedCode fmfc =
1382 OFFlowModFailedCode.values()[0xffff & error.getErrorCode()];
1383 log.error("Error {} {} from {}", new Object[] {et, fmfc, sw});
1384 break;
1385 case OFPET_PORT_MOD_FAILED:
1386 OFPortModFailedCode pmfc =
1387 OFPortModFailedCode.values()[0xffff & error.getErrorCode()];
1388 log.error("Error {} {} from {}", new Object[] {et, pmfc, sw});
1389 break;
1390 case OFPET_QUEUE_OP_FAILED:
1391 OFQueueOpFailedCode qofc =
1392 OFQueueOpFailedCode.values()[0xffff & error.getErrorCode()];
1393 log.error("Error {} {} from {}", new Object[] {et, qofc, sw});
1394 break;
1395 default:
1396 break;
1397 }
1398 }
1399
1400 /**
1401 * Add a switch to the active switch list and call the switch listeners.
1402 * This happens either when a switch first connects (and the controller is
1403 * not in the slave role) or when the role of the controller changes from
1404 * slave to master.
1405 * @param sw the switch that has been added
1406 */
1407 // TODO: need to rethink locking and the synchronous switch update.
1408 // We can / should also handle duplicate DPIDs in connectedSwitches
1409 @LogMessageDoc(level="ERROR",
1410 message="New switch added {switch} for already-added switch {switch}",
1411 explanation="A switch with the same DPID as another switch " +
1412 "connected to the controller. This can be caused by " +
1413 "multiple switches configured with the same DPID, or " +
1414 "by a switch reconnected very quickly after " +
1415 "disconnecting.",
1416 recommendation="If this happens repeatedly, it is likely there " +
1417 "are switches with duplicate DPIDs on the network. " +
1418 "Reconfigure the appropriate switches. If it happens " +
1419 "very rarely, then it is likely this is a transient " +
1420 "network problem that can be ignored."
1421 )
1422 protected void addSwitch(IOFSwitch sw) {
1423 // TODO: is it safe to modify the HashMap without holding
1424 // the old switch's lock?
1425 OFSwitchImpl oldSw = (OFSwitchImpl) this.activeSwitches.put(sw.getId(), sw);
1426 if (sw == oldSw) {
1427 // Note == for object equality, not .equals for value
1428 log.info("New add switch for pre-existing switch {}", sw);
1429 return;
1430 }
1431
1432 if (oldSw != null) {
1433 oldSw.getListenerWriteLock().lock();
1434 try {
1435 log.error("New switch added {} for already-added switch {}",
1436 sw, oldSw);
1437 // Set the connected flag to false to suppress calling
1438 // the listeners for this switch in processOFMessage
1439 oldSw.setConnected(false);
1440
1441 oldSw.cancelAllStatisticsReplies();
1442
1443 updateInactiveSwitchInfo(oldSw);
1444
1445 // we need to clean out old switch state definitively
1446 // before adding the new switch
1447 // FIXME: It seems not completely kosher to call the
1448 // switch listeners here. I thought one of the points of
1449 // having the asynchronous switch update mechanism was so
1450 // the addedSwitch and removedSwitch were always called
1451 // from a single thread to simplify concurrency issues
1452 // for the listener.
1453 if (switchListeners != null) {
1454 for (IOFSwitchListener listener : switchListeners) {
1455 listener.removedSwitch(oldSw);
1456 }
1457 }
1458 // will eventually trigger a removeSwitch(), which will cause
1459 // a "Not removing Switch ... already removed debug message.
1460 // TODO: Figure out a way to handle this that avoids the
1461 // spurious debug message.
1462 oldSw.getChannel().close();
1463 }
1464 finally {
1465 oldSw.getListenerWriteLock().unlock();
1466 }
1467 }
1468
1469 updateActiveSwitchInfo(sw);
Pankaj Berde8557a462013-01-07 08:59:31 -08001470 swStore.update(sw.getStringId(), SwitchState.ACTIVE, DM_OPERATION.UPDATE);
Pankaj Berde0fc4e432013-01-12 09:47:22 -08001471 for (OFPhysicalPort port: sw.getPorts()) {
1472 swStore.addPort(sw.getStringId(), port);
1473 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001474 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.ADDED);
1475 try {
1476 this.updates.put(update);
1477 } catch (InterruptedException e) {
1478 log.error("Failure adding update to queue", e);
1479 }
1480 }
1481
1482 /**
1483 * Remove a switch from the active switch list and call the switch listeners.
1484 * This happens either when the switch is disconnected or when the
1485 * controller's role for the switch changes from master to slave.
1486 * @param sw the switch that has been removed
1487 */
1488 protected void removeSwitch(IOFSwitch sw) {
1489 // No need to acquire the listener lock, since
1490 // this method is only called after netty has processed all
1491 // pending messages
1492 log.debug("removeSwitch: {}", sw);
Pankaj Berdeafb20532013-01-08 15:05:24 -08001493 swStore.update(sw.getStringId(), SwitchState.INACTIVE, DM_OPERATION.UPDATE);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001494 if (!this.activeSwitches.remove(sw.getId(), sw) || !sw.isConnected()) {
1495 log.debug("Not removing switch {}; already removed", sw);
1496 return;
1497 }
1498 // We cancel all outstanding statistics replies if the switch transition
1499 // from active. In the future we might allow statistics requests
1500 // from slave controllers. Then we need to move this cancelation
1501 // to switch disconnect
1502 sw.cancelAllStatisticsReplies();
Pankaj Berdeafb20532013-01-08 15:05:24 -08001503
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001504
1505 // FIXME: I think there's a race condition if we call updateInactiveSwitchInfo
1506 // here if role support is enabled. In that case if the switch is being
1507 // removed because we've been switched to being in the slave role, then I think
1508 // it's possible that the new master may have already been promoted to master
1509 // and written out the active switch state to storage. If we now execute
1510 // updateInactiveSwitchInfo we may wipe out all of the state that was
1511 // written out by the new master. Maybe need to revisit how we handle all
1512 // of the switch state that's written to storage.
1513
1514 updateInactiveSwitchInfo(sw);
Pankaj Berdeafb20532013-01-08 15:05:24 -08001515
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001516 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.REMOVED);
1517 try {
1518 this.updates.put(update);
1519 } catch (InterruptedException e) {
1520 log.error("Failure adding update to queue", e);
1521 }
1522 }
1523
1524 // ***************
1525 // IFloodlightProvider
1526 // ***************
1527
1528 @Override
1529 public synchronized void addOFMessageListener(OFType type,
1530 IOFMessageListener listener) {
1531 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1532 messageListeners.get(type);
1533 if (ldd == null) {
1534 ldd = new ListenerDispatcher<OFType, IOFMessageListener>();
1535 messageListeners.put(type, ldd);
1536 }
1537 ldd.addListener(type, listener);
1538 }
1539
1540 @Override
1541 public synchronized void removeOFMessageListener(OFType type,
1542 IOFMessageListener listener) {
1543 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1544 messageListeners.get(type);
1545 if (ldd != null) {
1546 ldd.removeListener(listener);
1547 }
1548 }
1549
1550 private void logListeners() {
1551 for (Map.Entry<OFType,
1552 ListenerDispatcher<OFType,
1553 IOFMessageListener>> entry
1554 : messageListeners.entrySet()) {
1555
1556 OFType type = entry.getKey();
1557 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1558 entry.getValue();
1559
1560 StringBuffer sb = new StringBuffer();
1561 sb.append("OFListeners for ");
1562 sb.append(type);
1563 sb.append(": ");
1564 for (IOFMessageListener l : ldd.getOrderedListeners()) {
1565 sb.append(l.getName());
1566 sb.append(",");
1567 }
1568 log.debug(sb.toString());
1569 }
1570 }
1571
1572 public void removeOFMessageListeners(OFType type) {
1573 messageListeners.remove(type);
1574 }
1575
1576 @Override
1577 public Map<Long, IOFSwitch> getSwitches() {
1578 return Collections.unmodifiableMap(this.activeSwitches);
1579 }
1580
1581 @Override
1582 public void addOFSwitchListener(IOFSwitchListener listener) {
1583 this.switchListeners.add(listener);
1584 }
1585
1586 @Override
1587 public void removeOFSwitchListener(IOFSwitchListener listener) {
1588 this.switchListeners.remove(listener);
1589 }
1590
1591 @Override
1592 public Map<OFType, List<IOFMessageListener>> getListeners() {
1593 Map<OFType, List<IOFMessageListener>> lers =
1594 new HashMap<OFType, List<IOFMessageListener>>();
1595 for(Entry<OFType, ListenerDispatcher<OFType, IOFMessageListener>> e :
1596 messageListeners.entrySet()) {
1597 lers.put(e.getKey(), e.getValue().getOrderedListeners());
1598 }
1599 return Collections.unmodifiableMap(lers);
1600 }
1601
1602 @Override
1603 @LogMessageDocs({
1604 @LogMessageDoc(message="Failed to inject OFMessage {message} onto " +
1605 "a null switch",
1606 explanation="Failed to process a message because the switch " +
1607 " is no longer connected."),
1608 @LogMessageDoc(level="ERROR",
1609 message="Error reinjecting OFMessage on switch {switch}",
1610 explanation="An I/O error occured while attempting to " +
1611 "process an OpenFlow message",
1612 recommendation=LogMessageDoc.CHECK_SWITCH)
1613 })
1614 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg,
1615 FloodlightContext bc) {
1616 if (sw == null) {
1617 log.info("Failed to inject OFMessage {} onto a null switch", msg);
1618 return false;
1619 }
1620
1621 // FIXME: Do we need to be able to inject messages to switches
1622 // where we're the slave controller (i.e. they're connected but
1623 // not active)?
1624 // FIXME: Don't we need synchronization logic here so we're holding
1625 // the listener read lock when we call handleMessage? After some
1626 // discussions it sounds like the right thing to do here would be to
1627 // inject the message as a netty upstream channel event so it goes
1628 // through the normal netty event processing, including being
1629 // handled
1630 if (!activeSwitches.containsKey(sw.getId())) return false;
1631
1632 try {
1633 // Pass Floodlight context to the handleMessages()
1634 handleMessage(sw, msg, bc);
1635 } catch (IOException e) {
1636 log.error("Error reinjecting OFMessage on switch {}",
1637 HexString.toHexString(sw.getId()));
1638 return false;
1639 }
1640 return true;
1641 }
1642
1643 @Override
1644 @LogMessageDoc(message="Calling System.exit",
1645 explanation="The controller is terminating")
1646 public synchronized void terminate() {
1647 log.info("Calling System.exit");
1648 System.exit(1);
1649 }
1650
1651 @Override
1652 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg) {
1653 // call the overloaded version with floodlight context set to null
1654 return injectOfMessage(sw, msg, null);
1655 }
1656
1657 @Override
1658 public void handleOutgoingMessage(IOFSwitch sw, OFMessage m,
1659 FloodlightContext bc) {
1660 if (log.isTraceEnabled()) {
1661 String str = OFMessage.getDataAsString(sw, m, bc);
1662 log.trace("{}", str);
1663 }
1664
1665 List<IOFMessageListener> listeners = null;
1666 if (messageListeners.containsKey(m.getType())) {
1667 listeners =
1668 messageListeners.get(m.getType()).getOrderedListeners();
1669 }
1670
1671 if (listeners != null) {
1672 for (IOFMessageListener listener : listeners) {
1673 if (listener instanceof IOFSwitchFilter) {
1674 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1675 continue;
1676 }
1677 }
1678 if (Command.STOP.equals(listener.receive(sw, m, bc))) {
1679 break;
1680 }
1681 }
1682 }
1683 }
1684
1685 @Override
1686 public BasicFactory getOFMessageFactory() {
1687 return factory;
1688 }
1689
1690 @Override
1691 public String getControllerId() {
1692 return controllerId;
1693 }
1694
1695 // **************
1696 // Initialization
1697 // **************
1698
1699 protected void updateAllInactiveSwitchInfo() {
1700 if (role == Role.SLAVE) {
1701 return;
1702 }
1703 String controllerId = getControllerId();
1704 String[] switchColumns = { SWITCH_DATAPATH_ID,
1705 SWITCH_CONTROLLER_ID,
1706 SWITCH_ACTIVE };
1707 String[] portColumns = { PORT_ID, PORT_SWITCH };
1708 IResultSet switchResultSet = null;
1709 try {
1710 OperatorPredicate op =
1711 new OperatorPredicate(SWITCH_CONTROLLER_ID,
1712 OperatorPredicate.Operator.EQ,
1713 controllerId);
1714 switchResultSet =
1715 storageSource.executeQuery(SWITCH_TABLE_NAME,
1716 switchColumns,
1717 op, null);
1718 while (switchResultSet.next()) {
1719 IResultSet portResultSet = null;
1720 try {
1721 String datapathId =
1722 switchResultSet.getString(SWITCH_DATAPATH_ID);
1723 switchResultSet.setBoolean(SWITCH_ACTIVE, Boolean.FALSE);
1724 op = new OperatorPredicate(PORT_SWITCH,
1725 OperatorPredicate.Operator.EQ,
1726 datapathId);
1727 portResultSet =
1728 storageSource.executeQuery(PORT_TABLE_NAME,
1729 portColumns,
1730 op, null);
1731 while (portResultSet.next()) {
1732 portResultSet.deleteRow();
1733 }
1734 portResultSet.save();
1735 }
1736 finally {
1737 if (portResultSet != null)
1738 portResultSet.close();
1739 }
1740 }
1741 switchResultSet.save();
1742 }
1743 finally {
1744 if (switchResultSet != null)
1745 switchResultSet.close();
1746 }
1747 }
1748
1749 protected void updateControllerInfo() {
1750 updateAllInactiveSwitchInfo();
1751
1752 // Write out the controller info to the storage source
1753 Map<String, Object> controllerInfo = new HashMap<String, Object>();
1754 String id = getControllerId();
1755 controllerInfo.put(CONTROLLER_ID, id);
1756 storageSource.updateRow(CONTROLLER_TABLE_NAME, controllerInfo);
1757 }
1758
1759 protected void updateActiveSwitchInfo(IOFSwitch sw) {
1760 if (role == Role.SLAVE) {
1761 return;
1762 }
1763 // Obtain the row info for the switch
1764 Map<String, Object> switchInfo = new HashMap<String, Object>();
1765 String datapathIdString = sw.getStringId();
1766 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1767 String controllerId = getControllerId();
1768 switchInfo.put(SWITCH_CONTROLLER_ID, controllerId);
1769 Date connectedSince = sw.getConnectedSince();
1770 switchInfo.put(SWITCH_CONNECTED_SINCE, connectedSince);
1771 Channel channel = sw.getChannel();
1772 SocketAddress socketAddress = channel.getRemoteAddress();
1773 if (socketAddress != null) {
1774 String socketAddressString = socketAddress.toString();
1775 switchInfo.put(SWITCH_SOCKET_ADDRESS, socketAddressString);
1776 if (socketAddress instanceof InetSocketAddress) {
1777 InetSocketAddress inetSocketAddress =
1778 (InetSocketAddress)socketAddress;
1779 InetAddress inetAddress = inetSocketAddress.getAddress();
1780 String ip = inetAddress.getHostAddress();
1781 switchInfo.put(SWITCH_IP, ip);
1782 }
1783 }
1784
1785 // Write out the switch features info
1786 long capabilities = U32.f(sw.getCapabilities());
1787 switchInfo.put(SWITCH_CAPABILITIES, capabilities);
1788 long buffers = U32.f(sw.getBuffers());
1789 switchInfo.put(SWITCH_BUFFERS, buffers);
1790 long tables = U32.f(sw.getTables());
1791 switchInfo.put(SWITCH_TABLES, tables);
1792 long actions = U32.f(sw.getActions());
1793 switchInfo.put(SWITCH_ACTIONS, actions);
1794 switchInfo.put(SWITCH_ACTIVE, Boolean.TRUE);
1795
1796 // Update the switch
1797 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1798
1799 // Update the ports
1800 for (OFPhysicalPort port: sw.getPorts()) {
1801 updatePortInfo(sw, port);
1802 }
1803 }
1804
1805 protected void updateInactiveSwitchInfo(IOFSwitch sw) {
1806 if (role == Role.SLAVE) {
1807 return;
1808 }
1809 log.debug("Update DB with inactiveSW {}", sw);
1810 // Update the controller info in the storage source to be inactive
1811 Map<String, Object> switchInfo = new HashMap<String, Object>();
1812 String datapathIdString = sw.getStringId();
1813 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1814 //switchInfo.put(SWITCH_CONNECTED_SINCE, null);
1815 switchInfo.put(SWITCH_ACTIVE, Boolean.FALSE);
1816 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1817 }
1818
1819 protected void updatePortInfo(IOFSwitch sw, OFPhysicalPort port) {
1820 if (role == Role.SLAVE) {
1821 return;
1822 }
1823 String datapathIdString = sw.getStringId();
1824 Map<String, Object> portInfo = new HashMap<String, Object>();
1825 int portNumber = U16.f(port.getPortNumber());
1826 String id = datapathIdString + "|" + portNumber;
1827 portInfo.put(PORT_ID, id);
1828 portInfo.put(PORT_SWITCH, datapathIdString);
1829 portInfo.put(PORT_NUMBER, portNumber);
1830 byte[] hardwareAddress = port.getHardwareAddress();
1831 String hardwareAddressString = HexString.toHexString(hardwareAddress);
1832 portInfo.put(PORT_HARDWARE_ADDRESS, hardwareAddressString);
1833 String name = port.getName();
1834 portInfo.put(PORT_NAME, name);
1835 long config = U32.f(port.getConfig());
1836 portInfo.put(PORT_CONFIG, config);
1837 long state = U32.f(port.getState());
1838 portInfo.put(PORT_STATE, state);
1839 long currentFeatures = U32.f(port.getCurrentFeatures());
1840 portInfo.put(PORT_CURRENT_FEATURES, currentFeatures);
1841 long advertisedFeatures = U32.f(port.getAdvertisedFeatures());
1842 portInfo.put(PORT_ADVERTISED_FEATURES, advertisedFeatures);
1843 long supportedFeatures = U32.f(port.getSupportedFeatures());
1844 portInfo.put(PORT_SUPPORTED_FEATURES, supportedFeatures);
1845 long peerFeatures = U32.f(port.getPeerFeatures());
1846 portInfo.put(PORT_PEER_FEATURES, peerFeatures);
1847 storageSource.updateRowAsync(PORT_TABLE_NAME, portInfo);
1848 }
1849
1850 /**
1851 * Read switch port data from storage and write it into a switch object
1852 * @param sw the switch to update
1853 */
1854 protected void readSwitchPortStateFromStorage(OFSwitchImpl sw) {
1855 OperatorPredicate op =
1856 new OperatorPredicate(PORT_SWITCH,
1857 OperatorPredicate.Operator.EQ,
1858 sw.getStringId());
1859 IResultSet portResultSet =
1860 storageSource.executeQuery(PORT_TABLE_NAME,
1861 null, op, null);
1862 //Map<Short, OFPhysicalPort> oldports =
1863 // new HashMap<Short, OFPhysicalPort>();
1864 //oldports.putAll(sw.getPorts());
1865
1866 while (portResultSet.next()) {
1867 try {
1868 OFPhysicalPort p = new OFPhysicalPort();
1869 p.setPortNumber((short)portResultSet.getInt(PORT_NUMBER));
1870 p.setName(portResultSet.getString(PORT_NAME));
1871 p.setConfig((int)portResultSet.getLong(PORT_CONFIG));
1872 p.setState((int)portResultSet.getLong(PORT_STATE));
1873 String portMac = portResultSet.getString(PORT_HARDWARE_ADDRESS);
1874 p.setHardwareAddress(HexString.fromHexString(portMac));
1875 p.setCurrentFeatures((int)portResultSet.
1876 getLong(PORT_CURRENT_FEATURES));
1877 p.setAdvertisedFeatures((int)portResultSet.
1878 getLong(PORT_ADVERTISED_FEATURES));
1879 p.setSupportedFeatures((int)portResultSet.
1880 getLong(PORT_SUPPORTED_FEATURES));
1881 p.setPeerFeatures((int)portResultSet.
1882 getLong(PORT_PEER_FEATURES));
1883 //oldports.remove(Short.valueOf(p.getPortNumber()));
1884 sw.setPort(p);
1885 } catch (NullPointerException e) {
1886 // ignore
1887 }
1888 }
1889 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
1890 try {
1891 this.updates.put(update);
1892 } catch (InterruptedException e) {
1893 log.error("Failure adding update to queue", e);
1894 }
1895 }
1896
1897 protected void removePortInfo(IOFSwitch sw, short portNumber) {
1898 if (role == Role.SLAVE) {
1899 return;
1900 }
1901 String datapathIdString = sw.getStringId();
1902 String id = datapathIdString + "|" + portNumber;
1903 storageSource.deleteRowAsync(PORT_TABLE_NAME, id);
1904 }
1905
1906 /**
1907 * Sets the initial role based on properties in the config params.
1908 * It looks for two different properties.
1909 * If the "role" property is specified then the value should be
1910 * either "EQUAL", "MASTER", or "SLAVE" and the role of the
1911 * controller is set to the specified value. If the "role" property
1912 * is not specified then it looks next for the "role.path" property.
1913 * In this case the value should be the path to a property file in
1914 * the file system that contains a property called "floodlight.role"
1915 * which can be one of the values listed above for the "role" property.
1916 * The idea behind the "role.path" mechanism is that you have some
1917 * separate heartbeat and master controller election algorithm that
1918 * determines the role of the controller. When a role transition happens,
1919 * it updates the current role in the file specified by the "role.path"
1920 * file. Then if floodlight restarts for some reason it can get the
1921 * correct current role of the controller from the file.
1922 * @param configParams The config params for the FloodlightProvider service
1923 * @return A valid role if role information is specified in the
1924 * config params, otherwise null
1925 */
1926 @LogMessageDocs({
1927 @LogMessageDoc(message="Controller role set to {role}",
1928 explanation="Setting the initial HA role to "),
1929 @LogMessageDoc(level="ERROR",
1930 message="Invalid current role value: {role}",
1931 explanation="An invalid HA role value was read from the " +
1932 "properties file",
1933 recommendation=LogMessageDoc.CHECK_CONTROLLER)
1934 })
1935 protected Role getInitialRole(Map<String, String> configParams) {
1936 Role role = null;
1937 String roleString = configParams.get("role");
1938 if (roleString == null) {
1939 String rolePath = configParams.get("rolepath");
1940 if (rolePath != null) {
1941 Properties properties = new Properties();
1942 try {
1943 properties.load(new FileInputStream(rolePath));
1944 roleString = properties.getProperty("floodlight.role");
1945 }
1946 catch (IOException exc) {
1947 // Don't treat it as an error if the file specified by the
1948 // rolepath property doesn't exist. This lets us enable the
1949 // HA mechanism by just creating/setting the floodlight.role
1950 // property in that file without having to modify the
1951 // floodlight properties.
1952 }
1953 }
1954 }
1955
1956 if (roleString != null) {
1957 // Canonicalize the string to the form used for the enum constants
1958 roleString = roleString.trim().toUpperCase();
1959 try {
1960 role = Role.valueOf(roleString);
1961 }
1962 catch (IllegalArgumentException exc) {
1963 log.error("Invalid current role value: {}", roleString);
1964 }
1965 }
1966
1967 log.info("Controller role set to {}", role);
1968
1969 return role;
1970 }
1971
1972 /**
1973 * Tell controller that we're ready to accept switches loop
1974 * @throws IOException
1975 */
1976 @LogMessageDocs({
1977 @LogMessageDoc(message="Listening for switch connections on {address}",
1978 explanation="The controller is ready and listening for new" +
1979 " switch connections"),
1980 @LogMessageDoc(message="Storage exception in controller " +
1981 "updates loop; terminating process",
1982 explanation=ERROR_DATABASE,
1983 recommendation=LogMessageDoc.CHECK_CONTROLLER),
1984 @LogMessageDoc(level="ERROR",
1985 message="Exception in controller updates loop",
1986 explanation="Failed to dispatch controller event",
1987 recommendation=LogMessageDoc.GENERIC_ACTION)
1988 })
1989 public void run() {
1990 if (log.isDebugEnabled()) {
1991 logListeners();
1992 }
1993
1994 try {
1995 final ServerBootstrap bootstrap = createServerBootStrap();
1996
1997 bootstrap.setOption("reuseAddr", true);
1998 bootstrap.setOption("child.keepAlive", true);
1999 bootstrap.setOption("child.tcpNoDelay", true);
2000 bootstrap.setOption("child.sendBufferSize", Controller.SEND_BUFFER_SIZE);
2001
2002 ChannelPipelineFactory pfact =
2003 new OpenflowPipelineFactory(this, null);
2004 bootstrap.setPipelineFactory(pfact);
2005 InetSocketAddress sa = new InetSocketAddress(openFlowPort);
2006 final ChannelGroup cg = new DefaultChannelGroup();
2007 cg.add(bootstrap.bind(sa));
2008
2009 log.info("Listening for switch connections on {}", sa);
2010 } catch (Exception e) {
2011 throw new RuntimeException(e);
2012 }
2013
2014 // main loop
2015 while (true) {
2016 try {
2017 IUpdate update = updates.take();
2018 update.dispatch();
2019 } catch (InterruptedException e) {
2020 return;
2021 } catch (StorageException e) {
2022 log.error("Storage exception in controller " +
2023 "updates loop; terminating process", e);
2024 return;
2025 } catch (Exception e) {
2026 log.error("Exception in controller updates loop", e);
2027 }
2028 }
2029 }
2030
2031 private ServerBootstrap createServerBootStrap() {
2032 if (workerThreads == 0) {
2033 return new ServerBootstrap(
2034 new NioServerSocketChannelFactory(
2035 Executors.newCachedThreadPool(),
2036 Executors.newCachedThreadPool()));
2037 } else {
2038 return new ServerBootstrap(
2039 new NioServerSocketChannelFactory(
2040 Executors.newCachedThreadPool(),
2041 Executors.newCachedThreadPool(), workerThreads));
2042 }
2043 }
2044
2045 public void setConfigParams(Map<String, String> configParams) {
2046 String ofPort = configParams.get("openflowport");
2047 if (ofPort != null) {
2048 this.openFlowPort = Integer.parseInt(ofPort);
2049 }
2050 log.debug("OpenFlow port set to {}", this.openFlowPort);
2051 String threads = configParams.get("workerthreads");
2052 if (threads != null) {
2053 this.workerThreads = Integer.parseInt(threads);
2054 }
2055 log.debug("Number of worker threads set to {}", this.workerThreads);
2056 String controllerId = configParams.get("controllerid");
2057 if (controllerId != null) {
2058 this.controllerId = controllerId;
2059 }
2060 log.debug("ControllerId set to {}", this.controllerId);
2061 }
2062
2063 private void initVendorMessages() {
2064 // Configure openflowj to be able to parse the role request/reply
2065 // vendor messages.
2066 OFBasicVendorId niciraVendorId = new OFBasicVendorId(
2067 OFNiciraVendorData.NX_VENDOR_ID, 4);
2068 OFVendorId.registerVendorId(niciraVendorId);
2069 OFBasicVendorDataType roleRequestVendorData =
2070 new OFBasicVendorDataType(
2071 OFRoleRequestVendorData.NXT_ROLE_REQUEST,
2072 OFRoleRequestVendorData.getInstantiable());
2073 niciraVendorId.registerVendorDataType(roleRequestVendorData);
2074 OFBasicVendorDataType roleReplyVendorData =
2075 new OFBasicVendorDataType(
2076 OFRoleReplyVendorData.NXT_ROLE_REPLY,
2077 OFRoleReplyVendorData.getInstantiable());
2078 niciraVendorId.registerVendorDataType(roleReplyVendorData);
2079 }
2080
2081 /**
2082 * Initialize internal data structures
2083 */
2084 public void init(Map<String, String> configParams) {
2085 // These data structures are initialized here because other
2086 // module's startUp() might be called before ours
2087 this.messageListeners =
2088 new ConcurrentHashMap<OFType,
2089 ListenerDispatcher<OFType,
2090 IOFMessageListener>>();
2091 this.switchListeners = new CopyOnWriteArraySet<IOFSwitchListener>();
2092 this.haListeners = new CopyOnWriteArraySet<IHAListener>();
2093 this.activeSwitches = new ConcurrentHashMap<Long, IOFSwitch>();
2094 this.connectedSwitches = new HashSet<OFSwitchImpl>();
2095 this.controllerNodeIPsCache = new HashMap<String, String>();
2096 this.updates = new LinkedBlockingQueue<IUpdate>();
2097 this.factory = new BasicFactory();
2098 this.providerMap = new HashMap<String, List<IInfoProvider>>();
2099 setConfigParams(configParams);
2100 this.role = getInitialRole(configParams);
2101 this.roleChanger = new RoleChanger();
2102 initVendorMessages();
2103 this.systemStartTime = System.currentTimeMillis();
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002104 }
2105
2106 /**
2107 * Startup all of the controller's components
2108 */
2109 @LogMessageDoc(message="Waiting for storage source",
2110 explanation="The system database is not yet ready",
2111 recommendation="If this message persists, this indicates " +
2112 "that the system database has failed to start. " +
2113 LogMessageDoc.CHECK_CONTROLLER)
2114 public void startupComponents() {
2115 // Create the table names we use
2116 storageSource.createTable(CONTROLLER_TABLE_NAME, null);
2117 storageSource.createTable(SWITCH_TABLE_NAME, null);
2118 storageSource.createTable(PORT_TABLE_NAME, null);
2119 storageSource.createTable(CONTROLLER_INTERFACE_TABLE_NAME, null);
2120 storageSource.createTable(SWITCH_CONFIG_TABLE_NAME, null);
2121 storageSource.setTablePrimaryKeyName(CONTROLLER_TABLE_NAME,
2122 CONTROLLER_ID);
2123 storageSource.setTablePrimaryKeyName(SWITCH_TABLE_NAME,
2124 SWITCH_DATAPATH_ID);
2125 storageSource.setTablePrimaryKeyName(PORT_TABLE_NAME, PORT_ID);
2126 storageSource.setTablePrimaryKeyName(CONTROLLER_INTERFACE_TABLE_NAME,
2127 CONTROLLER_INTERFACE_ID);
2128 storageSource.addListener(CONTROLLER_INTERFACE_TABLE_NAME, this);
2129
2130 while (true) {
2131 try {
2132 updateControllerInfo();
2133 break;
2134 }
2135 catch (StorageException e) {
2136 log.info("Waiting for storage source");
2137 try {
2138 Thread.sleep(1000);
2139 } catch (InterruptedException e1) {
2140 }
2141 }
2142 }
2143
2144 // Add our REST API
2145 restApi.addRestletRoutable(new CoreWebRoutable());
2146 }
2147
2148 @Override
2149 public void addInfoProvider(String type, IInfoProvider provider) {
2150 if (!providerMap.containsKey(type)) {
2151 providerMap.put(type, new ArrayList<IInfoProvider>());
2152 }
2153 providerMap.get(type).add(provider);
2154 }
2155
2156 @Override
2157 public void removeInfoProvider(String type, IInfoProvider provider) {
2158 if (!providerMap.containsKey(type)) {
2159 log.debug("Provider type {} doesn't exist.", type);
2160 return;
2161 }
2162
2163 providerMap.get(type).remove(provider);
2164 }
2165
2166 public Map<String, Object> getControllerInfo(String type) {
2167 if (!providerMap.containsKey(type)) return null;
2168
2169 Map<String, Object> result = new LinkedHashMap<String, Object>();
2170 for (IInfoProvider provider : providerMap.get(type)) {
2171 result.putAll(provider.getInfo(type));
2172 }
2173
2174 return result;
2175 }
2176
2177 @Override
2178 public void addHAListener(IHAListener listener) {
2179 this.haListeners.add(listener);
2180 }
2181
2182 @Override
2183 public void removeHAListener(IHAListener listener) {
2184 this.haListeners.remove(listener);
2185 }
2186
2187
2188 /**
2189 * Handle changes to the controller nodes IPs and dispatch update.
2190 */
2191 @SuppressWarnings("unchecked")
2192 protected void handleControllerNodeIPChanges() {
2193 HashMap<String,String> curControllerNodeIPs = new HashMap<String,String>();
2194 HashMap<String,String> addedControllerNodeIPs = new HashMap<String,String>();
2195 HashMap<String,String> removedControllerNodeIPs =new HashMap<String,String>();
2196 String[] colNames = { CONTROLLER_INTERFACE_CONTROLLER_ID,
2197 CONTROLLER_INTERFACE_TYPE,
2198 CONTROLLER_INTERFACE_NUMBER,
2199 CONTROLLER_INTERFACE_DISCOVERED_IP };
2200 synchronized(controllerNodeIPsCache) {
2201 // We currently assume that interface Ethernet0 is the relevant
2202 // controller interface. Might change.
2203 // We could (should?) implement this using
2204 // predicates, but creating the individual and compound predicate
2205 // seems more overhead then just checking every row. Particularly,
2206 // since the number of rows is small and changes infrequent
2207 IResultSet res = storageSource.executeQuery(CONTROLLER_INTERFACE_TABLE_NAME,
2208 colNames,null, null);
2209 while (res.next()) {
2210 if (res.getString(CONTROLLER_INTERFACE_TYPE).equals("Ethernet") &&
2211 res.getInt(CONTROLLER_INTERFACE_NUMBER) == 0) {
2212 String controllerID = res.getString(CONTROLLER_INTERFACE_CONTROLLER_ID);
2213 String discoveredIP = res.getString(CONTROLLER_INTERFACE_DISCOVERED_IP);
2214 String curIP = controllerNodeIPsCache.get(controllerID);
2215
2216 curControllerNodeIPs.put(controllerID, discoveredIP);
2217 if (curIP == null) {
2218 // new controller node IP
2219 addedControllerNodeIPs.put(controllerID, discoveredIP);
2220 }
2221 else if (curIP != discoveredIP) {
2222 // IP changed
2223 removedControllerNodeIPs.put(controllerID, curIP);
2224 addedControllerNodeIPs.put(controllerID, discoveredIP);
2225 }
2226 }
2227 }
2228 // Now figure out if rows have been deleted. We can't use the
2229 // rowKeys from rowsDeleted directly, since the tables primary
2230 // key is a compound that we can't disassemble
2231 Set<String> curEntries = curControllerNodeIPs.keySet();
2232 Set<String> removedEntries = controllerNodeIPsCache.keySet();
2233 removedEntries.removeAll(curEntries);
2234 for (String removedControllerID : removedEntries)
2235 removedControllerNodeIPs.put(removedControllerID, controllerNodeIPsCache.get(removedControllerID));
2236 controllerNodeIPsCache = (HashMap<String, String>) curControllerNodeIPs.clone();
2237 HAControllerNodeIPUpdate update = new HAControllerNodeIPUpdate(
2238 curControllerNodeIPs, addedControllerNodeIPs,
2239 removedControllerNodeIPs);
2240 if (!removedControllerNodeIPs.isEmpty() || !addedControllerNodeIPs.isEmpty()) {
2241 try {
2242 this.updates.put(update);
2243 } catch (InterruptedException e) {
2244 log.error("Failure adding update to queue", e);
2245 }
2246 }
2247 }
2248 }
2249
2250 @Override
2251 public Map<String, String> getControllerNodeIPs() {
2252 // We return a copy of the mapping so we can guarantee that
2253 // the mapping return is the same as one that will be (or was)
2254 // dispatched to IHAListeners
2255 HashMap<String,String> retval = new HashMap<String,String>();
2256 synchronized(controllerNodeIPsCache) {
2257 retval.putAll(controllerNodeIPsCache);
2258 }
2259 return retval;
2260 }
2261
2262 @Override
2263 public void rowsModified(String tableName, Set<Object> rowKeys) {
2264 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2265 handleControllerNodeIPChanges();
2266 }
2267
2268 }
2269
2270 @Override
2271 public void rowsDeleted(String tableName, Set<Object> rowKeys) {
2272 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2273 handleControllerNodeIPChanges();
2274 }
2275 }
2276
2277 @Override
2278 public long getSystemStartTime() {
2279 return (this.systemStartTime);
2280 }
2281
2282 @Override
2283 public void setAlwaysClearFlowsOnSwAdd(boolean value) {
2284 this.alwaysClearFlowsOnSwAdd = value;
2285 }
2286
2287 public boolean getAlwaysClearFlowsOnSwAdd() {
2288 return this.alwaysClearFlowsOnSwAdd;
2289 }
2290}