blob: f288ae4f495dc1a148e51a84f1d270b8f99f6890 [file] [log] [blame]
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001/**
2* Copyright 2011, Big Switch Networks, Inc.
3* Originally created by David Erickson, Stanford University
4*
5* Licensed under the Apache License, Version 2.0 (the "License"); you may
6* not use this file except in compliance with the License. You may obtain
7* a copy of the License at
8*
9* http://www.apache.org/licenses/LICENSE-2.0
10*
11* Unless required by applicable law or agreed to in writing, software
12* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14* License for the specific language governing permissions and limitations
15* under the License.
16**/
17
18package net.floodlightcontroller.core.internal;
19
20import java.io.FileInputStream;
21import java.io.IOException;
22import java.net.InetAddress;
23import java.net.InetSocketAddress;
24import java.net.SocketAddress;
Jonathan Hartd10008d2013-02-23 17:04:08 -080025import java.net.UnknownHostException;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080026import java.nio.channels.ClosedChannelException;
Jonathan Hartd10008d2013-02-23 17:04:08 -080027import java.util.ArrayList;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080028import java.util.Collection;
29import java.util.Collections;
30import java.util.Date;
31import java.util.HashMap;
32import java.util.HashSet;
33import java.util.Iterator;
34import java.util.LinkedHashMap;
35import java.util.List;
36import java.util.Map;
37import java.util.Map.Entry;
38import java.util.Properties;
39import java.util.Set;
40import java.util.Stack;
41import java.util.concurrent.BlockingQueue;
42import java.util.concurrent.ConcurrentHashMap;
43import java.util.concurrent.ConcurrentMap;
44import java.util.concurrent.CopyOnWriteArraySet;
45import java.util.concurrent.Executors;
46import java.util.concurrent.Future;
47import java.util.concurrent.LinkedBlockingQueue;
48import java.util.concurrent.RejectedExecutionException;
49import java.util.concurrent.TimeUnit;
50import java.util.concurrent.TimeoutException;
51
52import net.floodlightcontroller.core.FloodlightContext;
53import net.floodlightcontroller.core.IFloodlightProviderService;
54import net.floodlightcontroller.core.IHAListener;
55import net.floodlightcontroller.core.IInfoProvider;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080056import net.floodlightcontroller.core.IListener.Command;
Jonathan Hartd10008d2013-02-23 17:04:08 -080057import net.floodlightcontroller.core.INetMapStorage.DM_OPERATION;
58import net.floodlightcontroller.core.INetMapTopologyService.ITopoRouteService;
59import net.floodlightcontroller.core.IOFMessageListener;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080060import net.floodlightcontroller.core.IOFSwitch;
61import net.floodlightcontroller.core.IOFSwitchFilter;
62import net.floodlightcontroller.core.IOFSwitchListener;
Pankaj Berde8557a462013-01-07 08:59:31 -080063import net.floodlightcontroller.core.ISwitchStorage.SwitchState;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080064import net.floodlightcontroller.core.annotations.LogMessageDoc;
65import net.floodlightcontroller.core.annotations.LogMessageDocs;
66import net.floodlightcontroller.core.internal.OFChannelState.HandshakeState;
67import net.floodlightcontroller.core.util.ListenerDispatcher;
68import net.floodlightcontroller.core.web.CoreWebRoutable;
69import net.floodlightcontroller.counter.ICounterStoreService;
Pavlin Radoslavov19b0e122013-02-21 18:47:38 -080070import net.floodlightcontroller.flowcache.IFlowService;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080071import net.floodlightcontroller.packet.Ethernet;
72import net.floodlightcontroller.perfmon.IPktInProcessingTimeService;
73import net.floodlightcontroller.restserver.IRestApiService;
74import net.floodlightcontroller.storage.IResultSet;
75import net.floodlightcontroller.storage.IStorageSourceListener;
76import net.floodlightcontroller.storage.IStorageSourceService;
77import net.floodlightcontroller.storage.OperatorPredicate;
78import net.floodlightcontroller.storage.StorageException;
79import net.floodlightcontroller.threadpool.IThreadPoolService;
Jonathan Hartd82f20d2013-02-21 18:04:24 -080080import net.onrc.onos.registry.controller.IControllerRegistryService;
Jonathan Hartd10008d2013-02-23 17:04:08 -080081import net.onrc.onos.registry.controller.RegistryException;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080082
83import org.jboss.netty.bootstrap.ServerBootstrap;
84import org.jboss.netty.buffer.ChannelBuffer;
85import org.jboss.netty.buffer.ChannelBuffers;
86import org.jboss.netty.channel.Channel;
87import org.jboss.netty.channel.ChannelHandlerContext;
88import org.jboss.netty.channel.ChannelPipelineFactory;
89import org.jboss.netty.channel.ChannelStateEvent;
90import org.jboss.netty.channel.ChannelUpstreamHandler;
91import org.jboss.netty.channel.Channels;
92import org.jboss.netty.channel.ExceptionEvent;
93import org.jboss.netty.channel.MessageEvent;
94import org.jboss.netty.channel.group.ChannelGroup;
95import org.jboss.netty.channel.group.DefaultChannelGroup;
96import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
97import org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler;
98import org.jboss.netty.handler.timeout.IdleStateEvent;
99import org.jboss.netty.handler.timeout.ReadTimeoutException;
100import org.openflow.protocol.OFEchoReply;
101import org.openflow.protocol.OFError;
102import org.openflow.protocol.OFError.OFBadActionCode;
103import org.openflow.protocol.OFError.OFBadRequestCode;
104import org.openflow.protocol.OFError.OFErrorType;
105import org.openflow.protocol.OFError.OFFlowModFailedCode;
106import org.openflow.protocol.OFError.OFHelloFailedCode;
107import org.openflow.protocol.OFError.OFPortModFailedCode;
108import org.openflow.protocol.OFError.OFQueueOpFailedCode;
109import org.openflow.protocol.OFFeaturesReply;
110import org.openflow.protocol.OFGetConfigReply;
111import org.openflow.protocol.OFMessage;
112import org.openflow.protocol.OFPacketIn;
113import org.openflow.protocol.OFPhysicalPort;
Pankaj Berde6a4075d2013-01-22 16:42:54 -0800114import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
Pankaj Berde6debb042013-01-16 18:04:32 -0800115import org.openflow.protocol.OFPhysicalPort.OFPortState;
Jonathan Hartd10008d2013-02-23 17:04:08 -0800116import org.openflow.protocol.OFPortStatus;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800117import org.openflow.protocol.OFPortStatus.OFPortReason;
118import org.openflow.protocol.OFSetConfig;
119import org.openflow.protocol.OFStatisticsRequest;
120import org.openflow.protocol.OFSwitchConfig;
121import org.openflow.protocol.OFType;
122import org.openflow.protocol.OFVendor;
123import org.openflow.protocol.factory.BasicFactory;
124import org.openflow.protocol.factory.MessageParseException;
125import org.openflow.protocol.statistics.OFDescriptionStatistics;
126import org.openflow.protocol.statistics.OFStatistics;
127import org.openflow.protocol.statistics.OFStatisticsType;
128import org.openflow.protocol.vendor.OFBasicVendorDataType;
129import org.openflow.protocol.vendor.OFBasicVendorId;
130import org.openflow.protocol.vendor.OFVendorId;
131import org.openflow.util.HexString;
132import org.openflow.util.U16;
133import org.openflow.util.U32;
134import org.openflow.vendor.nicira.OFNiciraVendorData;
135import org.openflow.vendor.nicira.OFRoleReplyVendorData;
136import org.openflow.vendor.nicira.OFRoleRequestVendorData;
137import org.openflow.vendor.nicira.OFRoleVendorData;
138import org.slf4j.Logger;
139import org.slf4j.LoggerFactory;
140
141
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800142
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800143/**
144 * The main controller class. Handles all setup and network listeners
145 */
146public class Controller implements IFloodlightProviderService,
147 IStorageSourceListener {
Pankaj Berde29ab7fc2013-01-25 06:17:52 -0800148
/**
 * Per-thread handle to the switch storage backend. The first call to
 * {@code store.get()} on a given thread creates a new
 * {@link SwitchStorageImpl} and initialises it from the hard-coded path
 * below; later calls on the same thread return that same instance.
 */
ThreadLocal<SwitchStorageImpl> store = new ThreadLocal<SwitchStorageImpl>() {
    @Override
    protected SwitchStorageImpl initialValue() {
        SwitchStorageImpl swStore = new SwitchStorageImpl();
        //TODO: Get the file path from global properties
        swStore.init("/tmp/cassandra.titan");
        return swStore;
    }
};

// Storage instance used through this field.
// NOTE(review): this initializer runs once, on the thread that constructs
// the Controller, so every reader of swStore shares that thread's
// SwitchStorageImpl. The ThreadLocal above only gives per-thread instances
// to code that calls store.get() directly -- confirm this is intended.
// This declaration must stay after `store`: field initializers run in
// declaration order.
protected SwitchStorageImpl swStore = store.get();
Pankaj Berde8557a462013-01-07 08:59:31 -0800160
/** Logger used by the controller and its inner classes. */
protected static Logger log = LoggerFactory.getLogger(Controller.class);

// Explanation text reused by @LogMessageDoc entries that report storage
// failures.
private static final String ERROR_DATABASE =
        "The controller could not communicate with the system database.";

// Factory for outgoing OpenFlow messages (HELLO, ECHO_REQUEST,
// FEATURES_REQUEST, SET_CONFIG, ...).
protected BasicFactory factory;
// Registered message listeners, keyed by OpenFlow message type.
protected ConcurrentMap<OFType,
                        ListenerDispatcher<OFType,IOFMessageListener>>
                            messageListeners;
// The activeSwitches map contains only those switches that are actively
// being controlled by us -- it doesn't contain switches that are
// in the slave role
protected ConcurrentHashMap<Long, IOFSwitch> activeSwitches;
// connectedSwitches contains all connected switches, including ones where
// we're a slave controller. We need to keep track of them so that we can
// send role request messages to switches when our role changes to master
// We add a switch to this set after it successfully completes the
// handshake. Access to this Set needs to be synchronized with roleChanger
protected HashSet<OFSwitchImpl> connectedSwitches;

// The controllerNodeIPsCache maps Controller IDs to their IP address.
// It's only used by handleControllerNodeIPsChanged
protected HashMap<String, String> controllerNodeIPsCache;

// Receive addedSwitch/removedSwitch/switchPortChanged callbacks from
// SwitchUpdate.dispatch().
protected Set<IOFSwitchListener> switchListeners;
// Receive roleChanged/controllerNodeIPsChanged callbacks from the HA
// update classes.
protected Set<IHAListener> haListeners;
// NOTE(review): not used in this part of the file; presumably groups
// IInfoProvider instances by a category name -- confirm at the use sites.
protected Map<String, List<IInfoProvider>> providerMap;
// Pending IUpdate objects (e.g. the HARoleUpdate queued by setRole())
// waiting to be dispatched.
protected BlockingQueue<IUpdate> updates;

// Module dependencies
protected IRestApiService restApi;
protected ICounterStoreService counterStore = null;
protected IStorageSourceService storageSource;
protected IPktInProcessingTimeService pktinProcTime;
protected IThreadPoolService threadPool;
protected IFlowService flowService;
protected ITopoRouteService topoRouteService;
protected IControllerRegistryService registryService;

// Configuration options
protected int openFlowPort = 6633;
protected int workerThreads = 0;
// The id for this controller node. Should be unique for each controller
// node in a controller cluster.
protected String controllerId = "localhost";

// The current role of the controller.
// If the controller isn't configured to support roles, then this is null.
// Read and written under the roleChanger lock (see getRole/setRole).
protected Role role;
// A helper that handles sending and timeout handling for role requests
protected RoleChanger roleChanger;

// Start time of the controller
protected long systemStartTime;

// Flag to always flush flow table on switch reconnect (HA or otherwise)
protected boolean alwaysClearFlowsOnSwAdd = false;
218
// Storage table names
// Table and column names in the storage source. Of these, only
// SWITCH_CONFIG_TABLE_NAME and SWITCH_CONFIG_CORE_SWITCH are read in this
// part of the file (when the core-switch flag is loaded during the
// handshake). The rest are presumably used elsewhere in the class.
protected static final String CONTROLLER_TABLE_NAME = "controller_controller";
protected static final String CONTROLLER_ID = "id";

protected static final String SWITCH_TABLE_NAME = "controller_switch";
protected static final String SWITCH_DATAPATH_ID = "dpid";
protected static final String SWITCH_SOCKET_ADDRESS = "socket_address";
protected static final String SWITCH_IP = "ip";
protected static final String SWITCH_CONTROLLER_ID = "controller_id";
protected static final String SWITCH_ACTIVE = "active";
protected static final String SWITCH_CONNECTED_SINCE = "connected_since";
protected static final String SWITCH_CAPABILITIES = "capabilities";
protected static final String SWITCH_BUFFERS = "buffers";
protected static final String SWITCH_TABLES = "tables";
protected static final String SWITCH_ACTIONS = "actions";

// Per-switch configuration; a "true" value in core_switch marks the switch
// as a core switch.
protected static final String SWITCH_CONFIG_TABLE_NAME = "controller_switchconfig";
protected static final String SWITCH_CONFIG_CORE_SWITCH = "core_switch";

protected static final String PORT_TABLE_NAME = "controller_port";
protected static final String PORT_ID = "id";
protected static final String PORT_SWITCH = "switch_id";
protected static final String PORT_NUMBER = "number";
protected static final String PORT_HARDWARE_ADDRESS = "hardware_address";
protected static final String PORT_NAME = "name";
protected static final String PORT_CONFIG = "config";
protected static final String PORT_STATE = "state";
protected static final String PORT_CURRENT_FEATURES = "current_features";
protected static final String PORT_ADVERTISED_FEATURES = "advertised_features";
protected static final String PORT_SUPPORTED_FEATURES = "supported_features";
protected static final String PORT_PEER_FEATURES = "peer_features";

protected static final String CONTROLLER_INTERFACE_TABLE_NAME = "controller_controllerinterface";
protected static final String CONTROLLER_INTERFACE_ID = "id";
protected static final String CONTROLLER_INTERFACE_CONTROLLER_ID = "controller_id";
protected static final String CONTROLLER_INTERFACE_TYPE = "type";
protected static final String CONTROLLER_INTERFACE_NUMBER = "number";
protected static final String CONTROLLER_INTERFACE_DISCOVERED_IP = "discovered_ip";



// Perf. related configuration
// NOTE(review): the consumers of these values are not visible here. The
// names suggest a socket send-buffer size in bytes, a maximum batch size,
// and whether Ethernet payloads are always decoded -- confirm at use sites.
protected static final int SEND_BUFFER_SIZE = 4 * 1024 * 1024;
protected static final int BATCH_MAX_SIZE = 100;
protected static final boolean ALWAYS_DECODE_ETH = true;
264
265 /**
266 * Updates handled by the main loop
267 */
268 protected interface IUpdate {
269 /**
270 * Calls the appropriate listeners
271 */
272 public void dispatch();
273 }
/**
 * The kind of change a switch update reports: the switch appeared, went
 * away, or had its ports changed.
 */
public enum SwitchUpdateType {
    ADDED, REMOVED, PORTCHANGED
}
279 /**
280 * Update message indicating a switch was added or removed
281 */
282 protected class SwitchUpdate implements IUpdate {
283 public IOFSwitch sw;
284 public SwitchUpdateType switchUpdateType;
285 public SwitchUpdate(IOFSwitch sw, SwitchUpdateType switchUpdateType) {
286 this.sw = sw;
287 this.switchUpdateType = switchUpdateType;
288 }
289 public void dispatch() {
290 if (log.isTraceEnabled()) {
291 log.trace("Dispatching switch update {} {}",
292 sw, switchUpdateType);
293 }
294 if (switchListeners != null) {
295 for (IOFSwitchListener listener : switchListeners) {
296 switch(switchUpdateType) {
297 case ADDED:
298 listener.addedSwitch(sw);
299 break;
300 case REMOVED:
301 listener.removedSwitch(sw);
302 break;
303 case PORTCHANGED:
304 listener.switchPortChanged(sw.getId());
305 break;
306 }
307 }
308 }
309 }
310 }
311
312 /**
313 * Update message indicating controller's role has changed
314 */
315 protected class HARoleUpdate implements IUpdate {
316 public Role oldRole;
317 public Role newRole;
318 public HARoleUpdate(Role newRole, Role oldRole) {
319 this.oldRole = oldRole;
320 this.newRole = newRole;
321 }
322 public void dispatch() {
323 // Make sure that old and new roles are different.
324 if (oldRole == newRole) {
325 if (log.isTraceEnabled()) {
326 log.trace("HA role update ignored as the old and " +
327 "new roles are the same. newRole = {}" +
328 "oldRole = {}", newRole, oldRole);
329 }
330 return;
331 }
332 if (log.isTraceEnabled()) {
333 log.trace("Dispatching HA Role update newRole = {}, oldRole = {}",
334 newRole, oldRole);
335 }
336 if (haListeners != null) {
337 for (IHAListener listener : haListeners) {
338 listener.roleChanged(oldRole, newRole);
339 }
340 }
341 }
342 }
343
344 /**
345 * Update message indicating
346 * IPs of controllers in controller cluster have changed.
347 */
348 protected class HAControllerNodeIPUpdate implements IUpdate {
349 public Map<String,String> curControllerNodeIPs;
350 public Map<String,String> addedControllerNodeIPs;
351 public Map<String,String> removedControllerNodeIPs;
352 public HAControllerNodeIPUpdate(
353 HashMap<String,String> curControllerNodeIPs,
354 HashMap<String,String> addedControllerNodeIPs,
355 HashMap<String,String> removedControllerNodeIPs) {
356 this.curControllerNodeIPs = curControllerNodeIPs;
357 this.addedControllerNodeIPs = addedControllerNodeIPs;
358 this.removedControllerNodeIPs = removedControllerNodeIPs;
359 }
360 public void dispatch() {
361 if (log.isTraceEnabled()) {
362 log.trace("Dispatching HA Controller Node IP update "
363 + "curIPs = {}, addedIPs = {}, removedIPs = {}",
364 new Object[] { curControllerNodeIPs, addedControllerNodeIPs,
365 removedControllerNodeIPs }
366 );
367 }
368 if (haListeners != null) {
369 for (IHAListener listener: haListeners) {
370 listener.controllerNodeIPsChanged(curControllerNodeIPs,
371 addedControllerNodeIPs, removedControllerNodeIPs);
372 }
373 }
374 }
375 }
376
377 // ***************
378 // Getters/Setters
379 // ***************
380
381 public void setStorageSourceService(IStorageSourceService storageSource) {
382 this.storageSource = storageSource;
383 }
384
385 public void setCounterStore(ICounterStoreService counterStore) {
386 this.counterStore = counterStore;
387 }
388
389 public void setPktInProcessingService(IPktInProcessingTimeService pits) {
390 this.pktinProcTime = pits;
391 }
392
393 public void setRestApiService(IRestApiService restApi) {
394 this.restApi = restApi;
395 }
396
397 public void setThreadPoolService(IThreadPoolService tp) {
398 this.threadPool = tp;
399 }
400
Pavlin Radoslavov19b0e122013-02-21 18:47:38 -0800401 public void setFlowService(IFlowService serviceImpl) {
402 this.flowService = serviceImpl;
403 }
Pavlin Radoslavovd7d8b792013-02-22 10:24:38 -0800404
405 public void setTopoRouteService(ITopoRouteService serviceImpl) {
406 this.topoRouteService = serviceImpl;
407 }
Jonathan Hartc2e95ee2013-02-22 15:25:11 -0800408
Jonathan Hartd82f20d2013-02-21 18:04:24 -0800409 public void setMastershipService(IControllerRegistryService serviceImpl) {
Jonathan Hartd10008d2013-02-23 17:04:08 -0800410 this.registryService = serviceImpl;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800411 }
412
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800413 @Override
414 public Role getRole() {
415 synchronized(roleChanger) {
416 return role;
417 }
418 }
419
420 @Override
421 public void setRole(Role role) {
422 if (role == null) throw new NullPointerException("Role can not be null.");
423 if (role == Role.MASTER && this.role == Role.SLAVE) {
424 // Reset db state to Inactive for all switches.
425 updateAllInactiveSwitchInfo();
426 }
427
428 // Need to synchronize to ensure a reliable ordering on role request
429 // messages send and to ensure the list of connected switches is stable
430 // RoleChanger will handle the actual sending of the message and
431 // timeout handling
432 // @see RoleChanger
433 synchronized(roleChanger) {
434 if (role.equals(this.role)) {
435 log.debug("Ignoring role change: role is already {}", role);
436 return;
437 }
438
439 Role oldRole = this.role;
440 this.role = role;
441
442 log.debug("Submitting role change request to role {}", role);
443 roleChanger.submitRequest(connectedSwitches, role);
444
445 // Enqueue an update for our listeners.
446 try {
447 this.updates.put(new HARoleUpdate(role, oldRole));
448 } catch (InterruptedException e) {
449 log.error("Failure adding update to queue", e);
450 }
451 }
452 }
453
454
455
456 // **********************
457 // ChannelUpstreamHandler
458 // **********************
459
460 /**
461 * Return a new channel handler for processing a switch connections
462 * @param state The channel state object for the connection
463 * @return the new channel handler
464 */
465 protected ChannelUpstreamHandler getChannelHandler(OFChannelState state) {
466 return new OFChannelHandler(state);
467 }
468
469 /**
470 * Channel handler deals with the switch connection and dispatches
471 * switch messages to the appropriate locations.
472 * @author readams
473 */
474 protected class OFChannelHandler
475 extends IdleStateAwareChannelUpstreamHandler {
// Switch object for this connection. It is created in channelConnected(),
// so it is null until then (channelDisconnected() checks for that).
protected OFSwitchImpl sw;
// Handshake progress for this connection; checkSwitchReady() advances it
// to READY.
protected OFChannelState state;
478
/**
 * Creates a handler for a single switch connection.
 *
 * @param channelState the state object that tracks this connection's
 *                     handshake
 */
public OFChannelHandler(OFChannelState channelState) {
    super();
    state = channelState;
}
482
483 @Override
484 @LogMessageDoc(message="New switch connection from {ip address}",
485 explanation="A new switch has connected from the " +
486 "specified IP address")
487 public void channelConnected(ChannelHandlerContext ctx,
488 ChannelStateEvent e) throws Exception {
489 log.info("New switch connection from {}",
490 e.getChannel().getRemoteAddress());
491
492 sw = new OFSwitchImpl();
493 sw.setChannel(e.getChannel());
494 sw.setFloodlightProvider(Controller.this);
495 sw.setThreadPoolService(threadPool);
496
497 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
498 msglist.add(factory.getMessage(OFType.HELLO));
499 e.getChannel().write(msglist);
500
501 }
502
503 @Override
504 @LogMessageDoc(message="Disconnected switch {switch information}",
505 explanation="The specified switch has disconnected.")
506 public void channelDisconnected(ChannelHandlerContext ctx,
507 ChannelStateEvent e) throws Exception {
508 if (sw != null && state.hsState == HandshakeState.READY) {
509 if (activeSwitches.containsKey(sw.getId())) {
510 // It's safe to call removeSwitch even though the map might
511 // not contain this particular switch but another with the
512 // same DPID
513 removeSwitch(sw);
514 }
515 synchronized(roleChanger) {
516 connectedSwitches.remove(sw);
517 }
518 sw.setConnected(false);
519 }
520 log.info("Disconnected switch {}", sw);
521 }
522
/**
 * Central error handler for the switch connection. Read timeouts,
 * handshake timeouts, I/O errors, switch-state errors, parse failures,
 * and any unclassified exception close the channel. A storage exception
 * terminates the controller. A rejected execution (worker queue full) is
 * only logged. An already-closed channel is ignored.
 */
@Override
@LogMessageDocs({
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to read timeout",
            explanation="The connected switch has failed to send any " +
                        "messages or respond to echo requests",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch}: failed to " +
                    "complete handshake",
            explanation="The switch did not respond correctly " +
                        "to handshake messages",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to IO Error: {}",
            explanation="There was an error communicating with the switch",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to switch " +
                    "state error: {error}",
            explanation="The switch sent an unexpected message",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Disconnecting switch {switch} due to " +
                    "message parse failure",
            explanation="Could not parse a message from the switch",
            recommendation=LogMessageDoc.CHECK_SWITCH),
    @LogMessageDoc(level="ERROR",
            message="Terminating controller due to storage exception",
            explanation=ERROR_DATABASE,
            recommendation=LogMessageDoc.CHECK_CONTROLLER),
    // Documented as WARN to match the log.warn call below.
    @LogMessageDoc(level="WARN",
            message="Could not process message: queue full",
            explanation="OpenFlow messages are arriving faster than " +
                        "the controller can process them.",
            recommendation=LogMessageDoc.CHECK_CONTROLLER),
    @LogMessageDoc(level="ERROR",
            message="Error while processing message " +
                    "from switch {switch} {cause}",
            explanation="An error occurred processing the switch message",
            recommendation=LogMessageDoc.GENERIC_ACTION)
})
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
        throws Exception {
    Throwable cause = e.getCause();
    if (cause instanceof ReadTimeoutException) {
        // switch timeout
        log.error("Disconnecting switch {} due to read timeout", sw);
        ctx.getChannel().close();
    } else if (cause instanceof HandshakeTimeoutException) {
        log.error("Disconnecting switch {}: failed to complete handshake",
                  sw);
        ctx.getChannel().close();
    } else if (cause instanceof ClosedChannelException) {
        // The channel is already closed, so there is nothing to close or
        // report. This branch must stay ahead of the IOException branch,
        // because ClosedChannelException is a subclass of IOException.
    } else if (cause instanceof IOException) {
        log.error("Disconnecting switch {} due to IO Error: {}",
                  sw, cause.getMessage());
        ctx.getChannel().close();
    } else if (cause instanceof SwitchStateException) {
        log.error("Disconnecting switch {} due to switch state error: {}",
                  sw, cause.getMessage());
        ctx.getChannel().close();
    } else if (cause instanceof MessageParseException) {
        log.error("Disconnecting switch " + sw +
                  " due to message parse failure",
                  cause);
        ctx.getChannel().close();
    } else if (cause instanceof StorageException) {
        log.error("Terminating controller due to storage exception",
                  cause);
        terminate();
    } else if (cause instanceof RejectedExecutionException) {
        log.warn("Could not process message: queue full");
    } else {
        log.error("Error while processing message from switch " + sw,
                  cause);
        ctx.getChannel().close();
    }
}
602
603 @Override
604 public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
605 throws Exception {
606 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
607 msglist.add(factory.getMessage(OFType.ECHO_REQUEST));
608 e.getChannel().write(msglist);
609 }
610
611 @Override
612 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
613 throws Exception {
614 if (e.getMessage() instanceof List) {
615 @SuppressWarnings("unchecked")
616 List<OFMessage> msglist = (List<OFMessage>)e.getMessage();
617
618 for (OFMessage ofm : msglist) {
619 try {
620 processOFMessage(ofm);
621 }
622 catch (Exception ex) {
623 // We are the last handler in the stream, so run the
624 // exception through the channel again by passing in
625 // ctx.getChannel().
626 Channels.fireExceptionCaught(ctx.getChannel(), ex);
627 }
628 }
629
630 // Flush all flow-mods/packet-out generated from this "train"
631 OFSwitchImpl.flush_all();
632 }
633 }
634
635 /**
636 * Process the request for the switch description
637 */
638 @LogMessageDoc(level="ERROR",
639 message="Exception in reading description " +
640 " during handshake {exception}",
641 explanation="Could not process the switch description string",
642 recommendation=LogMessageDoc.CHECK_SWITCH)
643 void processSwitchDescReply() {
644 try {
645 // Read description, if it has been updated
646 @SuppressWarnings("unchecked")
647 Future<List<OFStatistics>> desc_future =
648 (Future<List<OFStatistics>>)sw.
649 getAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
650 List<OFStatistics> values =
651 desc_future.get(0, TimeUnit.MILLISECONDS);
652 if (values != null) {
653 OFDescriptionStatistics description =
654 new OFDescriptionStatistics();
655 ChannelBuffer data =
656 ChannelBuffers.buffer(description.getLength());
657 for (OFStatistics f : values) {
658 f.writeTo(data);
659 description.readFrom(data);
660 break; // SHOULD be a list of length 1
661 }
662 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_DATA,
663 description);
664 sw.setSwitchProperties(description);
665 data = null;
666
667 // At this time, also set other switch properties from storage
668 boolean is_core_switch = false;
669 IResultSet resultSet = null;
670 try {
671 String swid = sw.getStringId();
672 resultSet =
673 storageSource.getRow(SWITCH_CONFIG_TABLE_NAME, swid);
674 for (Iterator<IResultSet> it =
675 resultSet.iterator(); it.hasNext();) {
676 // In case of multiple rows, use the status
677 // in last row?
678 Map<String, Object> row = it.next().getRow();
679 if (row.containsKey(SWITCH_CONFIG_CORE_SWITCH)) {
680 if (log.isDebugEnabled()) {
681 log.debug("Reading SWITCH_IS_CORE_SWITCH " +
682 "config for switch={}, is-core={}",
683 sw, row.get(SWITCH_CONFIG_CORE_SWITCH));
684 }
685 String ics =
686 (String)row.get(SWITCH_CONFIG_CORE_SWITCH);
687 is_core_switch = ics.equals("true");
688 }
689 }
690 }
691 finally {
692 if (resultSet != null)
693 resultSet.close();
694 }
695 if (is_core_switch) {
696 sw.setAttribute(IOFSwitch.SWITCH_IS_CORE_SWITCH,
697 new Boolean(true));
698 }
699 }
700 sw.removeAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
701 state.hasDescription = true;
702 checkSwitchReady();
703 }
704 catch (InterruptedException ex) {
705 // Ignore
706 }
707 catch (TimeoutException ex) {
708 // Ignore
709 } catch (Exception ex) {
710 log.error("Exception in reading description " +
711 " during handshake", ex);
712 }
713 }
714
715 /**
716 * Send initial switch setup information that we need before adding
717 * the switch
718 * @throws IOException
719 */
720 void sendHelloConfiguration() throws IOException {
721 // Send initial Features Request
722 sw.write(factory.getMessage(OFType.FEATURES_REQUEST), null);
723 }
724
725 /**
726 * Send the configuration requests we can only do after we have
727 * the features reply
728 * @throws IOException
729 */
730 void sendFeatureReplyConfiguration() throws IOException {
731 // Ensure we receive the full packet via PacketIn
732 OFSetConfig config = (OFSetConfig) factory
733 .getMessage(OFType.SET_CONFIG);
734 config.setMissSendLength((short) 0xffff)
735 .setLengthU(OFSwitchConfig.MINIMUM_LENGTH);
736 sw.write(config, null);
737 sw.write(factory.getMessage(OFType.GET_CONFIG_REQUEST),
738 null);
739
740 // Get Description to set switch-specific flags
741 OFStatisticsRequest req = new OFStatisticsRequest();
742 req.setStatisticType(OFStatisticsType.DESC);
743 req.setLengthU(req.getLengthU());
744 Future<List<OFStatistics>> dfuture =
745 sw.getStatistics(req);
746 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE,
747 dfuture);
748
749 }
750
751 protected void checkSwitchReady() {
752 if (state.hsState == HandshakeState.FEATURES_REPLY &&
753 state.hasDescription && state.hasGetConfigReply) {
754
755 state.hsState = HandshakeState.READY;
756
757 synchronized(roleChanger) {
758 // We need to keep track of all of the switches that are connected
759 // to the controller, in any role, so that we can later send the
760 // role request messages when the controller role changes.
761 // We need to be synchronized while doing this: we must not
762 // send a another role request to the connectedSwitches until
763 // we were able to add this new switch to connectedSwitches
764 // *and* send the current role to the new switch.
765 connectedSwitches.add(sw);
766
767 if (role != null) {
768 // Send a role request if role support is enabled for the controller
769 // This is a probe that we'll use to determine if the switch
770 // actually supports the role request message. If it does we'll
771 // get back a role reply message. If it doesn't we'll get back an
772 // OFError message.
773 // If role is MASTER we will promote switch to active
774 // list when we receive the switch's role reply messages
775 log.debug("This controller's role is {}, " +
776 "sending initial role request msg to {}",
777 role, sw);
778 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
779 swList.add(sw);
780 roleChanger.submitRequest(swList, role);
781 }
782 else {
783 // Role supported not enabled on controller (for now)
784 // automatically promote switch to active state.
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800785 log.debug("This controller's role is {}, " +
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800786 "not sending role request msg to {}",
787 role, sw);
788 // Need to clear FlowMods before we add the switch
789 // and dispatch updates otherwise we have a race condition.
790 sw.clearAllFlowMods();
791 addSwitch(sw);
792 state.firstRoleReplyReceived = true;
793 }
794 }
795 }
796 }
797
/**
 * Handle a role reply message we received from the switch. Since
 * netty serializes message dispatch we don't need to synchronize
 * against other receive operations from the same switch, so no need
 * to synchronize addSwitch(), removeSwitch() operations from the same
 * connection.
 * <p>
 * The Nicira role code in the reply is mapped to a {@link Role}, handed to
 * the switch via deliverRoleReply(), and then the switch is moved into or
 * out of the active switch list depending on whether it is currently in
 * {@code activeSwitches} and what {@code sw.isActive()} reports.
 * <p>
 * FIXME: However, when a switch with the same DPID connects we do
 * need some synchronization. Handling switches with the same
 * DPID needs to be revisited anyway (get rid of the r/w-lock and the
 * synchronous removedSwitch notification).
 *
 * @param vendorMessage the VENDOR message carrying the reply (its xid is
 *        used to match the pending role request)
 * @param roleReplyVendorData the decoded Nicira role reply payload
 */
@LogMessageDoc(level="ERROR",
        message="Invalid role value in role reply message",
        explanation="Was unable to set the HA role (master or slave) " +
                "for the controller.",
        recommendation=LogMessageDoc.CHECK_CONTROLLER)
protected void handleRoleReplyMessage(OFVendor vendorMessage,
                            OFRoleReplyVendorData roleReplyVendorData) {
    // Map from the role code in the message to our role enum
    int nxRole = roleReplyVendorData.getRole();
    // NOTE: this local shadows the controller's own role field, which is
    // why the log statement below uses Controller.this.role.
    Role role = null;
    switch (nxRole) {
        case OFRoleVendorData.NX_ROLE_OTHER:
            role = Role.EQUAL;
            break;
        case OFRoleVendorData.NX_ROLE_MASTER:
            role = Role.MASTER;
            break;
        case OFRoleVendorData.NX_ROLE_SLAVE:
            role = Role.SLAVE;
            break;
        default:
            // Unknown role code: we cannot tell what role the switch is
            // in, so drop the connection.
            log.error("Invalid role value in role reply message");
            sw.getChannel().close();
            return;
    }

    log.debug("Handling role reply for role {} from {}. " +
              "Controller's role is {} ",
              new Object[] { role, sw, Controller.this.role}
              );

    // Hand the reply to the switch so it can match it against the
    // outstanding request with this xid (NOTE(review): presumably this
    // also updates the role that sw.isActive() reflects — confirm in
    // OFSwitchImpl).
    sw.deliverRoleReply(vendorMessage.getXid(), role);

    // isActive: whether this DPID is currently in our active switch list.
    // sw.isActive(): the switch's own view after the reply was delivered.
    boolean isActive = activeSwitches.containsKey(sw.getId());
    if (!isActive && sw.isActive()) {
        // Transition from SLAVE to MASTER.

        if (!state.firstRoleReplyReceived ||
            getAlwaysClearFlowsOnSwAdd()) {
            // This is the first role-reply message we receive from
            // this switch or roles were disabled when the switch
            // connected:
            // Delete all pre-existing flows for new connections to
            // the master
            //
            // FIXME: Need to think more about what the test should
            // be for when we flush the flow-table? For example,
            // if all the controllers are temporarily in the backup
            // role (e.g. right after a failure of the master
            // controller) at the point the switch connects, then
            // all of the controllers will initially connect as
            // backup controllers and not flush the flow-table.
            // Then when one of them is promoted to master following
            // the master controller election the flow-table
            // will still not be flushed because that's treated as
            // a failover event where we don't want to flush the
            // flow-table. The end result would be that the flow
            // table for a newly connected switch is never
            // flushed. Not sure how to handle that case though...
            sw.clearAllFlowMods();
            log.debug("First role reply from master switch {}, " +
                      "clear FlowTable to active switch list",
                     HexString.toHexString(sw.getId()));
        }

        // Some switches don't seem to update us with port
        // status messages while in slave role.
        readSwitchPortStateFromStorage(sw);

        // Only add the switch to the active switch list if
        // we're not in the slave role. Note that if the role
        // attribute is null, then that means that the switch
        // doesn't support the role request messages, so in that
        // case we're effectively in the EQUAL role and the
        // switch should be included in the active switch list.
        // NOTE(review): in this branch role is never null (the switch
        // above returns for unknown codes); the null-role case is handled
        // by the ERROR path in processOFMessage instead.
        addSwitch(sw);
        log.debug("Added master switch {} to active switch list",
                 HexString.toHexString(sw.getId()));

    }
    else if (isActive && !sw.isActive()) {
        // Transition from MASTER to SLAVE: remove switch
        // from active switch list.
        log.debug("Removed slave switch {} from active switch" +
                  " list", HexString.toHexString(sw.getId()));
        removeSwitch(sw);
    }

    // Indicate that we have received a role reply message.
    state.firstRoleReplyReceived = true;
}
900
901 protected boolean handleVendorMessage(OFVendor vendorMessage) {
902 boolean shouldHandleMessage = false;
903 int vendor = vendorMessage.getVendor();
904 switch (vendor) {
905 case OFNiciraVendorData.NX_VENDOR_ID:
906 OFNiciraVendorData niciraVendorData =
907 (OFNiciraVendorData)vendorMessage.getVendorData();
908 int dataType = niciraVendorData.getDataType();
909 switch (dataType) {
910 case OFRoleReplyVendorData.NXT_ROLE_REPLY:
911 OFRoleReplyVendorData roleReplyVendorData =
912 (OFRoleReplyVendorData) niciraVendorData;
913 handleRoleReplyMessage(vendorMessage,
914 roleReplyVendorData);
915 break;
916 default:
917 log.warn("Unhandled Nicira VENDOR message; " +
918 "data type = {}", dataType);
919 break;
920 }
921 break;
922 default:
923 log.warn("Unhandled VENDOR message; vendor id = {}", vendor);
924 break;
925 }
926
927 return shouldHandleMessage;
928 }
929
/**
 * Dispatch an OpenFlow message from a switch to the appropriate
 * handler.
 * <p>
 * Handshake messages (HELLO, FEATURES_REPLY, GET_CONFIG_REPLY) advance
 * {@code state.hsState}. ECHO_REQUEST is answered directly. VENDOR messages
 * go to handleVendorMessage(), and are dispatched further only if it
 * returns true. An ERROR whose xid matches the first pending role request
 * is interpreted as "role requests not supported" (or causes a
 * disconnect); other errors are logged. STATS_REPLY is delivered to the
 * switch. PORT_STATUS and every other message type are additionally
 * dispatched to the OFMessage listeners via handleMessage(), but only
 * while the switch is connected, the handshake is READY, and the switch is
 * not in the SLAVE role.
 *
 * @param m The message to process
 * @throws IOException propagated from the calls made while handling the
 *         message (e.g. sw.write(), handleMessage())
 * @throws SwitchStateException if HELLO, GET_CONFIG_REPLY or STATS_REPLY
 *         arrives in an unexpected handshake state
 */
@LogMessageDocs({
    @LogMessageDoc(level="WARN",
            message="Config Reply from {switch} has " +
                    "miss length set to {length}",
            explanation="The controller requires that the switch " +
                    "use a miss length of 0xffff for correct " +
                    "function",
            recommendation="Use a different switch to ensure " +
                    "correct function"),
    @LogMessageDoc(level="WARN",
            message="Received ERROR from sw {switch} that "
                    +"indicates roles are not supported "
                    +"but we have received a valid "
                    +"role reply earlier",
            explanation="The switch sent a confusing message to the" +
                    "controller")
})
protected void processOFMessage(OFMessage m)
        throws IOException, SwitchStateException {
    // Set by the cases below whose message must also be passed to the
    // registered OFMessage listeners (see the dispatch at the end).
    boolean shouldHandleMessage = false;

    switch (m.getType()) {
        case HELLO:
            if (log.isTraceEnabled())
                log.trace("HELLO from {}", sw);

            // HELLO is only legal as the very first handshake step.
            if (state.hsState.equals(HandshakeState.START)) {
                state.hsState = HandshakeState.HELLO;
                sendHelloConfiguration();
            } else {
                throw new SwitchStateException("Unexpected HELLO from "
                                               + sw);
            }
            break;
        case ECHO_REQUEST:
            // Keep-alive: answer with an ECHO_REPLY carrying the same xid.
            OFEchoReply reply =
                (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY);
            reply.setXid(m.getXid());
            sw.write(reply, null);
            break;
        case ECHO_REPLY:
            break;
        case FEATURES_REPLY:
            if (log.isTraceEnabled())
                log.trace("Features Reply from {}", sw);

            sw.setFeaturesReply((OFFeaturesReply) m);
            if (state.hsState.equals(HandshakeState.HELLO)) {
                // Part of the initial handshake.
                sendFeatureReplyConfiguration();
                state.hsState = HandshakeState.FEATURES_REPLY;
                // uncomment to enable "dumb" switches like cbench
                // state.hsState = HandshakeState.READY;
                // addSwitch(sw);
            } else {
                // A features request issued after the handshake:
                // return results to rest api caller
                sw.deliverOFFeaturesReply(m);
                // update database */
                updateActiveSwitchInfo(sw);
            }
            break;
        case GET_CONFIG_REPLY:
            if (log.isTraceEnabled())
                log.trace("Get config reply from {}", sw);

            if (!state.hsState.equals(HandshakeState.FEATURES_REPLY)) {
                String em = "Unexpected GET_CONFIG_REPLY from " + sw;
                throw new SwitchStateException(em);
            }
            OFGetConfigReply cr = (OFGetConfigReply) m;
            // Only warn: the handshake still proceeds with a short miss
            // length.
            if (cr.getMissSendLength() == (short)0xffff) {
                log.trace("Config Reply from {} confirms " +
                          "miss length set to 0xffff", sw);
            } else {
                log.warn("Config Reply from {} has " +
                         "miss length set to {}",
                         sw, cr.getMissSendLength() & 0xffff);
            }
            state.hasGetConfigReply = true;
            checkSwitchReady();
            break;
        case VENDOR:
            shouldHandleMessage = handleVendorMessage((OFVendor)m);
            break;
        case ERROR:
            // TODO: we need better error handling. Especially for
            // request/reply style message (stats, roles) we should have
            // a unified way to lookup the xid in the error message.
            // This will probable involve rewriting the way we handle
            // request/reply style messages.
            OFError error = (OFError) m;
            boolean shouldLogError = true;
            // TODO: should we check that firstRoleReplyReceived is false,
            // i.e., check only whether the first request fails?
            if (sw.checkFirstPendingRoleRequestXid(error.getXid())) {
                boolean isBadVendorError =
                    (error.getErrorType() == OFError.OFErrorType.
                            OFPET_BAD_REQUEST.getValue());
                // We expect to receive a bad vendor error when
                // we're connected to a switch that doesn't support
                // the Nicira vendor extensions (i.e. not OVS or
                // derived from OVS). By protocol, it should also be
                // BAD_VENDOR, but too many switch implementations
                // get it wrong and we can already check the xid()
                // so we can ignore the type with confidence that this
                // is not a spurious error
                shouldLogError = !isBadVendorError;
                if (isBadVendorError) {
                    if (state.firstRoleReplyReceived && (role != null)) {
                        log.warn("Received ERROR from sw {} that "
                                  +"indicates roles are not supported "
                                  +"but we have received a valid "
                                  +"role reply earlier", sw);
                    }
                    state.firstRoleReplyReceived = true;
                    sw.deliverRoleRequestNotSupported(error.getXid());
                    // Same lock checkSwitchReady() holds while it
                    // registers the switch and sends the initial role
                    // request, so role changes are serialized with this.
                    synchronized(roleChanger) {
                        if (sw.role == null && Controller.this.role==Role.SLAVE) {
                            // the switch doesn't understand role request
                            // messages and the current controller role is
                            // slave. We need to disconnect the switch.
                            // @see RoleChanger for rationale
                            sw.getChannel().close();
                        }
                        else if (sw.role == null) {
                            // Controller's role is master: add to
                            // active
                            // TODO: check if clearing flow table is
                            // right choice here.
                            // Need to clear FlowMods before we add the switch
                            // and dispatch updates otherwise we have a race condition.
                            // TODO: switch update is async. Won't we still have a potential
                            // race condition?
                            sw.clearAllFlowMods();
                            addSwitch(sw);
                        }
                    }
                }
                else {
                    // TODO: Is this the right thing to do if we receive
                    // some other error besides a bad vendor error?
                    // Presumably that means the switch did actually
                    // understand the role request message, but there
                    // was some other error from processing the message.
                    // OF 1.2 specifies a OFPET_ROLE_REQUEST_FAILED
                    // error code, but it doesn't look like the Nicira
                    // role request has that. Should check OVS source
                    // code to see if it's possible for any other errors
                    // to be returned.
                    // If we received an error the switch is not
                    // in the correct role, so we need to disconnect it.
                    // We could also resend the request but then we need to
                    // check if there are other pending request in which
                    // case we shouldn't resend. If we do resend we need
                    // to make sure that the switch eventually accepts one
                    // of our requests or disconnect the switch. This feels
                    // cumbersome.
                    sw.getChannel().close();
                }
            }
            // Once we support OF 1.2, we'd add code to handle it here.
            //if (error.getXid() == state.ofRoleRequestXid) {
            //}
            if (shouldLogError)
                logError(sw, error);
            break;
        case STATS_REPLY:
            // Stats replies are only meaningful once the features reply
            // has been processed.
            if (state.hsState.ordinal() <
                HandshakeState.FEATURES_REPLY.ordinal()) {
                String em = "Unexpected STATS_REPLY from " + sw;
                throw new SwitchStateException(em);
            }
            sw.deliverStatisticsReply(m);
            if (sw.hasAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE)) {
                processSwitchDescReply();
            }
            break;
        case PORT_STATUS:
            // We want to update our port state info even if we're in
            // the slave role, but we only want to update storage if
            // we're the master (or equal).
            boolean updateStorage = state.hsState.
                                    equals(HandshakeState.READY) &&
                                    (sw.getRole() != Role.SLAVE);
            handlePortStatusMessage(sw, (OFPortStatus)m, updateStorage);
            shouldHandleMessage = true;
            break;

        default:
            shouldHandleMessage = true;
            break;
    }

    if (shouldHandleMessage) {
        // Hold the listener read lock while dispatching; addSwitch()
        // takes the matching write lock on a duplicate switch before it
        // marks that switch disconnected.
        sw.getListenerReadLock().lock();
        try {
            if (sw.isConnected()) {
                if (!state.hsState.equals(HandshakeState.READY)) {
                    log.debug("Ignoring message type {} received " +
                              "from switch {} before switch is " +
                              "fully configured.", m.getType(), sw);
                }
                // Check if the controller is in the slave role for the
                // switch. If it is, then don't dispatch the message to
                // the listeners.
                // TODO: Should we dispatch messages that we expect to
                // receive when we're in the slave role, e.g. port
                // status messages? Since we're "hiding" switches from
                // the listeners when we're in the slave role, then it
                // seems a little weird to dispatch port status messages
                // to them. On the other hand there might be special
                // modules that care about all of the connected switches
                // and would like to receive port status notifications.
                else if (sw.getRole() == Role.SLAVE) {
                    // Don't log message if it's a port status message
                    // since we expect to receive those from the switch
                    // and don't want to emit spurious messages.
                    if (m.getType() != OFType.PORT_STATUS) {
                        log.debug("Ignoring message type {} received " +
                                "from switch {} while in the slave role.",
                                m.getType(), sw);
                    }
                } else {
                    handleMessage(sw, m, null);
                }
            }
        }
        finally {
            sw.getListenerReadLock().unlock();
        }
    }
}
1168 }
1169
1170 // ****************
1171 // Message handlers
1172 // ****************
1173
1174 protected void handlePortStatusMessage(IOFSwitch sw,
1175 OFPortStatus m,
1176 boolean updateStorage) {
1177 short portNumber = m.getDesc().getPortNumber();
1178 OFPhysicalPort port = m.getDesc();
1179 if (m.getReason() == (byte)OFPortReason.OFPPR_MODIFY.ordinal()) {
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001180 boolean portDown = ((OFPortConfig.OFPPC_PORT_DOWN.getValue() & port.getConfig()) > 0) ||
1181 ((OFPortState.OFPPS_LINK_DOWN.getValue() & port.getState()) > 0);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001182 sw.setPort(port);
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001183 if (!portDown) {
Pankaj Berde6debb042013-01-16 18:04:32 -08001184 swStore.addPort(sw.getStringId(), port);
1185 } else {
1186 swStore.deletePort(sw.getStringId(), port.getPortNumber());
1187 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001188 if (updateStorage)
1189 updatePortInfo(sw, port);
1190 log.debug("Port #{} modified for {}", portNumber, sw);
1191 } else if (m.getReason() == (byte)OFPortReason.OFPPR_ADD.ordinal()) {
1192 sw.setPort(port);
Pankaj Berde8557a462013-01-07 08:59:31 -08001193 swStore.addPort(sw.getStringId(), port);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001194 if (updateStorage)
1195 updatePortInfo(sw, port);
1196 log.debug("Port #{} added for {}", portNumber, sw);
1197 } else if (m.getReason() ==
1198 (byte)OFPortReason.OFPPR_DELETE.ordinal()) {
1199 sw.deletePort(portNumber);
Pankaj Berde8557a462013-01-07 08:59:31 -08001200 swStore.deletePort(sw.getStringId(), portNumber);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001201 if (updateStorage)
1202 removePortInfo(sw, portNumber);
1203 log.debug("Port #{} deleted for {}", portNumber, sw);
1204 }
1205 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
1206 try {
1207 this.updates.put(update);
1208 } catch (InterruptedException e) {
1209 log.error("Failure adding update to queue", e);
1210 }
1211 }
1212
/**
 * flcontext_cache - Keep a thread local stack of contexts.
 * <p>
 * Each thread lazily gets its own empty Stack via initialValue().
 * flcontext_alloc() pops reusable FloodlightContext objects from it, and
 * flcontext_free() clears and pushes them back, so that a context does not
 * have to be allocated for every message.
 */
protected static final ThreadLocal<Stack<FloodlightContext>> flcontext_cache =
    new ThreadLocal <Stack<FloodlightContext>> () {
        @Override
        protected Stack<FloodlightContext> initialValue() {
            return new Stack<FloodlightContext>();
        }
    };
1223
1224 /**
1225 * flcontext_alloc - pop a context off the stack, if required create a new one
1226 * @return FloodlightContext
1227 */
1228 protected static FloodlightContext flcontext_alloc() {
1229 FloodlightContext flcontext = null;
1230
1231 if (flcontext_cache.get().empty()) {
1232 flcontext = new FloodlightContext();
1233 }
1234 else {
1235 flcontext = flcontext_cache.get().pop();
1236 }
1237
1238 return flcontext;
1239 }
1240
1241 /**
1242 * flcontext_free - Free the context to the current thread
1243 * @param flcontext
1244 */
1245 protected void flcontext_free(FloodlightContext flcontext) {
1246 flcontext.getStorage().clear();
1247 flcontext_cache.get().push(flcontext);
1248 }
1249
1250 /**
1251 * Handle replies to certain OFMessages, and pass others off to listeners
1252 * @param sw The switch for the message
1253 * @param m The message
1254 * @param bContext The floodlight context. If null then floodlight context would
1255 * be allocated in this function
1256 * @throws IOException
1257 */
1258 @LogMessageDocs({
1259 @LogMessageDoc(level="ERROR",
1260 message="Ignoring PacketIn (Xid = {xid}) because the data" +
1261 " field is empty.",
1262 explanation="The switch sent an improperly-formatted PacketIn" +
1263 " message",
1264 recommendation=LogMessageDoc.CHECK_SWITCH),
1265 @LogMessageDoc(level="WARN",
1266 message="Unhandled OF Message: {} from {}",
1267 explanation="The switch sent a message not handled by " +
1268 "the controller")
1269 })
1270 protected void handleMessage(IOFSwitch sw, OFMessage m,
1271 FloodlightContext bContext)
1272 throws IOException {
1273 Ethernet eth = null;
1274
1275 switch (m.getType()) {
1276 case PACKET_IN:
1277 OFPacketIn pi = (OFPacketIn)m;
1278
1279 if (pi.getPacketData().length <= 0) {
1280 log.error("Ignoring PacketIn (Xid = " + pi.getXid() +
1281 ") because the data field is empty.");
1282 return;
1283 }
1284
1285 if (Controller.ALWAYS_DECODE_ETH) {
1286 eth = new Ethernet();
1287 eth.deserialize(pi.getPacketData(), 0,
1288 pi.getPacketData().length);
1289 counterStore.updatePacketInCounters(sw, m, eth);
1290 }
1291 // fall through to default case...
1292
1293 default:
1294
1295 List<IOFMessageListener> listeners = null;
1296 if (messageListeners.containsKey(m.getType())) {
1297 listeners = messageListeners.get(m.getType()).
1298 getOrderedListeners();
1299 }
1300
1301 FloodlightContext bc = null;
1302 if (listeners != null) {
1303 // Check if floodlight context is passed from the calling
1304 // function, if so use that floodlight context, otherwise
1305 // allocate one
1306 if (bContext == null) {
1307 bc = flcontext_alloc();
1308 } else {
1309 bc = bContext;
1310 }
1311 if (eth != null) {
1312 IFloodlightProviderService.bcStore.put(bc,
1313 IFloodlightProviderService.CONTEXT_PI_PAYLOAD,
1314 eth);
1315 }
1316
1317 // Get the starting time (overall and per-component) of
1318 // the processing chain for this packet if performance
1319 // monitoring is turned on
1320 pktinProcTime.bootstrap(listeners);
1321 pktinProcTime.recordStartTimePktIn();
1322 Command cmd;
1323 for (IOFMessageListener listener : listeners) {
1324 if (listener instanceof IOFSwitchFilter) {
1325 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1326 continue;
1327 }
1328 }
1329
1330 pktinProcTime.recordStartTimeComp(listener);
1331 cmd = listener.receive(sw, m, bc);
1332 pktinProcTime.recordEndTimeComp(listener);
1333
1334 if (Command.STOP.equals(cmd)) {
1335 break;
1336 }
1337 }
1338 pktinProcTime.recordEndTimePktIn(sw, m, bc);
1339 } else {
1340 log.warn("Unhandled OF Message: {} from {}", m, sw);
1341 }
1342
1343 if ((bContext == null) && (bc != null)) flcontext_free(bc);
1344 }
1345 }
1346
1347 /**
1348 * Log an OpenFlow error message from a switch
1349 * @param sw The switch that sent the error
1350 * @param error The error message
1351 */
1352 @LogMessageDoc(level="ERROR",
1353 message="Error {error type} {error code} from {switch}",
1354 explanation="The switch responded with an unexpected error" +
1355 "to an OpenFlow message from the controller",
1356 recommendation="This could indicate improper network operation. " +
1357 "If the problem persists restarting the switch and " +
1358 "controller may help."
1359 )
1360 protected void logError(IOFSwitch sw, OFError error) {
1361 int etint = 0xffff & error.getErrorType();
1362 if (etint < 0 || etint >= OFErrorType.values().length) {
1363 log.error("Unknown error code {} from sw {}", etint, sw);
1364 }
1365 OFErrorType et = OFErrorType.values()[etint];
1366 switch (et) {
1367 case OFPET_HELLO_FAILED:
1368 OFHelloFailedCode hfc =
1369 OFHelloFailedCode.values()[0xffff & error.getErrorCode()];
1370 log.error("Error {} {} from {}", new Object[] {et, hfc, sw});
1371 break;
1372 case OFPET_BAD_REQUEST:
1373 OFBadRequestCode brc =
1374 OFBadRequestCode.values()[0xffff & error.getErrorCode()];
1375 log.error("Error {} {} from {}", new Object[] {et, brc, sw});
1376 break;
1377 case OFPET_BAD_ACTION:
1378 OFBadActionCode bac =
1379 OFBadActionCode.values()[0xffff & error.getErrorCode()];
1380 log.error("Error {} {} from {}", new Object[] {et, bac, sw});
1381 break;
1382 case OFPET_FLOW_MOD_FAILED:
1383 OFFlowModFailedCode fmfc =
1384 OFFlowModFailedCode.values()[0xffff & error.getErrorCode()];
1385 log.error("Error {} {} from {}", new Object[] {et, fmfc, sw});
1386 break;
1387 case OFPET_PORT_MOD_FAILED:
1388 OFPortModFailedCode pmfc =
1389 OFPortModFailedCode.values()[0xffff & error.getErrorCode()];
1390 log.error("Error {} {} from {}", new Object[] {et, pmfc, sw});
1391 break;
1392 case OFPET_QUEUE_OP_FAILED:
1393 OFQueueOpFailedCode qofc =
1394 OFQueueOpFailedCode.values()[0xffff & error.getErrorCode()];
1395 log.error("Error {} {} from {}", new Object[] {et, qofc, sw});
1396 break;
1397 default:
1398 break;
1399 }
1400 }
1401
1402 /**
1403 * Add a switch to the active switch list and call the switch listeners.
1404 * This happens either when a switch first connects (and the controller is
1405 * not in the slave role) or when the role of the controller changes from
1406 * slave to master.
1407 * @param sw the switch that has been added
1408 */
1409 // TODO: need to rethink locking and the synchronous switch update.
1410 // We can / should also handle duplicate DPIDs in connectedSwitches
1411 @LogMessageDoc(level="ERROR",
1412 message="New switch added {switch} for already-added switch {switch}",
1413 explanation="A switch with the same DPID as another switch " +
1414 "connected to the controller. This can be caused by " +
1415 "multiple switches configured with the same DPID, or " +
1416 "by a switch reconnected very quickly after " +
1417 "disconnecting.",
1418 recommendation="If this happens repeatedly, it is likely there " +
1419 "are switches with duplicate DPIDs on the network. " +
1420 "Reconfigure the appropriate switches. If it happens " +
1421 "very rarely, then it is likely this is a transient " +
1422 "network problem that can be ignored."
1423 )
1424 protected void addSwitch(IOFSwitch sw) {
1425 // TODO: is it safe to modify the HashMap without holding
1426 // the old switch's lock?
1427 OFSwitchImpl oldSw = (OFSwitchImpl) this.activeSwitches.put(sw.getId(), sw);
1428 if (sw == oldSw) {
1429 // Note == for object equality, not .equals for value
1430 log.info("New add switch for pre-existing switch {}", sw);
1431 return;
1432 }
1433
1434 if (oldSw != null) {
1435 oldSw.getListenerWriteLock().lock();
1436 try {
1437 log.error("New switch added {} for already-added switch {}",
1438 sw, oldSw);
1439 // Set the connected flag to false to suppress calling
1440 // the listeners for this switch in processOFMessage
1441 oldSw.setConnected(false);
1442
1443 oldSw.cancelAllStatisticsReplies();
1444
1445 updateInactiveSwitchInfo(oldSw);
1446
1447 // we need to clean out old switch state definitively
1448 // before adding the new switch
1449 // FIXME: It seems not completely kosher to call the
1450 // switch listeners here. I thought one of the points of
1451 // having the asynchronous switch update mechanism was so
1452 // the addedSwitch and removedSwitch were always called
1453 // from a single thread to simplify concurrency issues
1454 // for the listener.
1455 if (switchListeners != null) {
1456 for (IOFSwitchListener listener : switchListeners) {
1457 listener.removedSwitch(oldSw);
1458 }
1459 }
1460 // will eventually trigger a removeSwitch(), which will cause
1461 // a "Not removing Switch ... already removed debug message.
1462 // TODO: Figure out a way to handle this that avoids the
1463 // spurious debug message.
1464 oldSw.getChannel().close();
1465 }
1466 finally {
1467 oldSw.getListenerWriteLock().unlock();
1468 }
1469 }
1470
1471 updateActiveSwitchInfo(sw);
Pankaj Berde8557a462013-01-07 08:59:31 -08001472 swStore.update(sw.getStringId(), SwitchState.ACTIVE, DM_OPERATION.UPDATE);
Pankaj Berde0fc4e432013-01-12 09:47:22 -08001473 for (OFPhysicalPort port: sw.getPorts()) {
1474 swStore.addPort(sw.getStringId(), port);
1475 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001476 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.ADDED);
1477 try {
1478 this.updates.put(update);
1479 } catch (InterruptedException e) {
1480 log.error("Failure adding update to queue", e);
1481 }
1482 }
1483
1484 /**
1485 * Remove a switch from the active switch list and call the switch listeners.
1486 * This happens either when the switch is disconnected or when the
1487 * controller's role for the switch changes from master to slave.
1488 * @param sw the switch that has been removed
1489 */
1490 protected void removeSwitch(IOFSwitch sw) {
1491 // No need to acquire the listener lock, since
1492 // this method is only called after netty has processed all
1493 // pending messages
1494 log.debug("removeSwitch: {}", sw);
Pankaj Berdeafb20532013-01-08 15:05:24 -08001495 swStore.update(sw.getStringId(), SwitchState.INACTIVE, DM_OPERATION.UPDATE);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001496 if (!this.activeSwitches.remove(sw.getId(), sw) || !sw.isConnected()) {
1497 log.debug("Not removing switch {}; already removed", sw);
1498 return;
1499 }
1500 // We cancel all outstanding statistics replies if the switch transition
1501 // from active. In the future we might allow statistics requests
1502 // from slave controllers. Then we need to move this cancelation
1503 // to switch disconnect
1504 sw.cancelAllStatisticsReplies();
Pankaj Berdeafb20532013-01-08 15:05:24 -08001505
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001506
1507 // FIXME: I think there's a race condition if we call updateInactiveSwitchInfo
1508 // here if role support is enabled. In that case if the switch is being
1509 // removed because we've been switched to being in the slave role, then I think
1510 // it's possible that the new master may have already been promoted to master
1511 // and written out the active switch state to storage. If we now execute
1512 // updateInactiveSwitchInfo we may wipe out all of the state that was
1513 // written out by the new master. Maybe need to revisit how we handle all
1514 // of the switch state that's written to storage.
1515
1516 updateInactiveSwitchInfo(sw);
Pankaj Berdeafb20532013-01-08 15:05:24 -08001517
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001518 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.REMOVED);
1519 try {
1520 this.updates.put(update);
1521 } catch (InterruptedException e) {
1522 log.error("Failure adding update to queue", e);
1523 }
1524 }
1525
1526 // ***************
1527 // IFloodlightProvider
1528 // ***************
1529
1530 @Override
1531 public synchronized void addOFMessageListener(OFType type,
1532 IOFMessageListener listener) {
1533 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1534 messageListeners.get(type);
1535 if (ldd == null) {
1536 ldd = new ListenerDispatcher<OFType, IOFMessageListener>();
1537 messageListeners.put(type, ldd);
1538 }
1539 ldd.addListener(type, listener);
1540 }
1541
1542 @Override
1543 public synchronized void removeOFMessageListener(OFType type,
1544 IOFMessageListener listener) {
1545 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1546 messageListeners.get(type);
1547 if (ldd != null) {
1548 ldd.removeListener(listener);
1549 }
1550 }
1551
1552 private void logListeners() {
1553 for (Map.Entry<OFType,
1554 ListenerDispatcher<OFType,
1555 IOFMessageListener>> entry
1556 : messageListeners.entrySet()) {
1557
1558 OFType type = entry.getKey();
1559 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1560 entry.getValue();
1561
1562 StringBuffer sb = new StringBuffer();
1563 sb.append("OFListeners for ");
1564 sb.append(type);
1565 sb.append(": ");
1566 for (IOFMessageListener l : ldd.getOrderedListeners()) {
1567 sb.append(l.getName());
1568 sb.append(",");
1569 }
1570 log.debug(sb.toString());
1571 }
1572 }
1573
1574 public void removeOFMessageListeners(OFType type) {
1575 messageListeners.remove(type);
1576 }
1577
1578 @Override
1579 public Map<Long, IOFSwitch> getSwitches() {
1580 return Collections.unmodifiableMap(this.activeSwitches);
1581 }
1582
1583 @Override
1584 public void addOFSwitchListener(IOFSwitchListener listener) {
1585 this.switchListeners.add(listener);
1586 }
1587
1588 @Override
1589 public void removeOFSwitchListener(IOFSwitchListener listener) {
1590 this.switchListeners.remove(listener);
1591 }
1592
1593 @Override
1594 public Map<OFType, List<IOFMessageListener>> getListeners() {
1595 Map<OFType, List<IOFMessageListener>> lers =
1596 new HashMap<OFType, List<IOFMessageListener>>();
1597 for(Entry<OFType, ListenerDispatcher<OFType, IOFMessageListener>> e :
1598 messageListeners.entrySet()) {
1599 lers.put(e.getKey(), e.getValue().getOrderedListeners());
1600 }
1601 return Collections.unmodifiableMap(lers);
1602 }
1603
1604 @Override
1605 @LogMessageDocs({
1606 @LogMessageDoc(message="Failed to inject OFMessage {message} onto " +
1607 "a null switch",
1608 explanation="Failed to process a message because the switch " +
1609 " is no longer connected."),
1610 @LogMessageDoc(level="ERROR",
1611 message="Error reinjecting OFMessage on switch {switch}",
1612 explanation="An I/O error occured while attempting to " +
1613 "process an OpenFlow message",
1614 recommendation=LogMessageDoc.CHECK_SWITCH)
1615 })
1616 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg,
1617 FloodlightContext bc) {
1618 if (sw == null) {
1619 log.info("Failed to inject OFMessage {} onto a null switch", msg);
1620 return false;
1621 }
1622
1623 // FIXME: Do we need to be able to inject messages to switches
1624 // where we're the slave controller (i.e. they're connected but
1625 // not active)?
1626 // FIXME: Don't we need synchronization logic here so we're holding
1627 // the listener read lock when we call handleMessage? After some
1628 // discussions it sounds like the right thing to do here would be to
1629 // inject the message as a netty upstream channel event so it goes
1630 // through the normal netty event processing, including being
1631 // handled
1632 if (!activeSwitches.containsKey(sw.getId())) return false;
1633
1634 try {
1635 // Pass Floodlight context to the handleMessages()
1636 handleMessage(sw, msg, bc);
1637 } catch (IOException e) {
1638 log.error("Error reinjecting OFMessage on switch {}",
1639 HexString.toHexString(sw.getId()));
1640 return false;
1641 }
1642 return true;
1643 }
1644
1645 @Override
1646 @LogMessageDoc(message="Calling System.exit",
1647 explanation="The controller is terminating")
1648 public synchronized void terminate() {
1649 log.info("Calling System.exit");
1650 System.exit(1);
1651 }
1652
1653 @Override
1654 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg) {
1655 // call the overloaded version with floodlight context set to null
1656 return injectOfMessage(sw, msg, null);
1657 }
1658
1659 @Override
1660 public void handleOutgoingMessage(IOFSwitch sw, OFMessage m,
1661 FloodlightContext bc) {
1662 if (log.isTraceEnabled()) {
1663 String str = OFMessage.getDataAsString(sw, m, bc);
1664 log.trace("{}", str);
1665 }
1666
1667 List<IOFMessageListener> listeners = null;
1668 if (messageListeners.containsKey(m.getType())) {
1669 listeners =
1670 messageListeners.get(m.getType()).getOrderedListeners();
1671 }
1672
1673 if (listeners != null) {
1674 for (IOFMessageListener listener : listeners) {
1675 if (listener instanceof IOFSwitchFilter) {
1676 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1677 continue;
1678 }
1679 }
1680 if (Command.STOP.equals(listener.receive(sw, m, bc))) {
1681 break;
1682 }
1683 }
1684 }
1685 }
1686
1687 @Override
1688 public BasicFactory getOFMessageFactory() {
1689 return factory;
1690 }
1691
1692 @Override
1693 public String getControllerId() {
1694 return controllerId;
1695 }
1696
1697 // **************
1698 // Initialization
1699 // **************
1700
1701 protected void updateAllInactiveSwitchInfo() {
1702 if (role == Role.SLAVE) {
1703 return;
1704 }
1705 String controllerId = getControllerId();
1706 String[] switchColumns = { SWITCH_DATAPATH_ID,
1707 SWITCH_CONTROLLER_ID,
1708 SWITCH_ACTIVE };
1709 String[] portColumns = { PORT_ID, PORT_SWITCH };
1710 IResultSet switchResultSet = null;
1711 try {
1712 OperatorPredicate op =
1713 new OperatorPredicate(SWITCH_CONTROLLER_ID,
1714 OperatorPredicate.Operator.EQ,
1715 controllerId);
1716 switchResultSet =
1717 storageSource.executeQuery(SWITCH_TABLE_NAME,
1718 switchColumns,
1719 op, null);
1720 while (switchResultSet.next()) {
1721 IResultSet portResultSet = null;
1722 try {
1723 String datapathId =
1724 switchResultSet.getString(SWITCH_DATAPATH_ID);
1725 switchResultSet.setBoolean(SWITCH_ACTIVE, Boolean.FALSE);
1726 op = new OperatorPredicate(PORT_SWITCH,
1727 OperatorPredicate.Operator.EQ,
1728 datapathId);
1729 portResultSet =
1730 storageSource.executeQuery(PORT_TABLE_NAME,
1731 portColumns,
1732 op, null);
1733 while (portResultSet.next()) {
1734 portResultSet.deleteRow();
1735 }
1736 portResultSet.save();
1737 }
1738 finally {
1739 if (portResultSet != null)
1740 portResultSet.close();
1741 }
1742 }
1743 switchResultSet.save();
1744 }
1745 finally {
1746 if (switchResultSet != null)
1747 switchResultSet.close();
1748 }
1749 }
1750
1751 protected void updateControllerInfo() {
1752 updateAllInactiveSwitchInfo();
1753
1754 // Write out the controller info to the storage source
1755 Map<String, Object> controllerInfo = new HashMap<String, Object>();
1756 String id = getControllerId();
1757 controllerInfo.put(CONTROLLER_ID, id);
1758 storageSource.updateRow(CONTROLLER_TABLE_NAME, controllerInfo);
1759 }
1760
1761 protected void updateActiveSwitchInfo(IOFSwitch sw) {
1762 if (role == Role.SLAVE) {
1763 return;
1764 }
1765 // Obtain the row info for the switch
1766 Map<String, Object> switchInfo = new HashMap<String, Object>();
1767 String datapathIdString = sw.getStringId();
1768 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1769 String controllerId = getControllerId();
1770 switchInfo.put(SWITCH_CONTROLLER_ID, controllerId);
1771 Date connectedSince = sw.getConnectedSince();
1772 switchInfo.put(SWITCH_CONNECTED_SINCE, connectedSince);
1773 Channel channel = sw.getChannel();
1774 SocketAddress socketAddress = channel.getRemoteAddress();
1775 if (socketAddress != null) {
1776 String socketAddressString = socketAddress.toString();
1777 switchInfo.put(SWITCH_SOCKET_ADDRESS, socketAddressString);
1778 if (socketAddress instanceof InetSocketAddress) {
1779 InetSocketAddress inetSocketAddress =
1780 (InetSocketAddress)socketAddress;
1781 InetAddress inetAddress = inetSocketAddress.getAddress();
1782 String ip = inetAddress.getHostAddress();
1783 switchInfo.put(SWITCH_IP, ip);
1784 }
1785 }
1786
1787 // Write out the switch features info
1788 long capabilities = U32.f(sw.getCapabilities());
1789 switchInfo.put(SWITCH_CAPABILITIES, capabilities);
1790 long buffers = U32.f(sw.getBuffers());
1791 switchInfo.put(SWITCH_BUFFERS, buffers);
1792 long tables = U32.f(sw.getTables());
1793 switchInfo.put(SWITCH_TABLES, tables);
1794 long actions = U32.f(sw.getActions());
1795 switchInfo.put(SWITCH_ACTIONS, actions);
1796 switchInfo.put(SWITCH_ACTIVE, Boolean.TRUE);
1797
1798 // Update the switch
1799 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1800
1801 // Update the ports
1802 for (OFPhysicalPort port: sw.getPorts()) {
1803 updatePortInfo(sw, port);
1804 }
1805 }
1806
1807 protected void updateInactiveSwitchInfo(IOFSwitch sw) {
1808 if (role == Role.SLAVE) {
1809 return;
1810 }
1811 log.debug("Update DB with inactiveSW {}", sw);
1812 // Update the controller info in the storage source to be inactive
1813 Map<String, Object> switchInfo = new HashMap<String, Object>();
1814 String datapathIdString = sw.getStringId();
1815 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1816 //switchInfo.put(SWITCH_CONNECTED_SINCE, null);
1817 switchInfo.put(SWITCH_ACTIVE, Boolean.FALSE);
1818 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1819 }
1820
1821 protected void updatePortInfo(IOFSwitch sw, OFPhysicalPort port) {
1822 if (role == Role.SLAVE) {
1823 return;
1824 }
1825 String datapathIdString = sw.getStringId();
1826 Map<String, Object> portInfo = new HashMap<String, Object>();
1827 int portNumber = U16.f(port.getPortNumber());
1828 String id = datapathIdString + "|" + portNumber;
1829 portInfo.put(PORT_ID, id);
1830 portInfo.put(PORT_SWITCH, datapathIdString);
1831 portInfo.put(PORT_NUMBER, portNumber);
1832 byte[] hardwareAddress = port.getHardwareAddress();
1833 String hardwareAddressString = HexString.toHexString(hardwareAddress);
1834 portInfo.put(PORT_HARDWARE_ADDRESS, hardwareAddressString);
1835 String name = port.getName();
1836 portInfo.put(PORT_NAME, name);
1837 long config = U32.f(port.getConfig());
1838 portInfo.put(PORT_CONFIG, config);
1839 long state = U32.f(port.getState());
1840 portInfo.put(PORT_STATE, state);
1841 long currentFeatures = U32.f(port.getCurrentFeatures());
1842 portInfo.put(PORT_CURRENT_FEATURES, currentFeatures);
1843 long advertisedFeatures = U32.f(port.getAdvertisedFeatures());
1844 portInfo.put(PORT_ADVERTISED_FEATURES, advertisedFeatures);
1845 long supportedFeatures = U32.f(port.getSupportedFeatures());
1846 portInfo.put(PORT_SUPPORTED_FEATURES, supportedFeatures);
1847 long peerFeatures = U32.f(port.getPeerFeatures());
1848 portInfo.put(PORT_PEER_FEATURES, peerFeatures);
1849 storageSource.updateRowAsync(PORT_TABLE_NAME, portInfo);
1850 }
1851
1852 /**
1853 * Read switch port data from storage and write it into a switch object
1854 * @param sw the switch to update
1855 */
1856 protected void readSwitchPortStateFromStorage(OFSwitchImpl sw) {
1857 OperatorPredicate op =
1858 new OperatorPredicate(PORT_SWITCH,
1859 OperatorPredicate.Operator.EQ,
1860 sw.getStringId());
1861 IResultSet portResultSet =
1862 storageSource.executeQuery(PORT_TABLE_NAME,
1863 null, op, null);
1864 //Map<Short, OFPhysicalPort> oldports =
1865 // new HashMap<Short, OFPhysicalPort>();
1866 //oldports.putAll(sw.getPorts());
1867
1868 while (portResultSet.next()) {
1869 try {
1870 OFPhysicalPort p = new OFPhysicalPort();
1871 p.setPortNumber((short)portResultSet.getInt(PORT_NUMBER));
1872 p.setName(portResultSet.getString(PORT_NAME));
1873 p.setConfig((int)portResultSet.getLong(PORT_CONFIG));
1874 p.setState((int)portResultSet.getLong(PORT_STATE));
1875 String portMac = portResultSet.getString(PORT_HARDWARE_ADDRESS);
1876 p.setHardwareAddress(HexString.fromHexString(portMac));
1877 p.setCurrentFeatures((int)portResultSet.
1878 getLong(PORT_CURRENT_FEATURES));
1879 p.setAdvertisedFeatures((int)portResultSet.
1880 getLong(PORT_ADVERTISED_FEATURES));
1881 p.setSupportedFeatures((int)portResultSet.
1882 getLong(PORT_SUPPORTED_FEATURES));
1883 p.setPeerFeatures((int)portResultSet.
1884 getLong(PORT_PEER_FEATURES));
1885 //oldports.remove(Short.valueOf(p.getPortNumber()));
1886 sw.setPort(p);
1887 } catch (NullPointerException e) {
1888 // ignore
1889 }
1890 }
1891 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
1892 try {
1893 this.updates.put(update);
1894 } catch (InterruptedException e) {
1895 log.error("Failure adding update to queue", e);
1896 }
1897 }
1898
1899 protected void removePortInfo(IOFSwitch sw, short portNumber) {
1900 if (role == Role.SLAVE) {
1901 return;
1902 }
1903 String datapathIdString = sw.getStringId();
1904 String id = datapathIdString + "|" + portNumber;
1905 storageSource.deleteRowAsync(PORT_TABLE_NAME, id);
1906 }
1907
1908 /**
1909 * Sets the initial role based on properties in the config params.
1910 * It looks for two different properties.
1911 * If the "role" property is specified then the value should be
1912 * either "EQUAL", "MASTER", or "SLAVE" and the role of the
1913 * controller is set to the specified value. If the "role" property
1914 * is not specified then it looks next for the "role.path" property.
1915 * In this case the value should be the path to a property file in
1916 * the file system that contains a property called "floodlight.role"
1917 * which can be one of the values listed above for the "role" property.
1918 * The idea behind the "role.path" mechanism is that you have some
1919 * separate heartbeat and master controller election algorithm that
1920 * determines the role of the controller. When a role transition happens,
1921 * it updates the current role in the file specified by the "role.path"
1922 * file. Then if floodlight restarts for some reason it can get the
1923 * correct current role of the controller from the file.
1924 * @param configParams The config params for the FloodlightProvider service
1925 * @return A valid role if role information is specified in the
1926 * config params, otherwise null
1927 */
1928 @LogMessageDocs({
1929 @LogMessageDoc(message="Controller role set to {role}",
1930 explanation="Setting the initial HA role to "),
1931 @LogMessageDoc(level="ERROR",
1932 message="Invalid current role value: {role}",
1933 explanation="An invalid HA role value was read from the " +
1934 "properties file",
1935 recommendation=LogMessageDoc.CHECK_CONTROLLER)
1936 })
1937 protected Role getInitialRole(Map<String, String> configParams) {
1938 Role role = null;
1939 String roleString = configParams.get("role");
1940 if (roleString == null) {
1941 String rolePath = configParams.get("rolepath");
1942 if (rolePath != null) {
1943 Properties properties = new Properties();
1944 try {
1945 properties.load(new FileInputStream(rolePath));
1946 roleString = properties.getProperty("floodlight.role");
1947 }
1948 catch (IOException exc) {
1949 // Don't treat it as an error if the file specified by the
1950 // rolepath property doesn't exist. This lets us enable the
1951 // HA mechanism by just creating/setting the floodlight.role
1952 // property in that file without having to modify the
1953 // floodlight properties.
1954 }
1955 }
1956 }
1957
1958 if (roleString != null) {
1959 // Canonicalize the string to the form used for the enum constants
1960 roleString = roleString.trim().toUpperCase();
1961 try {
1962 role = Role.valueOf(roleString);
1963 }
1964 catch (IllegalArgumentException exc) {
1965 log.error("Invalid current role value: {}", roleString);
1966 }
1967 }
1968
1969 log.info("Controller role set to {}", role);
1970
1971 return role;
1972 }
1973
1974 /**
1975 * Tell controller that we're ready to accept switches loop
1976 * @throws IOException
1977 */
1978 @LogMessageDocs({
1979 @LogMessageDoc(message="Listening for switch connections on {address}",
1980 explanation="The controller is ready and listening for new" +
1981 " switch connections"),
1982 @LogMessageDoc(message="Storage exception in controller " +
1983 "updates loop; terminating process",
1984 explanation=ERROR_DATABASE,
1985 recommendation=LogMessageDoc.CHECK_CONTROLLER),
1986 @LogMessageDoc(level="ERROR",
1987 message="Exception in controller updates loop",
1988 explanation="Failed to dispatch controller event",
1989 recommendation=LogMessageDoc.GENERIC_ACTION)
1990 })
1991 public void run() {
1992 if (log.isDebugEnabled()) {
1993 logListeners();
1994 }
1995
1996 try {
1997 final ServerBootstrap bootstrap = createServerBootStrap();
1998
1999 bootstrap.setOption("reuseAddr", true);
2000 bootstrap.setOption("child.keepAlive", true);
2001 bootstrap.setOption("child.tcpNoDelay", true);
2002 bootstrap.setOption("child.sendBufferSize", Controller.SEND_BUFFER_SIZE);
2003
2004 ChannelPipelineFactory pfact =
2005 new OpenflowPipelineFactory(this, null);
2006 bootstrap.setPipelineFactory(pfact);
2007 InetSocketAddress sa = new InetSocketAddress(openFlowPort);
2008 final ChannelGroup cg = new DefaultChannelGroup();
2009 cg.add(bootstrap.bind(sa));
2010
2011 log.info("Listening for switch connections on {}", sa);
2012 } catch (Exception e) {
2013 throw new RuntimeException(e);
2014 }
2015
2016 // main loop
2017 while (true) {
2018 try {
2019 IUpdate update = updates.take();
2020 update.dispatch();
2021 } catch (InterruptedException e) {
2022 return;
2023 } catch (StorageException e) {
2024 log.error("Storage exception in controller " +
2025 "updates loop; terminating process", e);
2026 return;
2027 } catch (Exception e) {
2028 log.error("Exception in controller updates loop", e);
2029 }
2030 }
2031 }
2032
2033 private ServerBootstrap createServerBootStrap() {
2034 if (workerThreads == 0) {
2035 return new ServerBootstrap(
2036 new NioServerSocketChannelFactory(
2037 Executors.newCachedThreadPool(),
2038 Executors.newCachedThreadPool()));
2039 } else {
2040 return new ServerBootstrap(
2041 new NioServerSocketChannelFactory(
2042 Executors.newCachedThreadPool(),
2043 Executors.newCachedThreadPool(), workerThreads));
2044 }
2045 }
2046
2047 public void setConfigParams(Map<String, String> configParams) {
2048 String ofPort = configParams.get("openflowport");
2049 if (ofPort != null) {
2050 this.openFlowPort = Integer.parseInt(ofPort);
2051 }
2052 log.debug("OpenFlow port set to {}", this.openFlowPort);
2053 String threads = configParams.get("workerthreads");
2054 if (threads != null) {
2055 this.workerThreads = Integer.parseInt(threads);
2056 }
2057 log.debug("Number of worker threads set to {}", this.workerThreads);
2058 String controllerId = configParams.get("controllerid");
2059 if (controllerId != null) {
2060 this.controllerId = controllerId;
2061 }
Jonathan Hartd10008d2013-02-23 17:04:08 -08002062 else {
2063 //Try to get the hostname of the machine and use that for controller ID
2064 try {
2065 String hostname = java.net.InetAddress.getLocalHost().getHostName();
2066 this.controllerId = hostname;
2067 } catch (UnknownHostException e) {
2068 // Can't get hostname, we'll just use the default
2069 }
2070 }
2071
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002072 log.debug("ControllerId set to {}", this.controllerId);
2073 }
2074
2075 private void initVendorMessages() {
2076 // Configure openflowj to be able to parse the role request/reply
2077 // vendor messages.
2078 OFBasicVendorId niciraVendorId = new OFBasicVendorId(
2079 OFNiciraVendorData.NX_VENDOR_ID, 4);
2080 OFVendorId.registerVendorId(niciraVendorId);
2081 OFBasicVendorDataType roleRequestVendorData =
2082 new OFBasicVendorDataType(
2083 OFRoleRequestVendorData.NXT_ROLE_REQUEST,
2084 OFRoleRequestVendorData.getInstantiable());
2085 niciraVendorId.registerVendorDataType(roleRequestVendorData);
2086 OFBasicVendorDataType roleReplyVendorData =
2087 new OFBasicVendorDataType(
2088 OFRoleReplyVendorData.NXT_ROLE_REPLY,
2089 OFRoleReplyVendorData.getInstantiable());
2090 niciraVendorId.registerVendorDataType(roleReplyVendorData);
2091 }
2092
2093 /**
2094 * Initialize internal data structures
2095 */
2096 public void init(Map<String, String> configParams) {
2097 // These data structures are initialized here because other
2098 // module's startUp() might be called before ours
2099 this.messageListeners =
2100 new ConcurrentHashMap<OFType,
2101 ListenerDispatcher<OFType,
2102 IOFMessageListener>>();
2103 this.switchListeners = new CopyOnWriteArraySet<IOFSwitchListener>();
2104 this.haListeners = new CopyOnWriteArraySet<IHAListener>();
2105 this.activeSwitches = new ConcurrentHashMap<Long, IOFSwitch>();
2106 this.connectedSwitches = new HashSet<OFSwitchImpl>();
2107 this.controllerNodeIPsCache = new HashMap<String, String>();
2108 this.updates = new LinkedBlockingQueue<IUpdate>();
2109 this.factory = new BasicFactory();
2110 this.providerMap = new HashMap<String, List<IInfoProvider>>();
2111 setConfigParams(configParams);
2112 this.role = getInitialRole(configParams);
2113 this.roleChanger = new RoleChanger();
2114 initVendorMessages();
2115 this.systemStartTime = System.currentTimeMillis();
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002116 }
2117
2118 /**
2119 * Startup all of the controller's components
2120 */
2121 @LogMessageDoc(message="Waiting for storage source",
2122 explanation="The system database is not yet ready",
2123 recommendation="If this message persists, this indicates " +
2124 "that the system database has failed to start. " +
2125 LogMessageDoc.CHECK_CONTROLLER)
2126 public void startupComponents() {
Jonathan Hartd10008d2013-02-23 17:04:08 -08002127 try {
2128 registryService.registerController(controllerId);
2129 } catch (RegistryException e2) {
2130 log.warn("Registry service error: {}", e2.getMessage());
2131 }
2132
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002133 // Create the table names we use
2134 storageSource.createTable(CONTROLLER_TABLE_NAME, null);
2135 storageSource.createTable(SWITCH_TABLE_NAME, null);
2136 storageSource.createTable(PORT_TABLE_NAME, null);
2137 storageSource.createTable(CONTROLLER_INTERFACE_TABLE_NAME, null);
2138 storageSource.createTable(SWITCH_CONFIG_TABLE_NAME, null);
2139 storageSource.setTablePrimaryKeyName(CONTROLLER_TABLE_NAME,
2140 CONTROLLER_ID);
2141 storageSource.setTablePrimaryKeyName(SWITCH_TABLE_NAME,
2142 SWITCH_DATAPATH_ID);
2143 storageSource.setTablePrimaryKeyName(PORT_TABLE_NAME, PORT_ID);
2144 storageSource.setTablePrimaryKeyName(CONTROLLER_INTERFACE_TABLE_NAME,
2145 CONTROLLER_INTERFACE_ID);
2146 storageSource.addListener(CONTROLLER_INTERFACE_TABLE_NAME, this);
2147
2148 while (true) {
2149 try {
2150 updateControllerInfo();
2151 break;
2152 }
2153 catch (StorageException e) {
2154 log.info("Waiting for storage source");
2155 try {
2156 Thread.sleep(1000);
2157 } catch (InterruptedException e1) {
2158 }
2159 }
2160 }
2161
2162 // Add our REST API
2163 restApi.addRestletRoutable(new CoreWebRoutable());
2164 }
2165
2166 @Override
2167 public void addInfoProvider(String type, IInfoProvider provider) {
2168 if (!providerMap.containsKey(type)) {
2169 providerMap.put(type, new ArrayList<IInfoProvider>());
2170 }
2171 providerMap.get(type).add(provider);
2172 }
2173
2174 @Override
2175 public void removeInfoProvider(String type, IInfoProvider provider) {
2176 if (!providerMap.containsKey(type)) {
2177 log.debug("Provider type {} doesn't exist.", type);
2178 return;
2179 }
2180
2181 providerMap.get(type).remove(provider);
2182 }
2183
2184 public Map<String, Object> getControllerInfo(String type) {
2185 if (!providerMap.containsKey(type)) return null;
2186
2187 Map<String, Object> result = new LinkedHashMap<String, Object>();
2188 for (IInfoProvider provider : providerMap.get(type)) {
2189 result.putAll(provider.getInfo(type));
2190 }
2191
2192 return result;
2193 }
2194
2195 @Override
2196 public void addHAListener(IHAListener listener) {
2197 this.haListeners.add(listener);
2198 }
2199
2200 @Override
2201 public void removeHAListener(IHAListener listener) {
2202 this.haListeners.remove(listener);
2203 }
2204
2205
2206 /**
2207 * Handle changes to the controller nodes IPs and dispatch update.
2208 */
2209 @SuppressWarnings("unchecked")
2210 protected void handleControllerNodeIPChanges() {
2211 HashMap<String,String> curControllerNodeIPs = new HashMap<String,String>();
2212 HashMap<String,String> addedControllerNodeIPs = new HashMap<String,String>();
2213 HashMap<String,String> removedControllerNodeIPs =new HashMap<String,String>();
2214 String[] colNames = { CONTROLLER_INTERFACE_CONTROLLER_ID,
2215 CONTROLLER_INTERFACE_TYPE,
2216 CONTROLLER_INTERFACE_NUMBER,
2217 CONTROLLER_INTERFACE_DISCOVERED_IP };
2218 synchronized(controllerNodeIPsCache) {
2219 // We currently assume that interface Ethernet0 is the relevant
2220 // controller interface. Might change.
2221 // We could (should?) implement this using
2222 // predicates, but creating the individual and compound predicate
2223 // seems more overhead then just checking every row. Particularly,
2224 // since the number of rows is small and changes infrequent
2225 IResultSet res = storageSource.executeQuery(CONTROLLER_INTERFACE_TABLE_NAME,
2226 colNames,null, null);
2227 while (res.next()) {
2228 if (res.getString(CONTROLLER_INTERFACE_TYPE).equals("Ethernet") &&
2229 res.getInt(CONTROLLER_INTERFACE_NUMBER) == 0) {
2230 String controllerID = res.getString(CONTROLLER_INTERFACE_CONTROLLER_ID);
2231 String discoveredIP = res.getString(CONTROLLER_INTERFACE_DISCOVERED_IP);
2232 String curIP = controllerNodeIPsCache.get(controllerID);
2233
2234 curControllerNodeIPs.put(controllerID, discoveredIP);
2235 if (curIP == null) {
2236 // new controller node IP
2237 addedControllerNodeIPs.put(controllerID, discoveredIP);
2238 }
2239 else if (curIP != discoveredIP) {
2240 // IP changed
2241 removedControllerNodeIPs.put(controllerID, curIP);
2242 addedControllerNodeIPs.put(controllerID, discoveredIP);
2243 }
2244 }
2245 }
2246 // Now figure out if rows have been deleted. We can't use the
2247 // rowKeys from rowsDeleted directly, since the tables primary
2248 // key is a compound that we can't disassemble
2249 Set<String> curEntries = curControllerNodeIPs.keySet();
2250 Set<String> removedEntries = controllerNodeIPsCache.keySet();
2251 removedEntries.removeAll(curEntries);
2252 for (String removedControllerID : removedEntries)
2253 removedControllerNodeIPs.put(removedControllerID, controllerNodeIPsCache.get(removedControllerID));
2254 controllerNodeIPsCache = (HashMap<String, String>) curControllerNodeIPs.clone();
2255 HAControllerNodeIPUpdate update = new HAControllerNodeIPUpdate(
2256 curControllerNodeIPs, addedControllerNodeIPs,
2257 removedControllerNodeIPs);
2258 if (!removedControllerNodeIPs.isEmpty() || !addedControllerNodeIPs.isEmpty()) {
2259 try {
2260 this.updates.put(update);
2261 } catch (InterruptedException e) {
2262 log.error("Failure adding update to queue", e);
2263 }
2264 }
2265 }
2266 }
2267
2268 @Override
2269 public Map<String, String> getControllerNodeIPs() {
2270 // We return a copy of the mapping so we can guarantee that
2271 // the mapping return is the same as one that will be (or was)
2272 // dispatched to IHAListeners
2273 HashMap<String,String> retval = new HashMap<String,String>();
2274 synchronized(controllerNodeIPsCache) {
2275 retval.putAll(controllerNodeIPsCache);
2276 }
2277 return retval;
2278 }
2279
2280 @Override
2281 public void rowsModified(String tableName, Set<Object> rowKeys) {
2282 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2283 handleControllerNodeIPChanges();
2284 }
2285
2286 }
2287
2288 @Override
2289 public void rowsDeleted(String tableName, Set<Object> rowKeys) {
2290 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2291 handleControllerNodeIPChanges();
2292 }
2293 }
2294
2295 @Override
2296 public long getSystemStartTime() {
2297 return (this.systemStartTime);
2298 }
2299
2300 @Override
2301 public void setAlwaysClearFlowsOnSwAdd(boolean value) {
2302 this.alwaysClearFlowsOnSwAdd = value;
2303 }
2304
2305 public boolean getAlwaysClearFlowsOnSwAdd() {
2306 return this.alwaysClearFlowsOnSwAdd;
2307 }
2308}