blob: 7f788b198a02d25cf6aff6ecccf64879b42a247b [file] [log] [blame]
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001/**
2* Copyright 2011, Big Switch Networks, Inc.
3* Originally created by David Erickson, Stanford University
4*
5* Licensed under the Apache License, Version 2.0 (the "License"); you may
6* not use this file except in compliance with the License. You may obtain
7* a copy of the License at
8*
9* http://www.apache.org/licenses/LICENSE-2.0
10*
11* Unless required by applicable law or agreed to in writing, software
12* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14* License for the specific language governing permissions and limitations
15* under the License.
16**/
17
18package net.floodlightcontroller.core.internal;
19
20import java.io.FileInputStream;
21import java.io.IOException;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080022import java.net.InetSocketAddress;
Jonathan Hartd10008d2013-02-23 17:04:08 -080023import java.net.UnknownHostException;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080024import java.nio.channels.ClosedChannelException;
Jonathan Hartd10008d2013-02-23 17:04:08 -080025import java.util.ArrayList;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080026import java.util.Collection;
27import java.util.Collections;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080028import java.util.HashMap;
29import java.util.HashSet;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080030import java.util.LinkedHashMap;
31import java.util.List;
32import java.util.Map;
33import java.util.Map.Entry;
34import java.util.Properties;
35import java.util.Set;
36import java.util.Stack;
37import java.util.concurrent.BlockingQueue;
38import java.util.concurrent.ConcurrentHashMap;
39import java.util.concurrent.ConcurrentMap;
40import java.util.concurrent.CopyOnWriteArraySet;
41import java.util.concurrent.Executors;
42import java.util.concurrent.Future;
43import java.util.concurrent.LinkedBlockingQueue;
44import java.util.concurrent.RejectedExecutionException;
45import java.util.concurrent.TimeUnit;
46import java.util.concurrent.TimeoutException;
47
48import net.floodlightcontroller.core.FloodlightContext;
49import net.floodlightcontroller.core.IFloodlightProviderService;
50import net.floodlightcontroller.core.IHAListener;
51import net.floodlightcontroller.core.IInfoProvider;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080052import net.floodlightcontroller.core.IListener.Command;
Jonathan Hartd10008d2013-02-23 17:04:08 -080053import net.floodlightcontroller.core.IOFMessageListener;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080054import net.floodlightcontroller.core.IOFSwitch;
55import net.floodlightcontroller.core.IOFSwitchFilter;
56import net.floodlightcontroller.core.IOFSwitchListener;
Pankaj Berdedc73bb12013-08-14 13:46:38 -070057import net.floodlightcontroller.core.IUpdate;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080058import net.floodlightcontroller.core.annotations.LogMessageDoc;
59import net.floodlightcontroller.core.annotations.LogMessageDocs;
60import net.floodlightcontroller.core.internal.OFChannelState.HandshakeState;
61import net.floodlightcontroller.core.util.ListenerDispatcher;
62import net.floodlightcontroller.core.web.CoreWebRoutable;
63import net.floodlightcontroller.counter.ICounterStoreService;
64import net.floodlightcontroller.packet.Ethernet;
65import net.floodlightcontroller.perfmon.IPktInProcessingTimeService;
66import net.floodlightcontroller.restserver.IRestApiService;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080067import net.floodlightcontroller.threadpool.IThreadPoolService;
HIGUCHI Yuta36cf0762013-06-14 14:25:38 -070068import net.onrc.onos.ofcontroller.core.IOFSwitchPortListener;
Jonathan Hartd82f20d2013-02-21 18:04:24 -080069import net.onrc.onos.registry.controller.IControllerRegistryService;
Jonathan Hartcc957a02013-02-26 10:39:04 -080070import net.onrc.onos.registry.controller.IControllerRegistryService.ControlChangeCallback;
Jonathan Hartd10008d2013-02-23 17:04:08 -080071import net.onrc.onos.registry.controller.RegistryException;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080072
73import org.jboss.netty.bootstrap.ServerBootstrap;
74import org.jboss.netty.buffer.ChannelBuffer;
75import org.jboss.netty.buffer.ChannelBuffers;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080076import org.jboss.netty.channel.ChannelHandlerContext;
77import org.jboss.netty.channel.ChannelPipelineFactory;
78import org.jboss.netty.channel.ChannelStateEvent;
79import org.jboss.netty.channel.ChannelUpstreamHandler;
80import org.jboss.netty.channel.Channels;
81import org.jboss.netty.channel.ExceptionEvent;
82import org.jboss.netty.channel.MessageEvent;
83import org.jboss.netty.channel.group.ChannelGroup;
84import org.jboss.netty.channel.group.DefaultChannelGroup;
85import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
86import org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler;
87import org.jboss.netty.handler.timeout.IdleStateEvent;
88import org.jboss.netty.handler.timeout.ReadTimeoutException;
89import org.openflow.protocol.OFEchoReply;
90import org.openflow.protocol.OFError;
91import org.openflow.protocol.OFError.OFBadActionCode;
92import org.openflow.protocol.OFError.OFBadRequestCode;
93import org.openflow.protocol.OFError.OFErrorType;
94import org.openflow.protocol.OFError.OFFlowModFailedCode;
95import org.openflow.protocol.OFError.OFHelloFailedCode;
96import org.openflow.protocol.OFError.OFPortModFailedCode;
97import org.openflow.protocol.OFError.OFQueueOpFailedCode;
98import org.openflow.protocol.OFFeaturesReply;
99import org.openflow.protocol.OFGetConfigReply;
100import org.openflow.protocol.OFMessage;
101import org.openflow.protocol.OFPacketIn;
102import org.openflow.protocol.OFPhysicalPort;
Pankaj Berde6a4075d2013-01-22 16:42:54 -0800103import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
Pankaj Berde6debb042013-01-16 18:04:32 -0800104import org.openflow.protocol.OFPhysicalPort.OFPortState;
Jonathan Hartd10008d2013-02-23 17:04:08 -0800105import org.openflow.protocol.OFPortStatus;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800106import org.openflow.protocol.OFPortStatus.OFPortReason;
107import org.openflow.protocol.OFSetConfig;
108import org.openflow.protocol.OFStatisticsRequest;
109import org.openflow.protocol.OFSwitchConfig;
110import org.openflow.protocol.OFType;
111import org.openflow.protocol.OFVendor;
112import org.openflow.protocol.factory.BasicFactory;
113import org.openflow.protocol.factory.MessageParseException;
114import org.openflow.protocol.statistics.OFDescriptionStatistics;
115import org.openflow.protocol.statistics.OFStatistics;
116import org.openflow.protocol.statistics.OFStatisticsType;
117import org.openflow.protocol.vendor.OFBasicVendorDataType;
118import org.openflow.protocol.vendor.OFBasicVendorId;
119import org.openflow.protocol.vendor.OFVendorId;
120import org.openflow.util.HexString;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800121import org.openflow.vendor.nicira.OFNiciraVendorData;
122import org.openflow.vendor.nicira.OFRoleReplyVendorData;
123import org.openflow.vendor.nicira.OFRoleRequestVendorData;
124import org.openflow.vendor.nicira.OFRoleVendorData;
125import org.slf4j.Logger;
126import org.slf4j.LoggerFactory;
127
128
129/**
130 * The main controller class. Handles all setup and network listeners
HIGUCHI Yuta11360702013-06-17 10:28:06 -0700131 *
132 * Extensions made by ONOS are:
133 * - Detailed Port event: PORTCHANGED -> {PORTCHANGED, PORTADDED, PORTREMOVED}
134 * Available as net.onrc.onos.ofcontroller.core.IOFSwitchPortListener
135 * - Distributed ownership control of switch through RegistryService(IControllerRegistryService)
Pavlin Radoslavova653e9f2013-10-16 03:08:52 -0700136 * - Register ONOS services. (IControllerRegistryService)
HIGUCHI Yuta11360702013-06-17 10:28:06 -0700137 * - Additional DEBUG logs
138 * - Try using hostname as controller ID, when ID was not explicitly given.
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800139 */
Jonathan Hart2fa28062013-11-25 20:16:28 -0800140public class Controller implements IFloodlightProviderService {
HIGUCHI Yuta0ba6fd02013-06-14 12:46:56 -0700141
Yuta HIGUCHI6ac8d182013-10-22 15:24:56 -0700142 protected final static Logger log = LoggerFactory.getLogger(Controller.class);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800143
144 private static final String ERROR_DATABASE =
145 "The controller could not communicate with the system database.";
146
147 protected BasicFactory factory;
148 protected ConcurrentMap<OFType,
149 ListenerDispatcher<OFType,IOFMessageListener>>
150 messageListeners;
151 // The activeSwitches map contains only those switches that are actively
152 // being controlled by us -- it doesn't contain switches that are
153 // in the slave role
154 protected ConcurrentHashMap<Long, IOFSwitch> activeSwitches;
155 // connectedSwitches contains all connected switches, including ones where
156 // we're a slave controller. We need to keep track of them so that we can
157 // send role request messages to switches when our role changes to master
158 // We add a switch to this set after it successfully completes the
159 // handshake. Access to this Set needs to be synchronized with roleChanger
160 protected HashSet<OFSwitchImpl> connectedSwitches;
161
162 // The controllerNodeIPsCache maps Controller IDs to their IP address.
163 // It's only used by handleControllerNodeIPsChanged
164 protected HashMap<String, String> controllerNodeIPsCache;
165
166 protected Set<IOFSwitchListener> switchListeners;
167 protected Set<IHAListener> haListeners;
168 protected Map<String, List<IInfoProvider>> providerMap;
169 protected BlockingQueue<IUpdate> updates;
170
171 // Module dependencies
172 protected IRestApiService restApi;
173 protected ICounterStoreService counterStore = null;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800174 protected IPktInProcessingTimeService pktinProcTime;
175 protected IThreadPoolService threadPool;
Jonathan Hartd10008d2013-02-23 17:04:08 -0800176 protected IControllerRegistryService registryService;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800177
178 // Configuration options
179 protected int openFlowPort = 6633;
180 protected int workerThreads = 0;
181 // The id for this controller node. Should be unique for each controller
182 // node in a controller cluster.
183 protected String controllerId = "localhost";
184
185 // The current role of the controller.
186 // If the controller isn't configured to support roles, then this is null.
187 protected Role role;
188 // A helper that handles sending and timeout handling for role requests
189 protected RoleChanger roleChanger;
190
191 // Start time of the controller
192 protected long systemStartTime;
193
194 // Flag to always flush flow table on switch reconnect (HA or otherwise)
195 protected boolean alwaysClearFlowsOnSwAdd = false;
196
Jonathan Hart2fa28062013-11-25 20:16:28 -0800197 /*
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800198 // Storage table names
199 protected static final String CONTROLLER_TABLE_NAME = "controller_controller";
200 protected static final String CONTROLLER_ID = "id";
201
202 protected static final String SWITCH_TABLE_NAME = "controller_switch";
203 protected static final String SWITCH_DATAPATH_ID = "dpid";
204 protected static final String SWITCH_SOCKET_ADDRESS = "socket_address";
205 protected static final String SWITCH_IP = "ip";
206 protected static final String SWITCH_CONTROLLER_ID = "controller_id";
207 protected static final String SWITCH_ACTIVE = "active";
208 protected static final String SWITCH_CONNECTED_SINCE = "connected_since";
209 protected static final String SWITCH_CAPABILITIES = "capabilities";
210 protected static final String SWITCH_BUFFERS = "buffers";
211 protected static final String SWITCH_TABLES = "tables";
212 protected static final String SWITCH_ACTIONS = "actions";
213
214 protected static final String SWITCH_CONFIG_TABLE_NAME = "controller_switchconfig";
215 protected static final String SWITCH_CONFIG_CORE_SWITCH = "core_switch";
216
217 protected static final String PORT_TABLE_NAME = "controller_port";
218 protected static final String PORT_ID = "id";
219 protected static final String PORT_SWITCH = "switch_id";
220 protected static final String PORT_NUMBER = "number";
221 protected static final String PORT_HARDWARE_ADDRESS = "hardware_address";
222 protected static final String PORT_NAME = "name";
223 protected static final String PORT_CONFIG = "config";
224 protected static final String PORT_STATE = "state";
225 protected static final String PORT_CURRENT_FEATURES = "current_features";
226 protected static final String PORT_ADVERTISED_FEATURES = "advertised_features";
227 protected static final String PORT_SUPPORTED_FEATURES = "supported_features";
228 protected static final String PORT_PEER_FEATURES = "peer_features";
229
230 protected static final String CONTROLLER_INTERFACE_TABLE_NAME = "controller_controllerinterface";
231 protected static final String CONTROLLER_INTERFACE_ID = "id";
232 protected static final String CONTROLLER_INTERFACE_CONTROLLER_ID = "controller_id";
233 protected static final String CONTROLLER_INTERFACE_TYPE = "type";
234 protected static final String CONTROLLER_INTERFACE_NUMBER = "number";
235 protected static final String CONTROLLER_INTERFACE_DISCOVERED_IP = "discovered_ip";
Jonathan Hart2fa28062013-11-25 20:16:28 -0800236 */
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800237
238
239 // Perf. related configuration
240 protected static final int SEND_BUFFER_SIZE = 4 * 1024 * 1024;
241 protected static final int BATCH_MAX_SIZE = 100;
Pankaj Berdedc73bb12013-08-14 13:46:38 -0700242 protected static final boolean ALWAYS_DECODE_ETH = true;
243
/**
 * Kinds of switch events that can be queued as a SwitchUpdate.
 * PORTADDED and PORTREMOVED are the detailed port events added by ONOS.
 */
public enum SwitchUpdateType {
    ADDED, REMOVED, PORTCHANGED, PORTADDED, PORTREMOVED
}
Pankaj Berdedc73bb12013-08-14 13:46:38 -0700251
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800252 /**
253 * Update message indicating a switch was added or removed
HIGUCHI Yutaec4bff82013-06-17 11:49:31 -0700254 * ONOS: This message extended to indicate Port add or removed event.
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800255 */
256 protected class SwitchUpdate implements IUpdate {
257 public IOFSwitch sw;
HIGUCHI Yutaec4bff82013-06-17 11:49:31 -0700258 public OFPhysicalPort port; // Added by ONOS
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800259 public SwitchUpdateType switchUpdateType;
260 public SwitchUpdate(IOFSwitch sw, SwitchUpdateType switchUpdateType) {
261 this.sw = sw;
262 this.switchUpdateType = switchUpdateType;
263 }
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700264 public SwitchUpdate(IOFSwitch sw, OFPhysicalPort port, SwitchUpdateType switchUpdateType) {
265 this.sw = sw;
266 this.port = port;
267 this.switchUpdateType = switchUpdateType;
268 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800269 public void dispatch() {
270 if (log.isTraceEnabled()) {
271 log.trace("Dispatching switch update {} {}",
272 sw, switchUpdateType);
273 }
274 if (switchListeners != null) {
275 for (IOFSwitchListener listener : switchListeners) {
276 switch(switchUpdateType) {
277 case ADDED:
278 listener.addedSwitch(sw);
279 break;
280 case REMOVED:
281 listener.removedSwitch(sw);
282 break;
283 case PORTCHANGED:
284 listener.switchPortChanged(sw.getId());
285 break;
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700286 case PORTADDED:
HIGUCHI Yuta36cf0762013-06-14 14:25:38 -0700287 if (listener instanceof IOFSwitchPortListener) {
288 ((IOFSwitchPortListener) listener).switchPortAdded(sw.getId(), port);
289 }
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700290 break;
291 case PORTREMOVED:
HIGUCHI Yuta36cf0762013-06-14 14:25:38 -0700292 if (listener instanceof IOFSwitchPortListener) {
293 ((IOFSwitchPortListener) listener).switchPortRemoved(sw.getId(), port);
294 }
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700295 break;
296 default:
297 break;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800298 }
299 }
300 }
301 }
302 }
303
304 /**
305 * Update message indicating controller's role has changed
306 */
307 protected class HARoleUpdate implements IUpdate {
308 public Role oldRole;
309 public Role newRole;
310 public HARoleUpdate(Role newRole, Role oldRole) {
311 this.oldRole = oldRole;
312 this.newRole = newRole;
313 }
314 public void dispatch() {
315 // Make sure that old and new roles are different.
316 if (oldRole == newRole) {
317 if (log.isTraceEnabled()) {
318 log.trace("HA role update ignored as the old and " +
319 "new roles are the same. newRole = {}" +
320 "oldRole = {}", newRole, oldRole);
321 }
322 return;
323 }
324 if (log.isTraceEnabled()) {
325 log.trace("Dispatching HA Role update newRole = {}, oldRole = {}",
326 newRole, oldRole);
327 }
328 if (haListeners != null) {
329 for (IHAListener listener : haListeners) {
330 listener.roleChanged(oldRole, newRole);
331 }
332 }
333 }
334 }
335
336 /**
337 * Update message indicating
338 * IPs of controllers in controller cluster have changed.
339 */
340 protected class HAControllerNodeIPUpdate implements IUpdate {
341 public Map<String,String> curControllerNodeIPs;
342 public Map<String,String> addedControllerNodeIPs;
343 public Map<String,String> removedControllerNodeIPs;
344 public HAControllerNodeIPUpdate(
345 HashMap<String,String> curControllerNodeIPs,
346 HashMap<String,String> addedControllerNodeIPs,
347 HashMap<String,String> removedControllerNodeIPs) {
348 this.curControllerNodeIPs = curControllerNodeIPs;
349 this.addedControllerNodeIPs = addedControllerNodeIPs;
350 this.removedControllerNodeIPs = removedControllerNodeIPs;
351 }
352 public void dispatch() {
353 if (log.isTraceEnabled()) {
354 log.trace("Dispatching HA Controller Node IP update "
355 + "curIPs = {}, addedIPs = {}, removedIPs = {}",
356 new Object[] { curControllerNodeIPs, addedControllerNodeIPs,
357 removedControllerNodeIPs }
358 );
359 }
360 if (haListeners != null) {
361 for (IHAListener listener: haListeners) {
362 listener.controllerNodeIPsChanged(curControllerNodeIPs,
363 addedControllerNodeIPs, removedControllerNodeIPs);
364 }
365 }
366 }
367 }
368
369 // ***************
370 // Getters/Setters
371 // ***************
372
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800373 public void setCounterStore(ICounterStoreService counterStore) {
374 this.counterStore = counterStore;
375 }
376
377 public void setPktInProcessingService(IPktInProcessingTimeService pits) {
378 this.pktinProcTime = pits;
379 }
380
381 public void setRestApiService(IRestApiService restApi) {
382 this.restApi = restApi;
383 }
384
385 public void setThreadPoolService(IThreadPoolService tp) {
386 this.threadPool = tp;
387 }
388
Jonathan Hartd82f20d2013-02-21 18:04:24 -0800389 public void setMastershipService(IControllerRegistryService serviceImpl) {
Jonathan Hartd10008d2013-02-23 17:04:08 -0800390 this.registryService = serviceImpl;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800391 }
392
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800393 @Override
394 public Role getRole() {
395 synchronized(roleChanger) {
396 return role;
397 }
398 }
399
400 @Override
401 public void setRole(Role role) {
402 if (role == null) throw new NullPointerException("Role can not be null.");
Jonathan Hart2fa28062013-11-25 20:16:28 -0800403 //if (role == Role.MASTER && this.role == Role.SLAVE) {
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800404 // Reset db state to Inactive for all switches.
Jonathan Hart2fa28062013-11-25 20:16:28 -0800405 //updateAllInactiveSwitchInfo();
406 //}
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800407
408 // Need to synchronize to ensure a reliable ordering on role request
409 // messages send and to ensure the list of connected switches is stable
410 // RoleChanger will handle the actual sending of the message and
411 // timeout handling
412 // @see RoleChanger
413 synchronized(roleChanger) {
414 if (role.equals(this.role)) {
415 log.debug("Ignoring role change: role is already {}", role);
416 return;
417 }
418
419 Role oldRole = this.role;
420 this.role = role;
421
422 log.debug("Submitting role change request to role {}", role);
423 roleChanger.submitRequest(connectedSwitches, role);
424
425 // Enqueue an update for our listeners.
426 try {
427 this.updates.put(new HARoleUpdate(role, oldRole));
428 } catch (InterruptedException e) {
429 log.error("Failure adding update to queue", e);
430 }
431 }
432 }
433
Pankaj Berdedc73bb12013-08-14 13:46:38 -0700434 public void publishUpdate(IUpdate update) {
435 try {
436 this.updates.put(update);
437 } catch (InterruptedException e) {
438 log.error("Failure adding update to queue", e);
439 }
440 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800441
442 // **********************
443 // ChannelUpstreamHandler
444 // **********************
445
446 /**
447 * Return a new channel handler for processing a switch connections
448 * @param state The channel state object for the connection
449 * @return the new channel handler
450 */
451 protected ChannelUpstreamHandler getChannelHandler(OFChannelState state) {
452 return new OFChannelHandler(state);
453 }
454
Jonathan Hartcc957a02013-02-26 10:39:04 -0800455 protected class RoleChangeCallback implements ControlChangeCallback {
456 @Override
457 public void controlChanged(long dpid, boolean hasControl) {
458 log.info("Role change callback for switch {}, hasControl {}",
459 HexString.toHexString(dpid), hasControl);
460
461 synchronized(roleChanger){
462 OFSwitchImpl sw = null;
463 for (OFSwitchImpl connectedSw : connectedSwitches){
464 if (connectedSw.getId() == dpid){
465 sw = connectedSw;
466 break;
467 }
468 }
469 if (sw == null){
470 log.warn("Switch {} not found in connected switches",
471 HexString.toHexString(dpid));
472 return;
473 }
474
475 Role role = null;
476
Pankaj Berde01939e92013-03-08 14:38:27 -0800477 /*
478 * issue #229
479 * Cannot rely on sw.getRole() as it can be behind due to pending
480 * role changes in the queue. Just submit it and late the RoleChanger
481 * handle duplicates.
482 */
483
484 if (hasControl){
Jonathan Hartcc957a02013-02-26 10:39:04 -0800485 role = Role.MASTER;
486 }
Pankaj Berde01939e92013-03-08 14:38:27 -0800487 else {
Jonathan Hartcc957a02013-02-26 10:39:04 -0800488 role = Role.SLAVE;
489 }
Pankaj Berde01939e92013-03-08 14:38:27 -0800490
491 log.debug("Sending role request {} msg to {}", role, sw);
492 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
493 swList.add(sw);
494 roleChanger.submitRequest(swList, role);
495
Jonathan Hartcc957a02013-02-26 10:39:04 -0800496 }
497
498 }
499 }
500
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800501 /**
502 * Channel handler deals with the switch connection and dispatches
503 * switch messages to the appropriate locations.
504 * @author readams
505 */
506 protected class OFChannelHandler
507 extends IdleStateAwareChannelUpstreamHandler {
508 protected OFSwitchImpl sw;
509 protected OFChannelState state;
510
/**
 * Creates a handler bound to the given per-connection state.
 * @param state The channel state object for the connection
 */
public OFChannelHandler(OFChannelState state) {
    super();
    this.state = state;
}
514
515 @Override
516 @LogMessageDoc(message="New switch connection from {ip address}",
517 explanation="A new switch has connected from the " +
518 "specified IP address")
519 public void channelConnected(ChannelHandlerContext ctx,
520 ChannelStateEvent e) throws Exception {
521 log.info("New switch connection from {}",
522 e.getChannel().getRemoteAddress());
523
524 sw = new OFSwitchImpl();
525 sw.setChannel(e.getChannel());
526 sw.setFloodlightProvider(Controller.this);
527 sw.setThreadPoolService(threadPool);
528
529 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
530 msglist.add(factory.getMessage(OFType.HELLO));
531 e.getChannel().write(msglist);
532
533 }
534
535 @Override
536 @LogMessageDoc(message="Disconnected switch {switch information}",
537 explanation="The specified switch has disconnected.")
538 public void channelDisconnected(ChannelHandlerContext ctx,
539 ChannelStateEvent e) throws Exception {
540 if (sw != null && state.hsState == HandshakeState.READY) {
541 if (activeSwitches.containsKey(sw.getId())) {
542 // It's safe to call removeSwitch even though the map might
543 // not contain this particular switch but another with the
544 // same DPID
545 removeSwitch(sw);
546 }
547 synchronized(roleChanger) {
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700548 if (controlRequested) {
549 registryService.releaseControl(sw.getId());
550 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800551 connectedSwitches.remove(sw);
552 }
553 sw.setConnected(false);
554 }
555 log.info("Disconnected switch {}", sw);
556 }
557
558 @Override
559 @LogMessageDocs({
560 @LogMessageDoc(level="ERROR",
561 message="Disconnecting switch {switch} due to read timeout",
562 explanation="The connected switch has failed to send any " +
563 "messages or respond to echo requests",
564 recommendation=LogMessageDoc.CHECK_SWITCH),
565 @LogMessageDoc(level="ERROR",
566 message="Disconnecting switch {switch}: failed to " +
567 "complete handshake",
568 explanation="The switch did not respond correctly " +
569 "to handshake messages",
570 recommendation=LogMessageDoc.CHECK_SWITCH),
571 @LogMessageDoc(level="ERROR",
572 message="Disconnecting switch {switch} due to IO Error: {}",
573 explanation="There was an error communicating with the switch",
574 recommendation=LogMessageDoc.CHECK_SWITCH),
575 @LogMessageDoc(level="ERROR",
576 message="Disconnecting switch {switch} due to switch " +
577 "state error: {error}",
578 explanation="The switch sent an unexpected message",
579 recommendation=LogMessageDoc.CHECK_SWITCH),
580 @LogMessageDoc(level="ERROR",
581 message="Disconnecting switch {switch} due to " +
582 "message parse failure",
583 explanation="Could not parse a message from the switch",
584 recommendation=LogMessageDoc.CHECK_SWITCH),
585 @LogMessageDoc(level="ERROR",
586 message="Terminating controller due to storage exception",
587 explanation=ERROR_DATABASE,
588 recommendation=LogMessageDoc.CHECK_CONTROLLER),
589 @LogMessageDoc(level="ERROR",
590 message="Could not process message: queue full",
591 explanation="OpenFlow messages are arriving faster than " +
592 " the controller can process them.",
593 recommendation=LogMessageDoc.CHECK_CONTROLLER),
594 @LogMessageDoc(level="ERROR",
595 message="Error while processing message " +
596 "from switch {switch} {cause}",
597 explanation="An error occurred processing the switch message",
598 recommendation=LogMessageDoc.GENERIC_ACTION)
599 })
600 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
601 throws Exception {
602 if (e.getCause() instanceof ReadTimeoutException) {
603 // switch timeout
604 log.error("Disconnecting switch {} due to read timeout", sw);
605 ctx.getChannel().close();
606 } else if (e.getCause() instanceof HandshakeTimeoutException) {
607 log.error("Disconnecting switch {}: failed to complete handshake",
608 sw);
609 ctx.getChannel().close();
610 } else if (e.getCause() instanceof ClosedChannelException) {
611 //log.warn("Channel for sw {} already closed", sw);
612 } else if (e.getCause() instanceof IOException) {
613 log.error("Disconnecting switch {} due to IO Error: {}",
614 sw, e.getCause().getMessage());
615 ctx.getChannel().close();
616 } else if (e.getCause() instanceof SwitchStateException) {
617 log.error("Disconnecting switch {} due to switch state error: {}",
618 sw, e.getCause().getMessage());
619 ctx.getChannel().close();
620 } else if (e.getCause() instanceof MessageParseException) {
621 log.error("Disconnecting switch " + sw +
622 " due to message parse failure",
623 e.getCause());
624 ctx.getChannel().close();
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800625 } else if (e.getCause() instanceof RejectedExecutionException) {
626 log.warn("Could not process message: queue full");
627 } else {
628 log.error("Error while processing message from switch " + sw,
629 e.getCause());
630 ctx.getChannel().close();
631 }
632 }
633
634 @Override
635 public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
636 throws Exception {
637 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
638 msglist.add(factory.getMessage(OFType.ECHO_REQUEST));
639 e.getChannel().write(msglist);
640 }
641
642 @Override
643 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
644 throws Exception {
645 if (e.getMessage() instanceof List) {
646 @SuppressWarnings("unchecked")
647 List<OFMessage> msglist = (List<OFMessage>)e.getMessage();
648
649 for (OFMessage ofm : msglist) {
650 try {
651 processOFMessage(ofm);
652 }
653 catch (Exception ex) {
654 // We are the last handler in the stream, so run the
655 // exception through the channel again by passing in
656 // ctx.getChannel().
657 Channels.fireExceptionCaught(ctx.getChannel(), ex);
658 }
659 }
660
661 // Flush all flow-mods/packet-out generated from this "train"
662 OFSwitchImpl.flush_all();
663 }
664 }
665
666 /**
667 * Process the request for the switch description
668 */
669 @LogMessageDoc(level="ERROR",
670 message="Exception in reading description " +
671 " during handshake {exception}",
672 explanation="Could not process the switch description string",
673 recommendation=LogMessageDoc.CHECK_SWITCH)
674 void processSwitchDescReply() {
675 try {
676 // Read description, if it has been updated
677 @SuppressWarnings("unchecked")
678 Future<List<OFStatistics>> desc_future =
679 (Future<List<OFStatistics>>)sw.
680 getAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
681 List<OFStatistics> values =
682 desc_future.get(0, TimeUnit.MILLISECONDS);
683 if (values != null) {
684 OFDescriptionStatistics description =
685 new OFDescriptionStatistics();
686 ChannelBuffer data =
687 ChannelBuffers.buffer(description.getLength());
688 for (OFStatistics f : values) {
689 f.writeTo(data);
690 description.readFrom(data);
691 break; // SHOULD be a list of length 1
692 }
693 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_DATA,
694 description);
695 sw.setSwitchProperties(description);
696 data = null;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800697 }
Jonathan Hart2fa28062013-11-25 20:16:28 -0800698
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800699 sw.removeAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
700 state.hasDescription = true;
701 checkSwitchReady();
702 }
703 catch (InterruptedException ex) {
704 // Ignore
705 }
706 catch (TimeoutException ex) {
707 // Ignore
708 } catch (Exception ex) {
709 log.error("Exception in reading description " +
710 " during handshake", ex);
711 }
712 }
713
714 /**
715 * Send initial switch setup information that we need before adding
716 * the switch
717 * @throws IOException
718 */
719 void sendHelloConfiguration() throws IOException {
720 // Send initial Features Request
Jonathan Hart9e92c512013-03-20 16:24:44 -0700721 log.debug("Sending FEATURES_REQUEST to {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800722 sw.write(factory.getMessage(OFType.FEATURES_REQUEST), null);
723 }
724
725 /**
726 * Send the configuration requests we can only do after we have
727 * the features reply
728 * @throws IOException
729 */
730 void sendFeatureReplyConfiguration() throws IOException {
Jonathan Hart9e92c512013-03-20 16:24:44 -0700731 log.debug("Sending CONFIG_REQUEST to {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800732 // Ensure we receive the full packet via PacketIn
733 OFSetConfig config = (OFSetConfig) factory
734 .getMessage(OFType.SET_CONFIG);
735 config.setMissSendLength((short) 0xffff)
736 .setLengthU(OFSwitchConfig.MINIMUM_LENGTH);
737 sw.write(config, null);
738 sw.write(factory.getMessage(OFType.GET_CONFIG_REQUEST),
739 null);
740
741 // Get Description to set switch-specific flags
742 OFStatisticsRequest req = new OFStatisticsRequest();
743 req.setStatisticType(OFStatisticsType.DESC);
744 req.setLengthU(req.getLengthU());
745 Future<List<OFStatistics>> dfuture =
746 sw.getStatistics(req);
747 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE,
748 dfuture);
749
750 }
HIGUCHI Yuta0ba6fd02013-06-14 12:46:56 -0700751
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700752 volatile Boolean controlRequested = Boolean.FALSE;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800753 protected void checkSwitchReady() {
754 if (state.hsState == HandshakeState.FEATURES_REPLY &&
755 state.hasDescription && state.hasGetConfigReply) {
756
757 state.hsState = HandshakeState.READY;
Jonathan Hart9e92c512013-03-20 16:24:44 -0700758 log.debug("Handshake with {} complete", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800759
760 synchronized(roleChanger) {
761 // We need to keep track of all of the switches that are connected
762 // to the controller, in any role, so that we can later send the
763 // role request messages when the controller role changes.
764 // We need to be synchronized while doing this: we must not
765 // send a another role request to the connectedSwitches until
766 // we were able to add this new switch to connectedSwitches
767 // *and* send the current role to the new switch.
768 connectedSwitches.add(sw);
769
770 if (role != null) {
Jonathan Hart97801ac2013-02-26 14:29:16 -0800771 //Put the switch in SLAVE mode until we know we have control
772 log.debug("Setting new switch {} to SLAVE", sw.getStringId());
773 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
774 swList.add(sw);
775 roleChanger.submitRequest(swList, Role.SLAVE);
776
Jonathan Hartcc957a02013-02-26 10:39:04 -0800777 //Request control of the switch from the global registry
778 try {
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700779 controlRequested = Boolean.TRUE;
Jonathan Hartcc957a02013-02-26 10:39:04 -0800780 registryService.requestControl(sw.getId(),
781 new RoleChangeCallback());
782 } catch (RegistryException e) {
783 log.debug("Registry error: {}", e.getMessage());
Pankaj Berde99fcee12013-03-18 09:41:53 -0700784 controlRequested = Boolean.FALSE;
Jonathan Hartcc957a02013-02-26 10:39:04 -0800785 }
786
Jonathan Hart97801ac2013-02-26 14:29:16 -0800787
Jonathan Hartcc957a02013-02-26 10:39:04 -0800788
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800789 // Send a role request if role support is enabled for the controller
790 // This is a probe that we'll use to determine if the switch
791 // actually supports the role request message. If it does we'll
792 // get back a role reply message. If it doesn't we'll get back an
793 // OFError message.
794 // If role is MASTER we will promote switch to active
795 // list when we receive the switch's role reply messages
Jonathan Hartcc957a02013-02-26 10:39:04 -0800796 /*
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800797 log.debug("This controller's role is {}, " +
798 "sending initial role request msg to {}",
799 role, sw);
800 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
801 swList.add(sw);
802 roleChanger.submitRequest(swList, role);
Jonathan Hartcc957a02013-02-26 10:39:04 -0800803 */
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800804 }
805 else {
806 // Role supported not enabled on controller (for now)
807 // automatically promote switch to active state.
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800808 log.debug("This controller's role is {}, " +
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800809 "not sending role request msg to {}",
810 role, sw);
811 // Need to clear FlowMods before we add the switch
812 // and dispatch updates otherwise we have a race condition.
813 sw.clearAllFlowMods();
814 addSwitch(sw);
815 state.firstRoleReplyReceived = true;
816 }
817 }
Pankaj Berde99fcee12013-03-18 09:41:53 -0700818 if (!controlRequested) {
819 // yield to allow other thread(s) to release control
820 try {
821 Thread.sleep(10);
822 } catch (InterruptedException e) {
823 // Ignore interruptions
824 }
825 // safer to bounce the switch to reconnect here than proceeding further
Jonathan Hart9e92c512013-03-20 16:24:44 -0700826 log.debug("Closing {} because we weren't able to request control " +
827 "successfully" + sw);
Pankaj Berde99fcee12013-03-18 09:41:53 -0700828 sw.channel.close();
829 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800830 }
831 }
832
833 /* Handle a role reply message we received from the switch. Since
834 * netty serializes message dispatch we don't need to synchronize
835 * against other receive operations from the same switch, so no need
836 * to synchronize addSwitch(), removeSwitch() operations from the same
837 * connection.
838 * FIXME: However, when a switch with the same DPID connects we do
839 * need some synchronization. However, handling switches with same
840 * DPID needs to be revisited anyways (get rid of r/w-lock and synchronous
841 * removedSwitch notification):1
842 *
843 */
844 @LogMessageDoc(level="ERROR",
845 message="Invalid role value in role reply message",
846 explanation="Was unable to set the HA role (master or slave) " +
847 "for the controller.",
848 recommendation=LogMessageDoc.CHECK_CONTROLLER)
849 protected void handleRoleReplyMessage(OFVendor vendorMessage,
850 OFRoleReplyVendorData roleReplyVendorData) {
851 // Map from the role code in the message to our role enum
852 int nxRole = roleReplyVendorData.getRole();
853 Role role = null;
854 switch (nxRole) {
855 case OFRoleVendorData.NX_ROLE_OTHER:
856 role = Role.EQUAL;
857 break;
858 case OFRoleVendorData.NX_ROLE_MASTER:
859 role = Role.MASTER;
860 break;
861 case OFRoleVendorData.NX_ROLE_SLAVE:
862 role = Role.SLAVE;
863 break;
864 default:
865 log.error("Invalid role value in role reply message");
866 sw.getChannel().close();
867 return;
868 }
869
870 log.debug("Handling role reply for role {} from {}. " +
871 "Controller's role is {} ",
872 new Object[] { role, sw, Controller.this.role}
873 );
874
875 sw.deliverRoleReply(vendorMessage.getXid(), role);
876
877 boolean isActive = activeSwitches.containsKey(sw.getId());
878 if (!isActive && sw.isActive()) {
879 // Transition from SLAVE to MASTER.
880
881 if (!state.firstRoleReplyReceived ||
882 getAlwaysClearFlowsOnSwAdd()) {
883 // This is the first role-reply message we receive from
884 // this switch or roles were disabled when the switch
885 // connected:
886 // Delete all pre-existing flows for new connections to
887 // the master
888 //
889 // FIXME: Need to think more about what the test should
890 // be for when we flush the flow-table? For example,
891 // if all the controllers are temporarily in the backup
892 // role (e.g. right after a failure of the master
893 // controller) at the point the switch connects, then
894 // all of the controllers will initially connect as
895 // backup controllers and not flush the flow-table.
896 // Then when one of them is promoted to master following
897 // the master controller election the flow-table
898 // will still not be flushed because that's treated as
899 // a failover event where we don't want to flush the
900 // flow-table. The end result would be that the flow
901 // table for a newly connected switch is never
902 // flushed. Not sure how to handle that case though...
903 sw.clearAllFlowMods();
904 log.debug("First role reply from master switch {}, " +
905 "clear FlowTable to active switch list",
906 HexString.toHexString(sw.getId()));
907 }
908
909 // Some switches don't seem to update us with port
910 // status messages while in slave role.
Jonathan Hart2fa28062013-11-25 20:16:28 -0800911 //readSwitchPortStateFromStorage(sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800912
913 // Only add the switch to the active switch list if
914 // we're not in the slave role. Note that if the role
915 // attribute is null, then that means that the switch
916 // doesn't support the role request messages, so in that
917 // case we're effectively in the EQUAL role and the
918 // switch should be included in the active switch list.
919 addSwitch(sw);
920 log.debug("Added master switch {} to active switch list",
921 HexString.toHexString(sw.getId()));
922
923 }
924 else if (isActive && !sw.isActive()) {
925 // Transition from MASTER to SLAVE: remove switch
926 // from active switch list.
927 log.debug("Removed slave switch {} from active switch" +
928 " list", HexString.toHexString(sw.getId()));
929 removeSwitch(sw);
930 }
931
932 // Indicate that we have received a role reply message.
933 state.firstRoleReplyReceived = true;
934 }
935
936 protected boolean handleVendorMessage(OFVendor vendorMessage) {
937 boolean shouldHandleMessage = false;
938 int vendor = vendorMessage.getVendor();
939 switch (vendor) {
940 case OFNiciraVendorData.NX_VENDOR_ID:
941 OFNiciraVendorData niciraVendorData =
942 (OFNiciraVendorData)vendorMessage.getVendorData();
943 int dataType = niciraVendorData.getDataType();
944 switch (dataType) {
945 case OFRoleReplyVendorData.NXT_ROLE_REPLY:
946 OFRoleReplyVendorData roleReplyVendorData =
947 (OFRoleReplyVendorData) niciraVendorData;
948 handleRoleReplyMessage(vendorMessage,
949 roleReplyVendorData);
950 break;
951 default:
952 log.warn("Unhandled Nicira VENDOR message; " +
953 "data type = {}", dataType);
954 break;
955 }
956 break;
957 default:
958 log.warn("Unhandled VENDOR message; vendor id = {}", vendor);
959 break;
960 }
961
962 return shouldHandleMessage;
963 }
964
965 /**
966 * Dispatch an Openflow message from a switch to the appropriate
967 * handler.
968 * @param m The message to process
969 * @throws IOException
970 * @throws SwitchStateException
971 */
972 @LogMessageDocs({
973 @LogMessageDoc(level="WARN",
974 message="Config Reply from {switch} has " +
975 "miss length set to {length}",
976 explanation="The controller requires that the switch " +
977 "use a miss length of 0xffff for correct " +
978 "function",
979 recommendation="Use a different switch to ensure " +
980 "correct function"),
981 @LogMessageDoc(level="WARN",
982 message="Received ERROR from sw {switch} that "
983 +"indicates roles are not supported "
984 +"but we have received a valid "
985 +"role reply earlier",
986 explanation="The switch sent a confusing message to the" +
987 "controller")
988 })
989 protected void processOFMessage(OFMessage m)
990 throws IOException, SwitchStateException {
991 boolean shouldHandleMessage = false;
992
993 switch (m.getType()) {
994 case HELLO:
995 if (log.isTraceEnabled())
996 log.trace("HELLO from {}", sw);
997
998 if (state.hsState.equals(HandshakeState.START)) {
999 state.hsState = HandshakeState.HELLO;
1000 sendHelloConfiguration();
1001 } else {
1002 throw new SwitchStateException("Unexpected HELLO from "
1003 + sw);
1004 }
1005 break;
1006 case ECHO_REQUEST:
1007 OFEchoReply reply =
1008 (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY);
1009 reply.setXid(m.getXid());
1010 sw.write(reply, null);
1011 break;
1012 case ECHO_REPLY:
1013 break;
1014 case FEATURES_REPLY:
1015 if (log.isTraceEnabled())
1016 log.trace("Features Reply from {}", sw);
1017
1018 sw.setFeaturesReply((OFFeaturesReply) m);
1019 if (state.hsState.equals(HandshakeState.HELLO)) {
1020 sendFeatureReplyConfiguration();
1021 state.hsState = HandshakeState.FEATURES_REPLY;
1022 // uncomment to enable "dumb" switches like cbench
1023 // state.hsState = HandshakeState.READY;
1024 // addSwitch(sw);
1025 } else {
1026 // return results to rest api caller
1027 sw.deliverOFFeaturesReply(m);
1028 // update database */
Jonathan Hart2fa28062013-11-25 20:16:28 -08001029 //updateActiveSwitchInfo(sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001030 }
1031 break;
1032 case GET_CONFIG_REPLY:
1033 if (log.isTraceEnabled())
1034 log.trace("Get config reply from {}", sw);
1035
1036 if (!state.hsState.equals(HandshakeState.FEATURES_REPLY)) {
1037 String em = "Unexpected GET_CONFIG_REPLY from " + sw;
1038 throw new SwitchStateException(em);
1039 }
1040 OFGetConfigReply cr = (OFGetConfigReply) m;
1041 if (cr.getMissSendLength() == (short)0xffff) {
1042 log.trace("Config Reply from {} confirms " +
1043 "miss length set to 0xffff", sw);
1044 } else {
1045 log.warn("Config Reply from {} has " +
1046 "miss length set to {}",
1047 sw, cr.getMissSendLength() & 0xffff);
1048 }
1049 state.hasGetConfigReply = true;
1050 checkSwitchReady();
1051 break;
1052 case VENDOR:
1053 shouldHandleMessage = handleVendorMessage((OFVendor)m);
1054 break;
1055 case ERROR:
Jonathan Hart3525df92013-03-19 14:09:13 -07001056 log.debug("Recieved ERROR message from switch {}: {}", sw, m);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001057 // TODO: we need better error handling. Especially for
1058 // request/reply style message (stats, roles) we should have
1059 // a unified way to lookup the xid in the error message.
1060 // This will probable involve rewriting the way we handle
1061 // request/reply style messages.
1062 OFError error = (OFError) m;
1063 boolean shouldLogError = true;
1064 // TODO: should we check that firstRoleReplyReceived is false,
1065 // i.e., check only whether the first request fails?
1066 if (sw.checkFirstPendingRoleRequestXid(error.getXid())) {
1067 boolean isBadVendorError =
1068 (error.getErrorType() == OFError.OFErrorType.
1069 OFPET_BAD_REQUEST.getValue());
1070 // We expect to receive a bad vendor error when
1071 // we're connected to a switch that doesn't support
1072 // the Nicira vendor extensions (i.e. not OVS or
1073 // derived from OVS). By protocol, it should also be
1074 // BAD_VENDOR, but too many switch implementations
1075 // get it wrong and we can already check the xid()
1076 // so we can ignore the type with confidence that this
1077 // is not a spurious error
1078 shouldLogError = !isBadVendorError;
1079 if (isBadVendorError) {
Jonathan Hart3525df92013-03-19 14:09:13 -07001080 log.debug("Handling bad vendor error for {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001081 if (state.firstRoleReplyReceived && (role != null)) {
1082 log.warn("Received ERROR from sw {} that "
1083 +"indicates roles are not supported "
1084 +"but we have received a valid "
1085 +"role reply earlier", sw);
1086 }
1087 state.firstRoleReplyReceived = true;
Jonathan Harta95c6d92013-03-18 16:12:27 -07001088 Role requestedRole =
HIGUCHI Yutaeae374d2013-06-17 10:39:42 -07001089 sw.deliverRoleRequestNotSupportedEx(error.getXid());
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001090 synchronized(roleChanger) {
1091 if (sw.role == null && Controller.this.role==Role.SLAVE) {
Jonathan Harta95c6d92013-03-18 16:12:27 -07001092 //This will now never happen. The Controller's role
1093 //is now never SLAVE, always MASTER.
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001094 // the switch doesn't understand role request
1095 // messages and the current controller role is
1096 // slave. We need to disconnect the switch.
1097 // @see RoleChanger for rationale
Jonathan Hart9e92c512013-03-20 16:24:44 -07001098 log.warn("Closing {} channel because controller's role " +
1099 "is SLAVE", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001100 sw.getChannel().close();
1101 }
Jonathan Harta95c6d92013-03-18 16:12:27 -07001102 else if (sw.role == null && requestedRole == Role.MASTER) {
Jonathan Hart3525df92013-03-19 14:09:13 -07001103 log.debug("Adding switch {} because we got an error" +
1104 " returned from a MASTER role request", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001105 // Controller's role is master: add to
1106 // active
1107 // TODO: check if clearing flow table is
1108 // right choice here.
1109 // Need to clear FlowMods before we add the switch
1110 // and dispatch updates otherwise we have a race condition.
1111 // TODO: switch update is async. Won't we still have a potential
1112 // race condition?
1113 sw.clearAllFlowMods();
1114 addSwitch(sw);
1115 }
1116 }
1117 }
1118 else {
1119 // TODO: Is this the right thing to do if we receive
1120 // some other error besides a bad vendor error?
1121 // Presumably that means the switch did actually
1122 // understand the role request message, but there
1123 // was some other error from processing the message.
1124 // OF 1.2 specifies a OFPET_ROLE_REQUEST_FAILED
1125 // error code, but it doesn't look like the Nicira
1126 // role request has that. Should check OVS source
1127 // code to see if it's possible for any other errors
1128 // to be returned.
1129 // If we received an error the switch is not
1130 // in the correct role, so we need to disconnect it.
1131 // We could also resend the request but then we need to
1132 // check if there are other pending request in which
1133 // case we shouldn't resend. If we do resend we need
1134 // to make sure that the switch eventually accepts one
1135 // of our requests or disconnect the switch. This feels
1136 // cumbersome.
Jonathan Hart9e92c512013-03-20 16:24:44 -07001137 log.debug("Closing {} channel because we recieved an " +
1138 "error other than BAD_VENDOR", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001139 sw.getChannel().close();
1140 }
1141 }
1142 // Once we support OF 1.2, we'd add code to handle it here.
1143 //if (error.getXid() == state.ofRoleRequestXid) {
1144 //}
1145 if (shouldLogError)
1146 logError(sw, error);
1147 break;
1148 case STATS_REPLY:
1149 if (state.hsState.ordinal() <
1150 HandshakeState.FEATURES_REPLY.ordinal()) {
1151 String em = "Unexpected STATS_REPLY from " + sw;
1152 throw new SwitchStateException(em);
1153 }
1154 sw.deliverStatisticsReply(m);
1155 if (sw.hasAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE)) {
1156 processSwitchDescReply();
1157 }
1158 break;
1159 case PORT_STATUS:
1160 // We want to update our port state info even if we're in
1161 // the slave role, but we only want to update storage if
1162 // we're the master (or equal).
1163 boolean updateStorage = state.hsState.
1164 equals(HandshakeState.READY) &&
1165 (sw.getRole() != Role.SLAVE);
1166 handlePortStatusMessage(sw, (OFPortStatus)m, updateStorage);
1167 shouldHandleMessage = true;
1168 break;
1169
1170 default:
1171 shouldHandleMessage = true;
1172 break;
1173 }
1174
1175 if (shouldHandleMessage) {
1176 sw.getListenerReadLock().lock();
1177 try {
1178 if (sw.isConnected()) {
1179 if (!state.hsState.equals(HandshakeState.READY)) {
1180 log.debug("Ignoring message type {} received " +
1181 "from switch {} before switch is " +
1182 "fully configured.", m.getType(), sw);
1183 }
1184 // Check if the controller is in the slave role for the
1185 // switch. If it is, then don't dispatch the message to
1186 // the listeners.
1187 // TODO: Should we dispatch messages that we expect to
1188 // receive when we're in the slave role, e.g. port
1189 // status messages? Since we're "hiding" switches from
1190 // the listeners when we're in the slave role, then it
1191 // seems a little weird to dispatch port status messages
1192 // to them. On the other hand there might be special
1193 // modules that care about all of the connected switches
1194 // and would like to receive port status notifications.
1195 else if (sw.getRole() == Role.SLAVE) {
1196 // Don't log message if it's a port status message
1197 // since we expect to receive those from the switch
1198 // and don't want to emit spurious messages.
1199 if (m.getType() != OFType.PORT_STATUS) {
1200 log.debug("Ignoring message type {} received " +
1201 "from switch {} while in the slave role.",
1202 m.getType(), sw);
1203 }
1204 } else {
1205 handleMessage(sw, m, null);
1206 }
1207 }
1208 }
1209 finally {
1210 sw.getListenerReadLock().unlock();
1211 }
1212 }
1213 }
1214 }
1215
1216 // ****************
1217 // Message handlers
1218 // ****************
1219
1220 protected void handlePortStatusMessage(IOFSwitch sw,
1221 OFPortStatus m,
1222 boolean updateStorage) {
1223 short portNumber = m.getDesc().getPortNumber();
1224 OFPhysicalPort port = m.getDesc();
1225 if (m.getReason() == (byte)OFPortReason.OFPPR_MODIFY.ordinal()) {
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001226 boolean portDown = ((OFPortConfig.OFPPC_PORT_DOWN.getValue() & port.getConfig()) > 0) ||
1227 ((OFPortState.OFPPS_LINK_DOWN.getValue() & port.getState()) > 0);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001228 sw.setPort(port);
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001229 if (!portDown) {
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001230 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTADDED);
1231 try {
1232 this.updates.put(update);
1233 } catch (InterruptedException e) {
1234 log.error("Failure adding update to queue", e);
1235 }
Pankaj Berde6debb042013-01-16 18:04:32 -08001236 } else {
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001237 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTREMOVED);
1238 try {
1239 this.updates.put(update);
1240 } catch (InterruptedException e) {
1241 log.error("Failure adding update to queue", e);
1242 }
Pankaj Berde6debb042013-01-16 18:04:32 -08001243 }
Jonathan Hart2fa28062013-11-25 20:16:28 -08001244 //if (updateStorage)
1245 //updatePortInfo(sw, port);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001246 log.debug("Port #{} modified for {}", portNumber, sw);
1247 } else if (m.getReason() == (byte)OFPortReason.OFPPR_ADD.ordinal()) {
1248 sw.setPort(port);
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001249 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTADDED);
1250 try {
1251 this.updates.put(update);
1252 } catch (InterruptedException e) {
1253 log.error("Failure adding update to queue", e);
1254 }
Jonathan Hart2fa28062013-11-25 20:16:28 -08001255 //if (updateStorage)
1256 //updatePortInfo(sw, port);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001257 log.debug("Port #{} added for {}", portNumber, sw);
1258 } else if (m.getReason() ==
1259 (byte)OFPortReason.OFPPR_DELETE.ordinal()) {
1260 sw.deletePort(portNumber);
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001261 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTREMOVED);
1262 try {
1263 this.updates.put(update);
1264 } catch (InterruptedException e) {
1265 log.error("Failure adding update to queue", e);
1266 }
Jonathan Hart2fa28062013-11-25 20:16:28 -08001267 //if (updateStorage)
1268 //removePortInfo(sw, portNumber);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001269 log.debug("Port #{} deleted for {}", portNumber, sw);
1270 }
1271 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
1272 try {
1273 this.updates.put(update);
1274 } catch (InterruptedException e) {
1275 log.error("Failure adding update to queue", e);
1276 }
1277 }
1278
1279 /**
1280 * flcontext_cache - Keep a thread local stack of contexts
1281 */
1282 protected static final ThreadLocal<Stack<FloodlightContext>> flcontext_cache =
1283 new ThreadLocal <Stack<FloodlightContext>> () {
1284 @Override
1285 protected Stack<FloodlightContext> initialValue() {
1286 return new Stack<FloodlightContext>();
1287 }
1288 };
1289
1290 /**
1291 * flcontext_alloc - pop a context off the stack, if required create a new one
1292 * @return FloodlightContext
1293 */
1294 protected static FloodlightContext flcontext_alloc() {
1295 FloodlightContext flcontext = null;
1296
1297 if (flcontext_cache.get().empty()) {
1298 flcontext = new FloodlightContext();
1299 }
1300 else {
1301 flcontext = flcontext_cache.get().pop();
1302 }
1303
1304 return flcontext;
1305 }
1306
1307 /**
1308 * flcontext_free - Free the context to the current thread
1309 * @param flcontext
1310 */
1311 protected void flcontext_free(FloodlightContext flcontext) {
1312 flcontext.getStorage().clear();
1313 flcontext_cache.get().push(flcontext);
1314 }
1315
1316 /**
1317 * Handle replies to certain OFMessages, and pass others off to listeners
1318 * @param sw The switch for the message
1319 * @param m The message
1320 * @param bContext The floodlight context. If null then floodlight context would
1321 * be allocated in this function
1322 * @throws IOException
1323 */
1324 @LogMessageDocs({
1325 @LogMessageDoc(level="ERROR",
1326 message="Ignoring PacketIn (Xid = {xid}) because the data" +
1327 " field is empty.",
1328 explanation="The switch sent an improperly-formatted PacketIn" +
1329 " message",
1330 recommendation=LogMessageDoc.CHECK_SWITCH),
1331 @LogMessageDoc(level="WARN",
1332 message="Unhandled OF Message: {} from {}",
1333 explanation="The switch sent a message not handled by " +
1334 "the controller")
1335 })
1336 protected void handleMessage(IOFSwitch sw, OFMessage m,
1337 FloodlightContext bContext)
1338 throws IOException {
1339 Ethernet eth = null;
1340
1341 switch (m.getType()) {
1342 case PACKET_IN:
1343 OFPacketIn pi = (OFPacketIn)m;
1344
1345 if (pi.getPacketData().length <= 0) {
1346 log.error("Ignoring PacketIn (Xid = " + pi.getXid() +
1347 ") because the data field is empty.");
1348 return;
1349 }
1350
1351 if (Controller.ALWAYS_DECODE_ETH) {
1352 eth = new Ethernet();
1353 eth.deserialize(pi.getPacketData(), 0,
1354 pi.getPacketData().length);
1355 counterStore.updatePacketInCounters(sw, m, eth);
1356 }
1357 // fall through to default case...
1358
1359 default:
1360
1361 List<IOFMessageListener> listeners = null;
1362 if (messageListeners.containsKey(m.getType())) {
1363 listeners = messageListeners.get(m.getType()).
1364 getOrderedListeners();
1365 }
1366
1367 FloodlightContext bc = null;
1368 if (listeners != null) {
1369 // Check if floodlight context is passed from the calling
1370 // function, if so use that floodlight context, otherwise
1371 // allocate one
1372 if (bContext == null) {
1373 bc = flcontext_alloc();
1374 } else {
1375 bc = bContext;
1376 }
1377 if (eth != null) {
1378 IFloodlightProviderService.bcStore.put(bc,
1379 IFloodlightProviderService.CONTEXT_PI_PAYLOAD,
1380 eth);
1381 }
1382
1383 // Get the starting time (overall and per-component) of
1384 // the processing chain for this packet if performance
1385 // monitoring is turned on
1386 pktinProcTime.bootstrap(listeners);
1387 pktinProcTime.recordStartTimePktIn();
1388 Command cmd;
1389 for (IOFMessageListener listener : listeners) {
1390 if (listener instanceof IOFSwitchFilter) {
1391 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1392 continue;
1393 }
1394 }
1395
1396 pktinProcTime.recordStartTimeComp(listener);
1397 cmd = listener.receive(sw, m, bc);
1398 pktinProcTime.recordEndTimeComp(listener);
1399
1400 if (Command.STOP.equals(cmd)) {
1401 break;
1402 }
1403 }
1404 pktinProcTime.recordEndTimePktIn(sw, m, bc);
1405 } else {
1406 log.warn("Unhandled OF Message: {} from {}", m, sw);
1407 }
1408
1409 if ((bContext == null) && (bc != null)) flcontext_free(bc);
1410 }
1411 }
1412
1413 /**
1414 * Log an OpenFlow error message from a switch
1415 * @param sw The switch that sent the error
1416 * @param error The error message
1417 */
1418 @LogMessageDoc(level="ERROR",
1419 message="Error {error type} {error code} from {switch}",
1420 explanation="The switch responded with an unexpected error" +
1421 "to an OpenFlow message from the controller",
1422 recommendation="This could indicate improper network operation. " +
1423 "If the problem persists restarting the switch and " +
1424 "controller may help."
1425 )
1426 protected void logError(IOFSwitch sw, OFError error) {
1427 int etint = 0xffff & error.getErrorType();
1428 if (etint < 0 || etint >= OFErrorType.values().length) {
1429 log.error("Unknown error code {} from sw {}", etint, sw);
1430 }
1431 OFErrorType et = OFErrorType.values()[etint];
1432 switch (et) {
1433 case OFPET_HELLO_FAILED:
1434 OFHelloFailedCode hfc =
1435 OFHelloFailedCode.values()[0xffff & error.getErrorCode()];
1436 log.error("Error {} {} from {}", new Object[] {et, hfc, sw});
1437 break;
1438 case OFPET_BAD_REQUEST:
1439 OFBadRequestCode brc =
1440 OFBadRequestCode.values()[0xffff & error.getErrorCode()];
1441 log.error("Error {} {} from {}", new Object[] {et, brc, sw});
1442 break;
1443 case OFPET_BAD_ACTION:
1444 OFBadActionCode bac =
1445 OFBadActionCode.values()[0xffff & error.getErrorCode()];
1446 log.error("Error {} {} from {}", new Object[] {et, bac, sw});
1447 break;
1448 case OFPET_FLOW_MOD_FAILED:
1449 OFFlowModFailedCode fmfc =
1450 OFFlowModFailedCode.values()[0xffff & error.getErrorCode()];
1451 log.error("Error {} {} from {}", new Object[] {et, fmfc, sw});
1452 break;
1453 case OFPET_PORT_MOD_FAILED:
1454 OFPortModFailedCode pmfc =
1455 OFPortModFailedCode.values()[0xffff & error.getErrorCode()];
1456 log.error("Error {} {} from {}", new Object[] {et, pmfc, sw});
1457 break;
1458 case OFPET_QUEUE_OP_FAILED:
1459 OFQueueOpFailedCode qofc =
1460 OFQueueOpFailedCode.values()[0xffff & error.getErrorCode()];
1461 log.error("Error {} {} from {}", new Object[] {et, qofc, sw});
1462 break;
1463 default:
1464 break;
1465 }
1466 }
1467
1468 /**
1469 * Add a switch to the active switch list and call the switch listeners.
1470 * This happens either when a switch first connects (and the controller is
1471 * not in the slave role) or when the role of the controller changes from
1472 * slave to master.
1473 * @param sw the switch that has been added
1474 */
1475 // TODO: need to rethink locking and the synchronous switch update.
1476 // We can / should also handle duplicate DPIDs in connectedSwitches
1477 @LogMessageDoc(level="ERROR",
1478 message="New switch added {switch} for already-added switch {switch}",
1479 explanation="A switch with the same DPID as another switch " +
1480 "connected to the controller. This can be caused by " +
1481 "multiple switches configured with the same DPID, or " +
1482 "by a switch reconnected very quickly after " +
1483 "disconnecting.",
1484 recommendation="If this happens repeatedly, it is likely there " +
1485 "are switches with duplicate DPIDs on the network. " +
1486 "Reconfigure the appropriate switches. If it happens " +
1487 "very rarely, then it is likely this is a transient " +
1488 "network problem that can be ignored."
1489 )
1490 protected void addSwitch(IOFSwitch sw) {
1491 // TODO: is it safe to modify the HashMap without holding
1492 // the old switch's lock?
1493 OFSwitchImpl oldSw = (OFSwitchImpl) this.activeSwitches.put(sw.getId(), sw);
1494 if (sw == oldSw) {
1495 // Note == for object equality, not .equals for value
1496 log.info("New add switch for pre-existing switch {}", sw);
1497 return;
1498 }
1499
1500 if (oldSw != null) {
1501 oldSw.getListenerWriteLock().lock();
1502 try {
1503 log.error("New switch added {} for already-added switch {}",
1504 sw, oldSw);
1505 // Set the connected flag to false to suppress calling
1506 // the listeners for this switch in processOFMessage
1507 oldSw.setConnected(false);
1508
1509 oldSw.cancelAllStatisticsReplies();
1510
Jonathan Hart2fa28062013-11-25 20:16:28 -08001511 //updateInactiveSwitchInfo(oldSw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001512
1513 // we need to clean out old switch state definitively
1514 // before adding the new switch
1515 // FIXME: It seems not completely kosher to call the
1516 // switch listeners here. I thought one of the points of
1517 // having the asynchronous switch update mechanism was so
1518 // the addedSwitch and removedSwitch were always called
1519 // from a single thread to simplify concurrency issues
1520 // for the listener.
1521 if (switchListeners != null) {
1522 for (IOFSwitchListener listener : switchListeners) {
1523 listener.removedSwitch(oldSw);
1524 }
1525 }
1526 // will eventually trigger a removeSwitch(), which will cause
1527 // a "Not removing Switch ... already removed debug message.
1528 // TODO: Figure out a way to handle this that avoids the
1529 // spurious debug message.
Jonathan Hart9e92c512013-03-20 16:24:44 -07001530 log.debug("Closing {} because a new IOFSwitch got added " +
1531 "for this dpid", oldSw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001532 oldSw.getChannel().close();
1533 }
1534 finally {
1535 oldSw.getListenerWriteLock().unlock();
1536 }
1537 }
1538
Jonathan Hart2fa28062013-11-25 20:16:28 -08001539 //updateActiveSwitchInfo(sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001540 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.ADDED);
1541 try {
1542 this.updates.put(update);
1543 } catch (InterruptedException e) {
1544 log.error("Failure adding update to queue", e);
1545 }
1546 }
1547
1548 /**
1549 * Remove a switch from the active switch list and call the switch listeners.
1550 * This happens either when the switch is disconnected or when the
1551 * controller's role for the switch changes from master to slave.
1552 * @param sw the switch that has been removed
1553 */
1554 protected void removeSwitch(IOFSwitch sw) {
1555 // No need to acquire the listener lock, since
1556 // this method is only called after netty has processed all
1557 // pending messages
1558 log.debug("removeSwitch: {}", sw);
1559 if (!this.activeSwitches.remove(sw.getId(), sw) || !sw.isConnected()) {
1560 log.debug("Not removing switch {}; already removed", sw);
1561 return;
1562 }
1563 // We cancel all outstanding statistics replies if the switch transition
1564 // from active. In the future we might allow statistics requests
1565 // from slave controllers. Then we need to move this cancelation
1566 // to switch disconnect
1567 sw.cancelAllStatisticsReplies();
1568
1569 // FIXME: I think there's a race condition if we call updateInactiveSwitchInfo
1570 // here if role support is enabled. In that case if the switch is being
1571 // removed because we've been switched to being in the slave role, then I think
1572 // it's possible that the new master may have already been promoted to master
1573 // and written out the active switch state to storage. If we now execute
1574 // updateInactiveSwitchInfo we may wipe out all of the state that was
1575 // written out by the new master. Maybe need to revisit how we handle all
1576 // of the switch state that's written to storage.
1577
Jonathan Hart2fa28062013-11-25 20:16:28 -08001578 //updateInactiveSwitchInfo(sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001579 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.REMOVED);
1580 try {
1581 this.updates.put(update);
1582 } catch (InterruptedException e) {
1583 log.error("Failure adding update to queue", e);
1584 }
1585 }
1586
1587 // ***************
1588 // IFloodlightProvider
1589 // ***************
1590
1591 @Override
1592 public synchronized void addOFMessageListener(OFType type,
1593 IOFMessageListener listener) {
1594 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1595 messageListeners.get(type);
1596 if (ldd == null) {
1597 ldd = new ListenerDispatcher<OFType, IOFMessageListener>();
1598 messageListeners.put(type, ldd);
1599 }
1600 ldd.addListener(type, listener);
1601 }
1602
1603 @Override
1604 public synchronized void removeOFMessageListener(OFType type,
1605 IOFMessageListener listener) {
1606 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1607 messageListeners.get(type);
1608 if (ldd != null) {
1609 ldd.removeListener(listener);
1610 }
1611 }
1612
1613 private void logListeners() {
1614 for (Map.Entry<OFType,
1615 ListenerDispatcher<OFType,
1616 IOFMessageListener>> entry
1617 : messageListeners.entrySet()) {
1618
1619 OFType type = entry.getKey();
1620 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1621 entry.getValue();
1622
1623 StringBuffer sb = new StringBuffer();
1624 sb.append("OFListeners for ");
1625 sb.append(type);
1626 sb.append(": ");
1627 for (IOFMessageListener l : ldd.getOrderedListeners()) {
1628 sb.append(l.getName());
1629 sb.append(",");
1630 }
1631 log.debug(sb.toString());
1632 }
1633 }
1634
1635 public void removeOFMessageListeners(OFType type) {
1636 messageListeners.remove(type);
1637 }
1638
1639 @Override
1640 public Map<Long, IOFSwitch> getSwitches() {
1641 return Collections.unmodifiableMap(this.activeSwitches);
1642 }
1643
1644 @Override
1645 public void addOFSwitchListener(IOFSwitchListener listener) {
1646 this.switchListeners.add(listener);
1647 }
1648
1649 @Override
1650 public void removeOFSwitchListener(IOFSwitchListener listener) {
1651 this.switchListeners.remove(listener);
1652 }
1653
1654 @Override
1655 public Map<OFType, List<IOFMessageListener>> getListeners() {
1656 Map<OFType, List<IOFMessageListener>> lers =
1657 new HashMap<OFType, List<IOFMessageListener>>();
1658 for(Entry<OFType, ListenerDispatcher<OFType, IOFMessageListener>> e :
1659 messageListeners.entrySet()) {
1660 lers.put(e.getKey(), e.getValue().getOrderedListeners());
1661 }
1662 return Collections.unmodifiableMap(lers);
1663 }
1664
1665 @Override
1666 @LogMessageDocs({
1667 @LogMessageDoc(message="Failed to inject OFMessage {message} onto " +
1668 "a null switch",
1669 explanation="Failed to process a message because the switch " +
1670 " is no longer connected."),
1671 @LogMessageDoc(level="ERROR",
1672 message="Error reinjecting OFMessage on switch {switch}",
1673 explanation="An I/O error occured while attempting to " +
1674 "process an OpenFlow message",
1675 recommendation=LogMessageDoc.CHECK_SWITCH)
1676 })
1677 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg,
1678 FloodlightContext bc) {
1679 if (sw == null) {
1680 log.info("Failed to inject OFMessage {} onto a null switch", msg);
1681 return false;
1682 }
1683
1684 // FIXME: Do we need to be able to inject messages to switches
1685 // where we're the slave controller (i.e. they're connected but
1686 // not active)?
1687 // FIXME: Don't we need synchronization logic here so we're holding
1688 // the listener read lock when we call handleMessage? After some
1689 // discussions it sounds like the right thing to do here would be to
1690 // inject the message as a netty upstream channel event so it goes
1691 // through the normal netty event processing, including being
1692 // handled
1693 if (!activeSwitches.containsKey(sw.getId())) return false;
1694
1695 try {
1696 // Pass Floodlight context to the handleMessages()
1697 handleMessage(sw, msg, bc);
1698 } catch (IOException e) {
1699 log.error("Error reinjecting OFMessage on switch {}",
1700 HexString.toHexString(sw.getId()));
1701 return false;
1702 }
1703 return true;
1704 }
1705
1706 @Override
1707 @LogMessageDoc(message="Calling System.exit",
1708 explanation="The controller is terminating")
1709 public synchronized void terminate() {
1710 log.info("Calling System.exit");
1711 System.exit(1);
1712 }
1713
1714 @Override
1715 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg) {
1716 // call the overloaded version with floodlight context set to null
1717 return injectOfMessage(sw, msg, null);
1718 }
1719
1720 @Override
1721 public void handleOutgoingMessage(IOFSwitch sw, OFMessage m,
1722 FloodlightContext bc) {
1723 if (log.isTraceEnabled()) {
1724 String str = OFMessage.getDataAsString(sw, m, bc);
1725 log.trace("{}", str);
1726 }
1727
1728 List<IOFMessageListener> listeners = null;
1729 if (messageListeners.containsKey(m.getType())) {
1730 listeners =
1731 messageListeners.get(m.getType()).getOrderedListeners();
1732 }
1733
1734 if (listeners != null) {
1735 for (IOFMessageListener listener : listeners) {
1736 if (listener instanceof IOFSwitchFilter) {
1737 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1738 continue;
1739 }
1740 }
1741 if (Command.STOP.equals(listener.receive(sw, m, bc))) {
1742 break;
1743 }
1744 }
1745 }
1746 }
1747
1748 @Override
1749 public BasicFactory getOFMessageFactory() {
1750 return factory;
1751 }
1752
1753 @Override
1754 public String getControllerId() {
1755 return controllerId;
1756 }
1757
1758 // **************
1759 // Initialization
1760 // **************
1761
Jonathan Hart2fa28062013-11-25 20:16:28 -08001762 /*
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001763 protected void updateAllInactiveSwitchInfo() {
1764 if (role == Role.SLAVE) {
1765 return;
1766 }
1767 String controllerId = getControllerId();
1768 String[] switchColumns = { SWITCH_DATAPATH_ID,
1769 SWITCH_CONTROLLER_ID,
1770 SWITCH_ACTIVE };
1771 String[] portColumns = { PORT_ID, PORT_SWITCH };
1772 IResultSet switchResultSet = null;
1773 try {
1774 OperatorPredicate op =
1775 new OperatorPredicate(SWITCH_CONTROLLER_ID,
1776 OperatorPredicate.Operator.EQ,
1777 controllerId);
1778 switchResultSet =
1779 storageSource.executeQuery(SWITCH_TABLE_NAME,
1780 switchColumns,
1781 op, null);
1782 while (switchResultSet.next()) {
1783 IResultSet portResultSet = null;
1784 try {
1785 String datapathId =
1786 switchResultSet.getString(SWITCH_DATAPATH_ID);
1787 switchResultSet.setBoolean(SWITCH_ACTIVE, Boolean.FALSE);
1788 op = new OperatorPredicate(PORT_SWITCH,
1789 OperatorPredicate.Operator.EQ,
1790 datapathId);
1791 portResultSet =
1792 storageSource.executeQuery(PORT_TABLE_NAME,
1793 portColumns,
1794 op, null);
1795 while (portResultSet.next()) {
1796 portResultSet.deleteRow();
1797 }
1798 portResultSet.save();
1799 }
1800 finally {
1801 if (portResultSet != null)
1802 portResultSet.close();
1803 }
1804 }
1805 switchResultSet.save();
1806 }
1807 finally {
1808 if (switchResultSet != null)
1809 switchResultSet.close();
1810 }
1811 }
Jonathan Hart2fa28062013-11-25 20:16:28 -08001812 */
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001813
Jonathan Hart2fa28062013-11-25 20:16:28 -08001814 /*
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001815 protected void updateControllerInfo() {
1816 updateAllInactiveSwitchInfo();
1817
1818 // Write out the controller info to the storage source
1819 Map<String, Object> controllerInfo = new HashMap<String, Object>();
1820 String id = getControllerId();
1821 controllerInfo.put(CONTROLLER_ID, id);
1822 storageSource.updateRow(CONTROLLER_TABLE_NAME, controllerInfo);
1823 }
Jonathan Hart2fa28062013-11-25 20:16:28 -08001824 */
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001825
Jonathan Hart2fa28062013-11-25 20:16:28 -08001826 /*
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001827 protected void updateActiveSwitchInfo(IOFSwitch sw) {
1828 if (role == Role.SLAVE) {
1829 return;
1830 }
1831 // Obtain the row info for the switch
1832 Map<String, Object> switchInfo = new HashMap<String, Object>();
1833 String datapathIdString = sw.getStringId();
1834 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1835 String controllerId = getControllerId();
1836 switchInfo.put(SWITCH_CONTROLLER_ID, controllerId);
1837 Date connectedSince = sw.getConnectedSince();
1838 switchInfo.put(SWITCH_CONNECTED_SINCE, connectedSince);
1839 Channel channel = sw.getChannel();
1840 SocketAddress socketAddress = channel.getRemoteAddress();
1841 if (socketAddress != null) {
1842 String socketAddressString = socketAddress.toString();
1843 switchInfo.put(SWITCH_SOCKET_ADDRESS, socketAddressString);
1844 if (socketAddress instanceof InetSocketAddress) {
1845 InetSocketAddress inetSocketAddress =
1846 (InetSocketAddress)socketAddress;
1847 InetAddress inetAddress = inetSocketAddress.getAddress();
1848 String ip = inetAddress.getHostAddress();
1849 switchInfo.put(SWITCH_IP, ip);
1850 }
1851 }
1852
1853 // Write out the switch features info
1854 long capabilities = U32.f(sw.getCapabilities());
1855 switchInfo.put(SWITCH_CAPABILITIES, capabilities);
1856 long buffers = U32.f(sw.getBuffers());
1857 switchInfo.put(SWITCH_BUFFERS, buffers);
1858 long tables = U32.f(sw.getTables());
1859 switchInfo.put(SWITCH_TABLES, tables);
1860 long actions = U32.f(sw.getActions());
1861 switchInfo.put(SWITCH_ACTIONS, actions);
1862 switchInfo.put(SWITCH_ACTIVE, Boolean.TRUE);
1863
1864 // Update the switch
1865 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1866
1867 // Update the ports
1868 for (OFPhysicalPort port: sw.getPorts()) {
1869 updatePortInfo(sw, port);
1870 }
1871 }
Jonathan Hart2fa28062013-11-25 20:16:28 -08001872 */
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001873
Jonathan Hart2fa28062013-11-25 20:16:28 -08001874 /*
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001875 protected void updateInactiveSwitchInfo(IOFSwitch sw) {
1876 if (role == Role.SLAVE) {
1877 return;
1878 }
1879 log.debug("Update DB with inactiveSW {}", sw);
1880 // Update the controller info in the storage source to be inactive
1881 Map<String, Object> switchInfo = new HashMap<String, Object>();
1882 String datapathIdString = sw.getStringId();
1883 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1884 //switchInfo.put(SWITCH_CONNECTED_SINCE, null);
1885 switchInfo.put(SWITCH_ACTIVE, Boolean.FALSE);
1886 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1887 }
Jonathan Hart2fa28062013-11-25 20:16:28 -08001888 */
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001889
Jonathan Hart2fa28062013-11-25 20:16:28 -08001890 /*
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001891 protected void updatePortInfo(IOFSwitch sw, OFPhysicalPort port) {
1892 if (role == Role.SLAVE) {
1893 return;
1894 }
1895 String datapathIdString = sw.getStringId();
1896 Map<String, Object> portInfo = new HashMap<String, Object>();
1897 int portNumber = U16.f(port.getPortNumber());
1898 String id = datapathIdString + "|" + portNumber;
1899 portInfo.put(PORT_ID, id);
1900 portInfo.put(PORT_SWITCH, datapathIdString);
1901 portInfo.put(PORT_NUMBER, portNumber);
1902 byte[] hardwareAddress = port.getHardwareAddress();
1903 String hardwareAddressString = HexString.toHexString(hardwareAddress);
1904 portInfo.put(PORT_HARDWARE_ADDRESS, hardwareAddressString);
1905 String name = port.getName();
1906 portInfo.put(PORT_NAME, name);
1907 long config = U32.f(port.getConfig());
1908 portInfo.put(PORT_CONFIG, config);
1909 long state = U32.f(port.getState());
1910 portInfo.put(PORT_STATE, state);
1911 long currentFeatures = U32.f(port.getCurrentFeatures());
1912 portInfo.put(PORT_CURRENT_FEATURES, currentFeatures);
1913 long advertisedFeatures = U32.f(port.getAdvertisedFeatures());
1914 portInfo.put(PORT_ADVERTISED_FEATURES, advertisedFeatures);
1915 long supportedFeatures = U32.f(port.getSupportedFeatures());
1916 portInfo.put(PORT_SUPPORTED_FEATURES, supportedFeatures);
1917 long peerFeatures = U32.f(port.getPeerFeatures());
1918 portInfo.put(PORT_PEER_FEATURES, peerFeatures);
1919 storageSource.updateRowAsync(PORT_TABLE_NAME, portInfo);
1920 }
Jonathan Hart2fa28062013-11-25 20:16:28 -08001921 */
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001922
1923 /**
1924 * Read switch port data from storage and write it into a switch object
1925 * @param sw the switch to update
1926 */
Jonathan Hart2fa28062013-11-25 20:16:28 -08001927 /*
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001928 protected void readSwitchPortStateFromStorage(OFSwitchImpl sw) {
1929 OperatorPredicate op =
1930 new OperatorPredicate(PORT_SWITCH,
1931 OperatorPredicate.Operator.EQ,
1932 sw.getStringId());
1933 IResultSet portResultSet =
1934 storageSource.executeQuery(PORT_TABLE_NAME,
1935 null, op, null);
1936 //Map<Short, OFPhysicalPort> oldports =
1937 // new HashMap<Short, OFPhysicalPort>();
1938 //oldports.putAll(sw.getPorts());
1939
1940 while (portResultSet.next()) {
1941 try {
1942 OFPhysicalPort p = new OFPhysicalPort();
1943 p.setPortNumber((short)portResultSet.getInt(PORT_NUMBER));
1944 p.setName(portResultSet.getString(PORT_NAME));
1945 p.setConfig((int)portResultSet.getLong(PORT_CONFIG));
1946 p.setState((int)portResultSet.getLong(PORT_STATE));
1947 String portMac = portResultSet.getString(PORT_HARDWARE_ADDRESS);
1948 p.setHardwareAddress(HexString.fromHexString(portMac));
1949 p.setCurrentFeatures((int)portResultSet.
1950 getLong(PORT_CURRENT_FEATURES));
1951 p.setAdvertisedFeatures((int)portResultSet.
1952 getLong(PORT_ADVERTISED_FEATURES));
1953 p.setSupportedFeatures((int)portResultSet.
1954 getLong(PORT_SUPPORTED_FEATURES));
1955 p.setPeerFeatures((int)portResultSet.
1956 getLong(PORT_PEER_FEATURES));
1957 //oldports.remove(Short.valueOf(p.getPortNumber()));
1958 sw.setPort(p);
1959 } catch (NullPointerException e) {
1960 // ignore
1961 }
1962 }
1963 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
1964 try {
1965 this.updates.put(update);
1966 } catch (InterruptedException e) {
1967 log.error("Failure adding update to queue", e);
1968 }
1969 }
Jonathan Hart2fa28062013-11-25 20:16:28 -08001970 */
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001971
Jonathan Hart2fa28062013-11-25 20:16:28 -08001972 /*
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001973 protected void removePortInfo(IOFSwitch sw, short portNumber) {
1974 if (role == Role.SLAVE) {
1975 return;
1976 }
1977 String datapathIdString = sw.getStringId();
1978 String id = datapathIdString + "|" + portNumber;
1979 storageSource.deleteRowAsync(PORT_TABLE_NAME, id);
1980 }
Jonathan Hart2fa28062013-11-25 20:16:28 -08001981 */
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001982
1983 /**
1984 * Sets the initial role based on properties in the config params.
1985 * It looks for two different properties.
1986 * If the "role" property is specified then the value should be
1987 * either "EQUAL", "MASTER", or "SLAVE" and the role of the
1988 * controller is set to the specified value. If the "role" property
1989 * is not specified then it looks next for the "role.path" property.
1990 * In this case the value should be the path to a property file in
1991 * the file system that contains a property called "floodlight.role"
1992 * which can be one of the values listed above for the "role" property.
1993 * The idea behind the "role.path" mechanism is that you have some
1994 * separate heartbeat and master controller election algorithm that
1995 * determines the role of the controller. When a role transition happens,
1996 * it updates the current role in the file specified by the "role.path"
1997 * file. Then if floodlight restarts for some reason it can get the
1998 * correct current role of the controller from the file.
1999 * @param configParams The config params for the FloodlightProvider service
2000 * @return A valid role if role information is specified in the
2001 * config params, otherwise null
2002 */
2003 @LogMessageDocs({
2004 @LogMessageDoc(message="Controller role set to {role}",
2005 explanation="Setting the initial HA role to "),
2006 @LogMessageDoc(level="ERROR",
2007 message="Invalid current role value: {role}",
2008 explanation="An invalid HA role value was read from the " +
2009 "properties file",
2010 recommendation=LogMessageDoc.CHECK_CONTROLLER)
2011 })
2012 protected Role getInitialRole(Map<String, String> configParams) {
2013 Role role = null;
2014 String roleString = configParams.get("role");
2015 if (roleString == null) {
2016 String rolePath = configParams.get("rolepath");
2017 if (rolePath != null) {
2018 Properties properties = new Properties();
2019 try {
2020 properties.load(new FileInputStream(rolePath));
2021 roleString = properties.getProperty("floodlight.role");
2022 }
2023 catch (IOException exc) {
2024 // Don't treat it as an error if the file specified by the
2025 // rolepath property doesn't exist. This lets us enable the
2026 // HA mechanism by just creating/setting the floodlight.role
2027 // property in that file without having to modify the
2028 // floodlight properties.
2029 }
2030 }
2031 }
2032
2033 if (roleString != null) {
2034 // Canonicalize the string to the form used for the enum constants
2035 roleString = roleString.trim().toUpperCase();
2036 try {
2037 role = Role.valueOf(roleString);
2038 }
2039 catch (IllegalArgumentException exc) {
2040 log.error("Invalid current role value: {}", roleString);
2041 }
2042 }
2043
2044 log.info("Controller role set to {}", role);
2045
2046 return role;
2047 }
2048
2049 /**
2050 * Tell controller that we're ready to accept switches loop
2051 * @throws IOException
2052 */
2053 @LogMessageDocs({
2054 @LogMessageDoc(message="Listening for switch connections on {address}",
2055 explanation="The controller is ready and listening for new" +
2056 " switch connections"),
2057 @LogMessageDoc(message="Storage exception in controller " +
2058 "updates loop; terminating process",
2059 explanation=ERROR_DATABASE,
2060 recommendation=LogMessageDoc.CHECK_CONTROLLER),
2061 @LogMessageDoc(level="ERROR",
2062 message="Exception in controller updates loop",
2063 explanation="Failed to dispatch controller event",
2064 recommendation=LogMessageDoc.GENERIC_ACTION)
2065 })
2066 public void run() {
2067 if (log.isDebugEnabled()) {
2068 logListeners();
2069 }
2070
2071 try {
2072 final ServerBootstrap bootstrap = createServerBootStrap();
2073
2074 bootstrap.setOption("reuseAddr", true);
2075 bootstrap.setOption("child.keepAlive", true);
2076 bootstrap.setOption("child.tcpNoDelay", true);
2077 bootstrap.setOption("child.sendBufferSize", Controller.SEND_BUFFER_SIZE);
2078
2079 ChannelPipelineFactory pfact =
2080 new OpenflowPipelineFactory(this, null);
2081 bootstrap.setPipelineFactory(pfact);
2082 InetSocketAddress sa = new InetSocketAddress(openFlowPort);
2083 final ChannelGroup cg = new DefaultChannelGroup();
2084 cg.add(bootstrap.bind(sa));
2085
2086 log.info("Listening for switch connections on {}", sa);
2087 } catch (Exception e) {
2088 throw new RuntimeException(e);
2089 }
2090
2091 // main loop
2092 while (true) {
2093 try {
2094 IUpdate update = updates.take();
2095 update.dispatch();
2096 } catch (InterruptedException e) {
2097 return;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002098 } catch (Exception e) {
2099 log.error("Exception in controller updates loop", e);
2100 }
2101 }
2102 }
2103
2104 private ServerBootstrap createServerBootStrap() {
2105 if (workerThreads == 0) {
2106 return new ServerBootstrap(
2107 new NioServerSocketChannelFactory(
2108 Executors.newCachedThreadPool(),
2109 Executors.newCachedThreadPool()));
2110 } else {
2111 return new ServerBootstrap(
2112 new NioServerSocketChannelFactory(
2113 Executors.newCachedThreadPool(),
2114 Executors.newCachedThreadPool(), workerThreads));
2115 }
2116 }
2117
2118 public void setConfigParams(Map<String, String> configParams) {
2119 String ofPort = configParams.get("openflowport");
2120 if (ofPort != null) {
2121 this.openFlowPort = Integer.parseInt(ofPort);
2122 }
2123 log.debug("OpenFlow port set to {}", this.openFlowPort);
2124 String threads = configParams.get("workerthreads");
2125 if (threads != null) {
2126 this.workerThreads = Integer.parseInt(threads);
2127 }
2128 log.debug("Number of worker threads set to {}", this.workerThreads);
2129 String controllerId = configParams.get("controllerid");
2130 if (controllerId != null) {
2131 this.controllerId = controllerId;
2132 }
Jonathan Hartd10008d2013-02-23 17:04:08 -08002133 else {
2134 //Try to get the hostname of the machine and use that for controller ID
2135 try {
2136 String hostname = java.net.InetAddress.getLocalHost().getHostName();
2137 this.controllerId = hostname;
2138 } catch (UnknownHostException e) {
2139 // Can't get hostname, we'll just use the default
2140 }
2141 }
2142
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002143 log.debug("ControllerId set to {}", this.controllerId);
2144 }
2145
2146 private void initVendorMessages() {
2147 // Configure openflowj to be able to parse the role request/reply
2148 // vendor messages.
2149 OFBasicVendorId niciraVendorId = new OFBasicVendorId(
2150 OFNiciraVendorData.NX_VENDOR_ID, 4);
2151 OFVendorId.registerVendorId(niciraVendorId);
2152 OFBasicVendorDataType roleRequestVendorData =
2153 new OFBasicVendorDataType(
2154 OFRoleRequestVendorData.NXT_ROLE_REQUEST,
2155 OFRoleRequestVendorData.getInstantiable());
2156 niciraVendorId.registerVendorDataType(roleRequestVendorData);
2157 OFBasicVendorDataType roleReplyVendorData =
2158 new OFBasicVendorDataType(
2159 OFRoleReplyVendorData.NXT_ROLE_REPLY,
2160 OFRoleReplyVendorData.getInstantiable());
2161 niciraVendorId.registerVendorDataType(roleReplyVendorData);
2162 }
2163
2164 /**
2165 * Initialize internal data structures
2166 */
2167 public void init(Map<String, String> configParams) {
2168 // These data structures are initialized here because other
2169 // module's startUp() might be called before ours
2170 this.messageListeners =
2171 new ConcurrentHashMap<OFType,
2172 ListenerDispatcher<OFType,
2173 IOFMessageListener>>();
2174 this.switchListeners = new CopyOnWriteArraySet<IOFSwitchListener>();
2175 this.haListeners = new CopyOnWriteArraySet<IHAListener>();
2176 this.activeSwitches = new ConcurrentHashMap<Long, IOFSwitch>();
2177 this.connectedSwitches = new HashSet<OFSwitchImpl>();
2178 this.controllerNodeIPsCache = new HashMap<String, String>();
2179 this.updates = new LinkedBlockingQueue<IUpdate>();
2180 this.factory = new BasicFactory();
2181 this.providerMap = new HashMap<String, List<IInfoProvider>>();
2182 setConfigParams(configParams);
Jonathan Hartcc957a02013-02-26 10:39:04 -08002183 //this.role = getInitialRole(configParams);
2184 //Set the controller's role to MASTER so it always tries to do role requests.
2185 this.role = Role.MASTER;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002186 this.roleChanger = new RoleChanger();
2187 initVendorMessages();
2188 this.systemStartTime = System.currentTimeMillis();
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002189 }
2190
2191 /**
2192 * Startup all of the controller's components
2193 */
2194 @LogMessageDoc(message="Waiting for storage source",
2195 explanation="The system database is not yet ready",
2196 recommendation="If this message persists, this indicates " +
2197 "that the system database has failed to start. " +
2198 LogMessageDoc.CHECK_CONTROLLER)
2199 public void startupComponents() {
Jonathan Hartd10008d2013-02-23 17:04:08 -08002200 try {
2201 registryService.registerController(controllerId);
2202 } catch (RegistryException e2) {
2203 log.warn("Registry service error: {}", e2.getMessage());
2204 }
2205
Jonathan Hart2fa28062013-11-25 20:16:28 -08002206 /*
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002207 // Create the table names we use
2208 storageSource.createTable(CONTROLLER_TABLE_NAME, null);
2209 storageSource.createTable(SWITCH_TABLE_NAME, null);
2210 storageSource.createTable(PORT_TABLE_NAME, null);
2211 storageSource.createTable(CONTROLLER_INTERFACE_TABLE_NAME, null);
2212 storageSource.createTable(SWITCH_CONFIG_TABLE_NAME, null);
2213 storageSource.setTablePrimaryKeyName(CONTROLLER_TABLE_NAME,
2214 CONTROLLER_ID);
2215 storageSource.setTablePrimaryKeyName(SWITCH_TABLE_NAME,
2216 SWITCH_DATAPATH_ID);
2217 storageSource.setTablePrimaryKeyName(PORT_TABLE_NAME, PORT_ID);
2218 storageSource.setTablePrimaryKeyName(CONTROLLER_INTERFACE_TABLE_NAME,
2219 CONTROLLER_INTERFACE_ID);
2220 storageSource.addListener(CONTROLLER_INTERFACE_TABLE_NAME, this);
2221
2222 while (true) {
2223 try {
2224 updateControllerInfo();
2225 break;
2226 }
2227 catch (StorageException e) {
2228 log.info("Waiting for storage source");
2229 try {
2230 Thread.sleep(1000);
2231 } catch (InterruptedException e1) {
2232 }
2233 }
2234 }
Jonathan Hart2fa28062013-11-25 20:16:28 -08002235 */
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002236
2237 // Add our REST API
2238 restApi.addRestletRoutable(new CoreWebRoutable());
2239 }
2240
2241 @Override
2242 public void addInfoProvider(String type, IInfoProvider provider) {
2243 if (!providerMap.containsKey(type)) {
2244 providerMap.put(type, new ArrayList<IInfoProvider>());
2245 }
2246 providerMap.get(type).add(provider);
2247 }
2248
2249 @Override
2250 public void removeInfoProvider(String type, IInfoProvider provider) {
2251 if (!providerMap.containsKey(type)) {
2252 log.debug("Provider type {} doesn't exist.", type);
2253 return;
2254 }
2255
2256 providerMap.get(type).remove(provider);
2257 }
2258
2259 public Map<String, Object> getControllerInfo(String type) {
2260 if (!providerMap.containsKey(type)) return null;
2261
2262 Map<String, Object> result = new LinkedHashMap<String, Object>();
2263 for (IInfoProvider provider : providerMap.get(type)) {
2264 result.putAll(provider.getInfo(type));
2265 }
2266
2267 return result;
2268 }
2269
2270 @Override
2271 public void addHAListener(IHAListener listener) {
2272 this.haListeners.add(listener);
2273 }
2274
2275 @Override
2276 public void removeHAListener(IHAListener listener) {
2277 this.haListeners.remove(listener);
2278 }
2279
2280
2281 /**
2282 * Handle changes to the controller nodes IPs and dispatch update.
2283 */
Jonathan Hart2fa28062013-11-25 20:16:28 -08002284 /*
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002285 @SuppressWarnings("unchecked")
2286 protected void handleControllerNodeIPChanges() {
2287 HashMap<String,String> curControllerNodeIPs = new HashMap<String,String>();
2288 HashMap<String,String> addedControllerNodeIPs = new HashMap<String,String>();
2289 HashMap<String,String> removedControllerNodeIPs =new HashMap<String,String>();
2290 String[] colNames = { CONTROLLER_INTERFACE_CONTROLLER_ID,
2291 CONTROLLER_INTERFACE_TYPE,
2292 CONTROLLER_INTERFACE_NUMBER,
2293 CONTROLLER_INTERFACE_DISCOVERED_IP };
2294 synchronized(controllerNodeIPsCache) {
2295 // We currently assume that interface Ethernet0 is the relevant
2296 // controller interface. Might change.
2297 // We could (should?) implement this using
2298 // predicates, but creating the individual and compound predicate
2299 // seems more overhead then just checking every row. Particularly,
2300 // since the number of rows is small and changes infrequent
2301 IResultSet res = storageSource.executeQuery(CONTROLLER_INTERFACE_TABLE_NAME,
2302 colNames,null, null);
2303 while (res.next()) {
2304 if (res.getString(CONTROLLER_INTERFACE_TYPE).equals("Ethernet") &&
2305 res.getInt(CONTROLLER_INTERFACE_NUMBER) == 0) {
2306 String controllerID = res.getString(CONTROLLER_INTERFACE_CONTROLLER_ID);
2307 String discoveredIP = res.getString(CONTROLLER_INTERFACE_DISCOVERED_IP);
2308 String curIP = controllerNodeIPsCache.get(controllerID);
2309
2310 curControllerNodeIPs.put(controllerID, discoveredIP);
2311 if (curIP == null) {
2312 // new controller node IP
2313 addedControllerNodeIPs.put(controllerID, discoveredIP);
2314 }
HIGUCHI Yuta63b30722013-10-18 18:33:46 -07002315 else if (!curIP.equals(discoveredIP)) {
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002316 // IP changed
2317 removedControllerNodeIPs.put(controllerID, curIP);
2318 addedControllerNodeIPs.put(controllerID, discoveredIP);
2319 }
2320 }
2321 }
2322 // Now figure out if rows have been deleted. We can't use the
2323 // rowKeys from rowsDeleted directly, since the tables primary
2324 // key is a compound that we can't disassemble
2325 Set<String> curEntries = curControllerNodeIPs.keySet();
2326 Set<String> removedEntries = controllerNodeIPsCache.keySet();
2327 removedEntries.removeAll(curEntries);
2328 for (String removedControllerID : removedEntries)
2329 removedControllerNodeIPs.put(removedControllerID, controllerNodeIPsCache.get(removedControllerID));
2330 controllerNodeIPsCache = (HashMap<String, String>) curControllerNodeIPs.clone();
2331 HAControllerNodeIPUpdate update = new HAControllerNodeIPUpdate(
2332 curControllerNodeIPs, addedControllerNodeIPs,
2333 removedControllerNodeIPs);
2334 if (!removedControllerNodeIPs.isEmpty() || !addedControllerNodeIPs.isEmpty()) {
2335 try {
2336 this.updates.put(update);
2337 } catch (InterruptedException e) {
2338 log.error("Failure adding update to queue", e);
2339 }
2340 }
2341 }
2342 }
Jonathan Hart2fa28062013-11-25 20:16:28 -08002343 */
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002344
2345 @Override
2346 public Map<String, String> getControllerNodeIPs() {
2347 // We return a copy of the mapping so we can guarantee that
2348 // the mapping return is the same as one that will be (or was)
2349 // dispatched to IHAListeners
2350 HashMap<String,String> retval = new HashMap<String,String>();
2351 synchronized(controllerNodeIPsCache) {
2352 retval.putAll(controllerNodeIPsCache);
2353 }
2354 return retval;
2355 }
2356
Jonathan Hart2fa28062013-11-25 20:16:28 -08002357 /*
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002358 @Override
2359 public void rowsModified(String tableName, Set<Object> rowKeys) {
2360 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2361 handleControllerNodeIPChanges();
2362 }
2363
2364 }
2365
2366 @Override
2367 public void rowsDeleted(String tableName, Set<Object> rowKeys) {
2368 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2369 handleControllerNodeIPChanges();
2370 }
2371 }
Jonathan Hart2fa28062013-11-25 20:16:28 -08002372 */
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002373
2374 @Override
2375 public long getSystemStartTime() {
2376 return (this.systemStartTime);
2377 }
2378
2379 @Override
2380 public void setAlwaysClearFlowsOnSwAdd(boolean value) {
2381 this.alwaysClearFlowsOnSwAdd = value;
2382 }
2383
2384 public boolean getAlwaysClearFlowsOnSwAdd() {
2385 return this.alwaysClearFlowsOnSwAdd;
2386 }
2387}