blob: 084d6c437063b234565d630b3e5c8141e3f0a8a6 [file] [log] [blame]
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001/**
2* Copyright 2011, Big Switch Networks, Inc.
3* Originally created by David Erickson, Stanford University
4*
5* Licensed under the Apache License, Version 2.0 (the "License"); you may
6* not use this file except in compliance with the License. You may obtain
7* a copy of the License at
8*
9* http://www.apache.org/licenses/LICENSE-2.0
10*
11* Unless required by applicable law or agreed to in writing, software
12* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14* License for the specific language governing permissions and limitations
15* under the License.
16**/
17
18package net.floodlightcontroller.core.internal;
19
20import java.io.FileInputStream;
21import java.io.IOException;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080022import java.net.InetSocketAddress;
Jonathan Hartd10008d2013-02-23 17:04:08 -080023import java.net.UnknownHostException;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080024import java.nio.channels.ClosedChannelException;
Jonathan Hartd10008d2013-02-23 17:04:08 -080025import java.util.ArrayList;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080026import java.util.Collection;
27import java.util.Collections;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080028import java.util.HashMap;
29import java.util.HashSet;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080030import java.util.LinkedHashMap;
31import java.util.List;
32import java.util.Map;
33import java.util.Map.Entry;
34import java.util.Properties;
35import java.util.Set;
36import java.util.Stack;
37import java.util.concurrent.BlockingQueue;
38import java.util.concurrent.ConcurrentHashMap;
39import java.util.concurrent.ConcurrentMap;
40import java.util.concurrent.CopyOnWriteArraySet;
41import java.util.concurrent.Executors;
42import java.util.concurrent.Future;
43import java.util.concurrent.LinkedBlockingQueue;
44import java.util.concurrent.RejectedExecutionException;
45import java.util.concurrent.TimeUnit;
46import java.util.concurrent.TimeoutException;
47
48import net.floodlightcontroller.core.FloodlightContext;
49import net.floodlightcontroller.core.IFloodlightProviderService;
50import net.floodlightcontroller.core.IHAListener;
51import net.floodlightcontroller.core.IInfoProvider;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080052import net.floodlightcontroller.core.IListener.Command;
Jonathan Hartd10008d2013-02-23 17:04:08 -080053import net.floodlightcontroller.core.IOFMessageListener;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080054import net.floodlightcontroller.core.IOFSwitch;
55import net.floodlightcontroller.core.IOFSwitchFilter;
56import net.floodlightcontroller.core.IOFSwitchListener;
Pankaj Berdedc73bb12013-08-14 13:46:38 -070057import net.floodlightcontroller.core.IUpdate;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080058import net.floodlightcontroller.core.annotations.LogMessageDoc;
59import net.floodlightcontroller.core.annotations.LogMessageDocs;
60import net.floodlightcontroller.core.internal.OFChannelState.HandshakeState;
61import net.floodlightcontroller.core.util.ListenerDispatcher;
62import net.floodlightcontroller.core.web.CoreWebRoutable;
63import net.floodlightcontroller.counter.ICounterStoreService;
64import net.floodlightcontroller.packet.Ethernet;
65import net.floodlightcontroller.perfmon.IPktInProcessingTimeService;
66import net.floodlightcontroller.restserver.IRestApiService;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080067import net.floodlightcontroller.threadpool.IThreadPoolService;
HIGUCHI Yuta36cf0762013-06-14 14:25:38 -070068import net.onrc.onos.ofcontroller.core.IOFSwitchPortListener;
Jonathan Hartd82f20d2013-02-21 18:04:24 -080069import net.onrc.onos.registry.controller.IControllerRegistryService;
Jonathan Hartcc957a02013-02-26 10:39:04 -080070import net.onrc.onos.registry.controller.IControllerRegistryService.ControlChangeCallback;
Jonathan Hartd10008d2013-02-23 17:04:08 -080071import net.onrc.onos.registry.controller.RegistryException;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080072
73import org.jboss.netty.bootstrap.ServerBootstrap;
74import org.jboss.netty.buffer.ChannelBuffer;
75import org.jboss.netty.buffer.ChannelBuffers;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080076import org.jboss.netty.channel.ChannelHandlerContext;
77import org.jboss.netty.channel.ChannelPipelineFactory;
78import org.jboss.netty.channel.ChannelStateEvent;
79import org.jboss.netty.channel.ChannelUpstreamHandler;
80import org.jboss.netty.channel.Channels;
81import org.jboss.netty.channel.ExceptionEvent;
82import org.jboss.netty.channel.MessageEvent;
83import org.jboss.netty.channel.group.ChannelGroup;
84import org.jboss.netty.channel.group.DefaultChannelGroup;
85import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
86import org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler;
87import org.jboss.netty.handler.timeout.IdleStateEvent;
88import org.jboss.netty.handler.timeout.ReadTimeoutException;
89import org.openflow.protocol.OFEchoReply;
90import org.openflow.protocol.OFError;
91import org.openflow.protocol.OFError.OFBadActionCode;
92import org.openflow.protocol.OFError.OFBadRequestCode;
93import org.openflow.protocol.OFError.OFErrorType;
94import org.openflow.protocol.OFError.OFFlowModFailedCode;
95import org.openflow.protocol.OFError.OFHelloFailedCode;
96import org.openflow.protocol.OFError.OFPortModFailedCode;
97import org.openflow.protocol.OFError.OFQueueOpFailedCode;
98import org.openflow.protocol.OFFeaturesReply;
99import org.openflow.protocol.OFGetConfigReply;
100import org.openflow.protocol.OFMessage;
101import org.openflow.protocol.OFPacketIn;
102import org.openflow.protocol.OFPhysicalPort;
Pankaj Berde6a4075d2013-01-22 16:42:54 -0800103import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
Pankaj Berde6debb042013-01-16 18:04:32 -0800104import org.openflow.protocol.OFPhysicalPort.OFPortState;
Jonathan Hartd10008d2013-02-23 17:04:08 -0800105import org.openflow.protocol.OFPortStatus;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800106import org.openflow.protocol.OFPortStatus.OFPortReason;
107import org.openflow.protocol.OFSetConfig;
108import org.openflow.protocol.OFStatisticsRequest;
109import org.openflow.protocol.OFSwitchConfig;
110import org.openflow.protocol.OFType;
111import org.openflow.protocol.OFVendor;
112import org.openflow.protocol.factory.BasicFactory;
113import org.openflow.protocol.factory.MessageParseException;
114import org.openflow.protocol.statistics.OFDescriptionStatistics;
115import org.openflow.protocol.statistics.OFStatistics;
116import org.openflow.protocol.statistics.OFStatisticsType;
117import org.openflow.protocol.vendor.OFBasicVendorDataType;
118import org.openflow.protocol.vendor.OFBasicVendorId;
119import org.openflow.protocol.vendor.OFVendorId;
120import org.openflow.util.HexString;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800121import org.openflow.vendor.nicira.OFNiciraVendorData;
122import org.openflow.vendor.nicira.OFRoleReplyVendorData;
123import org.openflow.vendor.nicira.OFRoleRequestVendorData;
124import org.openflow.vendor.nicira.OFRoleVendorData;
125import org.slf4j.Logger;
126import org.slf4j.LoggerFactory;
127
128
129/**
130 * The main controller class. Handles all setup and network listeners
HIGUCHI Yuta11360702013-06-17 10:28:06 -0700131 *
132 * Extensions made by ONOS are:
133 * - Detailed Port event: PORTCHANGED -> {PORTCHANGED, PORTADDED, PORTREMOVED}
134 * Available as net.onrc.onos.ofcontroller.core.IOFSwitchPortListener
135 * - Distributed ownership control of switch through RegistryService(IControllerRegistryService)
Pavlin Radoslavova653e9f2013-10-16 03:08:52 -0700136 * - Register ONOS services. (IControllerRegistryService)
HIGUCHI Yuta11360702013-06-17 10:28:06 -0700137 * - Additional DEBUG logs
138 * - Try using hostname as controller ID, when ID was not explicitly given.
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800139 */
Jonathan Hart2fa28062013-11-25 20:16:28 -0800140public class Controller implements IFloodlightProviderService {
HIGUCHI Yuta0ba6fd02013-06-14 12:46:56 -0700141
Yuta HIGUCHI6ac8d182013-10-22 15:24:56 -0700142 protected final static Logger log = LoggerFactory.getLogger(Controller.class);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800143
144 private static final String ERROR_DATABASE =
145 "The controller could not communicate with the system database.";
146
147 protected BasicFactory factory;
148 protected ConcurrentMap<OFType,
149 ListenerDispatcher<OFType,IOFMessageListener>>
150 messageListeners;
151 // The activeSwitches map contains only those switches that are actively
152 // being controlled by us -- it doesn't contain switches that are
153 // in the slave role
154 protected ConcurrentHashMap<Long, IOFSwitch> activeSwitches;
155 // connectedSwitches contains all connected switches, including ones where
156 // we're a slave controller. We need to keep track of them so that we can
157 // send role request messages to switches when our role changes to master
158 // We add a switch to this set after it successfully completes the
159 // handshake. Access to this Set needs to be synchronized with roleChanger
160 protected HashSet<OFSwitchImpl> connectedSwitches;
161
162 // The controllerNodeIPsCache maps Controller IDs to their IP address.
163 // It's only used by handleControllerNodeIPsChanged
164 protected HashMap<String, String> controllerNodeIPsCache;
165
166 protected Set<IOFSwitchListener> switchListeners;
167 protected Set<IHAListener> haListeners;
168 protected Map<String, List<IInfoProvider>> providerMap;
169 protected BlockingQueue<IUpdate> updates;
170
171 // Module dependencies
172 protected IRestApiService restApi;
173 protected ICounterStoreService counterStore = null;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800174 protected IPktInProcessingTimeService pktinProcTime;
175 protected IThreadPoolService threadPool;
Jonathan Hartd10008d2013-02-23 17:04:08 -0800176 protected IControllerRegistryService registryService;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800177
178 // Configuration options
179 protected int openFlowPort = 6633;
180 protected int workerThreads = 0;
181 // The id for this controller node. Should be unique for each controller
182 // node in a controller cluster.
183 protected String controllerId = "localhost";
184
185 // The current role of the controller.
186 // If the controller isn't configured to support roles, then this is null.
187 protected Role role;
188 // A helper that handles sending and timeout handling for role requests
189 protected RoleChanger roleChanger;
190
191 // Start time of the controller
192 protected long systemStartTime;
193
194 // Flag to always flush flow table on switch reconnect (HA or otherwise)
195 protected boolean alwaysClearFlowsOnSwAdd = false;
196
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800197 // Perf. related configuration
198 protected static final int SEND_BUFFER_SIZE = 4 * 1024 * 1024;
199 protected static final int BATCH_MAX_SIZE = 100;
Pankaj Berdedc73bb12013-08-14 13:46:38 -0700200 protected static final boolean ALWAYS_DECODE_ETH = true;
201
/**
 * Kinds of switch-related events that can be queued as a switch update.
 * ONOS splits the generic port change into the finer-grained
 * PORTADDED and PORTREMOVED events.
 */
public enum SwitchUpdateType {
    /** A switch was added. */
    ADDED,
    /** A switch was removed. */
    REMOVED,
    /** A port on a switch changed. */
    PORTCHANGED,
    /** A port was added to a switch (ONOS extension). */
    PORTADDED,
    /** A port was removed from a switch (ONOS extension). */
    PORTREMOVED
}
Pankaj Berdedc73bb12013-08-14 13:46:38 -0700209
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800210 /**
211 * Update message indicating a switch was added or removed
HIGUCHI Yutaec4bff82013-06-17 11:49:31 -0700212 * ONOS: This message extended to indicate Port add or removed event.
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800213 */
214 protected class SwitchUpdate implements IUpdate {
215 public IOFSwitch sw;
HIGUCHI Yutaec4bff82013-06-17 11:49:31 -0700216 public OFPhysicalPort port; // Added by ONOS
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800217 public SwitchUpdateType switchUpdateType;
218 public SwitchUpdate(IOFSwitch sw, SwitchUpdateType switchUpdateType) {
219 this.sw = sw;
220 this.switchUpdateType = switchUpdateType;
221 }
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700222 public SwitchUpdate(IOFSwitch sw, OFPhysicalPort port, SwitchUpdateType switchUpdateType) {
223 this.sw = sw;
224 this.port = port;
225 this.switchUpdateType = switchUpdateType;
226 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800227 public void dispatch() {
228 if (log.isTraceEnabled()) {
229 log.trace("Dispatching switch update {} {}",
230 sw, switchUpdateType);
231 }
232 if (switchListeners != null) {
233 for (IOFSwitchListener listener : switchListeners) {
234 switch(switchUpdateType) {
235 case ADDED:
236 listener.addedSwitch(sw);
237 break;
238 case REMOVED:
239 listener.removedSwitch(sw);
240 break;
241 case PORTCHANGED:
242 listener.switchPortChanged(sw.getId());
243 break;
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700244 case PORTADDED:
HIGUCHI Yuta36cf0762013-06-14 14:25:38 -0700245 if (listener instanceof IOFSwitchPortListener) {
246 ((IOFSwitchPortListener) listener).switchPortAdded(sw.getId(), port);
247 }
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700248 break;
249 case PORTREMOVED:
HIGUCHI Yuta36cf0762013-06-14 14:25:38 -0700250 if (listener instanceof IOFSwitchPortListener) {
251 ((IOFSwitchPortListener) listener).switchPortRemoved(sw.getId(), port);
252 }
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700253 break;
254 default:
255 break;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800256 }
257 }
258 }
259 }
260 }
261
262 /**
263 * Update message indicating controller's role has changed
264 */
265 protected class HARoleUpdate implements IUpdate {
266 public Role oldRole;
267 public Role newRole;
268 public HARoleUpdate(Role newRole, Role oldRole) {
269 this.oldRole = oldRole;
270 this.newRole = newRole;
271 }
272 public void dispatch() {
273 // Make sure that old and new roles are different.
274 if (oldRole == newRole) {
275 if (log.isTraceEnabled()) {
276 log.trace("HA role update ignored as the old and " +
277 "new roles are the same. newRole = {}" +
278 "oldRole = {}", newRole, oldRole);
279 }
280 return;
281 }
282 if (log.isTraceEnabled()) {
283 log.trace("Dispatching HA Role update newRole = {}, oldRole = {}",
284 newRole, oldRole);
285 }
286 if (haListeners != null) {
287 for (IHAListener listener : haListeners) {
288 listener.roleChanged(oldRole, newRole);
289 }
290 }
291 }
292 }
293
294 /**
295 * Update message indicating
296 * IPs of controllers in controller cluster have changed.
297 */
298 protected class HAControllerNodeIPUpdate implements IUpdate {
299 public Map<String,String> curControllerNodeIPs;
300 public Map<String,String> addedControllerNodeIPs;
301 public Map<String,String> removedControllerNodeIPs;
302 public HAControllerNodeIPUpdate(
303 HashMap<String,String> curControllerNodeIPs,
304 HashMap<String,String> addedControllerNodeIPs,
305 HashMap<String,String> removedControllerNodeIPs) {
306 this.curControllerNodeIPs = curControllerNodeIPs;
307 this.addedControllerNodeIPs = addedControllerNodeIPs;
308 this.removedControllerNodeIPs = removedControllerNodeIPs;
309 }
310 public void dispatch() {
311 if (log.isTraceEnabled()) {
312 log.trace("Dispatching HA Controller Node IP update "
313 + "curIPs = {}, addedIPs = {}, removedIPs = {}",
314 new Object[] { curControllerNodeIPs, addedControllerNodeIPs,
315 removedControllerNodeIPs }
316 );
317 }
318 if (haListeners != null) {
319 for (IHAListener listener: haListeners) {
320 listener.controllerNodeIPsChanged(curControllerNodeIPs,
321 addedControllerNodeIPs, removedControllerNodeIPs);
322 }
323 }
324 }
325 }
326
327 // ***************
328 // Getters/Setters
329 // ***************
330
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800331 public void setCounterStore(ICounterStoreService counterStore) {
332 this.counterStore = counterStore;
333 }
334
335 public void setPktInProcessingService(IPktInProcessingTimeService pits) {
336 this.pktinProcTime = pits;
337 }
338
339 public void setRestApiService(IRestApiService restApi) {
340 this.restApi = restApi;
341 }
342
343 public void setThreadPoolService(IThreadPoolService tp) {
344 this.threadPool = tp;
345 }
346
Jonathan Hartd82f20d2013-02-21 18:04:24 -0800347 public void setMastershipService(IControllerRegistryService serviceImpl) {
Jonathan Hartd10008d2013-02-23 17:04:08 -0800348 this.registryService = serviceImpl;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800349 }
350
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800351 @Override
352 public Role getRole() {
353 synchronized(roleChanger) {
354 return role;
355 }
356 }
357
358 @Override
359 public void setRole(Role role) {
360 if (role == null) throw new NullPointerException("Role can not be null.");
Jonathan Hart2fa28062013-11-25 20:16:28 -0800361 //if (role == Role.MASTER && this.role == Role.SLAVE) {
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800362 // Reset db state to Inactive for all switches.
Jonathan Hart2fa28062013-11-25 20:16:28 -0800363 //updateAllInactiveSwitchInfo();
364 //}
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800365
366 // Need to synchronize to ensure a reliable ordering on role request
367 // messages send and to ensure the list of connected switches is stable
368 // RoleChanger will handle the actual sending of the message and
369 // timeout handling
370 // @see RoleChanger
371 synchronized(roleChanger) {
372 if (role.equals(this.role)) {
373 log.debug("Ignoring role change: role is already {}", role);
374 return;
375 }
376
377 Role oldRole = this.role;
378 this.role = role;
379
380 log.debug("Submitting role change request to role {}", role);
381 roleChanger.submitRequest(connectedSwitches, role);
382
383 // Enqueue an update for our listeners.
384 try {
385 this.updates.put(new HARoleUpdate(role, oldRole));
386 } catch (InterruptedException e) {
387 log.error("Failure adding update to queue", e);
388 }
389 }
390 }
391
Pankaj Berdedc73bb12013-08-14 13:46:38 -0700392 public void publishUpdate(IUpdate update) {
393 try {
394 this.updates.put(update);
395 } catch (InterruptedException e) {
396 log.error("Failure adding update to queue", e);
397 }
398 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800399
400 // **********************
401 // ChannelUpstreamHandler
402 // **********************
403
404 /**
405 * Return a new channel handler for processing a switch connections
406 * @param state The channel state object for the connection
407 * @return the new channel handler
408 */
409 protected ChannelUpstreamHandler getChannelHandler(OFChannelState state) {
410 return new OFChannelHandler(state);
411 }
412
Jonathan Hartcc957a02013-02-26 10:39:04 -0800413 protected class RoleChangeCallback implements ControlChangeCallback {
414 @Override
415 public void controlChanged(long dpid, boolean hasControl) {
416 log.info("Role change callback for switch {}, hasControl {}",
417 HexString.toHexString(dpid), hasControl);
418
419 synchronized(roleChanger){
420 OFSwitchImpl sw = null;
421 for (OFSwitchImpl connectedSw : connectedSwitches){
422 if (connectedSw.getId() == dpid){
423 sw = connectedSw;
424 break;
425 }
426 }
427 if (sw == null){
428 log.warn("Switch {} not found in connected switches",
429 HexString.toHexString(dpid));
430 return;
431 }
432
433 Role role = null;
434
Pankaj Berde01939e92013-03-08 14:38:27 -0800435 /*
436 * issue #229
437 * Cannot rely on sw.getRole() as it can be behind due to pending
438 * role changes in the queue. Just submit it and late the RoleChanger
439 * handle duplicates.
440 */
441
442 if (hasControl){
Jonathan Hartcc957a02013-02-26 10:39:04 -0800443 role = Role.MASTER;
444 }
Pankaj Berde01939e92013-03-08 14:38:27 -0800445 else {
Jonathan Hartcc957a02013-02-26 10:39:04 -0800446 role = Role.SLAVE;
447 }
Pankaj Berde01939e92013-03-08 14:38:27 -0800448
449 log.debug("Sending role request {} msg to {}", role, sw);
450 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
451 swList.add(sw);
452 roleChanger.submitRequest(swList, role);
453
Jonathan Hartcc957a02013-02-26 10:39:04 -0800454 }
455
456 }
457 }
458
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800459 /**
460 * Channel handler deals with the switch connection and dispatches
461 * switch messages to the appropriate locations.
462 * @author readams
463 */
464 protected class OFChannelHandler
465 extends IdleStateAwareChannelUpstreamHandler {
466 protected OFSwitchImpl sw;
467 protected OFChannelState state;
468
/**
 * Creates a handler bound to the given per-connection state.
 * @param state the channel state object for the connection
 */
public OFChannelHandler(OFChannelState state) {
    this.state = state;
}
472
473 @Override
474 @LogMessageDoc(message="New switch connection from {ip address}",
475 explanation="A new switch has connected from the " +
476 "specified IP address")
477 public void channelConnected(ChannelHandlerContext ctx,
478 ChannelStateEvent e) throws Exception {
479 log.info("New switch connection from {}",
480 e.getChannel().getRemoteAddress());
481
482 sw = new OFSwitchImpl();
483 sw.setChannel(e.getChannel());
484 sw.setFloodlightProvider(Controller.this);
485 sw.setThreadPoolService(threadPool);
486
487 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
488 msglist.add(factory.getMessage(OFType.HELLO));
489 e.getChannel().write(msglist);
490
491 }
492
493 @Override
494 @LogMessageDoc(message="Disconnected switch {switch information}",
495 explanation="The specified switch has disconnected.")
496 public void channelDisconnected(ChannelHandlerContext ctx,
497 ChannelStateEvent e) throws Exception {
498 if (sw != null && state.hsState == HandshakeState.READY) {
499 if (activeSwitches.containsKey(sw.getId())) {
500 // It's safe to call removeSwitch even though the map might
501 // not contain this particular switch but another with the
502 // same DPID
503 removeSwitch(sw);
504 }
505 synchronized(roleChanger) {
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700506 if (controlRequested) {
507 registryService.releaseControl(sw.getId());
508 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800509 connectedSwitches.remove(sw);
510 }
511 sw.setConnected(false);
512 }
513 log.info("Disconnected switch {}", sw);
514 }
515
516 @Override
517 @LogMessageDocs({
518 @LogMessageDoc(level="ERROR",
519 message="Disconnecting switch {switch} due to read timeout",
520 explanation="The connected switch has failed to send any " +
521 "messages or respond to echo requests",
522 recommendation=LogMessageDoc.CHECK_SWITCH),
523 @LogMessageDoc(level="ERROR",
524 message="Disconnecting switch {switch}: failed to " +
525 "complete handshake",
526 explanation="The switch did not respond correctly " +
527 "to handshake messages",
528 recommendation=LogMessageDoc.CHECK_SWITCH),
529 @LogMessageDoc(level="ERROR",
530 message="Disconnecting switch {switch} due to IO Error: {}",
531 explanation="There was an error communicating with the switch",
532 recommendation=LogMessageDoc.CHECK_SWITCH),
533 @LogMessageDoc(level="ERROR",
534 message="Disconnecting switch {switch} due to switch " +
535 "state error: {error}",
536 explanation="The switch sent an unexpected message",
537 recommendation=LogMessageDoc.CHECK_SWITCH),
538 @LogMessageDoc(level="ERROR",
539 message="Disconnecting switch {switch} due to " +
540 "message parse failure",
541 explanation="Could not parse a message from the switch",
542 recommendation=LogMessageDoc.CHECK_SWITCH),
543 @LogMessageDoc(level="ERROR",
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800544 message="Could not process message: queue full",
545 explanation="OpenFlow messages are arriving faster than " +
546 " the controller can process them.",
547 recommendation=LogMessageDoc.CHECK_CONTROLLER),
548 @LogMessageDoc(level="ERROR",
549 message="Error while processing message " +
550 "from switch {switch} {cause}",
551 explanation="An error occurred processing the switch message",
552 recommendation=LogMessageDoc.GENERIC_ACTION)
553 })
554 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
555 throws Exception {
556 if (e.getCause() instanceof ReadTimeoutException) {
557 // switch timeout
558 log.error("Disconnecting switch {} due to read timeout", sw);
559 ctx.getChannel().close();
560 } else if (e.getCause() instanceof HandshakeTimeoutException) {
561 log.error("Disconnecting switch {}: failed to complete handshake",
562 sw);
563 ctx.getChannel().close();
564 } else if (e.getCause() instanceof ClosedChannelException) {
565 //log.warn("Channel for sw {} already closed", sw);
566 } else if (e.getCause() instanceof IOException) {
567 log.error("Disconnecting switch {} due to IO Error: {}",
568 sw, e.getCause().getMessage());
569 ctx.getChannel().close();
570 } else if (e.getCause() instanceof SwitchStateException) {
571 log.error("Disconnecting switch {} due to switch state error: {}",
572 sw, e.getCause().getMessage());
573 ctx.getChannel().close();
574 } else if (e.getCause() instanceof MessageParseException) {
575 log.error("Disconnecting switch " + sw +
576 " due to message parse failure",
577 e.getCause());
578 ctx.getChannel().close();
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800579 } else if (e.getCause() instanceof RejectedExecutionException) {
580 log.warn("Could not process message: queue full");
581 } else {
582 log.error("Error while processing message from switch " + sw,
583 e.getCause());
584 ctx.getChannel().close();
585 }
586 }
587
588 @Override
589 public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
590 throws Exception {
591 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
592 msglist.add(factory.getMessage(OFType.ECHO_REQUEST));
593 e.getChannel().write(msglist);
594 }
595
596 @Override
597 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
598 throws Exception {
599 if (e.getMessage() instanceof List) {
600 @SuppressWarnings("unchecked")
601 List<OFMessage> msglist = (List<OFMessage>)e.getMessage();
602
603 for (OFMessage ofm : msglist) {
604 try {
605 processOFMessage(ofm);
606 }
607 catch (Exception ex) {
608 // We are the last handler in the stream, so run the
609 // exception through the channel again by passing in
610 // ctx.getChannel().
611 Channels.fireExceptionCaught(ctx.getChannel(), ex);
612 }
613 }
614
615 // Flush all flow-mods/packet-out generated from this "train"
616 OFSwitchImpl.flush_all();
617 }
618 }
619
620 /**
621 * Process the request for the switch description
622 */
623 @LogMessageDoc(level="ERROR",
624 message="Exception in reading description " +
625 " during handshake {exception}",
626 explanation="Could not process the switch description string",
627 recommendation=LogMessageDoc.CHECK_SWITCH)
628 void processSwitchDescReply() {
629 try {
630 // Read description, if it has been updated
631 @SuppressWarnings("unchecked")
632 Future<List<OFStatistics>> desc_future =
633 (Future<List<OFStatistics>>)sw.
634 getAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
635 List<OFStatistics> values =
636 desc_future.get(0, TimeUnit.MILLISECONDS);
637 if (values != null) {
638 OFDescriptionStatistics description =
639 new OFDescriptionStatistics();
640 ChannelBuffer data =
641 ChannelBuffers.buffer(description.getLength());
642 for (OFStatistics f : values) {
643 f.writeTo(data);
644 description.readFrom(data);
645 break; // SHOULD be a list of length 1
646 }
647 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_DATA,
648 description);
649 sw.setSwitchProperties(description);
650 data = null;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800651 }
Jonathan Hart2fa28062013-11-25 20:16:28 -0800652
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800653 sw.removeAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
654 state.hasDescription = true;
655 checkSwitchReady();
656 }
657 catch (InterruptedException ex) {
658 // Ignore
659 }
660 catch (TimeoutException ex) {
661 // Ignore
662 } catch (Exception ex) {
663 log.error("Exception in reading description " +
664 " during handshake", ex);
665 }
666 }
667
668 /**
669 * Send initial switch setup information that we need before adding
670 * the switch
671 * @throws IOException
672 */
673 void sendHelloConfiguration() throws IOException {
674 // Send initial Features Request
Jonathan Hart9e92c512013-03-20 16:24:44 -0700675 log.debug("Sending FEATURES_REQUEST to {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800676 sw.write(factory.getMessage(OFType.FEATURES_REQUEST), null);
677 }
678
679 /**
680 * Send the configuration requests we can only do after we have
681 * the features reply
682 * @throws IOException
683 */
684 void sendFeatureReplyConfiguration() throws IOException {
Jonathan Hart9e92c512013-03-20 16:24:44 -0700685 log.debug("Sending CONFIG_REQUEST to {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800686 // Ensure we receive the full packet via PacketIn
687 OFSetConfig config = (OFSetConfig) factory
688 .getMessage(OFType.SET_CONFIG);
689 config.setMissSendLength((short) 0xffff)
690 .setLengthU(OFSwitchConfig.MINIMUM_LENGTH);
691 sw.write(config, null);
692 sw.write(factory.getMessage(OFType.GET_CONFIG_REQUEST),
693 null);
694
695 // Get Description to set switch-specific flags
696 OFStatisticsRequest req = new OFStatisticsRequest();
697 req.setStatisticType(OFStatisticsType.DESC);
698 req.setLengthU(req.getLengthU());
699 Future<List<OFStatistics>> dfuture =
700 sw.getStatistics(req);
701 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE,
702 dfuture);
703
704 }
HIGUCHI Yuta0ba6fd02013-06-14 12:46:56 -0700705
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700706 volatile Boolean controlRequested = Boolean.FALSE;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800707 protected void checkSwitchReady() {
708 if (state.hsState == HandshakeState.FEATURES_REPLY &&
709 state.hasDescription && state.hasGetConfigReply) {
710
711 state.hsState = HandshakeState.READY;
Jonathan Hart9e92c512013-03-20 16:24:44 -0700712 log.debug("Handshake with {} complete", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800713
714 synchronized(roleChanger) {
715 // We need to keep track of all of the switches that are connected
716 // to the controller, in any role, so that we can later send the
717 // role request messages when the controller role changes.
718 // We need to be synchronized while doing this: we must not
719 // send a another role request to the connectedSwitches until
720 // we were able to add this new switch to connectedSwitches
721 // *and* send the current role to the new switch.
722 connectedSwitches.add(sw);
723
724 if (role != null) {
Jonathan Hart97801ac2013-02-26 14:29:16 -0800725 //Put the switch in SLAVE mode until we know we have control
726 log.debug("Setting new switch {} to SLAVE", sw.getStringId());
727 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
728 swList.add(sw);
729 roleChanger.submitRequest(swList, Role.SLAVE);
730
Jonathan Hartcc957a02013-02-26 10:39:04 -0800731 //Request control of the switch from the global registry
732 try {
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700733 controlRequested = Boolean.TRUE;
Jonathan Hartcc957a02013-02-26 10:39:04 -0800734 registryService.requestControl(sw.getId(),
735 new RoleChangeCallback());
736 } catch (RegistryException e) {
737 log.debug("Registry error: {}", e.getMessage());
Pankaj Berde99fcee12013-03-18 09:41:53 -0700738 controlRequested = Boolean.FALSE;
Jonathan Hartcc957a02013-02-26 10:39:04 -0800739 }
740
Jonathan Hart97801ac2013-02-26 14:29:16 -0800741
Jonathan Hartcc957a02013-02-26 10:39:04 -0800742
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800743 // Send a role request if role support is enabled for the controller
744 // This is a probe that we'll use to determine if the switch
745 // actually supports the role request message. If it does we'll
746 // get back a role reply message. If it doesn't we'll get back an
747 // OFError message.
748 // If role is MASTER we will promote switch to active
749 // list when we receive the switch's role reply messages
Jonathan Hartcc957a02013-02-26 10:39:04 -0800750 /*
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800751 log.debug("This controller's role is {}, " +
752 "sending initial role request msg to {}",
753 role, sw);
754 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
755 swList.add(sw);
756 roleChanger.submitRequest(swList, role);
Jonathan Hartcc957a02013-02-26 10:39:04 -0800757 */
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800758 }
759 else {
760 // Role supported not enabled on controller (for now)
761 // automatically promote switch to active state.
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800762 log.debug("This controller's role is {}, " +
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800763 "not sending role request msg to {}",
764 role, sw);
765 // Need to clear FlowMods before we add the switch
766 // and dispatch updates otherwise we have a race condition.
767 sw.clearAllFlowMods();
768 addSwitch(sw);
769 state.firstRoleReplyReceived = true;
770 }
771 }
Pankaj Berde99fcee12013-03-18 09:41:53 -0700772 if (!controlRequested) {
773 // yield to allow other thread(s) to release control
774 try {
775 Thread.sleep(10);
776 } catch (InterruptedException e) {
777 // Ignore interruptions
778 }
779 // safer to bounce the switch to reconnect here than proceeding further
Jonathan Hart9e92c512013-03-20 16:24:44 -0700780 log.debug("Closing {} because we weren't able to request control " +
781 "successfully" + sw);
Pankaj Berde99fcee12013-03-18 09:41:53 -0700782 sw.channel.close();
783 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800784 }
785 }
786
787 /* Handle a role reply message we received from the switch. Since
788 * netty serializes message dispatch we don't need to synchronize
789 * against other receive operations from the same switch, so no need
790 * to synchronize addSwitch(), removeSwitch() operations from the same
791 * connection.
792 * FIXME: However, when a switch with the same DPID connects we do
793 * need some synchronization. However, handling switches with same
794 * DPID needs to be revisited anyways (get rid of r/w-lock and synchronous
795 * removedSwitch notification):1
796 *
797 */
798 @LogMessageDoc(level="ERROR",
799 message="Invalid role value in role reply message",
800 explanation="Was unable to set the HA role (master or slave) " +
801 "for the controller.",
802 recommendation=LogMessageDoc.CHECK_CONTROLLER)
803 protected void handleRoleReplyMessage(OFVendor vendorMessage,
804 OFRoleReplyVendorData roleReplyVendorData) {
805 // Map from the role code in the message to our role enum
806 int nxRole = roleReplyVendorData.getRole();
807 Role role = null;
808 switch (nxRole) {
809 case OFRoleVendorData.NX_ROLE_OTHER:
810 role = Role.EQUAL;
811 break;
812 case OFRoleVendorData.NX_ROLE_MASTER:
813 role = Role.MASTER;
814 break;
815 case OFRoleVendorData.NX_ROLE_SLAVE:
816 role = Role.SLAVE;
817 break;
818 default:
819 log.error("Invalid role value in role reply message");
820 sw.getChannel().close();
821 return;
822 }
823
824 log.debug("Handling role reply for role {} from {}. " +
825 "Controller's role is {} ",
826 new Object[] { role, sw, Controller.this.role}
827 );
828
829 sw.deliverRoleReply(vendorMessage.getXid(), role);
830
831 boolean isActive = activeSwitches.containsKey(sw.getId());
832 if (!isActive && sw.isActive()) {
833 // Transition from SLAVE to MASTER.
834
835 if (!state.firstRoleReplyReceived ||
836 getAlwaysClearFlowsOnSwAdd()) {
837 // This is the first role-reply message we receive from
838 // this switch or roles were disabled when the switch
839 // connected:
840 // Delete all pre-existing flows for new connections to
841 // the master
842 //
843 // FIXME: Need to think more about what the test should
844 // be for when we flush the flow-table? For example,
845 // if all the controllers are temporarily in the backup
846 // role (e.g. right after a failure of the master
847 // controller) at the point the switch connects, then
848 // all of the controllers will initially connect as
849 // backup controllers and not flush the flow-table.
850 // Then when one of them is promoted to master following
851 // the master controller election the flow-table
852 // will still not be flushed because that's treated as
853 // a failover event where we don't want to flush the
854 // flow-table. The end result would be that the flow
855 // table for a newly connected switch is never
856 // flushed. Not sure how to handle that case though...
857 sw.clearAllFlowMods();
858 log.debug("First role reply from master switch {}, " +
859 "clear FlowTable to active switch list",
860 HexString.toHexString(sw.getId()));
861 }
862
863 // Some switches don't seem to update us with port
864 // status messages while in slave role.
Jonathan Hart2fa28062013-11-25 20:16:28 -0800865 //readSwitchPortStateFromStorage(sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800866
867 // Only add the switch to the active switch list if
868 // we're not in the slave role. Note that if the role
869 // attribute is null, then that means that the switch
870 // doesn't support the role request messages, so in that
871 // case we're effectively in the EQUAL role and the
872 // switch should be included in the active switch list.
873 addSwitch(sw);
874 log.debug("Added master switch {} to active switch list",
875 HexString.toHexString(sw.getId()));
876
877 }
878 else if (isActive && !sw.isActive()) {
879 // Transition from MASTER to SLAVE: remove switch
880 // from active switch list.
881 log.debug("Removed slave switch {} from active switch" +
882 " list", HexString.toHexString(sw.getId()));
883 removeSwitch(sw);
884 }
885
886 // Indicate that we have received a role reply message.
887 state.firstRoleReplyReceived = true;
888 }
889
890 protected boolean handleVendorMessage(OFVendor vendorMessage) {
891 boolean shouldHandleMessage = false;
892 int vendor = vendorMessage.getVendor();
893 switch (vendor) {
894 case OFNiciraVendorData.NX_VENDOR_ID:
895 OFNiciraVendorData niciraVendorData =
896 (OFNiciraVendorData)vendorMessage.getVendorData();
897 int dataType = niciraVendorData.getDataType();
898 switch (dataType) {
899 case OFRoleReplyVendorData.NXT_ROLE_REPLY:
900 OFRoleReplyVendorData roleReplyVendorData =
901 (OFRoleReplyVendorData) niciraVendorData;
902 handleRoleReplyMessage(vendorMessage,
903 roleReplyVendorData);
904 break;
905 default:
906 log.warn("Unhandled Nicira VENDOR message; " +
907 "data type = {}", dataType);
908 break;
909 }
910 break;
911 default:
912 log.warn("Unhandled VENDOR message; vendor id = {}", vendor);
913 break;
914 }
915
916 return shouldHandleMessage;
917 }
918
919 /**
920 * Dispatch an Openflow message from a switch to the appropriate
921 * handler.
922 * @param m The message to process
923 * @throws IOException
924 * @throws SwitchStateException
925 */
926 @LogMessageDocs({
927 @LogMessageDoc(level="WARN",
928 message="Config Reply from {switch} has " +
929 "miss length set to {length}",
930 explanation="The controller requires that the switch " +
931 "use a miss length of 0xffff for correct " +
932 "function",
933 recommendation="Use a different switch to ensure " +
934 "correct function"),
935 @LogMessageDoc(level="WARN",
936 message="Received ERROR from sw {switch} that "
937 +"indicates roles are not supported "
938 +"but we have received a valid "
939 +"role reply earlier",
940 explanation="The switch sent a confusing message to the" +
941 "controller")
942 })
943 protected void processOFMessage(OFMessage m)
944 throws IOException, SwitchStateException {
945 boolean shouldHandleMessage = false;
946
947 switch (m.getType()) {
948 case HELLO:
949 if (log.isTraceEnabled())
950 log.trace("HELLO from {}", sw);
951
952 if (state.hsState.equals(HandshakeState.START)) {
953 state.hsState = HandshakeState.HELLO;
954 sendHelloConfiguration();
955 } else {
956 throw new SwitchStateException("Unexpected HELLO from "
957 + sw);
958 }
959 break;
960 case ECHO_REQUEST:
961 OFEchoReply reply =
962 (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY);
963 reply.setXid(m.getXid());
964 sw.write(reply, null);
965 break;
966 case ECHO_REPLY:
967 break;
968 case FEATURES_REPLY:
969 if (log.isTraceEnabled())
970 log.trace("Features Reply from {}", sw);
971
972 sw.setFeaturesReply((OFFeaturesReply) m);
973 if (state.hsState.equals(HandshakeState.HELLO)) {
974 sendFeatureReplyConfiguration();
975 state.hsState = HandshakeState.FEATURES_REPLY;
976 // uncomment to enable "dumb" switches like cbench
977 // state.hsState = HandshakeState.READY;
978 // addSwitch(sw);
979 } else {
980 // return results to rest api caller
981 sw.deliverOFFeaturesReply(m);
982 // update database */
Jonathan Hart2fa28062013-11-25 20:16:28 -0800983 //updateActiveSwitchInfo(sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800984 }
985 break;
986 case GET_CONFIG_REPLY:
987 if (log.isTraceEnabled())
988 log.trace("Get config reply from {}", sw);
989
990 if (!state.hsState.equals(HandshakeState.FEATURES_REPLY)) {
991 String em = "Unexpected GET_CONFIG_REPLY from " + sw;
992 throw new SwitchStateException(em);
993 }
994 OFGetConfigReply cr = (OFGetConfigReply) m;
995 if (cr.getMissSendLength() == (short)0xffff) {
996 log.trace("Config Reply from {} confirms " +
997 "miss length set to 0xffff", sw);
998 } else {
999 log.warn("Config Reply from {} has " +
1000 "miss length set to {}",
1001 sw, cr.getMissSendLength() & 0xffff);
1002 }
1003 state.hasGetConfigReply = true;
1004 checkSwitchReady();
1005 break;
1006 case VENDOR:
1007 shouldHandleMessage = handleVendorMessage((OFVendor)m);
1008 break;
1009 case ERROR:
Jonathan Hart3525df92013-03-19 14:09:13 -07001010 log.debug("Recieved ERROR message from switch {}: {}", sw, m);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001011 // TODO: we need better error handling. Especially for
1012 // request/reply style message (stats, roles) we should have
1013 // a unified way to lookup the xid in the error message.
1014 // This will probable involve rewriting the way we handle
1015 // request/reply style messages.
1016 OFError error = (OFError) m;
1017 boolean shouldLogError = true;
1018 // TODO: should we check that firstRoleReplyReceived is false,
1019 // i.e., check only whether the first request fails?
1020 if (sw.checkFirstPendingRoleRequestXid(error.getXid())) {
1021 boolean isBadVendorError =
1022 (error.getErrorType() == OFError.OFErrorType.
1023 OFPET_BAD_REQUEST.getValue());
1024 // We expect to receive a bad vendor error when
1025 // we're connected to a switch that doesn't support
1026 // the Nicira vendor extensions (i.e. not OVS or
1027 // derived from OVS). By protocol, it should also be
1028 // BAD_VENDOR, but too many switch implementations
1029 // get it wrong and we can already check the xid()
1030 // so we can ignore the type with confidence that this
1031 // is not a spurious error
1032 shouldLogError = !isBadVendorError;
1033 if (isBadVendorError) {
Jonathan Hart3525df92013-03-19 14:09:13 -07001034 log.debug("Handling bad vendor error for {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001035 if (state.firstRoleReplyReceived && (role != null)) {
1036 log.warn("Received ERROR from sw {} that "
1037 +"indicates roles are not supported "
1038 +"but we have received a valid "
1039 +"role reply earlier", sw);
1040 }
1041 state.firstRoleReplyReceived = true;
Jonathan Harta95c6d92013-03-18 16:12:27 -07001042 Role requestedRole =
HIGUCHI Yutaeae374d2013-06-17 10:39:42 -07001043 sw.deliverRoleRequestNotSupportedEx(error.getXid());
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001044 synchronized(roleChanger) {
1045 if (sw.role == null && Controller.this.role==Role.SLAVE) {
Jonathan Harta95c6d92013-03-18 16:12:27 -07001046 //This will now never happen. The Controller's role
1047 //is now never SLAVE, always MASTER.
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001048 // the switch doesn't understand role request
1049 // messages and the current controller role is
1050 // slave. We need to disconnect the switch.
1051 // @see RoleChanger for rationale
Jonathan Hart9e92c512013-03-20 16:24:44 -07001052 log.warn("Closing {} channel because controller's role " +
1053 "is SLAVE", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001054 sw.getChannel().close();
1055 }
Jonathan Harta95c6d92013-03-18 16:12:27 -07001056 else if (sw.role == null && requestedRole == Role.MASTER) {
Jonathan Hart3525df92013-03-19 14:09:13 -07001057 log.debug("Adding switch {} because we got an error" +
1058 " returned from a MASTER role request", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001059 // Controller's role is master: add to
1060 // active
1061 // TODO: check if clearing flow table is
1062 // right choice here.
1063 // Need to clear FlowMods before we add the switch
1064 // and dispatch updates otherwise we have a race condition.
1065 // TODO: switch update is async. Won't we still have a potential
1066 // race condition?
1067 sw.clearAllFlowMods();
1068 addSwitch(sw);
1069 }
1070 }
1071 }
1072 else {
1073 // TODO: Is this the right thing to do if we receive
1074 // some other error besides a bad vendor error?
1075 // Presumably that means the switch did actually
1076 // understand the role request message, but there
1077 // was some other error from processing the message.
1078 // OF 1.2 specifies a OFPET_ROLE_REQUEST_FAILED
1079 // error code, but it doesn't look like the Nicira
1080 // role request has that. Should check OVS source
1081 // code to see if it's possible for any other errors
1082 // to be returned.
1083 // If we received an error the switch is not
1084 // in the correct role, so we need to disconnect it.
1085 // We could also resend the request but then we need to
1086 // check if there are other pending request in which
1087 // case we shouldn't resend. If we do resend we need
1088 // to make sure that the switch eventually accepts one
1089 // of our requests or disconnect the switch. This feels
1090 // cumbersome.
Jonathan Hart9e92c512013-03-20 16:24:44 -07001091 log.debug("Closing {} channel because we recieved an " +
1092 "error other than BAD_VENDOR", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001093 sw.getChannel().close();
1094 }
1095 }
1096 // Once we support OF 1.2, we'd add code to handle it here.
1097 //if (error.getXid() == state.ofRoleRequestXid) {
1098 //}
1099 if (shouldLogError)
1100 logError(sw, error);
1101 break;
1102 case STATS_REPLY:
1103 if (state.hsState.ordinal() <
1104 HandshakeState.FEATURES_REPLY.ordinal()) {
1105 String em = "Unexpected STATS_REPLY from " + sw;
1106 throw new SwitchStateException(em);
1107 }
1108 sw.deliverStatisticsReply(m);
1109 if (sw.hasAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE)) {
1110 processSwitchDescReply();
1111 }
1112 break;
1113 case PORT_STATUS:
1114 // We want to update our port state info even if we're in
1115 // the slave role, but we only want to update storage if
1116 // we're the master (or equal).
1117 boolean updateStorage = state.hsState.
1118 equals(HandshakeState.READY) &&
1119 (sw.getRole() != Role.SLAVE);
1120 handlePortStatusMessage(sw, (OFPortStatus)m, updateStorage);
1121 shouldHandleMessage = true;
1122 break;
1123
1124 default:
1125 shouldHandleMessage = true;
1126 break;
1127 }
1128
1129 if (shouldHandleMessage) {
1130 sw.getListenerReadLock().lock();
1131 try {
1132 if (sw.isConnected()) {
1133 if (!state.hsState.equals(HandshakeState.READY)) {
1134 log.debug("Ignoring message type {} received " +
1135 "from switch {} before switch is " +
1136 "fully configured.", m.getType(), sw);
1137 }
1138 // Check if the controller is in the slave role for the
1139 // switch. If it is, then don't dispatch the message to
1140 // the listeners.
1141 // TODO: Should we dispatch messages that we expect to
1142 // receive when we're in the slave role, e.g. port
1143 // status messages? Since we're "hiding" switches from
1144 // the listeners when we're in the slave role, then it
1145 // seems a little weird to dispatch port status messages
1146 // to them. On the other hand there might be special
1147 // modules that care about all of the connected switches
1148 // and would like to receive port status notifications.
1149 else if (sw.getRole() == Role.SLAVE) {
1150 // Don't log message if it's a port status message
1151 // since we expect to receive those from the switch
1152 // and don't want to emit spurious messages.
1153 if (m.getType() != OFType.PORT_STATUS) {
1154 log.debug("Ignoring message type {} received " +
1155 "from switch {} while in the slave role.",
1156 m.getType(), sw);
1157 }
1158 } else {
1159 handleMessage(sw, m, null);
1160 }
1161 }
1162 }
1163 finally {
1164 sw.getListenerReadLock().unlock();
1165 }
1166 }
1167 }
1168 }
1169
1170 // ****************
1171 // Message handlers
1172 // ****************
1173
1174 protected void handlePortStatusMessage(IOFSwitch sw,
1175 OFPortStatus m,
1176 boolean updateStorage) {
1177 short portNumber = m.getDesc().getPortNumber();
1178 OFPhysicalPort port = m.getDesc();
1179 if (m.getReason() == (byte)OFPortReason.OFPPR_MODIFY.ordinal()) {
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001180 boolean portDown = ((OFPortConfig.OFPPC_PORT_DOWN.getValue() & port.getConfig()) > 0) ||
1181 ((OFPortState.OFPPS_LINK_DOWN.getValue() & port.getState()) > 0);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001182 sw.setPort(port);
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001183 if (!portDown) {
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001184 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTADDED);
1185 try {
1186 this.updates.put(update);
1187 } catch (InterruptedException e) {
1188 log.error("Failure adding update to queue", e);
1189 }
Pankaj Berde6debb042013-01-16 18:04:32 -08001190 } else {
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001191 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTREMOVED);
1192 try {
1193 this.updates.put(update);
1194 } catch (InterruptedException e) {
1195 log.error("Failure adding update to queue", e);
1196 }
Pankaj Berde6debb042013-01-16 18:04:32 -08001197 }
Jonathan Hart2fa28062013-11-25 20:16:28 -08001198 //if (updateStorage)
1199 //updatePortInfo(sw, port);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001200 log.debug("Port #{} modified for {}", portNumber, sw);
1201 } else if (m.getReason() == (byte)OFPortReason.OFPPR_ADD.ordinal()) {
1202 sw.setPort(port);
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001203 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTADDED);
1204 try {
1205 this.updates.put(update);
1206 } catch (InterruptedException e) {
1207 log.error("Failure adding update to queue", e);
1208 }
Jonathan Hart2fa28062013-11-25 20:16:28 -08001209 //if (updateStorage)
1210 //updatePortInfo(sw, port);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001211 log.debug("Port #{} added for {}", portNumber, sw);
1212 } else if (m.getReason() ==
1213 (byte)OFPortReason.OFPPR_DELETE.ordinal()) {
1214 sw.deletePort(portNumber);
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001215 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTREMOVED);
1216 try {
1217 this.updates.put(update);
1218 } catch (InterruptedException e) {
1219 log.error("Failure adding update to queue", e);
1220 }
Jonathan Hart2fa28062013-11-25 20:16:28 -08001221 //if (updateStorage)
1222 //removePortInfo(sw, portNumber);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001223 log.debug("Port #{} deleted for {}", portNumber, sw);
1224 }
1225 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
1226 try {
1227 this.updates.put(update);
1228 } catch (InterruptedException e) {
1229 log.error("Failure adding update to queue", e);
1230 }
1231 }
1232
1233 /**
1234 * flcontext_cache - Keep a thread local stack of contexts
1235 */
1236 protected static final ThreadLocal<Stack<FloodlightContext>> flcontext_cache =
1237 new ThreadLocal <Stack<FloodlightContext>> () {
1238 @Override
1239 protected Stack<FloodlightContext> initialValue() {
1240 return new Stack<FloodlightContext>();
1241 }
1242 };
1243
1244 /**
1245 * flcontext_alloc - pop a context off the stack, if required create a new one
1246 * @return FloodlightContext
1247 */
1248 protected static FloodlightContext flcontext_alloc() {
1249 FloodlightContext flcontext = null;
1250
1251 if (flcontext_cache.get().empty()) {
1252 flcontext = new FloodlightContext();
1253 }
1254 else {
1255 flcontext = flcontext_cache.get().pop();
1256 }
1257
1258 return flcontext;
1259 }
1260
1261 /**
1262 * flcontext_free - Free the context to the current thread
1263 * @param flcontext
1264 */
1265 protected void flcontext_free(FloodlightContext flcontext) {
1266 flcontext.getStorage().clear();
1267 flcontext_cache.get().push(flcontext);
1268 }
1269
1270 /**
1271 * Handle replies to certain OFMessages, and pass others off to listeners
1272 * @param sw The switch for the message
1273 * @param m The message
1274 * @param bContext The floodlight context. If null then floodlight context would
1275 * be allocated in this function
1276 * @throws IOException
1277 */
1278 @LogMessageDocs({
1279 @LogMessageDoc(level="ERROR",
1280 message="Ignoring PacketIn (Xid = {xid}) because the data" +
1281 " field is empty.",
1282 explanation="The switch sent an improperly-formatted PacketIn" +
1283 " message",
1284 recommendation=LogMessageDoc.CHECK_SWITCH),
1285 @LogMessageDoc(level="WARN",
1286 message="Unhandled OF Message: {} from {}",
1287 explanation="The switch sent a message not handled by " +
1288 "the controller")
1289 })
1290 protected void handleMessage(IOFSwitch sw, OFMessage m,
1291 FloodlightContext bContext)
1292 throws IOException {
1293 Ethernet eth = null;
1294
1295 switch (m.getType()) {
1296 case PACKET_IN:
1297 OFPacketIn pi = (OFPacketIn)m;
1298
1299 if (pi.getPacketData().length <= 0) {
1300 log.error("Ignoring PacketIn (Xid = " + pi.getXid() +
1301 ") because the data field is empty.");
1302 return;
1303 }
1304
1305 if (Controller.ALWAYS_DECODE_ETH) {
1306 eth = new Ethernet();
1307 eth.deserialize(pi.getPacketData(), 0,
1308 pi.getPacketData().length);
1309 counterStore.updatePacketInCounters(sw, m, eth);
1310 }
1311 // fall through to default case...
1312
1313 default:
1314
1315 List<IOFMessageListener> listeners = null;
1316 if (messageListeners.containsKey(m.getType())) {
1317 listeners = messageListeners.get(m.getType()).
1318 getOrderedListeners();
1319 }
1320
1321 FloodlightContext bc = null;
1322 if (listeners != null) {
1323 // Check if floodlight context is passed from the calling
1324 // function, if so use that floodlight context, otherwise
1325 // allocate one
1326 if (bContext == null) {
1327 bc = flcontext_alloc();
1328 } else {
1329 bc = bContext;
1330 }
1331 if (eth != null) {
1332 IFloodlightProviderService.bcStore.put(bc,
1333 IFloodlightProviderService.CONTEXT_PI_PAYLOAD,
1334 eth);
1335 }
1336
1337 // Get the starting time (overall and per-component) of
1338 // the processing chain for this packet if performance
1339 // monitoring is turned on
1340 pktinProcTime.bootstrap(listeners);
1341 pktinProcTime.recordStartTimePktIn();
1342 Command cmd;
1343 for (IOFMessageListener listener : listeners) {
1344 if (listener instanceof IOFSwitchFilter) {
1345 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1346 continue;
1347 }
1348 }
1349
1350 pktinProcTime.recordStartTimeComp(listener);
1351 cmd = listener.receive(sw, m, bc);
1352 pktinProcTime.recordEndTimeComp(listener);
1353
1354 if (Command.STOP.equals(cmd)) {
1355 break;
1356 }
1357 }
1358 pktinProcTime.recordEndTimePktIn(sw, m, bc);
1359 } else {
1360 log.warn("Unhandled OF Message: {} from {}", m, sw);
1361 }
1362
1363 if ((bContext == null) && (bc != null)) flcontext_free(bc);
1364 }
1365 }
1366
1367 /**
1368 * Log an OpenFlow error message from a switch
1369 * @param sw The switch that sent the error
1370 * @param error The error message
1371 */
1372 @LogMessageDoc(level="ERROR",
1373 message="Error {error type} {error code} from {switch}",
1374 explanation="The switch responded with an unexpected error" +
1375 "to an OpenFlow message from the controller",
1376 recommendation="This could indicate improper network operation. " +
1377 "If the problem persists restarting the switch and " +
1378 "controller may help."
1379 )
1380 protected void logError(IOFSwitch sw, OFError error) {
1381 int etint = 0xffff & error.getErrorType();
1382 if (etint < 0 || etint >= OFErrorType.values().length) {
1383 log.error("Unknown error code {} from sw {}", etint, sw);
1384 }
1385 OFErrorType et = OFErrorType.values()[etint];
1386 switch (et) {
1387 case OFPET_HELLO_FAILED:
1388 OFHelloFailedCode hfc =
1389 OFHelloFailedCode.values()[0xffff & error.getErrorCode()];
1390 log.error("Error {} {} from {}", new Object[] {et, hfc, sw});
1391 break;
1392 case OFPET_BAD_REQUEST:
1393 OFBadRequestCode brc =
1394 OFBadRequestCode.values()[0xffff & error.getErrorCode()];
1395 log.error("Error {} {} from {}", new Object[] {et, brc, sw});
1396 break;
1397 case OFPET_BAD_ACTION:
1398 OFBadActionCode bac =
1399 OFBadActionCode.values()[0xffff & error.getErrorCode()];
1400 log.error("Error {} {} from {}", new Object[] {et, bac, sw});
1401 break;
1402 case OFPET_FLOW_MOD_FAILED:
1403 OFFlowModFailedCode fmfc =
1404 OFFlowModFailedCode.values()[0xffff & error.getErrorCode()];
1405 log.error("Error {} {} from {}", new Object[] {et, fmfc, sw});
1406 break;
1407 case OFPET_PORT_MOD_FAILED:
1408 OFPortModFailedCode pmfc =
1409 OFPortModFailedCode.values()[0xffff & error.getErrorCode()];
1410 log.error("Error {} {} from {}", new Object[] {et, pmfc, sw});
1411 break;
1412 case OFPET_QUEUE_OP_FAILED:
1413 OFQueueOpFailedCode qofc =
1414 OFQueueOpFailedCode.values()[0xffff & error.getErrorCode()];
1415 log.error("Error {} {} from {}", new Object[] {et, qofc, sw});
1416 break;
1417 default:
1418 break;
1419 }
1420 }
1421
1422 /**
1423 * Add a switch to the active switch list and call the switch listeners.
1424 * This happens either when a switch first connects (and the controller is
1425 * not in the slave role) or when the role of the controller changes from
1426 * slave to master.
1427 * @param sw the switch that has been added
1428 */
1429 // TODO: need to rethink locking and the synchronous switch update.
1430 // We can / should also handle duplicate DPIDs in connectedSwitches
1431 @LogMessageDoc(level="ERROR",
1432 message="New switch added {switch} for already-added switch {switch}",
1433 explanation="A switch with the same DPID as another switch " +
1434 "connected to the controller. This can be caused by " +
1435 "multiple switches configured with the same DPID, or " +
1436 "by a switch reconnected very quickly after " +
1437 "disconnecting.",
1438 recommendation="If this happens repeatedly, it is likely there " +
1439 "are switches with duplicate DPIDs on the network. " +
1440 "Reconfigure the appropriate switches. If it happens " +
1441 "very rarely, then it is likely this is a transient " +
1442 "network problem that can be ignored."
1443 )
1444 protected void addSwitch(IOFSwitch sw) {
1445 // TODO: is it safe to modify the HashMap without holding
1446 // the old switch's lock?
1447 OFSwitchImpl oldSw = (OFSwitchImpl) this.activeSwitches.put(sw.getId(), sw);
1448 if (sw == oldSw) {
1449 // Note == for object equality, not .equals for value
1450 log.info("New add switch for pre-existing switch {}", sw);
1451 return;
1452 }
1453
1454 if (oldSw != null) {
1455 oldSw.getListenerWriteLock().lock();
1456 try {
1457 log.error("New switch added {} for already-added switch {}",
1458 sw, oldSw);
1459 // Set the connected flag to false to suppress calling
1460 // the listeners for this switch in processOFMessage
1461 oldSw.setConnected(false);
1462
1463 oldSw.cancelAllStatisticsReplies();
1464
Jonathan Hart2fa28062013-11-25 20:16:28 -08001465 //updateInactiveSwitchInfo(oldSw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001466
1467 // we need to clean out old switch state definitively
1468 // before adding the new switch
1469 // FIXME: It seems not completely kosher to call the
1470 // switch listeners here. I thought one of the points of
1471 // having the asynchronous switch update mechanism was so
1472 // the addedSwitch and removedSwitch were always called
1473 // from a single thread to simplify concurrency issues
1474 // for the listener.
1475 if (switchListeners != null) {
1476 for (IOFSwitchListener listener : switchListeners) {
1477 listener.removedSwitch(oldSw);
1478 }
1479 }
1480 // will eventually trigger a removeSwitch(), which will cause
1481 // a "Not removing Switch ... already removed debug message.
1482 // TODO: Figure out a way to handle this that avoids the
1483 // spurious debug message.
Jonathan Hart9e92c512013-03-20 16:24:44 -07001484 log.debug("Closing {} because a new IOFSwitch got added " +
1485 "for this dpid", oldSw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001486 oldSw.getChannel().close();
1487 }
1488 finally {
1489 oldSw.getListenerWriteLock().unlock();
1490 }
1491 }
1492
Jonathan Hart2fa28062013-11-25 20:16:28 -08001493 //updateActiveSwitchInfo(sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001494 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.ADDED);
1495 try {
1496 this.updates.put(update);
1497 } catch (InterruptedException e) {
1498 log.error("Failure adding update to queue", e);
1499 }
1500 }
1501
1502 /**
1503 * Remove a switch from the active switch list and call the switch listeners.
1504 * This happens either when the switch is disconnected or when the
1505 * controller's role for the switch changes from master to slave.
1506 * @param sw the switch that has been removed
1507 */
1508 protected void removeSwitch(IOFSwitch sw) {
1509 // No need to acquire the listener lock, since
1510 // this method is only called after netty has processed all
1511 // pending messages
1512 log.debug("removeSwitch: {}", sw);
1513 if (!this.activeSwitches.remove(sw.getId(), sw) || !sw.isConnected()) {
1514 log.debug("Not removing switch {}; already removed", sw);
1515 return;
1516 }
1517 // We cancel all outstanding statistics replies if the switch transition
1518 // from active. In the future we might allow statistics requests
1519 // from slave controllers. Then we need to move this cancelation
1520 // to switch disconnect
1521 sw.cancelAllStatisticsReplies();
1522
1523 // FIXME: I think there's a race condition if we call updateInactiveSwitchInfo
1524 // here if role support is enabled. In that case if the switch is being
1525 // removed because we've been switched to being in the slave role, then I think
1526 // it's possible that the new master may have already been promoted to master
1527 // and written out the active switch state to storage. If we now execute
1528 // updateInactiveSwitchInfo we may wipe out all of the state that was
1529 // written out by the new master. Maybe need to revisit how we handle all
1530 // of the switch state that's written to storage.
1531
Jonathan Hart2fa28062013-11-25 20:16:28 -08001532 //updateInactiveSwitchInfo(sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001533 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.REMOVED);
1534 try {
1535 this.updates.put(update);
1536 } catch (InterruptedException e) {
1537 log.error("Failure adding update to queue", e);
1538 }
1539 }
1540
1541 // ***************
1542 // IFloodlightProvider
1543 // ***************
1544
1545 @Override
1546 public synchronized void addOFMessageListener(OFType type,
1547 IOFMessageListener listener) {
1548 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1549 messageListeners.get(type);
1550 if (ldd == null) {
1551 ldd = new ListenerDispatcher<OFType, IOFMessageListener>();
1552 messageListeners.put(type, ldd);
1553 }
1554 ldd.addListener(type, listener);
1555 }
1556
1557 @Override
1558 public synchronized void removeOFMessageListener(OFType type,
1559 IOFMessageListener listener) {
1560 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1561 messageListeners.get(type);
1562 if (ldd != null) {
1563 ldd.removeListener(listener);
1564 }
1565 }
1566
1567 private void logListeners() {
1568 for (Map.Entry<OFType,
1569 ListenerDispatcher<OFType,
1570 IOFMessageListener>> entry
1571 : messageListeners.entrySet()) {
1572
1573 OFType type = entry.getKey();
1574 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1575 entry.getValue();
1576
1577 StringBuffer sb = new StringBuffer();
1578 sb.append("OFListeners for ");
1579 sb.append(type);
1580 sb.append(": ");
1581 for (IOFMessageListener l : ldd.getOrderedListeners()) {
1582 sb.append(l.getName());
1583 sb.append(",");
1584 }
1585 log.debug(sb.toString());
1586 }
1587 }
1588
1589 public void removeOFMessageListeners(OFType type) {
1590 messageListeners.remove(type);
1591 }
1592
1593 @Override
1594 public Map<Long, IOFSwitch> getSwitches() {
1595 return Collections.unmodifiableMap(this.activeSwitches);
1596 }
1597
1598 @Override
1599 public void addOFSwitchListener(IOFSwitchListener listener) {
1600 this.switchListeners.add(listener);
1601 }
1602
1603 @Override
1604 public void removeOFSwitchListener(IOFSwitchListener listener) {
1605 this.switchListeners.remove(listener);
1606 }
1607
1608 @Override
1609 public Map<OFType, List<IOFMessageListener>> getListeners() {
1610 Map<OFType, List<IOFMessageListener>> lers =
1611 new HashMap<OFType, List<IOFMessageListener>>();
1612 for(Entry<OFType, ListenerDispatcher<OFType, IOFMessageListener>> e :
1613 messageListeners.entrySet()) {
1614 lers.put(e.getKey(), e.getValue().getOrderedListeners());
1615 }
1616 return Collections.unmodifiableMap(lers);
1617 }
1618
1619 @Override
1620 @LogMessageDocs({
1621 @LogMessageDoc(message="Failed to inject OFMessage {message} onto " +
1622 "a null switch",
1623 explanation="Failed to process a message because the switch " +
1624 " is no longer connected."),
1625 @LogMessageDoc(level="ERROR",
1626 message="Error reinjecting OFMessage on switch {switch}",
1627 explanation="An I/O error occured while attempting to " +
1628 "process an OpenFlow message",
1629 recommendation=LogMessageDoc.CHECK_SWITCH)
1630 })
1631 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg,
1632 FloodlightContext bc) {
1633 if (sw == null) {
1634 log.info("Failed to inject OFMessage {} onto a null switch", msg);
1635 return false;
1636 }
1637
1638 // FIXME: Do we need to be able to inject messages to switches
1639 // where we're the slave controller (i.e. they're connected but
1640 // not active)?
1641 // FIXME: Don't we need synchronization logic here so we're holding
1642 // the listener read lock when we call handleMessage? After some
1643 // discussions it sounds like the right thing to do here would be to
1644 // inject the message as a netty upstream channel event so it goes
1645 // through the normal netty event processing, including being
1646 // handled
1647 if (!activeSwitches.containsKey(sw.getId())) return false;
1648
1649 try {
1650 // Pass Floodlight context to the handleMessages()
1651 handleMessage(sw, msg, bc);
1652 } catch (IOException e) {
1653 log.error("Error reinjecting OFMessage on switch {}",
1654 HexString.toHexString(sw.getId()));
1655 return false;
1656 }
1657 return true;
1658 }
1659
1660 @Override
1661 @LogMessageDoc(message="Calling System.exit",
1662 explanation="The controller is terminating")
1663 public synchronized void terminate() {
1664 log.info("Calling System.exit");
1665 System.exit(1);
1666 }
1667
1668 @Override
1669 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg) {
1670 // call the overloaded version with floodlight context set to null
1671 return injectOfMessage(sw, msg, null);
1672 }
1673
1674 @Override
1675 public void handleOutgoingMessage(IOFSwitch sw, OFMessage m,
1676 FloodlightContext bc) {
1677 if (log.isTraceEnabled()) {
1678 String str = OFMessage.getDataAsString(sw, m, bc);
1679 log.trace("{}", str);
1680 }
1681
1682 List<IOFMessageListener> listeners = null;
1683 if (messageListeners.containsKey(m.getType())) {
1684 listeners =
1685 messageListeners.get(m.getType()).getOrderedListeners();
1686 }
1687
1688 if (listeners != null) {
1689 for (IOFMessageListener listener : listeners) {
1690 if (listener instanceof IOFSwitchFilter) {
1691 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1692 continue;
1693 }
1694 }
1695 if (Command.STOP.equals(listener.receive(sw, m, bc))) {
1696 break;
1697 }
1698 }
1699 }
1700 }
1701
1702 @Override
1703 public BasicFactory getOFMessageFactory() {
1704 return factory;
1705 }
1706
1707 @Override
1708 public String getControllerId() {
1709 return controllerId;
1710 }
1711
1712 // **************
1713 // Initialization
1714 // **************
1715
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001716 /**
1717 * Sets the initial role based on properties in the config params.
1718 * It looks for two different properties.
1719 * If the "role" property is specified then the value should be
1720 * either "EQUAL", "MASTER", or "SLAVE" and the role of the
1721 * controller is set to the specified value. If the "role" property
1722 * is not specified then it looks next for the "role.path" property.
1723 * In this case the value should be the path to a property file in
1724 * the file system that contains a property called "floodlight.role"
1725 * which can be one of the values listed above for the "role" property.
1726 * The idea behind the "role.path" mechanism is that you have some
1727 * separate heartbeat and master controller election algorithm that
1728 * determines the role of the controller. When a role transition happens,
1729 * it updates the current role in the file specified by the "role.path"
1730 * file. Then if floodlight restarts for some reason it can get the
1731 * correct current role of the controller from the file.
1732 * @param configParams The config params for the FloodlightProvider service
1733 * @return A valid role if role information is specified in the
1734 * config params, otherwise null
1735 */
1736 @LogMessageDocs({
1737 @LogMessageDoc(message="Controller role set to {role}",
1738 explanation="Setting the initial HA role to "),
1739 @LogMessageDoc(level="ERROR",
1740 message="Invalid current role value: {role}",
1741 explanation="An invalid HA role value was read from the " +
1742 "properties file",
1743 recommendation=LogMessageDoc.CHECK_CONTROLLER)
1744 })
1745 protected Role getInitialRole(Map<String, String> configParams) {
1746 Role role = null;
1747 String roleString = configParams.get("role");
1748 if (roleString == null) {
1749 String rolePath = configParams.get("rolepath");
1750 if (rolePath != null) {
1751 Properties properties = new Properties();
1752 try {
1753 properties.load(new FileInputStream(rolePath));
1754 roleString = properties.getProperty("floodlight.role");
1755 }
1756 catch (IOException exc) {
1757 // Don't treat it as an error if the file specified by the
1758 // rolepath property doesn't exist. This lets us enable the
1759 // HA mechanism by just creating/setting the floodlight.role
1760 // property in that file without having to modify the
1761 // floodlight properties.
1762 }
1763 }
1764 }
1765
1766 if (roleString != null) {
1767 // Canonicalize the string to the form used for the enum constants
1768 roleString = roleString.trim().toUpperCase();
1769 try {
1770 role = Role.valueOf(roleString);
1771 }
1772 catch (IllegalArgumentException exc) {
1773 log.error("Invalid current role value: {}", roleString);
1774 }
1775 }
1776
1777 log.info("Controller role set to {}", role);
1778
1779 return role;
1780 }
1781
1782 /**
1783 * Tell controller that we're ready to accept switches loop
1784 * @throws IOException
1785 */
1786 @LogMessageDocs({
1787 @LogMessageDoc(message="Listening for switch connections on {address}",
1788 explanation="The controller is ready and listening for new" +
1789 " switch connections"),
1790 @LogMessageDoc(message="Storage exception in controller " +
1791 "updates loop; terminating process",
1792 explanation=ERROR_DATABASE,
1793 recommendation=LogMessageDoc.CHECK_CONTROLLER),
1794 @LogMessageDoc(level="ERROR",
1795 message="Exception in controller updates loop",
1796 explanation="Failed to dispatch controller event",
1797 recommendation=LogMessageDoc.GENERIC_ACTION)
1798 })
1799 public void run() {
1800 if (log.isDebugEnabled()) {
1801 logListeners();
1802 }
1803
1804 try {
1805 final ServerBootstrap bootstrap = createServerBootStrap();
1806
1807 bootstrap.setOption("reuseAddr", true);
1808 bootstrap.setOption("child.keepAlive", true);
1809 bootstrap.setOption("child.tcpNoDelay", true);
1810 bootstrap.setOption("child.sendBufferSize", Controller.SEND_BUFFER_SIZE);
1811
1812 ChannelPipelineFactory pfact =
1813 new OpenflowPipelineFactory(this, null);
1814 bootstrap.setPipelineFactory(pfact);
1815 InetSocketAddress sa = new InetSocketAddress(openFlowPort);
1816 final ChannelGroup cg = new DefaultChannelGroup();
1817 cg.add(bootstrap.bind(sa));
1818
1819 log.info("Listening for switch connections on {}", sa);
1820 } catch (Exception e) {
1821 throw new RuntimeException(e);
1822 }
1823
1824 // main loop
1825 while (true) {
1826 try {
1827 IUpdate update = updates.take();
1828 update.dispatch();
1829 } catch (InterruptedException e) {
1830 return;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001831 } catch (Exception e) {
1832 log.error("Exception in controller updates loop", e);
1833 }
1834 }
1835 }
1836
1837 private ServerBootstrap createServerBootStrap() {
1838 if (workerThreads == 0) {
1839 return new ServerBootstrap(
1840 new NioServerSocketChannelFactory(
1841 Executors.newCachedThreadPool(),
1842 Executors.newCachedThreadPool()));
1843 } else {
1844 return new ServerBootstrap(
1845 new NioServerSocketChannelFactory(
1846 Executors.newCachedThreadPool(),
1847 Executors.newCachedThreadPool(), workerThreads));
1848 }
1849 }
1850
1851 public void setConfigParams(Map<String, String> configParams) {
1852 String ofPort = configParams.get("openflowport");
1853 if (ofPort != null) {
1854 this.openFlowPort = Integer.parseInt(ofPort);
1855 }
1856 log.debug("OpenFlow port set to {}", this.openFlowPort);
1857 String threads = configParams.get("workerthreads");
1858 if (threads != null) {
1859 this.workerThreads = Integer.parseInt(threads);
1860 }
1861 log.debug("Number of worker threads set to {}", this.workerThreads);
1862 String controllerId = configParams.get("controllerid");
1863 if (controllerId != null) {
1864 this.controllerId = controllerId;
1865 }
Jonathan Hartd10008d2013-02-23 17:04:08 -08001866 else {
1867 //Try to get the hostname of the machine and use that for controller ID
1868 try {
1869 String hostname = java.net.InetAddress.getLocalHost().getHostName();
1870 this.controllerId = hostname;
1871 } catch (UnknownHostException e) {
1872 // Can't get hostname, we'll just use the default
1873 }
1874 }
1875
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001876 log.debug("ControllerId set to {}", this.controllerId);
1877 }
1878
1879 private void initVendorMessages() {
1880 // Configure openflowj to be able to parse the role request/reply
1881 // vendor messages.
1882 OFBasicVendorId niciraVendorId = new OFBasicVendorId(
1883 OFNiciraVendorData.NX_VENDOR_ID, 4);
1884 OFVendorId.registerVendorId(niciraVendorId);
1885 OFBasicVendorDataType roleRequestVendorData =
1886 new OFBasicVendorDataType(
1887 OFRoleRequestVendorData.NXT_ROLE_REQUEST,
1888 OFRoleRequestVendorData.getInstantiable());
1889 niciraVendorId.registerVendorDataType(roleRequestVendorData);
1890 OFBasicVendorDataType roleReplyVendorData =
1891 new OFBasicVendorDataType(
1892 OFRoleReplyVendorData.NXT_ROLE_REPLY,
1893 OFRoleReplyVendorData.getInstantiable());
1894 niciraVendorId.registerVendorDataType(roleReplyVendorData);
1895 }
1896
1897 /**
1898 * Initialize internal data structures
1899 */
1900 public void init(Map<String, String> configParams) {
1901 // These data structures are initialized here because other
1902 // module's startUp() might be called before ours
1903 this.messageListeners =
1904 new ConcurrentHashMap<OFType,
1905 ListenerDispatcher<OFType,
1906 IOFMessageListener>>();
1907 this.switchListeners = new CopyOnWriteArraySet<IOFSwitchListener>();
1908 this.haListeners = new CopyOnWriteArraySet<IHAListener>();
1909 this.activeSwitches = new ConcurrentHashMap<Long, IOFSwitch>();
1910 this.connectedSwitches = new HashSet<OFSwitchImpl>();
1911 this.controllerNodeIPsCache = new HashMap<String, String>();
1912 this.updates = new LinkedBlockingQueue<IUpdate>();
1913 this.factory = new BasicFactory();
1914 this.providerMap = new HashMap<String, List<IInfoProvider>>();
1915 setConfigParams(configParams);
Jonathan Hartcc957a02013-02-26 10:39:04 -08001916 //Set the controller's role to MASTER so it always tries to do role requests.
1917 this.role = Role.MASTER;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001918 this.roleChanger = new RoleChanger();
1919 initVendorMessages();
1920 this.systemStartTime = System.currentTimeMillis();
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001921 }
1922
1923 /**
1924 * Startup all of the controller's components
1925 */
1926 @LogMessageDoc(message="Waiting for storage source",
1927 explanation="The system database is not yet ready",
1928 recommendation="If this message persists, this indicates " +
1929 "that the system database has failed to start. " +
1930 LogMessageDoc.CHECK_CONTROLLER)
1931 public void startupComponents() {
Jonathan Hartd10008d2013-02-23 17:04:08 -08001932 try {
1933 registryService.registerController(controllerId);
Jonathan Hartb0904bf2013-11-26 14:41:11 -08001934 } catch (RegistryException e) {
1935 log.warn("Registry service error: {}", e.getMessage());
Jonathan Hartd10008d2013-02-23 17:04:08 -08001936 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001937
1938 // Add our REST API
1939 restApi.addRestletRoutable(new CoreWebRoutable());
1940 }
1941
1942 @Override
1943 public void addInfoProvider(String type, IInfoProvider provider) {
1944 if (!providerMap.containsKey(type)) {
1945 providerMap.put(type, new ArrayList<IInfoProvider>());
1946 }
1947 providerMap.get(type).add(provider);
1948 }
1949
1950 @Override
1951 public void removeInfoProvider(String type, IInfoProvider provider) {
1952 if (!providerMap.containsKey(type)) {
1953 log.debug("Provider type {} doesn't exist.", type);
1954 return;
1955 }
1956
1957 providerMap.get(type).remove(provider);
1958 }
1959
1960 public Map<String, Object> getControllerInfo(String type) {
1961 if (!providerMap.containsKey(type)) return null;
1962
1963 Map<String, Object> result = new LinkedHashMap<String, Object>();
1964 for (IInfoProvider provider : providerMap.get(type)) {
1965 result.putAll(provider.getInfo(type));
1966 }
1967
1968 return result;
1969 }
1970
1971 @Override
1972 public void addHAListener(IHAListener listener) {
1973 this.haListeners.add(listener);
1974 }
1975
1976 @Override
1977 public void removeHAListener(IHAListener listener) {
1978 this.haListeners.remove(listener);
1979 }
1980
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001981 @Override
1982 public Map<String, String> getControllerNodeIPs() {
1983 // We return a copy of the mapping so we can guarantee that
1984 // the mapping return is the same as one that will be (or was)
1985 // dispatched to IHAListeners
1986 HashMap<String,String> retval = new HashMap<String,String>();
1987 synchronized(controllerNodeIPsCache) {
1988 retval.putAll(controllerNodeIPsCache);
1989 }
1990 return retval;
1991 }
1992
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001993 @Override
1994 public long getSystemStartTime() {
1995 return (this.systemStartTime);
1996 }
1997
1998 @Override
1999 public void setAlwaysClearFlowsOnSwAdd(boolean value) {
2000 this.alwaysClearFlowsOnSwAdd = value;
2001 }
2002
2003 public boolean getAlwaysClearFlowsOnSwAdd() {
2004 return this.alwaysClearFlowsOnSwAdd;
2005 }
2006}