blob: 5f6e8f2a24fd5c6c3e68bcd75483ae297df59a96 [file] [log] [blame]
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001/**
2* Copyright 2011, Big Switch Networks, Inc.
3* Originally created by David Erickson, Stanford University
4*
5* Licensed under the Apache License, Version 2.0 (the "License"); you may
6* not use this file except in compliance with the License. You may obtain
7* a copy of the License at
8*
9* http://www.apache.org/licenses/LICENSE-2.0
10*
11* Unless required by applicable law or agreed to in writing, software
12* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14* License for the specific language governing permissions and limitations
15* under the License.
16**/
17
18package net.floodlightcontroller.core.internal;
19
20import java.io.FileInputStream;
21import java.io.IOException;
22import java.net.InetAddress;
23import java.net.InetSocketAddress;
24import java.net.SocketAddress;
Jonathan Hartd10008d2013-02-23 17:04:08 -080025import java.net.UnknownHostException;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080026import java.nio.channels.ClosedChannelException;
Jonathan Hartd10008d2013-02-23 17:04:08 -080027import java.util.ArrayList;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080028import java.util.Collection;
29import java.util.Collections;
30import java.util.Date;
31import java.util.HashMap;
32import java.util.HashSet;
33import java.util.Iterator;
34import java.util.LinkedHashMap;
35import java.util.List;
36import java.util.Map;
37import java.util.Map.Entry;
38import java.util.Properties;
39import java.util.Set;
40import java.util.Stack;
41import java.util.concurrent.BlockingQueue;
42import java.util.concurrent.ConcurrentHashMap;
43import java.util.concurrent.ConcurrentMap;
44import java.util.concurrent.CopyOnWriteArraySet;
45import java.util.concurrent.Executors;
46import java.util.concurrent.Future;
47import java.util.concurrent.LinkedBlockingQueue;
48import java.util.concurrent.RejectedExecutionException;
49import java.util.concurrent.TimeUnit;
50import java.util.concurrent.TimeoutException;
51
52import net.floodlightcontroller.core.FloodlightContext;
53import net.floodlightcontroller.core.IFloodlightProviderService;
54import net.floodlightcontroller.core.IHAListener;
55import net.floodlightcontroller.core.IInfoProvider;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080056import net.floodlightcontroller.core.IListener.Command;
Jonathan Hartd10008d2013-02-23 17:04:08 -080057import net.floodlightcontroller.core.IOFMessageListener;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080058import net.floodlightcontroller.core.IOFSwitch;
59import net.floodlightcontroller.core.IOFSwitchFilter;
60import net.floodlightcontroller.core.IOFSwitchListener;
61import net.floodlightcontroller.core.annotations.LogMessageDoc;
62import net.floodlightcontroller.core.annotations.LogMessageDocs;
63import net.floodlightcontroller.core.internal.OFChannelState.HandshakeState;
64import net.floodlightcontroller.core.util.ListenerDispatcher;
65import net.floodlightcontroller.core.web.CoreWebRoutable;
66import net.floodlightcontroller.counter.ICounterStoreService;
67import net.floodlightcontroller.packet.Ethernet;
68import net.floodlightcontroller.perfmon.IPktInProcessingTimeService;
69import net.floodlightcontroller.restserver.IRestApiService;
70import net.floodlightcontroller.storage.IResultSet;
71import net.floodlightcontroller.storage.IStorageSourceListener;
72import net.floodlightcontroller.storage.IStorageSourceService;
73import net.floodlightcontroller.storage.OperatorPredicate;
74import net.floodlightcontroller.storage.StorageException;
75import net.floodlightcontroller.threadpool.IThreadPoolService;
HIGUCHI Yuta36cf0762013-06-14 14:25:38 -070076import net.onrc.onos.ofcontroller.core.IOFSwitchPortListener;
HIGUCHI Yuta20514902013-06-12 11:24:16 -070077import net.onrc.onos.ofcontroller.core.INetMapTopologyService.ITopoRouteService;
HIGUCHI Yuta60a10142013-06-14 15:50:10 -070078import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
Jonathan Hartd82f20d2013-02-21 18:04:24 -080079import net.onrc.onos.registry.controller.IControllerRegistryService;
Jonathan Hartcc957a02013-02-26 10:39:04 -080080import net.onrc.onos.registry.controller.IControllerRegistryService.ControlChangeCallback;
Jonathan Hartd10008d2013-02-23 17:04:08 -080081import net.onrc.onos.registry.controller.RegistryException;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080082
83import org.jboss.netty.bootstrap.ServerBootstrap;
84import org.jboss.netty.buffer.ChannelBuffer;
85import org.jboss.netty.buffer.ChannelBuffers;
86import org.jboss.netty.channel.Channel;
87import org.jboss.netty.channel.ChannelHandlerContext;
88import org.jboss.netty.channel.ChannelPipelineFactory;
89import org.jboss.netty.channel.ChannelStateEvent;
90import org.jboss.netty.channel.ChannelUpstreamHandler;
91import org.jboss.netty.channel.Channels;
92import org.jboss.netty.channel.ExceptionEvent;
93import org.jboss.netty.channel.MessageEvent;
94import org.jboss.netty.channel.group.ChannelGroup;
95import org.jboss.netty.channel.group.DefaultChannelGroup;
96import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
97import org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler;
98import org.jboss.netty.handler.timeout.IdleStateEvent;
99import org.jboss.netty.handler.timeout.ReadTimeoutException;
100import org.openflow.protocol.OFEchoReply;
101import org.openflow.protocol.OFError;
102import org.openflow.protocol.OFError.OFBadActionCode;
103import org.openflow.protocol.OFError.OFBadRequestCode;
104import org.openflow.protocol.OFError.OFErrorType;
105import org.openflow.protocol.OFError.OFFlowModFailedCode;
106import org.openflow.protocol.OFError.OFHelloFailedCode;
107import org.openflow.protocol.OFError.OFPortModFailedCode;
108import org.openflow.protocol.OFError.OFQueueOpFailedCode;
109import org.openflow.protocol.OFFeaturesReply;
110import org.openflow.protocol.OFGetConfigReply;
111import org.openflow.protocol.OFMessage;
112import org.openflow.protocol.OFPacketIn;
113import org.openflow.protocol.OFPhysicalPort;
Pankaj Berde6a4075d2013-01-22 16:42:54 -0800114import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
Pankaj Berde6debb042013-01-16 18:04:32 -0800115import org.openflow.protocol.OFPhysicalPort.OFPortState;
Jonathan Hartd10008d2013-02-23 17:04:08 -0800116import org.openflow.protocol.OFPortStatus;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800117import org.openflow.protocol.OFPortStatus.OFPortReason;
118import org.openflow.protocol.OFSetConfig;
119import org.openflow.protocol.OFStatisticsRequest;
120import org.openflow.protocol.OFSwitchConfig;
121import org.openflow.protocol.OFType;
122import org.openflow.protocol.OFVendor;
123import org.openflow.protocol.factory.BasicFactory;
124import org.openflow.protocol.factory.MessageParseException;
125import org.openflow.protocol.statistics.OFDescriptionStatistics;
126import org.openflow.protocol.statistics.OFStatistics;
127import org.openflow.protocol.statistics.OFStatisticsType;
128import org.openflow.protocol.vendor.OFBasicVendorDataType;
129import org.openflow.protocol.vendor.OFBasicVendorId;
130import org.openflow.protocol.vendor.OFVendorId;
131import org.openflow.util.HexString;
132import org.openflow.util.U16;
133import org.openflow.util.U32;
134import org.openflow.vendor.nicira.OFNiciraVendorData;
135import org.openflow.vendor.nicira.OFRoleReplyVendorData;
136import org.openflow.vendor.nicira.OFRoleRequestVendorData;
137import org.openflow.vendor.nicira.OFRoleVendorData;
138import org.slf4j.Logger;
139import org.slf4j.LoggerFactory;
140
141
142/**
143 * The main controller class. Handles all setup and network listeners
HIGUCHI Yuta11360702013-06-17 10:28:06 -0700144 *
145 * Extensions made by ONOS are:
146 * - Detailed Port event: PORTCHANGED -> {PORTCHANGED, PORTADDED, PORTREMOVED}
147 * Available as net.onrc.onos.ofcontroller.core.IOFSwitchPortListener
148 * - Distributed ownership control of switch through RegistryService(IControllerRegistryService)
149 * - Register ONOS services. (IFlowService, ITopoRouteService, IControllerRegistryService)
150 * - Additional DEBUG logs
151 * - Try using hostname as controller ID, when ID was not explicitly given.
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800152 */
153public class Controller implements IFloodlightProviderService,
154 IStorageSourceListener {
HIGUCHI Yuta0ba6fd02013-06-14 12:46:56 -0700155
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800156 protected static Logger log = LoggerFactory.getLogger(Controller.class);
157
158 private static final String ERROR_DATABASE =
159 "The controller could not communicate with the system database.";
160
161 protected BasicFactory factory;
162 protected ConcurrentMap<OFType,
163 ListenerDispatcher<OFType,IOFMessageListener>>
164 messageListeners;
165 // The activeSwitches map contains only those switches that are actively
166 // being controlled by us -- it doesn't contain switches that are
167 // in the slave role
168 protected ConcurrentHashMap<Long, IOFSwitch> activeSwitches;
169 // connectedSwitches contains all connected switches, including ones where
170 // we're a slave controller. We need to keep track of them so that we can
171 // send role request messages to switches when our role changes to master
172 // We add a switch to this set after it successfully completes the
173 // handshake. Access to this Set needs to be synchronized with roleChanger
174 protected HashSet<OFSwitchImpl> connectedSwitches;
175
176 // The controllerNodeIPsCache maps Controller IDs to their IP address.
177 // It's only used by handleControllerNodeIPsChanged
178 protected HashMap<String, String> controllerNodeIPsCache;
179
180 protected Set<IOFSwitchListener> switchListeners;
181 protected Set<IHAListener> haListeners;
182 protected Map<String, List<IInfoProvider>> providerMap;
183 protected BlockingQueue<IUpdate> updates;
184
185 // Module dependencies
186 protected IRestApiService restApi;
187 protected ICounterStoreService counterStore = null;
188 protected IStorageSourceService storageSource;
189 protected IPktInProcessingTimeService pktinProcTime;
190 protected IThreadPoolService threadPool;
Pavlin Radoslavov19b0e122013-02-21 18:47:38 -0800191 protected IFlowService flowService;
Pavlin Radoslavovd7d8b792013-02-22 10:24:38 -0800192 protected ITopoRouteService topoRouteService;
Jonathan Hartd10008d2013-02-23 17:04:08 -0800193 protected IControllerRegistryService registryService;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800194
195 // Configuration options
196 protected int openFlowPort = 6633;
197 protected int workerThreads = 0;
198 // The id for this controller node. Should be unique for each controller
199 // node in a controller cluster.
200 protected String controllerId = "localhost";
201
202 // The current role of the controller.
203 // If the controller isn't configured to support roles, then this is null.
204 protected Role role;
205 // A helper that handles sending and timeout handling for role requests
206 protected RoleChanger roleChanger;
207
208 // Start time of the controller
209 protected long systemStartTime;
210
211 // Flag to always flush flow table on switch reconnect (HA or otherwise)
212 protected boolean alwaysClearFlowsOnSwAdd = false;
213
214 // Storage table names
215 protected static final String CONTROLLER_TABLE_NAME = "controller_controller";
216 protected static final String CONTROLLER_ID = "id";
217
218 protected static final String SWITCH_TABLE_NAME = "controller_switch";
219 protected static final String SWITCH_DATAPATH_ID = "dpid";
220 protected static final String SWITCH_SOCKET_ADDRESS = "socket_address";
221 protected static final String SWITCH_IP = "ip";
222 protected static final String SWITCH_CONTROLLER_ID = "controller_id";
223 protected static final String SWITCH_ACTIVE = "active";
224 protected static final String SWITCH_CONNECTED_SINCE = "connected_since";
225 protected static final String SWITCH_CAPABILITIES = "capabilities";
226 protected static final String SWITCH_BUFFERS = "buffers";
227 protected static final String SWITCH_TABLES = "tables";
228 protected static final String SWITCH_ACTIONS = "actions";
229
230 protected static final String SWITCH_CONFIG_TABLE_NAME = "controller_switchconfig";
231 protected static final String SWITCH_CONFIG_CORE_SWITCH = "core_switch";
232
233 protected static final String PORT_TABLE_NAME = "controller_port";
234 protected static final String PORT_ID = "id";
235 protected static final String PORT_SWITCH = "switch_id";
236 protected static final String PORT_NUMBER = "number";
237 protected static final String PORT_HARDWARE_ADDRESS = "hardware_address";
238 protected static final String PORT_NAME = "name";
239 protected static final String PORT_CONFIG = "config";
240 protected static final String PORT_STATE = "state";
241 protected static final String PORT_CURRENT_FEATURES = "current_features";
242 protected static final String PORT_ADVERTISED_FEATURES = "advertised_features";
243 protected static final String PORT_SUPPORTED_FEATURES = "supported_features";
244 protected static final String PORT_PEER_FEATURES = "peer_features";
245
246 protected static final String CONTROLLER_INTERFACE_TABLE_NAME = "controller_controllerinterface";
247 protected static final String CONTROLLER_INTERFACE_ID = "id";
248 protected static final String CONTROLLER_INTERFACE_CONTROLLER_ID = "controller_id";
249 protected static final String CONTROLLER_INTERFACE_TYPE = "type";
250 protected static final String CONTROLLER_INTERFACE_NUMBER = "number";
251 protected static final String CONTROLLER_INTERFACE_DISCOVERED_IP = "discovered_ip";
252
253
254
255 // Perf. related configuration
256 protected static final int SEND_BUFFER_SIZE = 4 * 1024 * 1024;
257 protected static final int BATCH_MAX_SIZE = 100;
258 protected static final boolean ALWAYS_DECODE_ETH = true;
259
260 /**
261 * Updates handled by the main loop
262 */
263 protected interface IUpdate {
264 /**
265 * Calls the appropriate listeners
266 */
267 public void dispatch();
268 }
269 public enum SwitchUpdateType {
270 ADDED,
271 REMOVED,
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700272 PORTCHANGED,
273 PORTADDED,
274 PORTREMOVED
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800275 }
276 /**
277 * Update message indicating a switch was added or removed
278 */
279 protected class SwitchUpdate implements IUpdate {
280 public IOFSwitch sw;
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700281 public OFPhysicalPort port;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800282 public SwitchUpdateType switchUpdateType;
283 public SwitchUpdate(IOFSwitch sw, SwitchUpdateType switchUpdateType) {
284 this.sw = sw;
285 this.switchUpdateType = switchUpdateType;
286 }
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700287 public SwitchUpdate(IOFSwitch sw, OFPhysicalPort port, SwitchUpdateType switchUpdateType) {
288 this.sw = sw;
289 this.port = port;
290 this.switchUpdateType = switchUpdateType;
291 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800292 public void dispatch() {
293 if (log.isTraceEnabled()) {
294 log.trace("Dispatching switch update {} {}",
295 sw, switchUpdateType);
296 }
297 if (switchListeners != null) {
298 for (IOFSwitchListener listener : switchListeners) {
299 switch(switchUpdateType) {
300 case ADDED:
301 listener.addedSwitch(sw);
302 break;
303 case REMOVED:
304 listener.removedSwitch(sw);
305 break;
306 case PORTCHANGED:
307 listener.switchPortChanged(sw.getId());
308 break;
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700309 case PORTADDED:
HIGUCHI Yuta36cf0762013-06-14 14:25:38 -0700310 if (listener instanceof IOFSwitchPortListener) {
311 ((IOFSwitchPortListener) listener).switchPortAdded(sw.getId(), port);
312 }
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700313 break;
314 case PORTREMOVED:
HIGUCHI Yuta36cf0762013-06-14 14:25:38 -0700315 if (listener instanceof IOFSwitchPortListener) {
316 ((IOFSwitchPortListener) listener).switchPortRemoved(sw.getId(), port);
317 }
Pankaj Berde465ac7c2013-05-23 13:47:49 -0700318 break;
319 default:
320 break;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800321 }
322 }
323 }
324 }
325 }
326
327 /**
328 * Update message indicating controller's role has changed
329 */
330 protected class HARoleUpdate implements IUpdate {
331 public Role oldRole;
332 public Role newRole;
333 public HARoleUpdate(Role newRole, Role oldRole) {
334 this.oldRole = oldRole;
335 this.newRole = newRole;
336 }
337 public void dispatch() {
338 // Make sure that old and new roles are different.
339 if (oldRole == newRole) {
340 if (log.isTraceEnabled()) {
341 log.trace("HA role update ignored as the old and " +
342 "new roles are the same. newRole = {}" +
343 "oldRole = {}", newRole, oldRole);
344 }
345 return;
346 }
347 if (log.isTraceEnabled()) {
348 log.trace("Dispatching HA Role update newRole = {}, oldRole = {}",
349 newRole, oldRole);
350 }
351 if (haListeners != null) {
352 for (IHAListener listener : haListeners) {
353 listener.roleChanged(oldRole, newRole);
354 }
355 }
356 }
357 }
358
359 /**
360 * Update message indicating
361 * IPs of controllers in controller cluster have changed.
362 */
363 protected class HAControllerNodeIPUpdate implements IUpdate {
364 public Map<String,String> curControllerNodeIPs;
365 public Map<String,String> addedControllerNodeIPs;
366 public Map<String,String> removedControllerNodeIPs;
367 public HAControllerNodeIPUpdate(
368 HashMap<String,String> curControllerNodeIPs,
369 HashMap<String,String> addedControllerNodeIPs,
370 HashMap<String,String> removedControllerNodeIPs) {
371 this.curControllerNodeIPs = curControllerNodeIPs;
372 this.addedControllerNodeIPs = addedControllerNodeIPs;
373 this.removedControllerNodeIPs = removedControllerNodeIPs;
374 }
375 public void dispatch() {
376 if (log.isTraceEnabled()) {
377 log.trace("Dispatching HA Controller Node IP update "
378 + "curIPs = {}, addedIPs = {}, removedIPs = {}",
379 new Object[] { curControllerNodeIPs, addedControllerNodeIPs,
380 removedControllerNodeIPs }
381 );
382 }
383 if (haListeners != null) {
384 for (IHAListener listener: haListeners) {
385 listener.controllerNodeIPsChanged(curControllerNodeIPs,
386 addedControllerNodeIPs, removedControllerNodeIPs);
387 }
388 }
389 }
390 }
391
392 // ***************
393 // Getters/Setters
394 // ***************
395
396 public void setStorageSourceService(IStorageSourceService storageSource) {
397 this.storageSource = storageSource;
398 }
399
400 public void setCounterStore(ICounterStoreService counterStore) {
401 this.counterStore = counterStore;
402 }
403
404 public void setPktInProcessingService(IPktInProcessingTimeService pits) {
405 this.pktinProcTime = pits;
406 }
407
408 public void setRestApiService(IRestApiService restApi) {
409 this.restApi = restApi;
410 }
411
412 public void setThreadPoolService(IThreadPoolService tp) {
413 this.threadPool = tp;
414 }
415
Pavlin Radoslavov19b0e122013-02-21 18:47:38 -0800416 public void setFlowService(IFlowService serviceImpl) {
417 this.flowService = serviceImpl;
418 }
Pavlin Radoslavovd7d8b792013-02-22 10:24:38 -0800419
420 public void setTopoRouteService(ITopoRouteService serviceImpl) {
421 this.topoRouteService = serviceImpl;
422 }
Jonathan Hartc2e95ee2013-02-22 15:25:11 -0800423
Jonathan Hartd82f20d2013-02-21 18:04:24 -0800424 public void setMastershipService(IControllerRegistryService serviceImpl) {
Jonathan Hartd10008d2013-02-23 17:04:08 -0800425 this.registryService = serviceImpl;
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800426 }
427
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800428 @Override
429 public Role getRole() {
430 synchronized(roleChanger) {
431 return role;
432 }
433 }
434
435 @Override
436 public void setRole(Role role) {
437 if (role == null) throw new NullPointerException("Role can not be null.");
438 if (role == Role.MASTER && this.role == Role.SLAVE) {
439 // Reset db state to Inactive for all switches.
440 updateAllInactiveSwitchInfo();
441 }
442
443 // Need to synchronize to ensure a reliable ordering on role request
444 // messages send and to ensure the list of connected switches is stable
445 // RoleChanger will handle the actual sending of the message and
446 // timeout handling
447 // @see RoleChanger
448 synchronized(roleChanger) {
449 if (role.equals(this.role)) {
450 log.debug("Ignoring role change: role is already {}", role);
451 return;
452 }
453
454 Role oldRole = this.role;
455 this.role = role;
456
457 log.debug("Submitting role change request to role {}", role);
458 roleChanger.submitRequest(connectedSwitches, role);
459
460 // Enqueue an update for our listeners.
461 try {
462 this.updates.put(new HARoleUpdate(role, oldRole));
463 } catch (InterruptedException e) {
464 log.error("Failure adding update to queue", e);
465 }
466 }
467 }
468
469
470
471 // **********************
472 // ChannelUpstreamHandler
473 // **********************
474
475 /**
476 * Return a new channel handler for processing a switch connections
477 * @param state The channel state object for the connection
478 * @return the new channel handler
479 */
480 protected ChannelUpstreamHandler getChannelHandler(OFChannelState state) {
481 return new OFChannelHandler(state);
482 }
483
Jonathan Hartcc957a02013-02-26 10:39:04 -0800484 protected class RoleChangeCallback implements ControlChangeCallback {
485 @Override
486 public void controlChanged(long dpid, boolean hasControl) {
487 log.info("Role change callback for switch {}, hasControl {}",
488 HexString.toHexString(dpid), hasControl);
489
490 synchronized(roleChanger){
491 OFSwitchImpl sw = null;
492 for (OFSwitchImpl connectedSw : connectedSwitches){
493 if (connectedSw.getId() == dpid){
494 sw = connectedSw;
495 break;
496 }
497 }
498 if (sw == null){
499 log.warn("Switch {} not found in connected switches",
500 HexString.toHexString(dpid));
501 return;
502 }
503
504 Role role = null;
505
Pankaj Berde01939e92013-03-08 14:38:27 -0800506 /*
507 * issue #229
508 * Cannot rely on sw.getRole() as it can be behind due to pending
509 * role changes in the queue. Just submit it and late the RoleChanger
510 * handle duplicates.
511 */
512
513 if (hasControl){
Jonathan Hartcc957a02013-02-26 10:39:04 -0800514 role = Role.MASTER;
515 }
Pankaj Berde01939e92013-03-08 14:38:27 -0800516 else {
Jonathan Hartcc957a02013-02-26 10:39:04 -0800517 role = Role.SLAVE;
518 }
Pankaj Berde01939e92013-03-08 14:38:27 -0800519
520 log.debug("Sending role request {} msg to {}", role, sw);
521 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
522 swList.add(sw);
523 roleChanger.submitRequest(swList, role);
524
Jonathan Hartcc957a02013-02-26 10:39:04 -0800525 }
526
527 }
528 }
529
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800530 /**
531 * Channel handler deals with the switch connection and dispatches
532 * switch messages to the appropriate locations.
533 * @author readams
534 */
535 protected class OFChannelHandler
536 extends IdleStateAwareChannelUpstreamHandler {
537 protected OFSwitchImpl sw;
538 protected OFChannelState state;
539
540 public OFChannelHandler(OFChannelState state) {
541 this.state = state;
542 }
543
544 @Override
545 @LogMessageDoc(message="New switch connection from {ip address}",
546 explanation="A new switch has connected from the " +
547 "specified IP address")
548 public void channelConnected(ChannelHandlerContext ctx,
549 ChannelStateEvent e) throws Exception {
550 log.info("New switch connection from {}",
551 e.getChannel().getRemoteAddress());
552
553 sw = new OFSwitchImpl();
554 sw.setChannel(e.getChannel());
555 sw.setFloodlightProvider(Controller.this);
556 sw.setThreadPoolService(threadPool);
557
558 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
559 msglist.add(factory.getMessage(OFType.HELLO));
560 e.getChannel().write(msglist);
561
562 }
563
564 @Override
565 @LogMessageDoc(message="Disconnected switch {switch information}",
566 explanation="The specified switch has disconnected.")
567 public void channelDisconnected(ChannelHandlerContext ctx,
568 ChannelStateEvent e) throws Exception {
569 if (sw != null && state.hsState == HandshakeState.READY) {
570 if (activeSwitches.containsKey(sw.getId())) {
571 // It's safe to call removeSwitch even though the map might
572 // not contain this particular switch but another with the
573 // same DPID
574 removeSwitch(sw);
575 }
576 synchronized(roleChanger) {
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700577 if (controlRequested) {
578 registryService.releaseControl(sw.getId());
579 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800580 connectedSwitches.remove(sw);
581 }
582 sw.setConnected(false);
583 }
584 log.info("Disconnected switch {}", sw);
585 }
586
587 @Override
588 @LogMessageDocs({
589 @LogMessageDoc(level="ERROR",
590 message="Disconnecting switch {switch} due to read timeout",
591 explanation="The connected switch has failed to send any " +
592 "messages or respond to echo requests",
593 recommendation=LogMessageDoc.CHECK_SWITCH),
594 @LogMessageDoc(level="ERROR",
595 message="Disconnecting switch {switch}: failed to " +
596 "complete handshake",
597 explanation="The switch did not respond correctly " +
598 "to handshake messages",
599 recommendation=LogMessageDoc.CHECK_SWITCH),
600 @LogMessageDoc(level="ERROR",
601 message="Disconnecting switch {switch} due to IO Error: {}",
602 explanation="There was an error communicating with the switch",
603 recommendation=LogMessageDoc.CHECK_SWITCH),
604 @LogMessageDoc(level="ERROR",
605 message="Disconnecting switch {switch} due to switch " +
606 "state error: {error}",
607 explanation="The switch sent an unexpected message",
608 recommendation=LogMessageDoc.CHECK_SWITCH),
609 @LogMessageDoc(level="ERROR",
610 message="Disconnecting switch {switch} due to " +
611 "message parse failure",
612 explanation="Could not parse a message from the switch",
613 recommendation=LogMessageDoc.CHECK_SWITCH),
614 @LogMessageDoc(level="ERROR",
615 message="Terminating controller due to storage exception",
616 explanation=ERROR_DATABASE,
617 recommendation=LogMessageDoc.CHECK_CONTROLLER),
618 @LogMessageDoc(level="ERROR",
619 message="Could not process message: queue full",
620 explanation="OpenFlow messages are arriving faster than " +
621 " the controller can process them.",
622 recommendation=LogMessageDoc.CHECK_CONTROLLER),
623 @LogMessageDoc(level="ERROR",
624 message="Error while processing message " +
625 "from switch {switch} {cause}",
626 explanation="An error occurred processing the switch message",
627 recommendation=LogMessageDoc.GENERIC_ACTION)
628 })
629 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
630 throws Exception {
631 if (e.getCause() instanceof ReadTimeoutException) {
632 // switch timeout
633 log.error("Disconnecting switch {} due to read timeout", sw);
634 ctx.getChannel().close();
635 } else if (e.getCause() instanceof HandshakeTimeoutException) {
636 log.error("Disconnecting switch {}: failed to complete handshake",
637 sw);
638 ctx.getChannel().close();
639 } else if (e.getCause() instanceof ClosedChannelException) {
640 //log.warn("Channel for sw {} already closed", sw);
641 } else if (e.getCause() instanceof IOException) {
642 log.error("Disconnecting switch {} due to IO Error: {}",
643 sw, e.getCause().getMessage());
644 ctx.getChannel().close();
645 } else if (e.getCause() instanceof SwitchStateException) {
646 log.error("Disconnecting switch {} due to switch state error: {}",
647 sw, e.getCause().getMessage());
648 ctx.getChannel().close();
649 } else if (e.getCause() instanceof MessageParseException) {
650 log.error("Disconnecting switch " + sw +
651 " due to message parse failure",
652 e.getCause());
653 ctx.getChannel().close();
654 } else if (e.getCause() instanceof StorageException) {
655 log.error("Terminating controller due to storage exception",
656 e.getCause());
657 terminate();
658 } else if (e.getCause() instanceof RejectedExecutionException) {
659 log.warn("Could not process message: queue full");
660 } else {
661 log.error("Error while processing message from switch " + sw,
662 e.getCause());
663 ctx.getChannel().close();
664 }
665 }
666
667 @Override
668 public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
669 throws Exception {
670 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
671 msglist.add(factory.getMessage(OFType.ECHO_REQUEST));
672 e.getChannel().write(msglist);
673 }
674
675 @Override
676 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
677 throws Exception {
678 if (e.getMessage() instanceof List) {
679 @SuppressWarnings("unchecked")
680 List<OFMessage> msglist = (List<OFMessage>)e.getMessage();
681
682 for (OFMessage ofm : msglist) {
683 try {
684 processOFMessage(ofm);
685 }
686 catch (Exception ex) {
687 // We are the last handler in the stream, so run the
688 // exception through the channel again by passing in
689 // ctx.getChannel().
690 Channels.fireExceptionCaught(ctx.getChannel(), ex);
691 }
692 }
693
694 // Flush all flow-mods/packet-out generated from this "train"
695 OFSwitchImpl.flush_all();
696 }
697 }
698
699 /**
700 * Process the request for the switch description
701 */
702 @LogMessageDoc(level="ERROR",
703 message="Exception in reading description " +
704 " during handshake {exception}",
705 explanation="Could not process the switch description string",
706 recommendation=LogMessageDoc.CHECK_SWITCH)
707 void processSwitchDescReply() {
708 try {
709 // Read description, if it has been updated
710 @SuppressWarnings("unchecked")
711 Future<List<OFStatistics>> desc_future =
712 (Future<List<OFStatistics>>)sw.
713 getAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
714 List<OFStatistics> values =
715 desc_future.get(0, TimeUnit.MILLISECONDS);
716 if (values != null) {
717 OFDescriptionStatistics description =
718 new OFDescriptionStatistics();
719 ChannelBuffer data =
720 ChannelBuffers.buffer(description.getLength());
721 for (OFStatistics f : values) {
722 f.writeTo(data);
723 description.readFrom(data);
724 break; // SHOULD be a list of length 1
725 }
726 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_DATA,
727 description);
728 sw.setSwitchProperties(description);
729 data = null;
730
731 // At this time, also set other switch properties from storage
732 boolean is_core_switch = false;
733 IResultSet resultSet = null;
734 try {
735 String swid = sw.getStringId();
736 resultSet =
737 storageSource.getRow(SWITCH_CONFIG_TABLE_NAME, swid);
738 for (Iterator<IResultSet> it =
739 resultSet.iterator(); it.hasNext();) {
740 // In case of multiple rows, use the status
741 // in last row?
742 Map<String, Object> row = it.next().getRow();
743 if (row.containsKey(SWITCH_CONFIG_CORE_SWITCH)) {
744 if (log.isDebugEnabled()) {
745 log.debug("Reading SWITCH_IS_CORE_SWITCH " +
746 "config for switch={}, is-core={}",
747 sw, row.get(SWITCH_CONFIG_CORE_SWITCH));
748 }
749 String ics =
750 (String)row.get(SWITCH_CONFIG_CORE_SWITCH);
751 is_core_switch = ics.equals("true");
752 }
753 }
754 }
755 finally {
756 if (resultSet != null)
757 resultSet.close();
758 }
759 if (is_core_switch) {
760 sw.setAttribute(IOFSwitch.SWITCH_IS_CORE_SWITCH,
761 new Boolean(true));
762 }
763 }
764 sw.removeAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE);
765 state.hasDescription = true;
766 checkSwitchReady();
767 }
768 catch (InterruptedException ex) {
769 // Ignore
770 }
771 catch (TimeoutException ex) {
772 // Ignore
773 } catch (Exception ex) {
774 log.error("Exception in reading description " +
775 " during handshake", ex);
776 }
777 }
778
779 /**
780 * Send initial switch setup information that we need before adding
781 * the switch
782 * @throws IOException
783 */
784 void sendHelloConfiguration() throws IOException {
785 // Send initial Features Request
Jonathan Hart9e92c512013-03-20 16:24:44 -0700786 log.debug("Sending FEATURES_REQUEST to {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800787 sw.write(factory.getMessage(OFType.FEATURES_REQUEST), null);
788 }
789
790 /**
791 * Send the configuration requests we can only do after we have
792 * the features reply
793 * @throws IOException
794 */
795 void sendFeatureReplyConfiguration() throws IOException {
Jonathan Hart9e92c512013-03-20 16:24:44 -0700796 log.debug("Sending CONFIG_REQUEST to {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800797 // Ensure we receive the full packet via PacketIn
798 OFSetConfig config = (OFSetConfig) factory
799 .getMessage(OFType.SET_CONFIG);
800 config.setMissSendLength((short) 0xffff)
801 .setLengthU(OFSwitchConfig.MINIMUM_LENGTH);
802 sw.write(config, null);
803 sw.write(factory.getMessage(OFType.GET_CONFIG_REQUEST),
804 null);
805
806 // Get Description to set switch-specific flags
807 OFStatisticsRequest req = new OFStatisticsRequest();
808 req.setStatisticType(OFStatisticsType.DESC);
809 req.setLengthU(req.getLengthU());
810 Future<List<OFStatistics>> dfuture =
811 sw.getStatistics(req);
812 sw.setAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE,
813 dfuture);
814
815 }
HIGUCHI Yuta0ba6fd02013-06-14 12:46:56 -0700816
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700817 volatile Boolean controlRequested = Boolean.FALSE;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800818 protected void checkSwitchReady() {
819 if (state.hsState == HandshakeState.FEATURES_REPLY &&
820 state.hasDescription && state.hasGetConfigReply) {
821
822 state.hsState = HandshakeState.READY;
Jonathan Hart9e92c512013-03-20 16:24:44 -0700823 log.debug("Handshake with {} complete", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800824
825 synchronized(roleChanger) {
826 // We need to keep track of all of the switches that are connected
827 // to the controller, in any role, so that we can later send the
828 // role request messages when the controller role changes.
829 // We need to be synchronized while doing this: we must not
830 // send a another role request to the connectedSwitches until
831 // we were able to add this new switch to connectedSwitches
832 // *and* send the current role to the new switch.
833 connectedSwitches.add(sw);
834
835 if (role != null) {
Jonathan Hart97801ac2013-02-26 14:29:16 -0800836 //Put the switch in SLAVE mode until we know we have control
837 log.debug("Setting new switch {} to SLAVE", sw.getStringId());
838 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
839 swList.add(sw);
840 roleChanger.submitRequest(swList, Role.SLAVE);
841
Jonathan Hartcc957a02013-02-26 10:39:04 -0800842 //Request control of the switch from the global registry
843 try {
Pankaj Berdeda7187b2013-03-18 15:24:59 -0700844 controlRequested = Boolean.TRUE;
Jonathan Hartcc957a02013-02-26 10:39:04 -0800845 registryService.requestControl(sw.getId(),
846 new RoleChangeCallback());
847 } catch (RegistryException e) {
848 log.debug("Registry error: {}", e.getMessage());
Pankaj Berde99fcee12013-03-18 09:41:53 -0700849 controlRequested = Boolean.FALSE;
Jonathan Hartcc957a02013-02-26 10:39:04 -0800850 }
851
Jonathan Hart97801ac2013-02-26 14:29:16 -0800852
Jonathan Hartcc957a02013-02-26 10:39:04 -0800853
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800854 // Send a role request if role support is enabled for the controller
855 // This is a probe that we'll use to determine if the switch
856 // actually supports the role request message. If it does we'll
857 // get back a role reply message. If it doesn't we'll get back an
858 // OFError message.
859 // If role is MASTER we will promote switch to active
860 // list when we receive the switch's role reply messages
Jonathan Hartcc957a02013-02-26 10:39:04 -0800861 /*
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800862 log.debug("This controller's role is {}, " +
863 "sending initial role request msg to {}",
864 role, sw);
865 Collection<OFSwitchImpl> swList = new ArrayList<OFSwitchImpl>(1);
866 swList.add(sw);
867 roleChanger.submitRequest(swList, role);
Jonathan Hartcc957a02013-02-26 10:39:04 -0800868 */
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800869 }
870 else {
871 // Role supported not enabled on controller (for now)
872 // automatically promote switch to active state.
Umesh Krishnaswamyb56bb292013-02-12 20:28:27 -0800873 log.debug("This controller's role is {}, " +
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800874 "not sending role request msg to {}",
875 role, sw);
876 // Need to clear FlowMods before we add the switch
877 // and dispatch updates otherwise we have a race condition.
878 sw.clearAllFlowMods();
879 addSwitch(sw);
880 state.firstRoleReplyReceived = true;
881 }
882 }
Pankaj Berde99fcee12013-03-18 09:41:53 -0700883 if (!controlRequested) {
884 // yield to allow other thread(s) to release control
885 try {
886 Thread.sleep(10);
887 } catch (InterruptedException e) {
888 // Ignore interruptions
889 }
890 // safer to bounce the switch to reconnect here than proceeding further
Jonathan Hart9e92c512013-03-20 16:24:44 -0700891 log.debug("Closing {} because we weren't able to request control " +
892 "successfully" + sw);
Pankaj Berde99fcee12013-03-18 09:41:53 -0700893 sw.channel.close();
894 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -0800895 }
896 }
897
898 /* Handle a role reply message we received from the switch. Since
899 * netty serializes message dispatch we don't need to synchronize
900 * against other receive operations from the same switch, so no need
901 * to synchronize addSwitch(), removeSwitch() operations from the same
902 * connection.
903 * FIXME: However, when a switch with the same DPID connects we do
904 * need some synchronization. However, handling switches with same
905 * DPID needs to be revisited anyways (get rid of r/w-lock and synchronous
906 * removedSwitch notification):1
907 *
908 */
909 @LogMessageDoc(level="ERROR",
910 message="Invalid role value in role reply message",
911 explanation="Was unable to set the HA role (master or slave) " +
912 "for the controller.",
913 recommendation=LogMessageDoc.CHECK_CONTROLLER)
914 protected void handleRoleReplyMessage(OFVendor vendorMessage,
915 OFRoleReplyVendorData roleReplyVendorData) {
916 // Map from the role code in the message to our role enum
917 int nxRole = roleReplyVendorData.getRole();
918 Role role = null;
919 switch (nxRole) {
920 case OFRoleVendorData.NX_ROLE_OTHER:
921 role = Role.EQUAL;
922 break;
923 case OFRoleVendorData.NX_ROLE_MASTER:
924 role = Role.MASTER;
925 break;
926 case OFRoleVendorData.NX_ROLE_SLAVE:
927 role = Role.SLAVE;
928 break;
929 default:
930 log.error("Invalid role value in role reply message");
931 sw.getChannel().close();
932 return;
933 }
934
935 log.debug("Handling role reply for role {} from {}. " +
936 "Controller's role is {} ",
937 new Object[] { role, sw, Controller.this.role}
938 );
939
940 sw.deliverRoleReply(vendorMessage.getXid(), role);
941
942 boolean isActive = activeSwitches.containsKey(sw.getId());
943 if (!isActive && sw.isActive()) {
944 // Transition from SLAVE to MASTER.
945
946 if (!state.firstRoleReplyReceived ||
947 getAlwaysClearFlowsOnSwAdd()) {
948 // This is the first role-reply message we receive from
949 // this switch or roles were disabled when the switch
950 // connected:
951 // Delete all pre-existing flows for new connections to
952 // the master
953 //
954 // FIXME: Need to think more about what the test should
955 // be for when we flush the flow-table? For example,
956 // if all the controllers are temporarily in the backup
957 // role (e.g. right after a failure of the master
958 // controller) at the point the switch connects, then
959 // all of the controllers will initially connect as
960 // backup controllers and not flush the flow-table.
961 // Then when one of them is promoted to master following
962 // the master controller election the flow-table
963 // will still not be flushed because that's treated as
964 // a failover event where we don't want to flush the
965 // flow-table. The end result would be that the flow
966 // table for a newly connected switch is never
967 // flushed. Not sure how to handle that case though...
968 sw.clearAllFlowMods();
969 log.debug("First role reply from master switch {}, " +
970 "clear FlowTable to active switch list",
971 HexString.toHexString(sw.getId()));
972 }
973
974 // Some switches don't seem to update us with port
975 // status messages while in slave role.
976 readSwitchPortStateFromStorage(sw);
977
978 // Only add the switch to the active switch list if
979 // we're not in the slave role. Note that if the role
980 // attribute is null, then that means that the switch
981 // doesn't support the role request messages, so in that
982 // case we're effectively in the EQUAL role and the
983 // switch should be included in the active switch list.
984 addSwitch(sw);
985 log.debug("Added master switch {} to active switch list",
986 HexString.toHexString(sw.getId()));
987
988 }
989 else if (isActive && !sw.isActive()) {
990 // Transition from MASTER to SLAVE: remove switch
991 // from active switch list.
992 log.debug("Removed slave switch {} from active switch" +
993 " list", HexString.toHexString(sw.getId()));
994 removeSwitch(sw);
995 }
996
997 // Indicate that we have received a role reply message.
998 state.firstRoleReplyReceived = true;
999 }
1000
1001 protected boolean handleVendorMessage(OFVendor vendorMessage) {
1002 boolean shouldHandleMessage = false;
1003 int vendor = vendorMessage.getVendor();
1004 switch (vendor) {
1005 case OFNiciraVendorData.NX_VENDOR_ID:
1006 OFNiciraVendorData niciraVendorData =
1007 (OFNiciraVendorData)vendorMessage.getVendorData();
1008 int dataType = niciraVendorData.getDataType();
1009 switch (dataType) {
1010 case OFRoleReplyVendorData.NXT_ROLE_REPLY:
1011 OFRoleReplyVendorData roleReplyVendorData =
1012 (OFRoleReplyVendorData) niciraVendorData;
1013 handleRoleReplyMessage(vendorMessage,
1014 roleReplyVendorData);
1015 break;
1016 default:
1017 log.warn("Unhandled Nicira VENDOR message; " +
1018 "data type = {}", dataType);
1019 break;
1020 }
1021 break;
1022 default:
1023 log.warn("Unhandled VENDOR message; vendor id = {}", vendor);
1024 break;
1025 }
1026
1027 return shouldHandleMessage;
1028 }
1029
1030 /**
1031 * Dispatch an Openflow message from a switch to the appropriate
1032 * handler.
1033 * @param m The message to process
1034 * @throws IOException
1035 * @throws SwitchStateException
1036 */
1037 @LogMessageDocs({
1038 @LogMessageDoc(level="WARN",
1039 message="Config Reply from {switch} has " +
1040 "miss length set to {length}",
1041 explanation="The controller requires that the switch " +
1042 "use a miss length of 0xffff for correct " +
1043 "function",
1044 recommendation="Use a different switch to ensure " +
1045 "correct function"),
1046 @LogMessageDoc(level="WARN",
1047 message="Received ERROR from sw {switch} that "
1048 +"indicates roles are not supported "
1049 +"but we have received a valid "
1050 +"role reply earlier",
1051 explanation="The switch sent a confusing message to the" +
1052 "controller")
1053 })
1054 protected void processOFMessage(OFMessage m)
1055 throws IOException, SwitchStateException {
1056 boolean shouldHandleMessage = false;
1057
1058 switch (m.getType()) {
1059 case HELLO:
1060 if (log.isTraceEnabled())
1061 log.trace("HELLO from {}", sw);
1062
1063 if (state.hsState.equals(HandshakeState.START)) {
1064 state.hsState = HandshakeState.HELLO;
1065 sendHelloConfiguration();
1066 } else {
1067 throw new SwitchStateException("Unexpected HELLO from "
1068 + sw);
1069 }
1070 break;
1071 case ECHO_REQUEST:
1072 OFEchoReply reply =
1073 (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY);
1074 reply.setXid(m.getXid());
1075 sw.write(reply, null);
1076 break;
1077 case ECHO_REPLY:
1078 break;
1079 case FEATURES_REPLY:
1080 if (log.isTraceEnabled())
1081 log.trace("Features Reply from {}", sw);
1082
1083 sw.setFeaturesReply((OFFeaturesReply) m);
1084 if (state.hsState.equals(HandshakeState.HELLO)) {
1085 sendFeatureReplyConfiguration();
1086 state.hsState = HandshakeState.FEATURES_REPLY;
1087 // uncomment to enable "dumb" switches like cbench
1088 // state.hsState = HandshakeState.READY;
1089 // addSwitch(sw);
1090 } else {
1091 // return results to rest api caller
1092 sw.deliverOFFeaturesReply(m);
1093 // update database */
1094 updateActiveSwitchInfo(sw);
1095 }
1096 break;
1097 case GET_CONFIG_REPLY:
1098 if (log.isTraceEnabled())
1099 log.trace("Get config reply from {}", sw);
1100
1101 if (!state.hsState.equals(HandshakeState.FEATURES_REPLY)) {
1102 String em = "Unexpected GET_CONFIG_REPLY from " + sw;
1103 throw new SwitchStateException(em);
1104 }
1105 OFGetConfigReply cr = (OFGetConfigReply) m;
1106 if (cr.getMissSendLength() == (short)0xffff) {
1107 log.trace("Config Reply from {} confirms " +
1108 "miss length set to 0xffff", sw);
1109 } else {
1110 log.warn("Config Reply from {} has " +
1111 "miss length set to {}",
1112 sw, cr.getMissSendLength() & 0xffff);
1113 }
1114 state.hasGetConfigReply = true;
1115 checkSwitchReady();
1116 break;
1117 case VENDOR:
1118 shouldHandleMessage = handleVendorMessage((OFVendor)m);
1119 break;
1120 case ERROR:
Jonathan Hart3525df92013-03-19 14:09:13 -07001121 log.debug("Recieved ERROR message from switch {}: {}", sw, m);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001122 // TODO: we need better error handling. Especially for
1123 // request/reply style message (stats, roles) we should have
1124 // a unified way to lookup the xid in the error message.
1125 // This will probable involve rewriting the way we handle
1126 // request/reply style messages.
1127 OFError error = (OFError) m;
1128 boolean shouldLogError = true;
1129 // TODO: should we check that firstRoleReplyReceived is false,
1130 // i.e., check only whether the first request fails?
1131 if (sw.checkFirstPendingRoleRequestXid(error.getXid())) {
1132 boolean isBadVendorError =
1133 (error.getErrorType() == OFError.OFErrorType.
1134 OFPET_BAD_REQUEST.getValue());
1135 // We expect to receive a bad vendor error when
1136 // we're connected to a switch that doesn't support
1137 // the Nicira vendor extensions (i.e. not OVS or
1138 // derived from OVS). By protocol, it should also be
1139 // BAD_VENDOR, but too many switch implementations
1140 // get it wrong and we can already check the xid()
1141 // so we can ignore the type with confidence that this
1142 // is not a spurious error
1143 shouldLogError = !isBadVendorError;
1144 if (isBadVendorError) {
Jonathan Hart3525df92013-03-19 14:09:13 -07001145 log.debug("Handling bad vendor error for {}", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001146 if (state.firstRoleReplyReceived && (role != null)) {
1147 log.warn("Received ERROR from sw {} that "
1148 +"indicates roles are not supported "
1149 +"but we have received a valid "
1150 +"role reply earlier", sw);
1151 }
1152 state.firstRoleReplyReceived = true;
Jonathan Harta95c6d92013-03-18 16:12:27 -07001153 Role requestedRole =
HIGUCHI Yutaeae374d2013-06-17 10:39:42 -07001154 sw.deliverRoleRequestNotSupportedEx(error.getXid());
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001155 synchronized(roleChanger) {
1156 if (sw.role == null && Controller.this.role==Role.SLAVE) {
Jonathan Harta95c6d92013-03-18 16:12:27 -07001157 //This will now never happen. The Controller's role
1158 //is now never SLAVE, always MASTER.
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001159 // the switch doesn't understand role request
1160 // messages and the current controller role is
1161 // slave. We need to disconnect the switch.
1162 // @see RoleChanger for rationale
Jonathan Hart9e92c512013-03-20 16:24:44 -07001163 log.warn("Closing {} channel because controller's role " +
1164 "is SLAVE", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001165 sw.getChannel().close();
1166 }
Jonathan Harta95c6d92013-03-18 16:12:27 -07001167 else if (sw.role == null && requestedRole == Role.MASTER) {
Jonathan Hart3525df92013-03-19 14:09:13 -07001168 log.debug("Adding switch {} because we got an error" +
1169 " returned from a MASTER role request", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001170 // Controller's role is master: add to
1171 // active
1172 // TODO: check if clearing flow table is
1173 // right choice here.
1174 // Need to clear FlowMods before we add the switch
1175 // and dispatch updates otherwise we have a race condition.
1176 // TODO: switch update is async. Won't we still have a potential
1177 // race condition?
1178 sw.clearAllFlowMods();
1179 addSwitch(sw);
1180 }
1181 }
1182 }
1183 else {
1184 // TODO: Is this the right thing to do if we receive
1185 // some other error besides a bad vendor error?
1186 // Presumably that means the switch did actually
1187 // understand the role request message, but there
1188 // was some other error from processing the message.
1189 // OF 1.2 specifies a OFPET_ROLE_REQUEST_FAILED
1190 // error code, but it doesn't look like the Nicira
1191 // role request has that. Should check OVS source
1192 // code to see if it's possible for any other errors
1193 // to be returned.
1194 // If we received an error the switch is not
1195 // in the correct role, so we need to disconnect it.
1196 // We could also resend the request but then we need to
1197 // check if there are other pending request in which
1198 // case we shouldn't resend. If we do resend we need
1199 // to make sure that the switch eventually accepts one
1200 // of our requests or disconnect the switch. This feels
1201 // cumbersome.
Jonathan Hart9e92c512013-03-20 16:24:44 -07001202 log.debug("Closing {} channel because we recieved an " +
1203 "error other than BAD_VENDOR", sw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001204 sw.getChannel().close();
1205 }
1206 }
1207 // Once we support OF 1.2, we'd add code to handle it here.
1208 //if (error.getXid() == state.ofRoleRequestXid) {
1209 //}
1210 if (shouldLogError)
1211 logError(sw, error);
1212 break;
1213 case STATS_REPLY:
1214 if (state.hsState.ordinal() <
1215 HandshakeState.FEATURES_REPLY.ordinal()) {
1216 String em = "Unexpected STATS_REPLY from " + sw;
1217 throw new SwitchStateException(em);
1218 }
1219 sw.deliverStatisticsReply(m);
1220 if (sw.hasAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE)) {
1221 processSwitchDescReply();
1222 }
1223 break;
1224 case PORT_STATUS:
1225 // We want to update our port state info even if we're in
1226 // the slave role, but we only want to update storage if
1227 // we're the master (or equal).
1228 boolean updateStorage = state.hsState.
1229 equals(HandshakeState.READY) &&
1230 (sw.getRole() != Role.SLAVE);
1231 handlePortStatusMessage(sw, (OFPortStatus)m, updateStorage);
1232 shouldHandleMessage = true;
1233 break;
1234
1235 default:
1236 shouldHandleMessage = true;
1237 break;
1238 }
1239
1240 if (shouldHandleMessage) {
1241 sw.getListenerReadLock().lock();
1242 try {
1243 if (sw.isConnected()) {
1244 if (!state.hsState.equals(HandshakeState.READY)) {
1245 log.debug("Ignoring message type {} received " +
1246 "from switch {} before switch is " +
1247 "fully configured.", m.getType(), sw);
1248 }
1249 // Check if the controller is in the slave role for the
1250 // switch. If it is, then don't dispatch the message to
1251 // the listeners.
1252 // TODO: Should we dispatch messages that we expect to
1253 // receive when we're in the slave role, e.g. port
1254 // status messages? Since we're "hiding" switches from
1255 // the listeners when we're in the slave role, then it
1256 // seems a little weird to dispatch port status messages
1257 // to them. On the other hand there might be special
1258 // modules that care about all of the connected switches
1259 // and would like to receive port status notifications.
1260 else if (sw.getRole() == Role.SLAVE) {
1261 // Don't log message if it's a port status message
1262 // since we expect to receive those from the switch
1263 // and don't want to emit spurious messages.
1264 if (m.getType() != OFType.PORT_STATUS) {
1265 log.debug("Ignoring message type {} received " +
1266 "from switch {} while in the slave role.",
1267 m.getType(), sw);
1268 }
1269 } else {
1270 handleMessage(sw, m, null);
1271 }
1272 }
1273 }
1274 finally {
1275 sw.getListenerReadLock().unlock();
1276 }
1277 }
1278 }
1279 }
1280
1281 // ****************
1282 // Message handlers
1283 // ****************
1284
1285 protected void handlePortStatusMessage(IOFSwitch sw,
1286 OFPortStatus m,
1287 boolean updateStorage) {
1288 short portNumber = m.getDesc().getPortNumber();
1289 OFPhysicalPort port = m.getDesc();
1290 if (m.getReason() == (byte)OFPortReason.OFPPR_MODIFY.ordinal()) {
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001291 boolean portDown = ((OFPortConfig.OFPPC_PORT_DOWN.getValue() & port.getConfig()) > 0) ||
1292 ((OFPortState.OFPPS_LINK_DOWN.getValue() & port.getState()) > 0);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001293 sw.setPort(port);
Pankaj Berde6a4075d2013-01-22 16:42:54 -08001294 if (!portDown) {
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001295 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTADDED);
1296 try {
1297 this.updates.put(update);
1298 } catch (InterruptedException e) {
1299 log.error("Failure adding update to queue", e);
1300 }
Pankaj Berde6debb042013-01-16 18:04:32 -08001301 } else {
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001302 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTREMOVED);
1303 try {
1304 this.updates.put(update);
1305 } catch (InterruptedException e) {
1306 log.error("Failure adding update to queue", e);
1307 }
Pankaj Berde6debb042013-01-16 18:04:32 -08001308 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001309 if (updateStorage)
1310 updatePortInfo(sw, port);
1311 log.debug("Port #{} modified for {}", portNumber, sw);
1312 } else if (m.getReason() == (byte)OFPortReason.OFPPR_ADD.ordinal()) {
1313 sw.setPort(port);
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001314 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTADDED);
1315 try {
1316 this.updates.put(update);
1317 } catch (InterruptedException e) {
1318 log.error("Failure adding update to queue", e);
1319 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001320 if (updateStorage)
1321 updatePortInfo(sw, port);
1322 log.debug("Port #{} added for {}", portNumber, sw);
1323 } else if (m.getReason() ==
1324 (byte)OFPortReason.OFPPR_DELETE.ordinal()) {
1325 sw.deletePort(portNumber);
Pankaj Berde465ac7c2013-05-23 13:47:49 -07001326 SwitchUpdate update = new SwitchUpdate(sw, port, SwitchUpdateType.PORTREMOVED);
1327 try {
1328 this.updates.put(update);
1329 } catch (InterruptedException e) {
1330 log.error("Failure adding update to queue", e);
1331 }
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001332 if (updateStorage)
1333 removePortInfo(sw, portNumber);
1334 log.debug("Port #{} deleted for {}", portNumber, sw);
1335 }
1336 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
1337 try {
1338 this.updates.put(update);
1339 } catch (InterruptedException e) {
1340 log.error("Failure adding update to queue", e);
1341 }
1342 }
1343
1344 /**
1345 * flcontext_cache - Keep a thread local stack of contexts
1346 */
1347 protected static final ThreadLocal<Stack<FloodlightContext>> flcontext_cache =
1348 new ThreadLocal <Stack<FloodlightContext>> () {
1349 @Override
1350 protected Stack<FloodlightContext> initialValue() {
1351 return new Stack<FloodlightContext>();
1352 }
1353 };
1354
1355 /**
1356 * flcontext_alloc - pop a context off the stack, if required create a new one
1357 * @return FloodlightContext
1358 */
1359 protected static FloodlightContext flcontext_alloc() {
1360 FloodlightContext flcontext = null;
1361
1362 if (flcontext_cache.get().empty()) {
1363 flcontext = new FloodlightContext();
1364 }
1365 else {
1366 flcontext = flcontext_cache.get().pop();
1367 }
1368
1369 return flcontext;
1370 }
1371
1372 /**
1373 * flcontext_free - Free the context to the current thread
1374 * @param flcontext
1375 */
1376 protected void flcontext_free(FloodlightContext flcontext) {
1377 flcontext.getStorage().clear();
1378 flcontext_cache.get().push(flcontext);
1379 }
1380
1381 /**
1382 * Handle replies to certain OFMessages, and pass others off to listeners
1383 * @param sw The switch for the message
1384 * @param m The message
1385 * @param bContext The floodlight context. If null then floodlight context would
1386 * be allocated in this function
1387 * @throws IOException
1388 */
1389 @LogMessageDocs({
1390 @LogMessageDoc(level="ERROR",
1391 message="Ignoring PacketIn (Xid = {xid}) because the data" +
1392 " field is empty.",
1393 explanation="The switch sent an improperly-formatted PacketIn" +
1394 " message",
1395 recommendation=LogMessageDoc.CHECK_SWITCH),
1396 @LogMessageDoc(level="WARN",
1397 message="Unhandled OF Message: {} from {}",
1398 explanation="The switch sent a message not handled by " +
1399 "the controller")
1400 })
1401 protected void handleMessage(IOFSwitch sw, OFMessage m,
1402 FloodlightContext bContext)
1403 throws IOException {
1404 Ethernet eth = null;
1405
1406 switch (m.getType()) {
1407 case PACKET_IN:
1408 OFPacketIn pi = (OFPacketIn)m;
1409
1410 if (pi.getPacketData().length <= 0) {
1411 log.error("Ignoring PacketIn (Xid = " + pi.getXid() +
1412 ") because the data field is empty.");
1413 return;
1414 }
1415
1416 if (Controller.ALWAYS_DECODE_ETH) {
1417 eth = new Ethernet();
1418 eth.deserialize(pi.getPacketData(), 0,
1419 pi.getPacketData().length);
1420 counterStore.updatePacketInCounters(sw, m, eth);
1421 }
1422 // fall through to default case...
1423
1424 default:
1425
1426 List<IOFMessageListener> listeners = null;
1427 if (messageListeners.containsKey(m.getType())) {
1428 listeners = messageListeners.get(m.getType()).
1429 getOrderedListeners();
1430 }
1431
1432 FloodlightContext bc = null;
1433 if (listeners != null) {
1434 // Check if floodlight context is passed from the calling
1435 // function, if so use that floodlight context, otherwise
1436 // allocate one
1437 if (bContext == null) {
1438 bc = flcontext_alloc();
1439 } else {
1440 bc = bContext;
1441 }
1442 if (eth != null) {
1443 IFloodlightProviderService.bcStore.put(bc,
1444 IFloodlightProviderService.CONTEXT_PI_PAYLOAD,
1445 eth);
1446 }
1447
1448 // Get the starting time (overall and per-component) of
1449 // the processing chain for this packet if performance
1450 // monitoring is turned on
1451 pktinProcTime.bootstrap(listeners);
1452 pktinProcTime.recordStartTimePktIn();
1453 Command cmd;
1454 for (IOFMessageListener listener : listeners) {
1455 if (listener instanceof IOFSwitchFilter) {
1456 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1457 continue;
1458 }
1459 }
1460
1461 pktinProcTime.recordStartTimeComp(listener);
1462 cmd = listener.receive(sw, m, bc);
1463 pktinProcTime.recordEndTimeComp(listener);
1464
1465 if (Command.STOP.equals(cmd)) {
1466 break;
1467 }
1468 }
1469 pktinProcTime.recordEndTimePktIn(sw, m, bc);
1470 } else {
1471 log.warn("Unhandled OF Message: {} from {}", m, sw);
1472 }
1473
1474 if ((bContext == null) && (bc != null)) flcontext_free(bc);
1475 }
1476 }
1477
1478 /**
1479 * Log an OpenFlow error message from a switch
1480 * @param sw The switch that sent the error
1481 * @param error The error message
1482 */
1483 @LogMessageDoc(level="ERROR",
1484 message="Error {error type} {error code} from {switch}",
1485 explanation="The switch responded with an unexpected error" +
1486 "to an OpenFlow message from the controller",
1487 recommendation="This could indicate improper network operation. " +
1488 "If the problem persists restarting the switch and " +
1489 "controller may help."
1490 )
1491 protected void logError(IOFSwitch sw, OFError error) {
1492 int etint = 0xffff & error.getErrorType();
1493 if (etint < 0 || etint >= OFErrorType.values().length) {
1494 log.error("Unknown error code {} from sw {}", etint, sw);
1495 }
1496 OFErrorType et = OFErrorType.values()[etint];
1497 switch (et) {
1498 case OFPET_HELLO_FAILED:
1499 OFHelloFailedCode hfc =
1500 OFHelloFailedCode.values()[0xffff & error.getErrorCode()];
1501 log.error("Error {} {} from {}", new Object[] {et, hfc, sw});
1502 break;
1503 case OFPET_BAD_REQUEST:
1504 OFBadRequestCode brc =
1505 OFBadRequestCode.values()[0xffff & error.getErrorCode()];
1506 log.error("Error {} {} from {}", new Object[] {et, brc, sw});
1507 break;
1508 case OFPET_BAD_ACTION:
1509 OFBadActionCode bac =
1510 OFBadActionCode.values()[0xffff & error.getErrorCode()];
1511 log.error("Error {} {} from {}", new Object[] {et, bac, sw});
1512 break;
1513 case OFPET_FLOW_MOD_FAILED:
1514 OFFlowModFailedCode fmfc =
1515 OFFlowModFailedCode.values()[0xffff & error.getErrorCode()];
1516 log.error("Error {} {} from {}", new Object[] {et, fmfc, sw});
1517 break;
1518 case OFPET_PORT_MOD_FAILED:
1519 OFPortModFailedCode pmfc =
1520 OFPortModFailedCode.values()[0xffff & error.getErrorCode()];
1521 log.error("Error {} {} from {}", new Object[] {et, pmfc, sw});
1522 break;
1523 case OFPET_QUEUE_OP_FAILED:
1524 OFQueueOpFailedCode qofc =
1525 OFQueueOpFailedCode.values()[0xffff & error.getErrorCode()];
1526 log.error("Error {} {} from {}", new Object[] {et, qofc, sw});
1527 break;
1528 default:
1529 break;
1530 }
1531 }
1532
1533 /**
1534 * Add a switch to the active switch list and call the switch listeners.
1535 * This happens either when a switch first connects (and the controller is
1536 * not in the slave role) or when the role of the controller changes from
1537 * slave to master.
1538 * @param sw the switch that has been added
1539 */
1540 // TODO: need to rethink locking and the synchronous switch update.
1541 // We can / should also handle duplicate DPIDs in connectedSwitches
1542 @LogMessageDoc(level="ERROR",
1543 message="New switch added {switch} for already-added switch {switch}",
1544 explanation="A switch with the same DPID as another switch " +
1545 "connected to the controller. This can be caused by " +
1546 "multiple switches configured with the same DPID, or " +
1547 "by a switch reconnected very quickly after " +
1548 "disconnecting.",
1549 recommendation="If this happens repeatedly, it is likely there " +
1550 "are switches with duplicate DPIDs on the network. " +
1551 "Reconfigure the appropriate switches. If it happens " +
1552 "very rarely, then it is likely this is a transient " +
1553 "network problem that can be ignored."
1554 )
1555 protected void addSwitch(IOFSwitch sw) {
1556 // TODO: is it safe to modify the HashMap without holding
1557 // the old switch's lock?
1558 OFSwitchImpl oldSw = (OFSwitchImpl) this.activeSwitches.put(sw.getId(), sw);
1559 if (sw == oldSw) {
1560 // Note == for object equality, not .equals for value
1561 log.info("New add switch for pre-existing switch {}", sw);
1562 return;
1563 }
1564
1565 if (oldSw != null) {
1566 oldSw.getListenerWriteLock().lock();
1567 try {
1568 log.error("New switch added {} for already-added switch {}",
1569 sw, oldSw);
1570 // Set the connected flag to false to suppress calling
1571 // the listeners for this switch in processOFMessage
1572 oldSw.setConnected(false);
1573
1574 oldSw.cancelAllStatisticsReplies();
1575
1576 updateInactiveSwitchInfo(oldSw);
1577
1578 // we need to clean out old switch state definitively
1579 // before adding the new switch
1580 // FIXME: It seems not completely kosher to call the
1581 // switch listeners here. I thought one of the points of
1582 // having the asynchronous switch update mechanism was so
1583 // the addedSwitch and removedSwitch were always called
1584 // from a single thread to simplify concurrency issues
1585 // for the listener.
1586 if (switchListeners != null) {
1587 for (IOFSwitchListener listener : switchListeners) {
1588 listener.removedSwitch(oldSw);
1589 }
1590 }
1591 // will eventually trigger a removeSwitch(), which will cause
1592 // a "Not removing Switch ... already removed debug message.
1593 // TODO: Figure out a way to handle this that avoids the
1594 // spurious debug message.
Jonathan Hart9e92c512013-03-20 16:24:44 -07001595 log.debug("Closing {} because a new IOFSwitch got added " +
1596 "for this dpid", oldSw);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001597 oldSw.getChannel().close();
1598 }
1599 finally {
1600 oldSw.getListenerWriteLock().unlock();
1601 }
1602 }
1603
1604 updateActiveSwitchInfo(sw);
1605 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.ADDED);
1606 try {
1607 this.updates.put(update);
1608 } catch (InterruptedException e) {
1609 log.error("Failure adding update to queue", e);
1610 }
1611 }
1612
1613 /**
1614 * Remove a switch from the active switch list and call the switch listeners.
1615 * This happens either when the switch is disconnected or when the
1616 * controller's role for the switch changes from master to slave.
1617 * @param sw the switch that has been removed
1618 */
1619 protected void removeSwitch(IOFSwitch sw) {
1620 // No need to acquire the listener lock, since
1621 // this method is only called after netty has processed all
1622 // pending messages
1623 log.debug("removeSwitch: {}", sw);
1624 if (!this.activeSwitches.remove(sw.getId(), sw) || !sw.isConnected()) {
1625 log.debug("Not removing switch {}; already removed", sw);
1626 return;
1627 }
1628 // We cancel all outstanding statistics replies if the switch transition
1629 // from active. In the future we might allow statistics requests
1630 // from slave controllers. Then we need to move this cancelation
1631 // to switch disconnect
1632 sw.cancelAllStatisticsReplies();
1633
1634 // FIXME: I think there's a race condition if we call updateInactiveSwitchInfo
1635 // here if role support is enabled. In that case if the switch is being
1636 // removed because we've been switched to being in the slave role, then I think
1637 // it's possible that the new master may have already been promoted to master
1638 // and written out the active switch state to storage. If we now execute
1639 // updateInactiveSwitchInfo we may wipe out all of the state that was
1640 // written out by the new master. Maybe need to revisit how we handle all
1641 // of the switch state that's written to storage.
1642
1643 updateInactiveSwitchInfo(sw);
1644 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.REMOVED);
1645 try {
1646 this.updates.put(update);
1647 } catch (InterruptedException e) {
1648 log.error("Failure adding update to queue", e);
1649 }
1650 }
1651
1652 // ***************
1653 // IFloodlightProvider
1654 // ***************
1655
1656 @Override
1657 public synchronized void addOFMessageListener(OFType type,
1658 IOFMessageListener listener) {
1659 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1660 messageListeners.get(type);
1661 if (ldd == null) {
1662 ldd = new ListenerDispatcher<OFType, IOFMessageListener>();
1663 messageListeners.put(type, ldd);
1664 }
1665 ldd.addListener(type, listener);
1666 }
1667
1668 @Override
1669 public synchronized void removeOFMessageListener(OFType type,
1670 IOFMessageListener listener) {
1671 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1672 messageListeners.get(type);
1673 if (ldd != null) {
1674 ldd.removeListener(listener);
1675 }
1676 }
1677
1678 private void logListeners() {
1679 for (Map.Entry<OFType,
1680 ListenerDispatcher<OFType,
1681 IOFMessageListener>> entry
1682 : messageListeners.entrySet()) {
1683
1684 OFType type = entry.getKey();
1685 ListenerDispatcher<OFType, IOFMessageListener> ldd =
1686 entry.getValue();
1687
1688 StringBuffer sb = new StringBuffer();
1689 sb.append("OFListeners for ");
1690 sb.append(type);
1691 sb.append(": ");
1692 for (IOFMessageListener l : ldd.getOrderedListeners()) {
1693 sb.append(l.getName());
1694 sb.append(",");
1695 }
1696 log.debug(sb.toString());
1697 }
1698 }
1699
1700 public void removeOFMessageListeners(OFType type) {
1701 messageListeners.remove(type);
1702 }
1703
1704 @Override
1705 public Map<Long, IOFSwitch> getSwitches() {
1706 return Collections.unmodifiableMap(this.activeSwitches);
1707 }
1708
1709 @Override
1710 public void addOFSwitchListener(IOFSwitchListener listener) {
1711 this.switchListeners.add(listener);
1712 }
1713
1714 @Override
1715 public void removeOFSwitchListener(IOFSwitchListener listener) {
1716 this.switchListeners.remove(listener);
1717 }
1718
1719 @Override
1720 public Map<OFType, List<IOFMessageListener>> getListeners() {
1721 Map<OFType, List<IOFMessageListener>> lers =
1722 new HashMap<OFType, List<IOFMessageListener>>();
1723 for(Entry<OFType, ListenerDispatcher<OFType, IOFMessageListener>> e :
1724 messageListeners.entrySet()) {
1725 lers.put(e.getKey(), e.getValue().getOrderedListeners());
1726 }
1727 return Collections.unmodifiableMap(lers);
1728 }
1729
1730 @Override
1731 @LogMessageDocs({
1732 @LogMessageDoc(message="Failed to inject OFMessage {message} onto " +
1733 "a null switch",
1734 explanation="Failed to process a message because the switch " +
1735 " is no longer connected."),
1736 @LogMessageDoc(level="ERROR",
1737 message="Error reinjecting OFMessage on switch {switch}",
1738 explanation="An I/O error occured while attempting to " +
1739 "process an OpenFlow message",
1740 recommendation=LogMessageDoc.CHECK_SWITCH)
1741 })
1742 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg,
1743 FloodlightContext bc) {
1744 if (sw == null) {
1745 log.info("Failed to inject OFMessage {} onto a null switch", msg);
1746 return false;
1747 }
1748
1749 // FIXME: Do we need to be able to inject messages to switches
1750 // where we're the slave controller (i.e. they're connected but
1751 // not active)?
1752 // FIXME: Don't we need synchronization logic here so we're holding
1753 // the listener read lock when we call handleMessage? After some
1754 // discussions it sounds like the right thing to do here would be to
1755 // inject the message as a netty upstream channel event so it goes
1756 // through the normal netty event processing, including being
1757 // handled
1758 if (!activeSwitches.containsKey(sw.getId())) return false;
1759
1760 try {
1761 // Pass Floodlight context to the handleMessages()
1762 handleMessage(sw, msg, bc);
1763 } catch (IOException e) {
1764 log.error("Error reinjecting OFMessage on switch {}",
1765 HexString.toHexString(sw.getId()));
1766 return false;
1767 }
1768 return true;
1769 }
1770
1771 @Override
1772 @LogMessageDoc(message="Calling System.exit",
1773 explanation="The controller is terminating")
1774 public synchronized void terminate() {
1775 log.info("Calling System.exit");
1776 System.exit(1);
1777 }
1778
1779 @Override
1780 public boolean injectOfMessage(IOFSwitch sw, OFMessage msg) {
1781 // call the overloaded version with floodlight context set to null
1782 return injectOfMessage(sw, msg, null);
1783 }
1784
1785 @Override
1786 public void handleOutgoingMessage(IOFSwitch sw, OFMessage m,
1787 FloodlightContext bc) {
1788 if (log.isTraceEnabled()) {
1789 String str = OFMessage.getDataAsString(sw, m, bc);
1790 log.trace("{}", str);
1791 }
1792
1793 List<IOFMessageListener> listeners = null;
1794 if (messageListeners.containsKey(m.getType())) {
1795 listeners =
1796 messageListeners.get(m.getType()).getOrderedListeners();
1797 }
1798
1799 if (listeners != null) {
1800 for (IOFMessageListener listener : listeners) {
1801 if (listener instanceof IOFSwitchFilter) {
1802 if (!((IOFSwitchFilter)listener).isInterested(sw)) {
1803 continue;
1804 }
1805 }
1806 if (Command.STOP.equals(listener.receive(sw, m, bc))) {
1807 break;
1808 }
1809 }
1810 }
1811 }
1812
1813 @Override
1814 public BasicFactory getOFMessageFactory() {
1815 return factory;
1816 }
1817
1818 @Override
1819 public String getControllerId() {
1820 return controllerId;
1821 }
1822
1823 // **************
1824 // Initialization
1825 // **************
1826
1827 protected void updateAllInactiveSwitchInfo() {
1828 if (role == Role.SLAVE) {
1829 return;
1830 }
1831 String controllerId = getControllerId();
1832 String[] switchColumns = { SWITCH_DATAPATH_ID,
1833 SWITCH_CONTROLLER_ID,
1834 SWITCH_ACTIVE };
1835 String[] portColumns = { PORT_ID, PORT_SWITCH };
1836 IResultSet switchResultSet = null;
1837 try {
1838 OperatorPredicate op =
1839 new OperatorPredicate(SWITCH_CONTROLLER_ID,
1840 OperatorPredicate.Operator.EQ,
1841 controllerId);
1842 switchResultSet =
1843 storageSource.executeQuery(SWITCH_TABLE_NAME,
1844 switchColumns,
1845 op, null);
1846 while (switchResultSet.next()) {
1847 IResultSet portResultSet = null;
1848 try {
1849 String datapathId =
1850 switchResultSet.getString(SWITCH_DATAPATH_ID);
1851 switchResultSet.setBoolean(SWITCH_ACTIVE, Boolean.FALSE);
1852 op = new OperatorPredicate(PORT_SWITCH,
1853 OperatorPredicate.Operator.EQ,
1854 datapathId);
1855 portResultSet =
1856 storageSource.executeQuery(PORT_TABLE_NAME,
1857 portColumns,
1858 op, null);
1859 while (portResultSet.next()) {
1860 portResultSet.deleteRow();
1861 }
1862 portResultSet.save();
1863 }
1864 finally {
1865 if (portResultSet != null)
1866 portResultSet.close();
1867 }
1868 }
1869 switchResultSet.save();
1870 }
1871 finally {
1872 if (switchResultSet != null)
1873 switchResultSet.close();
1874 }
1875 }
1876
1877 protected void updateControllerInfo() {
1878 updateAllInactiveSwitchInfo();
1879
1880 // Write out the controller info to the storage source
1881 Map<String, Object> controllerInfo = new HashMap<String, Object>();
1882 String id = getControllerId();
1883 controllerInfo.put(CONTROLLER_ID, id);
1884 storageSource.updateRow(CONTROLLER_TABLE_NAME, controllerInfo);
1885 }
1886
1887 protected void updateActiveSwitchInfo(IOFSwitch sw) {
1888 if (role == Role.SLAVE) {
1889 return;
1890 }
1891 // Obtain the row info for the switch
1892 Map<String, Object> switchInfo = new HashMap<String, Object>();
1893 String datapathIdString = sw.getStringId();
1894 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1895 String controllerId = getControllerId();
1896 switchInfo.put(SWITCH_CONTROLLER_ID, controllerId);
1897 Date connectedSince = sw.getConnectedSince();
1898 switchInfo.put(SWITCH_CONNECTED_SINCE, connectedSince);
1899 Channel channel = sw.getChannel();
1900 SocketAddress socketAddress = channel.getRemoteAddress();
1901 if (socketAddress != null) {
1902 String socketAddressString = socketAddress.toString();
1903 switchInfo.put(SWITCH_SOCKET_ADDRESS, socketAddressString);
1904 if (socketAddress instanceof InetSocketAddress) {
1905 InetSocketAddress inetSocketAddress =
1906 (InetSocketAddress)socketAddress;
1907 InetAddress inetAddress = inetSocketAddress.getAddress();
1908 String ip = inetAddress.getHostAddress();
1909 switchInfo.put(SWITCH_IP, ip);
1910 }
1911 }
1912
1913 // Write out the switch features info
1914 long capabilities = U32.f(sw.getCapabilities());
1915 switchInfo.put(SWITCH_CAPABILITIES, capabilities);
1916 long buffers = U32.f(sw.getBuffers());
1917 switchInfo.put(SWITCH_BUFFERS, buffers);
1918 long tables = U32.f(sw.getTables());
1919 switchInfo.put(SWITCH_TABLES, tables);
1920 long actions = U32.f(sw.getActions());
1921 switchInfo.put(SWITCH_ACTIONS, actions);
1922 switchInfo.put(SWITCH_ACTIVE, Boolean.TRUE);
1923
1924 // Update the switch
1925 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1926
1927 // Update the ports
1928 for (OFPhysicalPort port: sw.getPorts()) {
1929 updatePortInfo(sw, port);
1930 }
1931 }
1932
1933 protected void updateInactiveSwitchInfo(IOFSwitch sw) {
1934 if (role == Role.SLAVE) {
1935 return;
1936 }
1937 log.debug("Update DB with inactiveSW {}", sw);
1938 // Update the controller info in the storage source to be inactive
1939 Map<String, Object> switchInfo = new HashMap<String, Object>();
1940 String datapathIdString = sw.getStringId();
1941 switchInfo.put(SWITCH_DATAPATH_ID, datapathIdString);
1942 //switchInfo.put(SWITCH_CONNECTED_SINCE, null);
1943 switchInfo.put(SWITCH_ACTIVE, Boolean.FALSE);
1944 storageSource.updateRowAsync(SWITCH_TABLE_NAME, switchInfo);
1945 }
1946
1947 protected void updatePortInfo(IOFSwitch sw, OFPhysicalPort port) {
1948 if (role == Role.SLAVE) {
1949 return;
1950 }
1951 String datapathIdString = sw.getStringId();
1952 Map<String, Object> portInfo = new HashMap<String, Object>();
1953 int portNumber = U16.f(port.getPortNumber());
1954 String id = datapathIdString + "|" + portNumber;
1955 portInfo.put(PORT_ID, id);
1956 portInfo.put(PORT_SWITCH, datapathIdString);
1957 portInfo.put(PORT_NUMBER, portNumber);
1958 byte[] hardwareAddress = port.getHardwareAddress();
1959 String hardwareAddressString = HexString.toHexString(hardwareAddress);
1960 portInfo.put(PORT_HARDWARE_ADDRESS, hardwareAddressString);
1961 String name = port.getName();
1962 portInfo.put(PORT_NAME, name);
1963 long config = U32.f(port.getConfig());
1964 portInfo.put(PORT_CONFIG, config);
1965 long state = U32.f(port.getState());
1966 portInfo.put(PORT_STATE, state);
1967 long currentFeatures = U32.f(port.getCurrentFeatures());
1968 portInfo.put(PORT_CURRENT_FEATURES, currentFeatures);
1969 long advertisedFeatures = U32.f(port.getAdvertisedFeatures());
1970 portInfo.put(PORT_ADVERTISED_FEATURES, advertisedFeatures);
1971 long supportedFeatures = U32.f(port.getSupportedFeatures());
1972 portInfo.put(PORT_SUPPORTED_FEATURES, supportedFeatures);
1973 long peerFeatures = U32.f(port.getPeerFeatures());
1974 portInfo.put(PORT_PEER_FEATURES, peerFeatures);
1975 storageSource.updateRowAsync(PORT_TABLE_NAME, portInfo);
1976 }
1977
1978 /**
1979 * Read switch port data from storage and write it into a switch object
1980 * @param sw the switch to update
1981 */
1982 protected void readSwitchPortStateFromStorage(OFSwitchImpl sw) {
1983 OperatorPredicate op =
1984 new OperatorPredicate(PORT_SWITCH,
1985 OperatorPredicate.Operator.EQ,
1986 sw.getStringId());
1987 IResultSet portResultSet =
1988 storageSource.executeQuery(PORT_TABLE_NAME,
1989 null, op, null);
1990 //Map<Short, OFPhysicalPort> oldports =
1991 // new HashMap<Short, OFPhysicalPort>();
1992 //oldports.putAll(sw.getPorts());
1993
1994 while (portResultSet.next()) {
1995 try {
1996 OFPhysicalPort p = new OFPhysicalPort();
1997 p.setPortNumber((short)portResultSet.getInt(PORT_NUMBER));
1998 p.setName(portResultSet.getString(PORT_NAME));
1999 p.setConfig((int)portResultSet.getLong(PORT_CONFIG));
2000 p.setState((int)portResultSet.getLong(PORT_STATE));
2001 String portMac = portResultSet.getString(PORT_HARDWARE_ADDRESS);
2002 p.setHardwareAddress(HexString.fromHexString(portMac));
2003 p.setCurrentFeatures((int)portResultSet.
2004 getLong(PORT_CURRENT_FEATURES));
2005 p.setAdvertisedFeatures((int)portResultSet.
2006 getLong(PORT_ADVERTISED_FEATURES));
2007 p.setSupportedFeatures((int)portResultSet.
2008 getLong(PORT_SUPPORTED_FEATURES));
2009 p.setPeerFeatures((int)portResultSet.
2010 getLong(PORT_PEER_FEATURES));
2011 //oldports.remove(Short.valueOf(p.getPortNumber()));
2012 sw.setPort(p);
2013 } catch (NullPointerException e) {
2014 // ignore
2015 }
2016 }
2017 SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED);
2018 try {
2019 this.updates.put(update);
2020 } catch (InterruptedException e) {
2021 log.error("Failure adding update to queue", e);
2022 }
2023 }
2024
2025 protected void removePortInfo(IOFSwitch sw, short portNumber) {
2026 if (role == Role.SLAVE) {
2027 return;
2028 }
2029 String datapathIdString = sw.getStringId();
2030 String id = datapathIdString + "|" + portNumber;
2031 storageSource.deleteRowAsync(PORT_TABLE_NAME, id);
2032 }
2033
2034 /**
2035 * Sets the initial role based on properties in the config params.
2036 * It looks for two different properties.
2037 * If the "role" property is specified then the value should be
2038 * either "EQUAL", "MASTER", or "SLAVE" and the role of the
2039 * controller is set to the specified value. If the "role" property
2040 * is not specified then it looks next for the "role.path" property.
2041 * In this case the value should be the path to a property file in
2042 * the file system that contains a property called "floodlight.role"
2043 * which can be one of the values listed above for the "role" property.
2044 * The idea behind the "role.path" mechanism is that you have some
2045 * separate heartbeat and master controller election algorithm that
2046 * determines the role of the controller. When a role transition happens,
2047 * it updates the current role in the file specified by the "role.path"
2048 * file. Then if floodlight restarts for some reason it can get the
2049 * correct current role of the controller from the file.
2050 * @param configParams The config params for the FloodlightProvider service
2051 * @return A valid role if role information is specified in the
2052 * config params, otherwise null
2053 */
2054 @LogMessageDocs({
2055 @LogMessageDoc(message="Controller role set to {role}",
2056 explanation="Setting the initial HA role to "),
2057 @LogMessageDoc(level="ERROR",
2058 message="Invalid current role value: {role}",
2059 explanation="An invalid HA role value was read from the " +
2060 "properties file",
2061 recommendation=LogMessageDoc.CHECK_CONTROLLER)
2062 })
2063 protected Role getInitialRole(Map<String, String> configParams) {
2064 Role role = null;
2065 String roleString = configParams.get("role");
2066 if (roleString == null) {
2067 String rolePath = configParams.get("rolepath");
2068 if (rolePath != null) {
2069 Properties properties = new Properties();
2070 try {
2071 properties.load(new FileInputStream(rolePath));
2072 roleString = properties.getProperty("floodlight.role");
2073 }
2074 catch (IOException exc) {
2075 // Don't treat it as an error if the file specified by the
2076 // rolepath property doesn't exist. This lets us enable the
2077 // HA mechanism by just creating/setting the floodlight.role
2078 // property in that file without having to modify the
2079 // floodlight properties.
2080 }
2081 }
2082 }
2083
2084 if (roleString != null) {
2085 // Canonicalize the string to the form used for the enum constants
2086 roleString = roleString.trim().toUpperCase();
2087 try {
2088 role = Role.valueOf(roleString);
2089 }
2090 catch (IllegalArgumentException exc) {
2091 log.error("Invalid current role value: {}", roleString);
2092 }
2093 }
2094
2095 log.info("Controller role set to {}", role);
2096
2097 return role;
2098 }
2099
2100 /**
2101 * Tell controller that we're ready to accept switches loop
2102 * @throws IOException
2103 */
2104 @LogMessageDocs({
2105 @LogMessageDoc(message="Listening for switch connections on {address}",
2106 explanation="The controller is ready and listening for new" +
2107 " switch connections"),
2108 @LogMessageDoc(message="Storage exception in controller " +
2109 "updates loop; terminating process",
2110 explanation=ERROR_DATABASE,
2111 recommendation=LogMessageDoc.CHECK_CONTROLLER),
2112 @LogMessageDoc(level="ERROR",
2113 message="Exception in controller updates loop",
2114 explanation="Failed to dispatch controller event",
2115 recommendation=LogMessageDoc.GENERIC_ACTION)
2116 })
2117 public void run() {
2118 if (log.isDebugEnabled()) {
2119 logListeners();
2120 }
2121
2122 try {
2123 final ServerBootstrap bootstrap = createServerBootStrap();
2124
2125 bootstrap.setOption("reuseAddr", true);
2126 bootstrap.setOption("child.keepAlive", true);
2127 bootstrap.setOption("child.tcpNoDelay", true);
2128 bootstrap.setOption("child.sendBufferSize", Controller.SEND_BUFFER_SIZE);
2129
2130 ChannelPipelineFactory pfact =
2131 new OpenflowPipelineFactory(this, null);
2132 bootstrap.setPipelineFactory(pfact);
2133 InetSocketAddress sa = new InetSocketAddress(openFlowPort);
2134 final ChannelGroup cg = new DefaultChannelGroup();
2135 cg.add(bootstrap.bind(sa));
2136
2137 log.info("Listening for switch connections on {}", sa);
2138 } catch (Exception e) {
2139 throw new RuntimeException(e);
2140 }
2141
2142 // main loop
2143 while (true) {
2144 try {
2145 IUpdate update = updates.take();
2146 update.dispatch();
2147 } catch (InterruptedException e) {
2148 return;
2149 } catch (StorageException e) {
2150 log.error("Storage exception in controller " +
2151 "updates loop; terminating process", e);
2152 return;
2153 } catch (Exception e) {
2154 log.error("Exception in controller updates loop", e);
2155 }
2156 }
2157 }
2158
2159 private ServerBootstrap createServerBootStrap() {
2160 if (workerThreads == 0) {
2161 return new ServerBootstrap(
2162 new NioServerSocketChannelFactory(
2163 Executors.newCachedThreadPool(),
2164 Executors.newCachedThreadPool()));
2165 } else {
2166 return new ServerBootstrap(
2167 new NioServerSocketChannelFactory(
2168 Executors.newCachedThreadPool(),
2169 Executors.newCachedThreadPool(), workerThreads));
2170 }
2171 }
2172
2173 public void setConfigParams(Map<String, String> configParams) {
2174 String ofPort = configParams.get("openflowport");
2175 if (ofPort != null) {
2176 this.openFlowPort = Integer.parseInt(ofPort);
2177 }
2178 log.debug("OpenFlow port set to {}", this.openFlowPort);
2179 String threads = configParams.get("workerthreads");
2180 if (threads != null) {
2181 this.workerThreads = Integer.parseInt(threads);
2182 }
2183 log.debug("Number of worker threads set to {}", this.workerThreads);
2184 String controllerId = configParams.get("controllerid");
2185 if (controllerId != null) {
2186 this.controllerId = controllerId;
2187 }
Jonathan Hartd10008d2013-02-23 17:04:08 -08002188 else {
2189 //Try to get the hostname of the machine and use that for controller ID
2190 try {
2191 String hostname = java.net.InetAddress.getLocalHost().getHostName();
2192 this.controllerId = hostname;
2193 } catch (UnknownHostException e) {
2194 // Can't get hostname, we'll just use the default
2195 }
2196 }
2197
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002198 log.debug("ControllerId set to {}", this.controllerId);
2199 }
2200
2201 private void initVendorMessages() {
2202 // Configure openflowj to be able to parse the role request/reply
2203 // vendor messages.
2204 OFBasicVendorId niciraVendorId = new OFBasicVendorId(
2205 OFNiciraVendorData.NX_VENDOR_ID, 4);
2206 OFVendorId.registerVendorId(niciraVendorId);
2207 OFBasicVendorDataType roleRequestVendorData =
2208 new OFBasicVendorDataType(
2209 OFRoleRequestVendorData.NXT_ROLE_REQUEST,
2210 OFRoleRequestVendorData.getInstantiable());
2211 niciraVendorId.registerVendorDataType(roleRequestVendorData);
2212 OFBasicVendorDataType roleReplyVendorData =
2213 new OFBasicVendorDataType(
2214 OFRoleReplyVendorData.NXT_ROLE_REPLY,
2215 OFRoleReplyVendorData.getInstantiable());
2216 niciraVendorId.registerVendorDataType(roleReplyVendorData);
2217 }
2218
2219 /**
2220 * Initialize internal data structures
2221 */
2222 public void init(Map<String, String> configParams) {
2223 // These data structures are initialized here because other
2224 // module's startUp() might be called before ours
2225 this.messageListeners =
2226 new ConcurrentHashMap<OFType,
2227 ListenerDispatcher<OFType,
2228 IOFMessageListener>>();
2229 this.switchListeners = new CopyOnWriteArraySet<IOFSwitchListener>();
2230 this.haListeners = new CopyOnWriteArraySet<IHAListener>();
2231 this.activeSwitches = new ConcurrentHashMap<Long, IOFSwitch>();
2232 this.connectedSwitches = new HashSet<OFSwitchImpl>();
2233 this.controllerNodeIPsCache = new HashMap<String, String>();
2234 this.updates = new LinkedBlockingQueue<IUpdate>();
2235 this.factory = new BasicFactory();
2236 this.providerMap = new HashMap<String, List<IInfoProvider>>();
2237 setConfigParams(configParams);
Jonathan Hartcc957a02013-02-26 10:39:04 -08002238 //this.role = getInitialRole(configParams);
2239 //Set the controller's role to MASTER so it always tries to do role requests.
2240 this.role = Role.MASTER;
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002241 this.roleChanger = new RoleChanger();
2242 initVendorMessages();
2243 this.systemStartTime = System.currentTimeMillis();
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002244 }
2245
2246 /**
2247 * Startup all of the controller's components
2248 */
2249 @LogMessageDoc(message="Waiting for storage source",
2250 explanation="The system database is not yet ready",
2251 recommendation="If this message persists, this indicates " +
2252 "that the system database has failed to start. " +
2253 LogMessageDoc.CHECK_CONTROLLER)
2254 public void startupComponents() {
Jonathan Hartd10008d2013-02-23 17:04:08 -08002255 try {
2256 registryService.registerController(controllerId);
2257 } catch (RegistryException e2) {
2258 log.warn("Registry service error: {}", e2.getMessage());
2259 }
2260
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08002261 // Create the table names we use
2262 storageSource.createTable(CONTROLLER_TABLE_NAME, null);
2263 storageSource.createTable(SWITCH_TABLE_NAME, null);
2264 storageSource.createTable(PORT_TABLE_NAME, null);
2265 storageSource.createTable(CONTROLLER_INTERFACE_TABLE_NAME, null);
2266 storageSource.createTable(SWITCH_CONFIG_TABLE_NAME, null);
2267 storageSource.setTablePrimaryKeyName(CONTROLLER_TABLE_NAME,
2268 CONTROLLER_ID);
2269 storageSource.setTablePrimaryKeyName(SWITCH_TABLE_NAME,
2270 SWITCH_DATAPATH_ID);
2271 storageSource.setTablePrimaryKeyName(PORT_TABLE_NAME, PORT_ID);
2272 storageSource.setTablePrimaryKeyName(CONTROLLER_INTERFACE_TABLE_NAME,
2273 CONTROLLER_INTERFACE_ID);
2274 storageSource.addListener(CONTROLLER_INTERFACE_TABLE_NAME, this);
2275
2276 while (true) {
2277 try {
2278 updateControllerInfo();
2279 break;
2280 }
2281 catch (StorageException e) {
2282 log.info("Waiting for storage source");
2283 try {
2284 Thread.sleep(1000);
2285 } catch (InterruptedException e1) {
2286 }
2287 }
2288 }
2289
2290 // Add our REST API
2291 restApi.addRestletRoutable(new CoreWebRoutable());
2292 }
2293
2294 @Override
2295 public void addInfoProvider(String type, IInfoProvider provider) {
2296 if (!providerMap.containsKey(type)) {
2297 providerMap.put(type, new ArrayList<IInfoProvider>());
2298 }
2299 providerMap.get(type).add(provider);
2300 }
2301
2302 @Override
2303 public void removeInfoProvider(String type, IInfoProvider provider) {
2304 if (!providerMap.containsKey(type)) {
2305 log.debug("Provider type {} doesn't exist.", type);
2306 return;
2307 }
2308
2309 providerMap.get(type).remove(provider);
2310 }
2311
2312 public Map<String, Object> getControllerInfo(String type) {
2313 if (!providerMap.containsKey(type)) return null;
2314
2315 Map<String, Object> result = new LinkedHashMap<String, Object>();
2316 for (IInfoProvider provider : providerMap.get(type)) {
2317 result.putAll(provider.getInfo(type));
2318 }
2319
2320 return result;
2321 }
2322
2323 @Override
2324 public void addHAListener(IHAListener listener) {
2325 this.haListeners.add(listener);
2326 }
2327
2328 @Override
2329 public void removeHAListener(IHAListener listener) {
2330 this.haListeners.remove(listener);
2331 }
2332
2333
2334 /**
2335 * Handle changes to the controller nodes IPs and dispatch update.
2336 */
2337 @SuppressWarnings("unchecked")
2338 protected void handleControllerNodeIPChanges() {
2339 HashMap<String,String> curControllerNodeIPs = new HashMap<String,String>();
2340 HashMap<String,String> addedControllerNodeIPs = new HashMap<String,String>();
2341 HashMap<String,String> removedControllerNodeIPs =new HashMap<String,String>();
2342 String[] colNames = { CONTROLLER_INTERFACE_CONTROLLER_ID,
2343 CONTROLLER_INTERFACE_TYPE,
2344 CONTROLLER_INTERFACE_NUMBER,
2345 CONTROLLER_INTERFACE_DISCOVERED_IP };
2346 synchronized(controllerNodeIPsCache) {
2347 // We currently assume that interface Ethernet0 is the relevant
2348 // controller interface. Might change.
2349 // We could (should?) implement this using
2350 // predicates, but creating the individual and compound predicate
2351 // seems more overhead then just checking every row. Particularly,
2352 // since the number of rows is small and changes infrequent
2353 IResultSet res = storageSource.executeQuery(CONTROLLER_INTERFACE_TABLE_NAME,
2354 colNames,null, null);
2355 while (res.next()) {
2356 if (res.getString(CONTROLLER_INTERFACE_TYPE).equals("Ethernet") &&
2357 res.getInt(CONTROLLER_INTERFACE_NUMBER) == 0) {
2358 String controllerID = res.getString(CONTROLLER_INTERFACE_CONTROLLER_ID);
2359 String discoveredIP = res.getString(CONTROLLER_INTERFACE_DISCOVERED_IP);
2360 String curIP = controllerNodeIPsCache.get(controllerID);
2361
2362 curControllerNodeIPs.put(controllerID, discoveredIP);
2363 if (curIP == null) {
2364 // new controller node IP
2365 addedControllerNodeIPs.put(controllerID, discoveredIP);
2366 }
2367 else if (curIP != discoveredIP) {
2368 // IP changed
2369 removedControllerNodeIPs.put(controllerID, curIP);
2370 addedControllerNodeIPs.put(controllerID, discoveredIP);
2371 }
2372 }
2373 }
2374 // Now figure out if rows have been deleted. We can't use the
2375 // rowKeys from rowsDeleted directly, since the tables primary
2376 // key is a compound that we can't disassemble
2377 Set<String> curEntries = curControllerNodeIPs.keySet();
2378 Set<String> removedEntries = controllerNodeIPsCache.keySet();
2379 removedEntries.removeAll(curEntries);
2380 for (String removedControllerID : removedEntries)
2381 removedControllerNodeIPs.put(removedControllerID, controllerNodeIPsCache.get(removedControllerID));
2382 controllerNodeIPsCache = (HashMap<String, String>) curControllerNodeIPs.clone();
2383 HAControllerNodeIPUpdate update = new HAControllerNodeIPUpdate(
2384 curControllerNodeIPs, addedControllerNodeIPs,
2385 removedControllerNodeIPs);
2386 if (!removedControllerNodeIPs.isEmpty() || !addedControllerNodeIPs.isEmpty()) {
2387 try {
2388 this.updates.put(update);
2389 } catch (InterruptedException e) {
2390 log.error("Failure adding update to queue", e);
2391 }
2392 }
2393 }
2394 }
2395
2396 @Override
2397 public Map<String, String> getControllerNodeIPs() {
2398 // We return a copy of the mapping so we can guarantee that
2399 // the mapping return is the same as one that will be (or was)
2400 // dispatched to IHAListeners
2401 HashMap<String,String> retval = new HashMap<String,String>();
2402 synchronized(controllerNodeIPsCache) {
2403 retval.putAll(controllerNodeIPsCache);
2404 }
2405 return retval;
2406 }
2407
2408 @Override
2409 public void rowsModified(String tableName, Set<Object> rowKeys) {
2410 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2411 handleControllerNodeIPChanges();
2412 }
2413
2414 }
2415
2416 @Override
2417 public void rowsDeleted(String tableName, Set<Object> rowKeys) {
2418 if (tableName.equals(CONTROLLER_INTERFACE_TABLE_NAME)) {
2419 handleControllerNodeIPChanges();
2420 }
2421 }
2422
2423 @Override
2424 public long getSystemStartTime() {
2425 return (this.systemStartTime);
2426 }
2427
2428 @Override
2429 public void setAlwaysClearFlowsOnSwAdd(boolean value) {
2430 this.alwaysClearFlowsOnSwAdd = value;
2431 }
2432
2433 public boolean getAlwaysClearFlowsOnSwAdd() {
2434 return this.alwaysClearFlowsOnSwAdd;
2435 }
2436}